Skip to content

Commit

Permalink
Add Queue goal to queue goal sets
Browse files Browse the repository at this point in the history
  • Loading branch information
cdupuis committed Dec 7, 2018
1 parent b56d510 commit 20daa1f
Show file tree
Hide file tree
Showing 15 changed files with 2,647 additions and 3,039 deletions.
2 changes: 0 additions & 2 deletions lib/api-helper/goal/chooseAndSetGoals.ts
Expand Up @@ -20,7 +20,6 @@ import {
logger,
ProjectOperationCredentials,
RemoteRepoRef,
Success,
} from "@atomist/automation-client";
import {
AddressChannels,
Expand All @@ -31,7 +30,6 @@ import {
GoalWithPrecondition,
hasPreconditions,
} from "../../api/goal/Goal";
import { ExecuteGoal } from "../../api/goal/GoalInvocation";
import { Goals } from "../../api/goal/Goals";
import {
SdmGoalFulfillment,
Expand Down
29 changes: 29 additions & 0 deletions lib/api-helper/goal/storeGoals.ts
Expand Up @@ -232,6 +232,7 @@ export async function storeGoalSet(ctx: HandlerContext,
owner: push.repo.owner,
providerId: push.repo.org.provider.providerId,
},
state: goalSetState(sdmGoals),
goals: sdmGoals.map(g => ({
name: g.name,
uniqueName: g.uniqueName,
Expand All @@ -241,6 +242,34 @@ export async function storeGoalSet(ctx: HandlerContext,
return ctx.messageClient.send(sdmGoalSet, addressEvent(GoalSetRootType));
}

export function goalSetState(goals: Array<{ state: SdmGoalState }>): SdmGoalState {
if (goals.some(g => g.state === SdmGoalState.failure)) {
return SdmGoalState.failure;
} else if (goals.some(g => g.state === SdmGoalState.canceled)) {
return SdmGoalState.canceled;
} else if (goals.some(g => g.state === SdmGoalState.stopped)) {
return SdmGoalState.stopped;
} else if (goals.some(g => g.state === SdmGoalState.in_process)) {
return SdmGoalState.in_process;
} else if (goals.some(g => g.state === SdmGoalState.waiting_for_pre_approval)) {
return SdmGoalState.waiting_for_pre_approval;
} else if (goals.some(g => g.state === SdmGoalState.waiting_for_approval)) {
return SdmGoalState.waiting_for_approval;
} else if (goals.some(g => g.state === SdmGoalState.pre_approved)) {
return SdmGoalState.pre_approved;
} else if (goals.some(g => g.state === SdmGoalState.approved)) {
return SdmGoalState.approved;
} else if (goals.some(g => g.state === SdmGoalState.requested)) {
return SdmGoalState.requested;
} else if (goals.some(g => g.state === SdmGoalState.planned)) {
return SdmGoalState.planned;
} else if (goals.some(g => g.state === SdmGoalState.skipped)) {
return SdmGoalState.skipped;
} else {
return SdmGoalState.success;
}
}

function cleanPush(push: PushFields.Fragment): PushFields.Fragment {
const newPush = _.cloneDeep(push);
delete (newPush as any).goals;
Expand Down
54 changes: 54 additions & 0 deletions lib/api-helper/listener/goalSetListener.ts
@@ -0,0 +1,54 @@
/*
* Copyright © 2018 Atomist, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {
addressEvent,
logger,
QueryNoCacheOptions,
} from "@atomist/automation-client";
import { GoalSetRootType } from "../../api/goal/SdmGoalSetMessage";
import { GoalCompletionListener } from "../../api/listener/GoalCompletionListener";
import { SdmGoalSetForId } from "../../typings/types";
import { goalSetState } from "../goal/storeGoals";

/**
* Update the state of the SdmGoalSet as the goals progress
* @param gcl
*/
export const GoalSetGoalCompletionListener: GoalCompletionListener = async gcl => {
const state = goalSetState(gcl.allGoals || []);

logger.debug(`GoalSet '${gcl.completedGoal.goalSetId}' now in state '${state}' because goal '${
gcl.completedGoal.uniqueName}' was '${gcl.completedGoal.state}'`);

const result = await gcl.context.graphClient.query<SdmGoalSetForId.Query, SdmGoalSetForId.Variables>({
name: "SdmGoalSetForId",
variables: {
goalSetId: [gcl.completedGoal.goalSetId],
},
options: QueryNoCacheOptions,
});
if (result && result.SdmGoalSet && result.SdmGoalSet.length === 1) {
const goalSet = result.SdmGoalSet[0];
if (goalSet.state !== state) {
const newGoalSet = {
...goalSet,
state,
};
await gcl.context.messageClient.send(newGoalSet, addressEvent(GoalSetRootType));
}
}
};
4 changes: 4 additions & 0 deletions lib/api-helper/machine/AbstractSoftwareDeliveryMachine.ts
Expand Up @@ -63,6 +63,7 @@ import {
import { IngesterRegistration } from "../../api/registration/IngesterRegistration";
import { InterpretLog } from "../../spi/log/InterpretedLog";
import { DefaultGoalImplementationMapper } from "../goal/DefaultGoalImplementationMapper";
import { GoalSetGoalCompletionListener } from "../listener/goalSetListener";
import { lastLinesLogInterpreter } from "../log/logInterpreters";
import { HandlerRegistrationManagerSupport } from "./HandlerRegistrationManagerSupport";
import { ListenerRegistrationManagerSupport } from "./ListenerRegistrationManagerSupport";
Expand Down Expand Up @@ -309,6 +310,9 @@ export abstract class AbstractSoftwareDeliveryMachine<O extends SoftwareDelivery

// Register the triggered listener scheduler on SDM start
this.addStartupListener(() => Promise.resolve(this.scheduleTriggeredListeners()));

// Register the goal completion listenr to update goal set state
this.addGoalCompletionListener(GoalSetGoalCompletionListener);
}

}
5 changes: 4 additions & 1 deletion lib/api/goal/SdmGoalSetMessage.ts
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

import { SdmGoalState } from "../../typings/types";
import { SdmProvenance } from "./SdmGoalMessage";

export const GoalSetRootType = "SdmGoalSet";
Expand All @@ -31,11 +32,13 @@ export interface SdmGoalSetMessage {
providerId: string;
};

state: SdmGoalState;

goalSet: string;
goalSetId: string;
ts: number;

goals: Array<{ name: string, uniqueName: string}>;
goals: Array<{ name: string, uniqueName: string }>;

provenance: SdmProvenance;
}
204 changes: 204 additions & 0 deletions lib/api/goal/common/Queue.ts
@@ -0,0 +1,204 @@
/*
* Copyright © 2018 Atomist, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {
EventFired,
GraphQL,
HandlerContext,
logger,
OnEvent,
QueryNoCacheOptions,
Success,
} from "@atomist/automation-client";
import * as _ from "lodash";
import { updateGoal } from "../../../api-helper/goal/storeGoals";
import { LogSuppressor } from "../../../api-helper/log/logInterpreters";
import {
InProcessSdmGoalSets,
OnAnySdmGoalSets,
SdmGoalsByGoalSetIdAndUniqueName,
SdmGoalState,
SdmGoalWithPushFields,
} from "../../../typings/types";
import { SoftwareDeliveryMachine } from "../../machine/SoftwareDeliveryMachine";
import { SoftwareDeliveryMachineConfiguration } from "../../machine/SoftwareDeliveryMachineOptions";
import { AnyPush } from "../../mapping/support/commonPushTests";
import {
Goal,
GoalDefinition,
} from "../Goal";
import { DefaultGoalNameGenerator } from "../GoalNameGenerator";
import {
FulfillableGoal,
FulfillableGoalDetails,
getGoalDefinitionFrom,
} from "../GoalWithFulfillment";
import { SdmGoalEvent } from "../SdmGoalEvent";
import { IndependentOfEnvironment } from "../support/environment";
import SdmGoalSet = InProcessSdmGoalSets.SdmGoalSet;

/**
* Options to configure the Queue goal
*/
export interface QueueOptions {
concurrent?: number;
fetch?: number;
}

export const DefaultQueueOptions: QueueOptions = {
concurrent: 2,
fetch: 10,
};

/**
* Goal to queue current goal set until it is the first in the list and can execute
*/
export class Queue extends FulfillableGoal {

constructor(private readonly options: FulfillableGoalDetails & QueueOptions = DefaultQueueOptions,
...dependsOn: Goal[]) {

super({
...getGoalDefinitionFrom(options, DefaultGoalNameGenerator.generateName("queue"), QueueDefinition),
}, ...dependsOn);

this.addFulfillment({
name: `cancel-${this.definition.uniqueName}`,
pushTest: AnyPush,
goalExecutor: async gi => ({ state: SdmGoalState.in_process }),
logInterpreter: LogSuppressor,
});
}

public register(sdm: SoftwareDeliveryMachine): void {
super.register(sdm);

const optsToUse: QueueOptions = {
...DefaultQueueOptions,
...this.options,
};

sdm.addEvent({
name: `OnAnySdmGoalSet`,
description: `Handle queuing for goal ${this.definition.uniqueName}`,
subscription: GraphQL.subscription({
name: "OnAnySdmGoalSet",
variables: {
registration: [sdm.configuration.name] as any,
},
}),
listener: handleSdmGoalSetEvent(optsToUse, this.definition, sdm.configuration),
});
}
}

const QueueDefinition: GoalDefinition = {
uniqueName: "queue",
displayName: "queue goals",
environment: IndependentOfEnvironment,
workingDescription: "Queued",
completedDescription: "Started goals",
failedDescription: "Failed to queue goals",
};

export function handleSdmGoalSetEvent(options: QueueOptions,
definition: GoalDefinition,
configuration: SoftwareDeliveryMachineConfiguration): OnEvent<OnAnySdmGoalSets.Subscription> {
return async (e: EventFired<OnAnySdmGoalSets.Subscription>, ctx: HandlerContext) => {
const optsToUse: QueueOptions = {
...DefaultQueueOptions,
...options,
};

const goalSets = await ctx.graphClient.query<InProcessSdmGoalSets.Query, InProcessSdmGoalSets.Variables>({
name: "InProcessSdmGoalSets",
variables: {
fetch: optsToUse.fetch + optsToUse.concurrent,
registration: [configuration.name],
},
options: QueryNoCacheOptions,
});

if (goalSets && goalSets.SdmGoalSet && goalSets.SdmGoalSet) {
await startGoals(goalSets, optsToUse, definition, ctx);
await updateGoals(goalSets, optsToUse, definition, ctx);
}

return Success;
};
}

async function loadQueueGoals(goalsSets: SdmGoalSet[],
definition: GoalDefinition,
ctx: HandlerContext): Promise<SdmGoalWithPushFields.Fragment[]> {
return (await ctx.graphClient.query<SdmGoalsByGoalSetIdAndUniqueName.Query, SdmGoalsByGoalSetIdAndUniqueName.Variables>({
name: "SdmGoalsByGoalSetIdAndUniqueName",
variables: {
goalSetId: goalsSets.map(gs => gs.goalSetId),
uniqueName: [definition.uniqueName],
},
options: QueryNoCacheOptions,
})).SdmGoal as SdmGoalWithPushFields.Fragment[] || [];
}

async function startGoals(goalSets: InProcessSdmGoalSets.Query,
options: QueueOptions,
definition: GoalDefinition,
ctx: HandlerContext) {
// Update goal sets that are allowed to start
const goalSetsToStart = goalSets.SdmGoalSet.slice(0, options.concurrent)
.filter(gs => gs.goals.some(g => g.uniqueName === definition.uniqueName));
if (goalSetsToStart.length > 0) {

logger.debug(`Following goal sets are ready to start: '${goalSetsToStart.map(gs => gs.goalSetId).join(", ")}'`);
const queueGoals = await loadQueueGoals(goalSetsToStart, definition, ctx);

for (const goalSetToStart of goalSetsToStart) {
const queueGoal = _.maxBy(queueGoals.filter(g => g.goalSetId === goalSetToStart.goalSetId), "ts") as SdmGoalEvent;
logger.debug(`Updating goal '${definition.uniqueName}' of goal set '${queueGoal.goalSetId}' to 'success'`);
if (queueGoal.state === SdmGoalState.in_process) {
await updateGoal(ctx, queueGoal, {
state: SdmGoalState.success,
description: definition.completedDescription,
});
}
}
}
}

async function updateGoals(goalSets: InProcessSdmGoalSets.Query,
options: QueueOptions,
definition: GoalDefinition, ctx: HandlerContext) {
// Update pending goal sets with a counter
const goalSetsToUpdate = goalSets.SdmGoalSet.slice(options.concurrent)
.filter(gs => gs.goals.some(g => g.uniqueName === definition.uniqueName));
if (goalSetsToUpdate.length > 0) {

const updateGoals = await loadQueueGoals(goalSetsToUpdate, definition, ctx);

for (const goalSetToUpdate of goalSetsToUpdate) {
const updGoal = _.maxBy(updateGoals.filter(g => g.goalSetId === goalSetToUpdate.goalSetId), "ts") as SdmGoalEvent;
const phase = `at ${goalSetsToUpdate.findIndex(gs => gs.goalSetId === updGoal.goalSetId) + 1}`;
if (updGoal.state === SdmGoalState.in_process && updGoal.phase !== phase) {
await updateGoal(ctx, updGoal, {
state: SdmGoalState.in_process,
description: definition.workingDescription,
phase,
});
}
}
}
}

0 comments on commit 20daa1f

Please sign in to comment.