Skip to content

Commit

Permalink
chore(rethinkdb): Organization: Phase 2 (#9931)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Krick <matt.krick@gmail.com>
  • Loading branch information
mattkrick committed Jul 4, 2024
1 parent 7971e5c commit 5baad4c
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 26 deletions.
55 changes: 30 additions & 25 deletions packages/server/graphql/private/mutations/checkRethinkPgEquality.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import getKysely from '../../../postgres/getKysely'
import {checkRowCount, checkTableEq} from '../../../postgres/utils/checkEqBase'
import {
compareDateAlmostEqual,
compareOptionalPlaintextContent,
compareRValOptionalPluckedArray,
compareRValOptionalPluckedObject,
compareRValStringAsNumber,
compareRValUndefinedAsEmptyArray,
compareRValUndefinedAsFalse,
compareRValUndefinedAsNull,
compareRValUndefinedAsNullAndTruncateRVal,
compareRealNumber,
compareRValUndefinedAsZero,
defaultEqFn
} from '../../../postgres/utils/rethinkEqualityFns'
import {MutationResolvers} from '../resolverTypes'
Expand All @@ -36,11 +37,11 @@ const checkRethinkPgEquality: MutationResolvers['checkRethinkPgEquality'] = asyn
) => {
const r = await getRethink()

if (tableName === 'RetroReflection') {
if (tableName === 'Organization') {
const rowCountResult = await checkRowCount(tableName)
const rethinkQuery = (updatedAt: Date, id: string | number) => {
return r
.table('RetroReflection' as any)
.table('Organization' as any)
.between([updatedAt, id], [r.maxval, r.maxval], {
index: 'updatedAtId',
leftBound: 'open',
Expand All @@ -50,12 +51,9 @@ const checkRethinkPgEquality: MutationResolvers['checkRethinkPgEquality'] = asyn
}
const pgQuery = async (ids: string[]) => {
return getKysely()
.selectFrom('RetroReflection')
.selectFrom('Organization')
.selectAll()
.select(({fn}) => [
fn('to_json', ['entities']).as('entities'),
fn('to_json', ['reactjis']).as('reactjis')
])
.select(({fn}) => [fn('to_json', ['creditCard']).as('creditCard')])
.where('id', 'in', ids)
.execute()
}
Expand All @@ -64,23 +62,30 @@ const checkRethinkPgEquality: MutationResolvers['checkRethinkPgEquality'] = asyn
pgQuery,
{
id: defaultEqFn,
activeDomain: compareRValUndefinedAsNullAndTruncateRVal(100),
isActiveDomainTouched: compareRValUndefinedAsFalse,
creditCard: compareRValOptionalPluckedObject({
brand: compareRValUndefinedAsNull,
expiry: compareRValUndefinedAsNull,
last4: compareRValStringAsNumber
}),
createdAt: defaultEqFn,
name: compareRValUndefinedAsNullAndTruncateRVal(100),
payLaterClickCount: compareRValUndefinedAsZero,
periodEnd: compareRValUndefinedAsNull,
periodStart: compareRValUndefinedAsNull,
picture: compareRValUndefinedAsNull,
showConversionModal: compareRValUndefinedAsFalse,
stripeId: compareRValUndefinedAsNull,
stripeSubscriptionId: compareRValUndefinedAsNull,
upcomingInvoiceEmailSentAt: compareRValUndefinedAsNull,
tier: defaultEqFn,
tierLimitExceededAt: compareRValUndefinedAsNull,
trialStartDate: compareRValUndefinedAsNull,
scheduledLockAt: compareRValUndefinedAsNull,
lockedAt: compareRValUndefinedAsNull,
updatedAt: compareDateAlmostEqual,
isActive: defaultEqFn,
meetingId: defaultEqFn,
promptId: defaultEqFn,
creatorId: compareRValUndefinedAsNull,
sortOrder: defaultEqFn,
reflectionGroupId: defaultEqFn,
content: compareRValUndefinedAsNullAndTruncateRVal(2000, 0.19),
plaintextContent: compareOptionalPlaintextContent,
entities: compareRValOptionalPluckedArray({
name: defaultEqFn,
salience: compareRealNumber,
lemma: compareRValUndefinedAsNull
}),
reactjis: compareRValUndefinedAsEmptyArray,
sentimentScore: compareRValUndefinedAsNull
featureFlags: compareRValUndefinedAsEmptyArray
},
maxErrors
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import {Kysely, PostgresDialect, sql} from 'kysely'
import {r} from 'rethinkdb-ts'
import connectRethinkDB from '../../database/connectRethinkDB'
import getPg from '../getPg'

const toCreditCard = (creditCard: any) => {
if (!creditCard) return null
return sql<string>`(select json_populate_record(null::"CreditCard", ${JSON.stringify(creditCard)}))`
}

export async function up() {
await connectRethinkDB()
const pg = new Kysely<any>({
dialect: new PostgresDialect({
pool: getPg()
})
})
try {
console.log('Adding index')
await r
.table('Organization')
.indexCreate('updatedAtId', (row: any) => [row('updatedAt'), row('id')])
.run()
await r.table('Organization').indexWait().run()
} catch {
// index already exists
}
console.log('Adding index complete')
const MAX_PG_PARAMS = 65545
const PG_COLS = [
'id',
'activeDomain',
'isActiveDomainTouched',
'creditCard',
'createdAt',
'name',
'payLaterClickCount',
'periodEnd',
'periodStart',
'picture',
'showConversionModal',
'stripeId',
'stripeSubscriptionId',
'upcomingInvoiceEmailSentAt',
'tier',
'tierLimitExceededAt',
'trialStartDate',
'scheduledLockAt',
'lockedAt',
'updatedAt',
'featureFlags'
] as const
type Organization = {
[K in (typeof PG_COLS)[number]]: any
}
const BATCH_SIZE = Math.trunc(MAX_PG_PARAMS / PG_COLS.length)

let curUpdatedAt = r.minval
let curId = r.minval
for (let i = 0; i < 1e6; i++) {
console.log('inserting row', i * BATCH_SIZE, curUpdatedAt, curId)
const rawRowsToInsert = (await r
.table('Organization')
.between([curUpdatedAt, curId], [r.maxval, r.maxval], {
index: 'updatedAtId',
leftBound: 'open',
rightBound: 'closed'
})
.orderBy({index: 'updatedAtId'})
.limit(BATCH_SIZE)
.pluck(...PG_COLS)
.run()) as Organization[]

const rowsToInsert = rawRowsToInsert.map((row) => ({
...row,
activeDomain: row.activeDomain?.slice(0, 100) ?? null,
name: row.name.slice(0, 100),
creditCard: toCreditCard(row.creditCard)
}))
if (rowsToInsert.length === 0) break
const lastRow = rowsToInsert[rowsToInsert.length - 1]
curUpdatedAt = lastRow.updatedAt
curId = lastRow.id
try {
await pg
.insertInto('Organization')
.values(rowsToInsert)
.onConflict((oc) => oc.doNothing())
.execute()
} catch (e) {
await Promise.all(
rowsToInsert.map(async (row) => {
try {
await pg
.insertInto('Organization')
.values(row)
.onConflict((oc) => oc.doNothing())
.execute()
} catch (e) {
console.log(e, row)
}
})
)
}
}
}

export async function down() {
await connectRethinkDB()
try {
await r.table('Organization').indexDrop('updatedAtId').run()
} catch {
// index already dropped
}
const pg = new Kysely<any>({
dialect: new PostgresDialect({
pool: getPg()
})
})
await pg.deleteFrom('Organization').execute()
}
26 changes: 25 additions & 1 deletion packages/server/postgres/utils/rethinkEqualityFns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import stringSimilarity from 'string-similarity'
export const defaultEqFn = (a: unknown, b: unknown) => {
if (a instanceof Date && b instanceof Date) return a.getTime() === b.getTime()
if (Array.isArray(a) && Array.isArray(b)) return JSON.stringify(a) === JSON.stringify(b)
if (typeof a === 'object' && typeof b === 'object') return JSON.stringify(a) === JSON.stringify(b)
return a === b
}
export const compareDateAlmostEqual = (rVal: unknown, pgVal: unknown) => {
Expand All @@ -30,11 +31,33 @@ export const compareRValUndefinedAsFalse = (rVal: unknown, pgVal: unknown) => {
return normalizedRVal === pgVal
}

export const compareRValUndefinedAsZero = (rVal: unknown, pgVal: unknown) => {
const normalizedRVal = rVal === undefined ? 0 : rVal
return normalizedRVal === pgVal
}

export const compareRValUndefinedAsEmptyArray = (rVal: unknown, pgVal: unknown) => {
const normalizedRVal = rVal === undefined ? [] : rVal
return defaultEqFn(normalizedRVal, pgVal)
}

export const compareRValStringAsNumber = (rVal: unknown, pgVal: unknown) => {
const normalizedRVal = Number(rVal)
return defaultEqFn(normalizedRVal, pgVal)
}

export const compareRValOptionalPluckedObject =
(pluckFields: Record<string, typeof defaultEqFn>) => (rVal: unknown, pgVal: unknown) => {
if (!rVal && !pgVal) return true
const rValObj = rVal || {}
const pgValItem = pgVal || {}
return Object.keys(pluckFields).every((prop) => {
const eqFn = pluckFields[prop]!
const rValItemProp = rValObj[prop as keyof typeof rValObj]
const pgValItemProp = pgValItem[prop as keyof typeof pgValItem]
return eqFn(rValItemProp, pgValItemProp)
})
}
export const compareRValOptionalPluckedArray =
(pluckFields: Record<string, typeof defaultEqFn>) => (rVal: unknown, pgVal: unknown) => {
const rValArray = Array.isArray(rVal) ? rVal : []
Expand All @@ -56,7 +79,8 @@ export const compareRValOptionalPluckedArray =
}

export const compareRValUndefinedAsNullAndTruncateRVal =
(length: number, similarity?: number) => (rVal: unknown, pgVal: unknown) => {
(length: number, similarity = 1) =>
(rVal: unknown, pgVal: unknown) => {
const truncatedRVal = typeof rVal === 'string' ? rVal.slice(0, length) : rVal
const normalizedRVal = truncatedRVal === undefined ? null : truncatedRVal
if (
Expand Down

0 comments on commit 5baad4c

Please sign in to comment.