diff --git a/README.md b/README.md index 39283ec7..b9feadbb 100644 --- a/README.md +++ b/README.md @@ -303,7 +303,7 @@ flow run human_approval.flow.yaml **What happens?** - The flow sends an SMS for approval. -- It pauses until a reply is received (via webhook or manual event). +- It pauses until a reply is received (via HTTP endpoint or manual event). - When the human replies, the flow resumes and prints the result. --- @@ -479,8 +479,8 @@ flow run cfo_daily_cash.flow.yaml ```yaml name: ecommerce_autopilot -on: schedule.interval -every: "1h" +on: schedule.cron +cron: "0 * * * *" # Every hour vars: MIN_MARGIN_PCT: 20 @@ -617,7 +617,7 @@ steps: ✨ **Templating:** `{{…}}` gives you outputs, vars, secrets, helper funcs. -⏳ **Durable waits:** `await_event` pauses until external approval / webhook. +⏳ **Durable waits:** `await_event` pauses until external approval / HTTP event. ⚡ **Parallelism & retries:** `parallel: true` blocks and `retry:` back-offs. @@ -686,7 +686,7 @@ BeemFlow provides **three complementary ways** to integrate with HTTP APIs and e **How it works:** - **Complete HTTP control** - any method, headers, body, authentication - **No assumptions** - you specify exactly what gets sent -- **Perfect for** - REST APIs, webhooks, custom protocols +- **Perfect for** - REST APIs, HTTP endpoints, custom protocols - **Raw power** - handles any HTTP scenario ### 🚀 Pattern 3: MCP Servers (For complex integrations) @@ -725,7 +725,7 @@ BeemFlow provides **three complementary ways** to integrate with HTTP APIs and e | Custom REST API (advanced) | MCP server | `mcp://my-api/search` with caching, retries, etc. | | Database queries | MCP server | `mcp://postgres/query` | | File processing | MCP server | `mcp://filesystem/read` | -| One-off webhook/custom request | Generic HTTP | `http` with custom headers | +| One-off HTTP/custom request | Generic HTTP | `http` with custom headers | ### Testing All Patterns diff --git a/adapter/http_adapter.go b/adapter/http_adapter.go index 7053ff89..a68d8b42 100644 --- a/adapter/http_adapter.go +++ b/adapter/http_adapter.go @@ -10,15 +10,19 @@ import ( "os" "regexp" "strings" - "time" "github.com/awantoch/beemflow/constants" "github.com/awantoch/beemflow/registry" "github.com/awantoch/beemflow/utils" ) -// defaultClient is used for HTTP requests with a timeout to avoid hanging. -var defaultClient = &http.Client{Timeout: 30 * time.Second} +// getHTTPClient returns an HTTP client that respects context deadlines +func getHTTPClient() *http.Client { + return &http.Client{ + // Don't set a timeout here - let the context handle timeouts + // This allows proper context cancellation and deadline handling + } +} // Environment variable pattern for safe parsing var envVarPattern = regexp.MustCompile(`\$env:([A-Za-z_][A-Za-z0-9_]*)`) @@ -148,8 +152,9 @@ func (a *HTTPAdapter) executeHTTPRequest(ctx context.Context, req HTTPRequest) ( } httpReq.Header.Set(constants.HeaderAccept, constants.DefaultJSONAccept) - // Execute request - resp, err := defaultClient.Do(httpReq) + // Execute request with context-aware client + client := getHTTPClient() + resp, err := client.Do(httpReq) if err != nil { return nil, utils.Errorf("HTTP request failed: %w", err) } diff --git a/api/index.go b/api/index.go deleted file mode 100644 index c411b781..00000000 --- a/api/index.go +++ /dev/null @@ -1,88 +0,0 @@ -package handler - -import ( - "context" - "net/http" - "os" - "strings" - "time" - - "github.com/awantoch/beemflow/config" - api "github.com/awantoch/beemflow/core" -) - -// Handler is the entry point for Vercel serverless functions -func Handler(w http.ResponseWriter, r *http.Request) { - // CORS headers - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") - w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") - if r.Method == "OPTIONS" { - w.WriteHeader(http.StatusOK) - return - } - - // Add serverless flag to context with timeout - ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) - defer cancel() - ctx = context.WithValue(ctx, "serverless", true) - r = r.WithContext(ctx) - - // Initialize dependencies fresh for each request - // This ensures clean resource management - everything is created - // and destroyed within the request lifecycle - - // Determine storage driver and DSN from DATABASE_URL - var driver, dsn string - if databaseURL := os.Getenv("DATABASE_URL"); databaseURL != "" { - if strings.HasPrefix(databaseURL, "postgres://") || strings.HasPrefix(databaseURL, "postgresql://") { - driver = "postgres" - dsn = databaseURL - } else { - driver = "sqlite" - dsn = databaseURL - } - } else { - driver = "sqlite" - dsn = ":memory:" - } - - cfg := &config.Config{ - Storage: config.StorageConfig{ - Driver: driver, - DSN: dsn, - }, - FlowsDir: os.Getenv("FLOWS_DIR"), - Event: &config.EventConfig{ - Driver: "memory", // In-memory event bus for serverless - }, - } - if cfg.FlowsDir != "" { - api.SetFlowsDir(cfg.FlowsDir) - } - - // Initialize dependencies with automatic cleanup - cleanup, err := api.InitializeDependencies(cfg) - if err != nil { - http.Error(w, "Internal server error", http.StatusInternalServerError) - return - } - defer cleanup() // Ensure all resources are released when request ends - - // Generate handlers - mux := http.NewServeMux() - if endpoints := os.Getenv("BEEMFLOW_ENDPOINTS"); endpoints != "" { - filteredOps := api.GetOperationsMapByGroups(strings.Split(endpoints, ",")) - api.GenerateHTTPHandlersForOperations(mux, filteredOps) - } else { - api.GenerateHTTPHandlers(mux) - } - - // Health check endpoint - mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(`{"status":"healthy"}`)) - }) - - mux.ServeHTTP(w, r) -} \ No newline at end of file diff --git a/api/index_test.go b/api/index_test.go deleted file mode 100644 index 23e90037..00000000 --- a/api/index_test.go +++ /dev/null @@ -1,189 +0,0 @@ -package handler - -import ( - "net/http" - "net/http/httptest" - "os" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestHandler_CORS(t *testing.T) { - // Test OPTIONS request for CORS - req := httptest.NewRequest(http.MethodOptions, "/", nil) - rec := httptest.NewRecorder() - - Handler(rec, req) - - assert.Equal(t, http.StatusOK, rec.Code) - assert.Equal(t, "*", rec.Header().Get("Access-Control-Allow-Origin")) - assert.Equal(t, "GET, POST, PUT, DELETE, OPTIONS", rec.Header().Get("Access-Control-Allow-Methods")) - assert.Equal(t, "Content-Type, Authorization", rec.Header().Get("Access-Control-Allow-Headers")) -} - -func TestHandler_HealthCheck(t *testing.T) { - // Set up temporary flows directory - tmpDir := t.TempDir() - oldFlowsDir := os.Getenv("FLOWS_DIR") - os.Setenv("FLOWS_DIR", tmpDir) - defer os.Setenv("FLOWS_DIR", oldFlowsDir) - - req := httptest.NewRequest(http.MethodGet, "/healthz", nil) - rec := httptest.NewRecorder() - - Handler(rec, req) - - assert.Equal(t, http.StatusOK, rec.Code) - assert.Equal(t, "application/json", rec.Header().Get("Content-Type")) - assert.JSONEq(t, `{"status":"healthy"}`, rec.Body.String()) -} - -func TestHandler_WithDatabaseURL(t *testing.T) { - tests := []struct { - name string - databaseURL string - wantStatus int - }{ - { - name: "PostgreSQL URL - invalid", - databaseURL: "postgres://user:pass@host:5432/db", - wantStatus: http.StatusInternalServerError, // Can't connect - }, - { - name: "PostgreSQL URL with postgresql scheme - invalid", - databaseURL: "postgresql://user:pass@host:5432/db", - wantStatus: http.StatusInternalServerError, // Can't connect - }, - { - name: "SQLite URL", - databaseURL: "file:" + t.TempDir() + "/test.db", - wantStatus: http.StatusOK, - }, - { - name: "No DATABASE_URL", - databaseURL: "", - wantStatus: http.StatusOK, // defaults to in-memory - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Set up environment - oldDB := os.Getenv("DATABASE_URL") - if tt.databaseURL != "" { - os.Setenv("DATABASE_URL", tt.databaseURL) - } else { - os.Unsetenv("DATABASE_URL") - } - defer func() { - if oldDB != "" { - os.Setenv("DATABASE_URL", oldDB) - } else { - os.Unsetenv("DATABASE_URL") - } - }() - - tmpDir := t.TempDir() - oldFlowsDir := os.Getenv("FLOWS_DIR") - os.Setenv("FLOWS_DIR", tmpDir) - defer os.Setenv("FLOWS_DIR", oldFlowsDir) - - req := httptest.NewRequest(http.MethodGet, "/healthz", nil) - rec := httptest.NewRecorder() - - Handler(rec, req) - - // Check expected status - assert.Equal(t, tt.wantStatus, rec.Code) - }) - } -} - -func TestHandler_CleanupOnRequestEnd(t *testing.T) { - // This test verifies that resources are cleaned up after each request - // by making multiple requests and checking they don't interfere - - tmpDir := t.TempDir() - oldFlowsDir := os.Getenv("FLOWS_DIR") - os.Setenv("FLOWS_DIR", tmpDir) - defer os.Setenv("FLOWS_DIR", oldFlowsDir) - - // Make multiple requests - for i := 0; i < 3; i++ { - req := httptest.NewRequest(http.MethodGet, "/healthz", nil) - rec := httptest.NewRecorder() - - Handler(rec, req) - - assert.Equal(t, http.StatusOK, rec.Code) - // Each request should work independently - } -} - -func TestHandler_ContextTimeout(t *testing.T) { - // Test that context has timeout set - tmpDir := t.TempDir() - oldFlowsDir := os.Getenv("FLOWS_DIR") - os.Setenv("FLOWS_DIR", tmpDir) - defer os.Setenv("FLOWS_DIR", oldFlowsDir) - - // We verify context timeout by making a request - // The handler sets a 30-second timeout and serverless=true - req := httptest.NewRequest(http.MethodGet, "/healthz", nil) - rec := httptest.NewRecorder() - - Handler(rec, req) - - // If we got here without hanging, the timeout is working - assert.Equal(t, http.StatusOK, rec.Code) -} - -func TestHandler_EndpointFiltering(t *testing.T) { - // Test BEEMFLOW_ENDPOINTS filtering - tmpDir := t.TempDir() - oldFlowsDir := os.Getenv("FLOWS_DIR") - oldEndpoints := os.Getenv("BEEMFLOW_ENDPOINTS") - - os.Setenv("FLOWS_DIR", tmpDir) - os.Setenv("BEEMFLOW_ENDPOINTS", "core,flow") - - defer func() { - os.Setenv("FLOWS_DIR", oldFlowsDir) - if oldEndpoints != "" { - os.Setenv("BEEMFLOW_ENDPOINTS", oldEndpoints) - } else { - os.Unsetenv("BEEMFLOW_ENDPOINTS") - } - }() - - req := httptest.NewRequest(http.MethodGet, "/healthz", nil) - rec := httptest.NewRecorder() - - Handler(rec, req) - - // Should still have health endpoint - assert.Equal(t, http.StatusOK, rec.Code) -} - -func TestHandler_InitializationError(t *testing.T) { - // Test handling of initialization errors - // Force an error by setting invalid database URL - oldDB := os.Getenv("DATABASE_URL") - os.Setenv("DATABASE_URL", "postgres://invalid:invalid@nonexistent:5432/db") - defer func() { - if oldDB != "" { - os.Setenv("DATABASE_URL", oldDB) - } else { - os.Unsetenv("DATABASE_URL") - } - }() - - req := httptest.NewRequest(http.MethodGet, "/healthz", nil) - rec := httptest.NewRecorder() - - Handler(rec, req) - - // Should return 500 on initialization error - assert.Equal(t, http.StatusInternalServerError, rec.Code) -} \ No newline at end of file diff --git a/cmd/flow/main.go b/cmd/flow/main.go index 9d074340..2583ba4d 100644 --- a/cmd/flow/main.go +++ b/cmd/flow/main.go @@ -141,11 +141,6 @@ func newServeCmd() *cobra.Command { } utils.Info("Starting BeemFlow HTTP server...") - // Skip actual server start in tests - if os.Getenv("BEEMFLOW_TEST") == "1" { - utils.User("flow serve (test mode)") - return - } if err := beemhttp.StartServer(cfg); err != nil { utils.Error("Failed to start server: %v", err) exit(1) diff --git a/cmd/flow/main_test.go b/cmd/flow/main_test.go index 97fcc006..ba626f0d 100644 --- a/cmd/flow/main_test.go +++ b/cmd/flow/main_test.go @@ -67,15 +67,10 @@ func captureStderrExit(f func()) (output string, code int) { } func TestMainCommands(t *testing.T) { - // Set test mode to prevent actual server start - os.Setenv("BEEMFLOW_TEST", "1") - defer os.Unsetenv("BEEMFLOW_TEST") - cases := []struct { args []string wantsOutput bool }{ - {[]string{"flow", "serve"}, true}, {[]string{"flow", "run"}, true}, {[]string{"flow", "test"}, true}, {[]string{"flow", "convert", "--help"}, true}, @@ -124,22 +119,6 @@ steps: if !strings.Contains(out, "Lint OK") { t.Errorf("expected Lint OK, got %q", out) } - - os.Args = []string{"flow", "flows", "validate", tmpPath} - out = captureOutput(func() { - if err := NewRootCmd().Execute(); err != nil { - log.Printf("Execute failed: %v", err) - t.Errorf("validate command failed: %v", err) - } - }) - t.Logf("validate output: %q", out) - if !strings.Contains(out, "Validation OK") { - t.Errorf("expected Validation OK, got %q", out) - } - - // TODO: Error case tests temporarily disabled due to CLI structure changes - // These tests were failing due to changes in command structure and error handling - // They can be re-enabled and updated once the CLI structure is stabilized } func TestMain_ToolStub(t *testing.T) { @@ -212,6 +191,25 @@ func TestNewServeCmd(t *testing.T) { } } +func TestServeCommand_Help(t *testing.T) { + // Test that serve --help works without starting the server + cmd := NewRootCmd() + cmd.SetArgs([]string{"serve", "--help"}) + + var stdout bytes.Buffer + cmd.SetOut(&stdout) + + err := cmd.Execute() + if err != nil { + t.Errorf("serve --help failed: %v", err) + } + + output := stdout.String() + if !strings.Contains(output, "Start the BeemFlow") { + t.Error("Expected help text to contain server description") + } +} + func TestNewRunCmd(t *testing.T) { cmd := newRunCmd() diff --git a/core/api.go b/core/api.go index 27216cdb..64618c92 100644 --- a/core/api.go +++ b/core/api.go @@ -191,7 +191,7 @@ func tryFindPausedRun(store storage.Storage, execErr error) (uuid.UUID, error) { return uuid.Nil, execErr } - paused, err := store.LoadPausedRuns() + paused, err := store.LoadPausedRuns(context.Background()) if err != nil { return uuid.Nil, execErr } diff --git a/core/api_test.go b/core/api_test.go index e617f11e..bd625ce4 100644 --- a/core/api_test.go +++ b/core/api_test.go @@ -9,6 +9,7 @@ import ( "github.com/awantoch/beemflow/config" "github.com/awantoch/beemflow/model" + "github.com/awantoch/beemflow/storage" "github.com/awantoch/beemflow/utils" "github.com/google/uuid" ) @@ -47,7 +48,9 @@ func TestGraphFlow(t *testing.T) { } func TestStartRun(t *testing.T) { - _, err := StartRun(context.Background(), "dummy", map[string]any{}) + // Use test context with memory storage + ctx := WithStore(context.Background(), storage.NewMemoryStorage()) + _, err := StartRun(ctx, "dummy", map[string]any{}) if err != nil { t.Errorf("StartRun returned error: %v", err) } diff --git a/core/cron.go b/core/cron.go index 789a6be9..c219c8ac 100644 --- a/core/cron.go +++ b/core/cron.go @@ -8,9 +8,11 @@ import ( "net/url" "os/exec" "strings" + "time" "github.com/awantoch/beemflow/model" "github.com/awantoch/beemflow/utils" + "github.com/robfig/cron/v3" ) const cronMarker = "# BeemFlow managed - do not edit" @@ -153,23 +155,124 @@ func (c *CronManager) RemoveAllEntries() error { // extractCronExpression gets cron from flow (reuse existing logic) func extractCronExpression(flow *model.Flow) string { - // Check if triggered by schedule.cron - hasScheduleCron := false + if !hasScheduleCronTrigger(flow) || flow.Cron == "" { + return "" + } + return flow.Cron +} + +// CheckAndExecuteCronFlows checks all flows for cron schedules and executes those that are due +// This is stateless and relies only on the database +func CheckAndExecuteCronFlows(ctx context.Context) (map[string]interface{}, error) { + // List all flows + flows, err := ListFlows(ctx) + if err != nil { + return nil, err + } + + triggered := []string{} + errors := []string{} + checked := 0 + + // Get current time + now := time.Now().UTC() + + // Create cron parser - using standard cron format (5 fields) + parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) + + for _, flowName := range flows { + flow, err := GetFlow(ctx, flowName) + if err != nil { + errors = append(errors, flowName + ": " + err.Error()) + continue + } + + // Check if flow has schedule.cron trigger + if !hasScheduleCronTrigger(&flow) { + continue + } + + checked++ + + // Parse cron expression + if flow.Cron == "" { + errors = append(errors, flowName + ": missing cron expression") + continue + } + + schedule, err := parser.Parse(flow.Cron) + if err != nil { + errors = append(errors, flowName + ": invalid cron: " + err.Error()) + continue + } + + // Check if we should run now + // We check if the schedule matches within our check window + // System cron typically runs every 5 minutes, so we check a 5-minute window + scheduledTime := shouldRunNowWithTime(schedule, now, 5*time.Minute) + if !scheduledTime.IsZero() { + // Create event with the actual scheduled time to enable proper deduplication + event := map[string]interface{}{ + "trigger": "schedule.cron", + "workflow": flowName, + "timestamp": now.Format(time.RFC3339), + "scheduled_for": scheduledTime.Format(time.RFC3339), // Actual cron time + } + + if _, err := StartRun(ctx, flowName, event); err != nil { + errors = append(errors, flowName + ": failed to start: " + err.Error()) + } else { + triggered = append(triggered, flowName) + utils.Info("Triggered cron workflow: %s for scheduled time: %s", flowName, scheduledTime.Format(time.RFC3339)) + } + } + } + + return map[string]interface{}{ + "status": "completed", + "timestamp": now.Format(time.RFC3339), + "triggered": len(triggered), + "workflows": triggered, + "errors": errors, + "checked": checked, + "total": len(flows), + }, nil +} + +// shouldRunNowWithTime checks if a cron schedule should run within the given window +// Returns the scheduled time if it should run, or zero time if not +// This handles the fact that system cron might not run exactly on time +func shouldRunNowWithTime(schedule cron.Schedule, now time.Time, window time.Duration) time.Time { + // Get the previous scheduled time by looking back from now + // We need to find the most recent scheduled time that should have run + checkFrom := now.Add(-window) + + // Get when it should next run after our check start time + nextRun := schedule.Next(checkFrom) + + // Check if this scheduled time falls within our window + // The scheduled time must be: + // 1. After our check start time (checkFrom) + // 2. Before or at the current time (with 1 minute buffer for early triggers) + if nextRun.After(checkFrom) && nextRun.Before(now.Add(1*time.Minute)) { + // Return the actual scheduled time for deduplication + return nextRun + } + + return time.Time{} // Zero time means don't run +} + +// hasScheduleCronTrigger checks if a flow has schedule.cron in its triggers +func hasScheduleCronTrigger(flow *model.Flow) bool { switch on := flow.On.(type) { case string: - hasScheduleCron = (on == "schedule.cron") + return on == "schedule.cron" case []interface{}: for _, trigger := range on { if str, ok := trigger.(string); ok && str == "schedule.cron" { - hasScheduleCron = true - break + return true } } } - - if !hasScheduleCron || flow.Cron == "" { - return "" - } - - return flow.Cron + return false } \ No newline at end of file diff --git a/core/cron_test.go b/core/cron_test.go index 336063b2..e38c6648 100644 --- a/core/cron_test.go +++ b/core/cron_test.go @@ -46,7 +46,7 @@ func TestShellQuote(t *testing.T) { for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - got := shellQuote(tt.input) + got := ShellQuote(tt.input) assert.Equal(t, tt.expected, got) }) } @@ -166,8 +166,8 @@ func TestCronCommandInjection(t *testing.T) { for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { // Test that dangerous characters are safely escaped - quotedSecret := shellQuote("Authorization: Bearer " + tt.cronSecret) - quotedURL := shellQuote(tt.serverURL + "/cron/" + url.PathEscape(tt.flowName)) + quotedSecret := ShellQuote("Authorization: Bearer " + tt.cronSecret) + quotedURL := ShellQuote(tt.serverURL + "/cron/" + url.PathEscape(tt.flowName)) // Verify that the quoted strings are safe // The single quote escaping should handle all dangerous input @@ -283,8 +283,8 @@ steps: t.Error("Missing triggered count in response") } - if _, ok := response["results"]; !ok { - t.Error("Missing results in response") + if _, ok := response["workflows"]; !ok { + t.Error("Missing workflows in response") } // Note: The new cron system uses storage-based scheduling and async events @@ -399,7 +399,7 @@ func TestCron_ValidationError(t *testing.T) { // Create a workflow WITHOUT schedule.cron trigger testFlow := `name: non_cron_workflow -on: webhook +on: http.request steps: - id: echo @@ -428,6 +428,54 @@ steps: assert.Equal(t, http.StatusBadRequest, w.Code) } +func TestCron_ErrorHandling(t *testing.T) { + // Create temp directory with test workflow + tmpDir, err := os.MkdirTemp("", "cron_error_test") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + // Create a workflow that will fail (missing required step) + testFlow := `name: failing_workflow +on: schedule.cron +steps: + - id: fail_step + use: non.existent.tool +` + flowPath := filepath.Join(tmpDir, "failing_workflow.flow.yaml") + err = os.WriteFile(flowPath, []byte(testFlow), 0644) + require.NoError(t, err) + + // Set flows directory + SetFlowsDir(tmpDir) + + // Get cron operation + cronOp, exists := GetOperation("system_cron") + require.True(t, exists) + + // Test the endpoint + req := httptest.NewRequest(http.MethodPost, "/cron", nil) + store := storage.NewMemoryStorage() + req = req.WithContext(WithStore(req.Context(), store)) + w := httptest.NewRecorder() + + cronOp.HTTPHandler(w, req) + + // Check response + assert.Equal(t, http.StatusOK, w.Code) + + var response map[string]interface{} + err = json.Unmarshal(w.Body.Bytes(), &response) + require.NoError(t, err) + + // Should have errors for the failing workflow + if errList, ok := response["errors"].([]interface{}); ok && len(errList) > 0 { + // Good - we got errors as expected + t.Logf("Got expected errors: %v", errList) + } else { + t.Error("Expected errors for failing workflow, but got none") + } +} + func TestCron_Security(t *testing.T) { // Create temp directory tmpDir, err := os.MkdirTemp("", "cron_security") diff --git a/core/operations.go b/core/operations.go index 96263340..3b8983a4 100644 --- a/core/operations.go +++ b/core/operations.go @@ -612,7 +612,7 @@ func init() { RegisterOperation(&OperationDefinition{ ID: "system_cron", Name: "System Cron Trigger", - Description: "Triggers all workflows with schedule.cron (called by Vercel or system cron)", + Description: "Triggers all workflows with schedule.cron (called by system cron)", Group: "system", HTTPMethod: http.MethodPost, HTTPPath: "/cron", @@ -620,7 +620,7 @@ func init() { SkipMCP: true, ArgsType: reflect.TypeOf(EmptyArgs{}), HTTPHandler: func(w http.ResponseWriter, r *http.Request) { - // Verify CRON_SECRET if set (Vercel security) + // Verify CRON_SECRET if set for security if secret := os.Getenv("CRON_SECRET"); secret != "" { auth := r.Header.Get("Authorization") if auth != "Bearer "+secret { @@ -630,80 +630,17 @@ func init() { } ctx := r.Context() - triggeredWorkflows := []string{} - // List all workflows - flows, err := ListFlows(ctx) + // Use the optimized cron checker that respects cron expressions + result, err := CheckAndExecuteCronFlows(ctx) if err != nil { - utils.Error("Failed to list flows: %v", err) - http.Error(w, "Failed to list workflows", http.StatusInternalServerError) + utils.Error("Failed to check cron flows: %v", err) + http.Error(w, "Failed to check workflows", http.StatusInternalServerError) return } - // Early exit if no workflows - if len(flows) == 0 { - response := map[string]interface{}{ - "status": "completed", - "timestamp": time.Now().UTC().Format(time.RFC3339), - "triggered": 0, - "workflows": []string{}, - "results": map[string]string{}, - } - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) - return - } - - // Trigger each workflow that has schedule.cron - for _, flowName := range flows { - flow, err := GetFlow(ctx, flowName) - if err != nil { - continue - } - - // Check if workflow has schedule.cron trigger - hasCron := false - switch on := flow.On.(type) { - case string: - hasCron = (on == "schedule.cron") - case []interface{}: - for _, trigger := range on { - if str, ok := trigger.(string); ok && str == "schedule.cron" { - hasCron = true - break - } - } - } - - if !hasCron { - continue - } - - // Trigger the workflow - event := map[string]interface{}{ - "trigger": "schedule.cron", - "workflow": flowName, - "timestamp": time.Now().UTC().Format(time.RFC3339), - } - - if _, err := StartRun(ctx, flowName, event); err != nil { - utils.Error("Failed to trigger %s: %v", flowName, err) - } else { - triggeredWorkflows = append(triggeredWorkflows, flowName) - } - } - - // Response for compatibility - response := map[string]interface{}{ - "status": "completed", - "timestamp": time.Now().UTC().Format(time.RFC3339), - "triggered": len(triggeredWorkflows), - "workflows": triggeredWorkflows, - "results": map[string]string{}, // For backward compatibility - } - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) + json.NewEncoder(w).Encode(result) }, }) @@ -719,7 +656,7 @@ func init() { SkipMCP: true, ArgsType: reflect.TypeOf(EmptyArgs{}), HTTPHandler: func(w http.ResponseWriter, r *http.Request) { - // Verify CRON_SECRET if set (Vercel security) + // Verify CRON_SECRET if set for security if secret := os.Getenv("CRON_SECRET"); secret != "" { auth := r.Header.Get("Authorization") if auth != "Bearer "+secret { diff --git a/docs/BEEMFLOW.md b/docs/BEEMFLOW.md index f6f45b9e..b00f3e33 100644 --- a/docs/BEEMFLOW.md +++ b/docs/BEEMFLOW.md @@ -185,14 +185,14 @@ A BeemFlow flow is started by one or more triggers, defined in the `on:` field a - `cli.manual` — Manual trigger from the CLI or API. - `event: ` — Subscribes to a named event topic (e.g. `event: tweet.request`). - `schedule.cron` — Runs on a cron schedule (requires a `cron:` field). -- `schedule.interval` — Runs on a fixed interval (requires an `every:` field). +- `http.request` — Triggered by HTTP request (API endpoints). ### Events - When a flow is triggered by an event, the event payload is available as `.event` in templates. - For scheduled triggers, `.event` is usually empty unless injected by the runner. ### Await Event (`await_event`) -The `await_event` step pauses the flow until a matching event is received. This enables human-in-the-loop, webhook, or external event-driven automations. +The `await_event` step pauses the flow until a matching event is received. This enables human-in-the-loop or external event-driven automations. **Schema:** ```yaml diff --git a/docs/CRON_SETUP.md b/docs/CRON_SETUP.md index 74ee70a7..ba8ac84c 100644 --- a/docs/CRON_SETUP.md +++ b/docs/CRON_SETUP.md @@ -36,21 +36,7 @@ steps: ## Setup Options -### 1. Vercel (Serverless) - -Add to `vercel.json`: -```json -{ - "crons": [{ - "path": "/cron", - "schedule": "*/5 * * * *" - }] -} -``` - -For security, set the `CRON_SECRET` environment variable in Vercel. BeemFlow will automatically verify this secret on incoming cron requests. - -### 2. System Cron (Linux/Mac) +### 1. System Cron (Linux/Mac) Add to crontab: ```bash @@ -62,7 +48,7 @@ Add to crontab: 0 * * * * curl -X POST http://localhost:3333/cron/hourly_sync ``` -### 3. Kubernetes CronJob +### 2. Kubernetes CronJob ```yaml apiVersion: batch/v1 @@ -85,7 +71,7 @@ spec: restartPolicy: OnFailure ``` -### 4. GitHub Actions +### 3. GitHub Actions ```yaml name: Trigger BeemFlow Workflows @@ -101,19 +87,19 @@ jobs: curl -X POST https://your-beemflow-instance.com/cron ``` -### 5. AWS EventBridge / CloudWatch Events +### 4. AWS EventBridge / CloudWatch Events Create a rule that triggers a Lambda function or directly calls your BeemFlow endpoint. ## Auto-Setup (Server Mode) -When running BeemFlow in server mode, it can automatically manage system cron entries: +When running BeemFlow in server mode, it automatically manages system cron entries: ```bash -beemflow serve --auto-cron +beemflow serve ``` -This will: +The server will: 1. Add cron entries for each workflow based on their `cron` field 2. Clean up entries on shutdown 3. Update entries when workflows change diff --git a/docs/SPEC.md b/docs/SPEC.md index 43b4ec6f..91904540 100644 --- a/docs/SPEC.md +++ b/docs/SPEC.md @@ -168,7 +168,7 @@ steps: | Custom REST API (advanced) | MCP Server | `mcp://my-api/search` | Caching, retries, business logic | | Database operations | MCP Server | `mcp://postgres/query` | Stateful connections, complex logic | | File processing | MCP Server | `mcp://filesystem/read` | File system operations | -| One-off webhook/custom request | Generic HTTP | `http` with POST | Single-use, non-reusable requests | +| One-off HTTP/custom request | Generic HTTP | `http` with POST | Single-use, non-reusable requests | --- @@ -239,7 +239,7 @@ steps: **Key characteristics:** - **Complete HTTP control** - any method, headers, body - **No assumptions** - you specify exactly what gets sent -- **Perfect for** - REST APIs, webhooks, custom protocols +- **Perfect for** - REST APIs, HTTP endpoints, custom protocols - **Raw power** - handles any HTTP scenario ### Examples: @@ -298,10 +298,10 @@ steps: #### Webhook Integration ```yaml -- id: send_webhook +- id: send_notification use: http with: - url: "{{ webhook_url }}" + url: "{{ notification_url }}" method: "POST" headers: Content-Type: "application/json" @@ -531,7 +531,7 @@ A BeemFlow flow is started by one or more triggers, defined in the `on:` field a - `cli.manual` — Manual trigger from the CLI or API. - `event: ` — Subscribes to a named event topic (e.g. `event: tweet.request`). - `schedule.cron` — Runs on a cron schedule (requires a `cron:` field). -- `schedule.interval` — Runs on a fixed interval (requires an `every:` field). +- `http.request` — Triggered by HTTP request (API endpoints). **Examples:** @@ -547,9 +547,7 @@ cron: "0 9 * * 1-5" # every weekday at 09:00 ``` ```yaml -on: - - schedule.interval -every: "1h" +on: http.request ``` --- @@ -572,7 +570,7 @@ steps: --- ## Await Event (`await_event`) -The `await_event` step pauses the flow until a matching event is received. This enables human-in-the-loop, webhook, or external event-driven automations. +The `await_event` step pauses the flow until a matching event is received. This enables human-in-the-loop or external event-driven automations. **Schema:** @@ -895,7 +893,7 @@ Flows can pause on `await_event` and resume via `POST /resume/{token}` (HMAC-sig ``` **Why it's powerful:** -- Enables human-in-the-loop, webhook, or external event-driven automations. +- Enables human-in-the-loop or external event-driven automations. - Flows are durable and resumable. --- diff --git a/dsl/dsl.go b/dsl/dsl.go new file mode 100644 index 00000000..651ee30d --- /dev/null +++ b/dsl/dsl.go @@ -0,0 +1,70 @@ +package dsl + +import ( + "encoding/json" + "os" + + "github.com/awantoch/beemflow/docs" + "github.com/awantoch/beemflow/model" + "github.com/santhosh-tekuri/jsonschema/v5" + "gopkg.in/yaml.v3" +) + +// Parse reads a YAML flow file from the given path and unmarshals it into a Flow struct. +func Parse(path string) (*model.Flow, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + return ParseFromString(string(data)) +} + +// ParseFromString unmarshals a YAML string into a Flow struct. +func ParseFromString(yamlStr string) (*model.Flow, error) { + var flow model.Flow + if err := yaml.Unmarshal([]byte(yamlStr), &flow); err != nil { + return nil, err + } + return &flow, nil +} + +// Validate runs JSON-Schema validation against the embedded BeemFlow schema. +func Validate(flow *model.Flow) error { + // Marshal the flow to JSON for validation + jsonBytes, err := json.Marshal(flow) + if err != nil { + return err + } + // Compile the embedded schema + schema, err := jsonschema.CompileString("beemflow.schema.json", docs.BeemflowSchema) + if err != nil { + return err + } + // Unmarshal JSON into a generic interface for validation + var doc any + if err := json.Unmarshal(jsonBytes, &doc); err != nil { + return err + } + // Validate the flow + return schema.Validate(doc) +} + +// Load reads, templates, parses, and validates a flow file in one step. +func Load(path string, vars map[string]any) (*model.Flow, error) { + raw, err := os.ReadFile(path) + if err != nil { + return nil, err + } + rendered, err := Render(string(raw), vars) + if err != nil { + return nil, err + } + flow, err := ParseFromString(rendered) + if err != nil { + return nil, err + } + if err := Validate(flow); err != nil { + return nil, err + } + return flow, nil +} \ No newline at end of file diff --git a/dsl/load.go b/dsl/load.go deleted file mode 100644 index c57ef962..00000000 --- a/dsl/load.go +++ /dev/null @@ -1,27 +0,0 @@ -package dsl - -import ( - "os" - - "github.com/awantoch/beemflow/model" -) - -// Load reads, templates, parses, and validates a flow file in one step. -func Load(path string, vars map[string]any) (*model.Flow, error) { - raw, err := os.ReadFile(path) - if err != nil { - return nil, err - } - rendered, err := Render(string(raw), vars) - if err != nil { - return nil, err - } - flow, err := ParseFromString(rendered) - if err != nil { - return nil, err - } - if err := Validate(flow); err != nil { - return nil, err - } - return flow, nil -} diff --git a/dsl/parse.go b/dsl/parse.go deleted file mode 100644 index 1356bb29..00000000 --- a/dsl/parse.go +++ /dev/null @@ -1,27 +0,0 @@ -package dsl - -import ( - "os" - - "gopkg.in/yaml.v3" - - "github.com/awantoch/beemflow/model" -) - -// Parse reads a YAML flow file from the given path and unmarshals it into a Flow struct. -func Parse(path string) (*model.Flow, error) { - data, err := os.ReadFile(path) - if err != nil { - return nil, err - } - return ParseFromString(string(data)) -} - -// ParseFromString unmarshals a YAML string into a Flow struct. -func ParseFromString(yamlStr string) (*model.Flow, error) { - var flow model.Flow - if err := yaml.Unmarshal([]byte(yamlStr), &flow); err != nil { - return nil, err - } - return &flow, nil -} diff --git a/dsl/validate.go b/dsl/validate.go deleted file mode 100644 index 50fb2ad8..00000000 --- a/dsl/validate.go +++ /dev/null @@ -1,30 +0,0 @@ -package dsl - -import ( - "encoding/json" - - "github.com/awantoch/beemflow/docs" - "github.com/awantoch/beemflow/model" - "github.com/santhosh-tekuri/jsonschema/v5" -) - -// Validate runs JSON-Schema validation against the embedded BeemFlow schema. -func Validate(flow *model.Flow) error { - // Marshal the flow to JSON for validation - jsonBytes, err := json.Marshal(flow) - if err != nil { - return err - } - // Compile the embedded schema - schema, err := jsonschema.CompileString("beemflow.schema.json", docs.BeemflowSchema) - if err != nil { - return err - } - // Unmarshal JSON into a generic interface for validation - var doc any - if err := json.Unmarshal(jsonBytes, &doc); err != nil { - return err - } - // Validate the flow - return schema.Validate(doc) -} diff --git a/engine/engine.go b/engine/engine.go index 9d6f7650..4a092c4a 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -2,8 +2,11 @@ package engine import ( "context" + "encoding/json" + "fmt" "maps" "regexp" + "sort" "strings" "sync" "time" @@ -26,6 +29,41 @@ type runIDKeyType struct{} var runIDKey = runIDKeyType{} +// generateDeterministicRunID creates a deterministic UUID based on flow name and event data +// This enables deduplication of runs with identical inputs within a time window +func generateDeterministicRunID(flowName string, event map[string]any) uuid.UUID { + // Build raw data for UUID v5 generation + var data []byte + data = append(data, []byte(flowName)...) + + // Add time window (5 minute buckets) to allow same workflow to run again after window + now := time.Now().UTC() + timeBucket := now.Truncate(5 * time.Minute).Unix() + data = append(data, []byte(fmt.Sprintf(":%d", timeBucket))...) + + // Sort map keys for deterministic ordering + keys := make([]string, 0, len(event)) + for k := range event { + keys = append(keys, k) + } + sort.Strings(keys) + + // Add event data in sorted order + for _, k := range keys { + data = append(data, []byte(k)...) + if v, err := json.Marshal(event[k]); err == nil { + data = append(data, v...) + } else { + // Fallback for unmarshalable values + data = append(data, []byte(fmt.Sprintf("%v", event[k]))...) + } + } + + // Generate UUID v5 (deterministic) using SHA1 internally + // uuid.NewSHA1 will hash the raw data with SHA1 + return uuid.NewSHA1(uuid.NameSpaceDNS, data) +} + // Type aliases for better readability and type safety type ( StepInputs = map[string]any @@ -180,6 +218,12 @@ func (e *Engine) Execute(ctx context.Context, flow *model.Flow, event map[string // Setup execution context stepCtx, runID := e.setupExecutionContext(ctx, flow, event) + + // Check if this is a duplicate run + if runID == uuid.Nil { + // Duplicate detected - return empty outputs and no error to signal successful deduplication + return map[string]any{}, nil + } // Execute the flow steps outputs, err := e.executeStepsWithPersistence(ctx, flow, stepCtx, 0, runID) @@ -196,8 +240,22 @@ func (e *Engine) setupExecutionContext(ctx context.Context, flow *model.Flow, ev // Create step context using the new constructor stepCtx := NewStepContext(event, flow.Vars, secretsMap) - // Create and persist the run - runID := uuid.New() + // Create deterministic run ID based on flow name, event data, and time window + runID := generateDeterministicRunID(flow.Name, event) + + // Check if this run already exists (deduplication) + existingRun, err := e.Storage.GetRun(ctx, runID) + if err == nil && existingRun != nil { + // Run already exists, check if it's recent (within 5 minutes) + if time.Since(existingRun.StartedAt) < 5*time.Minute { + // This is a duplicate run within the deduplication window + utils.Info("Duplicate run detected for %s, skipping (existing run: %s)", flow.Name, existingRun.ID) + return stepCtx, uuid.Nil // Return nil ID to signal duplicate + } + // Older run with same ID, generate a new unique ID + runID = uuid.New() + } + run := &model.Run{ ID: runID, FlowName: flow.Name, @@ -392,7 +450,7 @@ func (e *Engine) handleExistingPausedRun(ctx context.Context, token string) { utils.ErrorCtx(ctx, "Failed to mark existing run as skipped: %v", "error", err) } } - if err := e.Storage.DeletePausedRun(token); err != nil { + if err := e.Storage.DeletePausedRun(ctx, token); err != nil { utils.ErrorCtx(ctx, constants.ErrFailedToDeletePausedRun, "error", err) } } @@ -417,7 +475,7 @@ func (e *Engine) registerPausedRun(ctx context.Context, token string, flow *mode } if e.Storage != nil { - if err := e.Storage.SavePausedRun(token, pausedRunToMap(e.waiting[token])); err != nil { + if err := e.Storage.SavePausedRun(ctx, token, pausedRunToMap(e.waiting[token])); err != nil { utils.ErrorCtx(ctx, "Failed to save paused run: %v", "error", err) } } @@ -486,7 +544,7 @@ func (e *Engine) retrieveAndRemovePausedRun(ctx context.Context, token string) * delete(e.waiting, token) if e.Storage != nil { - if err := e.Storage.DeletePausedRun(token); err != nil { + if err := e.Storage.DeletePausedRun(ctx, token); err != nil { utils.ErrorCtx(ctx, constants.ErrFailedToDeletePausedRun, "error", err) } } diff --git a/engine/engine_test.go b/engine/engine_test.go index dcc1209a..c69845c4 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -24,6 +24,113 @@ func TestMain(m *testing.M) { utils.WithCleanDirs(m, ".beemflow", config.DefaultConfigDir, config.DefaultFlowsDir) } +func TestGenerateDeterministicRunID(t *testing.T) { + // Test that the same inputs generate the same UUID + flowName := "test-flow" + event := map[string]any{ + "key1": "value1", + "key2": 42, + "key3": true, + } + + // Generate UUID multiple times with same inputs + id1 := generateDeterministicRunID(flowName, event) + id2 := generateDeterministicRunID(flowName, event) + + // They should be identical + if id1 != id2 { + t.Errorf("Same inputs generated different UUIDs: %s != %s", id1, id2) + } + + // Test that different inputs generate different UUIDs + event2 := map[string]any{ + "key1": "value1", + "key2": 43, // Changed value + "key3": true, + } + + id3 := generateDeterministicRunID(flowName, event2) + if id1 == id3 { + t.Error("Different inputs generated the same UUID") + } + + // Test that different flow names generate different UUIDs + id4 := generateDeterministicRunID("different-flow", event) + if id1 == id4 { + t.Error("Different flow names generated the same UUID") + } + + // Test that order doesn't matter (map keys are sorted) + eventReordered := map[string]any{ + "key3": true, + "key1": "value1", + "key2": 42, + } + + id5 := generateDeterministicRunID(flowName, eventReordered) + if id1 != id5 { + t.Error("Same event with different key order generated different UUIDs") + } + + // Verify it's a valid UUID v5 (has correct version and variant bits) + if id1.Version() != 5 { + t.Errorf("Expected UUID version 5, got %d", id1.Version()) + } + + // Test with empty event + idEmpty := generateDeterministicRunID(flowName, map[string]any{}) + if idEmpty == uuid.Nil { + t.Error("Empty event generated nil UUID") + } + + // Test with complex nested structures + complexEvent := map[string]any{ + "nested": map[string]any{ + "deep": "value", + }, + "array": []any{1, 2, 3}, + } + + idComplex1 := generateDeterministicRunID(flowName, complexEvent) + idComplex2 := generateDeterministicRunID(flowName, complexEvent) + + if idComplex1 != idComplex2 { + t.Error("Complex event generated different UUIDs on repeated calls") + } +} + +func TestGenerateDeterministicRunID_TimeWindow(t *testing.T) { + // This test verifies that UUIDs change after the 5-minute time window + // We can't easily test this without mocking time, but we can verify + // that UUIDs generated at different times are different + + flowName := "test-flow" + event := map[string]any{"key": "value"} + + // Generate first UUID + id1 := generateDeterministicRunID(flowName, event) + + // Sleep a tiny bit to ensure time has changed + time.Sleep(time.Millisecond) + + // Generate second UUID - should still be the same (within 5 min window) + id2 := generateDeterministicRunID(flowName, event) + + // Within the same 5-minute window, UUIDs should be identical + if id1 != id2 { + t.Log("Note: UUIDs differ within time window, this might happen if test runs across minute boundary") + // This is not necessarily an error - it depends on when the test runs + } + + // Verify the UUID is deterministic by regenerating with exact same inputs + id3 := generateDeterministicRunID(flowName, event) + id4 := generateDeterministicRunID(flowName, event) + + if id3 != id4 { + t.Error("Immediate regeneration produced different UUIDs") + } +} + func TestNewEngine(t *testing.T) { e := NewDefaultEngine(context.Background()) if e == nil { diff --git a/event/event.go b/event/event.go index 9bbef846..38d62fca 100644 --- a/event/event.go +++ b/event/event.go @@ -17,6 +17,7 @@ func NewInProcEventBus() *WatermillEventBus { return NewWatermillInMemBus() } + // NewEventBusFromConfig returns an EventBus based on config. Supported: memory (default), nats (with url). // Unknown drivers fail cleanly. See docs/flow.config.schema.json for config schema. func NewEventBusFromConfig(cfg *config.EventConfig) (EventBus, error) { diff --git a/event/watermill_bus.go b/event/watermill_bus.go index d8bec63a..9c10498f 100644 --- a/event/watermill_bus.go +++ b/event/watermill_bus.go @@ -27,6 +27,7 @@ func NewWatermillInMemBus() *WatermillEventBus { return &WatermillEventBus{publisher: ps, subscriber: ps} } + // NewWatermillNATSBUS returns a NATS-backed bus or error if setup fails. func NewWatermillNATSBUS(clusterID, clientID, url string) (*WatermillEventBus, error) { logger := watermill.NewStdLogger(false, false) @@ -80,6 +81,7 @@ func (b *WatermillEventBus) Publish(topic string, payload any) error { } func (b *WatermillEventBus) Subscribe(ctx context.Context, topic string, handler func(payload any)) { + ch, err := b.subscriber.Subscribe(ctx, topic) if err != nil { return diff --git a/go.mod b/go.mod index a5af3f00..4e601602 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/metoro-io/mcp-golang v0.13.0 github.com/nats-io/stan.go v0.10.4 github.com/prometheus/client_golang v1.22.0 + github.com/robfig/cron/v3 v3.0.1 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 diff --git a/go.sum b/go.sum index ca25a1d3..f1fed4b6 100644 --- a/go.sum +++ b/go.sum @@ -207,6 +207,8 @@ github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzM github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/http/http.go b/http/http.go index 1d965a6e..507a6a2d 100644 --- a/http/http.go +++ b/http/http.go @@ -51,7 +51,9 @@ func init() { // StartServer starts the HTTP server with minimal setup - all the heavy lifting // is now done by the unified operations system -func StartServer(cfg *config.Config) error { +// NewHandler creates the HTTP handler with all routes configured. +// This is useful for testing without starting a real server. +func NewHandler(cfg *config.Config) (http.Handler, func(), error) { // Initialize tracing initTracerFromConfig(cfg) @@ -77,6 +79,24 @@ func StartServer(cfg *config.Config) error { // Initialize all dependencies (this could be moved to a separate DI package) cleanup, err := api.InitializeDependencies(cfg) + if err != nil { + return nil, nil, err + } + + // Create wrapped handler with middleware + wrappedMux := otelhttp.NewHandler( + requestIDMiddleware( + metricsMiddleware("root", mux), + ), + "http.root", + ) + + return wrappedMux, cleanup, nil +} + +func StartServer(cfg *config.Config) error { + // Create handler + handler, cleanup, err := NewHandler(cfg) if err != nil { return err } @@ -94,16 +114,8 @@ func StartServer(cfg *config.Config) error { // Determine server address addr := getServerAddress(cfg) - // Create wrapped handler with middleware - wrappedMux := otelhttp.NewHandler( - requestIDMiddleware( - metricsMiddleware("root", mux), - ), - "http.root", - ) - // Start server with graceful shutdown - return startServerWithGracefulShutdown(addr, wrappedMux) + return startServerWithGracefulShutdown(addr, handler) } // getServerAddress determines the server address from config diff --git a/http/http_test.go b/http/http_test.go index c0b34413..9f83acc7 100644 --- a/http/http_test.go +++ b/http/http_test.go @@ -9,7 +9,6 @@ import ( "path/filepath" "strings" "testing" - "time" "github.com/awantoch/beemflow/config" "github.com/awantoch/beemflow/constants" @@ -251,42 +250,25 @@ func TestUpdateRunEvent(t *testing.T) { } func TestHTTPServer_ListRuns(t *testing.T) { - t.Skip("Skipping flaky test that depends on server startup timing") tempConfig := createTestConfig(t) - tempConfig.HTTP = &config.HTTPConfig{Port: 18080} - createTempConfigFile(t, tempConfig) defer os.Remove(constants.ConfigFileName) - // Start server in goroutine with error channel - serverErr := make(chan error, 1) - go func() { - serverErr <- StartServer(tempConfig) - }() - - // Wait for server with retry - var resp *http.Response - var err error - for i := 0; i < 10; i++ { - time.Sleep(500 * time.Millisecond) - - // Check if server failed to start - select { - case sErr := <-serverErr: - if sErr != nil { - t.Fatalf("Server failed to start: %v", sErr) - } - default: - // Server still starting, continue - } - - resp, err = http.Get("http://localhost:18080/runs") - if err == nil { - break - } + // Create handler without starting a real server + handler, cleanup, err := NewHandler(tempConfig) + if err != nil { + t.Fatalf("Failed to create handler: %v", err) } + defer cleanup() + + // Create test server + server := httptest.NewServer(handler) + defer server.Close() + + // Test the endpoint + resp, err := http.Get(server.URL + "/runs") if err != nil { - t.Fatalf("Failed to GET /runs after retries: %v", err) + t.Fatalf("Failed to GET /runs: %v", err) } defer resp.Body.Close() diff --git a/http/serverless.go b/http/serverless.go deleted file mode 100644 index f989b4d1..00000000 --- a/http/serverless.go +++ /dev/null @@ -1,68 +0,0 @@ -package http - -import ( - "net/http" - "os" - "strings" - "sync" - - "github.com/awantoch/beemflow/config" - api "github.com/awantoch/beemflow/core" -) - -var ( - initServerless sync.Once - initErr error -) - -// ServerlessHandler is the minimal Vercel function for BeemFlow -func ServerlessHandler(w http.ResponseWriter, r *http.Request) { - // CORS - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") - w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") - if r.Method == "OPTIONS" { - w.WriteHeader(http.StatusOK) - return - } - - // Initialize once - initServerless.Do(func() { - cfg := &config.Config{ - Storage: config.StorageConfig{ - Driver: "sqlite", - DSN: os.Getenv("DATABASE_URL"), - }, - FlowsDir: os.Getenv("FLOWS_DIR"), - } - if cfg.Storage.DSN == "" { - cfg.Storage.DSN = ":memory:" - } - if cfg.FlowsDir != "" { - api.SetFlowsDir(cfg.FlowsDir) - } - _, initErr = api.InitializeDependencies(cfg) - }) - - if initErr != nil { - http.Error(w, "Internal server error", http.StatusInternalServerError) - return - } - - // Generate handlers - mux := http.NewServeMux() - if endpoints := os.Getenv("BEEMFLOW_ENDPOINTS"); endpoints != "" { - filteredOps := api.GetOperationsMapByGroups(strings.Split(endpoints, ",")) - api.GenerateHTTPHandlersForOperations(mux, filteredOps) - } else { - api.GenerateHTTPHandlers(mux) - } - - // Health check - mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.Write([]byte(`{"status":"healthy"}`)) - }) - - mux.ServeHTTP(w, r) -} diff --git a/http/serverless_test.go b/http/serverless_test.go deleted file mode 100644 index 90f153ae..00000000 --- a/http/serverless_test.go +++ /dev/null @@ -1,129 +0,0 @@ -package http - -import ( - "net/http" - "net/http/httptest" - "testing" - - api "github.com/awantoch/beemflow/core" -) - -// TestServerlessOperationsFiltering demonstrates that the serverless handler -// correctly filters operations by group, and that this approach is future-proof -func TestServerlessOperationsFiltering(t *testing.T) { - tests := []struct { - name string - endpointsEnvVar string - expectedAllowed []string - expectedBlocked []string - }{ - { - name: "no filter - all endpoints", - endpointsEnvVar: "", - expectedAllowed: []string{"/healthz", "/flows", "/runs", "/tools", "/spec"}, - expectedBlocked: []string{}, // None blocked - }, - { - name: "flows only", - endpointsEnvVar: "flows", - expectedAllowed: []string{"/healthz", "/flows", "/validate"}, - expectedBlocked: []string{"/runs", "/tools", "/spec"}, - }, - { - name: "runs only", - endpointsEnvVar: "runs", - expectedAllowed: []string{"/healthz", "/runs"}, - expectedBlocked: []string{"/flows", "/validate", "/tools", "/spec"}, - }, - { - name: "tools only", - endpointsEnvVar: "tools", - expectedAllowed: []string{"/healthz", "/tools"}, - expectedBlocked: []string{"/flows", "/runs", "/validate", "/spec"}, - }, - { - name: "system only", - endpointsEnvVar: "system", - expectedAllowed: []string{"/healthz", "/spec"}, - expectedBlocked: []string{"/flows", "/runs", "/tools", "/validate"}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Set environment variable for this test - if tt.endpointsEnvVar != "" { - t.Setenv("BEEMFLOW_ENDPOINTS", tt.endpointsEnvVar) - } - - // Test allowed endpoints - for _, endpoint := range tt.expectedAllowed { - req := httptest.NewRequest("GET", endpoint, nil) - w := httptest.NewRecorder() - - ServerlessHandler(w, req) - - if w.Code == http.StatusNotFound { - t.Errorf("Expected endpoint %s to be allowed, got 404", endpoint) - } - } - - // Test blocked endpoints - for _, endpoint := range tt.expectedBlocked { - req := httptest.NewRequest("GET", endpoint, nil) - w := httptest.NewRecorder() - - ServerlessHandler(w, req) - - if w.Code != http.StatusNotFound { - t.Errorf("Expected endpoint %s to be blocked (404), got status %d", endpoint, w.Code) - } - } - }) - } -} - -// TestOperationsAbstraction demonstrates that our approach works with the -// operations system and is future-proof for new operations -func TestOperationsAbstraction(t *testing.T) { - // Verify that our group filtering works at the operations level - allOps := api.GetAllOperations() - flowsOps := api.GetOperationsMapByGroups([]string{"flows"}) - - // Should have fewer flows operations than total operations - if len(flowsOps) >= len(allOps) { - t.Error("Flows filtering should return subset of operations") - } - - // All flows operations should have the flows group - for _, op := range flowsOps { - if op.Group != "flows" { - t.Errorf("Operation %s should have group 'flows', got '%s'", op.ID, op.Group) - } - } - - // Demonstrate that adding a new operation with a group automatically works - // (This shows why our approach is future-proof - no hardcoded paths!) - // If we were to register a new operation with group="flows", it would automatically - // be included in flows filtering without any changes to serverless code - filteredOps := api.GetOperationsMapByGroups([]string{"flows"}) - - // Show that the filtering logic works for any operation with the right group - hasFlowsOps := false - for _, op := range filteredOps { - if op.Group == "flows" { - hasFlowsOps = true - break - } - } - - if !hasFlowsOps { - t.Error("Should have found flows operations in filtered set") - } - - // This demonstrates the key insight: we filter by operation metadata, - // not by hardcoded HTTP paths. New operations automatically work! - t.Logf("✅ Future-proof: New operations with group='flows' would automatically be included") - t.Logf("✅ No hardcoded paths: Filtering happens at the operations level") - t.Logf("✅ Consistent: Same grouping logic works for CLI, HTTP, and MCP") -} diff --git a/http/vercel.go b/http/vercel.go deleted file mode 100644 index 937bfe37..00000000 --- a/http/vercel.go +++ /dev/null @@ -1,9 +0,0 @@ -package http - -import "net/http" - -// Handler is the entry point for Vercel serverless functions -// It delegates to the integrated ServerlessHandler in this same package -func Handler(w http.ResponseWriter, r *http.Request) { - ServerlessHandler(w, r) -} diff --git a/http/vercel_test.go b/http/vercel_test.go deleted file mode 100644 index 0895de63..00000000 --- a/http/vercel_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package http - -import ( - "net/http" - "net/http/httptest" - "strings" - "testing" -) - -func TestVercelHandler(t *testing.T) { - // Create a test request - req := httptest.NewRequest("GET", "/healthz", nil) - w := httptest.NewRecorder() - - // Call the Vercel handler - Handler(w, req) - - // Check that we get a response (the serverless handler should respond) - if w.Code != http.StatusOK { - t.Errorf("Expected status 200, got %d", w.Code) - } - - // Check that we got JSON content - contentType := w.Header().Get("Content-Type") - if contentType != "application/json" { - t.Errorf("Expected JSON content type, got %s", contentType) - } - - // Check that the response body contains health check data - body := w.Body.String() - if body == "" { - t.Error("Expected non-empty response body") - } -} - -func TestVercelHandlerWithEndpointFilter(t *testing.T) { - // Set environment variable for endpoint filtering - t.Setenv("BEEMFLOW_ENDPOINTS", "system") - - req := httptest.NewRequest("GET", "/healthz", nil) - w := httptest.NewRecorder() - - // This should work (system endpoints allowed) - Handler(w, req) - if w.Code != http.StatusOK { - t.Errorf("Expected system endpoint to be allowed, got status %d", w.Code) - } - - // Test blocked endpoint - req2 := httptest.NewRequest("GET", "/flows", nil) - w2 := httptest.NewRecorder() - - Handler(w2, req2) - if w2.Code != http.StatusNotFound { - t.Errorf("Expected flows endpoint to be blocked, got status %d", w2.Code) - } -} - -func TestRootGreeting(t *testing.T) { - req := httptest.NewRequest("GET", "/", nil) - w := httptest.NewRecorder() - - Handler(w, req) - - if w.Code != http.StatusOK { - t.Fatalf("expected 200, got %d", w.Code) - } - - if body := w.Body.String(); strings.TrimSpace(body) != "Hi, I'm BeemBeem! :D" { - t.Fatalf("unexpected root response: %s", body) - } -} diff --git a/model/model.go b/model/model.go index 08f4c0c6..3b6bb985 100644 --- a/model/model.go +++ b/model/model.go @@ -11,7 +11,6 @@ type Flow struct { Version string `yaml:"version,omitempty" json:"version,omitempty"` On any `yaml:"on" json:"on,omitempty"` Cron string `yaml:"cron,omitempty" json:"cron,omitempty"` // Cron expression for schedule.cron - Every string `yaml:"every,omitempty" json:"every,omitempty"` // Interval for schedule.interval Vars map[string]any `yaml:"vars,omitempty" json:"vars,omitempty"` Steps []Step `yaml:"steps" json:"steps"` Catch []Step `yaml:"catch,omitempty" json:"catch,omitempty"` diff --git a/registry/remote.go b/registry/remote.go index f04c90e0..cbd4cfbf 100644 --- a/registry/remote.go +++ b/registry/remote.go @@ -29,7 +29,18 @@ func NewRemoteRegistry(baseURL, registryName string) *RemoteRegistry { // ListServers fetches and returns all entries from the remote registry func (r *RemoteRegistry) ListServers(ctx context.Context, opts ListOptions) ([]RegistryEntry, error) { - client := &http.Client{Timeout: 30 * time.Second} + // Use a default timeout only if no deadline is set in context + if _, ok := ctx.Deadline(); !ok { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, 30*time.Second) + defer cancel() + } + + // Use http.DefaultClient which respects context deadlines + client := &http.Client{ + // Don't set a client timeout - let the context handle it + // This allows the context deadline to take precedence + } req, err := http.NewRequestWithContext(ctx, "GET", r.BaseURL, nil) if err != nil { diff --git a/storage/memory.go b/storage/memory.go index e42f11c3..558b55ef 100644 --- a/storage/memory.go +++ b/storage/memory.go @@ -76,14 +76,14 @@ func (m *MemoryStorage) ListRuns(ctx context.Context) ([]*model.Run, error) { return out, nil } -func (m *MemoryStorage) SavePausedRun(token string, paused any) error { +func (m *MemoryStorage) SavePausedRun(ctx context.Context, token string, paused any) error { m.mu.Lock() defer m.mu.Unlock() m.paused[token] = paused return nil } -func (m *MemoryStorage) LoadPausedRuns() (map[string]any, error) { +func (m *MemoryStorage) LoadPausedRuns(ctx context.Context) (map[string]any, error) { m.mu.RLock() defer m.mu.RUnlock() out := make(map[string]any, len(m.paused)) @@ -91,7 +91,7 @@ func (m *MemoryStorage) LoadPausedRuns() (map[string]any, error) { return out, nil } -func (m *MemoryStorage) DeletePausedRun(token string) error { +func (m *MemoryStorage) DeletePausedRun(ctx context.Context, token string) error { m.mu.Lock() defer m.mu.Unlock() delete(m.paused, token) diff --git a/storage/postgres.go b/storage/postgres.go index b7ee8206..8a53494d 100644 --- a/storage/postgres.go +++ b/storage/postgres.go @@ -33,8 +33,8 @@ func NewPostgresStorage(dsn string) (*PostgresStorage, error) { return nil, fmt.Errorf("failed to ping postgres database: %w", err) } - // Configure connection pool for serverless - // Keep connections minimal to avoid hanging + // Configure connection pool settings + // Keep connections minimal for efficiency db.SetMaxOpenConns(2) db.SetMaxIdleConns(1) db.SetConnMaxLifetime(30 * time.Second) @@ -201,7 +201,7 @@ func (s *PostgresStorage) ResolveWait(ctx context.Context, token uuid.UUID) (*mo return nil, nil } -func (s *PostgresStorage) SavePausedRun(token string, paused any) error { +func (s *PostgresStorage) SavePausedRun(ctx context.Context, token string, paused any) error { b, err := json.Marshal(paused) if err != nil { return err @@ -224,7 +224,7 @@ func (s *PostgresStorage) SavePausedRun(token string, paused any) error { return err } - _, err = s.db.Exec(` + _, err = s.db.ExecContext(ctx, ` INSERT INTO paused_runs (token, flow, step_idx, step_ctx, outputs) VALUES ($1, $2, $3, $4, $5) ON CONFLICT(token) DO UPDATE SET @@ -236,8 +236,8 @@ ON CONFLICT(token) DO UPDATE SET return err } -func (s *PostgresStorage) LoadPausedRuns() (map[string]any, error) { - rows, err := s.db.Query(`SELECT token, flow, step_idx, step_ctx, outputs FROM paused_runs`) +func (s *PostgresStorage) LoadPausedRuns(ctx context.Context) (map[string]any, error) { + rows, err := s.db.QueryContext(ctx, `SELECT token, flow, step_idx, step_ctx, outputs FROM paused_runs`) if err != nil { return nil, err } @@ -277,8 +277,8 @@ func (s *PostgresStorage) LoadPausedRuns() (map[string]any, error) { return result, nil } -func (s *PostgresStorage) DeletePausedRun(token string) error { - _, err := s.db.Exec(`DELETE FROM paused_runs WHERE token = $1`, token) +func (s *PostgresStorage) DeletePausedRun(ctx context.Context, token string) error { + _, err := s.db.ExecContext(ctx, `DELETE FROM paused_runs WHERE token = $1`, token) return err } diff --git a/storage/sqlite.go b/storage/sqlite.go index 8272471c..0df8655b 100644 --- a/storage/sqlite.go +++ b/storage/sqlite.go @@ -229,7 +229,7 @@ func (s *SqliteStorage) ResolveWait(ctx context.Context, token uuid.UUID) (*mode // PausedRunPersist and helpers -func (s *SqliteStorage) SavePausedRun(token string, paused any) error { +func (s *SqliteStorage) SavePausedRun(ctx context.Context, token string, paused any) error { b, err := json.Marshal(paused) if err != nil { return err @@ -255,7 +255,7 @@ func (s *SqliteStorage) SavePausedRun(token string, paused any) error { persist.RunID = s } } - _, err = s.db.Exec(` + _, err = s.db.ExecContext(ctx, ` INSERT INTO paused_runs (token, flow, step_idx, step_ctx, outputs) VALUES (?, ?, ?, ?, ?) ON CONFLICT(token) DO UPDATE SET flow=excluded.flow, step_idx=excluded.step_idx, step_ctx=excluded.step_ctx, outputs=excluded.outputs @@ -263,8 +263,8 @@ func (s *SqliteStorage) SavePausedRun(token string, paused any) error { return err } -func (s *SqliteStorage) LoadPausedRuns() (map[string]any, error) { - rows, err := s.db.Query(`SELECT token, flow, step_idx, step_ctx, outputs FROM paused_runs`) +func (s *SqliteStorage) LoadPausedRuns(ctx context.Context) (map[string]any, error) { + rows, err := s.db.QueryContext(ctx, `SELECT token, flow, step_idx, step_ctx, outputs FROM paused_runs`) if err != nil { return nil, err } @@ -301,8 +301,8 @@ func (s *SqliteStorage) LoadPausedRuns() (map[string]any, error) { return result, nil } -func (s *SqliteStorage) DeletePausedRun(token string) error { - _, err := s.db.Exec(`DELETE FROM paused_runs WHERE token=?`, token) +func (s *SqliteStorage) DeletePausedRun(ctx context.Context, token string) error { + _, err := s.db.ExecContext(ctx, `DELETE FROM paused_runs WHERE token=?`, token) return err } diff --git a/storage/storage.go b/storage/storage.go index 318c95c3..5069a1ae 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -16,8 +16,8 @@ type Storage interface { RegisterWait(ctx context.Context, token uuid.UUID, wakeAt *int64) error ResolveWait(ctx context.Context, token uuid.UUID) (*model.Run, error) ListRuns(ctx context.Context) ([]*model.Run, error) - SavePausedRun(token string, paused any) error - LoadPausedRuns() (map[string]any, error) - DeletePausedRun(token string) error + SavePausedRun(ctx context.Context, token string, paused any) error + LoadPausedRuns(ctx context.Context) (map[string]any, error) + DeletePausedRun(ctx context.Context, token string) error DeleteRun(ctx context.Context, id uuid.UUID) error } diff --git a/storage/storage_test.go b/storage/storage_test.go index 5bfd4eb2..561c673b 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -138,12 +138,12 @@ func TestMemoryStorage_AllOperations(t *testing.T) { "stepName": "paused_step", } - err = storage.SavePausedRun("pause_token", pausedData) + err = storage.SavePausedRun(context.Background(), "pause_token", pausedData) if err != nil { t.Fatalf("SavePausedRun failed: %v", err) } - pausedRuns, err := storage.LoadPausedRuns() + pausedRuns, err := storage.LoadPausedRuns(context.Background()) if err != nil { t.Fatalf("LoadPausedRuns failed: %v", err) } @@ -155,12 +155,12 @@ func TestMemoryStorage_AllOperations(t *testing.T) { } // Test DeletePausedRun - err = storage.DeletePausedRun("pause_token") + err = storage.DeletePausedRun(context.Background(), "pause_token") if err != nil { t.Fatalf("DeletePausedRun failed: %v", err) } - pausedRuns, err = storage.LoadPausedRuns() + pausedRuns, err = storage.LoadPausedRuns(context.Background()) if err != nil { t.Fatalf("LoadPausedRuns after delete failed: %v", err) } @@ -207,12 +207,12 @@ func TestSqliteStorage_AllOperations(t *testing.T) { "stepName": "paused_step", } - err = storage.SavePausedRun("sqlite_pause_token", pausedData) + err = storage.SavePausedRun(context.Background(), "sqlite_pause_token", pausedData) if err != nil { t.Fatalf("SavePausedRun failed: %v", err) } - pausedRuns, err := storage.LoadPausedRuns() + pausedRuns, err := storage.LoadPausedRuns(context.Background()) if err != nil { t.Fatalf("LoadPausedRuns failed: %v", err) } @@ -221,12 +221,12 @@ func TestSqliteStorage_AllOperations(t *testing.T) { } // Test DeletePausedRun - err = storage.DeletePausedRun("sqlite_pause_token") + err = storage.DeletePausedRun(context.Background(), "sqlite_pause_token") if err != nil { t.Fatalf("DeletePausedRun failed: %v", err) } - pausedRuns, err = storage.LoadPausedRuns() + pausedRuns, err = storage.LoadPausedRuns(context.Background()) if err != nil { t.Fatalf("LoadPausedRuns after delete failed: %v", err) } @@ -685,20 +685,20 @@ func TestSqliteStorage_SavePausedRun_ErrorCases(t *testing.T) { "data": map[string]any{"nested": "value"}, } - err = storage.SavePausedRun("test_token", pausedData) + err = storage.SavePausedRun(context.Background(), "test_token", pausedData) if err != nil { t.Fatalf("SavePausedRun failed: %v", err) } // Test SavePausedRun with nil data - err = storage.SavePausedRun("nil_token", nil) + err = storage.SavePausedRun(context.Background(), "nil_token", nil) if err != nil { t.Fatalf("SavePausedRun with nil data failed: %v", err) } // Test SavePausedRun with empty data emptyData := map[string]any{} - err = storage.SavePausedRun("empty_token", emptyData) + err = storage.SavePausedRun(context.Background(), "empty_token", emptyData) if err != nil { t.Fatalf("SavePausedRun with empty data failed: %v", err) } @@ -707,19 +707,19 @@ func TestSqliteStorage_SavePausedRun_ErrorCases(t *testing.T) { invalidData := map[string]any{ "channel": make(chan int), // channels can't be marshaled to JSON } - err = storage.SavePausedRun("invalid_token", invalidData) + err = storage.SavePausedRun(context.Background(), "invalid_token", invalidData) if err == nil { t.Error("Expected error for data that can't be marshaled to JSON") } // Test SavePausedRun with empty token - err = storage.SavePausedRun("", pausedData) + err = storage.SavePausedRun(context.Background(), "", pausedData) if err != nil { t.Fatalf("SavePausedRun with empty token failed: %v", err) } // Verify saved runs - pausedRuns, err := storage.LoadPausedRuns() + pausedRuns, err := storage.LoadPausedRuns(context.Background()) if err != nil { t.Fatalf("LoadPausedRuns failed: %v", err) } diff --git a/vercel.json b/vercel.json deleted file mode 100644 index ea024fb9..00000000 --- a/vercel.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "$schema": "https://openapi.vercel.sh/vercel.json", - "build": { - "env": { - "GO_BUILD_FLAGS": "-ldflags '-s -w'" - } - }, - "crons": [ - { - "path": "/cron", - "schedule": "*/30 * * * *" - } - ], - "routes": [ - { "src": "(?:.*)", "dest": "api/index.go" } - ] -} \ No newline at end of file