
feat: related discussions refactor #9557

Merged · 63 commits · Apr 10, 2024

Changes from 55 commits
b3fa7ba
feat: add embedder service
jordanh Feb 9, 2024
854f0ef
fix: type and class interface changes, assert AbstractModelCase commit 1
jordanh Feb 15, 2024
372e7c9
fix: code review updates
jordanh Feb 15, 2024
97aae3b
feat: added related discussions feature
jordanh Feb 21, 2024
d86eb2d
feat: support ad-hoc metadata inserts
mattkrick Mar 7, 2024
8628e3e
feat: triggers from metadata to queue
mattkrick Mar 8, 2024
285f69b
feat: move job queue to pg
mattkrick Mar 8, 2024
0b024dd
begin retry
mattkrick Mar 13, 2024
ae6126f
feat: support retries after queue goes dry
mattkrick Mar 13, 2024
9fb9c4f
chore: separate abstract models into their own files
mattkrick Mar 13, 2024
20f5d96
feat: support recursive text splitting for chunks
mattkrick Mar 14, 2024
eff8eb9
validate discussions to only include ended meetings
mattkrick Mar 18, 2024
7ec3d6a
support historical info
mattkrick Mar 18, 2024
49c3eae
Merge branch 'master' into feat/embedder1
mattkrick Mar 18, 2024
46dafac
feat: support listening to jobs from app
mattkrick Mar 19, 2024
47e83d9
feat: support calls from app
mattkrick Mar 19, 2024
7cee59e
merge master
mattkrick Mar 19, 2024
5c96c41
fix migration conflict
mattkrick Mar 19, 2024
49b349b
fix: import history
mattkrick Mar 19, 2024
bf9a379
fix: dataloader mem leak
mattkrick Mar 19, 2024
f7ccdc6
feat: handle stalled jobs
mattkrick Mar 19, 2024
b573083
self-review
mattkrick Mar 19, 2024
ba906e0
remove redlock for non-historical jobs
mattkrick Mar 19, 2024
10953b5
feat: clean comments
mattkrick Mar 19, 2024
7953939
remove pubsub
mattkrick Mar 19, 2024
c2b6753
fix lint errors
mattkrick Mar 19, 2024
8852ac2
fix lint
mattkrick Mar 19, 2024
2ba07c1
cast to any for CI
mattkrick Mar 19, 2024
7bcd083
build embeddings table in CI for the typings
mattkrick Mar 19, 2024
368b34c
codecheck after server starts
mattkrick Mar 19, 2024
1158523
use pgvector in CI
mattkrick Mar 19, 2024
ddfc151
POSTGRES_USE_PGVECTOR='true'
mattkrick Mar 19, 2024
07e0d12
lazy Kysely Codegen
mattkrick Mar 20, 2024
8360b95
fix: release-please and CI pgvector image bump
mattkrick Mar 20, 2024
e74754d
chore: rename files for clarity
mattkrick Mar 20, 2024
6dcdcc8
merge master
mattkrick Mar 21, 2024
e7991d3
rename yaml to yml
mattkrick Mar 21, 2024
3a9fd5c
Merge branch 'feat/embedder1' into feat/embedder-related-discussions1
mattkrick Mar 22, 2024
50b23c9
fix: dataloader type
mattkrick Mar 22, 2024
32fffbd
Merge branch 'master' into feat/embedder-related-discussions1
mattkrick Apr 1, 2024
c0daab3
fix: merge conflicts
mattkrick Apr 1, 2024
9c7430a
fix: refactor jobs to workflows
mattkrick Apr 4, 2024
e9fd17f
feat: add EmbedWorkflow
mattkrick Apr 4, 2024
b437e4a
feat: logPerformance so we can benchmark worker count
mattkrick Apr 4, 2024
2436a19
fix: chunking
mattkrick Apr 5, 2024
41e47a4
feat: promote embeddingsMetadataId, model to columns in jobqueue
mattkrick Apr 5, 2024
b14c8c8
fix sql bug
mattkrick Apr 5, 2024
da5fd5b
fix: truncate URLs pre-tokenization
mattkrick Apr 5, 2024
f18314d
fix: better catching on language mismatches
mattkrick Apr 8, 2024
be796c7
debug complete
mattkrick Apr 9, 2024
fa030be
fix: junk inputs full of lengths of delimiters to avoid DoS
mattkrick Apr 9, 2024
60109e3
Merge branch 'master' into feat/embedder-related-discussions1
mattkrick Apr 9, 2024
47954cb
fix: rename migration
mattkrick Apr 9, 2024
0f88e71
Merge branch 'master' into feat/embedder-related-discussions1
mattkrick Apr 9, 2024
743a93d
fix: prettier
mattkrick Apr 9, 2024
0ce2075
support instances that do not use embedder
mattkrick Apr 9, 2024
69e67b5
fix similar retro topics query
mattkrick Apr 10, 2024
b465ee7
fix: publishing from embedder
mattkrick Apr 10, 2024
c57ff5c
only sleep for 10 seconds if no jobs are found
mattkrick Apr 10, 2024
06d68d3
fix: simplify orchestrator API
mattkrick Apr 10, 2024
07abeaf
fix: use only 1 embedder for related discussions
mattkrick Apr 10, 2024
c0cab43
fix runtime error
mattkrick Apr 10, 2024
3f4fdf3
fix lint
mattkrick Apr 10, 2024
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@
"minimist": "^1.2.5",
"node-loader": "^2.0.0",
"pg-promise": "^11.2.0",
"pm2": "^5.3.0",
"pm2": "^5.3.1",
"postcss": "^8.4.21",
"postcss-loader": "^7.0.2",
"prettier": "^3.2.5",
6 changes: 0 additions & 6 deletions packages/embedder/EMBEDDER_JOB_PRIORITY.ts

This file was deleted.

10 changes: 10 additions & 0 deletions packages/embedder/EmbedderJobType.ts
@@ -0,0 +1,10 @@
import {JobType} from './custom'

type SplitJobType<T extends string> = T extends `${infer W}:${infer S}` ? [W, S] : never
export const EmbedderJobType = {
join: (workflowName: string, stepName: string) => `${workflowName}:${stepName}` as JobType,
split: (jobType: JobType) => {
const [workflowName, stepName] = jobType.split(':') as SplitJobType<typeof jobType>
return {workflowName, stepName}
}
}
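The `EmbedderJobType` helper above packs a workflow name and a step name into one colon-delimited job-type string and unpacks it again on the consumer side. A minimal standalone sketch of the same idea (the real module brands the result as the repo's `JobType` type, which is omitted here):

```typescript
// Sketch of EmbedderJobType: "workflow:step" round-tripping.
// join() encodes the pair; split() recovers it from a queued job's type.
const EmbedderJobType = {
  join: (workflowName: string, stepName: string) => `${workflowName}:${stepName}`,
  split: (jobType: string) => {
    const [workflowName, stepName] = jobType.split(':')
    return {workflowName, stepName}
  }
}

// 'relatedDiscussions' / 'embed' are illustrative names, not from the PR
const jobType = EmbedderJobType.join('relatedDiscussions', 'embed')
const {workflowName, stepName} = EmbedderJobType.split(jobType)
```

Because the delimiter is a single `:`, workflow and step names must not themselves contain colons; the template-literal `SplitJobType` type in the diff enforces the same shape at compile time.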
40 changes: 14 additions & 26 deletions packages/embedder/EmbeddingsJobQueueStream.ts
@@ -1,31 +1,22 @@
import {Selectable, sql} from 'kysely'
import {sql} from 'kysely'
import ms from 'ms'
import sleep from 'parabol-client/utils/sleep'
import 'parabol-server/initSentry'
import getKysely from 'parabol-server/postgres/getKysely'
import {DB} from 'parabol-server/postgres/pg'
import RootDataLoader from '../server/dataloader/RootDataLoader'
import {processJob} from './processJob'
import {Logger} from '../server/utils/Logger'
import {EmbeddingsTableName} from './ai_models/AbstractEmbeddingsModel'
import {WorkflowOrchestrator} from './WorkflowOrchestrator'
import {DBJob} from './custom'

export type DBJob = Selectable<DB['EmbeddingsJobQueue']>
export type EmbedJob = DBJob & {
jobType: 'embed'
jobData: {
embeddingsMetadataId: number
model: EmbeddingsTableName
}
}
export type RerankJob = DBJob & {jobType: 'rerank'; jobData: {discussionIds: string[]}}
export type Job = EmbedJob | RerankJob

export class EmbeddingsJobQueueStream implements AsyncIterableIterator<Job> {
export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
[Symbol.asyncIterator]() {
return this
}
dataLoader = new RootDataLoader({maxBatchSize: 1000})
async next(): Promise<IteratorResult<Job>> {

orchestrator: WorkflowOrchestrator
constructor(orchestrator: WorkflowOrchestrator) {
this.orchestrator = orchestrator
}
async next(): Promise<IteratorResult<DBJob>> {
const pg = getKysely()
const getJob = (isFailed: boolean) => {
return pg
@@ -48,7 +39,7 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<Job> {
.set({state: 'running', startAt: new Date()})
.where('id', '=', sql<number>`ANY(SELECT id FROM ids)`)
.returningAll()
.executeTakeFirst()
.executeTakeFirst() as Promise<DBJob | undefined>
}
const job = (await getJob(false)) || (await getJob(true))
if (!job) {
@@ -58,16 +49,13 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<Job> {
return this.next()
}

const isSuccessful = await processJob(job as Job, this.dataLoader)
if (isSuccessful) {
await pg.deleteFrom('EmbeddingsJobQueue').where('id', '=', job.id).executeTakeFirstOrThrow()
}
return {done: false, value: job as Job}
await this.orchestrator.runStep(job)
return {done: false, value: job}
}
return() {
return Promise.resolve({done: true as const, value: undefined})
}
throw(error: any) {
return Promise.resolve({done: true, value: error})
return Promise.resolve({done: true as const, value: error})
}
}
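Since `EmbeddingsJobQueueStream` implements `AsyncIterableIterator`, the embedder's main loop can consume jobs with `for await`. A sketch under assumptions: `FakeJobStream` and the `Job` shape below are stand-ins — the real class polls the `EmbeddingsJobQueue` table in Postgres and hands each row to the `WorkflowOrchestrator`.

```typescript
// Stand-in job shape; the real DBJob is a Kysely Selectable row type
type Job = {id: number; jobType: string}

// FakeJobStream mimics the iterator contract of EmbeddingsJobQueueStream:
// [Symbol.asyncIterator]() returns `this`, and next() yields one job at a time
class FakeJobStream {
  private jobs: Job[] = [
    {id: 1, jobType: 'relatedDiscussions:embed'},
    {id: 2, jobType: 'relatedDiscussions:rerank'}
  ];
  [Symbol.asyncIterator]() {
    return this
  }
  async next(): Promise<IteratorResult<Job>> {
    const job = this.jobs.shift()
    if (!job) return {done: true, value: undefined}
    return {done: false, value: job}
  }
}

// Consume the stream the way an embedder worker would
async function drain() {
  const seen: number[] = []
  for await (const job of new FakeJobStream()) {
    seen.push(job.id)
  }
  return seen
}
```

Note the real stream never terminates on an empty queue: it sleeps and re-polls inside `next()`, so `done: true` is only reached via `return()`.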
19 changes: 19 additions & 0 deletions packages/embedder/JobQueueError.ts
@@ -0,0 +1,19 @@
export class JobQueueError extends Error {
name = 'JobQueueError' as const
retryDelay?: number
maxRetries?: number
jobData?: Record<string, any>

constructor(
message: string,
retryDelay?: number,
maxRetries?: number,
jobData?: Record<string, any>
) {
super(message)
this.message = message
this.retryDelay = retryDelay
this.maxRetries = maxRetries
this.jobData = jobData
}
}
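A workflow step signals a recoverable failure by producing a `JobQueueError` carrying an optional retry delay, retry cap, and extra `jobData` to merge back into the queue row. The class below reproduces the diff verbatim; the `rateLimited` example and its field values are hypothetical:

```typescript
// JobQueueError as added in this PR: a structured failure that the
// orchestrator's failJob() turns into retryAfter / retryCount updates
class JobQueueError extends Error {
  name = 'JobQueueError' as const
  retryDelay?: number
  maxRetries?: number
  jobData?: Record<string, any>

  constructor(
    message: string,
    retryDelay?: number,
    maxRetries?: number,
    jobData?: Record<string, any>
  ) {
    super(message)
    this.message = message
    this.retryDelay = retryDelay
    this.maxRetries = maxRetries
    this.jobData = jobData
  }
}

// Hypothetical usage: retry in 5s, give up after 3 attempts
const rateLimited = new JobQueueError('embedding API rate limited', 5000, 3, {lastStatus: 429})
```

In `failJob`, a missing `maxRetries` is treated as effectively unlimited (`1e6`), and `retryAfter` is only set while `retryCount < maxRetries`; otherwise the row stays `failed` with a null `retryAfter`.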
92 changes: 92 additions & 0 deletions packages/embedder/WorkflowOrchestrator.ts
@@ -0,0 +1,92 @@
import {sql} from 'kysely'
import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
import getKysely from 'parabol-server/postgres/getKysely'
import {EmbedderJobType} from './EmbedderJobType'
import {JobQueueError} from './JobQueueError'
import {DBJob, JobType, Workflow} from './custom'

export class WorkflowOrchestrator {
workflows: Record<string, Workflow> = {}
constructor(workflows: Workflow[]) {
workflows.forEach((workflow) => {
this.workflows[workflow.name] = workflow
})
}

private failJob = async (jobId: number, retryCount: number, error: JobQueueError) => {
console.log('job failed', jobId, error)
const pg = getKysely()
const {message, retryDelay, jobData} = error
const maxRetries = error.maxRetries ?? 1e6
await pg
.updateTable('EmbeddingsJobQueue')
.set((eb) => ({
state: 'failed',
stateMessage: message,
retryCount: eb('retryCount', '+', 1),
retryAfter:
retryDelay && retryCount < maxRetries ? new Date(Date.now() + retryDelay) : null,
jobData: jobData ? sql`jsonb_concat("jobData", ${JSON.stringify({jobData})})` : undefined
}))
.where('id', '=', jobId)
.executeTakeFirstOrThrow()
}

private finishJob = async (jobId: number) => {
const pg = getKysely()
await pg.deleteFrom('EmbeddingsJobQueue').where('id', '=', jobId).executeTakeFirstOrThrow()
}

private addNextJob = async (
jobType: JobType,
priority: number,
data: Record<string, any> | Record<string, any>[]
) => {
const pg = getKysely()
const getValues = (datum: any, idx = 0) => {
const {embeddingsMetadataId, model, ...jobData} = datum
return {
jobType,
// increment by idx so the first item goes first
priority: priority + idx,
embeddingsMetadataId,
model,
jobData: JSON.stringify(jobData)
}
}
const values = Array.isArray(data) ? data.map(getValues) : getValues(data)
await pg.insertInto('EmbeddingsJobQueue').values(values).execute()
}

runStep = async (job: DBJob) => {
const {id: jobId, jobData, jobType, priority, retryCount, embeddingsMetadataId, model} = job
const {workflowName, stepName} = EmbedderJobType.split(jobType)
const workflow = this.workflows[workflowName]
if (!workflow)
return this.failJob(
jobId,
retryCount,
new JobQueueError(`Workflow ${workflowName} not found`)
)
const step = workflow.steps[stepName]
if (!step)
return this.failJob(jobId, retryCount, new JobQueueError(`Step ${stepName} not found`))
const {run, getNextStep} = step
const dataLoader = new RootDataLoader()
let result: Awaited<ReturnType<typeof run>> = false
try {
result = await run({dataLoader, data: {...jobData, embeddingsMetadataId, model}})
} catch (e) {
if (e instanceof Error) {
result = new JobQueueError(`Uncaught error: ${e.message}`)
result.stack = e.stack
}
}
if (result instanceof JobQueueError) return this.failJob(jobId, retryCount, result)
if (result === false) return this.finishJob(jobId)
const nextStepName = await getNextStep?.({dataLoader, data: result})
if (!nextStepName) return this.finishJob(jobId)
const nextJobType = EmbedderJobType.join(workflowName, nextStepName)
await this.addNextJob(nextJobType, priority, result)
}
}
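The step-chaining contract in `WorkflowOrchestrator.runStep` is: a step's `run()` returns `false` to end the workflow (job row deleted), or data to pass onward; `getNextStep()` then names the follow-up step, which is enqueued as a new job. A simplified in-memory sketch — the workflow and step names here are illustrative, and the real orchestrator goes through the `EmbeddingsJobQueue` table rather than a loop:

```typescript
// StepResult mirrors the contract: `false` = done, data = continue
type StepResult = false | Record<string, any>
type Step = {
  run: (data: Record<string, any>) => Promise<StepResult>
  getNextStep?: (data: Record<string, any>) => string | undefined
}
type Workflow = {name: string; steps: Record<string, Step>}

const embedWorkflow: Workflow = {
  name: 'relatedDiscussions',
  steps: {
    embed: {
      run: async (data) => ({...data, embedded: true}),
      getNextStep: () => 'rerank'
    },
    rerank: {
      // `false` ends the workflow, analogous to finishJob() deleting the row
      run: async () => false as const
    }
  }
}

// Drive one job through the workflow, recording the step order
async function runWorkflow(workflow: Workflow, stepName: string, data: Record<string, any>) {
  const visited: string[] = []
  let current: string | undefined = stepName
  while (current) {
    visited.push(current)
    const step = workflow.steps[current]
    const result = await step.run(data)
    if (result === false) break
    data = result
    current = step.getNextStep?.(result)
  }
  return visited
}
```

Queuing each next step as its own job (instead of looping in-process like this sketch) is what lets any worker pick up the follow-up step and lets a crash mid-workflow resume from the last completed step.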
16 changes: 14 additions & 2 deletions packages/embedder/addEmbeddingsMetadata.ts
@@ -1,7 +1,19 @@
import {addEmbeddingsMetadataForRetrospectiveDiscussionTopic} from './addEmbeddingsMetadataForRetrospectiveDiscussionTopic'
import {MessageToEmbedder} from './custom'
import {EmbeddingObjectType} from './custom'

export const addEmbeddingsMetadata = async ({objectTypes, ...options}: MessageToEmbedder) => {
export type AddEmbeddingsMetadataParams = {
startAt?: Date
endAt?: Date
}

type AddEmbeddingsMetadataOptions = AddEmbeddingsMetadataParams & {
objectTypes: EmbeddingObjectType[]
}

export const addEmbeddingsMetadata = async ({
objectTypes,
...options
}: AddEmbeddingsMetadataOptions) => {
return Promise.all(
objectTypes.map((type) => {
switch (type) {
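`addEmbeddingsMetadata` fans out one call per requested object type; the `switch` body is truncated in the diff above, so the sketch below fills it in with a hypothetical per-type loader (the union currently has a single member, `retrospectiveDiscussionTopic`):

```typescript
type EmbeddingObjectType = 'retrospectiveDiscussionTopic'
type AddEmbeddingsMetadataParams = {startAt?: Date; endAt?: Date}

// Hypothetical stand-in for addEmbeddingsMetadataForRetrospectiveDiscussionTopic;
// the real loader scans Discussion rows into EmbeddingsMetadata in batches
const addForRetroDiscussionTopic = async (_params: AddEmbeddingsMetadataParams) =>
  'retrospectiveDiscussionTopic'

const addEmbeddingsMetadata = async ({
  objectTypes,
  ...options
}: AddEmbeddingsMetadataParams & {objectTypes: EmbeddingObjectType[]}) => {
  return Promise.all(
    objectTypes.map((type) => {
      // dispatch each requested object type to its own metadata loader
      switch (type) {
        case 'retrospectiveDiscussionTopic':
          return addForRetroDiscussionTopic(options)
      }
    })
  )
}
```

New embeddable object types would extend the union and the `switch`, each with its own loader running concurrently under `Promise.all`.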
packages/embedder/addEmbeddingsMetadataForRetrospectiveDiscussionTopic.ts
@@ -1,14 +1,13 @@
import {ExpressionOrFactory, SqlBool, sql} from 'kysely'
import getRethink from 'parabol-server/database/rethinkDriver'
import {RDatum} from 'parabol-server/database/stricterR'
import getKysely from 'parabol-server/postgres/getKysely'
import {DB} from 'parabol-server/postgres/pg'
import {Logger} from 'parabol-server/utils/Logger'
import {EMBEDDER_JOB_PRIORITY} from './EMBEDDER_JOB_PRIORITY'
import getModelManager from './ai_models/ModelManager'
import {EmbedderOptions} from './custom'
import getKysely from '../server/postgres/getKysely'
import {AddEmbeddingsMetadataParams} from './addEmbeddingsMetadata'
import {insertDiscussionsIntoMetadataAndQueue} from './insertDiscussionsIntoMetadataAndQueue'

interface DiscussionMeta {
export interface DiscussionMeta {
id: string
teamId: string
createdAt: Date
@@ -29,79 +28,18 @@ const validateDiscussions = async (discussions: (DiscussionMeta & {meetingId: st
return discussions.filter(({meetingId}) => endedMeetingIdsSet.has(meetingId))
}

const insertDiscussionsIntoMetadata = async (discussions: DiscussionMeta[], priority: number) => {
const pg = getKysely()
const metadataRows = discussions.map(({id, teamId, createdAt}) => ({
refId: id,
objectType: 'retrospectiveDiscussionTopic' as const,
teamId,
// Not technically updatedAt since discussions can be updated after they get created
refUpdatedAt: createdAt
}))
if (!metadataRows[0]) return

const modelManager = getModelManager()
const tableNames = [...modelManager.embeddingModels.keys()]
return (
pg
.with('Insert', (qc) =>
qc
.insertInto('EmbeddingsMetadata')
.values(metadataRows)
.onConflict((oc) => oc.doNothing())
.returning('id')
)
// create n*m rows for n models & m discussions
.with('Metadata', (qc) =>
qc
.selectFrom('Insert')
.fullJoin(
sql<{model: string}>`UNNEST(ARRAY[${sql.join(tableNames)}])`.as('model'),
(join) => join.onTrue()
)
.select(['id', 'model'])
)
.insertInto('EmbeddingsJobQueue')
.columns(['jobType', 'priority', 'jobData'])
.expression(({selectFrom}) =>
selectFrom('Metadata').select(({lit, fn, ref}) => [
sql.lit('embed').as('jobType'),
lit(priority).as('priority'),
fn('json_build_object', [
sql.lit('embeddingsMetadataId'),
ref('Metadata.id'),
sql.lit('model'),
ref('Metadata.model')
]).as('jobData')
])
)
.execute()
)
}

export const addEmbeddingsMetadataForRetrospectiveDiscussionTopic = async ({
startAt,
endAt,
meetingId
}: EmbedderOptions) => {
// load up the metadata table with all discussion topics that are part of meetings ended within the given date range
endAt
}: AddEmbeddingsMetadataParams) => {
const pg = getKysely()
if (meetingId) {
const discussions = await pg
.selectFrom('Discussion')
.select(['id', 'teamId', 'createdAt'])
.where('meetingId', '=', meetingId)
.execute()
await insertDiscussionsIntoMetadata(discussions, EMBEDDER_JOB_PRIORITY.MEETING)
return
}
// PG only accepts 65K parameters (inserted columns * number of rows + query params). Make the batches as big as possible
const PG_MAX_PARAMS = 65535
const QUERY_PARAMS = 10
const METADATA_COLS_PER_ROW = 4
const BATCH_SIZE = Math.floor((PG_MAX_PARAMS - QUERY_PARAMS) / METADATA_COLS_PER_ROW)
const pgStartAt = startAt || new Date(0)
const pgEndAt = (endAt || new Date('4000-01-01')).getTime() / 1000
const pgEndAt = (endAt || new Date('4000')).getTime() / 1000

let curEndAt = pgEndAt
let curEndId = ''
@@ -135,7 +73,7 @@ export const addEmbeddingsMetadataForRetrospectiveDiscussionTopic = async ({
curEndId = curEndAt === createdAtEpoch ? id : ''
curEndAt = createdAtEpoch
const validDiscussions = await validateDiscussions(discussions)
await insertDiscussionsIntoMetadata(validDiscussions, EMBEDDER_JOB_PRIORITY.TOPIC_HISTORY)
await insertDiscussionsIntoMetadataAndQueue(validDiscussions, 5)
const jsTime = new Date(createdAtEpoch * 1000)
Logger.log(
`Inserted ${validDiscussions.length}/${discussions.length} discussions in metadata ending at ${jsTime}`
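The batch size used in the diff above follows from Postgres's hard cap of 65,535 bind parameters per statement: with 10 parameters reserved for the rest of the query and 4 columns per inserted metadata row, the largest safe batch is `floor((65535 − 10) / 4) = 16381` rows. The arithmetic, reproduced standalone:

```typescript
// Postgres limits a single statement to 65535 bind parameters, so the
// batch size is bounded by (available params) / (columns per row).
const PG_MAX_PARAMS = 65535
const QUERY_PARAMS = 10 // parameters reserved for the non-VALUES parts of the query
const METADATA_COLS_PER_ROW = 4 // refId, objectType, teamId, refUpdatedAt
const BATCH_SIZE = Math.floor((PG_MAX_PARAMS - QUERY_PARAMS) / METADATA_COLS_PER_ROW)
```

Keeping batches at this ceiling minimizes round trips during the historical backfill while staying under the wire-protocol limit.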