Feat/slack integration#367
Conversation
There was a problem hiding this comment.
Pull request overview
Adds Slack as an additional delivery channel for digests and workflow task notifications, introduces a unified notification subscription model (type → channels), and implements Slack account linking via OAuth.
Changes:
- Replaces legacy per-user boolean subscription flags with
UserNotificationSubscriptionrecords and normalization helpers. - Adds Slack service + Slack message formatters, and wires Slack into worker notifications + evidence digest delivery.
- Adds Slack OAuth linking endpoints and persistence (
SlackLinkAttempt,SlackUserLink), and updates API + swagger to expose channel-based notification preferences.
Reviewed changes
Copilot reviewed 39 out of 41 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| slack.yaml | Adds default Slack config file. |
| internal/service/worker/workflow_task_due_soon_worker_test.go | Updates tests for channel-based subscriptions and adds multi-channel email case. |
| internal/service/worker/workflow_task_digest_worker.go | Sends digest via multiple channels (email + Slack). |
| internal/service/worker/workflow_task_digest_worker_test.go | Adds Slack path tests + helpers for digest worker. |
| internal/service/worker/workflow_task_digest_checker.go | Switches checker to subscription-table driven user selection. |
| internal/service/worker/workflow_task_assigned_worker_test.go | Updates tests for channel-based subscriptions and adds Slack+email coverage. |
| internal/service/worker/user_repository.go | Loads notification subscriptions + Slack link when fetching NotificationUser. |
| internal/service/worker/service.go | Wires SlackService into worker construction. |
| internal/service/worker/service_test.go | Adds MockSlackService and updates worker factory tests. |
| internal/service/worker/risk_workers_test.go | Migrates new relational models in test DB. |
| internal/service/worker/jobs.go | Adds SlackService interface + channel routing and subscription normalization. |
| internal/service/worker/jobs_notification_channels_test.go | Unit tests for notification channel normalization/deduping on users. |
| internal/service/slack/types/types.go | Defines Slack message and send result types. |
| internal/service/slack/service.go | Implements Slack message sending via slack-go client. |
| internal/service/slack/formatters/workflow_task_digest.go | Slack Block Kit formatter for workflow task digests. |
| internal/service/slack/formatters/workflow_task_digest_test.go | Tests Slack digest formatter. |
| internal/service/slack/formatters/workflow_task_assigned.go | Slack Block Kit formatter for task-assigned notifications. |
| internal/service/slack/formatters/types.go | Defines digest summary formatter types. |
| internal/service/slack/formatters/service.go | Slack formatter for evidence digest summary. |
| internal/service/relational/slack.go | Adds SlackUserLink relational model. |
| internal/service/relational/slack_link_attempt.go | Adds SlackLinkAttempt relational model for OAuth state. |
| internal/service/relational/notification_subscription.go | Adds UserNotificationSubscription relational model. |
| internal/service/relational/ccf_internal.go | Removes legacy user subscription booleans from User. |
| internal/service/notification/constants.go | Adds notification type + delivery channel normalization utilities. |
| internal/service/notification/constants_test.go | Tests notification type/channel normalization. |
| internal/service/migrator.go | Migrates legacy subscription columns into subscription table and drops old columns. |
| internal/service/digest/service.go | Sends evidence digest via configured channels (email/Slack) and adds recipient loading. |
| internal/config/slack.go | Adds Slack YAML/env config loading and validation. |
| internal/config/config.go | Loads Slack config into global config. |
| internal/api/handler/users.go | Changes subscriptions API to channel-map model and persists subscriptions. |
| internal/api/handler/users_integration_test.go | Updates integration tests for new subscriptions API shape and validation. |
| internal/api/handler/auth/slack_link.go | Adds Slack OAuth linking endpoints and persistence. |
| internal/api/handler/auth/api.go | Registers Slack link handler routes. |
| go.sum | Adds slack-go + websocket checksums. |
| go.mod | Adds slack-go/slack and websocket dependencies. |
| docs/swagger.yaml | Updates swagger definitions for subscriptions + user fields. |
| docs/swagger.json | Updates swagger JSON for subscriptions + user fields. |
| docs/docs.go | Updates embedded swagger template. |
| cmd/digest.go | Updates digest preview to use recipients API. |
| .gitignore | Ignores oscal-content. |
| .env.example | Adds Slack env var examples. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -0,0 +1 @@ | |||
| enabled: true No newline at end of file | |||
There was a problem hiding this comment.
This default config file enables Slack (enabled: true) but does not include a token. Since LoadSlackConfig requires a token when enabled, this will cause startup warnings and Slack integration to be disabled unless users remember to edit this file. Consider shipping this as an example file (e.g., slack.yaml.example) or defaulting enabled: false and including the other keys with empty placeholders.
| result, err := w.slackService.SendMessage(ctx, slackUserID, message) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to send workflow-task-assigned slack message: %w", err) | ||
| } | ||
| if !result.Success { | ||
| return fmt.Errorf("workflow-task-assigned slack message send failed: %s", result.Error) | ||
| } |
There was a problem hiding this comment.
SlackUserID is a Slack user identifier (e.g. U…), but chat.postMessage expects a channel/DM conversation ID (typically D…) or an existing channel ID. Sending directly to a user ID here will likely fail with channel_not_found. Consider having the Slack service open (or look up) an IM conversation for the user and send to that channel ID, or store the DM channel ID instead of the user ID.
|
|
||
| if err := s.sendDigestSlackToChannel(ctx, recipient.SlackUserID, summary); err != nil { | ||
| s.logger.Errorw("failed to send digest Slack DM to user", | ||
| "user", recipient.User.Email, | ||
| "slackUserID", recipient.SlackUserID, |
There was a problem hiding this comment.
Evidence digest Slack DMs are sent using recipient.SlackUserID as the channel argument, but Slack APIs generally require a channel/IM conversation ID (e.g. D…) rather than a user ID (U…). This will likely fail in production. Consider opening/looking up a DM conversation channel for the user before sending, or persisting the DM channel ID during linking.
| Where("user_id = ?", userUUID.String()). | ||
| Delete(&relational.SlackLinkAttempt{}).Error; err != nil { | ||
| h.sugar.Warnw("Failed to remove Slack link attempts for deleted user", "userID", userUUID.String(), "error", err) | ||
| } |
There was a problem hiding this comment.
User deletion now cleans up SSO + Slack link tables, but it does not remove UserNotificationSubscription rows. Since foreign keys are disabled during migration, this can leave orphaned subscriptions behind. Consider deleting relational.UserNotificationSubscription (and any other new Slack-related rows) for the user as part of this handler.
| } | |
| } | |
| if err := h.db.Unscoped(). | |
| Where("user_id = ?", userUUID.String()). | |
| Delete(&relational.UserNotificationSubscription{}).Error; err != nil { | |
| h.sugar.Warnw("Failed to remove notification subscriptions for deleted user", "userID", userUUID.String(), "error", err) | |
| } |
| var count int64 | ||
| if err := db.Model(&relational.UserNotificationSubscription{}). | ||
| Where("user_id = ? AND notification_type = ?", subscribedUsers[i].ID, notification.NotificationTypeTaskAvailable). | ||
| Count(&count).Error; err != nil { | ||
| return err | ||
| } | ||
| if count > 0 { | ||
| continue | ||
| } | ||
|
|
||
| if err := db.Create(&relational.UserNotificationSubscription{ | ||
| UserID: subscribedUsers[i].ID, | ||
| NotificationType: notification.NotificationTypeTaskAvailable, | ||
| Channels: []string{notification.DeliveryChannelEmail}, | ||
| }).Error; err != nil { | ||
| return err |
There was a problem hiding this comment.
This migration performs a COUNT query and then an INSERT per subscribed user, which can be very slow on large user tables (N+1 queries). Consider inserting in bulk using INSERT ... SELECT or CreateInBatches with OnConflict DoNothing (unique index already exists) to avoid the per-row existence check.
| var count int64 | ||
| if err := db.Model(&relational.UserNotificationSubscription{}). | ||
| Where("user_id = ? AND notification_type = ?", subscribedUsers[i].ID, notification.NotificationTypeEvidenceDigest). | ||
| Count(&count).Error; err != nil { | ||
| return err | ||
| } | ||
| if count > 0 { | ||
| continue | ||
| } | ||
|
|
||
| if err := db.Create(&relational.UserNotificationSubscription{ | ||
| UserID: subscribedUsers[i].ID, | ||
| NotificationType: notification.NotificationTypeEvidenceDigest, | ||
| Channels: []string{notification.DeliveryChannelEmail}, | ||
| }).Error; err != nil { | ||
| return err |
There was a problem hiding this comment.
Same N+1 pattern here (COUNT + INSERT per user) can make migrations slow at scale. Prefer a bulk insert with OnConflict DoNothing (or an INSERT ... SELECT) to backfill subscriptions efficiently.
| var count int64 | ||
| if err := db.Model(&relational.UserNotificationSubscription{}). | ||
| Where("user_id = ? AND notification_type = ?", subscribedUsers[i].ID, notification.NotificationTypeTaskDailyDigest). | ||
| Count(&count).Error; err != nil { | ||
| return err | ||
| } | ||
| if count > 0 { | ||
| continue | ||
| } | ||
|
|
||
| if err := db.Create(&relational.UserNotificationSubscription{ | ||
| UserID: subscribedUsers[i].ID, | ||
| NotificationType: notification.NotificationTypeTaskDailyDigest, | ||
| Channels: []string{notification.DeliveryChannelEmail}, | ||
| }).Error; err != nil { | ||
| return err |
There was a problem hiding this comment.
Same N+1 pattern here (COUNT + INSERT per user) can make migrations slow at scale. Prefer a bulk insert with OnConflict DoNothing (or an INSERT ... SELECT) to backfill subscriptions efficiently.
| github.com/gobwas/glob v0.2.3 // indirect | ||
| github.com/gogo/protobuf v1.3.2 // indirect | ||
| github.com/gohugoio/hugo v0.139.4 // indirect | ||
| github.com/gorilla/websocket v1.5.3 // indirect | ||
| github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect |
There was a problem hiding this comment.
github.com/gorilla/websocket is now required by the Slack dependency, but it's listed as // indirect. Running go mod tidy should promote direct dependencies and keep go.mod consistent with actual imports.
| github.com/sagikazarmark/locafero v0.12.0 // indirect | ||
| github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 // indirect | ||
| github.com/shirou/gopsutil/v4 v4.25.1 // indirect | ||
| github.com/sirupsen/logrus v1.9.3 // indirect | ||
| github.com/slack-go/slack v0.20.0 // indirect | ||
| github.com/spf13/afero v1.15.0 // indirect |
There was a problem hiding this comment.
github.com/slack-go/slack is imported directly in internal/service/slack/... but is marked // indirect in go.mod. Consider running go mod tidy so direct dependencies are recorded correctly and the module files remain stable.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 39 out of 41 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
internal/service/digest/service.go:405
SendGlobalDigestcallsSendDigestSlackbefore the early-return checks that skip the digest when there is no evidence or no issues. This means Slack will receive a digest even when email digests are intentionally skipped; if the intent is to keep behavior consistent across channels, move the Slack send after the skip conditions (or update the skip logic to explicitly allow Slack).
// SendGlobalDigest sends the global digest to all active users (Phase 0)
func (s *Service) SendGlobalDigest(ctx context.Context) error {
summary, err := s.GetGlobalEvidenceSummary(ctx)
if err != nil {
return fmt.Errorf("failed to get evidence summary: %w", err)
}
if err := s.SendDigestSlack(ctx, summary); err != nil {
s.logger.Warnw("Failed to send digest to Slack", "error", err)
}
// Skip if there's nothing to report
if summary.TotalCount == 0 {
s.logger.Debug("No evidence found, skipping digest")
return nil
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| for _, notificationType := range types { | ||
| rows = append(rows, relational.UserNotificationSubscription{ | ||
| UserID: userID, | ||
| NotificationType: notificationType, | ||
| Channels: notifications[notificationType], | ||
| }) | ||
| } |
There was a problem hiding this comment.
relational.UserNotificationSubscription.Channels is datatypes.JSONSlice[string], but this code assigns a []string (notifications[notificationType]) directly, which is not assignable and will fail to compile. Convert the slice to the correct type (or change the model field type) when building the rows.
| if err := db.Create(&relational.UserNotificationSubscription{ | ||
| UserID: subscribedUsers[i].ID, | ||
| NotificationType: notification.NotificationTypeTaskAvailable, | ||
| Channels: []string{notification.DeliveryChannelEmail}, | ||
| }).Error; err != nil { | ||
| return err | ||
| } |
There was a problem hiding this comment.
UserNotificationSubscription.Channels is datatypes.JSONSlice[string], but the migration backfill assigns a []string literal. This is a type mismatch and will not compile without an explicit conversion (e.g., using the JSONSlice type in the literal).
| } | ||
|
|
||
| normalizedChannels, invalidChannels := notification.NormalizeDeliveryChannels(subscriptions[i].Channels) | ||
| if len(invalidChannels) > 0 { | ||
| w.logger.Warnw( | ||
| "WorkflowTaskDigestCheckerWorker: ignoring invalid delivery channels in task daily digest subscription", | ||
| "user_id", userID, | ||
| "invalid_channels", invalidChannels, | ||
| "channels", subscriptions[i].Channels, | ||
| ) | ||
| } |
There was a problem hiding this comment.
subscriptions[i].Channels is datatypes.JSONSlice[string] on relational.UserNotificationSubscription, but NormalizeDeliveryChannels expects a []string. Passing the JSONSlice directly will not compile; convert to []string (or update the helper to accept the JSONSlice type) before normalizing.
fd7f3f9 to
9322266
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 41 out of 43 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func (u NotificationUser) NotificationChannels(notificationType string) []string { | ||
| normalizedType, ok := notification.NormalizeNotificationType(notificationType) | ||
| if !ok { | ||
| return nil | ||
| } | ||
|
|
||
| seen := map[string]struct{}{} | ||
| channels := make([]string, 0) | ||
| for _, subscription := range u.NotificationSubscriptions { | ||
| currentType, currentTypeOK := notification.NormalizeNotificationType(subscription.NotificationType) | ||
| if !currentTypeOK || currentType != normalizedType { | ||
| continue | ||
| } | ||
|
|
||
| for _, current := range subscription.Channels { | ||
| channel, channelOK := notification.NormalizeDeliveryChannel(current) | ||
| if !channelOK { | ||
| continue | ||
| } | ||
| if _, ok := seen[channel]; ok { | ||
| continue | ||
| } | ||
| seen[channel] = struct{}{} | ||
| channels = append(channels, channel) | ||
| } | ||
| } | ||
|
|
||
| return channels | ||
| } |
There was a problem hiding this comment.
NotificationUser.NotificationChannels builds the returned slice in iteration order and does not sort it. Because subscription rows can be returned from the DB in an undefined order, this can make channel iteration/logging non-deterministic (and can lead to flaky tests if assertions depend on ordering). Consider sorting the final channels slice (or using notification.NormalizeDeliveryChannels to both validate and return a stable order).
| var users []relational.User | ||
| if err := w.db.WithContext(ctx). | ||
| Where("task_daily_digest_subscribed = ? AND deleted_at IS NULL", true). | ||
| Select("id"). | ||
| Where("id IN ?", userIDs). | ||
| Find(&users).Error; err != nil { | ||
| return fmt.Errorf("workflow-task-digest-checker: failed to query subscribed users: %w", err) | ||
| return fmt.Errorf("workflow-task-digest-checker: failed to load subscribed users: %w", err) | ||
| } |
There was a problem hiding this comment.
The digest checker now selects users solely by id IN ? without filtering out inactive/locked accounts. This can enqueue digest jobs for users who should not receive notifications. Consider adding the same constraints used elsewhere (e.g., is_active = true and is_locked = false) when loading relational.User rows to avoid sending to disabled accounts.
| var subscribedUsers []legacySubscribedUser | ||
| if err := db.Table("ccf_users"). | ||
| Select("id"). | ||
| Where("task_available_email_subscribed = ?", true). |
There was a problem hiding this comment.
These legacy backfill queries pull subscribed user IDs from ccf_users without excluding soft-deleted users. That can create ccf_user_notification_subscriptions rows for deleted accounts before the legacy columns are dropped. Consider adding deleted_at IS NULL (and any other relevant constraints like is_active/is_locked) to the ccf_users query used for backfill.
| Where("task_available_email_subscribed = ?", true). | |
| Where("task_available_email_subscribed = ?", true). | |
| Where("deleted_at IS NULL"). |
| func validateSendInput(channel string, message *types.Message) error { | ||
| if message == nil { | ||
| return fmt.Errorf("message is required") | ||
| } | ||
| if strings.TrimSpace(channel) == "" { | ||
| return fmt.Errorf("channel is required") | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
validateSendInput only checks that message is non-nil and channel is non-empty. Since SendMessage always includes slack.MsgOptionText(message.Text, ...), passing an empty message.Text could lead to Slack API errors (or messages with no usable fallback). Consider validating that at least one of message.Text or len(message.Blocks)>0 is present, and/or defaulting message.Text to a non-empty fallback when blocks are provided.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 41 out of 43 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| func (h *UserHandler) replaceUserNotificationSubscriptions(ctx context.Context, userID string, notifications map[string][]string) error { | ||
| return h.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { | ||
| if err := tx.Where("user_id = ?", userID).Delete(&relational.UserNotificationSubscription{}).Error; err != nil { |
There was a problem hiding this comment.
replaceUserNotificationSubscriptions uses Delete(...) on a soft-delete model, so each update will accumulate additional (soft-deleted) rows for the user over time. If history isn't needed, use Unscoped().Delete(...) (or an upsert/update strategy) to prevent unbounded growth of ccf_user_notification_subscriptions.
| if err := tx.Where("user_id = ?", userID).Delete(&relational.UserNotificationSubscription{}).Error; err != nil { | |
| if err := tx.Unscoped().Where("user_id = ?", userID).Delete(&relational.UserNotificationSubscription{}).Error; err != nil { |
| for i := range subscribedUsers { | ||
| if subscribedUsers[i].ID == "" { | ||
| db.Logger.Warn( | ||
| context.Background(), | ||
| "Skipping legacy task-available email subscription row with empty user ID (index=%d)", | ||
| i, | ||
| ) | ||
| continue | ||
| } | ||
|
|
||
| var count int64 | ||
| if err := db.Model(&relational.UserNotificationSubscription{}). | ||
| Where("user_id = ? AND notification_type = ?", subscribedUsers[i].ID, notification.NotificationTypeTaskAvailable). | ||
| Count(&count).Error; err != nil { | ||
| return err | ||
| } | ||
| if count > 0 { | ||
| continue | ||
| } | ||
|
|
||
| if err := db.Create(&relational.UserNotificationSubscription{ | ||
| UserID: subscribedUsers[i].ID, | ||
| NotificationType: notification.NotificationTypeTaskAvailable, | ||
| Channels: []string{notification.DeliveryChannelEmail}, | ||
| }).Error; err != nil { | ||
| return err | ||
| } |
There was a problem hiding this comment.
The legacy subscription backfill loops per-user and does a Count(...) + Create(...) per row, which can be very slow on large user tables (N+1 queries during migration). Consider a set-based insert (bulk INSERT ... SELECT with ON CONFLICT DO NOTHING / ON CONFLICT ... WHERE deleted_at IS NULL) to backfill in one statement before dropping the legacy column.
| channels := user.NotificationChannels(notification.NotificationTypeTaskAvailable) | ||
| if len(channels) == 0 { | ||
| w.logger.Debugw("WorkflowTaskDueSoonWorker: user not subscribed, skipping", | ||
| "step_execution_id", args.StepExecutionID, | ||
| "user_id", args.UserID, | ||
| ) | ||
| return nil | ||
| } | ||
|
|
||
| for _, channel := range channels { | ||
| switch channel { | ||
| case notification.DeliveryChannelEmail: | ||
| if err := w.sendEmail(ctx, args, user); err != nil { | ||
| return err | ||
| } | ||
| default: | ||
| w.logger.Debugw("WorkflowTaskDueSoonWorker: unsupported channel, skipping", | ||
| "step_execution_id", args.StepExecutionID, | ||
| "user_id", args.UserID, | ||
| "channel", channel, | ||
| ) | ||
| } |
There was a problem hiding this comment.
WorkflowTaskDueSoonWorker now reads the task_available subscription channels but only supports email. If a user selects Slack-only for taskAvailable, they will receive no due-soon reminders even though Slack is a supported channel elsewhere. Consider either adding Slack delivery here too, or filtering to email-only without logging Slack as "unsupported" (since it is supported for the same notification type in other workers).
| github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 // indirect | ||
| github.com/shirou/gopsutil/v4 v4.25.1 // indirect | ||
| github.com/sirupsen/logrus v1.9.3 // indirect | ||
| github.com/slack-go/slack v0.20.0 // indirect |
There was a problem hiding this comment.
github.com/slack-go/slack is imported by non-test packages (e.g., internal/service/slack/...), so it should be a direct dependency. Running go mod tidy should drop the // indirect marker (same for github.com/gorilla/websocket if it becomes direct). Keeping incorrect // indirect annotations makes dependency management noisier.
| github.com/slack-go/slack v0.20.0 // indirect | |
| github.com/slack-go/slack v0.20.0 |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 44 out of 46 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func (s *Service) sendDigestSlackToChannel(ctx context.Context, channel string, summary *EvidenceSummary) error { | ||
| if strings.TrimSpace(channel) == "" { | ||
| return fmt.Errorf("slack channel is required") | ||
| } | ||
|
|
||
| slackService, err := slacksvc.NewService(s.config.Slack, s.logger) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to initialize Slack service for digest: %w", err) | ||
| } | ||
|
|
||
| data := formatters.DigestSummary{ | ||
| TotalCount: summary.TotalCount, | ||
| SatisfiedCount: summary.SatisfiedCount, | ||
| NotSatisfiedCount: summary.NotSatisfiedCount, | ||
| ExpiredCount: summary.ExpiredCount, | ||
| TopExpired: toSlackDigestEvidence(summary.TopExpired), | ||
| TopNotSatisfied: toSlackDigestEvidence(summary.TopNotSatisfied), | ||
| BaseURL: s.config.WebBaseURL, | ||
| } | ||
| message, err := formatters.FormatDigestMessage(&data) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to format Slack message for digest: %w", err) | ||
| } | ||
| _, err = slackService.SendMessage(ctx, channel, message) | ||
|
|
||
| if err != nil { | ||
| return fmt.Errorf("failed to send Slack message for digest: %w", err) | ||
| } | ||
| return nil |
There was a problem hiding this comment.
sendDigestSlackToChannel creates a new Slack service instance on every call. In SendGlobalDigest this can execute once per recipient (and again for the global channel), which is unnecessary overhead and makes Slack client behavior harder to control. Consider constructing and storing a single Slack service/client on digest.Service (or passing one in) and reusing it for all digest deliveries in a run.
|
|
||
| UserID string `json:"userId" gorm:"not null;uniqueIndex:idx_ccf_user_notification_subscriptions_unique,WHERE:deleted_at IS NULL"` | ||
|
|
||
| NotificationType string `json:"notificationType" gorm:"not null;uniqueIndex:idx_ccf_user_notification_subscriptions_unique,WHERE:deleted_at IS NULL"` |
There was a problem hiding this comment.
UserNotificationSubscription is queried by notification_type in multiple workers/services, but the schema only defines a composite unique index on (user_id, notification_type) (plus deleted_at filter). Consider adding an additional index on notification_type (and possibly (notification_type, user_id)), to avoid full table scans when loading all subscribers for a given notification type.
| NotificationType string `json:"notificationType" gorm:"not null;uniqueIndex:idx_ccf_user_notification_subscriptions_unique,WHERE:deleted_at IS NULL"` | |
| NotificationType string `json:"notificationType" gorm:"not null;uniqueIndex:idx_ccf_user_notification_subscriptions_unique,WHERE:deleted_at IS NULL;index:idx_ccf_user_notification_subscriptions_notification_type,WHERE:deleted_at IS NULL"` |
| for _, channel := range channels { | ||
| switch channel { | ||
| case notification.DeliveryChannelEmail: | ||
| if err := w.sendEmail(ctx, args, user.Email, user.FullName()); err != nil { | ||
| return err | ||
| } | ||
| case notification.DeliveryChannelSlack: | ||
| if err := w.sendSlack(ctx, args, user); err != nil { | ||
| return err | ||
| } | ||
| default: | ||
| w.logger.Debugw("WorkflowTaskAssignedWorker: unsupported channel, skipping", | ||
| "step_execution_id", args.StepExecutionID, | ||
| "user_id", args.UserID, | ||
| "channel", channel, | ||
| ) | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Multi-channel delivery currently returns immediately on the first channel error. If (for example) email succeeds but Slack fails, the job will error and River will retry, which can resend the already-successful email and create duplicate notifications. Consider isolating channels into separate jobs, persisting per-channel delivery state/idempotency keys, or at least attempting all channels and only retrying the failed channel(s) to avoid duplicates.
| for _, channel := range channels { | ||
| switch channel { | ||
| case notification.DeliveryChannelEmail: | ||
| if err := w.sendEmail(ctx, args, user); err != nil { | ||
| return err | ||
| } | ||
| case notification.DeliveryChannelSlack: | ||
| if err := w.sendSlack(ctx, args, user); err != nil { | ||
| return err | ||
| } | ||
| default: | ||
| w.logger.Debugw("WorkflowTaskDueSoonWorker: unsupported channel, skipping", | ||
| "step_execution_id", args.StepExecutionID, | ||
| "user_id", args.UserID, | ||
| "channel", channel, | ||
| ) | ||
| } | ||
| } |
There was a problem hiding this comment.
Multi-channel delivery returns immediately on the first channel error, which can lead to duplicate notifications on retry (e.g., email sent successfully, Slack fails, then retry sends email again). Consider separating channel deliveries into independent jobs or tracking per-channel delivery status so retries don’t re-deliver already-successful channels.
| for _, channel := range channels { | ||
| switch channel { | ||
| case notification.DeliveryChannelEmail: | ||
| if err := w.sendEmail(ctx, args.UserID, user.Email, data); err != nil { | ||
| return err | ||
| } | ||
| case notification.DeliveryChannelSlack: | ||
| if err := w.sendSlack(ctx, args.UserID, user, data); err != nil { | ||
| return err | ||
| } |
There was a problem hiding this comment.
Multi-channel delivery returns on the first channel error. If one channel succeeds and a later one fails, River retries will re-send the already-successful channel, producing duplicates. Consider making per-channel deliveries idempotent or splitting each channel into its own job so retries don’t duplicate notifications.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 51 out of 53 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 51 out of 53 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| out := make(map[string][]string, len(rows)) | ||
| for i := range rows { | ||
| channels := make([]string, len(rows[i].Channels)) | ||
| copy(channels, rows[i].Channels) | ||
| wireType, ok := notification.WireNotificationType(rows[i].NotificationType) | ||
| if !ok { | ||
| wireType = rows[i].NotificationType | ||
| } | ||
| out[wireType] = channels | ||
| } |
There was a problem hiding this comment.
rows[i].Channels is datatypes.JSONSlice[string], so copy(channels, rows[i].Channels) will not compile because the source slice type isn't []string. Convert/cast the JSONSlice to []string (or use a helper like slices.Clone([]string(rows[i].Channels))) before copying.
| func workflowDeliveryChannelsForAssignment(assignedToType string) []string { | ||
| if assignedToType == notification.DeliveryChannelEmail { | ||
| return []string{notification.DeliveryChannelEmail} | ||
| } | ||
| return allWorkflowNotificationChannels() | ||
| } |
There was a problem hiding this comment.
workflowDeliveryChannelsForAssignment compares assignedToType (a workflow assignment type like workflows.AssignmentTypeEmail) to notification.DeliveryChannelEmail. They currently share the same string value ("email"), but this conflates two different domains and will silently break if either constant set changes. Compare against the workflow assignment type constant instead.
| func (h *SlackLinkHandler) setLinkCookie(ctx echo.Context, name, value string, ttl time.Duration) { | ||
| cookie := new(http.Cookie) | ||
| cookie.Name = name | ||
| cookie.Value = value | ||
| cookie.Expires = time.Now().Add(ttl) | ||
| cookie.HttpOnly = true | ||
| cookie.Path = "/" | ||
| // OAuth callback is cross-site navigation from Slack -> API callback. | ||
| cookie.SameSite = http.SameSiteLaxMode | ||
| cookie.Secure = true | ||
| ctx.SetCookie(cookie) | ||
| } | ||
|
|
||
| func (h *SlackLinkHandler) clearLinkCookie(ctx echo.Context, name string) { | ||
| cookie := new(http.Cookie) | ||
| cookie.Name = name | ||
| cookie.Value = "" | ||
| cookie.Expires = time.Now().Add(-1 * time.Hour) | ||
| cookie.HttpOnly = true | ||
| cookie.Path = "/" | ||
| cookie.SameSite = http.SameSiteLaxMode | ||
| cookie.Secure = true | ||
| ctx.SetCookie(cookie) |
There was a problem hiding this comment.
Slack link state cookies are always marked Secure=true. In development this can prevent the browser from setting/sending the cookie over plain HTTP (unlike the existing SSO flow which toggles Secure based on environment), causing callback state validation to fail. Consider mirroring the SSO handler behavior (or basing Secure on request scheme/config) so linking works reliably in dev while remaining secure in prod.
| } | ||
|
|
||
| normalizedChannels, invalidChannels := notification.NormalizeDeliveryChannels(subscriptions[i].Channels) | ||
| if len(invalidChannels) > 0 { | ||
| w.logger.Warnw( |
There was a problem hiding this comment.
subscriptions[i].Channels is datatypes.JSONSlice[string], which is not directly assignable to []string. Calling notification.NormalizeDeliveryChannels(subscriptions[i].Channels) will not compile; convert/cast the JSONSlice to []string first (or normalize by iterating over the slice elements).
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 51 out of 53 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if args.AssignedToType == notification.DeliveryChannelEmail { | ||
| if channel != "" && channel != notification.DeliveryChannelEmail { | ||
| w.logger.Debugw("WorkflowTaskAssignedWorker: unsupported channel for email assignee, skipping", | ||
| "step_execution_id", args.StepExecutionID, | ||
| "user_id", args.UserID, | ||
| "channel", channel, | ||
| ) | ||
| return nil | ||
| } | ||
| return w.sendToEmailAddress(ctx, args) |
There was a problem hiding this comment.
This branch uses notification.DeliveryChannelEmail to detect an email assignee type (args.AssignedToType). It currently works because workflows.AssignmentTypeEmail is also the string "email", but these are different domains (assignment type vs delivery channel). Please compare against workflows.AssignmentTypeEmail.String() (or a dedicated constant) to make the intent clear and prevent future breakage if either set of constants changes.
No description provided.