Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions dbos/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
}

if !ws.Deadline.IsZero() {
result["Deadline"] = ws.Deadline.UTC().UnixMilli()
result["WorkflowDeadlineEpochMS"] = ws.Deadline.UTC().UnixMilli()
} else {
result["Deadline"] = nil
result["WorkflowDeadlineEpochMS"] = nil
}

if !ws.StartedAt.IsZero() {
Expand Down Expand Up @@ -444,6 +444,14 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
"child_workflow_id": step.ChildWorkflowID,
}

// Add timestamps if present
if !step.StartedAt.IsZero() {
formattedStep["started_at_epoch_ms"] = step.StartedAt.UnixMilli()
}
if !step.CompletedAt.IsZero() {
formattedStep["completed_at_epoch_ms"] = step.CompletedAt.UnixMilli()
}

if step.Output != nil {
// If there is a value, it should be a JSON string
jsonOutput, ok := step.Output.(string)
Expand Down
6 changes: 6 additions & 0 deletions dbos/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,12 @@ func TestAdminServer(t *testing.T) {
functionName, ok := step["function_name"].(string)
require.True(t, ok, "function_name should be a string for step %d", i)

// Verify timestamps are present
_, hasStartedAt := step["started_at_epoch_ms"]
assert.True(t, hasStartedAt, "Step %d should have started_at_epoch_ms field", i)
_, hasCompletedAt := step["completed_at_epoch_ms"]
assert.True(t, hasCompletedAt, "Step %d should have completed_at_epoch_ms field", i)

t.Logf("Step %d (%s): output=%v, error=%v", i, functionName, step["output"], step["error"])

switch functionName {
Expand Down
21 changes: 21 additions & 0 deletions dbos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,11 @@ func TestForkWorkflow(t *testing.T) {
forkedWorkflowID := forkedHandle.GetWorkflowID()
assert.Equal(t, customForkedWorkflowID, forkedWorkflowID, "expected forked workflow ID to match")

// Verify forked_from is set
forkedStatus, err := forkedHandle.GetStatus()
require.NoError(t, err, "failed to get forked workflow status")
assert.Equal(t, originalWorkflowID, forkedStatus.ForkedFrom, "expected forked_from to be set to original workflow ID")

forkedResult, err := forkedHandle.GetResult()
require.NoError(t, err, "failed to get result from forked workflow at step %d", step)

Expand Down Expand Up @@ -882,6 +887,17 @@ func TestListWorkflows(t *testing.T) {
require.NoError(t, err, "failed to list all workflows")
assert.GreaterOrEqual(t, len(allWorkflows), 10, "expected at least 10 workflows")

for _, wf := range allWorkflows {
// These fields should exist (may be zero/empty for some workflows)
// Timeout and Deadline are time.Duration and time.Time, so they're always present
_ = wf.Timeout
_ = wf.Deadline
_ = wf.DeduplicationID
_ = wf.Priority
_ = wf.QueuePartitionKey
_ = wf.ForkedFrom
}

// Test 2: Filter by workflow IDs
expectedIDs := workflowIDs[:3]
specificWorkflows, err := client.ListWorkflows(WithWorkflowIDs(expectedIDs))
Expand Down Expand Up @@ -1070,6 +1086,11 @@ func TestGetWorkflowSteps(t *testing.T) {
assert.Nil(t, step.Error, "expected no error in step")
assert.Equal(t, "", step.ChildWorkflowID, "expected no child workflow ID")

// Verify timestamps are present
assert.False(t, step.StartedAt.IsZero(), "expected step to have StartedAt timestamp")
assert.False(t, step.CompletedAt.IsZero(), "expected step to have CompletedAt timestamp")
assert.True(t, step.CompletedAt.After(step.StartedAt) || step.CompletedAt.Equal(step.StartedAt), "expected CompletedAt to be after or equal to StartedAt")

// Verify the output wasn't loaded
require.Nil(t, step.Output, "expected output not to be loaded")

Expand Down
3 changes: 3 additions & 0 deletions dbos/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,9 @@ func (c *conductor) handleListWorkflowsRequest(data []byte, requestID string) er
if req.Body.Status != nil {
opts = append(opts, WithStatus([]WorkflowStatusType{WorkflowStatusType(*req.Body.Status)}))
}
if req.Body.ForkedFrom != nil {
opts = append(opts, WithForkedFrom(*req.Body.ForkedFrom))
}

workflows, err := c.dbosCtx.ListWorkflows(c.dbosCtx, opts...)
if err != nil {
Expand Down
81 changes: 57 additions & 24 deletions dbos/conductor_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type listWorkflowsConductorRequestBody struct {
SortDesc bool `json:"sort_desc"`
LoadInput bool `json:"load_input"`
LoadOutput bool `json:"load_output"`
ForkedFrom *string `json:"forked_from,omitempty"`
}

// listWorkflowsConductorRequest is sent by the conductor to list workflows
Expand All @@ -73,25 +74,28 @@ type listWorkflowsConductorRequest struct {

// listWorkflowsConductorResponseBody represents a single workflow in the list response
type listWorkflowsConductorResponseBody struct {
WorkflowUUID string `json:"WorkflowUUID"`
Status *string `json:"Status,omitempty"`
WorkflowName *string `json:"WorkflowName,omitempty"`
WorkflowClassName *string `json:"WorkflowClassName,omitempty"`
WorkflowConfigName *string `json:"WorkflowConfigName,omitempty"`
AuthenticatedUser *string `json:"AuthenticatedUser,omitempty"`
AssumedRole *string `json:"AssumedRole,omitempty"`
AuthenticatedRoles *string `json:"AuthenticatedRoles,omitempty"`
Input *string `json:"Input,omitempty"`
Output *string `json:"Output,omitempty"`
Error *string `json:"Error,omitempty"`
CreatedAt *string `json:"CreatedAt,omitempty"`
UpdatedAt *string `json:"UpdatedAt,omitempty"`
QueueName *string `json:"QueueName,omitempty"`
QueuePartitionKey *string `json:"QueuePartitionKey,omitempty"`
DeduplicationID *string `json:"DeduplicationID,omitempty"`
Priority *int `json:"Priority,omitempty"`
ApplicationVersion *string `json:"ApplicationVersion,omitempty"`
ExecutorID *string `json:"ExecutorID,omitempty"`
WorkflowUUID string `json:"WorkflowUUID"`
Status *string `json:"Status,omitempty"`
WorkflowName *string `json:"WorkflowName,omitempty"`
WorkflowClassName *string `json:"WorkflowClassName,omitempty"`
WorkflowConfigName *string `json:"WorkflowConfigName,omitempty"`
AuthenticatedUser *string `json:"AuthenticatedUser,omitempty"`
AssumedRole *string `json:"AssumedRole,omitempty"`
AuthenticatedRoles *string `json:"AuthenticatedRoles,omitempty"`
Input *string `json:"Input,omitempty"`
Output *string `json:"Output,omitempty"`
Error *string `json:"Error,omitempty"`
CreatedAt *string `json:"CreatedAt,omitempty"`
UpdatedAt *string `json:"UpdatedAt,omitempty"`
QueueName *string `json:"QueueName,omitempty"`
QueuePartitionKey *string `json:"QueuePartitionKey,omitempty"`
DeduplicationID *string `json:"DeduplicationID,omitempty"`
Priority *int `json:"Priority,omitempty"`
ApplicationVersion *string `json:"ApplicationVersion,omitempty"`
ExecutorID *string `json:"ExecutorID,omitempty"`
WorkflowTimeoutMS *string `json:"WorkflowTimeoutMS,omitempty"`
WorkflowDeadlineEpochMS *string `json:"WorkflowDeadlineEpochMS,omitempty"`
ForkedFrom *string `json:"ForkedFrom,omitempty"`
}

// listWorkflowsConductorResponse is sent in response to list workflows requests
Expand Down Expand Up @@ -195,6 +199,23 @@ func formatListWorkflowsResponseBody(wf WorkflowStatus) listWorkflowsConductorRe
output.ExecutorID = &wf.ExecutorID
}

// Convert timeout to milliseconds string
if wf.Timeout > 0 {
timeoutStr := strconv.FormatInt(wf.Timeout.Milliseconds(), 10)
output.WorkflowTimeoutMS = &timeoutStr
}

// Convert deadline to epoch milliseconds string
if !wf.Deadline.IsZero() {
deadlineStr := strconv.FormatInt(wf.Deadline.UnixMilli(), 10)
output.WorkflowDeadlineEpochMS = &deadlineStr
}

// Copy forked from
if wf.ForkedFrom != "" {
output.ForkedFrom = &wf.ForkedFrom
}

return output
}

Expand All @@ -206,11 +227,13 @@ type listStepsConductorRequest struct {

// workflowStepsConductorResponseBody represents a single workflow step in the list response
type workflowStepsConductorResponseBody struct {
FunctionID int `json:"function_id"`
FunctionName string `json:"function_name"`
Output *string `json:"output,omitempty"`
Error *string `json:"error,omitempty"`
ChildWorkflowID *string `json:"child_workflow_id,omitempty"`
FunctionID int `json:"function_id"`
FunctionName string `json:"function_name"`
Output *string `json:"output,omitempty"`
Error *string `json:"error,omitempty"`
ChildWorkflowID *string `json:"child_workflow_id,omitempty"`
StartedAtEpochMs *string `json:"started_at_epoch_ms,omitempty"`
CompletedAtEpochMs *string `json:"completed_at_epoch_ms,omitempty"`
}

// listStepsConductorResponse is sent in response to list steps requests
Expand Down Expand Up @@ -246,6 +269,16 @@ func formatWorkflowStepsResponseBody(step StepInfo) workflowStepsConductorRespon
output.ChildWorkflowID = &step.ChildWorkflowID
}

// Convert timestamps to epoch milliseconds strings
if !step.StartedAt.IsZero() {
startedAtStr := strconv.FormatInt(step.StartedAt.UnixMilli(), 10)
output.StartedAtEpochMs = &startedAtStr
}
if !step.CompletedAt.IsZero() {
completedAtStr := strconv.FormatInt(step.CompletedAt.UnixMilli(), 10)
output.CompletedAtEpochMs = &completedAtStr
}

return output
}

Expand Down
8 changes: 4 additions & 4 deletions dbos/dbos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func TestConfig(t *testing.T) {
require.NoError(t, err)
assert.True(t, exists, "dbos_migrations table should exist")

// Verify migration version is 3 (after initial migration, queue partition key migration, and workflow status index migration)
// Verify migration version is 5 (after initial migration, queue partition key migration, workflow status index migration, forked_from migration, and step timestamps migration)
var version int64
var count int
err = sysDB.pool.QueryRow(dbCtx, "SELECT COUNT(*) FROM dbos.dbos_migrations").Scan(&count)
Expand All @@ -260,7 +260,7 @@ func TestConfig(t *testing.T) {

err = sysDB.pool.QueryRow(dbCtx, "SELECT version FROM dbos.dbos_migrations").Scan(&version)
require.NoError(t, err)
assert.Equal(t, int64(3), version, "migration version should be 3 (after initial migration, queue partition key migration, and workflow status index migration)")
assert.Equal(t, int64(5), version, "migration version should be 5 (after initial migration, queue partition key migration, workflow status index migration, forked_from migration, and step timestamps migration)")

// Test manual shutdown and recreate
Shutdown(ctx, 1*time.Minute)
Expand Down Expand Up @@ -459,7 +459,7 @@ func TestCustomSystemDBSchema(t *testing.T) {
require.NoError(t, err)
assert.True(t, exists, "dbos_migrations table should exist in custom schema")

// Verify migration version is 3 (after initial migration, queue partition key migration, and workflow status index migration)
// Verify migration version is 5 (after initial migration, queue partition key migration, workflow status index migration, forked_from migration, and step timestamps migration)
var version int64
var count int
err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT COUNT(*) FROM %s.dbos_migrations", customSchema)).Scan(&count)
Expand All @@ -468,7 +468,7 @@ func TestCustomSystemDBSchema(t *testing.T) {

err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT version FROM %s.dbos_migrations", customSchema)).Scan(&version)
require.NoError(t, err)
assert.Equal(t, int64(3), version, "migration version should be 3 (after initial migration, queue partition key migration, and workflow status index migration)")
assert.Equal(t, int64(5), version, "migration version should be 5 (after initial migration, queue partition key migration, workflow status index migration, forked_from migration, and step timestamps migration)")
})

// Test workflows for exercising Send/Recv and SetEvent/GetEvent
Expand Down
8 changes: 8 additions & 0 deletions dbos/migrations/4_add_forked_from.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Migration 4: Add forked_from column to workflow_status table
-- This enables tracking workflow fork lineage

ALTER TABLE %s.workflow_status
ADD COLUMN forked_from TEXT;

CREATE INDEX "idx_workflow_status_forked_from" ON %s."workflow_status" ("forked_from");

6 changes: 6 additions & 0 deletions dbos/migrations/5_add_step_timestamps.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Migration 5: Add started_at_epoch_ms and completed_at_epoch_ms columns to operation_outputs table
-- This enables visualization of step duration

ALTER TABLE %s.operation_outputs
ADD COLUMN started_at_epoch_ms BIGINT, ADD COLUMN completed_at_epoch_ms BIGINT;

Loading
Loading