From 341ef1df3dfcf4fd18c599db90bcb9b4003c0fe1 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 30 Sep 2025 12:36:05 -0700 Subject: [PATCH 1/4] admin server: fix error formatting (to string) --- dbos/admin_server.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/dbos/admin_server.go b/dbos/admin_server.go index ebe67544..1ca49426 100644 --- a/dbos/admin_server.go +++ b/dbos/admin_server.go @@ -117,7 +117,6 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) { "AssumedRole": ws.AssumedRole, "AuthenticatedRoles": ws.AuthenticatedRoles, "Output": ws.Output, - "Error": ws.Error, "ExecutorID": ws.ExecutorID, "ApplicationVersion": ws.ApplicationVersion, "ApplicationID": ws.ApplicationID, @@ -169,6 +168,18 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) { result["Output"] = string(bytes) } + if ws.Error != nil { + // Convert error to string first, then marshal as JSON + errStr := ws.Error.Error() + bytes, err := json.Marshal(errStr) + if err != nil { + return nil, fmt.Errorf("failed to marshal error: %w", err) + } + result["Error"] = string(bytes) + } else { + result["Error"] = "" + } + return result, nil } From 7e0141fff7ff52be90731b382a5e5a75acc90358 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 30 Sep 2025 12:36:26 -0700 Subject: [PATCH 2/4] admin server: marshall output/error to string --- dbos/admin_server.go | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/dbos/admin_server.go b/dbos/admin_server.go index 1ca49426..4271c756 100644 --- a/dbos/admin_server.go +++ b/dbos/admin_server.go @@ -433,13 +433,37 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer { // Transform to snake_case format with function_id and function_name formattedSteps := make([]map[string]any, len(steps)) for i, step := range steps { - formattedSteps[i] = map[string]any{ + formattedStep := map[string]any{ "function_id": step.StepID, "function_name": step.StepName, - "output": step.Output, - "error": step.Error, "child_workflow_id": step.ChildWorkflowID, } + + // Marshal Output as JSON string if present + if step.Output != nil && step.Output != "" { + bytes, err := json.Marshal(step.Output) + if err != nil { + ctx.logger.Error("Failed to marshal step output", "error", err) + http.Error(w, fmt.Sprintf("Failed to format step output: %v", err), http.StatusInternalServerError) + return + } + formattedStep["output"] = string(bytes) + } + + // Marshal Error as JSON string if present + if step.Error != nil { + // Convert error to string first, then marshal as JSON + errStr := step.Error.Error() + bytes, err := json.Marshal(errStr) + if err != nil { + ctx.logger.Error("Failed to marshal step error", "error", err) + http.Error(w, fmt.Sprintf("Failed to format step error: %v", err), http.StatusInternalServerError) + return + } + formattedStep["error"] = string(bytes) + } + + formattedSteps[i] = formattedStep } w.Header().Set("Content-Type", "application/json") From 19994c9a642e7db175194c356290ae364d36e16d Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 30 Sep 2025 12:37:03 -0700 Subject: [PATCH 3/4] admin server: test WorkflowSteps --- dbos/admin_server_test.go | 169 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) diff --git a/dbos/admin_server_test.go b/dbos/admin_server_test.go index e9fb6c35..45d416c7 100644 --- a/dbos/admin_server_test.go +++ b/dbos/admin_server_test.go @@ -17,6 +17,13 @@ import ( "go.uber.org/goleak" ) +// TestStepResult is a custom struct for testing step outputs +type TestStepResult struct { + Message string `json:"message"` + Count int `json:"count"` + Success bool `json:"success"` +} + func TestAdminServer(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreAnyFunction("github.com/jackc/pgx/v5/pgxpool.(*Pool).backgroundHealthCheck"), @@ -726,6 +733,168 @@ func TestAdminServer(t *testing.T) { assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'") }) + t.Run("WorkflowSteps", func(t *testing.T) { + resetTestDatabase(t, databaseURL) + ctx, err := NewDBOSContext(context.Background(), Config{ + DatabaseURL: databaseURL, + AppName: "test-app", + AdminServer: true, + }) + require.NoError(t, err) + + // Test workflow with multiple steps - simpler version that won't fail on serialization + testWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { + // Step 1: Return a string + stepResult1, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return "step1-output", nil + }, WithStepName("stringStep")) + if err != nil { + return "", err + } + + // Step 2: Return a user-defined struct + stepResult2, err := RunAsStep(dbosCtx, func(ctx context.Context) (TestStepResult, error) { + return TestStepResult{ + Message: "structured data", + Count: 100, + Success: true, + }, nil + }, WithStepName("structStep")) + if err != nil { + return "", err + } + + // Step 3: Return an error - but we don't abort on error to test error marshaling + _, _ = RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return "", fmt.Errorf("deliberate error for testing") + }, WithStepName("errorStep")) + + // Step 4: Return empty string (to test empty value handling) + stepResult4, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { + return "", nil + }, WithStepName("emptyStep")) + if err != nil { + return "", err + } + + // Combine results + return fmt.Sprintf("workflow complete: %s, struct(%s,%d,%v), %s", stepResult1, stepResult2.Message, stepResult2.Count, stepResult2.Success, stepResult4), nil + } + + RegisterWorkflow(ctx, testWorkflow) + + err = Launch(ctx) + require.NoError(t, err) + + // Ensure cleanup + defer func() { + if ctx != nil { + Shutdown(ctx, 1*time.Minute) + } + }() + + // Give the server a moment to start + time.Sleep(100 * time.Millisecond) + + client := &http.Client{Timeout: 5 * time.Second} + + // Create and run the workflow + handle, err := RunWorkflow(ctx, testWorkflow, "test-input") + require.NoError(t, err, "Failed to create workflow") + + // Wait for workflow to complete + result, err := handle.GetResult() + require.NoError(t, err, "Workflow should complete successfully") + t.Logf("Workflow result: %s", result) + + // Call the workflow steps endpoint + workflowID := handle.GetWorkflowID() + endpoint := fmt.Sprintf("http://localhost:%d/workflows/%s/steps", _DEFAULT_ADMIN_SERVER_PORT, workflowID) + req, err := http.NewRequest("GET", endpoint, nil) + require.NoError(t, err, "Failed to create request") + + resp, err := client.Do(req) + require.NoError(t, err, "Failed to make request") + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from steps endpoint") + + // Decode the response + var steps []map[string]any + err = json.NewDecoder(resp.Body).Decode(&steps) + require.NoError(t, err, "Failed to decode steps response") + + // Should have 4 steps + assert.Equal(t, 4, len(steps), "Expected exactly 4 steps") + + // Verify each step's output/error is properly marshaled + for i, step := range steps { + functionName, ok := step["function_name"].(string) + require.True(t, ok, "function_name should be a string for step %d", i) + + t.Logf("Step %d (%s): output=%v, error=%v", i, functionName, step["output"], step["error"]) + + switch functionName { + case "stringStep": + // String output should be marshaled as JSON string + outputStr, ok := step["output"].(string) + require.True(t, ok, "String step output should be a JSON string") + + var unmarshaledOutput string + err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput) + require.NoError(t, err, "Failed to unmarshal string step output") + assert.Equal(t, "step1-output", unmarshaledOutput, "String step output should match") + + assert.Nil(t, step["error"], "String step should have no error") + + case "structStep": + // Struct output should be marshaled as JSON string + outputStr, ok := step["output"].(string) + require.True(t, ok, "Struct step output should be a JSON string") + + var unmarshaledOutput TestStepResult + err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput) + require.NoError(t, err, "Failed to unmarshal struct step output") + assert.Equal(t, TestStepResult{ + Message: "structured data", + Count: 100, + Success: true, + }, unmarshaledOutput, "Struct step output should match") + + assert.Nil(t, step["error"], "Struct step should have no error") + + case "errorStep": + // Error step should have error marshaled as JSON string + errorStr, ok := step["error"].(string) + require.True(t, ok, "Error step error should be a JSON string") + + var unmarshaledError string + err = json.Unmarshal([]byte(errorStr), &unmarshaledError) + require.NoError(t, err, "Failed to unmarshal error step error") + assert.Contains(t, unmarshaledError, "deliberate error for testing", "Error message should be preserved") + + case "emptyStep": + // Empty string might be returned as nil or as an empty JSON string + output := step["output"] + if output == nil { + // Empty string was not included in response (which is fine) + t.Logf("Empty step output was nil (not included)") + } else { + // If it was included, it should be marshaled as JSON string `""` + outputStr, ok := output.(string) + require.True(t, ok, "If present, empty step output should be a JSON string") + + var unmarshaledOutput string + err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput) + require.NoError(t, err, "Failed to unmarshal empty step output") + assert.Equal(t, "", unmarshaledOutput, "Empty step output should be empty string") + } + + assert.Nil(t, step["error"], "Empty step should have no error") + } + } + }) + t.Run("TestDeactivate", func(t *testing.T) { resetTestDatabase(t, databaseURL) ctx, err := NewDBOSContext(context.Background(), Config{ From 0ad397ae29da4cc85113d36014be29d564c96516 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 30 Sep 2025 12:37:41 -0700 Subject: [PATCH 4/4] fix steps output type gob registration and add a test --- dbos/workflow.go | 12 +++-- dbos/workflows_test.go | 107 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 4 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index a7d60130..b7bbaf20 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1041,6 +1041,10 @@ func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error return *new(R), newStepExecutionError("", "", "step function cannot be nil") } + // Register the output type for gob encoding + var r R + gob.Register(r) + // Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() opts = append(opts, WithStepName(stepName)) @@ -1490,9 +1494,9 @@ func (c *dbosContext) CancelWorkflow(_ DBOSContext, workflowID string) error { workflowState, ok := c.Value(workflowStateKey).(*workflowState) isWithinWorkflow := ok && workflowState != nil if isWithinWorkflow { - _, err := RunAsStep(c, func(ctx context.Context) (any, error) { + _, err := RunAsStep(c, func(ctx context.Context) (string, error) { err := c.systemDB.cancelWorkflow(ctx, workflowID) - return nil, err + return "", err }, WithStepName("DBOS.cancelWorkflow")) return err } else { @@ -1527,9 +1531,9 @@ func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (Workflow isWithinWorkflow := ok && workflowState != nil var err error if isWithinWorkflow { - _, err = RunAsStep(c, func(ctx context.Context) (any, error) { + _, err = RunAsStep(c, func(ctx context.Context) (string, error) { err := c.systemDB.resumeWorkflow(ctx, workflowID) - return nil, err + return "", err }, WithStepName("DBOS.resumeWorkflow")) } else { err = c.systemDB.resumeWorkflow(c, workflowID) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 73eeb547..c48421fa 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -567,6 +567,113 @@ func TestSteps(t *testing.T) { assert.Equal(t, "MyCustomStep2", steps[1].StepName, "expected second step to have custom name") assert.Equal(t, 1, steps[1].StepID) }) + + t.Run("stepsOutputEncoding", func(t *testing.T) { + // Define user-defined types for testing serialization + type StepInput struct { + Name string `json:"name"` + Count int `json:"count"` + Active bool `json:"active"` + Metadata map[string]string `json:"metadata"` + CreatedAt time.Time `json:"created_at"` + } + + type StepOutput struct { + ProcessedName string `json:"processed_name"` + TotalCount int `json:"total_count"` + Success bool `json:"success"` + ProcessedAt time.Time `json:"processed_at"` + Details []string `json:"details"` + } + + // Create a step function that accepts StepInput and returns StepOutput + processUserObjectStep := func(_ context.Context, input StepInput) (StepOutput, error) { + // Process the input and create output + output := StepOutput{ + ProcessedName: fmt.Sprintf("Processed_%s", input.Name), + TotalCount: input.Count * 2, + Success: input.Active, + ProcessedAt: time.Now(), + Details: []string{"step1", "step2", "step3"}, + } + + // Verify input was correctly deserialized + if input.Metadata == nil { + return StepOutput{}, fmt.Errorf("metadata map was not properly deserialized") + } + + return output, nil + } + + // Create a workflow that uses the step with user-defined objects + userObjectWorkflow := func(dbosCtx DBOSContext, workflowInput string) (string, error) { + // Create input for the step + stepInput := StepInput{ + Name: workflowInput, + Count: 42, + Active: true, + Metadata: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + CreatedAt: time.Now(), + } + + // Run the step with user-defined input and output + output, err := RunAsStep(dbosCtx, func(ctx context.Context) (StepOutput, error) { + return processUserObjectStep(ctx, stepInput) + }) + if err != nil { + return "", fmt.Errorf("step failed: %w", err) + } + + // Verify the output was correctly returned + if output.ProcessedName == "" { + return "", fmt.Errorf("output ProcessedName is empty") + } + if output.TotalCount != 84 { + return "", fmt.Errorf("expected TotalCount to be 84, got %d", output.TotalCount) + } + if len(output.Details) != 3 { + return "", fmt.Errorf("expected 3 details, got %d", len(output.Details)) + } + + return "", nil + } + + // Register the workflow + RegisterWorkflow(dbosCtx, userObjectWorkflow) + + // Execute the workflow + handle, err := RunWorkflow(dbosCtx, userObjectWorkflow, "TestObject") + require.NoError(t, err, "failed to run workflow with user-defined objects") + + // Get the result + _, err = handle.GetResult() + require.NoError(t, err, "failed to get result from workflow") + + // Verify the step was recorded + steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + require.NoError(t, err, "failed to get workflow steps") + require.Len(t, steps, 1, "expected 1 step") + + // Verify step output was properly serialized and stored + step := steps[0] + require.NotNil(t, step.Output, "step output should not be nil") + assert.Nil(t, step.Error) + + // Deserialize the output from the database to verify proper encoding + storedOutput, ok := step.Output.(StepOutput) + require.True(t, ok, "failed to cast step output to StepOutput") + + // Verify all fields were correctly serialized and deserialized + assert.Equal(t, "Processed_TestObject", storedOutput.ProcessedName, "ProcessedName not correctly serialized") + assert.Equal(t, 84, storedOutput.TotalCount, "TotalCount not correctly serialized") + assert.True(t, storedOutput.Success, "Success flag not correctly serialized") + assert.Len(t, storedOutput.Details, 3, "Details array length incorrect") + assert.Equal(t, []string{"step1", "step2", "step3"}, storedOutput.Details, "Details array not correctly serialized") + assert.False(t, storedOutput.ProcessedAt.IsZero(), "ProcessedAt timestamp should not be zero") + }) } func TestChildWorkflow(t *testing.T) {