Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce health checks #1607

Merged
merged 6 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 87 additions & 0 deletions packages/cubejs-api-gateway/src/cached-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import type { Handler, Response } from 'express';

type CachedRouterOptions = {
lifetime: number,
};

interface CachedResponse {
status: number,
json: any,
}

export function pipeFromCache(cache: CachedResponse, res: Response) {
res.status(cache.status)
.json(cache.json);
}

export function cachedHandler(handler: Handler, options: CachedRouterOptions = { lifetime: 1000 }): Handler {
let lastCache: CachedResponse = {
status: 200,
json: null,
};
let lastCacheExpr = new Date(Date.now() - options.lifetime);
let lock = false;

const queue: Response[] = [];

return async (req, res, next) => {
if (lock) {
queue.push(res);
} else {
if (lastCacheExpr.getTime() > new Date().getTime()) {
pipeFromCache(lastCache, res);

return;
}

lock = true;

try {
const responseWrapper: any = {
...res,
status(code: number) {
res.status(code);

lastCache.status = code;

return responseWrapper;
},
json(json: any) {
res.json(json);

lastCache.json = json;

return responseWrapper;
}
};

await handler(
req,
responseWrapper,
next
);

lastCacheExpr = new Date(Date.now() + options.lifetime);
lock = false;
} catch (e) {
// console.log('cached-router exception', e);

lock = false;
lastCache = {
status: 200,
json: null,
};
lastCacheExpr = new Date(Date.now() - options.lifetime);

next(e);
}

let queuedResponse: Response | undefined;

// eslint-disable-next-line no-cond-assign
while (queuedResponse = queue.pop()) {
pipeFromCache(lastCache, queuedResponse);
}
}
};
}
101 changes: 98 additions & 3 deletions packages/cubejs-api-gateway/src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ import R from 'ramda';
import moment from 'moment';
import uuid from 'uuid/v4';
import bodyParser from 'body-parser';
import type { Request as ExpressRequest, Response, NextFunction, Application as ExpressApplication, RequestHandler } from 'express';

import type {
Request as ExpressRequest,
Response, NextFunction,
Application as ExpressApplication,
RequestHandler,
ErrorRequestHandler
} from 'express';

import { requestParser } from './requestParser';
import { UserError } from './UserError';
Expand All @@ -12,6 +19,7 @@ import { SubscriptionServer } from './SubscriptionServer';
import { LocalSubscriptionStore } from './LocalSubscriptionStore';
import { getPivotQuery, getQueryGranularity, normalizeQuery, QUERY_TYPE } from './query';
import { CheckAuthFn, CheckAuthMiddlewareFn, ExtendContextFn, QueryTransformerFn, RequestContext } from './interfaces';
import { cachedHandler } from './cached-handler';

type MetaConfig = {
config: {
Expand Down Expand Up @@ -148,6 +156,8 @@ interface Request extends ExpressRequest {
}

export interface ApiGatewayOptions {
standalone: boolean;
dataSourceStorage: any;
refreshScheduler: any;
basePath?: string;
extendContext?: ExtendContextFn;
Expand All @@ -171,10 +181,14 @@ export class ApiGateway {

protected readonly enforceSecurityChecks: boolean;

protected readonly standalone: boolean;

protected readonly extendContext?: ExtendContextFn;

protected readonly requestMiddleware: RequestHandler[];

protected readonly dataSourceStorage: any;

public readonly checkAuthFn: CheckAuthFn;

public constructor(
Expand All @@ -186,8 +200,9 @@ export class ApiGateway {
) {
options = options || {};

this.dataSourceStorage = options.dataSourceStorage;
this.refreshScheduler = options.refreshScheduler;

this.standalone = options.standalone;
this.basePath = options.basePath || '/cubejs-api';

this.queryTransformer = options.queryTransformer || (async (query) => query);
Expand Down Expand Up @@ -267,9 +282,13 @@ export class ApiGateway {
await this.dryRun({
query: req.body.query,
context: req.context,
res: this.resToResultFn(res)
});
}));

app.get(`/readyz`, this.requestMiddleware, cachedHandler(this.readiness));
app.get(`/livez`, this.requestMiddleware, cachedHandler(this.liveness));

app.use(this.handleErrorMiddleware);
}

public initSubscriptionServer(sendMessage) {
Expand Down Expand Up @@ -591,6 +610,17 @@ export class ApiGateway {
return req.headers['x-request-id'] || req.headers.traceparent || uuid();
}

protected handleErrorMiddleware: ErrorRequestHandler = async (e, req, res, next) => {
this.handleError({
e,
context: (<any>req).context,
res: this.resToResultFn(res),
requestStarted: new Date(),
});

next(e);
}

public handleError({
e, context, query, res, requestStarted
}: any) {
Expand Down Expand Up @@ -745,4 +775,69 @@ export class ApiGateway {
requestId: context.requestId
});
}

protected readiness: RequestHandler = async (req, res) => {
let health = 'HEALTH';

if (this.standalone) {
const orchestratorApi = await this.adapterApi({});

try {
await orchestratorApi.testConnection();
} catch (e) {
this.log({}, {
type: 'Internal Server Error',
error: e.stack || e.toString(),
});

health = 'DOWN';
}

try {
await orchestratorApi.testOrchestratorConnections();
} catch (e) {
this.log({}, {
type: 'Internal Server Error',
error: e.stack || e.toString(),
});

health = 'DOWN';
}
}

res.status(health === 'HEALTH' ? 200 : 500).json({
health,
});
}

protected liveness: RequestHandler = async (req, res) => {
let health = 'HEALTH';

try {
await this.dataSourceStorage.testConnections();
} catch (e) {
this.log({}, {
type: 'Internal Server Error',
error: e.stack || e.toString(),
});

health = 'DOWN';
}

try {
// @todo Optimize this moment?
await this.dataSourceStorage.testOrchestratorConnections();
} catch (e) {
this.log({}, {
type: 'Internal Server Error',
error: e.stack || e.toString(),
});

health = 'DOWN';
}

res.status(health === 'HEALTH' ? 200 : 500).json({
health,
});
}
}
70 changes: 70 additions & 0 deletions packages/cubejs-api-gateway/test/cached-handler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { cachedHandler } from '../src/cached-handler';

const createAsyncLock = (timeout) => new Promise(
(resolve) => {
setTimeout(
resolve,
timeout
);
}
);

describe('cachedHandler', () => {
test('works', async () => {
let reqPassed = 0;
let resPassed = 0;

const handler = cachedHandler(async (req, res, next) => {
reqPassed++;

await createAsyncLock(125);

res.status(200).json('heh');
}, {
lifetime: 250
});

const req: any = {};

const res: any = {
status: (code) => {
expect(code).toEqual(200);

return res;
},
json: (content) => {
resPassed++;

expect(content).toEqual('heh');

return res;
}
};

const next = () => {
// nothing to do
};

handler(req, res, next);

expect(reqPassed).toEqual(1);
expect(resPassed).toEqual(0);

handler(req, res, next);
handler(req, res, next);

expect(resPassed).toEqual(0);

await createAsyncLock(125);

expect(resPassed).toEqual(3);
expect(reqPassed).toEqual(1);

// cache will be expired
await createAsyncLock(250);

handler(req, res, next);

expect(reqPassed).toEqual(2);
});
});
46 changes: 40 additions & 6 deletions packages/cubejs-api-gateway/test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,32 @@ const compilerApi = jest.fn().mockImplementation(() => ({
},
}));

class DataSourceStorageMock {
async testConnections() {
return [];
}

async testOrchestratorConnections() {
return [];
}
}

const adapterApi = jest.fn().mockImplementation(() => ({
async executeQuery() {
return {
data: [{ foo__bar: 42 }],
};
},
testConnection: () => Promise.resolve(),
testOrchestratorConnections: () => Promise.resolve(),
executeQuery: async () => ({
data: [{ foo__bar: 42 }],
}),
}));

const logger = (type, message) => console.log({ type, ...message });

describe('API Gateway', () => {
process.env.NODE_ENV = 'production';
const apiGateway = new ApiGateway('secret', compilerApi, adapterApi, logger);
const apiGateway = new ApiGateway('secret', compilerApi, adapterApi, logger, {
standalone: true,
dataSourceStorage: new DataSourceStorageMock(),
});
process.env.NODE_ENV = null;
const app = express();
apiGateway.initApp(app);
Expand Down Expand Up @@ -225,5 +239,25 @@ describe('API Gateway', () => {
data: [{ 'Foo.bar': 42 }],
});
});

test('readyz', async () => {
const res = await request(app)
.get(`/readyz`)
.set('Content-type', 'application/json')
.set('Authorization', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.t-IDcSemACt8x4iTMCda8Yhe3iZaWbvV5XKSTbuAn0M')
.expect(200);

expect(res.body).toMatchObject({ health: 'HEALTH' });
});

test('livez', async () => {
const res = await request(app)
.get(`/livez`)
.set('Content-type', 'application/json')
.set('Authorization', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.t-IDcSemACt8x4iTMCda8Yhe3iZaWbvV5XKSTbuAn0M')
.expect(200);

expect(res.body).toMatchObject({ health: 'HEALTH' });
});
});
});