From e97bbba61f9bf2ad64d732c23ad2368b26d5e403 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 27 Feb 2026 05:19:15 +0000 Subject: [PATCH 1/3] Initial plan From a556faa21a96f91b686ce08cbe1b09d9820f663e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 27 Feb 2026 05:38:39 +0000 Subject: [PATCH 2/3] feat: add x-pipeline extension support to OpenAPI module When an OpenAPI operation includes `x-pipeline: `, the route handler executes the named pipeline instead of returning 501 Not Implemented. Pipeline lookup is injected by the wiring hook from the engine's pipeline registry. Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/openapi.go | 97 +++++++++++++++--- module/openapi_test.go | 201 ++++++++++++++++++++++++++++++++++++++ plugins/openapi/plugin.go | 20 ++++ 3 files changed, 303 insertions(+), 15 deletions(-) diff --git a/module/openapi.go b/module/openapi.go index d249205f..f18a39a9 100644 --- a/module/openapi.go +++ b/module/openapi.go @@ -71,6 +71,7 @@ type openAPIOperation struct { Parameters []openAPIParameter `yaml:"parameters" json:"parameters"` RequestBody *openAPIRequestBody `yaml:"requestBody" json:"requestBody"` Responses map[string]openAPIResponse `yaml:"responses" json:"responses"` + XPipeline string `yaml:"x-pipeline" json:"x-pipeline"` } // openAPIParameter describes a path, query, header, or cookie parameter. @@ -116,13 +117,14 @@ type openAPISchema struct { // OpenAPIModule parses an OpenAPI v3 spec and registers HTTP routes that // validate incoming requests against the spec schemas. type OpenAPIModule struct { - name string - cfg OpenAPIConfig - spec *openAPISpec - specBytes []byte // raw spec bytes for serving (original file content) - specJSON []byte // cached JSON-serialised spec for /openapi.json endpoint - routerName string - logger *slog.Logger + name string + cfg OpenAPIConfig + spec *openAPISpec + specBytes []byte // raw spec bytes for serving (original file content) + specJSON []byte // cached JSON-serialised spec for /openapi.json endpoint + routerName string + logger *slog.Logger + pipelineLookup PipelineLookupFn } // NewOpenAPIModule creates a new OpenAPIModule with the given name and config. @@ -199,6 +201,12 @@ func (m *OpenAPIModule) RequiresServices() []modular.ServiceDependency { return // RouterName returns the optional explicit router module name to attach routes to. func (m *OpenAPIModule) RouterName() string { return m.routerName } +// SetPipelineLookup sets the function used to resolve pipeline names found in +// x-pipeline operation extensions. This must be called before RegisterRoutes. +func (m *OpenAPIModule) SetPipelineLookup(fn PipelineLookupFn) { + m.pipelineLookup = fn +} + // RegisterRoutes attaches all spec paths (and optional Swagger UI / spec endpoints) // to the given HTTPRouter. func (m *OpenAPIModule) RegisterRoutes(router HTTPRouter) { @@ -276,17 +284,22 @@ func (m *OpenAPIModule) RegisterRoutes(router HTTPRouter) { // ---- Handler builders ---- // buildRouteHandler creates an HTTPHandler that validates the request (if enabled) -// and returns a 501 Not Implemented stub response. In a full integration the -// caller would wrap this handler or replace the stub with real business logic. +// and either executes the linked pipeline (if x-pipeline is set) or returns a 501 +// Not Implemented stub response. func (m *OpenAPIModule) buildRouteHandler(specPath, method string, op *openAPIOperation) HTTPHandler { validateReq := m.cfg.Validation.Request - return &openAPIRouteHandler{ + h := &openAPIRouteHandler{ module: m, specPath: specPath, method: method, op: op, validateReq: validateReq, } + if op.XPipeline != "" { + h.pipelineName = op.XPipeline + h.pipelineLookup = m.pipelineLookup + } + return h } // buildSwaggerUIHandler returns an inline Swagger UI page that loads the spec @@ -299,11 +312,13 @@ func (m *OpenAPIModule) buildSwaggerUIHandler(specURL string) HTTPHandler { // ---- openAPIRouteHandler ---- type openAPIRouteHandler struct { - module *OpenAPIModule - specPath string - method string - op *openAPIOperation - validateReq bool + module *OpenAPIModule + specPath string + method string + op *openAPIOperation + validateReq bool + pipelineName string + pipelineLookup PipelineLookupFn } func (h *openAPIRouteHandler) Handle(w http.ResponseWriter, r *http.Request) { @@ -319,6 +334,46 @@ func (h *openAPIRouteHandler) Handle(w http.ResponseWriter, r *http.Request) { } } + // If x-pipeline is configured, execute the named pipeline. + if h.pipelineName != "" && h.pipelineLookup != nil { + pipeline, ok := h.pipelineLookup(h.pipelineName) + if !ok { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadGateway) + _ = json.NewEncoder(w).Encode(map[string]string{ + "error": fmt.Sprintf("pipeline %q not found", h.pipelineName), + }) + return + } + + data := openAPIExtractRequestData(r) + + rw := &trackedResponseWriter{ResponseWriter: w} + ctx := context.WithValue(r.Context(), HTTPResponseWriterContextKey, rw) + ctx = context.WithValue(ctx, HTTPRequestContextKey, r) + + result, err := pipeline.Execute(ctx, data) + if err != nil { + if !rw.written { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(map[string]string{ + "error": fmt.Sprintf("pipeline execution failed: %v", err), + }) + } + return + } + + if rw.written { + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(result.Current) + return + } + // Default stub: 501 Not Implemented // In a full integration callers wire their own handler on top of this module. w.Header().Set("Content-Type", "application/json") @@ -758,3 +813,15 @@ func supportedContentTypes(content map[string]openAPIMediaType) string { sort.Strings(types) return strings.Join(types, ", ") } + +// openAPIExtractRequestData builds a trigger data map from an HTTP request, +// extracting query parameters and (for JSON bodies) the decoded body fields. +func openAPIExtractRequestData(r *http.Request) map[string]any { + data := make(map[string]any) + for k, v := range r.URL.Query() { + if len(v) > 0 { + data[k] = v[0] + } + } + return data +} diff --git a/module/openapi_test.go b/module/openapi_test.go index 00942366..488518d4 100644 --- a/module/openapi_test.go +++ b/module/openapi_test.go @@ -14,6 +14,35 @@ import ( // ---- spec fixtures ---- +const xPipelineYAML = ` +openapi: "3.0.0" +info: + title: Pipeline API + version: "1.0.0" +paths: + /greet: + get: + operationId: greetUser + summary: Greet the user + x-pipeline: greet-pipeline + parameters: + - name: name + in: query + required: false + schema: + type: string + responses: + "200": + description: Greeting + /stub: + get: + operationId: stubOp + summary: No pipeline + responses: + "200": + description: OK +` + const petstoreYAML = ` openapi: "3.0.0" info: @@ -762,3 +791,175 @@ func routeKeys(r *testRouter) []string { } return keys } + +// ---- x-pipeline tests ---- + +func TestOpenAPIModule_XPipelineParsed(t *testing.T) { + specPath := writeTempSpec(t, ".yaml", xPipelineYAML) + + mod := NewOpenAPIModule("pipe-api", OpenAPIConfig{ + SpecFile: specPath, + BasePath: "/api", + }) + if err := mod.Init(nil); err != nil { + t.Fatalf("Init: %v", err) + } + + // Verify x-pipeline was parsed for the greet operation. + pathItem := mod.spec.Paths["/greet"] + if pathItem == nil { + t.Fatal("expected /greet path in spec") + } + op := pathItem["get"] + if op == nil { + t.Fatal("expected GET operation on /greet") + } + if op.XPipeline != "greet-pipeline" { + t.Errorf("expected x-pipeline 'greet-pipeline', got %q", op.XPipeline) + } + + // Verify the stub operation has no x-pipeline. + stubItem := mod.spec.Paths["/stub"] + if stubItem == nil { + t.Fatal("expected /stub path in spec") + } + stubOp := stubItem["get"] + if stubOp == nil { + t.Fatal("expected GET operation on /stub") + } + if stubOp.XPipeline != "" { + t.Errorf("expected empty x-pipeline on stub, got %q", stubOp.XPipeline) + } +} + +func TestOpenAPIModule_XPipeline_ExecutesPipeline(t *testing.T) { + specPath := writeTempSpec(t, ".yaml", xPipelineYAML) + + mod := NewOpenAPIModule("pipe-api", OpenAPIConfig{ + SpecFile: specPath, + BasePath: "/api", + }) + if err := mod.Init(nil); err != nil { + t.Fatalf("Init: %v", err) + } + + // Create a simple pipeline that sets a greeting. + greetStep := &stubPipelineStep{ + name: "set-greeting", + exec: func(_ context.Context, pc *PipelineContext) (*StepResult, error) { + name := "world" + if n, ok := pc.TriggerData["name"].(string); ok && n != "" { + name = n + } + return &StepResult{Output: map[string]any{"greeting": "hello " + name}}, nil + }, + } + greetPipeline := &Pipeline{ + Name: "greet-pipeline", + Steps: []PipelineStep{greetStep}, + } + + mod.SetPipelineLookup(func(name string) (*Pipeline, bool) { + if name == "greet-pipeline" { + return greetPipeline, true + } + return nil, false + }) + + router := &testRouter{} + mod.RegisterRoutes(router) + + h := router.findHandler("GET", "/api/greet") + if h == nil { + t.Fatal("GET /api/greet handler not found") + } + + w := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodGet, "/api/greet?name=Alice", nil) + h.Handle(w, r) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]any + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("invalid JSON response: %v", err) + } + if resp["greeting"] != "hello Alice" { + t.Errorf("expected greeting 'hello Alice', got %v", resp["greeting"]) + } +} + +func TestOpenAPIModule_XPipeline_NotFound(t *testing.T) { + specPath := writeTempSpec(t, ".yaml", xPipelineYAML) + + mod := NewOpenAPIModule("pipe-api", OpenAPIConfig{ + SpecFile: specPath, + BasePath: "/api", + }) + if err := mod.Init(nil); err != nil { + t.Fatalf("Init: %v", err) + } + + // Set a lookup that never finds the pipeline. + mod.SetPipelineLookup(func(name string) (*Pipeline, bool) { + return nil, false + }) + + router := &testRouter{} + mod.RegisterRoutes(router) + + h := router.findHandler("GET", "/api/greet") + if h == nil { + t.Fatal("GET /api/greet handler not found") + } + + w := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodGet, "/api/greet", nil) + h.Handle(w, r) + + if w.Code != http.StatusBadGateway { + t.Errorf("expected 502 for missing pipeline, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestOpenAPIModule_XPipeline_StubWithoutPipeline(t *testing.T) { + specPath := writeTempSpec(t, ".yaml", xPipelineYAML) + + mod := NewOpenAPIModule("pipe-api", OpenAPIConfig{ + SpecFile: specPath, + BasePath: "/api", + }) + if err := mod.Init(nil); err != nil { + t.Fatalf("Init: %v", err) + } + + // No pipeline lookup set — routes without x-pipeline should still return 501. + router := &testRouter{} + mod.RegisterRoutes(router) + + h := router.findHandler("GET", "/api/stub") + if h == nil { + t.Fatal("GET /api/stub handler not found") + } + + w := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodGet, "/api/stub", nil) + h.Handle(w, r) + + if w.Code != http.StatusNotImplemented { + t.Errorf("expected 501 for stub, got %d: %s", w.Code, w.Body.String()) + } +} + +// stubPipelineStep is a minimal PipelineStep implementation for testing. +type stubPipelineStep struct { + name string + exec func(ctx context.Context, pc *PipelineContext) (*StepResult, error) +} + +func (s *stubPipelineStep) Name() string { return s.name } +func (s *stubPipelineStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + return s.exec(ctx, pc) +} diff --git a/plugins/openapi/plugin.go b/plugins/openapi/plugin.go index 92c796bd..d987a10d 100644 --- a/plugins/openapi/plugin.go +++ b/plugins/openapi/plugin.go @@ -187,6 +187,12 @@ func (p *Plugin) WiringHooks() []plugin.WiringHook { } } +// pipelineProvider is a local interface satisfied by the workflow engine, +// used to resolve pipelines referenced by x-pipeline in OpenAPI operations. +type pipelineProvider interface { + GetPipeline(name string) (*module.Pipeline, bool) +} + // wireOpenAPIRoutes finds all OpenAPIModule instances registered as services and // registers their routes on the best matching HTTPRouter. func wireOpenAPIRoutes(app modular.Application, cfg *config.WorkflowConfig) error { @@ -219,6 +225,13 @@ func wireOpenAPIRoutes(app modular.Application, cfg *config.WorkflowConfig) erro } } + // Resolve the pipeline provider (engine) for x-pipeline support. + var pp pipelineProvider + var engineSvc any + if err := app.GetService("workflowEngine", &engineSvc); err == nil && engineSvc != nil { + pp, _ = engineSvc.(pipelineProvider) + } + for _, svc := range app.SvcRegistry() { oaMod, ok := svc.(*module.OpenAPIModule) if !ok { @@ -269,6 +282,13 @@ func wireOpenAPIRoutes(app modular.Application, cfg *config.WorkflowConfig) erro continue } + // Inject pipeline lookup so x-pipeline operations can execute pipelines. + if pp != nil { + oaMod.SetPipelineLookup(func(name string) (*module.Pipeline, bool) { + return pp.GetPipeline(name) + }) + } + oaMod.RegisterRoutes(targetRouter) } From 039c0e2fd480e31b0213cc7f8daae7c2e31cf260 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 27 Feb 2026 06:00:33 +0000 Subject: [PATCH 3/3] fix: address review feedback on x-pipeline handler - Return 500 (not silent 501 fallback) when x-pipeline is set but pipelineLookup is nil, making misconfiguration easier to diagnose - Use per-request shallow copy of Pipeline to avoid concurrent mutations of shared pipeline state (seqNum, etc.) - Implement JSON body extraction in openAPIExtractRequestData with a 1 MiB limit, body restoration, and query-param precedence over body - Add tests: nil-lookup 500, JSON body extraction, query-param priority, non-JSON body skipped Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/openapi.go | 53 ++++++++++++++++++++++++-- module/openapi_test.go | 85 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+), 3 deletions(-) diff --git a/module/openapi.go b/module/openapi.go index f18a39a9..e91cfbae 100644 --- a/module/openapi.go +++ b/module/openapi.go @@ -335,7 +335,16 @@ func (h *openAPIRouteHandler) Handle(w http.ResponseWriter, r *http.Request) { } // If x-pipeline is configured, execute the named pipeline. - if h.pipelineName != "" && h.pipelineLookup != nil { + if h.pipelineName != "" { + if h.pipelineLookup == nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(map[string]string{ + "error": fmt.Sprintf("pipeline lookup not configured for pipeline %q", h.pipelineName), + }) + return + } + pipeline, ok := h.pipelineLookup(h.pipelineName) if !ok { w.Header().Set("Content-Type", "application/json") @@ -352,7 +361,10 @@ func (h *openAPIRouteHandler) Handle(w http.ResponseWriter, r *http.Request) { ctx := context.WithValue(r.Context(), HTTPResponseWriterContextKey, rw) ctx = context.WithValue(ctx, HTTPRequestContextKey, r) - result, err := pipeline.Execute(ctx, data) + // Use a per-request shallow copy of the pipeline to avoid concurrent + // mutations of shared pipeline state (e.g. sequence/event counters). + pipelineCopy := *pipeline + result, err := pipelineCopy.Execute(ctx, data) if err != nil { if !rw.written { w.Header().Set("Content-Type", "application/json") @@ -815,13 +827,48 @@ func supportedContentTypes(content map[string]openAPIMediaType) string { } // openAPIExtractRequestData builds a trigger data map from an HTTP request, -// extracting query parameters and (for JSON bodies) the decoded body fields. +// extracting query parameters (first value per key) and, for JSON bodies, +// the decoded top-level body fields (without overwriting query param values). +// The request body is restored after reading so downstream handlers can still +// consume it. func openAPIExtractRequestData(r *http.Request) map[string]any { + const maxBodySize = 1 << 20 // 1 MiB limit for JSON body parsing + data := make(map[string]any) + + // Extract query parameters (first value per key). for k, v := range r.URL.Query() { if len(v) > 0 { data[k] = v[0] } } + + // Extract JSON body fields if Content-Type is application/json. + if r.Body != nil { + ct := r.Header.Get("Content-Type") + if idx := strings.Index(ct, ";"); idx != -1 { + ct = strings.TrimSpace(ct[:idx]) + } else { + ct = strings.TrimSpace(ct) + } + + if strings.EqualFold(ct, "application/json") { + bodyBytes, err := io.ReadAll(io.LimitReader(r.Body, maxBodySize)) + if err == nil && len(bodyBytes) > 0 { + var bodyData map[string]any + if err := json.Unmarshal(bodyBytes, &bodyData); err == nil { + for k, v := range bodyData { + // Do not overwrite query parameters with body fields. + if _, exists := data[k]; !exists { + data[k] = v + } + } + } + // Restore r.Body so downstream handlers can still read it. + r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + } + } + } + return data } diff --git a/module/openapi_test.go b/module/openapi_test.go index 488518d4..87d889b5 100644 --- a/module/openapi_test.go +++ b/module/openapi_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "io" "net/http" "net/http/httptest" "os" @@ -953,6 +954,90 @@ func TestOpenAPIModule_XPipeline_StubWithoutPipeline(t *testing.T) { } } +func TestOpenAPIModule_XPipeline_NilLookup(t *testing.T) { + specPath := writeTempSpec(t, ".yaml", xPipelineYAML) + + mod := NewOpenAPIModule("pipe-api", OpenAPIConfig{ + SpecFile: specPath, + BasePath: "/api", + }) + if err := mod.Init(nil); err != nil { + t.Fatalf("Init: %v", err) + } + + // No lookup set — x-pipeline route should return 500. + router := &testRouter{} + mod.RegisterRoutes(router) + + h := router.findHandler("GET", "/api/greet") + if h == nil { + t.Fatal("GET /api/greet handler not found") + } + + w := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodGet, "/api/greet", nil) + h.Handle(w, r) + + if w.Code != http.StatusInternalServerError { + t.Errorf("expected 500 for nil pipeline lookup, got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "pipeline lookup not configured") { + t.Errorf("expected descriptive error, got: %s", w.Body.String()) + } +} + +func TestOpenAPIExtractRequestData_JSONBody(t *testing.T) { + body := `{"foo": "bar", "count": 42}` + r := httptest.NewRequest(http.MethodPost, "/test?q=1", bytes.NewBufferString(body)) + r.Header.Set("Content-Type", "application/json") + + data := openAPIExtractRequestData(r) + + if data["q"] != "1" { + t.Errorf("expected query param q=1, got %v", data["q"]) + } + if data["foo"] != "bar" { + t.Errorf("expected body field foo=bar, got %v", data["foo"]) + } + if data["count"] != float64(42) { + t.Errorf("expected body field count=42, got %v", data["count"]) + } + + // Body must be restored. + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("failed to read restored body: %v", err) + } + if string(bodyBytes) != body { + t.Errorf("body not restored: got %q, want %q", string(bodyBytes), body) + } +} + +func TestOpenAPIExtractRequestData_QueryParamNotOverwrittenByBody(t *testing.T) { + body := `{"name": "from-body"}` + r := httptest.NewRequest(http.MethodPost, "/test?name=from-query", bytes.NewBufferString(body)) + r.Header.Set("Content-Type", "application/json") + + data := openAPIExtractRequestData(r) + + // Query param should win over body field. + if data["name"] != "from-query" { + t.Errorf("expected query param to win, got %v", data["name"]) + } +} + +func TestOpenAPIExtractRequestData_NonJSONBodySkipped(t *testing.T) { + r := httptest.NewRequest(http.MethodPost, "/test", bytes.NewBufferString("plain text")) + r.Header.Set("Content-Type", "text/plain") + + data := openAPIExtractRequestData(r) + + // No body fields should appear. + if len(data) != 0 { + t.Errorf("expected empty data for non-JSON body, got %v", data) + } +} + // stubPipelineStep is a minimal PipelineStep implementation for testing. type stubPipelineStep struct { name string