Skip to content

feat: add step.foreach and step.webhook_verify pipeline steps#158

Merged
intel352 merged 1 commit intomainfrom
feature/step-foreach-webhook-verify
Feb 25, 2026
Merged

feat: add step.foreach and step.webhook_verify pipeline steps#158
intel352 merged 1 commit intomainfrom
feature/step-foreach-webhook-verify

Conversation

@intel352
Copy link
Contributor

Summary

  • step.foreach: Iterates over a collection (resolved from pipeline context by key or dot-path like steps.fetch.rows), executes inline sub-steps for each item, and returns {results: [...], count: N}. Sub-steps share a child context with configurable item_key (default "item") and index_key (default "index"). An error in any sub-step stops iteration immediately.

  • step.webhook_verify: Validates HMAC-SHA256 signatures on incoming webhook requests before the pipeline continues. Supports three providers:

    • github: X-Hub-Signature-256 header (sha256=<hex>)
    • stripe: Stripe-Signature header (t=<timestamp>,v1=<hex>) with 5-minute timestamp window enforcement
    • generic: Configurable header (default X-Signature, raw hex)
    • On failure: writes HTTP 401 and returns StepResult{Stop: true}
    • Secrets support env var expansion ($MY_SECRET / ${MY_SECRET})
    • Caches raw body in _raw_body metadata to avoid double-reads

Implementation notes

  • step.foreach uses a lazy registry getter (func() *StepRegistry) so its factory is registered before all step types are loaded, allowing sub-steps to reference any step type registered by any plugin
  • The plugin stores the concrete *module.StepRegistry (via type assertion in SetStepRegistry) so step.foreach can call Create() at pipeline-configuration time
  • Collection resolution supports simple keys, dot-paths through step outputs, and trigger data
  • All helper functions are prefixed with foreach to avoid collision with existing walkPath/splitDotPath in pipeline_step_sub_workflow.go

Test plan

  • TestForEachStep_IteratesOverSliceOfMaps — iterates, passes item/index to templates
  • TestForEachStep_EmptyCollection — returns {results: [], count: 0}
  • TestForEachStep_DefaultItemAndIndexKeys — defaults to "item" / "index"
  • TestForEachStep_SubStepErrorStopsIteration — stops after first sub-step error
  • TestForEachStep_FactoryRejectsMissingCollection — validation error
  • TestForEachStep_FactoryRejectsInvalidSubStepType — unknown type error
  • TestForEachStep_IteratesWithStepOutputAccess — chained sub-steps share context
  • TestForEachStep_CollectionFromStepOutputs — dot-path "steps.fetch.rows" resolution
  • TestWebhookVerifyStep_ValidGitHub — HMAC matches
  • TestWebhookVerifyStep_InvalidGitHub — wrong signature → Stop + 401
  • TestWebhookVerifyStep_MissingGitHubHeader — missing header → Stop
  • TestWebhookVerifyStep_ValidStripe — current timestamp passes
  • TestWebhookVerifyStep_StripeExpiredTimestamp — 10 min old → Stop
  • TestWebhookVerifyStep_InvalidStripeSignature — wrong secret → Stop + 401
  • TestWebhookVerifyStep_ValidGeneric — custom header
  • TestWebhookVerifyStep_GenericDefaultHeader — defaults to X-Signature
  • TestWebhookVerifyStep_MissingGenericHeader — missing → Stop
  • TestWebhookVerifyStep_NoHTTPRequest — no request in context → Stop
  • TestWebhookVerifyStep_FactoryRejects* — validation errors
  • TestWebhookVerifyStep_RawBodyCachedInMetadata — uses pre-read body from metadata
  • Full go test ./... suite passes

🤖 Generated with Claude Code

step.foreach iterates over a collection (resolved from pipeline context
by key or dot-path like "steps.fetch.rows"), executes inline sub-steps
for each item, and returns {results, count}. Sub-steps share a child
context with item_key (default "item") and index_key (default "index")
injected. An error in any sub-step stops iteration immediately.

step.webhook_verify validates HMAC-SHA256 signatures for incoming
webhooks before the pipeline continues. Supports:
- GitHub: X-Hub-Signature-256 (sha256=<hex>)
- Stripe: Stripe-Signature (t=<ts>,v1=<hex>) with 5-min timestamp window
- Generic: configurable header (default X-Signature, raw hex)

On failure writes HTTP 401 and returns StepResult{Stop: true}.
On success returns {verified: true}.

The plugin stores the concrete *module.StepRegistry so step.foreach can
create sub-steps of any registered type at pipeline-configuration time.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings February 25, 2026 00:26
@intel352 intel352 merged commit bd13eeb into main Feb 25, 2026
16 checks passed
@intel352 intel352 deleted the feature/step-foreach-webhook-verify branch February 25, 2026 00:27
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds two new generic pipeline steps to the workflow engine’s pipeline-steps plugin: an iterator (step.foreach) for running inline sub-steps over a collection, and an HMAC-based request authenticator (step.webhook_verify) for validating incoming webhook signatures before continuing a pipeline.

Changes:

  • Register new step types step.foreach and step.webhook_verify in the pipeline-steps plugin and update plugin tests accordingly.
  • Implement step.foreach with collection resolution (simple key, trigger data, and dot-path through steps.* outputs) and inline sub-step construction.
  • Implement step.webhook_verify for GitHub/Stripe/generic signature verification with tests.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
plugins/pipelinesteps/plugin.go Registers the two new step factories and stores a concrete step registry for foreach.
plugins/pipelinesteps/plugin_test.go Updates expected step types and factory count assertions.
module/pipeline_step_foreach.go Implements step.foreach factory + execution and collection/path helpers.
module/pipeline_step_foreach_test.go Adds unit tests for step.foreach.
module/pipeline_step_webhook_verify.go Implements step.webhook_verify factory + verification logic and request-body caching.
module/pipeline_step_webhook_verify_test.go Adds unit tests for GitHub/Stripe/generic verification behavior.

Comment on lines +119 to +133
for _, step := range s.subSteps {
result, execErr := step.Execute(ctx, childPC)
if execErr != nil {
return nil, fmt.Errorf("foreach step %q: iteration %d, sub-step %q failed: %w",
s.name, i, step.Name(), execErr)
}
if result != nil && result.Output != nil {
childPC.MergeStepOutput(step.Name(), result.Output)
maps.Copy(iterResult, result.Output)
}
if result != nil && result.Stop {
break
}
}
collected = append(collected, iterResult)
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If any sub-step returns Stop: true (e.g. step.json_response/step.delegate), step.foreach currently only breaks the inner sub-step loop and then continues iterating remaining items, ultimately returning Stop: false. This prevents sub-steps from stopping the pipeline as intended. Propagate the stop signal by returning Stop: true from step.foreach (and likely stop iterating) when a sub-step sets it.

Copilot uses AI. Check for mistakes.
// including types registered by other plugins loaded after this one.
"step.foreach": wrapStepFactory(module.NewForEachStepFactory(func() *module.StepRegistry {
return p.concreteStepRegistry
}, nil)),
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

step.foreach is registered by calling module.NewForEachStepFactory(..., nil), so sub-steps will be built with a nil modular.Application and steps that depend on the app service registry (db_exec/db_query/delegate/etc.) will break. After adjusting NewForEachStepFactory to use the app provided to the returned StepFactory, update this registration to stop passing nil (and consider returning a clear error if p.concreteStepRegistry is still nil when the factory is invoked).

Suggested change
}, nil)),
})),

Copilot uses AI. Check for mistakes.
if w, ok := pc.Metadata["_http_response_writer"].(http.ResponseWriter); ok {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusUnauthorized)
_, _ = w.Write([]byte(`{"error":"unauthorized","reason":"webhook signature verification failed"}`))
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When unauthorized writes a 401 response to _http_response_writer, it should also set pc.Metadata["_response_handled"] = true. The HTTP handlers use this flag to decide whether to write a fallback JSON response; without it, they can append an additional response body after the 401, potentially leaking data.

Suggested change
_, _ = w.Write([]byte(`{"error":"unauthorized","reason":"webhook signature verification failed"}`))
_, _ = w.Write([]byte(`{"error":"unauthorized","reason":"webhook signature verification failed"}`))
pc.Metadata["_response_handled"] = true

Copilot uses AI. Check for mistakes.
Comment on lines +216 to +224
body, err := io.ReadAll(req.Body)
if err != nil {
return nil, err
}

// Cache it for other steps that may need the raw body
pc.Metadata["_raw_body"] = body

return body, nil
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readBody consumes req.Body via io.ReadAll but never restores it. Downstream steps like step.request_parse and step.validate_request_body read from req.Body directly, so after step.webhook_verify runs they will see an empty body. After reading, reset req.Body to a fresh reader (e.g., io.NopCloser(bytes.NewReader(body))) in addition to caching _raw_body.

Copilot uses AI. Check for mistakes.
Comment on lines +319 to +321
// Compile-time check: ensure the step_fail factory signature matches StepFactory.
// This avoids having an unused import of fmt.
var _ = fmt.Sprintf
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fmt import is only kept alive via var _ = fmt.Sprintf. It would be cleaner to remove the unused import and this dummy assignment (or use fmt for real assertions/logging if needed).

Copilot uses AI. Check for mistakes.
Comment on lines +26 to +73
func NewForEachStepFactory(registryFn func() *StepRegistry, app modular.Application) StepFactory {
return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) {
registry := registryFn()

collection, _ := config["collection"].(string)
if collection == "" {
return nil, fmt.Errorf("foreach step %q: 'collection' is required", name)
}

itemKey, _ := config["item_key"].(string)
if itemKey == "" {
itemKey = "item"
}

indexKey, _ := config["index_key"].(string)
if indexKey == "" {
indexKey = "index"
}

// Build sub-steps from inline config
stepsRaw, _ := config["steps"].([]any)
subSteps := make([]PipelineStep, 0, len(stepsRaw))
for i, raw := range stepsRaw {
stepCfg, ok := raw.(map[string]any)
if !ok {
return nil, fmt.Errorf("foreach step %q: steps[%d] must be a map", name, i)
}

stepType, _ := stepCfg["type"].(string)
if stepType == "" {
return nil, fmt.Errorf("foreach step %q: steps[%d] missing 'type'", name, i)
}

stepName, _ := stepCfg["name"].(string)
if stepName == "" {
stepName = fmt.Sprintf("%s-sub-%d", name, i)
}

// Build the step config without meta fields
subCfg := make(map[string]any)
for k, v := range stepCfg {
if k != "type" && k != "name" {
subCfg[k] = v
}
}

step, err := registry.Create(stepType, stepName, subCfg, app)
if err != nil {
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewForEachStepFactory captures a modular.Application argument but the returned StepFactory ignores its own app parameter (it uses the captured app in registry.Create). In engine usage the app is only available at step-factory invocation time, so sub-steps that require app (e.g. delegate/db_*) will be created with a nil app and later fail. Use the app parameter passed to the returned StepFactory (and drop the captured app argument) when calling registry.Create.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants