diff --git a/module/api_handlers.go b/module/api_handlers.go index d49d70bc..1f0db05b 100644 --- a/module/api_handlers.go +++ b/module/api_handlers.go @@ -85,6 +85,17 @@ type RESTResource struct { LastUpdate string `json:"lastUpdate,omitempty"` } +// WorkflowConfig holds the six workflow-related settings for a RESTAPIHandler. +// These fields are always configured together and are extracted here for clarity. +type WorkflowConfig struct { + Type string // The type of workflow to use (e.g., "order-workflow") + Engine string // The name of the workflow engine service to use + InitialTransition string // The first transition to trigger after creating a workflow instance (defaults to "start_validation") + InstanceIDPrefix string // Optional prefix for workflow instance IDs + InstanceIDField string // Field in resource data to use for instance ID (defaults to "id") + SeedFile string // Path to JSON seed data file +} + // RESTAPIHandler provides CRUD operations for a REST API type RESTAPIHandler struct { name string @@ -96,13 +107,7 @@ type RESTAPIHandler struct { app modular.Application persistence *PersistenceStore // optional write-through backend - // Workflow-related fields - workflowType string // The type of workflow to use (e.g., "order-workflow") - workflowEngine string // The name of the workflow engine service to use - initialTransition string // The first transition to trigger after creating a workflow instance (defaults to "start_validation") - instanceIDPrefix string // Optional prefix for workflow instance IDs - instanceIDField string // Field in resource data to use for instance ID (defaults to "id") - seedFile string // Path to JSON seed data file + WorkflowConfig // View/aggregation fields (e.g., queue-api reading from conversations) sourceResourceName string // Read from a different resource's persistence data (defaults to resourceName) @@ -137,27 +142,27 @@ func NewRESTAPIHandler(name, resourceName string) *RESTAPIHandler { // SetWorkflowType sets the workflow type for state machine operations. func (h *RESTAPIHandler) SetWorkflowType(wt string) { - h.workflowType = wt + h.Type = wt } // SetWorkflowEngine sets the name of the workflow engine service to use. func (h *RESTAPIHandler) SetWorkflowEngine(we string) { - h.workflowEngine = we + h.Engine = we } // SetInitialTransition sets the first transition to trigger after creating a workflow instance. func (h *RESTAPIHandler) SetInitialTransition(t string) { - h.initialTransition = t + h.InitialTransition = t } // SetInstanceIDPrefix sets the prefix used to build state machine instance IDs. func (h *RESTAPIHandler) SetInstanceIDPrefix(prefix string) { - h.instanceIDPrefix = prefix + h.InstanceIDPrefix = prefix } // SetSeedFile sets the path to a JSON seed data file. func (h *RESTAPIHandler) SetSeedFile(path string) { - h.seedFile = path + h.SeedFile = path } // SetSourceResourceName sets a different resource name for read operations (e.g., queue reads from conversations). @@ -211,12 +216,7 @@ func (h *RESTAPIHandler) Constructor() modular.ModuleConstructor { handler := NewRESTAPIHandler(h.name, h.resourceName) handler.app = app handler.logger = app.Logger() - handler.workflowType = h.workflowType - handler.workflowEngine = h.workflowEngine - handler.initialTransition = h.initialTransition - handler.instanceIDPrefix = h.instanceIDPrefix - handler.instanceIDField = h.instanceIDField - handler.seedFile = h.seedFile + handler.WorkflowConfig = h.WorkflowConfig handler.sourceResourceName = h.sourceResourceName handler.stateFilter = h.stateFilter handler.fieldMapping = h.fieldMapping @@ -247,7 +247,7 @@ func (h *RESTAPIHandler) Init(app modular.Application) error { h.logger = app.Logger() // Default values for workflow configuration - h.instanceIDField = "id" // Default to using "id" field if not specified + h.InstanceIDField = "id" // Default to using "id" field if not specified h.initFieldDefaults() // Get configuration if available @@ -268,22 +268,22 @@ func (h *RESTAPIHandler) Init(app modular.Application) error { // Extract workflow type if wt, ok := cfg["workflowType"].(string); ok && wt != "" { - h.workflowType = wt + h.Type = wt } // Extract workflow engine if we, ok := cfg["workflowEngine"].(string); ok && we != "" { - h.workflowEngine = we + h.Engine = we } // Extract instance ID prefix if prefix, ok := cfg["instanceIDPrefix"].(string); ok { - h.instanceIDPrefix = prefix + h.InstanceIDPrefix = prefix } // Extract instance ID field if field, ok := cfg["instanceIDField"].(string); ok && field != "" { - h.instanceIDField = field + h.InstanceIDField = field } // Extract source resource name (for view handlers like queue) @@ -331,14 +331,14 @@ func (h *RESTAPIHandler) Init(app modular.Application) error { // If workflowType is not set but we have a state machine configuration, // try to extract the default workflow type from there - if h.workflowType == "" { + if h.Type == "" { if statemachine, ok := config.(map[string]any)["workflows"].(map[string]any)["statemachine"]; ok { if smConfig, ok := statemachine.(map[string]any); ok { if defs, ok := smConfig["definitions"].([]any); ok && len(defs) > 0 { if def, ok := defs[0].(map[string]any); ok { if name, ok := def["name"].(string); ok && name != "" { - h.workflowType = name - h.logger.Info(fmt.Sprintf("Using default workflow type from state machine definition: %s", h.workflowType)) + h.Type = name + h.logger.Info(fmt.Sprintf("Using default workflow type from state machine definition: %s", h.Type)) } } } @@ -348,12 +348,12 @@ func (h *RESTAPIHandler) Init(app modular.Application) error { // If workflow engine is not set but we have a state machine configuration, // try to extract the engine name from there - if h.workflowEngine == "" { + if h.Engine == "" { if statemachine, ok := config.(map[string]any)["workflows"].(map[string]any)["statemachine"]; ok { if smConfig, ok := statemachine.(map[string]any); ok { if engine, ok := smConfig["engine"].(string); ok && engine != "" { - h.workflowEngine = engine - h.logger.Info(fmt.Sprintf("Using state machine engine from configuration: %s", h.workflowEngine)) + h.Engine = engine + h.logger.Info(fmt.Sprintf("Using state machine engine from configuration: %s", h.Engine)) } } } @@ -372,15 +372,15 @@ func (h *RESTAPIHandler) Init(app modular.Application) error { } // Log workflow configuration - if h.workflowType != "" { - h.logger.Info(fmt.Sprintf("REST API handler '%s' configured with workflow type: %s", h.name, h.workflowType)) - if h.workflowEngine != "" { - h.logger.Info(fmt.Sprintf("Using workflow engine: %s", h.workflowEngine)) + if h.Type != "" { + h.logger.Info(fmt.Sprintf("REST API handler '%s' configured with workflow type: %s", h.name, h.Type)) + if h.Engine != "" { + h.logger.Info(fmt.Sprintf("Using workflow engine: %s", h.Engine)) } - if h.instanceIDPrefix != "" { - h.logger.Info(fmt.Sprintf("Using instance ID prefix: %s", h.instanceIDPrefix)) + if h.InstanceIDPrefix != "" { + h.logger.Info(fmt.Sprintf("Using instance ID prefix: %s", h.InstanceIDPrefix)) } - h.logger.Info(fmt.Sprintf("Using instance ID field: %s", h.instanceIDField)) + h.logger.Info(fmt.Sprintf("Using instance ID field: %s", h.InstanceIDField)) } return nil @@ -416,7 +416,7 @@ func (h *RESTAPIHandler) Handle(w http.ResponseWriter, r *http.Request) { lastSegment := pathSegments[len(pathSegments)-1] if lastSegment == "transition" { isTransitionRequest = true - } else if h.workflowType != "" && lastSegment != resourceId { + } else if h.Type != "" && lastSegment != resourceId { // Only detect sub-actions for handlers with a workflow engine. // This prevents non-workflow handlers (e.g. messages-api) from // misinterpreting nested resource paths as sub-actions. @@ -536,13 +536,13 @@ func (h *RESTAPIHandler) handleGet(resourceId string, w http.ResponseWriter, r * // Get a specific resource if resource, ok := h.resources[resourceId]; ok { // Try to get the latest state and enrichment data from the workflow engine - if h.workflowEngine != "" { + if h.Engine != "" { instanceId := resourceId - if h.instanceIDPrefix != "" { - instanceId = h.instanceIDPrefix + resourceId + if h.InstanceIDPrefix != "" { + instanceId = h.InstanceIDPrefix + resourceId } var engineSvc any - if err := h.app.GetService(h.workflowEngine, &engineSvc); err == nil { + if err := h.app.GetService(h.Engine, &engineSvc); err == nil { if smEngine, ok := engineSvc.(*StateMachineEngine); ok { if instance, err := smEngine.GetInstance(instanceId); err == nil && instance != nil { resource.State = instance.CurrentState @@ -636,9 +636,9 @@ func (h *RESTAPIHandler) handleGetAll(w http.ResponseWriter, r *http.Request) { // Optionally get the state machine engine for live state lookup var smEngine *StateMachineEngine - if h.workflowEngine != "" { + if h.Engine != "" { var engineSvc any - if err := h.app.GetService(h.workflowEngine, &engineSvc); err == nil { + if err := h.app.GetService(h.Engine, &engineSvc); err == nil { smEngine, _ = engineSvc.(*StateMachineEngine) } } @@ -656,8 +656,8 @@ func (h *RESTAPIHandler) handleGetAll(w http.ResponseWriter, r *http.Request) { // so that fields added by processing steps (programId, affiliateId) are available. if smEngine != nil { instanceId := resource.ID - if h.instanceIDPrefix != "" { - instanceId = h.instanceIDPrefix + resource.ID + if h.InstanceIDPrefix != "" { + instanceId = h.InstanceIDPrefix + resource.ID } if instance, err := smEngine.GetInstance(instanceId); err == nil && instance != nil { resource.State = instance.CurrentState @@ -811,8 +811,8 @@ func (h *RESTAPIHandler) handlePost(resourceId string, w http.ResponseWriter, r if followUpID := firstNonEmpty(data, "sessionId", "conversationId"); followUpID != "" { h.mu.Lock() resource, exists := h.resources[followUpID] - if !exists && h.instanceIDPrefix != "" { - prefixed := h.instanceIDPrefix + followUpID + if !exists && h.InstanceIDPrefix != "" { + prefixed := h.InstanceIDPrefix + followUpID if r2, ok := h.resources[prefixed]; ok { followUpID = prefixed resource = r2 @@ -850,7 +850,7 @@ func (h *RESTAPIHandler) handlePost(resourceId string, w http.ResponseWriter, r // Bridge follow-up messages to the conversation resource // so the SPA (which reads from "conversations") sees them too. - if h.instanceIDPrefix == "conv-" { + if h.InstanceIDPrefix == "conv-" { convoId := "conv-" + strings.TrimPrefix(followUpID, "conv-") convoData, loadErr := h.persistence.LoadResource("conversations", convoId) if loadErr == nil && convoData != nil { @@ -991,7 +991,7 @@ func (h *RESTAPIHandler) handlePost(resourceId string, w http.ResponseWriter, r // corresponding conversation resource so the SPA can list it via /api/conversations. // Only bridge if the handler has instanceIDPrefix "conv-" (set in config for // webchat-api and webhooks-api), which signals it participates in conversation lifecycle. - if h.instanceIDPrefix == "conv-" && h.persistence != nil { + if h.InstanceIDPrefix == "conv-" && h.persistence != nil { h.bridgeToConversation(resourceId, data) } @@ -1011,7 +1011,7 @@ func (h *RESTAPIHandler) handlePost(resourceId string, w http.ResponseWriter, r } // If a workflow engine is configured, create an instance and trigger the initial transition - if h.workflowType != "" && h.workflowEngine != "" { + if h.Type != "" && h.Engine != "" { h.startWorkflowForResource(r.Context(), resourceId, resource) } @@ -1101,14 +1101,14 @@ func (h *RESTAPIHandler) resolveConversationRouting(data map[string]any, msgBody func (h *RESTAPIHandler) startWorkflowForResource(_ context.Context, resourceId string, resource RESTResource) { // Find the state machine engine var engineSvc any - if err := h.app.GetService(h.workflowEngine, &engineSvc); err != nil { - h.logger.Warn(fmt.Sprintf("Workflow engine '%s' not found: %v", h.workflowEngine, err)) + if err := h.app.GetService(h.Engine, &engineSvc); err != nil { + h.logger.Warn(fmt.Sprintf("Workflow engine '%s' not found: %v", h.Engine, err)) return } smEngine, ok := engineSvc.(*StateMachineEngine) if !ok { - h.logger.Warn(fmt.Sprintf("Service '%s' is not a StateMachineEngine", h.workflowEngine)) + h.logger.Warn(fmt.Sprintf("Service '%s' is not a StateMachineEngine", h.Engine)) return } @@ -1117,12 +1117,12 @@ func (h *RESTAPIHandler) startWorkflowForResource(_ context.Context, resourceId // instanceIDPrefix: "conv-" in their config so the conversations-api // can find the same state machine instance by the conversation's own ID. instanceId := resourceId - if h.instanceIDPrefix != "" { - instanceId = h.instanceIDPrefix + resourceId + if h.InstanceIDPrefix != "" { + instanceId = h.InstanceIDPrefix + resourceId } // Create the workflow instance - _, err := smEngine.CreateWorkflow(h.workflowType, instanceId, resource.Data) + _, err := smEngine.CreateWorkflow(h.Type, instanceId, resource.Data) if err != nil { h.logger.Error(fmt.Sprintf("Failed to create workflow instance '%s': %v", instanceId, err)) return @@ -1134,7 +1134,7 @@ func (h *RESTAPIHandler) startWorkflowForResource(_ context.Context, resourceId // handler returns, which would abort the processing pipeline. go func() { bgCtx := context.Background() - transitionName := h.initialTransition + transitionName := h.InitialTransition if transitionName == "" { transitionName = "start_validation" // default convention } @@ -1232,7 +1232,7 @@ func (h *RESTAPIHandler) syncResourceStateFromEngine(instanceId, resourceId stri // Update the bridged conversation resource's state from the engine. // Only update the state field, not the full data (which was already // set by bridgeToConversation with routing info, messages, etc.). - if h.instanceIDPrefix == "conv-" && h.persistence != nil { + if h.InstanceIDPrefix == "conv-" && h.persistence != nil { convoId := fmt.Sprintf("conv-%s", resourceId) h.updateConversationState(convoId, res.State) } @@ -1486,7 +1486,7 @@ func (h *RESTAPIHandler) handleTransition(resourceId string, w http.ResponseWrit } // Determine the workflow type to use - workflowType := h.workflowType // Use configured workflow type by default + workflowType := h.Type // Use configured workflow type by default // If a workflow type was specified in the transition request, use that instead if transitionRequest.WorkflowType != "" { @@ -1507,9 +1507,9 @@ func (h *RESTAPIHandler) handleTransition(resourceId string, w http.ResponseWrit var instanceId string // Check if we have a specific instance ID field configured - if h.instanceIDField != "" && h.instanceIDField != "id" { + if h.InstanceIDField != "" && h.InstanceIDField != "id" { // Try to get the instance ID from the specified field in the resource data - if idVal, ok := workflowData[h.instanceIDField].(string); ok && idVal != "" { + if idVal, ok := workflowData[h.InstanceIDField].(string); ok && idVal != "" { instanceId = idVal } } @@ -1520,8 +1520,8 @@ func (h *RESTAPIHandler) handleTransition(resourceId string, w http.ResponseWrit } // Add prefix if configured - if h.instanceIDPrefix != "" { - instanceId = h.instanceIDPrefix + instanceId + if h.InstanceIDPrefix != "" { + instanceId = h.InstanceIDPrefix + instanceId } // Set the required IDs in the workflow data @@ -1535,17 +1535,17 @@ func (h *RESTAPIHandler) handleTransition(resourceId string, w http.ResponseWrit var isStateMachineEngine bool // First, try to use the specifically configured engine if available - if h.workflowEngine != "" { + if h.Engine != "" { var engineSvc any - if err := h.app.GetService(h.workflowEngine, &engineSvc); err == nil && engineSvc != nil { + if err := h.app.GetService(h.Engine, &engineSvc); err == nil && engineSvc != nil { engine = engineSvc if sm, ok := engineSvc.(*StateMachineEngine); ok { stateMachineEngine = sm isStateMachineEngine = true } - h.logger.Debug(fmt.Sprintf("Using configured workflow engine: %s", h.workflowEngine)) + h.logger.Debug(fmt.Sprintf("Using configured workflow engine: %s", h.Engine)) } else { - h.logger.Warn(fmt.Sprintf("Configured workflow engine '%s' not found, will try to discover one", h.workflowEngine)) + h.logger.Warn(fmt.Sprintf("Configured workflow engine '%s' not found, will try to discover one", h.Engine)) } } @@ -1928,9 +1928,9 @@ func (h *RESTAPIHandler) handleSubAction(resourceId, subAction string, w http.Re // Find the state machine engine var smEngine *StateMachineEngine - if h.workflowEngine != "" { + if h.Engine != "" { var engineSvc any - if err := h.app.GetService(h.workflowEngine, &engineSvc); err == nil { + if err := h.app.GetService(h.Engine, &engineSvc); err == nil { smEngine, _ = engineSvc.(*StateMachineEngine) } } @@ -1942,8 +1942,8 @@ func (h *RESTAPIHandler) handleSubAction(resourceId, subAction string, w http.Re // Build instance ID instanceId := resourceId - if h.instanceIDPrefix != "" { - instanceId = h.instanceIDPrefix + resourceId + if h.InstanceIDPrefix != "" { + instanceId = h.InstanceIDPrefix + resourceId } // Merge existing resource data into the transition payload @@ -1970,7 +1970,7 @@ func (h *RESTAPIHandler) handleSubAction(resourceId, subAction string, w http.Re // Ensure workflow instance exists if _, err := smEngine.GetInstance(instanceId); err != nil { // Create it if missing - if _, err := smEngine.CreateWorkflow(h.workflowType, instanceId, workflowData); err != nil { + if _, err := smEngine.CreateWorkflow(h.Type, instanceId, workflowData); err != nil { h.logger.Error(fmt.Sprintf("Failed to create workflow instance for sub-action: %v", err)) w.WriteHeader(http.StatusInternalServerError) _ = json.NewEncoder(w).Encode(map[string]string{"error": "Failed to create workflow instance"}) @@ -2241,13 +2241,13 @@ func (h *RESTAPIHandler) handleSubActionGet(resourceId, subAction string, w http } } // Enrich with live state from workflow engine - if h.workflowEngine != "" { + if h.Engine != "" { instanceId := resourceId - if h.instanceIDPrefix != "" { - instanceId = h.instanceIDPrefix + resourceId + if h.InstanceIDPrefix != "" { + instanceId = h.InstanceIDPrefix + resourceId } var engineSvc any - if err := h.app.GetService(h.workflowEngine, &engineSvc); err == nil { + if err := h.app.GetService(h.Engine, &engineSvc); err == nil { if smEngine, ok := engineSvc.(*StateMachineEngine); ok { if instance, err := smEngine.GetInstance(instanceId); err == nil && instance != nil { summary["state"] = instance.CurrentState @@ -2349,13 +2349,13 @@ func (h *RESTAPIHandler) Start(ctx context.Context) error { } // Load seed data only if no persisted data was loaded - if h.seedFile != "" { - if err := h.loadSeedData(h.seedFile); err != nil { + if h.SeedFile != "" { + if err := h.loadSeedData(h.SeedFile); err != nil { if h.logger != nil { - h.logger.Warn(fmt.Sprintf("Failed to load seed data from %s: %v", h.seedFile, err)) + h.logger.Warn(fmt.Sprintf("Failed to load seed data from %s: %v", h.SeedFile, err)) } } else if h.logger != nil { - h.logger.Info(fmt.Sprintf("Loaded seed data from %s", h.seedFile)) + h.logger.Info(fmt.Sprintf("Loaded seed data from %s", h.SeedFile)) } } diff --git a/module/api_handlers_test.go b/module/api_handlers_test.go index 368f3ec4..e34e71c6 100644 --- a/module/api_handlers_test.go +++ b/module/api_handlers_test.go @@ -71,8 +71,8 @@ func TestRESTAPIHandler_Init(t *testing.T) { if h.logger == nil { t.Error("expected logger to be set") } - if h.instanceIDField != "id" { - t.Errorf("expected default instanceIDField 'id', got '%s'", h.instanceIDField) + if h.InstanceIDField != "id" { + t.Errorf("expected default instanceIDField 'id', got '%s'", h.InstanceIDField) } } @@ -684,8 +684,8 @@ func TestRESTAPIHandler_Init_FullSetup(t *testing.T) { t.Fatalf("Init failed: %v", err) } - if h.workflowType != "item-workflow" { - t.Errorf("expected workflowType 'item-workflow', got '%s'", h.workflowType) + if h.Type != "item-workflow" { + t.Errorf("expected workflowType 'item-workflow', got '%s'", h.Type) } }