Skip to content

Commit

Permalink
feat: add health check support to service runnner
Browse files Browse the repository at this point in the history
  • Loading branch information
zcstarr committed Jun 13, 2019
1 parent c39ba15 commit bc577d6
Show file tree
Hide file tree
Showing 11 changed files with 470 additions and 66 deletions.
18 changes: 18 additions & 0 deletions fixtures/src/util.ts
@@ -1,5 +1,6 @@
import fs from "fs-extra";
import http from "http";
import dgram from "dgram";
// construct extension for 2 new test services
export const mockServer = (file: string): Promise<http.Server> => {
return new Promise((resolve: (value: http.Server) => void) => {
Expand All @@ -18,6 +19,17 @@ export const mockServer = (file: string): Promise<http.Server> => {
testServer.listen(0, () => { resolve(testServer); });
});
};

export const mockUDPServer = (): Promise<dgram.Socket> => {
const server = dgram.createSocket("udp4");
return new Promise((resolve) => {
server.on("listening", () => {
resolve(server);
});
server.bind(0);
});
};

export const mockConfig: any = {
services: [
{
Expand All @@ -32,6 +44,12 @@ export const mockConfig: any = {
stop: [],
teardown: [],
},
health: {
interval: 500,
retries: 2,
port: "${DYNAMIC_TCP_PORT_1}",
protocol: "tcp",
},
}],
os: {
osx: {
Expand Down
4 changes: 2 additions & 2 deletions src/lib/config.ts
Expand Up @@ -5,7 +5,7 @@ import Ajv from "ajv";
const ajv = new Ajv();
import metaSchema from "./service-runner-schema.json";
import defaultConfig from "../service-runner-config.json";
import { IConfig, IService, IServiceConfig, IServiceOSConfig, IServiceEnv } from "./service";
import { IConfig, IService, IServiceConfig, IServiceOSConfig } from "./service";
import _ from "lodash";
import { makeLogger } from "./logging";
const logger = makeLogger("ServiceRunner", "Config");
Expand All @@ -18,7 +18,7 @@ export class Config {
this.validateConfig(defaultConfig);
this.config = _.cloneDeep(defaultConfig) as IConfig;
} else {
this.config = this.extendConfig(defaultConfig, config) as IConfig;
this.config = this.extendConfig(defaultConfig as IConfig, config) as IConfig;
}
}

Expand Down
26 changes: 21 additions & 5 deletions src/lib/events.ts
Expand Up @@ -4,7 +4,7 @@ import {Logger} from "winston";
/*
TaskEvent describes the union type of all process management related tasks
*/
export type TaskEvent = LaunchTaskEvent | PendingTaskEvent | ConsoleTaskEvent | HealthTaskEvent | ErrorTaskEvent | ExitTaskEvent;
export type TaskEvent = LaunchTaskEvent | PendingTaskEvent | ConsoleTaskEvent | HealthTaskEvent | ErrorTaskEvent | ExitTaskEvent | TerminateTaskEvent | StopTaskEvent;

/*
* PendingTaskEvent describes a task that has been created, and rendered, but has not been launched.
Expand Down Expand Up @@ -36,12 +36,10 @@ export interface ConsoleTaskEvent {
*/
export interface HealthTaskEvent {
name: "health";
retries: number;
logger: Logger;
service: ActiveTaskService;
interval: NodeJS.Timeout;
}
/*
/**
* ErrorTaskEvent describes an event that occurs when a fatal error has occured with a task that is not related to an exit.
*/
export interface ErrorTaskEvent {
Expand All @@ -50,7 +48,25 @@ export interface ErrorTaskEvent {
logger: Logger;
service: ITaskService;
}
/*
/**
* StopTaskEvent describes an event that triggers a task to terminate.
*/

export type StopTaskReason = "health" | "unknown";
export interface StopTaskEvent {
name: "stop";
reason: StopTaskReason;
logger: Logger;
service: ActiveTaskService;
}
/**
* TerminateTaskEvent describes an event that occurs when a tasks exits.
*/
export interface TerminateTaskEvent {
name: "terminate";
service: ActiveTaskService;
}
/**
* ExitTaskEvent describes an event that occurs when a tasks exits.
*/
export interface ExitTaskEvent {
Expand Down
4 changes: 2 additions & 2 deletions src/lib/service-runner-schema.json
Expand Up @@ -27,11 +27,11 @@
"retries": {
"type": "number"
},
"proto": {
"protocol": {
"type": "string",
"oneOf":["udp","tcp"]
},
"required":["interval", "retries", "port", "proto"]
"required":["interval", "retries", "port", "protocol"]
},
"commandArgs": {
"start": {
Expand Down
8 changes: 5 additions & 3 deletions src/lib/service.ts
@@ -1,5 +1,6 @@
import { StrictEventEmitter } from "strict-event-emitter-types";
import { EventEmitter } from "events";
import { ChildProcessWithoutNullStreams } from "child_process";

export interface IService {
name: string;
Expand All @@ -12,7 +13,7 @@ export interface IService {

export interface IHealth {
port: string;
proto: string;
protocol: "udp" | "tcp";
retries: number;
interval: number;
}
Expand Down Expand Up @@ -69,7 +70,8 @@ export interface ISequenceCmd {
*/
export interface TaskNotificationEvents {
launched: (service: ITaskService) => void;
terminated: (service: ITaskService) => void;
stopped: (service: ActiveTaskService) => void;
terminated: (service: ActiveTaskService) => void;
pending: (service: ActiveTaskService) => void;
}
/*
Expand All @@ -91,7 +93,7 @@ export interface ITaskService {
*/
export interface ActiveTaskService extends ITaskService {
state: TaskState;
pid?: number;
process?: ChildProcessWithoutNullStreams;
}

export type TaskState = "running" | "stopped" | "pending";
Expand Down
124 changes: 103 additions & 21 deletions src/lib/task.test.ts
Expand Up @@ -7,14 +7,68 @@ import { promisify } from "util";
import { TaskManager } from "./task";
import { AddressInfo } from "net";
import http from "http";
import { IServiceConfig } from "./service";
import { IServiceConfig, ActiveTaskService, TaskNotificationEvents, ITaskService } from "./service";
import { getOS, OSTypes } from "./util";
import * as util from "./util";
import { Installer } from "./installer";
import _ from "lodash";
import { ChildProcessWithoutNullStreams } from "child_process";
import { kill } from "process";
const rmDir = promisify(rimraf);
describe("TaskManager", () => {
let repoDir: string;
let server: http.Server;
interface TestServiceConfig {
service: ActiveTaskService;
taskManager: TaskManager;
}
type TransitionType = keyof TaskNotificationEvents;
interface LifeCyleTestType {
state: TransitionType;
service: ITaskService | ActiveTaskService;
}

const testLifeCycle = async (transitions: TransitionType[], service: ActiveTaskService) => {
const log: LifeCyleTestType[] = [];
await Promise.all(transitions.map((transition) => {
return new Promise((resolve) => {
service.notifications.once(transition, (svc: ITaskService | ActiveTaskService) => {
log.push({ state: transition, service: _.cloneDeep(svc) });
resolve();
});
});
}));
return log;
};

const createTestService = async (): Promise<TestServiceConfig> => {
// NOTE temporarily disables this test for windows
const config = new Config(mockConfig);
const { port } = server.address() as AddressInfo;
const svc = config.config.services.find((s: IServiceConfig) => s.name === "testService");
if (svc === undefined) { throw new Error("could not find testService"); }
const service = svc.os[getOS()];
if (service === undefined) { throw new Error("could not find service for os"); }
service.assets = [`http://localhost:${port}/download/testService.zip`];
const repo = new Repo(repoDir);
await repo.init();
const installer = new Installer(config, getOS(), repo);
const taskManager = new TaskManager(repo, config);
await installer.install("testService", "1.0.0");
const serviceConfig = await taskManager.startService("testService", "1.0.0", "test");
expect(serviceConfig).toBeDefined();
return { taskManager, service: serviceConfig };
};

const healthSeq = [true, true, false, false, true];
function* healthCheck() {
for (const health of healthSeq) {
yield Promise.resolve(health);
}
while (true) {
yield Promise.resolve(true);
}
}
beforeAll(async () => {
server = await mockServer("fixtures/testService.zip");
});
Expand All @@ -29,36 +83,64 @@ describe("TaskManager", () => {
afterEach(async () => {
await rmDir(repoDir);
});

it("should construct new TaskManager", async () => {
const config: Config = new Config(mockConfig);
const repo = new Repo(repoDir);
await repo.init();
new TaskManager(repo, config);// tslint:disable-line
});
it("should start a service", async () => {
// NOTE temporarily disables this test for windows

it("should start and terminate a service", async () => {
if (getOS() !== OSTypes.WINDOWS) {
const config = new Config(mockConfig);
const { port } = server.address() as AddressInfo;
const svc = config.config.services.find((s: IServiceConfig) => s.name === "testService");
if (svc === undefined) { throw new Error("could not find testService"); }
const service = svc.os[getOS()];
if (service === undefined) { throw new Error("could not find service for os"); }
service.assets = [`http://localhost:${port}/download/testService.zip`];
const repo = new Repo(repoDir);
await repo.init();
const installer = new Installer(config, getOS(), repo);
const taskManager = new TaskManager(repo, config);
await installer.install("testService", "1.0.0");
const serviceConfig = await taskManager.startService("testService", "1.0.0", "test");
expect(serviceConfig).toBeDefined();
const serviceConfig = await createTestService();
await new Promise((resolve) => {
setTimeout(() => {
const pid = serviceConfig.pid;
if (pid) { kill(pid); }
setTimeout(async () => {
await serviceConfig.taskManager.stopService("testService", "1.0.0", "test");
resolve();
}, 3000);
}, 2000);
});
}
});

it("should execute task lifecycle if failure", async () => {
if (getOS() !== OSTypes.WINDOWS) {
const serviceConfig = await createTestService();
const { service } = serviceConfig;
const restartSeq: TransitionType[] = ["stopped", "pending", "launched"];
const prom = testLifeCycle(restartSeq, service);
await new Promise((resolve) => {
setTimeout(async () => {
const proc = service.process as ChildProcessWithoutNullStreams;
kill(proc.pid, "SIGTERM");
const events = await prom;
expect(_.isEqual(events.map((e) => e.state), restartSeq)).toBe(true);
await serviceConfig.taskManager.stopService("testService", "1.0.0", "test");
resolve();
}, 2000);
});
}
}, 10000);

it("should handle health check failure and reboot service", async () => {
// The health check sequence should result in a successful run then failure and reboot
const hc = healthCheck();
// being bad here and shimming the mock in
// @ts-ignore
util.isUp = jest.fn(() => hc.next().value);
if (getOS() !== OSTypes.WINDOWS) {
const serviceConfig = await createTestService();
const { service } = serviceConfig;
const restartSeq: TransitionType[] = ["stopped", "pending", "launched"];
const prom = testLifeCycle(restartSeq, service);
await new Promise((resolve) => {
setTimeout(async () => {
const events = await prom;
expect(_.isEqual(events.map((e) => e.state), restartSeq)).toBe(true);
await serviceConfig.taskManager.stopService("testService", "1.0.0", "test");
resolve();
}, 2000);
});
}
}, 10000);
});
28 changes: 27 additions & 1 deletion src/lib/task.ts
Expand Up @@ -51,10 +51,11 @@ export class TaskManager {
const serviceEntry = await this.repo.getServiceEntry(serviceName, version);
if (serviceEntry === undefined) { throw new Error("Service does not exists in repo"); }
const { rpcPort, commands, environments } = this.config.getService(serviceName, getOS());
const { args } = environments.find((e) => e.name === env) as IServiceEnv;
const { args, health } = environments.find((e) => e.name === env) as IServiceEnv;

const taskService = {
env,
health,
version: serviceEntry.version,
name: serviceName,
args,
Expand All @@ -74,6 +75,31 @@ export class TaskManager {
});

}

/**
* Starts an installed service using the service configuration and manifest entry, and
* returns service configuration information.
*
*
* @param serviceName - Name of the service
* @param version - Version of the service
* @param env - Environment
* @returns void
*/
public async stopService(serviceName: string, version: string, env: string): Promise<ActiveTaskService | undefined> {
logger.debug(`stopping task ${serviceName} - ${version} - ${env}`);
const serviceTask = this.manager.getService(serviceName, version, env);
if (serviceTask === undefined) {
logger.error(`Service tag not found task ${serviceName} - ${version} - ${env}`);
return;
}
if (serviceTask.active === undefined) {
logger.error(`Active service tag not found task ${serviceName} - ${version} - ${env}`);
return;
}
return this.manager.stopTask(serviceTask.active);
}

/**
* Returns a list of currently active services
*
Expand Down

0 comments on commit bc577d6

Please sign in to comment.