diff --git a/api-contracts/dispatcher/dispatcher.proto b/api-contracts/dispatcher/dispatcher.proto index b571ec8ae..581ea1f00 100644 --- a/api-contracts/dispatcher/dispatcher.proto +++ b/api-contracts/dispatcher/dispatcher.proto @@ -100,6 +100,9 @@ message AssignedAction { // the step name string stepName = 12; + + // the count number of the retry attempt + int32 retryCount = 13; } message WorkerListenRequest { diff --git a/frontend/docs/pages/home/features/retries/simple.mdx b/frontend/docs/pages/home/features/retries/simple.mdx index 1abe975fb..02dbd93b0 100644 --- a/frontend/docs/pages/home/features/retries/simple.mdx +++ b/frontend/docs/pages/home/features/retries/simple.mdx @@ -1,3 +1,5 @@ +import { Callout, Card, Cards, Steps, Tabs } from "nextra/components"; + # Retry Strategies in Hatchet: Simple Step Retry Hatchet provides a simple and effective way to handle failures in your workflow steps using the step-level retry configuration. This feature allows you to specify the number of times a step should be retried if it fails, helping to improve the reliability and resilience of your workflows. @@ -39,6 +41,48 @@ It's important to note that step-level retries are not suitable for all types of Additionally, if a step interacts with external services or databases, you should ensure that the operation is idempotent (i.e., can be safely repeated without changing the result) before enabling retries. Otherwise, retrying the step could lead to unintended side effects or inconsistencies in your data. +## Accessing the Retry Count in a Step + +If you need to access the current retry count within a step, you can use the `retryCount` method available in the step context: + + + + +```python +@hatchet.step(timeout='2s', retries=3) +def step1(self, context: Context): + retry_count = context.retry_count() + print(f"Retry count: {retry_count}") + raise Exception("Step failed") +``` + + + + +```typescript +async function step(context: Context) { + const retryCount = context.retryCount(); + console.log(`Retry count: ${retryCount}`); + throw new Error("Step failed"); +} +``` + + + + +```go +func(ctx worker.HatchetContext) (result *stepOneOutput, err error) { + count := ctx.RetryCount() + + return &stepOneOutput{ + Message: "Count is: " + strconv.Itoa(count), + }, nil +} +``` + + + + ## Conclusion Hatchet's step-level retry feature is a simple and effective way to handle transient failures in your workflow steps, improving the reliability and resilience of your workflows. By specifying the number of retries for each step, you can ensure that your workflows can recover from temporary issues without requiring complex error handling logic. diff --git a/internal/services/dispatcher/contracts/dispatcher.pb.go b/internal/services/dispatcher/contracts/dispatcher.pb.go index c1aafc93b..00fd9ad72 100644 --- a/internal/services/dispatcher/contracts/dispatcher.pb.go +++ b/internal/services/dispatcher/contracts/dispatcher.pb.go @@ -497,6 +497,8 @@ type AssignedAction struct { ActionPayload string `protobuf:"bytes,11,opt,name=actionPayload,proto3" json:"actionPayload,omitempty"` // the step name StepName string `protobuf:"bytes,12,opt,name=stepName,proto3" json:"stepName,omitempty"` + // the count number of the retry attempt + RetryCount int32 `protobuf:"varint,13,opt,name=retryCount,proto3" json:"retryCount,omitempty"` } func (x *AssignedAction) Reset() { @@ -615,6 +617,13 @@ func (x *AssignedAction) GetStepName() string { return "" } +func (x *AssignedAction) GetRetryCount() int32 { + if x != nil { + return x.RetryCount + } + return 0 +} + type WorkerListenRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1828,7 +1837,7 @@ var file_dispatcher_proto_rawDesc = []byte{ 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, - 0x22, 0x8b, 0x03, 0x0a, 0x0e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x41, 0x63, 0x74, + 0x22, 0xab, 0x03, 0x0a, 0x0e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x75, 0x6e, 0x49, 0x64, @@ -1852,7 +1861,9 @@ var file_dispatcher_proto_rawDesc = []byte{ 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x74, 0x65, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x0c, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x74, 0x65, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x31, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x74, 0x65, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1e, + 0x0a, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x0d, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x31, 0x0a, 0x13, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index 146357427..7d4688dbe 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -62,6 +62,7 @@ func (worker *subscribedWorker) StartStepRun( ActionPayload: string(inputBytes), StepName: stepName, WorkflowRunId: sqlchelpers.UUIDToStr(stepRun.WorkflowRunId), + RetryCount: stepRun.StepRun.RetryCount, }) } @@ -105,6 +106,7 @@ func (worker *subscribedWorker) CancelStepRun( ActionType: contracts.ActionType_CANCEL_STEP_RUN, StepName: stepRun.StepReadableId.String, WorkflowRunId: sqlchelpers.UUIDToStr(stepRun.WorkflowRunId), + RetryCount: stepRun.StepRun.RetryCount, }) } diff --git a/pkg/client/dispatcher.go b/pkg/client/dispatcher.go index 59768e146..7b3d186b9 100644 --- a/pkg/client/dispatcher.go +++ b/pkg/client/dispatcher.go @@ -91,6 +91,9 @@ type Action struct { // the action type ActionType ActionType + + // the count of the retry attempt + RetryCount int32 } type WorkerActionListener interface { @@ -332,6 +335,7 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error ActionId: assignedAction.ActionId, ActionType: actionType, ActionPayload: []byte(unquoted), + RetryCount: assignedAction.RetryCount, } } }() diff --git a/pkg/worker/context.go b/pkg/worker/context.go index f783937a9..03faa1494 100644 --- a/pkg/worker/context.go +++ b/pkg/worker/context.go @@ -41,6 +41,8 @@ type HatchetContext interface { RefreshTimeout(incrementTimeoutBy string) error + RetryCount() int + client() client.Client action() *client.Action @@ -195,6 +197,10 @@ func (h *hatchetContext) StreamEvent(message []byte) { } } +func (h *hatchetContext) RetryCount() int { + return int(h.a.RetryCount) +} + func (h *hatchetContext) index() int { return h.i } diff --git a/pkg/worker/middleware_test.go b/pkg/worker/middleware_test.go index 8604654e6..0c26d3b01 100644 --- a/pkg/worker/middleware_test.go +++ b/pkg/worker/middleware_test.go @@ -64,6 +64,10 @@ func (c *testHatchetContext) StreamEvent(message []byte) { panic("not implemented") } +func (c *testHatchetContext) RetryCount() int { + panic("not implemented") +} + func (c *testHatchetContext) action() *client.Action { panic("not implemented") }