Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions ddl/migrations/0204_seed_phase_2_challenges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- Seed catalog rows for Phase 2 challenge processors.
--
-- Values mirror apps/packages/discovery-provider/src/challenges/challenges.json
-- with ON CONFLICT DO UPDATE so the catalog stays aligned (matches apps'
-- create_new_challenges.py behavior).
--
-- Phase 2 set (all aggregate type — they accumulate per-occurrence):
-- c first_weekly_comment (each ISO week)
-- t tastemaker (10 catalog amount × per-track win)
-- cp comment_pin (pinned by verified artist on their track)
-- cs cosign (parent owner cosigns a remix; CURRENTLY INACTIVE in apps)
-- w remix_contest_winner (winner of remix contest hosted by verified user)
-- b audio_matching_buyer (USDC content purchase, file under buyer)
-- s audio_matching_seller (same purchase, file under verified seller)

BEGIN;

INSERT INTO challenges (id, type, amount, active, step_count, starting_block, weekly_pool, cooldown_days) VALUES
('c', 'aggregate', '1', true, 2147483647, 0, 2147483647, 7),
('t', 'aggregate', '100', true, 2147483647, 0, 2147483647, 7),
('cp', 'aggregate', '10', true, 2147483647, 1979515, 2147483647, 7),
('cs', 'aggregate', '1000', false, 2147483647, 95017582, 50000, 7),
('w', 'aggregate', '1000', true, 2147483647, 98950182, 50000, 7),
('b', 'aggregate', '1', true, 2147483647, 220157041, 25000, 7),
('s', 'aggregate', '5', true, 2147483647, 220157041, 25000, 7)
ON CONFLICT (id) DO UPDATE SET
type = EXCLUDED.type,
amount = EXCLUDED.amount,
active = EXCLUDED.active,
step_count = EXCLUDED.step_count,
starting_block = EXCLUDED.starting_block,
weekly_pool = EXCLUDED.weekly_pool,
cooldown_days = EXCLUDED.cooldown_days;

COMMIT;
131 changes: 131 additions & 0 deletions jobs/challenges/audio_matching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package challenges

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
)

// AudioMatchingProcessor implements challenges "b" and "s" — both fire on
// USDC content purchases (tracks/playlists/albums), with different
// recipients and gates.
//
// Source: v_usdc_purchases — a stable view over sol_purchases that exposes
// (buyer_user_id, seller_user_id, content_id, amount, slot, created_at).
//
// Specifier (matches apps for both b and s):
//
// <hex_buyer_user_id>:<hex_content_id>
//
// Amount = challenge.amount × dollars (catalog: b=1, s=5; dollars = amount/1e6).
// "b" recipient = buyer; "s" recipient = seller, and only if seller is verified.
type AudioMatchingProcessor struct {
ID string
ForBuyer bool // true => buyer earns; false => seller earns
VerifyOnly bool // only s gates on seller is_verified
}

func NewAudioMatchingBuyerProcessor() Processor {
return &AudioMatchingProcessor{ID: "b", ForBuyer: true, VerifyOnly: false}
}
func NewAudioMatchingSellerProcessor() Processor {
return &AudioMatchingProcessor{ID: "s", ForBuyer: false, VerifyOnly: true}
}

func (p *AudioMatchingProcessor) ChallengeID() string { return p.ID }

const usdcDecimals = 6

func (p *AudioMatchingProcessor) checkpointName() string {
return "challenges:" + p.ID + ":last_slot"
}

func (p *AudioMatchingProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID())
if err != nil {
return fmt.Errorf("load challenge: %w", err)
}
if !ok || !c.Active {
return nil
}
catalogAmount := c.AmountInt()

prev, err := readCheckpointInt(ctx, tx, p.checkpointName())
if err != nil {
return fmt.Errorf("read checkpoint: %w", err)
}

// VerifyOnly adds a JOIN to filter seller verification at query time so
// we don't allocate rows we'll throw away.
verifyJoin := ""
if p.VerifyOnly {
verifyJoin = `
JOIN users seller ON seller.user_id = v.seller_user_id
AND seller.is_current = true
AND seller.is_verified = true
`
}

query := `
SELECT v.slot, v.buyer_user_id, v.seller_user_id, v.content_id, v.amount
FROM v_usdc_purchases v
` + verifyJoin + `
WHERE v.slot > $1
AND v.seller_user_id IS NOT NULL
ORDER BY v.slot ASC
`
rows, err := tx.Query(ctx, query, prev)
if err != nil {
return fmt.Errorf("scan purchases: %w", err)
}
type purchaseRow struct {
slot int64
buyerID int64
sellerID int64
contentID int64
amountMicro int64
}
var results []purchaseRow
maxSlot := prev
for rows.Next() {
var r purchaseRow
if err := rows.Scan(&r.slot, &r.buyerID, &r.sellerID, &r.contentID, &r.amountMicro); err != nil {
rows.Close()
return err
}
results = append(results, r)
if r.slot > maxSlot {
maxSlot = r.slot
}
}
rows.Close()
if err := rows.Err(); err != nil {
return err
}

for _, r := range results {
dollars := int32(r.amountMicro / 1_000_000)
if dollars <= 0 {
continue // shouldn't happen for valid purchases; defensive
}
recipient := r.buyerID
if !p.ForBuyer {
recipient = r.sellerID
}
rewardAmount := catalogAmount * dollars
specifier := fmt.Sprintf("%x:%x", r.buyerID, r.contentID)
if err := UpsertUserChallenge(ctx, tx,
p.ChallengeID(), specifier, recipient, 1, 1, rewardAmount,
); err != nil {
return fmt.Errorf("upsert: %w", err)
}
}

if maxSlot > prev {
if err := writeCheckpointInt(ctx, tx, p.checkpointName(), maxSlot); err != nil {
return fmt.Errorf("save checkpoint: %w", err)
}
}
return nil
}
117 changes: 117 additions & 0 deletions jobs/challenges/audio_matching_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package challenges

import (
"context"
"fmt"
"testing"

"api.audius.co/database"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestAudioMatching_BuyerAndSeller — one purchase mints two user_challenges
// rows: 'b' filed under the buyer, 's' filed under the (verified) seller.
// Amount math: catalog × dollars (b=1×, s=5×).
func TestAudioMatching_BuyerAndSeller(t *testing.T) {
pool := withChallengesDB(t)
ctx := context.Background()

database.Seed(pool, database.FixtureMap{
"blocks": {{"blockhash": "blk_am", "number": 1}},
"users": {
{"user_id": 1400, "wallet": "0x1400", "is_verified": false}, // buyer
{"user_id": 1401, "wallet": "0x1401", "is_verified": true}, // seller (verified)
},
"tracks": {{"track_id": 140000, "owner_id": 1401, "title": "Premium", "blocknumber": 1}},
})

// sol_purchases is the source-of-truth; v_usdc_purchases is a view over it.
// amount = 10 dollars = 10_000_000 micro-USDC.
_, err := pool.Exec(ctx, `
INSERT INTO sol_purchases
(signature, instruction_index, amount, slot, from_account, content_type, content_id,
buyer_user_id, access_type, valid_after_blocknumber, is_valid, created_at)
VALUES ('sig-am-1', 0, 10000000, 1000, 'from_account', 'track', 140000,
1400, 'stream', 0, true, now())
`)
require.NoError(t, err)

runProcessor(t, pool, NewAudioMatchingBuyerProcessor())
runProcessor(t, pool, NewAudioMatchingSellerProcessor())

spec := fmt.Sprintf("%x:%x", 1400, 140000)

rb, ok := queryUserChallenge(t, pool, "b", spec)
if assert.True(t, ok) {
assert.True(t, rb.IsComplete)
assert.Equal(t, int64(1400), rb.UserID, "b filed under buyer")
assert.Equal(t, int32(10), rb.Amount, "$10 × 1 = 10 AUDIO")
}

rs, ok := queryUserChallenge(t, pool, "s", spec)
if assert.True(t, ok) {
assert.True(t, rs.IsComplete)
assert.Equal(t, int64(1401), rs.UserID, "s filed under seller")
assert.Equal(t, int32(50), rs.Amount, "$10 × 5 = 50 AUDIO")
}
}

// TestAudioMatching_SellerUnverifiedNoRow — unverified seller gets no 's'
// row; buyer still earns 'b'.
func TestAudioMatching_SellerUnverifiedNoRow(t *testing.T) {
pool := withChallengesDB(t)
ctx := context.Background()

database.Seed(pool, database.FixtureMap{
"blocks": {{"blockhash": "blk_am2", "number": 1}},
"users": {
{"user_id": 1410, "wallet": "0x1410"},
{"user_id": 1411, "wallet": "0x1411", "is_verified": false},
},
"tracks": {{"track_id": 141000, "owner_id": 1411, "title": "X", "blocknumber": 1}},
})
_, err := pool.Exec(ctx, `
INSERT INTO sol_purchases
(signature, instruction_index, amount, slot, from_account, content_type, content_id,
buyer_user_id, access_type, valid_after_blocknumber, is_valid, created_at)
VALUES ('sig-am-2', 0, 5000000, 1100, 'from', 'track', 141000,
1410, 'stream', 0, true, now())
`)
require.NoError(t, err)

runProcessor(t, pool, NewAudioMatchingBuyerProcessor())
runProcessor(t, pool, NewAudioMatchingSellerProcessor())

spec := fmt.Sprintf("%x:%x", 1410, 141000)
_, hasB := queryUserChallenge(t, pool, "b", spec)
_, hasS := queryUserChallenge(t, pool, "s", spec)
assert.True(t, hasB, "buyer still earns")
assert.False(t, hasS, "unverified seller does not earn s")
}

// TestAudioMatching_InvalidPurchaseExcluded — sol_purchases rows with
// is_valid=false are filtered out by v_usdc_purchases.
func TestAudioMatching_InvalidPurchaseExcluded(t *testing.T) {
pool := withChallengesDB(t)
ctx := context.Background()

database.Seed(pool, database.FixtureMap{
"blocks": {{"blockhash": "blk_am3", "number": 1}},
"users": {{"user_id": 1420, "wallet": "0x1420"}, {"user_id": 1421, "wallet": "0x1421", "is_verified": true}},
"tracks": {{"track_id": 142000, "owner_id": 1421, "title": "X", "blocknumber": 1}},
})
_, err := pool.Exec(ctx, `
INSERT INTO sol_purchases
(signature, instruction_index, amount, slot, from_account, content_type, content_id,
buyer_user_id, access_type, valid_after_blocknumber, is_valid, created_at)
VALUES ('sig-am-3', 0, 10000000, 1200, 'from', 'track', 142000,
1420, 'stream', 0, false, now())
`)
require.NoError(t, err)

runProcessor(t, pool, NewAudioMatchingBuyerProcessor())

_, ok := queryUserChallenge(t, pool, "b", fmt.Sprintf("%x:%x", 1420, 142000))
assert.False(t, ok, "invalid purchase should not earn")
}
80 changes: 80 additions & 0 deletions jobs/challenges/comment_pin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package challenges

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
)

// CommentPinProcessor implements challenge "cp" — when a verified track
// owner pins another user's comment on their track, that commenter earns
// the challenge. Mirrors apps' pin_comment dispatch in comment.py:1080.
//
// Specifier (matches apps): <hex_commenter_user_id>:<hex_track_id>
//
// Source data: tracks.pinned_comment_id joined to comments.
// Gates:
// - track owner must be verified
// - commenter != track owner (self-pins don't earn)
//
// A pin → unpin → re-pin cycle still only mints one row per (commenter, track)
// thanks to specifier uniqueness, matching apps.
type CommentPinProcessor struct{}

func (p *CommentPinProcessor) ChallengeID() string { return "cp" }

func (p *CommentPinProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID())
if err != nil {
return fmt.Errorf("load challenge: %w", err)
}
if !ok || !c.Active {
return nil
}
amount := c.AmountInt()

rows, err := tx.Query(ctx, `
SELECT cm.user_id AS commenter_user_id, t.track_id
FROM tracks t
JOIN comments cm ON cm.comment_id = t.pinned_comment_id
JOIN users u ON u.user_id = t.owner_id
WHERE t.pinned_comment_id IS NOT NULL
AND t.is_current = true
AND t.is_delete = false
AND u.is_current = true
AND u.is_verified = true
AND cm.is_delete = false
AND cm.user_id <> t.owner_id
`)
if err != nil {
return fmt.Errorf("scan pinned comments: %w", err)
}
type pinRow struct {
commenterUserID int64
trackID int64
}
var results []pinRow
for rows.Next() {
var r pinRow
if err := rows.Scan(&r.commenterUserID, &r.trackID); err != nil {
rows.Close()
return err
}
results = append(results, r)
}
rows.Close()
if err := rows.Err(); err != nil {
return err
}

for _, r := range results {
specifier := fmt.Sprintf("%x:%x", r.commenterUserID, r.trackID)
if err := UpsertUserChallenge(ctx, tx,
p.ChallengeID(), specifier, r.commenterUserID, 1, 1, amount,
); err != nil {
return fmt.Errorf("upsert: %w", err)
}
}
return nil
}
Loading
Loading