From b80e0c6d4f5bcd4f32fe56080ccdadf9e72e1658 Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Thu, 23 Apr 2026 14:26:35 +0200 Subject: [PATCH 01/14] feat(admin): add retention metrics backfill --- package.json | 1 + scripts/backfill_retention_metrics.ts | 917 ++++++++++++++++++ .../_backend/triggers/logsnag_insights.ts | 28 +- .../_backend/triggers/stripe_event.ts | 138 +-- .../_backend/utils/revenue_metrics.ts | 175 ++++ tests/backfill-retention-metrics.unit.test.ts | 152 +++ 6 files changed, 1248 insertions(+), 163 deletions(-) create mode 100644 scripts/backfill_retention_metrics.ts create mode 100644 supabase/functions/_backend/utils/revenue_metrics.ts create mode 100644 tests/backfill-retention-metrics.unit.test.ts diff --git a/package.json b/package.json index b611c6a9f2..7eecc798d6 100644 --- a/package.json +++ b/package.json @@ -72,6 +72,7 @@ "test:cloudflare:plugin": "USE_CLOUDFLARE_WORKERS=true vitest run tests/updates*.test.ts tests/stats*.test.ts tests/channel_self*.test.ts --config vitest.config.cloudflare-plugin.ts", "test:cloudflare:api": "vitest run --exclude=tests/cli* --exclude=tests/updates* --exclude=tests/stats* --exclude=tests/channel_self* --config vitest.config.cloudflare.ts", "test:cloudflare:updates": "vitest run tests/updates* --config vitest.config.cloudflare.ts", + "stripe:backfill-retention-metrics": "bun scripts/backfill_retention_metrics.ts", "stripe:sync-org-names": "bun scripts/sync_stripe_org_names.ts", "lint": "eslint \"src/**/*.{vue,ts,js}\"", "fmt": "bun run lint:fix && bun run lint:sql", diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts new file mode 100644 index 0000000000..250db57703 --- /dev/null +++ b/scripts/backfill_retention_metrics.ts @@ -0,0 +1,917 @@ +/* + * Backfill daily Stripe revenue movement metrics used by admin NRR and churn charts. + * + * Dry run, defaulting to the last 30 UTC calendar days: + * bun run stripe:backfill-retention-metrics + * + * Apply new, unprocessed Stripe subscription events: + * bun run stripe:backfill-retention-metrics --apply --from=2026-04-01 --to=2026-04-23 + * + * Rebuild an exact date range: + * bun run stripe:backfill-retention-metrics --apply --reset --from=2026-04-01 --to=2026-04-23 + * + * Older history requires an exported Stripe events JSON file: + * bun run stripe:backfill-retention-metrics --events-file=./tmp/stripe-events.json --from=2026-01-01 --to=2026-04-23 + */ +import type Stripe from 'stripe' +import type { RevenueMovement, RevenuePlanRow, StripeInfoRevenueState } from '../supabase/functions/_backend/utils/revenue_metrics.ts' +import type { Database } from '../supabase/functions/_backend/utils/supabase.types.ts' +import { existsSync } from 'node:fs' +import { mkdir, readFile, writeFile } from 'node:fs/promises' +import process from 'node:process' +import { createClient } from '@supabase/supabase-js' +import StripeClient from 'stripe' +import { calculateChurnRevenue, calculateNrr, classifyRevenueMovement, getEventDateId, getPreviousDateId, getSubscriptionMrr, hasRevenueMovement } from '../supabase/functions/_backend/utils/revenue_metrics.ts' + +const DEFAULT_ENV_FILE = './internal/cloudflare/.env.prod' +const DEFAULT_LOOKBACK_DAYS = 30 +const EVENT_FETCH_PAGE_SIZE = 100 +const DB_CHUNK_SIZE = 500 +const FAILURE_OUTPUT = './tmp/retention_metric_backfill_failures.json' +const DATE_ID_REGEX = /^\d{4}-\d{2}-\d{2}$/ +const SUBSCRIPTION_EVENT_TYPES = [ + 'customer.subscription.created', + 'customer.subscription.updated', + 'customer.subscription.deleted', +] as const + +type SupabaseClient = ReturnType> +type DailyRevenueMetricRow = Database['public']['Tables']['daily_revenue_metrics']['Row'] +type DailyRevenueMetricInsert = Database['public']['Tables']['daily_revenue_metrics']['Insert'] +type ProcessedStripeEventInsert = Database['public']['Tables']['processed_stripe_events']['Insert'] +type StripeStatus = Database['public']['Enums']['stripe_status'] +type SubscriptionEventType = typeof SUBSCRIPTION_EVENT_TYPES[number] + +interface CustomerPaidAtRow { + customer_id: string + paid_at: string | null +} + +interface TrackedSubscriptionState { + customer_id: string + is_good_plan: boolean + paid_at: string | null + price_id: string | null + product_id: string | null + status: StripeStatus | null + subscription_id: string | null +} + +export interface BackfillRevenueMovementEvent { + event_id: string + event_type: SubscriptionEventType + date_id: string + customer_id: string + opening_mrr: number + current_mrr: number + next_mrr: number + new_business_mrr: number + expansion_mrr: number + contraction_mrr: number + churn_mrr: number +} + +export interface BackfillSummary { + rows: number + opening_mrr: number + new_business_mrr: number + expansion_mrr: number + contraction_mrr: number + churn_mrr: number +} + +interface BuildRevenueMovementEventsOptions { + customerId?: string | null + fromDateId: string + initialPaidAtByCustomerId?: Map + toDateId: string +} + +interface BuildRevenueMovementEventsResult { + movements: BackfillRevenueMovementEvent[] + skipped: { + missingCustomer: number + missingPlan: number + noMovement: number + outOfRange: number + subscriptionMismatch: number + unsupportedEvent: number + } +} + +interface RefreshRetentionMetricsResult { + skippedMissingGlobalStats: string[] + updated: number +} + +function getArgValue(args: string[], prefix: string): string | null { + const arg = args.find(value => value.startsWith(`${prefix}=`)) + if (!arg) + return null + return arg.slice(prefix.length + 1) +} + +async function loadEnv(filePath: string) { + if (!existsSync(filePath)) + return {} + + const text = await readFile(filePath, 'utf8') + const env: Record = {} + + for (const line of text.split('\n')) { + const trimmed = line.trim() + if (!trimmed || trimmed.startsWith('#')) + continue + + const separatorIndex = trimmed.indexOf('=') + if (separatorIndex <= 0) + continue + + const key = trimmed.slice(0, separatorIndex) + let value = trimmed.slice(separatorIndex + 1) + if ((value.startsWith('"') && value.endsWith('"')) || (value.startsWith('\'') && value.endsWith('\''))) + value = value.slice(1, -1) + env[key] = value + } + + return env +} + +function getRequiredEnv(env: Record, key: string) { + const value = env[key]?.trim() + if (!value) + throw new Error(`Missing ${key}`) + return value +} + +function createStripeClient(secretKey: string, apiBaseUrl?: string) { + let hostConfig: Partial[1]>, 'host' | 'port' | 'protocol'>> = {} + + if (apiBaseUrl?.trim()) { + const parsed = new URL(apiBaseUrl) + hostConfig = { + host: parsed.hostname, + port: Number.parseInt(parsed.port || (parsed.protocol === 'https:' ? '443' : '80'), 10), + protocol: parsed.protocol.replace(':', '') as 'http' | 'https', + } + } + + type StripeApiVersion = NonNullable[1]>['apiVersion'] + return new StripeClient(secretKey, { + apiVersion: '2026-03-25.dahlia' as StripeApiVersion, + httpClient: StripeClient.createFetchHttpClient(), + ...hostConfig, + }) +} + +export function parseDateId(value: string, name: string) { + if (!DATE_ID_REGEX.test(value)) + throw new Error(`${name} must use YYYY-MM-DD`) + + const parsed = new Date(`${value}T00:00:00.000Z`) + if (Number.isNaN(parsed.getTime()) || parsed.toISOString().slice(0, 10) !== value) + throw new Error(`${name} must be a valid UTC date`) + + return value +} + +function todayDateId() { + return new Date().toISOString().slice(0, 10) +} + +function dateIdDaysAgo(days: number) { + const now = new Date() + const start = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate())) + start.setUTCDate(start.getUTCDate() - (days - 1)) + return start.toISOString().slice(0, 10) +} + +function dateIdToStartSeconds(dateId: string) { + return Math.floor(new Date(`${dateId}T00:00:00.000Z`).getTime() / 1000) +} + +function dateIdToEndSeconds(dateId: string) { + return Math.floor(new Date(`${dateId}T23:59:59.999Z`).getTime() / 1000) +} + +function compareDateIds(left: string, right: string) { + return left.localeCompare(right) +} + +function getDateIdsBetween(fromDateId: string, toDateId: string) { + const dates: string[] = [] + const cursor = new Date(`${fromDateId}T00:00:00.000Z`) + const end = new Date(`${toDateId}T00:00:00.000Z`) + + while (cursor.getTime() <= end.getTime()) { + dates.push(cursor.toISOString().slice(0, 10)) + cursor.setUTCDate(cursor.getUTCDate() + 1) + } + + return dates +} + +export function isSubscriptionEventType(type: string): type is SubscriptionEventType { + return SUBSCRIPTION_EVENT_TYPES.includes(type as SubscriptionEventType) +} + +function getEventCreatedIso(event: Stripe.Event) { + return new Date(event.created * 1000).toISOString() +} + +function sortStripeEvents(events: Stripe.Event[]) { + return [...events].sort((left, right) => { + if (left.created !== right.created) + return left.created - right.created + return left.id.localeCompare(right.id) + }) +} + +function chunkArray(items: T[], size: number) { + const chunks: T[][] = [] + for (let index = 0; index < items.length; index += size) { + chunks.push(items.slice(index, index + size)) + } + return chunks +} + +function toStripeId(value: unknown) { + if (!value) + return null + if (typeof value === 'string') + return value + if (typeof value === 'object' && 'id' in value && typeof value.id === 'string') + return value.id + return null +} + +function getLicensedSubscriptionItem(items: Stripe.SubscriptionItem[] | undefined) { + return items?.find(item => item.plan?.usage_type === 'licensed') ?? items?.[0] ?? null +} + +function getItemPriceId(item: Stripe.SubscriptionItem | null | undefined) { + if (!item) + return null + + return item.plan?.id ?? toStripeId(item.price) ?? null +} + +function getItemProductId(item: Stripe.SubscriptionItem | null | undefined) { + if (!item) + return null + + return toStripeId(item.plan?.product) ?? toStripeId(item.price?.product) ?? null +} + +function getSubscriptionItems(subscription: Stripe.Subscription) { + return subscription.items?.data as Stripe.SubscriptionItem[] | undefined +} + +function getPreviousSubscriptionItems(event: Stripe.Event) { + const previousAttributes = event.data.previous_attributes as Partial | undefined + return previousAttributes?.items?.data as Stripe.SubscriptionItem[] | undefined +} + +function toRevenueState(state: TrackedSubscriptionState | StripeInfoRevenueState): StripeInfoRevenueState { + if (!state) + return state + + return { + is_good_plan: state.is_good_plan, + paid_at: state.paid_at, + price_id: state.price_id, + product_id: state.product_id, + status: state.status, + } +} + +function getKnownPaidAtBefore( + customerId: string, + eventOccurredAtIso: string, + trackedPaidAt: string | null | undefined, + initialPaidAtByCustomerId?: Map, +) { + const paidAt = trackedPaidAt ?? initialPaidAtByCustomerId?.get(customerId) ?? null + if (!paidAt) + return null + + if (new Date(paidAt).getTime() < new Date(eventOccurredAtIso).getTime()) + return paidAt + + return null +} + +function buildTrackedState( + customerId: string, + subscriptionId: string | null, + status: StripeStatus | null, + priceId: string | null, + productId: string | null, + paidAt: string | null, +): TrackedSubscriptionState { + return { + customer_id: customerId, + is_good_plan: true, + paid_at: paidAt, + price_id: priceId, + product_id: productId, + status, + subscription_id: subscriptionId, + } +} + +function toMovementEvent( + event: Stripe.Event, + customerId: string, + dateId: string, + movement: RevenueMovement, +): BackfillRevenueMovementEvent { + return { + event_id: event.id, + event_type: event.type as SubscriptionEventType, + date_id: dateId, + customer_id: customerId, + opening_mrr: movement.currentMrr, + current_mrr: movement.currentMrr, + next_mrr: movement.nextMrr, + new_business_mrr: movement.newBusinessMrr, + expansion_mrr: movement.expansionMrr, + contraction_mrr: movement.contractionMrr, + churn_mrr: movement.churnMrr, + } +} + +export function buildRevenueMovementEvents( + events: Stripe.Event[], + plans: RevenuePlanRow[], + options: BuildRevenueMovementEventsOptions, +): BuildRevenueMovementEventsResult { + const movements: BackfillRevenueMovementEvent[] = [] + const customerStates = new Map() + const skipped = { + missingCustomer: 0, + missingPlan: 0, + noMovement: 0, + outOfRange: 0, + subscriptionMismatch: 0, + unsupportedEvent: 0, + } + + for (const event of sortStripeEvents(events)) { + if (!isSubscriptionEventType(event.type)) { + skipped.unsupportedEvent++ + continue + } + + const eventOccurredAtIso = getEventCreatedIso(event) + const dateId = getEventDateId(eventOccurredAtIso) + if (compareDateIds(dateId, options.fromDateId) < 0 || compareDateIds(dateId, options.toDateId) > 0) { + skipped.outOfRange++ + continue + } + + const subscription = event.data.object as Stripe.Subscription + const customerId = toStripeId(subscription.customer) + if (!customerId) { + skipped.missingCustomer++ + continue + } + if (options.customerId && customerId !== options.customerId) { + skipped.outOfRange++ + continue + } + + const subscriptionId = subscription.id ?? null + const currentItem = getLicensedSubscriptionItem(getSubscriptionItems(subscription)) + const currentPriceId = getItemPriceId(currentItem) + const currentProductId = getItemProductId(currentItem) + if (!currentPriceId || !currentProductId) { + skipped.missingPlan++ + continue + } + + const trackedState = customerStates.get(customerId) + const previousItem = getLicensedSubscriptionItem(getPreviousSubscriptionItems(event)) + const previousPriceId = getItemPriceId(previousItem) ?? trackedState?.price_id ?? currentPriceId + const previousProductId = getItemProductId(previousItem) ?? trackedState?.product_id ?? currentProductId + const previousMrr = getSubscriptionMrr(plans, { + is_good_plan: true, + paid_at: trackedState?.paid_at ?? eventOccurredAtIso, + price_id: previousPriceId, + product_id: previousProductId, + status: 'succeeded', + }) + const knownPaidAt = getKnownPaidAtBefore(customerId, eventOccurredAtIso, trackedState?.paid_at, options.initialPaidAtByCustomerId) + const activePaidAt = trackedState?.paid_at ?? knownPaidAt ?? (previousMrr > 0 ? eventOccurredAtIso : null) + + let currentState: TrackedSubscriptionState + let nextState: TrackedSubscriptionState + + if (event.type === 'customer.subscription.created') { + currentState = buildTrackedState(customerId, null, 'created', null, null, knownPaidAt) + nextState = buildTrackedState(customerId, subscriptionId, 'succeeded', currentPriceId, currentProductId, knownPaidAt ?? eventOccurredAtIso) + } + else if (event.type === 'customer.subscription.updated') { + currentState = buildTrackedState(customerId, subscriptionId, previousMrr > 0 ? 'succeeded' : 'updated', previousPriceId, previousProductId, activePaidAt) + nextState = buildTrackedState(customerId, subscriptionId, 'succeeded', currentPriceId, currentProductId, activePaidAt ?? eventOccurredAtIso) + } + else { + if (trackedState?.subscription_id && trackedState.subscription_id !== subscriptionId) { + skipped.subscriptionMismatch++ + continue + } + + currentState = buildTrackedState(customerId, subscriptionId, 'succeeded', trackedState?.price_id ?? currentPriceId, trackedState?.product_id ?? currentProductId, activePaidAt ?? eventOccurredAtIso) + nextState = buildTrackedState(customerId, subscriptionId, 'deleted', currentPriceId, currentProductId, activePaidAt ?? eventOccurredAtIso) + } + + const movement = classifyRevenueMovement(toRevenueState(currentState), toRevenueState(nextState), plans) + customerStates.set(customerId, nextState) + + if (!hasRevenueMovement(movement)) { + skipped.noMovement++ + continue + } + + movements.push(toMovementEvent(event, customerId, dateId, movement)) + } + + return { movements, skipped } +} + +export function aggregateRevenueMovementEvents(movements: BackfillRevenueMovementEvent[]): DailyRevenueMetricInsert[] { + const metricsByKey = new Map() + + for (const movement of movements) { + const key = `${movement.date_id}:${movement.customer_id}` + const existing = metricsByKey.get(key) + if (!existing) { + metricsByKey.set(key, { + date_id: movement.date_id, + customer_id: movement.customer_id, + opening_mrr: movement.opening_mrr, + new_business_mrr: movement.new_business_mrr, + expansion_mrr: movement.expansion_mrr, + contraction_mrr: movement.contraction_mrr, + churn_mrr: movement.churn_mrr, + }) + continue + } + + existing.new_business_mrr = Number(existing.new_business_mrr) + movement.new_business_mrr + existing.expansion_mrr = Number(existing.expansion_mrr) + movement.expansion_mrr + existing.contraction_mrr = Number(existing.contraction_mrr) + movement.contraction_mrr + existing.churn_mrr = Number(existing.churn_mrr) + movement.churn_mrr + } + + return [...metricsByKey.values()].sort((left, right) => { + const dateCompare = left.date_id.localeCompare(right.date_id) + if (dateCompare !== 0) + return dateCompare + return left.customer_id.localeCompare(right.customer_id) + }) +} + +export function summarizeDailyRevenueMetrics(rows: Pick[]): BackfillSummary { + return rows.reduce((summary, row) => { + summary.rows++ + summary.opening_mrr += Number(row.opening_mrr) || 0 + summary.new_business_mrr += Number(row.new_business_mrr) || 0 + summary.expansion_mrr += Number(row.expansion_mrr) || 0 + summary.contraction_mrr += Number(row.contraction_mrr) || 0 + summary.churn_mrr += Number(row.churn_mrr) || 0 + return summary + }, { + rows: 0, + opening_mrr: 0, + new_business_mrr: 0, + expansion_mrr: 0, + contraction_mrr: 0, + churn_mrr: 0, + }) +} + +async function loadEventsFile(filePath: string): Promise { + const payload = JSON.parse(await readFile(filePath, 'utf8')) as unknown + const events = Array.isArray(payload) + ? payload + : Array.isArray((payload as { data?: unknown }).data) + ? (payload as { data: unknown[] }).data + : Array.isArray((payload as { events?: unknown }).events) + ? (payload as { events: unknown[] }).events + : null + + if (!events) + throw new Error('--events-file must contain a JSON array, or an object with data/events array') + + return sortStripeEvents(events as Stripe.Event[]) +} + +async function fetchStripeEvents(stripe: StripeClient, fromDateId: string, toDateId: string, limit: number | null) { + const events: Stripe.Event[] = [] + for (const type of SUBSCRIPTION_EVENT_TYPES) { + const params = { + created: { + gte: dateIdToStartSeconds(fromDateId), + lte: dateIdToEndSeconds(toDateId), + }, + limit: EVENT_FETCH_PAGE_SIZE, + type, + } as Stripe.EventListParams + + for await (const event of stripe.events.list(params)) { + events.push(event) + if (limit && events.length >= limit) + return sortStripeEvents(events) + } + } + + return sortStripeEvents(events) +} + +function getCustomerIdsFromEvents(events: Stripe.Event[], customerId?: string | null) { + if (customerId) + return [customerId] + + return [...new Set(events.flatMap((event) => { + if (!isSubscriptionEventType(event.type)) + return [] + const subscription = event.data.object as Stripe.Subscription + const id = toStripeId(subscription.customer) + return id ? [id] : [] + }))].sort() +} + +async function fetchRevenuePlans(supabase: SupabaseClient): Promise { + const { data, error } = await supabase + .from('plans') + .select('stripe_id, price_m, price_y, price_m_id, price_y_id') + .in('name', ['Solo', 'Maker', 'Team', 'Enterprise']) + + if (error) + throw error + + return data ?? [] +} + +async function fetchInitialPaidAtByCustomerId(supabase: SupabaseClient, customerIds: string[]) { + const paidAtByCustomerId = new Map() + for (const chunk of chunkArray(customerIds, DB_CHUNK_SIZE)) { + const { data, error } = await supabase + .from('stripe_info') + .select('customer_id, paid_at') + .in('customer_id', chunk) + + if (error) + throw error + + for (const row of (data ?? []) as CustomerPaidAtRow[]) + paidAtByCustomerId.set(row.customer_id, row.paid_at) + } + return paidAtByCustomerId +} + +async function fetchExistingProcessedEventIds(supabase: SupabaseClient, eventIds: string[]) { + const existing = new Set() + for (const chunk of chunkArray(eventIds, DB_CHUNK_SIZE)) { + const { data, error } = await supabase + .from('processed_stripe_events') + .select('event_id') + .in('event_id', chunk) + + if (error) + throw error + + for (const row of data ?? []) + existing.add(row.event_id) + } + return existing +} + +async function resetBackfillRange(supabase: SupabaseClient, fromDateId: string, toDateId: string, customerId?: string | null) { + let processedDelete = supabase + .from('processed_stripe_events') + .delete() + .gte('date_id', fromDateId) + .lte('date_id', toDateId) + let metricsDelete = supabase + .from('daily_revenue_metrics') + .delete() + .gte('date_id', fromDateId) + .lte('date_id', toDateId) + + if (customerId) { + processedDelete = processedDelete.eq('customer_id', customerId) + metricsDelete = metricsDelete.eq('customer_id', customerId) + } + + const [processedResult, metricsResult] = await Promise.all([processedDelete, metricsDelete]) + if (processedResult.error) + throw processedResult.error + if (metricsResult.error) + throw metricsResult.error +} + +async function insertProcessedEvents(supabase: SupabaseClient, movements: BackfillRevenueMovementEvent[]) { + const rows: ProcessedStripeEventInsert[] = movements.map(movement => ({ + event_id: movement.event_id, + customer_id: movement.customer_id, + date_id: movement.date_id, + })) + + for (const chunk of chunkArray(rows, DB_CHUNK_SIZE)) { + if (chunk.length === 0) + continue + + const { error } = await supabase + .from('processed_stripe_events') + .insert(chunk) + + if (error) + throw error + } +} + +async function fetchExistingDailyRevenueMetrics( + supabase: SupabaseClient, + fromDateId: string, + toDateId: string, + customerId?: string | null, +) { + let query = supabase + .from('daily_revenue_metrics') + .select('*') + .gte('date_id', fromDateId) + .lte('date_id', toDateId) + + if (customerId) + query = query.eq('customer_id', customerId) + + const { data, error } = await query + if (error) + throw error + + return data ?? [] +} + +function mergeMetricRows(existingRows: DailyRevenueMetricRow[], rowsToAdd: DailyRevenueMetricInsert[]) { + const existingByKey = new Map(existingRows.map(row => [`${row.date_id}:${row.customer_id}`, row])) + + return rowsToAdd.map((row) => { + const existing = existingByKey.get(`${row.date_id}:${row.customer_id}`) + if (!existing) + return row + + return { + date_id: row.date_id, + customer_id: row.customer_id, + opening_mrr: Number(existing.opening_mrr) || Number(row.opening_mrr) || 0, + new_business_mrr: (Number(existing.new_business_mrr) || 0) + (Number(row.new_business_mrr) || 0), + expansion_mrr: (Number(existing.expansion_mrr) || 0) + (Number(row.expansion_mrr) || 0), + contraction_mrr: (Number(existing.contraction_mrr) || 0) + (Number(row.contraction_mrr) || 0), + churn_mrr: (Number(existing.churn_mrr) || 0) + (Number(row.churn_mrr) || 0), + } + }) +} + +async function upsertDailyRevenueMetrics(supabase: SupabaseClient, rows: DailyRevenueMetricInsert[]) { + for (const chunk of chunkArray(rows, DB_CHUNK_SIZE)) { + if (chunk.length === 0) + continue + + const { error } = await supabase + .from('daily_revenue_metrics') + .upsert(chunk, { onConflict: 'date_id,customer_id' }) + + if (error) + throw error + } +} + +async function fetchDailyRevenueMetricsForDate(supabase: SupabaseClient, dateId: string) { + const { data, error } = await supabase + .from('daily_revenue_metrics') + .select('opening_mrr, churn_mrr, contraction_mrr, expansion_mrr') + .eq('date_id', dateId) + + if (error) + throw error + + return data ?? [] +} + +async function fetchPreviousMrr(supabase: SupabaseClient, dateId: string) { + const { data, error } = await supabase + .from('global_stats') + .select('mrr') + .eq('date_id', getPreviousDateId(dateId)) + .maybeSingle() + + if (error) + throw error + + return Number(data?.mrr) || 0 +} + +async function refreshGlobalRetentionMetrics(supabase: SupabaseClient, dateIds: string[]): Promise { + const skippedMissingGlobalStats: string[] = [] + let updated = 0 + + for (const dateId of dateIds) { + const [rows, previousMrr] = await Promise.all([ + fetchDailyRevenueMetricsForDate(supabase, dateId), + fetchPreviousMrr(supabase, dateId), + ]) + const retainedChanges = rows.reduce((summary, row) => { + if ((Number(row.opening_mrr) || 0) <= 0) + return summary + + summary.churnMrr += Number(row.churn_mrr) || 0 + summary.contractionMrr += Number(row.contraction_mrr) || 0 + summary.expansionMrr += Number(row.expansion_mrr) || 0 + return summary + }, { churnMrr: 0, contractionMrr: 0, expansionMrr: 0 }) + const totalLostRevenue = rows.reduce((summary, row) => { + summary.churnMrr += Number(row.churn_mrr) || 0 + summary.contractionMrr += Number(row.contraction_mrr) || 0 + return summary + }, { churnMrr: 0, contractionMrr: 0, expansionMrr: 0 }) + + const { data: globalStats, error: globalStatsError } = await supabase + .from('global_stats') + .select('date_id') + .eq('date_id', dateId) + .maybeSingle() + + if (globalStatsError) + throw globalStatsError + if (!globalStats) { + skippedMissingGlobalStats.push(dateId) + continue + } + + const { error } = await supabase + .from('global_stats') + .update({ + churn_revenue: calculateChurnRevenue(totalLostRevenue), + nrr: calculateNrr(previousMrr, retainedChanges), + }) + .eq('date_id', dateId) + + if (error) + throw error + + updated++ + } + + return { skippedMissingGlobalStats, updated } +} + +async function writeFailures(failures: unknown[]) { + if (failures.length === 0) + return + + await mkdir('./tmp', { recursive: true }) + await writeFile(FAILURE_OUTPUT, `${JSON.stringify(failures, null, 2)}\n`) +} + +function printSummary(label: string, summary: BackfillSummary) { + console.log(`${label}: rows=${summary.rows}, opening_mrr=${summary.opening_mrr.toFixed(2)}, new_business_mrr=${summary.new_business_mrr.toFixed(2)}, expansion_mrr=${summary.expansion_mrr.toFixed(2)}, contraction_mrr=${summary.contraction_mrr.toFixed(2)}, churn_mrr=${summary.churn_mrr.toFixed(2)}`) +} + +async function main(args = process.argv.slice(2), runtimeEnv: Record = process.env) { + const apply = args.includes('--apply') + const reset = args.includes('--reset') + const envFile = getArgValue(args, '--env-file') ?? DEFAULT_ENV_FILE + const eventsFile = getArgValue(args, '--events-file') + const customerId = getArgValue(args, '--customer-id') + const limitArg = getArgValue(args, '--limit') + const limit = limitArg ? Number.parseInt(limitArg, 10) : null + const fromDateId = parseDateId(getArgValue(args, '--from') ?? dateIdDaysAgo(DEFAULT_LOOKBACK_DAYS), '--from') + const toDateId = parseDateId(getArgValue(args, '--to') ?? todayDateId(), '--to') + + if (compareDateIds(fromDateId, toDateId) > 0) + throw new Error('--from must be before or equal to --to') + if (limit !== null && (!Number.isInteger(limit) || limit < 1)) + throw new Error('--limit must be a positive integer') + + const fileEnv = await loadEnv(envFile) + const env = { + ...fileEnv, + ...runtimeEnv, + } + const supabaseUrl = getRequiredEnv(env, 'SUPABASE_URL') + const supabaseServiceRoleKey = env.SUPABASE_SERVICE_ROLE_KEY?.trim() || env.SUPABASE_SERVICE_KEY?.trim() + if (!supabaseServiceRoleKey) + throw new Error('Missing SUPABASE_SERVICE_ROLE_KEY') + + const supabase = createClient( + supabaseUrl, + supabaseServiceRoleKey, + { auth: { autoRefreshToken: false, detectSessionInUrl: false, persistSession: false } }, + ) + + console.log(`Backfill range: ${fromDateId}..${toDateId}`) + console.log(`Env file: ${envFile}`) + if (customerId) + console.log(`Scoped to customer: ${customerId}`) + if (!apply) + console.log('Dry run only. Pass --apply to write rows.') + if (reset) + console.log('Reset enabled. Existing processed_stripe_events and daily_revenue_metrics rows in range will be rebuilt.') + + let events: Stripe.Event[] + if (eventsFile) { + events = await loadEventsFile(eventsFile) + console.log(`Loaded ${events.length} events from ${eventsFile}`) + } + else { + const oldestEventApiDateId = dateIdDaysAgo(DEFAULT_LOOKBACK_DAYS) + if (compareDateIds(fromDateId, oldestEventApiDateId) < 0) + console.warn('Stripe Events API only exposes recent events. Use --events-file for older archived Stripe events.') + + const stripeSecretKey = getRequiredEnv(env, 'STRIPE_SECRET_KEY') + const stripe = createStripeClient(stripeSecretKey, env.STRIPE_API_BASE_URL?.trim()) + events = await fetchStripeEvents(stripe, fromDateId, toDateId, limit) + console.log(`Fetched ${events.length} subscription events from Stripe`) + } + + const customerIds = getCustomerIdsFromEvents(events, customerId) + const [plans, initialPaidAtByCustomerId] = await Promise.all([ + fetchRevenuePlans(supabase), + fetchInitialPaidAtByCustomerId(supabase, customerIds), + ]) + const { movements, skipped } = buildRevenueMovementEvents(events, plans, { + customerId, + fromDateId, + initialPaidAtByCustomerId, + toDateId, + }) + const movementSummary = summarizeDailyRevenueMetrics(movements.map(movement => ({ + opening_mrr: movement.opening_mrr, + new_business_mrr: movement.new_business_mrr, + expansion_mrr: movement.expansion_mrr, + contraction_mrr: movement.contraction_mrr, + churn_mrr: movement.churn_mrr, + }))) + printSummary('Detected revenue movements', movementSummary) + console.log(`Skipped: ${JSON.stringify(skipped)}`) + + let movementsToApply = movements + if (!reset) { + const existingEventIds = await fetchExistingProcessedEventIds(supabase, movements.map(movement => movement.event_id)) + movementsToApply = movements.filter(movement => !existingEventIds.has(movement.event_id)) + console.log(`Existing processed events skipped: ${movements.length - movementsToApply.length}`) + } + + const metricRowsToApply = aggregateRevenueMovementEvents(movementsToApply) + printSummary('Daily metrics to apply', summarizeDailyRevenueMetrics(metricRowsToApply)) + + if (!apply) { + console.log('Sample metric rows:') + for (const row of metricRowsToApply.slice(0, 10)) + console.log(row) + return + } + + if (reset) + await resetBackfillRange(supabase, fromDateId, toDateId, customerId) + + const existingMetrics = reset + ? [] + : await fetchExistingDailyRevenueMetrics(supabase, fromDateId, toDateId, customerId) + const mergedMetricRows = reset + ? metricRowsToApply + : mergeMetricRows(existingMetrics, metricRowsToApply) + + const failures: unknown[] = [] + try { + await insertProcessedEvents(supabase, movementsToApply) + await upsertDailyRevenueMetrics(supabase, mergedMetricRows) + const retentionDates = reset + ? getDateIdsBetween(fromDateId, toDateId) + : [...new Set(metricRowsToApply.map(row => row.date_id))].sort() + const refreshResult = await refreshGlobalRetentionMetrics(supabase, retentionDates) + console.log(`Updated global_stats retention metrics for ${refreshResult.updated} dates`) + if (refreshResult.skippedMissingGlobalStats.length > 0) + console.log(`Skipped missing global_stats rows: ${refreshResult.skippedMissingGlobalStats.join(', ')}`) + } + catch (error) { + failures.push({ + error: error instanceof Error ? error.message : String(error), + fromDateId, + toDateId, + customerId, + reset, + }) + } + + await writeFailures(failures) + if (failures.length > 0) + throw new Error(`Retention metric backfill failed. Details written to ${FAILURE_OUTPUT}`) + + console.log(`Done. Processed movements: ${movementsToApply.length}. Daily metric rows: ${mergedMetricRows.length}.`) +} + +if (import.meta.main) + await main() diff --git a/supabase/functions/_backend/triggers/logsnag_insights.ts b/supabase/functions/_backend/triggers/logsnag_insights.ts index 1f4c63683b..0487b024e9 100644 --- a/supabase/functions/_backend/triggers/logsnag_insights.ts +++ b/supabase/functions/_backend/triggers/logsnag_insights.ts @@ -11,6 +11,7 @@ import { BRES, middlewareAPISecret } from '../utils/hono.ts' import { cloudlog, cloudlogErr } from '../utils/logging.ts' import { logsnagInsights } from '../utils/logsnag.ts' import { closeClient, getDrizzleClient, getPgClient } from '../utils/pg.ts' +import { calculateChurnRevenue, calculateNrr, getPreviousDateId } from '../utils/revenue_metrics.ts' import { countAllApps, countAllUpdates, countAllUpdatesExternal, getUpdateStats } from '../utils/stats.ts' import { supabaseAdmin } from '../utils/supabase.ts' import { sendEventToTracking } from '../utils/tracking.ts' @@ -62,11 +63,6 @@ interface PlanRevenue { plan_enterprise_monthly: number plan_enterprise_yearly: number } -interface DailyRevenueChangeSummary { - churnMrr: number - contractionMrr: number - expansionMrr: number -} interface RevenueRetentionMetrics { churnRevenue: number nrr: number @@ -149,28 +145,6 @@ function countUniqueCustomers(...rowSets: Array - -interface RevenueMovement { - currentMrr: number - nextMrr: number - newBusinessMrr: number - expansionMrr: number - contractionMrr: number - churnMrr: number -} type PersistRevenueMovementResult = 'applied' | 'duplicate' | 'missing' | 'stale' -const ZERO_REVENUE_MOVEMENT: RevenueMovement = { - currentMrr: 0, - nextMrr: 0, - newBusinessMrr: 0, - expansionMrr: 0, - contractionMrr: 0, - churnMrr: 0, -} const STRIPE_INFO_TRANSACTION_COLUMNS = [ 'bandwidth_exceeded', 'build_time_exceeded', @@ -178,117 +155,6 @@ function getPlanChangeTrackingEventName(statusName: string) { return statusName === 'upgraded' ? 'User Upgraded' : 'User Plan Changed' } -function getEventDateId(eventOccurredAtIso: string) { - return new Date(eventOccurredAtIso).toISOString().slice(0, 10) -} - -function getPlanMrr(plan: RevenuePlanRow | null | undefined, priceId: string | null | undefined) { - if (!plan || !priceId) - return 0 - - if (plan.price_m_id === priceId) - return Number(plan.price_m) || 0 - - if (plan.price_y_id === priceId) - return (Number(plan.price_y) || 0) / 12 - - return 0 -} - -function getPlanByProductId(plans: RevenuePlanRow[], productId: string | null | undefined) { - if (!productId) - return null - - return plans.find(plan => plan.stripe_id === productId) ?? null -} - -function getSubscriptionMrr(plans: RevenuePlanRow[], stripeInfo: StripeInfoRevenueState) { - if (!stripeInfo || stripeInfo.status !== 'succeeded' || stripeInfo.is_good_plan === false) - return 0 - - return getPlanMrr(getPlanByProductId(plans, stripeInfo.product_id), stripeInfo.price_id) -} - -function classifyRevenueMovement( - currentStripeInfo: StripeInfoRevenueState, - nextStripeInfo: StripeInfoRevenueState, - plans: RevenuePlanRow[], -): RevenueMovement { - const currentMrr = getSubscriptionMrr(plans, currentStripeInfo) - const nextMrr = getSubscriptionMrr(plans, nextStripeInfo) - - if (currentMrr === 0 && nextMrr === 0) - return { ...ZERO_REVENUE_MOVEMENT } - - if (currentMrr === 0 && nextMrr > 0) { - if (!currentStripeInfo?.paid_at) { - return { - ...ZERO_REVENUE_MOVEMENT, - currentMrr, - nextMrr, - newBusinessMrr: nextMrr, - } - } - - return { - ...ZERO_REVENUE_MOVEMENT, - currentMrr, - nextMrr, - expansionMrr: nextMrr, - } - } - - if (currentMrr > 0 && nextMrr === 0) { - return { - ...ZERO_REVENUE_MOVEMENT, - currentMrr, - nextMrr, - churnMrr: currentMrr, - } - } - - if (nextMrr > currentMrr) { - return { - ...ZERO_REVENUE_MOVEMENT, - currentMrr, - nextMrr, - expansionMrr: nextMrr - currentMrr, - } - } - - if (currentMrr > nextMrr) { - return { - ...ZERO_REVENUE_MOVEMENT, - currentMrr, - nextMrr, - contractionMrr: currentMrr - nextMrr, - } - } - - return { - ...ZERO_REVENUE_MOVEMENT, - currentMrr, - nextMrr, - } -} - -function hasRevenueMovement(movement: RevenueMovement) { - return movement.newBusinessMrr > 0 - || movement.expansionMrr > 0 - || movement.contractionMrr > 0 - || movement.churnMrr > 0 -} - -function isStaleStripeEvent( - currentStripeInfo: Pick | null | undefined, - eventOccurredAtIso: string, -) { - if (!currentStripeInfo?.last_stripe_event_at) - return false - - return new Date(currentStripeInfo.last_stripe_event_at).getTime() > new Date(eventOccurredAtIso).getTime() -} - async function getRevenuePlans(c: Context): Promise { const { data: plans, error } = await supabaseAdmin(c) .from('plans') diff --git a/supabase/functions/_backend/utils/revenue_metrics.ts b/supabase/functions/_backend/utils/revenue_metrics.ts new file mode 100644 index 0000000000..b0bd90969f --- /dev/null +++ b/supabase/functions/_backend/utils/revenue_metrics.ts @@ -0,0 +1,175 @@ +import type { Database } from './supabase.types.ts' + +type PlanRow = Database['public']['Tables']['plans']['Row'] +type StripeInfoRow = Database['public']['Tables']['stripe_info']['Row'] + +export type StripeInfoRevenueState = { + is_good_plan?: boolean | null + paid_at?: string | null + price_id?: string | null + product_id?: string | null + status?: Database['public']['Enums']['stripe_status'] | null +} | null | undefined + +export type RevenuePlanRow = Pick + +export interface RevenueMovement { + currentMrr: number + nextMrr: number + newBusinessMrr: number + expansionMrr: number + contractionMrr: number + churnMrr: number +} + +export interface DailyRevenueChangeSummary { + churnMrr: number + contractionMrr: number + expansionMrr: number +} + +const ZERO_REVENUE_MOVEMENT: RevenueMovement = { + currentMrr: 0, + nextMrr: 0, + newBusinessMrr: 0, + expansionMrr: 0, + contractionMrr: 0, + churnMrr: 0, +} + +export function getRevenueMetricDateId(targetDate = new Date()) { + return new Date(Date.UTC(targetDate.getUTCFullYear(), targetDate.getUTCMonth(), targetDate.getUTCDate())).toISOString().slice(0, 10) +} + +export function getEventDateId(eventOccurredAtIso: string) { + return new Date(eventOccurredAtIso).toISOString().slice(0, 10) +} + +export function getPreviousDateId(dateId: string) { + const target = new Date(`${dateId}T00:00:00.000Z`) + target.setUTCDate(target.getUTCDate() - 1) + return getRevenueMetricDateId(target) +} + +function getPlanMrr(plan: RevenuePlanRow | null | undefined, priceId: string | null | undefined) { + if (!plan || !priceId) + return 0 + + if (plan.price_m_id === priceId) + return Number(plan.price_m) || 0 + + if (plan.price_y_id === priceId) + return (Number(plan.price_y) || 0) / 12 + + return 0 +} + +function getPlanByProductId(plans: RevenuePlanRow[], productId: string | null | undefined) { + if (!productId) + return null + + return plans.find(plan => plan.stripe_id === productId) ?? null +} + +export function getSubscriptionMrr(plans: RevenuePlanRow[], stripeInfo: StripeInfoRevenueState) { + if (!stripeInfo || stripeInfo.status !== 'succeeded' || stripeInfo.is_good_plan === false) + return 0 + + return getPlanMrr(getPlanByProductId(plans, stripeInfo.product_id), stripeInfo.price_id) +} + +export function classifyRevenueMovement( + currentStripeInfo: StripeInfoRevenueState, + nextStripeInfo: StripeInfoRevenueState, + plans: RevenuePlanRow[], +): RevenueMovement { + const currentMrr = getSubscriptionMrr(plans, currentStripeInfo) + const nextMrr = getSubscriptionMrr(plans, nextStripeInfo) + + if (currentMrr === 0 && nextMrr === 0) + return { ...ZERO_REVENUE_MOVEMENT } + + if (currentMrr === 0 && nextMrr > 0) { + if (!currentStripeInfo?.paid_at) { + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + newBusinessMrr: nextMrr, + } + } + + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + expansionMrr: nextMrr, + } + } + + if (currentMrr > 0 && nextMrr === 0) { + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + churnMrr: currentMrr, + } + } + + if (nextMrr > currentMrr) { + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + expansionMrr: nextMrr - currentMrr, + } + } + + if (currentMrr > nextMrr) { + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + contractionMrr: currentMrr - nextMrr, + } + } + + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + } +} + +export function hasRevenueMovement(movement: RevenueMovement) { + return movement.newBusinessMrr > 0 + || movement.expansionMrr > 0 + || movement.contractionMrr > 0 + || movement.churnMrr > 0 +} + +export function isStaleStripeEvent( + currentStripeInfo: Pick | null | undefined, + eventOccurredAtIso: string, +) { + if (!currentStripeInfo?.last_stripe_event_at) + return false + + return new Date(currentStripeInfo.last_stripe_event_at).getTime() > new Date(eventOccurredAtIso).getTime() +} + +export function calculateNrr(previousMrr: number, dailyChanges: DailyRevenueChangeSummary) { + if (previousMrr <= 0) + return 100 + + const retainedMrr = Math.max( + previousMrr - dailyChanges.churnMrr - dailyChanges.contractionMrr + dailyChanges.expansionMrr, + 0, + ) + + return Number(((retainedMrr / previousMrr) * 100).toFixed(2)) +} + +export function calculateChurnRevenue(dailyChanges: DailyRevenueChangeSummary) { + return Number((dailyChanges.churnMrr + dailyChanges.contractionMrr).toFixed(2)) +} diff --git a/tests/backfill-retention-metrics.unit.test.ts b/tests/backfill-retention-metrics.unit.test.ts new file mode 100644 index 0000000000..40a4f89a3d --- /dev/null +++ b/tests/backfill-retention-metrics.unit.test.ts @@ -0,0 +1,152 @@ +import type Stripe from 'stripe' +import { describe, expect, it } from 'vitest' +import { aggregateRevenueMovementEvents, buildRevenueMovementEvents, summarizeDailyRevenueMetrics } from '../scripts/backfill_retention_metrics.ts' + +const plans = [ + { + stripe_id: 'prod_solo', + price_m: 12, + price_m_id: 'price_solo_monthly', + price_y: 120, + price_y_id: 'price_solo_yearly', + }, + { + stripe_id: 'prod_team', + price_m: 49, + price_m_id: 'price_team_monthly', + price_y: 468, + price_y_id: 'price_team_yearly', + }, +] as const + +function subscriptionItem(priceId: string, productId: string) { + return { + plan: { + id: priceId, + product: productId, + usage_type: 'licensed', + }, + } as Stripe.SubscriptionItem +} + +function subscriptionEvent( + id: string, + type: 'customer.subscription.created' | 'customer.subscription.deleted' | 'customer.subscription.updated', + created: number, + customerId: string, + subscriptionId: string, + priceId: string, + productId: string, + previous?: { priceId: string, productId: string }, +) { + return { + id, + type, + created, + data: { + object: { + id: subscriptionId, + object: 'subscription', + customer: customerId, + items: { + data: [subscriptionItem(priceId, productId)], + }, + }, + previous_attributes: previous + ? { + items: { + data: [subscriptionItem(previous.priceId, previous.productId)], + }, + } + : undefined, + }, + } as Stripe.Event +} + +describe('retention metric backfill helpers', () => { + it.concurrent('builds new business metrics from created subscription events', () => { + const result = buildRevenueMovementEvents([ + subscriptionEvent('evt_create', 'customer.subscription.created', 1774353600, 'cus_new', 'sub_new', 'price_solo_monthly', 'prod_solo'), + ], plans as any, { + fromDateId: '2026-03-24', + toDateId: '2026-03-24', + }) + + expect(result.movements).toHaveLength(1) + expect(result.movements[0]).toMatchObject({ + event_id: 'evt_create', + customer_id: 'cus_new', + date_id: '2026-03-24', + opening_mrr: 0, + new_business_mrr: 12, + expansion_mrr: 0, + contraction_mrr: 0, + churn_mrr: 0, + }) + }) + + it.concurrent('builds expansion metrics from subscription update previous attributes', () => { + const result = buildRevenueMovementEvents([ + subscriptionEvent('evt_update', 'customer.subscription.updated', 1774353600, 'cus_existing', 'sub_existing', 'price_team_monthly', 'prod_team', { + priceId: 'price_solo_monthly', + productId: 'prod_solo', + }), + ], plans as any, { + fromDateId: '2026-03-24', + toDateId: '2026-03-24', + }) + + expect(result.movements).toHaveLength(1) + expect(result.movements[0]).toMatchObject({ + current_mrr: 12, + next_mrr: 49, + expansion_mrr: 37, + }) + }) + + it.concurrent('builds churn metrics from deleted subscription events', () => { + const result = buildRevenueMovementEvents([ + subscriptionEvent('evt_deleted', 'customer.subscription.deleted', 1774353600, 'cus_churned', 'sub_churned', 'price_team_yearly', 'prod_team'), + ], plans as any, { + fromDateId: '2026-03-24', + toDateId: '2026-03-24', + }) + + expect(result.movements).toHaveLength(1) + expect(result.movements[0]).toMatchObject({ + current_mrr: 39, + next_mrr: 0, + churn_mrr: 39, + }) + }) + + it.concurrent('aggregates multiple movements for one customer-day with the first opening MRR', () => { + const result = buildRevenueMovementEvents([ + subscriptionEvent('evt_create', 'customer.subscription.created', 1774353600, 'cus_sequence', 'sub_sequence', 'price_solo_monthly', 'prod_solo'), + subscriptionEvent('evt_update', 'customer.subscription.updated', 1774357200, 'cus_sequence', 'sub_sequence', 'price_team_monthly', 'prod_team'), + subscriptionEvent('evt_delete', 'customer.subscription.deleted', 1774360800, 'cus_sequence', 'sub_sequence', 'price_team_monthly', 'prod_team'), + ], plans as any, { + fromDateId: '2026-03-24', + toDateId: '2026-03-24', + }) + + const rows = aggregateRevenueMovementEvents(result.movements) + expect(rows).toEqual([ + { + date_id: '2026-03-24', + customer_id: 'cus_sequence', + opening_mrr: 0, + new_business_mrr: 12, + expansion_mrr: 37, + contraction_mrr: 0, + churn_mrr: 49, + }, + ]) + expect(summarizeDailyRevenueMetrics(rows)).toMatchObject({ + rows: 1, + new_business_mrr: 12, + expansion_mrr: 37, + churn_mrr: 49, + }) + }) +}) From 4126c13f116348cb26106da8e6feef145a28edfc Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Thu, 23 Apr 2026 14:42:15 +0200 Subject: [PATCH 02/14] fix(admin): harden retention backfill merges --- scripts/backfill_retention_metrics.ts | 29 +++++++----- tests/backfill-retention-metrics.unit.test.ts | 45 ++++++++++++++++++- 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index 250db57703..6bf6c860f4 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -42,9 +42,10 @@ type ProcessedStripeEventInsert = Database['public']['Tables']['processed_stripe type StripeStatus = Database['public']['Enums']['stripe_status'] type SubscriptionEventType = typeof SUBSCRIPTION_EVENT_TYPES[number] -interface CustomerPaidAtRow { +interface CustomerRevenueBaselineRow { customer_id: string paid_at: string | null + subscription_id: string | null } interface TrackedSubscriptionState { @@ -84,6 +85,7 @@ interface BuildRevenueMovementEventsOptions { customerId?: string | null fromDateId: string initialPaidAtByCustomerId?: Map + initialSubscriptionIdByCustomerId?: Map toDateId: string } @@ -416,7 +418,8 @@ export function buildRevenueMovementEvents( nextState = buildTrackedState(customerId, subscriptionId, 'succeeded', currentPriceId, currentProductId, activePaidAt ?? eventOccurredAtIso) } else { - if (trackedState?.subscription_id && trackedState.subscription_id !== subscriptionId) { + const baselineSubscriptionId = trackedState?.subscription_id ?? options.initialSubscriptionIdByCustomerId?.get(customerId) ?? null + if (baselineSubscriptionId && baselineSubscriptionId !== subscriptionId) { skipped.subscriptionMismatch++ continue } @@ -554,21 +557,24 @@ async function fetchRevenuePlans(supabase: SupabaseClient): Promise() + const subscriptionIdByCustomerId = new Map() for (const chunk of chunkArray(customerIds, DB_CHUNK_SIZE)) { const { data, error } = await supabase .from('stripe_info') - .select('customer_id, paid_at') + .select('customer_id, paid_at, subscription_id') .in('customer_id', chunk) if (error) throw error - for (const row of (data ?? []) as CustomerPaidAtRow[]) + for (const row of (data ?? []) as CustomerRevenueBaselineRow[]) { paidAtByCustomerId.set(row.customer_id, row.paid_at) + subscriptionIdByCustomerId.set(row.customer_id, row.subscription_id) + } } - return paidAtByCustomerId + return { paidAtByCustomerId, subscriptionIdByCustomerId } } async function fetchExistingProcessedEventIds(supabase: SupabaseClient, eventIds: string[]) { @@ -654,7 +660,7 @@ async function fetchExistingDailyRevenueMetrics( return data ?? [] } -function mergeMetricRows(existingRows: DailyRevenueMetricRow[], rowsToAdd: DailyRevenueMetricInsert[]) { +export function mergeMetricRows(existingRows: DailyRevenueMetricRow[], rowsToAdd: DailyRevenueMetricInsert[]) { const existingByKey = new Map(existingRows.map(row => [`${row.date_id}:${row.customer_id}`, row])) return rowsToAdd.map((row) => { @@ -665,7 +671,7 @@ function mergeMetricRows(existingRows: DailyRevenueMetricRow[], rowsToAdd: Daily return { date_id: row.date_id, customer_id: row.customer_id, - opening_mrr: Number(existing.opening_mrr) || Number(row.opening_mrr) || 0, + opening_mrr: existing.opening_mrr ?? row.opening_mrr ?? 0, new_business_mrr: (Number(existing.new_business_mrr) || 0) + (Number(row.new_business_mrr) || 0), expansion_mrr: (Number(existing.expansion_mrr) || 0) + (Number(row.expansion_mrr) || 0), contraction_mrr: (Number(existing.contraction_mrr) || 0) + (Number(row.contraction_mrr) || 0), @@ -837,14 +843,15 @@ async function main(args = process.argv.slice(2), runtimeEnv: Record ({ diff --git a/tests/backfill-retention-metrics.unit.test.ts b/tests/backfill-retention-metrics.unit.test.ts index 40a4f89a3d..c428632572 100644 --- a/tests/backfill-retention-metrics.unit.test.ts +++ b/tests/backfill-retention-metrics.unit.test.ts @@ -1,6 +1,6 @@ import type Stripe from 'stripe' import { describe, expect, it } from 'vitest' -import { aggregateRevenueMovementEvents, buildRevenueMovementEvents, summarizeDailyRevenueMetrics } from '../scripts/backfill_retention_metrics.ts' +import { aggregateRevenueMovementEvents, buildRevenueMovementEvents, mergeMetricRows, summarizeDailyRevenueMetrics } from '../scripts/backfill_retention_metrics.ts' const plans = [ { @@ -149,4 +149,47 @@ describe('retention metric backfill helpers', () => { churn_mrr: 49, }) }) + + it.concurrent('keeps an existing zero opening MRR when incrementally merging same-day rows', () => { + const merged = mergeMetricRows([ + { + date_id: '2026-03-24', + customer_id: 'cus_sequence', + opening_mrr: 0, + new_business_mrr: 12, + expansion_mrr: 0, + contraction_mrr: 0, + churn_mrr: 0, + } as any, + ], [ + { + date_id: '2026-03-24', + customer_id: 'cus_sequence', + opening_mrr: 49, + new_business_mrr: 0, + expansion_mrr: 0, + contraction_mrr: 0, + churn_mrr: 49, + }, + ]) + + expect(merged[0]).toMatchObject({ + opening_mrr: 0, + new_business_mrr: 12, + churn_mrr: 49, + }) + }) + + it.concurrent('skips first-in-range deleted events when the stored subscription id differs', () => { + const result = buildRevenueMovementEvents([ + subscriptionEvent('evt_old_deleted', 'customer.subscription.deleted', 1774353600, 'cus_active', 'sub_old', 'price_team_monthly', 'prod_team'), + ], plans as any, { + fromDateId: '2026-03-24', + initialSubscriptionIdByCustomerId: new Map([['cus_active', 'sub_new']]), + toDateId: '2026-03-24', + }) + + expect(result.movements).toHaveLength(0) + expect(result.skipped.subscriptionMismatch).toBe(1) + }) }) From 6476d53f89d6bd69000c6e2aaf60e66216f174fb Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Thu, 23 Apr 2026 14:55:14 +0200 Subject: [PATCH 03/14] fix(admin): validate retention backfill inputs --- scripts/backfill_retention_metrics.ts | 58 ++++++++++++++++++- .../_backend/utils/revenue_metrics.ts | 2 +- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index 6bf6c860f4..970d5cd08c 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -9,6 +9,8 @@ * * Rebuild an exact date range: * bun run stripe:backfill-retention-metrics --apply --reset --from=2026-04-01 --to=2026-04-23 + * reset=true deletes existing rows before insertProcessedEvents/upsertDailyRevenueMetrics; + * if a reset apply fails, re-run the same command to rebuild the deleted range. * * Older history requires an exported Stripe events JSON file: * bun run stripe:backfill-retention-metrics --events-file=./tmp/stripe-events.json --from=2026-01-01 --to=2026-04-23 @@ -229,6 +231,24 @@ function sortStripeEvents(events: Stripe.Event[]) { }) } +function parseStripeEventCreatedSeconds(value: unknown) { + if (typeof value === 'number' && Number.isFinite(value)) + return value + + if (typeof value !== 'string') + return null + + const numericValue = Number(value) + if (Number.isFinite(numericValue)) + return numericValue + + const parsedDate = Date.parse(value) + if (Number.isNaN(parsedDate)) + return null + + return Math.floor(parsedDate / 1000) +} + function chunkArray(items: T[], size: number) { const chunks: T[][] = [] for (let index = 0; index < items.length; index += size) { @@ -247,6 +267,38 @@ function toStripeId(value: unknown) { return null } +function normalizeStripeEventFromFile(event: unknown, index: number): Stripe.Event { + if (typeof event !== 'object' || event === null) + throw new Error(`--events-file contains malformed Stripe event at index ${index}: event must be an object`) + + const candidate = event as { + created?: unknown + data?: { object?: unknown } + id?: unknown + type?: unknown + } + if (typeof candidate.id !== 'string') + throw new Error(`--events-file contains malformed Stripe event at index ${index}: missing string id`) + if (typeof candidate.type !== 'string') + throw new Error(`--events-file contains malformed Stripe event at index ${index}: missing string type`) + + const created = parseStripeEventCreatedSeconds(candidate.created) + if (created === null) + throw new Error(`--events-file contains malformed Stripe event at index ${index}: missing numeric or parseable created value`) + + const dataObject = candidate.data?.object + if (typeof dataObject !== 'object' || dataObject === null) + throw new Error(`--events-file contains malformed Stripe event at index ${index}: missing data.object`) + + if (!toStripeId((dataObject as { customer?: unknown }).customer)) + throw new Error(`--events-file contains malformed Stripe event at index ${index}: missing data.object.customer`) + + return { + ...(event as Stripe.Event), + created, + } +} + function getLicensedSubscriptionItem(items: Stripe.SubscriptionItem[] | undefined) { return items?.find(item => item.plan?.usage_type === 'licensed') ?? items?.[0] ?? null } @@ -507,7 +559,7 @@ async function loadEventsFile(filePath: string): Promise { if (!events) throw new Error('--events-file must contain a JSON array, or an object with data/events array') - return sortStripeEvents(events as Stripe.Event[]) + return sortStripeEvents(events.map(normalizeStripeEventFromFile)) } async function fetchStripeEvents(stripe: StripeClient, fromDateId: string, toDateId: string, limit: number | null) { @@ -631,7 +683,7 @@ async function insertProcessedEvents(supabase: SupabaseClient, movements: Backfi const { error } = await supabase .from('processed_stripe_events') - .insert(chunk) + .upsert(chunk, { ignoreDuplicates: true, onConflict: 'event_id' }) if (error) throw error @@ -824,7 +876,7 @@ async function main(args = process.argv.slice(2), runtimeEnv: Record Date: Thu, 23 Apr 2026 15:10:36 +0200 Subject: [PATCH 04/14] fix(admin): make retention backfill transactional --- scripts/backfill_retention_metrics.ts | 332 +++++++++++------- tests/backfill-retention-metrics.unit.test.ts | 18 + 2 files changed, 225 insertions(+), 125 deletions(-) diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index 970d5cd08c..f3e84dd5cd 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -9,8 +9,8 @@ * * Rebuild an exact date range: * bun run stripe:backfill-retention-metrics --apply --reset --from=2026-04-01 --to=2026-04-23 - * reset=true deletes existing rows before insertProcessedEvents/upsertDailyRevenueMetrics; - * if a reset apply fails, re-run the same command to rebuild the deleted range. + * --apply uses one Postgres transaction for metric writes, global_stats refresh, + * processed-event markers, and optional --reset deletes. * * Older history requires an exported Stripe events JSON file: * bun run stripe:backfill-retention-metrics --events-file=./tmp/stripe-events.json --from=2026-01-01 --to=2026-04-23 @@ -22,6 +22,7 @@ import { existsSync } from 'node:fs' import { mkdir, readFile, writeFile } from 'node:fs/promises' import process from 'node:process' import { createClient } from '@supabase/supabase-js' +import { Client as PgClient } from 'pg' import StripeClient from 'stripe' import { calculateChurnRevenue, calculateNrr, classifyRevenueMovement, getEventDateId, getPreviousDateId, getSubscriptionMrr, hasRevenueMovement } from '../supabase/functions/_backend/utils/revenue_metrics.ts' @@ -40,7 +41,6 @@ const SUBSCRIPTION_EVENT_TYPES = [ type SupabaseClient = ReturnType> type DailyRevenueMetricRow = Database['public']['Tables']['daily_revenue_metrics']['Row'] type DailyRevenueMetricInsert = Database['public']['Tables']['daily_revenue_metrics']['Insert'] -type ProcessedStripeEventInsert = Database['public']['Tables']['processed_stripe_events']['Insert'] type StripeStatus = Database['public']['Enums']['stripe_status'] type SubscriptionEventType = typeof SUBSCRIPTION_EVENT_TYPES[number] @@ -108,6 +108,27 @@ interface RefreshRetentionMetricsResult { updated: number } +interface ApplyBackfillTransactionOptions { + customerId?: string | null + databaseUrl: string + fromDateId: string + mergedMetricRows: DailyRevenueMetricInsert[] + movementsToApply: BackfillRevenueMovementEvent[] + reset: boolean + retentionDates: string[] + toDateId: string +} + +interface RetentionMetricSummaryRow { + has_global_stats: boolean + lost_churn_mrr: number | string | null + lost_contraction_mrr: number | string | null + previous_mrr: number | string | null + retained_churn_mrr: number | string | null + retained_contraction_mrr: number | string | null + retained_expansion_mrr: number | string | null +} + function getArgValue(args: string[], prefix: string): string | null { const arg = args.find(value => value.startsWith(`${prefix}=`)) if (!arg) @@ -148,6 +169,24 @@ function getRequiredEnv(env: Record, key: string) { return value } +function getDatabaseUrl(env: Record) { + return env.DATABASE_URL?.trim() + || env.POSTGRES_URL?.trim() + || env.SUPABASE_DB_URL?.trim() + || env.SUPABASE_DB_DIRECT_URL?.trim() + || env.DIRECT_URL?.trim() + || null +} + +function createPgClient(databaseUrl: string) { + const host = new URL(databaseUrl).hostname + const usesLocalDatabase = host === 'localhost' || host === '127.0.0.1' || host === '::1' + return new PgClient({ + connectionString: databaseUrl, + ssl: usesLocalDatabase ? false : { rejectUnauthorized: false }, + }) +} + function createStripeClient(secretKey: string, apiBaseUrl?: string) { let hostConfig: Partial[1]>, 'host' | 'port' | 'protocol'>> = {} @@ -446,14 +485,15 @@ export function buildRevenueMovementEvents( const trackedState = customerStates.get(customerId) const previousItem = getLicensedSubscriptionItem(getPreviousSubscriptionItems(event)) - const previousPriceId = getItemPriceId(previousItem) ?? trackedState?.price_id ?? currentPriceId - const previousProductId = getItemProductId(previousItem) ?? trackedState?.product_id ?? currentProductId + const previousPriceId = getItemPriceId(previousItem) ?? trackedState?.price_id ?? null + const previousProductId = getItemProductId(previousItem) ?? trackedState?.product_id ?? null + const previousStatus = trackedState?.status ?? (previousItem ? 'succeeded' : null) const previousMrr = getSubscriptionMrr(plans, { is_good_plan: true, paid_at: trackedState?.paid_at ?? eventOccurredAtIso, price_id: previousPriceId, product_id: previousProductId, - status: 'succeeded', + status: previousStatus, }) const knownPaidAt = getKnownPaidAtBefore(customerId, eventOccurredAtIso, trackedState?.paid_at, options.initialPaidAtByCustomerId) const activePaidAt = trackedState?.paid_at ?? knownPaidAt ?? (previousMrr > 0 ? eventOccurredAtIso : null) @@ -646,50 +686,6 @@ async function fetchExistingProcessedEventIds(supabase: SupabaseClient, eventIds return existing } -async function resetBackfillRange(supabase: SupabaseClient, fromDateId: string, toDateId: string, customerId?: string | null) { - let processedDelete = supabase - .from('processed_stripe_events') - .delete() - .gte('date_id', fromDateId) - .lte('date_id', toDateId) - let metricsDelete = supabase - .from('daily_revenue_metrics') - .delete() - .gte('date_id', fromDateId) - .lte('date_id', toDateId) - - if (customerId) { - processedDelete = processedDelete.eq('customer_id', customerId) - metricsDelete = metricsDelete.eq('customer_id', customerId) - } - - const [processedResult, metricsResult] = await Promise.all([processedDelete, metricsDelete]) - if (processedResult.error) - throw processedResult.error - if (metricsResult.error) - throw metricsResult.error -} - -async function insertProcessedEvents(supabase: SupabaseClient, movements: BackfillRevenueMovementEvent[]) { - const rows: ProcessedStripeEventInsert[] = movements.map(movement => ({ - event_id: movement.event_id, - customer_id: movement.customer_id, - date_id: movement.date_id, - })) - - for (const chunk of chunkArray(rows, DB_CHUNK_SIZE)) { - if (chunk.length === 0) - continue - - const { error } = await supabase - .from('processed_stripe_events') - .upsert(chunk, { ignoreDuplicates: true, onConflict: 'event_id' }) - - if (error) - throw error - } -} - async function fetchExistingDailyRevenueMetrics( supabase: SupabaseClient, fromDateId: string, @@ -732,99 +728,173 @@ export function mergeMetricRows(existingRows: DailyRevenueMetricRow[], rowsToAdd }) } -async function upsertDailyRevenueMetrics(supabase: SupabaseClient, rows: DailyRevenueMetricInsert[]) { - for (const chunk of chunkArray(rows, DB_CHUNK_SIZE)) { - if (chunk.length === 0) - continue +async function withPgTransaction(databaseUrl: string, action: (client: PgClient) => Promise) { + const client = createPgClient(databaseUrl) + await client.connect() - const { error } = await supabase - .from('daily_revenue_metrics') - .upsert(chunk, { onConflict: 'date_id,customer_id' }) + try { + await client.query('BEGIN') + const result = await action(client) + await client.query('COMMIT') + return result + } + catch (error) { + await client.query('ROLLBACK') + throw error + } + finally { + await client.end() + } +} - if (error) - throw error +async function resetBackfillRangePg(client: PgClient, fromDateId: string, toDateId: string, customerId?: string | null) { + const values = [fromDateId, toDateId] + const predicates = ['date_id >= $1', 'date_id <= $2'] + if (customerId) { + values.push(customerId) + predicates.push(`customer_id = $${values.length}`) } + + await client.query(`DELETE FROM public.processed_stripe_events WHERE ${predicates.join(' AND ')}`, values) + await client.query(`DELETE FROM public.daily_revenue_metrics WHERE ${predicates.join(' AND ')}`, values) } -async function fetchDailyRevenueMetricsForDate(supabase: SupabaseClient, dateId: string) { - const { data, error } = await supabase - .from('daily_revenue_metrics') - .select('opening_mrr, churn_mrr, contraction_mrr, expansion_mrr') - .eq('date_id', dateId) +async function upsertDailyRevenueMetricsPg(client: PgClient, rows: DailyRevenueMetricInsert[]) { + for (const chunk of chunkArray(rows, DB_CHUNK_SIZE)) { + if (chunk.length === 0) + continue - if (error) - throw error + const values: Array = [] + const placeholders = chunk.map((row, index) => { + const offset = index * 7 + values.push( + row.date_id, + row.customer_id, + Number(row.opening_mrr) || 0, + Number(row.new_business_mrr) || 0, + Number(row.expansion_mrr) || 0, + Number(row.contraction_mrr) || 0, + Number(row.churn_mrr) || 0, + ) + return `($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7})` + }) - return data ?? [] + await client.query(` + INSERT INTO public.daily_revenue_metrics ( + date_id, + customer_id, + opening_mrr, + new_business_mrr, + expansion_mrr, + contraction_mrr, + churn_mrr + ) + VALUES ${placeholders.join(', ')} + ON CONFLICT (date_id, customer_id) DO UPDATE + SET + updated_at = now(), + opening_mrr = EXCLUDED.opening_mrr, + new_business_mrr = EXCLUDED.new_business_mrr, + expansion_mrr = EXCLUDED.expansion_mrr, + contraction_mrr = EXCLUDED.contraction_mrr, + churn_mrr = EXCLUDED.churn_mrr + `, values) + } } -async function fetchPreviousMrr(supabase: SupabaseClient, dateId: string) { - const { data, error } = await supabase - .from('global_stats') - .select('mrr') - .eq('date_id', getPreviousDateId(dateId)) - .maybeSingle() +async function insertProcessedEventsPg(client: PgClient, movements: BackfillRevenueMovementEvent[]) { + for (const chunk of chunkArray(movements, DB_CHUNK_SIZE)) { + if (chunk.length === 0) + continue - if (error) - throw error + const values: string[] = [] + const placeholders = chunk.map((movement, index) => { + const offset = index * 3 + values.push(movement.event_id, movement.customer_id, movement.date_id) + return `($${offset + 1}, $${offset + 2}, $${offset + 3})` + }) - return Number(data?.mrr) || 0 + await client.query(` + INSERT INTO public.processed_stripe_events ( + event_id, + customer_id, + date_id + ) + VALUES ${placeholders.join(', ')} + ON CONFLICT (event_id) DO NOTHING + `, values) + } } -async function refreshGlobalRetentionMetrics(supabase: SupabaseClient, dateIds: string[]): Promise { +async function refreshGlobalRetentionMetricsPg(client: PgClient, dateIds: string[]): Promise { const skippedMissingGlobalStats: string[] = [] let updated = 0 for (const dateId of dateIds) { - const [rows, previousMrr] = await Promise.all([ - fetchDailyRevenueMetricsForDate(supabase, dateId), - fetchPreviousMrr(supabase, dateId), - ]) - const retainedChanges = rows.reduce((summary, row) => { - if ((Number(row.opening_mrr) || 0) <= 0) - return summary - - summary.churnMrr += Number(row.churn_mrr) || 0 - summary.contractionMrr += Number(row.contraction_mrr) || 0 - summary.expansionMrr += Number(row.expansion_mrr) || 0 - return summary - }, { churnMrr: 0, contractionMrr: 0, expansionMrr: 0 }) - const totalLostRevenue = rows.reduce((summary, row) => { - summary.churnMrr += Number(row.churn_mrr) || 0 - summary.contractionMrr += Number(row.contraction_mrr) || 0 - return summary - }, { churnMrr: 0, contractionMrr: 0, expansionMrr: 0 }) - - const { data: globalStats, error: globalStatsError } = await supabase - .from('global_stats') - .select('date_id') - .eq('date_id', dateId) - .maybeSingle() - - if (globalStatsError) - throw globalStatsError - if (!globalStats) { + const { rows } = await client.query(` + SELECT + EXISTS ( + SELECT 1 + FROM public.global_stats + WHERE date_id = $1 + ) AS has_global_stats, + COALESCE(( + SELECT mrr + FROM public.global_stats + WHERE date_id = $2 + ), 0) AS previous_mrr, + COALESCE(SUM(CASE WHEN opening_mrr > 0 THEN churn_mrr ELSE 0 END), 0) AS retained_churn_mrr, + COALESCE(SUM(CASE WHEN opening_mrr > 0 THEN contraction_mrr ELSE 0 END), 0) AS retained_contraction_mrr, + COALESCE(SUM(CASE WHEN opening_mrr > 0 THEN expansion_mrr ELSE 0 END), 0) AS retained_expansion_mrr, + COALESCE(SUM(churn_mrr), 0) AS lost_churn_mrr, + COALESCE(SUM(contraction_mrr), 0) AS lost_contraction_mrr + FROM public.daily_revenue_metrics + WHERE date_id = $1 + `, [dateId, getPreviousDateId(dateId)]) + const row = rows[0] + + if (!row?.has_global_stats) { skippedMissingGlobalStats.push(dateId) continue } - const { error } = await supabase - .from('global_stats') - .update({ - churn_revenue: calculateChurnRevenue(totalLostRevenue), - nrr: calculateNrr(previousMrr, retainedChanges), - }) - .eq('date_id', dateId) - - if (error) - throw error - + await client.query(` + UPDATE public.global_stats + SET + churn_revenue = $2, + nrr = $3 + WHERE date_id = $1 + `, [ + dateId, + calculateChurnRevenue({ + churnMrr: Number(row.lost_churn_mrr) || 0, + contractionMrr: Number(row.lost_contraction_mrr) || 0, + expansionMrr: 0, + }), + calculateNrr(Number(row.previous_mrr) || 0, { + churnMrr: Number(row.retained_churn_mrr) || 0, + contractionMrr: Number(row.retained_contraction_mrr) || 0, + expansionMrr: Number(row.retained_expansion_mrr) || 0, + }), + ]) updated++ } return { skippedMissingGlobalStats, updated } } +async function applyBackfillTransaction(options: ApplyBackfillTransactionOptions) { + return withPgTransaction(options.databaseUrl, async (client) => { + if (options.reset) + await resetBackfillRangePg(client, options.fromDateId, options.toDateId, options.customerId) + + await upsertDailyRevenueMetricsPg(client, options.mergedMetricRows) + const refreshResult = await refreshGlobalRetentionMetricsPg(client, options.retentionDates) + await insertProcessedEventsPg(client, options.movementsToApply) + return refreshResult + }) +} + async function writeFailures(failures: unknown[]) { if (failures.length === 0) return @@ -876,7 +946,11 @@ async function main(args = process.argv.slice(2), runtimeEnv: Record row.date_id))].sort() const failures: unknown[] = [] try { - await insertProcessedEvents(supabase, movementsToApply) - await upsertDailyRevenueMetrics(supabase, mergedMetricRows) - const retentionDates = reset - ? getDateIdsBetween(fromDateId, toDateId) - : [...new Set(metricRowsToApply.map(row => row.date_id))].sort() - const refreshResult = await refreshGlobalRetentionMetrics(supabase, retentionDates) + const refreshResult = await applyBackfillTransaction({ + customerId, + databaseUrl: applyDatabaseUrl, + fromDateId, + mergedMetricRows, + movementsToApply, + reset, + retentionDates, + toDateId, + }) console.log(`Updated global_stats retention metrics for ${refreshResult.updated} dates`) if (refreshResult.skippedMissingGlobalStats.length > 0) console.log(`Skipped missing global_stats rows: ${refreshResult.skippedMissingGlobalStats.join(', ')}`) diff --git a/tests/backfill-retention-metrics.unit.test.ts b/tests/backfill-retention-metrics.unit.test.ts index c428632572..ad1c58f91d 100644 --- a/tests/backfill-retention-metrics.unit.test.ts +++ b/tests/backfill-retention-metrics.unit.test.ts @@ -104,6 +104,24 @@ describe('retention metric backfill helpers', () => { }) }) + it.concurrent('builds new business metrics from status-only subscription update activations', () => { + const result = buildRevenueMovementEvents([ + subscriptionEvent('evt_status_update', 'customer.subscription.updated', 1774353600, 'cus_activated', 'sub_activated', 'price_solo_monthly', 'prod_solo'), + ], plans as any, { + fromDateId: '2026-03-24', + toDateId: '2026-03-24', + }) + + expect(result.movements).toHaveLength(1) + expect(result.movements[0]).toMatchObject({ + event_id: 'evt_status_update', + customer_id: 'cus_activated', + current_mrr: 0, + next_mrr: 12, + new_business_mrr: 12, + }) + }) + it.concurrent('builds churn metrics from deleted subscription events', () => { const result = buildRevenueMovementEvents([ subscriptionEvent('evt_deleted', 'customer.subscription.deleted', 1774353600, 'cus_churned', 'sub_churned', 'price_team_yearly', 'prod_team'), From 392f6be34f06b336dd048cc0904950f95d7b63c8 Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Thu, 23 Apr 2026 15:25:14 +0200 Subject: [PATCH 05/14] fix(admin): make retention backfill state safe --- scripts/backfill_retention_metrics.ts | 93 ++++++++++++++----- tests/backfill-retention-metrics.unit.test.ts | 54 +++++++++-- 2 files changed, 114 insertions(+), 33 deletions(-) diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index f3e84dd5cd..b68df8106f 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -47,7 +47,6 @@ type SubscriptionEventType = typeof SUBSCRIPTION_EVENT_TYPES[number] interface CustomerRevenueBaselineRow { customer_id: string paid_at: string | null - subscription_id: string | null } interface TrackedSubscriptionState { @@ -87,7 +86,6 @@ interface BuildRevenueMovementEventsOptions { customerId?: string | null fromDateId: string initialPaidAtByCustomerId?: Map - initialSubscriptionIdByCustomerId?: Map toDateId: string } @@ -169,6 +167,13 @@ function getRequiredEnv(env: Record, key: string) { return value } +function getRequiredDatabaseUrl(env: Record) { + const value = getDatabaseUrl(env) + if (!value) + throw new Error('--apply requires DATABASE_URL, POSTGRES_URL, SUPABASE_DB_URL, SUPABASE_DB_DIRECT_URL, or DIRECT_URL so metric writes and processed-event markers are committed atomically') + return value +} + function getDatabaseUrl(env: Record) { return env.DATABASE_URL?.trim() || env.POSTGRES_URL?.trim() @@ -339,7 +344,14 @@ function normalizeStripeEventFromFile(event: unknown, index: number): Stripe.Eve } function getLicensedSubscriptionItem(items: Stripe.SubscriptionItem[] | undefined) { - return items?.find(item => item.plan?.usage_type === 'licensed') ?? items?.[0] ?? null + const licensedItem = items?.find(item => item.plan?.usage_type === 'licensed') ?? null + if (licensedItem) + return licensedItem + + if (items?.length) + console.warn(`No licensed subscription item found; ignoring ${items.length} subscription item(s). First item usage_type=${items[0]?.plan?.usage_type ?? 'unknown'}`) + + return null } function getItemPriceId(item: Stripe.SubscriptionItem | null | undefined) { @@ -365,6 +377,25 @@ function getPreviousSubscriptionItems(event: Stripe.Event) { return previousAttributes?.items?.data as Stripe.SubscriptionItem[] | undefined } +function toBackfillStripeStatus(status: unknown): StripeStatus | null { + if (status === 'active' || status === 'trialing' || status === 'succeeded') + return 'succeeded' + if (status === 'created' || status === 'updated' || status === 'failed' || status === 'deleted' || status === 'canceled') + return status + return null +} + +function getPreviousSubscriptionStatus(event: Stripe.Event) { + const previousAttributes = event.data.previous_attributes as Partial | undefined + if (!previousAttributes || !Object.hasOwn(previousAttributes, 'status')) + return { hasStatus: false, status: null as StripeStatus | null } + + return { + hasStatus: true, + status: toBackfillStripeStatus(previousAttributes.status), + } +} + function toRevenueState(state: TrackedSubscriptionState | StripeInfoRevenueState): StripeInfoRevenueState { if (!state) return state @@ -458,7 +489,8 @@ export function buildRevenueMovementEvents( const eventOccurredAtIso = getEventCreatedIso(event) const dateId = getEventDateId(eventOccurredAtIso) - if (compareDateIds(dateId, options.fromDateId) < 0 || compareDateIds(dateId, options.toDateId) > 0) { + const isBeforeRange = compareDateIds(dateId, options.fromDateId) < 0 + if (compareDateIds(dateId, options.toDateId) > 0) { skipped.outOfRange++ continue } @@ -485,9 +517,11 @@ export function buildRevenueMovementEvents( const trackedState = customerStates.get(customerId) const previousItem = getLicensedSubscriptionItem(getPreviousSubscriptionItems(event)) - const previousPriceId = getItemPriceId(previousItem) ?? trackedState?.price_id ?? null - const previousProductId = getItemProductId(previousItem) ?? trackedState?.product_id ?? null - const previousStatus = trackedState?.status ?? (previousItem ? 'succeeded' : null) + const previousStatusChange = getPreviousSubscriptionStatus(event) + const shouldReuseCurrentPlanForPreviousState = !trackedState && !previousItem && previousStatusChange.status === 'succeeded' + const previousPriceId = getItemPriceId(previousItem) ?? trackedState?.price_id ?? (shouldReuseCurrentPlanForPreviousState ? currentPriceId : null) + const previousProductId = getItemProductId(previousItem) ?? trackedState?.product_id ?? (shouldReuseCurrentPlanForPreviousState ? currentProductId : null) + const previousStatus = trackedState?.status ?? (previousItem ? 'succeeded' : previousStatusChange.status) const previousMrr = getSubscriptionMrr(plans, { is_good_plan: true, paid_at: trackedState?.paid_at ?? eventOccurredAtIso, @@ -506,11 +540,20 @@ export function buildRevenueMovementEvents( nextState = buildTrackedState(customerId, subscriptionId, 'succeeded', currentPriceId, currentProductId, knownPaidAt ?? eventOccurredAtIso) } else if (event.type === 'customer.subscription.updated') { + const hasPreviousRevenueState = Boolean(trackedState || previousItem || previousStatusChange.hasStatus) currentState = buildTrackedState(customerId, subscriptionId, previousMrr > 0 ? 'succeeded' : 'updated', previousPriceId, previousProductId, activePaidAt) nextState = buildTrackedState(customerId, subscriptionId, 'succeeded', currentPriceId, currentProductId, activePaidAt ?? eventOccurredAtIso) + if (!hasPreviousRevenueState) { + customerStates.set(customerId, nextState) + if (isBeforeRange) + skipped.outOfRange++ + else + skipped.noMovement++ + continue + } } else { - const baselineSubscriptionId = trackedState?.subscription_id ?? options.initialSubscriptionIdByCustomerId?.get(customerId) ?? null + const baselineSubscriptionId = trackedState?.subscription_id ?? null if (baselineSubscriptionId && baselineSubscriptionId !== subscriptionId) { skipped.subscriptionMismatch++ continue @@ -523,6 +566,11 @@ export function buildRevenueMovementEvents( const movement = classifyRevenueMovement(toRevenueState(currentState), toRevenueState(nextState), plans) customerStates.set(customerId, nextState) + if (isBeforeRange) { + skipped.outOfRange++ + continue + } + if (!hasRevenueMovement(movement)) { skipped.noMovement++ continue @@ -651,11 +699,10 @@ async function fetchRevenuePlans(supabase: SupabaseClient): Promise() - const subscriptionIdByCustomerId = new Map() for (const chunk of chunkArray(customerIds, DB_CHUNK_SIZE)) { const { data, error } = await supabase .from('stripe_info') - .select('customer_id, paid_at, subscription_id') + .select('customer_id, paid_at') .in('customer_id', chunk) if (error) @@ -663,10 +710,9 @@ async function fetchInitialCustomerRevenueBaseline(supabase: SupabaseClient, cus for (const row of (data ?? []) as CustomerRevenueBaselineRow[]) { paidAtByCustomerId.set(row.customer_id, row.paid_at) - subscriptionIdByCustomerId.set(row.customer_id, row.subscription_id) } } - return { paidAtByCustomerId, subscriptionIdByCustomerId } + return { paidAtByCustomerId } } async function fetchExistingProcessedEventIds(supabase: SupabaseClient, eventIds: string[]) { @@ -948,9 +994,7 @@ async function main(args = process.argv.slice(2), runtimeEnv: Record 0 + ? oldestEventApiDateId + : fromDateId + events = await fetchStripeEvents(stripe, fetchFromDateId, toDateId, limit) + if (fetchFromDateId !== fromDateId) + console.log(`Fetched recent Stripe events from ${fetchFromDateId} to seed subscription state before ${fromDateId}`) console.log(`Fetched ${events.length} subscription events from Stripe`) } @@ -977,7 +1029,6 @@ async function main(args = process.argv.slice(2), runtimeEnv: Record ({ @@ -1007,10 +1058,6 @@ async function main(args = process.argv.slice(2), runtimeEnv: Record = {} + if (previous?.priceId && previous.productId) { + previousAttributes.items = { + data: [subscriptionItem(previous.priceId, previous.productId)], + } as any + } + if (previous && 'status' in previous) + previousAttributes.status = previous.status as any + return { id, type, @@ -53,11 +62,7 @@ function subscriptionEvent( }, }, previous_attributes: previous - ? { - items: { - data: [subscriptionItem(previous.priceId, previous.productId)], - }, - } + ? previousAttributes : undefined, }, } as Stripe.Event @@ -104,9 +109,11 @@ describe('retention metric backfill helpers', () => { }) }) - it.concurrent('builds new business metrics from status-only subscription update activations', () => { + it.concurrent('builds new business metrics from status-only subscription update activations with previous status', () => { const result = buildRevenueMovementEvents([ - subscriptionEvent('evt_status_update', 'customer.subscription.updated', 1774353600, 'cus_activated', 'sub_activated', 'price_solo_monthly', 'prod_solo'), + subscriptionEvent('evt_status_update', 'customer.subscription.updated', 1774353600, 'cus_activated', 'sub_activated', 'price_solo_monthly', 'prod_solo', { + status: 'incomplete', + }), ], plans as any, { fromDateId: '2026-03-24', toDateId: '2026-03-24', @@ -122,6 +129,18 @@ describe('retention metric backfill helpers', () => { }) }) + it.concurrent('skips first subscription update when previous revenue state is unknown', () => { + const result = buildRevenueMovementEvents([ + subscriptionEvent('evt_metadata_update', 'customer.subscription.updated', 1774353600, 'cus_existing_unknown', 'sub_existing_unknown', 'price_solo_monthly', 'prod_solo'), + ], plans as any, { + fromDateId: '2026-03-24', + toDateId: '2026-03-24', + }) + + expect(result.movements).toHaveLength(0) + expect(result.skipped.noMovement).toBe(1) + }) + it.concurrent('builds churn metrics from deleted subscription events', () => { const result = buildRevenueMovementEvents([ subscriptionEvent('evt_deleted', 'customer.subscription.deleted', 1774353600, 'cus_churned', 'sub_churned', 'price_team_yearly', 'prod_team'), @@ -198,16 +217,31 @@ describe('retention metric backfill helpers', () => { }) }) - it.concurrent('skips first-in-range deleted events when the stored subscription id differs', () => { + it.concurrent('skips deleted events when pre-range state tracks a different subscription id', () => { const result = buildRevenueMovementEvents([ + subscriptionEvent('evt_pre_range_create', 'customer.subscription.created', 1774267200, 'cus_active', 'sub_new', 'price_team_monthly', 'prod_team'), subscriptionEvent('evt_old_deleted', 'customer.subscription.deleted', 1774353600, 'cus_active', 'sub_old', 'price_team_monthly', 'prod_team'), ], plans as any, { fromDateId: '2026-03-24', - initialSubscriptionIdByCustomerId: new Map([['cus_active', 'sub_new']]), toDateId: '2026-03-24', }) expect(result.movements).toHaveLength(0) expect(result.skipped.subscriptionMismatch).toBe(1) }) + + it.concurrent('does not use current subscription id as a historical deletion baseline', () => { + const result = buildRevenueMovementEvents([ + subscriptionEvent('evt_deleted', 'customer.subscription.deleted', 1774353600, 'cus_historical_churn', 'sub_old', 'price_team_monthly', 'prod_team'), + ], plans as any, { + fromDateId: '2026-03-24', + toDateId: '2026-03-24', + }) + + expect(result.movements).toHaveLength(1) + expect(result.movements[0]).toMatchObject({ + event_id: 'evt_deleted', + churn_mrr: 49, + }) + }) }) From 33fcbc4afe4a3f81c91e5f73330e171d4efb009a Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Thu, 23 Apr 2026 15:32:46 +0200 Subject: [PATCH 06/14] fix(admin): address retention backfill review nits --- scripts/backfill_retention_metrics.ts | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index b68df8106f..7c2afac1f2 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -109,6 +109,7 @@ interface RefreshRetentionMetricsResult { interface ApplyBackfillTransactionOptions { customerId?: string | null databaseUrl: string + env: Record fromDateId: string mergedMetricRows: DailyRevenueMetricInsert[] movementsToApply: BackfillRevenueMovementEvent[] @@ -183,12 +184,18 @@ function getDatabaseUrl(env: Record) { || null } -function createPgClient(databaseUrl: string) { +function shouldAllowSelfSignedPgCertificate(env: Record) { + return env.PG_ALLOW_SELF_SIGNED_CERT?.trim() === 'true' || env.PG_SSL_REJECT_UNAUTHORIZED?.trim() === '0' +} + +function createPgClient(databaseUrl: string, env: Record) { const host = new URL(databaseUrl).hostname const usesLocalDatabase = host === 'localhost' || host === '127.0.0.1' || host === '::1' return new PgClient({ connectionString: databaseUrl, - ssl: usesLocalDatabase ? false : { rejectUnauthorized: false }, + // Keep certificate validation on by default; disable it only for managed + // poolers that require self-signed certs and are explicitly opted in. + ssl: usesLocalDatabase ? false : { rejectUnauthorized: !shouldAllowSelfSignedPgCertificate(env) }, }) } @@ -396,10 +403,7 @@ function getPreviousSubscriptionStatus(event: Stripe.Event) { } } -function toRevenueState(state: TrackedSubscriptionState | StripeInfoRevenueState): StripeInfoRevenueState { - if (!state) - return state - +function toRevenueState(state: TrackedSubscriptionState): NonNullable { return { is_good_plan: state.is_good_plan, paid_at: state.paid_at, @@ -774,8 +778,8 @@ export function mergeMetricRows(existingRows: DailyRevenueMetricRow[], rowsToAdd }) } -async function withPgTransaction(databaseUrl: string, action: (client: PgClient) => Promise) { - const client = createPgClient(databaseUrl) +async function withPgTransaction(databaseUrl: string, env: Record, action: (client: PgClient) => Promise) { + const client = createPgClient(databaseUrl, env) await client.connect() try { @@ -930,7 +934,7 @@ async function refreshGlobalRetentionMetricsPg(client: PgClient, dateIds: string } async function applyBackfillTransaction(options: ApplyBackfillTransactionOptions) { - return withPgTransaction(options.databaseUrl, async (client) => { + return withPgTransaction(options.databaseUrl, options.env, async (client) => { if (options.reset) await resetBackfillRangePg(client, options.fromDateId, options.toDateId, options.customerId) @@ -1073,6 +1077,7 @@ async function main(args = process.argv.slice(2), runtimeEnv: Record Date: Thu, 23 Apr 2026 15:41:40 +0200 Subject: [PATCH 07/14] fix(admin): complete retention backfill review fixes --- scripts/backfill_retention_metrics.ts | 44 +++++++++++++------ tests/backfill-retention-metrics.unit.test.ts | 14 ++++++ 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index 7c2afac1f2..e6e58175db 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -30,6 +30,7 @@ const DEFAULT_ENV_FILE = './internal/cloudflare/.env.prod' const DEFAULT_LOOKBACK_DAYS = 30 const EVENT_FETCH_PAGE_SIZE = 100 const DB_CHUNK_SIZE = 500 +const DB_PAGE_SIZE = 1000 const FAILURE_OUTPUT = './tmp/retention_metric_backfill_failures.json' const DATE_ID_REGEX = /^\d{4}-\d{2}-\d{2}$/ const SUBSCRIPTION_EVENT_TYPES = [ @@ -385,8 +386,10 @@ function getPreviousSubscriptionItems(event: Stripe.Event) { } function toBackfillStripeStatus(status: unknown): StripeStatus | null { - if (status === 'active' || status === 'trialing' || status === 'succeeded') + if (status === 'active' || status === 'trialing' || status === 'past_due' || status === 'unpaid' || status === 'succeeded') return 'succeeded' + if (status === 'incomplete' || status === 'incomplete_expired' || status === 'paused') + return 'created' if (status === 'created' || status === 'updated' || status === 'failed' || status === 'deleted' || status === 'canceled') return status return null @@ -544,7 +547,7 @@ export function buildRevenueMovementEvents( nextState = buildTrackedState(customerId, subscriptionId, 'succeeded', currentPriceId, currentProductId, knownPaidAt ?? eventOccurredAtIso) } else if (event.type === 'customer.subscription.updated') { - const hasPreviousRevenueState = Boolean(trackedState || previousItem || previousStatusChange.hasStatus) + const hasPreviousRevenueState = Boolean(trackedState || previousItem || previousStatusChange.status) currentState = buildTrackedState(customerId, subscriptionId, previousMrr > 0 ? 'succeeded' : 'updated', previousPriceId, previousProductId, activePaidAt) nextState = buildTrackedState(customerId, subscriptionId, 'succeeded', currentPriceId, currentProductId, activePaidAt ?? eventOccurredAtIso) if (!hasPreviousRevenueState) { @@ -742,20 +745,33 @@ async function fetchExistingDailyRevenueMetrics( toDateId: string, customerId?: string | null, ) { - let query = supabase - .from('daily_revenue_metrics') - .select('*') - .gte('date_id', fromDateId) - .lte('date_id', toDateId) - - if (customerId) - query = query.eq('customer_id', customerId) + const rows: DailyRevenueMetricRow[] = [] + let offset = 0 + + while (true) { + let query = supabase + .from('daily_revenue_metrics') + .select('*') + .gte('date_id', fromDateId) + .lte('date_id', toDateId) + .order('date_id', { ascending: true }) + .order('customer_id', { ascending: true }) + .range(offset, offset + DB_PAGE_SIZE - 1) + + if (customerId) + query = query.eq('customer_id', customerId) + + const { data, error } = await query + if (error) + throw error - const { data, error } = await query - if (error) - throw error + rows.push(...(data ?? [])) + if (!data || data.length < DB_PAGE_SIZE) + break + offset += DB_PAGE_SIZE + } - return data ?? [] + return rows } export function mergeMetricRows(existingRows: DailyRevenueMetricRow[], rowsToAdd: DailyRevenueMetricInsert[]) { diff --git a/tests/backfill-retention-metrics.unit.test.ts b/tests/backfill-retention-metrics.unit.test.ts index 94554673c6..0478aa3003 100644 --- a/tests/backfill-retention-metrics.unit.test.ts +++ b/tests/backfill-retention-metrics.unit.test.ts @@ -141,6 +141,20 @@ describe('retention metric backfill helpers', () => { expect(result.skipped.noMovement).toBe(1) }) + it.concurrent('does not invent movement for first past due subscription updates', () => { + const result = buildRevenueMovementEvents([ + subscriptionEvent('evt_past_due_update', 'customer.subscription.updated', 1774353600, 'cus_past_due', 'sub_past_due', 'price_solo_monthly', 'prod_solo', { + status: 'past_due', + }), + ], plans as any, { + fromDateId: '2026-03-24', + toDateId: '2026-03-24', + }) + + expect(result.movements).toHaveLength(0) + expect(result.skipped.noMovement).toBe(1) + }) + it.concurrent('builds churn metrics from deleted subscription events', () => { const result = buildRevenueMovementEvents([ subscriptionEvent('evt_deleted', 'customer.subscription.deleted', 1774353600, 'cus_churned', 'sub_churned', 'price_team_yearly', 'prod_team'), From 4610c737613df579bdddeffd8b9e83124d13d766 Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Thu, 23 Apr 2026 15:51:39 +0200 Subject: [PATCH 08/14] fix(admin): claim retention backfill events in transaction --- scripts/backfill_retention_metrics.ts | 126 ++++++++++++-------------- 1 file changed, 59 insertions(+), 67 deletions(-) diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index e6e58175db..6297dbd815 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -30,7 +30,6 @@ const DEFAULT_ENV_FILE = './internal/cloudflare/.env.prod' const DEFAULT_LOOKBACK_DAYS = 30 const EVENT_FETCH_PAGE_SIZE = 100 const DB_CHUNK_SIZE = 500 -const DB_PAGE_SIZE = 1000 const FAILURE_OUTPUT = './tmp/retention_metric_backfill_failures.json' const DATE_ID_REGEX = /^\d{4}-\d{2}-\d{2}$/ const SUBSCRIPTION_EVENT_TYPES = [ @@ -112,13 +111,17 @@ interface ApplyBackfillTransactionOptions { databaseUrl: string env: Record fromDateId: string - mergedMetricRows: DailyRevenueMetricInsert[] - movementsToApply: BackfillRevenueMovementEvent[] + movements: BackfillRevenueMovementEvent[] reset: boolean retentionDates: string[] toDateId: string } +interface ApplyBackfillTransactionResult extends RefreshRetentionMetricsResult { + metricRowsApplied: number + movementsApplied: number +} + interface RetentionMetricSummaryRow { has_global_stats: boolean lost_churn_mrr: number | string | null @@ -739,41 +742,6 @@ async function fetchExistingProcessedEventIds(supabase: SupabaseClient, eventIds return existing } -async function fetchExistingDailyRevenueMetrics( - supabase: SupabaseClient, - fromDateId: string, - toDateId: string, - customerId?: string | null, -) { - const rows: DailyRevenueMetricRow[] = [] - let offset = 0 - - while (true) { - let query = supabase - .from('daily_revenue_metrics') - .select('*') - .gte('date_id', fromDateId) - .lte('date_id', toDateId) - .order('date_id', { ascending: true }) - .order('customer_id', { ascending: true }) - .range(offset, offset + DB_PAGE_SIZE - 1) - - if (customerId) - query = query.eq('customer_id', customerId) - - const { data, error } = await query - if (error) - throw error - - rows.push(...(data ?? [])) - if (!data || data.length < DB_PAGE_SIZE) - break - offset += DB_PAGE_SIZE - } - - return rows -} - export function mergeMetricRows(existingRows: DailyRevenueMetricRow[], rowsToAdd: DailyRevenueMetricInsert[]) { const existingByKey = new Map(existingRows.map(row => [`${row.date_id}:${row.customer_id}`, row])) @@ -825,7 +793,7 @@ async function resetBackfillRangePg(client: PgClient, fromDateId: string, toDate await client.query(`DELETE FROM public.daily_revenue_metrics WHERE ${predicates.join(' AND ')}`, values) } -async function upsertDailyRevenueMetricsPg(client: PgClient, rows: DailyRevenueMetricInsert[]) { +async function upsertDailyRevenueMetricsPg(client: PgClient, rows: DailyRevenueMetricInsert[], mode: 'additive' | 'exact') { for (const chunk of chunkArray(rows, DB_CHUNK_SIZE)) { if (chunk.length === 0) continue @@ -845,6 +813,21 @@ async function upsertDailyRevenueMetricsPg(client: PgClient, rows: DailyRevenueM return `($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7})` }) + const updateClause = mode === 'exact' + ? ` + opening_mrr = EXCLUDED.opening_mrr, + new_business_mrr = EXCLUDED.new_business_mrr, + expansion_mrr = EXCLUDED.expansion_mrr, + contraction_mrr = EXCLUDED.contraction_mrr, + churn_mrr = EXCLUDED.churn_mrr + ` + : ` + new_business_mrr = public.daily_revenue_metrics.new_business_mrr + EXCLUDED.new_business_mrr, + expansion_mrr = public.daily_revenue_metrics.expansion_mrr + EXCLUDED.expansion_mrr, + contraction_mrr = public.daily_revenue_metrics.contraction_mrr + EXCLUDED.contraction_mrr, + churn_mrr = public.daily_revenue_metrics.churn_mrr + EXCLUDED.churn_mrr + ` + await client.query(` INSERT INTO public.daily_revenue_metrics ( date_id, @@ -859,16 +842,14 @@ async function upsertDailyRevenueMetricsPg(client: PgClient, rows: DailyRevenueM ON CONFLICT (date_id, customer_id) DO UPDATE SET updated_at = now(), - opening_mrr = EXCLUDED.opening_mrr, - new_business_mrr = EXCLUDED.new_business_mrr, - expansion_mrr = EXCLUDED.expansion_mrr, - contraction_mrr = EXCLUDED.contraction_mrr, - churn_mrr = EXCLUDED.churn_mrr + ${updateClause} `, values) } } -async function insertProcessedEventsPg(client: PgClient, movements: BackfillRevenueMovementEvent[]) { +async function claimProcessedEventsPg(client: PgClient, movements: BackfillRevenueMovementEvent[]) { + const claimedEventIds = new Set() + for (const chunk of chunkArray(movements, DB_CHUNK_SIZE)) { if (chunk.length === 0) continue @@ -880,7 +861,7 @@ async function insertProcessedEventsPg(client: PgClient, movements: BackfillReve return `($${offset + 1}, $${offset + 2}, $${offset + 3})` }) - await client.query(` + const { rows } = await client.query<{ event_id: string }>(` INSERT INTO public.processed_stripe_events ( event_id, customer_id, @@ -888,8 +869,14 @@ async function insertProcessedEventsPg(client: PgClient, movements: BackfillReve ) VALUES ${placeholders.join(', ')} ON CONFLICT (event_id) DO NOTHING + RETURNING event_id `, values) + + for (const row of rows) + claimedEventIds.add(row.event_id) } + + return movements.filter(movement => claimedEventIds.has(movement.event_id)) } async function refreshGlobalRetentionMetricsPg(client: PgClient, dateIds: string[]): Promise { @@ -949,15 +936,23 @@ async function refreshGlobalRetentionMetricsPg(client: PgClient, dateIds: string return { skippedMissingGlobalStats, updated } } -async function applyBackfillTransaction(options: ApplyBackfillTransactionOptions) { +async function applyBackfillTransaction(options: ApplyBackfillTransactionOptions): Promise { return withPgTransaction(options.databaseUrl, options.env, async (client) => { if (options.reset) await resetBackfillRangePg(client, options.fromDateId, options.toDateId, options.customerId) - await upsertDailyRevenueMetricsPg(client, options.mergedMetricRows) - const refreshResult = await refreshGlobalRetentionMetricsPg(client, options.retentionDates) - await insertProcessedEventsPg(client, options.movementsToApply) - return refreshResult + const movementsToApply = await claimProcessedEventsPg(client, options.movements) + const metricRowsToApply = aggregateRevenueMovementEvents(movementsToApply) + await upsertDailyRevenueMetricsPg(client, metricRowsToApply, options.reset ? 'exact' : 'additive') + const retentionDates = options.reset + ? options.retentionDates + : [...new Set(metricRowsToApply.map(row => row.date_id))].sort() + const refreshResult = await refreshGlobalRetentionMetricsPg(client, retentionDates) + return { + ...refreshResult, + metricRowsApplied: metricRowsToApply.length, + movementsApplied: movementsToApply.length, + } }) } @@ -1061,46 +1056,43 @@ async function main(args = process.argv.slice(2), runtimeEnv: Record movement.event_id)) - movementsToApply = movements.filter(movement => !existingEventIds.has(movement.event_id)) - console.log(`Existing processed events skipped: ${movements.length - movementsToApply.length}`) + previewMovementsToApply = movements.filter(movement => !existingEventIds.has(movement.event_id)) + console.log(`Existing processed events skipped: ${movements.length - previewMovementsToApply.length}`) } - const metricRowsToApply = aggregateRevenueMovementEvents(movementsToApply) - printSummary('Daily metrics to apply', summarizeDailyRevenueMetrics(metricRowsToApply)) + const previewMetricRowsToApply = aggregateRevenueMovementEvents(previewMovementsToApply) + printSummary(apply ? 'Candidate daily metrics to apply' : 'Daily metrics to apply', summarizeDailyRevenueMetrics(previewMetricRowsToApply)) if (!apply) { console.log('Sample metric rows:') - for (const row of metricRowsToApply.slice(0, 10)) + for (const row of previewMetricRowsToApply.slice(0, 10)) console.log(row) return } - const existingMetrics = reset - ? [] - : await fetchExistingDailyRevenueMetrics(supabase, fromDateId, toDateId, customerId) - const mergedMetricRows = reset - ? metricRowsToApply - : mergeMetricRows(existingMetrics, metricRowsToApply) const retentionDates = reset ? getDateIdsBetween(fromDateId, toDateId) - : [...new Set(metricRowsToApply.map(row => row.date_id))].sort() + : [] const failures: unknown[] = [] + let appliedMovements = 0 + let appliedMetricRows = 0 try { const refreshResult = await applyBackfillTransaction({ customerId, databaseUrl, env, fromDateId, - mergedMetricRows, - movementsToApply, + movements, reset, retentionDates, toDateId, }) + appliedMovements = refreshResult.movementsApplied + appliedMetricRows = refreshResult.metricRowsApplied console.log(`Updated global_stats retention metrics for ${refreshResult.updated} dates`) if (refreshResult.skippedMissingGlobalStats.length > 0) console.log(`Skipped missing global_stats rows: ${refreshResult.skippedMissingGlobalStats.join(', ')}`) @@ -1119,7 +1111,7 @@ async function main(args = process.argv.slice(2), runtimeEnv: Record 0) throw new Error(`Retention metric backfill failed. Details written to ${FAILURE_OUTPUT}`) - console.log(`Done. Processed movements: ${movementsToApply.length}. Daily metric rows: ${mergedMetricRows.length}.`) + console.log(`Done. Processed movements: ${appliedMovements}. Daily metric rows: ${appliedMetricRows}.`) } if (import.meta.main) From 2703ddb9a2d401bb778ec0db5973f4d57f367307 Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Thu, 23 Apr 2026 16:03:21 +0200 Subject: [PATCH 09/14] fix(admin): align retention backfill deletions --- scripts/backfill_retention_metrics.ts | 39 +++++++++++++++++-- tests/backfill-retention-metrics.unit.test.ts | 18 ++++++++- 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index 6297dbd815..ef06d09820 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -379,6 +379,18 @@ function getItemProductId(item: Stripe.SubscriptionItem | null | undefined) { return toStripeId(item.plan?.product) ?? toStripeId(item.price?.product) ?? null } +function getItemPeriodEndIso(item: Stripe.SubscriptionItem | null | undefined) { + if (!item?.current_period_end) + return null + + return new Date(item.current_period_end * 1000).toISOString() +} + +function isActiveUntilPeriodEnd(item: Stripe.SubscriptionItem | null | undefined, eventOccurredAtIso: string) { + const periodEndIso = getItemPeriodEndIso(item) + return Boolean(periodEndIso && new Date(periodEndIso).getTime() > new Date(eventOccurredAtIso).getTime()) +} + function getSubscriptionItems(subscription: Stripe.Subscription) { return subscription.items?.data as Stripe.SubscriptionItem[] | undefined } @@ -570,7 +582,7 @@ export function buildRevenueMovementEvents( } currentState = buildTrackedState(customerId, subscriptionId, 'succeeded', trackedState?.price_id ?? currentPriceId, trackedState?.product_id ?? currentProductId, activePaidAt ?? eventOccurredAtIso) - nextState = buildTrackedState(customerId, subscriptionId, 'deleted', currentPriceId, currentProductId, activePaidAt ?? eventOccurredAtIso) + nextState = buildTrackedState(customerId, subscriptionId, isActiveUntilPeriodEnd(currentItem, eventOccurredAtIso) ? 'succeeded' : 'deleted', currentPriceId, currentProductId, activePaidAt ?? eventOccurredAtIso) } const movement = classifyRevenueMovement(toRevenueState(currentState), toRevenueState(nextState), plans) @@ -625,6 +637,20 @@ export function aggregateRevenueMovementEvents(movements: BackfillRevenueMovemen }) } +function dedupeRevenueMovementEvents(movements: BackfillRevenueMovementEvent[]) { + const deduped: BackfillRevenueMovementEvent[] = [] + const seenEventIds = new Set() + + for (const movement of movements) { + if (seenEventIds.has(movement.event_id)) + continue + seenEventIds.add(movement.event_id) + deduped.push(movement) + } + + return deduped +} + export function summarizeDailyRevenueMetrics(rows: Pick[]): BackfillSummary { return rows.reduce((summary, row) => { summary.rows++ @@ -848,9 +874,10 @@ async function upsertDailyRevenueMetricsPg(client: PgClient, rows: DailyRevenueM } async function claimProcessedEventsPg(client: PgClient, movements: BackfillRevenueMovementEvent[]) { + const uniqueMovements = dedupeRevenueMovementEvents(movements) const claimedEventIds = new Set() - for (const chunk of chunkArray(movements, DB_CHUNK_SIZE)) { + for (const chunk of chunkArray(uniqueMovements, DB_CHUNK_SIZE)) { if (chunk.length === 0) continue @@ -876,7 +903,7 @@ async function claimProcessedEventsPg(client: PgClient, movements: BackfillReven claimedEventIds.add(row.event_id) } - return movements.filter(movement => claimedEventIds.has(movement.event_id)) + return uniqueMovements.filter(movement => claimedEventIds.has(movement.event_id)) } async function refreshGlobalRetentionMetricsPg(client: PgClient, dateIds: string[]): Promise { @@ -1040,12 +1067,16 @@ async function main(args = process.argv.slice(2), runtimeEnv: Record ({ opening_mrr: movement.opening_mrr, new_business_mrr: movement.new_business_mrr, diff --git a/tests/backfill-retention-metrics.unit.test.ts b/tests/backfill-retention-metrics.unit.test.ts index 0478aa3003..7c7ba8db18 100644 --- a/tests/backfill-retention-metrics.unit.test.ts +++ b/tests/backfill-retention-metrics.unit.test.ts @@ -19,8 +19,9 @@ const plans = [ }, ] as const -function subscriptionItem(priceId: string, productId: string) { +function subscriptionItem(priceId: string, productId: string, currentPeriodEnd?: number) { return { + current_period_end: currentPeriodEnd, plan: { id: priceId, product: productId, @@ -38,6 +39,7 @@ function subscriptionEvent( priceId: string, productId: string, previous?: { priceId?: string, productId?: string, status?: string }, + currentPeriodEnd?: number, ) { const previousAttributes: Partial = {} if (previous?.priceId && previous.productId) { @@ -58,7 +60,7 @@ function subscriptionEvent( object: 'subscription', customer: customerId, items: { - data: [subscriptionItem(priceId, productId)], + data: [subscriptionItem(priceId, productId, currentPeriodEnd)], }, }, previous_attributes: previous @@ -171,6 +173,18 @@ describe('retention metric backfill helpers', () => { }) }) + it.concurrent('does not churn deleted subscriptions that are active until period end', () => { + const result = buildRevenueMovementEvents([ + subscriptionEvent('evt_deleted_period_end', 'customer.subscription.deleted', 1774353600, 'cus_canceling', 'sub_canceling', 'price_team_monthly', 'prod_team', undefined, 1774440000), + ], plans as any, { + fromDateId: '2026-03-24', + toDateId: '2026-03-24', + }) + + expect(result.movements).toHaveLength(0) + expect(result.skipped.noMovement).toBe(1) + }) + it.concurrent('aggregates multiple movements for one customer-day with the first opening MRR', () => { const result = buildRevenueMovementEvents([ subscriptionEvent('evt_create', 'customer.subscription.created', 1774353600, 'cus_sequence', 'sub_sequence', 'price_solo_monthly', 'prod_solo'), From 1ea39557c845ae24b2316434e8b1edecc53014b8 Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Thu, 23 Apr 2026 16:18:52 +0200 Subject: [PATCH 10/14] fix(admin): keep retention backfill script scoped --- scripts/backfill_retention_metrics.ts | 162 +++++++++++++++- .../_backend/triggers/logsnag_insights.ts | 28 ++- .../_backend/triggers/stripe_event.ts | 138 +++++++++++++- .../_backend/utils/revenue_metrics.ts | 175 ------------------ 4 files changed, 323 insertions(+), 180 deletions(-) delete mode 100644 supabase/functions/_backend/utils/revenue_metrics.ts diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index ef06d09820..6d277ada5d 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -16,7 +16,6 @@ * bun run stripe:backfill-retention-metrics --events-file=./tmp/stripe-events.json --from=2026-01-01 --to=2026-04-23 */ import type Stripe from 'stripe' -import type { RevenueMovement, RevenuePlanRow, StripeInfoRevenueState } from '../supabase/functions/_backend/utils/revenue_metrics.ts' import type { Database } from '../supabase/functions/_backend/utils/supabase.types.ts' import { existsSync } from 'node:fs' import { mkdir, readFile, writeFile } from 'node:fs/promises' @@ -24,7 +23,6 @@ import process from 'node:process' import { createClient } from '@supabase/supabase-js' import { Client as PgClient } from 'pg' import StripeClient from 'stripe' -import { calculateChurnRevenue, calculateNrr, classifyRevenueMovement, getEventDateId, getPreviousDateId, getSubscriptionMrr, hasRevenueMovement } from '../supabase/functions/_backend/utils/revenue_metrics.ts' const DEFAULT_ENV_FILE = './internal/cloudflare/.env.prod' const DEFAULT_LOOKBACK_DAYS = 30 @@ -41,8 +39,17 @@ const SUBSCRIPTION_EVENT_TYPES = [ type SupabaseClient = ReturnType> type DailyRevenueMetricRow = Database['public']['Tables']['daily_revenue_metrics']['Row'] type DailyRevenueMetricInsert = Database['public']['Tables']['daily_revenue_metrics']['Insert'] +type PlanRow = Database['public']['Tables']['plans']['Row'] type StripeStatus = Database['public']['Enums']['stripe_status'] type SubscriptionEventType = typeof SUBSCRIPTION_EVENT_TYPES[number] +type StripeInfoRevenueState = { + is_good_plan?: boolean | null + paid_at?: string | null + price_id?: string | null + product_id?: string | null + status?: StripeStatus | null +} | null | undefined +type RevenuePlanRow = Pick interface CustomerRevenueBaselineRow { customer_id: string @@ -132,6 +139,30 @@ interface RetentionMetricSummaryRow { retained_expansion_mrr: number | string | null } +interface RevenueMovement { + currentMrr: number + nextMrr: number + newBusinessMrr: number + expansionMrr: number + contractionMrr: number + churnMrr: number +} + +interface DailyRevenueChangeSummary { + churnMrr: number + contractionMrr: number + expansionMrr: number +} + +const ZERO_REVENUE_MOVEMENT: RevenueMovement = { + currentMrr: 0, + nextMrr: 0, + newBusinessMrr: 0, + expansionMrr: 0, + contractionMrr: 0, + churnMrr: 0, +} + function getArgValue(args: string[], prefix: string): string | null { const arg = args.find(value => value.startsWith(`${prefix}=`)) if (!arg) @@ -421,6 +452,133 @@ function getPreviousSubscriptionStatus(event: Stripe.Event) { } } +function getRevenueMetricDateId(targetDate = new Date()) { + return new Date(Date.UTC(targetDate.getUTCFullYear(), targetDate.getUTCMonth(), targetDate.getUTCDate())).toISOString().slice(0, 10) +} + +function getEventDateId(eventOccurredAtIso: string) { + return new Date(eventOccurredAtIso).toISOString().slice(0, 10) +} + +function getPreviousDateId(dateId: string) { + const target = new Date(`${dateId}T00:00:00.000Z`) + target.setUTCDate(target.getUTCDate() - 1) + return getRevenueMetricDateId(target) +} + +function getPlanMrr(plan: RevenuePlanRow | null | undefined, priceId: string | null | undefined) { + if (!plan || !priceId) + return 0 + + if (plan.price_m_id === priceId) + return Number(plan.price_m) || 0 + + if (plan.price_y_id === priceId) + return (Number(plan.price_y) || 0) / 12 + + return 0 +} + +function getPlanByProductId(plans: RevenuePlanRow[], productId: string | null | undefined) { + if (!productId) + return null + + return plans.find(plan => plan.stripe_id === productId) ?? null +} + +function getSubscriptionMrr(plans: RevenuePlanRow[], stripeInfo: StripeInfoRevenueState) { + if (stripeInfo?.status !== 'succeeded' || stripeInfo?.is_good_plan === false) + return 0 + + return getPlanMrr(getPlanByProductId(plans, stripeInfo.product_id), stripeInfo.price_id) +} + +function classifyRevenueMovement( + currentStripeInfo: StripeInfoRevenueState, + nextStripeInfo: StripeInfoRevenueState, + plans: RevenuePlanRow[], +): RevenueMovement { + const currentMrr = getSubscriptionMrr(plans, currentStripeInfo) + const nextMrr = getSubscriptionMrr(plans, nextStripeInfo) + + if (currentMrr === 0 && nextMrr === 0) + return { ...ZERO_REVENUE_MOVEMENT } + + if (currentMrr === 0 && nextMrr > 0) { + if (!currentStripeInfo?.paid_at) { + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + newBusinessMrr: nextMrr, + } + } + + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + expansionMrr: nextMrr, + } + } + + if (currentMrr > 0 && nextMrr === 0) { + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + churnMrr: currentMrr, + } + } + + if (nextMrr > currentMrr) { + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + expansionMrr: nextMrr - currentMrr, + } + } + + if (currentMrr > nextMrr) { + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + contractionMrr: currentMrr - nextMrr, + } + } + + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + } +} + +function hasRevenueMovement(movement: RevenueMovement) { + return movement.newBusinessMrr > 0 + || movement.expansionMrr > 0 + || movement.contractionMrr > 0 + || movement.churnMrr > 0 +} + +function calculateNrr(previousMrr: number, dailyChanges: DailyRevenueChangeSummary) { + if (previousMrr <= 0) + return 100 + + const retainedMrr = Math.max( + previousMrr - dailyChanges.churnMrr - dailyChanges.contractionMrr + dailyChanges.expansionMrr, + 0, + ) + + return Number(((retainedMrr / previousMrr) * 100).toFixed(2)) +} + +function calculateChurnRevenue(dailyChanges: DailyRevenueChangeSummary) { + return Number((dailyChanges.churnMrr + dailyChanges.contractionMrr).toFixed(2)) +} + function toRevenueState(state: TrackedSubscriptionState): NonNullable { return { is_good_plan: state.is_good_plan, diff --git a/supabase/functions/_backend/triggers/logsnag_insights.ts b/supabase/functions/_backend/triggers/logsnag_insights.ts index 0487b024e9..1f4c63683b 100644 --- a/supabase/functions/_backend/triggers/logsnag_insights.ts +++ b/supabase/functions/_backend/triggers/logsnag_insights.ts @@ -11,7 +11,6 @@ import { BRES, middlewareAPISecret } from '../utils/hono.ts' import { cloudlog, cloudlogErr } from '../utils/logging.ts' import { logsnagInsights } from '../utils/logsnag.ts' import { closeClient, getDrizzleClient, getPgClient } from '../utils/pg.ts' -import { calculateChurnRevenue, calculateNrr, getPreviousDateId } from '../utils/revenue_metrics.ts' import { countAllApps, countAllUpdates, countAllUpdatesExternal, getUpdateStats } from '../utils/stats.ts' import { supabaseAdmin } from '../utils/supabase.ts' import { sendEventToTracking } from '../utils/tracking.ts' @@ -63,6 +62,11 @@ interface PlanRevenue { plan_enterprise_monthly: number plan_enterprise_yearly: number } +interface DailyRevenueChangeSummary { + churnMrr: number + contractionMrr: number + expansionMrr: number +} interface RevenueRetentionMetrics { churnRevenue: number nrr: number @@ -145,6 +149,28 @@ function countUniqueCustomers(...rowSets: Array + +interface RevenueMovement { + currentMrr: number + nextMrr: number + newBusinessMrr: number + expansionMrr: number + contractionMrr: number + churnMrr: number +} type PersistRevenueMovementResult = 'applied' | 'duplicate' | 'missing' | 'stale' +const ZERO_REVENUE_MOVEMENT: RevenueMovement = { + currentMrr: 0, + nextMrr: 0, + newBusinessMrr: 0, + expansionMrr: 0, + contractionMrr: 0, + churnMrr: 0, +} const STRIPE_INFO_TRANSACTION_COLUMNS = [ 'bandwidth_exceeded', 'build_time_exceeded', @@ -155,6 +178,117 @@ function getPlanChangeTrackingEventName(statusName: string) { return statusName === 'upgraded' ? 'User Upgraded' : 'User Plan Changed' } +function getEventDateId(eventOccurredAtIso: string) { + return new Date(eventOccurredAtIso).toISOString().slice(0, 10) +} + +function getPlanMrr(plan: RevenuePlanRow | null | undefined, priceId: string | null | undefined) { + if (!plan || !priceId) + return 0 + + if (plan.price_m_id === priceId) + return Number(plan.price_m) || 0 + + if (plan.price_y_id === priceId) + return (Number(plan.price_y) || 0) / 12 + + return 0 +} + +function getPlanByProductId(plans: RevenuePlanRow[], productId: string | null | undefined) { + if (!productId) + return null + + return plans.find(plan => plan.stripe_id === productId) ?? null +} + +function getSubscriptionMrr(plans: RevenuePlanRow[], stripeInfo: StripeInfoRevenueState) { + if (!stripeInfo || stripeInfo.status !== 'succeeded' || stripeInfo.is_good_plan === false) + return 0 + + return getPlanMrr(getPlanByProductId(plans, stripeInfo.product_id), stripeInfo.price_id) +} + +function classifyRevenueMovement( + currentStripeInfo: StripeInfoRevenueState, + nextStripeInfo: StripeInfoRevenueState, + plans: RevenuePlanRow[], +): RevenueMovement { + const currentMrr = getSubscriptionMrr(plans, currentStripeInfo) + const nextMrr = getSubscriptionMrr(plans, nextStripeInfo) + + if (currentMrr === 0 && nextMrr === 0) + return { ...ZERO_REVENUE_MOVEMENT } + + if (currentMrr === 0 && nextMrr > 0) { + if (!currentStripeInfo?.paid_at) { + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + newBusinessMrr: nextMrr, + } + } + + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + expansionMrr: nextMrr, + } + } + + if (currentMrr > 0 && nextMrr === 0) { + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + churnMrr: currentMrr, + } + } + + if (nextMrr > currentMrr) { + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + expansionMrr: nextMrr - currentMrr, + } + } + + if (currentMrr > nextMrr) { + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + contractionMrr: currentMrr - nextMrr, + } + } + + return { + ...ZERO_REVENUE_MOVEMENT, + currentMrr, + nextMrr, + } +} + +function hasRevenueMovement(movement: RevenueMovement) { + return movement.newBusinessMrr > 0 + || movement.expansionMrr > 0 + || movement.contractionMrr > 0 + || movement.churnMrr > 0 +} + +function isStaleStripeEvent( + currentStripeInfo: Pick | null | undefined, + eventOccurredAtIso: string, +) { + if (!currentStripeInfo?.last_stripe_event_at) + return false + + return new Date(currentStripeInfo.last_stripe_event_at).getTime() > new Date(eventOccurredAtIso).getTime() +} + async function getRevenuePlans(c: Context): Promise { const { data: plans, error } = await supabaseAdmin(c) .from('plans') diff --git a/supabase/functions/_backend/utils/revenue_metrics.ts b/supabase/functions/_backend/utils/revenue_metrics.ts deleted file mode 100644 index 0eebe0a20d..0000000000 --- a/supabase/functions/_backend/utils/revenue_metrics.ts +++ /dev/null @@ -1,175 +0,0 @@ -import type { Database } from './supabase.types.ts' - -type PlanRow = Database['public']['Tables']['plans']['Row'] -type StripeInfoRow = Database['public']['Tables']['stripe_info']['Row'] - -export type StripeInfoRevenueState = { - is_good_plan?: boolean | null - paid_at?: string | null - price_id?: string | null - product_id?: string | null - status?: Database['public']['Enums']['stripe_status'] | null -} | null | undefined - -export type RevenuePlanRow = Pick - -export interface RevenueMovement { - currentMrr: number - nextMrr: number - newBusinessMrr: number - expansionMrr: number - contractionMrr: number - churnMrr: number -} - -export interface DailyRevenueChangeSummary { - churnMrr: number - contractionMrr: number - expansionMrr: number -} - -const ZERO_REVENUE_MOVEMENT: RevenueMovement = { - currentMrr: 0, - nextMrr: 0, - newBusinessMrr: 0, - expansionMrr: 0, - contractionMrr: 0, - churnMrr: 0, -} - -export function getRevenueMetricDateId(targetDate = new Date()) { - return new Date(Date.UTC(targetDate.getUTCFullYear(), targetDate.getUTCMonth(), targetDate.getUTCDate())).toISOString().slice(0, 10) -} - -export function getEventDateId(eventOccurredAtIso: string) { - return new Date(eventOccurredAtIso).toISOString().slice(0, 10) -} - -export function getPreviousDateId(dateId: string) { - const target = new Date(`${dateId}T00:00:00.000Z`) - target.setUTCDate(target.getUTCDate() - 1) - return getRevenueMetricDateId(target) -} - -function getPlanMrr(plan: RevenuePlanRow | null | undefined, priceId: string | null | undefined) { - if (!plan || !priceId) - return 0 - - if (plan.price_m_id === priceId) - return Number(plan.price_m) || 0 - - if (plan.price_y_id === priceId) - return (Number(plan.price_y) || 0) / 12 - - return 0 -} - -function getPlanByProductId(plans: RevenuePlanRow[], productId: string | null | undefined) { - if (!productId) - return null - - return plans.find(plan => plan.stripe_id === productId) ?? null -} - -export function getSubscriptionMrr(plans: RevenuePlanRow[], stripeInfo: StripeInfoRevenueState) { - if (stripeInfo?.status !== 'succeeded' || stripeInfo?.is_good_plan === false) - return 0 - - return getPlanMrr(getPlanByProductId(plans, stripeInfo.product_id), stripeInfo.price_id) -} - -export function classifyRevenueMovement( - currentStripeInfo: StripeInfoRevenueState, - nextStripeInfo: StripeInfoRevenueState, - plans: RevenuePlanRow[], -): RevenueMovement { - const currentMrr = getSubscriptionMrr(plans, currentStripeInfo) - const nextMrr = getSubscriptionMrr(plans, nextStripeInfo) - - if (currentMrr === 0 && nextMrr === 0) - return { ...ZERO_REVENUE_MOVEMENT } - - if (currentMrr === 0 && nextMrr > 0) { - if (!currentStripeInfo?.paid_at) { - return { - ...ZERO_REVENUE_MOVEMENT, - currentMrr, - nextMrr, - newBusinessMrr: nextMrr, - } - } - - return { - ...ZERO_REVENUE_MOVEMENT, - currentMrr, - nextMrr, - expansionMrr: nextMrr, - } - } - - if (currentMrr > 0 && nextMrr === 0) { - return { - ...ZERO_REVENUE_MOVEMENT, - currentMrr, - nextMrr, - churnMrr: currentMrr, - } - } - - if (nextMrr > currentMrr) { - return { - ...ZERO_REVENUE_MOVEMENT, - currentMrr, - nextMrr, - expansionMrr: nextMrr - currentMrr, - } - } - - if (currentMrr > nextMrr) { - return { - ...ZERO_REVENUE_MOVEMENT, - currentMrr, - nextMrr, - contractionMrr: currentMrr - nextMrr, - } - } - - return { - ...ZERO_REVENUE_MOVEMENT, - currentMrr, - nextMrr, - } -} - -export function hasRevenueMovement(movement: RevenueMovement) { - return movement.newBusinessMrr > 0 - || movement.expansionMrr > 0 - || movement.contractionMrr > 0 - || movement.churnMrr > 0 -} - -export function isStaleStripeEvent( - currentStripeInfo: Pick | null | undefined, - eventOccurredAtIso: string, -) { - if (!currentStripeInfo?.last_stripe_event_at) - return false - - return new Date(currentStripeInfo.last_stripe_event_at).getTime() > new Date(eventOccurredAtIso).getTime() -} - -export function calculateNrr(previousMrr: number, dailyChanges: DailyRevenueChangeSummary) { - if (previousMrr <= 0) - return 100 - - const retainedMrr = Math.max( - previousMrr - dailyChanges.churnMrr - dailyChanges.contractionMrr + dailyChanges.expansionMrr, - 0, - ) - - return Number(((retainedMrr / previousMrr) * 100).toFixed(2)) -} - -export function calculateChurnRevenue(dailyChanges: DailyRevenueChangeSummary) { - return Number((dailyChanges.churnMrr + dailyChanges.contractionMrr).toFixed(2)) -} From 7169c3240ebc7a4a7cc69af88fb51555bcb86133 Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Thu, 23 Apr 2026 16:39:04 +0200 Subject: [PATCH 11/14] fix(admin): preserve backfill event order --- scripts/backfill_retention_metrics.ts | 13 ++++++---- tests/backfill-retention-metrics.unit.test.ts | 26 +++++++++++++++++++ 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index 6d277ada5d..8bd236d31c 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -310,11 +310,14 @@ function getEventCreatedIso(event: Stripe.Event) { } function sortStripeEvents(events: Stripe.Event[]) { - return [...events].sort((left, right) => { - if (left.created !== right.created) - return left.created - right.created - return left.id.localeCompare(right.id) - }) + return events + .map((event, index) => ({ event, index })) + .sort((left, right) => { + if (left.event.created !== right.event.created) + return left.event.created - right.event.created + return left.index - right.index + }) + .map(item => item.event) } function parseStripeEventCreatedSeconds(value: unknown) { diff --git a/tests/backfill-retention-metrics.unit.test.ts b/tests/backfill-retention-metrics.unit.test.ts index 7c7ba8db18..1748b9fef7 100644 --- a/tests/backfill-retention-metrics.unit.test.ts +++ b/tests/backfill-retention-metrics.unit.test.ts @@ -215,6 +215,32 @@ describe('retention metric backfill helpers', () => { }) }) + it.concurrent('preserves input order for same-timestamp events', () => { + const result = buildRevenueMovementEvents([ + subscriptionEvent('evt_z_create', 'customer.subscription.created', 1774353600, 'cus_same_second', 'sub_same_second', 'price_solo_monthly', 'prod_solo'), + subscriptionEvent('evt_a_delete', 'customer.subscription.deleted', 1774353600, 'cus_same_second', 'sub_same_second', 'price_solo_monthly', 'prod_solo'), + ], plans as any, { + fromDateId: '2026-03-24', + toDateId: '2026-03-24', + }) + + expect(result.movements).toHaveLength(2) + expect(result.movements.map(movement => movement.event_id)).toEqual([ + 'evt_z_create', + 'evt_a_delete', + ]) + expect(result.movements[0]).toMatchObject({ + new_business_mrr: 12, + expansion_mrr: 0, + churn_mrr: 0, + }) + expect(result.movements[1]).toMatchObject({ + new_business_mrr: 0, + expansion_mrr: 0, + churn_mrr: 12, + }) + }) + it.concurrent('keeps an existing zero opening MRR when incrementally merging same-day rows', () => { const merged = mergeMetricRows([ { From dfe95c521115db98e9b067cfcfde7a52c3dd520c Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Thu, 23 Apr 2026 16:56:43 +0200 Subject: [PATCH 12/14] fix(admin): guard reset backfill snapshots --- scripts/backfill_retention_metrics.ts | 67 ++++++++++++++++++- tests/backfill-retention-metrics.unit.test.ts | 22 +++++- 2 files changed, 87 insertions(+), 2 deletions(-) diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index 8bd236d31c..792e0812c3 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -129,6 +129,10 @@ interface ApplyBackfillTransactionResult extends RefreshRetentionMetricsResult { movementsApplied: number } +interface ProcessedEventIdRow { + event_id: string +} + interface RetentionMetricSummaryRow { has_global_stats: boolean lost_churn_mrr: number | string | null @@ -812,6 +816,24 @@ function dedupeRevenueMovementEvents(movements: BackfillRevenueMovementEvent[]) return deduped } +export function findMissingResetSnapshotEventIds( + movements: BackfillRevenueMovementEvent[], + processedEventIds: string[], + sampleSize = 10, +) { + const snapshotEventIds = new Set(movements.map(movement => movement.event_id)) + const missingEventIds: string[] = [] + + for (const eventId of processedEventIds) { + if (!snapshotEventIds.has(eventId)) + missingEventIds.push(eventId) + if (missingEventIds.length >= sampleSize) + break + } + + return missingEventIds +} + export function summarizeDailyRevenueMetrics(rows: Pick[]): BackfillSummary { return rows.reduce((summary, row) => { summary.rows++ @@ -1067,6 +1089,46 @@ async function claimProcessedEventsPg(client: PgClient, movements: BackfillReven return uniqueMovements.filter(movement => claimedEventIds.has(movement.event_id)) } +async function lockResetRetentionTablesPg(client: PgClient) { + // Block concurrent webhook writes while reset deletes and exact metric upserts run. + await client.query(` + LOCK TABLE + public.processed_stripe_events, + public.daily_revenue_metrics + IN SHARE ROW EXCLUSIVE MODE + `) +} + +async function assertResetSnapshotIsCurrentPg( + client: PgClient, + movements: BackfillRevenueMovementEvent[], + fromDateId: string, + toDateId: string, + customerId?: string | null, +) { + const values = [fromDateId, toDateId] + const predicates = [ + `date_id >= $1`, + `date_id <= $2`, + ] + + if (customerId) { + values.push(customerId) + predicates.push(`customer_id = $3`) + } + + const { rows } = await client.query(` + SELECT event_id + FROM public.processed_stripe_events + WHERE ${predicates.join(' AND ')} + `, values) + + const missingEventIds = findMissingResetSnapshotEventIds(movements, rows.map(row => row.event_id)) + if (missingEventIds.length > 0) { + throw new Error(`--apply --reset snapshot is stale for ${fromDateId}..${toDateId}; fetch events again before retrying. Missing processed event ids: ${missingEventIds.join(', ')}`) + } +} + async function refreshGlobalRetentionMetricsPg(client: PgClient, dateIds: string[]): Promise { const skippedMissingGlobalStats: string[] = [] let updated = 0 @@ -1126,8 +1188,11 @@ async function refreshGlobalRetentionMetricsPg(client: PgClient, dateIds: string async function applyBackfillTransaction(options: ApplyBackfillTransactionOptions): Promise { return withPgTransaction(options.databaseUrl, options.env, async (client) => { - if (options.reset) + if (options.reset) { + await lockResetRetentionTablesPg(client) + await assertResetSnapshotIsCurrentPg(client, options.movements, options.fromDateId, options.toDateId, options.customerId) await resetBackfillRangePg(client, options.fromDateId, options.toDateId, options.customerId) + } const movementsToApply = await claimProcessedEventsPg(client, options.movements) const metricRowsToApply = aggregateRevenueMovementEvents(movementsToApply) diff --git a/tests/backfill-retention-metrics.unit.test.ts b/tests/backfill-retention-metrics.unit.test.ts index 1748b9fef7..2041862dde 100644 --- a/tests/backfill-retention-metrics.unit.test.ts +++ b/tests/backfill-retention-metrics.unit.test.ts @@ -1,6 +1,6 @@ import type Stripe from 'stripe' import { describe, expect, it } from 'vitest' -import { aggregateRevenueMovementEvents, buildRevenueMovementEvents, mergeMetricRows, summarizeDailyRevenueMetrics } from '../scripts/backfill_retention_metrics.ts' +import { aggregateRevenueMovementEvents, buildRevenueMovementEvents, findMissingResetSnapshotEventIds, mergeMetricRows, summarizeDailyRevenueMetrics } from '../scripts/backfill_retention_metrics.ts' const plans = [ { @@ -271,6 +271,26 @@ describe('retention metric backfill helpers', () => { }) }) + it.concurrent('detects reset snapshots that miss already processed event ids', () => { + const missing = findMissingResetSnapshotEventIds([ + { + event_id: 'evt_known', + event_type: 'customer.subscription.created', + date_id: '2026-03-24', + customer_id: 'cus_reset', + opening_mrr: 0, + current_mrr: 0, + next_mrr: 12, + new_business_mrr: 12, + expansion_mrr: 0, + contraction_mrr: 0, + churn_mrr: 0, + }, + ], ['evt_known', 'evt_missing']) + + expect(missing).toEqual(['evt_missing']) + }) + it.concurrent('skips deleted events when pre-range state tracks a different subscription id', () => { const result = buildRevenueMovementEvents([ subscriptionEvent('evt_pre_range_create', 'customer.subscription.created', 1774267200, 'cus_active', 'sub_new', 'price_team_monthly', 'prod_team'), From 64c15bdf2156c4671ca2861cc338737ef26ae3fb Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Thu, 23 Apr 2026 17:08:36 +0200 Subject: [PATCH 13/14] fix(admin): harden retention event fetches --- scripts/backfill_retention_metrics.ts | 54 +++++++++++++------ tests/backfill-retention-metrics.unit.test.ts | 53 +++++++++++++++++- 2 files changed, 89 insertions(+), 18 deletions(-) diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index 792e0812c3..6b520b29fb 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -108,6 +108,11 @@ interface BuildRevenueMovementEventsResult { } } +interface StripeEventFetchResult { + events: Stripe.Event[] + reachedLimit: boolean +} + interface RefreshRetentionMetricsResult { skippedMissingGlobalStats: string[] updated: number @@ -869,26 +874,32 @@ async function loadEventsFile(filePath: string): Promise { return sortStripeEvents(events.map(normalizeStripeEventFromFile)) } -async function fetchStripeEvents(stripe: StripeClient, fromDateId: string, toDateId: string, limit: number | null) { +export async function fetchStripeEvents(stripe: Pick, fromDateId: string, toDateId: string, limit: number | null): Promise { const events: Stripe.Event[] = [] - for (const type of SUBSCRIPTION_EVENT_TYPES) { - const params = { - created: { - gte: dateIdToStartSeconds(fromDateId), - lte: dateIdToEndSeconds(toDateId), - }, - limit: EVENT_FETCH_PAGE_SIZE, - type, - } as Stripe.EventListParams - - for await (const event of stripe.events.list(params)) { - events.push(event) - if (limit && events.length >= limit) - return sortStripeEvents(events) + + const params = { + created: { + gte: dateIdToStartSeconds(fromDateId), + lte: dateIdToEndSeconds(toDateId), + }, + limit: EVENT_FETCH_PAGE_SIZE, + types: [...SUBSCRIPTION_EVENT_TYPES], + } as Stripe.EventListParams + + for await (const event of stripe.events.list(params)) { + events.push(event) + if (limit && events.length >= limit) { + return { + events: sortStripeEvents(events), + reachedLimit: true, + } } } - return sortStripeEvents(events) + return { + events: sortStripeEvents(events), + reachedLimit: false, + } } function getCustomerIdsFromEvents(events: Stripe.Event[], customerId?: string | null) { @@ -1265,6 +1276,7 @@ async function main(args = process.argv.slice(2), runtimeEnv: Record 0 ? oldestEventApiDateId : fromDateId - events = await fetchStripeEvents(stripe, fetchFromDateId, toDateId, limit) + const fetchedEvents = await fetchStripeEvents(stripe, fetchFromDateId, toDateId, limit) + events = fetchedEvents.events + reachedEventFetchLimit = fetchedEvents.reachedLimit if (fetchFromDateId !== fromDateId) console.log(`Fetched recent Stripe events from ${fetchFromDateId} to seed subscription state before ${fromDateId}`) console.log(`Fetched ${events.length} subscription events from Stripe`) + if (reachedEventFetchLimit) + console.warn(`Stripe event fetch stopped at --limit=${limit}`) + } + + if (apply && reset && reachedEventFetchLimit) { + throw new Error('--apply --reset cannot use a truncated Stripe event snapshot. Increase or remove --limit, or provide --events-file.') } const customerIds = getCustomerIdsFromEvents(events, customerId) diff --git a/tests/backfill-retention-metrics.unit.test.ts b/tests/backfill-retention-metrics.unit.test.ts index 2041862dde..8025e39d40 100644 --- a/tests/backfill-retention-metrics.unit.test.ts +++ b/tests/backfill-retention-metrics.unit.test.ts @@ -1,6 +1,6 @@ import type Stripe from 'stripe' import { describe, expect, it } from 'vitest' -import { aggregateRevenueMovementEvents, buildRevenueMovementEvents, findMissingResetSnapshotEventIds, mergeMetricRows, summarizeDailyRevenueMetrics } from '../scripts/backfill_retention_metrics.ts' +import { aggregateRevenueMovementEvents, buildRevenueMovementEvents, fetchStripeEvents, findMissingResetSnapshotEventIds, mergeMetricRows, summarizeDailyRevenueMetrics } from '../scripts/backfill_retention_metrics.ts' const plans = [ { @@ -291,6 +291,57 @@ describe('retention metric backfill helpers', () => { expect(missing).toEqual(['evt_missing']) }) + it.concurrent('keeps Stripe API source order for same-second events across event types', async () => { + const seenParams: Stripe.EventListParams[] = [] + const stripe = { + events: { + list(params: Stripe.EventListParams) { + seenParams.push(params) + return (async function* () { + yield subscriptionEvent('evt_deleted_same_second', 'customer.subscription.deleted', 1774353600, 'cus_api', 'sub_api', 'price_solo_monthly', 'prod_solo') + yield subscriptionEvent('evt_created_same_second', 'customer.subscription.created', 1774353600, 'cus_api', 'sub_api', 'price_solo_monthly', 'prod_solo') + })() + }, + }, + } as any + + const result = await fetchStripeEvents(stripe, '2026-03-24', '2026-03-24', null) + + expect(seenParams).toHaveLength(1) + expect(seenParams[0]?.types).toEqual([ + 'customer.subscription.created', + 'customer.subscription.updated', + 'customer.subscription.deleted', + ]) + expect(result.reachedLimit).toBe(false) + expect(result.events.map(event => event.id)).toEqual([ + 'evt_deleted_same_second', + 'evt_created_same_second', + ]) + }) + + it.concurrent('flags truncated Stripe event fetches when the limit is reached', async () => { + const stripe = { + events: { + list() { + return (async function* () { + yield subscriptionEvent('evt_first', 'customer.subscription.created', 1774353600, 'cus_limit', 'sub_limit', 'price_solo_monthly', 'prod_solo') + yield subscriptionEvent('evt_second', 'customer.subscription.updated', 1774357200, 'cus_limit', 'sub_limit', 'price_team_monthly', 'prod_team', { + priceId: 'price_solo_monthly', + productId: 'prod_solo', + }) + })() + }, + }, + } as any + + const result = await fetchStripeEvents(stripe, '2026-03-24', '2026-03-24', 1) + + expect(result.reachedLimit).toBe(true) + expect(result.events).toHaveLength(1) + expect(result.events[0]?.id).toBe('evt_first') + }) + it.concurrent('skips deleted events when pre-range state tracks a different subscription id', () => { const result = buildRevenueMovementEvents([ subscriptionEvent('evt_pre_range_create', 'customer.subscription.created', 1774267200, 'cus_active', 'sub_new', 'price_team_monthly', 'prod_team'), From 91bc95e4e1c4705ef6f06f59a0414524b8004900 Mon Sep 17 00:00:00 2001 From: Martin Donadieu Date: Thu, 23 Apr 2026 17:16:26 +0200 Subject: [PATCH 14/14] fix(admin): align retention item fallback --- scripts/backfill_retention_metrics.ts | 7 ++-- tests/backfill-retention-metrics.unit.test.ts | 34 +++++++++++++++++-- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/scripts/backfill_retention_metrics.ts b/scripts/backfill_retention_metrics.ts index 6b520b29fb..4169cb503d 100644 --- a/scripts/backfill_retention_metrics.ts +++ b/scripts/backfill_retention_metrics.ts @@ -402,10 +402,11 @@ function getLicensedSubscriptionItem(items: Stripe.SubscriptionItem[] | undefine if (licensedItem) return licensedItem - if (items?.length) - console.warn(`No licensed subscription item found; ignoring ${items.length} subscription item(s). First item usage_type=${items[0]?.plan?.usage_type ?? 'unknown'}`) + const fallbackItem = items?.[0] ?? null + if (fallbackItem) + console.warn(`No licensed subscription item found; falling back to the first subscription item (usage_type=${fallbackItem.plan?.usage_type ?? 'unknown'})`) - return null + return fallbackItem } function getItemPriceId(item: Stripe.SubscriptionItem | null | undefined) { diff --git a/tests/backfill-retention-metrics.unit.test.ts b/tests/backfill-retention-metrics.unit.test.ts index 8025e39d40..8fdb178dea 100644 --- a/tests/backfill-retention-metrics.unit.test.ts +++ b/tests/backfill-retention-metrics.unit.test.ts @@ -19,13 +19,13 @@ const plans = [ }, ] as const -function subscriptionItem(priceId: string, productId: string, currentPeriodEnd?: number) { +function subscriptionItem(priceId: string, productId: string, currentPeriodEnd?: number, usageType = 'licensed') { return { current_period_end: currentPeriodEnd, plan: { id: priceId, product: productId, - usage_type: 'licensed', + usage_type: usageType, }, } as Stripe.SubscriptionItem } @@ -157,6 +157,36 @@ describe('retention metric backfill helpers', () => { expect(result.skipped.noMovement).toBe(1) }) + it.concurrent('falls back to the first subscription item when no licensed item is present', () => { + const result = buildRevenueMovementEvents([ + { + id: 'evt_metered_fallback', + type: 'customer.subscription.created', + created: 1774353600, + data: { + object: { + id: 'sub_metered', + object: 'subscription', + customer: 'cus_metered', + items: { + data: [subscriptionItem('price_solo_monthly', 'prod_solo', undefined, 'metered')], + }, + }, + }, + } as Stripe.Event, + ], plans as any, { + fromDateId: '2026-03-24', + toDateId: '2026-03-24', + }) + + expect(result.movements).toHaveLength(1) + expect(result.movements[0]).toMatchObject({ + event_id: 'evt_metered_fallback', + new_business_mrr: 12, + }) + expect(result.skipped.missingPlan).toBe(0) + }) + it.concurrent('builds churn metrics from deleted subscription events', () => { const result = buildRevenueMovementEvents([ subscriptionEvent('evt_deleted', 'customer.subscription.deleted', 1774353600, 'cus_churned', 'sub_churned', 'price_team_yearly', 'prod_team'),