Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Clean up Jobqueues, minor fixes for S3 Queue #451

Merged
merged 5 commits into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 4 additions & 33 deletions src/main/job-queues/concurrent/graphile-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,18 @@ import { Pool } from 'pg'
import { EnqueuedJob, JobQueue, OnJobCallback, PluginsServerConfig } from '../../../types'
import { status } from '../../../utils/status'
import { createPostgresPool } from '../../../utils/utils'
import { JobQueueBase } from '../job-queue-base'

export class GraphileQueue implements JobQueue {
export class GraphileQueue extends JobQueueBase {
serverConfig: PluginsServerConfig
started: boolean
paused: boolean
onJob: OnJobCallback | null
runner: Runner | null
consumerPool: Pool | null
producerPool: Pool | null
workerUtilsPromise: Promise<WorkerUtils> | null

constructor(serverConfig: PluginsServerConfig) {
super()
this.serverConfig = serverConfig
this.started = false
this.paused = false
this.onJob = null
this.runner = null
this.consumerPool = null
this.producerPool = null
Expand Down Expand Up @@ -68,32 +64,7 @@ export class GraphileQueue implements JobQueue {

// consumer

async startConsumer(onJob: OnJobCallback): Promise<void> {
this.started = true
this.onJob = onJob
await this.syncState()
}

async stopConsumer(): Promise<void> {
this.started = false
await this.syncState()
}

async pauseConsumer(): Promise<void> {
this.paused = true
await this.syncState()
}

isConsumerPaused(): boolean {
return this.paused
}

async resumeConsumer(): Promise<void> {
this.paused = false
await this.syncState()
}

private async syncState(): Promise<void> {
protected async syncState(): Promise<void> {
if (this.started && !this.paused) {
if (!this.runner) {
this.consumerPool = await this.createPool()
Expand Down
12 changes: 8 additions & 4 deletions src/main/job-queues/job-queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ export class JobQueueBase implements JobQueue {
startConsumer(onJob: OnJobCallback): void
// eslint-disable-next-line @typescript-eslint/require-await
async startConsumer(onJob: OnJobCallback): Promise<void> {
this.started = true
this.onJob = onJob
await this.syncState()
if (!this.started) {
this.started = true
await this.syncState()
}
}

stopConsumer(): void
Expand All @@ -62,8 +64,10 @@ export class JobQueueBase implements JobQueue {
resumeConsumer(): void
// eslint-disable-next-line @typescript-eslint/require-await
async resumeConsumer(): Promise<void> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resume is called on drain events emitted by Piscina, which results in multiple calls to sync state. This is okay, as long as dequeue code is idempotent (which it isn't, thus leading to race conditions).

We didn't face this with graphile because the runner object is unchanged over multiple sync state calls.

this.paused = false
await this.syncState()
if (this.paused) {
this.paused = false
await this.syncState()
}
}

// eslint-disable-next-line @typescript-eslint/require-await
Expand Down
18 changes: 2 additions & 16 deletions src/main/job-queues/redlocked/s3-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@ import { S3Wrapper } from '../../../utils/db/s3-wrapper'
import { UUIDT } from '../../../utils/utils'
import { JobQueueBase } from '../job-queue-base'

const S3_POLL_INTERVAL = 5000
const S3_POLL_INTERVAL = 5

export class S3Queue extends JobQueueBase {
serverConfig: PluginsServerConfig
s3Wrapper: S3Wrapper | null
runner: NodeJS.Timeout | null

constructor(serverConfig: PluginsServerConfig) {
super()
this.serverConfig = serverConfig
this.s3Wrapper = null
this.runner = null
this.intervalSeconds = S3_POLL_INTERVAL
}

// producer
Expand Down Expand Up @@ -50,19 +49,6 @@ export class S3Queue extends JobQueueBase {

// consumer

protected async syncState(): Promise<void> {
if (this.started && !this.paused) {
if (!this.runner) {
await this.connectS3()
this.runner = setTimeout(() => this.readState(), S3_POLL_INTERVAL)
}
} else {
if (this.runner) {
clearTimeout(this.runner)
}
}
}

async readState(): Promise<boolean> {
if (!this.s3Wrapper) {
throw new Error('S3 object not initialized')
Expand Down
63 changes: 63 additions & 0 deletions tests/jobs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,30 @@ describe('job queues', () => {
await waitForLogEntries(2)
expect(testConsole.read()).toEqual([['processEvent'], ['reply', 'runIn']])
})

test('polls for jobs in future', async () => {
const DELAY = 3000 // 3s

// return something to be picked up after a few loops (poll interval is 100ms)
const now = Date.now()

const job: EnqueuedJob = {
type: 'pluginJob',
payload: { key: 'value' },
timestamp: now + DELAY,
pluginConfigId: 2,
pluginConfigTeam: 3,
}

server.hub.jobQueueManager.enqueue(job)
const consumedJob: EnqueuedJob = await new Promise((resolve, reject) => {
server.hub.jobQueueManager.startConsumer((consumedJob) => {
resolve(consumedJob[0])
})
})

expect(consumedJob).toEqual(job)
})
})

describe('connection', () => {
Expand Down Expand Up @@ -284,5 +308,44 @@ describe('job queues', () => {
Key: `prefix/2020-01-01/20200101-123456.123Z-deadbeef.json.gz`,
})
})

test('polls for new jobs', async () => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these tests would fail after 60s (jest timeout) if there's no polling.

const DELAY = 10000 // 10s
// calls the right functions to read the enqueued job
mS3WrapperInstance.mockClear()

// return something to be picked up after a few loops (poll interval is 5s)
const now = Date.now()
const date = new Date(now + DELAY).toISOString()
const [day, time] = date.split('T')
const dayTime = `${day.split('-').join('')}-${time.split(':').join('')}`

const job: EnqueuedJob = {
type: 'pluginJob',
payload: { key: 'value' },
timestamp: now,
pluginConfigId: 2,
pluginConfigTeam: 3,
}

mS3WrapperInstance.listObjectsV2.mockReturnValue({
Contents: [{ Key: `prefix/${day}/${dayTime}-deadbeef.json.gz` }],
})
mS3WrapperInstance.getObject.mockReturnValueOnce({
Body: gzipSync(Buffer.from(JSON.stringify(job), 'utf8')),
})

const consumedJob: EnqueuedJob = await new Promise((resolve, reject) => {
hub.jobQueueManager.startConsumer((consumedJob) => {
resolve(consumedJob[0])
})
})
expect(consumedJob).toEqual(job)
await delay(10)
expect(mS3WrapperInstance.deleteObject).toBeCalledWith({
Bucket: 'bucket-name',
Key: `prefix/${day}/${dayTime}-deadbeef.json.gz`,
})
})
})
})