diff --git a/packages/server/graphql/private/mutations/checkRethinkPgEquality.ts b/packages/server/graphql/private/mutations/checkRethinkPgEquality.ts
index 954411c9f50..a01373429eb 100644
--- a/packages/server/graphql/private/mutations/checkRethinkPgEquality.ts
+++ b/packages/server/graphql/private/mutations/checkRethinkPgEquality.ts
@@ -4,12 +4,13 @@ import getKysely from '../../../postgres/getKysely'
 import {checkRowCount, checkTableEq} from '../../../postgres/utils/checkEqBase'
 import {
   compareDateAlmostEqual,
-  compareOptionalPlaintextContent,
-  compareRValOptionalPluckedArray,
+  compareRValOptionalPluckedObject,
+  compareRValStringAsNumber,
   compareRValUndefinedAsEmptyArray,
+  compareRValUndefinedAsFalse,
   compareRValUndefinedAsNull,
   compareRValUndefinedAsNullAndTruncateRVal,
-  compareRealNumber,
+  compareRValUndefinedAsZero,
   defaultEqFn
 } from '../../../postgres/utils/rethinkEqualityFns'
 import {MutationResolvers} from '../resolverTypes'
@@ -36,11 +37,11 @@ const checkRethinkPgEquality: MutationResolvers['checkRethinkPgEquality'] = asyn
 ) => {
   const r = await getRethink()
 
-  if (tableName === 'RetroReflection') {
+  if (tableName === 'Organization') {
     const rowCountResult = await checkRowCount(tableName)
     const rethinkQuery = (updatedAt: Date, id: string | number) => {
       return r
-        .table('RetroReflection' as any)
+        .table('Organization' as any)
         .between([updatedAt, id], [r.maxval, r.maxval], {
           index: 'updatedAtId',
           leftBound: 'open',
@@ -50,12 +51,9 @@ const checkRethinkPgEquality: MutationResolvers['checkRethinkPgEquality'] = asyn
     }
     const pgQuery = async (ids: string[]) => {
       return getKysely()
-        .selectFrom('RetroReflection')
+        .selectFrom('Organization')
         .selectAll()
-        .select(({fn}) => [
-          fn('to_json', ['entities']).as('entities'),
-          fn('to_json', ['reactjis']).as('reactjis')
-        ])
+        .select(({fn}) => [fn('to_json', ['creditCard']).as('creditCard')])
         .where('id', 'in', ids)
         .execute()
     }
@@ -64,23 +62,30 @@ const checkRethinkPgEquality: MutationResolvers['checkRethinkPgEquality'] = asyn
       pgQuery,
       {
         id: defaultEqFn,
+        activeDomain: compareRValUndefinedAsNullAndTruncateRVal(100),
+        isActiveDomainTouched: compareRValUndefinedAsFalse,
+        creditCard: compareRValOptionalPluckedObject({
+          brand: compareRValUndefinedAsNull,
+          expiry: compareRValUndefinedAsNull,
+          last4: compareRValStringAsNumber
+        }),
         createdAt: defaultEqFn,
+        name: compareRValUndefinedAsNullAndTruncateRVal(100),
+        payLaterClickCount: compareRValUndefinedAsZero,
+        periodEnd: compareRValUndefinedAsNull,
+        periodStart: compareRValUndefinedAsNull,
+        picture: compareRValUndefinedAsNull,
+        showConversionModal: compareRValUndefinedAsFalse,
+        stripeId: compareRValUndefinedAsNull,
+        stripeSubscriptionId: compareRValUndefinedAsNull,
+        upcomingInvoiceEmailSentAt: compareRValUndefinedAsNull,
+        tier: defaultEqFn,
+        tierLimitExceededAt: compareRValUndefinedAsNull,
+        trialStartDate: compareRValUndefinedAsNull,
+        scheduledLockAt: compareRValUndefinedAsNull,
+        lockedAt: compareRValUndefinedAsNull,
         updatedAt: compareDateAlmostEqual,
-        isActive: defaultEqFn,
-        meetingId: defaultEqFn,
-        promptId: defaultEqFn,
-        creatorId: compareRValUndefinedAsNull,
-        sortOrder: defaultEqFn,
-        reflectionGroupId: defaultEqFn,
-        content: compareRValUndefinedAsNullAndTruncateRVal(2000, 0.19),
-        plaintextContent: compareOptionalPlaintextContent,
-        entities: compareRValOptionalPluckedArray({
-          name: defaultEqFn,
-          salience: compareRealNumber,
-          lemma: compareRValUndefinedAsNull
-        }),
-        reactjis: compareRValUndefinedAsEmptyArray,
-        sentimentScore: compareRValUndefinedAsNull
+        featureFlags: compareRValUndefinedAsEmptyArray
       },
       maxErrors
     )
diff --git a/packages/server/postgres/migrations/1720026588542_Organization-phase2.ts b/packages/server/postgres/migrations/1720026588542_Organization-phase2.ts
new file mode 100644
index 00000000000..78776890b41
--- /dev/null
+++ b/packages/server/postgres/migrations/1720026588542_Organization-phase2.ts
@@ -0,0 +1,129 @@
+import {Kysely, PostgresDialect, sql} from 'kysely'
+import {r} from 'rethinkdb-ts'
+import connectRethinkDB from '../../database/connectRethinkDB'
+import getPg from '../getPg'
+
+// Wraps a RethinkDB creditCard object in a SQL expression that rebuilds it as a "CreditCard"
+// record via json_populate_record; a missing card is stored as null.
+const toCreditCard = (creditCard: any) => {
+  if (!creditCard) return null
+  return sql`(select json_populate_record(null::"CreditCard", ${JSON.stringify(creditCard)}))`
+}
+
+// Creates the updatedAtId index on the RethinkDB Organization table, then copies its rows into
+// Postgres in batches ordered by [updatedAt, id], skipping rows that conflict with existing ones.
+export async function up() {
+  await connectRethinkDB()
+  const pg = new Kysely({
+    dialect: new PostgresDialect({
+      pool: getPg()
+    })
+  })
+  try {
+    console.log('Adding index')
+    await r
+      .table('Organization')
+      .indexCreate('updatedAtId', (row: any) => [row('updatedAt'), row('id')])
+      .run()
+    await r.table('Organization').indexWait().run()
+  } catch {
+    // index already exists
+  }
+  console.log('Adding index complete')
+  // A Postgres statement can bind at most 65535 parameters (the wire protocol uses a 16-bit
+  // count), so BATCH_SIZE * PG_COLS.length must stay at or below that limit.
+  const MAX_PG_PARAMS = 65535
+  const PG_COLS = [
+    'id',
+    'activeDomain',
+    'isActiveDomainTouched',
+    'creditCard',
+    'createdAt',
+    'name',
+    'payLaterClickCount',
+    'periodEnd',
+    'periodStart',
+    'picture',
+    'showConversionModal',
+    'stripeId',
+    'stripeSubscriptionId',
+    'upcomingInvoiceEmailSentAt',
+    'tier',
+    'tierLimitExceededAt',
+    'trialStartDate',
+    'scheduledLockAt',
+    'lockedAt',
+    'updatedAt',
+    'featureFlags'
+  ] as const
+  type Organization = {
+    [K in (typeof PG_COLS)[number]]: any
+  }
+  const BATCH_SIZE = Math.trunc(MAX_PG_PARAMS / PG_COLS.length)
+
+  let curUpdatedAt = r.minval
+  let curId = r.minval
+  for (let i = 0; i < 1e6; i++) {
+    console.log('inserting row', i * BATCH_SIZE, curUpdatedAt, curId)
+    const rawRowsToInsert = (await r
+      .table('Organization')
+      .between([curUpdatedAt, curId], [r.maxval, r.maxval], {
+        index: 'updatedAtId',
+        leftBound: 'open',
+        rightBound: 'closed'
+      })
+      .orderBy({index: 'updatedAtId'})
+      .limit(BATCH_SIZE)
+      .pluck(...PG_COLS)
+      .run()) as Organization[]
+
+    const rowsToInsert = rawRowsToInsert.map((row) => ({
+      ...row,
+      activeDomain: row.activeDomain?.slice(0, 100) ?? null,
+      name: row.name.slice(0, 100),
+      creditCard: toCreditCard(row.creditCard)
+    }))
+    if (rowsToInsert.length === 0) break
+    const lastRow = rowsToInsert[rowsToInsert.length - 1]
+    curUpdatedAt = lastRow.updatedAt
+    curId = lastRow.id
+    try {
+      await pg
+        .insertInto('Organization')
+        .values(rowsToInsert)
+        .onConflict((oc) => oc.doNothing())
+        .execute()
+    } catch (e) {
+      // Batch insert failed: retry each row on its own and log the rows that still fail
+      await Promise.all(
+        rowsToInsert.map(async (row) => {
+          try {
+            await pg
+              .insertInto('Organization')
+              .values(row)
+              .onConflict((oc) => oc.doNothing())
+              .execute()
+          } catch (e) {
+            console.log(e, row)
+          }
+        })
+      )
+    }
+  }
+}
+
+// Drops the RethinkDB updatedAtId index and deletes all rows from the Postgres Organization table
+export async function down() {
+  await connectRethinkDB()
+  try {
+    await r.table('Organization').indexDrop('updatedAtId').run()
+  } catch {
+    // index already dropped
+  }
+  const pg = new Kysely({
+    dialect: new PostgresDialect({
+      pool: getPg()
+    })
+  })
+  await pg.deleteFrom('Organization').execute()
+}
diff --git a/packages/server/postgres/utils/rethinkEqualityFns.ts b/packages/server/postgres/utils/rethinkEqualityFns.ts
index 8a7282be5f4..3d63e8381df 100644
--- a/packages/server/postgres/utils/rethinkEqualityFns.ts
+++ b/packages/server/postgres/utils/rethinkEqualityFns.ts
@@ -4,6 +4,8 @@ import stringSimilarity from 'string-similarity'
 export const defaultEqFn = (a: unknown, b: unknown) => {
   if (a instanceof Date && b instanceof Date) return a.getTime() === b.getTime()
   if (Array.isArray(a) && Array.isArray(b)) return JSON.stringify(a) === JSON.stringify(b)
+  // Other objects are compared by their JSON serialization, which is sensitive to key order
+  if (typeof a === 'object' && typeof b === 'object') return JSON.stringify(a) === JSON.stringify(b)
   return a === b
 }
 export const compareDateAlmostEqual = (rVal: unknown, pgVal: unknown) => {
@@ -30,11 +32,36 @@ export const compareRValUndefinedAsFalse = (rVal: unknown, pgVal: unknown) => {
   return normalizedRVal === pgVal
 }
 
+// Strict equality, except that a missing (undefined) RethinkDB value counts as 0
+export const compareRValUndefinedAsZero = (rVal: unknown, pgVal: unknown) => {
+  const normalizedRVal = rVal === undefined ? 0 : rVal
+  return normalizedRVal === pgVal
+}
+
 export const compareRValUndefinedAsEmptyArray = (rVal: unknown, pgVal: unknown) => {
   const normalizedRVal = rVal === undefined ? [] : rVal
   return defaultEqFn(normalizedRVal, pgVal)
 }
 
+// Coerces the RethinkDB value with Number() before comparing it to the Postgres value
+export const compareRValStringAsNumber = (rVal: unknown, pgVal: unknown) => {
+  const normalizedRVal = Number(rVal)
+  return defaultEqFn(normalizedRVal, pgVal)
+}
+
+// Compares only the keys of pluckFields, each with its own eq fn; two falsy values are equal
+export const compareRValOptionalPluckedObject =
+  (pluckFields: Record) => (rVal: unknown, pgVal: unknown) => {
+    if (!rVal && !pgVal) return true
+    const rValObj = rVal || {}
+    const pgValItem = pgVal || {}
+    return Object.keys(pluckFields).every((prop) => {
+      const eqFn = pluckFields[prop]!
+      const rValItemProp = rValObj[prop as keyof typeof rValObj]
+      const pgValItemProp = pgValItem[prop as keyof typeof pgValItem]
+      return eqFn(rValItemProp, pgValItemProp)
+    })
+  }
 export const compareRValOptionalPluckedArray =
   (pluckFields: Record) => (rVal: unknown, pgVal: unknown) => {
     const rValArray = Array.isArray(rVal) ? rVal : []
@@ -56,7 +83,8 @@ export const compareRValOptionalPluckedArray =
   }
 
 export const compareRValUndefinedAsNullAndTruncateRVal =
-  (length: number, similarity?: number) => (rVal: unknown, pgVal: unknown) => {
+  (length: number, similarity = 1) =>
+  (rVal: unknown, pgVal: unknown) => {
     const truncatedRVal = typeof rVal === 'string' ? rVal.slice(0, length) : rVal
     const normalizedRVal = truncatedRVal === undefined ? null : truncatedRVal
     if (