Skip to content

Commit

Permalink
docs
Browse files Browse the repository at this point in the history
Signed-off-by: Fredrik Adelöw <freben@gmail.com>
  • Loading branch information
freben committed Nov 11, 2021
1 parent 751317d commit d7c1e0e
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .changeset/happy-rice-tickle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@backstage/backend-common': patch
---

Add support for distributed mutexes and scheduled tasks, through the `TaskManager` class. This class can be particularly useful for coordinating things across many deployed instances of a given backend plugin. An example of this is catalog entity providers - with this facility you can register tasks similar to a cron job, and make sure that only one host at a time tries to execute the job, and that the timing (call frequency, timeouts etc) are retained as a global concern, letting you scale your workload safely without affecting the task behavior.
3 changes: 3 additions & 0 deletions .github/styles/vocab.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ configmaps
configs
const
cookiecutter
cron
css
Datadog
dataflow
Expand Down Expand Up @@ -162,6 +163,8 @@ Monorepo
monorepos
msgraph
msw
mutex
mutexes
mysql
namespace
namespaced
Expand Down
29 changes: 16 additions & 13 deletions packages/backend-common/api-report.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ export function loadBackendConfig(options: {
argv: string[];
}): Promise<Config>;

// @public
export interface LockOptions {
timeout: Duration;
}

// @public
export function notFoundHandler(): RequestHandler;

Expand All @@ -405,30 +410,22 @@ export type PluginEndpointDiscovery = {

// @public
export interface PluginTaskManager {
// (undocumented)
acquireLock(
id: string,
options: {
timeout: Duration;
},
options: LockOptions,
): Promise<
| {
acquired: false;
}
| {
acquired: true;
release: () => void | Promise<void>;
release(): Promise<void>;
}
>;
// (undocumented)
scheduleTask(
id: string,
options: {
timeout: Duration;
frequency: Duration;
initialDelay?: Duration;
},
fn: () => Promise<void>,
options: TaskOptions,
fn: () => void | Promise<void>,
): Promise<{
unschedule: () => Promise<void>;
}>;
Expand Down Expand Up @@ -614,7 +611,6 @@ export interface StatusCheckHandlerOptions {
// @public
export class TaskManager {
constructor(databaseManager: DatabaseManager, logger: Logger_2);
// (undocumented)
forPlugin(pluginId: string): PluginTaskManager;
// (undocumented)
static fromConfig(
Expand All @@ -626,6 +622,13 @@ export class TaskManager {
): TaskManager;
}

// @public
export interface TaskOptions {
frequency?: Duration;
initialDelay?: Duration;
timeout?: Duration;
}

// @public
export type UrlReader = {
read(url: string): Promise<Buffer>;
Expand Down
1 change: 1 addition & 0 deletions packages/backend-common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
"@types/concat-stream": "^1.6.0",
"@types/fs-extra": "^9.0.3",
"@types/http-errors": "^1.6.3",
"@types/luxon": "^2.0.4",
"@types/minimist": "^1.2.0",
"@types/mock-fs": "^4.13.0",
"@types/morgan": "^1.9.0",
Expand Down
16 changes: 4 additions & 12 deletions packages/backend-common/src/tasks/PluginTaskManagerImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
*/

import { Knex } from 'knex';
import { Duration } from 'luxon';
import { v4 as uuid } from 'uuid';
import { Logger } from 'winston';
import { isDatabaseConflictError } from '../database';
import { DbMutexesRow, DB_MUTEXES_TABLE } from '../database/tables';
import { TaskWorker } from './TaskWorker';
import { PluginTaskManager } from './types';
import { LockOptions, PluginTaskManager, TaskOptions } from './types';
import { nowPlus, validateId } from './util';

/**
Expand All @@ -35,12 +34,9 @@ export class PluginTaskManagerImpl implements PluginTaskManager {

async acquireLock(
id: string,
options: {
timeout: Duration;
},
options: LockOptions,
): Promise<
| { acquired: false }
| { acquired: true; release: () => void | Promise<void> }
{ acquired: false } | { acquired: true; release(): Promise<void> }
> {
validateId(id);

Expand Down Expand Up @@ -90,11 +86,7 @@ export class PluginTaskManagerImpl implements PluginTaskManager {

async scheduleTask(
id: string,
options: {
timeout?: Duration;
frequency?: Duration;
initialDelay?: Duration;
},
options: TaskOptions,
fn: () => void | Promise<void>,
): Promise<{ unschedule: () => Promise<void> }> {
validateId(id);
Expand Down
6 changes: 6 additions & 0 deletions packages/backend-common/src/tasks/TaskManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ export class TaskManager {
private readonly logger: Logger,
) {}

/**
* Instantiates a task manager instance for the given plugin.
*
* @param pluginId - The unique ID of the plugin, for example "catalog"
* @returns A {@link PluginTaskManager} instance
*/
forPlugin(pluginId: string): PluginTaskManager {
const databaseFactory = memoize(async () => {
const knex = await this.databaseManager.forPlugin(pluginId).getClient();
Expand Down
2 changes: 1 addition & 1 deletion packages/backend-common/src/tasks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@
* limitations under the License.
*/

export type { PluginTaskManager } from './types';
export { TaskManager } from './TaskManager';
export type { LockOptions, PluginTaskManager, TaskOptions } from './types';
101 changes: 90 additions & 11 deletions packages/backend-common/src/tasks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,110 @@
import { Duration } from 'luxon';
import { z } from 'zod';

/**
* Options that apply to the acquiral of a given lock.
*
* @public
*/
export interface LockOptions {
/**
* The maximum amount of time that the lock can be held, before it's
* considered timed out and gets auto-released by the framework.
*/
timeout: Duration;
}

/**
* Options that apply to the invocation of a given task.
*
* @public
*/
export interface TaskOptions {
/**
* The maximum amount of time that a single task invocation can take, before
* it's considered timed out and gets "released" such that a new invocation
* is permitted to take place (possibly, then, on a different worker).
*
* If no value is given for this field then there is no timeout. This is
* potentially dangerous.
*/
timeout?: Duration;

/**
* The amount of time that should pass between task invocation starts.
* Essentially, this equals roughly how often you want the task to run.
*
* This is a best effort value; under some circumstances there can be
* deviations. For example, if the task runtime is longer than the frequency
* and the timeout has not been given or not been exceeded yet, the next
* invocation of this task will be delayed until after the previous one
* finishes.
*
* The system does its best to avoid overlapping invocations.
*
* If no value is given for this field then the task will only be invoked
* once (on any worker) and then unscheduled automatically.
*/
frequency?: Duration;

/**
* The amount of time that should pass before the first invocation happens.
*
* This can be useful in cold start scenarios to stagger or delay some heavy
* compute jobs.
*
* If no value is given for this field then the first invocation will happen
* as soon as possible.
*/
initialDelay?: Duration;
}

/**
* Deals with management and locking related to distributed tasks, for a given
* plugin.
*
* @public
*/
export interface PluginTaskManager {
/**
* Attempts to acquire an exclusive lock.
*
* A lock can only be held by one party at a time. Any subsequent attempts to
* acquire the lock will fail, unless the timeout period has been exceeded or
* the lock was released by the previous holder.
*
* @param id - A unique ID (within the scope of the plugin) for a lock
* @param options - Options for the lock
* @returns The result of the lock attempt. If it was successfully acquired,
* you should remember to call its `release` method as soon as you
* are done with the lock.
*/
acquireLock(
id: string,
options: {
timeout: Duration;
},
options: LockOptions,
): Promise<
| { acquired: false }
| { acquired: true; release: () => void | Promise<void> }
{ acquired: false } | { acquired: true; release(): Promise<void> }
>;

/**
* Schedules a task function for coordinated exclusive invocation across
* workers.
*
* If the task was already scheduled since before by us or by another party,
* its options are just overwritten with the given options, and things
* continue from there.
*
* @param id - A unique ID (within the scope of the plugin) for the task
* @param options - Options for the task
* @param fn - The actual task function to be invoked
* @returns An `unschedule` function that can be used to stop the task
* invocations later on. This removes the task entirely from storage
* and stops its invocations across all workers.
*/
scheduleTask(
id: string,
options: {
timeout: Duration;
frequency: Duration;
initialDelay?: Duration;
},
fn: () => Promise<void>,
options: TaskOptions,
fn: () => void | Promise<void>,
): Promise<{ unschedule: () => Promise<void> }>;
}

Expand Down

0 comments on commit d7c1e0e

Please sign in to comment.