Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package dbos
import (
"context"
"crypto/sha256"
"encoding/gob"
"encoding/hex"
"fmt"
"io"
"log/slog"
"net/url"
"os"
"reflect"
"runtime"
"sort"
"time"

"github.com/robfig/cron/v3"
Expand All @@ -23,36 +22,6 @@ var (
_DEFAULT_ADMIN_SERVER_PORT = 3001
)

func computeApplicationVersion() string {
if len(registry) == 0 {
fmt.Println("DBOS: No registered workflows found, cannot compute application version")
return ""
}

// Collect all function names and sort them for consistent hashing
var functionNames []string
for fqn := range registry {
functionNames = append(functionNames, fqn)
}
sort.Strings(functionNames)

hasher := sha256.New()

for _, fqn := range functionNames {
workflowEntry := registry[fqn]

// Try to get function source location and other identifying info
if pc := runtime.FuncForPC(reflect.ValueOf(workflowEntry.wrappedFunction).Pointer()); pc != nil {
// Get the function's entry point - this reflects the actual compiled code
entry := pc.Entry()
fmt.Fprintf(hasher, "%x", entry)
}
}

return hex.EncodeToString(hasher.Sum(nil))

}

var workflowScheduler *cron.Cron // Global because accessed during workflow registration before the dbos singleton is initialized

var logger *slog.Logger // Global because accessed everywhere inside the library
Expand Down Expand Up @@ -141,6 +110,10 @@ func Initialize(inputConfig Config) error {
// Set global logger
logger = config.Logger

// Register types we serialize with gob
var t time.Time
gob.Register(t)

// Initialize global variables with environment variables, providing defaults if not set
_APP_VERSION = os.Getenv("DBOS__APPVERSION")
if _APP_VERSION == "" {
Expand Down Expand Up @@ -273,3 +246,32 @@ func Shutdown() {
}
dbos = nil
}

func GetBinaryHash() (string, error) {
execPath, err := os.Executable()
if err != nil {
return "", err
}

file, err := os.Open(execPath)
if err != nil {
return "", err
}
defer file.Close()

hasher := sha256.New()
if _, err := io.Copy(hasher, file); err != nil {
return "", err
}

return hex.EncodeToString(hasher.Sum(nil)), nil
}

func computeApplicationVersion() string {
hash, err := GetBinaryHash()
if err != nil {
fmt.Printf("DBOS: Failed to compute binary hash: %v\n", err)
return ""
}
return hash
}
28 changes: 0 additions & 28 deletions dbos/dbos_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package dbos

import (
"context"
"encoding/hex"
"maps"
"testing"
)

Expand Down Expand Up @@ -60,28 +57,3 @@ func TestConfigValidationErrorTypes(t *testing.T) {
}
})
}
func TestAppVersion(t *testing.T) {
if _, err := hex.DecodeString(_APP_VERSION); err != nil {
t.Fatalf("APP_VERSION is not a valid hex string: %v", err)
}

// Save the original registry content
originalRegistry := make(map[string]workflowRegistryEntry)
maps.Copy(originalRegistry, registry)

// Restore the registry after the test
defer func() {
registry = originalRegistry
}()

// Replace the registry and verify the hash is different
registry = make(map[string]workflowRegistryEntry)

WithWorkflow(func(ctx context.Context, input string) (string, error) {
return "new-registry-workflow-" + input, nil
})
hash2 := computeApplicationVersion()
if _APP_VERSION == hash2 {
t.Fatalf("APP_VERSION hash did not change after replacing registry")
}
}
179 changes: 179 additions & 0 deletions dbos/serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"testing"
"time"
)

/** Test serialization and deserialization
Expand All @@ -13,6 +14,7 @@ import (
[x] Workflow inputs/outputs
[x] Step inputs/outputs
[x] Direct handlers, polling handler, list workflows results, get step infos
[x] Set/get event with user defined types
*/

var (
Expand Down Expand Up @@ -289,3 +291,180 @@ func TestWorkflowEncoding(t *testing.T) {
}
})
}

type UserDefinedEventData struct {
ID int `json:"id"`
Name string `json:"name"`
Details struct {
Description string `json:"description"`
Tags []string `json:"tags"`
} `json:"details"`
}

func setEventUserDefinedTypeWorkflow(ctx context.Context, input string) (string, error) {
eventData := UserDefinedEventData{
ID: 42,
Name: "test-event",
Details: struct {
Description string `json:"description"`
Tags []string `json:"tags"`
}{
Description: "This is a test event with user-defined data",
Tags: []string{"test", "user-defined", "serialization"},
},
}

err := SetEvent(ctx, WorkflowSetEventInput[UserDefinedEventData]{Key: input, Message: eventData})
if err != nil {
return "", err
}
return "user-defined-event-set", nil
}

var setEventUserDefinedTypeWf = WithWorkflow(setEventUserDefinedTypeWorkflow)

func TestSetEventSerialize(t *testing.T) {
setupDBOS(t)

t.Run("SetEventUserDefinedType", func(t *testing.T) {
// Start a workflow that sets an event with a user-defined type
setHandle, err := setEventUserDefinedTypeWf(context.Background(), "user-defined-key")
if err != nil {
t.Fatalf("failed to start workflow with user-defined event type: %v", err)
}

// Wait for the workflow to complete
result, err := setHandle.GetResult(context.Background())
if err != nil {
t.Fatalf("failed to get result from user-defined event workflow: %v", err)
}
if result != "user-defined-event-set" {
t.Fatalf("expected result to be 'user-defined-event-set', got '%s'", result)
}

// Retrieve the event to verify it was properly serialized and can be deserialized
retrievedEvent, err := GetEvent[UserDefinedEventData](context.Background(), WorkflowGetEventInput{
TargetWorkflowID: setHandle.GetWorkflowID(),
Key: "user-defined-key",
Timeout: 3 * time.Second,
})
if err != nil {
t.Fatalf("failed to get user-defined event: %v", err)
}

// Verify the retrieved data matches what we set
if retrievedEvent.ID != 42 {
t.Fatalf("expected ID to be 42, got %d", retrievedEvent.ID)
}
if retrievedEvent.Name != "test-event" {
t.Fatalf("expected Name to be 'test-event', got '%s'", retrievedEvent.Name)
}
if retrievedEvent.Details.Description != "This is a test event with user-defined data" {
t.Fatalf("expected Description to be 'This is a test event with user-defined data', got '%s'", retrievedEvent.Details.Description)
}
if len(retrievedEvent.Details.Tags) != 3 {
t.Fatalf("expected 3 tags, got %d", len(retrievedEvent.Details.Tags))
}
expectedTags := []string{"test", "user-defined", "serialization"}
for i, tag := range retrievedEvent.Details.Tags {
if tag != expectedTags[i] {
t.Fatalf("expected tag %d to be '%s', got '%s'", i, expectedTags[i], tag)
}
}
})
}


func sendUserDefinedTypeWorkflow(ctx context.Context, destinationID string) (string, error) {
// Create an instance of our user-defined type inside the workflow
sendData := UserDefinedEventData{
ID: 42,
Name: "test-send-message",
Details: struct {
Description string `json:"description"`
Tags []string `json:"tags"`
}{
Description: "This is a test send message with user-defined data",
Tags: []string{"test", "user-defined", "serialization", "send"},
},
}

// Send should automatically register this type with gob
// Note the explicit type parameter since compiler cannot infer UserDefinedEventData from string input
err := Send(ctx, WorkflowSendInput[UserDefinedEventData]{
DestinationID: destinationID,
Topic: "user-defined-topic",
Message: sendData,
})
if err != nil {
return "", err
}
return "user-defined-message-sent", nil
}

func recvUserDefinedTypeWorkflow(ctx context.Context, input string) (UserDefinedEventData, error) {
// Receive the user-defined type message
result, err := Recv[UserDefinedEventData](ctx, WorkflowRecvInput{
Topic: "user-defined-topic",
Timeout: 3 * time.Second,
})
return result, err
}

var sendUserDefinedTypeWf = WithWorkflow(sendUserDefinedTypeWorkflow)
var recvUserDefinedTypeWf = WithWorkflow(recvUserDefinedTypeWorkflow)

func TestSendSerialize(t *testing.T) {
setupDBOS(t)

t.Run("SendUserDefinedType", func(t *testing.T) {
// Start a receiver workflow first
recvHandle, err := recvUserDefinedTypeWf(context.Background(), "recv-input")
if err != nil {
t.Fatalf("failed to start receive workflow: %v", err)
}

// Start a sender workflow that sends a message with a user-defined type
sendHandle, err := sendUserDefinedTypeWf(context.Background(), recvHandle.GetWorkflowID())
if err != nil {
t.Fatalf("failed to start workflow with user-defined send type: %v", err)
}

// Wait for the sender workflow to complete
sendResult, err := sendHandle.GetResult(context.Background())
if err != nil {
t.Fatalf("failed to get result from user-defined send workflow: %v", err)
}
if sendResult != "user-defined-message-sent" {
t.Fatalf("expected result to be 'user-defined-message-sent', got '%s'", sendResult)
}

// Wait for the receiver workflow to complete and get the message
receivedData, err := recvHandle.GetResult(context.Background())
if err != nil {
t.Fatalf("failed to get result from receive workflow: %v", err)
}

// Verify the received data matches what we sent
if receivedData.ID != 42 {
t.Fatalf("expected ID to be 42, got %d", receivedData.ID)
}
if receivedData.Name != "test-send-message" {
t.Fatalf("expected Name to be 'test-send-message', got '%s'", receivedData.Name)
}
if receivedData.Details.Description != "This is a test send message with user-defined data" {
t.Fatalf("expected Description to be 'This is a test send message with user-defined data', got '%s'", receivedData.Details.Description)
}

// Verify tags
expectedTags := []string{"test", "user-defined", "serialization", "send"}
if len(receivedData.Details.Tags) != len(expectedTags) {
t.Fatalf("expected %d tags, got %d", len(expectedTags), len(receivedData.Details.Tags))
}
for i, tag := range receivedData.Details.Tags {
if tag != expectedTags[i] {
t.Fatalf("expected tag %d to be '%s', got '%s'", i, expectedTags[i], tag)
}
}
})
}
Loading
Loading