Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: expose retry count #21

Merged
merged 8 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/simple/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class MyWorkflow:
def __init__(self):
self.my_value = "test"

@hatchet.step()
@hatchet.step(timeout="2s", retries=3)
def step1(self, context: Context):
print("executed step1")
time.sleep(10)
Expand Down
3 changes: 3 additions & 0 deletions hatchet_sdk/clients/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def __init__(
action_id: str,
action_payload: str,
action_type: ActionType,
retry_count: int,
):
self.worker_id = worker_id
self.workflow_run_id = workflow_run_id
Expand All @@ -93,6 +94,7 @@ def __init__(
self.action_id = action_id
self.action_payload = action_payload
self.action_type = action_type
self.retry_count = retry_count


class WorkerActionListener:
Expand Down Expand Up @@ -264,6 +266,7 @@ async def _generator(self) -> AsyncGenerator[Action, None]:
action_id=assigned_action.actionId,
action_payload=action_payload,
action_type=action_type,
retry_count=assigned_action.retryCount,
)

yield action
Expand Down
1 change: 1 addition & 0 deletions hatchet_sdk/clients/rest/models/step_run_event_reason.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class StepRunEventReason(str, Enum):
REASSIGNED = "REASSIGNED"
TIMED_OUT = "TIMED_OUT"
SLOT_RELEASED = "SLOT_RELEASED"
RETRIED_BY_USER = "RETRIED_BY_USER"

@classmethod
def from_json(cls, json_str: str) -> Self:
Expand Down
7 changes: 7 additions & 0 deletions hatchet_sdk/clients/rest/models/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ class Worker(BaseModel):
description="The time this worker last sent a heartbeat.",
alias="lastHeartbeatAt",
)
last_listener_established: Optional[datetime] = Field(
default=None,
description="The time this worker last sent a heartbeat.",
alias="lastListenerEstablished",
)
actions: Optional[List[StrictStr]] = Field(
default=None, description="The actions this worker can perform."
)
Expand Down Expand Up @@ -71,6 +76,7 @@ class Worker(BaseModel):
"metadata",
"name",
"lastHeartbeatAt",
"lastListenerEstablished",
"actions",
"recentStepRuns",
"status",
Expand Down Expand Up @@ -156,6 +162,7 @@ def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
),
"name": obj.get("name"),
"lastHeartbeatAt": obj.get("lastHeartbeatAt"),
"lastListenerEstablished": obj.get("lastListenerEstablished"),
"actions": obj.get("actions"),
"recentStepRuns": (
[StepRun.from_dict(_item) for _item in obj["recentStepRuns"]]
Expand Down
3 changes: 3 additions & 0 deletions hatchet_sdk/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,6 @@ def refresh_timeout(self, increment_by: str):
)
except Exception as e:
logger.error(f"Error refreshing timeout: {e}")

def retry_count(self):
return self.action.retry_count
108 changes: 54 additions & 54 deletions hatchet_sdk/dispatcher_pb2.py

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions hatchet_sdk/dispatcher_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class WorkerRegisterResponse(_message.Message):
def __init__(self, tenantId: _Optional[str] = ..., workerId: _Optional[str] = ..., workerName: _Optional[str] = ...) -> None: ...

class AssignedAction(_message.Message):
__slots__ = ("tenantId", "workflowRunId", "getGroupKeyRunId", "jobId", "jobName", "jobRunId", "stepId", "stepRunId", "actionId", "actionType", "actionPayload", "stepName")
__slots__ = ("tenantId", "workflowRunId", "getGroupKeyRunId", "jobId", "jobName", "jobRunId", "stepId", "stepRunId", "actionId", "actionType", "actionPayload", "stepName", "retryCount")
TENANTID_FIELD_NUMBER: _ClassVar[int]
WORKFLOWRUNID_FIELD_NUMBER: _ClassVar[int]
GETGROUPKEYRUNID_FIELD_NUMBER: _ClassVar[int]
Expand All @@ -105,6 +105,7 @@ class AssignedAction(_message.Message):
ACTIONTYPE_FIELD_NUMBER: _ClassVar[int]
ACTIONPAYLOAD_FIELD_NUMBER: _ClassVar[int]
STEPNAME_FIELD_NUMBER: _ClassVar[int]
RETRYCOUNT_FIELD_NUMBER: _ClassVar[int]
tenantId: str
workflowRunId: str
getGroupKeyRunId: str
Expand All @@ -117,7 +118,8 @@ class AssignedAction(_message.Message):
actionType: ActionType
actionPayload: str
stepName: str
def __init__(self, tenantId: _Optional[str] = ..., workflowRunId: _Optional[str] = ..., getGroupKeyRunId: _Optional[str] = ..., jobId: _Optional[str] = ..., jobName: _Optional[str] = ..., jobRunId: _Optional[str] = ..., stepId: _Optional[str] = ..., stepRunId: _Optional[str] = ..., actionId: _Optional[str] = ..., actionType: _Optional[_Union[ActionType, str]] = ..., actionPayload: _Optional[str] = ..., stepName: _Optional[str] = ...) -> None: ...
retryCount: int
def __init__(self, tenantId: _Optional[str] = ..., workflowRunId: _Optional[str] = ..., getGroupKeyRunId: _Optional[str] = ..., jobId: _Optional[str] = ..., jobName: _Optional[str] = ..., jobRunId: _Optional[str] = ..., stepId: _Optional[str] = ..., stepRunId: _Optional[str] = ..., actionId: _Optional[str] = ..., actionType: _Optional[_Union[ActionType, str]] = ..., actionPayload: _Optional[str] = ..., stepName: _Optional[str] = ..., retryCount: _Optional[int] = ...) -> None: ...

class WorkerListenRequest(_message.Message):
__slots__ = ("workerId",)
Expand Down
1 change: 1 addition & 0 deletions lint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pre-commit run --all-files || pre-commit run --all-files
Loading