From b90e372c1b0d9f7d5285030923d0975d8914cb71 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Fri, 29 May 2026 14:53:41 -0700 Subject: [PATCH 1/2] feat(notifications): time-based notification jobs (engagement, listen-streak, remix-contest) Adds the three timer-driven notification creators the legacy Python discovery-provider beat produced, as scheduled parity jobs in the core-indexer: - EngagementNotificationsJob: promotes 7-day-cooldown challenges to claimable_reward once their cooldown elapses (complements the handle_on_user_challenge trigger, which only handles cooldown_days=0 and reward_in_cooldown). - ListenStreakReminderJob: reminds users 42-43h after last listen. - RemixContestNotificationsJob: fan/artist contest ended + ending-soon. Each is a set-based INSERT...SELECT...ON CONFLICT DO NOTHING keyed on uq_notification (group_id, specifier) for idempotency, and is wired into startParityJobs. Mirrors the corresponding apps/ Python tasks. Co-Authored-By: Claude Opus 4.7 --- indexer/indexer.go | 12 + jobs/create_engagement_notifications.go | 138 +++++++++ jobs/create_engagement_notifications_test.go | 107 +++++++ ...te_listen_streak_reminder_notifications.go | 122 ++++++++ ...sten_streak_reminder_notifications_test.go | 58 ++++ jobs/create_remix_contest_notifications.go | 269 ++++++++++++++++++ ...create_remix_contest_notifications_test.go | 126 ++++++++ 7 files changed, 832 insertions(+) create mode 100644 jobs/create_engagement_notifications.go create mode 100644 jobs/create_engagement_notifications_test.go create mode 100644 jobs/create_listen_streak_reminder_notifications.go create mode 100644 jobs/create_listen_streak_reminder_notifications_test.go create mode 100644 jobs/create_remix_contest_notifications.go create mode 100644 jobs/create_remix_contest_notifications_test.go diff --git a/indexer/indexer.go b/indexer/indexer.go index 58930466..f7f03203 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -149,6 +149,18 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) { // scanners live in api/jobs/challenges/. jobs.NewIndexChallengesJob(ci.Config, ci.pool). ScheduleEvery(ctx, 30*time.Second) + + // Time-based notifications that the legacy Python beat produced. Unlike + // the event-driven notifications (handled by DB triggers), these fire on + // a timer because they depend on elapsed time, not an indexed entity. + jobs.NewEngagementNotificationsJob(ci.Config, ci.pool). + ScheduleEvery(ctx, 10*time.Minute) + + jobs.NewListenStreakReminderJob(ci.Config, ci.pool). + ScheduleEvery(ctx, 10*time.Second) + + jobs.NewRemixContestNotificationsJob(ci.Config, ci.pool). + ScheduleEvery(ctx, 30*time.Second) } func (ci *CoreIndexer) Close() { diff --git a/jobs/create_engagement_notifications.go b/jobs/create_engagement_notifications.go new file mode 100644 index 00000000..3fa8c669 --- /dev/null +++ b/jobs/create_engagement_notifications.go @@ -0,0 +1,138 @@ +package jobs + +import ( + "context" + "fmt" + "sync" + "time" + + "api.audius.co/config" + "api.audius.co/database" + "api.audius.co/logging" + "github.com/jackc/pgx/v5" + "go.uber.org/zap" +) + +// EngagementNotificationsJob inserts "claimable_reward" notifications for +// challenges whose 7-day cooldown has elapsed but which haven't been disbursed +// or notified yet. Mirrors +// apps/packages/discovery-provider/src/tasks/create_engagement_notifications.py. +// +// This complements the handle_on_user_challenge DB trigger rather than +// duplicating it: that trigger emits an immediate claimable_reward only for +// cooldown_days = 0 challenges, and emits reward_in_cooldown for cooldown_days +// > 0. This job is what later promotes the 7-day-cooldown challenges to +// claimable_reward once their cooldown elapses — hence the cooldown_days = 7 +// filter, matching Python's hardcoded check. +type EngagementNotificationsJob struct { + pool database.DbPool + logger *zap.Logger + now func() time.Time + + mutex sync.Mutex + isRunning bool +} + +// engagementStartDatetime matches Python's START_DATETIME — challenges +// completed before this date predate the cooldown-notification feature and are +// never notified. +const engagementStartDatetime = "2024-06-06" + +const engagementBatchSize = 500 + +func NewEngagementNotificationsJob(cfg config.Config, pool database.DbPool) *EngagementNotificationsJob { + return &EngagementNotificationsJob{ + pool: pool, + logger: logging.NewZapLogger(cfg).Named("EngagementNotificationsJob"), + now: time.Now, + } +} + +// ScheduleEvery runs the job every `interval` until the context is cancelled. +func (j *EngagementNotificationsJob) ScheduleEvery(ctx context.Context, interval time.Duration) *EngagementNotificationsJob { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + j.Run(ctx) + case <-ctx.Done(): + j.logger.Info("Job shutting down") + return + } + } + }() + return j +} + +// Run executes the job once. +func (j *EngagementNotificationsJob) Run(ctx context.Context) { + if err := j.run(ctx); err != nil { + j.logger.Error("Job run failed", zap.Error(err)) + } +} + +func (j *EngagementNotificationsJob) run(ctx context.Context) error { + j.mutex.Lock() + if j.isRunning { + j.mutex.Unlock() + return fmt.Errorf("job is already running") + } + j.isRunning = true + j.mutex.Unlock() + defer func() { + j.mutex.Lock() + j.isRunning = false + j.mutex.Unlock() + }() + + // completed_at < now - 1 week. The matching (group_id, specifier) dedup is + // handled by uq_notification via ON CONFLICT; the NOT EXISTS debounce + // suppresses a second claimable_reward for the same user within the hour + // before completion (Python's per-row existing_notification check). + res, err := j.pool.Exec(ctx, ` + INSERT INTO notification (specifier, group_id, blocknumber, user_ids, type, data, timestamp) + SELECT + uc.specifier, + 'claimable_reward:' || uc.user_id::text || ':challenge:' || uc.challenge_id || ':specifier:' || uc.specifier, + uc.completed_blocknumber, + ARRAY[uc.user_id], + 'claimable_reward', + jsonb_build_object( + 'specifier', uc.specifier, + 'challenge_id', uc.challenge_id, + 'amount', uc.amount + ), + uc.completed_at + FROM user_challenges uc + JOIN challenges c ON c.id = uc.challenge_id + LEFT JOIN challenge_disbursements cd + ON cd.challenge_id = uc.challenge_id AND cd.specifier = uc.specifier + WHERE uc.is_complete + AND uc.completed_at >= @start_datetime + AND uc.completed_at < @cooldown_cutoff + AND c.cooldown_days = 7 + AND cd.specifier IS NULL + AND NOT EXISTS ( + SELECT 1 FROM notification n + WHERE n.type = 'claimable_reward' + AND n.user_ids @> ARRAY[uc.user_id] + AND n.timestamp >= uc.completed_at - INTERVAL '1 hour' + ) + LIMIT @batch_size + ON CONFLICT (group_id, specifier) DO NOTHING + `, pgx.NamedArgs{ + "start_datetime": engagementStartDatetime, + "cooldown_cutoff": j.now().Add(-7 * 24 * time.Hour), + "batch_size": engagementBatchSize, + }) + if err != nil { + return fmt.Errorf("insert claimable_reward notifications: %w", err) + } + + if n := res.RowsAffected(); n > 0 { + j.logger.Info("Inserted claimable_reward notifications", zap.Int64("count", n)) + } + return nil +} diff --git a/jobs/create_engagement_notifications_test.go b/jobs/create_engagement_notifications_test.go new file mode 100644 index 00000000..e6b112ad --- /dev/null +++ b/jobs/create_engagement_notifications_test.go @@ -0,0 +1,107 @@ +package jobs + +import ( + "context" + "testing" + "time" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestEngagementNotifications_FullPipeline verifies a completed, undisbursed, +// 7-day-cooldown challenge past its cooldown produces exactly one +// claimable_reward notification, and re-running is idempotent. +func TestEngagementNotifications_FullPipeline(t *testing.T) { + pool := database.CreateTestDatabase(t, "test_jobs") + defer pool.Close() + + ctx := context.Background() + now := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC) + completedAt := now.Add(-8 * 24 * time.Hour) // past the 7-day cooldown + + database.Seed(pool, database.FixtureMap{ + "users": {{"user_id": 1, "wallet": "0x01"}}, + "challenges": { + {"id": "b", "type": "aggregate", "amount": "10", "active": true, "cooldown_days": 7}, + }, + "user_challenges": { + {"challenge_id": "b", "user_id": 1, "specifier": "s1", "is_complete": true, + "amount": 100, "completed_at": completedAt, "completed_blocknumber": 42}, + }, + }) + + job := NewEngagementNotificationsJob(newTestConfig(), pool) + job.now = func() time.Time { return now } + + require.NoError(t, job.run(ctx)) + + var typ, groupID, specifier string + var userIDs []int32 + require.NoError(t, pool.QueryRow(ctx, + `SELECT type, group_id, specifier, user_ids FROM notification WHERE type = 'claimable_reward'`, + ).Scan(&typ, &groupID, &specifier, &userIDs)) + assert.Equal(t, "claimable_reward", typ) + assert.Equal(t, "claimable_reward:1:challenge:b:specifier:s1", groupID) + assert.Equal(t, "s1", specifier) + assert.Equal(t, []int32{1}, userIDs) + + // Idempotent: a second run inserts nothing more. + require.NoError(t, job.run(ctx)) + assert.Equal(t, 1, countNotifications(t, ctx, pool, "claimable_reward")) +} + +// TestEngagementNotifications_Exclusions verifies the cooldown, disbursement, +// and not-yet-elapsed filters all suppress a claimable_reward from the job. +// +// Note on the test fixtures: the handle_on_user_challenge DB trigger fires when +// a complete user_challenge is seeded. For a challenge with cooldown_days > 0 it +// inserts a `reward_in_cooldown` (never a `claimable_reward`); only a +// cooldown_days = 0 challenge would get an immediate `claimable_reward` from the +// trigger. We therefore use positive, non-7 cooldowns for the "wrong cooldown" +// row so the trigger never produces a claimable_reward, isolating the job's +// behavior — which only ever emits claimable_reward for cooldown_days = 7. +func TestEngagementNotifications_Exclusions(t *testing.T) { + pool := database.CreateTestDatabase(t, "test_jobs") + defer pool.Close() + + ctx := context.Background() + now := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC) + old := now.Add(-8 * 24 * time.Hour) + + database.Seed(pool, database.FixtureMap{ + "users": {{"user_id": 1, "wallet": "0x01"}}, + "challenges": { + {"id": "cd7", "type": "aggregate", "amount": "10", "active": true, "cooldown_days": 7}, + {"id": "cd14", "type": "aggregate", "amount": "10", "active": true, "cooldown_days": 14}, + }, + "user_challenges": { + // Disbursed already -> excluded. + {"challenge_id": "cd7", "user_id": 1, "specifier": "disbursed", "is_complete": true, "amount": 100, "completed_at": old}, + // Wrong cooldown_days (not 7) -> excluded by the job's cooldown_days = 7 filter. + {"challenge_id": "cd14", "user_id": 1, "specifier": "wrongCooldown", "is_complete": true, "amount": 100, "completed_at": old}, + // Cooldown not yet elapsed -> excluded. + {"challenge_id": "cd7", "user_id": 1, "specifier": "tooRecent", "is_complete": true, "amount": 100, "completed_at": now.Add(-2 * 24 * time.Hour)}, + // Not complete -> excluded. + {"challenge_id": "cd7", "user_id": 1, "specifier": "incomplete", "is_complete": false, "amount": 100, "completed_at": old}, + }, + "challenge_disbursements": { + {"challenge_id": "cd7", "user_id": 1, "specifier": "disbursed", "signature": "sig", "amount": "100"}, + }, + }) + + job := NewEngagementNotificationsJob(newTestConfig(), pool) + job.now = func() time.Time { return now } + require.NoError(t, job.run(ctx)) + + assert.Equal(t, 0, countNotifications(t, ctx, pool, "claimable_reward")) +} + +func countNotifications(t *testing.T, ctx context.Context, pool database.DbPool, typ string) int { + t.Helper() + var n int + require.NoError(t, pool.QueryRow(ctx, + "SELECT COUNT(*) FROM notification WHERE type = $1", typ).Scan(&n)) + return n +} diff --git a/jobs/create_listen_streak_reminder_notifications.go b/jobs/create_listen_streak_reminder_notifications.go new file mode 100644 index 00000000..54c169dc --- /dev/null +++ b/jobs/create_listen_streak_reminder_notifications.go @@ -0,0 +1,122 @@ +package jobs + +import ( + "context" + "fmt" + "sync" + "time" + + "api.audius.co/config" + "api.audius.co/database" + "api.audius.co/logging" + "github.com/jackc/pgx/v5" + "go.uber.org/zap" +) + +// ListenStreakReminderJob reminds users whose listen streak is about to lapse. +// Mirrors +// apps/packages/discovery-provider/src/tasks/create_listen_streak_reminder_notifications.py. +// +// A streak is kept alive by listening at least once every 48 hours. We notify +// in the hour-wide window 42-43 hours after the last listen — 6 hours of slack +// before the streak breaks. The (group_id, specifier) pair encodes user + +// last-listen date, so uq_notification prevents re-reminding for the same day. +type ListenStreakReminderJob struct { + pool database.DbPool + logger *zap.Logger + env string + now func() time.Time + + mutex sync.Mutex + isRunning bool +} + +// listenStreakBuffer is the hours-since-last-listen at which we remind (2 days +// minus 6 hours). Matches Python's LISTEN_STREAK_BUFFER. +const listenStreakBuffer = 42 * time.Hour + +func NewListenStreakReminderJob(cfg config.Config, pool database.DbPool) *ListenStreakReminderJob { + return &ListenStreakReminderJob{ + pool: pool, + logger: logging.NewZapLogger(cfg).Named("ListenStreakReminderJob"), + env: cfg.Env, + now: time.Now, + } +} + +// ScheduleEvery runs the job every `interval` until the context is cancelled. +func (j *ListenStreakReminderJob) ScheduleEvery(ctx context.Context, interval time.Duration) *ListenStreakReminderJob { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + j.Run(ctx) + case <-ctx.Done(): + j.logger.Info("Job shutting down") + return + } + } + }() + return j +} + +// Run executes the job once. +func (j *ListenStreakReminderJob) Run(ctx context.Context) { + if err := j.run(ctx); err != nil { + j.logger.Error("Job run failed", zap.Error(err)) + } +} + +func (j *ListenStreakReminderJob) run(ctx context.Context) error { + j.mutex.Lock() + if j.isRunning { + j.mutex.Unlock() + return fmt.Errorf("job is already running") + } + j.isRunning = true + j.mutex.Unlock() + defer func() { + j.mutex.Lock() + j.isRunning = false + j.mutex.Unlock() + }() + + now := j.now() + windowEnd := now.Add(-listenStreakBuffer) + windowStart := now.Add(-listenStreakBuffer - time.Hour) + // Stage uses a tight 1-2 minute window so the flow can be exercised + // end-to-end without waiting ~2 days, matching Python's env branch. + if j.env == "stage" { + windowEnd = now.Add(-1 * time.Minute) + windowStart = now.Add(-2 * time.Minute) + } + + res, err := j.pool.Exec(ctx, ` + INSERT INTO notification (specifier, group_id, blocknumber, user_ids, type, data, timestamp) + SELECT + cls.user_id::text, + 'listen_streak_reminder:' || cls.user_id::text || ':' || to_char(cls.last_listen_date, 'YYYY-MM-DD'), + NULL, + ARRAY[cls.user_id], + 'listen_streak_reminder', + jsonb_build_object('streak', cls.listen_streak), + @now + FROM challenge_listen_streak cls + WHERE cls.last_listen_date BETWEEN @window_start AND @window_end + ON CONFLICT (group_id, specifier) DO NOTHING + `, pgx.NamedArgs{ + "now": now, + "window_start": windowStart, + "window_end": windowEnd, + }) + if err != nil { + return fmt.Errorf("insert listen_streak_reminder notifications: %w", err) + } + + if n := res.RowsAffected(); n > 0 { + j.logger.Info("Inserted listen_streak_reminder notifications", zap.Int64("count", n)) + } + return nil +} diff --git a/jobs/create_listen_streak_reminder_notifications_test.go b/jobs/create_listen_streak_reminder_notifications_test.go new file mode 100644 index 00000000..2f349ec0 --- /dev/null +++ b/jobs/create_listen_streak_reminder_notifications_test.go @@ -0,0 +1,58 @@ +package jobs + +import ( + "context" + "testing" + "time" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestListenStreakReminder_Window verifies that only users whose last listen +// falls in the 42-43h reminder window are notified, and that re-running is +// idempotent for the same (user, last-listen date). +func TestListenStreakReminder_Window(t *testing.T) { + pool := database.CreateTestDatabase(t, "test_jobs") + defer pool.Close() + + ctx := context.Background() + now := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC) + + database.Seed(pool, database.FixtureMap{ + "users": { + {"user_id": 1, "wallet": "0x01"}, + {"user_id": 2, "wallet": "0x02"}, + {"user_id": 3, "wallet": "0x03"}, + }, + "challenge_listen_streak": { + // In window (42.5h ago) -> notified. + {"user_id": 1, "listen_streak": 5, "last_listen_date": now.Add(-42*time.Hour - 30*time.Minute)}, + // Too recent (40h ago) -> excluded. + {"user_id": 2, "listen_streak": 3, "last_listen_date": now.Add(-40 * time.Hour)}, + // Too old (50h ago, streak already broken) -> excluded. + {"user_id": 3, "listen_streak": 9, "last_listen_date": now.Add(-50 * time.Hour)}, + }, + }) + + job := NewListenStreakReminderJob(newTestConfig(), pool) + job.now = func() time.Time { return now } + require.NoError(t, job.run(ctx)) + + var typ, groupID, specifier string + var userIDs []int32 + require.NoError(t, pool.QueryRow(ctx, + `SELECT type, group_id, specifier, user_ids FROM notification WHERE type = 'listen_streak_reminder'`, + ).Scan(&typ, &groupID, &specifier, &userIDs)) + assert.Equal(t, "listen_streak_reminder", typ) + assert.Equal(t, "listen_streak_reminder:1:2025-01-13", groupID) + assert.Equal(t, "1", specifier) + assert.Equal(t, []int32{1}, userIDs) + + assert.Equal(t, 1, countNotifications(t, ctx, pool, "listen_streak_reminder")) + + // Idempotent: a second run inserts nothing more. + require.NoError(t, job.run(ctx)) + assert.Equal(t, 1, countNotifications(t, ctx, pool, "listen_streak_reminder")) +} diff --git a/jobs/create_remix_contest_notifications.go b/jobs/create_remix_contest_notifications.go new file mode 100644 index 00000000..45ff402c --- /dev/null +++ b/jobs/create_remix_contest_notifications.go @@ -0,0 +1,269 @@ +package jobs + +import ( + "context" + "fmt" + "sync" + "time" + + "api.audius.co/config" + "api.audius.co/database" + "api.audius.co/logging" + "github.com/jackc/pgx/v5" + "go.uber.org/zap" +) + +// RemixContestNotificationsJob emits the four time-based remix-contest +// notifications. Mirrors +// apps/packages/discovery-provider/src/tasks/create_remix_contest_notifications.py +// and its four sub-tasks under remix_contest_notifications/. +// +// All four select remix_contest events whose end_date falls in a window +// relative to now, guard that the contest's parent track is still +// current/visible, and insert one notification per recipient. uq_notification +// (group_id, specifier) — where specifier is the recipient user id — provides +// idempotency, so a recipient is notified at most once per (event, type). +type RemixContestNotificationsJob struct { + pool database.DbPool + logger *zap.Logger + now func() time.Time + + mutex sync.Mutex + isRunning bool +} + +const ( + remixContestEndedWindowHours = 24 * time.Hour + fanRemixContestEndingSoonWindow = 72 * time.Hour + artistRemixContestEndingSoonWindow = 48 * time.Hour +) + +func NewRemixContestNotificationsJob(cfg config.Config, pool database.DbPool) *RemixContestNotificationsJob { + return &RemixContestNotificationsJob{ + pool: pool, + logger: logging.NewZapLogger(cfg).Named("RemixContestNotificationsJob"), + now: time.Now, + } +} + +// ScheduleEvery runs the job every `interval` until the context is cancelled. +func (j *RemixContestNotificationsJob) ScheduleEvery(ctx context.Context, interval time.Duration) *RemixContestNotificationsJob { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + j.Run(ctx) + case <-ctx.Done(): + j.logger.Info("Job shutting down") + return + } + } + }() + return j +} + +// Run executes the job once. +func (j *RemixContestNotificationsJob) Run(ctx context.Context) { + if err := j.run(ctx); err != nil { + j.logger.Error("Job run failed", zap.Error(err)) + } +} + +func (j *RemixContestNotificationsJob) run(ctx context.Context) error { + j.mutex.Lock() + if j.isRunning { + j.mutex.Unlock() + return fmt.Errorf("job is already running") + } + j.isRunning = true + j.mutex.Unlock() + defer func() { + j.mutex.Lock() + j.isRunning = false + j.mutex.Unlock() + }() + + now := j.now() + + // Each sub-step is independent; a failure in one shouldn't block the + // others, mirroring the Python dispatcher's per-call isolation. + steps := []struct { + name string + fn func(context.Context, time.Time) (int64, error) + }{ + {"fan_remix_contest_ended", j.fanEnded}, + {"fan_remix_contest_ending_soon", j.fanEndingSoon}, + {"artist_remix_contest_ended", j.artistEnded}, + {"artist_remix_contest_ending_soon", j.artistEndingSoon}, + } + var firstErr error + for _, s := range steps { + n, err := s.fn(ctx, now) + if err != nil { + j.logger.Error("remix contest notification step failed", zap.String("step", s.name), zap.Error(err)) + if firstErr == nil { + firstErr = err + } + continue + } + if n > 0 { + j.logger.Info("Inserted remix contest notifications", zap.String("step", s.name), zap.Int64("count", n)) + } + } + return firstErr +} + +// fanEnded notifies remixers, event subscribers, host followers, and parent +// track favoriters (excluding the host) that a contest they engaged with ended +// in the last 24h. +func (j *RemixContestNotificationsJob) fanEnded(ctx context.Context, now time.Time) (int64, error) { + res, err := j.pool.Exec(ctx, ` + INSERT INTO notification (specifier, group_id, blocknumber, user_ids, type, data, timestamp) + SELECT + aud.user_id::text, + 'fan_remix_contest_ended:' || e.event_id::text, + NULL, + ARRAY[aud.user_id], + 'fan_remix_contest_ended', + jsonb_build_object('entity_id', e.entity_id, 'entity_user_id', pt.owner_id), + @now + FROM events e + JOIN tracks pt ON pt.track_id = e.entity_id + AND pt.is_current AND NOT pt.is_delete AND NOT pt.is_unlisted + CROSS JOIN LATERAL ( + SELECT t.owner_id AS user_id FROM tracks t + WHERE t.is_current AND NOT t.is_delete AND t.remix_of IS NOT NULL + AND t.remix_of->'tracks' @> jsonb_build_array(jsonb_build_object('parent_track_id', e.entity_id)) + UNION + SELECT s.subscriber_id FROM subscriptions s + WHERE s.user_id = e.event_id AND s.entity_type = 'Event' + AND s.is_current AND NOT s.is_delete + UNION + SELECT f.follower_user_id FROM follows f + WHERE f.followee_user_id = e.user_id AND f.is_current AND NOT f.is_delete + UNION + SELECT sv.user_id FROM saves sv + WHERE sv.save_item_id = e.entity_id AND sv.save_type = 'track' + AND sv.is_current AND NOT sv.is_delete + ) aud + WHERE e.event_type = 'remix_contest' AND NOT e.is_deleted + AND e.end_date IS NOT NULL + AND e.end_date BETWEEN @window_start AND @window_end + AND aud.user_id <> e.user_id + ON CONFLICT (group_id, specifier) DO NOTHING + `, pgx.NamedArgs{ + "now": now, + "window_start": now.Add(-remixContestEndedWindowHours), + "window_end": now, + }) + if err != nil { + return 0, err + } + return res.RowsAffected(), nil +} + +// fanEndingSoon notifies host followers, parent track favoriters, and event +// subscribers (excluding the host) that a contest ends within 72h. +func (j *RemixContestNotificationsJob) fanEndingSoon(ctx context.Context, now time.Time) (int64, error) { + res, err := j.pool.Exec(ctx, ` + INSERT INTO notification (specifier, group_id, blocknumber, user_ids, type, data, timestamp) + SELECT + aud.user_id::text, + 'fan_remix_contest_ending_soon:' || e.event_id::text, + NULL, + ARRAY[aud.user_id], + 'fan_remix_contest_ending_soon', + jsonb_build_object('entity_id', e.entity_id, 'entity_user_id', pt.owner_id), + @now + FROM events e + JOIN tracks pt ON pt.track_id = e.entity_id + AND pt.is_current AND NOT pt.is_delete AND NOT pt.is_unlisted + CROSS JOIN LATERAL ( + SELECT f.follower_user_id AS user_id FROM follows f + WHERE f.followee_user_id = e.user_id AND f.is_current AND NOT f.is_delete + UNION + SELECT sv.user_id FROM saves sv + WHERE sv.save_item_id = e.entity_id AND sv.save_type = 'track' + AND sv.is_current AND NOT sv.is_delete + UNION + SELECT s.subscriber_id FROM subscriptions s + WHERE s.user_id = e.event_id AND s.entity_type = 'Event' + AND s.is_current AND NOT s.is_delete + ) aud + WHERE e.event_type = 'remix_contest' AND NOT e.is_deleted + AND e.end_date IS NOT NULL + AND e.end_date BETWEEN @window_start AND @window_end + AND aud.user_id <> e.user_id + ON CONFLICT (group_id, specifier) DO NOTHING + `, pgx.NamedArgs{ + "now": now, + "window_start": now, + "window_end": now.Add(fanRemixContestEndingSoonWindow), + }) + if err != nil { + return 0, err + } + return res.RowsAffected(), nil +} + +// artistEnded notifies the contest host that their contest ended in the last 24h. +func (j *RemixContestNotificationsJob) artistEnded(ctx context.Context, now time.Time) (int64, error) { + res, err := j.pool.Exec(ctx, ` + INSERT INTO notification (specifier, group_id, blocknumber, user_ids, type, data, timestamp) + SELECT + e.user_id::text, + 'artist_remix_contest_ended:' || e.event_id::text, + NULL, + ARRAY[e.user_id], + 'artist_remix_contest_ended', + jsonb_build_object('entity_id', e.entity_id), + @now + FROM events e + JOIN tracks pt ON pt.track_id = e.entity_id + AND pt.is_current AND NOT pt.is_delete AND NOT pt.is_unlisted + WHERE e.event_type = 'remix_contest' AND NOT e.is_deleted + AND e.end_date IS NOT NULL + AND e.end_date BETWEEN @window_start AND @window_end + ON CONFLICT (group_id, specifier) DO NOTHING + `, pgx.NamedArgs{ + "now": now, + "window_start": now.Add(-remixContestEndedWindowHours), + "window_end": now, + }) + if err != nil { + return 0, err + } + return res.RowsAffected(), nil +} + +// artistEndingSoon notifies the contest host that their contest ends within 48h. +func (j *RemixContestNotificationsJob) artistEndingSoon(ctx context.Context, now time.Time) (int64, error) { + res, err := j.pool.Exec(ctx, ` + INSERT INTO notification (specifier, group_id, blocknumber, user_ids, type, data, timestamp) + SELECT + e.user_id::text, + 'artist_remix_contest_ending_soon:' || e.event_id::text, + NULL, + ARRAY[e.user_id], + 'artist_remix_contest_ending_soon', + jsonb_build_object('entity_id', e.entity_id, 'entity_user_id', pt.owner_id), + @now + FROM events e + JOIN tracks pt ON pt.track_id = e.entity_id + AND pt.is_current AND NOT pt.is_delete AND NOT pt.is_unlisted + WHERE e.event_type = 'remix_contest' AND NOT e.is_deleted + AND e.end_date IS NOT NULL + AND e.end_date BETWEEN @window_start AND @window_end + ON CONFLICT (group_id, specifier) DO NOTHING + `, pgx.NamedArgs{ + "now": now, + "window_start": now, + "window_end": now.Add(artistRemixContestEndingSoonWindow), + }) + if err != nil { + return 0, err + } + return res.RowsAffected(), nil +} diff --git a/jobs/create_remix_contest_notifications_test.go b/jobs/create_remix_contest_notifications_test.go new file mode 100644 index 00000000..254828ba --- /dev/null +++ b/jobs/create_remix_contest_notifications_test.go @@ -0,0 +1,126 @@ +package jobs + +import ( + "context" + "testing" + "time" + + "api.audius.co/database" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestRemixContest_Ended verifies that a remix contest whose end_date fell in +// the last 24h notifies the full fan audience (remixers, host followers, parent +// track favoriters, event subscribers — minus the host) plus the host, and that +// the ending-soon steps stay silent. +func TestRemixContest_Ended(t *testing.T) { + pool := database.CreateTestDatabase(t, "test_jobs") + defer pool.Close() + + ctx := context.Background() + now := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC) + + seedRemixContest(t, pool, now.Add(-1*time.Hour)) + + job := NewRemixContestNotificationsJob(newTestConfig(), pool) + job.now = func() time.Time { return now } + require.NoError(t, job.run(ctx)) + + // Fan audience: remixer(2), follower(3), favoriter(4), subscriber(5). + assert.Equal(t, 4, countNotifications(t, ctx, pool, "fan_remix_contest_ended")) + // Host only. + assert.Equal(t, 1, countNotifications(t, ctx, pool, "artist_remix_contest_ended")) + // Future-window steps do not fire for an already-ended contest. + assert.Equal(t, 0, countNotifications(t, ctx, pool, "fan_remix_contest_ending_soon")) + assert.Equal(t, 0, countNotifications(t, ctx, pool, "artist_remix_contest_ending_soon")) + + // Host is never in the fan audience. + var fanIDs []int32 + rows, err := pool.Query(ctx, + `SELECT specifier::int FROM notification WHERE type = 'fan_remix_contest_ended' ORDER BY 1`) + require.NoError(t, err) + for rows.Next() { + var id int32 + require.NoError(t, rows.Scan(&id)) + fanIDs = append(fanIDs, id) + } + require.NoError(t, rows.Err()) + assert.Equal(t, []int32{2, 3, 4, 5}, fanIDs) + + // Idempotent. + require.NoError(t, job.run(ctx)) + assert.Equal(t, 4, countNotifications(t, ctx, pool, "fan_remix_contest_ended")) + assert.Equal(t, 1, countNotifications(t, ctx, pool, "artist_remix_contest_ended")) +} + +// TestRemixContest_EndingSoon verifies that a contest ending within the +// soon-window notifies followers, favoriters, and subscribers (but not +// remixers) plus the host, and that the ended steps stay silent. +func TestRemixContest_EndingSoon(t *testing.T) { + pool := database.CreateTestDatabase(t, "test_jobs") + defer pool.Close() + + ctx := context.Background() + now := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC) + + // Within both the 48h artist window and the 72h fan window. + seedRemixContest(t, pool, now.Add(36*time.Hour)) + + job := NewRemixContestNotificationsJob(newTestConfig(), pool) + job.now = func() time.Time { return now } + require.NoError(t, job.run(ctx)) + + // Fan ending-soon audience excludes remixers: follower(3), favoriter(4), subscriber(5). + assert.Equal(t, 3, countNotifications(t, ctx, pool, "fan_remix_contest_ending_soon")) + assert.Equal(t, 1, countNotifications(t, ctx, pool, "artist_remix_contest_ending_soon")) + assert.Equal(t, 0, countNotifications(t, ctx, pool, "fan_remix_contest_ended")) + assert.Equal(t, 0, countNotifications(t, ctx, pool, "artist_remix_contest_ended")) + + var fanIDs []int32 + rows, err := pool.Query(ctx, + `SELECT specifier::int FROM notification WHERE type = 'fan_remix_contest_ending_soon' ORDER BY 1`) + require.NoError(t, err) + for rows.Next() { + var id int32 + require.NoError(t, rows.Scan(&id)) + fanIDs = append(fanIDs, id) + } + require.NoError(t, rows.Err()) + assert.Equal(t, []int32{3, 4, 5}, fanIDs) +} + +// seedRemixContest sets up a single remix_contest event (event_id 500, host +// user 100, parent track 1000) with one of each fan-audience source and the +// given end_date. +func seedRemixContest(t *testing.T, pool *pgxpool.Pool, endDate time.Time) { + t.Helper() + database.Seed(pool, database.FixtureMap{ + "users": { + {"user_id": 100, "wallet": "0x100"}, // host + {"user_id": 2, "wallet": "0x02"}, // remixer + {"user_id": 3, "wallet": "0x03"}, // host follower + {"user_id": 4, "wallet": "0x04"}, // parent track favoriter + {"user_id": 5, "wallet": "0x05"}, // event subscriber + }, + "tracks": { + {"track_id": 1000, "owner_id": 100, "title": "parent"}, + {"track_id": 1001, "owner_id": 2, "title": "remix", + "remix_of": `{"tracks":[{"parent_track_id":1000}]}`}, + }, + "events": { + {"event_id": 500, "entity_type": "track", "user_id": 100, "entity_id": 1000, + "event_type": "remix_contest", "end_date": endDate}, + }, + "follows": { + {"follower_user_id": 3, "followee_user_id": 100}, + }, + "saves": { + {"user_id": 4, "save_item_id": 1000, "save_type": "track"}, + }, + "subscriptions": { + {"subscriber_id": 5, "user_id": 500, "entity_type": "Event", "entity_id": 1000}, + }, + }) +} From d9bce14e75e31a83d177f17b8f420150de2295a0 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Fri, 29 May 2026 15:03:46 -0700 Subject: [PATCH 2/2] test(notifications): seed challenges before user_challenges to avoid FK flake database.Seed processes non-entity tables in randomized map order, so a single combined fixture could insert user_challenges before its FK parent challenges, causing a flaky 23503 in CI. Split into two Seed calls to force the ordering. Co-Authored-By: Claude Opus 4.7 --- jobs/create_engagement_notifications_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/jobs/create_engagement_notifications_test.go b/jobs/create_engagement_notifications_test.go index e6b112ad..704319e8 100644 --- a/jobs/create_engagement_notifications_test.go +++ b/jobs/create_engagement_notifications_test.go @@ -21,11 +21,16 @@ func TestEngagementNotifications_FullPipeline(t *testing.T) { now := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC) completedAt := now.Add(-8 * 24 * time.Hour) // past the 7-day cooldown + // Seed challenges before user_challenges: the latter has a FK to the former + // and database.Seed processes non-entity tables in randomized map order, so + // a single combined call can violate the FK. Two calls force the order. database.Seed(pool, database.FixtureMap{ "users": {{"user_id": 1, "wallet": "0x01"}}, "challenges": { {"id": "b", "type": "aggregate", "amount": "10", "active": true, "cooldown_days": 7}, }, + }) + database.Seed(pool, database.FixtureMap{ "user_challenges": { {"challenge_id": "b", "user_id": 1, "specifier": "s1", "is_complete": true, "amount": 100, "completed_at": completedAt, "completed_blocknumber": 42}, @@ -70,12 +75,16 @@ func TestEngagementNotifications_Exclusions(t *testing.T) { now := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC) old := now.Add(-8 * 24 * time.Hour) + // Seed challenges before user_challenges (FK dependency + randomized Seed + // ordering — see TestEngagementNotifications_FullPipeline). database.Seed(pool, database.FixtureMap{ "users": {{"user_id": 1, "wallet": "0x01"}}, "challenges": { {"id": "cd7", "type": "aggregate", "amount": "10", "active": true, "cooldown_days": 7}, {"id": "cd14", "type": "aggregate", "amount": "10", "active": true, "cooldown_days": 14}, }, + }) + database.Seed(pool, database.FixtureMap{ "user_challenges": { // Disbursed already -> excluded. {"challenge_id": "cd7", "user_id": 1, "specifier": "disbursed", "is_complete": true, "amount": 100, "completed_at": old},