Skip to content

Commit

Permalink
feat: related discussions refactor (#9557)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Krick <matt.krick@gmail.com>
Co-authored-by: Jordan Husney <jordan.husney@gmail.com>
  • Loading branch information
mattkrick and jordanh committed Apr 10, 2024
1 parent 0e06d1f commit 15a54fb
Show file tree
Hide file tree
Showing 46 changed files with 1,077 additions and 451 deletions.
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -111,7 +111,7 @@
"minimist": "^1.2.5",
"node-loader": "^2.0.0",
"pg-promise": "^11.2.0",
"pm2": "^5.3.0",
"pm2": "^5.3.1",
"postcss": "^8.4.21",
"postcss-loader": "^7.0.2",
"prettier": "^3.2.5",
Expand Down
4 changes: 2 additions & 2 deletions packages/client/components/RetroDrawerRoot.tsx
@@ -1,6 +1,6 @@
import React, {Suspense} from 'react'
import useQueryLoaderNow from '../hooks/useQueryLoaderNow'
import retroDrawerQuery, {RetroDrawerQuery} from '../__generated__/RetroDrawerQuery.graphql'
import useQueryLoaderNow from '../hooks/useQueryLoaderNow'
import RetroDrawer from './RetroDrawer'

type Props = {
Expand All @@ -12,7 +12,7 @@ type Props = {
const RetroDrawerRoot = (props: Props) => {
const {showDrawer, setShowDrawer, meetingId} = props
const queryRef = useQueryLoaderNow<RetroDrawerQuery>(retroDrawerQuery, {
first: 200,
first: 2000,
type: 'retrospective',
meetingId
})
Expand Down
6 changes: 0 additions & 6 deletions packages/embedder/EMBEDDER_JOB_PRIORITY.ts

This file was deleted.

10 changes: 10 additions & 0 deletions packages/embedder/EmbedderJobType.ts
@@ -0,0 +1,10 @@
import {JobType} from './custom'

/** Derives the `[workflow, step]` tuple type from a `"workflow:step"` string literal type. */
type SplitJobType<T extends string> = T extends `${infer W}:${infer S}` ? [W, S] : never

/**
 * Embedder job types encode a workflow name and a step name as a single
 * `"workflowName:stepName"` string. These helpers keep the delimiter
 * convention in one place so producers and consumers cannot drift apart.
 */
export const EmbedderJobType = {
  /** Compose the canonical `"workflowName:stepName"` JobType string. */
  join: (workflowName: string, stepName: string) => `${workflowName}:${stepName}` as JobType,
  /** Inverse of `join`: recover the workflow and step names from a JobType. */
  split: (jobType: JobType) => {
    const parts = jobType.split(':') as SplitJobType<typeof jobType>
    const [workflowName, stepName] = parts
    return {workflowName, stepName}
  }
}
40 changes: 14 additions & 26 deletions packages/embedder/EmbeddingsJobQueueStream.ts
@@ -1,31 +1,22 @@
import {Selectable, sql} from 'kysely'
import {sql} from 'kysely'
import ms from 'ms'
import sleep from 'parabol-client/utils/sleep'
import 'parabol-server/initSentry'
import getKysely from 'parabol-server/postgres/getKysely'
import {DB} from 'parabol-server/postgres/pg'
import RootDataLoader from '../server/dataloader/RootDataLoader'
import {processJob} from './processJob'
import {Logger} from '../server/utils/Logger'
import {EmbeddingsTableName} from './ai_models/AbstractEmbeddingsModel'
import {WorkflowOrchestrator} from './WorkflowOrchestrator'
import {DBJob} from './custom'

export type DBJob = Selectable<DB['EmbeddingsJobQueue']>
export type EmbedJob = DBJob & {
jobType: 'embed'
jobData: {
embeddingsMetadataId: number
model: EmbeddingsTableName
}
}
export type RerankJob = DBJob & {jobType: 'rerank'; jobData: {discussionIds: string[]}}
export type Job = EmbedJob | RerankJob

export class EmbeddingsJobQueueStream implements AsyncIterableIterator<Job> {
export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
[Symbol.asyncIterator]() {
return this
}
dataLoader = new RootDataLoader({maxBatchSize: 1000})
async next(): Promise<IteratorResult<Job>> {

orchestrator: WorkflowOrchestrator
constructor(orchestrator: WorkflowOrchestrator) {
this.orchestrator = orchestrator
}
async next(): Promise<IteratorResult<DBJob>> {
const pg = getKysely()
const getJob = (isFailed: boolean) => {
return pg
Expand Down Expand Up @@ -54,20 +45,17 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<Job> {
if (!job) {
Logger.log('JobQueueStream: no jobs found')
// queue is empty, so sleep for a while
await sleep(ms('1m'))
await sleep(ms('10s'))
return this.next()
}

const isSuccessful = await processJob(job as Job, this.dataLoader)
if (isSuccessful) {
await pg.deleteFrom('EmbeddingsJobQueue').where('id', '=', job.id).executeTakeFirstOrThrow()
}
return {done: false, value: job as Job}
await this.orchestrator.runStep(job)
return {done: false, value: job}
}
return() {
return Promise.resolve({done: true as const, value: undefined})
}
throw(error: any) {
return Promise.resolve({done: true, value: error})
return Promise.resolve({done: true as const, value: error})
}
}
19 changes: 19 additions & 0 deletions packages/embedder/JobQueueError.ts
@@ -0,0 +1,19 @@
/**
 * Error raised by embedder workflow steps. Besides the message it carries
 * optional retry hints (how long to wait and how many attempts are allowed)
 * plus extra `jobData` to merge into the failed queue row.
 */
export class JobQueueError extends Error {
  // Literal name lets callers discriminate via `result.name === 'JobQueueError'`
  name = 'JobQueueError' as const

  constructor(
    message: string,
    /** Milliseconds to wait before the job may be retried; omit to disallow retry. */
    public retryDelay?: number,
    /** Cap on retry attempts; omit for the orchestrator's default. */
    public maxRetries?: number,
    /** Extra data merged into the queue row's jobData on failure. */
    public jobData?: Record<string, any>
  ) {
    super(message)
  }
}
117 changes: 117 additions & 0 deletions packages/embedder/WorkflowOrchestrator.ts
@@ -0,0 +1,117 @@
import {sql} from 'kysely'
import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
import getKysely from 'parabol-server/postgres/getKysely'
import {Logger} from '../server/utils/Logger'
import {EmbedderJobType} from './EmbedderJobType'
import {JobQueueError} from './JobQueueError'
import {DBJob, JobType, Workflow} from './custom'
import {embedMetadata} from './workflows/embedMetadata'
import {getSimilarRetroTopics} from './workflows/getSimilarRetroTopics'
import {relatedDiscussionsStart} from './workflows/relatedDiscussionsStart'
import {rerankRetroTopics} from './workflows/rerankRetroTopics'

/**
 * Executes multi-step embedder workflows driven by rows in the
 * `EmbeddingsJobQueue` table. Each job's `jobType` is a
 * `"workflowName:stepName"` string; `runStep` looks up the matching step,
 * runs it, and (when the step defines `getNextStep`) enqueues the follow-up
 * step as one or more new queue rows.
 */
export class WorkflowOrchestrator {
  // Registry of all known workflows: workflowName -> stepName -> {run, getNextStep}.
  workflows: Record<string, Workflow> = {
    embed: {
      start: {
        run: embedMetadata
      }
    },
    relatedDiscussions: {
      start: {
        run: relatedDiscussionsStart,
        getNextStep: () => 'embed'
      },
      embed: {
        run: embedMetadata,
        getNextStep: () => 'getSimilarRetroTopics'
      },
      getSimilarRetroTopics: {
        run: getSimilarRetroTopics,
        getNextStep: () => 'rerank'
      },
      rerank: {
        run: rerankRetroTopics
      }
    }
  }

  // Marks a job as failed: records the message, bumps retryCount, and sets
  // retryAfter only when the error carries a retryDelay and the retry cap is
  // not yet exceeded (a null retryAfter leaves the row permanently failed).
  private failJob = async (jobId: number, retryCount: number, error: JobQueueError) => {
    Logger.error(error)
    const pg = getKysely()
    const {message, retryDelay, jobData} = error
    // Errors that don't specify a cap are effectively retriable forever
    const maxRetries = error.maxRetries ?? 1e6
    await pg
      .updateTable('EmbeddingsJobQueue')
      .set((eb) => ({
        state: 'failed',
        stateMessage: message,
        retryCount: eb('retryCount', '+', 1),
        retryAfter:
          retryDelay && retryCount < maxRetries ? new Date(Date.now() + retryDelay) : null,
        // merge any extra data from the error into the stored jobData
        jobData: jobData ? sql`jsonb_concat("jobData", ${JSON.stringify({jobData})})` : undefined
      }))
      .where('id', '=', jobId)
      .executeTakeFirstOrThrow()
  }

  // A finished job is simply removed from the queue.
  private finishJob = async (jobId: number) => {
    const pg = getKysely()
    await pg.deleteFrom('EmbeddingsJobQueue').where('id', '=', jobId).executeTakeFirstOrThrow()
  }

  // Enqueues the next step's job(s). `data` may be a single object or an array;
  // an array fans out into one row per item. `embeddingsMetadataId` and `model`
  // are promoted to dedicated columns; everything else is stored as jobData JSON.
  private addNextJob = async (
    jobType: JobType,
    priority: number,
    data: Record<string, any> | Record<string, any>[]
  ) => {
    const pg = getKysely()
    const getValues = (datum: any, idx = 0) => {
      const {embeddingsMetadataId, model, ...jobData} = datum
      return {
        jobType,
        // increment by idx so the first item goes first
        priority: priority + idx,
        embeddingsMetadataId,
        model,
        jobData: JSON.stringify(jobData)
      }
    }
    const values = Array.isArray(data) ? data.map(getValues) : getValues(data)
    await pg.insertInto('EmbeddingsJobQueue').values(values).execute()
  }

  /**
   * Runs one queued job to completion: resolves its workflow/step from the
   * jobType, executes the step, then fails, finishes, or chains the job based
   * on the result. A `JobQueueError` result (thrown or returned) fails the job;
   * `false` ends the workflow; any other result may feed `getNextStep`.
   */
  runStep = async (job: DBJob) => {
    const {id: jobId, jobData, jobType, priority, retryCount, embeddingsMetadataId, model} = job
    const {workflowName, stepName} = EmbedderJobType.split(jobType)
    const workflow = this.workflows[workflowName]
    if (!workflow)
      return this.failJob(
        jobId,
        retryCount,
        new JobQueueError(`Workflow ${workflowName} not found`)
      )
    const step = workflow[stepName]
    if (!step)
      return this.failJob(jobId, retryCount, new JobQueueError(`Step ${stepName} not found`))
    const {run, getNextStep} = step
    const dataLoader = new RootDataLoader()
    let result: Awaited<ReturnType<typeof run>> = false
    const data = {...jobData, embeddingsMetadataId, model}
    try {
      result = await run({dataLoader, data})
    } catch (e) {
      // NOTE(review): a thrown non-Error leaves result === false, so the job is
      // deleted below without scheduling a next step — confirm that is intended
      if (e instanceof Error) {
        result = new JobQueueError(`Uncaught error: ${e.message}`)
        result.stack = e.stack
      }
    }
    if (result instanceof JobQueueError) return this.failJob(jobId, retryCount, result)
    await this.finishJob(jobId)
    if (result === false) return
    const nextStepName = await getNextStep?.({dataLoader, data: {...data, ...result}})
    if (!nextStepName) return
    const nextJobType = EmbedderJobType.join(workflowName, nextStepName)
    // the step's raw result (object or array) becomes the next job's payload
    await this.addNextJob(nextJobType, priority, result)
  }
}
16 changes: 14 additions & 2 deletions packages/embedder/addEmbeddingsMetadata.ts
@@ -1,7 +1,19 @@
import {addEmbeddingsMetadataForRetrospectiveDiscussionTopic} from './addEmbeddingsMetadataForRetrospectiveDiscussionTopic'
import {MessageToEmbedder} from './custom'
import {EmbeddingObjectType} from './custom'

export const addEmbeddingsMetadata = async ({objectTypes, ...options}: MessageToEmbedder) => {
/** Optional date window limiting which records get embeddings metadata. */
export type AddEmbeddingsMetadataParams = {
  // inclusive lower bound; omit to start from the beginning of time
  startAt?: Date
  // exclusive upper bound; omit for no end limit
  endAt?: Date
}

/** Full options: the date window plus which object types to process. */
type AddEmbeddingsMetadataOptions = AddEmbeddingsMetadataParams & {
  objectTypes: EmbeddingObjectType[]
}

export const addEmbeddingsMetadata = async ({
objectTypes,
...options
}: AddEmbeddingsMetadataOptions) => {
return Promise.all(
objectTypes.map((type) => {
switch (type) {
Expand Down
@@ -1,14 +1,13 @@
import {ExpressionOrFactory, SqlBool, sql} from 'kysely'
import getRethink from 'parabol-server/database/rethinkDriver'
import {RDatum} from 'parabol-server/database/stricterR'
import getKysely from 'parabol-server/postgres/getKysely'
import {DB} from 'parabol-server/postgres/pg'
import {Logger} from 'parabol-server/utils/Logger'
import {EMBEDDER_JOB_PRIORITY} from './EMBEDDER_JOB_PRIORITY'
import getModelManager from './ai_models/ModelManager'
import {EmbedderOptions} from './custom'
import getKysely from '../server/postgres/getKysely'
import {AddEmbeddingsMetadataParams} from './addEmbeddingsMetadata'
import {insertDiscussionsIntoMetadataAndQueue} from './insertDiscussionsIntoMetadataAndQueue'

interface DiscussionMeta {
export interface DiscussionMeta {
id: string
teamId: string
createdAt: Date
Expand All @@ -29,79 +28,18 @@ const validateDiscussions = async (discussions: (DiscussionMeta & {meetingId: st
return discussions.filter(({meetingId}) => endedMeetingIdsSet.has(meetingId))
}

const insertDiscussionsIntoMetadata = async (discussions: DiscussionMeta[], priority: number) => {
const pg = getKysely()
const metadataRows = discussions.map(({id, teamId, createdAt}) => ({
refId: id,
objectType: 'retrospectiveDiscussionTopic' as const,
teamId,
// Not technically updatedAt since discussions can be updated after they get created
refUpdatedAt: createdAt
}))
if (!metadataRows[0]) return

const modelManager = getModelManager()
const tableNames = [...modelManager.embeddingModels.keys()]
return (
pg
.with('Insert', (qc) =>
qc
.insertInto('EmbeddingsMetadata')
.values(metadataRows)
.onConflict((oc) => oc.doNothing())
.returning('id')
)
// create n*m rows for n models & m discussions
.with('Metadata', (qc) =>
qc
.selectFrom('Insert')
.fullJoin(
sql<{model: string}>`UNNEST(ARRAY[${sql.join(tableNames)}])`.as('model'),
(join) => join.onTrue()
)
.select(['id', 'model'])
)
.insertInto('EmbeddingsJobQueue')
.columns(['jobType', 'priority', 'jobData'])
.expression(({selectFrom}) =>
selectFrom('Metadata').select(({lit, fn, ref}) => [
sql.lit('embed').as('jobType'),
lit(priority).as('priority'),
fn('json_build_object', [
sql.lit('embeddingsMetadataId'),
ref('Metadata.id'),
sql.lit('model'),
ref('Metadata.model')
]).as('jobData')
])
)
.execute()
)
}

export const addEmbeddingsMetadataForRetrospectiveDiscussionTopic = async ({
startAt,
endAt,
meetingId
}: EmbedderOptions) => {
// load up the metadata table with all discussion topics that are a part of meetings ended within the given date range
endAt
}: AddEmbeddingsMetadataParams) => {
const pg = getKysely()
if (meetingId) {
const discussions = await pg
.selectFrom('Discussion')
.select(['id', 'teamId', 'createdAt'])
.where('meetingId', '=', meetingId)
.execute()
await insertDiscussionsIntoMetadata(discussions, EMBEDDER_JOB_PRIORITY.MEETING)
return
}
// PG only accepts 65K parameters (inserted columns * number of rows + query params). Make the batches as big as possible
const PG_MAX_PARAMS = 65535
const QUERY_PARAMS = 10
const METADATA_COLS_PER_ROW = 4
const BATCH_SIZE = Math.floor((PG_MAX_PARAMS - QUERY_PARAMS) / METADATA_COLS_PER_ROW)
const pgStartAt = startAt || new Date(0)
const pgEndAt = (endAt || new Date('4000-01-01')).getTime() / 1000
const pgEndAt = (endAt || new Date('4000')).getTime() / 1000

let curEndAt = pgEndAt
let curEndId = ''
Expand Down Expand Up @@ -135,7 +73,7 @@ export const addEmbeddingsMetadataForRetrospectiveDiscussionTopic = async ({
curEndId = curEndAt === createdAtEpoch ? id : ''
curEndAt = createdAtEpoch
const validDiscussions = await validateDiscussions(discussions)
await insertDiscussionsIntoMetadata(validDiscussions, EMBEDDER_JOB_PRIORITY.TOPIC_HISTORY)
await insertDiscussionsIntoMetadataAndQueue(validDiscussions, 5)
const jsTime = new Date(createdAtEpoch * 1000)
Logger.log(
`Inserted ${validDiscussions.length}/${discussions.length} discussions in metadata ending at ${jsTime}`
Expand Down

0 comments on commit 15a54fb

Please sign in to comment.