diff --git a/package.json b/package.json
index d914595cff..a6137da3f5 100644
--- a/package.json
+++ b/package.json
@@ -82,6 +82,8 @@
     "test:cloudflare:api": "vitest run --exclude=tests/cli* --exclude=tests/updates* --exclude=tests/stats* --exclude=tests/channel_self* --config vitest.config.cloudflare.ts",
     "test:cloudflare:updates": "vitest run tests/updates* --config vitest.config.cloudflare.ts",
     "stripe:backfill-retention-metrics": "bun scripts/backfill_retention_metrics.ts",
+    "stripe:backfill-org-conversion-rate": "bun scripts/backfill_org_conversion_rate_trend.ts",
+    "stripe:backfill-customer-countries": "bun scripts/backfill_stripe_customer_countries.ts",
     "stripe:sync-org-names": "bun scripts/sync_stripe_org_names.ts",
     "lint": "eslint \"src/**/*.{vue,ts,js}\"",
     "fmt": "bun run lint:fix && bun run lint:sql",
diff --git a/scripts/admin_stripe_backfill_utils.ts b/scripts/admin_stripe_backfill_utils.ts
new file mode 100644
index 0000000000..43bf3ff195
--- /dev/null
+++ b/scripts/admin_stripe_backfill_utils.ts
@@ -0,0 +1,151 @@
+import type { Database } from '../supabase/functions/_backend/utils/supabase.types.ts'
+import { existsSync } from 'node:fs'
+import { readFile } from 'node:fs/promises'
+import { createClient } from '@supabase/supabase-js'
+import Stripe from 'stripe'
+
+/** Env file the backfill scripts read when --env-file is not passed. */
+export const DEFAULT_ENV_FILE = './internal/cloudflare/.env.prod'
+
+/**
+ * Returns the value of the first `${prefix}=value` argument, or null when no
+ * argument has that form. The space-separated form (`--from 2026-01-01`) is
+ * not recognised.
+ */
+export function getArgValue(args: string[], prefix: string): string | null {
+  const arg = args.find(value => value.startsWith(`${prefix}=`))
+  if (!arg)
+    return null
+  return arg.slice(prefix.length + 1)
+}
+
+/**
+ * Reads KEY=VALUE pairs from a dotenv-style file.
+ * Blank lines, lines starting with `#`, and lines without a non-empty key are
+ * skipped; one pair of matching surrounding quotes is stripped from a value.
+ * Returns an empty object when the file does not exist.
+ */
+export async function loadEnv(filePath: string): Promise<Record<string, string>> {
+  if (!existsSync(filePath))
+    return {}
+
+  const text = await readFile(filePath, 'utf8')
+  const env: Record<string, string> = {}
+
+  for (const line of text.split('\n')) {
+    const trimmed = line.trim()
+    if (!trimmed || trimmed.startsWith('#'))
+      continue
+
+    const separatorIndex = trimmed.indexOf('=')
+    if (separatorIndex <= 0)
+      continue
+
+    const key = trimmed.slice(0, separatorIndex)
+    let value = trimmed.slice(separatorIndex + 1)
+    if ((value.startsWith('"') && value.endsWith('"')) || (value.startsWith('\'') && value.endsWith('\'')))
+      value = value.slice(1, -1)
+    env[key] = value
+  }
+
+  return env
+}
+
+/** Returns the trimmed value of `key`, throwing `Missing <key>` when it is unset or blank. */
+export function getRequiredEnv(env: Record<string, string | undefined>, key: string) {
+  const value = env[key]?.trim()
+  if (!value)
+    throw new Error(`Missing ${key}`)
+  return value
+}
+
+/** Resolves the service-role key, accepting SUPABASE_SERVICE_KEY as a fallback name. */
+export function getSupabaseServiceRoleKey(env: Record<string, string | undefined>) {
+  const value = env.SUPABASE_SERVICE_ROLE_KEY?.trim() || env.SUPABASE_SERVICE_KEY?.trim()
+  if (!value)
+    throw new Error('Missing SUPABASE_SERVICE_ROLE_KEY')
+  return value
+}
+
+/** Builds a Supabase client from SUPABASE_URL and the service-role key, with session handling disabled. */
+export function createSupabaseServiceClient(env: Record<string, string | undefined>) {
+  return createClient<Database>(
+    getRequiredEnv(env, 'SUPABASE_URL'),
+    getSupabaseServiceRoleKey(env),
+    { auth: { autoRefreshToken: false, persistSession: false, detectSessionInUrl: false } },
+  )
+}
+
+/**
+ * Builds a Stripe client that uses the fetch HTTP client.
+ * When `apiBaseUrl` is non-blank, requests are sent to that host/port/protocol
+ * instead of the Stripe default.
+ */
+export function createStripeClient(secretKey: string, apiBaseUrl?: string) {
+  let hostConfig: Partial<Pick<NonNullable<ConstructorParameters<typeof Stripe>[1]>, 'host' | 'port' | 'protocol'>> = {}
+
+  if (apiBaseUrl?.trim()) {
+    const parsed = new URL(apiBaseUrl)
+    hostConfig = {
+      host: parsed.hostname,
+      port: Number.parseInt(parsed.port || (parsed.protocol === 'https:' ? '443' : '80'), 10),
+      protocol: parsed.protocol.replace(':', '') as 'http' | 'https',
+    }
+  }
+
+  type StripeApiVersion = NonNullable<ConstructorParameters<typeof Stripe>[1]>['apiVersion']
+  return new Stripe(secretKey, {
+    apiVersion: '2026-03-25.dahlia' as StripeApiVersion,
+    httpClient: Stripe.createFetchHttpClient(),
+    ...hostConfig,
+  })
+}
+
+/**
+ * Runs `iterator` over `items` with at most `limit` calls in flight.
+ * Rejects with the first failure, but only once every task that was already
+ * started has settled, so nothing is still writing after the caller sees the error.
+ */
+export async function asyncPool<T>(limit: number, items: T[], iterator: (item: T) => Promise<void>) {
+  const executing = new Set<Promise<void>>()
+
+  try {
+    for (const item of items) {
+      const task = iterator(item).finally(() => {
+        executing.delete(task)
+      })
+      executing.add(task)
+
+      if (executing.size >= limit)
+        await Promise.race(executing)
+    }
+
+    await Promise.all(executing)
+  }
+  finally {
+    // Drain in-flight work on both the success and the failure path.
+    await Promise.allSettled(executing)
+  }
+}
+
+/**
+ * Parses a CLI value as a strictly positive integer.
+ * Returns `fallback` when `value` is null. Throws unless the value is plain
+ * decimal digits, so `10abc` or `1.5` are rejected instead of being truncated.
+ */
+export function parsePositiveInteger(value: string | null, label: string, fallback: number) {
+  if (value === null)
+    return fallback
+
+  const parsed = /^\d+$/.test(value.trim()) ? Number.parseInt(value, 10) : Number.NaN
+  if (!Number.isSafeInteger(parsed) || parsed < 1)
+    throw new Error(`${label} must be a positive integer`)
+
+  return parsed
+}
+
+/** True for a non-blank customer id that does not start with the `pending_` placeholder prefix. */
+export function isActionableStripeCustomerId(customerId: string | null | undefined) {
+  const trimmedCustomerId = customerId?.trim()
+  return !!trimmedCustomerId && !trimmedCustomerId.startsWith('pending_')
+}
diff --git a/scripts/backfill_org_conversion_rate_trend.ts b/scripts/backfill_org_conversion_rate_trend.ts
new file mode 100644
index 0000000000..a7fde5ba23
--- /dev/null
+++ b/scripts/backfill_org_conversion_rate_trend.ts
@@ -0,0 +1,285 @@
+/*
+ * Backfill the admin "Org Conversion Rate Trend" metric.
+ *
+ * The historical paying counts in public.global_stats are Stripe-backed
+ * snapshots written by the admin stats cron. The raw org count was not stored,
+ * so this script reconstructs that denominator from public.orgs.created_at.
+ *
+ * Dry run, defaulting to the last 30 UTC calendar days:
+ *   bun run stripe:backfill-org-conversion-rate
+ *
+ * Apply a date range:
+ *   bun run stripe:backfill-org-conversion-rate --apply --from=2026-02-01 --to=2026-04-30
+ *
+ * Apply every stored global_stats row:
+ *   bun run stripe:backfill-org-conversion-rate --apply --all
+ */
+import type { Database } from '../supabase/functions/_backend/utils/supabase.types.ts'
+import process from 'node:process'
+import { asyncPool, createSupabaseServiceClient, DEFAULT_ENV_FILE, getArgValue, loadEnv, parsePositiveInteger } from './admin_stripe_backfill_utils.ts'
+
+const DEFAULT_LOOKBACK_DAYS = 30
+const DEFAULT_CONCURRENCY = 10
+const DEFAULT_PAGE_SIZE = 1000
+const DATE_ID_REGEX = /^\d{4}-\d{2}-\d{2}$/
+
+type SupabaseClient = ReturnType<typeof createSupabaseServiceClient>
+type GlobalStatsRow = Pick<
+  Database['public']['Tables']['global_stats']['Row'],
+  'date_id' | 'paying' | 'org_conversion_rate'
+>
+type OrgCreatedAtRow = Pick<Database['public']['Tables']['orgs']['Row'], 'created_at'>
+
+/** One global_stats day with its stored and recomputed conversion rate. */
+export interface OrgConversionRateBackfillRow {
+  changed: boolean
+  current_rate: number
+  date_id: string
+  orgs: number
+  paying: number
+  next_rate: number
+}
+
+/** Formats the UTC calendar day of `targetDate` as YYYY-MM-DD. */
+function getDateId(targetDate = new Date()) {
+  return new Date(Date.UTC(targetDate.getUTCFullYear(), targetDate.getUTCMonth(), targetDate.getUTCDate())).toISOString().slice(0, 10)
+}
+
+/**
+ * Validates a YYYY-MM-DD CLI value and returns it unchanged.
+ * Throws for malformed strings and for dates that do not exist on the
+ * calendar (e.g. 2026-02-30 or 2026-13-01), which would otherwise roll over
+ * to another day or fail later with an opaque RangeError in getNextDateId.
+ */
+function assertDateId(value: string, label: string) {
+  if (!DATE_ID_REGEX.test(value))
+    throw new Error(`${label} must use YYYY-MM-DD`)
+  // A real calendar date survives a round trip through Date unchanged.
+  const parsed = new Date(`${value}T00:00:00.000Z`)
+  if (Number.isNaN(parsed.getTime()) || parsed.toISOString().slice(0, 10) !== value)
+    throw new Error(`${label} must be a valid calendar date`)
+  return value
+}
+
+/** First day of the default window: DEFAULT_LOOKBACK_DAYS UTC days ending on `referenceDate`. */
+function getDefaultFromDateId(referenceDate = new Date()) {
+  const date = new Date(Date.UTC(referenceDate.getUTCFullYear(), referenceDate.getUTCMonth(), referenceDate.getUTCDate()))
+  date.setUTCDate(date.getUTCDate() - DEFAULT_LOOKBACK_DAYS + 1)
+  return getDateId(date)
+}
+
+/** Returns the date id of the UTC day after `dateId`. */
+function getNextDateId(dateId: string) {
+  const date = new Date(`${dateId}T00:00:00.000Z`)
+  date.setUTCDate(date.getUTCDate() + 1)
+  return getDateId(date)
+}
+
+/** Coerces a nullable numeric or string metric to a finite number, using 0 for anything else. */
+function toMetricNumber(value: number | string | null | undefined) {
+  const numberValue = Number(value ?? 0)
+  return Number.isFinite(numberValue) ? numberValue : 0
+}
+
+/**
+ * Returns paying / orgs as a percentage rounded to one decimal place.
+ * Returns 0 when the org count is zero, negative or not a finite number.
+ */
+export function calculateOrgConversionRate(paying: number | string | null | undefined, orgs: number | string | null | undefined) {
+  const payingCount = toMetricNumber(paying)
+  const orgCount = toMetricNumber(orgs)
+  if (orgCount <= 0)
+    return 0
+  return Number(((payingCount * 100) / orgCount).toFixed(1))
+}
+
+/**
+ * Maps each date id to the number of orgs created before the end of that UTC
+ * day. Rows whose created_at is missing or unparsable are ignored. Timestamps
+ * and date ids are both sorted so one forward pass produces every count.
+ */
+function buildOrgCountsByDateId(dateIds: string[], orgRows: OrgCreatedAtRow[]) {
+  const orgCreatedAtTimes = orgRows
+    .map(row => row.created_at ? Date.parse(row.created_at) : Number.NaN)
+    .filter(Number.isFinite)
+    .sort((left, right) => left - right)
+  const countsByDateId = new Map<string, number>()
+  let orgIndex = 0
+
+  for (const dateId of [...dateIds].sort()) {
+    const endExclusive = Date.parse(`${getNextDateId(dateId)}T00:00:00.000Z`)
+    while (orgIndex < orgCreatedAtTimes.length && orgCreatedAtTimes[orgIndex]! < endExclusive)
+      orgIndex++
+    countsByDateId.set(dateId, orgIndex)
+  }
+
+  return countsByDateId
+}
+
+/**
+ * Recomputes the conversion rate of every global_stats row from its stored
+ * paying count and the reconstructed org count, preserving input order.
+ * `changed` is true when the stored and recomputed rates differ by more than 0.0001.
+ */
+export function buildOrgConversionRateBackfillRows(rows: GlobalStatsRow[], orgRows: OrgCreatedAtRow[]): OrgConversionRateBackfillRow[] {
+  const orgCountsByDateId = buildOrgCountsByDateId(rows.map(row => row.date_id), orgRows)
+
+  return rows.map((row) => {
+    const orgs = orgCountsByDateId.get(row.date_id) ?? 0
+    const paying = toMetricNumber(row.paying)
+    const currentRate = toMetricNumber(row.org_conversion_rate)
+    const nextRate = calculateOrgConversionRate(paying, orgs)
+    return {
+      date_id: row.date_id,
+      orgs,
+      paying,
+      current_rate: currentRate,
+      next_rate: nextRate,
+      changed: Math.abs(currentRate - nextRate) > 0.0001,
+    }
+  })
+}
+
+/** Pages through global_stats in date order, optionally bounded by inclusive from/to date ids. */
+async function fetchGlobalStatsRows(supabase: SupabaseClient, fromDateId: string | null, toDateId: string | null) {
+  const rows: GlobalStatsRow[] = []
+  let offset = 0
+
+  while (true) {
+    let query = supabase
+      .from('global_stats')
+      .select('date_id, paying, org_conversion_rate')
+      .order('date_id', { ascending: true })
+      .range(offset, offset + DEFAULT_PAGE_SIZE - 1)
+
+    if (fromDateId)
+      query = query.gte('date_id', fromDateId)
+    if (toDateId)
+      query = query.lte('date_id', toDateId)
+
+    const { data, error } = await query
+    if (error)
+      throw error
+    if (!data?.length)
+      break
+
+    rows.push(...data)
+    if (data.length < DEFAULT_PAGE_SIZE)
+      break
+    offset += DEFAULT_PAGE_SIZE
+  }
+
+  return rows
+}
+
+/** Pages through orgs.created_at, optionally limited to orgs created before the end of `toDateId`. */
+async function fetchOrgCreatedAtRows(supabase: SupabaseClient, toDateId: string | null) {
+  const rows: OrgCreatedAtRow[] = []
+  let offset = 0
+
+  while (true) {
+    let query = supabase
+      .from('orgs')
+      .select('created_at')
+      .order('created_at', { ascending: true })
+      .range(offset, offset + DEFAULT_PAGE_SIZE - 1)
+
+    if (toDateId)
+      query = query.lt('created_at', `${getNextDateId(toDateId)}T00:00:00.000Z`)
+
+    const { data, error } = await query
+    if (error)
+      throw error
+    if (!data?.length)
+      break
+
+    rows.push(...data)
+    if (data.length < DEFAULT_PAGE_SIZE)
+      break
+    offset += DEFAULT_PAGE_SIZE
+  }
+
+  return rows
+}
+
+/** Writes the recomputed rate to the global_stats row of `row.date_id`. */
+async function updateConversionRate(supabase: SupabaseClient, row: OrgConversionRateBackfillRow) {
+  const { error } = await supabase
+    .from('global_stats')
+    .update({ org_conversion_rate: row.next_rate })
+    .eq('date_id', row.date_id)
+
+  if (error)
+    throw error
+}
+
+/**
+ * CLI entry point: recomputes org_conversion_rate for the selected
+ * global_stats rows and writes the changed ones only when --apply is passed.
+ */
+async function main(args = process.argv.slice(2), runtimeEnv: Record<string, string | undefined> = process.env) {
+  const apply = args.includes('--apply')
+  const all = args.includes('--all')
+  const envFile = getArgValue(args, '--env-file') ?? DEFAULT_ENV_FILE
+  const concurrency = parsePositiveInteger(getArgValue(args, '--concurrency'), '--concurrency', DEFAULT_CONCURRENCY)
+  const fromDateId = all
+    ? null
+    : assertDateId(getArgValue(args, '--from') ?? getDefaultFromDateId(), '--from')
+  const toDateId = all
+    ? null
+    : assertDateId(getArgValue(args, '--to') ?? getDateId(), '--to')
+
+  if (fromDateId && toDateId && fromDateId > toDateId)
+    throw new Error('--from must be before or equal to --to')
+
+  const fileEnv = await loadEnv(envFile)
+  const env = {
+    ...fileEnv,
+    ...runtimeEnv,
+  }
+  const supabase = createSupabaseServiceClient(env)
+
+  const rows = await fetchGlobalStatsRows(supabase, fromDateId, toDateId)
+  const orgRows = await fetchOrgCreatedAtRows(supabase, toDateId)
+  const backfillRows = buildOrgConversionRateBackfillRows(rows, orgRows)
+  const changedRows = backfillRows.filter(row => row.changed)
+
+  console.log(`Loaded ${rows.length} global_stats rows`)
+  console.log(`Loaded ${orgRows.length} org rows for denominator reconstruction`)
+  console.log(`Env file: ${envFile}`)
+  if (all)
+    console.log('Scope: all global_stats rows')
+  else
+    console.log(`Scope: ${fromDateId} to ${toDateId}`)
+  console.log(`Rows needing update: ${changedRows.length}`)
+
+  const sampleRows = changedRows.slice(0, 10)
+  if (sampleRows.length > 0) {
+    console.log('Sample updates:')
+    for (const row of sampleRows)
+      console.log(`${row.date_id}: ${row.current_rate}% -> ${row.next_rate}% (${row.paying}/${row.orgs})`)
+  }
+
+  if (!apply) {
+    console.log('Dry run only. Pass --apply to update global_stats.')
+    return
+  }
+
+  if (changedRows.length === 0) {
+    console.log('Nothing to update.')
+    return
+  }
+
+  let updated = 0
+  await asyncPool(concurrency, changedRows, async (row) => {
+    await updateConversionRate(supabase, row)
+    updated++
+    if (updated % 100 === 0 || updated === changedRows.length)
+      console.log(`Updated ${updated}/${changedRows.length}`)
+  })
+
+  console.log(`Done. Updated ${updated}/${changedRows.length} org conversion rate rows.`)
+}
+
+if (import.meta.main)
+  await main()
diff --git a/scripts/backfill_stripe_customer_countries.ts b/scripts/backfill_stripe_customer_countries.ts
new file mode 100644
index 0000000000..72876a07f8
--- /dev/null
+++ b/scripts/backfill_stripe_customer_countries.ts
@@ -0,0 +1,262 @@
+/*
+ * Backfill the admin "Top Billing Countries" metric.
+ *
+ * The dashboard reads public.stripe_info.customer_country. This script syncs
+ * that column from Stripe customer profile addresses for historical customers.
+ *
+ * Dry run for missing country rows:
+ *   bun run stripe:backfill-customer-countries
+ *
+ * Apply missing countries:
+ *   bun run stripe:backfill-customer-countries --apply
+ *
+ * Refresh existing country values too:
+ *   bun run stripe:backfill-customer-countries --apply --refresh-existing
+ */
+import type Stripe from 'stripe'
+import type { Database } from '../supabase/functions/_backend/utils/supabase.types.ts'
+import { mkdir, writeFile } from 'node:fs/promises'
+import process from 'node:process'
+import { asyncPool, createStripeClient, createSupabaseServiceClient, DEFAULT_ENV_FILE, getArgValue, getRequiredEnv, isActionableStripeCustomerId, loadEnv, parsePositiveInteger } from './admin_stripe_backfill_utils.ts'
+
+const DEFAULT_CONCURRENCY = 8
+const DEFAULT_PAGE_SIZE = 1000
+const FAILURE_OUTPUT = './tmp/stripe_customer_country_backfill_failures.json'
+const ISO_COUNTRY_CODE_REGEX = /^[A-Z]{2}$/
+
+type SupabaseClient = ReturnType<typeof createSupabaseServiceClient>
+type StripeInfoCountryRow = Pick<
+  Database['public']['Tables']['stripe_info']['Row'],
+  'customer_id' | 'customer_country'
+>
+
+/** A stripe_info row selected for a country update. */
+export interface StripeCustomerCountryBackfillCandidate {
+  current_country: string | null
+  customer_id: string
+  next_country: string | null
+}
+
+/** A customer that could not be looked up or updated, with the error message. */
+interface BackfillFailure {
+  customerId: string
+  error: string
+}
+
+/**
+ * Returns `country` trimmed and upper-cased when it is exactly two letters
+ * A-Z, otherwise null (including for null, undefined and blank input).
+ */
+export function normalizeStripeCountryCode(country: string | null | undefined) {
+  if (!country)
+    return null
+
+  const normalized = country.trim().toUpperCase()
+  if (!normalized || !ISO_COUNTRY_CODE_REGEX.test(normalized))
+    return null
+
+  return normalized
+}
+
+/** Reads the normalised address country of a Stripe customer; null for deleted customers. */
+export function getCustomerProfileCountry(customer: Stripe.Customer | Stripe.DeletedCustomer) {
+  if (customer.deleted)
+    return null
+
+  return normalizeStripeCountryCode(customer.address?.country ?? null)
+}
+
+/**
+ * Decides whether the stored country should be overwritten with `nextCountry`.
+ * With `refreshExisting`, any difference counts, including replacing a stored
+ * value with null. Without it, only a missing (or unnormalisable) stored value
+ * is filled, and only when Stripe actually has a country.
+ */
+export function shouldUpdateCustomerCountry(currentCountry: string | null | undefined, nextCountry: string | null, refreshExisting: boolean) {
+  const normalizedCurrentCountry = normalizeStripeCountryCode(currentCountry)
+  if (refreshExisting)
+    return normalizedCurrentCountry !== nextCountry
+  return normalizedCurrentCountry === null && nextCountry !== null
+}
+
+/**
+ * Loads stripe_info rows in customer_id order, one page at a time, resuming
+ * after the last customer_id seen. `customerId` restricts the query to that
+ * single customer; `missingOnly` keeps only rows whose country is null.
+ */
+async function fetchStripeInfoCountryRows(
+  supabase: SupabaseClient,
+  options: {
+    customerId?: string | null
+    missingOnly: boolean
+  },
+) {
+  const rows: StripeInfoCountryRow[] = []
+  let lastSeenCustomerId: string | null = null
+
+  while (true) {
+    let query = supabase
+      .from('stripe_info')
+      .select('customer_id, customer_country')
+      .order('customer_id', { ascending: true })
+      .limit(DEFAULT_PAGE_SIZE)
+
+    if (options.customerId)
+      query = query.eq('customer_id', options.customerId)
+    else if (lastSeenCustomerId)
+      query = query.gt('customer_id', lastSeenCustomerId)
+
+    if (options.missingOnly)
+      query = query.is('customer_country', null)
+
+    const { data, error } = await query
+    if (error)
+      throw error
+    if (!data?.length)
+      break
+
+    rows.push(...data)
+
+    if (options.customerId || data.length < DEFAULT_PAGE_SIZE)
+      break
+    lastSeenCustomerId = data.at(-1)?.customer_id ?? null
+  }
+
+  return rows
+}
+
+/** Writes `country` (possibly null) to the stripe_info row of `customerId`. */
+async function updateCustomerCountry(supabase: SupabaseClient, customerId: string, country: string | null) {
+  const { error } = await supabase
+    .from('stripe_info')
+    .update({ customer_country: country })
+    .eq('customer_id', customerId)
+
+  if (error)
+    throw error
+}
+
+/** Writes the collected failures to FAILURE_OUTPUT as JSON; does nothing when there are none. */
+async function writeFailures(failures: BackfillFailure[]) {
+  if (failures.length === 0)
+    return
+
+  await mkdir('./tmp', { recursive: true })
+  await writeFile(FAILURE_OUTPUT, `${JSON.stringify(failures, null, 2)}\n`)
+  console.log(`Failure details written to ${FAILURE_OUTPUT}`)
+}
+
+/**
+ * CLI entry point: looks up each selected customer in Stripe, reports the
+ * country changes, and writes them only when --apply is passed.
+ */
+async function main(args = process.argv.slice(2), runtimeEnv: Record<string, string | undefined> = process.env) {
+  const apply = args.includes('--apply')
+  const refreshExisting = args.includes('--refresh-existing')
+  const envFile = getArgValue(args, '--env-file') ?? DEFAULT_ENV_FILE
+  const customerId = getArgValue(args, '--customer-id')
+  const limitArg = getArgValue(args, '--limit')
+  const limit = limitArg !== null ? parsePositiveInteger(limitArg, '--limit', 0) : null
+  const concurrency = parsePositiveInteger(getArgValue(args, '--concurrency'), '--concurrency', DEFAULT_CONCURRENCY)
+
+  const fileEnv = await loadEnv(envFile)
+  const env = {
+    ...fileEnv,
+    ...runtimeEnv,
+  }
+
+  const supabase = createSupabaseServiceClient(env)
+  const stripe = createStripeClient(
+    getRequiredEnv(env, 'STRIPE_SECRET_KEY'),
+    env.STRIPE_API_BASE_URL?.trim(),
+  )
+
+  const rows = await fetchStripeInfoCountryRows(supabase, {
+    customerId,
+    missingOnly: !refreshExisting,
+  })
+  const actionableRows = rows.filter(row => isActionableStripeCustomerId(row.customer_id))
+  const limitedRows = limit ? actionableRows.slice(0, limit) : actionableRows
+
+  console.log(`Loaded ${rows.length} stripe_info rows (${actionableRows.length} actionable)`)
+  console.log(`Env file: ${envFile}`)
+  if (customerId)
+    console.log(`Scoped to customer: ${customerId}`)
+  if (refreshExisting)
+    console.log('Mode: refresh existing country values')
+  else
+    console.log('Mode: fill missing country values only')
+
+  const failures: BackfillFailure[] = []
+  const candidates: StripeCustomerCountryBackfillCandidate[] = []
+  let checked = 0
+
+  await asyncPool(concurrency, limitedRows, async (row) => {
+    try {
+      const customer = await stripe.customers.retrieve(row.customer_id)
+      const nextCountry = getCustomerProfileCountry(customer)
+      if (shouldUpdateCustomerCountry(row.customer_country, nextCountry, refreshExisting)) {
+        candidates.push({
+          customer_id: row.customer_id,
+          current_country: normalizeStripeCountryCode(row.customer_country),
+          next_country: nextCountry,
+        })
+      }
+    }
+    catch (error) {
+      failures.push({
+        customerId: row.customer_id,
+        error: error instanceof Error ? error.message : String(error),
+      })
+    }
+    finally {
+      // Count failed lookups too, so the final progress line is always printed.
+      checked++
+      if (checked % 100 === 0 || checked === limitedRows.length)
+        console.log(`Checked ${checked}/${limitedRows.length}`)
+    }
+  })
+
+  console.log(`Candidates needing update: ${candidates.length}`)
+  if (candidates.length > 0) {
+    console.log('Sample updates:')
+    for (const candidate of candidates.slice(0, 10)) {
+      const from = candidate.current_country ?? 'null'
+      const to = candidate.next_country ?? 'null'
+      console.log(`${candidate.customer_id}: ${from} -> ${to}`)
+    }
+  }
+
+  if (!apply) {
+    await writeFailures(failures)
+    if (failures.length > 0)
+      throw new Error(`Stripe customer country backfill dry run had ${failures.length} failures`)
+    console.log('Dry run only. Pass --apply to update stripe_info.')
+    return
+  }
+
+  let updated = 0
+  await asyncPool(concurrency, candidates, async (candidate) => {
+    try {
+      await updateCustomerCountry(supabase, candidate.customer_id, candidate.next_country)
+      updated++
+      if (updated % 100 === 0 || updated === candidates.length)
+        console.log(`Updated ${updated}/${candidates.length}`)
+    }
+    catch (error) {
+      failures.push({
+        customerId: candidate.customer_id,
+        error: error instanceof Error ? error.message : String(error),
+      })
+    }
+  })
+
+  await writeFailures(failures)
+  if (failures.length > 0)
+    throw new Error(`Stripe customer country backfill completed with ${failures.length} failures`)
+
+  console.log(`Done. Updated ${updated}/${candidates.length} customer country rows.`)
+}
+
+if (import.meta.main)
+  await main()
diff --git a/tests/admin-stripe-backfill-scripts.unit.test.ts b/tests/admin-stripe-backfill-scripts.unit.test.ts
new file mode 100644
index 0000000000..8751e0395d
--- /dev/null
+++ b/tests/admin-stripe-backfill-scripts.unit.test.ts
@@ -0,0 +1,99 @@
+import type Stripe from 'stripe'
+import { describe, expect, it } from 'vitest'
+import { asyncPool, isActionableStripeCustomerId, parsePositiveInteger } from '../scripts/admin_stripe_backfill_utils.ts'
+import { buildOrgConversionRateBackfillRows, calculateOrgConversionRate } from '../scripts/backfill_org_conversion_rate_trend.ts'
+import { getCustomerProfileCountry, normalizeStripeCountryCode, shouldUpdateCustomerCountry } from '../scripts/backfill_stripe_customer_countries.ts'
+
+describe('admin Stripe backfill scripts', () => {
+  it.concurrent('calculates org conversion rates from paying and org snapshots', () => {
+    expect(calculateOrgConversionRate(25, 200)).toBe(12.5)
+    expect(calculateOrgConversionRate('1', '3')).toBe(33.3)
+    expect(calculateOrgConversionRate(10, 0)).toBe(0)
+  })
+
+  it.concurrent('marks only changed org conversion rows', () => {
+    const rows = buildOrgConversionRateBackfillRows([
+      {
+        date_id: '2026-04-01',
+        paying: 25,
+        org_conversion_rate: 0,
+      },
+      {
+        date_id: '2026-04-02',
+        paying: 50,
+        org_conversion_rate: 25,
+      },
+    ] as any, [
+      ...Array.from({ length: 200 }, () => ({ created_at: '2026-04-01T12:00:00.000Z' })),
+      { created_at: '2026-04-03T00:00:00.000Z' },
+    ])
+
+    expect(rows).toEqual([
+      {
+        date_id: '2026-04-01',
+        orgs: 200,
+        paying: 25,
+        current_rate: 0,
+        next_rate: 12.5,
+        changed: true,
+      },
+      {
+        date_id: '2026-04-02',
+        orgs: 200,
+        paying: 50,
+        current_rate: 25,
+        next_rate: 25,
+        changed: false,
+      },
+    ])
+  })
+
+  it.concurrent('normalizes Stripe country codes', () => {
+    expect(normalizeStripeCountryCode(' us ')).toBe('US')
+    expect(normalizeStripeCountryCode('USA')).toBeNull()
+    expect(normalizeStripeCountryCode('')).toBeNull()
+  })
+
+  it.concurrent('reads customer profile country from Stripe customers', () => {
+    expect(getCustomerProfileCountry({
+      deleted: false,
+      address: { country: 'fr' },
+    } as unknown as Stripe.Customer)).toBe('FR')
+
+    expect(getCustomerProfileCountry({
+      deleted: true,
+      id: 'cus_deleted',
+      object: 'customer',
+    } as Stripe.DeletedCustomer)).toBeNull()
+  })
+
+  it.concurrent('decides when customer country rows need updates', () => {
+    expect(shouldUpdateCustomerCountry(null, 'US', false)).toBe(true)
+    expect(shouldUpdateCustomerCountry('US', 'FR', false)).toBe(false)
+    expect(shouldUpdateCustomerCountry('US', 'FR', true)).toBe(true)
+    expect(shouldUpdateCustomerCountry(' us ', 'US', true)).toBe(false)
+  })
+
+  it.concurrent('skips pending Stripe customer placeholders', () => {
+    expect(isActionableStripeCustomerId('cus_123')).toBe(true)
+    expect(isActionableStripeCustomerId('pending_org_id')).toBe(false)
+    expect(isActionableStripeCustomerId('')).toBe(false)
+  })
+
+  it.concurrent('rejects CLI integers with trailing garbage', () => {
+    expect(parsePositiveInteger(null, '--limit', 5)).toBe(5)
+    expect(parsePositiveInteger('12', '--limit', 5)).toBe(12)
+    expect(() => parsePositiveInteger('12abc', '--limit', 5)).toThrow('--limit must be a positive integer')
+  })
+
+  it.concurrent('waits for in-flight tasks before surfacing a pool failure', async () => {
+    let finished = 0
+    await expect(asyncPool(2, [1, 2], async (item) => {
+      if (item === 1)
+        throw new Error('boom')
+      await new Promise(resolve => setTimeout(resolve, 5))
+      finished++
+    })).rejects.toThrow('boom')
+    expect(finished).toBe(1)
+  })
+})