Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 129 additions & 15 deletions module/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type openAPIOperation struct {
Parameters []openAPIParameter `yaml:"parameters" json:"parameters"`
RequestBody *openAPIRequestBody `yaml:"requestBody" json:"requestBody"`
Responses map[string]openAPIResponse `yaml:"responses" json:"responses"`
XPipeline string `yaml:"x-pipeline" json:"x-pipeline"`
}

// openAPIParameter describes a path, query, header, or cookie parameter.
Expand Down Expand Up @@ -116,13 +117,14 @@ type openAPISchema struct {
// OpenAPIModule parses an OpenAPI v3 spec and registers HTTP routes that
// validate incoming requests against the spec schemas.
type OpenAPIModule struct {
name string
cfg OpenAPIConfig
spec *openAPISpec
specBytes []byte // raw spec bytes for serving (original file content)
specJSON []byte // cached JSON-serialised spec for /openapi.json endpoint
routerName string
logger *slog.Logger
name string
cfg OpenAPIConfig
spec *openAPISpec
specBytes []byte // raw spec bytes for serving (original file content)
specJSON []byte // cached JSON-serialised spec for /openapi.json endpoint
routerName string
logger *slog.Logger
pipelineLookup PipelineLookupFn
}

// NewOpenAPIModule creates a new OpenAPIModule with the given name and config.
Expand Down Expand Up @@ -199,6 +201,12 @@ func (m *OpenAPIModule) RequiresServices() []modular.ServiceDependency { return
// RouterName returns the optional explicit router module name to attach routes to.
func (m *OpenAPIModule) RouterName() string { return m.routerName }

// SetPipelineLookup sets the function used to resolve pipeline names found in
// x-pipeline operation extensions. This must be called before RegisterRoutes.
func (m *OpenAPIModule) SetPipelineLookup(fn PipelineLookupFn) {
m.pipelineLookup = fn
}

// RegisterRoutes attaches all spec paths (and optional Swagger UI / spec endpoints)
// to the given HTTPRouter.
func (m *OpenAPIModule) RegisterRoutes(router HTTPRouter) {
Expand Down Expand Up @@ -276,17 +284,22 @@ func (m *OpenAPIModule) RegisterRoutes(router HTTPRouter) {
// ---- Handler builders ----

// buildRouteHandler creates an HTTPHandler that validates the request (if enabled)
// and returns a 501 Not Implemented stub response. In a full integration the
// caller would wrap this handler or replace the stub with real business logic.
// and either executes the linked pipeline (if x-pipeline is set) or returns a 501
// Not Implemented stub response.
func (m *OpenAPIModule) buildRouteHandler(specPath, method string, op *openAPIOperation) HTTPHandler {
validateReq := m.cfg.Validation.Request
return &openAPIRouteHandler{
h := &openAPIRouteHandler{
module: m,
specPath: specPath,
method: method,
op: op,
validateReq: validateReq,
}
if op.XPipeline != "" {
h.pipelineName = op.XPipeline
h.pipelineLookup = m.pipelineLookup
}
return h
}

// buildSwaggerUIHandler returns an inline Swagger UI page that loads the spec
Expand All @@ -299,11 +312,13 @@ func (m *OpenAPIModule) buildSwaggerUIHandler(specURL string) HTTPHandler {
// ---- openAPIRouteHandler ----

type openAPIRouteHandler struct {
module *OpenAPIModule
specPath string
method string
op *openAPIOperation
validateReq bool
module *OpenAPIModule
specPath string
method string
op *openAPIOperation
validateReq bool
pipelineName string
pipelineLookup PipelineLookupFn
}

func (h *openAPIRouteHandler) Handle(w http.ResponseWriter, r *http.Request) {
Expand All @@ -319,6 +334,58 @@ func (h *openAPIRouteHandler) Handle(w http.ResponseWriter, r *http.Request) {
}
}

// If x-pipeline is configured, execute the named pipeline.
if h.pipelineName != "" {
if h.pipelineLookup == nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(map[string]string{
"error": fmt.Sprintf("pipeline lookup not configured for pipeline %q", h.pipelineName),
})
return
}

pipeline, ok := h.pipelineLookup(h.pipelineName)
if !ok {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadGateway)
_ = json.NewEncoder(w).Encode(map[string]string{
"error": fmt.Sprintf("pipeline %q not found", h.pipelineName),
})
return
}

data := openAPIExtractRequestData(r)

rw := &trackedResponseWriter{ResponseWriter: w}
ctx := context.WithValue(r.Context(), HTTPResponseWriterContextKey, rw)
ctx = context.WithValue(ctx, HTTPRequestContextKey, r)

// Use a per-request shallow copy of the pipeline to avoid concurrent
// mutations of shared pipeline state (e.g. sequence/event counters).
pipelineCopy := *pipeline
result, err := pipelineCopy.Execute(ctx, data)
if err != nil {
if !rw.written {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(map[string]string{
"error": fmt.Sprintf("pipeline execution failed: %v", err),
})
}
return
}

if rw.written {
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(result.Current)
return
}

// Default stub: 501 Not Implemented
// In a full integration callers wire their own handler on top of this module.
w.Header().Set("Content-Type", "application/json")
Expand Down Expand Up @@ -758,3 +825,50 @@ func supportedContentTypes(content map[string]openAPIMediaType) string {
sort.Strings(types)
return strings.Join(types, ", ")
}

// openAPIExtractRequestData builds a trigger data map from an HTTP request,
// extracting query parameters (first value per key) and, for JSON bodies,
// the decoded top-level body fields (without overwriting query param values).
// The request body is restored after reading so downstream handlers can still
// consume it.
func openAPIExtractRequestData(r *http.Request) map[string]any {
const maxBodySize = 1 << 20 // 1 MiB limit for JSON body parsing

data := make(map[string]any)

// Extract query parameters (first value per key).
for k, v := range r.URL.Query() {
if len(v) > 0 {
data[k] = v[0]
}
}

// Extract JSON body fields if Content-Type is application/json.
if r.Body != nil {
ct := r.Header.Get("Content-Type")
if idx := strings.Index(ct, ";"); idx != -1 {
ct = strings.TrimSpace(ct[:idx])
} else {
ct = strings.TrimSpace(ct)
}

if strings.EqualFold(ct, "application/json") {
bodyBytes, err := io.ReadAll(io.LimitReader(r.Body, maxBodySize))
if err == nil && len(bodyBytes) > 0 {
var bodyData map[string]any
if err := json.Unmarshal(bodyBytes, &bodyData); err == nil {
for k, v := range bodyData {
// Do not overwrite query parameters with body fields.
if _, exists := data[k]; !exists {
data[k] = v
}
}
}
// Restore r.Body so downstream handlers can still read it.
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
}
}
}

return data
}
Loading
Loading