Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Commit

Permalink
feat: Add requests idempotency verification on the server side
Browse files Browse the repository at this point in the history
  • Loading branch information
mykola-mokhnach committed Mar 30, 2020
1 parent 39fec80 commit b48cd96
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 16 deletions.
133 changes: 133 additions & 0 deletions lib/express/idempotency.js
@@ -0,0 +1,133 @@
import log from './logger';
import LRU from 'lru-cache';
import { fs, util } from 'appium-support';
import os from 'os';
import path from 'path';
import { EventEmitter } from 'events';


const CACHE_SIZE = 1024;
const IDEMPOTENT_RESPONSES = new LRU({
max: CACHE_SIZE,
dispose: (key, {response}) => {
if (response) {
fs.rimrafSync(response);
}
},
});
const MONITORED_METHODS = ['POST', 'PATCH'];
const IDEMPOTENCY_KEY_HEADER = 'x-idempotency-key';

process.on('exit', () => IDEMPOTENT_RESPONSES.reset());


function cacheResponse (key, req, res) {
const responseStateListener = new EventEmitter();
IDEMPOTENT_RESPONSES.set(key, {
method: req.method,
path: req.path,
response: null,
responseStateListener,
});
const tmpFile = path.resolve(os.tmpdir(), `${util.uuidV4()}.response`);
const responseListener = fs.createWriteStream(tmpFile, {
emitClose: true,
});
const originalSocketWriter = res.socket.write.bind(res.socket);
const patchedWriter = (chunk, encoding, next) => {
responseListener.write(chunk);
return originalSocketWriter(chunk, encoding, next);
};
res.socket.write = patchedWriter;
let writeError = null;
let isResponseFullySent = false;
responseListener.once('error', (e) => {
writeError = e;
});
res.once('finish', () => {
isResponseFullySent = true;
responseListener.end();
});
res.once('close', () => {
if (!isResponseFullySent) {
responseListener.end();
}
});
responseListener.once('close', () => {
if (!IDEMPOTENT_RESPONSES.has(key)) {
const msg = `Could not cache the response identified by '${key}'. ` +
`Cache consistency has been damaged`;
log.info(msg);
return responseStateListener.emit('error', new Error(msg));
}
if (writeError) {
log.info(`Could not cache the response identified by '${key}': ${writeError.message}`);
IDEMPOTENT_RESPONSES.del(key);
return responseStateListener.emit('error', writeError);
}
if (!isResponseFullySent) {
const msg = `Could not cache the response identified by '${key}', ` +
`because it has not been completed`;
log.info(msg);
log.info('Does the client terminate connections too early?');
IDEMPOTENT_RESPONSES.del(key);
return responseStateListener.emit('error', new Error(msg));
}

IDEMPOTENT_RESPONSES.get(key).response = tmpFile;
responseStateListener.emit('ready', tmpFile);
});
}

async function handleIdempotency (req, res, next) {
const key = req.headers[IDEMPOTENCY_KEY_HEADER];
if (!key) {
return next();
}
if (!MONITORED_METHODS.includes(req.method)) {
// GET, DELETE, etc. requests are idempotent by default
// there is no need to cache them
return next();
}

log.debug(`Request idempotency key: ${key}`);
if (!IDEMPOTENT_RESPONSES.has(key)) {
cacheResponse(key, req, res);
return next();
}

const {
method: storedMethod,
path: storedPath,
response,
responseStateListener,
} = IDEMPOTENT_RESPONSES.get(key);
if (req.method !== storedMethod || req.path !== storedPath) {
log.warn(`Got two different requests with the same idempotency key '${key}'`);
log.warn('Is the client generating idempotency keys properly?');
return next();
}

const rerouteCachedResponse = async (cachedResPath) => {
if (!await fs.exists(cachedResPath)) {
IDEMPOTENT_RESPONSES.del(key);
log.warn(`Could not read the cached response identified by key '${key}'`);
log.warn('The temporary storage is not accessible anymore');
return next();
}
fs.createReadStream(cachedResPath).pipe(res.socket);
};

if (response) {
log.info(`The same request with the idempotency key '${key}' has been already processed`);
log.info(`Rerouting its response to the current request`);
await rerouteCachedResponse(response);
} else {
log.info(`The same request with the idempotency key '${key}' is being processed`);
log.info(`Waiting for the response to be rerouted to the current request`);
responseStateListener.once('error', () => next());
responseStateListener.once('ready', rerouteCachedResponse);
}
}

export { handleIdempotency };
4 changes: 2 additions & 2 deletions lib/express/middleware.js
@@ -1,7 +1,7 @@
import _ from 'lodash';
import log from './logger';
import { errors } from '../protocol';

import { handleIdempotency } from './idempotency';

function allowCrossDomain (req, res, next) {
try {
Expand Down Expand Up @@ -87,5 +87,5 @@ function catch404Handler (req, res) {
export {
allowCrossDomain, fixPythonContentType, defaultToJSONContentType,
catchAllHandler, catch404Handler, catch4XXHandler,
allowCrossDomainAsyncExecute,
allowCrossDomainAsyncExecute, handleIdempotency,
};
27 changes: 15 additions & 12 deletions lib/express/server.js
Expand Up @@ -7,13 +7,17 @@ import bodyParser from 'body-parser';
import methodOverride from 'method-override';
import log from './logger';
import { startLogFormatter, endLogFormatter } from './express-logging';
import { allowCrossDomain, fixPythonContentType, defaultToJSONContentType,
catchAllHandler, catch404Handler, catch4XXHandler,
allowCrossDomainAsyncExecute} from './middleware';
import {
allowCrossDomain, fixPythonContentType, defaultToJSONContentType,
catchAllHandler, catch404Handler, catch4XXHandler,
allowCrossDomainAsyncExecute, handleIdempotency,
} from './middleware';
import { guineaPig, guineaPigScrollable, guineaPigAppBanner, welcome, STATIC_DIR } from './static';
import { produceError, produceCrash } from './crash';
import { addWebSocketHandler, removeWebSocketHandler, removeAllWebSocketHandlers,
getWebSocketHandlers } from './websocket';
import {
addWebSocketHandler, removeWebSocketHandler, removeAllWebSocketHandlers,
getWebSocketHandlers
} from './websocket';
import B from 'bluebird';
import { DEFAULT_BASE_PATH } from '../protocol';

Expand Down Expand Up @@ -41,14 +45,12 @@ async function server (opts = {}) {
// http.Server.close() only stops new connections, but we need to wait until
// all connections are closed and the `close` event is emitted
const close = httpServer.close.bind(httpServer);
httpServer.close = async () => {
return await new B((resolve, reject) => {
httpServer.on('close', resolve);
close((err) => {
if (err) reject(err); // eslint-disable-line curly
});
httpServer.close = async () => await new B((resolve, reject) => {
httpServer.on('close', resolve);
close((err) => {
if (err) reject(err); // eslint-disable-line curly
});
};
});

return await new B((resolve, reject) => {
httpServer.on('error', (err) => {
Expand Down Expand Up @@ -123,6 +125,7 @@ function configureServer (app, routeConfiguringFunction, allowCors = true, baseP
} else {
app.use(allowCrossDomainAsyncExecute(basePath));
}
app.use(handleIdempotency);
app.use(fixPythonContentType(basePath));
app.use(defaultToJSONContentType);
app.use(bodyParser.urlencoded({extended: true}));
Expand Down
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -53,7 +53,7 @@
"serve-favicon": "^2.4.5",
"source-map-support": "^0.5.5",
"validate.js": "^0.13.0",
"webdriverio": "^5.10.9",
"webdriverio": "^6.0.2",
"ws": "^7.0.0"
},
"scripts": {
Expand Down
61 changes: 61 additions & 0 deletions test/basedriver/driver-e2e-tests.js
Expand Up @@ -53,6 +53,67 @@ function baseDriverE2ETests (DriverClass, defaultCaps = {}) {
}

describe('session handling', function () {
it('should handle idempotency while creating sessions', async function () {
const sessionIds = [];
let times = 0;
do {
const res = await request({
url: 'http://localhost:8181/wd/hub/session',
headers: {
'X-Idempotency-Key': '123456',
},
method: 'POST',
json: {desiredCapabilities: defaultCaps, requiredCapabilities: {}},
simple: false,
resolveWithFullResponse: true
});

sessionIds.push(res.body.sessionId);
times++;
} while (times < 2);
_.uniq(sessionIds).length.should.equal(1);

const res = await request({
url: `http://localhost:8181/wd/hub/session/${sessionIds[0]}`,
method: 'DELETE',
json: true,
simple: false,
resolveWithFullResponse: true
});
res.statusCode.should.equal(200);
res.body.status.should.equal(0);
});

it('should handle idempotency while creating parallel sessions', async function () {
const reqs = [];
let times = 0;
do {
reqs.push(request({
url: 'http://localhost:8181/wd/hub/session',
headers: {
'X-Idempotency-Key': '12345',
},
method: 'POST',
json: {desiredCapabilities: defaultCaps, requiredCapabilities: {}},
simple: false,
resolveWithFullResponse: true
}));
times++;
} while (times < 2);
const sessionIds = (await B.all(reqs)).map((x) => x.body.sessionId);
_.uniq(sessionIds).length.should.equal(1);

const res = await request({
url: `http://localhost:8181/wd/hub/session/${sessionIds[0]}`,
method: 'DELETE',
json: true,
simple: false,
resolveWithFullResponse: true
});
res.statusCode.should.equal(200);
res.body.status.should.equal(0);
});

it('should create session and retrieve a session id, then delete it', async function () {
let res = await request({
url: 'http://localhost:8181/wd/hub/session',
Expand Down
2 changes: 1 addition & 1 deletion test/express/server-specs.js
Expand Up @@ -15,7 +15,7 @@ describe('server configuration', function () {
let app = {use: sinon.spy(), all: sinon.spy()};
let configureRoutes = () => {};
configureServer(app, configureRoutes);
app.use.callCount.should.equal(15);
app.use.callCount.should.equal(16);
app.all.callCount.should.equal(4);
});

Expand Down

0 comments on commit b48cd96

Please sign in to comment.