From da3ae5d315a4a3bd4442ae8a090604960018c840 Mon Sep 17 00:00:00 2001 From: Gage Krumbach Date: Tue, 4 Nov 2025 07:21:34 -0500 Subject: [PATCH 1/4] Enhance backend to support LangGraph workflows - Updated Dockerfile to include retry logic for dependency downloads. - Added database initialization in main.go. - Introduced new routes for managing LangGraph workflows in routes.go. - Enhanced session handling to support workflow references and inputs in sessions.go. - Updated session types to accommodate new workflow structures in session.go. - Configured PostgreSQL environment variables in backend deployment manifest. - Added new CRD properties for workflow references and status conditions in agenticsessions-crd.yaml. This commit lays the groundwork for integrating LangGraph workflows into the backend, improving session management and database interactions. --- .gitignore | 1 + components/backend/Dockerfile | 6 +- components/backend/go.mod | 3 +- components/backend/handlers/sessions.go | 215 +++++++++- components/backend/main.go | 5 + components/backend/routes.go | 13 + components/backend/types/session.go | 28 +- components/manifests/backend-deployment.yaml | 22 + .../manifests/crds/agenticsessions-crd.yaml | 55 ++- components/manifests/kustomization.yaml | 1 + .../operator/internal/handlers/sessions.go | 401 +++++++++++++++++- test-workflow-example/app/__init__.py | 2 + 12 files changed, 714 insertions(+), 38 deletions(-) create mode 100644 test-workflow-example/app/__init__.py diff --git a/.gitignore b/.gitignore index 5e58da2dc..d86e197a9 100644 --- a/.gitignore +++ b/.gitignore @@ -118,3 +118,4 @@ Thumbs.db .stories/ .testplans/ .spikes/ +.cursor/worktrees.json \ No newline at end of file diff --git a/components/backend/Dockerfile b/components/backend/Dockerfile index e186bdfb2..1106ef265 100644 --- a/components/backend/Dockerfile +++ b/components/backend/Dockerfile @@ -8,8 +8,10 @@ USER 0 # Copy go mod and sum files COPY go.mod go.sum ./ -# Download dependencies 
-RUN go mod download +# Download dependencies (with retry and DNS workaround for cross-platform builds) +RUN GOPROXY=direct GOSUMDB=off go mod download || \ + (sleep 2 && GOPROXY=direct GOSUMDB=off go mod download) || \ + (sleep 5 && GOPROXY=direct GOSUMDB=off go mod download) # Copy the source code COPY . . diff --git a/components/backend/go.mod b/components/backend/go.mod index 69050d560..87286f381 100644 --- a/components/backend/go.mod +++ b/components/backend/go.mod @@ -8,8 +8,10 @@ require ( github.com/gin-contrib/cors v1.7.6 github.com/gin-gonic/gin v1.10.1 github.com/golang-jwt/jwt/v5 v5.3.0 + github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 github.com/joho/godotenv v1.5.1 + github.com/lib/pq v1.10.9 k8s.io/api v0.34.0 k8s.io/apimachinery v0.34.0 k8s.io/client-go v0.34.0 @@ -34,7 +36,6 @@ require ( github.com/goccy/go-json v0.10.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/gnostic-models v0.7.0 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go index 9949ade03..6f87431e8 100644 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -2,6 +2,7 @@ package handlers import ( "context" + "database/sql" "encoding/json" "fmt" "io" @@ -12,6 +13,7 @@ import ( "strings" "time" + "ambient-code-backend/server" "ambient-code-backend/types" "github.com/gin-gonic/gin" @@ -48,6 +50,27 @@ type contentListItem struct { func parseSpec(spec map[string]interface{}) types.AgenticSessionSpec { result := types.AgenticSessionSpec{} + // LangGraph workflow reference (optional) + if workflowRef, ok := spec["workflowRef"].(map[string]interface{}); ok { + wfRef := &types.WorkflowRef{} + if name, ok := workflowRef["name"].(string); 
ok { + wfRef.Name = name + } + if version, ok := workflowRef["version"].(string); ok { + wfRef.Version = version + } + if graph, ok := workflowRef["graph"].(string); ok { + wfRef.Graph = graph + } + result.WorkflowRef = wfRef + } + + // LangGraph workflow inputs (optional) + if inputs, ok := spec["inputs"].(map[string]interface{}); ok { + result.Inputs = inputs + } + + // Legacy prompt-based session if prompt, ok := spec["prompt"].(string); ok { result.Prompt = prompt } @@ -235,6 +258,36 @@ func parseStatus(status map[string]interface{}) *types.AgenticSessionStatus { result.StateDir = stateDir } + // LangGraph workflow status fields + if currentNode, ok := status["currentNode"].(string); ok { + result.CurrentNode = currentNode + } + if checkpointId, ok := status["checkpointId"].(string); ok { + result.CheckpointId = checkpointId + } + if conditions, ok := status["conditions"].([]interface{}); ok { + conds := make([]types.StatusCondition, 0, len(conditions)) + for _, cond := range conditions { + if condMap, ok := cond.(map[string]interface{}); ok { + c := types.StatusCondition{} + if t, ok := condMap["type"].(string); ok { + c.Type = t + } + if s, ok := condMap["status"].(string); ok { + c.Status = s + } + if m, ok := condMap["message"].(string); ok { + c.Message = m + } + if ltt, ok := condMap["lastTransitionTime"].(string); ok { + c.LastTransitionTime = ltt + } + conds = append(conds, c) + } + } + result.Conditions = conds + } + return result } @@ -288,7 +341,99 @@ func CreateSession(c *gin.Context) { return } - // Validation for multi-repo can be added here if needed + // Validate that either prompt or workflowRef is provided + if req.WorkflowRef == nil && req.Prompt == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "either prompt or workflowRef must be provided"}) + return + } + + // Resolve workflow if workflowRef is provided + var workflowImageDigest string + var workflowGraphEntry string + if req.WorkflowRef != nil { + // Resolve workflow version from 
registry + var version string = req.WorkflowRef.Version + if version == "" { + // Get latest version + var workflowID string + err := server.DB.QueryRow( + "SELECT id FROM workflows WHERE project = $1 AND name = $2", + project, req.WorkflowRef.Name, + ).Scan(&workflowID) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": fmt.Sprintf("workflow '%s' not found", req.WorkflowRef.Name)}) + return + } + if err != nil { + log.Printf("Failed to query workflow: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to resolve workflow"}) + return + } + + // Get latest version + err = server.DB.QueryRow( + "SELECT version FROM workflow_versions WHERE workflow_id = $1 ORDER BY created_at DESC LIMIT 1", + workflowID, + ).Scan(&version) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": fmt.Sprintf("no versions found for workflow '%s'", req.WorkflowRef.Name)}) + return + } + if err != nil { + log.Printf("Failed to query workflow version: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to resolve workflow version"}) + return + } + } + + // Get workflow version with graphs + var workflowID string + err := server.DB.QueryRow( + "SELECT id FROM workflows WHERE project = $1 AND name = $2", + project, req.WorkflowRef.Name, + ).Scan(&workflowID) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": fmt.Sprintf("workflow '%s' not found", req.WorkflowRef.Name)}) + return + } + + var graphsJSON []byte + err = server.DB.QueryRow( + "SELECT image_digest, graphs FROM workflow_versions WHERE workflow_id = $1 AND version = $2", + workflowID, version, + ).Scan(&workflowImageDigest, &graphsJSON) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": fmt.Sprintf("workflow version '%s' not found", version)}) + return + } + if err != nil { + log.Printf("Failed to query workflow version: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to resolve workflow 
version"}) + return + } + + // Parse graphs and find the requested graph + var graphs []types.WorkflowGraph + if err := json.Unmarshal(graphsJSON, &graphs); err != nil { + log.Printf("Failed to unmarshal graphs: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to parse workflow graphs"}) + return + } + + // Find the requested graph + found := false + for _, graph := range graphs { + if graph.Name == req.WorkflowRef.Graph { + workflowGraphEntry = graph.Entry + found = true + break + } + } + if !found { + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("graph '%s' not found in workflow version", req.WorkflowRef.Graph)}) + return + } + } // Set defaults for LLM settings if not provided llmSettings := types.LLMSettings{ @@ -330,29 +475,73 @@ func CreateSession(c *gin.Context) { } metadata["labels"] = labels } + annotations := map[string]interface{}{} if len(req.Annotations) > 0 { - annotations := map[string]interface{}{} for k, v := range req.Annotations { annotations[k] = v } + } + + // Store resolved workflow info in annotations for operator + if req.WorkflowRef != nil { + annotations["ambient-code.io/workflow-image-digest"] = workflowImageDigest + annotations["ambient-code.io/workflow-graph-entry"] = workflowGraphEntry + annotations["ambient-code.io/workflow-name"] = req.WorkflowRef.Name + annotations["ambient-code.io/workflow-version"] = func() string { + if req.WorkflowRef.Version != "" { + return req.WorkflowRef.Version + } + // Get latest version + var workflowID string + var version string + err := server.DB.QueryRow( + "SELECT id FROM workflows WHERE project = $1 AND name = $2", + project, req.WorkflowRef.Name, + ).Scan(&workflowID) + if err == nil { + _ = server.DB.QueryRow( + "SELECT version FROM workflow_versions WHERE workflow_id = $1 ORDER BY created_at DESC LIMIT 1", + workflowID, + ).Scan(&version) + } + return version + }() + annotations["ambient-code.io/workflow-graph"] = req.WorkflowRef.Graph + } + if 
len(annotations) > 0 { metadata["annotations"] = annotations } + spec := map[string]interface{}{ + "displayName": req.DisplayName, + "project": project, + "llmSettings": map[string]interface{}{ + "model": llmSettings.Model, + "temperature": llmSettings.Temperature, + "maxTokens": llmSettings.MaxTokens, + }, + "timeout": timeout, + } + + // Add prompt or workflowRef based on request + if req.WorkflowRef != nil { + spec["workflowRef"] = map[string]interface{}{ + "name": req.WorkflowRef.Name, + "version": annotations["ambient-code.io/workflow-version"], + "graph": req.WorkflowRef.Graph, + } + if req.Inputs != nil && len(req.Inputs) > 0 { + spec["inputs"] = req.Inputs + } + } else { + spec["prompt"] = req.Prompt + } + session := map[string]interface{}{ "apiVersion": "vteam.ambient-code/v1alpha1", "kind": "AgenticSession", "metadata": metadata, - "spec": map[string]interface{}{ - "prompt": req.Prompt, - "displayName": req.DisplayName, - "project": project, - "llmSettings": map[string]interface{}{ - "model": llmSettings.Model, - "temperature": llmSettings.Temperature, - "maxTokens": llmSettings.MaxTokens, - }, - "timeout": timeout, - }, + "spec": spec, "status": map[string]interface{}{ "phase": "Pending", }, diff --git a/components/backend/main.go b/components/backend/main.go index cd43f357c..9a83e581b 100644 --- a/components/backend/main.go +++ b/components/backend/main.go @@ -33,6 +33,11 @@ func main() { server.InitConfig() + // Initialize database connection + if err := server.InitDB(); err != nil { + log.Fatalf("Failed to initialize database: %v", err) + } + // Initialize git package git.GetProjectSettingsResource = k8s.GetProjectSettingsResource git.GetGitHubInstallation = func(ctx context.Context, userID string) (interface{}, error) { diff --git a/components/backend/routes.go b/components/backend/routes.go index d0894de56..e45866de9 100644 --- a/components/backend/routes.go +++ b/components/backend/routes.go @@ -80,6 +80,19 @@ func registerRoutes(r *gin.Engine, 
jiraHandler *jira.Handler) { projectGroup.PUT("/runner-secrets/config", handlers.UpdateRunnerSecretsConfig) projectGroup.GET("/runner-secrets", handlers.ListRunnerSecrets) projectGroup.PUT("/runner-secrets", handlers.UpdateRunnerSecrets) + + // LangGraph workflow routes + projectGroup.GET("/workflows", handlers.ListWorkflows) + projectGroup.POST("/workflows", handlers.CreateWorkflow) + projectGroup.GET("/workflows/:name", handlers.GetWorkflow) + projectGroup.DELETE("/workflows/:name", handlers.DeleteWorkflow) + projectGroup.POST("/workflows/:name/versions", handlers.CreateWorkflowVersion) + projectGroup.GET("/workflows/:name/versions/:version", handlers.GetWorkflowVersion) + + // LangGraph run management routes + projectGroup.POST("/runs/:runId/events", handlers.IngestRunEvent) + projectGroup.GET("/runs/:runId/events", handlers.GetRunEvents) + projectGroup.POST("/runs/:runId/approve", handlers.ApproveRun) } api.POST("/auth/github/install", handlers.LinkGitHubInstallationGlobal) diff --git a/components/backend/types/session.go b/components/backend/types/session.go index e7a5de5b6..3c2d7ba8b 100644 --- a/components/backend/types/session.go +++ b/components/backend/types/session.go @@ -10,7 +10,12 @@ type AgenticSession struct { } type AgenticSessionSpec struct { - Prompt string `json:"prompt" binding:"required"` + // LangGraph workflow reference (optional, for new workflow system) + WorkflowRef *WorkflowRef `json:"workflowRef,omitempty"` + Inputs map[string]interface{} `json:"inputs,omitempty"` // Free-form JSON for LangGraph workflows + + // Legacy prompt-based session (required if workflowRef not provided) + Prompt string `json:"prompt,omitempty"` Interactive bool `json:"interactive,omitempty"` DisplayName string `json:"displayName"` LLMSettings LLMSettings `json:"llmSettings"` @@ -58,10 +63,27 @@ type AgenticSessionStatus struct { TotalCostUSD *float64 `json:"total_cost_usd,omitempty"` Usage map[string]interface{} `json:"usage,omitempty"` Result *string 
`json:"result,omitempty"` + // LangGraph workflow status + CurrentNode string `json:"currentNode,omitempty"` + CheckpointId string `json:"checkpointId,omitempty"` + Conditions []StatusCondition `json:"conditions,omitempty"` +} + +// StatusCondition represents a status condition +type StatusCondition struct { + Type string `json:"type"` + Status string `json:"status"` + Message string `json:"message,omitempty"` + LastTransitionTime string `json:"lastTransitionTime,omitempty"` } type CreateAgenticSessionRequest struct { - Prompt string `json:"prompt" binding:"required"` + // LangGraph workflow (optional) + WorkflowRef *WorkflowRef `json:"workflowRef,omitempty"` + Inputs map[string]interface{} `json:"inputs,omitempty"` + + // Legacy prompt-based session (required if workflowRef not provided) + Prompt string `json:"prompt,omitempty"` DisplayName string `json:"displayName,omitempty"` LLMSettings *LLMSettings `json:"llmSettings,omitempty"` Timeout *int `json:"timeout,omitempty"` @@ -73,7 +95,7 @@ type CreateAgenticSessionRequest struct { AutoPushOnComplete *bool `json:"autoPushOnComplete,omitempty"` UserContext *UserContext `json:"userContext,omitempty"` BotAccount *BotAccountRef `json:"botAccount,omitempty"` - ResourceOverrides *ResourceOverrides `json:"resourceOverrides,omitempty"` + ResourceOverrides *ResourceOverrides `json:"resourceOverrides,omitempty"` EnvironmentVariables map[string]string `json:"environmentVariables,omitempty"` Labels map[string]string `json:"labels,omitempty"` Annotations map[string]string `json:"annotations,omitempty"` diff --git a/components/manifests/backend-deployment.yaml b/components/manifests/backend-deployment.yaml index 6a35cb6f1..a3c94ad0b 100644 --- a/components/manifests/backend-deployment.yaml +++ b/components/manifests/backend-deployment.yaml @@ -71,6 +71,28 @@ spec: name: github-app-secret key: GITHUB_STATE_SECRET optional: true + # Postgres configuration for LangGraph workflows + - name: POSTGRES_HOST + value: "postgres-service" 
+ - name: POSTGRES_PORT + value: "5432" + - name: POSTGRES_USER + valueFrom: + secretKeyRef: + name: postgres-secret + key: POSTGRES_USER + - name: POSTGRES_PASSWORD + valueFrom: + secretKeyRef: + name: postgres-secret + key: POSTGRES_PASSWORD + - name: POSTGRES_DB + valueFrom: + secretKeyRef: + name: postgres-secret + key: POSTGRES_DB + - name: TRUSTED_REGISTRIES + value: "quay.io/ambient_code/*" resources: requests: cpu: 100m diff --git a/components/manifests/crds/agenticsessions-crd.yaml b/components/manifests/crds/agenticsessions-crd.yaml index d8e8437f1..4a5370e27 100644 --- a/components/manifests/crds/agenticsessions-crd.yaml +++ b/components/manifests/crds/agenticsessions-crd.yaml @@ -16,9 +16,29 @@ spec: properties: spec: type: object - required: - - prompt properties: + # LangGraph workflow reference (optional, for new workflow system) + workflowRef: + type: object + description: "Reference to a registered LangGraph workflow" + properties: + name: + type: string + description: "Workflow name" + version: + type: string + description: "Workflow version (optional, defaults to latest)" + graph: + type: string + description: "Graph name from workflow version's graphs array" + inputs: + type: object + description: "Inputs to pass to the LangGraph workflow (free-form JSON)" + x-kubernetes-preserve-unknown-fields: true + # Legacy prompt-based session (required if workflowRef not provided) + prompt: + type: string + description: "The initial prompt for the agentic session (required if workflowRef not provided)" # Multiple-repo configuration (new unified mapping) repos: type: array @@ -64,9 +84,6 @@ spec: interactive: type: boolean description: "When true, run session in interactive chat mode using inbox/outbox files" - prompt: - type: string - description: "The initial prompt for the agentic session" displayName: type: string description: "A descriptive display name for the agentic session generated from prompt and website" @@ -158,6 +175,34 @@ spec: result: type: 
string description: "Final result text as reported by the runner" + currentNode: + type: string + description: "Current node being executed (for LangGraph workflows)" + checkpointId: + type: string + description: "Current checkpoint ID (for LangGraph workflows)" + conditions: + type: array + description: "Status conditions" + items: + type: object + properties: + type: + type: string + enum: + - "AwaitingApproval" + - "LeaseHealthy" + status: + type: string + enum: + - "True" + - "False" + - "Unknown" + message: + type: string + lastTransitionTime: + type: string + format: date-time additionalPrinterColumns: - name: Phase type: string diff --git a/components/manifests/kustomization.yaml b/components/manifests/kustomization.yaml index d8c15ebac..1d83520e2 100644 --- a/components/manifests/kustomization.yaml +++ b/components/manifests/kustomization.yaml @@ -19,6 +19,7 @@ resources: - frontend-deployment.yaml - operator-deployment.yaml - workspace-pvc.yaml +- postgres images: - name: quay.io/ambient_code/vteam_backend:latest newName: quay.io/ambient_code/vteam_backend diff --git a/components/operator/internal/handlers/sessions.go b/components/operator/internal/handlers/sessions.go index f2076491e..0936ca025 100644 --- a/components/operator/internal/handlers/sessions.go +++ b/components/operator/internal/handlers/sessions.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log" + "net/http" "strings" "time" @@ -152,6 +153,40 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error { // Extract spec information from the fresh object spec, _, _ := unstructured.NestedMap(currentObj.Object, "spec") + + // Check if this is a LangGraph workflow session + workflowRef, hasWorkflowRef, _ := unstructured.NestedMap(spec, "workflowRef") + var workflowImageDigest string + var workflowGraphEntry string + var workflowInputs map[string]interface{} + + if hasWorkflowRef && workflowRef != nil { + // Read resolved workflow info from annotations + metadata, _, _ := 
unstructured.NestedMap(currentObj.Object, "metadata") + if annotations, ok := metadata["annotations"].(map[string]interface{}); ok { + if imgDigest, ok := annotations["ambient-code.io/workflow-image-digest"].(string); ok { + workflowImageDigest = imgDigest + } + if graphEntry, ok := annotations["ambient-code.io/workflow-graph-entry"].(string); ok { + workflowGraphEntry = graphEntry + } + } + + // Read inputs from spec + if inputs, _, _ := unstructured.NestedMap(spec, "inputs"); inputs != nil { + workflowInputs = inputs + } + + if workflowImageDigest == "" || workflowGraphEntry == "" { + log.Printf("WorkflowRef found but missing resolved image/graph info in annotations for session %s", name) + updateAgenticSessionStatus(sessionNamespace, name, map[string]interface{}{ + "phase": "Error", + "message": "Workflow image or graph entry not resolved", + }) + return fmt.Errorf("workflow image or graph entry not resolved") + } + } + prompt, _, _ := unstructured.NestedString(spec, "prompt") timeout, _, _ := unstructured.NestedInt64(spec, "timeout") interactive, _, _ := unstructured.NestedBool(spec, "interactive") @@ -378,20 +413,97 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error { } } - return base - }(), - - // If configured, import all keys from the runner Secret as environment variables - EnvFrom: func() []corev1.EnvFromSource { - if runnerSecretsName != "" { - return []corev1.EnvFromSource{{SecretRef: &corev1.SecretEnvSource{LocalObjectReference: corev1.LocalObjectReference{Name: runnerSecretsName}}}} - } - return []corev1.EnvFromSource{} - }(), - - Resources: corev1.ResourceRequirements{}, - }, - }, + // Flip roles so the content writer is the main container that keeps the pod alive + Containers: func() []corev1.Container { + containers := []corev1.Container{ + { + Name: "ambient-content", + Image: appConfig.ContentServiceImage, + ImagePullPolicy: appConfig.ImagePullPolicy, + Env: []corev1.EnvVar{ + {Name: "CONTENT_SERVICE_MODE", Value: "true"}, + 
{Name: "STATE_BASE_DIR", Value: "/workspace"}, + }, + Ports: []corev1.ContainerPort{{ContainerPort: 8080, Name: "http"}}, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromString("http"), + }, + }, + InitialDelaySeconds: 5, + PeriodSeconds: 5, + }, + VolumeMounts: []corev1.VolumeMount{{Name: "workspace", MountPath: "/workspace"}}, + }, + } + + // Determine runner image and container config + var runnerImage string + var runnerContainer corev1.Container + + if hasWorkflowRef && workflowImageDigest != "" { + // LangGraph workflow runner + runnerImage = workflowImageDigest + runnerContainer = corev1.Container{ + Name: "langgraph-runner", + Image: runnerImage, + ImagePullPolicy: appConfig.ImagePullPolicy, + SecurityContext: &corev1.SecurityContext{ + AllowPrivilegeEscalation: boolPtr(false), + ReadOnlyRootFilesystem: boolPtr(false), + Capabilities: &corev1.Capabilities{ + Drop: []corev1.Capability{"ALL"}, + }, + }, + Ports: []corev1.ContainerPort{{ContainerPort: 8000, Name: "http"}}, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/ready", + Port: intstr.FromString("http"), + }, + }, + InitialDelaySeconds: 10, + PeriodSeconds: 5, + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "workspace", MountPath: "/workspace", ReadOnly: false}, + }, + Env: buildLangGraphRunnerEnv(name, sessionNamespace, workflowGraphEntry, workflowInputs, appConfig), + } + } else { + // Legacy Claude Code runner + runnerImage = appConfig.AmbientCodeRunnerImage + runnerContainer = corev1.Container{ + Name: "ambient-code-runner", + Image: runnerImage, + ImagePullPolicy: appConfig.ImagePullPolicy, + SecurityContext: &corev1.SecurityContext{ + AllowPrivilegeEscalation: boolPtr(false), + ReadOnlyRootFilesystem: boolPtr(false), + Capabilities: &corev1.Capabilities{ + Drop: []corev1.Capability{"ALL"}, + }, + }, + VolumeMounts: 
[]corev1.VolumeMount{ + {Name: "workspace", MountPath: "/workspace", ReadOnly: false}, + }, + Env: buildLegacyRunnerEnv(name, sessionNamespace, prompt, model, temperature, maxTokens, timeout, autoPushOnComplete, interactive, inputRepo, inputBranch, outputRepo, outputBranch, appConfig, currentObj, runnerSecretsName), + EnvFrom: func() []corev1.EnvFromSource { + if runnerSecretsName != "" { + return []corev1.EnvFromSource{{SecretRef: &corev1.SecretEnvSource{LocalObjectReference: corev1.LocalObjectReference{Name: runnerSecretsName}}}} + } + return []corev1.EnvFromSource{} + }(), + Resources: corev1.ResourceRequirements{}, + } + } + + containers = append(containers, runnerContainer) + return containers + }(), }, }, }, @@ -478,6 +590,35 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error { log.Printf("Failed to create per-job content service for %s: %v", name, serr) } + // If LangGraph workflow, create service for runner and start workflow + if hasWorkflowRef && workflowImageDigest != "" { + runnerSvc := &corev1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: fmt.Sprintf("langgraph-runner-%s", name), + Namespace: sessionNamespace, + Labels: map[string]string{"app": "langgraph-runner", "agentic-session": name}, + OwnerReferences: []v1.OwnerReference{{ + APIVersion: "batch/v1", + Kind: "Job", + Name: jobName, + UID: createdJob.UID, + Controller: boolPtr(true), + }}, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{"job-name": jobName}, + Ports: []corev1.ServicePort{{Port: 8000, TargetPort: intstr.FromString("http"), Protocol: corev1.ProtocolTCP, Name: "http"}}, + Type: corev1.ServiceTypeClusterIP, + }, + } + if _, serr := config.K8sClient.CoreV1().Services(sessionNamespace).Create(context.TODO(), runnerSvc, v1.CreateOptions{}); serr != nil && !errors.IsAlreadyExists(serr) { + log.Printf("Failed to create langgraph runner service for %s: %v", name, serr) + } + + // Start workflow runner in background after pod is ready + go 
startLangGraphWorkflow(name, sessionNamespace, jobName, workflowInputs) + } + // Start monitoring the job go monitorJob(jobName, name, sessionNamespace) @@ -782,3 +923,235 @@ var ( int32Ptr = func(i int32) *int32 { return &i } int64Ptr = func(i int64) *int64 { return &i } ) + +// buildLangGraphRunnerEnv builds environment variables for LangGraph runner +func buildLangGraphRunnerEnv(sessionName, sessionNamespace, graphEntry string, inputs map[string]interface{}, appConfig *config.Config) []corev1.EnvVar { + env := []corev1.EnvVar{ + {Name: "RUN_ID", Value: sessionName}, + {Name: "GRAPH_ENTRY", Value: graphEntry}, + {Name: "ARTIFACTS_DIR", Value: "/workspace/artifacts"}, + {Name: "PROJECT", Value: sessionNamespace}, + {Name: "BACKEND_API_URL", Value: fmt.Sprintf("http://backend-service.%s.svc.cluster.local:8080/api", appConfig.BackendNamespace)}, + } + + // Postgres connection - use explicit namespace for secret (ambient-code namespace) + postgresNamespace := appConfig.BackendNamespace // Assume Postgres is in backend namespace + env = append(env, + corev1.EnvVar{Name: "POSTGRES_HOST", Value: fmt.Sprintf("postgres-service.%s.svc.cluster.local", postgresNamespace)}, + corev1.EnvVar{Name: "POSTGRES_PORT", Value: "5432"}, + corev1.EnvVar{ + Name: "POSTGRES_USER", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "postgres-secret"}, + Key: "POSTGRES_USER", + }, + }, + }, + corev1.EnvVar{ + Name: "POSTGRES_PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "postgres-secret"}, + Key: "POSTGRES_PASSWORD", + }, + }, + }, + corev1.EnvVar{ + Name: "POSTGRES_DB", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "postgres-secret"}, + Key: "POSTGRES_DB", + }, + }, + }, + ) + + // Note: Inputs are passed via POST 
body to /start endpoint, not env vars + return env +} + +// buildLegacyRunnerEnv builds environment variables for legacy Claude Code runner +func buildLegacyRunnerEnv(sessionName, sessionNamespace, prompt, model string, temperature float64, maxTokens, timeout int64, autoPushOnComplete, interactive bool, inputRepo, inputBranch, outputRepo, outputBranch string, appConfig *config.Config, currentObj *unstructured.Unstructured, runnerSecretsName string) []corev1.EnvVar { + base := []corev1.EnvVar{ + {Name: "DEBUG", Value: "true"}, + {Name: "INTERACTIVE", Value: fmt.Sprintf("%t", interactive)}, + {Name: "AGENTIC_SESSION_NAME", Value: sessionName}, + {Name: "AGENTIC_SESSION_NAMESPACE", Value: sessionNamespace}, + {Name: "SESSION_ID", Value: sessionName}, + {Name: "WORKSPACE_PATH", Value: fmt.Sprintf("/workspace/sessions/%s/workspace", sessionName)}, + {Name: "INPUT_REPO_URL", Value: inputRepo}, + {Name: "INPUT_BRANCH", Value: inputBranch}, + {Name: "OUTPUT_REPO_URL", Value: outputRepo}, + {Name: "OUTPUT_BRANCH", Value: outputBranch}, + {Name: "PROMPT", Value: prompt}, + {Name: "LLM_MODEL", Value: model}, + {Name: "LLM_TEMPERATURE", Value: fmt.Sprintf("%.2f", temperature)}, + {Name: "LLM_MAX_TOKENS", Value: fmt.Sprintf("%d", maxTokens)}, + {Name: "TIMEOUT", Value: fmt.Sprintf("%d", timeout)}, + {Name: "AUTO_PUSH_ON_COMPLETE", Value: fmt.Sprintf("%t", autoPushOnComplete)}, + {Name: "BACKEND_API_URL", Value: fmt.Sprintf("http://backend-service.%s.svc.cluster.local:8080/api", appConfig.BackendNamespace)}, + {Name: "WEBSOCKET_URL", Value: fmt.Sprintf("ws://backend-service.%s.svc.cluster.local:8080/api/projects/%s/sessions/%s/ws", appConfig.BackendNamespace, sessionNamespace, sessionName)}, + } + + // Add BOT_TOKEN from secret + secretName := "" + if meta, ok := currentObj.Object["metadata"].(map[string]interface{}); ok { + if anns, ok := meta["annotations"].(map[string]interface{}); ok { + if v, ok := anns["ambient-code.io/runner-token-secret"].(string); ok && 
strings.TrimSpace(v) != "" { + secretName = strings.TrimSpace(v) + } + } + } + if secretName == "" { + secretName = fmt.Sprintf("ambient-runner-token-%s", sessionName) + } + base = append(base, corev1.EnvVar{ + Name: "BOT_TOKEN", + ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: secretName}, + Key: "k8s-token", + }}, + }) + + // Add CR-provided envs + if spec, ok := currentObj.Object["spec"].(map[string]interface{}); ok { + if repos, ok := spec["repos"].([]interface{}); ok && len(repos) > 0 { + b, _ := json.Marshal(repos) + base = append(base, corev1.EnvVar{Name: "REPOS_JSON", Value: string(b)}) + } + if mrn, ok := spec["mainRepoName"].(string); ok && strings.TrimSpace(mrn) != "" { + base = append(base, corev1.EnvVar{Name: "MAIN_REPO_NAME", Value: mrn}) + } + if mriRaw, ok := spec["mainRepoIndex"]; ok { + switch v := mriRaw.(type) { + case int64: + base = append(base, corev1.EnvVar{Name: "MAIN_REPO_INDEX", Value: fmt.Sprintf("%d", v)}) + case int32: + base = append(base, corev1.EnvVar{Name: "MAIN_REPO_INDEX", Value: fmt.Sprintf("%d", v)}) + case int: + base = append(base, corev1.EnvVar{Name: "MAIN_REPO_INDEX", Value: fmt.Sprintf("%d", v)}) + case float64: + base = append(base, corev1.EnvVar{Name: "MAIN_REPO_INDEX", Value: fmt.Sprintf("%d", int64(v))}) + case string: + if strings.TrimSpace(v) != "" { + base = append(base, corev1.EnvVar{Name: "MAIN_REPO_INDEX", Value: v}) + } + } + } + if envMap, ok := spec["environmentVariables"].(map[string]interface{}); ok { + for k, v := range envMap { + if vs, ok := v.(string); ok { + replaced := false + for i := range base { + if base[i].Name == k { + base[i].Value = vs + replaced = true + break + } + } + if !replaced { + base = append(base, corev1.EnvVar{Name: k, Value: vs}) + } + } + } + } + } + + return base +} + +// startLangGraphWorkflow waits for pod to be ready and calls /start endpoint +func startLangGraphWorkflow(sessionName, 
sessionNamespace, jobName string, inputs map[string]interface{}) { + runnerSvcName := fmt.Sprintf("langgraph-runner-%s", sessionName) + runnerURL := fmt.Sprintf("http://%s.%s.svc.cluster.local:8000", runnerSvcName, sessionNamespace) + + // Wait for pod to be ready (with timeout) + maxWait := 300 // 5 minutes + waitInterval := 5 * time.Second + elapsed := 0 + + for elapsed < maxWait { + time.Sleep(waitInterval) + elapsed += int(waitInterval.Seconds()) + + // Check if pod is ready + pods, err := config.K8sClient.CoreV1().Pods(sessionNamespace).List(context.TODO(), v1.ListOptions{ + LabelSelector: fmt.Sprintf("job-name=%s", jobName), + }) + if err != nil { + log.Printf("Failed to list pods for job %s: %v", jobName, err) + continue + } + + if len(pods.Items) == 0 { + continue + } + + pod := pods.Items[0] + ready := false + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue { + ready = true + break + } + } + + if !ready { + continue + } + + // Check if runner container is ready + for _, status := range pod.Status.ContainerStatuses { + if status.Name == "langgraph-runner" && status.Ready { + // Pod is ready, call /start endpoint + startReq := map[string]interface{}{ + "run_id": sessionName, + "inputs": func() map[string]interface{} { + if inputs != nil { + return inputs + } + return make(map[string]interface{}) + }(), + } + reqJSON, _ := json.Marshal(startReq) + + // Use HTTP client to call /start + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Post(fmt.Sprintf("%s/start", runnerURL), "application/json", strings.NewReader(string(reqJSON))) + if err != nil { + log.Printf("Failed to call /start on langgraph runner for %s: %v", sessionName, err) + updateAgenticSessionStatus(sessionNamespace, sessionName, map[string]interface{}{ + "phase": "Error", + "message": fmt.Sprintf("Failed to start workflow: %v", err), + }) + return + } + defer resp.Body.Close() + + if 
resp.StatusCode != http.StatusOK { + log.Printf("LangGraph runner /start returned status %d for %s", resp.StatusCode, sessionName) + updateAgenticSessionStatus(sessionNamespace, sessionName, map[string]interface{}{ + "phase": "Error", + "message": fmt.Sprintf("Workflow start failed with status %d", resp.StatusCode), + }) + return + } + + log.Printf("Successfully started LangGraph workflow for session %s", sessionName) + updateAgenticSessionStatus(sessionNamespace, sessionName, map[string]interface{}{ + "phase": "Running", + "message": "Workflow started", + }) + return + } + } + } + + log.Printf("Timeout waiting for LangGraph runner pod to be ready for session %s", sessionName) + updateAgenticSessionStatus(sessionNamespace, sessionName, map[string]interface{}{ + "phase": "Error", + "message": "Timeout waiting for runner pod to be ready", + }) +} diff --git a/test-workflow-example/app/__init__.py b/test-workflow-example/app/__init__.py new file mode 100644 index 000000000..0bcfb87c2 --- /dev/null +++ b/test-workflow-example/app/__init__.py @@ -0,0 +1,2 @@ +"""Test workflow example package.""" + From da7567c381c901b70f5a0da4cdf7ae3d899ab3c9 Mon Sep 17 00:00:00 2001 From: Gage Krumbach Date: Tue, 4 Nov 2025 08:10:23 -0500 Subject: [PATCH 2/4] Merge backend Dockerfile fix for amd64 architecture from current-agent-workflow-syBFS --- .gitignore | 7 +- BACKEND_BUILD_FIX.md | 65 +++ BUILD_COMMANDS.md | 128 +++++ DEPLOY_CHECKLIST.md | 114 +++++ DEPLOY_NOW.md | 162 ++++++ HOW_TO_TEST.md | 99 ++++ MVP_MISSING.md | 104 ++++ PODMAN_FIX.md | 66 +++ PODMAN_TROUBLESHOOT.md | 70 +++ QUICK_TEST.md | 56 +++ TESTING_GUIDE.md | 291 +++++++++++ TEST_NOW.md | 193 ++++++++ VM_TRANSFER_OPTIONS.md | 64 +++ components/backend/Dockerfile | 4 +- components/backend/go.sum | 2 + components/backend/handlers/runs.go | 246 ++++++++++ components/backend/handlers/workflows.go | 460 ++++++++++++++++++ components/backend/server/db.go | 139 ++++++ components/backend/types/workflow.go | 53 ++
.../manifests/postgres/kustomization.yaml | 9 + .../postgres/postgres-configmap.yaml | 14 + .../manifests/postgres/postgres-secret.yaml | 12 + .../manifests/postgres/postgres-service.yaml | 16 + .../postgres/postgres-statefulset.yaml | 85 ++++ components/runners/langgraph-wrapper/BUILD.md | 51 ++ .../runners/langgraph-wrapper/Dockerfile | 30 ++ .../langgraph-wrapper/Dockerfile.template | 29 ++ .../langgraph-wrapper/requirements.txt | 8 + .../langgraph-wrapper/runner/__init__.py | 2 + .../langgraph-wrapper/runner/server.py | 343 +++++++++++++ package-lock.json | 6 + test-langgraph.sh | 113 +++++ test-workflow-example/.gitignore | 23 + test-workflow-example/Dockerfile | 17 + test-workflow-example/PUSH.md | 36 ++ test-workflow-example/README.md | 100 ++++ test-workflow-example/app/workflow.py | 93 ++++ test-workflow-example/requirements.txt | 3 + 38 files changed, 3310 insertions(+), 3 deletions(-) create mode 100644 BACKEND_BUILD_FIX.md create mode 100644 BUILD_COMMANDS.md create mode 100644 DEPLOY_CHECKLIST.md create mode 100644 DEPLOY_NOW.md create mode 100644 HOW_TO_TEST.md create mode 100644 MVP_MISSING.md create mode 100644 PODMAN_FIX.md create mode 100644 PODMAN_TROUBLESHOOT.md create mode 100644 QUICK_TEST.md create mode 100644 TESTING_GUIDE.md create mode 100644 TEST_NOW.md create mode 100644 VM_TRANSFER_OPTIONS.md create mode 100644 components/backend/handlers/runs.go create mode 100644 components/backend/handlers/workflows.go create mode 100644 components/backend/server/db.go create mode 100644 components/backend/types/workflow.go create mode 100644 components/manifests/postgres/kustomization.yaml create mode 100644 components/manifests/postgres/postgres-configmap.yaml create mode 100644 components/manifests/postgres/postgres-secret.yaml create mode 100644 components/manifests/postgres/postgres-service.yaml create mode 100644 components/manifests/postgres/postgres-statefulset.yaml create mode 100644 components/runners/langgraph-wrapper/BUILD.md create mode 
100644 components/runners/langgraph-wrapper/Dockerfile create mode 100644 components/runners/langgraph-wrapper/Dockerfile.template create mode 100644 components/runners/langgraph-wrapper/requirements.txt create mode 100644 components/runners/langgraph-wrapper/runner/__init__.py create mode 100644 components/runners/langgraph-wrapper/runner/server.py create mode 100644 package-lock.json create mode 100755 test-langgraph.sh create mode 100644 test-workflow-example/.gitignore create mode 100644 test-workflow-example/Dockerfile create mode 100644 test-workflow-example/PUSH.md create mode 100644 test-workflow-example/README.md create mode 100644 test-workflow-example/app/workflow.py create mode 100644 test-workflow-example/requirements.txt diff --git a/.gitignore b/.gitignore index d86e197a9..b787dcf85 100644 --- a/.gitignore +++ b/.gitignore @@ -118,4 +118,6 @@ Thumbs.db .stories/ .testplans/ .spikes/ -.cursor/worktrees.json \ No newline at end of file +.cursor/worktrees.json +components/backend/Dockerfile.local-build +components/backend/BUILD*.md diff --git a/BACKEND_BUILD_FIX.md b/BACKEND_BUILD_FIX.md new file mode 100644 index 000000000..1c67fde55 --- /dev/null +++ b/BACKEND_BUILD_FIX.md @@ -0,0 +1,65 @@ +# Fix Backend Exec Format Error + +## Problem +`exec container process '/app/./main': Exec format error` - This means the binary architecture doesn't match the cluster. + +## Solution: Rebuild with explicit platform + +### Option 1: Build with --platform flag (Recommended) + +```bash +cd components/backend + +# Build explicitly for linux/amd64 +podman build --platform linux/amd64 -t quay.io/gkrumbach07/vteam_backend:langgraph-mvp . + +# Push +podman push quay.io/gkrumbach07/vteam_backend:langgraph-mvp +``` + +### Option 2: Update Dockerfile (Already done) + +The Dockerfile has been updated to explicitly build for `GOARCH=amd64`.
Just rebuild: + +```bash +cd components/backend +podman build --platform linux/amd64 -t quay.io/gkrumbach07/vteam_backend:langgraph-mvp . +podman push quay.io/gkrumbach07/vteam_backend:langgraph-mvp +``` + +### Option 3: Check your VM architecture + +```bash +# Check what architecture your VM is +uname -m + +# If it's aarch64 (ARM64), you MUST use --platform linux/amd64 +# If it's x86_64 (AMD64), the build should work, but still use --platform to be safe +``` + +## After Rebuilding + +```bash +# Update deployment +oc set image deployment/backend-api backend-api=quay.io/gkrumbach07/vteam_backend:langgraph-mvp -n ambient-code + +# Wait for rollout +oc rollout status deployment/backend-api -n ambient-code + +# Check logs +oc logs -n ambient-code -l app=backend-api --tail=50 +``` + +## Verify the Binary Architecture + +You can check the binary architecture inside the image: + +```bash +# Create a test container +podman run --rm quay.io/gkrumbach07/vteam_backend:langgraph-mvp file /app/main + +# Should show: ELF 64-bit LSB executable, x86-64 +# If it shows ARM or other architecture, rebuild with --platform linux/amd64 +``` + + diff --git a/BUILD_COMMANDS.md b/BUILD_COMMANDS.md new file mode 100644 index 000000000..50dfe64fe --- /dev/null +++ b/BUILD_COMMANDS.md @@ -0,0 +1,128 @@ +# Build and Push Commands + +## Prerequisites + +```bash +# Login to Quay.io (do this once) +podman login quay.io +# Enter username: gkrumbach07 +# Enter password/token when prompted +``` + +--- + +## 1. LangGraph Wrapper Base Image + +**Location:** `components/runners/langgraph-wrapper/` + +```bash +cd components/runners/langgraph-wrapper + +# Build for linux/amd64 (required for OpenShift/K8s) +podman build --platform linux/amd64 -t quay.io/gkrumbach07/langgraph-wrapper:base . 
+ +# Push to Quay.io +podman push quay.io/gkrumbach07/langgraph-wrapper:base + +# Get digest (save this for reference) +podman inspect quay.io/gkrumbach07/langgraph-wrapper:base | jq -r '.[0].RepoDigests[0]' +``` + +**Note:** Make sure this image is public in Quay.io settings so workflows can pull it. + +--- + +## 2. Backend Image + +**Location:** `components/backend/` + +**On your Linux VM:** + +```bash +cd components/backend + +# Build the backend image +podman build -t quay.io/gkrumbach07/vteam_backend:langgraph-mvp . + +# Push to Quay.io +podman push quay.io/gkrumbach07/vteam_backend:langgraph-mvp + +# Get digest (save this for reference) +podman inspect quay.io/gkrumbach07/vteam_backend:langgraph-mvp | jq -r '.[0].RepoDigests[0]' +``` + +**After pushing, update the deployment:** + +```bash +# Update deployment to use new image +oc set image deployment/backend-api backend-api=quay.io/gkrumbach07/vteam_backend:langgraph-mvp -n ambient-code + +# Or use digest for pinning: +oc set image deployment/backend-api backend-api=quay.io/gkrumbach07/vteam_backend@sha256:YOUR_DIGEST -n ambient-code +``` + +--- + +## 3. Example Workflow Image (Optional - for testing) + +**Location:** `test-workflow-example/` + +```bash +cd test-workflow-example + +# Build for linux/amd64 +podman build --platform linux/amd64 -t quay.io/gkrumbach07/langgraph-example-workflow:v1.0.0 . 
+ +# Push to Quay.io +podman push quay.io/gkrumbach07/langgraph-example-workflow:v1.0.0 + +# Get digest (required for registration) +podman inspect quay.io/gkrumbach07/langgraph-example-workflow:v1.0.0 | jq -r '.[0].RepoDigests[0]' +``` + +**After pushing, register in the backend:** + +```bash +# Replace YOUR_DIGEST with the digest from above +curl -X POST "http://localhost:8080/api/projects/YOUR_PROJECT/workflows/example-workflow/versions" \ + -H "Content-Type: application/json" \ + -d '{ + "version": "v1.0.0", + "imageRef": "quay.io/gkrumbach07/langgraph-example-workflow@sha256:YOUR_DIGEST", + "graphs": [ + { + "name": "main", + "entry": "app.workflow:build_app" + } + ], + "inputsSchema": { + "type": "object", + "properties": { + "message": { + "type": "string", + "description": "Input message to process" + } + }, + "required": ["message"] + } + }' +``` + +--- + +## Quick Reference + +### Image Tags: +- Base wrapper: `quay.io/gkrumbach07/langgraph-wrapper:base` +- Backend: `quay.io/gkrumbach07/vteam_backend:langgraph-mvp` +- Example workflow: `quay.io/gkrumbach07/langgraph-example-workflow:v1.0.0` + +### Registry Paths: +- Your personal org: `quay.io/gkrumbach07/*` +- Main org: `quay.io/ambient_code/*` + +### Build Order: +1. **First:** Build and push `langgraph-wrapper:base` (workflows depend on it) +2. **Second:** Build and push `vteam_backend:langgraph-mvp` (system component) +3. **Third:** Build and push example workflow (for testing) + diff --git a/DEPLOY_CHECKLIST.md b/DEPLOY_CHECKLIST.md new file mode 100644 index 000000000..3895fc766 --- /dev/null +++ b/DEPLOY_CHECKLIST.md @@ -0,0 +1,114 @@ +# Deployment Checklist for LangGraph MVP + +## โœ… Already Done +- โœ… Logged into OpenShift cluster (as `clusteradmin`) +- โœ… Built `langgraph-wrapper:base` image +- โœ… Built example workflow image + +## ๐Ÿ”จ What You Need to Do + +### 1. 
Build All Component Images + +The LangGraph MVP requires these images to be built and deployed: +- **Backend** (includes Postgres DB code) +- **Frontend** +- **Operator** (handles LangGraph workflow pods) +- **Runner** (legacy Claude runner) + +```bash +# Build all images (using podman, targeting linux/amd64) +make build-all CONTAINER_ENGINE=podman PLATFORM=linux/amd64 +``` + +### 2. Push Images to Registry + +**Option A: Use Existing Images from quay.io/ambient_code** +- Skip building/pushing if you trust the existing images +- The deploy script uses `quay.io/ambient_code/*:latest` by default + +**Option B: Build and Push Your Own** +```bash +# Set your registry +export REGISTRY="quay.io/gkrumbach07" + +# Tag and push +make push-all CONTAINER_ENGINE=podman REGISTRY=$REGISTRY +``` + +### 3. Deploy to Cluster + +```bash +# From project root +cd components/manifests + +# Copy env file if first time +cp env.example .env + +# Edit .env and set at least: +# - ANTHROPIC_API_KEY=your-key +# - (Optional) CONTAINER_REGISTRY if using custom images + +# Deploy (from project root) +make deploy +``` + +The deploy script will: +- โœ… Check you're logged in (already done) +- โœ… Create namespace `ambient-code` +- โœ… Deploy Postgres (new!) +- โœ… Deploy Backend, Frontend, Operator +- โœ… Set up RBAC, CRDs, Routes + +### 4. Verify Deployment + +```bash +# Check pods +oc get pods -n ambient-code + +# Wait for all pods to be Running +oc get pods -n ambient-code -w + +# Check Postgres is running (NEW!) +oc get pods -n ambient-code | grep postgres + +# Check services +oc get services -n ambient-code + +# Get frontend route +oc get route frontend-route -n ambient-code +``` + +## ๐Ÿ“ Important Notes + +1. **LangGraph Wrapper**: Already built and pushed to `quay.io/gkrumbach07/langgraph-wrapper:base` โœ… + - This is the base image users extend for their workflows + - Doesn't need to be in Makefile - it's a separate concern + +2. 
**Postgres**: Will be deployed automatically via `make deploy` + - New StatefulSet, Service, Secret, ConfigMap + - Backend connects to it automatically + +3. **Cluster Login**: Already done โœ… + - `oc whoami` shows `clusteradmin` + - `make deploy` will work + +4. **Image Registry**: + - Default: Uses `quay.io/ambient_code/*:latest` + - Custom: Set `CONTAINER_REGISTRY` in `.env` or override in deploy script + +## ๐Ÿš€ Quick Start + +```bash +# 1. Build images (if needed) +make build-all CONTAINER_ENGINE=podman PLATFORM=linux/amd64 + +# 2. Deploy everything +make deploy + +# 3. Watch pods start +oc get pods -n ambient-code -w + +# 4. Get frontend URL +oc get route frontend-route -n ambient-code +``` + diff --git a/DEPLOY_NOW.md b/DEPLOY_NOW.md new file mode 100644 index 000000000..ded7f2f30 --- /dev/null +++ b/DEPLOY_NOW.md @@ -0,0 +1,162 @@ +# Deploy and Test LangGraph MVP + +## Step 1: Update Backend Deployment + +```bash +# Update backend to use your new image +oc set image deployment/backend-api backend-api=quay.io/gkrumbach07/vteam_backend:langgraph-mvp -n ambient-code + +# Wait for rollout +oc rollout status deployment/backend-api -n ambient-code + +# Check backend is running +oc get pods -n ambient-code -l app=backend-api +``` + +## Step 2: Verify Postgres is Running + +```bash +# Check Postgres pod +oc get pods -n ambient-code | grep postgres + +# If not running, deploy Postgres +oc apply -k components/manifests/postgres/ -n ambient-code + +# Wait for Postgres to be ready +oc wait --for=condition=ready pod -l app=postgres -n ambient-code --timeout=120s +``` + +## Step 3: Get Workflow Image Digest + +```bash +# Get the digest of your workflow image +DIGEST=$(podman inspect quay.io/gkrumbach07/langgraph-example-workflow:v1.0.0 | jq -r '.[0].RepoDigests[0]') +echo "Workflow digest: $DIGEST" +``` + +## Step 4: Port-Forward Backend (if needed) + +```bash +# Port-forward backend API +oc port-forward -n ambient-code svc/backend-service 8080:8080 & 
+BACKEND_URL="http://localhost:8080" +``` + +Or use your route: +```bash +BACKEND_URL="https://ambient-code.apps.gkrumbac.dev.datahub.redhat.com" +``` + +## Step 5: Create Test Project + +```bash +PROJECT="test-langgraph" +oc new-project $PROJECT 2>/dev/null || oc project $PROJECT +oc label namespace $PROJECT ambient-code.io/managed=true --overwrite +``` + +## Step 6: Register Workflow + +```bash +# First create the workflow +curl -X POST "$BACKEND_URL/api/projects/$PROJECT/workflows" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "example-workflow", + "owner": "test-user" + }' + +# Then register the version (replace DIGEST) +curl -X POST "$BACKEND_URL/api/projects/$PROJECT/workflows/example-workflow/versions" \ + -H "Content-Type: application/json" \ + -d "{ + \"version\": \"v1.0.0\", + \"imageRef\": \"$DIGEST\", + \"graphs\": [ + { + \"name\": \"main\", + \"entry\": \"app.workflow:build_app\" + } + ], + \"inputsSchema\": { + \"type\": \"object\", + \"properties\": { + \"message\": { + \"type\": \"string\" + } + }, + \"required\": [\"message\"] + } + }" +``` + +## Step 7: Create Workflow Run + +```bash +curl -X POST "$BACKEND_URL/api/projects/$PROJECT/agentic-sessions" \ + -H "Content-Type: application/json" \ + -d '{ + "workflowRef": { + "name": "example-workflow", + "version": "v1.0.0", + "graph": "main" + }, + "inputs": { + "message": "Hello from LangGraph MVP!", + "step": 0, + "result": "", + "counter": 0 + }, + "displayName": "Test LangGraph Workflow" + }' | jq -r '.name' +``` + +Save the session name from the response. 
+ +## Step 8: Monitor Execution + +```bash +SESSION_NAME="agentic-session-XXXXX" # From Step 7 + +# Watch pods +oc get pods -n $PROJECT -w + +# Check session status +oc get agenticsessions -n $PROJECT $SESSION_NAME -o yaml + +# View runner logs +oc logs -n $PROJECT -l job-name=${SESSION_NAME}-job -c langgraph-runner -f + +# Check events +curl "$BACKEND_URL/api/projects/$PROJECT/runs/$SESSION_NAME/events" | jq +``` + +## Quick Verification Checklist + +- [ ] Backend pod is running with new image +- [ ] Postgres pod is running +- [ ] Backend can connect to Postgres (check logs) +- [ ] Workflow registered successfully +- [ ] AgenticSession created +- [ ] Workflow pod started +- [ ] Runner is ready (`/ready` endpoint) +- [ ] Workflow executed successfully + +## Troubleshooting + +**Backend errors:** +```bash +oc logs -n ambient-code -l app=backend-api --tail=100 | grep -i error +``` + +**Postgres connection:** +```bash +oc exec -n ambient-code deployment/backend-api -- env | grep POSTGRES +``` + +**Workflow pod issues:** +```bash +oc describe pod -n $PROJECT +oc logs -n $PROJECT -c langgraph-runner +``` + diff --git a/HOW_TO_TEST.md b/HOW_TO_TEST.md new file mode 100644 index 000000000..5f50653b2 --- /dev/null +++ b/HOW_TO_TEST.md @@ -0,0 +1,99 @@ +# How to Test the LangGraph MVP + +## Quick Start (3 Steps) + +### 1. Setup (One-time) + +```bash +# Port-forward backend +kubectl port-forward -n ambient-code svc/backend-service 8080:8080 & + +# Copy Postgres secret to your project namespace +PROJECT="your-project" +kubectl get secret postgres-secret -n ambient-code -o yaml | \ + sed "s/namespace: ambient-code/namespace: $PROJECT/" | \ + kubectl apply -f - +``` + +### 2. Build Test Workflow + +```bash +cd test-workflow-example +# Build for linux/amd64 (required for OpenShift/K8s) +podman build --platform linux/amd64 -t quay.io/ambient_code/test-workflow:v1.0.0 . 
+podman push quay.io/ambient_code/test-workflow:v1.0.0 + +# Get digest +DIGEST=$(podman inspect quay.io/ambient_code/test-workflow:v1.0.0 | jq -r '.[0].RepoDigests[0]') +echo $DIGEST +``` + +### 3. Run Test + +```bash +# Use the test script +./test-langgraph.sh your-project "$DIGEST" + +# Or manually: +# Register workflow โ†’ Create run โ†’ Monitor status +``` + +## Manual Testing + +### Register Workflow +```bash +curl -X POST "http://localhost:8080/api/projects/YOUR_PROJECT/workflows" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "test-workflow", + "imageDigest": "quay.io/ambient_code/test-workflow@sha256:...", + "graphs": [{"name": "main", "entry": "app.workflow:build_app"}] + }' +``` + +### Create Run +```bash +curl -X POST "http://localhost:8080/api/projects/YOUR_PROJECT/agentic-sessions" \ + -H "Content-Type: application/json" \ + -d '{ + "workflowRef": {"name": "test-workflow", "graph": "main"}, + "inputs": {"message": "Hello!"} + }' +``` + +### Monitor Status +```bash +SESSION_NAME="agentic-session-..." 
# From create response + +# Check status +curl "http://localhost:8080/api/projects/YOUR_PROJECT/agentic-sessions/$SESSION_NAME" | jq + +# Check events +curl "http://localhost:8080/api/projects/YOUR_PROJECT/runs/$SESSION_NAME/events" | jq + +# Watch logs +kubectl logs -n YOUR_PROJECT -l job-name=${SESSION_NAME}-job -c langgraph-runner -f +``` + +## Troubleshooting + +**Pod not starting?** +```bash +kubectl describe pod -n YOUR_PROJECT +kubectl get events -n YOUR_PROJECT --sort-by='.lastTimestamp' +``` + +**Runner not ready?** +```bash +kubectl logs -n YOUR_PROJECT -c langgraph-runner +kubectl exec -n YOUR_PROJECT -c langgraph-runner -- curl http://localhost:8000/ready +``` + +**Events not appearing?** +```bash +kubectl logs -n YOUR_PROJECT -c langgraph-runner | grep emit_event +kubectl exec -n YOUR_PROJECT -c langgraph-runner -- env | grep BACKEND_API_URL +``` + +For detailed testing guide, see `TESTING_GUIDE.md` + diff --git a/MVP_MISSING.md b/MVP_MISSING.md new file mode 100644 index 000000000..5902dc9d1 --- /dev/null +++ b/MVP_MISSING.md @@ -0,0 +1,104 @@ +# Missing Pieces for E2E MVP + +## โœ… Fixed Critical Issues + +### 1. โœ… Backend URL Construction Bug - FIXED +**Fix Applied**: Changed runner to use `{BACKEND_API_URL}/projects/...` (removed duplicate `/api`) + +### 2. โœ… Postgres Secret Namespace Access - FIXED +**Fix Applied**: Using explicit namespace in POSTGRES_HOST env var: `postgres-service.{namespace}.svc.cluster.local` +**Note**: Secret still needs to be accessible. Options: +- Copy secret to each project namespace (recommended for MVP) +- Or grant ServiceAccount permission to read secrets from `ambient-code` namespace + +### 3. โš ๏ธ Pod Restart/Resume Logic - PARTIALLY FIXED +**Current State**: Resume endpoint handles pod restart (loads graph lazily) +**Missing**: Operator doesn't detect pod restart and call `/resume` automatically +**Workaround**: Can manually call `/resume` API, but operator should detect this + +### 4. 
โœ… Ready Endpoint Fails Before Graph Load - FIXED +**Fix Applied**: `/ready` now returns ready=true even if graph not loaded (graph loads lazily) + +### 5. โœ… Resume Logic Needs Checkpoint ID - FIXED +**Fix Applied**: Resume now calls `aget_state()` first to verify checkpoint, then updates state + +## Remaining Critical Issues + +### 3. Pod Restart Detection in Operator +**Location**: `components/operator/internal/handlers/sessions.go:startLangGraphWorkflow` +**Issue**: When pod restarts (backoff retry), operator should detect and call `/resume` with checkpoint_id +**Fix Needed**: +- Check if session has `status.checkpointId` +- If yes and pod restarted, call `/resume` instead of `/start` +- Get checkpoint_id from status before calling + +### 2b. Postgres Secret Access Permissions +**Location**: ServiceAccount used by LangGraph runner pods +**Issue**: Pods need permission to read `postgres-secret` from `ambient-code` namespace +**Fix Needed**: Add RBAC to allow secret reading, OR copy secret to each project namespace + +## Important Fixes (Should Fix) + +### 6. Event Sequence Numbers +**Location**: `components/runners/langgraph-wrapper/runner/server.py:123` +**Issue**: Using timestamp as sequence, should use incrementing counter +**Fix**: Use atomic counter or database sequence (can defer for MVP) + +### 7. Graph Compilation Logic +**Location**: `components/runners/langgraph-wrapper/runner/server.py:211-216` +**Issue**: May try to recompile already-compiled graphs +**Fix**: Check if graph already has checkpointer before recompiling (can defer) + +### 8. โœ… Missing Inputs Parsing - FIXED +**Fix Applied**: Removed WORKFLOW_INPUTS env var, inputs only come from `/start` POST body + +## Nice-to-Have (Can Defer) + +### 9. Frontend UI +**Missing**: Complete UI for: +- Workflow registration form +- Workflow version listing +- Run creation form +- Run status monitoring +- Approval UI for interrupts + +### 10. 
Example Workflow +**Missing**: Simple test LangGraph workflow to verify end-to-end: +- `app/workflow.py` with `build_app()` function +- Basic graph with 2-3 nodes +- One node with interrupt for approval testing + +### 11. Error Handling +- Better error messages in runner server +- Retry logic for event emission failures +- Proper cleanup on pod termination + +### 12. Testing +- Integration tests for workflow registration +- E2E test for workflow execution +- Pod restart recovery test + +## Deployment Requirements + +### 13. Base Image Build +**Action**: Build and push base wrapper image: +```bash +docker build -t quay.io/ambient_code/langgraph-wrapper:base components/runners/langgraph-wrapper/ +docker push quay.io/ambient_code/langgraph-wrapper:base +``` + +### 14. Postgres Deployment +**Action**: Deploy Postgres manifests: +```bash +kubectl apply -k components/manifests/postgres/ +``` + +### 15. Backend Dependencies +**Action**: Run `go mod tidy` and rebuild backend: +```bash +cd components/backend && go mod tidy && go build +``` + +### 16. 
Operator Dependencies +**Action**: Rebuild operator (no new deps needed, HTTP client is stdlib) + diff --git a/PODMAN_FIX.md b/PODMAN_FIX.md new file mode 100644 index 000000000..8f26aed28 --- /dev/null +++ b/PODMAN_FIX.md @@ -0,0 +1,66 @@ +# Fixing Podman User Namespace Error + +## Error: +``` +potentially insufficient UIDs or GIDs available in user namespace +Check /etc/subuid and /etc/subgid if configured locally and run "podman system migrate" +``` + +## Solutions (try in order): + +### Solution 1: Run podman system migrate +```bash +podman system migrate +``` + +### Solution 2: Check subuid/subgid mappings +```bash +# Check current mappings +cat /etc/subuid +cat /etc/subgid + +# If empty or insufficient, add your user (requires sudo): +sudo usermod --add-subuids 100000-165535 --add-subgids 100000-165535 $USER + +# Log out and back in for changes to take effect +``` + +### Solution 3: Use --userns=keep-id flag +```bash +podman build --userns=keep-id --platform linux/amd64 -t quay.io/gkrumbach07/langgraph-wrapper:base . +``` + +### Solution 4: Use rootless with proper groups +```bash +# Check if user is in podman group +groups | grep podman + +# If not, add user to podman group (requires sudo): +sudo usermod -aG podman $USER +# Log out and back in +``` + +### Solution 5: Build as root (not recommended, but works) +```bash +sudo podman build --platform linux/amd64 -t quay.io/gkrumbach07/langgraph-wrapper:base . +sudo podman push quay.io/gkrumbach07/langgraph-wrapper:base +``` + +### Solution 6: Use Docker instead (if available) +```bash +docker build --platform linux/amd64 -t quay.io/gkrumbach07/langgraph-wrapper:base . +docker push quay.io/gkrumbach07/langgraph-wrapper:base +``` + +## Quick Fix (Most Common): +```bash +# 1. Run migrate +podman system migrate + +# 2. If that doesn't work, configure subuid/subgid +sudo usermod --add-subuids 100000-165535 --add-subgids 100000-165535 $USER + +# 3. 
Log out and back in, then try again +podman build --platform linux/amd64 -t quay.io/gkrumbach07/langgraph-wrapper:base . +``` + diff --git a/PODMAN_TROUBLESHOOT.md b/PODMAN_TROUBLESHOOT.md new file mode 100644 index 000000000..7fff13144 --- /dev/null +++ b/PODMAN_TROUBLESHOOT.md @@ -0,0 +1,70 @@ +# Podman User Namespace Troubleshooting + +## Current Error: +``` +potentially insufficient UIDs or GIDs available in user namespace +lchown /usr/bin/write: invalid argument +``` + +## Step-by-Step Fix: + +### 1. Check current subuid/subgid configuration +```bash +cat /etc/subuid +cat /etc/subgid +``` + +### 2. If empty or missing your user, add it: +```bash +# Add subuid range for your user +sudo usermod --add-subuids 100000-165535 gkrumbac + +# Add subgid range for your user +sudo usermod --add-subgids 100000-165535 gkrumbac + +# Verify it was added +cat /etc/subuid | grep gkrumbac +cat /etc/subgid | grep gkrumbac +``` + +### 3. Restart podman system to pick up changes +```bash +podman system migrate +# Or restart podman service if running as root +sudo systemctl restart podman.socket +``` + +### 4. Log out and back in (or restart terminal session) + +### 5. Try building again +```bash +podman build --platform linux/amd64 -t quay.io/gkrumbach07/langgraph-wrapper:base . +``` + +## Alternative: Use Docker (if available) +```bash +# Check if docker is installed +which docker + +# If available, use docker instead +docker build --platform linux/amd64 -t quay.io/gkrumbach07/langgraph-wrapper:base . +docker push quay.io/gkrumbach07/langgraph-wrapper:base +``` + +## Alternative: Build as root (quick workaround) +```bash +sudo podman build --platform linux/amd64 -t quay.io/gkrumbach07/langgraph-wrapper:base . 
+sudo podman push quay.io/gkrumbach07/langgraph-wrapper:base +``` + +## Alternative: Use a different base image (if Red Hat registry requires auth) +Try using a public Python base image: + +```dockerfile +# In Dockerfile, change FROM line to: +FROM python:3.11-slim + +# Or use UBI from Docker Hub: +FROM registry.hub.docker.com/library/python:3.11-slim +``` + diff --git a/QUICK_TEST.md b/QUICK_TEST.md new file mode 100644 index 000000000..77a9dca16 --- /dev/null +++ b/QUICK_TEST.md @@ -0,0 +1,56 @@ +# Quick Start Testing Guide + +## Setup (One-time) + +### 1. Port-forward Backend (for local testing) +```bash +kubectl port-forward -n ambient-code svc/backend-service 8080:8080 +``` + +### 2. Get Auth Token (if using OAuth proxy) +```bash +# If using OpenShift oauth-proxy, get token from browser dev tools +# Or use oc to get token: +oc whoami -t +``` + +### 3. Set Environment Variables +```bash +export PROJECT="your-project-name" +export BACKEND_URL="http://localhost:8080" +export TOKEN="your-token-here" # Optional if using port-forward without auth +``` + +## Quick Test + +### Step 1: Register Workflow +```bash +curl -X POST "${BACKEND_URL}/api/projects/${PROJECT}/workflows" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "hello-world", + "imageDigest": "quay.io/ambient_code/hello-world@sha256:YOUR_DIGEST", + "graphs": [{"name": "main", "entry": "app:build_app"}] + }' +``` + +### Step 2: Create Run +```bash +curl -X POST "${BACKEND_URL}/api/projects/${PROJECT}/agentic-sessions" \ + -H "Content-Type: application/json" \ + -d '{ + "workflowRef": {"name": "hello-world", "graph": "main"}, + "inputs": {"message": "Hello!"} + }' | jq -r '.name' +``` + +### Step 3: Check Status +```bash +SESSION_NAME="agentic-session-..." # From Step 2 +watch -n 2 "curl -s ${BACKEND_URL}/api/projects/${PROJECT}/agentic-sessions/${SESSION_NAME} | jq '.status'" +``` + +## Example Test Workflow + +See `test-workflow-example/` directory for a complete example. 
+ diff --git a/TESTING_GUIDE.md b/TESTING_GUIDE.md new file mode 100644 index 000000000..67fe6b5f0 --- /dev/null +++ b/TESTING_GUIDE.md @@ -0,0 +1,291 @@ +# E2E MVP Testing Guide + +## Prerequisites + +### 1. Deploy Infrastructure + +```bash +# Deploy Postgres +kubectl apply -k components/manifests/postgres/ + +# Wait for Postgres to be ready +kubectl wait --for=condition=ready pod -l app=postgres -n ambient-code --timeout=300s + +# Verify Postgres secret exists +kubectl get secret postgres-secret -n ambient-code +``` + +### 2. Build and Push Base Wrapper Image + +```bash +cd components/runners/langgraph-wrapper +podman build --platform linux/amd64 -t quay.io/ambient_code/langgraph-wrapper:base . +podman push quay.io/ambient_code/langgraph-wrapper:base +``` + +**Note**: Use `--platform linux/amd64` to ensure compatibility with OpenShift/K8s clusters. + +### 3. Rebuild Backend + +```bash +cd components/backend +go mod tidy +go build +# Deploy updated backend (or restart if already deployed) +``` + +### 4. 
Copy Postgres Secret to Project Namespace + +For each project namespace where you'll test workflows: + +```bash +PROJECT_NAME="your-project-name" +kubectl get secret postgres-secret -n ambient-code -o yaml | \ + sed "s/namespace: ambient-code/namespace: $PROJECT_NAME/" | \ + kubectl apply -f - +``` + +## Test Workflow + +### Step 1: Create Example LangGraph Workflow Image + +Create a simple test workflow: + +```bash +mkdir test-workflow +cd test-workflow +``` + +**Or use the example workflow from GitHub:** +```bash +git clone https://github.com/Gkrumbach07/langgraph-example-workflow.git +cd langgraph-example-workflow +``` + +Create `app/workflow.py`: +```python +from langgraph.graph import StateGraph, END +from typing import TypedDict + +class State(TypedDict): + message: str + step: int + +def build_app(): + graph = StateGraph(State) + + def node1(state: State) -> State: + return {"message": f"Step 1: {state.get('message', '')}", "step": 1} + + def node2(state: State) -> State: + return {"message": f"Step 2: {state['message']}", "step": 2} + + graph.add_node("node1", node1) + graph.add_node("node2", node2) + graph.set_entry_point("node1") + graph.add_edge("node1", "node2") + graph.add_edge("node2", END) + + return graph.compile() +``` + +Create `Dockerfile`: +```dockerfile +FROM quay.io/ambient_code/langgraph-wrapper:base + +WORKDIR /app/workflow +COPY app/ ./app/ + +# Ensure app module is importable +ENV PYTHONPATH=/app/workflow:$PYTHONPATH +``` + +Build and push: +```bash +# Build for linux/amd64 (required for OpenShift/K8s) +podman build --platform linux/amd64 -t quay.io/ambient_code/test-workflow:v1.0.0 . +podman push quay.io/ambient_code/test-workflow:v1.0.0 + +# Get digest +podman inspect quay.io/ambient_code/test-workflow:v1.0.0 | jq -r '.[0].RepoDigests[0]' +# Output: quay.io/ambient_code/test-workflow@sha256:... +``` + +**Note**: Always use `--platform linux/amd64` when building on ARM Macs to ensure compatibility with OpenShift/K8s clusters. 
+ +### Step 2: Register Workflow via API + +```bash +PROJECT="your-project-name" +BACKEND_URL="http://backend-service.ambient-code.svc.cluster.local:8080" +# Or use port-forward: kubectl port-forward -n ambient-code svc/backend-service 8080:8080 +# Then BACKEND_URL="http://localhost:8080" + +# Register workflow +curl -X POST "${BACKEND_URL}/api/projects/${PROJECT}/workflows" \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer YOUR_TOKEN" \ + -d '{ + "name": "test-workflow", + "imageDigest": "quay.io/ambient_code/test-workflow@sha256:YOUR_DIGEST_HERE", + "graphs": [ + { + "name": "main", + "entry": "app.workflow:build_app" + } + ], + "inputsSchema": { + "type": "object", + "properties": { + "message": { + "type": "string", + "description": "Input message" + } + }, + "required": ["message"] + } + }' + +# Verify workflow registered +curl "${BACKEND_URL}/api/projects/${PROJECT}/workflows/test-workflow" \ + -H "Authorization: Bearer YOUR_TOKEN" +``` + +### Step 3: Create Workflow Run + +```bash +# Create AgenticSession with workflowRef +curl -X POST "${BACKEND_URL}/api/projects/${PROJECT}/agentic-sessions" \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer YOUR_TOKEN" \ + -d '{ + "workflowRef": { + "name": "test-workflow", + "graph": "main" + }, + "inputs": { + "message": "Hello from test!" 
+ }, + "displayName": "Test LangGraph Workflow Run" + }' + +# Note the session name from response +# Response: {"message":"Agentic session created successfully","name":"agentic-session-1234567890","uid":"..."} +``` + +### Step 4: Monitor Run Status + +```bash +SESSION_NAME="agentic-session-1234567890" # From previous response + +# Check session status +curl "${BACKEND_URL}/api/projects/${PROJECT}/agentic-sessions/${SESSION_NAME}" \ + -H "Authorization: Bearer YOUR_TOKEN" | jq + +# Check events +curl "${BACKEND_URL}/api/projects/${PROJECT}/runs/${SESSION_NAME}/events" \ + -H "Authorization: Bearer YOUR_TOKEN" | jq + +# Watch pod logs +kubectl logs -n ${PROJECT} -l job-name=${SESSION_NAME}-job -c langgraph-runner -f + +# Check pod status +kubectl get pods -n ${PROJECT} -l job-name=${SESSION_NAME}-job + +# Check operator logs +kubectl logs -n ambient-code -l app=operator -f +``` + +### Step 5: Test Approval Flow (if workflow has interrupt) + +If your workflow has an interrupt node: + +```bash +# Approve interrupted workflow +curl -X POST "${BACKEND_URL}/api/projects/${PROJECT}/runs/${SESSION_NAME}/approve" \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer YOUR_TOKEN" \ + -d '{ + "node": "approval_node", + "decision": { + "approved": true, + "notes": "Looks good" + } + }' +``` + +## Verification Checklist + +- [ ] Postgres pod is running: `kubectl get pods -n ambient-code -l app=postgres` +- [ ] Backend can connect to Postgres: Check backend logs +- [ ] Workflow tables created: `kubectl exec -n ambient-code -it postgres-0 -- psql -U langgraph -d langgraph -c "\dt"` +- [ ] Workflow registered successfully: Check API response +- [ ] Operator created job: `kubectl get jobs -n ${PROJECT}` +- [ ] Runner pod started: `kubectl get pods -n ${PROJECT} -l app=langgraph-runner` +- [ ] Runner service exists: `kubectl get svc -n ${PROJECT} -l app=langgraph-runner` +- [ ] Runner called /start: Check operator logs for "Successfully started LangGraph workflow" +- [ 
] Events are being emitted: Check events API +- [ ] Status updates: Check session status.currentNode +- [ ] Workflow completes: Check session status.phase = "Completed" + +## Troubleshooting + +### Pod not starting +```bash +# Check pod events +kubectl describe pod -n ${PROJECT} + +# Check image pull errors +kubectl get events -n ${PROJECT} --sort-by='.lastTimestamp' +``` + +### Runner not ready +```bash +# Check runner logs +kubectl logs -n ${PROJECT} -c langgraph-runner + +# Check readiness probe +kubectl exec -n ${PROJECT} -c langgraph-runner -- curl http://localhost:8000/ready +``` + +### Events not reaching backend +```bash +# Check runner logs for event emission errors +kubectl logs -n ${PROJECT} -c langgraph-runner | grep "emit_event" + +# Verify backend URL is correct +kubectl exec -n ${PROJECT} -c langgraph-runner -- env | grep BACKEND_API_URL +``` + +### Postgres connection issues +```bash +# Test connection from pod +kubectl exec -n ${PROJECT} -c langgraph-runner -- \ + python -c "from langgraph.checkpoint.postgres import PostgresSaver; \ + import os; \ + saver = PostgresSaver.from_conn_string(os.getenv('POSTGRES_DB', 'postgresql://...')); \ + saver.setup()" +``` + +## Quick Test Script + +Use the provided `test-langgraph.sh` script: + +```bash +# Make executable (if not already) +chmod +x test-langgraph.sh + +# Run test +./test-langgraph.sh + +# Example: +./test-langgraph.sh myproject quay.io/ambient_code/test-workflow@sha256:abc123... +``` + +The script will: +1. Register the workflow +2. Create a workflow run +3. Monitor execution status +4. Show events +5. 
Display final status + diff --git a/TEST_NOW.md b/TEST_NOW.md new file mode 100644 index 000000000..e21d2f7d2 --- /dev/null +++ b/TEST_NOW.md @@ -0,0 +1,193 @@ +# Quick Test Guide for LangGraph MVP + +## Prerequisites Check โœ… + +- โœ… Postgres running +- โœ… Backend running +- โœ… Frontend running +- โœ… Operator running +- โœ… Workflow image built: `quay.io/gkrumbach07/langgraph-example-workflow:v1.0.0` + +## Step 1: Push Workflow Image to Quay.io + +```bash +# Login to Quay.io (if not already) +podman login quay.io + +# Push the workflow image +podman push quay.io/gkrumbach07/langgraph-example-workflow:v1.0.0 + +# Get the digest (required for registration) +DIGEST=$(podman inspect quay.io/gkrumbach07/langgraph-example-workflow:v1.0.0 | jq -r '.[0].RepoDigests[0]') +echo "Image digest: $DIGEST" +``` + +**Make sure the repository is public:** +- Go to: https://quay.io/repository/gkrumbach07/langgraph-example-workflow +- Settings โ†’ Visibility โ†’ Make Public + +## Step 2: Port-Forward Backend + +```bash +# Port-forward backend API (run in background) +oc port-forward -n ambient-code svc/backend-service 8080:8080 & + +# Or use the route (already exposed) +BACKEND_URL="https://ambient-code.apps.gkrumbac.dev.datahub.redhat.com" +``` + +## Step 3: Create/Verify Project Namespace + +```bash +# Create a test project namespace +PROJECT="test-langgraph" +oc new-project $PROJECT || oc project $PROJECT + +# Label it as managed +oc label namespace $PROJECT ambient-code.io/managed=true --overwrite +``` + +## Step 4: Register Workflow + +```bash +# Replace DIGEST with the actual digest from Step 1 +DIGEST="quay.io/gkrumbach07/langgraph-example-workflow@sha256:YOUR_DIGEST" + +curl -X POST "http://localhost:8080/api/projects/$PROJECT/workflows" \ + -H "Content-Type: application/json" \ + -d "{ + \"name\": \"example-workflow\", + \"owner\": \"test-user\", + \"version\": \"v1.0.0\", + \"imageRef\": \"$DIGEST\", + \"graphs\": [ + { + \"name\": \"main\", + \"entry\": 
\"app.workflow:build_app\" + } + ], + \"inputsSchema\": { + \"type\": \"object\", + \"properties\": { + \"message\": { + \"type\": \"string\", + \"description\": \"Input message to process\" + } + }, + \"required\": [\"message\"] + } + }" +``` + +**Expected response:** +```json +{"message":"Workflow version registered successfully","id":"..."} +``` + +## Step 5: Create Workflow Run (AgenticSession) + +```bash +curl -X POST "http://localhost:8080/api/projects/$PROJECT/agentic-sessions" \ + -H "Content-Type: application/json" \ + -d "{ + \"workflowRef\": { + \"name\": \"example-workflow\", + \"version\": \"v1.0.0\", + \"graph\": \"main\" + }, + \"inputs\": { + \"message\": \"Hello from LangGraph MVP!\", + \"step\": 0, + \"result\": \"\", + \"counter\": 0 + }, + \"displayName\": \"Test LangGraph Workflow\" + }" +``` + +**Expected response:** +```json +{"name":"agentic-session-...","status":{"phase":"Pending"}} +``` + +Save the `name` value - you'll need it for monitoring. + +## Step 6: Monitor Workflow Execution + +```bash +SESSION_NAME="agentic-session-XXXXX" # From Step 5 + +# Watch pod creation +oc get pods -n $PROJECT -w + +# Check session status +oc get agenticsessions -n $PROJECT $SESSION_NAME -o yaml + +# View logs (once pod is running) +oc logs -n $PROJECT -l job-name=${SESSION_NAME}-job -c langgraph-runner -f + +# Check workflow events via API +curl "http://localhost:8080/api/projects/$PROJECT/runs/$SESSION_NAME/events" | jq +``` + +## Step 7: Verify Completion + +```bash +# Check final status +curl "http://localhost:8080/api/projects/$PROJECT/agentic-sessions/$SESSION_NAME" | jq '.status' + +# Check Postgres checkpoint data +oc exec postgres-0 -n ambient-code -- psql -U langgraph -d langgraph -c "SELECT thread_id, checkpoint_ns, checkpoint_id FROM checkpoints ORDER BY created_at DESC LIMIT 5;" +``` + +## Automated Test Script + +For convenience, use the provided test script: + +```bash +cd /Users/gkrumbac/.cursor/worktrees/vTeam/syBFS + +# Make script 
executable +chmod +x test-langgraph.sh + +# Run automated test +./test-langgraph.sh $PROJECT "$DIGEST" +``` + +## Troubleshooting + +**Backend not connecting to Postgres?** +```bash +# Check backend logs +oc logs -n ambient-code -l app=backend-api --tail=50 | grep -i postgres + +# Verify Postgres service is accessible +oc exec backend-api-XXX -n ambient-code -- nc -zv postgres-service.ambient-code.svc.cluster.local 5432 +``` + +**Workflow pod not starting?** +```bash +# Check operator logs +oc logs -n ambient-code -l app=agentic-operator --tail=50 + +# Check pod events +oc describe pod -n $PROJECT +``` + +**Runner not ready?** +```bash +# Check runner logs +oc logs -n $PROJECT -c langgraph-runner + +# Check readiness endpoint +oc exec -n $PROJECT -c langgraph-runner -- curl http://localhost:8000/ready +``` + +## Next Steps + +Once this basic test works: +1. Test with interrupts (HITL) +2. Test workflow resumption +3. Test multiple graphs per image +4. Test workflow versioning + diff --git a/VM_TRANSFER_OPTIONS.md b/VM_TRANSFER_OPTIONS.md new file mode 100644 index 000000000..4ed45ceef --- /dev/null +++ b/VM_TRANSFER_OPTIONS.md @@ -0,0 +1,64 @@ +# Transferring Code to VM + +## Option 1: Git Clone (Recommended if you push first) + +**On your Mac:** +```bash +# Commit and push your changes +cd /Users/gkrumbac/.cursor/worktrees/vTeam/syBFS +git add . 
+git commit -m "Add LangGraph MVP support" +git push origin lang-graph # or your branch name +``` + +**On your VM:** +```bash +git clone git@github.com:Gkrumbach07/vTeam.git +cd vTeam +git checkout lang-graph # or your branch +``` + +## Option 2: SCP/rsync (Fast, works with any VM) + +**From your Mac:** +```bash +# Replace VM_IP with your VM's IP address +# Replace USERNAME with your VM username +scp -r /Users/gkrumbac/.cursor/worktrees/vTeam/syBFS username@VM_IP:/home/username/vTeam + +# Or use rsync (better for updates): +rsync -avz --exclude '.git' \ + /Users/gkrumbac/.cursor/worktrees/vTeam/syBFS/ \ + username@VM_IP:/home/username/vTeam/ +``` + +## Option 3: Drag and Drop (VM Client Dependent) + +**VMware Fusion/Parallels:** +- Usually supports drag-and-drop if VMware Tools/Parallels Tools is installed +- Just drag the folder from Finder to the VM window + +**VirtualBox:** +- Enable "Shared Clipboard" and "Drag'n'Drop" in VM settings +- May need Guest Additions installed + +**VMware Workstation/Fusion:** +- Enable "Drag and Drop" in VM settings +- Drag folder from host to guest + +## Option 4: Shared Folder (Best for ongoing development) + +**VMware Fusion:** +1. VM Settings โ†’ Sharing โ†’ Enable "Share Folders" +2. Add folder: `/Users/gkrumbac/.cursor/worktrees/vTeam/syBFS` +3. Access in VM at: `/mnt/hgfs/syBFS` (Linux) or `/Volumes/syBFS` (macOS guest) + +**VirtualBox:** +1. VM Settings โ†’ Shared Folders +2. Add shared folder pointing to your code directory +3. Access via: `/media/sf_FolderName` (Linux) or `/Volumes/FolderName` (macOS) + +## Recommended: Git Clone + +If you push your changes, cloning in the VM is cleanest and keeps everything in sync. + diff --git a/components/backend/Dockerfile b/components/backend/Dockerfile index 1106ef265..275411e6a 100644 --- a/components/backend/Dockerfile +++ b/components/backend/Dockerfile @@ -16,8 +16,8 @@ RUN GOPROXY=direct GOSUMDB=off go mod download || \ # Copy the source code COPY . . 
-# Build the application (with flags to avoid segfault) -RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o main . +# Build the application for linux/amd64 (required for OpenShift/K8s) +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -o main . # Final stage FROM registry.access.redhat.com/ubi9/ubi-minimal:latest diff --git a/components/backend/go.sum b/components/backend/go.sum index 34f4ab619..d5ffd5c3e 100644 --- a/components/backend/go.sum +++ b/components/backend/go.sum @@ -80,6 +80,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= diff --git a/components/backend/handlers/runs.go b/components/backend/handlers/runs.go new file mode 100644 index 000000000..e16481c99 --- /dev/null +++ b/components/backend/handlers/runs.go @@ -0,0 +1,246 @@ +package handlers + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "log" + "net/http" + "strings" + "time" + + "ambient-code-backend/server" + + "github.com/gin-gonic/gin" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// IngestRunEvent receives events from LangGraph runner pods +func IngestRunEvent(c *gin.Context) { + project := c.Param("projectName") + runID := c.Param("runId") + + var event struct { + RunID string `json:"run_id"` + Seq int 
`json:"seq"` + Ts string `json:"ts"` + Type string `json:"type"` + Node *string `json:"node"` + CheckpointID *string `json:"checkpoint_id"` + Payload map[string]interface{} `json:"payload"` + } + + if err := c.ShouldBindJSON(&event); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Validate run_id matches + if event.RunID != runID { + c.JSON(http.StatusBadRequest, gin.H{"error": "run_id mismatch"}) + return + } + + // Insert event + payloadJSON, _ := json.Marshal(event.Payload) + var checkpointID *string + if event.CheckpointID != nil { + checkpointID = event.CheckpointID + } + + _, err := server.DB.Exec( + "INSERT INTO run_events (run_id, seq, ts, kind, checkpoint_id, payload) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (run_id, seq) DO NOTHING", + runID, event.Seq, event.Ts, event.Type, checkpointID, payloadJSON, + ) + if err != nil { + log.Printf("Failed to insert run event: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to store event"}) + return + } + + // Update AgenticSession status based on event type + if event.Type == "node_start" || event.Type == "node_update" { + // Update currentNode in status + if event.Node != nil { + updateSessionStatusFromEvent(project, runID, map[string]interface{}{ + "currentNode": *event.Node, + }) + } + } else if event.Type == "interrupt" { + // Add condition for awaiting approval + if event.CheckpointID != nil { + updateSessionStatusFromEvent(project, runID, map[string]interface{}{ + "currentNode": event.Node, + "checkpointId": *event.CheckpointID, + "conditions": []map[string]interface{}{ + { + "type": "AwaitingApproval", + "status": "True", + "message": fmt.Sprintf("Waiting for approval at node %s", *event.Node), + "lastTransitionTime": time.Now().Format(time.RFC3339), + }, + }, + }) + } + } else if event.Type == "node_end" { + // Clear conditions + updateSessionStatusFromEvent(project, runID, map[string]interface{}{ + "conditions": 
[]map[string]interface{}{}, + }) + } else if event.Type == "error" { + updateSessionStatusFromEvent(project, runID, map[string]interface{}{ + "phase": "Error", + "message": fmt.Sprintf("Workflow error: %v", event.Payload), + }) + } + + c.JSON(http.StatusOK, gin.H{"status": "ok"}) +} + +// GetRunEvents retrieves events for a run +func GetRunEvents(c *gin.Context) { + _ = c.Param("projectName") // project name from path, not used but kept for API consistency + runID := c.Param("runId") + + rows, err := server.DB.Query( + "SELECT seq, ts, kind, checkpoint_id, payload FROM run_events WHERE run_id = $1 ORDER BY seq ASC", + runID, + ) + if err != nil { + log.Printf("Failed to query run events: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get run events"}) + return + } + defer rows.Close() + + events := []map[string]interface{}{} + for rows.Next() { + var seq int + var ts time.Time + var kind string + var checkpointID sql.NullString + var payloadJSON []byte + + if err := rows.Scan(&seq, &ts, &kind, &checkpointID, &payloadJSON); err != nil { + log.Printf("Error scanning event: %v", err) + continue + } + + var payload map[string]interface{} + if len(payloadJSON) > 0 { + json.Unmarshal(payloadJSON, &payload) + } + + event := map[string]interface{}{ + "seq": seq, + "ts": ts.Format(time.RFC3339), + "type": kind, + "payload": payload, + } + if checkpointID.Valid { + event["checkpoint_id"] = checkpointID.String + } + + events = append(events, event) + } + + c.JSON(http.StatusOK, gin.H{"events": events}) +} + +// ApproveRun approves an interrupted workflow run +func ApproveRun(c *gin.Context) { + project := c.Param("projectName") + runID := c.Param("runId") + + var req struct { + Node string `json:"node" binding:"required"` + Decision map[string]interface{} `json:"decision" binding:"required"` + } + + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Get current session 
to find checkpoint_id + gvr := GetAgenticSessionV1Alpha1Resource() + session, err := DynamicClient.Resource(gvr).Namespace(project).Get(c.Request.Context(), runID, v1.GetOptions{}) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "Session not found"}) + return + } + + status, _, _ := unstructured.NestedMap(session.Object, "status") + checkpointID, _, _ := unstructured.NestedString(status, "checkpointId") + + if checkpointID == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "No checkpoint ID found"}) + return + } + + // Get runner service URL + runnerSvcName := fmt.Sprintf("langgraph-runner-%s", runID) + runnerURL := fmt.Sprintf("http://%s.%s.svc.cluster.local:8000", runnerSvcName, project) + + // Call /resume endpoint + resumeReq := map[string]interface{}{ + "checkpoint_id": checkpointID, + "values": req.Decision, + } + reqJSON, _ := json.Marshal(resumeReq) + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Post(fmt.Sprintf("%s/resume", runnerURL), "application/json", strings.NewReader(string(reqJSON))) + if err != nil { + log.Printf("Failed to call /resume: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("Failed to resume workflow: %v", err)}) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("Resume failed with status %d", resp.StatusCode)}) + return + } + + // Update session status + updateSessionStatusFromEvent(project, runID, map[string]interface{}{ + "conditions": []map[string]interface{}{ + { + "type": "AwaitingApproval", + "status": "False", + "lastTransitionTime": time.Now().Format(time.RFC3339), + }, + }, + }) + + c.JSON(http.StatusOK, gin.H{"status": "approved"}) +} + +// updateSessionStatusFromEvent is a helper to update AgenticSession status +func updateSessionStatusFromEvent(project, runID string, updates map[string]interface{}) { + gvr := GetAgenticSessionV1Alpha1Resource() + 
session, err := DynamicClient.Resource(gvr).Namespace(project).Get(context.TODO(), runID, v1.GetOptions{}) + if err != nil { + log.Printf("Failed to get session for status update: %v", err) + return + } + + status, _, _ := unstructured.NestedMap(session.Object, "status") + if status == nil { + status = make(map[string]interface{}) + } + + for k, v := range updates { + status[k] = v + } + + session.Object["status"] = status + _, err = DynamicClient.Resource(gvr).Namespace(project).UpdateStatus(context.TODO(), session, v1.UpdateOptions{}) + if err != nil { + log.Printf("Failed to update session status: %v", err) + } +} + diff --git a/components/backend/handlers/workflows.go b/components/backend/handlers/workflows.go new file mode 100644 index 000000000..698d9e714 --- /dev/null +++ b/components/backend/handlers/workflows.go @@ -0,0 +1,460 @@ +package handlers + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "regexp" + "strings" + "time" + + "ambient-code-backend/server" + "ambient-code-backend/types" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +var ( + // TrustedRegistries is a comma-separated list of registry patterns (e.g., "quay.io/ambient_code/*,quay.io/myorg/*") + TrustedRegistries string +) + +func init() { + TrustedRegistries = os.Getenv("TRUSTED_REGISTRIES") + if TrustedRegistries == "" { + TrustedRegistries = "quay.io/ambient_code/*" + } +} + +// validateImageDigest validates that the image digest is in the correct format +func validateImageDigest(imageDigest string) error { + // Must contain @sha256: prefix + if !strings.Contains(imageDigest, "@sha256:") { + return fmt.Errorf("image digest must be in format 'registry.io/org/repo@sha256:...' 
(not a tag)") + } + + // Basic format check: registry.io/org/repo@sha256:hexdigest + digestPattern := regexp.MustCompile(`^[a-zA-Z0-9._-]+(/[a-zA-Z0-9._-]+)+@sha256:[a-f0-9]{64}$`) + if !digestPattern.MatchString(imageDigest) { + return fmt.Errorf("invalid image digest format") + } + + return nil +} + +// validateRegistryWhitelist checks if the image digest matches a trusted registry pattern +func validateRegistryWhitelist(imageDigest string) error { + patterns := strings.Split(TrustedRegistries, ",") + for _, pattern := range patterns { + pattern = strings.TrimSpace(pattern) + if pattern == "" { + continue + } + + // Convert glob pattern to regex + // quay.io/ambient_code/* -> ^quay\.io/ambient_code/[^@]+ + // quay.io/myorg/* -> ^quay\.io/myorg/[^@]+ + regexPattern := strings.ReplaceAll(pattern, ".", "\\.") + regexPattern = strings.ReplaceAll(regexPattern, "*", "[^@]+") + regexPattern = "^" + regexPattern + + matched, err := regexp.MatchString(regexPattern, imageDigest) + if err != nil { + log.Printf("Error matching registry pattern %s: %v", pattern, err) + continue + } + + if matched { + return nil // Found matching pattern + } + } + + return fmt.Errorf("image digest does not match any trusted registry pattern. 
Allowed: %s", TrustedRegistries) +} + +// CreateWorkflow registers a new workflow +func CreateWorkflow(c *gin.Context) { + project := c.Param("projectName") + userID, _ := c.Get("userID") + userIDStr, ok := userID.(string) + if !ok || userIDStr == "" { + c.JSON(http.StatusUnauthorized, gin.H{"error": "User identity required"}) + return + } + + var req types.CreateWorkflowRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Validate image digest format + if err := validateImageDigest(req.ImageDigest); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Validate registry whitelist + if err := validateRegistryWhitelist(req.ImageDigest); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Validate graphs + if len(req.Graphs) == 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "at least one graph is required"}) + return + } + + for _, graph := range req.Graphs { + if graph.Name == "" || graph.Entry == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "graph name and entry are required"}) + return + } + // Validate entry format: module:function + if !strings.Contains(graph.Entry, ":") { + c.JSON(http.StatusBadRequest, gin.H{"error": "graph entry must be in format 'module:function'"}) + return + } + } + + // Start transaction + tx, err := server.DB.Begin() + if err != nil { + log.Printf("Failed to begin transaction: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create workflow"}) + return + } + defer tx.Rollback() + + // Check if workflow already exists + var existingID string + err = tx.QueryRow("SELECT id FROM workflows WHERE project = $1 AND name = $2", project, req.Name).Scan(&existingID) + if err == nil { + c.JSON(http.StatusConflict, gin.H{"error": fmt.Sprintf("workflow '%s' already exists", req.Name)}) + return + } + if err != sql.ErrNoRows { + log.Printf("Error 
checking existing workflow: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to check workflow existence"}) + return + } + + // Create workflow + workflowID := uuid.New().String() + _, err = tx.Exec( + "INSERT INTO workflows (id, name, owner, project, created_at) VALUES ($1, $2, $3, $4, $5)", + workflowID, req.Name, userIDStr, project, time.Now(), + ) + if err != nil { + log.Printf("Failed to insert workflow: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create workflow"}) + return + } + + // Create initial version (v1.0.0) + versionID := uuid.New().String() + graphsJSON, _ := json.Marshal(req.Graphs) + var inputsSchemaJSON []byte + if req.InputsSchema != nil { + inputsSchemaJSON, _ = json.Marshal(req.InputsSchema) + } + + _, err = tx.Exec( + "INSERT INTO workflow_versions (id, workflow_id, version, image_digest, graphs, inputs_schema, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7)", + versionID, workflowID, "v1.0.0", req.ImageDigest, graphsJSON, inputsSchemaJSON, time.Now(), + ) + if err != nil { + log.Printf("Failed to insert workflow version: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create workflow version"}) + return + } + + // Commit transaction + if err := tx.Commit(); err != nil { + log.Printf("Failed to commit transaction: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create workflow"}) + return + } + + c.JSON(http.StatusCreated, gin.H{ + "id": workflowID, + "name": req.Name, + }) +} + +// ListWorkflows lists all workflows for a project +func ListWorkflows(c *gin.Context) { + project := c.Param("projectName") + + rows, err := server.DB.Query( + "SELECT id, name, owner, project, created_at FROM workflows WHERE project = $1 ORDER BY created_at DESC", + project, + ) + if err != nil { + log.Printf("Failed to query workflows: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list workflows"}) + return + } + defer 
rows.Close() + + workflows := []types.Workflow{} + for rows.Next() { + var wf types.Workflow + if err := rows.Scan(&wf.ID, &wf.Name, &wf.Owner, &wf.Project, &wf.CreatedAt); err != nil { + log.Printf("Error scanning workflow: %v", err) + continue + } + workflows = append(workflows, wf) + } + + c.JSON(http.StatusOK, gin.H{"workflows": workflows}) +} + +// GetWorkflow gets a workflow with its versions +func GetWorkflow(c *gin.Context) { + project := c.Param("projectName") + name := c.Param("name") + + var wf types.Workflow + err := server.DB.QueryRow( + "SELECT id, name, owner, project, created_at FROM workflows WHERE project = $1 AND name = $2", + project, name, + ).Scan(&wf.ID, &wf.Name, &wf.Owner, &wf.Project, &wf.CreatedAt) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "Workflow not found"}) + return + } + if err != nil { + log.Printf("Failed to query workflow: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get workflow"}) + return + } + + // Get versions + versionRows, err := server.DB.Query( + "SELECT id, workflow_id, version, image_digest, graphs, inputs_schema, created_at FROM workflow_versions WHERE workflow_id = $1 ORDER BY created_at DESC", + wf.ID, + ) + if err != nil { + log.Printf("Failed to query workflow versions: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get workflow versions"}) + return + } + defer versionRows.Close() + + versions := []types.WorkflowVersion{} + for versionRows.Next() { + var v types.WorkflowVersion + var graphsJSON, inputsSchemaJSON []byte + if err := versionRows.Scan(&v.ID, &v.WorkflowID, &v.Version, &v.ImageDigest, &graphsJSON, &inputsSchemaJSON, &v.CreatedAt); err != nil { + log.Printf("Error scanning workflow version: %v", err) + continue + } + + if err := json.Unmarshal(graphsJSON, &v.Graphs); err != nil { + log.Printf("Error unmarshaling graphs: %v", err) + continue + } + + if len(inputsSchemaJSON) > 0 { + if err := 
json.Unmarshal(inputsSchemaJSON, &v.InputsSchema); err != nil { + log.Printf("Error unmarshaling inputs schema: %v", err) + } + } + + versions = append(versions, v) + } + + c.JSON(http.StatusOK, gin.H{ + "workflow": wf, + "versions": versions, + }) +} + +// CreateWorkflowVersion adds a new version to an existing workflow +func CreateWorkflowVersion(c *gin.Context) { + project := c.Param("projectName") + name := c.Param("name") + + var req types.CreateWorkflowVersionRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Validate image digest format + if err := validateImageDigest(req.ImageDigest); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Validate registry whitelist + if err := validateRegistryWhitelist(req.ImageDigest); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Validate graphs + if len(req.Graphs) == 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "at least one graph is required"}) + return + } + + for _, graph := range req.Graphs { + if graph.Name == "" || graph.Entry == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "graph name and entry are required"}) + return + } + if !strings.Contains(graph.Entry, ":") { + c.JSON(http.StatusBadRequest, gin.H{"error": "graph entry must be in format 'module:function'"}) + return + } + } + + // Get workflow ID + var workflowID string + err := server.DB.QueryRow( + "SELECT id FROM workflows WHERE project = $1 AND name = $2", + project, name, + ).Scan(&workflowID) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "Workflow not found"}) + return + } + if err != nil { + log.Printf("Failed to query workflow: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get workflow"}) + return + } + + // Check if version already exists + var existingID string + err = server.DB.QueryRow( + "SELECT id FROM 
workflow_versions WHERE workflow_id = $1 AND version = $2", + workflowID, req.Version, + ).Scan(&existingID) + if err == nil { + c.JSON(http.StatusConflict, gin.H{"error": fmt.Sprintf("version '%s' already exists", req.Version)}) + return + } + if err != sql.ErrNoRows { + log.Printf("Error checking existing version: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to check version existence"}) + return + } + + // Create version + versionID := uuid.New().String() + graphsJSON, _ := json.Marshal(req.Graphs) + var inputsSchemaJSON []byte + if req.InputsSchema != nil { + inputsSchemaJSON, _ = json.Marshal(req.InputsSchema) + } + + _, err = server.DB.Exec( + "INSERT INTO workflow_versions (id, workflow_id, version, image_digest, graphs, inputs_schema, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7)", + versionID, workflowID, req.Version, req.ImageDigest, graphsJSON, inputsSchemaJSON, time.Now(), + ) + if err != nil { + log.Printf("Failed to insert workflow version: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create workflow version"}) + return + } + + c.JSON(http.StatusCreated, gin.H{ + "id": versionID, + "version": req.Version, + }) +} + +// GetWorkflowVersion gets a specific workflow version +func GetWorkflowVersion(c *gin.Context) { + project := c.Param("projectName") + name := c.Param("name") + version := c.Param("version") + + // Get workflow ID + var workflowID string + err := server.DB.QueryRow( + "SELECT id FROM workflows WHERE project = $1 AND name = $2", + project, name, + ).Scan(&workflowID) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "Workflow not found"}) + return + } + if err != nil { + log.Printf("Failed to query workflow: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get workflow"}) + return + } + + // Get version + var v types.WorkflowVersion + var graphsJSON, inputsSchemaJSON []byte + err = server.DB.QueryRow( + "SELECT id, workflow_id, 
version, image_digest, graphs, inputs_schema, created_at FROM workflow_versions WHERE workflow_id = $1 AND version = $2", + workflowID, version, + ).Scan(&v.ID, &v.WorkflowID, &v.Version, &v.ImageDigest, &graphsJSON, &inputsSchemaJSON, &v.CreatedAt) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "Workflow version not found"}) + return + } + if err != nil { + log.Printf("Failed to query workflow version: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get workflow version"}) + return + } + + if err := json.Unmarshal(graphsJSON, &v.Graphs); err != nil { + log.Printf("Error unmarshaling graphs: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to parse workflow graphs"}) + return + } + + if len(inputsSchemaJSON) > 0 { + if err := json.Unmarshal(inputsSchemaJSON, &v.InputsSchema); err != nil { + log.Printf("Error unmarshaling inputs schema: %v", err) + } + } + + c.JSON(http.StatusOK, v) +} + +// DeleteWorkflow deletes a workflow and all its versions +func DeleteWorkflow(c *gin.Context) { + project := c.Param("projectName") + name := c.Param("name") + + // Get workflow ID + var workflowID string + err := server.DB.QueryRow( + "SELECT id FROM workflows WHERE project = $1 AND name = $2", + project, name, + ).Scan(&workflowID) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "Workflow not found"}) + return + } + if err != nil { + log.Printf("Failed to query workflow: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get workflow"}) + return + } + + // Delete workflow (CASCADE will delete versions) + _, err = server.DB.Exec("DELETE FROM workflows WHERE id = $1", workflowID) + if err != nil { + log.Printf("Failed to delete workflow: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to delete workflow"}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "Workflow deleted successfully"}) +} + diff --git 
a/components/backend/server/db.go b/components/backend/server/db.go new file mode 100644 index 000000000..5f385d88b --- /dev/null +++ b/components/backend/server/db.go @@ -0,0 +1,139 @@ +package server + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "os" + + _ "github.com/lib/pq" // PostgreSQL driver +) + +var ( + DB *sql.DB +) + +// InitDB initializes the database connection +func InitDB() error { + pgHost := os.Getenv("POSTGRES_HOST") + if pgHost == "" { + pgHost = "postgres-service" + } + + pgPort := os.Getenv("POSTGRES_PORT") + if pgPort == "" { + pgPort = "5432" + } + + pgUser := os.Getenv("POSTGRES_USER") + if pgUser == "" { + pgUser = "langgraph" + } + + pgPassword := os.Getenv("POSTGRES_PASSWORD") + if pgPassword == "" { + pgPassword = "langgraph-change-me" + } + + pgDB := os.Getenv("POSTGRES_DB") + if pgDB == "" { + pgDB = "langgraph" + } + + pgSSLMode := os.Getenv("POSTGRES_SSLMODE") + if pgSSLMode == "" { + pgSSLMode = "disable" // Default for internal cluster communication + } + + dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s", + pgHost, pgPort, pgUser, pgPassword, pgDB, pgSSLMode) + + var err error + DB, err = sql.Open("postgres", dsn) + if err != nil { + return fmt.Errorf("failed to open database connection: %v", err) + } + + // Test connection + if err := DB.Ping(); err != nil { + return fmt.Errorf("failed to ping database: %v", err) + } + + // Create workflow tables if they don't exist + if err := createWorkflowTables(); err != nil { + return fmt.Errorf("failed to create workflow tables: %v", err) + } + + log.Printf("Database connection initialized: %s@%s:%s/%s", pgUser, pgHost, pgPort, pgDB) + return nil +} + +// createWorkflowTables creates the workflow registry tables +func createWorkflowTables() error { + workflowsTable := ` + CREATE TABLE IF NOT EXISTS workflows ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + owner TEXT NOT NULL, + project TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT 
NOW(), + UNIQUE(project, name) + )` + + workflowVersionsTable := ` + CREATE TABLE IF NOT EXISTS workflow_versions ( + id TEXT PRIMARY KEY, + workflow_id TEXT NOT NULL REFERENCES workflows(id) ON DELETE CASCADE, + version TEXT NOT NULL, + image_digest TEXT NOT NULL, + graphs JSONB NOT NULL, + inputs_schema JSONB, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + UNIQUE(workflow_id, version) + )` + + runEventsTable := ` + CREATE TABLE IF NOT EXISTS run_events ( + id SERIAL PRIMARY KEY, + run_id TEXT NOT NULL, + seq INTEGER NOT NULL, + ts TIMESTAMP NOT NULL DEFAULT NOW(), + kind TEXT NOT NULL, + checkpoint_id TEXT, + payload JSONB, + UNIQUE(run_id, seq) + )` + + runEventsIdx := ` + CREATE INDEX IF NOT EXISTS idx_run_events_run_id ON run_events(run_id, ts DESC)` + + if _, err := DB.Exec(workflowsTable); err != nil { + return fmt.Errorf("failed to create workflows table: %v", err) + } + + if _, err := DB.Exec(workflowVersionsTable); err != nil { + return fmt.Errorf("failed to create workflow_versions table: %v", err) + } + + if _, err := DB.Exec(runEventsTable); err != nil { + return fmt.Errorf("failed to create run_events table: %v", err) + } + + if _, err := DB.Exec(runEventsIdx); err != nil { + return fmt.Errorf("failed to create run_events index: %v", err) + } + + log.Println("Workflow tables created/verified") + return nil +} + +// Helper functions for JSONB handling +func jsonbMarshal(v interface{}) ([]byte, error) { + return json.Marshal(v) +} + +func jsonbUnmarshal(data []byte, v interface{}) error { + return json.Unmarshal(data, v) +} + diff --git a/components/backend/types/workflow.go b/components/backend/types/workflow.go new file mode 100644 index 000000000..76673e242 --- /dev/null +++ b/components/backend/types/workflow.go @@ -0,0 +1,53 @@ +package types + +import "time" + +// Workflow represents a registered LangGraph workflow +type Workflow struct { + ID string `json:"id"` + Name string `json:"name"` + Owner string `json:"owner"` + Project string 
`json:"project"` + CreatedAt time.Time `json:"createdAt"` +} + +// WorkflowVersion represents a version of a workflow with its image +type WorkflowVersion struct { + ID string `json:"id"` + WorkflowID string `json:"workflowId"` + Version string `json:"version"` + ImageDigest string `json:"imageDigest"` // Full digest: quay.io/org/repo@sha256:... + Graphs []WorkflowGraph `json:"graphs"` // Multiple graphs per image + InputsSchema map[string]interface{} `json:"inputsSchema,omitempty"` // JSONSchema for UI + CreatedAt time.Time `json:"createdAt"` +} + +// WorkflowGraph represents a graph entry point in a workflow version +type WorkflowGraph struct { + Name string `json:"name"` // Display name (e.g., "spec_kit") + Entry string `json:"entry"` // Module:function (e.g., "app:build_app") +} + +// CreateWorkflowRequest represents a request to register a new workflow +type CreateWorkflowRequest struct { + Name string `json:"name" binding:"required"` + ImageDigest string `json:"imageDigest" binding:"required"` // Must be digest format + Graphs []WorkflowGraph `json:"graphs" binding:"required"` + InputsSchema map[string]interface{} `json:"inputsSchema,omitempty"` +} + +// CreateWorkflowVersionRequest represents a request to add a new version to an existing workflow +type CreateWorkflowVersionRequest struct { + Version string `json:"version" binding:"required"` + ImageDigest string `json:"imageDigest" binding:"required"` + Graphs []WorkflowGraph `json:"graphs" binding:"required"` + InputsSchema map[string]interface{} `json:"inputsSchema,omitempty"` +} + +// WorkflowRef references a workflow for use in AgenticSession +type WorkflowRef struct { + Name string `json:"name" binding:"required"` + Version string `json:"version,omitempty"` // Optional, defaults to latest + Graph string `json:"graph" binding:"required"` // Graph name from workflow version's graphs array +} + diff --git a/components/manifests/postgres/kustomization.yaml b/components/manifests/postgres/kustomization.yaml 
new file mode 100644 index 000000000..105f8f1bb --- /dev/null +++ b/components/manifests/postgres/kustomization.yaml @@ -0,0 +1,9 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +resources: +- postgres-secret.yaml +- postgres-configmap.yaml +- postgres-statefulset.yaml +- postgres-service.yaml + diff --git a/components/manifests/postgres/postgres-configmap.yaml b/components/manifests/postgres/postgres-configmap.yaml new file mode 100644 index 000000000..037e424cf --- /dev/null +++ b/components/manifests/postgres/postgres-configmap.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: postgres-config + labels: + app: postgres +data: + init-db.sh: | + #!/bin/bash + set -e + # Database is created automatically by Postgres image + # LangGraph tables will be created by PostgresSaver.setup() on first use + echo "Postgres initialized. LangGraph tables will be created automatically." + diff --git a/components/manifests/postgres/postgres-secret.yaml b/components/manifests/postgres/postgres-secret.yaml new file mode 100644 index 000000000..3cb647c1c --- /dev/null +++ b/components/manifests/postgres/postgres-secret.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Secret +metadata: + name: postgres-secret + labels: + app: postgres +type: Opaque +stringData: + POSTGRES_USER: langgraph + POSTGRES_PASSWORD: langgraph-change-me # Change in production + POSTGRES_DB: langgraph + diff --git a/components/manifests/postgres/postgres-service.yaml b/components/manifests/postgres/postgres-service.yaml new file mode 100644 index 000000000..284e20ba4 --- /dev/null +++ b/components/manifests/postgres/postgres-service.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: Service +metadata: + name: postgres-service + labels: + app: postgres +spec: + selector: + app: postgres + ports: + - port: 5432 + targetPort: postgres + protocol: TCP + name: postgres + type: ClusterIP + diff --git a/components/manifests/postgres/postgres-statefulset.yaml 
b/components/manifests/postgres/postgres-statefulset.yaml new file mode 100644 index 000000000..8be23ddd6 --- /dev/null +++ b/components/manifests/postgres/postgres-statefulset.yaml @@ -0,0 +1,85 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: postgres + labels: + app: postgres +spec: + serviceName: postgres-service + replicas: 1 + selector: + matchLabels: + app: postgres + template: + metadata: + labels: + app: postgres + spec: + serviceAccountName: default + containers: + - name: postgres + image: image-registry.openshift-image-registry.svc:5000/openshift/postgresql:15-el8 + imagePullPolicy: IfNotPresent + ports: + - containerPort: 5432 + name: postgres + env: + - name: POSTGRESQL_USER + valueFrom: + secretKeyRef: + name: postgres-secret + key: POSTGRES_USER + - name: POSTGRESQL_PASSWORD + valueFrom: + secretKeyRef: + name: postgres-secret + key: POSTGRES_PASSWORD + - name: POSTGRESQL_DATABASE + valueFrom: + secretKeyRef: + name: postgres-secret + key: POSTGRES_DB + - name: PGDATA + value: /var/lib/postgresql/data/pgdata + volumeMounts: + - name: postgres-data + mountPath: /var/lib/postgresql/data + - name: postgres-config + mountPath: /docker-entrypoint-initdb.d + resources: + requests: + cpu: 200m + memory: 256Mi + limits: + cpu: 1000m + memory: 1Gi + livenessProbe: + exec: + command: + - /bin/sh + - -c + - pg_isready -U $(POSTGRESQL_USER) -d $(POSTGRESQL_DATABASE) + initialDelaySeconds: 30 + periodSeconds: 10 + readinessProbe: + exec: + command: + - /bin/sh + - -c + - pg_isready -U $(POSTGRESQL_USER) -d $(POSTGRESQL_DATABASE) + initialDelaySeconds: 5 + periodSeconds: 5 + volumes: + - name: postgres-config + configMap: + name: postgres-config + volumeClaimTemplates: + - metadata: + name: postgres-data + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi + diff --git a/components/runners/langgraph-wrapper/BUILD.md b/components/runners/langgraph-wrapper/BUILD.md new file mode 100644 index 000000000..2418af93f --- 
/dev/null +++ b/components/runners/langgraph-wrapper/BUILD.md @@ -0,0 +1,51 @@ +# Build and Push Base LangGraph Wrapper Image + +## Build the Base Image + +```bash +cd components/runners/langgraph-wrapper + +# Build for linux/amd64 (required for OpenShift/K8s) +podman build --platform linux/amd64 -t quay.io/gkrumbach07/langgraph-wrapper:base . +``` + +## Push to Quay.io + +```bash +# Login to Quay.io (if not already logged in) +podman login quay.io +# Enter your Quay.io username: gkrumbach07 +# Enter your password/token when prompted + +# Push the image +podman push quay.io/gkrumbach07/langgraph-wrapper:base + +# Get the digest (for reference) +podman inspect quay.io/gkrumbach07/langgraph-wrapper:base | jq -r '.[0].RepoDigests[0]' +``` + +## Verify Image + +```bash +# Check image exists locally +podman images | grep langgraph-wrapper + +# Pull and test (after pushing) +podman pull quay.io/gkrumbach07/langgraph-wrapper:base +``` + +## Make Image Public (if needed) + +1. Go to https://quay.io/repository/gkrumbach07/langgraph-wrapper +2. Click on "Settings" โ†’ "Visibility" +3. Make it public so workflows can pull it + +## Update Workflow Dockerfiles + +All workflow Dockerfiles should now use: +```dockerfile +FROM quay.io/gkrumbach07/langgraph-wrapper:base +``` + +The Dockerfile.template has been updated with this base image. + diff --git a/components/runners/langgraph-wrapper/Dockerfile b/components/runners/langgraph-wrapper/Dockerfile new file mode 100644 index 000000000..93dafd7c2 --- /dev/null +++ b/components/runners/langgraph-wrapper/Dockerfile @@ -0,0 +1,30 @@ +FROM registry.access.redhat.com/ubi9/python-311:latest + +USER 0 + +WORKDIR /app + +# Install system dependencies +RUN dnf install -y git procps && dnf clean all + +# Install Python dependencies +COPY requirements.txt . 
+RUN pip install --no-cache-dir -r requirements.txt + +# Copy runner server +COPY runner/ ./runner/ + +# Set environment variables +ENV PYTHONPATH=/app +ENV PYTHONUNBUFFERED=1 + +# Expose port for FastAPI server +EXPOSE 8000 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ + CMD python -c "import requests; requests.get('http://localhost:8000/health')" || exit 1 + +# Run the server +CMD ["uvicorn", "runner.server:app", "--host", "0.0.0.0", "--port", "8000"] + diff --git a/components/runners/langgraph-wrapper/Dockerfile.template b/components/runners/langgraph-wrapper/Dockerfile.template new file mode 100644 index 000000000..51a579f8d --- /dev/null +++ b/components/runners/langgraph-wrapper/Dockerfile.template @@ -0,0 +1,29 @@ +# Example Dockerfile for building LangGraph workflows +# +# Usage: +# 1. Copy this file to your LangGraph workflow repo +# 2. Customize as needed (install your dependencies) +# 3. Build: docker build -t quay.io/myorg/my-workflow:v1.0.0 . +# 4. Push: docker push quay.io/myorg/my-workflow:v1.0.0 +# 5. Get digest: docker inspect quay.io/myorg/my-workflow:v1.0.0 | jq '.[0].RepoDigests[0]' +# 6. Register in UI with the digest + +FROM quay.io/gkrumbach07/langgraph-wrapper:base + +# Copy your workflow code +WORKDIR /app/workflow +COPY . . + +# Install your workflow dependencies (if any) +# Example: +# COPY requirements.txt . 
+# RUN pip install --no-cache-dir -r requirements.txt + +# Your workflow code should expose graph entry points +# Example structure: +# app/ +# __init__.py +# workflow.py # Contains build_app() function +# +# Graph entry point: "app:build_app" + diff --git a/components/runners/langgraph-wrapper/requirements.txt b/components/runners/langgraph-wrapper/requirements.txt new file mode 100644 index 000000000..79cb39331 --- /dev/null +++ b/components/runners/langgraph-wrapper/requirements.txt @@ -0,0 +1,8 @@ +langgraph>=0.2.0 +langgraph-checkpoint-postgres>=0.1.0 +fastapi>=0.115.0 +uvicorn[standard]>=0.32.0 +httpx>=0.27.0 +requests>=2.31.0 + + diff --git a/components/runners/langgraph-wrapper/runner/__init__.py b/components/runners/langgraph-wrapper/runner/__init__.py new file mode 100644 index 000000000..da2ce3dd4 --- /dev/null +++ b/components/runners/langgraph-wrapper/runner/__init__.py @@ -0,0 +1,2 @@ +# Empty __init__.py to make runner a package + diff --git a/components/runners/langgraph-wrapper/runner/server.py b/components/runners/langgraph-wrapper/runner/server.py new file mode 100644 index 000000000..2413a4b95 --- /dev/null +++ b/components/runners/langgraph-wrapper/runner/server.py @@ -0,0 +1,343 @@ +#!/usr/bin/env python3 +""" +LangGraph workflow runner server. +Provides HTTP endpoints for starting, resuming, and querying LangGraph workflow runs. 
+""" + +import os +import sys +import importlib +import asyncio +import logging +from typing import Dict, Any, Optional +from pathlib import Path + +from fastapi import FastAPI, HTTPException, BackgroundTasks +from fastapi.responses import JSONResponse +from pydantic import BaseModel + +from langgraph.checkpoint.postgres import PostgresSaver + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +app = FastAPI(title="LangGraph Runner Server") + +# Global state +_graph: Optional[Any] = None +_checkpointer: Optional[PostgresSaver] = None +_run_id: Optional[str] = None +_artifacts_dir: Optional[str] = None +_backend_url: Optional[str] = None +_stream_task: Optional[asyncio.Task] = None + +# Request models +class StartRequest(BaseModel): + run_id: str + inputs: Dict[str, Any] + +class ResumeRequest(BaseModel): + checkpoint_id: str + values: Dict[str, Any] + +class EventPayload(BaseModel): + run_id: str + seq: int + ts: str + type: str + node: Optional[str] = None + checkpoint_id: Optional[str] = None + payload: Dict[str, Any] + + +def load_graph(entry: str): + """Load a compiled graph from the specified module:function entry point.""" + try: + module_path, function_name = entry.split(":", 1) + logger.info(f"Loading graph from {module_path}:{function_name}") + + # Add workflow directory to path if it exists + workflow_dir = Path("/app/workflow") + if workflow_dir.exists(): + sys.path.insert(0, str(workflow_dir)) + + # Import module + module = importlib.import_module(module_path) + + # Get function + build_func = getattr(module, function_name) + + # Call function to get graph + graph = build_func() + + logger.info(f"Successfully loaded graph from {entry}") + return graph + except Exception as e: + logger.error(f"Failed to load graph from {entry}: {e}", exc_info=True) + raise ValueError(f"Failed to load graph from {entry}: {e}") + + +def 
initialize_checkpointer(): + """Initialize PostgresSaver checkpointer.""" + pg_dsn = os.getenv("PG_DSN") + if not pg_dsn: + # Build DSN from individual env vars + pg_host = os.getenv("POSTGRES_HOST", "postgres-service") + pg_port = os.getenv("POSTGRES_PORT", "5432") + pg_user = os.getenv("POSTGRES_USER", "langgraph") + pg_password = os.getenv("POSTGRES_PASSWORD", "langgraph-change-me") + pg_db = os.getenv("POSTGRES_DB", "langgraph") + pg_dsn = f"postgresql://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_db}" + + logger.info(f"Initializing PostgresSaver with DSN: {pg_dsn.replace(pg_password, '***')}") + checkpointer = PostgresSaver.from_conn_string(pg_dsn) + + # Setup tables (idempotent) + try: + checkpointer.setup() + logger.info("PostgresSaver tables initialized") + except Exception as e: + logger.warning(f"Failed to setup PostgresSaver tables (may already exist): {e}") + + return checkpointer + + +async def emit_event(event_type: str, node: Optional[str] = None, checkpoint_id: Optional[str] = None, payload: Dict[str, Any] = None): + """Emit an event to the backend.""" + if not _backend_url or not _run_id: + logger.debug("Skipping event emission (backend_url or run_id not set)") + return + + if payload is None: + payload = {} + + import httpx + from datetime import datetime + + event = { + "run_id": _run_id, + "seq": int(datetime.now().timestamp() * 1000), # Simple sequence number + "ts": datetime.now().isoformat(), + "type": event_type, + "node": node, + "checkpoint_id": checkpoint_id, + "payload": payload, + } + + try: + async with httpx.AsyncClient(timeout=5.0) as client: + await client.post( + f"{_backend_url}/projects/{os.getenv('PROJECT', 'default')}/runs/{_run_id}/events", + json=event, + ) + logger.debug(f"Emitted event: {event_type} for node {node}") + except Exception as e: + logger.warning(f"Failed to emit event to backend: {e}") + + +async def stream_graph(inputs: Dict[str, Any], run_id: str): + """Stream graph execution and emit events.""" + global 
_graph, _checkpointer, _run_id + + try: + config = {"configurable": {"thread_id": run_id}} + + await emit_event("node_start", payload={"inputs": inputs}) + + # Stream graph execution + async for event in _graph.astream(inputs, config, stream_mode="updates"): + for node_name, node_data in event.items(): + await emit_event("node_update", node=node_name, payload={"data": node_data}) + + # Check if this node has an interrupt + state = await _graph.aget_state(config) + if state.next and state.next != (): + # Check if any node in next is waiting for interrupt + for next_node in state.next: + # If graph is paused, emit interrupt event + if state.metadata and state.metadata.get("source") == "interrupt": + checkpoint_id = state.config.get("configurable", {}).get("checkpoint_id") + await emit_event("interrupt", node=next_node, checkpoint_id=checkpoint_id, payload={ + "state": state.values, + }) + return # Pause execution + + await emit_event("node_end", payload={"completed": True}) + + except Exception as e: + logger.error(f"Error streaming graph: {e}", exc_info=True) + await emit_event("error", payload={"error": str(e)}) + raise + + +@app.get("/health") +async def health(): + """Health check endpoint.""" + return {"status": "healthy"} + + +@app.get("/ready") +async def ready(): + """Readiness check endpoint.""" + # Graph loads lazily on first /start, so we're ready if server is running + return {"status": "ready"} + + +@app.post("/start") +async def start(request: StartRequest, background_tasks: BackgroundTasks): + """Start a workflow run.""" + global _graph, _checkpointer, _run_id, _artifacts_dir, _backend_url, _stream_task + + if _graph is None: + # Load graph on first start + graph_entry = os.getenv("GRAPH_ENTRY") + if not graph_entry: + raise HTTPException(status_code=400, detail="GRAPH_ENTRY environment variable not set") + + try: + _graph = load_graph(graph_entry) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + # Initialize 
checkpointer + _checkpointer = initialize_checkpointer() + + # Compile graph with checkpointer if not already compiled + if not hasattr(_graph, "astream"): + # Assume it's a workflow that needs compilation + _graph = _graph.compile(checkpointer=_checkpointer) + elif hasattr(_graph, "compile") and _checkpointer: + # Recompile with checkpointer + _graph = _graph.compile(checkpointer=_checkpointer) + + _run_id = request.run_id + _artifacts_dir = os.getenv("ARTIFACTS_DIR", "/workspace/artifacts") + _backend_url = os.getenv("BACKEND_API_URL", "http://backend-service:8080/api") + + # Ensure artifacts directory exists + Path(_artifacts_dir).mkdir(parents=True, exist_ok=True) + + # Start streaming in background + _stream_task = asyncio.create_task(stream_graph(request.inputs, request.run_id)) + + return {"status": "started", "run_id": request.run_id} + + +@app.post("/resume") +async def resume(request: ResumeRequest): + """Resume a workflow from an interrupt.""" + global _graph, _checkpointer, _run_id, _stream_task + + # Load graph if not already loaded (for pod restart scenario) + if _graph is None: + graph_entry = os.getenv("GRAPH_ENTRY") + if not graph_entry: + raise HTTPException(status_code=400, detail="GRAPH_ENTRY environment variable not set") + + try: + _graph = load_graph(graph_entry) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + # Initialize checkpointer + _checkpointer = initialize_checkpointer() + + # Compile graph with checkpointer if not already compiled + if not hasattr(_graph, "astream"): + _graph = _graph.compile(checkpointer=_checkpointer) + elif hasattr(_graph, "compile") and _checkpointer: + _graph = _graph.compile(checkpointer=_checkpointer) + + # Use run_id from request if not set (for pod restart) + if not _run_id: + # Extract run_id from checkpoint_id if it contains it, otherwise use checkpoint_id as-is + # Checkpoint IDs from LangGraph are typically in format "thread_id:checkpoint_id" + # For now, assume run_id 
is the session name from env or checkpoint_id + _run_id = os.getenv("RUN_ID", request.checkpoint_id.split(":")[0] if ":" in request.checkpoint_id else request.checkpoint_id) + + _artifacts_dir = os.getenv("ARTIFACTS_DIR", "/workspace/artifacts") + _backend_url = os.getenv("BACKEND_API_URL", "http://backend-service:8080/api") + + config = { + "configurable": { + "thread_id": _run_id, + "checkpoint_id": request.checkpoint_id, + } + } + + try: + # Get current state first to verify checkpoint exists + state = await _graph.aget_state(config) + + # Update state with decision values + await _graph.aupdate_state(config, values=request.values) + + # Get updated state to continue streaming + state = await _graph.aget_state(config) + + # Continue streaming from current state + _stream_task = asyncio.create_task(stream_graph(state.values, _run_id)) + + return {"status": "resumed", "run_id": _run_id, "checkpoint_id": request.checkpoint_id} + except Exception as e: + logger.error(f"Error resuming workflow: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +@app.get("/status") +async def status(): + """Get current workflow run status.""" + global _graph, _run_id + + if _graph is None: + return {"status": "not_started"} + + if not _run_id: + return {"status": "no_active_run"} + + try: + config = {"configurable": {"thread_id": _run_id}} + state = await _graph.aget_state(config) + + # Determine current node + currentNode = None + if state.next and len(state.next) > 0: + currentNode = state.next[0] + + checkpoint_id = state.config.get("configurable", {}).get("checkpoint_id") + + return { + "status": "running" if currentNode else "completed", + "currentNode": currentNode, + "checkpoint_id": checkpoint_id, + "values": state.values, + } + except Exception as e: + logger.error(f"Error getting status: {e}", exc_info=True) + return {"status": "error", "error": str(e)} + + +@app.post("/stop") +async def stop(): + """Stop the current workflow run.""" + global 
_stream_task + + if _stream_task: + _stream_task.cancel() + try: + await _stream_task + except asyncio.CancelledError: + pass + + return {"status": "stopped"} + + +if __name__ == "__main__": + import uvicorn + port = int(os.getenv("PORT", "8000")) + uvicorn.run(app, host="0.0.0.0", port=port) + diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 000000000..a9a9a5e8e --- /dev/null +++ b/package-lock.json @@ -0,0 +1,6 @@ +{ + "name": "syBFS", + "lockfileVersion": 3, + "requires": true, + "packages": {} +} diff --git a/test-langgraph.sh b/test-langgraph.sh new file mode 100755 index 000000000..fd7c8b929 --- /dev/null +++ b/test-langgraph.sh @@ -0,0 +1,113 @@ +#!/bin/bash +# Quick E2E Test Script for LangGraph Workflows +# Usage: ./test-langgraph.sh + +set -e + +PROJECT="${1:-test-project}" +IMAGE_DIGEST="${2}" +BACKEND_URL="${BACKEND_URL:-http://localhost:8080}" + +if [ -z "$IMAGE_DIGEST" ]; then + echo "Usage: $0 " + echo "Example: $0 myproject quay.io/ambient_code/test-workflow@sha256:abc123..." + exit 1 +fi + +echo "๐Ÿงช Testing LangGraph Workflow MVP" +echo "==================================" +echo "Project: $PROJECT" +echo "Image: $IMAGE_DIGEST" +echo "Backend: $BACKEND_URL" +echo "" + +# Step 1: Register workflow +echo "1๏ธโƒฃ Registering workflow..." +WORKFLOW_RESPONSE=$(curl -s -X POST "${BACKEND_URL}/api/projects/${PROJECT}/workflows" \ + -H "Content-Type: application/json" \ + -d "{ + \"name\": \"test-workflow\", + \"imageDigest\": \"${IMAGE_DIGEST}\", + \"graphs\": [{\"name\": \"main\", \"entry\": \"app.workflow:build_app\"}] + }") + +if echo "$WORKFLOW_RESPONSE" | jq -e '.error' > /dev/null 2>&1; then + echo "โŒ Failed to register workflow:" + echo "$WORKFLOW_RESPONSE" | jq + exit 1 +fi + +echo "โœ… Workflow registered" +echo "$WORKFLOW_RESPONSE" | jq + +# Step 2: Create run +echo "" +echo "2๏ธโƒฃ Creating workflow run..." 
+SESSION_RESPONSE=$(curl -s -X POST "${BACKEND_URL}/api/projects/${PROJECT}/agentic-sessions" \ + -H "Content-Type: application/json" \ + -d '{ + "workflowRef": {"name": "test-workflow", "graph": "main"}, + "inputs": {"message": "Hello from test script!"}, + "displayName": "Test Run" + }') + +SESSION_NAME=$(echo "$SESSION_RESPONSE" | jq -r '.name // empty') + +if [ -z "$SESSION_NAME" ] || [ "$SESSION_NAME" = "null" ]; then + echo "โŒ Failed to create session:" + echo "$SESSION_RESPONSE" | jq + exit 1 +fi + +echo "โœ… Session created: $SESSION_NAME" +echo "$SESSION_RESPONSE" | jq + +# Step 3: Monitor status +echo "" +echo "3๏ธโƒฃ Monitoring workflow execution..." +echo " (Press Ctrl+C to stop monitoring)" + +for i in {1..60}; do + STATUS_RESPONSE=$(curl -s "${BACKEND_URL}/api/projects/${PROJECT}/agentic-sessions/${SESSION_NAME}") + PHASE=$(echo "$STATUS_RESPONSE" | jq -r '.status.phase // "Unknown"') + CURRENT_NODE=$(echo "$STATUS_RESPONSE" | jq -r '.status.currentNode // "N/A"') + MESSAGE=$(echo "$STATUS_RESPONSE" | jq -r '.status.message // ""') + + printf "\r [%02d] Phase: %-12s | Node: %-20s" "$i" "$PHASE" "$CURRENT_NODE" + + if [ "$PHASE" = "Completed" ]; then + echo "" + echo "โœ… Workflow completed successfully!" + break + elif [ "$PHASE" = "Failed" ] || [ "$PHASE" = "Error" ]; then + echo "" + echo "โŒ Workflow failed: $MESSAGE" + exit 1 + fi + + sleep 2 +done + +if [ "$PHASE" != "Completed" ] && [ "$PHASE" != "Failed" ] && [ "$PHASE" != "Error" ]; then + echo "" + echo "โฑ๏ธ Workflow still running after 2 minutes" +fi + +# Step 4: Check events +echo "" +echo "4๏ธโƒฃ Checking workflow events..." 
+EVENTS=$(curl -s "${BACKEND_URL}/api/projects/${PROJECT}/runs/${SESSION_NAME}/events") +EVENT_COUNT=$(echo "$EVENTS" | jq '.events | length') +echo " Found $EVENT_COUNT events" +echo "$EVENTS" | jq '.events[-3:]' # Show last 3 events + +# Step 5: Final status +echo "" +echo "5๏ธโƒฃ Final session status:" +curl -s "${BACKEND_URL}/api/projects/${PROJECT}/agentic-sessions/${SESSION_NAME}" | jq '.status' + +echo "" +echo "๐ŸŽ‰ Test complete!" +echo " Session: $SESSION_NAME" +echo " View logs: kubectl logs -n $PROJECT -l job-name=${SESSION_NAME}-job -c langgraph-runner" + diff --git a/test-workflow-example/.gitignore b/test-workflow-example/.gitignore new file mode 100644 index 000000000..61982ca55 --- /dev/null +++ b/test-workflow-example/.gitignore @@ -0,0 +1,23 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +venv/ +ENV/ +build/ +dist/ +*.egg-info/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db + diff --git a/test-workflow-example/Dockerfile b/test-workflow-example/Dockerfile new file mode 100644 index 000000000..acff610fe --- /dev/null +++ b/test-workflow-example/Dockerfile @@ -0,0 +1,17 @@ +FROM quay.io/gkrumbach07/langgraph-wrapper:base + +WORKDIR /app/workflow + +# Copy workflow code +COPY app/ ./app/ +COPY requirements.txt ./ + +# Install workflow-specific dependencies (if any) +# The base image already includes langgraph, but we can add more here +RUN pip install --no-cache-dir -r requirements.txt || true + +# Ensure app module is importable +ENV PYTHONPATH=/app/workflow:$PYTHONPATH + +# Verify import works at build time +RUN python -c "import app.workflow; print('โœ… Workflow import successful')" diff --git a/test-workflow-example/PUSH.md b/test-workflow-example/PUSH.md new file mode 100644 index 000000000..fe1aab30c --- /dev/null +++ b/test-workflow-example/PUSH.md @@ -0,0 +1,36 @@ +# Push Workflow Image to Quay.io + +## Image Built Successfully โœ… + +**Base Image**: 
`quay.io/gkrumbach07/langgraph-wrapper:base` +**Workflow Image**: `quay.io/gkrumbach07/langgraph-example-workflow:v1.0.0` +**Platform**: linux/amd64 (for OpenShift/K8s) + +## Push to Quay.io + +```bash +# Make sure you're logged in +podman login quay.io + +# Push the workflow image +podman push quay.io/gkrumbach07/langgraph-example-workflow:v1.0.0 + +# Get the digest (required for registration) +podman inspect quay.io/gkrumbach07/langgraph-example-workflow:v1.0.0 | jq -r '.[0].RepoDigests[0]' +``` + +## After Pushing + +The digest will look like: +``` +quay.io/gkrumbach07/langgraph-example-workflow@sha256:1a3fffb16fcec808995907361828439aeb63ae28b6e0474c46e0f04ae7cb14c6 +``` + +Use this digest to register the workflow in vTeam. + +## Make Repository Public (if needed) + +1. Go to: https://quay.io/repository/gkrumbach07/langgraph-example-workflow +2. Create the repository if it doesn't exist +3. Settings โ†’ Visibility โ†’ Make Public + diff --git a/test-workflow-example/README.md b/test-workflow-example/README.md new file mode 100644 index 000000000..bd5e70e64 --- /dev/null +++ b/test-workflow-example/README.md @@ -0,0 +1,100 @@ +# Example LangGraph Workflow + +A simple example workflow demonstrating LangGraph integration with the vTeam workflow system. + +## Workflow Structure + +This workflow has 3 sequential nodes: +1. **step_one**: Processes the input message +2. **step_two**: Further processes the message +3. **step_three**: Produces final output + +## Building the Image + +**Important**: Build for Linux AMD64 architecture (required for OpenShift/K8s clusters): + +```bash +# Build for linux/amd64 platform (required for OpenShift/K8s) +podman build --platform linux/amd64 -t quay.io/ambient_code/langgraph-example-workflow:v1.0.0 . 
+ +# Push to registry +podman push quay.io/ambient_code/langgraph-example-workflow:v1.0.0 + +# Get digest (required for registration) +podman inspect quay.io/ambient_code/langgraph-example-workflow:v1.0.0 | jq -r '.[0].RepoDigests[0]' +``` + +**Note**: If you're on an ARM Mac (M1/M2/M3), you must use `--platform linux/amd64` to build for OpenShift/K8s clusters which typically run on AMD64. + +## Registering in vTeam + +```bash +curl -X POST "http://localhost:8080/api/projects/YOUR_PROJECT/workflows" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "example-workflow", + "imageDigest": "quay.io/ambient_code/langgraph-example-workflow@sha256:YOUR_DIGEST", + "graphs": [ + { + "name": "main", + "entry": "app.workflow:build_app" + } + ], + "inputsSchema": { + "type": "object", + "properties": { + "message": { + "type": "string", + "description": "Input message to process" + } + }, + "required": ["message"] + } + }' +``` + +## Running the Workflow + +```bash +curl -X POST "http://localhost:8080/api/projects/YOUR_PROJECT/agentic-sessions" \ + -H "Content-Type: application/json" \ + -d '{ + "workflowRef": { + "name": "example-workflow", + "graph": "main" + }, + "inputs": { + "message": "Hello from LangGraph!" 
+ }, + "displayName": "Example Workflow Run" + }' +``` + +## Local Testing + +```bash +# Install dependencies +pip install langgraph + +# Run locally +python app/workflow.py +``` + +## Input/Output + +**Input:** +```json +{ + "message": "Hello World" +} +``` + +**Output State:** +```json +{ + "message": "Step 1 processed: Hello World", + "step": 3, + "result": "[Step 1] Hello World\n[Step 2] Processed message\n[Step 3] Final result: Step 1 processed: Hello World\n", + "counter": 3 +} +``` diff --git a/test-workflow-example/app/workflow.py b/test-workflow-example/app/workflow.py new file mode 100644 index 000000000..bd86dd704 --- /dev/null +++ b/test-workflow-example/app/workflow.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +""" +Example LangGraph workflow for testing the LangGraph MVP system. +This workflow demonstrates: +- Simple state management +- Multiple nodes with dependencies +- Input/output handling +""" + +from langgraph.graph import StateGraph, END +from typing import TypedDict, Annotated +from operator import add + + +class State(TypedDict): + """Workflow state.""" + message: str + step: int + result: Annotated[str, add] # Accumulate results + counter: int + + +def build_app(): + """ + Build the workflow graph. + + This is the entry point that the LangGraph runner will call. + Must return a compiled graph. 
+ """ + graph = StateGraph(State) + + def step_one(state: State) -> State: + """First processing step.""" + msg = state.get("message", "default") + return { + "message": f"Step 1 processed: {msg}", + "step": 1, + "result": f"[Step 1] {msg}\n", + "counter": state.get("counter", 0) + 1 + } + + def step_two(state: State) -> State: + """Second processing step.""" + return { + "message": state["message"], + "step": 2, + "result": state.get("result", "") + f"[Step 2] Processed message\n", + "counter": state.get("counter", 0) + 1 + } + + def step_three(state: State) -> State: + """Final step that produces output.""" + return { + "message": state["message"], + "step": 3, + "result": state.get("result", "") + f"[Step 3] Final result: {state['message']}\n", + "counter": state.get("counter", 0) + 1 + } + + # Add nodes + graph.add_node("step_one", step_one) + graph.add_node("step_two", step_two) + graph.add_node("step_three", step_three) + + # Define flow + graph.set_entry_point("step_one") + graph.add_edge("step_one", "step_two") + graph.add_edge("step_two", "step_three") + graph.add_edge("step_three", END) + + # Compile and return + return graph.compile() + + +# For testing locally +if __name__ == "__main__": + import asyncio + + async def test(): + """Test the workflow locally.""" + app = build_app() + result = await app.ainvoke({ + "message": "Hello from local test!", + "step": 0, + "result": "", + "counter": 0 + }) + print("Workflow result:") + print(result) + + asyncio.run(test()) + + diff --git a/test-workflow-example/requirements.txt b/test-workflow-example/requirements.txt new file mode 100644 index 000000000..cd20930f6 --- /dev/null +++ b/test-workflow-example/requirements.txt @@ -0,0 +1,3 @@ +langgraph>=0.2.0 +langgraph-checkpoint-postgres>=0.1.0 + From 6e329196f715923f47f684283a872c709734528b Mon Sep 17 00:00:00 2001 From: Gage Krumbach Date: Tue, 4 Nov 2025 08:23:31 -0500 Subject: [PATCH 3/4] Update .gitignore to include .spikes directory and remove conflicting 
entries --- .gitignore | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index b787dcf85..7afed7a82 100644 --- a/.gitignore +++ b/.gitignore @@ -117,10 +117,4 @@ Thumbs.db .playwright-mcp/ .stories/ .testplans/ -.spikes/ -<<<<<<< HEAD -.cursor/worktrees.json -======= -components/backend/Dockerfile.local-build -components/backend/BUILD*.md ->>>>>>> current-agent-workflow-syBFS +.spikes/ \ No newline at end of file From f5d9ad8fb946d32ab2279d728bbcbfd3397323e8 Mon Sep 17 00:00:00 2001 From: Gage Krumbach Date: Tue, 4 Nov 2025 10:56:36 -0500 Subject: [PATCH 4/4] Refactor __init__.py to clarify package purpose - Updated __init__.py to include a comment indicating that the file makes the app directory a Python package. --- test-workflow-example/app/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test-workflow-example/app/__init__.py b/test-workflow-example/app/__init__.py index 0bcfb87c2..253a656e4 100644 --- a/test-workflow-example/app/__init__.py +++ b/test-workflow-example/app/__init__.py @@ -1,2 +1 @@ -__init__.py - +# This file makes the app directory a Python package