Skip to content

Commit

Permalink
Add contracts for parser functions
Browse files Browse the repository at this point in the history
PR-URL: metarhia#12
  • Loading branch information
tshemsedinov authored and dimanadko committed Jul 24, 2018
1 parent 9142074 commit 9cd4190
Showing 1 changed file with 62 additions and 32 deletions.
94 changes: 62 additions & 32 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,35 @@

const common = require('metarhia-common');
const EventEmitter = require('events');

const parser = require('./parser.js');

const INITIAL_BUFFER_SIZE = 16384;

const ENCODING = {
binary: 0,
jstp: 1,
json: 2,
bson: 3,
v8: 4
};

const Connection = function() {
EventEmitter.call(this);
this.encoding = ENCODING.binary;

this.parcels = [];
this.chunks = [];
this.parcels = new Map();
this.chunks = new Map();

// Buffer for collecting full data,
// cut by lower-level protocols
this.buffer = Buffer.alloc(0);
this.position = 0;
this.buffer = Buffer.alloc(INITIAL_BUFFER_SIZE);
this.bytesToRead = parser.HANDSHAKE_LENGTH;

this.transport.on('data', (data) => {
if (this.bytesToRead === 0) {
const structType = data.readUInt8();
this.bytesToRead = structType === 0 ?
const structType = parser.readStructType(data);
this.bytesToRead = structType === parser.STRUCT_PARCEL ?
parser.PARCEL_LENGTH : parser.CHUNK_LENGTH;
}
this._onRawData(data);
Expand All @@ -34,80 +45,93 @@ common.inherits(Connection, EventEmitter);
Connection.prototype._onRawData = function(data) {
if (data.length < this.bytesToRead) {
this.bytesToRead -= data.length;
this.buffer = Buffer.concat([this.buffer, data]);
this.position += data.copy(this.buffer, this.position);
} else {
const newDataChunk = data.slice(0, this.bytesToRead);
this.buffer = Buffer.concat([this.buffer, newDataChunk]);
this.position += data.copy(
this.buffer, this.position, 0, this.bytesToRead
);
const structType = parser.readStructType(this.buffer);

data = data.slice(this.bytesToRead);

if (
// Chunk structure header
this.buffer.readUInt8() === 1 &&
structType === parser.STRUCT_CHUNK &&
this.buffer.length === parser.CHUNK_LENGTH
) {
const length = parser.readPayloadLength(this.buffer);
this.bytesToRead = length;
} else {
this.emit('structure', this.buffer);
this.buffer = Buffer.alloc(0);
this.position = 0;
// if more data, read struct type from it
// and set appropriate value to bytesToRead
// or set bytesToRead equal to 0
this.bytesToRead = data.length ? data.readUInt8() === 0 ?
parser.PARCEL_LENGTH : parser.CHUNK_LENGTH : 0;
if (data.length) {
const structType = parser.readStructType(data);
this.bytesToRead = structType === parser.STRUCT_PARCEL ?
parser.PARCEL_LENGTH : parser.CHUNK_LENGTH;
} else {
this.bytesToRead = 0;
}
}

if (data.length > 0) this._onRawData(data);
}
};

Connection.prototype.setEncoding = function(encoding) {
this.encoding = ENCODING[encoding];
};

Connection.prototype.sendHandshake = function(handshake) {
const buffer = parser.handshake(handshake);
this._send(buffer);
this.send(buffer);
};

Connection.prototype.sendParcel = function(parcel, callback) {
const buffer = parser.parcel(parcel);
this._send(buffer, callback);
const buffer = parser.parcel(parcel, this.encoding);
this.send(buffer, callback);
};

Connection.prototype.sendChunk = function(chunk, encoding, callback) {
const buffer = parser.chunk(chunk);
this._send(buffer, encoding, callback);
Connection.prototype.sendChunk = function(chunk, callback) {
const buffer = parser.chunk(chunk, this.encoding);
this.send(buffer, this.encoding, callback);
};

Connection.prototype._send = function(data, encoding, callback) {
this.transport.write(data, encoding, callback);
Connection.prototype.send = function(data, callback) {
this.transport.write(data, callback);
};

Connection.prototype._onHandshake = function(buffer) {
const handshake = parser.readHandshake(buffer);
this.emit('handshake', handshake);
this.on('structure', (buffer) => {
const struct = parser.readStruct(buffer);
if (struct.structType === 'parcel') this._onParcel(struct);
else if (struct.structType === 'chunk') this._onChunk(struct);
if (struct.structType === parser.STRUCT_PARCEL) this._onParcel(struct);
else if (struct.structType === parser.STRUCT_CHUNK) this._onChunk(struct);
});
};

Connection.prototype._onParcel = function(parcel) {
const chunks = this.chunks.filter(
chunk => chunk.parcelId === parcel.parcelId
);
if (parcel.parcelType === parser.PARCEL_STREAM) {
this._emitParcelType(parcel);
}

const chunks = this.chunks.get(parcel.parcelId);
if (!chunks) return;

const currentLength = chunks.reduce((acc, cur) => acc += cur.length, 0);
Object.assign(parcel, { chunks, currentLength });

if (currentLength === parcel.length) {
this._emitParcelType(parcel);
} else {
this.parcels.push(parcel);
this.parcels.set(parcel.parcelId, parcel);
}
};

Connection.prototype._onChunk = function(chunk) {
const parcel = this.parcels.find(
parcel => parcel.parcelId === chunk.parcelId
);
const parcel = this.parcels.get(chunk.parcelId);

if (parcel) {
parcel.chunks.push(chunk);
Expand All @@ -117,7 +141,13 @@ Connection.prototype._onChunk = function(chunk) {
this._emitParcelType(parcel);
}
} else {
this.chunks.push(chunk);
const chunks = this.chunks.get(chunk.parcelId);

if (chunks) {
chunks.push(chunk);
} else {
this.chunks.set(chunk.parcelId, [chunk]);
}
}
};

Expand Down

0 comments on commit 9cd4190

Please sign in to comment.