diff --git a/ddl/migrations/0204_seed_phase_2_challenges.sql b/ddl/migrations/0204_seed_phase_2_challenges.sql new file mode 100644 index 00000000..e0ebfa66 --- /dev/null +++ b/ddl/migrations/0204_seed_phase_2_challenges.sql @@ -0,0 +1,35 @@ +-- Seed catalog rows for Phase 2 challenge processors. +-- +-- Values mirror apps/packages/discovery-provider/src/challenges/challenges.json +-- with ON CONFLICT DO UPDATE so the catalog stays aligned (matches apps' +-- create_new_challenges.py behavior). +-- +-- Phase 2 set (all aggregate type — they accumulate per-occurrence): +-- c first_weekly_comment (each ISO week) +-- t tastemaker (10 catalog amount × per-track win) +-- cp comment_pin (pinned by verified artist on their track) +-- cs cosign (parent owner cosigns a remix; CURRENTLY INACTIVE in apps) +-- w remix_contest_winner (winner of remix contest hosted by verified user) +-- b audio_matching_buyer (USDC content purchase, file under buyer) +-- s audio_matching_seller (same purchase, file under verified seller) + +BEGIN; + +INSERT INTO challenges (id, type, amount, active, step_count, starting_block, weekly_pool, cooldown_days) VALUES + ('c', 'aggregate', '1', true, 2147483647, 0, 2147483647, 7), + ('t', 'aggregate', '100', true, 2147483647, 0, 2147483647, 7), + ('cp', 'aggregate', '10', true, 2147483647, 1979515, 2147483647, 7), + ('cs', 'aggregate', '1000', false, 2147483647, 95017582, 50000, 7), + ('w', 'aggregate', '1000', true, 2147483647, 98950182, 50000, 7), + ('b', 'aggregate', '1', true, 2147483647, 220157041, 25000, 7), + ('s', 'aggregate', '5', true, 2147483647, 220157041, 25000, 7) +ON CONFLICT (id) DO UPDATE SET + type = EXCLUDED.type, + amount = EXCLUDED.amount, + active = EXCLUDED.active, + step_count = EXCLUDED.step_count, + starting_block = EXCLUDED.starting_block, + weekly_pool = EXCLUDED.weekly_pool, + cooldown_days = EXCLUDED.cooldown_days; + +COMMIT; diff --git a/jobs/challenges/audio_matching.go b/jobs/challenges/audio_matching.go new file mode 100644 index 00000000..e016f957 --- /dev/null +++ b/jobs/challenges/audio_matching.go @@ -0,0 +1,131 @@ +package challenges + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// AudioMatchingProcessor implements challenges "b" and "s" — both fire on +// USDC content purchases (tracks/playlists/albums), with different +// recipients and gates. +// +// Source: v_usdc_purchases — a stable view over sol_purchases that exposes +// (buyer_user_id, seller_user_id, content_id, amount, slot, created_at). +// +// Specifier (matches apps for both b and s): +// +// : +// +// Amount = challenge.amount × dollars (catalog: b=1, s=5; dollars = amount/1e6). +// "b" recipient = buyer; "s" recipient = seller, and only if seller is verified. +type AudioMatchingProcessor struct { + ID string + ForBuyer bool // true => buyer earns; false => seller earns + VerifyOnly bool // only s gates on seller is_verified +} + +func NewAudioMatchingBuyerProcessor() Processor { + return &AudioMatchingProcessor{ID: "b", ForBuyer: true, VerifyOnly: false} +} +func NewAudioMatchingSellerProcessor() Processor { + return &AudioMatchingProcessor{ID: "s", ForBuyer: false, VerifyOnly: true} +} + +func (p *AudioMatchingProcessor) ChallengeID() string { return p.ID } + +const usdcDecimals = 6 + +func (p *AudioMatchingProcessor) checkpointName() string { + return "challenges:" + p.ID + ":last_slot" +} + +func (p *AudioMatchingProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + catalogAmount := c.AmountInt() + + prev, err := readCheckpointInt(ctx, tx, p.checkpointName()) + if err != nil { + return fmt.Errorf("read checkpoint: %w", err) + } + + // VerifyOnly adds a JOIN to filter seller verification at query time so + // we don't allocate rows we'll throw away. + verifyJoin := "" + if p.VerifyOnly { + verifyJoin = ` + JOIN users seller ON seller.user_id = v.seller_user_id + AND seller.is_current = true + AND seller.is_verified = true + ` + } + + query := ` + SELECT v.slot, v.buyer_user_id, v.seller_user_id, v.content_id, v.amount + FROM v_usdc_purchases v + ` + verifyJoin + ` + WHERE v.slot > $1 + AND v.seller_user_id IS NOT NULL + ORDER BY v.slot ASC + ` + rows, err := tx.Query(ctx, query, prev) + if err != nil { + return fmt.Errorf("scan purchases: %w", err) + } + type purchaseRow struct { + slot int64 + buyerID int64 + sellerID int64 + contentID int64 + amountMicro int64 + } + var results []purchaseRow + maxSlot := prev + for rows.Next() { + var r purchaseRow + if err := rows.Scan(&r.slot, &r.buyerID, &r.sellerID, &r.contentID, &r.amountMicro); err != nil { + rows.Close() + return err + } + results = append(results, r) + if r.slot > maxSlot { + maxSlot = r.slot + } + } + rows.Close() + if err := rows.Err(); err != nil { + return err + } + + for _, r := range results { + dollars := int32(r.amountMicro / 1_000_000) + if dollars <= 0 { + continue // shouldn't happen for valid purchases; defensive + } + recipient := r.buyerID + if !p.ForBuyer { + recipient = r.sellerID + } + rewardAmount := catalogAmount * dollars + specifier := fmt.Sprintf("%x:%x", r.buyerID, r.contentID) + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), specifier, recipient, 1, 1, rewardAmount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + } + + if maxSlot > prev { + if err := writeCheckpointInt(ctx, tx, p.checkpointName(), maxSlot); err != nil { + return fmt.Errorf("save checkpoint: %w", err) + } + } + return nil +} diff --git a/jobs/challenges/audio_matching_test.go b/jobs/challenges/audio_matching_test.go new file mode 100644 index 00000000..b9f136f3 --- /dev/null +++ b/jobs/challenges/audio_matching_test.go @@ -0,0 +1,117 @@ +package challenges + +import ( + "context" + "fmt" + "testing" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestAudioMatching_BuyerAndSeller — one purchase mints two user_challenges +// rows: 'b' filed under the buyer, 's' filed under the (verified) seller. +// Amount math: catalog × dollars (b=1×, s=5×). +func TestAudioMatching_BuyerAndSeller(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_am", "number": 1}}, + "users": { + {"user_id": 1400, "wallet": "0x1400", "is_verified": false}, // buyer + {"user_id": 1401, "wallet": "0x1401", "is_verified": true}, // seller (verified) + }, + "tracks": {{"track_id": 140000, "owner_id": 1401, "title": "Premium", "blocknumber": 1}}, + }) + + // sol_purchases is the source-of-truth; v_usdc_purchases is a view over it. + // amount = 10 dollars = 10_000_000 micro-USDC. + _, err := pool.Exec(ctx, ` + INSERT INTO sol_purchases + (signature, instruction_index, amount, slot, from_account, content_type, content_id, + buyer_user_id, access_type, valid_after_blocknumber, is_valid, created_at) + VALUES ('sig-am-1', 0, 10000000, 1000, 'from_account', 'track', 140000, + 1400, 'stream', 0, true, now()) + `) + require.NoError(t, err) + + runProcessor(t, pool, NewAudioMatchingBuyerProcessor()) + runProcessor(t, pool, NewAudioMatchingSellerProcessor()) + + spec := fmt.Sprintf("%x:%x", 1400, 140000) + + rb, ok := queryUserChallenge(t, pool, "b", spec) + if assert.True(t, ok) { + assert.True(t, rb.IsComplete) + assert.Equal(t, int64(1400), rb.UserID, "b filed under buyer") + assert.Equal(t, int32(10), rb.Amount, "$10 × 1 = 10 AUDIO") + } + + rs, ok := queryUserChallenge(t, pool, "s", spec) + if assert.True(t, ok) { + assert.True(t, rs.IsComplete) + assert.Equal(t, int64(1401), rs.UserID, "s filed under seller") + assert.Equal(t, int32(50), rs.Amount, "$10 × 5 = 50 AUDIO") + } +} + +// TestAudioMatching_SellerUnverifiedNoRow — unverified seller gets no 's' +// row; buyer still earns 'b'. +func TestAudioMatching_SellerUnverifiedNoRow(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_am2", "number": 1}}, + "users": { + {"user_id": 1410, "wallet": "0x1410"}, + {"user_id": 1411, "wallet": "0x1411", "is_verified": false}, + }, + "tracks": {{"track_id": 141000, "owner_id": 1411, "title": "X", "blocknumber": 1}}, + }) + _, err := pool.Exec(ctx, ` + INSERT INTO sol_purchases + (signature, instruction_index, amount, slot, from_account, content_type, content_id, + buyer_user_id, access_type, valid_after_blocknumber, is_valid, created_at) + VALUES ('sig-am-2', 0, 5000000, 1100, 'from', 'track', 141000, + 1410, 'stream', 0, true, now()) + `) + require.NoError(t, err) + + runProcessor(t, pool, NewAudioMatchingBuyerProcessor()) + runProcessor(t, pool, NewAudioMatchingSellerProcessor()) + + spec := fmt.Sprintf("%x:%x", 1410, 141000) + _, hasB := queryUserChallenge(t, pool, "b", spec) + _, hasS := queryUserChallenge(t, pool, "s", spec) + assert.True(t, hasB, "buyer still earns") + assert.False(t, hasS, "unverified seller does not earn s") +} + +// TestAudioMatching_InvalidPurchaseExcluded — sol_purchases rows with +// is_valid=false are filtered out by v_usdc_purchases. +func TestAudioMatching_InvalidPurchaseExcluded(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_am3", "number": 1}}, + "users": {{"user_id": 1420, "wallet": "0x1420"}, {"user_id": 1421, "wallet": "0x1421", "is_verified": true}}, + "tracks": {{"track_id": 142000, "owner_id": 1421, "title": "X", "blocknumber": 1}}, + }) + _, err := pool.Exec(ctx, ` + INSERT INTO sol_purchases + (signature, instruction_index, amount, slot, from_account, content_type, content_id, + buyer_user_id, access_type, valid_after_blocknumber, is_valid, created_at) + VALUES ('sig-am-3', 0, 10000000, 1200, 'from', 'track', 142000, + 1420, 'stream', 0, false, now()) + `) + require.NoError(t, err) + + runProcessor(t, pool, NewAudioMatchingBuyerProcessor()) + + _, ok := queryUserChallenge(t, pool, "b", fmt.Sprintf("%x:%x", 1420, 142000)) + assert.False(t, ok, "invalid purchase should not earn") +} diff --git a/jobs/challenges/comment_pin.go b/jobs/challenges/comment_pin.go new file mode 100644 index 00000000..4ba6a7cd --- /dev/null +++ b/jobs/challenges/comment_pin.go @@ -0,0 +1,80 @@ +package challenges + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// CommentPinProcessor implements challenge "cp" — when a verified track +// owner pins another user's comment on their track, that commenter earns +// the challenge. Mirrors apps' pin_comment dispatch in comment.py:1080. +// +// Specifier (matches apps): : +// +// Source data: tracks.pinned_comment_id joined to comments. +// Gates: +// - track owner must be verified +// - commenter != track owner (self-pins don't earn) +// +// A pin → unpin → re-pin cycle still only mints one row per (commenter, track) +// thanks to specifier uniqueness, matching apps. +type CommentPinProcessor struct{} + +func (p *CommentPinProcessor) ChallengeID() string { return "cp" } + +func (p *CommentPinProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + amount := c.AmountInt() + + rows, err := tx.Query(ctx, ` + SELECT cm.user_id AS commenter_user_id, t.track_id + FROM tracks t + JOIN comments cm ON cm.comment_id = t.pinned_comment_id + JOIN users u ON u.user_id = t.owner_id + WHERE t.pinned_comment_id IS NOT NULL + AND t.is_current = true + AND t.is_delete = false + AND u.is_current = true + AND u.is_verified = true + AND cm.is_delete = false + AND cm.user_id <> t.owner_id + `) + if err != nil { + return fmt.Errorf("scan pinned comments: %w", err) + } + type pinRow struct { + commenterUserID int64 + trackID int64 + } + var results []pinRow + for rows.Next() { + var r pinRow + if err := rows.Scan(&r.commenterUserID, &r.trackID); err != nil { + rows.Close() + return err + } + results = append(results, r) + } + rows.Close() + if err := rows.Err(); err != nil { + return err + } + + for _, r := range results { + specifier := fmt.Sprintf("%x:%x", r.commenterUserID, r.trackID) + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), specifier, r.commenterUserID, 1, 1, amount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + } + return nil +} diff --git a/jobs/challenges/comment_pin_test.go b/jobs/challenges/comment_pin_test.go new file mode 100644 index 00000000..43bfd8f1 --- /dev/null +++ b/jobs/challenges/comment_pin_test.go @@ -0,0 +1,66 @@ +package challenges + +import ( + "fmt" + "testing" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" +) + +func TestCommentPin_VerifiedOwnerPinsOthersComment(t *testing.T) { + pool := withChallengesDB(t) + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_cp", "number": 1}}, + "users": { + {"user_id": 900, "wallet": "0x900", "is_verified": true}, // verified track owner + {"user_id": 901, "wallet": "0x901", "is_verified": false}, // commenter + }, + "tracks": {{"track_id": 9000, "owner_id": 900, "title": "T", "blocknumber": 1, "pinned_comment_id": 1}}, + "comments": { + {"comment_id": 1, "user_id": 901, "entity_id": 9000, "entity_type": "Track", "text": "nice!", "blockhash": "x", "txhash": "tx"}, + }, + }) + + runProcessor(t, pool, &CommentPinProcessor{}) + + r, ok := queryUserChallenge(t, pool, "cp", fmt.Sprintf("%x:%x", 901, 9000)) + if assert.True(t, ok) { + assert.True(t, r.IsComplete) + assert.Equal(t, int32(10), r.Amount) + assert.Equal(t, int64(901), r.UserID, "row should be filed under commenter") + } +} + +func TestCommentPin_SkippedWhenOwnerNotVerified(t *testing.T) { + pool := withChallengesDB(t) + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_cp2", "number": 1}}, + "users": { + {"user_id": 910, "wallet": "0x910", "is_verified": false}, // owner not verified + {"user_id": 911, "wallet": "0x911"}, + }, + "tracks": {{"track_id": 9100, "owner_id": 910, "title": "T", "blocknumber": 1, "pinned_comment_id": 1}}, + "comments": {{"comment_id": 1, "user_id": 911, "entity_id": 9100, "entity_type": "Track", "text": "x", "blockhash": "x", "txhash": "tx"}}, + }) + + runProcessor(t, pool, &CommentPinProcessor{}) + + _, ok := queryUserChallenge(t, pool, "cp", fmt.Sprintf("%x:%x", 911, 9100)) + assert.False(t, ok) +} + +func TestCommentPin_SkippedForSelfPin(t *testing.T) { + pool := withChallengesDB(t) + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_cp3", "number": 1}}, + "users": {{"user_id": 920, "wallet": "0x920", "is_verified": true}}, + "tracks": {{"track_id": 9200, "owner_id": 920, "title": "T", "blocknumber": 1, "pinned_comment_id": 1}}, + "comments": {{"comment_id": 1, "user_id": 920, "entity_id": 9200, "entity_type": "Track", "text": "self", "blockhash": "x", "txhash": "tx"}}, + }) + + runProcessor(t, pool, &CommentPinProcessor{}) + + _, ok := queryUserChallenge(t, pool, "cp", fmt.Sprintf("%x:%x", 920, 9200)) + assert.False(t, ok, "self-pin should not earn the challenge") +} diff --git a/jobs/challenges/cosign.go b/jobs/challenges/cosign.go new file mode 100644 index 00000000..f1988794 --- /dev/null +++ b/jobs/challenges/cosign.go @@ -0,0 +1,140 @@ +package challenges + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// CosignProcessor implements challenge "cs" — a verified parent-track +// owner saving or reposting a remix of their own track earns the +// *remixer* the cosign reward. Mirrors apps' dispatch in +// social_features.py:135. +// +// Specifier (matches apps): +// +// : +// +// Cap: max 5 cosigns per parent-owner per rolling 30 days (apps' +// MAX_COSIGNS_PER_MONTH). We enforce that by counting existing +// user_challenges rows with the same specifier prefix in the last 30 +// days and skipping insertion when the cap is reached. +// +// Note: catalog row for "cs" is currently inactive in apps. Reconcile +// short-circuits when inactive — code is in place for when it's enabled. +type CosignProcessor struct{} + +func (p *CosignProcessor) ChallengeID() string { return "cs" } + +const cosignCapPerMonth = 5 + +func (p *CosignProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + amount := c.AmountInt() + + // Find (parent_owner, remix_owner, remix_track_id, cosign_at) tuples + // where the parent owner has saved or reposted the remix and the + // parent owner is verified. Use the *earliest* action timestamp per + // (parent_owner, remix) so the cooldown calc is stable. + rows, err := tx.Query(ctx, ` + WITH cosign_actions AS ( + SELECT r.parent_track_id, + r.child_track_id, + parent.owner_id AS parent_owner_id, + child.owner_id AS remixer_user_id, + MIN(a.created_at) AS cosign_at + FROM remixes r + JOIN tracks parent ON parent.track_id = r.parent_track_id + AND parent.is_current = true + JOIN tracks child ON child.track_id = r.child_track_id + AND child.is_current = true + JOIN users u ON u.user_id = parent.owner_id + AND u.is_current = true + AND u.is_verified = true + JOIN ( + SELECT user_id, repost_item_id AS item_id, created_at + FROM reposts + WHERE is_current = true AND is_delete = false AND repost_type = 'track' + UNION ALL + SELECT user_id, save_item_id AS item_id, created_at + FROM saves + WHERE is_current = true AND is_delete = false AND save_type = 'track' + ) a ON a.user_id = parent.owner_id AND a.item_id = r.child_track_id + GROUP BY r.parent_track_id, r.child_track_id, + parent.owner_id, child.owner_id + ) + SELECT parent_owner_id, remixer_user_id, child_track_id, cosign_at + FROM cosign_actions + ORDER BY cosign_at ASC + `) + if err != nil { + return fmt.Errorf("scan cosigns: %w", err) + } + type cosignRow struct { + parentOwnerID int64 + remixerID int64 + remixTrackID int64 + } + var results []cosignRow + for rows.Next() { + var r cosignRow + var skipTime any // we don't need cosign_at after sort + if err := rows.Scan(&r.parentOwnerID, &r.remixerID, &r.remixTrackID, &skipTime); err != nil { + rows.Close() + return err + } + results = append(results, r) + } + rows.Close() + if err := rows.Err(); err != nil { + return err + } + + for _, r := range results { + specifier := fmt.Sprintf("%x:%x", r.parentOwnerID, r.remixTrackID) + specifierPrefix := fmt.Sprintf("%x:", r.parentOwnerID) + + // Per-parent-owner 30-day cap. Counts ALL cosign user_challenges + // for this parent-owner in the last 30 days; matches apps' + // behavior (the cap is on the *cosigner*, not the recipient). + var cosignsLastMonth int + if err := tx.QueryRow(ctx, ` + SELECT COUNT(*) FROM user_challenges + WHERE challenge_id = 'cs' + AND specifier LIKE $1 + AND created_at >= now() - INTERVAL '30 days' + `, specifierPrefix+"%").Scan(&cosignsLastMonth); err != nil { + return fmt.Errorf("count recent cosigns: %w", err) + } + + // If this specifier already exists, the upsert below is a no-op, + // so we shouldn't gate on the cap for already-minted rows. + var alreadyHave bool + if err := tx.QueryRow(ctx, ` + SELECT EXISTS (SELECT 1 FROM user_challenges + WHERE challenge_id = 'cs' AND specifier = $1) + `, specifier).Scan(&alreadyHave); err != nil { + return err + } + if alreadyHave { + continue + } + if cosignsLastMonth >= cosignCapPerMonth { + continue + } + + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), specifier, r.remixerID, 1, 1, amount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + } + return nil +} diff --git a/jobs/challenges/cosign_test.go b/jobs/challenges/cosign_test.go new file mode 100644 index 00000000..2f23bb14 --- /dev/null +++ b/jobs/challenges/cosign_test.go @@ -0,0 +1,102 @@ +package challenges + +import ( + "context" + "fmt" + "testing" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestCosign_VerifiedParentReposting — happy path: verified parent owner +// reposts a remix of their own track → remixer earns 'cs'. +func TestCosign_VerifiedParentReposting(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + + // 'cs' is active=false by default. Enable for the test. + _, err := pool.Exec(ctx, "UPDATE challenges SET active = true WHERE id = 'cs'") + require.NoError(t, err) + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_cs", "number": 1}}, + "users": { + {"user_id": 1000, "wallet": "0x1000", "is_verified": true}, // parent owner (cosigner) + {"user_id": 1001, "wallet": "0x1001", "is_verified": false}, // remixer + }, + "tracks": { + {"track_id": 100000, "owner_id": 1000, "title": "Parent", "blocknumber": 1}, + {"track_id": 100001, "owner_id": 1001, "title": "Remix", "blocknumber": 1}, + }, + }) + // Seed remixes + parent owner reposts the remix. + _, err = pool.Exec(ctx, ` + INSERT INTO remixes (parent_track_id, child_track_id) VALUES (100000, 100001) + `) + require.NoError(t, err) + _, err = pool.Exec(ctx, ` + INSERT INTO reposts (user_id, repost_item_id, repost_type, is_current, is_delete, created_at, txhash) + VALUES (1000, 100001, 'track', true, false, now(), 'tx-r1') + `) + require.NoError(t, err) + + runProcessor(t, pool, &CosignProcessor{}) + + r, ok := queryUserChallenge(t, pool, "cs", fmt.Sprintf("%x:%x", 1000, 100001)) + if assert.True(t, ok) { + assert.True(t, r.IsComplete) + assert.Equal(t, int64(1001), r.UserID, "row filed under the remixer") + assert.Equal(t, int32(1000), r.Amount) + } +} + +// TestCosign_MonthCap — 5 cosigns per parent-owner per rolling 30 days. +// Seed 5 already-completed user_challenges to simulate the cap and verify +// a 6th candidate is skipped. +func TestCosign_MonthCap(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + _, err := pool.Exec(ctx, "UPDATE challenges SET active = true WHERE id = 'cs'") + require.NoError(t, err) + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_cs2", "number": 1}}, + "users": { + {"user_id": 1100, "wallet": "0x1100", "is_verified": true}, + {"user_id": 1101, "wallet": "0x1101"}, + }, + "tracks": { + {"track_id": 110000, "owner_id": 1100, "title": "Parent", "blocknumber": 1}, + {"track_id": 110001, "owner_id": 1101, "title": "Remix6", "blocknumber": 1}, + }, + }) + + // Pre-populate 5 cosigns for parent 1100 within the last 30 days. + // completed_at must be non-null so the on_user_challenge trigger can + // write its notification with a non-null timestamp. + for i := 0; i < 5; i++ { + spec := fmt.Sprintf("%x:%x", 1100, 50000+i) // arbitrary remix ids + _, err := pool.Exec(ctx, ` + INSERT INTO user_challenges (challenge_id, user_id, specifier, is_complete, amount, created_at, completed_at) + VALUES ('cs', $1, $2, true, 1000, now(), now()) + `, 5000+i, spec) + require.NoError(t, err) + } + // Add the would-be 6th candidate. + _, err = pool.Exec(ctx, ` + INSERT INTO remixes (parent_track_id, child_track_id) VALUES (110000, 110001) + `) + require.NoError(t, err) + _, err = pool.Exec(ctx, ` + INSERT INTO reposts (user_id, repost_item_id, repost_type, is_current, is_delete, created_at, txhash) + VALUES (1100, 110001, 'track', true, false, now(), 'tx-r6') + `) + require.NoError(t, err) + + runProcessor(t, pool, &CosignProcessor{}) + + _, ok := queryUserChallenge(t, pool, "cs", fmt.Sprintf("%x:%x", 1100, 110001)) + assert.False(t, ok, "6th cosign within 30 days should be blocked by cap") +} diff --git a/jobs/challenges/first_weekly_comment.go b/jobs/challenges/first_weekly_comment.go new file mode 100644 index 00000000..168f76ee --- /dev/null +++ b/jobs/challenges/first_weekly_comment.go @@ -0,0 +1,75 @@ +package challenges + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// FirstWeeklyCommentProcessor implements challenge "c" — each ISO-week, +// the user's first non-deleted, visible comment mints one user_challenge row. +// +// Specifier shape (matches apps): +// +// : where WW is two-digit ISO week number +// +// Idempotency is by specifier — re-scanning the same week never adds +// duplicate rows. Each subsequent week the user posts, a new specifier +// (and thus a new row) is minted. +type FirstWeeklyCommentProcessor struct{} + +func (p *FirstWeeklyCommentProcessor) ChallengeID() string { return "c" } + +func (p *FirstWeeklyCommentProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + amount := c.AmountInt() + + // One row per (user, isoyear, isoweek). EXTRACT(ISOYEAR / WEEK) gives + // the ISO-8601 week numbers Python's date.isocalendar() returns. + rows, err := tx.Query(ctx, ` + SELECT DISTINCT user_id, + EXTRACT(ISOYEAR FROM created_at)::int AS iso_year, + EXTRACT(WEEK FROM created_at)::int AS iso_week + FROM comments + WHERE is_delete = false + AND is_visible = true + `) + if err != nil { + return fmt.Errorf("scan comments: %w", err) + } + type weekRow struct { + userID int64 + isoYear int + isoWeek int + } + var results []weekRow + for rows.Next() { + var r weekRow + if err := rows.Scan(&r.userID, &r.isoYear, &r.isoWeek); err != nil { + rows.Close() + return err + } + results = append(results, r) + } + rows.Close() + if err := rows.Err(); err != nil { + return err + } + + for _, r := range results { + specifier := fmt.Sprintf("%x:%d%02d", r.userID, r.isoYear, r.isoWeek) + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), specifier, r.userID, 1, 1, amount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + } + return nil +} diff --git a/jobs/challenges/first_weekly_comment_test.go b/jobs/challenges/first_weekly_comment_test.go new file mode 100644 index 00000000..ffc0e5fe --- /dev/null +++ b/jobs/challenges/first_weekly_comment_test.go @@ -0,0 +1,48 @@ +package challenges + +import ( + "fmt" + "testing" + "time" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" +) + +// TestFirstWeeklyComment_OneRowPerUserPerWeek verifies that: +// - a single comment in a week mints one user_challenge row +// - multiple comments in the same week still only produce one row +// - comments in different ISO weeks produce different rows +func TestFirstWeeklyComment_OneRowPerUserPerWeek(t *testing.T) { + pool := withChallengesDB(t) + + // Two timestamps in the same ISO week, plus one a few weeks later. + wk1A := time.Date(2025, 1, 6, 12, 0, 0, 0, time.UTC) // Mon Jan 6 = ISO week 02 of 2025 + wk1B := time.Date(2025, 1, 8, 12, 0, 0, 0, time.UTC) // Wed Jan 8 — same week + wk5 := time.Date(2025, 2, 5, 12, 0, 0, 0, time.UTC) // Wed Feb 5 = ISO week 06 + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_fwc", "number": 1}}, + "users": {{"user_id": 700, "wallet": "0x700"}, {"user_id": 800, "wallet": "0x800"}}, + "tracks": {{"track_id": 7000, "owner_id": 800, "title": "T", "blocknumber": 1}}, + "comments": { + {"comment_id": 1, "user_id": 700, "entity_id": 7000, "entity_type": "Track", "text": "a", "created_at": wk1A, "blockhash": "x", "txhash": "tx1"}, + {"comment_id": 2, "user_id": 700, "entity_id": 7000, "entity_type": "Track", "text": "b", "created_at": wk1B, "blockhash": "x", "txhash": "tx2"}, + {"comment_id": 3, "user_id": 700, "entity_id": 7000, "entity_type": "Track", "text": "c", "created_at": wk5, "blockhash": "x", "txhash": "tx3"}, + }, + }) + + runProcessor(t, pool, &FirstWeeklyCommentProcessor{}) + + // One row for ISO week 02/2025 + _, ok := queryUserChallenge(t, pool, "c", fmt.Sprintf("%x:%d%02d", 700, 2025, 2)) + assert.True(t, ok, "expected row for 2025 week 02") + + // One row for ISO week 06/2025 + _, ok = queryUserChallenge(t, pool, "c", fmt.Sprintf("%x:%d%02d", 700, 2025, 6)) + assert.True(t, ok, "expected row for 2025 week 06") + + // No row for week 03 (no comments). + _, ok = queryUserChallenge(t, pool, "c", fmt.Sprintf("%x:%d%02d", 700, 2025, 3)) + assert.False(t, ok) +} diff --git a/jobs/challenges/processor_test.go b/jobs/challenges/processor_test.go index d0f34650..7a0ea517 100644 --- a/jobs/challenges/processor_test.go +++ b/jobs/challenges/processor_test.go @@ -41,6 +41,14 @@ func withChallengesDB(t *testing.T) *pgxpool.Pool { {"tt", "trending", "1000", true, nil, 25346436, 100000, nil}, {"tut", "trending", "1000", true, nil, 25346436, 100000, nil}, {"tp", "trending", "100", true, nil, 25346436, 10000, nil}, + // Phase 2 + {"c", "aggregate", "1", true, i32p(2147483647), 0, 2147483647, i32p(7)}, + {"t", "aggregate", "100", true, i32p(2147483647), 0, 2147483647, i32p(7)}, + {"cp", "aggregate", "10", true, i32p(2147483647), 1979515, 2147483647, i32p(7)}, + {"cs", "aggregate", "1000", false, i32p(2147483647), 95017582, 50000, i32p(7)}, + {"w", "aggregate", "1000", true, i32p(2147483647), 98950182, 50000, i32p(7)}, + {"b", "aggregate", "1", true, i32p(2147483647), 220157041, 25000, i32p(7)}, + {"s", "aggregate", "5", true, i32p(2147483647), 220157041, 25000, i32p(7)}, } for _, r := range rows { _, err := pool.Exec(ctx, ` diff --git a/jobs/challenges/remix_contest_winner.go b/jobs/challenges/remix_contest_winner.go new file mode 100644 index 00000000..d871b040 --- /dev/null +++ b/jobs/challenges/remix_contest_winner.go @@ -0,0 +1,152 @@ +package challenges + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// RemixContestWinnerProcessor implements challenge "w" — winners of a +// remix contest hosted by a verified user. Mirrors apps' dispatch in +// entity_manager/entities/event.py:218 plus the gates in +// challenges/remix_contest_winner_challenge.py. +// +// Source: events table where event_type='remix_contest', is_deleted=false, +// and event_data->>'winners' is a JSON array of *track IDs*. For each +// winner track, we mint a row for the *remixer* (track owner) — same as +// apps' apps' dispatcher. +// +// Specifier (matches apps): : +// +// Caps (per apps): +// - host must be verified +// - max 5 winners per contest (apps MAX_WINNERS_PER_CONTEST) +// - one row per (remixer, contest) — enforced by specifier uniqueness +// - max 5 winner rewards per host per rolling week +// (apps MAX_WINNER_REWARDS_PER_HOST_PER_WEEK) +type RemixContestWinnerProcessor struct{} + +func (p *RemixContestWinnerProcessor) ChallengeID() string { return "w" } + +const ( + remixContestMaxWinnersPerContest = 5 + remixContestMaxWinnerRewardsPerWeek = 5 +) + +func (p *RemixContestWinnerProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + amount := c.AmountInt() + + // Pull all remix contests whose winners array is non-empty, joined to + // the host (must be verified). We unnest the JSON array of winner + // track ids and resolve each one to a remixer via tracks.owner_id. + // jsonb_array_elements_text yields each array element as text, which + // casts cleanly to int regardless of whether the JSON had `[123,456]` + // (raw ints) or `["123","456"]` (string ints) — both are the shapes + // apps' entity_manager has historically written. + rows, err := tx.Query(ctx, ` + SELECT e.event_id, e.user_id AS host_user_id, t.owner_id AS remixer_user_id + FROM events e + JOIN users u ON u.user_id = e.user_id + AND u.is_current = true AND u.is_verified = true + CROSS JOIN LATERAL jsonb_array_elements_text( + COALESCE(e.event_data->'winners', '[]'::jsonb) + ) AS w(track_id_str) + JOIN tracks t ON t.track_id = w.track_id_str::int + AND t.is_current = true AND t.is_delete = false + WHERE e.event_type = 'remix_contest' + AND e.is_deleted = false + `) + if err != nil { + return fmt.Errorf("scan winner candidates: %w", err) + } + type winnerRow struct { + contestID int64 + hostUserID int64 + remixerUserID int64 + } + var results []winnerRow + for rows.Next() { + var r winnerRow + if err := rows.Scan(&r.contestID, &r.hostUserID, &r.remixerUserID); err != nil { + rows.Close() + return err + } + results = append(results, r) + } + rows.Close() + if err := rows.Err(); err != nil { + return err + } + + // Per-contest winners-count cache to enforce the 5-per-contest cap. + winnersByContest := make(map[int64]int) + // Per-host weekly count cache, populated lazily. + rewardsThisWeekByHost := make(map[int64]int) + + for _, r := range results { + specifier := fmt.Sprintf("%x:%x", r.contestID, r.remixerUserID) + + // Skip already-minted (idempotent). + var alreadyHave bool + if err := tx.QueryRow(ctx, ` + SELECT EXISTS (SELECT 1 FROM user_challenges + WHERE challenge_id = 'w' AND specifier = $1) + `, specifier).Scan(&alreadyHave); err != nil { + return err + } + if alreadyHave { + continue + } + + // Per-contest cap (5 winners max). + if _, ok := winnersByContest[r.contestID]; !ok { + var n int + contestPrefix := fmt.Sprintf("%x:", r.contestID) + if err := tx.QueryRow(ctx, ` + SELECT COUNT(*) FROM user_challenges + WHERE challenge_id = 'w' AND specifier LIKE $1 + `, contestPrefix+"%").Scan(&n); err != nil { + return err + } + winnersByContest[r.contestID] = n + } + if winnersByContest[r.contestID] >= remixContestMaxWinnersPerContest { + continue + } + + // Per-host weekly cap (5 winner rewards per week). + if _, ok := rewardsThisWeekByHost[r.hostUserID]; !ok { + var n int + if err := tx.QueryRow(ctx, ` + SELECT COUNT(*) FROM user_challenges uc + JOIN events e ON e.event_id = SPLIT_PART(uc.specifier, ':', 1)::bigint + WHERE uc.challenge_id = 'w' + AND e.user_id = $1 + AND uc.created_at >= now() - INTERVAL '7 days' + `, r.hostUserID).Scan(&n); err != nil { + return err + } + rewardsThisWeekByHost[r.hostUserID] = n + } + if rewardsThisWeekByHost[r.hostUserID] >= remixContestMaxWinnerRewardsPerWeek { + continue + } + + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), specifier, r.remixerUserID, 1, 1, amount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + winnersByContest[r.contestID]++ + rewardsThisWeekByHost[r.hostUserID]++ + } + return nil +} diff --git a/jobs/challenges/remix_contest_winner_test.go b/jobs/challenges/remix_contest_winner_test.go new file mode 100644 index 00000000..c0487113 --- /dev/null +++ b/jobs/challenges/remix_contest_winner_test.go @@ -0,0 +1,77 @@ +package challenges + +import ( + "context" + "fmt" + "testing" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestRemixContestWinner_VerifiedHostMintsRows — happy path: verified +// host's remix contest with two winner track IDs → two user_challenges +// rows, one per remixer. +func TestRemixContestWinner_VerifiedHostMintsRows(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_w", "number": 1}}, + "users": { + {"user_id": 1300, "wallet": "0x1300", "is_verified": true}, // host + {"user_id": 1301, "wallet": "0x1301", "is_verified": false}, // remixer A + {"user_id": 1302, "wallet": "0x1302", "is_verified": false}, // remixer B + }, + "tracks": { + {"track_id": 130001, "owner_id": 1301, "title": "Remix A", "blocknumber": 1}, + {"track_id": 130002, "owner_id": 1302, "title": "Remix B", "blocknumber": 1}, + }, + }) + + // events table — schema in api/sql/01_schema.sql. + _, err := pool.Exec(ctx, ` + INSERT INTO events (event_id, event_type, user_id, entity_type, entity_id, is_deleted, event_data, blocknumber, txhash, blockhash) + VALUES (50000, 'remix_contest', 1300, 'track', 130000, false, $1::jsonb, 1, 'tx-e', 'bh-e') + `, `{"winners": [130001, 130002]}`) + require.NoError(t, err) + + runProcessor(t, pool, &RemixContestWinnerProcessor{}) + + for _, winner := range []struct { + userID, trackID int + }{{1301, 130001}, {1302, 130002}} { + spec := fmt.Sprintf("%x:%x", 50000, winner.userID) + r, ok := queryUserChallenge(t, pool, "w", spec) + if assert.True(t, ok, "user %d should win remix contest", winner.userID) { + assert.True(t, r.IsComplete) + assert.Equal(t, int32(1000), r.Amount) + assert.Equal(t, int64(winner.userID), r.UserID) + } + } +} + +func TestRemixContestWinner_UnverifiedHostSkipped(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_w2", "number": 1}}, + "users": { + {"user_id": 1310, "wallet": "0x1310", "is_verified": false}, // unverified host + {"user_id": 1311, "wallet": "0x1311"}, + }, + "tracks": {{"track_id": 131001, "owner_id": 1311, "title": "Remix", "blocknumber": 1}}, + }) + _, err := pool.Exec(ctx, ` + INSERT INTO events (event_id, event_type, user_id, entity_type, entity_id, is_deleted, event_data, blocknumber, txhash, blockhash) + VALUES (50010, 'remix_contest', 1310, 'track', 131000, false, $1::jsonb, 1, 'tx', 'bh') + `, `{"winners": [131001]}`) + require.NoError(t, err) + + runProcessor(t, pool, &RemixContestWinnerProcessor{}) + + _, ok := queryUserChallenge(t, pool, "w", fmt.Sprintf("%x:%x", 50010, 1311)) + assert.False(t, ok, "unverified host should produce no rows") +} diff --git a/jobs/challenges/tastemaker.go b/jobs/challenges/tastemaker.go new file mode 100644 index 00000000..06551729 --- /dev/null +++ b/jobs/challenges/tastemaker.go @@ -0,0 +1,121 @@ +package challenges + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// TastemakerProcessor implements challenge "t" — for each currently-top +// trending track (week range), the earliest tastemakerThreshold reposters +// and earliest tastemakerThreshold savers earn the challenge. +// Mirrors apps' index_tastemaker.py. +// +// Per the user direction, tastemakerTrackLimit was bumped from apps' +// historical 5 to 10. The notification threshold (per-track number of +// reposters/savers eligible) stays at 10 to match apps. +// +// Specifier (matches apps): :t: +// (playlists would use ":p:" but trending_playlist tastemakers aren't +// in apps today, so we only handle tracks.) +type TastemakerProcessor struct{} + +func (p *TastemakerProcessor) ChallengeID() string { return "t" } + +const ( + tastemakerTrackLimit = 10 // top-N trending tracks considered + tastemakerThresholdPerTrack = 10 // earliest N reposters / savers per track +) + +func (p *TastemakerProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + amount := c.AmountInt() + + // Pick top-N tracks for the current week from the (already-running) + // trending score job's output. We look at TRACKS type, pnagD version, + // week range — matches what apps' top-trending lookup returns. + topRows, err := tx.Query(ctx, ` + SELECT track_id + FROM track_trending_scores + WHERE type = 'TRACKS' AND version = 'pnagD' AND time_range = 'week' + ORDER BY score DESC NULLS LAST + LIMIT $1 + `, tastemakerTrackLimit) + if err != nil { + return fmt.Errorf("scan top trending: %w", err) + } + var trackIDs []int64 + for topRows.Next() { + var tid int64 + if err := topRows.Scan(&tid); err != nil { + topRows.Close() + return err + } + trackIDs = append(trackIDs, tid) + } + topRows.Close() + if err := topRows.Err(); err != nil { + return err + } + + if len(trackIDs) == 0 { + return nil + } + + // For each top track, find the union of (earliest 10 reposters) and + // (earliest 10 savers). Each eligible user gets one row per track. + for _, trackID := range trackIDs { + userRows, err := tx.Query(ctx, ` + ( + SELECT user_id FROM reposts + WHERE is_current = true AND is_delete = false + AND repost_type = 'track' + AND repost_item_id = $1 + ORDER BY created_at ASC + LIMIT $2 + ) + UNION + ( + SELECT user_id FROM saves + WHERE is_current = true AND is_delete = false + AND save_type = 'track' + AND save_item_id = $1 + ORDER BY created_at ASC + LIMIT $2 + ) + `, trackID, tastemakerThresholdPerTrack) + if err != nil { + return fmt.Errorf("scan tastemakers for track %d: %w", trackID, err) + } + var userIDs []int64 + for userRows.Next() { + var uid int64 + if err := userRows.Scan(&uid); err != nil { + userRows.Close() + return err + } + userIDs = append(userIDs, uid) + } + userRows.Close() + if err := userRows.Err(); err != nil { + return err + } + + for _, uid := range userIDs { + specifier := fmt.Sprintf("%x:t:%x", uid, trackID) + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), specifier, uid, 1, 1, amount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + } + } + return nil +} diff --git a/jobs/challenges/tastemaker_test.go b/jobs/challenges/tastemaker_test.go new file mode 100644 index 00000000..d0cac1a2 --- /dev/null +++ b/jobs/challenges/tastemaker_test.go @@ -0,0 +1,59 @@ +package challenges + +import ( + "context" + "fmt" + "testing" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestTastemaker_EarliestRepostersAndSavers — for a track in the top-10 +// trending, the earliest 10 reposters AND earliest 10 savers each earn a +// tastemaker row (deduped by specifier). +func TestTastemaker_EarliestRepostersAndSavers(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_tm", "number": 1}}, + "users": { + {"user_id": 1200, "wallet": "0x1200"}, + {"user_id": 1201, "wallet": "0x1201"}, + {"user_id": 1202, "wallet": "0x1202"}, + }, + "tracks": {{"track_id": 120000, "owner_id": 1200, "title": "Hit", "blocknumber": 1}}, + }) + + // Put the track in track_trending_scores (top of week range). + _, err := pool.Exec(ctx, ` + INSERT INTO track_trending_scores (track_id, type, version, time_range, score, created_at) + VALUES (120000, 'TRACKS', 'pnagD', 'week', 999.0, now()) + `) + require.NoError(t, err) + + // 1201 reposts, 1202 saves. + _, err = pool.Exec(ctx, ` + INSERT INTO reposts (user_id, repost_item_id, repost_type, is_current, is_delete, created_at, txhash) + VALUES (1201, 120000, 'track', true, false, now() - interval '1 hour', 'tx-rp') + `) + require.NoError(t, err) + _, err = pool.Exec(ctx, ` + INSERT INTO saves (user_id, save_item_id, save_type, is_current, is_delete, created_at, txhash) + VALUES (1202, 120000, 'track', true, false, now() - interval '30 minutes', 'tx-sv') + `) + require.NoError(t, err) + + runProcessor(t, pool, &TastemakerProcessor{}) + + for _, uid := range []int{1201, 1202} { + spec := fmt.Sprintf("%x:t:%x", uid, 120000) + r, ok := queryUserChallenge(t, pool, "t", spec) + if assert.True(t, ok, "user %d should have tastemaker row", uid) { + assert.True(t, r.IsComplete) + assert.Equal(t, int32(100), r.Amount) + } + } +} diff --git a/jobs/index_challenges.go b/jobs/index_challenges.go index c64faebd..dc62342f 100644 --- a/jobs/index_challenges.go +++ b/jobs/index_challenges.go @@ -37,6 +37,7 @@ func NewIndexChallengesJob(cfg config.Config, pool database.DbPool) *IndexChalle pool: pool, logger: logging.NewZapLogger(cfg).Named("IndexChallengesJob"), processors: []challenges.Processor{ + // Phase 1 &challenges.TrackUploadProcessor{}, &challenges.FirstPlaylistProcessor{}, &challenges.ProfileCompletionProcessor{}, @@ -48,6 +49,14 @@ func NewIndexChallengesJob(cfg config.Config, pool database.DbPool) *IndexChalle challenges.NewTrendingTrackProcessor(), challenges.NewTrendingUndergroundProcessor(), challenges.NewTrendingPlaylistProcessor(), + // Phase 2 + &challenges.FirstWeeklyCommentProcessor{}, + &challenges.CommentPinProcessor{}, + &challenges.CosignProcessor{}, + &challenges.TastemakerProcessor{}, + &challenges.RemixContestWinnerProcessor{}, + challenges.NewAudioMatchingBuyerProcessor(), + challenges.NewAudioMatchingSellerProcessor(), }, } }