diff --git a/package.json b/package.json
index 4c634718ee7..277c5ee433b 100644
--- a/package.json
+++ b/package.json
@@ -111,7 +111,7 @@
     "minimist": "^1.2.5",
     "node-loader": "^2.0.0",
     "pg-promise": "^11.2.0",
-    "pm2": "^5.3.0",
+    "pm2": "^5.3.1",
     "postcss": "^8.4.21",
     "postcss-loader": "^7.0.2",
     "prettier": "^3.2.5",
diff --git a/packages/client/components/RetroDrawerRoot.tsx b/packages/client/components/RetroDrawerRoot.tsx
index 58bf63544ff..cb2698bbbe3 100644
--- a/packages/client/components/RetroDrawerRoot.tsx
+++ b/packages/client/components/RetroDrawerRoot.tsx
@@ -1,6 +1,6 @@
 import React, {Suspense} from 'react'
-import useQueryLoaderNow from '../hooks/useQueryLoaderNow'
 import retroDrawerQuery, {RetroDrawerQuery} from '../__generated__/RetroDrawerQuery.graphql'
+import useQueryLoaderNow from '../hooks/useQueryLoaderNow'
 import RetroDrawer from './RetroDrawer'
 
 type Props = {
@@ -12,7 +12,7 @@ type Props = {
 const RetroDrawerRoot = (props: Props) => {
   const {showDrawer, setShowDrawer, meetingId} = props
   const queryRef = useQueryLoaderNow<RetroDrawerQuery>(retroDrawerQuery, {
-    first: 200,
+    first: 2000,
     type: 'retrospective',
     meetingId
   })
diff --git a/packages/embedder/EMBEDDER_JOB_PRIORITY.ts b/packages/embedder/EMBEDDER_JOB_PRIORITY.ts
deleted file mode 100644
index a54e4b5c67d..00000000000
--- a/packages/embedder/EMBEDDER_JOB_PRIORITY.ts
+++ /dev/null
@@ -1,6 +0,0 @@
-export const EMBEDDER_JOB_PRIORITY = {
-  MEETING: 40,
-  DEFAULT: 50,
-  TOPIC_HISTORY: 80,
-  NEW_MODEL: 90
-} as const
diff --git a/packages/embedder/EmbedderJobType.ts b/packages/embedder/EmbedderJobType.ts
new file mode 100644
index 00000000000..788731ff56f
--- /dev/null
+++ b/packages/embedder/EmbedderJobType.ts
@@ -0,0 +1,10 @@
+import {JobType} from './custom'
+
+type SplitJobType<T> = T extends `${infer W}:${infer S}` ? [W, S] : never
+export const EmbedderJobType = {
+  join: (workflowName: string, stepName: string) => `${workflowName}:${stepName}` as JobType,
+  split: (jobType: JobType) => {
+    const [workflowName, stepName] = jobType.split(':') as SplitJobType<JobType>
+    return {workflowName, stepName}
+  }
+}
diff --git a/packages/embedder/EmbeddingsJobQueueStream.ts b/packages/embedder/EmbeddingsJobQueueStream.ts
index 64f7de936dd..cb995821f4b 100644
--- a/packages/embedder/EmbeddingsJobQueueStream.ts
+++ b/packages/embedder/EmbeddingsJobQueueStream.ts
@@ -1,31 +1,22 @@
-import {Selectable, sql} from 'kysely'
+import {sql} from 'kysely'
 import ms from 'ms'
 import sleep from 'parabol-client/utils/sleep'
 import 'parabol-server/initSentry'
 import getKysely from 'parabol-server/postgres/getKysely'
-import {DB} from 'parabol-server/postgres/pg'
-import RootDataLoader from '../server/dataloader/RootDataLoader'
-import {processJob} from './processJob'
 import {Logger} from '../server/utils/Logger'
-import {EmbeddingsTableName} from './ai_models/AbstractEmbeddingsModel'
+import {WorkflowOrchestrator} from './WorkflowOrchestrator'
+import {DBJob} from './custom'
 
-export type DBJob = Selectable<DB['EmbeddingsJobQueue']>
-export type EmbedJob = DBJob & {
-  jobType: 'embed'
-  jobData: {
-    embeddingsMetadataId: number
-    model: EmbeddingsTableName
-  }
-}
-export type RerankJob = DBJob & {jobType: 'rerank'; jobData: {discussionIds: string[]}}
-export type Job = EmbedJob | RerankJob
-
-export class EmbeddingsJobQueueStream implements AsyncIterableIterator<Job> {
+export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
   [Symbol.asyncIterator]() {
     return this
   }
-  dataLoader = new RootDataLoader({maxBatchSize: 1000})
-  async next(): Promise<IteratorResult<Job>> {
+
+  orchestrator: WorkflowOrchestrator
+  constructor(orchestrator: WorkflowOrchestrator) {
+    this.orchestrator = orchestrator
+  }
+  async next(): Promise<IteratorResult<DBJob>> {
     const pg = getKysely()
     const getJob = (isFailed: boolean) => {
       return pg
@@ -54,20 +45,17 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<Job> {
     if (!job) {
       Logger.log('JobQueueStream: no jobs found')
       // queue is empty, so sleep for a while
-      await sleep(ms('1m'))
+      await sleep(ms('10s'))
       return this.next()
     }
 
-    const isSuccessful = await processJob(job as Job, this.dataLoader)
-    if (isSuccessful) {
-      await pg.deleteFrom('EmbeddingsJobQueue').where('id', '=', job.id).executeTakeFirstOrThrow()
-    }
-    return {done: false, value: job as Job}
+    await this.orchestrator.runStep(job)
+    return {done: false, value: job}
   }
   return() {
     return Promise.resolve({done: true as const, value: undefined})
   }
   throw(error: any) {
-    return Promise.resolve({done: true, value: error})
+    return Promise.resolve({done: true as const, value: error})
   }
 }
diff --git a/packages/embedder/JobQueueError.ts b/packages/embedder/JobQueueError.ts
new file mode 100644
index 00000000000..9f88425f3e7
--- /dev/null
+++ b/packages/embedder/JobQueueError.ts
@@ -0,0 +1,19 @@
+export class JobQueueError extends Error {
+  name = 'JobQueueError' as const
+  retryDelay?: number
+  maxRetries?: number
+  jobData?: Record<string, any>
+
+  constructor(
+    message: string,
+    retryDelay?: number,
+    maxRetries?: number,
+    jobData?: Record<string, any>
+  ) {
+    super(message)
+    this.message = message
+    this.retryDelay = retryDelay
+    this.maxRetries = maxRetries
+    this.jobData = jobData
+  }
+}
diff --git a/packages/embedder/WorkflowOrchestrator.ts b/packages/embedder/WorkflowOrchestrator.ts
new file mode 100644
index 00000000000..704320542be
--- /dev/null
+++ b/packages/embedder/WorkflowOrchestrator.ts
@@ -0,0 +1,117 @@
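+// Each EmbeddingsJobQueue row carries a jobType of the form "workflowName:stepName".
+// runStep looks up that step in `workflows`, runs it, deletes the job on success or
+// marks it failed (optionally with a retryAfter) on JobQueueError, then enqueues the
+// next step, if any, using the step's result as the next job's data.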
+import {sql} from 'kysely'
+import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
+import getKysely from 'parabol-server/postgres/getKysely'
+import {Logger} from '../server/utils/Logger'
+import {EmbedderJobType} from './EmbedderJobType'
+import {JobQueueError} from './JobQueueError'
+import {DBJob, JobType, Workflow} from './custom'
+import {embedMetadata} from './workflows/embedMetadata'
+import {getSimilarRetroTopics} from './workflows/getSimilarRetroTopics'
+import {relatedDiscussionsStart} from './workflows/relatedDiscussionsStart'
+import {rerankRetroTopics} from './workflows/rerankRetroTopics'
+
+export class WorkflowOrchestrator {
+  workflows: Record<string, Workflow> = {
+    embed: {
+      start: {
+        run: embedMetadata
+      }
+    },
+    relatedDiscussions: {
+      start: {
+        run: relatedDiscussionsStart,
+        getNextStep: () => 'embed'
+      },
+      embed: {
+        run: embedMetadata,
+        getNextStep: () => 'getSimilarRetroTopics'
+      },
+      getSimilarRetroTopics: {
+        run: getSimilarRetroTopics,
+        getNextStep: () => 'rerank'
+      },
+      rerank: {
+        run: rerankRetroTopics
+      }
+    }
+  }
+
+  private failJob = async (jobId: number, retryCount: number, error: JobQueueError) => {
+    Logger.error(error)
+    const pg = getKysely()
+    const {message, retryDelay, jobData} = error
+    const maxRetries = error.maxRetries ?? 1e6
+    await pg
+      .updateTable('EmbeddingsJobQueue')
+      .set((eb) => ({
+        state: 'failed',
+        stateMessage: message,
+        retryCount: eb('retryCount', '+', 1),
+        retryAfter:
+          retryDelay && retryCount < maxRetries ? new Date(Date.now() + retryDelay) : null,
+        jobData: jobData ? sql`jsonb_concat("jobData", ${JSON.stringify({jobData})})` : undefined
+      }))
+      .where('id', '=', jobId)
+      .executeTakeFirstOrThrow()
+  }
+
+  private finishJob = async (jobId: number) => {
+    const pg = getKysely()
+    await pg.deleteFrom('EmbeddingsJobQueue').where('id', '=', jobId).executeTakeFirstOrThrow()
+  }
+
+  private addNextJob = async (
+    jobType: JobType,
+    priority: number,
+    data: Record<string, any> | Record<string, any>[]
+  ) => {
+    const pg = getKysely()
+    const getValues = (datum: any, idx = 0) => {
+      const {embeddingsMetadataId, model, ...jobData} = datum
+      return {
+        jobType,
+        // increment by idx so the first item goes first
+        priority: priority + idx,
+        embeddingsMetadataId,
+        model,
+        jobData: JSON.stringify(jobData)
+      }
+    }
+    const values = Array.isArray(data) ? data.map(getValues) : getValues(data)
+    await pg.insertInto('EmbeddingsJobQueue').values(values).execute()
+  }
+
+  runStep = async (job: DBJob) => {
+    const {id: jobId, jobData, jobType, priority, retryCount, embeddingsMetadataId, model} = job
+    const {workflowName, stepName} = EmbedderJobType.split(jobType)
+    const workflow = this.workflows[workflowName]
+    if (!workflow)
+      return this.failJob(
+        jobId,
+        retryCount,
+        new JobQueueError(`Workflow ${workflowName} not found`)
+      )
+    const step = workflow[stepName]
+    if (!step) return this.failJob(jobId, retryCount, new JobQueueError(`Step ${stepName} not found`))
+    const {run, getNextStep} = step
+    const dataLoader = new RootDataLoader()
+    let result: Awaited<ReturnType<typeof run>> = false
+    const data = {...jobData, embeddingsMetadataId, model}
+    try {
+      result = await run({dataLoader, data})
+    } catch (e) {
+      if (e instanceof Error) {
+        result = new JobQueueError(`Uncaught error: ${e.message}`)
+        result.stack = e.stack
+      }
+    }
+    if (result instanceof JobQueueError) return this.failJob(jobId, retryCount, result)
+    await this.finishJob(jobId)
+    if (result === false) return
+    const nextStepName = await getNextStep?.({dataLoader, data: {...data, ...result}})
+    if (!nextStepName) return
+    const nextJobType = EmbedderJobType.join(workflowName, nextStepName)
+    await this.addNextJob(nextJobType, priority, result)
+  }
+}
diff --git a/packages/embedder/addEmbeddingsMetadata.ts b/packages/embedder/addEmbeddingsMetadata.ts
index 214fecc0409..86b7801b288 100644
--- a/packages/embedder/addEmbeddingsMetadata.ts
+++ b/packages/embedder/addEmbeddingsMetadata.ts
@@ -1,7 +1,19 @@
 import {addEmbeddingsMetadataForRetrospectiveDiscussionTopic} from './addEmbeddingsMetadataForRetrospectiveDiscussionTopic'
-import {MessageToEmbedder} from './custom'
+import {EmbeddingObjectType} from './custom'
 
-export const addEmbeddingsMetadata = async ({objectTypes, ...options}: MessageToEmbedder) => {
+export type AddEmbeddingsMetadataParams = {
+  startAt?: Date
+  endAt?: Date
+}
+
+type AddEmbeddingsMetadataOptions = AddEmbeddingsMetadataParams & {
+  objectTypes: EmbeddingObjectType[]
+}
+
+export const addEmbeddingsMetadata = async ({
+  objectTypes,
+  ...options
+}: AddEmbeddingsMetadataOptions) => {
   return Promise.all(
     objectTypes.map((type) => {
       switch (type) {
diff --git a/packages/embedder/addEmbeddingsMetadataForRetrospectiveDiscussionTopic.ts b/packages/embedder/addEmbeddingsMetadataForRetrospectiveDiscussionTopic.ts
index cd4489e3942..72bd3519779 100644
--- a/packages/embedder/addEmbeddingsMetadataForRetrospectiveDiscussionTopic.ts
+++ b/packages/embedder/addEmbeddingsMetadataForRetrospectiveDiscussionTopic.ts
@@ -1,14 +1,13 @@
 import {ExpressionOrFactory, SqlBool, sql} from 'kysely'
 import getRethink from 'parabol-server/database/rethinkDriver'
 import {RDatum} from 'parabol-server/database/stricterR'
-import getKysely from 'parabol-server/postgres/getKysely'
 import {DB} from 'parabol-server/postgres/pg'
 import {Logger} from 'parabol-server/utils/Logger'
-import {EMBEDDER_JOB_PRIORITY} from './EMBEDDER_JOB_PRIORITY'
-import getModelManager from './ai_models/ModelManager'
-import {EmbedderOptions} from './custom'
+import getKysely from '../server/postgres/getKysely'
+import {AddEmbeddingsMetadataParams} from './addEmbeddingsMetadata'
+import {insertDiscussionsIntoMetadataAndQueue} from './insertDiscussionsIntoMetadataAndQueue'
 
-interface DiscussionMeta {
+export interface DiscussionMeta {
   id: string
   teamId: string
   createdAt: Date
@@ -29,79 +28,18 @@ const validateDiscussions = async (discussions: (DiscussionMeta & {meetingId: st
   return discussions.filter(({meetingId}) => endedMeetingIdsSet.has(meetingId))
 }
 
-const insertDiscussionsIntoMetadata = async (discussions: DiscussionMeta[], priority: number) => {
-  const pg = getKysely()
-  const metadataRows = discussions.map(({id, teamId, createdAt}) => ({
-    refId: id,
-    objectType: 'retrospectiveDiscussionTopic' as const,
-    teamId,
-    // Not techincally updatedAt since discussions are be updated after they get created
-    refUpdatedAt: createdAt
-  }))
-  if (!metadataRows[0]) return
-
-  const modelManager = getModelManager()
-  const tableNames = [...modelManager.embeddingModels.keys()]
-  return (
-    pg
-      .with('Insert', (qc) =>
-        qc
-          .insertInto('EmbeddingsMetadata')
-          .values(metadataRows)
-          .onConflict((oc) => oc.doNothing())
-          .returning('id')
-      )
-      // create n*m rows for n models & m discussions
-      .with('Metadata', (qc) =>
-        qc
-          .selectFrom('Insert')
-          .fullJoin(
-            sql<{model: string}>`UNNEST(ARRAY[${sql.join(tableNames)}])`.as('model'),
-            (join) => join.onTrue()
-          )
-          .select(['id', 'model'])
-      )
-      .insertInto('EmbeddingsJobQueue')
-      .columns(['jobType', 'priority', 'jobData'])
-      .expression(({selectFrom}) =>
-        selectFrom('Metadata').select(({lit, fn, ref}) => [
-          sql.lit('embed').as('jobType'),
-          lit(priority).as('priority'),
-          fn('json_build_object', [
-            sql.lit('embeddingsMetadataId'),
-            ref('Metadata.id'),
-            sql.lit('model'),
-            ref('Metadata.model')
-          ]).as('jobData')
-        ])
-      )
-      .execute()
-  )
-}
-
 export const addEmbeddingsMetadataForRetrospectiveDiscussionTopic = async ({
   startAt,
-  endAt,
-  meetingId
-}: EmbedderOptions) => {
-  // load up the metadata table will all discussion topics that are a part of meetings ended within the given date range
+  endAt
+}: AddEmbeddingsMetadataParams) => {
   const pg = getKysely()
-  if (meetingId) {
-    const discussions = await pg
-      .selectFrom('Discussion')
-      .select(['id', 'teamId', 'createdAt'])
-      .where('meetingId', '=', meetingId)
-      .execute()
-    await insertDiscussionsIntoMetadata(discussions, EMBEDDER_JOB_PRIORITY.MEETING)
-    return
-  }
   // PG only accepts 65K parameters (inserted columns * number of rows + query params). Make the batches as big as possible
   const PG_MAX_PARAMS = 65535
   const QUERY_PARAMS = 10
   const METADATA_COLS_PER_ROW = 4
   const BATCH_SIZE = Math.floor((PG_MAX_PARAMS - QUERY_PARAMS) / METADATA_COLS_PER_ROW)
   const pgStartAt = startAt || new Date(0)
-  const pgEndAt = (endAt || new Date('4000-01-01')).getTime() / 1000
+  const pgEndAt = (endAt || new Date('4000')).getTime() / 1000
 
   let curEndAt = pgEndAt
   let curEndId = ''
@@ -135,7 +73,7 @@ export const addEmbeddingsMetadataForRetrospectiveDiscussionTopic = async ({
     curEndId = curEndAt === createdAtEpoch ? id : ''
     curEndAt = createdAtEpoch
     const validDiscussions = await validateDiscussions(discussions)
-    await insertDiscussionsIntoMetadata(validDiscussions, EMBEDDER_JOB_PRIORITY.TOPIC_HISTORY)
+    await insertDiscussionsIntoMetadataAndQueue(validDiscussions, 5)
     const jsTime = new Date(createdAtEpoch * 1000)
     Logger.log(
       `Inserted ${validDiscussions.length}/${discussions.length} discussions in metadata ending at ${jsTime}`
diff --git a/packages/embedder/ai_models/AbstractEmbeddingsModel.ts b/packages/embedder/ai_models/AbstractEmbeddingsModel.ts
index f9fb669737d..daa19fad9aa 100644
--- a/packages/embedder/ai_models/AbstractEmbeddingsModel.ts
+++ b/packages/embedder/ai_models/AbstractEmbeddingsModel.ts
@@ -3,8 +3,9 @@ import getKysely from 'parabol-server/postgres/getKysely'
 import {DB} from 'parabol-server/postgres/pg'
 import isValid from '../../server/graphql/isValid'
 import {Logger} from '../../server/utils/Logger'
-import {EMBEDDER_JOB_PRIORITY} from '../EMBEDDER_JOB_PRIORITY'
+import {getEmbedderPriority} from '../getEmbedderPriority'
 import {ISO6391} from '../iso6393To1'
+import {URLRegex} from '../regex'
 import {AbstractModel} from './AbstractModel'
 
 export interface EmbeddingModelParams {
@@ -19,7 +20,7 @@ export type EmbeddingsTable = Extract<keyof DB, EmbeddingsTableName>
 export abstract class AbstractEmbeddingsModel extends AbstractModel {
   readonly embeddingDimensions: number
   readonly maxInputTokens: number
-  readonly tableName: EmbeddingsTableName
+  readonly tableName: EmbeddingsTable
   readonly languages: ISO6391[]
   constructor(modelId: string, url: string) {
     super(url)
@@ -27,22 +28,30 @@ export abstract class AbstractEmbeddingsModel extends AbstractModel {
     this.embeddingDimensions = modelParams.embeddingDimensions
     this.languages = modelParams.languages
     this.maxInputTokens = modelParams.maxInputTokens
-    this.tableName = `Embeddings_${modelParams.tableSuffix}`
+    this.tableName = `Embeddings_${modelParams.tableSuffix}` as EmbeddingsTable
   }
   protected abstract constructModelParams(modelId: string): EmbeddingModelParams
   abstract getEmbedding(content: string, retries?: number): Promise<number[] | Error>
   abstract getTokens(content: string): Promise<number[] | Error>
 
+  private normalizeContent = (content: string, truncateUrls: boolean) => {
+    if (!truncateUrls) return content.trim()
+    // pathname & search can include a lot of garbage that doesn't help the meaning
+    return content.trim().replaceAll(URLRegex, (match) => new URL(match).origin)
+  }
+
   async chunkText(content: string) {
-    const tokens = await this.getTokens(content)
+    const AVG_CHARS_PER_TOKEN = 20
+    const maxContentLength = this.maxInputTokens * AVG_CHARS_PER_TOKEN
+    const tokens = content.length < maxContentLength ? await this.getTokens(content) : -1
     if (tokens instanceof Error) return tokens
-    const isFullTextTooBig = tokens.length > this.maxInputTokens
+    const isFullTextTooBig = tokens === -1 || tokens.length > this.maxInputTokens
     if (!isFullTextTooBig) return [content]
-
-    for (let i = 0; i < 3; i++) {
+    const normalizedContent = this.normalizeContent(content, true)
+    for (let i = 0; i < 5; i++) {
       const tokensPerWord = (4 + i) / 3
-      const chunks = this.splitText(content, tokensPerWord)
+      const chunks = this.splitText(normalizedContent, tokensPerWord)
       const chunkLengths = await Promise.all(
         chunks.map(async (chunk) => {
           const chunkTokens = await this.getTokens(chunk)
@@ -60,27 +69,30 @@ export abstract class AbstractEmbeddingsModel extends AbstractModel {
         return chunks
       }
     }
-    return new Error(`Text is too long and could not be split into chunks. Is it english?`)
+    return new Error(`Text could not be chunked. The tokenizer cannot support this content.`)
   }
 
   // private because result must still be too long to go into model. Must verify with getTokens
   private splitText(content: string, tokensPerWord = 4 / 3) {
-    // it's actually 4 / 3, but don't want to chance a failed split
     const WORD_LIMIT = Math.floor(this.maxInputTokens / tokensPerWord)
+    // account for junk data with excessively long words
+    const charLimit = WORD_LIMIT * 100
     const chunks: string[] = []
-    const delimiters = ['\n\n', '\n', '.', ' ']
+    const delimiters = ['\n\n', '\n', '.', ' '] as const
     const countWords = (text: string) => text.trim().split(/\s+/).length
-    const splitOnDelimiter = (text: string, delimiter: string) => {
+    const splitOnDelimiter = (text: string, delimiter: (typeof delimiters)[number]) => {
       const sections = text.split(delimiter)
       for (let i = 0; i < sections.length; i++) {
         const section = sections[i]!
+        // account for multiple delimiters in a row
+        if (section.length === 0) continue
         const sectionWordCount = countWords(section)
-        if (sectionWordCount < WORD_LIMIT) {
+        if (sectionWordCount < WORD_LIMIT && section.length < charLimit) {
           // try to merge this section with the last one
           const previousSection = chunks.at(-1)
           if (previousSection) {
             const combinedChunks = `${previousSection}${delimiter}${section}`
             const mergedWordCount = countWords(combinedChunks)
-            if (mergedWordCount < WORD_LIMIT) {
+            if (mergedWordCount < WORD_LIMIT && combinedChunks.length < charLimit) {
               chunks[chunks.length - 1] = combinedChunks
               continue
             }
@@ -92,26 +104,24 @@ export abstract class AbstractEmbeddingsModel extends AbstractModel {
         }
       }
     }
-    splitOnDelimiter(content.trim(), delimiters[0]!)
+    splitOnDelimiter(content, delimiters[0])
     return chunks
   }
 
   async createEmbeddingsForModel() {
     Logger.log(`Queueing EmbeddingsMetadata into EmbeddingsJobQueue for ${this.tableName}`)
     const pg = getKysely()
+    const priority = getEmbedderPriority(10)
     await pg
       .insertInto('EmbeddingsJobQueue')
-      .columns(['jobData', 'priority'])
+      .columns(['jobType', 'priority', 'embeddingsMetadataId', 'model'])
       .expression(({selectFrom}) =>
         selectFrom('EmbeddingsMetadata')
-          .select(({fn, lit}) => [
-            fn('json_build_object', [
-              sql.lit('model'),
-              sql.lit(this.tableName),
-              sql.lit('embeddingsMetadataId'),
-              'id'
-            ]).as('jobData'),
-            lit(EMBEDDER_JOB_PRIORITY.NEW_MODEL).as('priority')
+          .select(({lit, ref}) => [
+            sql.lit('embed:start').as('jobType'),
+            lit(priority).as('priority'),
+            ref('id').as('embeddingsMetadataId'),
+            sql.lit(this.tableName).as('model')
           ])
          .where('language', 'in', this.languages)
       )
@@ -136,9 +146,9 @@
         "id" INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
         "embedText" TEXT,
         "embedding" vector(${sql.raw(vectorDimensions.toString())}),
-        "embeddingsMetadataId" INTEGER UNIQUE NOT NULL,
+        "embeddingsMetadataId" INTEGER NOT NULL,
         "chunkNumber" SMALLINT,
-        UNIQUE("embeddingsMetadataId", "chunkNumber"),
+        UNIQUE NULLS NOT DISTINCT("embeddingsMetadataId", "chunkNumber"),
         FOREIGN KEY ("embeddingsMetadataId")
           REFERENCES "EmbeddingsMetadata"("id")
           ON DELETE CASCADE
diff --git a/packages/embedder/ai_models/ModelManager.ts b/packages/embedder/ai_models/ModelManager.ts
index bbbfa4f6273..09091499121 100644
--- a/packages/embedder/ai_models/ModelManager.ts
+++ b/packages/embedder/ai_models/ModelManager.ts
@@ -15,6 +15,11 @@ export interface ModelConfig {
 export class ModelManager {
   embeddingModels: Map<EmbeddingsTableName, AbstractEmbeddingsModel>
   generationModels: Map<string, AbstractGenerationModel>
+  getEmbedder(tableName?: EmbeddingsTableName): AbstractEmbeddingsModel {
+    return tableName
+      ? this.embeddingModels.get(tableName)
+      : this.embeddingModels.values().next().value
+  }
 
   private parseModelEnvVars(envVar: 'AI_EMBEDDING_MODELS' | 'AI_GENERATION_MODELS'): ModelConfig[] {
     const envValue = process.env[envVar]
diff --git a/packages/embedder/ai_models/TextEmbeddingsInference.ts b/packages/embedder/ai_models/TextEmbeddingsInference.ts
index 1a2a02596b3..594f9dcf1ce 100644
--- a/packages/embedder/ai_models/TextEmbeddingsInference.ts
+++ b/packages/embedder/ai_models/TextEmbeddingsInference.ts
@@ -1,4 +1,4 @@
-import createClient from 'openapi-fetch'
+import createClient, {ClientMethod} from 'openapi-fetch'
 import sleep from 'parabol-client/utils/sleep'
 import type {paths} from '../textEmbeddingsnterface'
 import {AbstractEmbeddingsModel, EmbeddingModelParams} from './AbstractEmbeddingsModel'
@@ -19,62 +19,68 @@ const modelIdDefinitions: Record<string, EmbeddingModelParams> = {
   }
 }
 
-export class TextEmbeddingsInference extends AbstractEmbeddingsModel {
-  client: ReturnType<typeof createClient<paths>>
-  constructor(modelId: string, url: string) {
-    super(modelId, url)
-    this.client = createClient<paths>({baseUrl: this.url})
-  }
-
-  async getTokens(content: string) {
+const openAPIWithTimeout =
+  (client: ClientMethod<paths, any>, toError: (error: unknown) => any, timeout: number) =>
+  async (...args: Parameters<ClientMethod<paths, any>>) => {
+    const controller = new AbortController()
+    const {signal} = controller
+    const timeoutId = setTimeout(() => {
+      controller.abort(new Error('Timeout'))
+    }, timeout)
+    const [route, requestInit] = args
+    let response: any
     try {
-      const {data, error} = await this.client.POST('/tokenize', {
-        body: {inputs: content, add_special_tokens: true},
+      response = await client(route, {
+        signal,
         headers: {
           Accept: 'application/json',
           'Content-Type': 'application/json; charset=utf-8'
-        }
+        },
+        ...requestInit
       })
-      if (error) return new Error(error.error)
-      return data[0]!.map(({id}) => id)
+      clearTimeout(timeoutId)
+      return response
     } catch (e) {
-      return e instanceof Error ? e : new Error(e as string)
+      const error = toError(e)
+      return {error}
     }
   }
 
+export class TextEmbeddingsInference extends AbstractEmbeddingsModel {
+  client: ReturnType<typeof createClient<paths>>
+  constructor(modelId: string, url: string) {
+    super(modelId, url)
+    const client = createClient<paths>({baseUrl: this.url})
+    const toError = (e: unknown) => ({error: e instanceof Error ? e.message : e})
+    client.GET = openAPIWithTimeout(client.GET, toError, 10000)
+    client.POST = openAPIWithTimeout(client.POST, toError, 10000)
+    this.client = client
+  }
+  async getTokens(content: string) {
+    const {data, error} = await this.client.POST('/tokenize', {
+      body: {add_special_tokens: true, inputs: content}
+    })
+    if (error) return new Error(error.error)
+    return data[0]!.map(({id}) => id)
+  }
+
   async decodeTokens(inputIds: number[]) {
-    try {
-      const {data, error} = await this.client.POST('/decode', {
-        body: {ids: inputIds},
-        headers: {
-          Accept: 'application/json',
-          'Content-Type': 'application/json; charset=utf-8'
-        }
-      })
-      if (error) return new Error(error.error)
-      return data
-    } catch (e) {
-      return e instanceof Error ? e : new Error(e as string)
-    }
+    const {data, error} = await this.client.POST('/decode', {
+      body: {ids: inputIds}
+    })
+    if (error) return new Error(error.error)
+    return data
   }
 
   public async getEmbedding(content: string, retries = 5): Promise<number[] | Error> {
-    try {
-      const {data, error, response} = await this.client.POST('/embed', {
-        body: {inputs: content},
-        headers: {
-          Accept: 'application/json',
-          'Content-Type': 'application/json; charset=utf-8'
-        }
-      })
-      if (error) {
-        if (response.status !== 429 || retries < 1) return new Error(error.error)
-        await sleep(2000)
-        return this.getEmbedding(content, retries - 1)
-      }
-      return data[0]!
-    } catch (e) {
-      return e instanceof Error ? e : new Error(e as string)
+    const {data, error, response} = await this.client.POST('/embed', {
+      body: {inputs: content}
+    })
+    if (error) {
+      if (response.status !== 429 || retries < 1) return new Error(error.error)
+      await sleep(2000)
+      return this.getEmbedding(content, retries - 1)
     }
+    return data[0]!
   }
 
   protected constructModelParams(modelId: string): EmbeddingModelParams {
diff --git a/packages/embedder/custom.d.ts b/packages/embedder/custom.d.ts
index 6640974c6a9..b051eb3e4b3 100644
--- a/packages/embedder/custom.d.ts
+++ b/packages/embedder/custom.d.ts
@@ -1,11 +1,29 @@
+import {Selectable} from 'kysely'
+import {DataLoaderInstance} from '../server/dataloader/RootDataLoader'
 import type {DB} from '../server/postgres/pg'
+import {JobQueueError} from './JobQueueError'
 
 export type EmbeddingObjectType = DB['EmbeddingsMetadata']['objectType']
 
-export interface MessageToEmbedder {
-  objectTypes: EmbeddingObjectType[]
-  startAt?: Date
-  endAt?: Date
-  meetingId?: string
+type GetInputData<T> = T extends JobQueueStepRun<infer U> ? U : never
+export type ParentJob<T> = GetInputData<T> | GetInputData<T>[]
+
+interface StepContext<TData = Record<string, any>> {
+  dataLoader: DataLoaderInstance
+  data: TData
 }
-export type EmbedderOptions = Omit<MessageToEmbedder, 'objectTypes'>
+
+type StepResult = Record<string, any> | Record<string, any>[]
+export type JobQueueStepRun<TData = Record<string, any>, TResult extends StepResult = StepResult> = (
+  context: StepContext<TData>
+  // false if the job completed without error, but the flow should not continue
+  // e.g. could not create an embeddings because there wasn't enough text or it was in Russian
+) => Promise<TResult | false | JobQueueError>
+
+interface JobQueueStep {
+  run: JobQueueStepRun
+  getNextStep?: (result: StepContext) => string | Promise<string>
+}
+
+export type JobType = `${string}:${string}`
+export type Workflow = Record<string, JobQueueStep>
+export type DBJob = Selectable<DB['EmbeddingsJobQueue']>
diff --git a/packages/embedder/debug.ts b/packages/embedder/debug.ts
new file mode 100644
index 00000000000..7edd0a011f6
--- /dev/null
+++ b/packages/embedder/debug.ts
@@ -0,0 +1,29 @@
+// call with yarn sucrase-node embedder/debug.ts
+import '../../scripts/webpack/utils/dotenv'
+import getKysely from '../server/postgres/getKysely'
+import {WorkflowOrchestrator} from './WorkflowOrchestrator'
+
+const debugFailedJob = async () => {
+  const pg = getKysely()
+  const failedJob = await pg
+    .selectFrom('EmbeddingsJobQueue')
+    .selectAll()
+    .orderBy(['priority'])
+    .where('state', '=', 'failed')
+    .limit(1)
+    .executeTakeFirst()
+
+  if (!failedJob) {
+    console.log('No failed jobs found')
+    return
+  }
+
+  console.log('Debugging job:', failedJob.id)
+  const orch = new WorkflowOrchestrator()
+  await orch.runStep(failedJob as any)
+  // const man = getModelManager()
+  // const model = man.embeddingModels.get('Embeddings_ember_1')
+  // const res = await model?.chunkText('hey there')
+}
+
+debugFailedJob()
diff --git a/packages/embedder/embedder.ts b/packages/embedder/embedder.ts
index 7971ba4f717..e89a7b60a97 100644
--- a/packages/embedder/embedder.ts
+++ b/packages/embedder/embedder.ts
@@ -1,16 +1,14 @@
 import tracer from 'dd-trace'
-import EmbedderChannelId from 'parabol-client/shared/gqlIds/EmbedderChannelId'
 import 'parabol-server/initSentry'
 import {Logger} from 'parabol-server/utils/Logger'
 import RedisInstance from 'parabol-server/utils/RedisInstance'
 import {Tuple} from '../client/types/generics'
-import RedisStream from '../gql-executor/RedisStream'
 import {EmbeddingsJobQueueStream} from './EmbeddingsJobQueueStream'
-import {addEmbeddingsMetadata} from './addEmbeddingsMetadata'
+import {WorkflowOrchestrator} from './WorkflowOrchestrator'
 import getModelManager from './ai_models/ModelManager'
-import {MessageToEmbedder} from './custom'
 import {establishPrimaryEmbedder} from './establishPrimaryEmbedder'
 import {importHistoricalMetadata} from './importHistoricalMetadata'
+import {logPerformance} from './logPerformance'
 import {mergeAsyncIterators} from './mergeAsyncIterators'
 import {resetStalledJobs} from './resetStalledJobs'
 
@@ -22,19 +20,9 @@ tracer.init({
 })
 tracer.use('pg')
 
-const parseEmbedderMessage = (message: string): MessageToEmbedder => {
-  const {startAt, endAt, ...input} = JSON.parse(message)
-  return {
-    ...input,
-    startAt: startAt ? new Date(startAt) : undefined,
-    endAt: endAt ? new Date(endAt) : undefined
-  }
-}
-
 const run = async () => {
   const SERVER_ID = process.env.SERVER_ID
   if (!SERVER_ID) throw new Error('env.SERVER_ID is required')
-  const embedderChannel = EmbedderChannelId.join(SERVER_ID)
   const NUM_WORKERS = parseInt(process.env.AI_EMBEDDER_WORKERS!)
   if (!(NUM_WORKERS > 0)) {
     Logger.log('env.AI_EMBEDDER_WORKERS is < 0. Embedder will not run.')
     return
   }
   const redis = new RedisInstance(`embedder_${SERVER_ID}`)
   const primaryLock = await establishPrimaryEmbedder(redis)
   const modelManager = getModelManager()
-  let streams: AsyncIterableIterator<any> | undefined = undefined
-  const kill = () => {
-    primaryLock?.release()
-    streams?.return?.()
-    process.exit()
-  }
-  process.on('SIGTERM', kill)
-  process.on('SIGINT', kill)
   if (primaryLock) {
     // only 1 worker needs to perform these on startup
     await modelManager.maybeCreateTables()
@@ -59,48 +39,30 @@ const run = async () => {
     resetStalledJobs()
   }
 
-  const onMessage = async (_channel: string, message: string) => {
-    const parsedMessage = parseEmbedderMessage(message)
-    await addEmbeddingsMetadata(parsedMessage)
-  }
-
-  // subscribe to consumer group
-  try {
-    await redis.xgroup(
-      'CREATE',
-      'embedMetadataStream',
-      'embedMetadataConsumerGroup',
-      '$',
-      'MKSTREAM'
-    )
-  } catch (e) {
-    // stream already exists
-  }
-
-  const messageStream = new RedisStream(
-    'embedMetadataStream',
-    'embedMetadataConsumerGroup',
-    embedderChannel
-  )
-
+  const orchestrator = new WorkflowOrchestrator()
   // Assume 3 workers for type safety, but it doesn't really matter at runtime
   const jobQueueStreams = Array.from(
     {length: NUM_WORKERS},
-    () => new EmbeddingsJobQueueStream()
+    () => new EmbeddingsJobQueueStream(orchestrator)
   ) as Tuple<EmbeddingsJobQueueStream, 3>
+  const streams = mergeAsyncIterators(jobQueueStreams)
+
+  const kill: NodeJS.SignalsListener = (signal) => {
+    Logger.log(`Kill signal received: ${signal}`)
+    primaryLock?.release()
+    streams.return?.()
+    process.exit()
+  }
+  process.on('SIGTERM', kill)
+  process.on('SIGINT', kill)
 
   Logger.log(`\n⚡⚡⚡️️ Server ID: ${SERVER_ID}. Embedder is ready ⚡⚡⚡️️️`)
 
-  streams = mergeAsyncIterators([messageStream, ...jobQueueStreams])
-  for await (const [idx, message] of streams) {
-    switch (idx) {
-      case 0:
-        onMessage('', message)
-        continue
-      default:
-        Logger.log(`Worker ${idx} finished job ${message.id}`)
-        continue
-    }
+  const counter = logPerformance(10, 6)
+
+  for await (const _ of streams) {
+    // Logger.log(`Worker ${idx} finished job ${message.id}`)
+    counter.i++
   }
 
   // On graceful shutdown
diff --git a/packages/embedder/getEmbedderPriority.ts b/packages/embedder/getEmbedderPriority.ts
new file mode 100644
index 00000000000..ea0b0bad8e9
--- /dev/null
+++ b/packages/embedder/getEmbedderPriority.ts
@@ -0,0 +1,21 @@
+import ms from 'ms'
+/*
+The Job Queue has a first in first out (FIFO) strategy with a few exceptions:
+  - Realtime requests from the app should come before historical data processing
+  - Historical data processing should come before backfilling data for a new model
+Future use cases:
+  - Prioritize Enterprise/Pro users over free
+  - Deprioritize jobs for heavy users
+
+To achieve these goals we need some kind of clock, so we use a timestamp in Unix time (resolution in seconds)
+Postgres uses a signed 32-bit INT, so we subtract 2**31 to make use of the full range
+Otherwise, we could not schedule anything after 2**31-1, which is roughly the year 2038
+
+This allows us to provide a `maxDelayInDays` API, which is somewhat intuitive:
+e.g. Process realtime requests immediately, but start processing this historical data no later than 5 days from now.
+In 5 days, that historical data will be a higher priority than new realtime requests.
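+
+A worked example with made-up numbers: getEmbedderPriority(5) at Unix time t returns
+-(2**31) + t + 5 * 86400, so a job enqueued now with a 5-day budget has the same
+priority as a realtime job (maxDelayInDays = 0) enqueued exactly 5 days from now.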
+*/
+export const getEmbedderPriority = (maxDelayInDays: number) => {
+  const maxDelayInSeconds = (maxDelayInDays * ms('1d')) / 1000
+  return -(2 ** 31) + ~~(Date.now() / 1000) + maxDelayInSeconds
+}
diff --git a/packages/embedder/indexing/createEmbeddingTextFrom.ts b/packages/embedder/indexing/createEmbeddingTextFrom.ts
index 9fb3cceda80..26083bd024c 100644
--- a/packages/embedder/indexing/createEmbeddingTextFrom.ts
+++ b/packages/embedder/indexing/createEmbeddingTextFrom.ts
@@ -1,16 +1,21 @@
 import {Selectable} from 'kysely'
 import {DB} from 'parabol-server/postgres/pg'
-import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
+import {DataLoaderInstance} from '../../server/dataloader/RootDataLoader'
 import {createTextFromRetrospectiveDiscussionTopic} from './retrospectiveDiscussionTopic'
 
 export const createEmbeddingTextFrom = async (
   embeddingsMetadata: Selectable<DB['EmbeddingsMetadata']>,
-  dataLoader: RootDataLoader
+  dataLoader: DataLoaderInstance,
+  isRerank?: boolean
 ) => {
   switch (embeddingsMetadata.objectType) {
     case 'retrospectiveDiscussionTopic':
-      return createTextFromRetrospectiveDiscussionTopic(embeddingsMetadata.refId, dataLoader)
+      return createTextFromRetrospectiveDiscussionTopic(
+        embeddingsMetadata.refId,
+        dataLoader,
+        isRerank
+      )
     default:
       throw new Error(`Unexpected objectType: ${embeddingsMetadata.objectType}`)
   }
diff --git a/packages/embedder/indexing/failJob.ts b/packages/embedder/indexing/failJob.ts
deleted file mode 100644
index 17d293b49a9..00000000000
--- a/packages/embedder/indexing/failJob.ts
+++ /dev/null
@@ -1,17 +0,0 @@
-import getKysely from 'parabol-server/postgres/getKysely'
-import {Logger} from 'parabol-server/utils/Logger'
-
-export const failJob = async (jobId: number, stateMessage: string, retryAfter?: Date | null) => {
-  const pg = getKysely()
-  Logger.log(`embedder: failed job ${jobId}, ${stateMessage}`)
-  await pg
-    .updateTable('EmbeddingsJobQueue')
-    .set((eb) => ({
-      state: 'failed',
-      stateMessage,
-      retryCount: eb('retryCount', '+', 1),
-      retryAfter: retryAfter || null
-    }))
-    .where('id', '=', jobId)
-    .executeTakeFirstOrThrow()
-}
diff --git a/packages/embedder/indexing/retrospectiveDiscussionTopic.ts b/packages/embedder/indexing/retrospectiveDiscussionTopic.ts
index 5a9d1c423cc..0d6b8244e6a 100644
--- a/packages/embedder/indexing/retrospectiveDiscussionTopic.ts
+++ b/packages/embedder/indexing/retrospectiveDiscussionTopic.ts
@@ -1,8 +1,9 @@
-import {RethinkSchema} from 'parabol-server/database/rethinkDriver'
 import Comment from 'parabol-server/database/types/Comment'
 import {isMeetingRetrospective} from 'parabol-server/database/types/MeetingRetrospective'
-import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
+import {DataLoaderInstance} from 'parabol-server/dataloader/RootDataLoader'
 import prettier from 'prettier'
+import {inferLanguage} from '../inferLanguage'
+import {ISO6391} from '../iso6393To1'
 
 // Here's a generic representation of the text generated here:
@@ -20,15 +21,15 @@ import prettier from 'prettier'
 //
 
 const IGNORE_COMMENT_USER_IDS = ['parabolAIUser']
-
-async function getPreferredNameByUserId(userId: string, dataLoader: RootDataLoader) {
+const MAX_TEXT_LENGTH = 10000
+async function getPreferredNameByUserId(userId: string, dataLoader: DataLoaderInstance) {
   if (!userId) return 'Unknown'
   const user = await dataLoader.get('users').load(userId)
   return !user ? 'Unknown' : user.preferredName
 }
 
 async function formatThread(
-  dataLoader: RootDataLoader,
+  dataLoader: DataLoaderInstance,
   comments: Comment[],
   parentId: string | null = null,
   depth = 0
@@ -45,7 +46,7 @@ async function formatThread(
       ? 'Anonymous'
       : await getPreferredNameByUserId(comment.createdBy, dataLoader)
     const how = depth === 0 ? 'wrote' : 'replied'
-    const content = comment.plaintextContent
+    const content = comment.plaintextContent.slice(0, MAX_TEXT_LENGTH)
     const formattedPost = `${indent}- ${author} ${how}, "${content}"\n`
 
     // Recursively format child threads
@@ -60,7 +61,7 @@ async function formatThread(
 
 export const createTextFromRetrospectiveDiscussionTopic = async (
   discussionId: string,
-  dataLoader: RootDataLoader,
+  dataLoader: DataLoaderInstance,
   textForReranking: boolean = false
 ) => {
   const discussion = await dataLoader.get('discussions').load(discussionId)
@@ -72,23 +73,26 @@ export const createTextFromRetrospectiveDiscussionTopic = async (
     dataLoader.get('retroReflectionsByGroupId').load(reflectionGroupId)
   ])
   if (!isMeetingRetrospective(newMeeting)) throw new Error('Meeting is not a retro')
-  const {templateId} = newMeeting
+  // It should never be undefined, but our data integrity in RethinkDB is bad
+  const templateId = newMeeting?.templateId ?? ''
   const promptIds = [...new Set(reflections.map((r) => r.promptId))]
   const [template, ...prompts] = await Promise.all([
-    dataLoader.get('meetingTemplates').loadNonNull(templateId),
+    dataLoader.get('meetingTemplates').load(templateId),
     ...promptIds.map((promptId) => dataLoader.get('reflectPrompts').load(promptId))
   ])
 
   let markdown = ''
+  const templateName = template?.name ?? 'Unknown'
   if (!textForReranking) {
     markdown =
       `A topic "${reflectionGroup?.title ?? ''}" was discussed during ` +
-      `the meeting "${newMeeting.name}" that followed the "${template.name}" template.\n` +
+      `the meeting "${newMeeting.name}" that followed the "${templateName}" template.\n` +
      `\n`
   }
 
   for (const prompt of prompts) {
+    if (!prompt) continue // RethinkDB bad data integrity
     if (!textForReranking) {
       markdown += `Participants were prompted with, "${prompt.question}`
       if (prompt.description) markdown += `: ${prompt.description}`
@@ -98,7 +102,10 @@ export const createTextFromRetrospectiveDiscussionTopic = async (
       const author = newMeeting.disableAnonymity
         ? await getPreferredNameByUserId(reflection.creatorId, dataLoader)
        : 'Anonymous'
-      markdown += ` - ${author} wrote, "${reflection.plaintextContent}"\n`
+      markdown += ` - ${author} wrote, "${reflection.plaintextContent.slice(
+        0,
+        MAX_TEXT_LENGTH
+      )}"\n`
     }
     markdown += `\n`
   }
@@ -120,6 +127,7 @@ export const createTextFromRetrospectiveDiscussionTopic = async (
    * objectType: 'retrospectiveDiscussionNoSummary' or something and do a bit of testing.
    */
 
+  let language: ISO6391 | undefined = undefined
   if (discussionSummary) {
     markdown += `Further discussion was made. ` + ` ${discussionSummary}`
   } else {
@@ -148,35 +156,20 @@
       (c) => !IGNORE_COMMENT_USER_IDS.includes(c.createdBy)
     )
     if (filteredComments.length) {
-      markdown += `Futher discussion was made:\n`
+      markdown += `Further discussion was made:\n`
       markdown += await formatThread(dataLoader, filteredComments)
-      // TODO: if the discussion threads are too long, summarize them
+      const commentBlob = filteredComments.map((c) => c.plaintextContent).join(' ')
+      // it's common to reflect in english and comment in a native tongue
+      language = inferLanguage(commentBlob)
     }
   }
 
-  markdown = await prettier.format(markdown, {
+  const body = await prettier.format(markdown, {
     parser: 'markdown',
     proseWrap: 'always',
     printWidth: 72
   })
-  return markdown
-}
-
-export const newRetroDiscussionTopicsFromNewMeeting = async (
-  newMeeting: RethinkSchema['NewMeeting']['type'],
-  dataLoader: RootDataLoader
-) => {
-  const discussPhase = newMeeting.phases.find((phase) => phase.phaseType === 'discuss')
-  const orgId = (await dataLoader.get('teams').load(newMeeting.teamId))?.orgId
-  if (orgId && discussPhase && discussPhase.stages) {
-    return discussPhase.stages.map((stage) => ({
-      objectType: 'retrospectiveDiscussionTopic' as const,
-      teamId: newMeeting.teamId,
-      refId: `${newMeeting.id}:${stage.id}`,
-      refUpdatedAt: newMeeting.createdAt
-    }))
-  } else {
-    return []
-  }
+  language = language || inferLanguage(reflections.map((r) => r.plaintextContent).join(' '))
+  return {body, language}
 }
diff --git a/packages/embedder/inferLanguage.ts b/packages/embedder/inferLanguage.ts
new file mode 100644
index 00000000000..ffa94a4c532
--- /dev/null
+++ b/packages/embedder/inferLanguage.ts
@@ -0,0 +1,11 @@
+import franc from 'franc-min'
+import {ISO6391, iso6393To1} from './iso6393To1'
+import {URLRegex} from './regex'
+
+export const inferLanguage = (text: string, minLength = 100): ISO6391 | undefined => {
+  // URLs make foreign languages look English
+  const urlFreeText = text.replaceAll(URLRegex, '')
+  // if it's less than 100 chars, we probably don't care anyways
+  const iso6393 = franc(urlFreeText, {minLength})
+  return iso6393To1[iso6393 as keyof typeof iso6393To1]
+}
diff --git a/packages/embedder/insertDiscussionsIntoMetadataAndQueue.ts b/packages/embedder/insertDiscussionsIntoMetadataAndQueue.ts
new file mode 100644
index 00000000000..27f0b5db3d9
--- /dev/null
+++ b/packages/embedder/insertDiscussionsIntoMetadataAndQueue.ts
@@ -0,0 +1,56 @@
+import {sql} from 'kysely'
+import getKysely from 'parabol-server/postgres/getKysely'
+import {DiscussionMeta} from './addEmbeddingsMetadataForRetrospectiveDiscussionTopic'
+import getModelManager from './ai_models/ModelManager'
+import {getEmbedderPriority} from './getEmbedderPriority'
+
+export const insertDiscussionsIntoMetadataAndQueue = async (
+  discussions: DiscussionMeta[],
+  maxDelayInDays: number
+) => {
+  const pg = getKysely()
+  const metadataRows = discussions.map(({id, teamId, createdAt}) => ({
+    refId: id,
+    objectType: 'retrospectiveDiscussionTopic' as const,
+    teamId,
+    // Not technically updatedAt since discussions are updated after they get created
+    refUpdatedAt: createdAt
+  }))
+  if (!metadataRows[0]) return
+
+  const modelManager = getModelManager()
+  const tableNames = [...modelManager.embeddingModels.keys()]
+  const priority = getEmbedderPriority(maxDelayInDays)
+  // This is ugly but it runs fast, which is what we need for historical data
+  return (
+    pg
+      .with('Insert', (qc) =>
+        qc
+          .insertInto('EmbeddingsMetadata')
+          .values(metadataRows)
+          .onConflict((oc) => oc.doNothing())
+          .returning('id')
+      )
+      // create n*m rows for n models & m discussions
+      .with('Metadata', (qc) =>
+        qc
+          .selectFrom('Insert')
+          .fullJoin(
+            sql<{model: string}>`UNNEST(ARRAY[${sql.join(tableNames)}])`.as('model'),
+            (join) => join.onTrue()
+          )
+          .select(['id', 'model'])
+      )
+      .insertInto('EmbeddingsJobQueue')
+      .columns(['jobType', 'priority', 'embeddingsMetadataId', 'model'])
+      .expression(({selectFrom}) =>
+        selectFrom('Metadata').select(({lit, ref}) => [
+          sql.lit('embed:start').as('jobType'),
+          lit(priority).as('priority'),
+          ref('Metadata.id').as('embeddingsMetadataId'),
+          ref('Metadata.model').as('model')
+        ])
+      )
+      .execute()
+  )
+}
diff --git a/packages/embedder/logPerformance.ts b/packages/embedder/logPerformance.ts
new file mode 100644
index 00000000000..96742475db5
--- /dev/null
+++ b/packages/embedder/logPerformance.ts
@@ -0,0 +1,19 @@
+import {Logger} from '../server/utils/Logger'
+/*
+Every `logEvery` seconds log to the console the number of jobs completed per second for the last `resetEvery` logs
+*/
+export const logPerformance = (logEvery: number, resetEvery: number) => {
+  const counter = {i: 0}
+  let start = performance.now()
+  let logs = 0
+  setInterval(() => {
+    const duration = performance.now() - start
+    Logger.log(`Jobs per second: ${Math.round((counter.i / duration) * 1000)}`)
+    if (++logs >= resetEvery) {
+      counter.i = 0
+      logs = 0
+      start = performance.now()
+    }
+  }, logEvery * 1000)
+  return counter
+}
diff --git a/packages/embedder/package.json b/packages/embedder/package.json
index 74bca7e11af..ca9ed2822a9 100644
--- a/packages/embedder/package.json
+++ b/packages/embedder/package.json
@@ -33,6 +33,7 @@
   "dependencies": {
     "dd-trace": "^4.2.0",
     "franc-min": "^5.0.0",
+    "ms": "^2.1.3",
     "redlock": "^5.0.0-beta.2"
   }
 }
diff --git a/packages/embedder/processJob.ts b/packages/embedder/processJob.ts
deleted file mode 100644
index 8723b75de6a..00000000000
--- a/packages/embedder/processJob.ts
+++ /dev/null
@@ -1,13 +0,0 @@
-import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
-import {Job} from './EmbeddingsJobQueueStream'
-import {processJobEmbed} from './processJobEmbed'
-
-export const processJob = async (job: Job, dataLoader: RootDataLoader) => {
-  const {jobType} = job
-  switch (jobType) {
-    case 'embed':
-      return processJobEmbed(job, dataLoader)
-    default:
-      throw new Error(`Invalid job type: ${jobType}`)
-  }
-}
diff --git a/packages/embedder/processJobEmbed.ts b/packages/embedder/processJobEmbed.ts
deleted file mode 100644
index d7fecf2aab0..00000000000
--- a/packages/embedder/processJobEmbed.ts
+++ /dev/null
@@ -1,101 +0,0 @@
-import franc from 'franc-min'
-import ms from 'ms'
-import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
-import getKysely from 'parabol-server/postgres/getKysely'
-import {EmbedJob} from './EmbeddingsJobQueueStream'
-import {EmbeddingsTable} from './ai_models/AbstractEmbeddingsModel'
-import getModelManager from './ai_models/ModelManager'
-import {createEmbeddingTextFrom} from './indexing/createEmbeddingTextFrom'
-import {failJob} from './indexing/failJob'
-import numberVectorToString from './indexing/numberVectorToString'
-import {iso6393To1} from './iso6393To1'
-
-export const processJobEmbed = async (job: EmbedJob, dataLoader: RootDataLoader) => {
-  const pg = getKysely()
-  const {id: jobId, retryCount, jobData} = job
-  const {embeddingsMetadataId, model} = jobData
-  const modelManager = getModelManager()
-
-  const metadata = await pg
-    .selectFrom('EmbeddingsMetadata')
-    .selectAll()
-    .where('id', '=', embeddingsMetadataId)
-    .executeTakeFirst()
-
-  if (!metadata) {
-    await failJob(jobId, `unable to fetch metadata by EmbeddingsJobQueue.id = ${jobId}`)
-    return
-  }
-
-  let {fullText, language} = metadata
-  try {
-    if (!fullText) {
-      fullText = await createEmbeddingTextFrom(metadata, dataLoader)
-      language = iso6393To1[franc(fullText) as keyof typeof iso6393To1]
-      await pg
-        .updateTable('EmbeddingsMetadata')
-        .set({fullText, language})
-        .where('id', '=', embeddingsMetadataId)
-        .execute()
-    }
-  } catch (e) {
-    // get the trace since the error message may be unobvious
-    console.trace(e)
-    await failJob(jobId, `unable to create embedding text: ${e}`)
-    return
-  }
-
-  const embeddingModel = modelManager.embeddingModels.get(model)
-  if (!embeddingModel) {
-    await failJob(jobId, `embedding model ${model} not available`)
-    return
-  }
-
-  // Exit successfully, we don't want to fail the job because the language is not supported
-  if (!embeddingModel.languages.includes(language!)) return true
-
-  const chunks = await embeddingModel.chunkText(fullText)
-  if (chunks instanceof Error) {
-    await failJob(
-      jobId,
-      `unable to get tokens: ${chunks.message}`,
-      retryCount < 10 ? new Date(Date.now() + ms('1m')) : null
-    )
-    return
-  }
-  // Cannot use summarization strategy if generation model has same context length as embedding model
-  // We must split the text & not tokens because BERT tokenizer is not trained for linebreaks e.g. \n\n
-  const isSuccessful = await Promise.all(
-    chunks.map(async (chunk, chunkNumber) => {
-      const embeddingVector = await embeddingModel.getEmbedding(chunk)
-      if (embeddingVector instanceof Error) {
-        await failJob(
-          jobId,
-          `unable to get embeddings: ${embeddingVector.message}`,
-          retryCount < 10 ? new Date(Date.now() + ms('1m')) : null
-        )
-        return false
-      }
-      await pg
-        // cast to any because these types won't be available in CI
-        .insertInto(embeddingModel.tableName as EmbeddingsTable)
-        .values({
-          // TODO is the extra space of a null embedText really worth it?!
-          embedText: chunks.length > 1 ? chunk : null,
-          embedding: numberVectorToString(embeddingVector),
-          embeddingsMetadataId,
-          chunkNumber: chunks.length > 1 ? chunkNumber : null
-        })
-        .onConflict((oc) =>
-          oc.column('embeddingsMetadataId').doUpdateSet((eb) => ({
-            embedText: eb.ref('excluded.embedText'),
-            embedding: eb.ref('excluded.embedding')
-          }))
-        )
-        .execute()
-      return true
-    })
-  )
-  // Logger.log(`Embedded ${embeddingsMetadataId} -> ${model}`)
-  return isSuccessful
-}
diff --git a/packages/embedder/regex.ts b/packages/embedder/regex.ts
new file mode 100644
index 00000000000..34a87e18344
--- /dev/null
+++ b/packages/embedder/regex.ts
@@ -0,0 +1,2 @@
+// grab the whole url, exclude <> for cases like <https://example.com>
+export const URLRegex = /https?:\/\/[^<>\s]*\b/g
diff --git a/packages/embedder/workflows/embedMetadata.ts b/packages/embedder/workflows/embedMetadata.ts
new file mode 100644
index 00000000000..4cba3ad6b90
--- /dev/null
+++ b/packages/embedder/workflows/embedMetadata.ts
@@ -0,0 +1,96 @@
+import ms from 'ms'
+import getKysely from 'parabol-server/postgres/getKysely'
+import {JobQueueError} from '../JobQueueError'
+import {EmbeddingsTableName} from '../ai_models/AbstractEmbeddingsModel'
+import getModelManager from '../ai_models/ModelManager'
+import {JobQueueStepRun, ParentJob} from '../custom'
+import {createEmbeddingTextFrom} from '../indexing/createEmbeddingTextFrom'
+import numberVectorToString from '../indexing/numberVectorToString'
+import {getSimilarRetroTopics} from './getSimilarRetroTopics'
+
+export const embedMetadata: JobQueueStepRun<
+  {
+    embeddingsMetadataId: number
+    model: EmbeddingsTableName
+    forceBuildText?: boolean
+  },
+  ParentJob<typeof getSimilarRetroTopics>
+> = async (context) => {
+  const {data, dataLoader} = context
+  const pg = getKysely()
+  const {embeddingsMetadataId, model, forceBuildText} = data
+  const modelManager = getModelManager()
+
+  const metadata = await dataLoader.get('embeddingsMetadata').load(embeddingsMetadataId)
+
+  if (!metadata) return new JobQueueError(`Invalid embeddingsMetadataId: ${embeddingsMetadataId}`)
+
+  if (!metadata.fullText || !metadata.language || forceBuildText) {
+    try {
+      const {body: fullText, language} = await createEmbeddingTextFrom(metadata, dataLoader)
+      metadata.fullText = fullText
+      metadata.language = language!
+      await pg
+        .updateTable('EmbeddingsMetadata')
+        .set({fullText, language})
+        .where('id', '=', embeddingsMetadataId)
+        .execute()
+    } catch (e) {
+      // get the trace since the error message may be unobvious
+      console.trace(e)
+      return new JobQueueError(`unable to create embedding text: ${e}`, undefined, 0, {
+        forceBuildText: true
+      })
+    }
+  }
+  const {fullText, language} = metadata
+
+  const embeddingModel = modelManager.getEmbedder(model)
+  if (!embeddingModel) {
+    return new JobQueueError(`embedding model ${model} not available`)
+  }
+  // Exit successfully, we don't want to fail the job because the language is not supported
+  if (!embeddingModel.languages.includes(language)) return false
+  const chunks = await embeddingModel.chunkText(fullText)
+  if (chunks instanceof Error) {
+    return new JobQueueError(`unable to get tokens: ${chunks.message}`, ms('1m'), 10, {
+      forceBuildText: true
+    })
+  }
+  // Cannot use summarization strategy if generation model has same context length as embedding model
+  // We must split the text & not tokens because BERT tokenizer is not trained for linebreaks e.g. \n\n
+  const errors = await Promise.all(
+    chunks.map(async (chunk, chunkNumber) => {
+      const embeddingVector = await embeddingModel.getEmbedding(chunk)
+      if (embeddingVector instanceof Error) {
+        return new JobQueueError(
+          `unable to get embeddings: ${embeddingVector.message}`,
+          ms('1m'),
+          10,
+          {forceBuildText: true}
+        )
+      }
+      await pg
+        // cast to any because these types won't be available in CI
+        .insertInto(embeddingModel.tableName)
+        .values({
+          // TODO is the extra space of a null embedText really worth it?!
+          embedText: chunks.length > 1 ? chunk : null,
+          embedding: numberVectorToString(embeddingVector),
+          embeddingsMetadataId,
+          chunkNumber: chunks.length > 1 ? chunkNumber : null
+        })
+        .onConflict((oc) =>
+          oc.columns(['embeddingsMetadataId', 'chunkNumber']).doUpdateSet((eb) => ({
+            embedText: eb.ref('excluded.embedText'),
+            embedding: eb.ref('excluded.embedding')
+          }))
+        )
+        .execute()
+      return undefined
+    })
+  )
+  const firstError = errors.find((error) => error instanceof JobQueueError)
+  // Logger.log(`Embedded ${embeddingsMetadataId} -> ${model}`)
+  return firstError || data
+}
diff --git a/packages/embedder/workflows/getSimilarRetroTopics.ts b/packages/embedder/workflows/getSimilarRetroTopics.ts
new file mode 100644
index 00000000000..d38f193af70
--- /dev/null
+++ b/packages/embedder/workflows/getSimilarRetroTopics.ts
@@ -0,0 +1,65 @@
+import getKysely from 'parabol-server/postgres/getKysely'
+import {EmbeddingsTable, EmbeddingsTableName} from '../ai_models/AbstractEmbeddingsModel'
+import {JobQueueStepRun, ParentJob} from '../custom'
+import {rerankRetroTopics} from './rerankRetroTopics'
+
+export const getSimilarRetroTopics: JobQueueStepRun<
+  {
+    embeddingsMetadataId: number
+    model: EmbeddingsTableName
+  },
+  ParentJob<typeof rerankRetroTopics>
+> = async (context) => {
+  const {data, dataLoader} = context
+  const {embeddingsMetadataId} = data
+  const model = data.model as EmbeddingsTable
+  const MAX_CANDIDATES = 10
+  const SIMILARITY_THRESHOLD = 0.67
+  const pg = getKysely()
+  const metadata = await dataLoader.get('embeddingsMetadata').loadNonNull(embeddingsMetadataId)
+  const {teamId} = metadata
+  const similarEmbeddings = await pg
+    .with('Vector', (qc) =>
+      qc
+        .selectFrom(model)
+        .select('embedding')
+        .where('embeddingsMetadataId', '=', embeddingsMetadataId)
+        .orderBy('chunkNumber')
+        // truncate strategy: only get discussions similar to the first chunk
+        .limit(1)
+    )
+    .with('Model', (qc) =>
+      qc
+        .selectFrom(model)
+        .select([`${model}.id`, 'embeddingsMetadataId', 'embedding'])
+        .innerJoin('EmbeddingsMetadata', 'EmbeddingsMetadata.id', `${model}.embeddingsMetadataId`)
+        .where('teamId', '=', teamId)
+        .where('objectType', '=', 'retrospectiveDiscussionTopic')
+        .where('embeddingsMetadataId', '!=', embeddingsMetadataId)
+    )
+    .with('CosineSimilarity', (pg) =>
+      pg
+        .selectFrom(['Model', 'Vector'])
+        .select(({eb, val, parens, ref}) => [
+          ref('Model.id').as('embeddingId'),
+          ref('Model.embeddingsMetadataId').as('embeddingsMetadataId'),
+          eb(
+            val(1),
+            '-',
+            parens('Model.embedding' as any, '<=>' as any, ref('Vector.embedding') as any)
+          ).as('similarity')
+        ])
+    )
+    .selectFrom('CosineSimilarity')
+    .select(['embeddingId', 'similarity', 'embeddingsMetadataId'])
+    .where('similarity', '>=', SIMILARITY_THRESHOLD)
+    .orderBy('similarity', 'desc')
+    .limit(MAX_CANDIDATES)
+    .execute()
+  if (similarEmbeddings.length === 0) return false
+  return {
+    embeddingsMetadataId,
+    model,
+    similarEmbeddings
+  }
+}
diff --git a/packages/embedder/workflows/helpers/getRerankedEmbeddingsFromChunks.ts b/packages/embedder/workflows/helpers/getRerankedEmbeddingsFromChunks.ts
new file mode 100644
index 00000000000..b7f2b5e8642
--- /dev/null
+++ b/packages/embedder/workflows/helpers/getRerankedEmbeddingsFromChunks.ts
@@ -0,0 +1,115 @@
+import ms from 'ms'
+import getKysely from '../../../server/postgres/getKysely'
+import {JobQueueError} from '../../JobQueueError'
+import {AbstractEmbeddingsModel} from '../../ai_models/AbstractEmbeddingsModel'
+import numberVectorToString from '../../indexing/numberVectorToString'
+
+/*
+ * Overview on how we find related discussions:
+ *
+ * A previous discussion could be fully related (similarity=1), not related
+ * (similarity=0), opposite (similarity=-1), or any place between 1..0..-1.
+ *
+ * We create an embeddingVector for the current retro discussion topic, then
+ * fetch up to MAX_CANDIDATES that are greater than or equal
+ * to SIMILARITY_THRESHOLD.
+ *
+ * We create a new embeddingVector that emphasizes only the reflections from the
+ * topic and use this to re-rank the initial results by:
+ *
+ *   newSimilarity = initialSimilarity +
+ *     (rerankSimilarity - RERANK_THRESHOLD) * RERANK_MULTIPLE
+ *
+ * This amplifies the role reflection content plays in matches, making similar
+ * reflections boost matches and de-emphasize matches that otherwise might be
+ * based on similar template prompts, authorship, etc.
+ *
+ */
+
+export interface SimilarEmbedding {
+  similarity: number
+  embeddingId: number
+  embeddingsMetadataId: number
+}
+
+export const getRerankedEmbeddingsFromChunks = async (
+  embeddingsMetadataId: number,
+  chunks: string[],
+  similarEmbeddings: SimilarEmbedding[],
+  embeddingModel: AbstractEmbeddingsModel
+) => {
+  const pg = getKysely()
+  const RERANK_THRESHOLD = 0.7
+  const RERANK_MULTIPLE = 3
+  const similarEmbeddingIds = similarEmbeddings.map((e) => e.embeddingId)
+  const similarityScores = {} as Record<number, number[]>
+  const results = await Promise.all(
+    chunks.map(async (chunk) => {
+      const embeddingVector = await embeddingModel.getEmbedding(chunk)
+      if (embeddingVector instanceof Error) {
+        return new JobQueueError(
+          `unable to get embeddings: ${embeddingVector.message}`,
+          ms('1m'),
+          10
+        )
+      }
+      const embeddingVectorStr = numberVectorToString(embeddingVector)
+      const embeddingsWithSimilarities = await pg
+        .selectFrom(embeddingModel.tableName)
+        .select(({eb, val, parens}) => [
+          'id as embeddingId',
+          eb(val(1), '-', parens('embedding' as any, '<=>' as any, embeddingVectorStr)).as(
+            'rerankSimilarity'
+          )
+        ])
+        .where('id', 'in', similarEmbeddingIds)
+        .execute()
+      embeddingsWithSimilarities.forEach((e) => {
+        const {embeddingId, rerankSimilarity} = e
+        const similarDiscussion = similarEmbeddings.find((se) => se.embeddingId === embeddingId)!
+        const weightedSimilarity = Math.min(
+          // Limit the upper bound to 0.999..
+          0.999999999,
+          // Limit the lower bound to -0.999..
+          Math.max(
+            -0.999999999,
+            similarDiscussion.similarity + (rerankSimilarity - RERANK_THRESHOLD) * RERANK_MULTIPLE
+          )
+        )
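+        // e.g. with made-up numbers: initial 0.70, rerank 0.80
+        // -> 0.70 + (0.80 - 0.70) * 3 = 1.00, clamped to 0.999999999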
+ Math.max( + -0.999999999, + similarDiscussion.similarity + (rerankSimilarity - RERANK_THRESHOLD) * RERANK_MULTIPLE + ) + ) + similarityScores[embeddingId] = similarityScores[embeddingId] || [] + similarityScores[embeddingId]!.push(weightedSimilarity) + }) + return + }) + ) + const firstError = results.find((r) => r instanceof JobQueueError) + if (firstError) return firstError + + const weightedSimilarities = Object.entries(similarityScores).map((entry) => { + const embeddingId = parseInt(entry[0], 10) + const similarities = entry[1] + const similarEmbedding = similarEmbeddings.find((se) => se.embeddingId === embeddingId)! + return { + embeddingsMetadataId: similarEmbedding.embeddingsMetadataId, + similarity: similarities.reduce((a, b) => a + b, 0) / similarities.length + } + }) + + if (weightedSimilarities.length === 0) + return new JobQueueError(`no similar embeddings found: ${embeddingsMetadataId}`) + + const MAX_RESULTS = 3 + const SIMILARITY_THRESHOLD = 0.67 + const topResults = weightedSimilarities + // SIMILARITY THRESHOLD will be too high if chunks.length > 1 + .filter((sd) => sd.similarity >= SIMILARITY_THRESHOLD) + .sort((a, b) => (a.similarity > b.similarity ? -1 : 1)) + .slice(0, MAX_RESULTS) + if (!topResults.length) { + const similarities = weightedSimilarities.map((ws) => ws.similarity).join(',') + return new JobQueueError( + `no similar embeddings found. Decrease SIMILARITY_THRESHOLD: ${embeddingsMetadataId}, ${similarities}` + ) + } + return topResults +} diff --git a/packages/embedder/workflows/helpers/publishSimilarRetroTopics.ts b/packages/embedder/workflows/helpers/publishSimilarRetroTopics.ts new file mode 100644 index 00000000000..f8de026db74 --- /dev/null +++ b/packages/embedder/workflows/helpers/publishSimilarRetroTopics.ts @@ -0,0 +1,72 @@ +import {SubscriptionChannel} from '../../../client/types/constEnums' +import makeAppURL from '../../../client/utils/makeAppURL' +import appOrigin from '../../../server/appOrigin' +import getRethink from '../../../server/database/rethinkDriver' +import {DataLoaderInstance} from '../../../server/dataloader/RootDataLoader' +import {isRetroMeeting} from '../../../server/graphql/meetingTypePredicates' +import { + buildCommentContentBlock, + createAIComment +} from '../../../server/graphql/mutations/helpers/addAIGeneratedContentToThreads' +import getPhase from '../../../server/utils/getPhase' +import publish from '../../../server/utils/publish' + +const makeSimilarDiscussionLink = async ( + similarDiscussion: {embeddingsMetadataId: number; similarity: number}, + dataLoader: DataLoaderInstance +) => { + const {embeddingsMetadataId, similarity} = similarDiscussion + const metadata = await dataLoader.get('embeddingsMetadata').loadNonNull(embeddingsMetadataId) + const {refId: discussionId} = metadata + const discussion = await dataLoader.get('discussions').loadNonNull(discussionId) + const {meetingId, discussionTopicId: reflectionGroupId} = discussion + const [meeting, reflectionGroup] = await Promise.all([ + dataLoader.get('newMeetings').load(meetingId), + dataLoader.get('retroReflectionGroups').load(reflectionGroupId) + ]) + + if (!meeting || !isRetroMeeting(meeting)) throw new Error('invalid meeting type') + const {phases, name: meetingName} = meeting + const {title: topic} = reflectionGroup + const discussPhase = getPhase(phases, 'discuss') + const {stages} = discussPhase + const stageIdx = stages + .sort((a, b) => (a.sortOrder < b.sortOrder ? 
-1 : 1)) + .findIndex((stage) => stage.discussionId === discussionId) + + const url = makeAppURL(appOrigin, `meet/${meetingId}/discuss/${stageIdx + 1}`) + + return ( + `<a href="${url}">` + `${meetingName} – ${topic} (score: ${Math.trunc(100 * similarity)})` + `</a>` + ) +} + +const publishComment = async (meetingId: string, commentId: string) => { + const data = {commentId, meetingId} + publish(SubscriptionChannel.MEETING, meetingId, 'AddCommentSuccess', data, {}, false) +} + +export const publishSimilarRetroTopics = async ( + embeddingsMetadataId: number, + similarEmbeddings: {embeddingsMetadataId: number; similarity: number}[], + dataLoader: DataLoaderInstance +) => { + const r = await getRethink() + const links = await Promise.all( + similarEmbeddings.map((se) => makeSimilarDiscussionLink(se, dataLoader)) + ) + const listItems = links.map((link) => `<li>${link}</li>`).join('\n') + const metadata = await dataLoader.get('embeddingsMetadata').loadNonNull(embeddingsMetadataId) + const {refId: discussionId} = metadata + const discussion = await dataLoader.get('discussions').loadNonNull(discussionId) + const {meetingId} = discussion + const relatedDiscussionsComment = createAIComment( + discussionId, + buildCommentContentBlock('🤖 Related Discussions', `<ul>${listItems}</ul>`), + 2 + ) + await r.table('Comment').insert(relatedDiscussionsComment).run() + publishComment(meetingId, relatedDiscussionsComment.id) +} diff --git a/packages/embedder/workflows/relatedDiscussionsStart.ts b/packages/embedder/workflows/relatedDiscussionsStart.ts new file mode 100644 index 00000000000..e62a986a182 --- /dev/null +++ b/packages/embedder/workflows/relatedDiscussionsStart.ts @@ -0,0 +1,54 @@ +import isValid from 'parabol-server/graphql/isValid' +import getKysely from 'parabol-server/postgres/getKysely' +import getPhase from 'parabol-server/utils/getPhase' +import {MessageToEmbedderRelatedDiscussions} from '../../server/graphql/mutations/helpers/publishToEmbedder' +import getModelManager from '../ai_models/ModelManager' +import {JobQueueStepRun, ParentJob} from '../custom' +import {embedMetadata} from './embedMetadata' + +export const relatedDiscussionsStart: JobQueueStepRun< + MessageToEmbedderRelatedDiscussions['data'], + ParentJob<typeof embedMetadata> +> = async (context) => { + const pg = getKysely() + const {data, dataLoader} = context + const {meetingId} = data + const [discussions, meeting] = await Promise.all([ + dataLoader.get('discussionsByMeetingId').load(meetingId), + dataLoader.get('newMeetings').load(meetingId) + ]) + const {phases} = meeting + const discussPhase = getPhase(phases, 'discuss') + const {stages} = discussPhase + const orderedDiscussions = stages + .map((stage) => discussions.find((d) => d.id === stage.discussionId)) + .filter(isValid) + + const metadataRows = orderedDiscussions.map(({id, teamId, createdAt}) => ({ + refId: id, + objectType: 'retrospectiveDiscussionTopic' as const, + teamId, + // Not technically updatedAt since discussions are updated after they get created + refUpdatedAt: createdAt + })) + const inserts = await pg + .insertInto('EmbeddingsMetadata') + .values(metadataRows) + .onConflict((oc) => + // trigger an update in order to return the ID of the existing row + oc.columns(['refId', 'objectType']).doUpdateSet((eb) => ({ + objectType: eb.ref('excluded.objectType') + })) + ) + .returning('id') + .execute() + + const modelManager = getModelManager() + // Only get 1 embedder since we only want to publish 1 message to the user + const {tableName} = modelManager.getEmbedder() + + return inserts.map(({id}) => ({ + embeddingsMetadataId: id, + model: tableName + })) +} diff --git a/packages/embedder/workflows/rerankRetroTopics.ts b/packages/embedder/workflows/rerankRetroTopics.ts new file mode 100644 index 00000000000..fa6fec59333 --- /dev/null +++ b/packages/embedder/workflows/rerankRetroTopics.ts @@ -0,0 +1,53 @@ +import ms from 'ms' +import {JobQueueError} from '../JobQueueError' +import {EmbeddingsTableName} from '../ai_models/AbstractEmbeddingsModel' +import getModelManager from '../ai_models/ModelManager' +import {JobQueueStepRun} from '../custom' +import {createEmbeddingTextFrom} from '../indexing/createEmbeddingTextFrom' +import { + SimilarEmbedding, + getRerankedEmbeddingsFromChunks +} from './helpers/getRerankedEmbeddingsFromChunks' +import {publishSimilarRetroTopics} from './helpers/publishSimilarRetroTopics' + +export const rerankRetroTopics: JobQueueStepRun<{ + similarEmbeddings: SimilarEmbedding[] + embeddingsMetadataId: number + model: EmbeddingsTableName +}> = async ({data, dataLoader}) => { + const {similarEmbeddings, embeddingsMetadataId, model} = data + const metadata = await dataLoader.get('embeddingsMetadata').load(embeddingsMetadataId) + if (!metadata) return new JobQueueError(`Invalid embeddingsMetadataId: ${embeddingsMetadataId}`) + 
+ let rerankText: string = '' + try { + const {body} = await createEmbeddingTextFrom(metadata, dataLoader, true) + rerankText = body + } catch (e) { + // get the trace since the error message may be unobvious + console.trace(e) + return new JobQueueError(`unable to create embedding text: ${e}`) + } + + const modelManager = getModelManager() + const embeddingModel = modelManager.embeddingModels.get(model) + if (!embeddingModel) { + return new JobQueueError(`embedding model ${model} not available`) + } + + const chunks = await embeddingModel.chunkText(rerankText) + if (chunks instanceof Error) { + return new JobQueueError(`unable to get tokens: ${chunks.message}`, ms('1m'), 10) + } + + const results = await getRerankedEmbeddingsFromChunks( + embeddingsMetadataId, + chunks, + similarEmbeddings, + embeddingModel + ) + if (results instanceof JobQueueError) return results + + await publishSimilarRetroTopics(embeddingsMetadataId, results, dataLoader) + return false +} diff --git a/packages/server/dataloader/RootDataLoader.ts b/packages/server/dataloader/RootDataLoader.ts index 3bf5f7c3acd..1b2546ec1f8 100644 --- a/packages/server/dataloader/RootDataLoader.ts +++ b/packages/server/dataloader/RootDataLoader.ts @@ -1,5 +1,8 @@ import DataLoader from 'dataloader' import {DBType} from '../database/rethinkDriver' +import RethinkForeignKeyLoaderMaker from './RethinkForeignKeyLoaderMaker' +import RethinkPrimaryKeyLoaderMaker from './RethinkPrimaryKeyLoaderMaker' +import UpdatableCacheDataLoader from './UpdatableCacheDataLoader' import * as atlassianLoaders from './atlassianLoaders' import * as azureDevOpsLoaders from './azureDevOpsLoaders' import * as customLoaderMakers from './customLoaderMakers' @@ -12,12 +15,9 @@ import * as jiraServerLoaders from './jiraServerLoaders' import * as pollLoaders from './pollsLoaders' import * as primaryKeyLoaderMakers from './primaryKeyLoaderMakers' import rethinkForeignKeyLoader from './rethinkForeignKeyLoader' -import RethinkForeignKeyLoaderMaker from './RethinkForeignKeyLoaderMaker' import * as rethinkForeignKeyLoaderMakers from './rethinkForeignKeyLoaderMakers' import rethinkPrimaryKeyLoader from './rethinkPrimaryKeyLoader' -import RethinkPrimaryKeyLoaderMaker from './RethinkPrimaryKeyLoaderMaker' import * as rethinkPrimaryKeyLoaderMakers from './rethinkPrimaryKeyLoaderMakers' -import UpdatableCacheDataLoader from './UpdatableCacheDataLoader' interface LoaderDict { [loaderName: string]: DataLoader @@ -72,6 +72,11 @@ type CustomLoaders = keyof CustomLoaderMakers type Uncustom = T extends (parent: RootDataLoader) => infer U ? U : never type TypeFromCustom = Uncustom +// Use this if you don't need the dataloader to be shareable +export interface DataLoaderInstance { + get(loaderName: LoaderName): TypedDataLoader +} + export type TypedDataLoader = LoaderName extends CustomLoaders ? 
TypeFromCustom : UpdatableCacheDataLoader< @@ -86,7 +91,7 @@ export type TypedDataLoader = LoaderName extends CustomLoaders /** * This is the main dataloader */ -export default class RootDataLoader { +export default class RootDataLoader implements DataLoaderInstance { dataLoaderOptions: DataLoader.Options // casted to any because access to the loaders will results in a creation if needed loaders: LoaderDict = {} as any diff --git a/packages/server/dataloader/foreignKeyLoaderMakers.ts b/packages/server/dataloader/foreignKeyLoaderMakers.ts index 04d06ba3d2d..0c967c37b1a 100644 --- a/packages/server/dataloader/foreignKeyLoaderMakers.ts +++ b/packages/server/dataloader/foreignKeyLoaderMakers.ts @@ -1,6 +1,25 @@ +import getKysely from '../postgres/getKysely' import getTeamsByOrgIds from '../postgres/queries/getTeamsByOrgIds' import {foreignKeyLoaderMaker} from './foreignKeyLoaderMaker' export const teamsByOrgIds = foreignKeyLoaderMaker('teams', 'orgId', (orgIds) => getTeamsByOrgIds(orgIds, {isArchived: false}) ) + +export const discussionsByMeetingId = foreignKeyLoaderMaker( + 'discussions', + 'meetingId', + async (meetingIds) => { + const pg = getKysely() + return pg.selectFrom('Discussion').selectAll().where('meetingId', 'in', meetingIds).execute() + } +) + +export const embeddingsMetadataByRefId = foreignKeyLoaderMaker( + 'embeddingsMetadata', + 'refId', + async (refId) => { + const pg = getKysely() + return pg.selectFrom('EmbeddingsMetadata').selectAll().where('refId', 'in', refId).execute() + } +) diff --git a/packages/server/dataloader/primaryKeyLoaderMakers.ts b/packages/server/dataloader/primaryKeyLoaderMakers.ts index 4a87ea7bcef..06ebe30bd0f 100644 --- a/packages/server/dataloader/primaryKeyLoaderMakers.ts +++ b/packages/server/dataloader/primaryKeyLoaderMakers.ts @@ -9,6 +9,7 @@ import {getUsersByIds} from '../postgres/queries/getUsersByIds' import {getKudosesByIds} from '../postgres/queries/getKudosesByIds' import getMeetingTemplatesByIds from '../postgres/queries/getMeetingTemplatesByIds' import {primaryKeyLoaderMaker} from './primaryKeyLoaderMaker' +import getKysely from '../postgres/getKysely' export const users = primaryKeyLoaderMaker(getUsersByIds) export const teams = primaryKeyLoaderMaker(getTeamsByIds) @@ -20,3 +21,7 @@ export const meetingSeries = primaryKeyLoaderMaker(getMeetingSeriesByIds) export const meetingTemplates = primaryKeyLoaderMaker(getMeetingTemplatesByIds) export const domainJoinRequests = primaryKeyLoaderMaker(getDomainJoinRequestsByIds) export const kudoses = primaryKeyLoaderMaker(getKudosesByIds) + +export const embeddingsMetadata = primaryKeyLoaderMaker((ids: readonly number[]) => { + return getKysely().selectFrom('EmbeddingsMetadata').selectAll().where('id', 'in', ids).execute() +}) diff --git a/packages/server/graphql/mutations/helpers/addAIGeneratedContentToThreads.ts b/packages/server/graphql/mutations/helpers/addAIGeneratedContentToThreads.ts index 1254d4890b7..50b385c6a8f 100644 --- a/packages/server/graphql/mutations/helpers/addAIGeneratedContentToThreads.ts +++ b/packages/server/graphql/mutations/helpers/addAIGeneratedContentToThreads.ts @@ -7,13 +7,17 @@ import {convertHtmlToTaskContent} from '../../../utils/draftjs/convertHtmlToTask import {DataLoaderWorker} from '../../graphql' import {getFeatureTier} from '../../types/helpers/getFeatureTier' -const buildCommentContentBlock = (title: string, content: string, explainerText?: string) => { +export const buildCommentContentBlock = ( + title: string, + content: string, + explainerText?: string +) => 
{ const explainerBlock = explainerText ? `<i>${explainerText}</i><br/>` : '' const html = `${explainerBlock}<p><b>${title}</b></p><p>${content}</p>` return convertHtmlToTaskContent(html) } -const createAIComment = (discussionId: string, content: string, order: number) => +export const createAIComment = (discussionId: string, content: string, order: number) => new Comment({ discussionId, content, diff --git a/packages/server/graphql/mutations/helpers/handleCompletedStage.ts b/packages/server/graphql/mutations/helpers/handleCompletedStage.ts index 3ef7ad154ec..785568a6396 100644 --- a/packages/server/graphql/mutations/helpers/handleCompletedStage.ts +++ b/packages/server/graphql/mutations/helpers/handleCompletedStage.ts @@ -10,13 +10,14 @@ import getKysely from '../../../postgres/getKysely' import insertDiscussions from '../../../postgres/queries/insertDiscussions' import {AnyMeeting} from '../../../postgres/types/Meeting' import {DataLoaderWorker} from '../../graphql' -import addDiscussionTopics from './addDiscussionTopics' import addAIGeneratedContentToThreads from './addAIGeneratedContentToThreads' +import addDiscussionTopics from './addDiscussionTopics' +import addRecallBot from './addRecallBot' import generateDiscussionSummary from './generateDiscussionSummary' -import generateGroups from './generateGroups' import generateGroupSummaries from './generateGroupSummaries' +import generateGroups from './generateGroups' +import {publishToEmbedder} from './publishToEmbedder' import removeEmptyReflections from './removeEmptyReflections' -import addRecallBot from './addRecallBot' /* * handle side effects when a stage is completed @@ -111,7 +112,8 @@ const handleCompletedRetrospectiveStage = async ( })) await Promise.all([ insertDiscussions(discussions), - addAIGeneratedContentToThreads(discussPhaseStages, meetingId, teamId, dataLoader) + addAIGeneratedContentToThreads(discussPhaseStages, meetingId, teamId, dataLoader), + publishToEmbedder({jobType: 'relatedDiscussions:start', data: {meetingId}, priority: 0}) ]) if (videoMeetingURL) { addRecallBot(meetingId, videoMeetingURL) @@ -119,7 +121,7 @@ return {[VOTE]: data} } else if (stage.phaseType === 'discuss') { const {discussionId} = stage as DiscussStage - // dont await for the OpenAI API response + // don't await for the OpenAI API response generateDiscussionSummary(discussionId, meeting, dataLoader) } return {} diff --git a/packages/server/graphql/mutations/helpers/publishToEmbedder.ts b/packages/server/graphql/mutations/helpers/publishToEmbedder.ts index c8a735f4ac9..0893dfaa286 100644 --- a/packages/server/graphql/mutations/helpers/publishToEmbedder.ts +++ b/packages/server/graphql/mutations/helpers/publishToEmbedder.ts @@ -1,14 +1,22 @@ -import type {MessageToEmbedder} from 'embedder/custom' -import getRedis from '../../../utils/getRedis' +import {getEmbedderPriority} from '../../../../embedder/getEmbedderPriority' +import getKysely from '../../../postgres/getKysely' -export const publishToEmbedder = (message: MessageToEmbedder) => { - return getRedis().xadd( - 'embedMetadataStream', - 'MAXLEN', - '~', - 1000, - '*', - 'msg', - JSON.stringify(message) - ) +export interface MessageToEmbedderRelatedDiscussions { + jobType: 'relatedDiscussions:start' + data: {meetingId: string} +} + +export type MessageToEmbedder = {priority: number} & MessageToEmbedderRelatedDiscussions + +const IS_EMBEDDER_ENABLED = !!parseInt(process.env.AI_EMBEDDER_WORKERS!)
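+// AI_EMBEDDER_WORKERS holds the embedder worker count; when it is unset or 0, publishToEmbedder below is a no-op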
+export const publishToEmbedder = ({jobType, data, priority}: MessageToEmbedder) => { + if (!IS_EMBEDDER_ENABLED) return + return getKysely() + .insertInto('EmbeddingsJobQueue') + .values({ + jobType, + priority: getEmbedderPriority(priority), + jobData: JSON.stringify(data) + }) + .execute() +} diff --git a/packages/server/graphql/mutations/helpers/safeEndRetrospective.ts b/packages/server/graphql/mutations/helpers/safeEndRetrospective.ts index f5a69be4f22..9b47bbda356 100644 --- a/packages/server/graphql/mutations/helpers/safeEndRetrospective.ts +++ b/packages/server/graphql/mutations/helpers/safeEndRetrospective.ts @@ -27,7 +27,6 @@ import generateWholeMeetingSentimentScore from './generateWholeMeetingSentimentS import generateWholeMeetingSummary from './generateWholeMeetingSummary' import handleCompletedStage from './handleCompletedStage' import {IntegrationNotifier} from './notifications/IntegrationNotifier' -import {publishToEmbedder} from './publishToEmbedder' import removeEmptyTasks from './removeEmptyTasks' import updateQualAIMeetingsCount from './updateQualAIMeetingsCount' import updateTeamInsights from './updateTeamInsights' @@ -371,7 +370,6 @@ const safeEndRetrospective = async ({ removedTaskIds, timelineEventId } - publishToEmbedder({objectTypes: ['retrospectiveDiscussionTopic'], meetingId}) publish(SubscriptionChannel.TEAM, teamId, 'EndRetrospectiveSuccess', data, subOptions) return data diff --git a/packages/server/graphql/private/typeDefs/updateOrgFeatureFlag.graphql b/packages/server/graphql/private/typeDefs/updateOrgFeatureFlag.graphql index 9d7fad91919..ef74a967ff1 100644 --- a/packages/server/graphql/private/typeDefs/updateOrgFeatureFlag.graphql +++ b/packages/server/graphql/private/typeDefs/updateOrgFeatureFlag.graphql @@ -16,6 +16,7 @@ enum OrganizationFeatureFlagsEnum { aiIcebreakers aiTemplate recurringRetros + relatedDiscussions } extend type Mutation { diff --git a/packages/server/graphql/public/typeDefs/Organization.graphql b/packages/server/graphql/public/typeDefs/Organization.graphql index b7471c75dcf..49d3169c48f 100644 --- a/packages/server/graphql/public/typeDefs/Organization.graphql +++ b/packages/server/graphql/public/typeDefs/Organization.graphql @@ -191,4 +191,5 @@ type OrganizationFeatureFlags { aiIcebreakers: Boolean! aiTemplate: Boolean! recurringRetros: Boolean! + relatedDiscussions: Boolean!
} diff --git a/packages/server/graphql/public/types/OrganizationFeatureFlags.ts b/packages/server/graphql/public/types/OrganizationFeatureFlags.ts index 8a0a0cec364..1f917215116 100644 --- a/packages/server/graphql/public/types/OrganizationFeatureFlags.ts +++ b/packages/server/graphql/public/types/OrganizationFeatureFlags.ts @@ -14,7 +14,8 @@ const OrganizationFeatureFlags: OrganizationFeatureFlagsResolvers = { kudos: ({kudos}) => !!kudos, aiIcebreakers: ({aiIcebreakers}) => !!aiIcebreakers, aiTemplate: ({aiTemplate}) => !!aiTemplate, - recurringRetros: ({recurringRetros}) => !!recurringRetros + recurringRetros: ({recurringRetros}) => !!recurringRetros, + relatedDiscussions: ({relatedDiscussions}) => !!relatedDiscussions } export default OrganizationFeatureFlags diff --git a/packages/server/postgres/migrations/1712074131388_priority.ts b/packages/server/postgres/migrations/1712074131388_priority.ts new file mode 100644 index 00000000000..e2f7afd06d9 --- /dev/null +++ b/packages/server/postgres/migrations/1712074131388_priority.ts @@ -0,0 +1,32 @@ +import {Client} from 'pg' +import getPgConfig from '../getPgConfig' + +export async function up() { + const client = new Client(getPgConfig()) + await client.connect() + await client.query(` + ALTER TABLE "EmbeddingsJobQueue" + ALTER COLUMN "priority" TYPE INTEGER; + ALTER TABLE "EmbeddingsJobQueue" + ADD COLUMN "model" VARCHAR(255), + ADD COLUMN "embeddingsMetadataId" INTEGER, + ADD CONSTRAINT "fk_embeddingsMetadataId" + FOREIGN KEY("embeddingsMetadataId") + REFERENCES "EmbeddingsMetadata"("id") + ON DELETE SET NULL; + `) + await client.end() +} + +export async function down() { + const client = new Client(getPgConfig()) + await client.connect() + await client.query(` + DELETE FROM "EmbeddingsJobQueue"; + ALTER TABLE "EmbeddingsJobQueue" + ALTER COLUMN "priority" TYPE SMALLINT, + DROP COLUMN IF EXISTS "model", + DROP COLUMN IF EXISTS "embeddingsMetadataId"; + ` /* Do undo magic */) + await client.end() +} diff --git a/packages/server/utils/publish.ts b/packages/server/utils/publish.ts index b9c7defb2ff..80cfb0397fe 100644 --- a/packages/server/utils/publish.ts +++ b/packages/server/utils/publish.ts @@ -13,12 +13,14 @@ const publish = ( channel: string, type: string, payload: {[key: string]: any}, - subOptions: SubOptions = {} + subOptions: SubOptions = {}, + sendToSelf: boolean = true ) => { const subName = `${topic}Subscription` const rootValue = {[subName]: {fieldName: type, [type]: payload}} + const executorServerId = sendToSelf ? SERVER_ID! 
: undefined getPubSub() - .publish(`${topic}.${channel}`, {rootValue, executorServerId: SERVER_ID!, ...subOptions}) + .publish(`${topic}.${channel}`, {rootValue, executorServerId, ...subOptions}) .catch(Logger.error) } diff --git a/scripts/webpack/dev.servers.config.js b/scripts/webpack/dev.servers.config.js index 9632b40f66f..0d8e4db7e0e 100644 --- a/scripts/webpack/dev.servers.config.js +++ b/scripts/webpack/dev.servers.config.js @@ -29,7 +29,8 @@ module.exports = { entry: { web: [DOTENV, INIT_PUBLIC_PATH, path.join(SERVER_ROOT, 'server.ts')], embedder: [DOTENV, INIT_PUBLIC_PATH, path.join(EMBEDDER_ROOT, 'embedder.ts')], - gqlExecutor: [DOTENV, INIT_PUBLIC_PATH, path.join(GQL_ROOT, 'gqlExecutor.ts')] + gqlExecutor: [DOTENV, INIT_PUBLIC_PATH, path.join(GQL_ROOT, 'gqlExecutor.ts')], + debugEmbedder: [DOTENV, INIT_PUBLIC_PATH, path.join(EMBEDDER_ROOT, 'debug.ts')] }, output: { filename: '[name].js', diff --git a/yarn.lock b/yarn.lock index c5316801eb3..c01d5591da9 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5595,15 +5595,15 @@ signal-exit "^3.0.3" tslib "1.9.3" -"@pm2/js-api@~0.6.7": - version "0.6.7" - resolved "https://registry.yarnpkg.com/@pm2/js-api/-/js-api-0.6.7.tgz#ed28c3b7b6d26f03f826318754fdc5468afa589f" - integrity sha512-jiJUhbdsK+5C4zhPZNnyA3wRI01dEc6a2GhcQ9qI38DyIk+S+C8iC3fGjcjUbt/viLYKPjlAaE+hcT2/JMQPXw== +"@pm2/js-api@~0.8.0": + version "0.8.0" + resolved "https://registry.yarnpkg.com/@pm2/js-api/-/js-api-0.8.0.tgz#d1b8aff562dd34befa3cb30fe28e08c9f9743abc" + integrity sha512-nmWzrA/BQZik3VBz+npRcNIu01kdBhWL0mxKmP1ciF/gTcujPTQqt027N9fc1pK9ERM8RipFhymw7RcmCyOEYA== dependencies: async "^2.6.3" - axios "^0.21.0" debug "~4.3.1" eventemitter2 "^6.3.1" + extrareqp2 "^1.0.0" ws "^7.0.0" "@pm2/pm2-version-check@latest": @@ -9104,7 +9104,7 @@ axios-retry@3.2.0: dependencies: is-retry-allowed "^1.1.0" -axios@0.21.4, axios@^0.21.0: +axios@0.21.4: version "0.21.4" resolved "https://registry.yarnpkg.com/axios/-/axios-0.21.4.tgz#c67b90dc0568e5c1cf2b0b858c43ba28e2eda575" integrity sha512-ut5vewkiu8jjGBdqpM44XxjuCjq9LAKeHVmoVfHVzy8eHgxxq8SbAVQNovDA8mVi05kP0Ea/n/UzcSHcTJQfNg== @@ -9742,9 +9742,9 @@ camelcase@^6.2.0: integrity sha512-Gmy6FhYlCY7uOElZUSbxo2UCDH8owEk996gkbrpsgGtrJLM3J7jGxl9Ic7Qwwj4ivOE5AWZWRMecDdF7hqGjFA== caniuse-lite@^1.0.30001426, caniuse-lite@^1.0.30001517, caniuse-lite@^1.0.30001580, caniuse-lite@~1.0.0: - version "1.0.30001603" - resolved "https://registry.yarnpkg.com/caniuse-lite/-/caniuse-lite-1.0.30001603.tgz#605046a5bdc95ba4a92496d67e062522dce43381" - integrity sha512-iL2iSS0eDILMb9n5yKQoTBim9jMZ0Yrk8g0N9K7UzYyWnfIKzXBZD5ngpM37ZcL/cv0Mli8XtVMRYMQAfFpi5Q== + version "1.0.30001605" + resolved "https://registry.yarnpkg.com/caniuse-lite/-/caniuse-lite-1.0.30001605.tgz#ca12d7330dd8bcb784557eb9aa64f0037870d9d6" + integrity sha512-nXwGlFWo34uliI9z3n6Qc0wZaf7zaZWA1CPZ169La5mV3I/gem7bst0vr5XQH5TJXZIMfDeZyOrZnSlVzKxxHQ== capital-case@^1.0.4: version "1.0.4" @@ -12030,6 +12030,13 @@ extract-files@^11.0.0: resolved "https://registry.yarnpkg.com/extract-files/-/extract-files-11.0.0.tgz#b72d428712f787eef1f5193aff8ab5351ca8469a" integrity sha512-FuoE1qtbJ4bBVvv94CC7s0oTnKUGvQs+Rjf1L2SJFfS+HTVVjhPFtehPdQ0JiGPqVNfSSZvL5yzHHQq2Z4WNhQ== +extrareqp2@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/extrareqp2/-/extrareqp2-1.0.0.tgz#aaf8ad1495d723f71276b0eab041c061aa21f035" + integrity sha512-Gum0g1QYb6wpPJCVypWP3bbIuaibcFiJcpuPM10YSXp/tzqi84x9PJageob+eN4xVRIOto4wjSGNLyMD54D2xA== + dependencies: + follow-redirects "^1.14.0" + faker@^5.5.3: version "5.5.3" resolved 
"https://registry.yarnpkg.com/faker/-/faker-5.5.3.tgz#c57974ee484431b25205c2c8dc09fda861e51e0e" @@ -16111,7 +16118,7 @@ ms@2.1.2: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.2.tgz#d09d1f357b443f493382a8eb3ccd183872ae6009" integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w== -ms@2.1.3, ms@^2.0.0, ms@^2.1.1: +ms@2.1.3, ms@^2.0.0, ms@^2.1.1, ms@^2.1.3: version "2.1.3" resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== @@ -17610,14 +17617,14 @@ pm2-sysmonit@^1.2.8: systeminformation "^5.7" tx2 "~1.0.4" -pm2@^5.3.0: - version "5.3.0" - resolved "https://registry.yarnpkg.com/pm2/-/pm2-5.3.0.tgz#06850810f77cd98495ae1c66fbdd028a8fb5899e" - integrity sha512-xscmQiAAf6ArVmKhjKTeeN8+Td7ZKnuZFFPw1DGkdFPR/0Iyx+m+1+OpCdf9+HQopX3VPc9/wqPQHqVOfHum9w== +pm2@^5.3.1: + version "5.3.1" + resolved "https://registry.yarnpkg.com/pm2/-/pm2-5.3.1.tgz#f4c1b1199aac2988e9079ca4f127adaa1a5d18ce" + integrity sha512-DLVQHpSR1EegaTaRH3KbRXxpPVaqYwAp3uHSCtCsS++LSErvk07WSxuUnntFblBRqNU/w2KQyqs12mSq5wurkg== dependencies: "@pm2/agent" "~2.0.0" "@pm2/io" "~5.0.0" - "@pm2/js-api" "~0.6.7" + "@pm2/js-api" "~0.8.0" "@pm2/pm2-version-check" latest async "~3.2.0" blessed "0.1.81"