Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 112 additions & 56 deletions dbos/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,12 +560,6 @@ func TestAdminServer(t *testing.T) {
// Create a workflow queue with limited concurrency to keep workflows enqueued
queue := NewWorkflowQueue(ctx, "test-queue", WithGlobalConcurrency(1))

// Create a partitioned queue for partition key test
partitionedQueue := NewWorkflowQueue(ctx, "partitioned-test-queue", WithPartitionQueue(), WithGlobalConcurrency(1))

// Create a priority-enabled queue for priority and deduplication tests
priorityQueue := NewWorkflowQueue(ctx, "priority-test-queue", WithPriorityEnabled(), WithGlobalConcurrency(1))

// Define a blocking workflow that will hold up the queue
startEvent := NewEvent()
blockingChan := make(chan struct{})
Expand Down Expand Up @@ -616,22 +610,6 @@ func TestAdminServer(t *testing.T) {
enqueuedHandles = append(enqueuedHandles, handle)
}

// Create workflow with partition key
partitionHandle, err := RunWorkflow(ctx, blockingWorkflow, "partition-test", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1"))
require.NoError(t, err, "Failed to create workflow with partition key")
enqueuedHandles = append(enqueuedHandles, partitionHandle)

// Create workflow with deduplication ID
dedupID := "test-dedup-id"
dedupHandle, err := RunWorkflow(ctx, blockingWorkflow, "dedup-test", WithQueue(priorityQueue.Name), WithDeduplicationID(dedupID))
require.NoError(t, err, "Failed to create workflow with deduplication ID")
enqueuedHandles = append(enqueuedHandles, dedupHandle)

// Create workflow with priority
priorityHandle, err := RunWorkflow(ctx, blockingWorkflow, "priority-test", WithQueue(priorityQueue.Name), WithPriority(5))
require.NoError(t, err, "Failed to create workflow with priority")
enqueuedHandles = append(enqueuedHandles, priorityHandle)

// Create non-queued workflows that should NOT appear in queues-only results
var regularHandles []WorkflowHandle[string]
for i := range 2 {
Expand Down Expand Up @@ -661,11 +639,10 @@ func TestAdminServer(t *testing.T) {
err = json.NewDecoder(respQueuesOnly.Body).Decode(&queuesOnlyWorkflows)
require.NoError(t, err, "Failed to decode queues_only workflows response")

// Should have exactly 7 workflows (3 original + 1 pending + 1 partition + 1 dedup + 1 priority)
assert.Equal(t, 7, len(queuesOnlyWorkflows), "Expected exactly 7 workflows")
// Should have exactly 4 workflows (1 pending + 3 enqueued)
assert.Equal(t, 4, len(queuesOnlyWorkflows), "Expected exactly 4 workflows")

// Verify all returned workflows are from the queue and have ENQUEUED/PENDING status
// Also verify QueuePartitionKey, DeduplicationID, and Priority fields are present
for _, wf := range queuesOnlyWorkflows {
status, ok := wf["Status"].(string)
require.True(t, ok, "Status should be a string")
Expand All @@ -675,36 +652,6 @@ func TestAdminServer(t *testing.T) {
queueName, ok := wf["QueueName"].(string)
require.True(t, ok, "QueueName should be a string")
assert.NotEmpty(t, queueName, "QueueName should not be empty")

wfID, ok := wf["WorkflowUUID"].(string)
require.True(t, ok, "WorkflowUUID should be a string")

// Verify QueuePartitionKey field is present (may be empty string for non-partitioned workflows)
_, hasPartitionKey := wf["QueuePartitionKey"]
assert.True(t, hasPartitionKey, "QueuePartitionKey field should be present for workflow %s", wfID)

// Verify DeduplicationID field is present (may be empty string for workflows without dedup ID)
_, hasDedupID := wf["DeduplicationID"]
assert.True(t, hasDedupID, "DeduplicationID field should be present for workflow %s", wfID)

// Verify Priority field is present (may be 0 for workflows without priority)
_, hasPriority := wf["Priority"]
assert.True(t, hasPriority, "Priority field should be present for workflow %s", wfID)

// Verify specific values for our test workflows
if wfID == partitionHandle.GetWorkflowID() {
partitionKey, ok := wf["QueuePartitionKey"].(string)
require.True(t, ok, "QueuePartitionKey should be a string")
assert.Equal(t, "partition-1", partitionKey, "Expected partition key to be 'partition-1'")
} else if wfID == dedupHandle.GetWorkflowID() {
dedupIDResp, ok := wf["DeduplicationID"].(string)
require.True(t, ok, "DeduplicationID should be a string")
assert.Equal(t, dedupID, dedupIDResp, "Expected deduplication ID to match")
} else if wfID == priorityHandle.GetWorkflowID() {
priority, ok := wf["Priority"].(float64) // JSON numbers decode as float64
require.True(t, ok, "Priority should be a number")
assert.Equal(t, float64(5), priority, "Expected priority to be 5")
}
}

// Verify that the enqueued workflow IDs match
Expand Down Expand Up @@ -786,6 +733,115 @@ func TestAdminServer(t *testing.T) {
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
})

t.Run("ListQueuedWorkflowsWithAdvancedFeatures", func(t *testing.T) {
resetTestDatabase(t, databaseURL)
ctx, err := NewDBOSContext(context.Background(), Config{
DatabaseURL: databaseURL,
AppName: "test-app",
AdminServer: true,
AdminServerPort: _DEFAULT_ADMIN_SERVER_PORT,
})
require.NoError(t, err)

// Create a partitioned queue for partition key test
partitionedQueue := NewWorkflowQueue(ctx, "partitioned-test-queue", WithPartitionQueue(), WithGlobalConcurrency(1))

// Create a priority-enabled queue for priority and deduplication tests
priorityQueue := NewWorkflowQueue(ctx, "priority-test-queue", WithPriorityEnabled(), WithGlobalConcurrency(1))

// Define a blocking workflow that will hold up the queue
blockingChan := make(chan struct{})
blockingWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
<-blockingChan // Block until channel is closed
return "blocked-" + input, nil
}
RegisterWorkflow(ctx, blockingWorkflow)

err = Launch(ctx)
require.NoError(t, err)

// Ensure cleanup
defer func() {
close(blockingChan) // Unblock any blocked workflows
if ctx != nil {
Shutdown(ctx, 1*time.Minute)
}
}()

client := &http.Client{Timeout: 5 * time.Second}
endpoint := fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_QUEUED_WORKFLOWS_PATTERN, "POST /"))

// Create workflow with partition key
partitionHandle, err := RunWorkflow(ctx, blockingWorkflow, "partition-test", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1"))
require.NoError(t, err, "Failed to create workflow with partition key")

// Create workflow with deduplication ID
dedupID := "test-dedup-id"
dedupHandle, err := RunWorkflow(ctx, blockingWorkflow, "dedup-test", WithQueue(priorityQueue.Name), WithDeduplicationID(dedupID))
require.NoError(t, err, "Failed to create workflow with deduplication ID")

// Create workflow with priority
priorityHandle, err := RunWorkflow(ctx, blockingWorkflow, "priority-test", WithQueue(priorityQueue.Name), WithPriority(5))
require.NoError(t, err, "Failed to create workflow with priority")

// Query with empty body to get all enqueued/pending queue workflows
reqQueuesOnly, err := http.NewRequest(http.MethodPost, endpoint, nil)
require.NoError(t, err, "Failed to create queues_only request")
reqQueuesOnly.Header.Set("Content-Type", "application/json")

respQueuesOnly, err := client.Do(reqQueuesOnly)
require.NoError(t, err, "Failed to make queues_only request")
defer respQueuesOnly.Body.Close()

assert.Equal(t, http.StatusOK, respQueuesOnly.StatusCode)

var queuesOnlyWorkflows []map[string]any
err = json.NewDecoder(respQueuesOnly.Body).Decode(&queuesOnlyWorkflows)
require.NoError(t, err, "Failed to decode queues_only workflows response")

// Find our test workflows in the response
var foundPartition, foundDedup, foundPriority bool
for _, wf := range queuesOnlyWorkflows {
wfID, ok := wf["WorkflowUUID"].(string)
require.True(t, ok, "WorkflowUUID should be a string")

// Verify QueuePartitionKey field is present (may be empty string for non-partitioned workflows)
_, hasPartitionKey := wf["QueuePartitionKey"]
assert.True(t, hasPartitionKey, "QueuePartitionKey field should be present for workflow %s", wfID)

// Verify DeduplicationID field is present (may be empty string for workflows without dedup ID)
_, hasDedupID := wf["DeduplicationID"]
assert.True(t, hasDedupID, "DeduplicationID field should be present for workflow %s", wfID)

// Verify Priority field is present (may be 0 for workflows without priority)
_, hasPriority := wf["Priority"]
assert.True(t, hasPriority, "Priority field should be present for workflow %s", wfID)

// Verify specific values for our test workflows
if wfID == partitionHandle.GetWorkflowID() {
foundPartition = true
partitionKey, ok := wf["QueuePartitionKey"].(string)
require.True(t, ok, "QueuePartitionKey should be a string")
assert.Equal(t, "partition-1", partitionKey, "Expected partition key to be 'partition-1'")
} else if wfID == dedupHandle.GetWorkflowID() {
foundDedup = true
dedupIDResp, ok := wf["DeduplicationID"].(string)
require.True(t, ok, "DeduplicationID should be a string")
assert.Equal(t, dedupID, dedupIDResp, "Expected deduplication ID to match")
} else if wfID == priorityHandle.GetWorkflowID() {
foundPriority = true
priority, ok := wf["Priority"].(float64) // JSON numbers decode as float64
require.True(t, ok, "Priority should be a number")
assert.Equal(t, float64(5), priority, "Expected priority to be 5")
}
}

// Verify all three workflows were found
assert.True(t, foundPartition, "Expected to find workflow with partition key")
assert.True(t, foundDedup, "Expected to find workflow with deduplication ID")
assert.True(t, foundPriority, "Expected to find workflow with priority")
})

t.Run("WorkflowSteps", func(t *testing.T) {
resetTestDatabase(t, databaseURL)
ctx, err := NewDBOSContext(context.Background(), Config{
Expand Down Expand Up @@ -978,7 +1034,7 @@ func TestAdminServer(t *testing.T) {
// Wait for 2-3 executions to verify scheduler is running
require.Eventually(t, func() bool {
return executionCount.Load() >= 2
}, 3*time.Second, 100*time.Millisecond, "Expected at least 2 scheduled workflow executions")
}, 10*time.Second, 100*time.Millisecond, "Expected at least 2 scheduled workflow executions")

// Call deactivate endpoint
endpoint := fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_DEACTIVATE_PATTERN, "GET /"))
Expand Down