Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) {
// scanners live in api/jobs/challenges/.
jobs.NewIndexChallengesJob(ci.Config, ci.pool).
ScheduleEvery(ctx, 30*time.Second)

// Time-based notifications that the legacy Python beat produced. Unlike
// the event-driven notifications (handled by DB triggers), these fire on
// a timer because they depend on elapsed time, not an indexed entity.
jobs.NewEngagementNotificationsJob(ci.Config, ci.pool).
ScheduleEvery(ctx, 10*time.Minute)

jobs.NewListenStreakReminderJob(ci.Config, ci.pool).
ScheduleEvery(ctx, 10*time.Second)

jobs.NewRemixContestNotificationsJob(ci.Config, ci.pool).
ScheduleEvery(ctx, 30*time.Second)
}

func (ci *CoreIndexer) Close() {
Expand Down
138 changes: 138 additions & 0 deletions jobs/create_engagement_notifications.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package jobs

import (
"context"
"fmt"
"sync"
"time"

"api.audius.co/config"
"api.audius.co/database"
"api.audius.co/logging"
"github.com/jackc/pgx/v5"
"go.uber.org/zap"
)

// EngagementNotificationsJob inserts "claimable_reward" notifications for
// challenges whose 7-day cooldown has elapsed but which haven't been disbursed
// or notified yet. Mirrors
// apps/packages/discovery-provider/src/tasks/create_engagement_notifications.py.
//
// This complements the handle_on_user_challenge DB trigger rather than
// duplicating it: that trigger emits an immediate claimable_reward only for
// cooldown_days = 0 challenges, and emits reward_in_cooldown for cooldown_days
// > 0. This job is what later promotes the 7-day-cooldown challenges to
// claimable_reward once their cooldown elapses — hence the cooldown_days = 7
// filter, matching Python's hardcoded check.
type EngagementNotificationsJob struct {
pool database.DbPool
logger *zap.Logger
now func() time.Time

mutex sync.Mutex
isRunning bool
}

// engagementStartDatetime matches Python's START_DATETIME — challenges
// completed before this date predate the cooldown-notification feature and are
// never notified.
const engagementStartDatetime = "2024-06-06"

const engagementBatchSize = 500

func NewEngagementNotificationsJob(cfg config.Config, pool database.DbPool) *EngagementNotificationsJob {
return &EngagementNotificationsJob{
pool: pool,
logger: logging.NewZapLogger(cfg).Named("EngagementNotificationsJob"),
now: time.Now,
}
}

// ScheduleEvery runs the job every `interval` until the context is cancelled.
func (j *EngagementNotificationsJob) ScheduleEvery(ctx context.Context, interval time.Duration) *EngagementNotificationsJob {
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
j.Run(ctx)
case <-ctx.Done():
j.logger.Info("Job shutting down")
return
}
}
}()
return j
}

// Run executes the job once.
func (j *EngagementNotificationsJob) Run(ctx context.Context) {
if err := j.run(ctx); err != nil {
j.logger.Error("Job run failed", zap.Error(err))
}
}

func (j *EngagementNotificationsJob) run(ctx context.Context) error {
j.mutex.Lock()
if j.isRunning {
j.mutex.Unlock()
return fmt.Errorf("job is already running")
}
j.isRunning = true
j.mutex.Unlock()
defer func() {
j.mutex.Lock()
j.isRunning = false
j.mutex.Unlock()
}()

// completed_at < now - 1 week. The matching (group_id, specifier) dedup is
// handled by uq_notification via ON CONFLICT; the NOT EXISTS debounce
// suppresses a second claimable_reward for the same user within the hour
// before completion (Python's per-row existing_notification check).
res, err := j.pool.Exec(ctx, `
INSERT INTO notification (specifier, group_id, blocknumber, user_ids, type, data, timestamp)
SELECT
uc.specifier,
'claimable_reward:' || uc.user_id::text || ':challenge:' || uc.challenge_id || ':specifier:' || uc.specifier,
uc.completed_blocknumber,
ARRAY[uc.user_id],
'claimable_reward',
jsonb_build_object(
'specifier', uc.specifier,
'challenge_id', uc.challenge_id,
'amount', uc.amount
),
uc.completed_at
FROM user_challenges uc
JOIN challenges c ON c.id = uc.challenge_id
LEFT JOIN challenge_disbursements cd
ON cd.challenge_id = uc.challenge_id AND cd.specifier = uc.specifier
WHERE uc.is_complete
AND uc.completed_at >= @start_datetime
AND uc.completed_at < @cooldown_cutoff
AND c.cooldown_days = 7
AND cd.specifier IS NULL
AND NOT EXISTS (
SELECT 1 FROM notification n
WHERE n.type = 'claimable_reward'
AND n.user_ids @> ARRAY[uc.user_id]
AND n.timestamp >= uc.completed_at - INTERVAL '1 hour'
)
LIMIT @batch_size
ON CONFLICT (group_id, specifier) DO NOTHING
`, pgx.NamedArgs{
"start_datetime": engagementStartDatetime,
"cooldown_cutoff": j.now().Add(-7 * 24 * time.Hour),
"batch_size": engagementBatchSize,
})
if err != nil {
return fmt.Errorf("insert claimable_reward notifications: %w", err)
}

if n := res.RowsAffected(); n > 0 {
j.logger.Info("Inserted claimable_reward notifications", zap.Int64("count", n))
}
return nil
}
116 changes: 116 additions & 0 deletions jobs/create_engagement_notifications_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package jobs

import (
"context"
"testing"
"time"

"api.audius.co/database"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestEngagementNotifications_FullPipeline verifies a completed, undisbursed,
// 7-day-cooldown challenge past its cooldown produces exactly one
// claimable_reward notification, and re-running is idempotent.
func TestEngagementNotifications_FullPipeline(t *testing.T) {
pool := database.CreateTestDatabase(t, "test_jobs")
defer pool.Close()

ctx := context.Background()
now := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC)
completedAt := now.Add(-8 * 24 * time.Hour) // past the 7-day cooldown

// Seed challenges before user_challenges: the latter has a FK to the former
// and database.Seed processes non-entity tables in randomized map order, so
// a single combined call can violate the FK. Two calls force the order.
database.Seed(pool, database.FixtureMap{
"users": {{"user_id": 1, "wallet": "0x01"}},
"challenges": {
{"id": "b", "type": "aggregate", "amount": "10", "active": true, "cooldown_days": 7},
},
})
database.Seed(pool, database.FixtureMap{
"user_challenges": {
{"challenge_id": "b", "user_id": 1, "specifier": "s1", "is_complete": true,
"amount": 100, "completed_at": completedAt, "completed_blocknumber": 42},
},
})

job := NewEngagementNotificationsJob(newTestConfig(), pool)
job.now = func() time.Time { return now }

require.NoError(t, job.run(ctx))

var typ, groupID, specifier string
var userIDs []int32
require.NoError(t, pool.QueryRow(ctx,
`SELECT type, group_id, specifier, user_ids FROM notification WHERE type = 'claimable_reward'`,
).Scan(&typ, &groupID, &specifier, &userIDs))
assert.Equal(t, "claimable_reward", typ)
assert.Equal(t, "claimable_reward:1:challenge:b:specifier:s1", groupID)
assert.Equal(t, "s1", specifier)
assert.Equal(t, []int32{1}, userIDs)

// Idempotent: a second run inserts nothing more.
require.NoError(t, job.run(ctx))
assert.Equal(t, 1, countNotifications(t, ctx, pool, "claimable_reward"))
}

// TestEngagementNotifications_Exclusions verifies the cooldown, disbursement,
// and not-yet-elapsed filters all suppress a claimable_reward from the job.
//
// Note on the test fixtures: the handle_on_user_challenge DB trigger fires when
// a complete user_challenge is seeded. For a challenge with cooldown_days > 0 it
// inserts a `reward_in_cooldown` (never a `claimable_reward`); only a
// cooldown_days = 0 challenge would get an immediate `claimable_reward` from the
// trigger. We therefore use positive, non-7 cooldowns for the "wrong cooldown"
// row so the trigger never produces a claimable_reward, isolating the job's
// behavior — which only ever emits claimable_reward for cooldown_days = 7.
func TestEngagementNotifications_Exclusions(t *testing.T) {
pool := database.CreateTestDatabase(t, "test_jobs")
defer pool.Close()

ctx := context.Background()
now := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC)
old := now.Add(-8 * 24 * time.Hour)

// Seed challenges before user_challenges (FK dependency + randomized Seed
// ordering — see TestEngagementNotifications_FullPipeline).
database.Seed(pool, database.FixtureMap{
"users": {{"user_id": 1, "wallet": "0x01"}},
"challenges": {
{"id": "cd7", "type": "aggregate", "amount": "10", "active": true, "cooldown_days": 7},
{"id": "cd14", "type": "aggregate", "amount": "10", "active": true, "cooldown_days": 14},
},
})
database.Seed(pool, database.FixtureMap{
"user_challenges": {
// Disbursed already -> excluded.
{"challenge_id": "cd7", "user_id": 1, "specifier": "disbursed", "is_complete": true, "amount": 100, "completed_at": old},
// Wrong cooldown_days (not 7) -> excluded by the job's cooldown_days = 7 filter.
{"challenge_id": "cd14", "user_id": 1, "specifier": "wrongCooldown", "is_complete": true, "amount": 100, "completed_at": old},
// Cooldown not yet elapsed -> excluded.
{"challenge_id": "cd7", "user_id": 1, "specifier": "tooRecent", "is_complete": true, "amount": 100, "completed_at": now.Add(-2 * 24 * time.Hour)},
// Not complete -> excluded.
{"challenge_id": "cd7", "user_id": 1, "specifier": "incomplete", "is_complete": false, "amount": 100, "completed_at": old},
},
"challenge_disbursements": {
{"challenge_id": "cd7", "user_id": 1, "specifier": "disbursed", "signature": "sig", "amount": "100"},
},
})

job := NewEngagementNotificationsJob(newTestConfig(), pool)
job.now = func() time.Time { return now }
require.NoError(t, job.run(ctx))

assert.Equal(t, 0, countNotifications(t, ctx, pool, "claimable_reward"))
}

func countNotifications(t *testing.T, ctx context.Context, pool database.DbPool, typ string) int {
t.Helper()
var n int
require.NoError(t, pool.QueryRow(ctx,
"SELECT COUNT(*) FROM notification WHERE type = $1", typ).Scan(&n))
return n
}
122 changes: 122 additions & 0 deletions jobs/create_listen_streak_reminder_notifications.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package jobs

import (
"context"
"fmt"
"sync"
"time"

"api.audius.co/config"
"api.audius.co/database"
"api.audius.co/logging"
"github.com/jackc/pgx/v5"
"go.uber.org/zap"
)

// ListenStreakReminderJob reminds users whose listen streak is about to lapse.
// Mirrors
// apps/packages/discovery-provider/src/tasks/create_listen_streak_reminder_notifications.py.
//
// A streak is kept alive by listening at least once every 48 hours. We notify
// in the hour-wide window 42-43 hours after the last listen — 6 hours of slack
// before the streak breaks. The (group_id, specifier) pair encodes user +
// last-listen date, so uq_notification prevents re-reminding for the same day.
type ListenStreakReminderJob struct {
pool database.DbPool
logger *zap.Logger
env string
now func() time.Time

mutex sync.Mutex
isRunning bool
}

// listenStreakBuffer is the hours-since-last-listen at which we remind (2 days
// minus 6 hours). Matches Python's LISTEN_STREAK_BUFFER.
const listenStreakBuffer = 42 * time.Hour

func NewListenStreakReminderJob(cfg config.Config, pool database.DbPool) *ListenStreakReminderJob {
return &ListenStreakReminderJob{
pool: pool,
logger: logging.NewZapLogger(cfg).Named("ListenStreakReminderJob"),
env: cfg.Env,
now: time.Now,
}
}

// ScheduleEvery runs the job every `interval` until the context is cancelled.
func (j *ListenStreakReminderJob) ScheduleEvery(ctx context.Context, interval time.Duration) *ListenStreakReminderJob {
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
j.Run(ctx)
case <-ctx.Done():
j.logger.Info("Job shutting down")
return
}
}
}()
return j
}

// Run executes the job once.
func (j *ListenStreakReminderJob) Run(ctx context.Context) {
if err := j.run(ctx); err != nil {
j.logger.Error("Job run failed", zap.Error(err))
}
}

func (j *ListenStreakReminderJob) run(ctx context.Context) error {
j.mutex.Lock()
if j.isRunning {
j.mutex.Unlock()
return fmt.Errorf("job is already running")
}
j.isRunning = true
j.mutex.Unlock()
defer func() {
j.mutex.Lock()
j.isRunning = false
j.mutex.Unlock()
}()

now := j.now()
windowEnd := now.Add(-listenStreakBuffer)
windowStart := now.Add(-listenStreakBuffer - time.Hour)
// Stage uses a tight 1-2 minute window so the flow can be exercised
// end-to-end without waiting ~2 days, matching Python's env branch.
if j.env == "stage" {
windowEnd = now.Add(-1 * time.Minute)
windowStart = now.Add(-2 * time.Minute)
}

res, err := j.pool.Exec(ctx, `
INSERT INTO notification (specifier, group_id, blocknumber, user_ids, type, data, timestamp)
SELECT
cls.user_id::text,
'listen_streak_reminder:' || cls.user_id::text || ':' || to_char(cls.last_listen_date, 'YYYY-MM-DD'),
NULL,
ARRAY[cls.user_id],
'listen_streak_reminder',
jsonb_build_object('streak', cls.listen_streak),
@now
FROM challenge_listen_streak cls
WHERE cls.last_listen_date BETWEEN @window_start AND @window_end
ON CONFLICT (group_id, specifier) DO NOTHING
`, pgx.NamedArgs{
"now": now,
"window_start": windowStart,
"window_end": windowEnd,
})
if err != nil {
return fmt.Errorf("insert listen_streak_reminder notifications: %w", err)
}

if n := res.RowsAffected(); n > 0 {
j.logger.Info("Inserted listen_streak_reminder notifications", zap.Int64("count", n))
}
return nil
}
Loading
Loading