UN-2850 [FIX] Add INPROGRESS notification for scheduled ETL executions and adjust status mapping#1562
Conversation
… executions - Add INPROGRESS status to NotificationStatus enum for scheduled executions - Add SCHEDULED_EXECUTION source to NotificationSource enum - Fix notification status mapping for backward compatibility: - API workflows use COMPLETED status - ETL/TASK workflows use SUCCESS status - Update Slack webhook formatting to inline display - Standardize key naming in Slack notifications (ID → Id) - Trigger INPROGRESS notification when scheduled pipeline starts 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Summary by CodeRabbit
WalkthroughAdds INPROGRESS status and scheduled-execution source to core models, changes final COMPLETED mapping to SUCCESS for non-API workflows, tweaks Slack block formatting and key casing, and integrates guarded scheduler-side notification dispatches for INPROGRESS events. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Scheduler as Scheduler Task
participant API as API Client
participant Notify as Notification Helper
participant Slack as Slack Provider
rect rgb(240,248,255)
note over Scheduler: Pipeline transitions to INPROGRESS
Scheduler->>API: update_pipeline_status(INPROGRESS)
Scheduler->>Scheduler: _send_pipeline_status_notification(...)
end
rect rgb(245,255,250)
note over Scheduler: Build NotificationPayload (source may be scheduled-execution)
Scheduler->>Notify: trigger_notification(pipeline_id, name, payload)
alt Active WEBHOOK notifications found
Notify->>Slack: send_notification_to_worker(webhook, payload with inline blocks)
Slack-->>Notify: 2xx / success
Notify-->>Scheduler: returned (logged success)
else No active WEBHOOKs or non-WEBHOOK
Notify-->>Scheduler: no-op (logged)
end
alt Errors during dispatch
Notify-->>Scheduler: error (logged, not raised)
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
workers/notification/providers/slack_webhook.py (1)
117-127: Timestamp labeled UTC but not using UTCdatetime.now() is local time; message says UTC. Use timezone-aware UTC.
Apply:
- from datetime import datetime + from datetime import datetime, timezone @@ - "text": f"_Sent at {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}_", + "text": f"_Sent at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}_",
🧹 Nitpick comments (3)
workers/notification/providers/slack_webhook.py (1)
182-184: Key normalization change OK; optional: keep “ID” uppercaseIf desired, map to “Execution ID” / “Organization ID” for consistency with common acronyms. Not blocking.
workers/scheduler/tasks.py (1)
35-91: Non-fatal notification helper is good; add stack traces and avoid blind exceptsKeep behavior, but log exceptions with stack traces for debuggability.
Apply:
- except Exception as notification_error: - logger.warning( - f"Failed to send notification for {pipeline_type} {pipeline_id}: {notification_error}" - ) + except Exception as notification_error: + logger.warning( + f"Failed to send notification for {pipeline_type} {pipeline_id}: {notification_error}", + exc_info=True, + )Optional: type the signature as
- pipeline_type: WorkflowType | str
- status: NotificationStatus | str
and validate before constructing NotificationPayload (keeps failures localized).unstract/core/src/unstract/core/data_models.py (1)
591-596: COMPLETED mapping change is correct; add tests and optional EXECUTING support
- Back-compat mapping (API→COMPLETED, ETL/TASK→SUCCESS) is correct.
NotificationPayload.from_execution_statusstill rejects PENDING/EXECUTING – consider an optional flag to map EXECUTING→IN_PROGRESS.- Add/confirm unit tests for:
• API: COMPLETED→COMPLETED
• ETL/TASK: COMPLETED→SUCCESS
• EXECUTING: ValueError (current behavior)- Call sites in workers/shared/patterns/notification/helper.py (lines 151–155, 224–228, 336–340)
- Also applies to data_models.py lines 601–604
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (3)
unstract/core/src/unstract/core/data_models.py(3 hunks)workers/notification/providers/slack_webhook.py(2 hunks)workers/scheduler/tasks.py(3 hunks)
🧰 Additional context used
🪛 Ruff (0.13.3)
workers/scheduler/tasks.py
86-86: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (4)
unstract/core/src/unstract/core/data_models.py (2)
476-477: INPROGRESS status addition looks goodEnum extension aligns with scheduled execution notifications.
491-492: New source SCHEDULED_EXECUTION approvedMatches scheduler usage for INPROGRESS notifications.
workers/notification/providers/slack_webhook.py (1)
104-111: Inline block formatting LGTMMore compact Slack messages without losing readability.
workers/scheduler/tasks.py (1)
314-323: Approve INPROGRESS emission location
Parameters are correct andpipeline_typevalues are constrained by the client and model to validWorkflowTypeenums (invalid values simply log a warning and default to API). No changes needed.
|
|
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (3)
workers/shared/patterns/notification/helper.py (3)
102-107: Add type hint forapi_clientparameter.The
api_clientparameter lacks a type annotation, which reduces code clarity and prevents type checkers from validating its usage.Consider adding the appropriate type hint:
def trigger_notification( - api_client, + api_client: "APIClient", # or the actual type pipeline_id: str, pipeline_name: str, notification_payload: NotificationPayload, ) -> None:
102-155: Consider extracting common notification logic to reduce duplication.This new function shares substantial logic with
trigger_pipeline_notifications(lines 157-241) andtrigger_api_notifications(lines 243-313):
- API call to fetch notifications (lines 115-119)
- Filtering active notifications (lines 124-126)
- Webhook sending loop (lines 137-151)
Extracting the shared notification fetching and dispatching logic into a helper function would improve maintainability and reduce the risk of inconsistencies.
Example refactor:
def _fetch_and_send_notifications( api_client, endpoint: str, entity_id: str, entity_name: str, payload: NotificationPayload, ) -> None: """Shared logic for fetching and sending notifications.""" try: response_data = api_client._make_request( method="GET", endpoint=endpoint, timeout=10, ) notifications_data = response_data.get("notifications", []) active_notifications = [ n for n in notifications_data if n.get("is_active", False) ] if not active_notifications: logger.info(f"No active notifications found for {entity_name}") return logger.info( f"Sending {len(active_notifications)} notifications for {entity_name}" ) for notification in active_notifications: if notification.get("notification_type") == "WEBHOOK": send_notification_to_worker( url=notification["url"], payload=payload, auth_type=notification.get("authorization_type", "NONE"), auth_key=notification.get("authorization_key"), auth_header=notification.get("authorization_header"), max_retries=notification.get("max_retries", 0), platform=notification.get("platform"), ) else: logger.debug( f"Skipping non-webhook notification type: {notification.get('notification_type')}" ) except Exception as e: logger.exception(f"Error sending notifications for {entity_name}: {e}") def trigger_notification( api_client, pipeline_id: str, pipeline_name: str, notification_payload: NotificationPayload, ) -> None: """Trigger notifications for pipeline status updates.""" _fetch_and_send_notifications( api_client=api_client, endpoint=f"v1/webhook/pipeline/{pipeline_id}/notifications/", entity_id=pipeline_id, entity_name=pipeline_name, payload=notification_payload, )
153-154: Uselogging.exceptionfor better error diagnostics.Replace
logging.errorwithlogging.exceptionto automatically include the stack trace, which aids in debugging.As per static analysis:
except Exception as e: - logger.error(f"Error triggering pipeline notifications for {pipeline_id}: {e}") + logger.exception(f"Error triggering pipeline notifications for {pipeline_id}: {e}")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (1)
workers/shared/patterns/notification/helper.py(1 hunks)
🧰 Additional context used
🪛 Ruff (0.13.3)
workers/shared/patterns/notification/helper.py
153-153: Do not catch blind exception: Exception
(BLE001)
154-154: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build



What
Why
How
Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
No. Changes maintain backward compatibility:
Database Migrations
Env Config
Relevant Docs
Related Issues or PRs
Dependencies Versions
Notes on Testing
Screenshots
N/A
Checklist
I have read and understood the Contribution Guidelines.
🤖 Generated with Claude Code