Permalink
Browse files

Namespace our in process messages

[changelog:fixed]
  • Loading branch information...
cdupuis committed Nov 21, 2018
1 parent 7d8f0f5 commit d0cf724245cd60dddb15de115a2f809481815297
@@ -85,7 +85,7 @@ export class ClusterMasterRequestProcessor extends AbstractRequestProcessor
this.registration = registration;

broadcast({
type: "registration",
type: "atomist:registration",
registration: this.registration,
context: null,
});
@@ -115,20 +115,25 @@ export class ClusterMasterRequestProcessor extends AbstractRequestProcessor
const msg = message as WorkerMessage;

// Wait for online message to come in
if (msg.type === "online") {
if (msg.type === "atomist:online") {
deferred.resolve();
return;
}

const ses = namespace.create();
ses.run(() => {
// Only process our messages
if (msg.type || !msg.type.startsWith("atomist:")) {
return
}

namespace.set(msg.context);

logger.debug("Received '%s' message from worker '%s': %j", msg.type, worker.id, msg.context);

const invocationId = namespace.get().invocationId;
const ctx = hydrateContext(msg);
if (msg.type === "message") {
if (msg.type === "atomist:message") {

let messageClient: MessageClient;
if (commands.has(invocationId)) {
@@ -148,9 +153,9 @@ export class ClusterMasterRequestProcessor extends AbstractRequestProcessor
messageClient.respond(msg.data.message, msg.data.options)
.then(clearNamespace, clearNamespace);
}
} else if (msg.type === "status") {
} else if (msg.type === "atomist:status") {
sendMessage(msg.data, ws());
} else if (msg.type === "command_success") {
} else if (msg.type === "atomist:command_success") {
listeners.map(l => () => l.commandSuccessful(msg.event as CommandInvocation,
ctx, msg.data as HandlerResult))
.reduce((p, f) => p.then(f), Promise.resolve())
@@ -162,7 +167,7 @@ export class ClusterMasterRequestProcessor extends AbstractRequestProcessor
clearNamespace();
})
.catch(clearNamespace);
} else if (msg.type === "command_failure") {
} else if (msg.type === "atomist:command_failure") {
listeners.map(l => () => l.commandFailed(msg.event as CommandInvocation,
ctx, msg.data as HandlerResult))
.reduce((p, f) => p.then(f), Promise.resolve())
@@ -174,7 +179,7 @@ export class ClusterMasterRequestProcessor extends AbstractRequestProcessor
clearNamespace();
})
.catch(clearNamespace);
} else if (msg.type === "event_success") {
} else if (msg.type === "atomist:event_success") {
listeners.map(l => () => l.eventSuccessful(msg.event as EventFired<any>,
ctx, msg.data as HandlerResult[]))
.reduce((p, f) => p.then(f), Promise.resolve())
@@ -186,7 +191,7 @@ export class ClusterMasterRequestProcessor extends AbstractRequestProcessor
clearNamespace();
})
.catch(clearNamespace);
} else if (msg.type === "event_failure") {
} else if (msg.type === "atomist:event_failure") {
listeners.map(l => () => l.eventFailed(msg.event as EventFired<any>,
ctx, msg.data as HandlerResult[]))
.reduce((p, f) => p.then(f), Promise.resolve())
@@ -198,7 +203,7 @@ export class ClusterMasterRequestProcessor extends AbstractRequestProcessor
clearNamespace();
})
.catch(clearNamespace);
} else if (msg.type === "shutdown") {
} else if (msg.type === "atomist:shutdown") {
logger.info(`Shutdown requested from worker`);
process.exit(msg.data);
}
@@ -240,7 +245,7 @@ export class ClusterMasterRequestProcessor extends AbstractRequestProcessor
command: CommandIncoming,
callback: (result: Promise<HandlerResult>) => void) {
const message: MasterMessage = {
type: "command",
type: "atomist:command",
registration: this.registration,
context: ctx.context,
data: command,
@@ -259,7 +264,7 @@ export class ClusterMasterRequestProcessor extends AbstractRequestProcessor
event: EventIncoming,
callback: (results: Promise<HandlerResult[]>) => void) {
const message: MasterMessage = {
type: "event",
type: "atomist:event",
registration: this.registration,
context: ctx.context,
data: event,
@@ -57,7 +57,7 @@ export class ClusterWorkerRequestProcessor extends AbstractRequestProcessor {
) {

super(_automations, _configuration, [..._listeners, new ClusterWorkerAutomationEventListener()]);
workerSend({ type: "online", context: null })
workerSend({ type: "atomist:online", context: null })
.then(() => { /** intentionally left empty */});
registerShutdownHook(() => {

@@ -99,13 +99,13 @@ export class ClusterWorkerRequestProcessor extends AbstractRequestProcessor {
}

public sendShutdown(code: number, ctx: HandlerContext & AutomationContextAware) {
workerSend({ type: "shutdown", data: code, context: ctx.context })
workerSend({ type: "atomist:shutdown", data: code, context: ctx.context })
.then(() => { /** intentionally left empty */ });
}

protected sendStatusMessage(payload: any, ctx: HandlerContext & AutomationContextAware): Promise<any> {
return workerSend({
type: "status",
type: "atomist:status",
context: ctx.context,
data: payload,
});
@@ -141,7 +141,7 @@ class ClusterWorkerMessageClient extends MessageClientSupport {
destinations: Destination | Destination[],
options?: MessageOptions): Promise<any> {
return workerSend({
type: "message",
type: "atomist:message",
context: this.ctx.context,
data: {
message: msg,
@@ -156,7 +156,7 @@ class ClusterWorkerAutomationEventListener extends AutomationEventListenerSuppor

public commandSuccessful(payload: CommandInvocation, ctx: HandlerContext, result: HandlerResult): Promise<any> {
return workerSend({
type: "command_success",
type: "atomist:command_success",
event: payload,
context: (ctx as any).context,
data: result,
@@ -165,7 +165,7 @@ class ClusterWorkerAutomationEventListener extends AutomationEventListenerSuppor

public commandFailed(payload: CommandInvocation, ctx: HandlerContext, err: any): Promise<any> {
return workerSend({
type: "command_failure",
type: "atomist:command_failure",
event: payload,
context: (ctx as any).context,
data: err,
@@ -174,7 +174,7 @@ class ClusterWorkerAutomationEventListener extends AutomationEventListenerSuppor

public eventSuccessful(payload: EventFired<any>, ctx: HandlerContext, result: HandlerResult[]): Promise<any> {
return workerSend({
type: "event_success",
type: "atomist:event_success",
event: payload,
context: (ctx as any).context,
data: result,
@@ -183,7 +183,7 @@ class ClusterWorkerAutomationEventListener extends AutomationEventListenerSuppor

public eventFailed(payload: EventFired<any>, ctx: HandlerContext, err: any): Promise<any> {
return workerSend({
type: "event_failure",
type: "atomist:event_failure",
event: payload,
context: (ctx as any).context,
data: err,
@@ -203,17 +203,17 @@ export function startWorker(automations: AutomationServer,
listeners: AutomationEventListener[] = []): ClusterWorkerRequestProcessor {
const worker = new ClusterWorkerRequestProcessor(automations, configuration, listeners);
process.on("message", msg => {
if (msg.type === "registration") {
if (msg.type === "atomist:registration") {
worker.setRegistration(msg.registration as RegistrationConfirmation);
} else if (msg.type === "command") {
} else if (msg.type === "atomist:command") {
worker.setRegistrationIfRequired(msg);
worker.processCommand(decorateContext(msg) as CommandIncoming);
} else if (msg.type === "event") {
} else if (msg.type === "atomist:event") {
worker.setRegistrationIfRequired(msg);
worker.processEvent(decorateContext(msg) as EventIncoming);
} else if (msg.type === "gc") {
} else if (msg.type === "atomist:gc") {
gc();
} else if (msg.type === "heapdump") {
} else if (msg.type === "atomist:heapdump") {
heapDump();
}
});
@@ -5,18 +5,19 @@ import { AutomationContext } from "../../util/cls";
import { RegistrationConfirmation } from "../websocket/WebSocketRequestProcessor";

export interface MasterMessage {
type: "registration" | "event" | "command";
type: "atomist:registration" | "atomist:event" | "atomist:command";
registration: RegistrationConfirmation;
context: AutomationContext;
data?: any;
}

export interface MasterManagementMessage {
type: "gc" | "heapdump";
type: "atomist:gc" | "atomist:heapdump";
}

export interface WorkerMessage {
type: "online" | "status" | "message" | "command_success" | "command_failure" | "event_success" | "event_failure" | "shutdown";
type: "atomist:online" | "atomist:status" | "atomist:message" | "atomist:command_success"
| "atomist:command_failure" | "atomist:event_success" | "atomist:event_failure" | "atomist:shutdown";
event?: EventFired<any> | CommandInvocation;
context: AutomationContext;
data?: any;
@@ -5,7 +5,6 @@ import * as passport from "passport";
import * as http from "passport-http";
import * as bearer from "passport-http-bearer";
import * as tokenHeader from "passport-http-header-token";
import * as portfinder from "portfinder";
import {
Configuration,
ExpressCustomizer,
@@ -38,7 +38,7 @@ export function heapDump(): string {
heapdump.writeSnapshot(`${DataDirectory}/${name}`, (err, filename) => {
logger.warn("Heap dump written to '%s'", filename);
});
broadcast({ type: "heapdump" });
broadcast({ type: "atomist:heapdump" });
return name;
} catch (err) {
logger.error("Failed to initialise memory monitoring. Required 'heapdump' module is missing or can't be" +
@@ -76,7 +76,7 @@ export function gc() {
logger.warn("Triggering GC");
global.gc();
logger.debug("Memory statistics: %j", memoryUsage());
broadcast({ type: "gc" });
broadcast({ type: "atomist:gc" });
}
}

0 comments on commit d0cf724

Please sign in to comment.