From ec8dff8f6e26eb77457349148f9f8c46e14b5336 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 12 Nov 2025 15:09:53 -0800 Subject: [PATCH 01/11] fork contexts include queue runner --- dbos/dbos.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbos/dbos.go b/dbos/dbos.go index 93803cd..828d949 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -205,6 +205,7 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext { applicationVersion: dbosCtx.applicationVersion, executorID: dbosCtx.executorID, applicationID: dbosCtx.applicationID, + queueRunner: dbosCtx.queueRunner, } childCtx.launched.Store(launched) return childCtx @@ -233,6 +234,7 @@ func WithoutCancel(ctx DBOSContext) DBOSContext { applicationVersion: dbosCtx.applicationVersion, executorID: dbosCtx.executorID, applicationID: dbosCtx.applicationID, + queueRunner: dbosCtx.queueRunner, } childCtx.launched.Store(launched) return childCtx @@ -260,6 +262,7 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C applicationVersion: dbosCtx.applicationVersion, executorID: dbosCtx.executorID, applicationID: dbosCtx.applicationID, + queueRunner: dbosCtx.queueRunner, } childCtx.launched.Store(launched) return childCtx, cancelFunc From d5aa8b07f12086afe4dcab13ff91e188be7a5ad7 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 12 Nov 2025 15:15:54 -0800 Subject: [PATCH 02/11] schema changes + account for partitioned key during dequeue + getQueuePartitions --- dbos/migrations/2_add_queue_partition_key.sql | 7 + dbos/system_database.go | 133 +++++++++++++----- 2 files changed, 102 insertions(+), 38 deletions(-) create mode 100644 dbos/migrations/2_add_queue_partition_key.sql diff --git a/dbos/migrations/2_add_queue_partition_key.sql b/dbos/migrations/2_add_queue_partition_key.sql new file mode 100644 index 0000000..06e9c93 --- /dev/null +++ b/dbos/migrations/2_add_queue_partition_key.sql @@ -0,0 +1,7 @@ +-- Migration 2: Add queue_partition_key column to workflow_status table +-- This enables 
partitioned queues where workflows can be distributed across +-- dynamically created queue partitions with separate concurrency limits per partition. + +ALTER TABLE %s.workflow_status +ADD COLUMN queue_partition_key TEXT; + diff --git a/dbos/system_database.go b/dbos/system_database.go index 8e753dc..4fbd035 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -66,6 +66,7 @@ type systemDatabase interface { // Queues dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInput) ([]dequeuedWorkflow, error) clearQueueAssignment(ctx context.Context, workflowID string) (bool, error) + getQueuePartitions(ctx context.Context, queueName string) ([]string, error) // Garbage collection garbageCollectWorkflows(ctx context.Context, input garbageCollectWorkflowsInput) error @@ -125,6 +126,9 @@ func createDatabaseIfNotExists(ctx context.Context, pool *pgxpool.Pool, logger * //go:embed migrations/1_initial_dbos_schema.sql var migration1SQL string +//go:embed migrations/2_add_queue_partition_key.sql +var migration2SQL string + type migrationFile struct { version int64 sql string @@ -150,18 +154,21 @@ const ( ) func runMigrations(pool *pgxpool.Pool, schema string) error { - // Process the migration SQL with fmt.Sprintf (22 schema placeholders) + // Process the migration SQL with fmt.Sprintf sanitizedSchema := pgx.Identifier{schema}.Sanitize() - migrationSQL := fmt.Sprintf(migration1SQL, + migration1SQLProcessed := fmt.Sprintf(migration1SQL, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema, sanitizedSchema) + migration2SQLProcessed := fmt.Sprintf(migration2SQL, sanitizedSchema) + // Build migrations list with processed SQL migrations := []migrationFile{ - {version: 1, sql: 
migrationSQL}, + {version: 1, sql: migration1SQLProcessed}, + {version: 2, sql: migration2SQLProcessed}, } // Begin transaction for atomic migration execution @@ -452,6 +459,11 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt deduplicationID = &input.status.DeduplicationID } + var queuePartitionKey *string + if len(input.status.QueuePartitionKey) > 0 { + queuePartitionKey = &input.status.QueuePartitionKey + } + query := fmt.Sprintf(`INSERT INTO %s.workflow_status ( workflow_uuid, status, @@ -470,17 +482,18 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt workflow_deadline_epoch_ms, inputs, deduplication_id, - priority - ) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18) + priority, + queue_partition_key + ) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19) ON CONFLICT (workflow_uuid) DO UPDATE SET recovery_attempts = CASE - WHEN EXCLUDED.status != $19 THEN workflow_status.recovery_attempts + 1 + WHEN EXCLUDED.status != $20 THEN workflow_status.recovery_attempts + 1 ELSE workflow_status.recovery_attempts END, updated_at = EXCLUDED.updated_at, executor_id = CASE - WHEN EXCLUDED.status = $20 THEN workflow_status.executor_id + WHEN EXCLUDED.status = $21 THEN workflow_status.executor_id ELSE EXCLUDED.executor_id END RETURNING recovery_attempts, status, name, queue_name, workflow_timeout_ms, workflow_deadline_epoch_ms`, pgx.Identifier{s.schema}.Sanitize()) @@ -515,6 +528,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt input.status.Input, // encoded input (already *string) deduplicationID, input.status.Priority, + queuePartitionKey, WorkflowStatusEnqueued, WorkflowStatusEnqueued, ).Scan( @@ -614,7 +628,7 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( "workflow_uuid", "status", "name", "authenticated_user", "assumed_role", "authenticated_roles", 
"executor_id", "created_at", "updated_at", "application_version", "application_id", "recovery_attempts", "queue_name", "workflow_timeout_ms", "workflow_deadline_epoch_ms", "started_at_epoch_ms", - "deduplication_id", "priority", + "deduplication_id", "priority", "queue_partition_key", } if input.loadOutput { @@ -717,6 +731,7 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( var applicationVersion *string var executorID *string var authenticatedRoles *string + var queuePartitionKey *string // Build scan arguments dynamically based on loaded columns scanArgs := []any{ @@ -724,7 +739,7 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( &authenticatedRoles, &executorID, &createdAtMs, &updatedAtMs, &applicationVersion, &wf.ApplicationID, &wf.Attempts, &queueName, &timeoutMs, - &deadlineMs, &startedAtMs, &deduplicationID, &wf.Priority, + &deadlineMs, &startedAtMs, &deduplicationID, &wf.Priority, &queuePartitionKey, } if input.loadOutput { @@ -761,6 +776,10 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( wf.DeduplicationID = *deduplicationID } + if queuePartitionKey != nil && len(*queuePartitionKey) > 0 { + wf.QueuePartitionKey = *queuePartitionKey + } + // Convert milliseconds to time.Time wf.CreatedAt = time.Unix(0, createdAtMs*int64(time.Millisecond)) wf.UpdatedAt = time.Unix(0, updatedAtMs*int64(time.Millisecond)) @@ -2152,6 +2171,7 @@ type dequeueWorkflowsInput struct { queue WorkflowQueue executorID string applicationVersion string + queuePartitionKey string } func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInput) ([]dequeuedWorkflow, error) { @@ -2182,10 +2202,13 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu AND status != $2 AND started_at_epoch_ms > $3`, pgx.Identifier{s.schema}.Sanitize()) - err := tx.QueryRow(ctx, limiterQuery, - input.queue.Name, - WorkflowStatusEnqueued, - 
cutoffTimeMs).Scan(&numRecentQueries) + limiterArgs := []any{input.queue.Name, WorkflowStatusEnqueued, cutoffTimeMs} + if len(input.queuePartitionKey) > 0 { + limiterQuery += ` AND queue_partition_key = $4` + limiterArgs = append(limiterArgs, input.queuePartitionKey) + } + + err := tx.QueryRow(ctx, limiterQuery, limiterArgs...).Scan(&numRecentQueries) if err != nil { return nil, fmt.Errorf("failed to query rate limiter: %w", err) } @@ -2203,10 +2226,16 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu pendingQuery := fmt.Sprintf(` SELECT executor_id, COUNT(*) as task_count FROM %s.workflow_status - WHERE queue_name = $1 AND status = $2 - GROUP BY executor_id`, pgx.Identifier{s.schema}.Sanitize()) + WHERE queue_name = $1 AND status = $2`, pgx.Identifier{s.schema}.Sanitize()) + + pendingArgs := []any{input.queue.Name, WorkflowStatusPending} + if len(input.queuePartitionKey) > 0 { + pendingQuery += ` AND queue_partition_key = $3` + pendingArgs = append(pendingArgs, input.queuePartitionKey) + } + pendingQuery += ` GROUP BY executor_id` - rows, err := tx.Query(ctx, pendingQuery, input.queue.Name, WorkflowStatusPending) + rows, err := tx.Query(ctx, pendingQuery, pendingArgs...) 
if err != nil { return nil, fmt.Errorf("failed to query pending workflows: %w", err) } @@ -2257,6 +2286,27 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu } // Build the query to select workflows for dequeueing + var query string + queryArgs := []any{input.queue.Name, WorkflowStatusEnqueued, input.applicationVersion} + query = fmt.Sprintf(` + SELECT workflow_uuid + FROM %s.workflow_status + WHERE queue_name = $1 + AND status = $2 + AND (application_version = $3 OR application_version IS NULL)`, pgx.Identifier{s.schema}.Sanitize()) + + // Add partition key filter if provided + if len(input.queuePartitionKey) > 0 { + query += ` AND queue_partition_key = $4` + queryArgs = append(queryArgs, input.queuePartitionKey) + } + + if input.queue.PriorityEnabled { + query += ` ORDER BY priority ASC, created_at ASC` + } else { + query += ` ORDER BY created_at ASC` + } + // Use SKIP LOCKED when no global concurrency is set to avoid blocking, // otherwise use NOWAIT to ensure consistent view across processes skipLocks := input.queue.GlobalConcurrency == nil @@ -2266,34 +2316,14 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu } else { lockClause = "FOR UPDATE NOWAIT" } - - var query string - if input.queue.PriorityEnabled { - query = fmt.Sprintf(` - SELECT workflow_uuid - FROM %s.workflow_status - WHERE queue_name = $1 - AND status = $2 - AND (application_version = $3 OR application_version IS NULL) - ORDER BY priority ASC, created_at ASC - %s`, pgx.Identifier{s.schema}.Sanitize(), lockClause) - } else { - query = fmt.Sprintf(` - SELECT workflow_uuid - FROM %s.workflow_status - WHERE queue_name = $1 - AND status = $2 - AND (application_version = $3 OR application_version IS NULL) - ORDER BY created_at ASC - %s`, pgx.Identifier{s.schema}.Sanitize(), lockClause) - } + query += fmt.Sprintf(" %s", lockClause) if maxTasks >= 0 { query += fmt.Sprintf(" LIMIT %d", int(maxTasks)) } // Execute the query to get workflow 
IDs - rows, err := tx.Query(ctx, query, input.queue.Name, WorkflowStatusEnqueued, input.applicationVersion) + rows, err := tx.Query(ctx, query, queryArgs...) if err != nil { return nil, fmt.Errorf("failed to query enqueued workflows: %w", err) } @@ -2390,6 +2420,33 @@ func (s *sysDB) clearQueueAssignment(ctx context.Context, workflowID string) (bo return commandTag.RowsAffected() > 0, nil } +// getQueuePartitions returns all unique partition keys for enqueued workflows in a queue. +func (s *sysDB) getQueuePartitions(ctx context.Context, queueName string) ([]string, error) { + query := fmt.Sprintf(` + SELECT DISTINCT queue_partition_key + FROM %s.workflow_status + WHERE queue_name = $1 + AND status = $2 + AND queue_partition_key IS NOT NULL`, pgx.Identifier{s.schema}.Sanitize()) + + rows, err := s.pool.Query(ctx, query, queueName, WorkflowStatusEnqueued) + if err != nil { + return nil, fmt.Errorf("failed to query queue partitions: %w", err) + } + defer rows.Close() + + var partitions []string + for rows.Next() { + var partitionKey string + if err := rows.Scan(&partitionKey); err != nil { + return nil, fmt.Errorf("failed to scan partition key: %w", err) + } + partitions = append(partitions, partitionKey) + } + + return partitions, nil +} + /*******************************/ /******* UTILS ********/ /*******************************/ From 300d92eabecf97a5641b673c38dfa552766f625f Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 12 Nov 2025 15:17:57 -0800 Subject: [PATCH 03/11] queue runner logic --- dbos/queue.go | 95 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 73 insertions(+), 22 deletions(-) diff --git a/dbos/queue.go b/dbos/queue.go index d0b4dc6..d9832b9 100644 --- a/dbos/queue.go +++ b/dbos/queue.go @@ -32,6 +32,7 @@ type WorkflowQueue struct { PriorityEnabled bool `json:"priorityEnabled,omitempty"` // Enable priority-based scheduling RateLimit *RateLimiter `json:"rateLimit,omitempty"` // Rate limiting configuration MaxTasksPerIteration int 
`json:"maxTasksPerIteration"` // Max workflows to dequeue per iteration + PartitionQueue bool `json:"partitionQueue,omitempty"` // Enable partitioned queue mode } // QueueOption is a functional option for configuring a workflow queue @@ -77,6 +78,16 @@ func WithMaxTasksPerIteration(maxTasks int) QueueOption { } } +// WithPartitionQueue enables partitioned queue mode. +// When enabled, workflows can be enqueued with a partition key, and each partition +// has its own concurrency limits. This allows distributing work across dynamically +// created queue partitions. +func WithPartitionQueue() QueueOption { + return func(q *WorkflowQueue) { + q.PartitionQueue = true + } +} + // NewWorkflowQueue creates a new workflow queue with the specified name and configuration options. // The queue must be created before workflows can be enqueued to it using the WithQueue option in RunWorkflow. // Queues provide controlled execution with support for concurrency limits, priority scheduling, and rate limiting. @@ -171,6 +182,15 @@ func (qr *queueRunner) listQueues() []WorkflowQueue { return queues } +// getQueue returns the queue with the given name from the registry. +// Returns a pointer to the queue if found, or nil if the queue does not exist. 
+func (qr *queueRunner) getQueue(queueName string) *WorkflowQueue { + if queue, exists := qr.workflowQueueRegistry[queueName]; exists { + return &queue + } + return nil +} + func (qr *queueRunner) run(ctx *dbosContext) { pollingInterval := qr.baseInterval @@ -178,31 +198,33 @@ func (qr *queueRunner) run(ctx *dbosContext) { hasBackoffError := false // Iterate through all queues in the registry - for queueName, queue := range qr.workflowQueueRegistry { - // Call DequeueWorkflows for each queue - dequeuedWorkflows, err := retryWithResult(ctx, func() ([]dequeuedWorkflow, error) { - return ctx.systemDB.dequeueWorkflows(ctx, dequeueWorkflowsInput{ - queue: queue, - executorID: ctx.executorID, - applicationVersion: ctx.applicationVersion, - }) - }, withRetrierLogger(qr.logger)) - if err != nil { - if pgErr, ok := err.(*pgconn.PgError); ok { - switch pgErr.Code { - case pgerrcode.SerializationFailure: - hasBackoffError = true - case pgerrcode.LockNotAvailable: - hasBackoffError = true - } - } else { - qr.logger.Error("Error dequeuing workflows from queue", "queue_name", queueName, "error", err) + for _, queue := range qr.workflowQueueRegistry { + // Build list of partition keys to dequeue from + // Default to empty string for non-partitioned queues + partitionKeys := []string{""} + if queue.PartitionQueue { + partitions, err := retryWithResult(ctx, func() ([]string, error) { + return ctx.systemDB.getQueuePartitions(ctx, queue.Name) + }, withRetrierLogger(qr.logger)) + if err != nil { + qr.logger.Error("Error getting queue partitions", "queue_name", queue.Name, "error", err) + continue + } + partitionKeys = partitions + } + + // Dequeue from each partition (or once for non-partitioned queues) + var dequeuedWorkflows []dequeuedWorkflow + for _, partitionKey := range partitionKeys { + workflows, shouldContinue := qr.dequeueWorkflows(ctx, queue, partitionKey, &hasBackoffError) + if shouldContinue { + continue } - continue + dequeuedWorkflows = append(dequeuedWorkflows, 
workflows...) } if len(dequeuedWorkflows) > 0 { - qr.logger.Debug("Dequeued workflows from queue", "queue_name", queueName, "workflows", dequeuedWorkflows) + qr.logger.Debug("Dequeued workflows from queue", "queue_name", queue.Name, "workflows", len(dequeuedWorkflows)) } for _, workflow := range dequeuedWorkflows { // Find the workflow in the registry @@ -225,7 +247,7 @@ func (qr *queueRunner) run(ctx *dbosContext) { } // Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type - _, err = registeredWorkflow.wrappedFunction(ctx, workflow.input, WithWorkflowID(workflow.id)) + _, err := registeredWorkflow.wrappedFunction(ctx, workflow.input, WithWorkflowID(workflow.id)) if err != nil { qr.logger.Error("Error running queued workflow", "error", err) } @@ -256,3 +278,32 @@ func (qr *queueRunner) run(ctx *dbosContext) { } } } + +// dequeueWorkflows dequeues workflows from a specific partition and handles errors. +// Returns the dequeued workflows and a boolean indicating whether to continue to the next iteration. 
+func (qr *queueRunner) dequeueWorkflows(ctx *dbosContext, queue WorkflowQueue, partitionKey string, hasBackoffError *bool) ([]dequeuedWorkflow, bool) { + dequeuedWorkflows, err := retryWithResult(ctx, func() ([]dequeuedWorkflow, error) { + return ctx.systemDB.dequeueWorkflows(ctx, dequeueWorkflowsInput{ + queue: queue, + executorID: ctx.executorID, + applicationVersion: ctx.applicationVersion, + queuePartitionKey: partitionKey, + }) + }, withRetrierLogger(qr.logger)) + + if err != nil { + if pgErr, ok := err.(*pgconn.PgError); ok { + switch pgErr.Code { + case pgerrcode.SerializationFailure: + *hasBackoffError = true + case pgerrcode.LockNotAvailable: + *hasBackoffError = true + } + } else { + qr.logger.Error("Error dequeuing workflows from queue", "queue_name", queue.Name, "partition_key", partitionKey, "error", err) + } + return nil, true // Indicate to continue to next iteration + } + + return dequeuedWorkflows, false // Success, don't continue +} From 01878ad451ebef24a0d6ab130b9608a27892147f Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 12 Nov 2025 15:19:14 -0800 Subject: [PATCH 04/11] allow client and RunWorkflow to pass a partition key, + validate queue parameters --- dbos/client.go | 22 ++++++++++++++++++++++ dbos/workflow.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/dbos/client.go b/dbos/client.go index 5bcaed7..798a935 100644 --- a/dbos/client.go +++ b/dbos/client.go @@ -106,6 +106,15 @@ func WithEnqueueTimeout(timeout time.Duration) EnqueueOption { } } +// WithEnqueueQueuePartitionKey sets the queue partition key for partitioned queues. +// When a queue is partitioned, workflows with the same partition key are processed +// with separate concurrency limits per partition. 
+func WithEnqueueQueuePartitionKey(partitionKey string) EnqueueOption { + return func(opts *enqueueOptions) { + opts.queuePartitionKey = partitionKey + } +} + type enqueueOptions struct { workflowName string workflowID string @@ -114,6 +123,7 @@ type enqueueOptions struct { priority uint workflowTimeout time.Duration workflowInput any + queuePartitionKey string } // EnqueueWorkflow enqueues a workflow to a named queue for deferred execution. @@ -134,6 +144,16 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu opt(params) } + // Validate partition key is not provided without queue name + if len(params.queuePartitionKey) > 0 && len(queueName) == 0 { + return nil, newWorkflowExecutionError("", fmt.Errorf("partition key provided but queue name is missing")) + } + + // Validate partition key and deduplication ID are not both provided (they are incompatible) + if len(params.queuePartitionKey) > 0 && len(params.deduplicationID) > 0 { + return nil, newWorkflowExecutionError("", fmt.Errorf("partition key and deduplication ID cannot be used together")) + } + workflowID := params.workflowID if workflowID == "" { workflowID = uuid.New().String() @@ -160,6 +180,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu QueueName: queueName, DeduplicationID: params.deduplicationID, Priority: int(params.priority), + QueuePartitionKey: params.queuePartitionKey, } uncancellableCtx := WithoutCancel(dbosCtx) @@ -205,6 +226,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu // - WithEnqueueDeduplicationID: Deduplication identifier for idempotent enqueuing // - WithEnqueuePriority: Execution priority // - WithEnqueueTimeout: Maximum execution time for the workflow +// - WithEnqueueQueuePartitionKey: Queue partition key for partitioned queues // // Returns a typed workflow handle that can be used to check status and retrieve results. 
// The handle uses polling to check workflow completion since the execution is asynchronous. diff --git a/dbos/workflow.go b/dbos/workflow.go index f0996bf..0132634 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -53,6 +53,7 @@ type WorkflowStatus struct { DeduplicationID string `json:"deduplication_id,omitempty"` // Queue deduplication identifier Input any `json:"input,omitempty"` // Input parameters passed to the workflow Priority int `json:"priority,omitempty"` // Queue execution priority (lower numbers have higher priority) + QueuePartitionKey string `json:"queue_partition_key,omitempty"` // Queue partition key for partitioned queues } // workflowState holds the runtime state for a workflow execution @@ -583,6 +584,7 @@ type workflowOptions struct { authenticated_user string assumed_role string authenticated_roles []string + queuePartitionKey string } // WorkflowOption is a functional option for configuring workflow execution parameters. @@ -625,6 +627,15 @@ func WithPriority(priority uint) WorkflowOption { } } +// WithQueuePartitionKey sets the queue partition key for partitioned queues. +// When a queue is partitioned, workflows with the same partition key are processed +// with separate concurrency limits per partition. +func WithQueuePartitionKey(partitionKey string) WorkflowOption { + return func(p *workflowOptions) { + p.queuePartitionKey = partitionKey + } +} + // An internal option we use to map the reflection function name to the registration options. 
func withWorkflowName(name string) WorkflowOption { return func(p *workflowOptions) { @@ -790,6 +801,30 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt params.workflowName = registeredWorkflow.Name } + // Validate partition key is not provided without queue name + if len(params.queuePartitionKey) > 0 && len(params.queueName) == 0 { + return nil, newWorkflowExecutionError("", fmt.Errorf("partition key provided but queue name is missing")) + } + + // Validate partition key and deduplication ID are not both provided (they are incompatible) + if len(params.queuePartitionKey) > 0 && len(params.deduplicationID) > 0 { + return nil, newWorkflowExecutionError("", fmt.Errorf("partition key and deduplication ID cannot be used together")) + } + + // Validate queue exists if provided + if len(params.queueName) > 0 { + queue := c.queueRunner.getQueue(params.queueName) + if queue == nil { + return nil, newWorkflowExecutionError("", fmt.Errorf("queue %s does not exist", params.queueName)) + } + // Validate queue partition key if provided + if len(params.queuePartitionKey) > 0 { + if !queue.PartitionQueue { + return nil, newWorkflowExecutionError("", fmt.Errorf("queue %s is not a partitioned queue, but a partition key was provided", params.queueName)) + } + } + } + // Check if we are within a workflow (and thus a child workflow) parentWorkflowState, ok := c.Value(workflowStateKey).(*workflowState) isChildWorkflow := ok && parentWorkflowState != nil @@ -885,6 +920,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt AuthenticatedUser: params.authenticated_user, AssumedRole: params.assumed_role, AuthenticatedRoles: params.authenticated_roles, + QueuePartitionKey: params.queuePartitionKey, } var earlyReturnPollingHandle *workflowPollingHandle[any] From 2af70a752d8666b4cd11dc7d1924ace972d66ceb Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 12 Nov 2025 15:21:43 -0800 Subject: [PATCH 05/11] update migration test --- 
dbos/dbos_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbos/dbos_test.go b/dbos/dbos_test.go index ee9b605..58803ce 100644 --- a/dbos/dbos_test.go +++ b/dbos/dbos_test.go @@ -260,7 +260,7 @@ func TestConfig(t *testing.T) { err = sysDB.pool.QueryRow(dbCtx, "SELECT version FROM dbos.dbos_migrations").Scan(&version) require.NoError(t, err) - assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)") + assert.Equal(t, int64(2), version, "migration version should be 2 (after initial migration and queue partition key migration)") // Test manual shutdown and recreate Shutdown(ctx, 1*time.Minute) @@ -468,7 +468,7 @@ func TestCustomSystemDBSchema(t *testing.T) { err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT version FROM %s.dbos_migrations", customSchema)).Scan(&version) require.NoError(t, err) - assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)") + assert.Equal(t, int64(2), version, "migration version should be 2 (after initial migration and queue partition key migration)") }) // Test workflows for exercising Send/Recv and SetEvent/GetEvent From 40ab8c6d3024219eb4f5ed64051102a68809dfb1 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 12 Nov 2025 15:24:11 -0800 Subject: [PATCH 06/11] have client check input parameters --- dbos/client.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dbos/client.go b/dbos/client.go index 798a935..474a8e9 100644 --- a/dbos/client.go +++ b/dbos/client.go @@ -263,6 +263,14 @@ func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, op return nil, errors.New("client cannot be nil") } + if len(queueName) == 0 { + return nil, fmt.Errorf("queue name is required") + } + + if len(workflowName) == 0 { + return nil, fmt.Errorf("workflow name is required") + } + // Serialize input serializer := newJSONSerializer[P]() encodedInput, err := serializer.Encode(input) From 77d15862be3ea75f19bf601a4634cef695ec5253 Mon Sep 17 00:00:00 2001 From: 
maxdml Date: Wed, 12 Nov 2025 15:24:54 -0800 Subject: [PATCH 07/11] admin server includes queue parameters in wf status --- dbos/admin_server.go | 2 ++ dbos/admin_server_test.go | 59 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/dbos/admin_server.go b/dbos/admin_server.go index 49d4c5a..b56629f 100644 --- a/dbos/admin_server.go +++ b/dbos/admin_server.go @@ -124,6 +124,8 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) { "QueueName": ws.QueueName, "Timeout": ws.Timeout, "DeduplicationID": ws.DeduplicationID, + "Priority": ws.Priority, + "QueuePartitionKey": ws.QueuePartitionKey, "Input": ws.Input, } diff --git a/dbos/admin_server_test.go b/dbos/admin_server_test.go index 80177d4..cb4b868 100644 --- a/dbos/admin_server_test.go +++ b/dbos/admin_server_test.go @@ -560,6 +560,12 @@ func TestAdminServer(t *testing.T) { // Create a workflow queue with limited concurrency to keep workflows enqueued queue := NewWorkflowQueue(ctx, "test-queue", WithGlobalConcurrency(1)) + // Create a partitioned queue for partition key test + partitionedQueue := NewWorkflowQueue(ctx, "partitioned-test-queue", WithPartitionQueue(), WithGlobalConcurrency(1)) + + // Create a priority-enabled queue for priority and deduplication tests + priorityQueue := NewWorkflowQueue(ctx, "priority-test-queue", WithPriorityEnabled(), WithGlobalConcurrency(1)) + // Define a blocking workflow that will hold up the queue startEvent := NewEvent() blockingChan := make(chan struct{}) @@ -610,6 +616,22 @@ func TestAdminServer(t *testing.T) { enqueuedHandles = append(enqueuedHandles, handle) } + // Create workflow with partition key + partitionHandle, err := RunWorkflow(ctx, blockingWorkflow, "partition-test", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1")) + require.NoError(t, err, "Failed to create workflow with partition key") + enqueuedHandles = append(enqueuedHandles, partitionHandle) + + // Create workflow 
with deduplication ID + dedupID := "test-dedup-id" + dedupHandle, err := RunWorkflow(ctx, blockingWorkflow, "dedup-test", WithQueue(priorityQueue.Name), WithDeduplicationID(dedupID)) + require.NoError(t, err, "Failed to create workflow with deduplication ID") + enqueuedHandles = append(enqueuedHandles, dedupHandle) + + // Create workflow with priority + priorityHandle, err := RunWorkflow(ctx, blockingWorkflow, "priority-test", WithQueue(priorityQueue.Name), WithPriority(5)) + require.NoError(t, err, "Failed to create workflow with priority") + enqueuedHandles = append(enqueuedHandles, priorityHandle) + // Create non-queued workflows that should NOT appear in queues-only results var regularHandles []WorkflowHandle[string] for i := range 2 { @@ -639,10 +661,11 @@ func TestAdminServer(t *testing.T) { err = json.NewDecoder(respQueuesOnly.Body).Decode(&queuesOnlyWorkflows) require.NoError(t, err, "Failed to decode queues_only workflows response") - // Should have exactly 3 enqueued workflows and 1 pending workflow - assert.Equal(t, 4, len(queuesOnlyWorkflows), "Expected exactly 4 workflows") + // Should have exactly 7 workflows (3 original + 1 pending + 1 partition + 1 dedup + 1 priority) + assert.Equal(t, 7, len(queuesOnlyWorkflows), "Expected exactly 7 workflows") // Verify all returned workflows are from the queue and have ENQUEUED/PENDING status + // Also verify QueuePartitionKey, DeduplicationID, and Priority fields are present for _, wf := range queuesOnlyWorkflows { status, ok := wf["Status"].(string) require.True(t, ok, "Status should be a string") @@ -651,7 +674,37 @@ func TestAdminServer(t *testing.T) { queueName, ok := wf["QueueName"].(string) require.True(t, ok, "QueueName should be a string") - assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'") + assert.NotEmpty(t, queueName, "QueueName should not be empty") + + wfID, ok := wf["WorkflowUUID"].(string) + require.True(t, ok, "WorkflowUUID should be a string") + + // Verify 
QueuePartitionKey field is present (may be empty string for non-partitioned workflows) + _, hasPartitionKey := wf["QueuePartitionKey"] + assert.True(t, hasPartitionKey, "QueuePartitionKey field should be present for workflow %s", wfID) + + // Verify DeduplicationID field is present (may be empty string for workflows without dedup ID) + _, hasDedupID := wf["DeduplicationID"] + assert.True(t, hasDedupID, "DeduplicationID field should be present for workflow %s", wfID) + + // Verify Priority field is present (may be 0 for workflows without priority) + _, hasPriority := wf["Priority"] + assert.True(t, hasPriority, "Priority field should be present for workflow %s", wfID) + + // Verify specific values for our test workflows + if wfID == partitionHandle.GetWorkflowID() { + partitionKey, ok := wf["QueuePartitionKey"].(string) + require.True(t, ok, "QueuePartitionKey should be a string") + assert.Equal(t, "partition-1", partitionKey, "Expected partition key to be 'partition-1'") + } else if wfID == dedupHandle.GetWorkflowID() { + dedupIDResp, ok := wf["DeduplicationID"].(string) + require.True(t, ok, "DeduplicationID should be a string") + assert.Equal(t, dedupID, dedupIDResp, "Expected deduplication ID to match") + } else if wfID == priorityHandle.GetWorkflowID() { + priority, ok := wf["Priority"].(float64) // JSON numbers decode as float64 + require.True(t, ok, "Priority should be a number") + assert.Equal(t, float64(5), priority, "Expected priority to be 5") + } } // Verify that the enqueued workflow IDs match From 8902470df0acecaf61c9d216d2d1dfe02c092dee Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 12 Nov 2025 15:25:19 -0800 Subject: [PATCH 08/11] client tests --- dbos/client.go | 4 +-- dbos/client_test.go | 83 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 84 insertions(+), 3 deletions(-) diff --git a/dbos/client.go b/dbos/client.go index 474a8e9..e591cbe 100644 --- a/dbos/client.go +++ b/dbos/client.go @@ -146,12 +146,12 @@ func (c *client) 
Enqueue(queueName, workflowName string, input any, opts ...Enqu // Validate partition key is not provided without queue name if len(params.queuePartitionKey) > 0 && len(queueName) == 0 { - return nil, newWorkflowExecutionError("", fmt.Errorf("partition key provided but queue name is missing")) + return nil, fmt.Errorf("partition key provided but queue name is missing") } // Validate partition key and deduplication ID are not both provided (they are incompatible) if len(params.queuePartitionKey) > 0 && len(params.deduplicationID) > 0 { - return nil, newWorkflowExecutionError("", fmt.Errorf("partition key and deduplication ID cannot be used together")) + return nil, fmt.Errorf("partition key and deduplication ID cannot be used together") } workflowID := params.workflowID diff --git a/dbos/client_test.go b/dbos/client_test.go index abcc2a9..fc093a6 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestEnqueue(t *testing.T) { +func TestClientEnqueue(t *testing.T) { // Setup server context - this will process tasks serverCtx := setupDBOS(t, true, true) @@ -23,6 +23,10 @@ func TestEnqueue(t *testing.T) { // Must be created before Launch() priorityQueue := NewWorkflowQueue(serverCtx, "priority-test-queue", WithGlobalConcurrency(1), WithPriorityEnabled()) + // Create a partitioned queue for partition key test + // Must be created before Launch() + partitionedQueue := NewWorkflowQueue(serverCtx, "client-partitioned-queue", WithPartitionQueue()) + // Track execution order for priority test var executionOrder []string var mu sync.Mutex @@ -59,6 +63,12 @@ func TestEnqueue(t *testing.T) { } RegisterWorkflow(serverCtx, priorityWorkflow, WithWorkflowName("PriorityWorkflow")) + // Simple workflow for partitioned queue test + partitionedWorkflow := func(ctx DBOSContext, input string) (string, error) { + return "partitioned: " + input, nil + } + RegisterWorkflow(serverCtx, partitionedWorkflow, 
WithWorkflowName("PartitionedWorkflow")) + // Launch the server context to start processing tasks err := Launch(serverCtx) require.NoError(t, err) @@ -257,6 +267,77 @@ func TestEnqueue(t *testing.T) { assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after deduplication test") }) + t.Run("EnqueueToPartitionedQueue", func(t *testing.T) { + // Enqueue a workflow to a partitioned queue with a partition key + handle, err := Enqueue[string, string](client, partitionedQueue.Name, "PartitionedWorkflow", "test-input", + WithEnqueueQueuePartitionKey("partition-1"), + WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion())) + require.NoError(t, err, "failed to enqueue workflow to partitioned queue") + + // Verify we got a polling handle + _, ok := handle.(*workflowPollingHandle[string]) + require.True(t, ok, "expected handle to be of type workflowPollingHandle, got %T", handle) + + // Get the result + result, err := handle.GetResult() + require.NoError(t, err, "failed to get result from partitioned queue workflow") + + expectedResult := "partitioned: test-input" + assert.Equal(t, expectedResult, result, "expected result to match") + + // Verify the workflow status + status, err := handle.GetStatus() + require.NoError(t, err, "failed to get workflow status") + + assert.Equal(t, WorkflowStatusSuccess, status.Status, "expected workflow status to be SUCCESS") + assert.Equal(t, "PartitionedWorkflow", status.Name, "expected workflow name to match") + assert.Equal(t, partitionedQueue.Name, status.QueueName, "expected queue name to match") + + assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after partitioned queue test") + }) + + t.Run("EnqueueWithPartitionKeyWithoutQueue", func(t *testing.T) { + // Attempt to enqueue with a partition key but no queue name + _, err := Enqueue[string, string](client, "", "PartitionedWorkflow", "test-input", + WithEnqueueQueuePartitionKey("partition-1")) + 
require.Error(t, err, "expected error when enqueueing with partition key but no queue name") + + // Verify the error message contains the expected text + assert.Contains(t, err.Error(), "queue name is required", "expected error message to contain 'queue name is required'") + }) + + t.Run("EnqueueWithPartitionKeyAndDeduplicationID", func(t *testing.T) { + // Attempt to enqueue with both partition key and deduplication ID + // This should return an error + _, err := Enqueue[string, string](client, partitionedQueue.Name, "PartitionedWorkflow", "test-input", + WithEnqueueQueuePartitionKey("partition-1"), + WithEnqueueDeduplicationID("dedup-id")) + require.Error(t, err, "expected error when enqueueing with both partition key and deduplication ID") + + // Verify the error message contains the expected text + assert.Contains(t, err.Error(), "partition key and deduplication ID cannot be used together", "expected error message to contain validation message") + }) + + t.Run("EnqueueWithEmptyQueueName", func(t *testing.T) { + // Attempt to enqueue with empty queue name + // This should return an error + _, err := Enqueue[wfInput, string](client, "", "ServerWorkflow", wfInput{Input: "test-input"}) + require.Error(t, err, "expected error when enqueueing with empty queue name") + + // Verify the error message contains the expected text + assert.Contains(t, err.Error(), "queue name is required", "expected error message to contain 'queue name is required'") + }) + + t.Run("EnqueueWithEmptyWorkflowName", func(t *testing.T) { + // Attempt to enqueue with empty workflow name + // This should return an error + _, err := Enqueue[wfInput, string](client, queue.Name, "", wfInput{Input: "test-input"}) + require.Error(t, err, "expected error when enqueueing with empty workflow name") + + // Verify the error message contains the expected text + assert.Contains(t, err.Error(), "workflow name is required", "expected error message to contain 'workflow name is required'") + }) + // Verify all 
queue entries are cleaned up require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after client tests") } From e52640172916218dc55b3b8239d64f36bfbc32d8 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 12 Nov 2025 15:28:27 -0800 Subject: [PATCH 09/11] return q parameters in conductor protocol --- dbos/conductor_protocol.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/dbos/conductor_protocol.go b/dbos/conductor_protocol.go index 4fce163..7dd9607 100644 --- a/dbos/conductor_protocol.go +++ b/dbos/conductor_protocol.go @@ -87,6 +87,9 @@ type listWorkflowsConductorResponseBody struct { CreatedAt *string `json:"CreatedAt,omitempty"` UpdatedAt *string `json:"UpdatedAt,omitempty"` QueueName *string `json:"QueueName,omitempty"` + QueuePartitionKey *string `json:"QueuePartitionKey,omitempty"` + DeduplicationID *string `json:"DeduplicationID,omitempty"` + Priority *int `json:"Priority,omitempty"` ApplicationVersion *string `json:"ApplicationVersion,omitempty"` ExecutorID *string `json:"ExecutorID,omitempty"` } @@ -167,6 +170,21 @@ func formatListWorkflowsResponseBody(wf WorkflowStatus) listWorkflowsConductorRe output.QueueName = &wf.QueueName } + // Copy queue partition key + if wf.QueuePartitionKey != "" { + output.QueuePartitionKey = &wf.QueuePartitionKey + } + + // Copy deduplication ID + if wf.DeduplicationID != "" { + output.DeduplicationID = &wf.DeduplicationID + } + + // Copy priority + if wf.Priority != 0 { + output.Priority = &wf.Priority + } + // Copy application version if wf.ApplicationVersion != "" { output.ApplicationVersion = &wf.ApplicationVersion From 4e8fd60ea6874a8646a6bfd591f1dfbb471c8c81 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 12 Nov 2025 15:28:43 -0800 Subject: [PATCH 10/11] add tests --- dbos/queues_test.go | 204 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 204 insertions(+) diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 7b52fa0..227722e 100644 --- 
a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -159,6 +159,12 @@ func TestWorkflowQueues(t *testing.T) { } RegisterWorkflow(dbosCtx, workflowEnqueuesAnother) + // Simple workflow for NonExistingQueue test + simpleWorkflow := func(ctx DBOSContext, input string) (string, error) { + return input, nil + } + RegisterWorkflow(dbosCtx, simpleWorkflow) + err := Launch(dbosCtx) require.NoError(t, err) @@ -463,6 +469,26 @@ func TestWorkflowQueues(t *testing.T) { require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after deduplication test") }) + + t.Run("NonExistingQueue", func(t *testing.T) { + // Attempt to enqueue to a non-existing queue + // This should return an error + _, err := RunWorkflow(dbosCtx, simpleWorkflow, "test-input", WithQueue("non-existing-queue")) + require.Error(t, err, "expected error when enqueueing to non-existing queue") + + // Check that it's the correct error type + var dbosErr *DBOSError + require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err) + + // Verify the error is wrapped by newWorkflowExecutionError with WorkflowExecutionError code + assert.True(t, errors.Is(err, &DBOSError{Code: WorkflowExecutionError}), "expected error to be WorkflowExecutionError") + + // Verify the unwrapped error contains the validation message + unwrappedErr := errors.Unwrap(dbosErr) + require.NotNil(t, unwrappedErr, "expected error to have an unwrapped error") + expectedMsgPart := "does not exist" + assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part") + }) } func TestQueueRecovery(t *testing.T) { @@ -1341,3 +1367,181 @@ func TestListQueuedWorkflows(t *testing.T) { require.True(t, queueEntriesAreCleanedUp(dbosCtx), "queue entries should be cleaned up") }) } + +func TestPartitionedQueues(t *testing.T) { + t.Run("PartitionKeyWithoutQueue", func(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + + // Register a simple 
workflow + simpleWorkflow := func(ctx DBOSContext, input string) (string, error) { + return input, nil + } + RegisterWorkflow(dbosCtx, simpleWorkflow) + + err := Launch(dbosCtx) + require.NoError(t, err, "failed to launch DBOS instance") + + // Attempt to enqueue with a partition key but no queue name + // This should return an error + _, err = RunWorkflow(dbosCtx, simpleWorkflow, "test-input", WithQueuePartitionKey("partition-1")) + require.Error(t, err, "expected error when enqueueing with partition key but no queue name") + + // Check that it's the correct error type + var dbosErr *DBOSError + require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err) + + // Verify the error is wrapped by newWorkflowExecutionError with WorkflowExecutionError code + assert.True(t, errors.Is(err, &DBOSError{Code: WorkflowExecutionError}), "expected error to be WorkflowExecutionError") + + // Verify the unwrapped error contains the validation message + unwrappedErr := errors.Unwrap(dbosErr) + require.NotNil(t, unwrappedErr, "expected error to have an unwrapped error") + expectedMsgPart := "partition key provided but queue name is missing" + assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part") + }) + + t.Run("PartitionKeyOnNonPartitionedQueue", func(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + + // Create a non-partitioned queue + nonPartitionedQueue := NewWorkflowQueue(dbosCtx, "non-partitioned-queue") + + // Register a simple workflow + simpleWorkflow := func(ctx DBOSContext, input string) (string, error) { + return input, nil + } + RegisterWorkflow(dbosCtx, simpleWorkflow) + + err := Launch(dbosCtx) + require.NoError(t, err, "failed to launch DBOS instance") + + // Attempt to enqueue with a partition key on a non-partitioned queue + // This should return an error + _, err = RunWorkflow(dbosCtx, simpleWorkflow, "test-input", WithQueue(nonPartitionedQueue.Name), 
WithQueuePartitionKey("partition-1")) + require.Error(t, err, "expected error when enqueueing with partition key on non-partitioned queue") + + // Check that it's the correct error type + var dbosErr *DBOSError + require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err) + + // Verify the error is wrapped by newWorkflowExecutionError with WorkflowExecutionError code + assert.True(t, errors.Is(err, &DBOSError{Code: WorkflowExecutionError}), "expected error to be WorkflowExecutionError") + + // Verify the unwrapped error contains the validation message + unwrappedErr := errors.Unwrap(dbosErr) + require.NotNil(t, unwrappedErr, "expected error to have an unwrapped error") + expectedMsgPart := "is not a partitioned queue, but a partition key was provided" + assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part") + }) + + t.Run("PartitionKeyWithDeduplicationID", func(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + + // Create a partitioned queue + partitionedQueue := NewWorkflowQueue(dbosCtx, "partitioned-queue-test", WithPartitionQueue()) + + // Register a simple workflow + simpleWorkflow := func(ctx DBOSContext, input string) (string, error) { + return input, nil + } + RegisterWorkflow(dbosCtx, simpleWorkflow) + + err := Launch(dbosCtx) + require.NoError(t, err, "failed to launch DBOS instance") + + // Attempt to enqueue with both partition key and deduplication ID + // This should return an error + _, err = RunWorkflow(dbosCtx, simpleWorkflow, "test-input", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1"), WithDeduplicationID("dedup-id")) + require.Error(t, err, "expected error when enqueueing with both partition key and deduplication ID") + + // Check that it's the correct error type + var dbosErr *DBOSError + require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err) + + // Verify the error is wrapped by 
newWorkflowExecutionError with WorkflowExecutionError code + assert.True(t, errors.Is(err, &DBOSError{Code: WorkflowExecutionError}), "expected error to be WorkflowExecutionError") + + // Verify the unwrapped error contains the validation message + unwrappedErr := errors.Unwrap(dbosErr) + require.NotNil(t, unwrappedErr, "expected error to have an unwrapped error") + expectedMsgPart := "partition key and deduplication ID cannot be used together" + assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part") + }) + + t.Run("Dequeue", func(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + + // Create a partitioned queue with concurrency limit of 1 per partition + partitionedQueue := NewWorkflowQueue(dbosCtx, "partitioned-queue", WithPartitionQueue(), WithGlobalConcurrency(1)) + + // Create events for blocking workflow on partition 1 + partition1StartEvent := NewEvent() + partition1BlockEvent := NewEvent() + + // Create blocking workflow for partition 1 + blockingWorkflowP1 := func(ctx DBOSContext, input string) (string, error) { + partition1StartEvent.Set() + partition1BlockEvent.Wait() + return "p1-" + input, nil + } + + // Create non-blocking workflow (used for both partitions) + nonBlockingWorkflow := func(ctx DBOSContext, input string) (string, error) { + return input, nil + } + + RegisterWorkflow(dbosCtx, blockingWorkflowP1) + RegisterWorkflow(dbosCtx, nonBlockingWorkflow) + + err := Launch(dbosCtx) + require.NoError(t, err, "failed to launch DBOS instance") + + // Enqueue a blocking workflow on partition 1 + handleP1Blocked, err := RunWorkflow(dbosCtx, blockingWorkflowP1, "blocked", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1")) + require.NoError(t, err, "failed to enqueue blocking workflow on partition 1") + + // Wait for the blocking workflow on partition 1 to start + partition1StartEvent.Wait() + + // Enqueue a non-blocking workflow on partition 1 - this should be blocked 
behind the blocking one + handleP1Normal, err := RunWorkflow(dbosCtx, nonBlockingWorkflow, "p1-normal", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1")) + require.NoError(t, err, "failed to enqueue normal workflow on partition 1") + + // Verify the normal workflow is blocked (ENQUEUED status) behind the blocking one + statusP1Normal, err := handleP1Normal.GetStatus() + require.NoError(t, err, "failed to get status of normal workflow on partition 1") + assert.Equal(t, WorkflowStatusEnqueued, statusP1Normal.Status, "expected normal workflow on partition 1 to be ENQUEUED behind the blocking one") + + // Enqueue multiple non-blocking workflows on partition 2 - these should all complete + // even though partition 1 is blocked, demonstrating partition independence + numP2Workflows := 3 + handlesP2 := make([]WorkflowHandle[string], numP2Workflows) + for i := range numP2Workflows { + handle, err := RunWorkflow(dbosCtx, nonBlockingWorkflow, fmt.Sprintf("p2-workflow-%d", i), WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-2")) + require.NoError(t, err, "failed to enqueue workflow %d on partition 2", i) + handlesP2[i] = handle + } + + // Wait for all partition 2 workflows to complete + for i, handle := range handlesP2 { + result, err := handle.GetResult() + require.NoError(t, err, "failed to get result from partition 2 workflow %d", i) + expectedResult := fmt.Sprintf("p2-workflow-%d", i) + assert.Equal(t, expectedResult, result, "expected result from partition 2 workflow %d", i) + } + + // Verify partition 1 blocking workflow is still pending + statusP1Blocked, err := handleP1Blocked.GetStatus() + require.NoError(t, err, "failed to get status of blocking workflow on partition 1") + assert.Equal(t, WorkflowStatusPending, statusP1Blocked.Status, "expected blocking workflow on partition 1 to still be pending") + + // Verify the normal workflow on partition 1 is still enqueued + statusP1Normal, err = handleP1Normal.GetStatus() + 
require.NoError(t, err, "failed to get status of normal workflow on partition 1") + assert.Equal(t, WorkflowStatusEnqueued, statusP1Normal.Status, "expected normal workflow on partition 1 to still be ENQUEUED") + + // Now unblock partition 1 blocking workflow + partition1BlockEvent.Set() + require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after partitioned queue test") + }) +} From 1b93682dab017f953ed90bdee8e44f9a69f18443 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 12 Nov 2025 17:06:01 -0800 Subject: [PATCH 11/11] make partition keys mandatory on partitioned queues --- dbos/client.go | 17 ++++++----------- dbos/queues_test.go | 34 ++++++++++++++++++++++++++++++++++ dbos/workflow.go | 12 +++++++----- 3 files changed, 47 insertions(+), 16 deletions(-) diff --git a/dbos/client.go b/dbos/client.go index e591cbe..7113800 100644 --- a/dbos/client.go +++ b/dbos/client.go @@ -144,9 +144,12 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu opt(params) } - // Validate partition key is not provided without queue name - if len(params.queuePartitionKey) > 0 && len(queueName) == 0 { - return nil, fmt.Errorf("partition key provided but queue name is missing") + if len(queueName) == 0 { + return nil, fmt.Errorf("queue name is required") + } + + if len(workflowName) == 0 { + return nil, fmt.Errorf("workflow name is required") } // Validate partition key and deduplication ID are not both provided (they are incompatible) @@ -263,14 +266,6 @@ func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, op return nil, errors.New("client cannot be nil") } - if len(queueName) == 0 { - return nil, fmt.Errorf("queue name is required") - } - - if len(workflowName) == 0 { - return nil, fmt.Errorf("workflow name is required") - } - // Serialize input serializer := newJSONSerializer[P]() encodedInput, err := serializer.Encode(input) diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 
227722e..53148f6 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -1434,6 +1434,40 @@ func TestPartitionedQueues(t *testing.T) { assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part") }) + t.Run("PartitionedQueueWithoutPartitionKey", func(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + + // Create a partitioned queue + partitionedQueue := NewWorkflowQueue(dbosCtx, "partitioned-queue-required", WithPartitionQueue()) + + // Register a simple workflow + simpleWorkflow := func(ctx DBOSContext, input string) (string, error) { + return input, nil + } + RegisterWorkflow(dbosCtx, simpleWorkflow) + + err := Launch(dbosCtx) + require.NoError(t, err, "failed to launch DBOS instance") + + // Attempt to enqueue to a partitioned queue without a partition key + // This should return an error + _, err = RunWorkflow(dbosCtx, simpleWorkflow, "test-input", WithQueue(partitionedQueue.Name)) + require.Error(t, err, "expected error when enqueueing to partitioned queue without partition key") + + // Check that it's the correct error type + var dbosErr *DBOSError + require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err) + + // Verify the error is wrapped by newWorkflowExecutionError with WorkflowExecutionError code + assert.True(t, errors.Is(err, &DBOSError{Code: WorkflowExecutionError}), "expected error to be WorkflowExecutionError") + + // Verify the unwrapped error contains the validation message + unwrappedErr := errors.Unwrap(dbosErr) + require.NotNil(t, unwrappedErr, "expected error to have an unwrapped error") + expectedMsgPart := "has partitions enabled, but no partition key was provided" + assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part") + }) + t.Run("PartitionKeyWithDeduplicationID", func(t *testing.T) { dbosCtx := setupDBOS(t, true, true) diff --git a/dbos/workflow.go b/dbos/workflow.go index 
0132634..0809d0a 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -817,11 +817,13 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt if queue == nil { return nil, newWorkflowExecutionError("", fmt.Errorf("queue %s does not exist", params.queueName)) } - // Validate queue partition key if provided - if len(params.queuePartitionKey) > 0 { - if !queue.PartitionQueue { - return nil, newWorkflowExecutionError("", fmt.Errorf("queue %s is not a partitioned queue, but a partition key was provided", params.queueName)) - } + // If queue has partitions enabled, partition key must be provided + if queue.PartitionQueue && len(params.queuePartitionKey) == 0 { + return nil, newWorkflowExecutionError("", fmt.Errorf("queue %s has partitions enabled, but no partition key was provided", params.queueName)) + } + // If partition key is provided, queue must have partitions enabled + if len(params.queuePartitionKey) > 0 && !queue.PartitionQueue { + return nil, newWorkflowExecutionError("", fmt.Errorf("queue %s is not a partitioned queue, but a partition key was provided", params.queueName)) } }