Skip to content

Commit

Permalink
Reduce number of allocated Buffers for writes (#605)
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz committed Sep 22, 2020
1 parent 03b2002 commit c1f7410
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 143 deletions.
2 changes: 1 addition & 1 deletion src/invocation/InvocationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ export class InvocationService {
}

private write(invocation: Invocation, connection: ClientConnection): Promise<void> {
return connection.write(invocation.request.toBuffer());
return connection.write(invocation.request);
}

private notifyError(invocation: Invocation, error: Error): void {
Expand Down
39 changes: 23 additions & 16 deletions src/network/ClientConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {BuildInfo} from '../BuildInfo';
import {HazelcastClient} from '../HazelcastClient';
import {AddressImpl, IOError, UUID} from '../core';
import {ClientMessageHandler} from '../protocol/ClientMessage';
import {copyBuffers, deferredPromise, DeferredPromise} from '../util/Util';
import {deferredPromise, DeferredPromise} from '../util/Util';
import {ILogger} from '../logging/ILogger';
import {
ClientMessage,
Expand All @@ -37,16 +37,18 @@ const PROPERTY_NO_DELAY = 'hazelcast.client.socket.no.delay';

abstract class Writer extends EventEmitter {

abstract write(buffer: Buffer, resolver: DeferredPromise<void>): void;
abstract write(message: ClientMessage, resolver: DeferredPromise<void>): void;

abstract close(): void;

}

interface OutputQueueItem {

buffer: Buffer;
message: ClientMessage;

resolver: DeferredPromise<void>;

}

/** @internal */
Expand Down Expand Up @@ -75,12 +77,12 @@ export class PipelinedWriter extends Writer {
});
}

write(buffer: Buffer, resolver: DeferredPromise<void>): void {
write(message: ClientMessage, resolver: DeferredPromise<void>): void {
if (this.error) {
// if there was a write error, it's useless to keep writing to the socket
return process.nextTick(() => resolver.reject(this.error));
}
this.queue.push({ buffer, resolver });
this.queue.push({ message, resolver });
this.schedule();
}

Expand All @@ -106,13 +108,14 @@ export class PipelinedWriter extends Writer {
let totalLength = 0;
let queueIdx = 0;
while (queueIdx < this.queue.length && totalLength < this.threshold) {
const buf = this.queue[queueIdx].buffer;
const msg = this.queue[queueIdx].message;
const msgLength = msg.getTotalLength();
// if the next buffer exceeds the threshold,
// try to take multiple queued buffers which fit this.coalesceBuf
if (queueIdx > 0 && totalLength + buf.length > this.threshold) {
if (queueIdx > 0 && totalLength + msgLength > this.threshold) {
break;
}
totalLength += buf.length;
totalLength += msgLength;
queueIdx++;
}

Expand All @@ -125,12 +128,15 @@ export class PipelinedWriter extends Writer {
this.queue = this.queue.slice(queueIdx);

let buf;
if (writeBatch.length === 1) {
// take the only buffer
buf = writeBatch[0].buffer;
if (writeBatch.length === 1 && totalLength > this.threshold) {
// take the only large message
buf = writeBatch[0].message.toBuffer();
} else {
// coalesce buffers
copyBuffers(this.coalesceBuf, writeBatch, totalLength);
let pos = 0;
for (const item of writeBatch) {
pos = item.message.writeTo(this.coalesceBuf, pos);
}
buf = this.coalesceBuf.slice(0, totalLength);
}

Expand Down Expand Up @@ -177,8 +183,9 @@ export class DirectWriter extends Writer {
this.socket = socket;
}

write(buffer: Buffer, resolver: DeferredPromise<void>): void {
this.socket.write(buffer as any, (err: any) => {
write(message: ClientMessage, resolver: DeferredPromise<void>): void {
const buffer = message.toBuffer();
this.socket.write(buffer, (err: any) => {
if (err) {
resolver.reject(new IOError(err));
return;
Expand Down Expand Up @@ -385,9 +392,9 @@ export class ClientConnection {
this.remoteUuid = remoteUuid;
}

write(buffer: Buffer): Promise<void> {
write(message: ClientMessage): Promise<void> {
const deferred = deferredPromise<void>();
this.writer.write(buffer, deferred);
this.writer.write(message, deferred);
return deferred.promise;
}

Expand Down
47 changes: 29 additions & 18 deletions src/protocol/ClientMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ export class ClientMessage {
private retryable: boolean;
private connection: ClientConnection;
private _nextFrame: Frame;
// cached total length for encode case
private cachedTotalLength: number;

private constructor(startFrame?: Frame, endFrame?: Frame) {
this.startFrame = startFrame;
Expand Down Expand Up @@ -174,14 +176,14 @@ export class ClientMessage {
}

addFrame(frame: Frame): void {
this.cachedTotalLength = undefined;
frame.next = null;
if (this.startFrame == null) {
this.startFrame = frame;
this.endFrame = frame;
this._nextFrame = frame;
return;
}

this.endFrame.next = frame;
this.endFrame = frame;
}
Expand Down Expand Up @@ -230,14 +232,18 @@ export class ClientMessage {
this.connection = connection;
}

getTotalFrameLength(): number {
let frameLength = 0;
getTotalLength(): number {
if (this.cachedTotalLength !== undefined) {
return this.cachedTotalLength;
}
let totalLength = 0;
let currentFrame = this.startFrame;
while (currentFrame != null) {
frameLength += currentFrame.getLength();
totalLength += currentFrame.getLength();
currentFrame = currentFrame.next;
}
return frameLength;
this.cachedTotalLength = totalLength;
return totalLength;
}

getFragmentationId(): number {
Expand All @@ -248,11 +254,13 @@ export class ClientMessage {
// Should be called after calling dropFragmentationFrame() on the fragment
this.endFrame.next = fragment.startFrame;
this.endFrame = fragment.endFrame;
this.cachedTotalLength = undefined;
}

dropFragmentationFrame(): void {
this.startFrame = this.startFrame.next;
this._nextFrame = this._nextFrame.next;
this.cachedTotalLength = undefined;
}

copyWithNewCorrelationId(): ClientMessage {
Expand All @@ -264,27 +272,30 @@ export class ClientMessage {
return newMessage;
}

toBuffer(): Buffer {
const buffers: Buffer[] = [];
let totalLength = 0;
writeTo(buffer: Buffer, offset = 0): number {
let pos = offset;
let currentFrame = this.startFrame;
while (currentFrame != null) {
const isLastFrame = currentFrame.next == null;
const frameLengthAndFlags = Buffer.allocUnsafe(SIZE_OF_FRAME_LENGTH_AND_FLAGS);
frameLengthAndFlags.writeInt32LE(currentFrame.content.length + SIZE_OF_FRAME_LENGTH_AND_FLAGS, 0);

buffer.writeInt32LE(currentFrame.content.length + SIZE_OF_FRAME_LENGTH_AND_FLAGS, pos);
if (isLastFrame) {
frameLengthAndFlags.writeUInt16LE(currentFrame.flags | IS_FINAL_FLAG, BitsUtil.INT_SIZE_IN_BYTES);
buffer.writeUInt16LE(currentFrame.flags | IS_FINAL_FLAG, pos + BitsUtil.INT_SIZE_IN_BYTES);
} else {
frameLengthAndFlags.writeUInt16LE(currentFrame.flags, BitsUtil.INT_SIZE_IN_BYTES);
buffer.writeUInt16LE(currentFrame.flags, pos + BitsUtil.INT_SIZE_IN_BYTES);
}
totalLength += SIZE_OF_FRAME_LENGTH_AND_FLAGS;
buffers.push(frameLengthAndFlags);
totalLength += currentFrame.content.length;
buffers.push(currentFrame.content);
pos += SIZE_OF_FRAME_LENGTH_AND_FLAGS;
currentFrame.content.copy(buffer, pos);
pos += currentFrame.content.length;
currentFrame = currentFrame.next;
}
return Buffer.concat(buffers, totalLength);
return pos;
}

toBuffer(): Buffer {
const totalLength = this.getTotalLength();
const buffer = Buffer.allocUnsafe(totalLength);
this.writeTo(buffer);
return buffer;
}
}

Expand Down
23 changes: 0 additions & 23 deletions src/util/Util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,26 +336,3 @@ export function timedPromise<T>(wrapped: Promise<T>, timeout: number, err?: Erro

return deferred.promise;
}

type Binary = {
buffer: Buffer;
}

/**
* Copy contents of the given array of objects with buffers into the target buffer.
*
* @param target target buffer
* @param sources source objects that contain buffers
* @param totalLength total length of all source buffers
* @internal
*/
export function copyBuffers(target: Buffer, sources: Binary[], totalLength: number): void {
if (target.length < totalLength) {
throw new RangeError('Target length ' + target.length + ' is less than requested ' + totalLength);
}
let pos = 0;
for (const source of sources) {
source.buffer.copy(target, pos);
pos += source.buffer.length;
}
}
39 changes: 25 additions & 14 deletions test/connection/DirectWriterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,28 @@

'use strict';

const Socket = require('net').Socket;
const { Socket } = require('net');
const sinon = require('sinon');
const expect = require('chai').expect;
const { expect } = require('chai');

const { deferredPromise } = require('../../lib/util/Util');
const { DirectWriter } = require('../../lib/network/ClientConnection');
const {
ClientMessage,
Frame
} = require('../../lib/protocol/ClientMessage');

describe('DirectWriterTest', function () {

let queue;
let mockSocket;

function createMessage(content) {
const clientMessage = ClientMessage.createForEncode();
clientMessage.addFrame(new Frame(Buffer.from(content, 'utf8')));
return clientMessage;
}

const setUpWriteSuccess = () => {
mockSocket = new Socket({});
sinon.stub(mockSocket, 'write').callsFake((data, cb) => {
Expand All @@ -45,39 +55,40 @@ describe('DirectWriterTest', function () {
queue = new DirectWriter(mockSocket);
}

it('writes single message into socket (without copying it)', (done) => {
it('writes single message into socket', (done) => {
setUpWriteSuccess();

const buffer = Buffer.from('test');
const msg = createMessage('test');
mockSocket.on('data', function(data) {
expect(data).to.be.equal(buffer);
expect(Buffer.compare(data, msg.toBuffer())).to.be.equal(0);
done();
});

queue.write(buffer, deferredPromise());
queue.write(msg, deferredPromise());
});

it('writes multiple messages separately into socket', (done) => {
setUpWriteSuccess();

const msg = createMessage('test');
let cnt = 0;
mockSocket.on('data', function(data) {
expect(data).to.be.deep.equal(Buffer.from('test'));
expect(Buffer.compare(data, msg.toBuffer())).to.be.equal(0);
if (++cnt === 3) {
done();
}
});

queue.write(Buffer.from('test'), deferredPromise());
queue.write(Buffer.from('test'), deferredPromise());
queue.write(Buffer.from('test'), deferredPromise());
queue.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
});

it('resolves promise on write success', (done) => {
setUpWriteSuccess();

const resolver = deferredPromise();
queue.write(Buffer.from('test'), resolver);
queue.write(createMessage('test'), resolver);
resolver.promise.then(done);
});

Expand All @@ -86,7 +97,7 @@ describe('DirectWriterTest', function () {
setUpWriteFailure(err);

const resolver = deferredPromise();
queue.write(Buffer.from('test'), resolver);
queue.write(createMessage('test'), resolver);
resolver.promise.catch((err) => {
expect(err).to.be.equal(err);
done();
Expand All @@ -97,15 +108,15 @@ describe('DirectWriterTest', function () {
setUpWriteSuccess();

queue.on('write', done);
queue.write(Buffer.from('test'), deferredPromise());
queue.write(createMessage('test'), deferredPromise());
});

it('does not emit write event on write failure', (done) => {
setUpWriteFailure(new Error());

queue.on('write', () => done(new Error()));
const resolver = deferredPromise();
queue.write(Buffer.from('test'), resolver);
queue.write(createMessage('test'), resolver);
resolver.promise.catch(_ => {
done();
});
Expand Down

0 comments on commit c1f7410

Please sign in to comment.