From a78eb7aaac6e22b4947b486202020b4381bdad7e Mon Sep 17 00:00:00 2001 From: vr-varad Date: Tue, 23 Sep 2025 13:20:52 +0530 Subject: [PATCH 1/7] Fix: Workflow Identity --- dbos/workflow.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index ed153a07..ce86abf9 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -490,13 +490,16 @@ type Workflow[P any, R any] func(ctx DBOSContext, input P) (R, error) type WorkflowFunc func(ctx DBOSContext, input any) (any, error) type workflowOptions struct { - workflowName string - workflowID string - queueName string - applicationVersion string - maxRetries int - deduplicationID string - priority uint + workflowName string + workflowID string + queueName string + applicationVersion string + maxRetries int + deduplicationID string + priority uint + authenticated_user string + assumed_role string + authenticated_roles []string } // WorkflowOption is a functional option for configuring workflow execution parameters. @@ -732,6 +735,9 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt QueueName: params.queueName, DeduplicationID: params.deduplicationID, Priority: int(params.priority), + AuthenticatedUser: params.authenticated_user, + AssumedRole: params.assumed_role, + AuthenticatedRoles: params.authenticated_roles, } // Init status and record child workflow relationship in a single transaction From 4f5612906360f20fa2317b1a9da6fb858d6061fe Mon Sep 17 00:00:00 2001 From: vr-varad Date: Thu, 25 Sep 2025 17:18:26 +0530 Subject: [PATCH 2/7] Fix: Update --- dbos/workflow.go | 21 +++++++++++++++++++++ dbos/workflows_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/dbos/workflow.go b/dbos/workflow.go index ce86abf9..0385eba2 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -549,6 +549,27 @@ func withWorkflowName(name string) WorkflowOption { } } +// Sets the authenticated user for the workflow +func WithAutheticatedUser(user string) WorkflowOption { + return func(p *workflowOptions) { + p.authenticated_user = user + } +} + +// Sets the assumed role for the workflow +func WithAssumedRole(role string) WorkflowOption { + return func(p *workflowOptions) { + p.assumed_role = role + } +} + +// Sets the authenticated role for the workflow +func WithAuthenticatedRoles(roles []string) WorkflowOption { + return func(p *workflowOptions) { + p.authenticated_roles = roles + } +} + // RunWorkflow executes a workflow function with type safety and durability guarantees. // The workflow can be executed immediately or enqueued for later execution based on options. // Returns a typed handle that can be used to wait for completion and retrieve results. diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index be66a7ee..872816d0 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -4131,3 +4131,31 @@ func TestSpecialSteps(t *testing.T) { require.Equal(t, "success", result, "workflow should return success") }) } +func TestWorkflowIdentity(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + handle, err := RunWorkflow( + dbosCtx, + simpleWorkflow, + "test", + WithWorkflowID("my-workflow-id"), + WithAutheticatedUser("user123"), + WithAssumedRole("admin"), + WithAuthenticatedRoles([]string{"reader", "writer"})) + require.NoError(t, err, "failed to start workflow") + + // Retrieve the workflow's status. + status, err := handle.GetStatus() + require.NoError(t, err) + + t.Run("CheckAuthenticatedUser", func(t *testing.T) { + assert.Equal(t, "user123", status.AuthenticatedUser) + }) + + t.Run("CheckAssumedRole", func(t *testing.T) { + assert.Equal(t, "admin", status.AssumedRole) + }) + + t.Run("CheckAuthenticatedRoles", func(t *testing.T) { + assert.Equal(t, []string{"reader", "writer"}, status.AuthenticatedRoles) + }) +} From 7edf2cafb92dd4358f0f5c244b8beb107fc8b342 Mon Sep 17 00:00:00 2001 From: vr-varad Date: Thu, 25 Sep 2025 17:30:42 +0530 Subject: [PATCH 3/7] Fix: Update --- dbos/workflow.go | 2 +- dbos/workflows_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 0385eba2..0a2cfdf4 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -550,7 +550,7 @@ func withWorkflowName(name string) WorkflowOption { } // Sets the authenticated user for the workflow -func WithAutheticatedUser(user string) WorkflowOption { +func WithAuthenticatedUser(user string) WorkflowOption { return func(p *workflowOptions) { p.authenticated_user = user } diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 872816d0..3d0fd14c 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -4133,12 +4133,13 @@ func TestSpecialSteps(t *testing.T) { } func TestWorkflowIdentity(t *testing.T) { dbosCtx := setupDBOS(t, true, true) + RegisterWorkflow(dbosCtx, simpleWorkflow) handle, err := RunWorkflow( dbosCtx, simpleWorkflow, "test", WithWorkflowID("my-workflow-id"), - WithAutheticatedUser("user123"), + WithAuthenticatedUser("user123"), WithAssumedRole("admin"), WithAuthenticatedRoles([]string{"reader", "writer"})) require.NoError(t, err, "failed to start workflow") From cf036adc1eb2e25b5957b1f38aaaa0a4abd36d14 Mon Sep 17 00:00:00 2001 From: vr-varad Date: Thu, 25 Sep 2025 18:32:05 +0530 Subject: [PATCH 4/7] Fix: Update --- dbos/system_database.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index aae95837..36222ad4 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -16,6 +16,7 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" ) @@ -489,7 +490,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt input.status.QueueName, input.status.AuthenticatedUser, input.status.AssumedRole, - input.status.AuthenticatedRoles, + pgtype.Array[string]{Elements: input.status.AuthenticatedRoles}, input.status.ExecutorID, applicationVersion, input.status.ApplicationID, From 411eb940d70e1b264b1fee3bd740dc7fce210bd1 Mon Sep 17 00:00:00 2001 From: vr-varad Date: Fri, 26 Sep 2025 09:06:38 +0530 Subject: [PATCH 5/7] Fix: Update --- dbos/system_database.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 36222ad4..1dbc5f77 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -3,6 +3,7 @@ package dbos import ( "context" _ "embed" + "encoding/json" "errors" "fmt" "log/slog" @@ -16,7 +17,6 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" - "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" ) @@ -483,6 +483,10 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt var result insertWorkflowResult var timeoutMSResult *int64 var workflowDeadlineEpochMS *int64 + + // Marshal authenticated roles (slice of strings) to JSON for TEXT column + authenticatedRoles, _ := json.Marshal(input.status.AuthenticatedRoles) + err = input.tx.QueryRow(ctx, query, input.status.ID, input.status.Status, @@ -490,7 +494,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt input.status.QueueName, input.status.AuthenticatedUser, input.status.AssumedRole, - pgtype.Array[string]{Elements: input.status.AuthenticatedRoles}, + authenticatedRoles, input.status.ExecutorID, applicationVersion, input.status.ApplicationID, @@ -703,11 +707,12 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( var deduplicationID *string var applicationVersion *string var executorID *string + var authenticatedRoles *string // Build scan arguments dynamically based on loaded columns scanArgs := []any{ &wf.ID, &wf.Status, &wf.Name, &wf.AuthenticatedUser, &wf.AssumedRole, - &wf.AuthenticatedRoles, &executorID, &createdAtMs, + &authenticatedRoles, &executorID, &createdAtMs, &updatedAtMs, &applicationVersion, &wf.ApplicationID, &wf.Attempts, &queueName, &timeoutMs, &deadlineMs, &startedAtMs, &deduplicationID, &wf.Priority, @@ -720,6 +725,12 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( scanArgs = append(scanArgs, &inputString) } + if authenticatedRoles != nil && *authenticatedRoles != "" { + if err := json.Unmarshal([]byte(*authenticatedRoles), &wf.AuthenticatedRoles); err != nil { + return nil, fmt.Errorf("failed to unmarshal authenticated_roles: %w", err) + } + } + err := rows.Scan(scanArgs...) if err != nil { return nil, fmt.Errorf("failed to scan workflow row: %w", err) @@ -1086,13 +1097,16 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st return "", fmt.Errorf("failed to serialize input: %w", err) } + // Marshal authenticated roles (slice of strings) to JSON for TEXT column + authenticatedRoles, _ := json.Marshal(originalWorkflow.AuthenticatedRoles) + _, err = tx.Exec(ctx, insertQuery, forkedWorkflowID, WorkflowStatusEnqueued, originalWorkflow.Name, originalWorkflow.AuthenticatedUser, originalWorkflow.AssumedRole, - originalWorkflow.AuthenticatedRoles, + authenticatedRoles, &appVersion, originalWorkflow.ApplicationID, _DBOS_INTERNAL_QUEUE_NAME, From 14fe7cb74d4e47cc0db945c95d83c968890267e2 Mon Sep 17 00:00:00 2001 From: vr-varad Date: Sat, 27 Sep 2025 12:27:00 +0530 Subject: [PATCH 6/7] Fix: Update --- dbos/system_database.go | 12 ++++++++++-- dbos/workflows_test.go | 2 ++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 6dcfa343..c5c5b2b7 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -488,7 +488,11 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt var workflowDeadlineEpochMS *int64 // Marshal authenticated roles (slice of strings) to JSON for TEXT column - authenticatedRoles, _ := json.Marshal(input.status.AuthenticatedRoles) + authenticatedRoles, err := json.Marshal(input.status.AuthenticatedRoles) + + if err != nil { + return nil, fmt.Errorf("failed to marshal the authenticated roles: %w", err) + } err = input.tx.QueryRow(ctx, query, input.status.ID, @@ -1101,7 +1105,11 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st } // Marshal authenticated roles (slice of strings) to JSON for TEXT column - authenticatedRoles, _ := json.Marshal(originalWorkflow.AuthenticatedRoles) + authenticatedRoles, err := json.Marshal(originalWorkflow.AuthenticatedRoles) + + if err != nil { + return "", fmt.Errorf("failed to marshal the authenticated roles: %w", err) + } _, err = tx.Exec(ctx, insertQuery, forkedWorkflowID, diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index dcc0d7e5..1989ab2f 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log" "reflect" "runtime" "sync" @@ -4146,6 +4147,7 @@ func TestWorkflowIdentity(t *testing.T) { // Retrieve the workflow's status. status, err := handle.GetStatus() + log.Print(status) require.NoError(t, err) t.Run("CheckAuthenticatedUser", func(t *testing.T) { From f2bbb60030ec5b89537e89ae0fe0759e17de3033 Mon Sep 17 00:00:00 2001 From: vr-varad Date: Sat, 27 Sep 2025 12:40:26 +0530 Subject: [PATCH 7/7] Fix: Update --- dbos/system_database.go | 10 +++++----- dbos/workflows_test.go | 2 -- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index c5c5b2b7..6cf659f8 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -732,17 +732,17 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( scanArgs = append(scanArgs, &inputString) } + err := rows.Scan(scanArgs...) + if err != nil { + return nil, fmt.Errorf("failed to scan workflow row: %w", err) + } + if authenticatedRoles != nil && *authenticatedRoles != "" { if err := json.Unmarshal([]byte(*authenticatedRoles), &wf.AuthenticatedRoles); err != nil { return nil, fmt.Errorf("failed to unmarshal authenticated_roles: %w", err) } } - err := rows.Scan(scanArgs...) - if err != nil { - return nil, fmt.Errorf("failed to scan workflow row: %w", err) - } - if queueName != nil && len(*queueName) > 0 { wf.QueueName = *queueName } diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 1989ab2f..dcc0d7e5 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "log" "reflect" "runtime" "sync" @@ -4147,7 +4146,6 @@ func TestWorkflowIdentity(t *testing.T) { // Retrieve the workflow's status. status, err := handle.GetStatus() - log.Print(status) require.NoError(t, err) t.Run("CheckAuthenticatedUser", func(t *testing.T) {