From 700f3fb379c3e3e9f3e2f3594e042ddf4186983d Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Wed, 22 May 2024 12:48:51 -0400 Subject: [PATCH 1/6] feat: expose retry count --- api-contracts/dispatcher/dispatcher.proto | 3 +++ .../dispatcher/contracts/dispatcher.pb.go | 15 +++++++++++++-- internal/services/dispatcher/server.go | 2 ++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/api-contracts/dispatcher/dispatcher.proto b/api-contracts/dispatcher/dispatcher.proto index b571ec8ae..581ea1f00 100644 --- a/api-contracts/dispatcher/dispatcher.proto +++ b/api-contracts/dispatcher/dispatcher.proto @@ -100,6 +100,9 @@ message AssignedAction { // the step name string stepName = 12; + + // the count number of the retry attempt + int32 retryCount = 13; } message WorkerListenRequest { diff --git a/internal/services/dispatcher/contracts/dispatcher.pb.go b/internal/services/dispatcher/contracts/dispatcher.pb.go index c1aafc93b..00fd9ad72 100644 --- a/internal/services/dispatcher/contracts/dispatcher.pb.go +++ b/internal/services/dispatcher/contracts/dispatcher.pb.go @@ -497,6 +497,8 @@ type AssignedAction struct { ActionPayload string `protobuf:"bytes,11,opt,name=actionPayload,proto3" json:"actionPayload,omitempty"` // the step name StepName string `protobuf:"bytes,12,opt,name=stepName,proto3" json:"stepName,omitempty"` + // the count number of the retry attempt + RetryCount int32 `protobuf:"varint,13,opt,name=retryCount,proto3" json:"retryCount,omitempty"` } func (x *AssignedAction) Reset() { @@ -615,6 +617,13 @@ func (x *AssignedAction) GetStepName() string { return "" } +func (x *AssignedAction) GetRetryCount() int32 { + if x != nil { + return x.RetryCount + } + return 0 +} + type WorkerListenRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1828,7 +1837,7 @@ var file_dispatcher_proto_rawDesc = []byte{ 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, - 0x22, 0x8b, 0x03, 0x0a, 0x0e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x41, 0x63, 0x74, + 0x22, 0xab, 0x03, 0x0a, 0x0e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x75, 0x6e, 0x49, 0x64, @@ -1852,7 +1861,9 @@ var file_dispatcher_proto_rawDesc = []byte{ 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x74, 0x65, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x0c, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x74, 0x65, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x31, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x74, 0x65, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1e, + 0x0a, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x0d, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x31, 0x0a, 0x13, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index f8838bace..55ab6979a 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -62,6 +62,7 @@ func (worker *subscribedWorker) StartStepRun( ActionPayload: string(inputBytes), StepName: stepName, WorkflowRunId: sqlchelpers.UUIDToStr(stepRun.WorkflowRunId), + RetryCount: stepRun.StepRun.RetryCount, }) } @@ -105,6 +106,7 @@ func (worker *subscribedWorker) CancelStepRun( ActionType: contracts.ActionType_CANCEL_STEP_RUN, StepName: stepRun.StepReadableId.String, WorkflowRunId: sqlchelpers.UUIDToStr(stepRun.WorkflowRunId), + RetryCount: stepRun.StepRun.RetryCount, }) } From 680895ecea1c25bcc2715eedd7d01b90b5c90db5 Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Wed, 22 May 2024 13:09:13 -0400 Subject: [PATCH 2/6] feat: expose retry count go --- pkg/client/dispatcher.go | 4 ++++ pkg/worker/context.go | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/pkg/client/dispatcher.go b/pkg/client/dispatcher.go index 59768e146..7b3d186b9 100644 --- a/pkg/client/dispatcher.go +++ b/pkg/client/dispatcher.go @@ -91,6 +91,9 @@ type Action struct { // the action type ActionType ActionType + + // the count of the retry attempt + RetryCount int32 } type WorkerActionListener interface { @@ -332,6 +335,7 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error ActionId: assignedAction.ActionId, ActionType: actionType, ActionPayload: []byte(unquoted), + RetryCount: assignedAction.RetryCount, } } }() diff --git a/pkg/worker/context.go b/pkg/worker/context.go index f6965b990..e8a29b17f 100644 --- a/pkg/worker/context.go +++ b/pkg/worker/context.go @@ -40,6 +40,8 @@ type HatchetContext interface { RefreshTimeout(incrementTimeoutBy string) error + RetryCount() int + client() client.Client action() *client.Action @@ -194,6 +196,10 @@ func (h *hatchetContext) StreamEvent(message []byte) { } } +func (h *hatchetContext) RetryCount() int { + return int(h.a.RetryCount) +} + func (h *hatchetContext) index() int { return h.i } From f75d2c5d46b1cd82c28c82719d0498fc65f4bbe8 Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Wed, 22 May 2024 13:09:23 -0400 Subject: [PATCH 3/6] docs: accessing retry count --- .../pages/home/features/retries/simple.mdx | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/frontend/docs/pages/home/features/retries/simple.mdx b/frontend/docs/pages/home/features/retries/simple.mdx index 1abe975fb..fb1b3a2a2 100644 --- a/frontend/docs/pages/home/features/retries/simple.mdx +++ b/frontend/docs/pages/home/features/retries/simple.mdx @@ -39,8 +39,47 @@ It's important to note that step-level retries are not suitable for all types of Additionally, if a step interacts with external services or databases, you should ensure that the operation is idempotent (i.e., can be safely repeated without changing the result) before enabling retries. Otherwise, retrying the step could lead to unintended side effects or inconsistencies in your data. +## Accessing the Retry Count in a Step + +If you need to access the current retry count within a step, you can use the `retryCount` method available in the step context: + + + +```python +@hatchet.step(timeout='2s', retries=3) +def step1(self, context: Context): + retry_count = context.retry_count() + print(f"Retry count: {retry_count}") + raise Exception("Step failed") +``` + + +```typescript +async function step(context: Context) { + const retryCount = context.retryCount(); + console.log(`Retry count: ${retryCount}`); + throw new Error("Step failed"); +} +```` + + +```go +func(ctx worker.HatchetContext) (result *stepOneOutput, err error) { + count := ctx.RetryCount() + +return &stepOneOutput{ +Message: "Count is: " + strconv.Itoa(count), +}, nil +} + +``` + + + + ## Conclusion Hatchet's step-level retry feature is a simple and effective way to handle transient failures in your workflow steps, improving the reliability and resilience of your workflows. By specifying the number of retries for each step, you can ensure that your workflows can recover from temporary issues without requiring complex error handling logic. Remember to use retries judiciously and only for steps that are idempotent and can safely be repeated. For more advanced retry strategies, such as exponential backoff or circuit breaking, stay tuned for future updates to Hatchet's retry capabilities. +``` From 8a56b24e083b7e5a0c5d389745d26f8a06148e73 Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Wed, 22 May 2024 13:12:39 -0400 Subject: [PATCH 4/6] fix: import --- frontend/docs/pages/home/features/retries/simple.mdx | 2 ++ 1 file changed, 2 insertions(+) diff --git a/frontend/docs/pages/home/features/retries/simple.mdx b/frontend/docs/pages/home/features/retries/simple.mdx index fb1b3a2a2..6d674f374 100644 --- a/frontend/docs/pages/home/features/retries/simple.mdx +++ b/frontend/docs/pages/home/features/retries/simple.mdx @@ -1,3 +1,5 @@ +import { Callout, Card, Cards, Steps, Tabs } from "nextra/components"; + # Retry Strategies in Hatchet: Simple Step Retry Hatchet provides a simple and effective way to handle failures in your workflow steps using the step-level retry configuration. This feature allows you to specify the number of times a step should be retried if it fails, helping to improve the reliability and resilience of your workflows. From f3eeaa7be1900ceac705ec415a6294816d3f4de7 Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Wed, 22 May 2024 13:30:22 -0400 Subject: [PATCH 5/6] fix: tests --- pkg/worker/middleware_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/worker/middleware_test.go b/pkg/worker/middleware_test.go index 8604654e6..0c26d3b01 100644 --- a/pkg/worker/middleware_test.go +++ b/pkg/worker/middleware_test.go @@ -64,6 +64,10 @@ func (c *testHatchetContext) StreamEvent(message []byte) { panic("not implemented") } +func (c *testHatchetContext) RetryCount() int { + panic("not implemented") +} + func (c *testHatchetContext) action() *client.Action { panic("not implemented") } From 35918649922655a7ab5dfce0bd6f096c0d66ceaa Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Fri, 7 Jun 2024 11:01:54 -0400 Subject: [PATCH 6/6] fix: docs formatting --- .../docs/pages/home/features/retries/simple.mdx | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/frontend/docs/pages/home/features/retries/simple.mdx b/frontend/docs/pages/home/features/retries/simple.mdx index 6d674f374..02dbd93b0 100644 --- a/frontend/docs/pages/home/features/retries/simple.mdx +++ b/frontend/docs/pages/home/features/retries/simple.mdx @@ -47,6 +47,7 @@ If you need to access the current retry count within a step, you can use the `re + ```python @hatchet.step(timeout='2s', retries=3) def step1(self, context: Context): @@ -54,34 +55,36 @@ def step1(self, context: Context): print(f"Retry count: {retry_count}") raise Exception("Step failed") ``` + + ```typescript async function step(context: Context) { const retryCount = context.retryCount(); console.log(`Retry count: ${retryCount}`); throw new Error("Step failed"); } -```` +``` + + ```go func(ctx worker.HatchetContext) (result *stepOneOutput, err error) { count := ctx.RetryCount() -return &stepOneOutput{ -Message: "Count is: " + strconv.Itoa(count), -}, nil + return &stepOneOutput{ + Message: "Count is: " + strconv.Itoa(count), + }, nil } - ``` + - ## Conclusion Hatchet's step-level retry feature is a simple and effective way to handle transient failures in your workflow steps, improving the reliability and resilience of your workflows. By specifying the number of retries for each step, you can ensure that your workflows can recover from temporary issues without requiring complex error handling logic. Remember to use retries judiciously and only for steps that are idempotent and can safely be repeated. For more advanced retry strategies, such as exponential backoff or circuit breaking, stay tuned for future updates to Hatchet's retry capabilities. -```