Skip to content

Commit

Permalink
fix: remove logs from embedder (#9718)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Krick <matt.krick@gmail.com>
  • Loading branch information
mattkrick committed May 7, 2024
1 parent 695ccad commit 93b26bb
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 21 deletions.
2 changes: 0 additions & 2 deletions packages/embedder/EmbeddingsJobQueueStream.ts
Expand Up @@ -3,7 +3,6 @@ import ms from 'ms'
import sleep from 'parabol-client/utils/sleep'
import 'parabol-server/initSentry'
import getKysely from 'parabol-server/postgres/getKysely'
import {Logger} from '../server/utils/Logger'
import {WorkflowOrchestrator} from './WorkflowOrchestrator'
import {DBJob} from './custom'

Expand Down Expand Up @@ -53,7 +52,6 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
try {
const job = (await getJob(false)) || (await getJob(true))
if (!job) {
Logger.log('JobQueueStream: no jobs found')
// queue is empty, so sleep for a while
await sleep(ms('10s'))
return this.next()
Expand Down
10 changes: 7 additions & 3 deletions packages/embedder/JobQueueError.ts
Expand Up @@ -5,15 +5,19 @@ export class JobQueueError extends Error {
jobData?: Record<string, any>

constructor(
message: string,
message: string | Error,
retryDelay?: number,
maxRetries?: number,
jobData?: Record<string, any>
) {
super(message)
this.message = message
const strMessage = message instanceof Error ? message.message : message
super(strMessage)
this.message = strMessage
this.retryDelay = retryDelay
this.maxRetries = maxRetries
this.jobData = jobData
if (message instanceof Error) {
this.stack = message.stack
}
}
}
2 changes: 0 additions & 2 deletions packages/embedder/WorkflowOrchestrator.ts
@@ -1,7 +1,6 @@
import {sql} from 'kysely'
import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
import getKysely from 'parabol-server/postgres/getKysely'
import {Logger} from '../server/utils/Logger'
import {EmbedderJobType} from './EmbedderJobType'
import {JobQueueError} from './JobQueueError'
import {DBJob, JobType, Workflow} from './custom'
Expand Down Expand Up @@ -37,7 +36,6 @@ export class WorkflowOrchestrator {
}

private failJob = async (jobId: number, retryCount: number, error: JobQueueError) => {
Logger.error(error)
const pg = getKysely()
const {message, retryDelay, jobData} = error
const maxRetries = error.maxRetries ?? 10
Expand Down
1 change: 1 addition & 0 deletions packages/embedder/ai_models/TextEmbeddingsInference.ts
Expand Up @@ -57,6 +57,7 @@ export class TextEmbeddingsInference extends AbstractEmbeddingsModel {
this.client = client
}
async getTokens(content: string) {
if (!content) return []
const {data, error} = await this.client.POST('/tokenize', {
body: {add_special_tokens: true, inputs: content}
})
Expand Down
1 change: 0 additions & 1 deletion packages/embedder/embedder.ts
Expand Up @@ -62,7 +62,6 @@ const run = async () => {
const counter = logPerformance(10, 6)

for await (const _ of streams) {
// Logger.log(`Worker ${idx} finished job ${message.id}`)
counter.i++
}

Expand Down
13 changes: 7 additions & 6 deletions packages/embedder/indexing/retrospectiveDiscussionTopic.ts
Expand Up @@ -46,7 +46,8 @@ async function formatThread(
? 'Anonymous'
: await getPreferredNameByUserId(comment.createdBy, dataLoader)
const how = depth === 0 ? 'wrote' : 'replied'
const content = comment.plaintextContent.slice(0, MAX_TEXT_LENGTH)
const content = comment.plaintextContent?.slice(0, MAX_TEXT_LENGTH)
if (!content) return ''
const formattedPost = `${indent}- ${author} ${how}, "${content}"\n`

// Recursively format child threads
Expand Down Expand Up @@ -98,14 +99,14 @@ export const createTextFromRetrospectiveDiscussionTopic = async (
if (prompt.description) markdown += `: ${prompt.description}`
markdown += `".\n`
}
for (const reflection of reflections.filter((r) => r.promptId === prompt.id)) {
const matchingReflections = reflections.filter((r) => r.promptId === prompt.id)
for (const reflection of matchingReflections) {
const content = reflection.plaintextContent?.slice(0, MAX_TEXT_LENGTH)
if (!content) continue
const author = newMeeting.disableAnonymity
? await getPreferredNameByUserId(reflection.creatorId, dataLoader)
: 'Anonymous'
markdown += ` - ${author} wrote, "${reflection.plaintextContent.slice(
0,
MAX_TEXT_LENGTH
)}"\n`
markdown += ` - ${author} wrote, "${content}"\n`
}
markdown += `\n`
}
Expand Down
7 changes: 6 additions & 1 deletion packages/embedder/logPerformance.ts
Expand Up @@ -6,9 +6,14 @@ export const logPerformance = (logEvery: number, resetEvery: number) => {
const counter = {i: 0}
let start = performance.now()
let logs = 0
let lastJobs = 0
setInterval(() => {
const duration = performance.now() - start
Logger.log(`Jobs per second: ${Math.round((counter.i / duration) * 1000)}`)
const jobs = Math.round((counter.i / duration) * 1000)
if (jobs !== lastJobs) {
Logger.log(`Jobs per second: ${jobs}`)
}
lastJobs = jobs
if (++logs >= resetEvery) {
counter.i = 0
logs = 0
Expand Down
4 changes: 1 addition & 3 deletions packages/embedder/workflows/embedMetadata.ts
Expand Up @@ -36,9 +36,7 @@ export const embedMetadata: JobQueueStepRun<
.where('id', '=', embeddingsMetadataId)
.execute()
} catch (e) {
// get the trace since the error message may be unobvious
console.trace(e)
return new JobQueueError(`unable to create embedding text: ${e}`, undefined, 0, {
return new JobQueueError(e as Error, undefined, 0, {
forceBuildText: true
})
}
Expand Down
4 changes: 1 addition & 3 deletions packages/embedder/workflows/rerankRetroTopics.ts
Expand Up @@ -24,9 +24,7 @@ export const rerankRetroTopics: JobQueueStepRun<{
const {body} = await createEmbeddingTextFrom(metadata, dataLoader, true)
rerankText = body
} catch (e) {
// get the trace since the error message may be unobvious
console.trace(e)
return new JobQueueError(`unable to create embedding text: ${e}`)
return new JobQueueError(e as Error)
}

const modelManager = getModelManager()
Expand Down

0 comments on commit 93b26bb

Please sign in to comment.