Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse coalescing buffer in PipelinedWriter #585

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 63 additions & 30 deletions src/network/ClientConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,46 @@ import {BuildInfo} from '../BuildInfo';
import {HazelcastClient} from '../HazelcastClient';
import {AddressImpl, IOError, UUID} from '../core';
import {ClientMessageHandler} from '../protocol/ClientMessage';
import {DeferredPromise} from '../util/Util';
import {DeferredPromise, copyBuffers} from '../util/Util';
import {ILogger} from '../logging/ILogger';
import {
ClientMessage,
Frame,
SIZE_OF_FRAME_LENGTH_AND_FLAGS
} from '../protocol/ClientMessage';

const FROZEN_ARRAY = Object.freeze([]) as OutputQueueItem[];
const FROZEN_ARRAY = Object.freeze([]);
const PROPERTY_PIPELINING_ENABLED = 'hazelcast.client.autopipelining.enabled';
const PROPERTY_PIPELINING_THRESHOLD = 'hazelcast.client.autopipelining.threshold.bytes';
const PROPERTY_NO_DELAY = 'hazelcast.client.socket.no.delay';

interface OutputQueueItem {
buffer: Buffer;
resolver: Promise.Resolver<void>;
abstract class Writer extends EventEmitter {

abstract write(buffer: Buffer, resolver: Promise.Resolver<void>): void;

abstract close(): void;

}

/** @internal */
export class PipelinedWriter extends EventEmitter {
export class PipelinedWriter extends Writer {

private readonly socket: net.Socket;
private queue: OutputQueueItem[] = [];
private queuedBufs: Buffer[] = [];
private queuedResolvers: Promise.Resolver<void>[] = [];
private error: Error;
private scheduled = false;
private canWrite = true;
// coalescing threshold in bytes
private readonly threshold: number;
// reusable buffer for coalescing
private readonly coalesceBuf: Buffer;

constructor(socket: net.Socket, threshold: number) {
super();
this.socket = socket;
this.threshold = threshold;
this.coalesceBuf = Buffer.allocUnsafe(threshold);

// write queued items on drain event
socket.on('drain', () => {
Expand All @@ -69,10 +76,18 @@ export class PipelinedWriter extends EventEmitter {
// if there was a write error, it's useless to keep writing to the socket
return process.nextTick(() => resolver.reject(this.error));
}
this.queue.push({ buffer, resolver });
this.queuedBufs.push(buffer);
this.queuedResolvers.push(resolver);
this.schedule();
}

close(): void {
this.canWrite = false;
// no more items can be added now
this.queuedResolvers = FROZEN_ARRAY as Promise.Resolver<void>[];
this.queuedBufs = FROZEN_ARRAY as Buffer[];
}

private schedule(): void {
if (!this.scheduled && this.canWrite) {
this.scheduled = true;
Expand All @@ -86,36 +101,51 @@ export class PipelinedWriter extends EventEmitter {
return;
}

const buffers: Buffer[] = [];
const resolvers: Array<Promise.Resolver<void>> = [];
let totalLength = 0;

while (this.queue.length > 0 && totalLength < this.threshold) {
const item = this.queue.shift();
const data = item.buffer;
totalLength += data.length;
buffers.push(data);
resolvers.push(item.resolver);
let queueIdx = 0;
while (queueIdx < this.queuedBufs.length && totalLength < this.threshold) {
const buf = this.queuedBufs[queueIdx];
// if the next buffer exceeds the threshold,
// try to take multiple queued buffers which fit this.coalesceBuf
if (queueIdx > 0 && totalLength + buf.length > this.threshold) {
break;
}
totalLength += buf.length;
queueIdx++;
}

if (totalLength === 0) {
this.scheduled = false;
return;
}

// coalesce buffers and write to the socket: no further writes until flushed
const merged = buffers.length === 1 ? buffers[0] : Buffer.concat(buffers, totalLength);
this.canWrite = this.socket.write(merged as any, (err: Error) => {
const buffers = this.queuedBufs.slice(0, queueIdx);
this.queuedBufs = this.queuedBufs.slice(queueIdx);
const resolvers = this.queuedResolvers.slice(0, queueIdx);
this.queuedResolvers = this.queuedResolvers.slice(queueIdx);

let buf;
if (buffers.length === 1) {
// take the only buffer
buf = buffers[0];
} else {
// coalesce buffers
copyBuffers(this.coalesceBuf, buffers, totalLength);
buf = this.coalesceBuf.slice(0, totalLength);
}

// write to the socket: no further writes until flushed
this.canWrite = this.socket.write(buf, (err: Error) => {
if (err) {
this.handleError(err, resolvers);
return;
}

this.emit('write');
for (const r of resolvers) {
r.resolve();
for (const resolver of resolvers) {
resolver.resolve();
}
if (this.queue.length === 0 || !this.canWrite) {
if (this.queuedBufs.length === 0 || !this.canWrite) {
// will start running on the next message or drain event
this.scheduled = false;
return;
Expand All @@ -130,17 +160,15 @@ export class PipelinedWriter extends EventEmitter {
for (const r of sentResolvers) {
r.reject(this.error);
}
// no more items can be added now
const q = this.queue;
this.queue = FROZEN_ARRAY;
for (const it of q) {
it.resolver.reject(this.error);
for (const resolver of this.queuedResolvers) {
resolver.reject(this.error);
}
this.close();
}
}

/** @internal */
export class DirectWriter extends EventEmitter {
export class DirectWriter extends Writer {

private readonly socket: net.Socket;

Expand All @@ -159,6 +187,10 @@ export class DirectWriter extends EventEmitter {
resolver.resolve();
});
}

close(): void {
// no-op
}
}

/** @internal */
Expand Down Expand Up @@ -296,7 +328,7 @@ export class ClientConnection {
private closedCause: Error;
private connectedServerVersion: number;
private readonly socket: net.Socket;
private readonly writer: PipelinedWriter | DirectWriter;
private readonly writer: Writer;
private readonly reader: ClientMessageReader;
private readonly logger: ILogger;
private readonly fragmentedMessageHandler: FragmentedClientMessageHandler;
Expand Down Expand Up @@ -381,6 +413,7 @@ export class ClientConnection {
this.logClose();

this.socket.end();
this.writer.close();

this.client.getConnectionManager().onConnectionClose(this);
}
Expand Down
19 changes: 19 additions & 0 deletions src/util/Util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,22 @@ export function DeferredPromise<T>(): Promise.Resolver<T> {
promise,
} as Promise.Resolver<T>;
}

/**
* Copy contents of the given array of buffers into the target buffer.
*
* @param target target buffer
* @param sources source buffers
* @param totalLength total length of all source buffers
* @internal
*/
export function copyBuffers(target: Buffer, sources: Buffer[], totalLength: number): void {
if (target.length < totalLength) {
throw new RangeError('Target length ' + target.length + ' is less than requested ' + totalLength);
}
let pos = 0;
for (const source of sources) {
source.copy(target, pos);
pos += source.length;
}
}
47 changes: 31 additions & 16 deletions test/connection/PipelinedWriterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const { PipelinedWriter } = require('../../lib/network/ClientConnection');

describe('PipelinedWriterTest', function () {

const THRESHOLD = 8192;
let writer;
let mockSocket;

Expand All @@ -36,7 +37,7 @@ describe('PipelinedWriterTest', function () {
process.nextTick(() => mockSocket.emit('data', data));
return canWrite;
});
writer = new PipelinedWriter(mockSocket, 8192);
writer = new PipelinedWriter(mockSocket, THRESHOLD);
}

function setUpWriteFailure(err) {
Expand All @@ -45,10 +46,10 @@ describe('PipelinedWriterTest', function () {
process.nextTick(() => cb(err));
return false;
});
writer = new PipelinedWriter(mockSocket, 8192);
writer = new PipelinedWriter(mockSocket, THRESHOLD);
}

it('writes single message into socket (without copying it)', (done) => {
it('writes single small message into socket (without copying it)', (done) => {
setUpWriteSuccess(true);

const buffer = Buffer.from('test');
Expand All @@ -59,39 +60,53 @@ describe('PipelinedWriterTest', function () {
});
});

it('writes multiple messages as one into socket', (done) => {
it('writes single large message into socket (without copying it)', (done) => {
setUpWriteSuccess(true);

const buffer = Buffer.allocUnsafe(THRESHOLD * 2);
writer.write(buffer, DeferredPromise());
mockSocket.on('data', (data) => {
expect(data).to.be.equal(buffer);
done();
});
});

it('writes multiple small messages as one into socket', (done) => {
setUpWriteSuccess(true);

writer.write(Buffer.from('1'), DeferredPromise());
writer.write(Buffer.from('2'), DeferredPromise());
writer.write(Buffer.from('3'), DeferredPromise());
mockSocket.on('data', (data) => {
expect(data).to.be.deep.equal(Buffer.from('123'));
expect(Buffer.compare(data, Buffer.from('123'))).to.be.equal(0);
done();
});
});

it('coalesces buffers when writing into socket', (done) => {
it('coalesces buffers when writing into socket (1/2 of threshold)', (done) => {
setUpWriteSuccess(true);

const size = 4200;
const size = THRESHOLD / 2;
const data1 = Buffer.alloc(size).fill('1');
const resolver1 = DeferredPromise();
writer.write(Buffer.alloc(size), resolver1);
writer.write(data1, resolver1);
const data2 = Buffer.alloc(size).fill('2');
const resolver2 = DeferredPromise();
writer.write(Buffer.alloc(size), resolver2);
writer.write(data2, resolver2);
const data3 = Buffer.alloc(size).fill('3');
const resolver3 = DeferredPromise();
writer.write(Buffer.alloc(size), resolver3);
writer.write(data3, resolver3);

let cnt = 0;
let allData = Buffer.alloc(0);
mockSocket.on('data', (data) => {
allData = Buffer.concat([allData, data]);
cnt += 1;
cnt++;
if (cnt === 1) {
expect(data).to.be.deep.equal(Buffer.alloc(size * 2));
expect(Buffer.compare(data, Buffer.concat([data1, data2]))).to.be.equal(0);
}
if (cnt === 2) {
expect(data).to.be.deep.equal(Buffer.alloc(size));
expect(Buffer.compare(data, data3)).to.be.equal(0);
}
});

Expand All @@ -101,15 +116,15 @@ describe('PipelinedWriterTest', function () {
resolver3.promise
]).then(() => {
expect(cnt).to.be.equal(2);
expect(allData).to.be.deep.equal(Buffer.alloc(size * 3));
expect(Buffer.compare(allData, Buffer.concat([data1, data2, data3]))).to.be.equal(0);
done();
});
});

it('allows I/O in between coalesced writes into socket', (done) => {
setUpWriteSuccess(true);

const size = 9000;
const size = THRESHOLD * 2;
writer.write(Buffer.alloc(size), DeferredPromise());
writer.write(Buffer.alloc(size), DeferredPromise());
let cnt = 0;
Expand Down Expand Up @@ -187,7 +202,7 @@ describe('PipelinedWriterTest', function () {
writer.on('write', () => done(new Error()));
const resolver = DeferredPromise();
writer.write(Buffer.from('test'), resolver);
resolver.promise.catch(_ => {
resolver.promise.catch(() => {
done();
});
});
Expand Down
2 changes: 2 additions & 0 deletions test/invocation/InvocationTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const sinon = require('sinon');
const sandbox = sinon.createSandbox();
const { Client, IndeterminateOperationStateError } = require('../../');
const { Invocation, InvocationService } = require('../../lib/invocation/InvocationService');
const { LifecycleServiceImpl } = require('../../lib/LifecycleService');
const { ClientMessage } = require('../../lib/protocol/ClientMessage');

describe('InvocationTest', function () {
Expand All @@ -31,6 +32,7 @@ describe('InvocationTest', function () {
clientStub = sandbox.stub(Client.prototype);
serviceStub = sandbox.stub(InvocationService.prototype);
clientStub.getInvocationService.returns(serviceStub);
clientStub.getLifecycleService.returns(sandbox.stub(LifecycleServiceImpl.prototype));
});

afterEach(function () {
Expand Down
48 changes: 48 additions & 0 deletions test/unit/UtilTest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const { expect } = require('chai');
const { copyBuffers } = require('../../lib/util/Util');

describe('UtilTest', function () {

it('copyBuffers: throws on invalid total length', function () {
expect(() => copyBuffers(Buffer.from([0x1]), [ Buffer.from([0x2]) ], 3))
.to.throw(RangeError);
});

it('copyBuffers: writes single buffer of less length', function () {
const target = Buffer.from('abc');
const sources = [ Buffer.from('d') ];
copyBuffers(target, sources, 1);

expect(Buffer.compare(target, Buffer.from('dbc'))).to.be.equal(0);
});

it('copyBuffers: writes multiple buffers of same total length', function () {
const target = Buffer.from('abc');
const sources = [
Buffer.from('d'),
Buffer.from('e'),
Buffer.from('f')
];
copyBuffers(target, sources, 3);

expect(Buffer.compare(target, Buffer.from('def'))).to.be.equal(0);
});
});
Loading