Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api-21 Reduce number of allocated Buffers for writes #605

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/invocation/InvocationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ export class InvocationService {
}

private write(invocation: Invocation, connection: ClientConnection): Promise<void> {
return connection.write(invocation.request.toBuffer());
return connection.write(invocation.request);
}

private notifyError(invocation: Invocation, error: Error): void {
Expand Down
39 changes: 23 additions & 16 deletions src/network/ClientConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {BuildInfo} from '../BuildInfo';
import {HazelcastClient} from '../HazelcastClient';
import {AddressImpl, IOError, UUID} from '../core';
import {ClientMessageHandler} from '../protocol/ClientMessage';
import {copyBuffers, deferredPromise, DeferredPromise} from '../util/Util';
import {deferredPromise, DeferredPromise} from '../util/Util';
import {ILogger} from '../logging/ILogger';
import {
ClientMessage,
Expand All @@ -37,16 +37,18 @@ const PROPERTY_NO_DELAY = 'hazelcast.client.socket.no.delay';

abstract class Writer extends EventEmitter {

abstract write(buffer: Buffer, resolver: DeferredPromise<void>): void;
abstract write(message: ClientMessage, resolver: DeferredPromise<void>): void;

abstract close(): void;

}

interface OutputQueueItem {

buffer: Buffer;
message: ClientMessage;

resolver: DeferredPromise<void>;

}

/** @internal */
Expand Down Expand Up @@ -75,12 +77,12 @@ export class PipelinedWriter extends Writer {
});
}

write(buffer: Buffer, resolver: DeferredPromise<void>): void {
write(message: ClientMessage, resolver: DeferredPromise<void>): void {
if (this.error) {
// if there was a write error, it's useless to keep writing to the socket
return process.nextTick(() => resolver.reject(this.error));
}
this.queue.push({ buffer, resolver });
this.queue.push({ message, resolver });
this.schedule();
}

Expand All @@ -106,13 +108,14 @@ export class PipelinedWriter extends Writer {
let totalLength = 0;
let queueIdx = 0;
while (queueIdx < this.queue.length && totalLength < this.threshold) {
const buf = this.queue[queueIdx].buffer;
const msg = this.queue[queueIdx].message;
const msgLength = msg.getTotalLength();
// if the next buffer exceeds the threshold,
// try to take multiple queued buffers which fit this.coalesceBuf
if (queueIdx > 0 && totalLength + buf.length > this.threshold) {
if (queueIdx > 0 && totalLength + msgLength > this.threshold) {
break;
}
totalLength += buf.length;
totalLength += msgLength;
queueIdx++;
}

Expand All @@ -125,12 +128,15 @@ export class PipelinedWriter extends Writer {
this.queue = this.queue.slice(queueIdx);

let buf;
if (writeBatch.length === 1) {
// take the only buffer
buf = writeBatch[0].buffer;
if (writeBatch.length === 1 && totalLength > this.threshold) {
// take the only large message
buf = writeBatch[0].message.toBuffer();
} else {
// coalesce buffers
copyBuffers(this.coalesceBuf, writeBatch, totalLength);
let pos = 0;
for (const item of writeBatch) {
pos = item.message.writeTo(this.coalesceBuf, pos);
}
buf = this.coalesceBuf.slice(0, totalLength);
}

Expand Down Expand Up @@ -177,8 +183,9 @@ export class DirectWriter extends Writer {
this.socket = socket;
}

write(buffer: Buffer, resolver: DeferredPromise<void>): void {
this.socket.write(buffer as any, (err: any) => {
write(message: ClientMessage, resolver: DeferredPromise<void>): void {
const buffer = message.toBuffer();
this.socket.write(buffer, (err: any) => {
if (err) {
resolver.reject(new IOError(err));
return;
Expand Down Expand Up @@ -385,9 +392,9 @@ export class ClientConnection {
this.remoteUuid = remoteUuid;
}

write(buffer: Buffer): Promise<void> {
write(message: ClientMessage): Promise<void> {
const deferred = deferredPromise<void>();
this.writer.write(buffer, deferred);
this.writer.write(message, deferred);
return deferred.promise;
}

Expand Down
47 changes: 29 additions & 18 deletions src/protocol/ClientMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ export class ClientMessage {
private retryable: boolean;
private connection: ClientConnection;
private _nextFrame: Frame;
// cached total length for encode case
private cachedTotalLength: number;

private constructor(startFrame?: Frame, endFrame?: Frame) {
this.startFrame = startFrame;
Expand Down Expand Up @@ -174,14 +176,14 @@ export class ClientMessage {
}

addFrame(frame: Frame): void {
this.cachedTotalLength = undefined;
frame.next = null;
if (this.startFrame == null) {
this.startFrame = frame;
this.endFrame = frame;
this._nextFrame = frame;
return;
}

this.endFrame.next = frame;
this.endFrame = frame;
}
Expand Down Expand Up @@ -230,14 +232,18 @@ export class ClientMessage {
this.connection = connection;
}

getTotalFrameLength(): number {
let frameLength = 0;
getTotalLength(): number {
if (this.cachedTotalLength !== undefined) {
return this.cachedTotalLength;
}
let totalLength = 0;
let currentFrame = this.startFrame;
while (currentFrame != null) {
frameLength += currentFrame.getLength();
totalLength += currentFrame.getLength();
currentFrame = currentFrame.next;
}
return frameLength;
this.cachedTotalLength = totalLength;
return totalLength;
}

getFragmentationId(): number {
Expand All @@ -248,11 +254,13 @@ export class ClientMessage {
// Should be called after calling dropFragmentationFrame() on the fragment
this.endFrame.next = fragment.startFrame;
this.endFrame = fragment.endFrame;
this.cachedTotalLength = undefined;
}

dropFragmentationFrame(): void {
this.startFrame = this.startFrame.next;
this._nextFrame = this._nextFrame.next;
this.cachedTotalLength = undefined;
}

copyWithNewCorrelationId(): ClientMessage {
Expand All @@ -264,27 +272,30 @@ export class ClientMessage {
return newMessage;
}

toBuffer(): Buffer {
const buffers: Buffer[] = [];
let totalLength = 0;
writeTo(buffer: Buffer, offset = 0): number {
let pos = offset;
let currentFrame = this.startFrame;
while (currentFrame != null) {
const isLastFrame = currentFrame.next == null;
const frameLengthAndFlags = Buffer.allocUnsafe(SIZE_OF_FRAME_LENGTH_AND_FLAGS);
frameLengthAndFlags.writeInt32LE(currentFrame.content.length + SIZE_OF_FRAME_LENGTH_AND_FLAGS, 0);

buffer.writeInt32LE(currentFrame.content.length + SIZE_OF_FRAME_LENGTH_AND_FLAGS, pos);
if (isLastFrame) {
frameLengthAndFlags.writeUInt16LE(currentFrame.flags | IS_FINAL_FLAG, BitsUtil.INT_SIZE_IN_BYTES);
buffer.writeUInt16LE(currentFrame.flags | IS_FINAL_FLAG, pos + BitsUtil.INT_SIZE_IN_BYTES);
} else {
frameLengthAndFlags.writeUInt16LE(currentFrame.flags, BitsUtil.INT_SIZE_IN_BYTES);
buffer.writeUInt16LE(currentFrame.flags, pos + BitsUtil.INT_SIZE_IN_BYTES);
}
totalLength += SIZE_OF_FRAME_LENGTH_AND_FLAGS;
buffers.push(frameLengthAndFlags);
totalLength += currentFrame.content.length;
buffers.push(currentFrame.content);
pos += SIZE_OF_FRAME_LENGTH_AND_FLAGS;
currentFrame.content.copy(buffer, pos);
pos += currentFrame.content.length;
currentFrame = currentFrame.next;
}
return Buffer.concat(buffers, totalLength);
return pos;
}

toBuffer(): Buffer {
const totalLength = this.getTotalLength();
const buffer = Buffer.allocUnsafe(totalLength);
this.writeTo(buffer);
return buffer;
}
}

Expand Down
23 changes: 0 additions & 23 deletions src/util/Util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,26 +336,3 @@ export function timedPromise<T>(wrapped: Promise<T>, timeout: number, err?: Erro

return deferred.promise;
}

type Binary = {
buffer: Buffer;
}

/**
* Copy contents of the given array of objects with buffers into the target buffer.
*
* @param target target buffer
* @param sources source objects that contain buffers
* @param totalLength total length of all source buffers
* @internal
*/
export function copyBuffers(target: Buffer, sources: Binary[], totalLength: number): void {
if (target.length < totalLength) {
throw new RangeError('Target length ' + target.length + ' is less than requested ' + totalLength);
}
let pos = 0;
for (const source of sources) {
source.buffer.copy(target, pos);
pos += source.buffer.length;
}
}
39 changes: 25 additions & 14 deletions test/connection/DirectWriterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,28 @@

'use strict';

const Socket = require('net').Socket;
const { Socket } = require('net');
const sinon = require('sinon');
const expect = require('chai').expect;
const { expect } = require('chai');

const { deferredPromise } = require('../../lib/util/Util');
const { DirectWriter } = require('../../lib/network/ClientConnection');
const {
ClientMessage,
Frame
} = require('../../lib/protocol/ClientMessage');

describe('DirectWriterTest', function () {

let queue;
let mockSocket;

function createMessage(content) {
const clientMessage = ClientMessage.createForEncode();
clientMessage.addFrame(new Frame(Buffer.from(content, 'utf8')));
return clientMessage;
}

const setUpWriteSuccess = () => {
mockSocket = new Socket({});
sinon.stub(mockSocket, 'write').callsFake((data, cb) => {
Expand All @@ -45,39 +55,40 @@ describe('DirectWriterTest', function () {
queue = new DirectWriter(mockSocket);
}

it('writes single message into socket (without copying it)', (done) => {
it('writes single message into socket', (done) => {
setUpWriteSuccess();

const buffer = Buffer.from('test');
const msg = createMessage('test');
mockSocket.on('data', function(data) {
expect(data).to.be.equal(buffer);
expect(Buffer.compare(data, msg.toBuffer())).to.be.equal(0);
done();
});

queue.write(buffer, deferredPromise());
queue.write(msg, deferredPromise());
});

it('writes multiple messages separately into socket', (done) => {
setUpWriteSuccess();

const msg = createMessage('test');
let cnt = 0;
mockSocket.on('data', function(data) {
expect(data).to.be.deep.equal(Buffer.from('test'));
expect(Buffer.compare(data, msg.toBuffer())).to.be.equal(0);
if (++cnt === 3) {
done();
}
});

queue.write(Buffer.from('test'), deferredPromise());
queue.write(Buffer.from('test'), deferredPromise());
queue.write(Buffer.from('test'), deferredPromise());
queue.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
});

it('resolves promise on write success', (done) => {
setUpWriteSuccess();

const resolver = deferredPromise();
queue.write(Buffer.from('test'), resolver);
queue.write(createMessage('test'), resolver);
resolver.promise.then(done);
});

Expand All @@ -86,7 +97,7 @@ describe('DirectWriterTest', function () {
setUpWriteFailure(err);

const resolver = deferredPromise();
queue.write(Buffer.from('test'), resolver);
queue.write(createMessage('test'), resolver);
resolver.promise.catch((err) => {
expect(err).to.be.equal(err);
done();
Expand All @@ -97,15 +108,15 @@ describe('DirectWriterTest', function () {
setUpWriteSuccess();

queue.on('write', done);
queue.write(Buffer.from('test'), deferredPromise());
queue.write(createMessage('test'), deferredPromise());
});

it('does not emit write event on write failure', (done) => {
setUpWriteFailure(new Error());

queue.on('write', () => done(new Error()));
const resolver = deferredPromise();
queue.write(Buffer.from('test'), resolver);
queue.write(createMessage('test'), resolver);
resolver.promise.catch(_ => {
done();
});
Expand Down
Loading