Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions dbos/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
"AssumedRole": ws.AssumedRole,
"AuthenticatedRoles": ws.AuthenticatedRoles,
"Output": ws.Output,
"Error": ws.Error,
"ExecutorID": ws.ExecutorID,
"ApplicationVersion": ws.ApplicationVersion,
"ApplicationID": ws.ApplicationID,
Expand Down Expand Up @@ -169,6 +168,18 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
result["Output"] = string(bytes)
}

if ws.Error != nil {
// Convert error to string first, then marshal as JSON
errStr := ws.Error.Error()
bytes, err := json.Marshal(errStr)
if err != nil {
return nil, fmt.Errorf("failed to marshal error: %w", err)
}
result["Error"] = string(bytes)
} else {
result["Error"] = ""
}

return result, nil
}

Expand Down Expand Up @@ -422,13 +433,37 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
// Transform to snake_case format with function_id and function_name
formattedSteps := make([]map[string]any, len(steps))
for i, step := range steps {
formattedSteps[i] = map[string]any{
formattedStep := map[string]any{
"function_id": step.StepID,
"function_name": step.StepName,
"output": step.Output,
"error": step.Error,
"child_workflow_id": step.ChildWorkflowID,
}

// Marshal Output as JSON string if present
if step.Output != nil && step.Output != "" {
bytes, err := json.Marshal(step.Output)
if err != nil {
ctx.logger.Error("Failed to marshal step output", "error", err)
http.Error(w, fmt.Sprintf("Failed to format step output: %v", err), http.StatusInternalServerError)
return
}
formattedStep["output"] = string(bytes)
}

// Marshal Error as JSON string if present
if step.Error != nil {
// Convert error to string first, then marshal as JSON
errStr := step.Error.Error()
bytes, err := json.Marshal(errStr)
if err != nil {
ctx.logger.Error("Failed to marshal step error", "error", err)
http.Error(w, fmt.Sprintf("Failed to format step error: %v", err), http.StatusInternalServerError)
return
}
formattedStep["error"] = string(bytes)
}

formattedSteps[i] = formattedStep
}

w.Header().Set("Content-Type", "application/json")
Expand Down
169 changes: 169 additions & 0 deletions dbos/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ import (
"go.uber.org/goleak"
)

// TestStepResult is a custom struct for testing step outputs
type TestStepResult struct {
Message string `json:"message"`
Count int `json:"count"`
Success bool `json:"success"`
}

func TestAdminServer(t *testing.T) {
defer goleak.VerifyNone(t,
goleak.IgnoreAnyFunction("github.com/jackc/pgx/v5/pgxpool.(*Pool).backgroundHealthCheck"),
Expand Down Expand Up @@ -726,6 +733,168 @@ func TestAdminServer(t *testing.T) {
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
})

t.Run("WorkflowSteps", func(t *testing.T) {
resetTestDatabase(t, databaseURL)
ctx, err := NewDBOSContext(context.Background(), Config{
DatabaseURL: databaseURL,
AppName: "test-app",
AdminServer: true,
})
require.NoError(t, err)

// Test workflow with multiple steps - simpler version that won't fail on serialization
testWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
// Step 1: Return a string
stepResult1, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
return "step1-output", nil
}, WithStepName("stringStep"))
if err != nil {
return "", err
}

// Step 2: Return a user-defined struct
stepResult2, err := RunAsStep(dbosCtx, func(ctx context.Context) (TestStepResult, error) {
return TestStepResult{
Message: "structured data",
Count: 100,
Success: true,
}, nil
}, WithStepName("structStep"))
if err != nil {
return "", err
}

// Step 3: Return an error - but we don't abort on error to test error marshaling
_, _ = RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
return "", fmt.Errorf("deliberate error for testing")
}, WithStepName("errorStep"))

// Step 4: Return empty string (to test empty value handling)
stepResult4, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
return "", nil
}, WithStepName("emptyStep"))
if err != nil {
return "", err
}

// Combine results
return fmt.Sprintf("workflow complete: %s, struct(%s,%d,%v), %s", stepResult1, stepResult2.Message, stepResult2.Count, stepResult2.Success, stepResult4), nil
}

RegisterWorkflow(ctx, testWorkflow)

err = Launch(ctx)
require.NoError(t, err)

// Ensure cleanup
defer func() {
if ctx != nil {
Shutdown(ctx, 1*time.Minute)
}
}()

// Give the server a moment to start
time.Sleep(100 * time.Millisecond)

client := &http.Client{Timeout: 5 * time.Second}

// Create and run the workflow
handle, err := RunWorkflow(ctx, testWorkflow, "test-input")
require.NoError(t, err, "Failed to create workflow")

// Wait for workflow to complete
result, err := handle.GetResult()
require.NoError(t, err, "Workflow should complete successfully")
t.Logf("Workflow result: %s", result)

// Call the workflow steps endpoint
workflowID := handle.GetWorkflowID()
endpoint := fmt.Sprintf("http://localhost:%d/workflows/%s/steps", _DEFAULT_ADMIN_SERVER_PORT, workflowID)
req, err := http.NewRequest("GET", endpoint, nil)
require.NoError(t, err, "Failed to create request")

resp, err := client.Do(req)
require.NoError(t, err, "Failed to make request")
defer resp.Body.Close()

assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from steps endpoint")

// Decode the response
var steps []map[string]any
err = json.NewDecoder(resp.Body).Decode(&steps)
require.NoError(t, err, "Failed to decode steps response")

// Should have 4 steps
assert.Equal(t, 4, len(steps), "Expected exactly 4 steps")

// Verify each step's output/error is properly marshaled
for i, step := range steps {
functionName, ok := step["function_name"].(string)
require.True(t, ok, "function_name should be a string for step %d", i)

t.Logf("Step %d (%s): output=%v, error=%v", i, functionName, step["output"], step["error"])

switch functionName {
case "stringStep":
// String output should be marshaled as JSON string
outputStr, ok := step["output"].(string)
require.True(t, ok, "String step output should be a JSON string")

var unmarshaledOutput string
err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput)
require.NoError(t, err, "Failed to unmarshal string step output")
assert.Equal(t, "step1-output", unmarshaledOutput, "String step output should match")

assert.Nil(t, step["error"], "String step should have no error")

case "structStep":
// Struct output should be marshaled as JSON string
outputStr, ok := step["output"].(string)
require.True(t, ok, "Struct step output should be a JSON string")

var unmarshaledOutput TestStepResult
err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput)
require.NoError(t, err, "Failed to unmarshal struct step output")
assert.Equal(t, TestStepResult{
Message: "structured data",
Count: 100,
Success: true,
}, unmarshaledOutput, "Struct step output should match")

assert.Nil(t, step["error"], "Struct step should have no error")

case "errorStep":
// Error step should have error marshaled as JSON string
errorStr, ok := step["error"].(string)
require.True(t, ok, "Error step error should be a JSON string")

var unmarshaledError string
err = json.Unmarshal([]byte(errorStr), &unmarshaledError)
require.NoError(t, err, "Failed to unmarshal error step error")
assert.Contains(t, unmarshaledError, "deliberate error for testing", "Error message should be preserved")

case "emptyStep":
// Empty string might be returned as nil or as an empty JSON string
output := step["output"]
if output == nil {
// Empty string was not included in response (which is fine)
t.Logf("Empty step output was nil (not included)")
} else {
// If it was included, it should be marshaled as JSON string `""`
outputStr, ok := output.(string)
require.True(t, ok, "If present, empty step output should be a JSON string")

var unmarshaledOutput string
err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput)
require.NoError(t, err, "Failed to unmarshal empty step output")
assert.Equal(t, "", unmarshaledOutput, "Empty step output should be empty string")
}

assert.Nil(t, step["error"], "Empty step should have no error")
}
}
})

t.Run("TestDeactivate", func(t *testing.T) {
resetTestDatabase(t, databaseURL)
ctx, err := NewDBOSContext(context.Background(), Config{
Expand Down
12 changes: 8 additions & 4 deletions dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,10 @@ func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error
return *new(R), newStepExecutionError("", "", "step function cannot be nil")
}

// Register the output type for gob encoding
var r R
gob.Register(r)

// Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name
stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
opts = append(opts, WithStepName(stepName))
Expand Down Expand Up @@ -1490,9 +1494,9 @@ func (c *dbosContext) CancelWorkflow(_ DBOSContext, workflowID string) error {
workflowState, ok := c.Value(workflowStateKey).(*workflowState)
isWithinWorkflow := ok && workflowState != nil
if isWithinWorkflow {
_, err := RunAsStep(c, func(ctx context.Context) (any, error) {
_, err := RunAsStep(c, func(ctx context.Context) (string, error) {
err := c.systemDB.cancelWorkflow(ctx, workflowID)
return nil, err
return "", err
}, WithStepName("DBOS.cancelWorkflow"))
return err
} else {
Expand Down Expand Up @@ -1527,9 +1531,9 @@ func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (Workflow
isWithinWorkflow := ok && workflowState != nil
var err error
if isWithinWorkflow {
_, err = RunAsStep(c, func(ctx context.Context) (any, error) {
_, err = RunAsStep(c, func(ctx context.Context) (string, error) {
err := c.systemDB.resumeWorkflow(ctx, workflowID)
return nil, err
return "", err
}, WithStepName("DBOS.resumeWorkflow"))
} else {
err = c.systemDB.resumeWorkflow(c, workflowID)
Expand Down
107 changes: 107 additions & 0 deletions dbos/workflows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,113 @@ func TestSteps(t *testing.T) {
assert.Equal(t, "MyCustomStep2", steps[1].StepName, "expected second step to have custom name")
assert.Equal(t, 1, steps[1].StepID)
})

t.Run("stepsOutputEncoding", func(t *testing.T) {
// Define user-defined types for testing serialization
type StepInput struct {
Name string `json:"name"`
Count int `json:"count"`
Active bool `json:"active"`
Metadata map[string]string `json:"metadata"`
CreatedAt time.Time `json:"created_at"`
}

type StepOutput struct {
ProcessedName string `json:"processed_name"`
TotalCount int `json:"total_count"`
Success bool `json:"success"`
ProcessedAt time.Time `json:"processed_at"`
Details []string `json:"details"`
}

// Create a step function that accepts StepInput and returns StepOutput
processUserObjectStep := func(_ context.Context, input StepInput) (StepOutput, error) {
// Process the input and create output
output := StepOutput{
ProcessedName: fmt.Sprintf("Processed_%s", input.Name),
TotalCount: input.Count * 2,
Success: input.Active,
ProcessedAt: time.Now(),
Details: []string{"step1", "step2", "step3"},
}

// Verify input was correctly deserialized
if input.Metadata == nil {
return StepOutput{}, fmt.Errorf("metadata map was not properly deserialized")
}

return output, nil
}

// Create a workflow that uses the step with user-defined objects
userObjectWorkflow := func(dbosCtx DBOSContext, workflowInput string) (string, error) {
// Create input for the step
stepInput := StepInput{
Name: workflowInput,
Count: 42,
Active: true,
Metadata: map[string]string{
"key1": "value1",
"key2": "value2",
},
CreatedAt: time.Now(),
}

// Run the step with user-defined input and output
output, err := RunAsStep(dbosCtx, func(ctx context.Context) (StepOutput, error) {
return processUserObjectStep(ctx, stepInput)
})
if err != nil {
return "", fmt.Errorf("step failed: %w", err)
}

// Verify the output was correctly returned
if output.ProcessedName == "" {
return "", fmt.Errorf("output ProcessedName is empty")
}
if output.TotalCount != 84 {
return "", fmt.Errorf("expected TotalCount to be 84, got %d", output.TotalCount)
}
if len(output.Details) != 3 {
return "", fmt.Errorf("expected 3 details, got %d", len(output.Details))
}

return "", nil
}

// Register the workflow
RegisterWorkflow(dbosCtx, userObjectWorkflow)

// Execute the workflow
handle, err := RunWorkflow(dbosCtx, userObjectWorkflow, "TestObject")
require.NoError(t, err, "failed to run workflow with user-defined objects")

// Get the result
_, err = handle.GetResult()
require.NoError(t, err, "failed to get result from workflow")

// Verify the step was recorded
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
require.NoError(t, err, "failed to get workflow steps")
require.Len(t, steps, 1, "expected 1 step")

// Verify step output was properly serialized and stored
step := steps[0]
require.NotNil(t, step.Output, "step output should not be nil")
assert.Nil(t, step.Error)

// Deserialize the output from the database to verify proper encoding
storedOutput, ok := step.Output.(StepOutput)
require.True(t, ok, "failed to cast step output to StepOutput")

// Verify all fields were correctly serialized and deserialized
assert.Equal(t, "Processed_TestObject", storedOutput.ProcessedName, "ProcessedName not correctly serialized")
assert.Equal(t, 84, storedOutput.TotalCount, "TotalCount not correctly serialized")
assert.True(t, storedOutput.Success, "Success flag not correctly serialized")
assert.Len(t, storedOutput.Details, 3, "Details array length incorrect")
assert.Equal(t, []string{"step1", "step2", "step3"}, storedOutput.Details, "Details array not correctly serialized")
assert.False(t, storedOutput.ProcessedAt.IsZero(), "ProcessedAt timestamp should not be zero")
})
}

func TestChildWorkflow(t *testing.T) {
Expand Down
Loading