diff --git a/.changeset/forward-parent-pointer-discard.md b/.changeset/forward-parent-pointer-discard.md new file mode 100644 index 00000000000..b55505ef1ec --- /dev/null +++ b/.changeset/forward-parent-pointer-discard.md @@ -0,0 +1,5 @@ +--- +"@effect/workflow": patch +--- + +Forward the parent pointer when spawning a child workflow with `discard: true` diff --git a/packages/cluster/test/ClusterWorkflowEngine.test.ts b/packages/cluster/test/ClusterWorkflowEngine.test.ts index dc4a9317e4e..5b42e034204 100644 --- a/packages/cluster/test/ClusterWorkflowEngine.test.ts +++ b/packages/cluster/test/ClusterWorkflowEngine.test.ts @@ -245,6 +245,36 @@ describe.concurrent("ClusterWorkflowEngine", () => { assert.isTrue(flags.get("catch")) }).pipe(Effect.provide(TestWorkflowLayer))) + + it.effect("forwards parent pointer when spawning a child with discard:true", () => + Effect.gen(function*() { + const driver = yield* MessageStorage.MemoryDriver + const fiber = yield* DiscardParentWorkflow.execute({ id: "discard-parent-1" }).pipe( + Effect.fork + ) + yield* TestClock.adjust(1) + yield* Fiber.join(fiber) + + const findRun = (entityType: string) => + driver.journal.find( + (envelope) => + envelope._tag === "Request" && + envelope.address.entityType === entityType && + envelope.tag === "run" + ) + const parentRun = findRun("Workflow/DiscardParentWorkflow") + const childRun = findRun("Workflow/DiscardChildWorkflow") + assert.exists(parentRun, "expected a run envelope for the parent workflow") + assert.exists(childRun, "expected a run envelope for the child workflow") + + const childPayload = (childRun as { payload: Record }).payload + const parent = childPayload["~@effect/workflow/parent"] as + | { workflowName: string; executionId: string } + | undefined + assert.exists(parent, "child payload should carry the parent pointer") + expect(parent!.workflowName).toEqual("DiscardParentWorkflow") + expect(parent!.executionId).toEqual((parentRun as { address: { entityId: string } }).address.entityId) + }).pipe(Effect.provide(TestWorkflowLayer))) }) const TestShardingConfig = ShardingConfig.layer({ @@ -498,6 +528,34 @@ const ChildWorkflowLayer = ChildWorkflow.toLayer(Effect.fnUntraced(function*() { flags.set("child-end", true) })) +const DiscardParentWorkflow = Workflow.make({ + name: "DiscardParentWorkflow", + payload: { id: Schema.String }, + idempotencyKey(payload) { + return payload.id + } +}) + +const DiscardChildWorkflow = Workflow.make({ + name: "DiscardChildWorkflow", + payload: { id: Schema.String }, + idempotencyKey(payload) { + return payload.id + } +}) + +const DiscardParentWorkflowLayer = DiscardParentWorkflow.toLayer( + Effect.fnUntraced(function*({ id }) { + yield* DiscardChildWorkflow.execute({ id: `${id}-child` }, { discard: true }) + }) +) + +const DiscardChildWorkflowLayer = DiscardChildWorkflow.toLayer( + Effect.fnUntraced(function*() { + return yield* Effect.void + }) +) + const SuspendOnFailureWorkflow = Workflow.make({ name: "SuspendOnFailureWorkflow", payload: { @@ -556,6 +614,8 @@ const TestWorkflowLayer = EmailWorkflowLayer.pipe( Layer.merge(DurableRaceWorkflowLayer), Layer.merge(ParentWorkflowLayer), Layer.merge(ChildWorkflowLayer), + Layer.merge(DiscardParentWorkflowLayer), + Layer.merge(DiscardChildWorkflowLayer), Layer.merge(SuspendOnFailureWorkflowLayer), Layer.merge(CatchWorkflowLayer), Layer.provideMerge(Flags.Default), diff --git a/packages/workflow/src/WorkflowEngine.ts b/packages/workflow/src/WorkflowEngine.ts index 5a2b58e0280..bf84d944b4c 100644 --- a/packages/workflow/src/WorkflowEngine.ts +++ b/packages/workflow/src/WorkflowEngine.ts @@ -366,7 +366,8 @@ export const makeUnsafe = (options: Encoded): WorkflowEngine["Type"] => yield* options.execute(self, { executionId, payload: payload as object, - discard: true + discard: true, + parent: Option.getOrUndefined(parentInstance) }) return executionId }