Skip to content

Commit

Permalink
Queue defaults to InMemoryLogger + add basic logging test
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheraff committed Jul 13, 2024
1 parent 7849206 commit 20f3145
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 40 deletions.
3 changes: 2 additions & 1 deletion lib/src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,8 @@ function makeExecutionContext(registrationContext: RegistrationContext, task: Ta
})
}

logger[loggerSystem]({ runs, event: 'run' })

try {
const runController = new AbortController()
const utils = {
Expand All @@ -631,7 +633,6 @@ function makeExecutionContext(registrationContext: RegistrationContext, task: Ta
])
: maybePromise

logger[loggerSystem]({ runs, event: 'run' })
promises.push(new Promise<Data>(resolve =>
registrationContext.recordStep(
task,
Expand Down
143 changes: 106 additions & 37 deletions lib/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,6 @@ export interface LogReader {
): Promise<void>
}

// TODO: `child` could be improved so that it does accumulate the payload
export class ConsoleLogger implements Logger {
child() { return this }
[system] = () => { }
info = console.log
warn = console.warn
error = console.error
}

export class PinoLogger implements Logger {
#pino: pino.Logger<"system">

Expand Down Expand Up @@ -123,37 +114,115 @@ export class PinoReader implements LogReader {
crlfDelay: Infinity,
})

const filter = `"queue":"${query.queue}","job":"${query.job}","input":${JSON.stringify(query.input)},`

const innerOnLine = (line: string) => {
const index = line.indexOf(filter)
if (index === -1) return

const raw = JSON.parse(line) as {
time: number
queue: string
job: string
input: string
key: string
runs: number
payload?: object
message?: string
level: number
}

onLine({
queue: raw.queue,
key: raw.key,
created_at: raw.time / 1000,
input: raw.input,
fromLogger: true,
system: (raw.level === Number.MAX_SAFE_INTEGER) as never,
payload: raw.payload ?? raw.message ?? '',
})
}
const innerOnLine = lineFilter(query, onLine)

rl.on('line', innerOnLine)
await once(rl, 'close')
rl.off('line', innerOnLine)
}
}

/**
 * Builds the per-line callback shared by `PinoReader.get` and
 * `InMemoryLogger.get`: it filters serialized log lines down to those
 * matching a (queue, job, input) triple and forwards them as `Log` objects.
 *
 * @param query - exact queue id, job id, and JSON-serialized job input to match
 * @param onLine - called once per matching line with the decoded `Log`
 * @returns a callback accepting one raw serialized log line
 */
function lineFilter(query: { queue: string, job: string, input: string }, onLine: (line: Log) => void) {
	// Cheap pre-filter: the loggers serialize `queue`, `job`, `input` as
	// contiguous keys, so a raw substring check skips JSON.parse for
	// non-matching lines. `query.input` is itself a JSON string, hence the
	// extra JSON.stringify to reproduce its escaped form in the line.
	const filter = `"queue":"${query.queue}","job":"${query.job}","input":${JSON.stringify(query.input)},`

	return (line: string) => {
		if (!line.includes(filter)) return

		const raw = JSON.parse(line) as {
			time: number
			queue: string
			job: string
			input: string
			key: string
			runs: number
			payload?: object
			message?: string
			level: number
		}

		onLine({
			queue: raw.queue,
			key: raw.key,
			// loggers store epoch milliseconds; `Log.created_at` is in seconds
			created_at: raw.time / 1000,
			input: raw.input,
			fromLogger: true,
			// system entries are written with level Number.MAX_SAFE_INTEGER
			// (see InMemoryLogger / PinoLogger); everything else is a user log
			system: (raw.level === Number.MAX_SAFE_INTEGER) as never,
			// user logs carry either a structured payload or a plain message
			payload: raw.payload ?? raw.message ?? '',
		})
	}
}

export class InMemoryLogger implements Logger, LogReader {
	#acc: object
	#logs: string[]

	/**
	 * Logger + reader backed by an in-process array of serialized lines.
	 *
	 * @param acc - accumulated context merged into every entry
	 * @param logs - backing array of serialized entries (shared with children)
	 */
	constructor(acc: object = {}, logs: string[] = []) {
		this.#acc = acc
		this.#logs = logs
	}

	// Serialize one entry and append it. Property order is deliberate:
	// `level`, `time`, then the accumulated context keys contiguously, then
	// the tail — the substring pre-filter in `lineFilter` relies on the
	// context keys being adjacent in the serialized output.
	#write(level: number, tail: object) {
		this.#logs.push(JSON.stringify({
			level,
			time: Date.now(),
			...this.#acc,
			...tail,
		}))
	}

	// Child loggers share the same backing array; only the context grows.
	child(acc: object) {
		return new InMemoryLogger({ ...this.#acc, ...acc }, this.#logs)
	}

	// System events use the highest safe integer as level so readers can
	// distinguish them from user logs (see `lineFilter`).
	[system](payload: SystemPayload) {
		this.#write(Number.MAX_SAFE_INTEGER, { payload })
	}

	// User-facing levels: record the entry (strings under `message`, objects
	// under `payload`), then mirror the original value to the console.
	info(payload: object | string) {
		this.#write(30, typeof payload === 'string' ? { message: payload } : { payload })
		console.log(payload)
	}
	warn(payload: object | string) {
		this.#write(30, typeof payload === 'string' ? { message: payload } : { payload })
		console.warn(payload)
	}
	error(payload: object | string) {
		this.#write(30, typeof payload === 'string' ? { message: payload } : { payload })
		console.error(payload)
	}

	/** Replay all stored entries matching `query` through `onLine`. */
	async get(
		query: {
			queue: string,
			job: string,
			input: string
		},
		onLine: (line: Log) => void
	) {
		const matchLine = lineFilter(query, onLine)
		for (const entry of this.#logs) {
			matchLine(entry)
		}
	}
}
4 changes: 2 additions & 2 deletions lib/src/queue.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Pipe } from "./pipe"
import { exec, Job } from "./job"
import type { Storage } from "./storage"
import { type Logger, ConsoleLogger } from "./logger"
import { type Logger, InMemoryLogger } from "./logger"
import { registration, type RegistrationContext } from "./context"
import { isPromise, serializeError } from "./utils"

Expand Down Expand Up @@ -46,7 +46,7 @@ export class Queue<
this.id = opts.id
this.parallel = Math.max(1, opts.parallel ?? Infinity)
this.storage = opts.storage
this.logger = opts.logger ?? new ConsoleLogger()
this.logger = opts.logger ?? new InMemoryLogger()

this.jobs = Object.fromEntries(Object.entries(opts.jobs).map(([id, job]) => [
id,
Expand Down
75 changes: 75 additions & 0 deletions lib/tests/logger.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import test from "node:test"
import { Job, Queue, SQLiteStorage } from "../src"
import { z } from "zod"
import assert from "assert"
import { invoke } from "./utils"
import { InMemoryLogger, type Log } from "../src/logger"

// End-to-end check that InMemoryLogger records system events and user logs
// per (queue, job, input) triple, and that LogReader.get filters them back out.
test('logger foo', { timeout: 500 }, async (t) => {
	// Job with one user step ('add-one'); the expected keys asserted below
	// show the queue also records system steps (parse-input / parse-output).
	const aaa = new Job({
		id: 'aaa',
		input: z.object({ a: z.number() }),
		output: z.object({ b: z.number() }),
	}, async (input) => {
		let next = input.a
		next = await Job.run('add-one', async ({ logger }) => {
			// User log inside a step: should come back from the reader as a
			// non-system entry whose payload is this exact string.
			logger.info('adding one')
			return next + 1
		})
		return { b: next }
	})

	const logger = new InMemoryLogger()

	const queue = new Queue({
		id: 'basic',
		jobs: { aaa },
		storage: new SQLiteStorage(),
		logger: logger
	})

	// Run the job with 5 distinct inputs so logs for different inputs coexist
	// in the same logger and must be separated by the query below.
	for (let i = 0; i < 5; i++) {
		const result = await invoke(queue.jobs.aaa, { a: i })
		assert.deepEqual(result, { b: i + 1 })
	}

	{
		// Logs for input {a: 0}: a run/success pair per step, in order, with
		// the user log sandwiched inside its own step.
		const logs: Log[] = []
		await logger.get({ queue: queue.id, job: aaa.id, input: JSON.stringify({ a: 0 }) }, (line) => logs.push(line))

		assert.deepEqual(logs.map(l => `${l.key} - ${l.system ? l.payload.event : 'log'}`), [
			'step/aaa/system/parse-input#0 - run',
			'step/aaa/system/parse-input#0 - success',
			'step/aaa/user/add-one#0 - run',
			'step/aaa/user/add-one#0 - log',
			'step/aaa/user/add-one#0 - success',
			'step/aaa/system/parse-output#0 - run',
			'step/aaa/system/parse-output#0 - success'
		])

		// The only non-system entry is the user log's raw string payload.
		assert.deepEqual(logs.filter(l => !l.system)[0]!.payload, 'adding one')

		// The final system entry carries the job's output for this input.
		assert.deepEqual(logs.at(-1)?.payload, {
			data: {
				b: 1
			},
			event: 'success',
			runs: 1
		})
	}

	{
		// A different input that also ran gets its own, equally sized stream.
		const logs: Log[] = []
		await logger.get({ queue: queue.id, job: aaa.id, input: JSON.stringify({ a: 1 }) }, (line) => logs.push(line))
		assert.equal(logs.length, 7)
	}

	{
		// An input that never ran yields no logs at all.
		const logs: Log[] = []
		await logger.get({ queue: queue.id, job: aaa.id, input: JSON.stringify({ a: 5 }) }, (line) => logs.push(line))
		assert.equal(logs.length, 0)
	}

	await queue.close()

})

0 comments on commit 20f3145

Please sign in to comment.