diff --git a/services/billing/pod-billing/src/db/postgres.ts b/services/billing/pod-billing/src/db/postgres.ts index 31fba8abad4..5425e3b93dd 100644 --- a/services/billing/pod-billing/src/db/postgres.ts +++ b/services/billing/pod-billing/src/db/postgres.ts @@ -184,8 +184,13 @@ export class PostgresDB implements BillingDB { } async setLiveKitSessions (ctx: MeasureContext, data: LiveKitSessionData[]): Promise { - for (let i = 0; i < data.length; i += BATCH_SIZE) { - const batch = data.slice(i, i + BATCH_SIZE) + const uniqueSessions = new Map() + for (const item of data) { + uniqueSessions.set(`${item.workspace}::${item.sessionId}`, item) + } + const uniqueSessionsValues = uniqueSessions.values() + for (let i = 0; i < uniqueSessions.size; i += BATCH_SIZE) { + const batch = uniqueSessionsValues.take(BATCH_SIZE) const values = [] const params = [] let paramIndex = 1 @@ -198,6 +203,8 @@ export class PostgresDB implements BillingDB { params.push(workspace, sessionId, sessionStart, sessionEnd, room, bandwidth, minutes) } + if (values.length === 0) continue + const query = ` UPSERT INTO billing.livekit_session (workspace, session_id, session_start, session_end, room, bandwidth, minutes) VALUES ${values.join(',')} @@ -207,8 +214,13 @@ export class PostgresDB implements BillingDB { } async setLiveKitEgress (ctx: MeasureContext, data: LiveKitEgressData[]): Promise { - for (let i = 0; i < data.length; i += BATCH_SIZE) { - const batch = data.slice(i, i + BATCH_SIZE) + const uniqueSessions = new Map() + for (const item of data) { + uniqueSessions.set(`${item.workspace}::${item.egressId}`, item) + } + const uniqueSessionsValues = uniqueSessions.values() + for (let i = 0; i < uniqueSessions.size; i += BATCH_SIZE) { + const batch = uniqueSessionsValues.take(BATCH_SIZE) const values = [] const params = [] let paramIndex = 1 @@ -221,6 +233,8 @@ export class PostgresDB implements BillingDB { params.push(workspace, egressId, egressStart, egressEnd, room, duration) } + if (values.length === 0) continue + const query = ` UPSERT INTO billing.livekit_egress (workspace, egress_id, egress_start, egress_end, room, duration) VALUES ${values.join(',')} diff --git a/services/love/src/billing.ts b/services/love/src/billing.ts index 51ba42c9a9a..037c28a3f0b 100644 --- a/services/love/src/billing.ts +++ b/services/love/src/billing.ts @@ -23,9 +23,9 @@ interface LiveKitSession { sessionId: string createdAt: string lastActive: string - bandwidthIn: string - bandwidthOut: string - numParticipants: string + bandwidthIn?: string + bandwidthOut?: string + numParticipants?: string roomName: string endedAt?: string } @@ -104,7 +104,7 @@ export async function updateLiveKitSessions (ctx: MeasureContext): Promise sessionId: session.sessionId, sessionStart: session.createdAt, sessionEnd: session.endedAt ?? session.lastActive, - bandwidth: Math.max(Number(session.bandwidthOut), Number(session.bandwidthIn)), + bandwidth: Math.max(Number(session.bandwidthOut ?? '0'), Number(session.bandwidthIn ?? '0')), minutes: Math.round((sessionEnd - sessionStart) / (1000 * 60)), room: session.roomName })