Skip to content
This repository was archived by the owner on Mar 25, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ vendor/
.env
beemflow.db
.beemflow/
test.db
*.db

# Build artifacts
*.log
Expand Down
108 changes: 58 additions & 50 deletions api/index.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
package handler

import (
"context"
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/awantoch/beemflow/config"
api "github.com/awantoch/beemflow/core"
)

var (
initServerless sync.Once
initErr error
cachedMux *http.ServeMux
)

// Handler is the entry point for Vercel serverless functions
func Handler(w http.ResponseWriter, r *http.Request) {
// CORS headers
Expand All @@ -26,55 +21,68 @@ func Handler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
return
}

// Add serverless flag to context with timeout
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
ctx = context.WithValue(ctx, "serverless", true)
r = r.WithContext(ctx)

// Initialize once
initServerless.Do(func() {
cfg := &config.Config{
Storage: config.StorageConfig{
Driver: "sqlite",
DSN: os.Getenv("DATABASE_URL"),
},
FlowsDir: os.Getenv("FLOWS_DIR"),
}
if cfg.Storage.DSN == "" {
cfg.Storage.DSN = ":memory:"
}
if cfg.FlowsDir != "" {
api.SetFlowsDir(cfg.FlowsDir)
}

_, initErr = api.InitializeDependencies(cfg)
if initErr != nil {
return
}

// Generate handlers once during initialization
mux := http.NewServeMux()
if endpoints := os.Getenv("BEEMFLOW_ENDPOINTS"); endpoints != "" {
filteredOps := api.GetOperationsMapByGroups(strings.Split(endpoints, ","))
api.GenerateHTTPHandlersForOperations(mux, filteredOps)
// Initialize dependencies fresh for each request
// This ensures clean resource management - everything is created
// and destroyed within the request lifecycle

// Determine storage driver and DSN from DATABASE_URL
var driver, dsn string
if databaseURL := os.Getenv("DATABASE_URL"); databaseURL != "" {
if strings.HasPrefix(databaseURL, "postgres://") || strings.HasPrefix(databaseURL, "postgresql://") {
driver = "postgres"
dsn = databaseURL
} else {
api.GenerateHTTPHandlers(mux)
driver = "sqlite"
dsn = databaseURL
}

// Health check endpoint
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"status":"healthy"}`))
})

cachedMux = mux
})
} else {
driver = "sqlite"
dsn = ":memory:"
}

if initErr != nil {
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
cfg := &config.Config{
Storage: config.StorageConfig{
Driver: driver,
DSN: dsn,
},
FlowsDir: os.Getenv("FLOWS_DIR"),
Event: &config.EventConfig{
Driver: "memory", // In-memory event bus for serverless
},
}
if cfg.FlowsDir != "" {
api.SetFlowsDir(cfg.FlowsDir)
}

if cachedMux == nil {
http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
// Initialize dependencies with automatic cleanup
cleanup, err := api.InitializeDependencies(cfg)
if err != nil {
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
defer cleanup() // Ensure all resources are released when request ends

// Generate handlers
mux := http.NewServeMux()
if endpoints := os.Getenv("BEEMFLOW_ENDPOINTS"); endpoints != "" {
filteredOps := api.GetOperationsMapByGroups(strings.Split(endpoints, ","))
api.GenerateHTTPHandlersForOperations(mux, filteredOps)
} else {
api.GenerateHTTPHandlers(mux)
}

// Health check endpoint
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"status":"healthy"}`))
})

cachedMux.ServeHTTP(w, r)
}
mux.ServeHTTP(w, r)
}
189 changes: 189 additions & 0 deletions api/index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package handler

import (
"net/http"
"net/http/httptest"
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func TestHandler_CORS(t *testing.T) {
// Test OPTIONS request for CORS
req := httptest.NewRequest(http.MethodOptions, "/", nil)
rec := httptest.NewRecorder()

Handler(rec, req)

assert.Equal(t, http.StatusOK, rec.Code)
assert.Equal(t, "*", rec.Header().Get("Access-Control-Allow-Origin"))
assert.Equal(t, "GET, POST, PUT, DELETE, OPTIONS", rec.Header().Get("Access-Control-Allow-Methods"))
assert.Equal(t, "Content-Type, Authorization", rec.Header().Get("Access-Control-Allow-Headers"))
}

func TestHandler_HealthCheck(t *testing.T) {
// Set up temporary flows directory
tmpDir := t.TempDir()
oldFlowsDir := os.Getenv("FLOWS_DIR")
os.Setenv("FLOWS_DIR", tmpDir)
defer os.Setenv("FLOWS_DIR", oldFlowsDir)

req := httptest.NewRequest(http.MethodGet, "/healthz", nil)
rec := httptest.NewRecorder()

Handler(rec, req)

assert.Equal(t, http.StatusOK, rec.Code)
assert.Equal(t, "application/json", rec.Header().Get("Content-Type"))
assert.JSONEq(t, `{"status":"healthy"}`, rec.Body.String())
}

func TestHandler_WithDatabaseURL(t *testing.T) {
tests := []struct {
name string
databaseURL string
wantStatus int
}{
{
name: "PostgreSQL URL - invalid",
databaseURL: "postgres://user:pass@host:5432/db",
wantStatus: http.StatusInternalServerError, // Can't connect
},
{
name: "PostgreSQL URL with postgresql scheme - invalid",
databaseURL: "postgresql://user:pass@host:5432/db",
wantStatus: http.StatusInternalServerError, // Can't connect
},
{
name: "SQLite URL",
databaseURL: "file:" + t.TempDir() + "/test.db",
wantStatus: http.StatusOK,
},
{
name: "No DATABASE_URL",
databaseURL: "",
wantStatus: http.StatusOK, // defaults to in-memory
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Set up environment
oldDB := os.Getenv("DATABASE_URL")
if tt.databaseURL != "" {
os.Setenv("DATABASE_URL", tt.databaseURL)
} else {
os.Unsetenv("DATABASE_URL")
}
defer func() {
if oldDB != "" {
os.Setenv("DATABASE_URL", oldDB)
} else {
os.Unsetenv("DATABASE_URL")
}
}()

tmpDir := t.TempDir()
oldFlowsDir := os.Getenv("FLOWS_DIR")
os.Setenv("FLOWS_DIR", tmpDir)
defer os.Setenv("FLOWS_DIR", oldFlowsDir)

req := httptest.NewRequest(http.MethodGet, "/healthz", nil)
rec := httptest.NewRecorder()

Handler(rec, req)

// Check expected status
assert.Equal(t, tt.wantStatus, rec.Code)
})
}
}

func TestHandler_CleanupOnRequestEnd(t *testing.T) {
// This test verifies that resources are cleaned up after each request
// by making multiple requests and checking they don't interfere

tmpDir := t.TempDir()
oldFlowsDir := os.Getenv("FLOWS_DIR")
os.Setenv("FLOWS_DIR", tmpDir)
defer os.Setenv("FLOWS_DIR", oldFlowsDir)

// Make multiple requests
for i := 0; i < 3; i++ {
req := httptest.NewRequest(http.MethodGet, "/healthz", nil)
rec := httptest.NewRecorder()

Handler(rec, req)

assert.Equal(t, http.StatusOK, rec.Code)
// Each request should work independently
}
}

func TestHandler_ContextTimeout(t *testing.T) {
// Test that context has timeout set
tmpDir := t.TempDir()
oldFlowsDir := os.Getenv("FLOWS_DIR")
os.Setenv("FLOWS_DIR", tmpDir)
defer os.Setenv("FLOWS_DIR", oldFlowsDir)

// We verify context timeout by making a request
// The handler sets a 30-second timeout and serverless=true
req := httptest.NewRequest(http.MethodGet, "/healthz", nil)
rec := httptest.NewRecorder()

Handler(rec, req)

// If we got here without hanging, the timeout is working
assert.Equal(t, http.StatusOK, rec.Code)
}

func TestHandler_EndpointFiltering(t *testing.T) {
// Test BEEMFLOW_ENDPOINTS filtering
tmpDir := t.TempDir()
oldFlowsDir := os.Getenv("FLOWS_DIR")
oldEndpoints := os.Getenv("BEEMFLOW_ENDPOINTS")

os.Setenv("FLOWS_DIR", tmpDir)
os.Setenv("BEEMFLOW_ENDPOINTS", "core,flow")

defer func() {
os.Setenv("FLOWS_DIR", oldFlowsDir)
if oldEndpoints != "" {
os.Setenv("BEEMFLOW_ENDPOINTS", oldEndpoints)
} else {
os.Unsetenv("BEEMFLOW_ENDPOINTS")
}
}()

req := httptest.NewRequest(http.MethodGet, "/healthz", nil)
rec := httptest.NewRecorder()

Handler(rec, req)

// Should still have health endpoint
assert.Equal(t, http.StatusOK, rec.Code)
}

func TestHandler_InitializationError(t *testing.T) {
// Test handling of initialization errors
// Force an error by setting invalid database URL
oldDB := os.Getenv("DATABASE_URL")
os.Setenv("DATABASE_URL", "postgres://invalid:invalid@nonexistent:5432/db")
defer func() {
if oldDB != "" {
os.Setenv("DATABASE_URL", oldDB)
} else {
os.Unsetenv("DATABASE_URL")
}
}()

req := httptest.NewRequest(http.MethodGet, "/healthz", nil)
rec := httptest.NewRecorder()

Handler(rec, req)

// Should return 500 on initialization error
assert.Equal(t, http.StatusInternalServerError, rec.Code)
}
6 changes: 3 additions & 3 deletions cmd/flow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ func newServeCmd() *cobra.Command {
}

utils.Info("Starting BeemFlow HTTP server...")
// If stdout is not a terminal (e.g., piped in tests), skip starting the server to avoid blocking
if fi, statErr := os.Stdout.Stat(); statErr == nil && fi.Mode()&os.ModeCharDevice == 0 {
utils.User("flow serve (stub)")
// Skip actual server start in tests
if os.Getenv("BEEMFLOW_TEST") == "1" {
utils.User("flow serve (test mode)")
return
}
if err := beemhttp.StartServer(cfg); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/flow/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func captureStderrExit(f func()) (output string, code int) {
}

func TestMainCommands(t *testing.T) {
// Set test mode to prevent actual server start
os.Setenv("BEEMFLOW_TEST", "1")
defer os.Unsetenv("BEEMFLOW_TEST")

cases := []struct {
args []string
wantsOutput bool
Expand Down
Loading