Skip to content

Commit

Permalink
More working here
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisronline committed Apr 9, 2021
1 parent 7ee229f commit a365cbd
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 40 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/actions/server/actions_client.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const createActionsClientMock = () => {
getBulk: jest.fn(),
execute: jest.fn(),
enqueueExecution: jest.fn(),
enqueueInMemoryExecution: jest.fn(),
listTypes: jest.fn(),
isActionTypeEnabled: jest.fn(),
};
Expand Down
15 changes: 2 additions & 13 deletions x-pack/plugins/actions/server/create_execute_function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ export function createInMemoryExecutionEnqueuerFunction({
}

const task = {
id: '' + +new Date(),
taskType: `actions:${actionTypeId}`,
params: {
spaceId,
Expand All @@ -113,25 +114,13 @@ export function createInMemoryExecutionEnqueuerFunction({
apiKey,
params,
},
// actionTaskParamsId: actionTaskParamsRecord.id,
},
state: {},
scope: ['actions'],
};

// console.log(`Running task`, { task })
await taskManager.runTask(task);

// const actionTaskParamsRecord = await unsecuredSavedObjectsClient.create(
// ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE,
// {
// actionId: id,
// params,
// apiKey,
// },
// executionSourceAsSavedObjectReferences(source)
// );

// await taskManager.schedule();
};
}

Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/actions/server/lib/task_runner_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export class TaskRunnerFactory {
return {
async run() {
const { spaceId, actionTaskParamsId } = taskInstance.params as Record<string, string>;
// console.log('task_runner_factory, run()')
const namespace = spaceIdToNamespace(spaceId as string);
const taskParams = taskInstance.params.taskParams;

Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const createStartMock = () => {
remove: jest.fn(),
schedule: jest.fn(),
runNow: jest.fn(),
runTask: jest.fn(),
ensureScheduled: jest.fn(),
removeIfExists: jest.fn(),
};
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ export class TaskPollingLifecycle {

public async attemptToRunTaskDirectly(taskInstance: ConcreteTaskInstance) {
const runner = this.createTaskRunnerForTask(taskInstance);
await this.pool.run([runner]);
// console.log(`polling_lifecycle.attemptToRunTaskDirectly() ${runner}`)
await this.pool.run([runner], true);
}

private createTaskRunnerForTask = (instance: ConcreteTaskInstance) => {
Expand Down
8 changes: 6 additions & 2 deletions x-pack/plugins/task_manager/server/task_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ export class TaskPool {
* @param {TaskRunner[]} tasks
* @returns {Promise<boolean>}
*/
public run = async (tasks: TaskRunner[]): Promise<TaskPoolRunResult> => {
public run = async (
tasks: TaskRunner[],
inMemory: boolean = false
): Promise<TaskPoolRunResult> => {
const [tasksToRun, leftOverTasks] = partitionListByCount(tasks, this.availableWorkers);
if (tasksToRun.length) {
performance.mark('attemptToRun_start');
Expand All @@ -123,7 +126,7 @@ export class TaskPool {
.map(async (taskRunner) => {
this.tasksInPool.set(taskRunner.id, taskRunner);
return taskRunner
.markTaskAsRunning()
.markTaskAsRunning(inMemory)
.then((hasTaskBeenMarkAsRunning: boolean) =>
hasTaskBeenMarkAsRunning
? this.handleMarkAsRunning(taskRunner)
Expand Down Expand Up @@ -159,6 +162,7 @@ export class TaskPool {
}

private handleMarkAsRunning(taskRunner: TaskRunner) {
// console.log(`task_pool.handleMarkAsRunning() ${taskRunner}`);
taskRunner
.run()
.catch((err) => {
Expand Down
56 changes: 32 additions & 24 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export interface TaskRunner {
startedAt: Date | null;
definition: TaskDefinition;
cancel: CancelFunction;
markTaskAsRunning: () => Promise<boolean>;
markTaskAsRunning: (inMemory: boolean) => Promise<boolean>;
run: () => Promise<Result<SuccessfulRunResult, FailedRunResult>>;
id: string;
stage: string;
Expand Down Expand Up @@ -275,7 +275,7 @@ export class TaskManagerRunner implements TaskRunner {
*
* @returns {Promise<boolean>}
*/
public async markTaskAsRunning(): Promise<boolean> {
public async markTaskAsRunning(inMemory: boolean = false): Promise<boolean> {
if (!isPending(this.instance)) {
throw new Error(
`Marking task ${this} as running has failed as it ${
Expand Down Expand Up @@ -310,29 +310,37 @@ export class TaskManagerRunner implements TaskRunner {
);
}

this.instance = asReadyToRun(
(await this.bufferedTaskStore.update({
if (!inMemory) {
this.instance = asReadyToRun(
(await this.bufferedTaskStore.update({
...taskInstance,
status: TaskStatus.Running,
startedAt: now,
attempts,
retryAt:
(this.instance.task.schedule
? maxIntervalFromDate(
now,
this.instance.task.schedule.interval,
this.definition.timeout
)
: this.getRetryDelay({
attempts,
// Fake an error. This allows retry logic when tasks keep timing out
// and lets us set a proper "retryAt" value each time.
error: new Error('Task timeout'),
addDuration: this.definition.timeout,
})) ?? null,
// This is a safe convertion as we're setting the startAt above
})) as ConcreteTaskInstanceWithStartedAt
);
} else {
const startedTask: ConcreteTaskInstanceWithStartedAt = {
...taskInstance,
status: TaskStatus.Running,
startedAt: now,
attempts,
retryAt:
(this.instance.task.schedule
? maxIntervalFromDate(
now,
this.instance.task.schedule.interval,
this.definition.timeout
)
: this.getRetryDelay({
attempts,
// Fake an error. This allows retry logic when tasks keep timing out
// and lets us set a proper "retryAt" value each time.
error: new Error('Task timeout'),
addDuration: this.definition.timeout,
})) ?? null,
// This is a safe convertion as we're setting the startAt above
})) as ConcreteTaskInstanceWithStartedAt
);
startedAt: new Date(),
};
this.instance = asReadyToRun(startedTask);
}

const timeUntilClaimExpiresAfterUpdate = howManyMsUntilOwnershipClaimExpires(
ownershipClaimedUntil
Expand Down

0 comments on commit a365cbd

Please sign in to comment.