From a563dc8c881904d69fdf578de08d1de9c2e86ac9 Mon Sep 17 00:00:00 2001 From: Marcus Pasell <3690498+rickyrombo@users.noreply.github.com> Date: Fri, 15 May 2026 17:00:11 -0500 Subject: [PATCH] Backfill sol_purchases from usdc_purchases Step 1 of the purchases-domain cutover from Python-indexer tables to the Go indexer's sol_* tables. Adds a created_at column to sol_purchases (parity with the legacy table), backfills historical purchases + their splits from usdc_purchases into sol_purchases and sol_payments, adds the v_usdc_purchases compatibility view, and mirrors the notification trigger onto sol_purchases. Readers stay on the legacy table for now; the route swap is a follow-up PR. Also drops a stale block_timestamp column from the sol_purchases entry in sql/01_schema.sql that no migration creates and nothing references. Co-Authored-By: Claude Opus 4.7 --- api/dbv1/models.go | 27 +++++- ddl/functions/handle_usdc_purchase.sql | 88 ++++++++++++++++++ .../0199_backfill_sol_purchases.sql | 73 +++++++++++++++ ddl/views/v_usdc_purchases.sql | 79 ++++++++++++++++ sql/01_schema.sql | 92 ++++++++++++++++++- 5 files changed, 353 insertions(+), 6 deletions(-) create mode 100644 ddl/migrations/0199_backfill_sol_purchases.sql create mode 100644 ddl/views/v_usdc_purchases.sql diff --git a/api/dbv1/models.go b/api/dbv1/models.go index 1732c725..c1185f3b 100644 --- a/api/dbv1/models.go +++ b/api/dbv1/models.go @@ -2110,11 +2110,11 @@ type SolPurchase struct { // Purchase transactions include the blocknumber that the content was most recently updated in order to ensure that the relevant pricing information has been indexed before evaluating whether the purchase is valid. ValidAfterBlocknumber int64 `json:"valid_after_blocknumber"` // A purchase is valid if it meets the pricing information set by the artist. If the pricing information is not available yet (as indicated by the valid_after_blocknumber), then is_valid will be NULL which indicates a "pending" state. - IsValid pgtype.Bool `json:"is_valid"` - City pgtype.Text `json:"city"` - Region pgtype.Text `json:"region"` - Country pgtype.Text `json:"country"` - BlockTimestamp pgtype.Timestamptz `json:"block_timestamp"` + IsValid pgtype.Bool `json:"is_valid"` + City pgtype.Text `json:"city"` + Region pgtype.Text `json:"region"` + Country pgtype.Text `json:"country"` + CreatedAt *time.Time `json:"created_at"` } // Queue for retrying failed indexer updates. @@ -2649,6 +2649,23 @@ type VChallengeDisbursement struct { UserID int32 `json:"user_id"` } +type VUsdcPurchase struct { + Signature string `json:"signature"` + Slot int64 `json:"slot"` + BuyerUserID int32 `json:"buyer_user_id"` + SellerUserID interface{} `json:"seller_user_id"` + Amount int64 `json:"amount"` + ContentType UsdcPurchaseContentType `json:"content_type"` + ContentID int32 `json:"content_id"` + CreatedAt *time.Time `json:"created_at"` + ExtraAmount interface{} `json:"extra_amount"` + Access UsdcPurchaseAccessType `json:"access"` + City pgtype.Text `json:"city"` + Region pgtype.Text `json:"region"` + Country pgtype.Text `json:"country"` + Splits interface{} `json:"splits"` +} + type VolumeLeaderExclusion struct { Address string `json:"address"` Description pgtype.Text `json:"description"` diff --git a/ddl/functions/handle_usdc_purchase.sql b/ddl/functions/handle_usdc_purchase.sql index 1db16d06..0129778b 100644 --- a/ddl/functions/handle_usdc_purchase.sql +++ b/ddl/functions/handle_usdc_purchase.sql @@ -56,3 +56,91 @@ do $$ begin exception when others then null; end $$; + + +-- Mirror of handle_usdc_purchase for the new indexer's table. Resolves +-- seller_user_id from current content ownership (same CASE used in +-- v_usdc_purchases). Notification shape and group_id format match the legacy +-- trigger so the two can dual-fire during cutover (second insert is a no-op +-- via on conflict). extra_amount and vendor are emitted as null in the new +-- payload because they aren't denormalized onto sol_purchases. +create or replace function handle_sol_purchase() returns trigger as $$ +declare + resolved_seller_user_id integer; +begin + if new.is_valid is not true then + return null; + end if; + + if new.content_type = 'track' then + select owner_id into resolved_seller_user_id + from tracks + where track_id = new.content_id + and is_current = true + limit 1; + else + select playlist_owner_id into resolved_seller_user_id + from playlists + where playlist_id = new.content_id + and is_current = true + limit 1; + end if; + + if resolved_seller_user_id is null then + return null; + end if; + + insert into notification + (slot, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.slot, + ARRAY [resolved_seller_user_id], + new.created_at, + 'usdc_purchase_seller', + new.buyer_user_id, + 'usdc_purchase_seller:' || 'seller_user_id:' || resolved_seller_user_id || ':buyer_user_id:' || new.buyer_user_id || ':content_id:' || new.content_id || ':content_type:' || new.content_type, + json_build_object( + 'content_type', new.content_type, + 'buyer_user_id', new.buyer_user_id, + 'seller_user_id', resolved_seller_user_id, + 'amount', new.amount, + 'extra_amount', null, + 'content_id', new.content_id, + 'vendor', null + ) + ), + ( + new.slot, + ARRAY [new.buyer_user_id], + new.created_at, + 'usdc_purchase_buyer', + new.buyer_user_id, + 'usdc_purchase_buyer:' || 'seller_user_id:' || resolved_seller_user_id || ':buyer_user_id:' || new.buyer_user_id || ':content_id:' || new.content_id || ':content_type:' || new.content_type, + json_build_object( + 'content_type', new.content_type, + 'buyer_user_id', new.buyer_user_id, + 'seller_user_id', resolved_seller_user_id, + 'amount', new.amount, + 'extra_amount', null, + 'content_id', new.content_id, + 'vendor', null + ) + ) + on conflict do nothing; + + return null; + exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$$ language plpgsql; + +do $$ begin + create trigger on_sol_purchase + after insert on sol_purchases + for each row execute procedure handle_sol_purchase(); +exception + when others then null; +end $$; diff --git a/ddl/migrations/0199_backfill_sol_purchases.sql b/ddl/migrations/0199_backfill_sol_purchases.sql new file mode 100644 index 00000000..37c08fb4 --- /dev/null +++ b/ddl/migrations/0199_backfill_sol_purchases.sql @@ -0,0 +1,73 @@ +BEGIN; +SET LOCAL statement_timeout = 0; + +-- Parity with the legacy usdc_purchases.created_at column. The Go indexer +-- writes new rows close to on-chain time, so DEFAULT NOW() is acceptable; the +-- backfill below corrects rows that came from the legacy table. +ALTER TABLE sol_purchases + ADD COLUMN IF NOT EXISTS created_at TIMESTAMP DEFAULT NOW(); + +-- Backfill historical purchases that predate the Go indexer. from_account is +-- resolved via the buyer's USDC user_bank so the NOT NULL column has a real +-- value. Falls back to '' for buyers whose bank account is unknown — the +-- sol_purchases_from_account_idx tolerates empty strings. +INSERT INTO sol_purchases ( + signature, instruction_index, amount, slot, + from_account, content_type, content_id, buyer_user_id, + access_type, valid_after_blocknumber, is_valid, + city, region, country, created_at +) +SELECT + up.signature, + 0 AS instruction_index, + up.amount, + up.slot, + COALESCE(uuba.bank_account, '') AS from_account, + up.content_type::text, + up.content_id, + up.buyer_user_id, + up.access::text, + 0 AS valid_after_blocknumber, + TRUE AS is_valid, + up.city, up.region, up.country, + up.created_at +FROM usdc_purchases up +LEFT JOIN users u + ON u.user_id = up.buyer_user_id AND u.is_current = TRUE +LEFT JOIN usdc_user_bank_accounts uuba + ON uuba.ethereum_address = u.wallet +ON CONFLICT (signature, instruction_index) DO NOTHING; + +-- Correct created_at for rows the Go indexer wrote before this migration ran: +-- those rows got NOW() from the column default, but the legacy table has the +-- real on-chain time. Only updates rows whose existing created_at is later +-- than the legacy value, so it leaves accurate Go-indexer writes alone. +UPDATE sol_purchases sp + SET created_at = up.created_at + FROM usdc_purchases up + WHERE up.signature = sp.signature + AND up.created_at < sp.created_at; + +-- Explode legacy usdc_purchases.splits JSONB into sol_payments rows. The +-- element shape is {payout_wallet, amount, percentage, user_id, eth_wallet} +-- per add_wallet_info_to_splits() in +-- discovery-provider/src/queries/get_extended_purchase_gate.py. +INSERT INTO sol_payments (signature, instruction_index, route_index, to_account, amount, slot) +SELECT + up.signature, + 0 AS instruction_index, + (ord - 1)::int AS route_index, + elem->>'payout_wallet' AS to_account, + (elem->>'amount')::bigint AS amount, + up.slot +FROM usdc_purchases up +CROSS JOIN LATERAL jsonb_array_elements(up.splits) WITH ORDINALITY arr(elem, ord) +WHERE elem->>'payout_wallet' IS NOT NULL +ON CONFLICT (signature, instruction_index, route_index) DO NOTHING; + +-- Default sort across the purchases / sales / library routes is by created_at; +-- restore the index parity the legacy table had via idx_usdc_purchases_created_at. +CREATE INDEX IF NOT EXISTS sol_purchases_created_at_idx + ON sol_purchases (created_at); + +COMMIT; diff --git a/ddl/views/v_usdc_purchases.sql b/ddl/views/v_usdc_purchases.sql new file mode 100644 index 00000000..3bb0eb0e --- /dev/null +++ b/ddl/views/v_usdc_purchases.sql @@ -0,0 +1,79 @@ +DROP VIEW IF EXISTS v_usdc_purchases; +CREATE VIEW v_usdc_purchases AS +SELECT + sp.signature, + sp.slot, + sp.buyer_user_id, + CASE sp.content_type + WHEN 'track' THEN t.owner_id + WHEN 'album' THEN p.playlist_owner_id + WHEN 'playlist' THEN p.playlist_owner_id + END AS seller_user_id, + sp.amount, + sp.content_type::usdc_purchase_content_type AS content_type, + sp.content_id, + sp.created_at, + GREATEST( + sp.amount - COALESCE( + CASE sp.content_type + WHEN 'track' THEN ( + SELECT tph.total_price_cents * 10000 + FROM track_price_history tph + WHERE tph.track_id = sp.content_id + AND tph.block_timestamp <= sp.created_at + ORDER BY tph.block_timestamp DESC + LIMIT 1 + ) + ELSE ( + SELECT aph.total_price_cents * 10000 + FROM album_price_history aph + WHERE aph.playlist_id = sp.content_id + AND aph.block_timestamp <= sp.created_at + ORDER BY aph.block_timestamp DESC + LIMIT 1 + ) + END, + 0 + ), + 0 + ) AS extra_amount, + sp.access_type::usdc_purchase_access_type AS access, + sp.city, sp.region, sp.country, + ( + SELECT COALESCE( + jsonb_agg( + jsonb_build_object( + 'user_id', COALESCE(u_payout.user_id, u_sca.user_id), + 'payout_wallet', pay.to_account, + 'amount', pay.amount, + 'percentage', pay.amount * 100.0 / NULLIF(sp.amount, 0) + ) + ORDER BY pay.route_index + ), + '[]'::jsonb + ) + FROM sol_payments pay + LEFT JOIN users u_payout + ON u_payout.spl_usdc_payout_wallet = pay.to_account + AND u_payout.is_current = TRUE + LEFT JOIN sol_claimable_accounts sca + ON sca.account = pay.to_account + AND sca.mint = 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v' + LEFT JOIN users u_sca + ON u_sca.wallet = sca.ethereum_address + AND u_sca.is_current = TRUE + WHERE pay.signature = sp.signature + AND pay.instruction_index = sp.instruction_index + ) AS splits +FROM sol_purchases sp +LEFT JOIN tracks t + ON sp.content_type = 'track' + AND t.track_id = sp.content_id + AND t.is_current = TRUE +LEFT JOIN playlists p + ON sp.content_type IN ('album', 'playlist') + AND p.playlist_id = sp.content_id + AND p.is_current = TRUE +WHERE sp.is_valid IS TRUE; + +COMMENT ON VIEW v_usdc_purchases IS 'Compatibility view exposing sol_purchases + sol_payments in the column shape API routes used to read from usdc_purchases. seller_user_id is the current content owner (not snapshotted at purchase time). extra_amount is amount paid minus base price from price history. vendor is intentionally dropped.'; diff --git a/sql/01_schema.sql b/sql/01_schema.sql index 17bf3e7f..305ca77b 100644 --- a/sql/01_schema.sql +++ b/sql/01_schema.sql @@ -8320,7 +8320,7 @@ CREATE TABLE public.sol_purchases ( city character varying, region character varying, country character varying, - block_timestamp timestamp with time zone + created_at timestamp without time zone DEFAULT now() ); @@ -11956,6 +11956,13 @@ CREATE INDEX sol_purchases_from_account_idx ON public.sol_purchases USING btree COMMENT ON INDEX public.sol_purchases_from_account_idx IS 'Used for getting purchases by a user via their account.'; +-- +-- Name: sol_purchases_created_at_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX sol_purchases_created_at_idx ON public.sol_purchases USING btree (created_at); + + -- -- Name: sol_purchases_valid_idx; Type: INDEX; Schema: public; Owner: - -- @@ -12889,6 +12896,89 @@ CREATE VIEW public.v_challenge_disbursements AS JOIN public.users ON (((users.wallet = rd.recipient_eth_address) AND (users.is_current = true)))); +-- +-- Name: v_usdc_purchases; Type: VIEW; Schema: public; Owner: - +-- + +CREATE VIEW public.v_usdc_purchases AS + SELECT sp.signature, + sp.slot, + sp.buyer_user_id, + CASE sp.content_type + WHEN 'track'::text THEN t.owner_id + WHEN 'album'::text THEN p.playlist_owner_id + WHEN 'playlist'::text THEN p.playlist_owner_id + END AS seller_user_id, + sp.amount, + (sp.content_type)::public.usdc_purchase_content_type AS content_type, + sp.content_id, + sp.created_at, + GREATEST( + sp.amount - COALESCE( + CASE sp.content_type + WHEN 'track'::text THEN ( + SELECT (tph.total_price_cents * 10000) + FROM public.track_price_history tph + WHERE tph.track_id = sp.content_id + AND tph.block_timestamp <= sp.created_at + ORDER BY tph.block_timestamp DESC + LIMIT 1 + ) + ELSE ( + SELECT (aph.total_price_cents * 10000) + FROM public.album_price_history aph + WHERE aph.playlist_id = sp.content_id + AND aph.block_timestamp <= sp.created_at + ORDER BY aph.block_timestamp DESC + LIMIT 1 + ) + END, + 0 + ), + 0 + ) AS extra_amount, + (sp.access_type)::public.usdc_purchase_access_type AS access, + sp.city, + sp.region, + sp.country, + ( + SELECT COALESCE( + jsonb_agg( + jsonb_build_object( + 'user_id', COALESCE(u_payout.user_id, u_sca.user_id), + 'payout_wallet', pay.to_account, + 'amount', pay.amount, + 'percentage', ((pay.amount * 100.0) / NULLIF(sp.amount, 0)) + ) + ORDER BY pay.route_index + ), + '[]'::jsonb + ) + FROM public.sol_payments pay + LEFT JOIN public.users u_payout + ON u_payout.spl_usdc_payout_wallet = pay.to_account + AND u_payout.is_current = true + LEFT JOIN public.sol_claimable_accounts sca + ON sca.account = pay.to_account + AND sca.mint = 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v'::text + LEFT JOIN public.users u_sca + ON u_sca.wallet = sca.ethereum_address + AND u_sca.is_current = true + WHERE pay.signature = sp.signature + AND pay.instruction_index = sp.instruction_index + ) AS splits + FROM public.sol_purchases sp + LEFT JOIN public.tracks t + ON sp.content_type = 'track'::text + AND t.track_id = sp.content_id + AND t.is_current = true + LEFT JOIN public.playlists p + ON sp.content_type IN ('album'::text, 'playlist'::text) + AND p.playlist_id = sp.content_id + AND p.is_current = true + WHERE sp.is_valid IS TRUE; + + -- -- PostgreSQL database dump complete --