Skip to content

Commit

Permalink
Reuse coalescing buffer in PipelinedWriter (hazelcast#585)
Browse files Browse the repository at this point in the history
Also includes the following:
* Add proper closing for PipelinedWriter
* Fix flakiness in InvocationTest
  • Loading branch information
puzpuzpuz committed Sep 8, 2020
1 parent 3d7a0c2 commit 1b1269a
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 47 deletions.
93 changes: 63 additions & 30 deletions src/network/ClientConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,46 @@ import {BuildInfo} from '../BuildInfo';
import {HazelcastClient} from '../HazelcastClient';
import {AddressImpl, IOError, UUID} from '../core';
import {ClientMessageHandler} from '../protocol/ClientMessage';
import {DeferredPromise} from '../util/Util';
import {DeferredPromise, copyBuffers} from '../util/Util';
import {ILogger} from '../logging/ILogger';
import {
ClientMessage,
Frame,
SIZE_OF_FRAME_LENGTH_AND_FLAGS
} from '../protocol/ClientMessage';

const FROZEN_ARRAY = Object.freeze([]) as OutputQueueItem[];
const FROZEN_ARRAY = Object.freeze([]);
const PROPERTY_PIPELINING_ENABLED = 'hazelcast.client.autopipelining.enabled';
const PROPERTY_PIPELINING_THRESHOLD = 'hazelcast.client.autopipelining.threshold.bytes';
const PROPERTY_NO_DELAY = 'hazelcast.client.socket.no.delay';

interface OutputQueueItem {
buffer: Buffer;
resolver: Promise.Resolver<void>;
abstract class Writer extends EventEmitter {

abstract write(buffer: Buffer, resolver: Promise.Resolver<void>): void;

abstract close(): void;

}

/** @internal */
export class PipelinedWriter extends EventEmitter {
export class PipelinedWriter extends Writer {

private readonly socket: net.Socket;
private queue: OutputQueueItem[] = [];
private queuedBufs: Buffer[] = [];
private queuedResolvers: Promise.Resolver<void>[] = [];
private error: Error;
private scheduled = false;
private canWrite = true;
// coalescing threshold in bytes
private readonly threshold: number;
// reusable buffer for coalescing
private readonly coalesceBuf: Buffer;

constructor(socket: net.Socket, threshold: number) {
super();
this.socket = socket;
this.threshold = threshold;
this.coalesceBuf = Buffer.allocUnsafe(threshold);

// write queued items on drain event
socket.on('drain', () => {
Expand All @@ -69,10 +76,18 @@ export class PipelinedWriter extends EventEmitter {
// if there was a write error, it's useless to keep writing to the socket
return process.nextTick(() => resolver.reject(this.error));
}
this.queue.push({ buffer, resolver });
this.queuedBufs.push(buffer);
this.queuedResolvers.push(resolver);
this.schedule();
}

close(): void {
this.canWrite = false;
// no more items can be added now
this.queuedResolvers = FROZEN_ARRAY as Promise.Resolver<void>[];
this.queuedBufs = FROZEN_ARRAY as Buffer[];
}

private schedule(): void {
if (!this.scheduled && this.canWrite) {
this.scheduled = true;
Expand All @@ -86,36 +101,51 @@ export class PipelinedWriter extends EventEmitter {
return;
}

const buffers: Buffer[] = [];
const resolvers: Array<Promise.Resolver<void>> = [];
let totalLength = 0;

while (this.queue.length > 0 && totalLength < this.threshold) {
const item = this.queue.shift();
const data = item.buffer;
totalLength += data.length;
buffers.push(data);
resolvers.push(item.resolver);
let queueIdx = 0;
while (queueIdx < this.queuedBufs.length && totalLength < this.threshold) {
const buf = this.queuedBufs[queueIdx];
// if the next buffer exceeds the threshold,
// try to take multiple queued buffers which fit this.coalesceBuf
if (queueIdx > 0 && totalLength + buf.length > this.threshold) {
break;
}
totalLength += buf.length;
queueIdx++;
}

if (totalLength === 0) {
this.scheduled = false;
return;
}

// coalesce buffers and write to the socket: no further writes until flushed
const merged = buffers.length === 1 ? buffers[0] : Buffer.concat(buffers, totalLength);
this.canWrite = this.socket.write(merged as any, (err: Error) => {
const buffers = this.queuedBufs.slice(0, queueIdx);
this.queuedBufs = this.queuedBufs.slice(queueIdx);
const resolvers = this.queuedResolvers.slice(0, queueIdx);
this.queuedResolvers = this.queuedResolvers.slice(queueIdx);

let buf;
if (buffers.length === 1) {
// take the only buffer
buf = buffers[0];
} else {
// coalesce buffers
copyBuffers(this.coalesceBuf, buffers, totalLength);
buf = this.coalesceBuf.slice(0, totalLength);
}

// write to the socket: no further writes until flushed
this.canWrite = this.socket.write(buf, (err: Error) => {
if (err) {
this.handleError(err, resolvers);
return;
}

this.emit('write');
for (const r of resolvers) {
r.resolve();
for (const resolver of resolvers) {
resolver.resolve();
}
if (this.queue.length === 0 || !this.canWrite) {
if (this.queuedBufs.length === 0 || !this.canWrite) {
// will start running on the next message or drain event
this.scheduled = false;
return;
Expand All @@ -130,17 +160,15 @@ export class PipelinedWriter extends EventEmitter {
for (const r of sentResolvers) {
r.reject(this.error);
}
// no more items can be added now
const q = this.queue;
this.queue = FROZEN_ARRAY;
for (const it of q) {
it.resolver.reject(this.error);
for (const resolver of this.queuedResolvers) {
resolver.reject(this.error);
}
this.close();
}
}

/** @internal */
export class DirectWriter extends EventEmitter {
export class DirectWriter extends Writer {

private readonly socket: net.Socket;

Expand All @@ -159,6 +187,10 @@ export class DirectWriter extends EventEmitter {
resolver.resolve();
});
}

close(): void {
// no-op
}
}

/** @internal */
Expand Down Expand Up @@ -296,7 +328,7 @@ export class ClientConnection {
private closedCause: Error;
private connectedServerVersion: number;
private readonly socket: net.Socket;
private readonly writer: PipelinedWriter | DirectWriter;
private readonly writer: Writer;
private readonly reader: ClientMessageReader;
private readonly logger: ILogger;
private readonly fragmentedMessageHandler: FragmentedClientMessageHandler;
Expand Down Expand Up @@ -381,6 +413,7 @@ export class ClientConnection {
this.logClose();

this.socket.end();
this.writer.close();

this.client.getConnectionManager().onConnectionClose(this);
}
Expand Down
19 changes: 19 additions & 0 deletions src/util/Util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,22 @@ export function DeferredPromise<T>(): Promise.Resolver<T> {
promise,
} as Promise.Resolver<T>;
}

/**
* Copy contents of the given array of buffers into the target buffer.
*
* @param target target buffer
* @param sources source buffers
* @param totalLength total length of all source buffers
* @internal
*/
export function copyBuffers(target: Buffer, sources: Buffer[], totalLength: number): void {
if (target.length < totalLength) {
throw new RangeError('Target length ' + target.length + ' is less than requested ' + totalLength);
}
let pos = 0;
for (const source of sources) {
source.copy(target, pos);
pos += source.length;
}
}
47 changes: 31 additions & 16 deletions test/connection/PipelinedWriterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const { PipelinedWriter } = require('../../lib/network/ClientConnection');

describe('PipelinedWriterTest', function () {

const THRESHOLD = 8192;
let writer;
let mockSocket;

Expand All @@ -36,7 +37,7 @@ describe('PipelinedWriterTest', function () {
process.nextTick(() => mockSocket.emit('data', data));
return canWrite;
});
writer = new PipelinedWriter(mockSocket, 8192);
writer = new PipelinedWriter(mockSocket, THRESHOLD);
}

function setUpWriteFailure(err) {
Expand All @@ -45,10 +46,10 @@ describe('PipelinedWriterTest', function () {
process.nextTick(() => cb(err));
return false;
});
writer = new PipelinedWriter(mockSocket, 8192);
writer = new PipelinedWriter(mockSocket, THRESHOLD);
}

it('writes single message into socket (without copying it)', (done) => {
it('writes single small message into socket (without copying it)', (done) => {
setUpWriteSuccess(true);

const buffer = Buffer.from('test');
Expand All @@ -59,39 +60,53 @@ describe('PipelinedWriterTest', function () {
});
});

it('writes multiple messages as one into socket', (done) => {
it('writes single large message into socket (without copying it)', (done) => {
setUpWriteSuccess(true);

const buffer = Buffer.allocUnsafe(THRESHOLD * 2);
writer.write(buffer, DeferredPromise());
mockSocket.on('data', (data) => {
expect(data).to.be.equal(buffer);
done();
});
});

it('writes multiple small messages as one into socket', (done) => {
setUpWriteSuccess(true);

writer.write(Buffer.from('1'), DeferredPromise());
writer.write(Buffer.from('2'), DeferredPromise());
writer.write(Buffer.from('3'), DeferredPromise());
mockSocket.on('data', (data) => {
expect(data).to.be.deep.equal(Buffer.from('123'));
expect(Buffer.compare(data, Buffer.from('123'))).to.be.equal(0);
done();
});
});

it('coalesces buffers when writing into socket', (done) => {
it('coalesces buffers when writing into socket (1/2 of threshold)', (done) => {
setUpWriteSuccess(true);

const size = 4200;
const size = THRESHOLD / 2;
const data1 = Buffer.alloc(size).fill('1');
const resolver1 = DeferredPromise();
writer.write(Buffer.alloc(size), resolver1);
writer.write(data1, resolver1);
const data2 = Buffer.alloc(size).fill('2');
const resolver2 = DeferredPromise();
writer.write(Buffer.alloc(size), resolver2);
writer.write(data2, resolver2);
const data3 = Buffer.alloc(size).fill('3');
const resolver3 = DeferredPromise();
writer.write(Buffer.alloc(size), resolver3);
writer.write(data3, resolver3);

let cnt = 0;
let allData = Buffer.alloc(0);
mockSocket.on('data', (data) => {
allData = Buffer.concat([allData, data]);
cnt += 1;
cnt++;
if (cnt === 1) {
expect(data).to.be.deep.equal(Buffer.alloc(size * 2));
expect(Buffer.compare(data, Buffer.concat([data1, data2]))).to.be.equal(0);
}
if (cnt === 2) {
expect(data).to.be.deep.equal(Buffer.alloc(size));
expect(Buffer.compare(data, data3)).to.be.equal(0);
}
});

Expand All @@ -101,15 +116,15 @@ describe('PipelinedWriterTest', function () {
resolver3.promise
]).then(() => {
expect(cnt).to.be.equal(2);
expect(allData).to.be.deep.equal(Buffer.alloc(size * 3));
expect(Buffer.compare(allData, Buffer.concat([data1, data2, data3]))).to.be.equal(0);
done();
});
});

it('allows I/O in between coalesced writes into socket', (done) => {
setUpWriteSuccess(true);

const size = 9000;
const size = THRESHOLD * 2;
writer.write(Buffer.alloc(size), DeferredPromise());
writer.write(Buffer.alloc(size), DeferredPromise());
let cnt = 0;
Expand Down Expand Up @@ -187,7 +202,7 @@ describe('PipelinedWriterTest', function () {
writer.on('write', () => done(new Error()));
const resolver = DeferredPromise();
writer.write(Buffer.from('test'), resolver);
resolver.promise.catch(_ => {
resolver.promise.catch(() => {
done();
});
});
Expand Down
2 changes: 2 additions & 0 deletions test/invocation/InvocationTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const sinon = require('sinon');
const sandbox = sinon.createSandbox();
const { Client, IndeterminateOperationStateError } = require('../../');
const { Invocation, InvocationService } = require('../../lib/invocation/InvocationService');
const { LifecycleServiceImpl } = require('../../lib/LifecycleService');
const { ClientMessage } = require('../../lib/protocol/ClientMessage');

describe('InvocationTest', function () {
Expand All @@ -31,6 +32,7 @@ describe('InvocationTest', function () {
clientStub = sandbox.stub(Client.prototype);
serviceStub = sandbox.stub(InvocationService.prototype);
clientStub.getInvocationService.returns(serviceStub);
clientStub.getLifecycleService.returns(sandbox.stub(LifecycleServiceImpl.prototype));
});

afterEach(function () {
Expand Down
48 changes: 48 additions & 0 deletions test/unit/UtilTest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const { expect } = require('chai');
const { copyBuffers } = require('../../lib/util/Util');

describe('UtilTest', function () {

it('copyBuffers: throws on invalid total length', function () {
expect(() => copyBuffers(Buffer.from([0x1]), [ Buffer.from([0x2]) ], 3))
.to.throw(RangeError);
});

it('copyBuffers: writes single buffer of less length', function () {
const target = Buffer.from('abc');
const sources = [ Buffer.from('d') ];
copyBuffers(target, sources, 1);

expect(Buffer.compare(target, Buffer.from('dbc'))).to.be.equal(0);
});

it('copyBuffers: writes multiple buffers of same total length', function () {
const target = Buffer.from('abc');
const sources = [
Buffer.from('d'),
Buffer.from('e'),
Buffer.from('f')
];
copyBuffers(target, sources, 3);

expect(Buffer.compare(target, Buffer.from('def'))).to.be.equal(0);
});
});
Loading

0 comments on commit 1b1269a

Please sign in to comment.