Skip to content
2 changes: 2 additions & 0 deletions dbos/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
"QueueName": ws.QueueName,
"Timeout": ws.Timeout,
"DeduplicationID": ws.DeduplicationID,
"Priority": ws.Priority,
"QueuePartitionKey": ws.QueuePartitionKey,
"Input": ws.Input,
}

Expand Down
59 changes: 56 additions & 3 deletions dbos/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,12 @@ func TestAdminServer(t *testing.T) {
// Create a workflow queue with limited concurrency to keep workflows enqueued
queue := NewWorkflowQueue(ctx, "test-queue", WithGlobalConcurrency(1))

// Create a partitioned queue for partition key test
partitionedQueue := NewWorkflowQueue(ctx, "partitioned-test-queue", WithPartitionQueue(), WithGlobalConcurrency(1))

// Create a priority-enabled queue for priority and deduplication tests
priorityQueue := NewWorkflowQueue(ctx, "priority-test-queue", WithPriorityEnabled(), WithGlobalConcurrency(1))

// Define a blocking workflow that will hold up the queue
startEvent := NewEvent()
blockingChan := make(chan struct{})
Expand Down Expand Up @@ -610,6 +616,22 @@ func TestAdminServer(t *testing.T) {
enqueuedHandles = append(enqueuedHandles, handle)
}

// Create workflow with partition key
partitionHandle, err := RunWorkflow(ctx, blockingWorkflow, "partition-test", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1"))
require.NoError(t, err, "Failed to create workflow with partition key")
enqueuedHandles = append(enqueuedHandles, partitionHandle)

// Create workflow with deduplication ID
dedupID := "test-dedup-id"
dedupHandle, err := RunWorkflow(ctx, blockingWorkflow, "dedup-test", WithQueue(priorityQueue.Name), WithDeduplicationID(dedupID))
require.NoError(t, err, "Failed to create workflow with deduplication ID")
enqueuedHandles = append(enqueuedHandles, dedupHandle)

// Create workflow with priority
priorityHandle, err := RunWorkflow(ctx, blockingWorkflow, "priority-test", WithQueue(priorityQueue.Name), WithPriority(5))
require.NoError(t, err, "Failed to create workflow with priority")
enqueuedHandles = append(enqueuedHandles, priorityHandle)

// Create non-queued workflows that should NOT appear in queues-only results
var regularHandles []WorkflowHandle[string]
for i := range 2 {
Expand Down Expand Up @@ -639,10 +661,11 @@ func TestAdminServer(t *testing.T) {
err = json.NewDecoder(respQueuesOnly.Body).Decode(&queuesOnlyWorkflows)
require.NoError(t, err, "Failed to decode queues_only workflows response")

// Should have exactly 3 enqueued workflows and 1 pending workflow
assert.Equal(t, 4, len(queuesOnlyWorkflows), "Expected exactly 4 workflows")
// Should have exactly 7 workflows (3 original + 1 pending + 1 partition + 1 dedup + 1 priority)
assert.Equal(t, 7, len(queuesOnlyWorkflows), "Expected exactly 7 workflows")

// Verify all returned workflows are from the queue and have ENQUEUED/PENDING status
// Also verify QueuePartitionKey, DeduplicationID, and Priority fields are present
for _, wf := range queuesOnlyWorkflows {
status, ok := wf["Status"].(string)
require.True(t, ok, "Status should be a string")
Expand All @@ -651,7 +674,37 @@ func TestAdminServer(t *testing.T) {

queueName, ok := wf["QueueName"].(string)
require.True(t, ok, "QueueName should be a string")
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
assert.NotEmpty(t, queueName, "QueueName should not be empty")

wfID, ok := wf["WorkflowUUID"].(string)
require.True(t, ok, "WorkflowUUID should be a string")

// Verify QueuePartitionKey field is present (may be empty string for non-partitioned workflows)
_, hasPartitionKey := wf["QueuePartitionKey"]
assert.True(t, hasPartitionKey, "QueuePartitionKey field should be present for workflow %s", wfID)

// Verify DeduplicationID field is present (may be empty string for workflows without dedup ID)
_, hasDedupID := wf["DeduplicationID"]
assert.True(t, hasDedupID, "DeduplicationID field should be present for workflow %s", wfID)

// Verify Priority field is present (may be 0 for workflows without priority)
_, hasPriority := wf["Priority"]
assert.True(t, hasPriority, "Priority field should be present for workflow %s", wfID)

// Verify specific values for our test workflows
if wfID == partitionHandle.GetWorkflowID() {
partitionKey, ok := wf["QueuePartitionKey"].(string)
require.True(t, ok, "QueuePartitionKey should be a string")
assert.Equal(t, "partition-1", partitionKey, "Expected partition key to be 'partition-1'")
} else if wfID == dedupHandle.GetWorkflowID() {
dedupIDResp, ok := wf["DeduplicationID"].(string)
require.True(t, ok, "DeduplicationID should be a string")
assert.Equal(t, dedupID, dedupIDResp, "Expected deduplication ID to match")
} else if wfID == priorityHandle.GetWorkflowID() {
priority, ok := wf["Priority"].(float64) // JSON numbers decode as float64
require.True(t, ok, "Priority should be a number")
assert.Equal(t, float64(5), priority, "Expected priority to be 5")
}
}

// Verify that the enqueued workflow IDs match
Expand Down
25 changes: 25 additions & 0 deletions dbos/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ func WithEnqueueTimeout(timeout time.Duration) EnqueueOption {
}
}

// WithEnqueueQueuePartitionKey sets the queue partition key for partitioned queues.
// When a queue is partitioned, workflows with the same partition key are processed
// with separate concurrency limits per partition.
func WithEnqueueQueuePartitionKey(partitionKey string) EnqueueOption {
return func(opts *enqueueOptions) {
opts.queuePartitionKey = partitionKey
}
}

type enqueueOptions struct {
workflowName string
workflowID string
Expand All @@ -114,6 +123,7 @@ type enqueueOptions struct {
priority uint
workflowTimeout time.Duration
workflowInput any
queuePartitionKey string
}

// EnqueueWorkflow enqueues a workflow to a named queue for deferred execution.
Expand All @@ -134,6 +144,19 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
opt(params)
}

if len(queueName) == 0 {
return nil, fmt.Errorf("queue name is required")
}

if len(workflowName) == 0 {
return nil, fmt.Errorf("workflow name is required")
}

// Validate partition key and deduplication ID are not both provided (they are incompatible)
if len(params.queuePartitionKey) > 0 && len(params.deduplicationID) > 0 {
return nil, fmt.Errorf("partition key and deduplication ID cannot be used together")
}

workflowID := params.workflowID
if workflowID == "" {
workflowID = uuid.New().String()
Expand All @@ -160,6 +183,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
QueueName: queueName,
DeduplicationID: params.deduplicationID,
Priority: int(params.priority),
QueuePartitionKey: params.queuePartitionKey,
}

uncancellableCtx := WithoutCancel(dbosCtx)
Expand Down Expand Up @@ -205,6 +229,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
// - WithEnqueueDeduplicationID: Deduplication identifier for idempotent enqueuing
// - WithEnqueuePriority: Execution priority
// - WithEnqueueTimeout: Maximum execution time for the workflow
// - WithEnqueueQueuePartitionKey: Queue partition key for partitioned queues
//
// Returns a typed workflow handle that can be used to check status and retrieve results.
// The handle uses polling to check workflow completion since the execution is asynchronous.
Expand Down
83 changes: 82 additions & 1 deletion dbos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestEnqueue(t *testing.T) {
func TestClientEnqueue(t *testing.T) {
// Setup server context - this will process tasks
serverCtx := setupDBOS(t, true, true)

Expand All @@ -23,6 +23,10 @@ func TestEnqueue(t *testing.T) {
// Must be created before Launch()
priorityQueue := NewWorkflowQueue(serverCtx, "priority-test-queue", WithGlobalConcurrency(1), WithPriorityEnabled())

// Create a partitioned queue for partition key test
// Must be created before Launch()
partitionedQueue := NewWorkflowQueue(serverCtx, "client-partitioned-queue", WithPartitionQueue())

// Track execution order for priority test
var executionOrder []string
var mu sync.Mutex
Expand Down Expand Up @@ -59,6 +63,12 @@ func TestEnqueue(t *testing.T) {
}
RegisterWorkflow(serverCtx, priorityWorkflow, WithWorkflowName("PriorityWorkflow"))

// Simple workflow for partitioned queue test
partitionedWorkflow := func(ctx DBOSContext, input string) (string, error) {
return "partitioned: " + input, nil
}
RegisterWorkflow(serverCtx, partitionedWorkflow, WithWorkflowName("PartitionedWorkflow"))

// Launch the server context to start processing tasks
err := Launch(serverCtx)
require.NoError(t, err)
Expand Down Expand Up @@ -257,6 +267,77 @@ func TestEnqueue(t *testing.T) {
assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after deduplication test")
})

t.Run("EnqueueToPartitionedQueue", func(t *testing.T) {
// Enqueue a workflow to a partitioned queue with a partition key
handle, err := Enqueue[string, string](client, partitionedQueue.Name, "PartitionedWorkflow", "test-input",
WithEnqueueQueuePartitionKey("partition-1"),
WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion()))
require.NoError(t, err, "failed to enqueue workflow to partitioned queue")

// Verify we got a polling handle
_, ok := handle.(*workflowPollingHandle[string])
require.True(t, ok, "expected handle to be of type workflowPollingHandle, got %T", handle)

// Get the result
result, err := handle.GetResult()
require.NoError(t, err, "failed to get result from partitioned queue workflow")

expectedResult := "partitioned: test-input"
assert.Equal(t, expectedResult, result, "expected result to match")

// Verify the workflow status
status, err := handle.GetStatus()
require.NoError(t, err, "failed to get workflow status")

assert.Equal(t, WorkflowStatusSuccess, status.Status, "expected workflow status to be SUCCESS")
assert.Equal(t, "PartitionedWorkflow", status.Name, "expected workflow name to match")
assert.Equal(t, partitionedQueue.Name, status.QueueName, "expected queue name to match")

assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after partitioned queue test")
})

t.Run("EnqueueWithPartitionKeyWithoutQueue", func(t *testing.T) {
// Attempt to enqueue with a partition key but no queue name
_, err := Enqueue[string, string](client, "", "PartitionedWorkflow", "test-input",
WithEnqueueQueuePartitionKey("partition-1"))
require.Error(t, err, "expected error when enqueueing with partition key but no queue name")

// Verify the error message contains the expected text
assert.Contains(t, err.Error(), "queue name is required", "expected error message to contain 'queue name is required'")
})

t.Run("EnqueueWithPartitionKeyAndDeduplicationID", func(t *testing.T) {
// Attempt to enqueue with both partition key and deduplication ID
// This should return an error
_, err := Enqueue[string, string](client, partitionedQueue.Name, "PartitionedWorkflow", "test-input",
WithEnqueueQueuePartitionKey("partition-1"),
WithEnqueueDeduplicationID("dedup-id"))
require.Error(t, err, "expected error when enqueueing with both partition key and deduplication ID")

// Verify the error message contains the expected text
assert.Contains(t, err.Error(), "partition key and deduplication ID cannot be used together", "expected error message to contain validation message")
})

t.Run("EnqueueWithEmptyQueueName", func(t *testing.T) {
// Attempt to enqueue with empty queue name
// This should return an error
_, err := Enqueue[wfInput, string](client, "", "ServerWorkflow", wfInput{Input: "test-input"})
require.Error(t, err, "expected error when enqueueing with empty queue name")

// Verify the error message contains the expected text
assert.Contains(t, err.Error(), "queue name is required", "expected error message to contain 'queue name is required'")
})

t.Run("EnqueueWithEmptyWorkflowName", func(t *testing.T) {
// Attempt to enqueue with empty workflow name
// This should return an error
_, err := Enqueue[wfInput, string](client, queue.Name, "", wfInput{Input: "test-input"})
require.Error(t, err, "expected error when enqueueing with empty workflow name")

// Verify the error message contains the expected text
assert.Contains(t, err.Error(), "workflow name is required", "expected error message to contain 'workflow name is required'")
})

// Verify all queue entries are cleaned up
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after client tests")
}
Expand Down
18 changes: 18 additions & 0 deletions dbos/conductor_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type listWorkflowsConductorResponseBody struct {
CreatedAt *string `json:"CreatedAt,omitempty"`
UpdatedAt *string `json:"UpdatedAt,omitempty"`
QueueName *string `json:"QueueName,omitempty"`
QueuePartitionKey *string `json:"QueuePartitionKey,omitempty"`
DeduplicationID *string `json:"DeduplicationID,omitempty"`
Priority *int `json:"Priority,omitempty"`
ApplicationVersion *string `json:"ApplicationVersion,omitempty"`
ExecutorID *string `json:"ExecutorID,omitempty"`
}
Expand Down Expand Up @@ -167,6 +170,21 @@ func formatListWorkflowsResponseBody(wf WorkflowStatus) listWorkflowsConductorRe
output.QueueName = &wf.QueueName
}

// Copy queue partition key
if wf.QueuePartitionKey != "" {
output.QueuePartitionKey = &wf.QueuePartitionKey
}

// Copy deduplication ID
if wf.DeduplicationID != "" {
output.DeduplicationID = &wf.DeduplicationID
}

// Copy priority
if wf.Priority != 0 {
output.Priority = &wf.Priority
}

// Copy application version
if wf.ApplicationVersion != "" {
output.ApplicationVersion = &wf.ApplicationVersion
Expand Down
3 changes: 3 additions & 0 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext {
applicationVersion: dbosCtx.applicationVersion,
executorID: dbosCtx.executorID,
applicationID: dbosCtx.applicationID,
queueRunner: dbosCtx.queueRunner,
}
childCtx.launched.Store(launched)
return childCtx
Expand Down Expand Up @@ -233,6 +234,7 @@ func WithoutCancel(ctx DBOSContext) DBOSContext {
applicationVersion: dbosCtx.applicationVersion,
executorID: dbosCtx.executorID,
applicationID: dbosCtx.applicationID,
queueRunner: dbosCtx.queueRunner,
}
childCtx.launched.Store(launched)
return childCtx
Expand Down Expand Up @@ -260,6 +262,7 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C
applicationVersion: dbosCtx.applicationVersion,
executorID: dbosCtx.executorID,
applicationID: dbosCtx.applicationID,
queueRunner: dbosCtx.queueRunner,
}
childCtx.launched.Store(launched)
return childCtx, cancelFunc
Expand Down
4 changes: 2 additions & 2 deletions dbos/dbos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestConfig(t *testing.T) {

err = sysDB.pool.QueryRow(dbCtx, "SELECT version FROM dbos.dbos_migrations").Scan(&version)
require.NoError(t, err)
assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)")
assert.Equal(t, int64(2), version, "migration version should be 1 (after initial migration)")

// Test manual shutdown and recreate
Shutdown(ctx, 1*time.Minute)
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestCustomSystemDBSchema(t *testing.T) {

err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT version FROM %s.dbos_migrations", customSchema)).Scan(&version)
require.NoError(t, err)
assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)")
assert.Equal(t, int64(2), version, "migration version should be 2 (after initial migration and queue partition key migration)")
})

// Test workflows for exercising Send/Recv and SetEvent/GetEvent
Expand Down
7 changes: 7 additions & 0 deletions dbos/migrations/2_add_queue_partition_key.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Migration 2: Add queue_partition_key column to workflow_status table
-- This enables partitioned queues where workflows can be distributed across
-- dynamically created queue partitions with separate concurrency limits per partition.

ALTER TABLE %s.workflow_status
ADD COLUMN queue_partition_key TEXT;

Loading
Loading