Skip to content
This repository has been archived by the owner on Jan 9, 2023. It is now read-only.

- To ensure all transactions are finalized and temp files cleaned up,… #139

Merged
merged 3 commits into from Jul 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/server/command_processor.js
Expand Up @@ -145,7 +145,8 @@ class CommandProcessor extends Duplex {
if(this[kReadStream] === null || this[kSource] === null) return;

let chunk;
while((chunk = this[kReadStream].read()) !== null) {
const rs = this[kReadStream];
while((chunk = rs.read()) !== null) {
this._sendFileQueueChunkReads++;
this._sendFileQueueReadBytes += chunk.length;
if(!this.push(chunk, 'ascii')) break;
Expand Down
44 changes: 28 additions & 16 deletions lib/server/server.js
Expand Up @@ -107,10 +107,13 @@ class CacheServer {
this.errCallback = errCallback;

this._server = net.createServer(socket => {
helpers.log(consts.LOG_INFO, `${socket.remoteAddress}:${socket.remotePort} connected.`);
const remoteAddress = socket.remoteAddress;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, this makes the template literals easier to read.

const remotePort = socket.remotePort;

helpers.log(consts.LOG_INFO, `${remoteAddress}:${remotePort} connected.`);

const cmdProc = new CommandProcessor(this.cache);
const streamProc = new ClientStreamProcessor({clientAddress: `${socket.remoteAddress}:${socket.remotePort}`});
const streamProc = new ClientStreamProcessor({clientAddress: `${remoteAddress}:${remotePort}`});

const mirrors = this._mirrors;
if(mirrors.length > 0) {
Expand All @@ -119,23 +122,32 @@ class CacheServer {
});
}

const unpipeStreams = () => {
socket.unpipe();
streamProc.unpipe();
cmdProc.unpipe();
};

cmdProc.on('finish', () => {
helpers.log(consts.LOG_DBG, `${remoteAddress}:${remotePort} CommandProcessor finished.`);
process.nextTick(unpipeStreams);
});

socket.on('close', () => {
helpers.log(consts.LOG_INFO, `${socket.remoteAddress}:${socket.remotePort} closed connection.`);
socket.unpipe();
streamProc.unpipe();
cmdProc.unpipe();
}).on('error', err => {
helpers.log(consts.LOG_ERR, err.message);
});
helpers.log(consts.LOG_INFO, `${remoteAddress}:${remotePort} Closed connection.`);
}).on('error', err => {
helpers.log(consts.LOG_ERR, `${remoteAddress}:${remotePort} Connection ERROR: ${err.message}`);
unpipeStreams();
});

if(this.isRecordingClient) {
const sessionId = `${socket.remoteAddress}_${socket.remotePort}_${Date.now()}`;
socket.pipe(new ClientStreamRecorder({sessionId})); // Record the incoming byte stream to disk
}
if(this.isRecordingClient) {
const sessionId = `${remoteAddress}_${remotePort}_${Date.now()}`;
socket.pipe(new ClientStreamRecorder({sessionId})); // Record the incoming byte stream to disk
}

socket.pipe(streamProc) // Transform the incoming byte stream into commands and file data
.pipe(cmdProc) // Execute commands and interface with the cache module
.pipe(socket); // Connect back to socket to send files
socket.pipe(streamProc) // Transform the incoming byte stream into commands and file data
.pipe(cmdProc) // Execute commands and interface with the cache module
.pipe(socket); // Connect back to socket to send files
}).on('error', err => {
if (err.code === 'EADDRINUSE') {
helpers.log(consts.LOG_ERR, `Port ${this.port} is already in use...`);
Expand Down