diff --git a/fixtures/src/util.ts b/fixtures/src/util.ts index c04e72d1..e1cd502f 100644 --- a/fixtures/src/util.ts +++ b/fixtures/src/util.ts @@ -1,5 +1,6 @@ import fs from "fs-extra"; import http from "http"; +import dgram from "dgram"; // construct extension for 2 new test services export const mockServer = (file: string): Promise<http.Server> => { return new Promise((resolve: (value: http.Server) => void) => { @@ -18,6 +19,17 @@ export const mockServer = (file: string): Promise<http.Server> => { testServer.listen(0, () => { resolve(testServer); }); }); }; + +export const mockUDPServer = (): Promise<dgram.Socket> => { + const server = dgram.createSocket("udp4"); + return new Promise((resolve) => { + server.on("listening", () => { + resolve(server); + }); + server.bind(0); + }); +}; + export const mockConfig: any = { services: [ { @@ -32,6 +44,12 @@ export const mockConfig: any = { stop: [], teardown: [], }, + health: { + interval: 500, + retries: 2, + port: "${DYNAMIC_TCP_PORT_1}", + protocol: "tcp", + }, }], os: { osx: { diff --git a/src/lib/config.ts b/src/lib/config.ts index 0164a2e8..9d5d5983 100644 --- a/src/lib/config.ts +++ b/src/lib/config.ts @@ -5,7 +5,7 @@ import Ajv from "ajv"; const ajv = new Ajv(); import metaSchema from "./service-runner-schema.json"; import defaultConfig from "../service-runner-config.json"; -import { IConfig, IService, IServiceConfig, IServiceOSConfig, IServiceEnv } from "./service"; +import { IConfig, IService, IServiceConfig, IServiceOSConfig } from "./service"; import _ from "lodash"; import { makeLogger } from "./logging"; const logger = makeLogger("ServiceRunner", "Config"); @@ -18,7 +18,7 @@ export class Config { this.validateConfig(defaultConfig); this.config = _.cloneDeep(defaultConfig) as IConfig; } else { - this.config = this.extendConfig(defaultConfig, config) as IConfig; + this.config = this.extendConfig(defaultConfig as IConfig, config) as IConfig; } } diff --git a/src/lib/events.ts b/src/lib/events.ts index 7f91f2d7..4107cdf6 100644 --- a/src/lib/events.ts +++ b/src/lib/events.ts @@ -4,7 +4,7 @@ import {Logger} from "winston"; /* TaskEvent describes the union type of all process management related tasks */ -export type TaskEvent = LaunchTaskEvent | PendingTaskEvent | ConsoleTaskEvent | HealthTaskEvent | ErrorTaskEvent | ExitTaskEvent; +export type TaskEvent = LaunchTaskEvent | PendingTaskEvent | ConsoleTaskEvent | HealthTaskEvent | ErrorTaskEvent | ExitTaskEvent | TerminateTaskEvent | StopTaskEvent; /* * PendingTaskEvent describes a task that has been created, and rendered, but has not been launched. @@ -36,12 +36,10 @@ export interface ConsoleTaskEvent { */ export interface HealthTaskEvent { name: "health"; - retries: number; logger: Logger; service: ActiveTaskService; - interval: NodeJS.Timeout; } -/* +/** * ErrorTaskEvent describes an event that occurs when a fatal error has occured with a task that is not related to an exit. */ export interface ErrorTaskEvent { @@ -50,7 +48,25 @@ export interface ErrorTaskEvent { name: "error"; error: Error; logger: Logger; service: ITaskService; } -/* +/** + * StopTaskEvent describes an event that triggers a running task to be stopped. + */ + +export type StopTaskReason = "health" | "unknown"; +export interface StopTaskEvent { + name: "stop"; + reason: StopTaskReason; + logger: Logger; + service: ActiveTaskService; +} +/** + * TerminateTaskEvent describes an event that stops a task and prevents it from being relaunched. + */ +export interface TerminateTaskEvent { + name: "terminate"; + service: ActiveTaskService; +} +/** + * ExitTaskEvent describes an event that occurs when a tasks exits. 
*/ export interface ExitTaskEvent { diff --git a/src/lib/service-runner-schema.json b/src/lib/service-runner-schema.json index 97a7358b..9248447c 100644 --- a/src/lib/service-runner-schema.json +++ b/src/lib/service-runner-schema.json @@ -27,11 +27,11 @@ "retries": { "type": "number" }, - "proto": { + "protocol": { "type": "string", "oneOf":["udp","tcp"] }, - "required":["interval", "retries", "port", "proto"] + "required":["interval", "retries", "port", "protocol"] }, "commandArgs": { "start": { diff --git a/src/lib/service.ts b/src/lib/service.ts index f7bcb495..f396c653 100644 --- a/src/lib/service.ts +++ b/src/lib/service.ts @@ -1,5 +1,6 @@ import { StrictEventEmitter } from "strict-event-emitter-types"; import { EventEmitter } from "events"; +import { ChildProcessWithoutNullStreams } from "child_process"; export interface IService { name: string; @@ -12,7 +13,7 @@ export interface IService { export interface IHealth { port: string; - proto: string; + protocol: "udp" | "tcp"; retries: number; interval: number; } @@ -69,7 +70,8 @@ export interface ISequenceCmd { */ export interface TaskNotificationEvents { launched: (service: ITaskService) => void; - terminated: (service: ITaskService) => void; + stopped: (service: ActiveTaskService) => void; + terminated: (service: ActiveTaskService) => void; pending: (service: ActiveTaskService) => void; } /* @@ -91,7 +93,7 @@ export interface ITaskService { */ export interface ActiveTaskService extends ITaskService { state: TaskState; - pid?: number; + process?: ChildProcessWithoutNullStreams; } export type TaskState = "running" | "stopped" | "pending"; diff --git a/src/lib/task.test.ts b/src/lib/task.test.ts index 3937092d..b51bef21 100644 --- a/src/lib/task.test.ts +++ b/src/lib/task.test.ts @@ -7,14 +7,68 @@ import { promisify } from "util"; import { TaskManager } from "./task"; import { AddressInfo } from "net"; import http from "http"; -import { IServiceConfig } from "./service"; +import { IServiceConfig, ActiveTaskService, TaskNotificationEvents, ITaskService } from "./service"; import { getOS, OSTypes } from "./util"; +import * as util from "./util"; import { Installer } from "./installer"; +import _ from "lodash"; +import { ChildProcessWithoutNullStreams } from "child_process"; import { kill } from "process"; const rmDir = promisify(rimraf); describe("TaskManager", () => { let repoDir: string; let server: http.Server; + interface TestServiceConfig { + service: ActiveTaskService; + taskManager: TaskManager; + } + type TransitionType = keyof TaskNotificationEvents; + interface LifeCycleTestType { + state: TransitionType; + service: ITaskService | ActiveTaskService; + } + + const testLifeCycle = async (transitions: TransitionType[], service: ActiveTaskService) => { + const log: LifeCycleTestType[] = []; + await Promise.all(transitions.map((transition) => { + return new Promise((resolve) => { + service.notifications.once(transition, (svc: ITaskService | ActiveTaskService) => { + log.push({ state: transition, service: _.cloneDeep(svc) }); + resolve(); + }); + }); + })); + return log; + }; + + const createTestService = async (): Promise<TestServiceConfig> => { + // NOTE temporarily disables this test for windows + const config = new Config(mockConfig); + const { port } = server.address() as AddressInfo; + const svc = config.config.services.find((s: IServiceConfig) => s.name === "testService"); + if (svc === undefined) { throw new Error("could not find testService"); } + const service = svc.os[getOS()]; + if (service === undefined) { throw new Error("could not find 
service for os"); } + service.assets = [`http://localhost:${port}/download/testService.zip`]; + const repo = new Repo(repoDir); + await repo.init(); + const installer = new Installer(config, getOS(), repo); + const taskManager = new TaskManager(repo, config); + await installer.install("testService", "1.0.0"); + const serviceConfig = await taskManager.startService("testService", "1.0.0", "test"); + expect(serviceConfig).toBeDefined(); + return { taskManager, service: serviceConfig }; + }; + + const healthSeq = [true, true, false, false, true]; + function* healthCheck() { + for (const health of healthSeq) { + yield Promise.resolve(health); + } + while (true) { + yield Promise.resolve(true); + } + } beforeAll(async () => { server = await mockServer("fixtures/testService.zip"); }); @@ -29,36 +83,64 @@ describe("TaskManager", () => { afterEach(async () => { await rmDir(repoDir); }); + it("should construct new TaskManager", async () => { const config: Config = new Config(mockConfig); const repo = new Repo(repoDir); await repo.init(); new TaskManager(repo, config);// tslint:disable-line }); - it("should start a service", async () => { - // NOTE temporarily disables this test for windows + + it("should start and terminate a service", async () => { if (getOS() !== OSTypes.WINDOWS) { - const config = new Config(mockConfig); - const { port } = server.address() as AddressInfo; - const svc = config.config.services.find((s: IServiceConfig) => s.name === "testService"); - if (svc === undefined) { throw new Error("could not find testService"); } - const service = svc.os[getOS()]; - if (service === undefined) { throw new Error("could not find service for os"); } - service.assets = [`http://localhost:${port}/download/testService.zip`]; - const repo = new Repo(repoDir); - await repo.init(); - const installer = new Installer(config, getOS(), repo); - const taskManager = new TaskManager(repo, config); - await installer.install("testService", "1.0.0"); - const serviceConfig = await taskManager.startService("testService", "1.0.0", "test"); - expect(serviceConfig).toBeDefined(); + const serviceConfig = await createTestService(); await new Promise((resolve) => { - setTimeout(() => { - const pid = serviceConfig.pid; - if (pid) { kill(pid); } + setTimeout(async () => { + await serviceConfig.taskManager.stopService("testService", "1.0.0", "test"); resolve(); - }, 3000); + }, 2000); }); } }); + + it("should execute task lifecycle if failure", async () => { + if (getOS() !== OSTypes.WINDOWS) { + const serviceConfig = await createTestService(); + const { service } = serviceConfig; + const restartSeq: TransitionType[] = ["stopped", "pending", "launched"]; + const prom = testLifeCycle(restartSeq, service); + await new Promise((resolve) => { + setTimeout(async () => { + const proc = service.process as ChildProcessWithoutNullStreams; + kill(proc.pid, "SIGTERM"); + const events = await prom; + expect(_.isEqual(events.map((e) => e.state), restartSeq)).toBe(true); + await serviceConfig.taskManager.stopService("testService", "1.0.0", "test"); + resolve(); + }, 2000); + }); + } + }, 10000); + + it("should handle health check failure and reboot service", async () => { + // The health check sequence should result in a successful run then failure and reboot + const hc = healthCheck(); + // being bad here and shimming the mock in + // @ts-ignore + util.isUp = jest.fn(() => hc.next().value); + if (getOS() !== OSTypes.WINDOWS) { + const serviceConfig = await createTestService(); + const { service } = serviceConfig; + const restartSeq: 
TransitionType[] = ["stopped", "pending", "launched"]; + const prom = testLifeCycle(restartSeq, service); + await new Promise((resolve) => { + setTimeout(async () => { + const events = await prom; + expect(_.isEqual(events.map((e) => e.state), restartSeq)).toBe(true); + await serviceConfig.taskManager.stopService("testService", "1.0.0", "test"); + resolve(); + }, 2000); + }); + } + }, 10000); }); diff --git a/src/lib/task.ts b/src/lib/task.ts index 0ad72d2b..c2668593 100644 --- a/src/lib/task.ts +++ b/src/lib/task.ts @@ -51,10 +51,11 @@ export class TaskManager { const serviceEntry = await this.repo.getServiceEntry(serviceName, version); if (serviceEntry === undefined) { throw new Error("Service does not exists in repo"); } const { rpcPort, commands, environments } = this.config.getService(serviceName, getOS()); - const { args } = environments.find((e) => e.name === env) as IServiceEnv; + const { args, health } = environments.find((e) => e.name === env) as IServiceEnv; const taskService = { env, + health, version: serviceEntry.version, name: serviceName, args, @@ -74,6 +75,31 @@ export class TaskManager { }); } + + /** + * Stops a running service that was previously started for the given service name, + * version, and environment. + * + * + * @param serviceName - Name of the service + * @param version - Version of the service + * @param env - Environment + * @returns void + */ + public async stopService(serviceName: string, version: string, env: string): Promise { + logger.debug(`stopping task ${serviceName} - ${version} - ${env}`); + const serviceTask = this.manager.getService(serviceName, version, env); + if (serviceTask === undefined) { + logger.error(`Service task not found for ${serviceName} - ${version} - ${env}`); + return; + } + if (serviceTask.active === undefined) { + logger.error(`Active service task not found for ${serviceName} - ${version} - ${env}`); + return; + } + return this.manager.stopTask(serviceTask.active); + } + /** * Returns a list of currently active services * diff --git a/src/lib/taskProcessManager.ts b/src/lib/taskProcessManager.ts index 5612b9fe..97021591 100644 --- a/src/lib/taskProcessManager.ts +++ b/src/lib/taskProcessManager.ts @@ -7,9 +7,11 @@ */ import { ITaskService, ActiveTaskService, IHealth, ISequenceCmd, TaskStatus } from "./service"; import * as events from "./events"; -import { spawn } from "child_process"; +import { spawn, ChildProcessWithoutNullStreams } from "child_process"; +import { kill } from "process"; import { renderService } from "./serviceTemplate"; import { makeLogger } from "./logging"; +import * as util from "./util"; const logger = makeLogger("ServiceRunner", "TaskProcessManager"); /** @@ -21,17 +23,20 @@ const logger = makeLogger("ServiceRunner", "TaskProcessManager"); * ### Simple Process state machine: * **PendingTaskEvent** => **LaunchTaskEvent** => **ExitTaskEvent**; * + * + * ### Health Check Failure state machine: + * **PendingTaskEvent** => **LaunchTaskEvent** => **HealthTaskEvent** => **StopTaskEvent** => **ExitTaskEvent**; * The state transitions are handled by a sink called processManagerTask(), which consumes the process events on the EventBus * to handle state transitions. * * ### The Events: * * - * **PendingTaskEvent** : signals an unresolved templated service that is up for launching. 
This is pushed onto + * the event bus, and then written the service config cache and the active service cache. It is written in the * service running cache with the state pending. A corresponding event is pushed onto the services individual * notification queue. So downstream components can recieve this signal. * - * **LaunchTaskEvent**: signals a resolved template service that is ready to launch. This is pushed on to the event bus + * **LaunchTaskEvent**: signals a resolved template service that is ready to launch. This is pushed on to the event bus * and handled by the sink called processManagerTask(), which consumes the launch events. It then spawns a new process * with any additional features, and updates the service state in the active service cache. A corresponding event is pushed onto the services individual * notification queue. So downstream components can recieve this signal. @@ -40,26 +45,60 @@ const logger = makeLogger("ServiceRunner", "TaskProcessManager"); * and then handled by the sink called processManagerTask(), which consumes the exit events.It then updates the service state in the active service cache. A corresponding event is pushed onto the services individual * notification queue. So downstream components can recieve this signal. TBD in future this might emit a launch event. * + * **StopTaskEvent**: signals a resolved template service to stop executing. This event is pushed onto the event bus + * and then handled by the sink called processManagerTask(), which consumes the stop events. It then kills the service. Killing the service triggers a corresponding exit event, and + * a stopped message is pushed onto the service's individual + * notification queue, so downstream components can receive this signal. + * + * **TerminateTaskEvent**: signals a resolved template service to stop executing and not restart. This event is pushed onto the event bus + * and then handled by the sink called processManagerTask(), which consumes the terminate event. It then kills the service and prevents the service from being rescheduled. + * + * **HealthTaskEvent**: signals a need for a template service to be health checked. This event is pushed onto the event bus + * and then handled by the sink called processManagerTask(), which consumes the healthcheck events. It then performs a health check based on port accessibility. If the health check fails, + * the service health cache is updated, and if it violates the number of retries constraint, a StopTaskEvent is emitted. If the health check is successful, the health check cache for that service + * is reset and a new health check task is emitted corresponding to the health check interval specified in config. + * * **ConsoleTaskEvent**: signals an event that occurs do to some stdout/stderr activity coming from the service, this is then forward to any * downstream components taht wich to receive this signal, via the service notifications queue. 
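 * + * An illustrative sequence for the health-check failure path described above (a sketch, not literal log output): + * pending -> launch -> health (fails until retries are exhausted) -> stop -> exit -> pending (relaunched after the configured interval)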
* */ +interface Health { + retries: number; + timestamp: number; +} + +interface TaskDesc { + active?: ActiveTaskService; + spec: ITaskService; +} + +const printActiveMap = (activeTaskMap: Map<string, ActiveTaskService>) => { + activeTaskMap.forEach((s, hash) => { + let pid: number = -1; + if (s.process) { + pid = s.process.pid; + } + logger.debug(`${s.state.toUpperCase()} pid: ${pid} status: ${s.state} hash:${hash}`); + }); +}; + +const DEFAULT_SERVICE_RESTART_DELAY = 5000; export class TaskProcessManager { public taskMap: Map<string, ITaskService>; public activeTaskMap: Map<string, ActiveTaskService>; private notifications: events.EventBus; + private healthMap: Map<number, Health>; constructor() { this.taskMap = new Map(); this.activeTaskMap = new Map(); + this.healthMap = new Map(); this.notifications = new events.EventBus(this.processTask.bind(this)); } /** - * Launches a service, writing the service to an in memory map of active and templated processes. - * It spawns new tasks, and catches errors and SIGTERM signals to then re spawn itself. It - * returns a fully rendered config - * + * Launches a service, generating a pending task event that ultimately triggers writing a resolved service and unresolved template of that service + * to an internal cache. It spawns the new task and returns a fully resolved service that includes the running child process. * * @param service - Configuration for a templated service * @returns The rendered configuration for a service @@ -75,6 +114,44 @@ export class TaskProcessManager { }); } + + /** + * Returns a TaskDesc for a service including both the active running service and original spec + * @param name - Service name that corresponds to spec + * @param version - Service version that corresponds to spec + * @param env - Service environment name that corresponds to spec + * @returns The active service (if running) and its original spec + */ + public getService(name: string, version: string, env: string): TaskDesc | undefined { + const taskHash = this.taskHash({ name, version, env }); + const spec = this.taskMap.get(taskHash); + const active = this.activeTaskMap.get(taskHash); + if (active !== undefined && active.process) { + logger.debug("getting the service active pid: " + active.process.pid); + } + if (spec === undefined) { + return; + } + return {active, spec}; + } + + /** + * Stops a service, generating a terminate task event that kills the underlying process, removes the service + * from the active task cache, and prevents it from being relaunched. It resolves once the service has terminated. 
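 + * + * Usage sketch (the `manager` and `activeService` names are illustrative): + * const stopped = await manager.stopTask(activeService); // resolves once the service emits "terminated"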
+ * + * @param service - Configuration for a templated service + * @returns The resolved configuration of the stopped service + */ + public async stopTask(service: ActiveTaskService): Promise { + + return new Promise((resolve) => { + const terminateTask: events.TerminateTaskEvent = { name: "terminate", service }; + service.notifications.once("terminated", (svc) => { + resolve(svc); + }); + this.notifications.emit(terminateTask); + }); + } + private handleConsoleEvent(event: events.ConsoleTaskEvent) { if (event.stderr) { logger.error(`stderr: ${event.stderr}`); @@ -84,14 +161,48 @@ } } + private handleStopEvent(event: events.StopTaskEvent) { + const { service, reason} = event; + const { process } = service; + const stopMsg = `${service.name}: child process triggered to stop for: ${reason}`; + logger.debug(stopMsg); + if (process === undefined) { + return; + } + kill(process.pid, "SIGTERM"); + } + + private cleanupTask(service: ActiveTaskService) { + const serviceID = this.taskHash(service); + this.setTask(service, "stopped"); + this.activeTaskMap.delete(serviceID); + } + private handleExitEvent(event: events.ExitTaskEvent) { const {service, error} = event; const exitMsg = `${service.name}: child process exited`; - this.setTask(service, "stopped"); + const serviceID = this.taskHash(service); + // the service has already been removed + if (this.activeTaskMap.has(serviceID) === false) { + logger.info(JSON.stringify(this.activeTaskMap)); + return; + } + const serviceEntry = this.activeTaskMap.get(serviceID) as ActiveTaskService; + const process = serviceEntry.process; + // the service update is stale, and does not correspond to current task map state + if (process && service.process && process.pid !== service.process.pid) { + return; + } if (error) { event.logger.error(`${exitMsg} with err ${error}`); - return; } + this.cleanupTask(service); + const relaunchDelay = service.health ? service.health.interval : DEFAULT_SERVICE_RESTART_DELAY; + service.notifications.emit("stopped", service); + setTimeout(() => { + const serviceSpec = this.taskMap.get(serviceID) as ITaskService; + this.notifications.emit({ name: "pending", service: serviceSpec }); + }, relaunchDelay); event.logger.error(`${exitMsg}`); } @@ -114,24 +225,91 @@ child.on("error", (err) => { this.notifications.emit({ name: "exit", error: err, code: -1, logger: childLogger, service }); }); - logger.info(`Launched service with ${JSON.stringify(service)}`); + logger.info(`launched service ${service.name} version: ${service.version} environment: ${service.env}`); - service.pid = child.pid; + service.process = child; this.setTask(service, "running"); service.notifications.emit("launched", service); + this.sendHealthEvent(service); } - private handleHealthEvent(event: events.HealthTaskEvent) { - // TODO + private sendHealthEvent(service: ActiveTaskService, healthStatus?: Health) { + const defaultHealth = { retries: 0, timestamp: Date.now()}; + const health = healthStatus ? 
healthStatus : defaultHealth; + if (service.health !== undefined && service.state === "running") { + const process = service.process as ChildProcessWithoutNullStreams; + // Set the initial state for the service in the service health cache + this.healthMap.set(process.pid, health); + setTimeout(() => this.notifications.emit({ name: "health", service, logger }), service.health.interval); + } + } + + private async checkServiceHealth(service: ActiveTaskService): Promise { + if (service.health === undefined) { + return true; + } + const port = parseInt(service.health.port, 10); + return util.isUp(port, service.health.protocol); + } + + private async handleTerminateEvent(event: events.TerminateTaskEvent) { + const {service} = event; + const process = service.process as ChildProcessWithoutNullStreams; + // disable any handlers attached to the process + process.removeAllListeners(); + this.cleanupTask(service); + process.kill("SIGTERM"); + service.notifications.emit("terminated", service); + } + + private async handleHealthEvent(event: events.HealthTaskEvent) { + const { process } = event.service; + const health = event.service.health as IHealth; + let success = false; + const timestamp = Date.now(); + if (process) { + const { pid } = process; + try { + success = await this.checkServiceHealth(event.service); + } catch (e) { + logger.error(e.message); + logger.debug(e.stack); + return; + } + const currentHealth = this.healthMap.get(pid); + if (currentHealth === undefined) { + return; + } + const newHealth = success ? { timestamp, retries: 0 } : { timestamp, retries: currentHealth.retries + 1 }; + + this.healthMap.set(pid, newHealth); + if (newHealth.retries >= health.retries) { + // restart failing process + this.notifications.emit({ name: "stop", service: event.service, logger: event.logger, reason: "health" }); + return; + } + // schedule next health check + this.sendHealthEvent(event.service, newHealth); + + } } private async processTask(event: events.TaskEvent) { + const { service } = event; + logger.debug(`${event.name.toUpperCase()} - processing event:${event.name} service: ${service.name} env: ${service.env}`); + printActiveMap(this.activeTaskMap); switch (event.name) { case "console": this.handleConsoleEvent(event); return; case "error": throw new Error(`Could not handle task error ${event.error}`); + case "stop": + this.handleStopEvent(event); + return; + case "terminate": + this.handleTerminateEvent(event); + return; case "exit": this.handleExitEvent(event); return; @@ -142,7 +320,7 @@ await this.handleLaunchEvent(event); return; case "health": - this.handleHealthEvent(event); + await this.handleHealthEvent(event); return; } } @@ -153,6 +331,7 @@ const renderedService = await renderService(service); this.setTask(renderedService, "pending"); const serviceLogger = makeLogger(renderedService.name, "Child Task"); + service.notifications.emit("pending", renderedService); this.notifications.emit({ service: renderedService, name: "launch", logger: serviceLogger }); } @@ -190,14 +369,17 @@ private setTask(service: ITaskService, status: TaskStatus) { } private addActiveTaskSpec(service: ActiveTaskService) { const hash = this.taskHash(service); - if (this.activeTaskMap.has(hash) === false) { - this.taskMap.set(hash, service); - return; - } + this.activeTaskMap.set(hash, service); } - 
private taskHash(service: ITaskService): string { - return `${service.name}_${service.version}_${service.env}`; + private taskHash({ name, version, env }: { name: string, version: string, env: string }): string { + return `${name}_${version}_${env}`; } } diff --git a/src/lib/util.test.ts b/src/lib/util.test.ts index 0be1d935..24fd8722 100644 --- a/src/lib/util.test.ts +++ b/src/lib/util.test.ts @@ -1,4 +1,4 @@ -import { extractAsset, downloadAsset } from "./util"; +import {isUp, extractAsset, downloadAsset, getFreePorts } from "./util"; import fs, { ensureDir } from "fs-extra"; import net from "net"; import { createServer } from "http"; @@ -7,6 +7,9 @@ import crypto from "crypto"; import _ from "lodash"; import rimraf from "rimraf"; import { promisify } from "util"; +import { mockUDPServer } from "../../fixtures/src/util"; +import dgram from "dgram"; + const rmDir = promisify(rimraf); const TEST_DATA_DIR = "./test-data"; describe("extract asset ", () => { @@ -62,13 +65,14 @@ describe("extract asset ", () => { describe("downloadAsset", () => { let testServer: http.Server; + let testUDPServer: dgram.Socket; let testBuffer: Buffer; let downloadDir: string; beforeAll(async () => { await fs.ensureDir(`${TEST_DATA_DIR}`); testBuffer = crypto.randomBytes(200); - return new Promise((resolve) => { + await new Promise((resolve) => { testServer = createServer((req, res) => { if (!req.url) { throw new Error("Request missing url"); } if (req.url.search("download") > 0) { @@ -96,6 +100,7 @@ describe("downloadAsset", () => { }); testServer.listen(0, resolve); }); + testUDPServer = await mockUDPServer(); }); beforeEach(async () => { @@ -104,7 +109,9 @@ describe("downloadAsset", () => { }); afterAll((done) => { - testServer.close(done); + testUDPServer.close(() => { + testServer.close(done); + }); }); afterAll(async () => { @@ -146,4 +153,21 @@ describe("downloadAsset", () => { } }); + it("should check for tcp endpoint being up", async () => { + const {port} = testServer.address() as net.AddressInfo; + const ports = await getFreePorts(); + let up = await isUp(port, "tcp"); + expect(up).toBe(true); + up = await isUp(ports.DYNAMIC_TCP_PORT_1, "tcp"); + expect(up).toBe(false); + }); + + it("should check for udp endpoint being up", async () => { + const {port} = testUDPServer.address() as net.AddressInfo; + const ports = await getFreePorts(); + let up = await isUp(port, "udp"); + expect(up).toBe(true); + up = await isUp(ports.DYNAMIC_UDP_PORT_1, "udp"); + expect(up).toBe(false); + }); }); diff --git a/src/lib/util.ts b/src/lib/util.ts index 40c3f6cd..8fc5ae22 100644 --- a/src/lib/util.ts +++ b/src/lib/util.ts @@ -8,7 +8,7 @@ import tar from "tar-fs"; import zlib from "zlib"; import request from "request"; import net, { AddressInfo } from "net"; -import dgram from "dgram"; +import dgram, { Socket } from "dgram"; import { makeLogger } from "./logging"; const logger = makeLogger("ServiceRunner", "Util"); @@ -23,7 +23,55 @@ interface IDynamicPorts { DYNAMIC_UDP_PORT_2: number; DYNAMIC_UDP_PORT_3: number; } +type Protocol = "udp" | "tcp"; +const SOCKET_CONNECTIVITY_TIMEOUT = 5000; +/** + * Returns true if the port can be connected to, and false otherwise. For services that support RPC discover, + * isUp will check the RPC discover response to verify connectivity across protocols. + * For services without RPC discover, it will just test raw connectivity with TCP + * and port availability with UDP. + * + * For UDP, an unavailable port implies connectivity. 
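 + * + * Usage sketch (the port value is illustrative): + * const healthy = await isUp(8545, "tcp");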
+ * + * @returns true if the port is able to be connected to, false otherwise + */ +export async function isUp(port: number, protocol: Protocol): Promise<boolean> { + switch (protocol) { + case "tcp": + try { + await tcpSocketTest(port); + return true; + } catch (e) { + logger.error(e.message); + return false; + } + case "udp": + try { + await getAvailableUDPPort(port); + return false; + } catch (e) { + logger.error(e.message); + return true; + } + } +} +function tcpSocketTest(port: number): Promise<void> { + return new Promise((resolve, reject) => { + const socket = new net.Socket(); + const handleError = (err: Error) => { + logger.error(err); + socket.destroy(); + reject(err); + }; + socket.once("error", handleError); + socket.once("timeout", () => handleError(new Error(`connection to port ${port} timed out`))); + socket.setTimeout(SOCKET_CONNECTIVITY_TIMEOUT); + socket.connect(port, "localhost", () => { + resolve(); + }); + }); +} // Note this might be problematic if there are collisions /** * Returns a set of TCP and UDP Ports. @@ -52,10 +100,10 @@ export async function getFreePorts(): Promise < IDynamicPorts > { * * @returns a free TCP Port */ -export const getAvailableTCPPort = () => new Promise((resolve, reject) => { +export const getAvailableTCPPort = (testPort: number = 0) => new Promise((resolve, reject) => { const server = net.createServer(); server.on("error", reject); - server.listen(0, () => { + server.listen(testPort, () => { const { port } = server.address() as AddressInfo; server.close(() => { resolve(port); @@ -68,12 +116,12 @@ export const getAvailableTCPPort = () => new Promise((resolve, reject) => { * * @returns a free TCP Port */ -export const getAvailableUDPPort = () => new Promise((resolve, reject) => { +export const getAvailableUDPPort = (testPort: number = 0) => new Promise((resolve, reject) => { const socket = dgram.createSocket("udp4"); - socket.bind({ port: 0 }, () => { + socket.on("error", reject); + socket.bind({ port: testPort }, () => { const { port } = socket.address() as AddressInfo; - socket.on("error", reject); socket.close(() => { resolve(port); }); diff --git a/src/service-runner-config.json b/src/service-runner-config.json index e6d60a7b..ef5a690e 100644 --- a/src/service-runner-config.json +++ b/src/service-runner-config.json @@ -25,7 +25,7 @@ "interval": 5000, "retries": 2, "port": "${DYNAMIC_TCP_PORT_1}", - "proto": "tcp" + "protocol": "tcp" } }, { @@ -45,7 +45,13 @@ ], "stop": [], "teardown": [] - } + }, + "health": { + "interval": 5000, + "retries": 2, + "port": "${DYNAMIC_TCP_PORT_1}", + "protocol": "tcp" + } }, { "name": "ethereum", @@ -66,7 +72,7 @@ "interval": 5000, "retries": 2, "port": "${DYNAMIC_TCP_PORT_1}", - "proto": "tcp" + "protocol": "tcp" } }, { @@ -89,7 +95,7 @@ "interval": 5000, "retries": 2, "port": "${DYNAMIC_TCP_PORT_1}", - "proto": "tcp" + "protocol": "tcp" } }, { @@ -112,7 +118,7 @@ "interval": 5000, "retries": 2, "port": "${DYNAMIC_TCP_PORT_1}", - "proto": "tcp" + "protocol": "tcp" } } ],