From 8565f293391082e612353aac6c812b19e3063c28 Mon Sep 17 00:00:00 2001 From: GPad Date: Fri, 1 Jul 2022 18:13:32 +0200 Subject: [PATCH 1/2] Decode multiple response on same buffer --- src/requests/abstract_request.ts | 6 ++- src/response_decoder.ts | 73 +++++++++++++++++++----------- src/responses/raw_response.ts | 3 +- test/unit/response_decoder.test.ts | 67 +++++++++++++++++++++++++++ 4 files changed, 120 insertions(+), 29 deletions(-) create mode 100644 test/unit/response_decoder.test.ts diff --git a/src/requests/abstract_request.ts b/src/requests/abstract_request.ts index bb4f5549..1314e99f 100644 --- a/src/requests/abstract_request.ts +++ b/src/requests/abstract_request.ts @@ -1,7 +1,7 @@ import { Request } from "./request" import { DataWriter } from "./sasl_authenticate_request" -class BufferDataWriter implements DataWriter { +export class BufferDataWriter implements DataWriter { private _offset = 0 constructor(private buffer: Buffer, startFrom: number) { @@ -33,6 +33,10 @@ class BufferDataWriter implements DataWriter { this._offset = this.buffer.writeUInt32BE(data, this._offset) } + writeInt32(data: number): void { + this._offset = this.buffer.writeUInt32BE(data, this._offset) + } + writeString(data: string): void { this._offset = this.buffer.writeInt16BE(data.length, this._offset) this.writeData(data) diff --git a/src/response_decoder.ts b/src/response_decoder.ts index d0c7ac99..96ef34d4 100644 --- a/src/response_decoder.ts +++ b/src/response_decoder.ts @@ -16,20 +16,36 @@ import { SaslHandshakeResponse } from "./responses/sasl_handshake_response" function decode(data: DataReader): RawResponse { const size = data.readUInt32() - const key = data.readUInt16() - const version = data.readUInt16() - const correlationId = data.readUInt32() - const responseCode = data.readUInt16() - const payload = data.slice() + const dataResponse = data.readTo(size) + const key = dataResponse.readUInt16() + const version = dataResponse.readUInt16() + const correlationId = dataResponse.readUInt32() + const responseCode = dataResponse.readUInt16() + const payload = dataResponse.readToEnd() return { size, key, version, correlationId, code: responseCode, payload } } class BufferDataReader implements DataReader { private offset = 0 + constructor(private data: Buffer) {} - slice(): DataReader { - return new BufferDataReader(this.data.slice(this.offset)) + + readTo(size: number): DataReader { + const ret = new BufferDataReader(this.data.slice(this.offset, this.offset + size)) + this.offset += size + return ret + } + + readToEnd(): DataReader { + const ret = new BufferDataReader(this.data.slice(this.offset)) + this.offset = this.data.length + return ret } + + atEnd(): boolean { + return this.offset === this.data.length + } + readUInt16(): number { const ret = this.data.readUInt16BE(this.offset) this.offset += 2 @@ -60,26 +76,29 @@ export class ResponseDecoder { constructor(private listener: DecoderListener) {} add(data: Buffer) { - const response = decode(new BufferDataReader(data)) - switch (response.key) { - case PeerPropertiesResponse.key: - this.listener.responseReceived(new PeerPropertiesResponse(response)) - break - - case SaslHandshakeResponse.key: - this.listener.responseReceived(new SaslHandshakeResponse(response)) - break - - case SaslAuthenticateResponse.key: - this.listener.responseReceived(new SaslAuthenticateResponse(response)) - break - - case OpenResponse.key: - this.listener.responseReceived(new OpenResponse(response)) - break - - default: - throw new Error(`Unknown response ${response.key.toString(16)}`) + const dataReader = new BufferDataReader(data) + while (!dataReader.atEnd()) { + const response = decode(dataReader) + switch (response.key) { + case PeerPropertiesResponse.key: + this.listener.responseReceived(new PeerPropertiesResponse(response)) + break + + case SaslHandshakeResponse.key: + this.listener.responseReceived(new SaslHandshakeResponse(response)) + break + + case SaslAuthenticateResponse.key: + this.listener.responseReceived(new SaslAuthenticateResponse(response)) + break + + case OpenResponse.key: + this.listener.responseReceived(new OpenResponse(response)) + break + + default: + throw new Error(`Unknown response ${response.key.toString(16)}`) + } } } } diff --git a/src/responses/raw_response.ts b/src/responses/raw_response.ts index 2eb0c964..6111c6fe 100644 --- a/src/responses/raw_response.ts +++ b/src/responses/raw_response.ts @@ -1,5 +1,6 @@ export interface DataReader { - slice(): DataReader + readTo(size: number): DataReader + readToEnd(): DataReader readUInt16(): number readUInt32(): number diff --git a/test/unit/response_decoder.test.ts b/test/unit/response_decoder.test.ts new file mode 100644 index 00000000..017bb40e --- /dev/null +++ b/test/unit/response_decoder.test.ts @@ -0,0 +1,67 @@ +import { DecoderListener } from "../../src/decoder_listener" +import { Response } from "../../src/responses/response" +import { ResponseDecoder } from "../../src/response_decoder" +import { PeerPropertiesResponse } from "../../src/responses/peer_properties_response" +import { expect } from "chai" +import { BufferDataWriter } from "../../src/requests/abstract_request" + +class MockDecoderListener implements DecoderListener { + readonly responses: Response[] = [] + + reset() { + this.responses.splice(0) + } + + responseReceived(data: Response) { + this.responses.push(data) + } +} + +describe("ResponseDecoder", () => { + let decoder: ResponseDecoder + const listener = new MockDecoderListener() + + beforeEach(() => { + listener.reset() + decoder = new ResponseDecoder(listener) + }) + + it("decode a buffer that contains a single response", () => { + const data = createResponse({ key: PeerPropertiesResponse.key }) + + decoder.add(data) + + expect(listener.responses).lengthOf(1) + }) + + it("decode a buffer that contains multiple responses", () => { + const data = [ + createResponse({ key: PeerPropertiesResponse.key }), + createResponse({ key: PeerPropertiesResponse.key }), + ] + + decoder.add(Buffer.concat(data)) + + expect(listener.responses).lengthOf(2) + }) +}) + +function createResponse(params: { key: number; correlationId?: number; responseCode?: number }): Buffer { + const dataWriter = new BufferDataWriter(Buffer.alloc(1024), 4) + dataWriter.writeUInt16(params.key) + dataWriter.writeUInt16(1) + dataWriter.writeUInt32(params.correlationId || 101) + dataWriter.writeUInt16(params.responseCode || 1) + + switch (params.key) { + case PeerPropertiesResponse.key: + dataWriter.writeInt32(0) + break + + default: + break + } + + dataWriter.writePrefixSize() + return dataWriter.toBuffer() +} From 9d89365e6e4d55b054d99309cc726f0f75bec3af Mon Sep 17 00:00:00 2001 From: GPad Date: Fri, 1 Jul 2022 19:01:31 +0200 Subject: [PATCH 2/2] Check and manage tune response --- src/connection.ts | 20 ++++++++++------ src/response_decoder.ts | 22 ++++++++++++----- src/responses/abstract_response.ts | 1 + src/responses/raw_response.ts | 8 +++++++ src/responses/tune_response.ts | 38 ++++++++++++++++++++++++++++++ 5 files changed, 76 insertions(+), 13 deletions(-) create mode 100644 src/responses/tune_response.ts diff --git a/src/connection.ts b/src/connection.ts index dfd71a08..085a4859 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -13,6 +13,7 @@ import { Response } from "./responses/response" import { ResponseDecoder } from "./response_decoder" import { createConsoleLog, removeFrom } from "./util" import { WaitingResponse } from "./waiting_response" +import { TuneResponse } from "./responses/tune_response" export class Connection { private readonly socket = new Socket() @@ -65,20 +66,25 @@ export class Connection { }) } private received(data: Buffer) { - this.logger.debug(`Receiving data ... ${inspect(data)}`) + this.logger.debug(`Receiving ${data.length} (${data.readUInt32BE()}) bytes ... ${inspect(data)}`) this.decoder.add(data) } - tune() { - // throw new Error("Method not implemented.") - // this.waitResponse - return Promise.resolve() + async tune(): Promise { + const data = await this.waitResponse({ correlationId: -1, key: TuneResponse.key }) + this.logger.debug(`TUNE response -> ${inspect(data)}`) + + return new Promise((res, rej) => { + this.socket.write(data.toBuffer(), (err) => { + this.logger.debug(`Write COMPLETED for cmd TUNE: ${inspect(data)} - err: ${err}`) + return err ? rej(err) : res() + }) + }) } async exchangeProperties(): Promise { this.logger.debug(`Exchange peer properties ...`) - const req = new PeerPropertiesRequest() - const res = await this.SendAndWait(req) + const res = await this.SendAndWait(new PeerPropertiesRequest()) if (!res.ok) { throw new Error(`Unable to exchange peer properties ${res.code} `) } diff --git a/src/response_decoder.ts b/src/response_decoder.ts index 96ef34d4..2b387aa4 100644 --- a/src/response_decoder.ts +++ b/src/response_decoder.ts @@ -1,9 +1,10 @@ import { DecoderListener } from "./decoder_listener" import { OpenResponse } from "./responses/open_response" import { PeerPropertiesResponse } from "./responses/peer_properties_response" -import { DataReader, RawResponse } from "./responses/raw_response" +import { DataReader, RawResponse, RawTuneResponse } from "./responses/raw_response" import { SaslAuthenticateResponse } from "./responses/sasl_authenticate_response" import { SaslHandshakeResponse } from "./responses/sasl_handshake_response" +import { TuneResponse } from "./responses/tune_response" // Frame => Size (Request | Response | Command) // Size => uint32 (size without the 4 bytes of the size element) @@ -14,11 +15,16 @@ import { SaslHandshakeResponse } from "./responses/sasl_handshake_response" // CorrelationId => uint32 // ResponseCode => uint16 -function decode(data: DataReader): RawResponse { +function decode(data: DataReader): RawResponse | RawTuneResponse { const size = data.readUInt32() const dataResponse = data.readTo(size) const key = dataResponse.readUInt16() const version = dataResponse.readUInt16() + if (key === TuneResponse.key) { + const frameMax = dataResponse.readUInt32() + const heartbeat = dataResponse.readUInt32() + return { size, key, version, frameMax, heartbeat } as RawTuneResponse + } const correlationId = dataResponse.readUInt32() const responseCode = dataResponse.readUInt16() const payload = dataResponse.readToEnd() @@ -81,19 +87,23 @@ export class ResponseDecoder { const response = decode(dataReader) switch (response.key) { case PeerPropertiesResponse.key: - this.listener.responseReceived(new PeerPropertiesResponse(response)) + this.listener.responseReceived(new PeerPropertiesResponse(response as RawResponse)) break case SaslHandshakeResponse.key: - this.listener.responseReceived(new SaslHandshakeResponse(response)) + this.listener.responseReceived(new SaslHandshakeResponse(response as RawResponse)) break case SaslAuthenticateResponse.key: - this.listener.responseReceived(new SaslAuthenticateResponse(response)) + this.listener.responseReceived(new SaslAuthenticateResponse(response as RawResponse)) break case OpenResponse.key: - this.listener.responseReceived(new OpenResponse(response)) + this.listener.responseReceived(new OpenResponse(response as RawResponse)) + break + + case TuneResponse.key: + this.listener.responseReceived(new TuneResponse(response as RawTuneResponse)) break default: diff --git a/src/responses/abstract_response.ts b/src/responses/abstract_response.ts index 3d8733b7..c28fd2b1 100644 --- a/src/responses/abstract_response.ts +++ b/src/responses/abstract_response.ts @@ -27,6 +27,7 @@ export abstract class AbstractResponse implements Response { get code(): number { return this.response.code } + get ok(): boolean { return this.code === 0x01 } diff --git a/src/responses/raw_response.ts b/src/responses/raw_response.ts index 6111c6fe..30e69032 100644 --- a/src/responses/raw_response.ts +++ b/src/responses/raw_response.ts @@ -16,3 +16,11 @@ export interface RawResponse { code: number payload: DataReader } + +export interface RawTuneResponse { + size: number + key: 0x0014 + version: number + frameMax: number + heartbeat: number +} diff --git a/src/responses/tune_response.ts b/src/responses/tune_response.ts new file mode 100644 index 00000000..0c046ace --- /dev/null +++ b/src/responses/tune_response.ts @@ -0,0 +1,38 @@ +import { BufferDataWriter } from "../requests/abstract_request" +import { RawTuneResponse } from "./raw_response" +import { Response } from "./response" + +export class TuneResponse implements Response { + static key = 0x0014 // I know it isn't 8014 + constructor(private response: RawTuneResponse) { + if (this.response.key !== TuneResponse.key) { + throw new Error(`Unable to create ${TuneResponse.name} from data of type ${this.response.key}`) + } + } + + toBuffer(): Buffer { + const dw = new BufferDataWriter(Buffer.alloc(1024), 4) + dw.writeUInt16(TuneResponse.key) + dw.writeUInt16(1) + dw.writeUInt32(this.response.frameMax) + dw.writeUInt32(this.response.heartbeat) + dw.writePrefixSize() + return dw.toBuffer() + } + + get key() { + return this.response.key + } + + get correlationId(): number { + return -1 + } + + get code(): number { + return -1 + } + + get ok(): boolean { + return true + } +}