
feat: prepare embedder for Production #9517

Merged on Mar 29, 2024 (44 commits)
d86eb2d
feat: support ad-hoc metadata inserts
mattkrick Mar 7, 2024
8628e3e
feat: triggers from metadata to queue
mattkrick Mar 8, 2024
285f69b
feat: move job queue to pg
mattkrick Mar 8, 2024
0b024dd
begin retry
mattkrick Mar 13, 2024
ae6126f
feat: support retries after queue goes dry
mattkrick Mar 13, 2024
9fb9c4f
chore: separate abstract models into their own files
mattkrick Mar 13, 2024
20f5d96
feat: support recursive text splitting for chunks
mattkrick Mar 14, 2024
eff8eb9
validate discussions to only include ended meetings
mattkrick Mar 18, 2024
7ec3d6a
support historical info
mattkrick Mar 18, 2024
49c3eae
Merge branch 'master' into feat/embedder1
mattkrick Mar 18, 2024
46dafac
feat: support listening to jobs from app
mattkrick Mar 19, 2024
47e83d9
feat: support calls from app
mattkrick Mar 19, 2024
7cee59e
merge master
mattkrick Mar 19, 2024
5c96c41
fix migration conflict
mattkrick Mar 19, 2024
49b349b
fix: import history
mattkrick Mar 19, 2024
bf9a379
fix: dataloader mem leak
mattkrick Mar 19, 2024
f7ccdc6
feat: handle stalled jobs
mattkrick Mar 19, 2024
b573083
self-review
mattkrick Mar 19, 2024
ba906e0
remove redlock for non-historical jobs
mattkrick Mar 19, 2024
10953b5
feat: clean comments
mattkrick Mar 19, 2024
7953939
remove pubsub
mattkrick Mar 19, 2024
c2b6753
fix lint errors
mattkrick Mar 19, 2024
8852ac2
fix lint
mattkrick Mar 19, 2024
2ba07c1
cast to any for CI
mattkrick Mar 19, 2024
7bcd083
build embeddings table in CI for the typings
mattkrick Mar 19, 2024
368b34c
codecheck after server starts
mattkrick Mar 19, 2024
1158523
use pgvector in CI
mattkrick Mar 19, 2024
ddfc151
POSTGRES_USE_PGVECTOR='true'
mattkrick Mar 19, 2024
07e0d12
lazy Kysely Codegen
mattkrick Mar 20, 2024
8360b95
fix: release-please and CI pgvector image bump
mattkrick Mar 20, 2024
e74754d
chore: rename files for clarity
mattkrick Mar 20, 2024
e7991d3
rename yaml to yml
mattkrick Mar 21, 2024
ef1d8c4
feat: support priority
mattkrick Mar 26, 2024
59b0647
feat: custom text splitter
mattkrick Mar 27, 2024
3faa442
feat: add language to metadata table
mattkrick Mar 27, 2024
2ab9f61
feat: support multiple workers
mattkrick Mar 27, 2024
7fc4136
feat: set workers via env var
mattkrick Mar 27, 2024
ea32be7
fix: handle shutdown and stalled jobs
mattkrick Mar 27, 2024
bc1cf74
update readme for parabol-ubi
mattkrick Mar 28, 2024
7d82b28
remove unused trigger logic
mattkrick Mar 28, 2024
b609f3e
turn on dev servers
mattkrick Mar 28, 2024
8679d2c
Merge branch 'master' into feat/embedder1
mattkrick Mar 28, 2024
0c80dd6
fix: rename migration
mattkrick Mar 28, 2024
826a044
Merge branch 'master' into feat/embedder1
mattkrick Mar 29, 2024
2 changes: 1 addition & 1 deletion docker/dev.yml
@@ -72,7 +72,7 @@ services:
parabol-network:
text-embeddings-inference:
container_name: text-embeddings-inference
image: ghcr.io/huggingface/text-embeddings-inference:cpu-0.6
image: ghcr.io/huggingface/text-embeddings-inference:cpu-1.1
command:
- "--model-id=llmrails/ember-v1"
platform: linux/x86_64
9 changes: 9 additions & 0 deletions packages/client/shared/gqlIds/EmbedderChannelId.ts
@@ -0,0 +1,9 @@
export const EmbedderChannelId = {
join: (serverId: string) => `embedder:${serverId}`,
split: (id: string) => {
const [, serverId] = id.split(':')
return serverId
}
}

export default EmbedderChannelId
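For context, here is how this helper round-trips a channel id (a standalone sketch; the server id value is illustrative, not from the PR):

```typescript
// Mirror of the EmbedderChannelId helper from the diff above, exercised standalone.
const EmbedderChannelId = {
  join: (serverId: string) => `embedder:${serverId}`,
  split: (id: string) => {
    const [, serverId] = id.split(':')
    return serverId
  }
}

const channel = EmbedderChannelId.join('server-42')
console.log(channel) // 'embedder:server-42'
console.log(EmbedderChannelId.split(channel)) // 'server-42'
```

Note that `split` keeps only the segment after the first `:`, so server ids must not themselves contain a colon or they will be truncated.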
22 changes: 22 additions & 0 deletions packages/embedder/addEmbeddingsMetadata.ts
@@ -0,0 +1,22 @@
import RedisInstance from 'parabol-server/utils/RedisInstance'
import {addEmbeddingsMetadataForRetrospectiveDiscussionTopic} from './addEmbeddingsMetadataForRet'
import {EmbeddingObjectType, PubSubEmbedderMessage} from './embedder'

export const addEmbeddingsMetadata = async (
redis: RedisInstance,
{objectType, startAt, endAt}: PubSubEmbedderMessage
) => {
const ALL_OBJECT_TYPES: EmbeddingObjectType[] = ['retrospectiveDiscussionTopic']
const objectTypes = objectType ? [objectType] : ALL_OBJECT_TYPES

return Promise.all(
objectTypes.map((type) => {
switch (type) {
case 'retrospectiveDiscussionTopic':
return addEmbeddingsMetadataForRetrospectiveDiscussionTopic(redis, startAt, endAt)
default:
throw new Error(`Invalid object type: ${type}`)
}
})
)
}
83 changes: 83 additions & 0 deletions packages/embedder/addEmbeddingsMetadataForRet.ts
@@ -0,0 +1,83 @@
import ms from 'ms'
import getRethink from 'parabol-server/database/rethinkDriver'
import getKysely from 'parabol-server/postgres/getKysely'
import RedisInstance from 'parabol-server/utils/RedisInstance'
import Redlock from 'redlock'

const insertDiscussionsIntoMetadata = async (
discussions: {id: string; teamId: string; createdAt: Date}[]
) => {
const pg = getKysely()
if (discussions.length === 0) return
const metadataRows = discussions.map(({id, teamId, createdAt}) => ({
refId: id,
objectType: 'retrospectiveDiscussionTopic' as const,
teamId,
// this is technically when the discussion was created. Discussions are mutable.
// The best solution would be a date range of min(commentUpdatedAt) to max(commentUpdatedAt)
refUpdatedAt: createdAt
}))

const PG_MAX_PARAMS = 65535
const metadataColParams = Object.keys(metadataRows[0]).length
const metadataBatchSize = Math.trunc(PG_MAX_PARAMS / metadataColParams)
const insertBatches = Array.from(
{length: Math.ceil(metadataRows.length / metadataBatchSize)},
(v, i) => metadataRows.slice(i * metadataBatchSize, i * metadataBatchSize + metadataBatchSize)
)
return Promise.all(
insertBatches.map((batch) => {
return pg
.insertInto('EmbeddingsMetadata')
.values(batch)
.onConflict((oc) => oc.doNothing())
.execute()
})
)
}
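The batch size above comes from Postgres's limit of 65,535 bind parameters per statement, where each inserted row consumes one parameter per column. The same arithmetic can be exercised in isolation (a sketch with illustrative row counts, not the PR's code):

```typescript
// Standalone sketch of the parameter-limit batching used in
// insertDiscussionsIntoMetadata above.
const PG_MAX_PARAMS = 65535

const batchRows = <T extends object>(rows: T[]): T[][] => {
  if (rows.length === 0) return []
  // each row consumes one bind parameter per column
  const paramsPerRow = Object.keys(rows[0]).length
  const batchSize = Math.trunc(PG_MAX_PARAMS / paramsPerRow)
  return Array.from({length: Math.ceil(rows.length / batchSize)}, (_, i) =>
    rows.slice(i * batchSize, (i + 1) * batchSize)
  )
}

// 4 columns per row -> trunc(65535 / 4) = 16383 rows per batch
const rows = Array.from({length: 40000}, (_, i) => ({
  refId: String(i),
  objectType: 'retrospectiveDiscussionTopic',
  teamId: 't1',
  refUpdatedAt: new Date()
}))
const batches = batchRows(rows)
console.log(batches.length) // 3 batches: 16383 + 16383 + 7234 rows
```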

export const addEmbeddingsMetadataForRetrospectiveDiscussionTopic = async (
redis: RedisInstance,
startAt: Date | undefined,
endAt: Date | undefined
) => {
const redlock = new Redlock([redis], {retryCount: 0})
try {
await redlock.acquire([`embedder_metadata_retrospectiveDiscussionTopic`], ms('10m'))
} catch {
// lock not acquired, another worker must be doing the job. abort
return
}
// load up the metadata table with all discussion topics that are part of meetings ended within the given date range

const r = await getRethink()
const pg = getKysely()
const BATCH_SIZE = 1000
const rStartAt = startAt || r.minval
const rEndAt = endAt || r.maxval

let curStartAt = rStartAt
for (let i = 0; i < 1e6; i++) {
const endedMeetings = await r
.table('NewMeeting')
.between(curStartAt, rEndAt, {index: 'endedAt'})
.orderBy({index: 'endedAt'})
.filter({meetingType: 'retrospective'})
.limit(BATCH_SIZE)
.pluck('id', 'endedAt')
.run()
if (endedMeetings.length === 0) break
const endedMeetingIds = endedMeetings.map(({id}) => id!)
const endedMeetingDiscussions = await pg
.selectFrom('Discussion')
.select(['id', 'teamId', 'createdAt'])
.where('meetingId', 'in', endedMeetingIds)
.execute()
await insertDiscussionsIntoMetadata(endedMeetingDiscussions)

// assumes that fewer than BATCH_SIZE meetings share the same endedAt value.
// If this is not safe, we need to index on `endedAt + id`
const lastMeeting = endedMeetings[endedMeetings.length - 1]
curStartAt = lastMeeting.endedAt
}
}
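The comment above flags a pagination edge case: cursoring on `endedAt` alone can skip or re-read rows when more than `BATCH_SIZE` meetings share the same timestamp. If that ever matters, a compound `(endedAt, id)` cursor is the usual fix. A sketch with in-memory data, not the PR's code:

```typescript
// Sketch: keyset pagination on a compound (endedAt, id) cursor so that
// ties on endedAt cannot cause skipped or repeated rows.
type Meeting = {id: string; endedAt: number}

const nextPage = (
  rows: Meeting[],
  cursor: {endedAt: number; id: string} | null,
  limit: number
): Meeting[] => {
  const sorted = [...rows].sort(
    (a, b) => a.endedAt - b.endedAt || a.id.localeCompare(b.id)
  )
  const after = cursor
    ? sorted.filter(
        (m) =>
          m.endedAt > cursor.endedAt ||
          (m.endedAt === cursor.endedAt && m.id > cursor.id)
      )
    : sorted
  return after.slice(0, limit)
}

// five meetings, all ending at the same instant
const meetings: Meeting[] = ['a', 'b', 'c', 'd', 'e'].map((id) => ({id, endedAt: 100}))
const page1 = nextPage(meetings, null, 2)
const last = page1[page1.length - 1]
const page2 = nextPage(meetings, {endedAt: last.endedAt, id: last.id}, 2)
console.log(page1.map((m) => m.id)) // ['a', 'b']
console.log(page2.map((m) => m.id)) // ['c', 'd']
```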
69 changes: 68 additions & 1 deletion packages/embedder/ai_models/AbstractModel.ts
@@ -1,3 +1,7 @@
import {sql} from 'kysely'
import getKysely from 'parabol-server/postgres/getKysely'
import {DB} from 'parabol-server/postgres/pg'

export interface ModelConfig {
model: string
url: string
@@ -30,6 +34,8 @@ export interface EmbeddingModelParams {
tableSuffix: string
}

export type EmbeddingsTable = Extract<keyof DB, `Embeddings_${string}`>

export abstract class AbstractEmbeddingsModel extends AbstractModel {
readonly embeddingDimensions: number
readonly maxInputTokens: number
@@ -42,7 +48,68 @@ export abstract class AbstractEmbeddingsModel extends AbstractModel {
this.tableName = `Embeddings_${modelParams.tableSuffix}`
}
protected abstract constructModelParams(config: EmbeddingModelConfig): EmbeddingModelParams
abstract getEmbedding(content: string): Promise<number[]>
abstract getEmbedding(content: string): Promise<number[] | Error>

abstract getTokens(content: string): Promise<number[] | Error>

async createTable() {
const pg = getKysely()
const hasTable =
(
await sql<number[]>`SELECT 1 FROM ${sql.id('pg_catalog', 'pg_tables')} WHERE ${sql.id(
'tablename'
)} = ${this.tableName}`.execute(pg)
).rows.length > 0
if (hasTable) return undefined
const vectorDimensions = this.embeddingDimensions
console.log(`ModelManager: creating ${this.tableName} with ${vectorDimensions} dimensions`)
await sql`
DO $$
BEGIN
CREATE TABLE IF NOT EXISTS ${sql.id(this.tableName)} (
"id" INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
"embedText" TEXT,
"embedding" vector(${sql.raw(vectorDimensions.toString())}),
"embeddingsMetadataId" INTEGER UNIQUE NOT NULL,
[Review comment from mattkrick (Member, Author)]: @jordanh safe to add the unique constraint here?

[Reply from Contributor]: Yes, I think that is ok. It would imply that if we ever have multiple versions of the same embedding models, they'll need to live on their own table, which I think is what we'd want.

FOREIGN KEY ("embeddingsMetadataId")
REFERENCES "EmbeddingsMetadata"("id")
ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS "idx_${sql.raw(this.tableName)}_embedding_vector_cosign_ops"
ON ${sql.id(this.tableName)}
USING hnsw ("embedding" vector_cosine_ops);
END
$$;
CREATE OR REPLACE FUNCTION insert_metadata_in_queue_${sql.raw(this.tableName)} ()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO public."EmbeddingsJobQueue" ("model", "embeddingsMetadataId")
VALUES ('${sql.raw(this.tableName)}', NEW."embeddingsMetadataId");
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS "embeddings_metadata_to_queue_${sql.raw(
this.tableName
)}" on "EmbeddingsMetadata";

CREATE TRIGGER "embeddings_metadata_to_queue_${sql.raw(this.tableName)}"

AFTER INSERT ON "EmbeddingsMetadata"
FOR EACH ROW
EXECUTE PROCEDURE insert_metadata_in_queue_${sql.raw(this.tableName)}();
`.execute(pg)

console.log(
`ModelManager: Queueing EmbeddingsMetadata into EmbeddingsJobQueue for ${this.tableName}`
)
await sql`
INSERT INTO "EmbeddingsJobQueue" ("model", "embeddingsMetadataId")
SELECT '${sql.raw(this.tableName)}', "embeddingsMetadataId"
FROM "EmbeddingsMetadata"
ON CONFLICT DO NOTHING;
`.execute(pg)
}
}

export interface GenerationModelParams {
67 changes: 33 additions & 34 deletions packages/embedder/ai_models/ModelManager.ts
@@ -1,5 +1,6 @@
import {Kysely, sql} from 'kysely'
import {sql} from 'kysely'

import getKysely from 'parabol-server/postgres/getKysely'
import {
AbstractEmbeddingsModel,
AbstractGenerationModel,
@@ -93,39 +94,37 @@ export class ModelManager {
})
}

async maybeCreateTables(pg: Kysely<any>) {
const maybePromises = this.embeddingModels.map(async (embeddingsModel) => {
const tableName = embeddingsModel.tableName
const hasTable =
(
await sql<number[]>`SELECT 1 FROM ${sql.id('pg_catalog', 'pg_tables')} WHERE ${sql.id(
'tablename'
)} = ${tableName}`.execute(pg)
).rows.length > 0
if (hasTable) return undefined
const vectorDimensions = embeddingsModel.embeddingDimensions
console.log(`ModelManager: creating ${tableName} with ${vectorDimensions} dimensions`)
const query = sql`
DO $$
BEGIN
CREATE TABLE IF NOT EXISTS ${sql.id(tableName)} (
"id" INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
"embedText" TEXT,
"embedding" vector(${sql.raw(vectorDimensions.toString())}),
"embeddingsMetadataId" INTEGER NOT NULL,
FOREIGN KEY ("embeddingsMetadataId")
REFERENCES "EmbeddingsMetadata"("id")
ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS "idx_${sql.raw(tableName)}_embedding_vector_cosign_ops"
ON ${sql.id(tableName)}
USING hnsw ("embedding" vector_cosine_ops);
END $$;

`
return query.execute(pg)
})
Promise.all(maybePromises)
async maybeCreateTables() {
return Promise.all(this.embeddingModels.map((model) => model.createTable()))
}
/*
Once a model is no longer used, don't schedule work for it in the job queue
*/
async removeOldTriggers() {
const pg = getKysely()
const prefix = 'embeddings_metadata_to_queue_'
const triggers = await pg
.selectFrom('information_schema.triggers' as any)
.select('trigger_name')
.where('event_object_table', '=', 'EmbeddingsMetadata')
.where('trigger_name', 'like', `${prefix}%`)
.execute()
return Promise.all(
triggers.map(async ({trigger_name}) => {
// pgadmin lowercases triggers but PG doesn't. Lowercase it all just to be safe
const lowercaseTableName = trigger_name.slice(prefix.length).toLowerCase()
const isModelUsed = this.embeddingModels.some(
(model) => model.tableName.toLowerCase() === lowercaseTableName
)
if (isModelUsed) return
await sql`
DROP TRIGGER IF EXISTS ${sql.id(trigger_name)} on "EmbeddingsMetadata";
DROP FUNCTION IF EXISTS insert_metadata_in_queue_${sql.raw(lowercaseTableName)};`.execute(
pg
)
console.log(`Removed old trigger: ${trigger_name}`)
})
)
}
}
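The matching logic in `removeOldTriggers` can be exercised in isolation: strip the shared prefix, lowercase both sides (since pgadmin lowercases trigger names while PG does not), and check whether any active model still owns that table. A sketch with illustrative trigger and table names:

```typescript
// Sketch of the trigger-name matching used by removeOldTriggers above.
const PREFIX = 'embeddings_metadata_to_queue_'

const isTriggerStillUsed = (triggerName: string, activeTableNames: string[]): boolean => {
  // recover the table name embedded in the trigger name, case-insensitively
  const tableName = triggerName.slice(PREFIX.length).toLowerCase()
  return activeTableNames.some((t) => t.toLowerCase() === tableName)
}

const active = ['Embeddings_ember_1'] // hypothetical model table
console.log(isTriggerStillUsed('embeddings_metadata_to_queue_embeddings_ember_1', active)) // true
console.log(isTriggerStillUsed('embeddings_metadata_to_queue_embeddings_old_model', active)) // false
```

Triggers whose table is no longer active would be dropped, which is exactly the cleanup the method performs.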

33 changes: 26 additions & 7 deletions packages/embedder/ai_models/TextEmbeddingsInference.ts
@@ -27,6 +27,28 @@ export class TextEmbeddingsInference extends AbstractEmbeddingsModel {
super(config)
}

async getTokens(content: string) {
const fetchOptions = {
body: JSON.stringify({inputs: content}),
deadline: new Date(new Date().getTime() + MAX_REQUEST_TIME_S * 1000),
headers: {
Accept: 'application/json',
'Content-Type': 'application/json; charset=utf-8'
},
method: 'POST'
}

try {
[Review comment from Contributor]: +2. This is very nice. It's so cool to see that we're riding the edge of the hugging face container development and using these endpoints as they ship them.

const res = await fetchWithRetry(`${this.url}/tokenize`, fetchOptions)
const listOfTokens = (await res.json()) as number[][]
if (!listOfTokens) return new Error('listOfTokens is undefined')
if (listOfTokens.length !== 1 || !listOfTokens[0])
return new Error(`listOfTokens list length !== 1 (length: ${listOfTokens.length})`)
return listOfTokens[0]
} catch (e) {
return e instanceof Error ? e : new Error(String(e))
}
}
public async getEmbedding(content: string) {
const fetchOptions = {
body: JSON.stringify({inputs: content}),
@@ -40,17 +62,14 @@ export class TextEmbeddingsInference extends AbstractEmbeddingsModel {

try {
const res = await fetchWithRetry(`${this.url}/embed`, fetchOptions)
const listOfVectors = (await res.json()) as Array<number[]>
if (!listOfVectors)
throw new Error('TextEmbeddingsInference.getEmbeddings(): listOfVectors is undefined')
const listOfVectors = (await res.json()) as number[][]
if (!listOfVectors) return new Error('listOfVectors is undefined')
if (listOfVectors.length !== 1 || !listOfVectors[0])
throw new Error(
`TextEmbeddingsInference.getEmbeddings(): listOfVectors list length !== 1 (length: ${listOfVectors.length})`
)
return new Error(`listOfVectors list length !== 1 (length: ${listOfVectors.length})`)
return listOfVectors[0]
} catch (e) {
console.log(`TextEmbeddingsInference.getEmbeddings() timeout: `, e)
throw e
return e instanceof Error ? e : new Error(e || 'Unknown Error')
}
}
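A pattern worth noting in this diff: the new code returns `Error` values instead of throwing, so callers branch with `instanceof` rather than try/catch. A minimal sketch of that calling convention (the parsing function here is illustrative, not from the PR):

```typescript
// Sketch of the `T | Error` return convention used by getEmbedding/getTokens.
const safeParseIntStrict = (raw: string): number | Error => {
  const n = Number(raw)
  return Number.isInteger(n) ? n : new Error(`not an integer: ${raw}`)
}

const handle = (raw: string): string => {
  const result = safeParseIntStrict(raw)
  if (result instanceof Error) return `failed: ${result.message}`
  // TypeScript narrows `result` to number on this branch
  return `ok: ${result * 2}`
}

console.log(handle('21')) // 'ok: 42'
console.log(handle('x')) // 'failed: not an integer: x'
```

This keeps the error in the normal control flow, which suits a job queue where a failed embedding should mark the job rather than crash the worker.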
