-
-
Notifications
You must be signed in to change notification settings - Fork 656
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: allow schedulers to run in a single node (#6794)
## About the changes This PR provides a service that allows a scheduled function to run in a single instance. It's currently not in use but tests show how to wrap a function to make it single-instance: https://github.com/Unleash/unleash/blob/65b7080e0532ffdcbd8c2aa2593d0aff78e632fb/src/lib/features/scheduler/job-service.test.ts#L26-L32 The key `'test'` is used to identify the group and most likely should have the same name as the scheduled job. --------- Co-authored-by: Christopher Kolstad <chriswk@getunleash.io>
- Loading branch information
1 parent
00d3490
commit 0a2d40f
Showing
10 changed files
with
324 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import { createTestConfig } from '../../../test/config/test-config'; | ||
import { JobStore } from './job-store'; | ||
import { JobService } from './job-service'; | ||
import dbInit, { type ITestDb } from '../../../test/e2e/helpers/database-init'; | ||
|
||
let db: ITestDb; | ||
let store: JobStore; | ||
const config = createTestConfig(); | ||
beforeAll(async () => { | ||
db = await dbInit('job_service_serial', config.getLogger); | ||
store = new JobStore(db.rawDatabase, config); | ||
}); | ||
|
||
afterEach(async () => { | ||
await store.deleteAll(); | ||
}); | ||
|
||
afterAll(async () => { | ||
await db.destroy(); | ||
}); | ||
|
||
// note: this might be flaky if the test runs exactly at 59 minutes and 59 seconds of an hour and 999 milliseconds but should be unlikely | ||
test('Only executes job once within time period', async () => { | ||
let counter = 0; | ||
const service = new JobService(store, config.getLogger); | ||
const job = service.singleInstance( | ||
'test', | ||
async () => { | ||
counter++; | ||
}, | ||
60, | ||
); | ||
await job(); | ||
await job(); | ||
expect(counter).toBe(1); | ||
const jobs = await store.getAll(); | ||
expect(jobs).toHaveLength(1); | ||
expect(jobs.every((j) => j.finishedAt !== null)).toBe(true); | ||
}); | ||
|
||
test('Will execute jobs with different keys', async () => { | ||
let counter = 0; | ||
let otherCounter = 0; | ||
const service = new JobService(store, config.getLogger); | ||
const incrementCounter = service.singleInstance( | ||
'one', | ||
async () => { | ||
counter++; | ||
}, | ||
60, | ||
); | ||
const incrementOtherCounter = service.singleInstance( | ||
'two', | ||
async () => { | ||
otherCounter++; | ||
}, | ||
60, | ||
); | ||
await incrementCounter(); | ||
await incrementCounter(); | ||
await incrementOtherCounter(); | ||
await incrementOtherCounter(); | ||
expect(counter).toBe(1); | ||
expect(otherCounter).toBe(1); | ||
const jobs = await store.getAll(); | ||
expect(jobs).toHaveLength(2); | ||
expect(jobs.every((j) => j.finishedAt !== null)).toBe(true); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
import type { Logger } from '../../server-impl'; | ||
import type { JobStore } from './job-store'; | ||
import type { LogProvider } from '../../logger'; | ||
import { subMinutes } from 'date-fns'; | ||
|
||
export class JobService { | ||
private jobStore: JobStore; | ||
private logger: Logger; | ||
constructor(jobStore: JobStore, logProvider: LogProvider) { | ||
this.jobStore = jobStore; | ||
this.logger = logProvider('/services/job-service'); | ||
} | ||
|
||
/** | ||
* Wraps a function in a job that will guarantee the function is executed | ||
* in a mutually exclusive way in a single instance of the cluster, at most | ||
* once every {@param bucketSizeInMinutes}. | ||
* | ||
* The key identifies the job group: only one job in the group will execute | ||
* at any given time. | ||
* | ||
* Note: buckets begin at the start of the time span | ||
*/ | ||
public singleInstance( | ||
key: string, | ||
fn: (range?: { from: Date; to: Date }) => Promise<unknown>, | ||
bucketSizeInMinutes = 5, | ||
): () => Promise<unknown> { | ||
return async () => { | ||
const acquired = await this.jobStore.acquireBucket( | ||
key, | ||
bucketSizeInMinutes, | ||
); | ||
|
||
if (acquired) { | ||
const { name, bucket } = acquired; | ||
this.logger.info( | ||
`Acquired job lock for ${name} from >= ${subMinutes( | ||
bucket, | ||
bucketSizeInMinutes, | ||
)} to < ${bucket}`, | ||
); | ||
try { | ||
const range = { | ||
from: subMinutes(bucket, bucketSizeInMinutes), | ||
to: bucket, | ||
}; | ||
return fn(range); | ||
} finally { | ||
await this.jobStore.update(name, bucket, { | ||
stage: 'completed', | ||
finishedAt: new Date(), | ||
}); | ||
} | ||
} | ||
}; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import { createTestConfig } from '../../../test/config/test-config'; | ||
import { JobStore } from './job-store'; | ||
import dbInit, { type ITestDb } from '../../../test/e2e/helpers/database-init'; | ||
|
||
let db: ITestDb; | ||
const config = createTestConfig(); | ||
beforeAll(async () => { | ||
db = await dbInit('job_store_serial', config.getLogger); | ||
}); | ||
|
||
afterAll(async () => { | ||
await db.destroy(); | ||
}); | ||
|
||
test('cannot acquireBucket twice', async () => { | ||
const store = new JobStore(db.rawDatabase, config); | ||
// note: this might be flaky if the test runs exactly at 59 minutes and 59 seconds of an hour and 999 milliseconds but should be unlikely | ||
const bucket = await store.acquireBucket('test', 60); | ||
expect(bucket).toBeDefined(); | ||
const bucket2 = await store.acquireBucket('test', 60); | ||
expect(bucket2).toBeUndefined(); | ||
}); | ||
|
||
test('Can acquire bucket for two different key names within the same period', async () => { | ||
const store = new JobStore(db.rawDatabase, config); | ||
const firstBucket = await store.acquireBucket('first', 60); | ||
const secondBucket = await store.acquireBucket('second', 60); | ||
expect(firstBucket).toBeDefined(); | ||
expect(secondBucket).toBeDefined(); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
import type { Store } from '../../types/stores/store'; | ||
import type { Db, IUnleashConfig, Logger } from '../../server-impl'; | ||
import metricsHelper from '../../util/metrics-helper'; | ||
import { DB_TIME } from '../../metric-events'; | ||
import type { Row } from '../../db/crud/row-type'; | ||
import { defaultToRow } from '../../db/crud/default-mappings'; | ||
|
||
export type JobModel = { | ||
name: string; | ||
bucket: Date; | ||
stage: 'started' | 'completed' | 'failed'; | ||
finishedAt?: Date; | ||
}; | ||
|
||
const TABLE = 'jobs'; | ||
const toRow = (data: Partial<JobModel>) => | ||
defaultToRow<JobModel, Row<JobModel>>(data); | ||
|
||
export class JobStore | ||
implements Store<JobModel, { name: string; bucket: Date }> | ||
{ | ||
private logger: Logger; | ||
protected readonly timer: (action: string) => Function; | ||
private db: Db; | ||
|
||
constructor( | ||
db: Db, | ||
config: Pick<IUnleashConfig, 'eventBus' | 'getLogger'>, | ||
) { | ||
this.db = db; | ||
this.logger = config.getLogger('job-store'); | ||
this.timer = (action: string) => | ||
metricsHelper.wrapTimer(config.eventBus, DB_TIME, { | ||
store: TABLE, | ||
action, | ||
}); | ||
} | ||
|
||
async acquireBucket( | ||
key: string, | ||
bucketLengthInMinutes: number, | ||
): Promise<{ name: string; bucket: Date } | undefined> { | ||
const endTimer = this.timer('acquireBucket'); | ||
|
||
const bucket = await this.db<Row<JobModel>>(TABLE) | ||
.insert({ | ||
name: key, | ||
// note: date_floor_round is a custom function defined in the DB | ||
bucket: this.db.raw( | ||
`date_floor_round(now(), '${bucketLengthInMinutes} minutes')`, | ||
), | ||
stage: 'started', | ||
}) | ||
.onConflict(['name', 'bucket']) | ||
.ignore() | ||
.returning(['name', 'bucket']); | ||
|
||
endTimer(); | ||
return bucket[0]; | ||
} | ||
|
||
async update( | ||
name: string, | ||
bucket: Date, | ||
data: Partial<Omit<JobModel, 'name' | 'bucket'>>, | ||
): Promise<JobModel> { | ||
const rows = await this.db<Row<JobModel>>(TABLE) | ||
.update(toRow(data)) | ||
.where({ name, bucket }) | ||
.returning('*'); | ||
return rows[0]; | ||
} | ||
|
||
async get(pk: { name: string; bucket: Date }): Promise<JobModel> { | ||
const rows = await this.db(TABLE).where(pk); | ||
return rows[0]; | ||
} | ||
|
||
async getAll(query?: Object | undefined): Promise<JobModel[]> { | ||
if (query) { | ||
return this.db(TABLE).where(query); | ||
} | ||
return this.db(TABLE); | ||
} | ||
|
||
async exists(key: { name: string; bucket: Date }): Promise<boolean> { | ||
const result = await this.db.raw( | ||
`SELECT EXISTS(SELECT 1 FROM ${TABLE} WHERE name = ? AND bucket = ?) AS present`, | ||
[key.name, key.bucket], | ||
); | ||
const { present } = result.rows[0]; | ||
return present; | ||
} | ||
|
||
async delete(key: { name: string; bucket: Date }): Promise<void> { | ||
await this.db(TABLE).where(key).delete(); | ||
} | ||
|
||
async deleteAll(): Promise<void> { | ||
return this.db(TABLE).delete(); | ||
} | ||
|
||
destroy(): void {} | ||
|
||
async count(): Promise<number> { | ||
return this.db(TABLE) | ||
.count() | ||
.then((res) => Number(res[0].count)); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.