Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { Response } from "./responses/response"
import { ResponseDecoder } from "./response_decoder"
import { createConsoleLog, removeFrom } from "./util"
import { WaitingResponse } from "./waiting_response"
import { TuneResponse } from "./responses/tune_response"

export class Connection {
private readonly socket = new Socket()
Expand Down Expand Up @@ -65,20 +66,25 @@ export class Connection {
})
}
private received(data: Buffer) {
this.logger.debug(`Receiving data ... ${inspect(data)}`)
this.logger.debug(`Receiving ${data.length} (${data.readUInt32BE()}) bytes ... ${inspect(data)}`)
this.decoder.add(data)
}

tune() {
// throw new Error("Method not implemented.")
// this.waitResponse<TuneResponse>
return Promise.resolve()
async tune(): Promise<void> {
const data = await this.waitResponse<TuneResponse>({ correlationId: -1, key: TuneResponse.key })
this.logger.debug(`TUNE response -> ${inspect(data)}`)

return new Promise((res, rej) => {
this.socket.write(data.toBuffer(), (err) => {
this.logger.debug(`Write COMPLETED for cmd TUNE: ${inspect(data)} - err: ${err}`)
return err ? rej(err) : res()
})
})
}

async exchangeProperties(): Promise<PeerPropertiesResponse> {
this.logger.debug(`Exchange peer properties ...`)
const req = new PeerPropertiesRequest()
const res = await this.SendAndWait<PeerPropertiesResponse>(req)
const res = await this.SendAndWait<PeerPropertiesResponse>(new PeerPropertiesRequest())
if (!res.ok) {
throw new Error(`Unable to exchange peer properties ${res.code} `)
}
Expand Down
6 changes: 5 additions & 1 deletion src/requests/abstract_request.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Request } from "./request"
import { DataWriter } from "./sasl_authenticate_request"

class BufferDataWriter implements DataWriter {
export class BufferDataWriter implements DataWriter {
private _offset = 0

constructor(private buffer: Buffer, startFrom: number) {
Expand Down Expand Up @@ -33,6 +33,10 @@ class BufferDataWriter implements DataWriter {
this._offset = this.buffer.writeUInt32BE(data, this._offset)
}

writeInt32(data: number): void {
this._offset = this.buffer.writeUInt32BE(data, this._offset)
}

writeString(data: string): void {
this._offset = this.buffer.writeInt16BE(data.length, this._offset)
this.writeData(data)
Expand Down
87 changes: 58 additions & 29 deletions src/response_decoder.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { DecoderListener } from "./decoder_listener"
import { OpenResponse } from "./responses/open_response"
import { PeerPropertiesResponse } from "./responses/peer_properties_response"
import { DataReader, RawResponse } from "./responses/raw_response"
import { DataReader, RawResponse, RawTuneResponse } from "./responses/raw_response"
import { SaslAuthenticateResponse } from "./responses/sasl_authenticate_response"
import { SaslHandshakeResponse } from "./responses/sasl_handshake_response"
import { TuneResponse } from "./responses/tune_response"

// Frame => Size (Request | Response | Command)
// Size => uint32 (size without the 4 bytes of the size element)
Expand All @@ -14,22 +15,43 @@ import { SaslHandshakeResponse } from "./responses/sasl_handshake_response"
// CorrelationId => uint32
// ResponseCode => uint16

function decode(data: DataReader): RawResponse {
function decode(data: DataReader): RawResponse | RawTuneResponse {
const size = data.readUInt32()
const key = data.readUInt16()
const version = data.readUInt16()
const correlationId = data.readUInt32()
const responseCode = data.readUInt16()
const payload = data.slice()
const dataResponse = data.readTo(size)
const key = dataResponse.readUInt16()
const version = dataResponse.readUInt16()
if (key === TuneResponse.key) {
const frameMax = dataResponse.readUInt32()
const heartbeat = dataResponse.readUInt32()
return { size, key, version, frameMax, heartbeat } as RawTuneResponse
}
const correlationId = dataResponse.readUInt32()
const responseCode = dataResponse.readUInt16()
const payload = dataResponse.readToEnd()
return { size, key, version, correlationId, code: responseCode, payload }
}

class BufferDataReader implements DataReader {
private offset = 0

constructor(private data: Buffer) {}
slice(): DataReader {
return new BufferDataReader(this.data.slice(this.offset))

readTo(size: number): DataReader {
const ret = new BufferDataReader(this.data.slice(this.offset, this.offset + size))
this.offset += size
return ret
}

readToEnd(): DataReader {
const ret = new BufferDataReader(this.data.slice(this.offset))
this.offset = this.data.length
return ret
}

atEnd(): boolean {
return this.offset === this.data.length
}

readUInt16(): number {
const ret = this.data.readUInt16BE(this.offset)
this.offset += 2
Expand Down Expand Up @@ -60,26 +82,33 @@ export class ResponseDecoder {
constructor(private listener: DecoderListener) {}

add(data: Buffer) {
const response = decode(new BufferDataReader(data))
switch (response.key) {
case PeerPropertiesResponse.key:
this.listener.responseReceived(new PeerPropertiesResponse(response))
break

case SaslHandshakeResponse.key:
this.listener.responseReceived(new SaslHandshakeResponse(response))
break

case SaslAuthenticateResponse.key:
this.listener.responseReceived(new SaslAuthenticateResponse(response))
break

case OpenResponse.key:
this.listener.responseReceived(new OpenResponse(response))
break

default:
throw new Error(`Unknown response ${response.key.toString(16)}`)
const dataReader = new BufferDataReader(data)
while (!dataReader.atEnd()) {
const response = decode(dataReader)
switch (response.key) {
case PeerPropertiesResponse.key:
this.listener.responseReceived(new PeerPropertiesResponse(response as RawResponse))
break

case SaslHandshakeResponse.key:
this.listener.responseReceived(new SaslHandshakeResponse(response as RawResponse))
break

case SaslAuthenticateResponse.key:
this.listener.responseReceived(new SaslAuthenticateResponse(response as RawResponse))
break

case OpenResponse.key:
this.listener.responseReceived(new OpenResponse(response as RawResponse))
break

case TuneResponse.key:
this.listener.responseReceived(new TuneResponse(response as RawTuneResponse))
break

default:
throw new Error(`Unknown response ${response.key.toString(16)}`)
}
}
}
}
1 change: 1 addition & 0 deletions src/responses/abstract_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export abstract class AbstractResponse implements Response {
get code(): number {
return this.response.code
}

get ok(): boolean {
return this.code === 0x01
}
Expand Down
11 changes: 10 additions & 1 deletion src/responses/raw_response.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export interface DataReader {
slice(): DataReader
readTo(size: number): DataReader
readToEnd(): DataReader

readUInt16(): number
readUInt32(): number
Expand All @@ -15,3 +16,11 @@ export interface RawResponse {
code: number
payload: DataReader
}

export interface RawTuneResponse {
size: number
key: 0x0014
version: number
frameMax: number
heartbeat: number
}
38 changes: 38 additions & 0 deletions src/responses/tune_response.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { BufferDataWriter } from "../requests/abstract_request"
import { RawTuneResponse } from "./raw_response"
import { Response } from "./response"

export class TuneResponse implements Response {
static key = 0x0014 // I know it isn't 8014
constructor(private response: RawTuneResponse) {
if (this.response.key !== TuneResponse.key) {
throw new Error(`Unable to create ${TuneResponse.name} from data of type ${this.response.key}`)
}
}

toBuffer(): Buffer {
const dw = new BufferDataWriter(Buffer.alloc(1024), 4)
dw.writeUInt16(TuneResponse.key)
dw.writeUInt16(1)
dw.writeUInt32(this.response.frameMax)
dw.writeUInt32(this.response.heartbeat)
dw.writePrefixSize()
return dw.toBuffer()
}

get key() {
return this.response.key
}

get correlationId(): number {
return -1
}

get code(): number {
return -1
}

get ok(): boolean {
return true
}
}
67 changes: 67 additions & 0 deletions test/unit/response_decoder.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { DecoderListener } from "../../src/decoder_listener"
import { Response } from "../../src/responses/response"
import { ResponseDecoder } from "../../src/response_decoder"
import { PeerPropertiesResponse } from "../../src/responses/peer_properties_response"
import { expect } from "chai"
import { BufferDataWriter } from "../../src/requests/abstract_request"

class MockDecoderListener implements DecoderListener {
readonly responses: Response[] = []

reset() {
this.responses.splice(0)
}

responseReceived(data: Response) {
this.responses.push(data)
}
}

describe("ResponseDecoder", () => {
let decoder: ResponseDecoder
const listener = new MockDecoderListener()

beforeEach(() => {
listener.reset()
decoder = new ResponseDecoder(listener)
})

it("decode a buffer that contains a single response", () => {
const data = createResponse({ key: PeerPropertiesResponse.key })

decoder.add(data)

expect(listener.responses).lengthOf(1)
})

it("decode a buffer that contains multiple responses", () => {
const data = [
createResponse({ key: PeerPropertiesResponse.key }),
createResponse({ key: PeerPropertiesResponse.key }),
]

decoder.add(Buffer.concat(data))

expect(listener.responses).lengthOf(2)
})
})

function createResponse(params: { key: number; correlationId?: number; responseCode?: number }): Buffer {
const dataWriter = new BufferDataWriter(Buffer.alloc(1024), 4)
dataWriter.writeUInt16(params.key)
dataWriter.writeUInt16(1)
dataWriter.writeUInt32(params.correlationId || 101)
dataWriter.writeUInt16(params.responseCode || 1)

switch (params.key) {
case PeerPropertiesResponse.key:
dataWriter.writeInt32(0)
break

default:
break
}

dataWriter.writePrefixSize()
return dataWriter.toBuffer()
}