Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions ddl/migrations/0208_seed_challenge_checkpoints.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
-- Seed the incremental checkpoints for the dirty-set challenge processors.
--
-- profile_completion (p), track_upload (u), first_playlist (fp), and cosign
-- (cs) were rewritten to recompute only the users/owners/actions touched since
-- a per-processor blocknumber checkpoint, instead of rescanning their entire
-- source tables every tick (a full profile_completion recompute timed out past
-- 90s against prod; track_upload took ~14s). See jobs/challenges/incremental.go.
--
-- A fresh checkpoint defaults to 0, which would backfill each table from block
-- 0 the first time the job runs. We don't want that: the legacy Python stack
-- has already populated user_challenges, and the upserts are idempotent, so a
-- full historical re-derivation is pure redundant write load (and, batched, it
-- would take hours). Seed each checkpoint to the current max blocknumber across
-- its sources so the processors start "from now" and only pick up new activity.
--
-- ON CONFLICT DO NOTHING keeps this idempotent and never rewinds a checkpoint
-- the running job has already advanced. The max(blocknumber) probes are
-- index-only against the per-table blocknumber btrees.
--
-- Checkpoint names must match the constants in the processors:
-- profileCheckpoint = "challenges:p:last_blocknumber"
-- trackUploadCheckpoint = "challenges:u:last_blocknumber"
-- firstPlaylistCheckpoint = "challenges:fp:last_blocknumber"
-- cosignCheckpoint = "challenges:cs:last_blocknumber"

BEGIN;

-- profile_completion: follows, reposts, saves, users
INSERT INTO indexing_checkpoints (tablename, last_checkpoint)
SELECT 'challenges:p:last_blocknumber', GREATEST(
COALESCE((SELECT max(blocknumber) FROM follows), 0),
COALESCE((SELECT max(blocknumber) FROM reposts), 0),
COALESCE((SELECT max(blocknumber) FROM saves), 0),
COALESCE((SELECT max(blocknumber) FROM users), 0))
ON CONFLICT (tablename) DO NOTHING;

-- track_upload: tracks
INSERT INTO indexing_checkpoints (tablename, last_checkpoint)
SELECT 'challenges:u:last_blocknumber', COALESCE((SELECT max(blocknumber) FROM tracks), 0)
ON CONFLICT (tablename) DO NOTHING;

-- first_playlist: playlists
INSERT INTO indexing_checkpoints (tablename, last_checkpoint)
SELECT 'challenges:fp:last_blocknumber', COALESCE((SELECT max(blocknumber) FROM playlists), 0)
ON CONFLICT (tablename) DO NOTHING;

-- cosign: reposts, saves (the action sources)
INSERT INTO indexing_checkpoints (tablename, last_checkpoint)
SELECT 'challenges:cs:last_blocknumber', GREATEST(
COALESCE((SELECT max(blocknumber) FROM reposts), 0),
COALESCE((SELECT max(blocknumber) FROM saves), 0))
ON CONFLICT (tablename) DO NOTHING;

COMMIT;
153 changes: 95 additions & 58 deletions jobs/challenges/cosign.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,55 @@ import (
// user_challenges rows with the same specifier prefix in the last 30
// days and skipping insertion when the cap is reached.
//
// Incremental: the trigger is a save/repost *action* by a parent owner, so we
// checkpoint on the reposts+saves blocknumber and only look at actions since
// the last tick — joining each back to remixes to see if it cosigns one of the
// actor's own tracks. We advance the checkpoint over every scanned action (not
// just cosign-producing ones) so non-cosign social activity isn't rescanned.
//
// Note: catalog row for "cs" is currently inactive in apps. Reconcile
// short-circuits when inactive — code is in place for when it's enabled.
type CosignProcessor struct{}

func (p *CosignProcessor) ChallengeID() string { return "cs" }

const cosignCapPerMonth = 5
const (
cosignCapPerMonth = 5
cosignCheckpoint = "challenges:cs:last_blocknumber"
)

// cosignDirtySQL surfaces every track save/repost since the checkpoint and
// LEFT JOINs the cosign chain. is_cosign is true only when the actor owns the
// current parent track of a current remix and is verified — i.e. a real
// cosign. Non-cosign rows still come back so we can advance the high-water mark
// past them.
const cosignDirtySQL = `
SELECT da.blocknumber,
da.actor_id,
child.owner_id AS remixer_id,
r.child_track_id,
(parent.track_id IS NOT NULL
AND child.track_id IS NOT NULL
AND u.user_id IS NOT NULL) AS is_cosign
FROM (
SELECT user_id AS actor_id, repost_item_id AS track_id, blocknumber
FROM reposts
WHERE blocknumber > $1 AND is_current AND NOT is_delete AND repost_type = 'track'
UNION ALL
SELECT user_id AS actor_id, save_item_id AS track_id, blocknumber
FROM saves
WHERE blocknumber > $1 AND is_current AND NOT is_delete AND save_type = 'track'
) da
LEFT JOIN remixes r ON r.child_track_id = da.track_id
LEFT JOIN tracks parent ON parent.track_id = r.parent_track_id
AND parent.is_current = true AND parent.owner_id = da.actor_id
LEFT JOIN tracks child ON child.track_id = r.child_track_id
AND child.is_current = true
LEFT JOIN users u ON u.user_id = da.actor_id
AND u.is_current = true AND u.is_verified = true
ORDER BY da.blocknumber ASC
LIMIT $2
`

func (p *CosignProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID())
Expand All @@ -39,83 +81,57 @@ func (p *CosignProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
}
amount := c.AmountInt()

// Find (parent_owner, remix_owner, remix_track_id, cosign_at) tuples
// where the parent owner has saved or reposted the remix and the
// parent owner is verified. Use the *earliest* action timestamp per
// (parent_owner, remix) so the cooldown calc is stable.
rows, err := tx.Query(ctx, `
WITH cosign_actions AS (
SELECT r.parent_track_id,
r.child_track_id,
parent.owner_id AS parent_owner_id,
child.owner_id AS remixer_user_id,
MIN(a.created_at) AS cosign_at
FROM remixes r
JOIN tracks parent ON parent.track_id = r.parent_track_id
AND parent.is_current = true
JOIN tracks child ON child.track_id = r.child_track_id
AND child.is_current = true
JOIN users u ON u.user_id = parent.owner_id
AND u.is_current = true
AND u.is_verified = true
JOIN (
SELECT user_id, repost_item_id AS item_id, created_at
FROM reposts
WHERE is_current = true AND is_delete = false AND repost_type = 'track'
UNION ALL
SELECT user_id, save_item_id AS item_id, created_at
FROM saves
WHERE is_current = true AND is_delete = false AND save_type = 'track'
) a ON a.user_id = parent.owner_id AND a.item_id = r.child_track_id
GROUP BY r.parent_track_id, r.child_track_id,
parent.owner_id, child.owner_id
)
SELECT parent_owner_id, remixer_user_id, child_track_id, cosign_at
FROM cosign_actions
ORDER BY cosign_at ASC
`)
prev, err := readCheckpointInt(ctx, tx, cosignCheckpoint)
if err != nil {
return fmt.Errorf("scan cosigns: %w", err)
return fmt.Errorf("read checkpoint: %w", err)
}

rows, err := tx.Query(ctx, cosignDirtySQL, prev, dirtyScanBatch)
if err != nil {
return fmt.Errorf("scan cosign actions: %w", err)
}
type cosignRow struct {
parentOwnerID int64
remixerID int64
remixTrackID int64
}
var results []cosignRow
var candidates []cosignRow
scanned := 0
maxBn := prev
for rows.Next() {
var r cosignRow
var skipTime any // we don't need cosign_at after sort
if err := rows.Scan(&r.parentOwnerID, &r.remixerID, &r.remixTrackID, &skipTime); err != nil {
var bn, actorID int64
var remixerID, remixTrackID *int64
var isCosign bool
if err := rows.Scan(&bn, &actorID, &remixerID, &remixTrackID, &isCosign); err != nil {
rows.Close()
return err
}
results = append(results, r)
scanned++
if bn > maxBn {
maxBn = bn
}
if isCosign && remixerID != nil && remixTrackID != nil {
candidates = append(candidates, cosignRow{
parentOwnerID: actorID,
remixerID: *remixerID,
remixTrackID: *remixTrackID,
})
}
}
rows.Close()
if err := rows.Err(); err != nil {
return err
}
if scanned == 0 {
return nil
}

for _, r := range results {
for _, r := range candidates {
specifier := fmt.Sprintf("%x:%x", r.parentOwnerID, r.remixTrackID)
specifierPrefix := fmt.Sprintf("%x:", r.parentOwnerID)

// Per-parent-owner 30-day cap. Counts ALL cosign user_challenges
// for this parent-owner in the last 30 days; matches apps'
// behavior (the cap is on the *cosigner*, not the recipient).
var cosignsLastMonth int
if err := tx.QueryRow(ctx, `
SELECT COUNT(*) FROM user_challenges
WHERE challenge_id = 'cs'
AND specifier LIKE $1
AND created_at >= now() - INTERVAL '30 days'
`, specifierPrefix+"%").Scan(&cosignsLastMonth); err != nil {
return fmt.Errorf("count recent cosigns: %w", err)
}

// If this specifier already exists, the upsert below is a no-op,
// so we shouldn't gate on the cap for already-minted rows.
// If this specifier already exists, the upsert below is a no-op, so
// don't let it consume a cap slot.
var alreadyHave bool
if err := tx.QueryRow(ctx, `
SELECT EXISTS (SELECT 1 FROM user_challenges
Expand All @@ -126,6 +142,20 @@ func (p *CosignProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
if alreadyHave {
continue
}

// Per-parent-owner 30-day cap. Counts ALL cosign user_challenges for
// this parent-owner in the last 30 days; matches apps' behavior (the
// cap is on the *cosigner*, not the recipient). Rows inserted earlier
// in this same tx are visible here, so the cap holds within a batch.
var cosignsLastMonth int
if err := tx.QueryRow(ctx, `
SELECT COUNT(*) FROM user_challenges
WHERE challenge_id = 'cs'
AND specifier LIKE $1
AND created_at >= now() - INTERVAL '30 days'
`, specifierPrefix+"%").Scan(&cosignsLastMonth); err != nil {
return fmt.Errorf("count recent cosigns: %w", err)
}
if cosignsLastMonth >= cosignCapPerMonth {
continue
}
Expand All @@ -136,5 +166,12 @@ func (p *CosignProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
return fmt.Errorf("upsert: %w", err)
}
}

newMax := highWaterMark(prev, maxBn, scanned)
if newMax > prev {
if err := writeCheckpointInt(ctx, tx, cosignCheckpoint, newMax); err != nil {
return fmt.Errorf("save checkpoint: %w", err)
}
}
return nil
}
8 changes: 4 additions & 4 deletions jobs/challenges/cosign_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func TestCosign_VerifiedParentReposting(t *testing.T) {
`)
require.NoError(t, err)
_, err = pool.Exec(ctx, `
INSERT INTO reposts (user_id, repost_item_id, repost_type, is_current, is_delete, created_at, txhash)
VALUES (1000, 100001, 'track', true, false, now(), 'tx-r1')
INSERT INTO reposts (user_id, repost_item_id, repost_type, is_current, is_delete, created_at, txhash, blocknumber)
VALUES (1000, 100001, 'track', true, false, now(), 'tx-r1', 1)
`)
require.NoError(t, err)

Expand Down Expand Up @@ -90,8 +90,8 @@ func TestCosign_MonthCap(t *testing.T) {
`)
require.NoError(t, err)
_, err = pool.Exec(ctx, `
INSERT INTO reposts (user_id, repost_item_id, repost_type, is_current, is_delete, created_at, txhash)
VALUES (1100, 110001, 'track', true, false, now(), 'tx-r6')
INSERT INTO reposts (user_id, repost_item_id, repost_type, is_current, is_delete, created_at, txhash, blocknumber)
VALUES (1100, 110001, 'track', true, false, now(), 'tx-r6', 1)
`)
require.NoError(t, err)

Expand Down
50 changes: 38 additions & 12 deletions jobs/challenges/first_playlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,26 @@ import (
// Mirrors apps/packages/discovery-provider/src/challenges/first_playlist_challenge.py
// (Python just sets is_complete=true when an event fires; we derive it from
// playlists table state directly).
//
// Incremental: the old version re-scanned all 311K playlists and re-upserted
// every distinct owner on every tick. We instead recompute only owners whose
// playlists changed since the checkpoint. See incremental.go.
type FirstPlaylistProcessor struct{}

func (p *FirstPlaylistProcessor) ChallengeID() string { return "fp" }

const firstPlaylistCheckpoint = "challenges:fp:last_blocknumber"

// firstPlaylistDirtySQL returns (playlist_owner_id, blocknumber) for playlists
// changed since the checkpoint. playlists is updated in place so blocknumber
// moves on every create/update/delete.
const firstPlaylistDirtySQL = `
SELECT playlist_owner_id, blocknumber FROM playlists
WHERE blocknumber > $1
ORDER BY blocknumber ASC
LIMIT $2
`

func (p *FirstPlaylistProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID())
if err != nil {
Expand All @@ -30,34 +46,44 @@ func (p *FirstPlaylistProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error
}
amount := c.AmountInt()

// Find every user with at least one non-deleted playlist at or after
// the starting block. Boolean challenges complete in a single step
// (step_count is null/0 — we treat current_step_count=1, step=1).
return reconcileIncrementalUsers(ctx, tx, firstPlaylistCheckpoint, firstPlaylistDirtySQL,
func(ctx context.Context, tx pgx.Tx, ownerIDs []int64) error {
return p.recompute(ctx, tx, ownerIDs, startingBlock, amount)
})
}

func (p *FirstPlaylistProcessor) recompute(ctx context.Context, tx pgx.Tx, ownerIDs []int64, startingBlock, amount int32) error {
// Keep only owners that currently have a non-deleted playlist at/after the
// starting block. Boolean challenge: complete in a single step.
rows, err := tx.Query(ctx, `
SELECT DISTINCT playlist_owner_id
FROM playlists
WHERE is_current = true
AND is_delete = false
AND blocknumber >= $1
`, startingBlock)
SELECT x.owner_id
FROM unnest($1::bigint[]) AS x(owner_id)
WHERE EXISTS (
SELECT 1 FROM playlists pl
WHERE pl.playlist_owner_id = x.owner_id
AND pl.is_current = true
AND pl.is_delete = false
AND pl.blocknumber >= $2
)
`, ownerIDs, startingBlock)
if err != nil {
return fmt.Errorf("scan playlists: %w", err)
}
var userIDs []int64
var qualifying []int64
for rows.Next() {
var userID int64
if err := rows.Scan(&userID); err != nil {
rows.Close()
return err
}
userIDs = append(userIDs, userID)
qualifying = append(qualifying, userID)
}
rows.Close()
if err := rows.Err(); err != nil {
return err
}

for _, userID := range userIDs {
for _, userID := range qualifying {
if err := UpsertUserChallenge(ctx, tx,
p.ChallengeID(), SpecifierFromUserID(userID),
userID, 1, 1, amount,
Expand Down
Loading
Loading