Skip to content

Commit

Permalink
Add top level types
Browse files Browse the repository at this point in the history
  • Loading branch information
jasrusable committed Jun 9, 2020
1 parent 899e24e commit 43332f9
Show file tree
Hide file tree
Showing 21 changed files with 104 additions and 44 deletions.
3 changes: 2 additions & 1 deletion src/actions/create-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
createClientAndLoadLuaScripts,
ensureDisconnected,
} from '../utils/redis';
import { Listener } from '../domain/listener/listener';

const debug = debugF('conveyor-mq:listener');

Expand All @@ -45,7 +46,7 @@ export const createListener = ({
queue: string;
redisConfig: RedisConfig;
events?: EventType[];
}) => {
}): Listener => {
debug('Starting');
debug('Creating client');
const client = createClientAndLoadLuaScripts(redisConfig);
Expand Down
11 changes: 4 additions & 7 deletions src/actions/create-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@ import { scheduleTasks as scheduleTasksAction } from './schedule-tasks';
import { getWorkers } from './get-workers';
import { pauseQueue } from './pause-queue';
import { resumeQueue } from './resume-queue';
import { Manager } from '../domain/manager/manager';
import { TaskResponse } from '../domain/manager/task-response';

const debug = debugF('conveyor-mq:manager');

export interface TaskResponse {
task: Task;
onTaskComplete: () => Promise<Task>;
}

/**
* Creates a manager which is responsible for enqueuing tasks, as well as querying various
* queue, task and worker properties.
Expand Down Expand Up @@ -58,7 +55,7 @@ export const createManager = ({
}: {
queue: string;
redisConfig: RedisConfig;
}) => {
}): Manager => {
debug('Starting');
debug('Creating client');
const client = createClientAndLoadLuaScripts(redisConfig);
Expand Down Expand Up @@ -229,7 +226,7 @@ export const createManager = ({
quit: async () => {
await readyPromise;
debug(`quit`);
return ensureDisconnected({ client });
await ensureDisconnected({ client });
},
};
};
3 changes: 2 additions & 1 deletion src/actions/create-orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
} from '../utils/redis';
import { RedisConfig } from '../utils/general';
import { acknowledgeOrphanedProcessingTasks } from './acknowledge-orphaned-processing-tasks';
import { Orchestrator } from '../domain/orchestrator/orchestrator';

const debug = debugF('conveyor-mq:orchestrator');

Expand Down Expand Up @@ -41,7 +42,7 @@ export const createOrchestrator = ({
stalledCheckInterval?: number;
scheduledTasksCheckInterval?: number;
defaultStallTimeout?: number;
}) => {
}): Orchestrator => {
debug('Starting');
debug('Creating client');
const client = createClientAndLoadLuaScripts(redisConfig);
Expand Down
11 changes: 7 additions & 4 deletions src/actions/create-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ import {
} from '../utils/keys';
import { serializeEvent } from '../domain/events/serialize-event';
import { EventType } from '../domain/events/event-type';
import { Worker } from '../domain/workers/worker';
import { serializeWorker } from '../domain/workers/serialize-worker';
import { WorkerInstance } from '../domain/worker/worker-instance';
import { serializeWorker } from '../domain/worker/serialize-worker';
import { Task } from '../domain/tasks/task';
import { Worker } from '../domain/worker/worker';

const debug = debugF('conveyor-mq:worker');

Expand Down Expand Up @@ -108,15 +109,15 @@ export const createWorker = ({
autoStart?: boolean;
removeOnSuccess?: boolean;
removeOnFailed?: boolean;
}) => {
}): Worker => {
debug('Starting');
let isPausing = false;
let isPaused = true;
let isShuttingDown = false;
let isShutdown = false;
let upsertInterval: SetIntervalAsyncTimer;

const worker: Worker = {
const worker: WorkerInstance = {
id: createWorkerId(),
createdAt: new Date(),
};
Expand Down Expand Up @@ -351,6 +352,8 @@ export const createWorker = ({
const readyPromise = ready();

return {
id: worker.id,
createdAt: worker.createdAt,
onReady: async () => {
debug('onReady');
await readyPromise;
Expand Down
2 changes: 1 addition & 1 deletion src/actions/get-workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Redis } from 'ioredis';
import { filter, map } from 'lodash';
import { keys, mget } from '../utils/redis';
import { getWorkerKeyPrefix } from '../utils/keys';
import { deSerializeWorker } from '../domain/workers/deserialize-worker';
import { deSerializeWorker } from '../domain/worker/deserialize-worker';

/**
* @ignore
Expand Down
2 changes: 1 addition & 1 deletion src/domain/events/event-from-json.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Event } from './event';
import { taskFromJson } from '../tasks/task-from-json';
import { EventType } from './event-type';
import { workerFromJson } from '../workers/worker-from-json';
import { workerFromJson } from '../worker/worker-from-json';

/**
* @ignore
Expand Down
2 changes: 1 addition & 1 deletion src/domain/events/event-to-json.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Event } from './event';
import { taskToJson } from '../tasks/task-to-json';
import { workerToJson } from '../workers/worker-to-json';
import { workerToJson } from '../worker/worker-to-json';

/**
* @ignore
Expand Down
4 changes: 2 additions & 2 deletions src/domain/events/event.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Task } from '../tasks/task';
import { EventType } from './event-type';
import { Worker } from '../workers/worker';
import { WorkerInstance } from '../worker/worker-instance';

export interface Event {
createdAt: Date;
type: EventType;
task?: Task;
worker?: Worker;
worker?: WorkerInstance;
}
8 changes: 8 additions & 0 deletions src/domain/listener/listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { EventType } from '../events/event-type';
import { Event } from '../events/event';

export interface Listener {
onReady: () => Promise<void>;
on: (event: EventType, f: ({ event }: { event: Event }) => any) => void;
quit: () => Promise<void>;
}
27 changes: 27 additions & 0 deletions src/domain/manager/manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { Task } from '../tasks/task';
import { TaskResponse } from './task-response';
import { WorkerInstance } from '../worker/worker-instance';

export interface Manager {
enqueueTask: (task: Partial<Task>) => Promise<TaskResponse>;
enqueueTasks: (tasks: Partial<Task>[]) => Promise<TaskResponse[]>;
scheduleTask: (task: Partial<Task>) => Promise<TaskResponse>;
scheduleTasks: (tasks: Partial<Task>[]) => Promise<TaskResponse[]>;
onTaskComplete: (taskId: string) => Promise<Task>;
getTaskById: (taskId: string) => Promise<Task | null>;
getTasksById: (taskIds: string[]) => Promise<Task[]>;
getTaskCounts: () => Promise<{
scheduledCount: number;
queuedCount: number;
processingCount: number;
successCount: number;
failedCount: number;
}>;
getWorkers: () => Promise<WorkerInstance[]>;
removeTaskById: (taskId: string) => Promise<void>;
destroyQueue: () => Promise<void>;
pauseQueue: () => Promise<void>;
resumeQueue: () => Promise<void>;
onReady: () => Promise<void>;
quit: () => Promise<void>;
}
6 changes: 6 additions & 0 deletions src/domain/manager/task-response.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { Task } from '../tasks/task';

export interface TaskResponse {
task: Task;
onTaskComplete: () => Promise<Task>;
}
4 changes: 4 additions & 0 deletions src/domain/orchestrator/orchestrator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface Orchestrator {
onReady: () => Promise<void>;
quit: () => Promise<void>;
}
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { workerToJson } from './worker-to-json';
import { Worker } from './worker';
import { WorkerInstance } from './worker-instance';

/**
* @ignore
*/
export const serializeWorker = (worker: Worker) => {
export const serializeWorker = (worker: WorkerInstance) => {
return JSON.stringify(workerToJson(worker));
};
11 changes: 11 additions & 0 deletions src/domain/worker/worker-from-json.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { WorkerInstance } from './worker-instance';

/**
* @ignore
*/
export const workerFromJson = (workerJson: any): WorkerInstance => {
return {
id: workerJson.id,
createdAt: new Date(workerJson.createdAt),
};
};
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export interface Worker {
export interface WorkerInstance {
id: string;
createdAt: Date;
}
11 changes: 11 additions & 0 deletions src/domain/worker/worker-to-json.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { WorkerInstance } from './worker-instance';

/**
* @ignore
*/
export const workerToJson = (worker: WorkerInstance) => {
return {
id: worker.id,
createdAt: worker.createdAt.toISOString(),
};
};
9 changes: 9 additions & 0 deletions src/domain/worker/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export interface Worker {
id: string;
createdAt: Date;
onReady: () => Promise<void>;
pause: () => Promise<void>;
start: () => Promise<void>;
shutdown: () => Promise<void>;
onIdle: () => Promise<void>;
}
11 changes: 0 additions & 11 deletions src/domain/workers/worker-from-json.ts

This file was deleted.

11 changes: 0 additions & 11 deletions src/domain/workers/worker-to-json.ts

This file was deleted.

6 changes: 5 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ export { TaskStatus } from './domain/tasks/task-status';
// Interfaces & types
export { Task } from './domain/tasks/task';
export { Event } from './domain/events/event';
export { Worker } from './domain/workers/worker';
export { Manager } from './domain/manager/manager';
export { Listener } from './domain/listener/listener';
export { Worker } from './domain/worker/worker';
export { WorkerInstance } from './domain/worker/worker-instance';
export { Orchestrator } from './domain/orchestrator/orchestrator';

0 comments on commit 43332f9

Please sign in to comment.