diff --git a/src/invocation/InvocationService.ts b/src/invocation/InvocationService.ts index e21316ce9..9fae4d8bf 100644 --- a/src/invocation/InvocationService.ts +++ b/src/invocation/InvocationService.ts @@ -515,7 +515,7 @@ export class InvocationService { } private write(invocation: Invocation, connection: ClientConnection): Promise { - return connection.write(invocation.request.toBuffer()); + return connection.write(invocation.request); } private notifyError(invocation: Invocation, error: Error): void { diff --git a/src/network/ClientConnection.ts b/src/network/ClientConnection.ts index b1669b7ea..1a16ffe7a 100644 --- a/src/network/ClientConnection.ts +++ b/src/network/ClientConnection.ts @@ -22,7 +22,7 @@ import {BuildInfo} from '../BuildInfo'; import {HazelcastClient} from '../HazelcastClient'; import {AddressImpl, IOError, UUID} from '../core'; import {ClientMessageHandler} from '../protocol/ClientMessage'; -import {copyBuffers, deferredPromise, DeferredPromise} from '../util/Util'; +import {deferredPromise, DeferredPromise} from '../util/Util'; import {ILogger} from '../logging/ILogger'; import { ClientMessage, @@ -37,16 +37,18 @@ const PROPERTY_NO_DELAY = 'hazelcast.client.socket.no.delay'; abstract class Writer extends EventEmitter { - abstract write(buffer: Buffer, resolver: DeferredPromise): void; + abstract write(message: ClientMessage, resolver: DeferredPromise): void; abstract close(): void; + } interface OutputQueueItem { - buffer: Buffer; + message: ClientMessage; resolver: DeferredPromise; + } /** @internal */ @@ -75,12 +77,12 @@ export class PipelinedWriter extends Writer { }); } - write(buffer: Buffer, resolver: DeferredPromise): void { + write(message: ClientMessage, resolver: DeferredPromise): void { if (this.error) { // if there was a write error, it's useless to keep writing to the socket return process.nextTick(() => resolver.reject(this.error)); } - this.queue.push({ buffer, resolver }); + this.queue.push({ message, resolver }); this.schedule(); } @@ -106,13 +108,14 @@ export class PipelinedWriter extends Writer { let totalLength = 0; let queueIdx = 0; while (queueIdx < this.queue.length && totalLength < this.threshold) { - const buf = this.queue[queueIdx].buffer; + const msg = this.queue[queueIdx].message; + const msgLength = msg.getTotalLength(); // if the next buffer exceeds the threshold, // try to take multiple queued buffers which fit this.coalesceBuf - if (queueIdx > 0 && totalLength + buf.length > this.threshold) { + if (queueIdx > 0 && totalLength + msgLength > this.threshold) { break; } - totalLength += buf.length; + totalLength += msgLength; queueIdx++; } @@ -125,12 +128,15 @@ export class PipelinedWriter extends Writer { this.queue = this.queue.slice(queueIdx); let buf; - if (writeBatch.length === 1) { - // take the only buffer - buf = writeBatch[0].buffer; + if (writeBatch.length === 1 && totalLength > this.threshold) { + // take the only large message + buf = writeBatch[0].message.toBuffer(); } else { // coalesce buffers - copyBuffers(this.coalesceBuf, writeBatch, totalLength); + let pos = 0; + for (const item of writeBatch) { + pos = item.message.writeTo(this.coalesceBuf, pos); + } buf = this.coalesceBuf.slice(0, totalLength); } @@ -177,8 +183,9 @@ export class DirectWriter extends Writer { this.socket = socket; } - write(buffer: Buffer, resolver: DeferredPromise): void { - this.socket.write(buffer as any, (err: any) => { + write(message: ClientMessage, resolver: DeferredPromise): void { + const buffer = message.toBuffer(); + this.socket.write(buffer, (err: any) => { if (err) { resolver.reject(new IOError(err)); return; @@ -385,9 +392,9 @@ export class ClientConnection { this.remoteUuid = remoteUuid; } - write(buffer: Buffer): Promise { + write(message: ClientMessage): Promise { const deferred = deferredPromise(); - this.writer.write(buffer, deferred); + this.writer.write(message, deferred); return deferred.promise; } diff --git a/src/protocol/ClientMessage.ts b/src/protocol/ClientMessage.ts index a627f8ab1..d7f25d9eb 100644 --- a/src/protocol/ClientMessage.ts +++ b/src/protocol/ClientMessage.ts @@ -138,6 +138,8 @@ export class ClientMessage { private retryable: boolean; private connection: ClientConnection; private _nextFrame: Frame; + // cached total length for encode case + private cachedTotalLength: number; private constructor(startFrame?: Frame, endFrame?: Frame) { this.startFrame = startFrame; @@ -174,6 +176,7 @@ export class ClientMessage { } addFrame(frame: Frame): void { + this.cachedTotalLength = undefined; frame.next = null; if (this.startFrame == null) { this.startFrame = frame; @@ -181,7 +184,6 @@ export class ClientMessage { this._nextFrame = frame; return; } - this.endFrame.next = frame; this.endFrame = frame; } @@ -230,14 +232,18 @@ export class ClientMessage { this.connection = connection; } - getTotalFrameLength(): number { - let frameLength = 0; + getTotalLength(): number { + if (this.cachedTotalLength !== undefined) { + return this.cachedTotalLength; + } + let totalLength = 0; let currentFrame = this.startFrame; while (currentFrame != null) { - frameLength += currentFrame.getLength(); + totalLength += currentFrame.getLength(); currentFrame = currentFrame.next; } - return frameLength; + this.cachedTotalLength = totalLength; + return totalLength; } getFragmentationId(): number { @@ -248,11 +254,13 @@ export class ClientMessage { // Should be called after calling dropFragmentationFrame() on the fragment this.endFrame.next = fragment.startFrame; this.endFrame = fragment.endFrame; + this.cachedTotalLength = undefined; } dropFragmentationFrame(): void { this.startFrame = this.startFrame.next; this._nextFrame = this._nextFrame.next; + this.cachedTotalLength = undefined; } copyWithNewCorrelationId(): ClientMessage { @@ -264,27 +272,30 @@ export class ClientMessage { return newMessage; } - toBuffer(): Buffer { - const buffers: Buffer[] = []; - let totalLength = 0; + writeTo(buffer: Buffer, offset = 0): number { + let pos = offset; let currentFrame = this.startFrame; while (currentFrame != null) { const isLastFrame = currentFrame.next == null; - const frameLengthAndFlags = Buffer.allocUnsafe(SIZE_OF_FRAME_LENGTH_AND_FLAGS); - frameLengthAndFlags.writeInt32LE(currentFrame.content.length + SIZE_OF_FRAME_LENGTH_AND_FLAGS, 0); - + buffer.writeInt32LE(currentFrame.content.length + SIZE_OF_FRAME_LENGTH_AND_FLAGS, pos); if (isLastFrame) { - frameLengthAndFlags.writeUInt16LE(currentFrame.flags | IS_FINAL_FLAG, BitsUtil.INT_SIZE_IN_BYTES); + buffer.writeUInt16LE(currentFrame.flags | IS_FINAL_FLAG, pos + BitsUtil.INT_SIZE_IN_BYTES); } else { - frameLengthAndFlags.writeUInt16LE(currentFrame.flags, BitsUtil.INT_SIZE_IN_BYTES); + buffer.writeUInt16LE(currentFrame.flags, pos + BitsUtil.INT_SIZE_IN_BYTES); } - totalLength += SIZE_OF_FRAME_LENGTH_AND_FLAGS; - buffers.push(frameLengthAndFlags); - totalLength += currentFrame.content.length; - buffers.push(currentFrame.content); + pos += SIZE_OF_FRAME_LENGTH_AND_FLAGS; + currentFrame.content.copy(buffer, pos); + pos += currentFrame.content.length; currentFrame = currentFrame.next; } - return Buffer.concat(buffers, totalLength); + return pos; + } + + toBuffer(): Buffer { + const totalLength = this.getTotalLength(); + const buffer = Buffer.allocUnsafe(totalLength); + this.writeTo(buffer); + return buffer; } } diff --git a/src/util/Util.ts b/src/util/Util.ts index 8ebdf0627..39c74c2d7 100644 --- a/src/util/Util.ts +++ b/src/util/Util.ts @@ -336,26 +336,3 @@ export function timedPromise(wrapped: Promise, timeout: number, err?: Erro return deferred.promise; } - -type Binary = { - buffer: Buffer; -} - -/** - * Copy contents of the given array of objects with buffers into the target buffer. - * - * @param target target buffer - * @param sources source objects that contain buffers - * @param totalLength total length of all source buffers - * @internal - */ -export function copyBuffers(target: Buffer, sources: Binary[], totalLength: number): void { - if (target.length < totalLength) { - throw new RangeError('Target length ' + target.length + ' is less than requested ' + totalLength); - } - let pos = 0; - for (const source of sources) { - source.buffer.copy(target, pos); - pos += source.buffer.length; - } -} diff --git a/test/connection/DirectWriterTest.js b/test/connection/DirectWriterTest.js index 2c5a95da1..a98408f70 100644 --- a/test/connection/DirectWriterTest.js +++ b/test/connection/DirectWriterTest.js @@ -16,18 +16,28 @@ 'use strict'; -const Socket = require('net').Socket; +const { Socket } = require('net'); const sinon = require('sinon'); -const expect = require('chai').expect; +const { expect } = require('chai'); const { deferredPromise } = require('../../lib/util/Util'); const { DirectWriter } = require('../../lib/network/ClientConnection'); +const { + ClientMessage, + Frame +} = require('../../lib/protocol/ClientMessage'); describe('DirectWriterTest', function () { let queue; let mockSocket; + function createMessage(content) { + const clientMessage = ClientMessage.createForEncode(); + clientMessage.addFrame(new Frame(Buffer.from(content, 'utf8'))); + return clientMessage; + } + const setUpWriteSuccess = () => { mockSocket = new Socket({}); sinon.stub(mockSocket, 'write').callsFake((data, cb) => { @@ -45,39 +55,40 @@ describe('DirectWriterTest', function () { queue = new DirectWriter(mockSocket); } - it('writes single message into socket (without copying it)', (done) => { + it('writes single message into socket', (done) => { setUpWriteSuccess(); - const buffer = Buffer.from('test'); + const msg = createMessage('test'); mockSocket.on('data', function(data) { - expect(data).to.be.equal(buffer); + expect(Buffer.compare(data, msg.toBuffer())).to.be.equal(0); done(); }); - queue.write(buffer, deferredPromise()); + queue.write(msg, deferredPromise()); }); it('writes multiple messages separately into socket', (done) => { setUpWriteSuccess(); + const msg = createMessage('test'); let cnt = 0; mockSocket.on('data', function(data) { - expect(data).to.be.deep.equal(Buffer.from('test')); + expect(Buffer.compare(data, msg.toBuffer())).to.be.equal(0); if (++cnt === 3) { done(); } }); - queue.write(Buffer.from('test'), deferredPromise()); - queue.write(Buffer.from('test'), deferredPromise()); - queue.write(Buffer.from('test'), deferredPromise()); + queue.write(msg, deferredPromise()); + queue.write(msg, deferredPromise()); + queue.write(msg, deferredPromise()); }); it('resolves promise on write success', (done) => { setUpWriteSuccess(); const resolver = deferredPromise(); - queue.write(Buffer.from('test'), resolver); + queue.write(createMessage('test'), resolver); resolver.promise.then(done); }); @@ -86,7 +97,7 @@ describe('DirectWriterTest', function () { setUpWriteFailure(err); const resolver = deferredPromise(); - queue.write(Buffer.from('test'), resolver); + queue.write(createMessage('test'), resolver); resolver.promise.catch((err) => { expect(err).to.be.equal(err); done(); @@ -97,7 +108,7 @@ describe('DirectWriterTest', function () { setUpWriteSuccess(); queue.on('write', done); - queue.write(Buffer.from('test'), deferredPromise()); + queue.write(createMessage('test'), deferredPromise()); }); it('does not emit write event on write failure', (done) => { @@ -105,7 +116,7 @@ describe('DirectWriterTest', function () { queue.on('write', () => done(new Error())); const resolver = deferredPromise(); - queue.write(Buffer.from('test'), resolver); + queue.write(createMessage('test'), resolver); resolver.promise.catch(_ => { done(); }); diff --git a/test/connection/PipelinedWriterTest.js b/test/connection/PipelinedWriterTest.js index f4f6448c0..202d78e99 100644 --- a/test/connection/PipelinedWriterTest.js +++ b/test/connection/PipelinedWriterTest.js @@ -21,6 +21,10 @@ const { expect } = require('chai'); const { deferredPromise } = require('../../lib/util/Util'); const { PipelinedWriter } = require('../../lib/network/ClientConnection'); +const { + ClientMessage, + Frame +} = require('../../lib/protocol/ClientMessage'); describe('PipelinedWriterTest', function () { @@ -47,24 +51,36 @@ describe('PipelinedWriterTest', function () { writer = new PipelinedWriter(mockSocket, THRESHOLD); } - it('writes single small message into socket (without copying it)', (done) => { + function createMessageFromString(content) { + const clientMessage = ClientMessage.createForEncode(); + clientMessage.addFrame(new Frame(Buffer.from(content, 'utf8'))); + return clientMessage; + } + + function createMessageFromBuffer(buffer) { + const clientMessage = ClientMessage.createForEncode(); + clientMessage.addFrame(new Frame(buffer)); + return clientMessage; + } + + it('writes single small message into socket', (done) => { setUpWriteSuccess(true); - const buffer = Buffer.from('test'); - writer.write(buffer, deferredPromise()); + const msg = createMessageFromString('test'); + writer.write(msg, deferredPromise()); mockSocket.on('data', (data) => { - expect(data).to.be.equal(buffer); + expect(Buffer.compare(data, msg.toBuffer())).to.be.equal(0); done(); }); }); - it('writes single large message into socket (without copying it)', (done) => { + it('writes single large message into socket', (done) => { setUpWriteSuccess(true); - const buffer = Buffer.allocUnsafe(THRESHOLD * 2); - writer.write(buffer, deferredPromise()); + const msg = createMessageFromBuffer(Buffer.allocUnsafe(THRESHOLD * 2)); + writer.write(msg, deferredPromise()); mockSocket.on('data', (data) => { - expect(data).to.be.equal(buffer); + expect(Buffer.compare(data, msg.toBuffer())).to.be.equal(0); done(); }); }); @@ -72,11 +88,16 @@ describe('PipelinedWriterTest', function () { it('writes multiple small messages as one into socket', (done) => { setUpWriteSuccess(true); - writer.write(Buffer.from('1'), deferredPromise()); - writer.write(Buffer.from('2'), deferredPromise()); - writer.write(Buffer.from('3'), deferredPromise()); + const msg1 = createMessageFromString('1'); + writer.write(msg1, deferredPromise()); + const msg2 = createMessageFromString('2'); + writer.write(msg2, deferredPromise()); + const msg3 = createMessageFromString('3'); + writer.write(msg3, deferredPromise()); + + const expected = Buffer.concat([msg1.toBuffer(), msg2.toBuffer(), msg3.toBuffer()]); mockSocket.on('data', (data) => { - expect(Buffer.compare(data, Buffer.from('123'))).to.be.equal(0); + expect(Buffer.compare(data, expected)).to.be.equal(0); done(); }); }); @@ -84,37 +105,39 @@ describe('PipelinedWriterTest', function () { it('coalesces buffers when writing into socket (1/2 of threshold)', (done) => { setUpWriteSuccess(true); - const size = THRESHOLD / 2; - const data1 = Buffer.alloc(size).fill('1'); + // frame has header part, so we need some padding + const size = (THRESHOLD / 2) - 50; + const msg1 = createMessageFromBuffer(Buffer.alloc(size).fill('1')); const resolver1 = deferredPromise(); - writer.write(data1, resolver1); - const data2 = Buffer.alloc(size).fill('2'); + writer.write(msg1, resolver1); + const msg2 = createMessageFromBuffer(Buffer.alloc(size).fill('2')); const resolver2 = deferredPromise(); - writer.write(data2, resolver2); - const data3 = Buffer.alloc(size).fill('3'); + writer.write(msg2, resolver2); + const msg3 = createMessageFromBuffer(Buffer.alloc(size).fill('3')); const resolver3 = deferredPromise(); - writer.write(data3, resolver3); + writer.write(msg3, resolver3); let cnt = 0; - let allData = Buffer.alloc(0); + let actualAllData = Buffer.alloc(0); mockSocket.on('data', (data) => { - allData = Buffer.concat([allData, data]); + actualAllData = Buffer.concat([actualAllData, data]); cnt++; if (cnt === 1) { - expect(Buffer.compare(data, Buffer.concat([data1, data2]))).to.be.equal(0); + expect(Buffer.compare(data, Buffer.concat([msg1.toBuffer(), msg2.toBuffer()]))).to.be.equal(0); } if (cnt === 2) { - expect(Buffer.compare(data, data3)).to.be.equal(0); + expect(Buffer.compare(data, msg3.toBuffer())).to.be.equal(0); } }); + const expectedAllData = Buffer.concat([msg1.toBuffer(), msg2.toBuffer(), msg3.toBuffer()]); Promise.all([ resolver1.promise, resolver2.promise, resolver3.promise ]).then(() => { expect(cnt).to.be.equal(2); - expect(Buffer.compare(allData, Buffer.concat([data1, data2, data3]))).to.be.equal(0); + expect(Buffer.compare(actualAllData, expectedAllData)).to.be.equal(0); done(); }); }); @@ -123,8 +146,8 @@ describe('PipelinedWriterTest', function () { setUpWriteSuccess(true); const size = THRESHOLD * 2; - writer.write(Buffer.alloc(size), deferredPromise()); - writer.write(Buffer.alloc(size), deferredPromise()); + writer.write(createMessageFromBuffer(Buffer.alloc(size)), deferredPromise()); + writer.write(createMessageFromBuffer(Buffer.alloc(size)), deferredPromise()); let cnt = 0; // the second write is queued with setImmediate, // thus, callback in this setImmediate must not see cnt === 0 or cnt === 2 @@ -144,7 +167,7 @@ describe('PipelinedWriterTest', function () { setUpWriteSuccess(true); const resolver = deferredPromise(); - writer.write(Buffer.from('test'), resolver); + writer.write(createMessageFromString('test'), resolver); resolver.promise.then(done); }); @@ -152,9 +175,9 @@ describe('PipelinedWriterTest', function () { setUpWriteSuccess(true); const resolver1 = deferredPromise(); - writer.write(Buffer.from('test'), resolver1); + writer.write(createMessageFromString('test'), resolver1); const resolver2 = deferredPromise(); - writer.write(Buffer.from('test'), resolver2); + writer.write(createMessageFromString('test'), resolver2); Promise.all([resolver1.promise, resolver2.promise]).then(() => done()); }); @@ -163,7 +186,7 @@ describe('PipelinedWriterTest', function () { setUpWriteFailure(err); const resolver = deferredPromise(); - writer.write(Buffer.from('test'), resolver); + writer.write(createMessageFromString('test'), resolver); resolver.promise.catch((err) => { expect(err).to.be.equal(err); done(); @@ -175,9 +198,9 @@ describe('PipelinedWriterTest', function () { setUpWriteFailure(err); const resolver1 = deferredPromise(); - writer.write(Buffer.from('test'), resolver1); + writer.write(createMessageFromString('test'), resolver1); const resolver2 = deferredPromise(); - writer.write(Buffer.from('test'), resolver2); + writer.write(createMessageFromString('test'), resolver2); resolver1.promise.catch((err) => { expect(err).to.be.equal(err); }); @@ -191,7 +214,7 @@ describe('PipelinedWriterTest', function () { setUpWriteSuccess(true); writer.on('write', done); - writer.write(Buffer.from('test'), deferredPromise()); + writer.write(createMessageFromString('test'), deferredPromise()); }); it('does not emit write event on write failure', (done) => { @@ -199,7 +222,7 @@ describe('PipelinedWriterTest', function () { writer.on('write', () => done(new Error())); const resolver = deferredPromise(); - writer.write(Buffer.from('test'), resolver); + writer.write(createMessageFromString('test'), resolver); resolver.promise.catch(() => { done(); }); @@ -208,12 +231,12 @@ describe('PipelinedWriterTest', function () { it('waits for drain event when necessary', (done) => { setUpWriteSuccess(false); - const buffer = Buffer.from('test'); - writer.write(buffer, deferredPromise()); + const msg = createMessageFromString('test'); + writer.write(msg, deferredPromise()); let writes = 0; mockSocket.on('data', () => { if (++writes === 1) { - writer.write(buffer, deferredPromise()); + writer.write(msg, deferredPromise()); setTimeout(done, 10); } else { done(new Error('Unexpected write before drain event')); @@ -224,15 +247,15 @@ describe('PipelinedWriterTest', function () { it('writes queued items on drain event', (done) => { setUpWriteSuccess(false); - const buffer = Buffer.from('test'); - writer.write(buffer, deferredPromise()); + const msg = createMessageFromString('test'); + writer.write(msg, deferredPromise()); let writes = 0; mockSocket.on('data', () => { if (++writes === 10) { return done(); } mockSocket.emit('drain'); - writer.write(buffer, deferredPromise()); + writer.write(msg, deferredPromise()); }); }); }); diff --git a/test/unit/UtilTest.js b/test/unit/UtilTest.js index 8fc92d686..5214217bc 100644 --- a/test/unit/UtilTest.js +++ b/test/unit/UtilTest.js @@ -17,38 +17,12 @@ const { expect } = require('chai'); const { - copyBuffers, deferredPromise, timedPromise } = require('../../lib/util/Util'); describe('UtilTest', function () { - it('copyBuffers: throws on invalid total length', function () { - expect(() => copyBuffers(Buffer.from([0x1]), [ { buffer: Buffer.from([0x2]) } ], 3)) - .to.throw(RangeError); - }); - - it('copyBuffers: writes single buffer of less length', function () { - const target = Buffer.from('abc'); - const sources = [ { buffer: Buffer.from('d') } ]; - copyBuffers(target, sources, 1); - - expect(Buffer.compare(target, Buffer.from('dbc'))).to.be.equal(0); - }); - - it('copyBuffers: writes multiple buffers of same total length', function () { - const target = Buffer.from('abc'); - const sources = [ - { buffer: Buffer.from('d') }, - { buffer: Buffer.from('e') }, - { buffer: Buffer.from('f') } - ]; - copyBuffers(target, sources, 3); - - expect(Buffer.compare(target, Buffer.from('def'))).to.be.equal(0); - }); - it('deferredPromise: resolves promise on resolve call', function (done) { const deferred = deferredPromise(); let resolveCalled = false; diff --git a/test/unit/protocol/ClientMessageTest.js b/test/unit/protocol/ClientMessageTest.js index beecf16e7..0e5b0bb6d 100644 --- a/test/unit/protocol/ClientMessageTest.js +++ b/test/unit/protocol/ClientMessageTest.js @@ -23,12 +23,28 @@ const { ClientMessage, Frame, BEGIN_FRAME, - END_FRAME + END_FRAME, + SIZE_OF_FRAME_LENGTH_AND_FLAGS } = require('../../../lib/protocol/ClientMessage'); const { CodecUtil } = require('../../../lib/codec/builtin/CodecUtil'); describe('ClientMessageTest', function () { - it('should be encoded and decoded', function () { + + const IS_FINAL_FLAG = 1 << 13; + const INT_SIZE_IN_BYTES = 4; + + function createFrameLengthAndFlagsBuffer(frame, isLastFrame) { + const frameLengthAndFlags = Buffer.allocUnsafe(SIZE_OF_FRAME_LENGTH_AND_FLAGS); + frameLengthAndFlags.writeInt32LE(frame.content.length + SIZE_OF_FRAME_LENGTH_AND_FLAGS, 0); + if (isLastFrame) { + frameLengthAndFlags.writeUInt16LE(frame.flags | IS_FINAL_FLAG, INT_SIZE_IN_BYTES); + } else { + frameLengthAndFlags.writeUInt16LE(frame.flags, INT_SIZE_IN_BYTES); + } + return frameLengthAndFlags; + } + + it('should restore message when encoded and decoded', function () { const cmEncode = ClientMessage.createForEncode(); cmEncode.addFrame(Frame.createInitialFrame(50)); @@ -42,10 +58,10 @@ describe('ClientMessageTest', function () { expect(cmEncode.getStartFrame().flags).to.equal(cmDecode.getStartFrame().flags); expect(cmEncode.getCorrelationId()).to.equal(cmDecode.getCorrelationId()); expect(cmEncode.getPartitionId()).to.equal(cmDecode.getPartitionId()); - expect(cmEncode.getTotalFrameLength()).to.equal(cmDecode.getTotalFrameLength()); + expect(cmEncode.getTotalLength()).to.equal(cmDecode.getTotalLength()); }); - it('should be copied with new correlation id and share the non-header frames', function () { + it('copyWithNewCorrelationId: should assign new correlation id and share the non-header frames', function () { const originalMessage = ClientMessage.createForEncode(); originalMessage.addFrame(Frame.createInitialFrame(50)); @@ -70,7 +86,7 @@ describe('ClientMessageTest', function () { expect(originalMessage.getMessageType()).to.equal(copyMessage.getMessageType()); expect(originalMessage.getStartFrame().flags).to.equal(copyMessage.getStartFrame().flags); expect(originalMessage.getPartitionId()).to.equal(copyMessage.getPartitionId()); - expect(originalMessage.getTotalFrameLength()).to.equal(copyMessage.getTotalFrameLength()); + expect(originalMessage.getTotalLength()).to.equal(copyMessage.getTotalLength()); expect(copyMessage.getCorrelationId()).to.equal(-1); }); @@ -92,4 +108,61 @@ describe('ClientMessageTest', function () { expect(clientMessage.hasNextFrame()).to.be.false; }); + + it('getTotalLength: should calculate total length correctly', function () { + const clientMessage = ClientMessage.createForEncode(); + expect(clientMessage.getTotalLength()).to.be.equal(0); + + clientMessage.addFrame(Frame.createInitialFrame(42)); + expect(clientMessage.getTotalLength()).to.be.equal(SIZE_OF_FRAME_LENGTH_AND_FLAGS + 42); + + clientMessage.addFrame(Frame.createInitialFrame(1)); + expect(clientMessage.getTotalLength()).to.be.equal(2 * SIZE_OF_FRAME_LENGTH_AND_FLAGS + 43); + }); + + it('writeTo: should write to given buffer of sufficient length', function () { + const clientMessage = ClientMessage.createForEncode(); + + const frame1 = Frame.createInitialFrame(16); + clientMessage.addFrame(frame1); + clientMessage.setMessageType(1); + clientMessage.setCorrelationId(Long.fromString('123')); + clientMessage.setPartitionId(11223344); + + const frame2 = new Frame(Buffer.from('foo', 'utf8')); + clientMessage.addFrame(frame2); + + const buffer = Buffer.allocUnsafe(42 + clientMessage.getTotalLength()); + const newPos = clientMessage.writeTo(buffer, 42); + + expect(newPos).to.be.equal(42 + clientMessage.getTotalLength()); + + const expected = Buffer.concat([ + createFrameLengthAndFlagsBuffer(frame1, false), + frame1.content, + createFrameLengthAndFlagsBuffer(frame2, true), + frame2.content + ]); + const actual = buffer.slice(42, 42 + clientMessage.getTotalLength()); + + expect(Buffer.compare(actual, expected)).to.be.equal(0); + }); + + it('toBuffer: should return buffer with message contents', function () { + const clientMessage = ClientMessage.createForEncode(); + + const frame = Frame.createInitialFrame(16); + clientMessage.addFrame(frame); + clientMessage.setMessageType(1); + clientMessage.setCorrelationId(Long.fromString('123')); + clientMessage.setPartitionId(11223344); + + const actual = clientMessage.toBuffer(); + const expected = Buffer.concat([ + createFrameLengthAndFlagsBuffer(frame, true), + frame.content + ]); + + expect(Buffer.compare(actual, expected)).to.be.equal(0); + }); });