Skip to content
Permalink
Browse files

Set WebSocketLifecycle on configuration

[changelog:added]
  • Loading branch information...
cdupuis committed Jan 23, 2019
1 parent d731239 commit 6c009bf30f89b3685c208636337d6810338f917e
@@ -29,6 +29,10 @@ import { automationClientInstance } from "./globals";
import { HandleCommand } from "./HandleCommand";
import { HandleEvent } from "./HandleEvent";
import { ExpressServerOptions } from "./internal/transport/express/ExpressServer";
import {
QueuingWebSocketLifecycle,
WebSocketLifecycle,
} from "./internal/transport/websocket/WebSocketLifecycle";
import { config } from "./internal/util/config";
import {
guid,
@@ -980,7 +984,8 @@ export const LocalDefaultConfiguration: Configuration = {
},
compress: false,
timeout: 30000,
},
lifecycle: new QueuingWebSocketLifecycle(),
} as any,
graphql: {
client: {
factory: DefaultGraphClientFactory,
@@ -53,7 +53,7 @@ export class ClusterMasterRequestProcessor extends AbstractRequestProcessor
implements WebSocketRequestProcessor {

private registration?: RegistrationConfirmation;
private webSocketLifecycle: WebSocketLifecycle = new WebSocketLifecycle();
private webSocketLifecycle: WebSocketLifecycle;
private commands: Map<string, Dispatched<HandlerResult>> = new Map();
private events: Map<string, Dispatched<HandlerResult[]>> = new Map();
private shutdownInitiated: boolean = false;
@@ -63,6 +63,7 @@ export class ClusterMasterRequestProcessor extends AbstractRequestProcessor
protected listeners: AutomationEventListener[] = [],
protected numWorkers: number = require("os").cpus().length) {
super(automations, configuration, listeners);
this.webSocketLifecycle = (configuration.ws as any).lifecycle as WebSocketLifecycle;

registerHealthIndicator(() => {
if (this.webSocketLifecycle.connected() && this.registration) {
@@ -39,12 +39,13 @@ export class DefaultWebSocketRequestProcessor extends AbstractRequestProcessor

private graphClients: GraphClientFactory;
private registration?: RegistrationConfirmation;
private webSocketLifecycle: WebSocketLifecycle = new WebSocketLifecycle();
private webSocketLifecycle: WebSocketLifecycle;

constructor(protected automations: AutomationServer,
protected configuration: Configuration,
protected listeners: AutomationEventListener[] = []) {
super(automations, listeners);
this.webSocketLifecycle = (configuration.ws as any).lifecycle as WebSocketLifecycle;

registerHealthIndicator(() => {
if (this.webSocketLifecycle.connected() && this.registration) {
@@ -2,10 +2,41 @@ import * as TinyQueue from "tinyqueue";
import * as WebSocket from "ws";
import { sendMessage } from "./WebSocketMessageClient";

export interface WebSocketLifecycle {
/**
* Set the WebSocket to manage
* @param ws
*/
set(ws: WebSocket): void;

/**
* Is the WebSocket is connected and healthy
*/
connected(): boolean;

/**
* Get the raw WebSocket that is managed here
*/
get(): WebSocket;

/**
* Reset the WebSocket
*/
reset(): void;

/**
* Send a message over the managed WebSocket
* If the WebSocket isn't connected, messages are queued for later
* when a WebSocket is connected again.
* @param msg
*/
send(msg: any): void;
}

/**
* Lifecycle owning a WebSocket connection wrt message sending
*/
export class WebSocketLifecycle {
export class QueuingWebSocketLifecycle implements WebSocketLifecycle {

private messages: TinyQueue;
private ws: WebSocket;
@@ -19,28 +50,28 @@ export class WebSocketLifecycle {
* Set the WebSocket to manage
* @param ws
*/
public set(ws: WebSocket): void {
set(ws: WebSocket): void {
this.ws = ws;
}

/**
* Is the WebSocket is connected and healthy
*/
public connected(): boolean {
connected(): boolean {
return !!this.ws && this.ws.readyState === WebSocket.OPEN;
}

/**
* Get the raw WebSocket that is managed here
*/
public get(): WebSocket {
get(): WebSocket {
return this.ws;
}

/**
* Reset the WebSocket
*/
public reset(): void {
reset(): void {
this.ws = null;
}

@@ -50,7 +81,7 @@ export class WebSocketLifecycle {
* when a WebSocket is connected again.
* @param msg
*/
public send(msg: any): void {
send(msg: any): void {
if (this.connected()) {
sendMessage(msg, this.ws, true);
} else {
@@ -60,7 +60,7 @@ export class DefaultSlackMessageClient implements MessageClient, SlackMessageCli
public addressUsers(msg: string | SlackMessage,
users: string | string[],
options?: MessageOptions): Promise<any> {
if (!users || Array.isArray(users) && users.length === 0) {
if (!users || (Array.isArray(users) && users.length === 0)) {
throw new Error("Please pass at least one user");
}
return lookupChatTeam(this.graphClient)
@@ -71,7 +71,7 @@ export class DefaultSlackMessageClient implements MessageClient, SlackMessageCli
public addressChannels(msg: string | SlackMessage,
channels: string | string[],
options?: MessageOptions): Promise<any> {
if (!channels || Array.isArray(channels) && channels.length === 0) {
if (!channels || (Array.isArray(channels) && channels.length === 0)) {
throw new Error("Please pass at least one channel");
}
return lookupChatTeam(this.graphClient)
@@ -29,6 +29,7 @@ import {
TestingDefaultConfiguration,
UserConfig,
} from "../lib/configuration";
import { QueuingWebSocketLifecycle } from "../lib/internal/transport/websocket/WebSocketLifecycle";
import { DefaultGraphClientFactory } from "../lib/spi/graph/GraphClientFactory";
import { DefaultHttpClientFactory } from "../lib/spi/http/httpClient";
import { DefaultWebSocketFactory } from "../lib/spi/http/wsClient";
@@ -88,7 +89,8 @@ describe("configuration", () => {
client: {
factory: DefaultWebSocketFactory,
},
},
lifecycle: new QueuingWebSocketLifecycle(),
} as any,
graphql: {
client: {
factory: DefaultGraphClientFactory,
@@ -7,6 +7,7 @@ import { HandlerResult } from "../../../../lib/HandlerResult";
import { CommandInvocation } from "../../../../lib/internal/invoker/Payload";
import { Automations } from "../../../../lib/internal/metadata/metadata";
import { DefaultWebSocketRequestProcessor } from "../../../../lib/internal/transport/websocket/DefaultWebSocketRequestProcessor";
import { QueuingWebSocketLifecycle } from "../../../../lib/internal/transport/websocket/WebSocketLifecycle";
import { CommandHandlerMetadata } from "../../../../lib/metadata/automationMetadata";
import { AutomationServer } from "../../../../lib/server/AutomationServer";
import { DefaultGraphClientFactory } from "../../../../lib/spi/graph/GraphClientFactory";
@@ -60,7 +61,9 @@ describe("DefaultWebSocketRequestProcessor", () => {
{
token: "xxx",
endpoints: { api: "http://foo.com", graphql: "http://bar.com" },
ws: {},
ws: {
lifecycle: new QueuingWebSocketLifecycle(),
} as any,
graphql: { client: { factory: DefaultGraphClientFactory } },
});
listener.onRegistration({ url: "http://bla.com", jwt: "123456789", name: "goo", version: "1.0.0" });
@@ -138,7 +141,9 @@ function verifyCommandHandler(code: number, callback: (result) => void) {
{
token: "xxx",
endpoints: { api: "http://foo.com", graphql: "http://bar.com" },
ws: {},
ws: {
lifecycle: new QueuingWebSocketLifecycle(),
} as any,
graphql: { client: { factory: DefaultGraphClientFactory } },
});
listener.onRegistration({ url: "http://bla.com", jwt: "123456789", name: "goo", version: "1.0.0" });
@@ -1,7 +1,7 @@
import { SlackMessage } from "@atomist/slack-messages";
import "mocha";
import * as assert from "power-assert";
import { WebSocketLifecycle } from "../../../../lib/internal/transport/websocket/WebSocketLifecycle";
import { QueuingWebSocketLifecycle } from "../../../../lib/internal/transport/websocket/WebSocketLifecycle";
import {
clean,
WebSocketCommandMessageClient,
@@ -44,7 +44,7 @@ describe("WebSocketMessageClient", () => {
data: {},
extensions: { team_id: "Txxxxxxx", correlation_id: corrId, operationName: "Foor" },
secrets: [],
}, new WebSocketLifecycle() );
}, new QueuingWebSocketLifecycle() );

const msg: SlackMessage = {
attachments: [{
@@ -89,7 +89,7 @@ describe("WebSocketMessageClient", () => {
data: {},
extensions: { team_id: "Txxxxxxx", correlation_id: corrId, operationName: "Foor" },
secrets: [],
}, new WebSocketLifecycle() );
}, new QueuingWebSocketLifecycle() );

const msg: SlackMessage = {
attachments: [{
@@ -152,7 +152,7 @@ describe("WebSocketMessageClient", () => {
parameters: [],
mapped_parameters: [],
secrets: [],
}, new WebSocketLifecycle() );
}, new QueuingWebSocketLifecycle() );

const msg: SlackMessage = {
attachments: [{
@@ -213,7 +213,7 @@ describe("WebSocketMessageClient", () => {
parameters: [],
mapped_parameters: [],
secrets: [],
}, new WebSocketLifecycle() );
}, new QueuingWebSocketLifecycle() );

const msg: SlackFileMessage = {
content: "some basic text",

0 comments on commit 6c009bf

Please sign in to comment.
You can’t perform that action at this time.