Skip to content

Commit

Permalink
Reduce number of allocated Buffers for writes
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz committed Sep 21, 2020
1 parent 03b2002 commit 0338310
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 127 deletions.
2 changes: 1 addition & 1 deletion src/invocation/InvocationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ export class InvocationService {
}

private write(invocation: Invocation, connection: ClientConnection): Promise<void> {
return connection.write(invocation.request.toBuffer());
return connection.write(invocation.request);
}

private notifyError(invocation: Invocation, error: Error): void {
Expand Down
40 changes: 24 additions & 16 deletions src/network/ClientConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {BuildInfo} from '../BuildInfo';
import {HazelcastClient} from '../HazelcastClient';
import {AddressImpl, IOError, UUID} from '../core';
import {ClientMessageHandler} from '../protocol/ClientMessage';
import {copyBuffers, deferredPromise, DeferredPromise} from '../util/Util';
import {deferredPromise, DeferredPromise} from '../util/Util';
import {ILogger} from '../logging/ILogger';
import {
ClientMessage,
Expand All @@ -37,16 +37,18 @@ const PROPERTY_NO_DELAY = 'hazelcast.client.socket.no.delay';

abstract class Writer extends EventEmitter {

abstract write(buffer: Buffer, resolver: DeferredPromise<void>): void;
abstract write(message: ClientMessage, resolver: DeferredPromise<void>): void;

abstract close(): void;

}

interface OutputQueueItem {

buffer: Buffer;
message: ClientMessage;

resolver: DeferredPromise<void>;

}

/** @internal */
Expand Down Expand Up @@ -75,12 +77,12 @@ export class PipelinedWriter extends Writer {
});
}

write(buffer: Buffer, resolver: DeferredPromise<void>): void {
write(message: ClientMessage, resolver: DeferredPromise<void>): void {
if (this.error) {
// if there was a write error, it's useless to keep writing to the socket
return process.nextTick(() => resolver.reject(this.error));
}
this.queue.push({ buffer, resolver });
this.queue.push({ message, resolver });
this.schedule();
}

Expand All @@ -106,13 +108,14 @@ export class PipelinedWriter extends Writer {
let totalLength = 0;
let queueIdx = 0;
while (queueIdx < this.queue.length && totalLength < this.threshold) {
const buf = this.queue[queueIdx].buffer;
const msg = this.queue[queueIdx].message;
const msgLength = msg.getTotalLength();
// if the next buffer exceeds the threshold,
// try to take multiple queued buffers which fit this.coalesceBuf
if (queueIdx > 0 && totalLength + buf.length > this.threshold) {
if (queueIdx > 0 && totalLength + msgLength > this.threshold) {
break;
}
totalLength += buf.length;
totalLength += msgLength;
queueIdx++;
}

Expand All @@ -125,12 +128,16 @@ export class PipelinedWriter extends Writer {
this.queue = this.queue.slice(queueIdx);

let buf;
if (writeBatch.length === 1) {
// take the only buffer
buf = writeBatch[0].buffer;
if (writeBatch.length === 1 && totalLength > this.threshold) {
// take the only large message
buf = writeBatch[0].message.toBuffer();
} else {
// coalesce buffers
copyBuffers(this.coalesceBuf, writeBatch, totalLength);
let pos = 0;
for (const item of writeBatch) {
item.message.writeTo(this.coalesceBuf, pos);
pos += item.message.getTotalLength();
}
buf = this.coalesceBuf.slice(0, totalLength);
}

Expand Down Expand Up @@ -177,8 +184,9 @@ export class DirectWriter extends Writer {
this.socket = socket;
}

write(buffer: Buffer, resolver: DeferredPromise<void>): void {
this.socket.write(buffer as any, (err: any) => {
write(message: ClientMessage, resolver: DeferredPromise<void>): void {
const buffer = message.toBuffer();
this.socket.write(buffer, (err: any) => {
if (err) {
resolver.reject(new IOError(err));
return;
Expand Down Expand Up @@ -385,9 +393,9 @@ export class ClientConnection {
this.remoteUuid = remoteUuid;
}

write(buffer: Buffer): Promise<void> {
write(message: ClientMessage): Promise<void> {
const deferred = deferredPromise<void>();
this.writer.write(buffer, deferred);
this.writer.write(message, deferred);
return deferred.promise;
}

Expand Down
38 changes: 20 additions & 18 deletions src/protocol/ClientMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ export class ClientMessage {
this._nextFrame = frame;
return;
}

this.endFrame.next = frame;
this.endFrame = frame;
}
Expand Down Expand Up @@ -230,14 +229,14 @@ export class ClientMessage {
this.connection = connection;
}

getTotalFrameLength(): number {
let frameLength = 0;
getTotalLength(): number {
let totalLength = 0;
let currentFrame = this.startFrame;
while (currentFrame != null) {
frameLength += currentFrame.getLength();
totalLength += currentFrame.getLength();
currentFrame = currentFrame.next;
}
return frameLength;
return totalLength;
}

getFragmentationId(): number {
Expand All @@ -264,27 +263,30 @@ export class ClientMessage {
return newMessage;
}

toBuffer(): Buffer {
const buffers: Buffer[] = [];
let totalLength = 0;
writeTo(buffer: Buffer, offset = 0): number {
let pos = offset;
let currentFrame = this.startFrame;
while (currentFrame != null) {
const isLastFrame = currentFrame.next == null;
const frameLengthAndFlags = Buffer.allocUnsafe(SIZE_OF_FRAME_LENGTH_AND_FLAGS);
frameLengthAndFlags.writeInt32LE(currentFrame.content.length + SIZE_OF_FRAME_LENGTH_AND_FLAGS, 0);

buffer.writeInt32LE(currentFrame.content.length + SIZE_OF_FRAME_LENGTH_AND_FLAGS, pos);
if (isLastFrame) {
frameLengthAndFlags.writeUInt16LE(currentFrame.flags | IS_FINAL_FLAG, BitsUtil.INT_SIZE_IN_BYTES);
buffer.writeUInt16LE(currentFrame.flags | IS_FINAL_FLAG, pos + BitsUtil.INT_SIZE_IN_BYTES);
} else {
frameLengthAndFlags.writeUInt16LE(currentFrame.flags, BitsUtil.INT_SIZE_IN_BYTES);
buffer.writeUInt16LE(currentFrame.flags, pos + BitsUtil.INT_SIZE_IN_BYTES);
}
totalLength += SIZE_OF_FRAME_LENGTH_AND_FLAGS;
buffers.push(frameLengthAndFlags);
totalLength += currentFrame.content.length;
buffers.push(currentFrame.content);
pos += SIZE_OF_FRAME_LENGTH_AND_FLAGS;
currentFrame.content.copy(buffer, pos);
pos += currentFrame.content.length;
currentFrame = currentFrame.next;
}
return Buffer.concat(buffers, totalLength);
return pos;
}

toBuffer(): Buffer {
const totalLength = this.getTotalLength();
const buffer = Buffer.allocUnsafe(totalLength);
this.writeTo(buffer);
return buffer;
}
}

Expand Down
23 changes: 0 additions & 23 deletions src/util/Util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,26 +336,3 @@ export function timedPromise<T>(wrapped: Promise<T>, timeout: number, err?: Erro

return deferred.promise;
}

type Binary = {
buffer: Buffer;
}

/**
* Copy contents of the given array of objects with buffers into the target buffer.
*
* @param target target buffer
* @param sources source objects that contain buffers
* @param totalLength total length of all source buffers
* @internal
*/
export function copyBuffers(target: Buffer, sources: Binary[], totalLength: number): void {
if (target.length < totalLength) {
throw new RangeError('Target length ' + target.length + ' is less than requested ' + totalLength);
}
let pos = 0;
for (const source of sources) {
source.buffer.copy(target, pos);
pos += source.buffer.length;
}
}
Loading

0 comments on commit 0338310

Please sign in to comment.