Skip to content

Commit

Permalink
Change logic around closing connections and writers API-1283 (hazelca…
Browse files Browse the repository at this point in the history
…st#1417)

The issue was writing some data to the connection after it being closed. I changed and refactored some logic around PipelinedWriter and Connection to avoid that. 

- Moved socket closing responsibility to Writers
- Fixed a test in PipelinedWriterTest
- Ongoing and future socket.write calls will end with an error after the writer is closed. This is because close() destroys the socket and any ongoing and future socket.write calls will end with an error as described in node.js [documentation](https://nodejs.org/api/stream.html#writabledestroyerror). So we will be able to reject all deferred promises in the write queue upon close due to connection.close() or socket write error close().
- We were not closing the socket on write error before, now we do it. We were sending end packet and don't write anymore. This is the same as java. [See](https://hazelcast.slack.com/archives/C01JU7ZJYGP/p1668695419424179)

fixes hazelcast#1256
  • Loading branch information
srknzl committed Dec 5, 2022
1 parent 2255a52 commit 2d0b397
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 35 deletions.
39 changes: 26 additions & 13 deletions src/network/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ abstract class Writer extends EventEmitter {

abstract write(message: ClientMessage, resolver: DeferredPromise<void>): void;

abstract close(): void;
abstract close(cause: Error): void;

}

Expand Down Expand Up @@ -81,15 +81,22 @@ export class PipelinedWriter extends Writer {

write(message: ClientMessage, resolver: DeferredPromise<void>): void {
if (this.error) {
// if there was a write error, it's useless to keep writing to the socket
// if the socket is closed, it's useless to keep writing to the socket
return process.nextTick(() => resolver.reject(this.error));
}
this.queue.push({ message, resolver });
this.schedule();
}

close(): void {
close(error: Error): void {
if (this.error) {
return;
}
this.error = this.makeIOError(error);
this.canWrite = false;
// If we pass an error to destroy, an unhandled error will be thrown because we don't handle the error event
// So we don't pass anything to the socket. It is internal anyway.
this.socket.destroy();
// no more items can be added now
this.queue = FROZEN_ARRAY;
}
Expand Down Expand Up @@ -164,15 +171,22 @@ export class PipelinedWriter extends Writer {
});
}

private handleError(err: any, sentResolvers: OutputQueueItem[]): void {
this.error = new IOError(err);
private handleError(err: Error, sentResolvers: OutputQueueItem[]): void {
const error = this.makeIOError(err);
for (const item of sentResolvers) {
item.resolver.reject(this.error);
item.resolver.reject(error);
}
for (const item of this.queue) {
item.resolver.reject(this.error);
item.resolver.reject(error);
}
this.close(error);
}

private makeIOError(err: Error): IOError {
if (err instanceof IOError) {
return err;
}
this.close();
return new IOError(err.message, err);
}
}

Expand All @@ -199,8 +213,8 @@ export class DirectWriter extends Writer {
});
}

close(): void {
// no-op
close(cause: Error): void {
this.socket.destroy();
}
}

Expand Down Expand Up @@ -411,7 +425,7 @@ export class Connection {
/**
* Closes this connection.
*/
close(reason: string, cause: Error): void {
close(reason: string | null, cause: Error | null): void {
if (this.closedTime !== 0) {
return;
}
Expand All @@ -422,8 +436,7 @@ export class Connection {

this.logClose();

this.writer.close();
this.socket.end();
this.writer.close(this.closedCause ? this.closedCause : new Error(reason ? reason : 'Connection closed'));

this.connectionManager.onConnectionClose(this);
}
Expand Down
48 changes: 32 additions & 16 deletions test/unit/connection/DirectWriterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ const {
Frame
} = require('../../../lib/protocol/ClientMessage');
const { deferredPromise } = require('../../../lib/util/Util');
const sandbox = sinon.createSandbox();

describe('DirectWriterTest', function () {
let queue;
let writer;
let mockSocket;
let writtenBytes;

Expand All @@ -40,25 +41,29 @@ describe('DirectWriterTest', function () {

const setUpWriteSuccess = () => {
mockSocket = new Socket({});
sinon.stub(mockSocket, 'write').callsFake((data, cb) => {
sandbox.stub(mockSocket, 'write').callsFake((data, cb) => {
cb();
mockSocket.emit('data', data);
});
queue = new DirectWriter(mockSocket, numberOfBytes => {
writer = new DirectWriter(mockSocket, numberOfBytes => {
writtenBytes += numberOfBytes;
});
};

const setUpWriteFailure = (err) => {
mockSocket = new Socket({});
sinon.stub(mockSocket, 'write').callsFake((_, cb) => {
sandbox.stub(mockSocket, 'write').callsFake((_, cb) => {
cb(err);
});
queue = new DirectWriter(mockSocket, numberOfBytes => {
writer = new DirectWriter(mockSocket, numberOfBytes => {
writtenBytes += numberOfBytes;
});
};

afterEach(function() {
sandbox.restore();
});

it('increment written bytes correctly', function(done) {
setUpWriteSuccess();

Expand All @@ -70,7 +75,7 @@ describe('DirectWriterTest', function () {
done();
});

queue.write(msg, deferredPromise());
writer.write(msg, deferredPromise());
});

it('writes single message into socket', function(done) {
Expand All @@ -82,7 +87,7 @@ describe('DirectWriterTest', function () {
done();
});

queue.write(msg, deferredPromise());
writer.write(msg, deferredPromise());
});

it('writes multiple messages separately into socket', function(done) {
Expand All @@ -97,16 +102,16 @@ describe('DirectWriterTest', function () {
}
});

queue.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
writer.write(msg, deferredPromise());
writer.write(msg, deferredPromise());
writer.write(msg, deferredPromise());
});

it('resolves promise on write success', function(done) {
setUpWriteSuccess();

const resolver = deferredPromise();
queue.write(createMessage('test'), resolver);
writer.write(createMessage('test'), resolver);
resolver.promise.then(done);
});

Expand All @@ -115,7 +120,7 @@ describe('DirectWriterTest', function () {
setUpWriteFailure(err);

const resolver = deferredPromise();
queue.write(createMessage('test'), resolver);
writer.write(createMessage('test'), resolver);
resolver.promise.catch((err) => {
expect(err).to.be.equal(err);
done();
Expand All @@ -125,18 +130,29 @@ describe('DirectWriterTest', function () {
it('emits write event on write success', function(done) {
setUpWriteSuccess();

queue.on('write', done);
queue.write(createMessage('test'), deferredPromise());
writer.on('write', done);
writer.write(createMessage('test'), deferredPromise());
});

it('does not emit write event on write failure', function(done) {
setUpWriteFailure(new Error());

queue.on('write', () => done(new Error()));
writer.on('write', () => done(new Error()));
const resolver = deferredPromise();
queue.write(createMessage('test'), resolver);
writer.write(createMessage('test'), resolver);
resolver.promise.catch(() => {
done();
});
});

it('should close the socket upon being closed', function() {
setUpWriteSuccess();

// This is equivalent to a sinon spy
const spy = sandbox.fake(mockSocket.destroy);
sandbox.replace(mockSocket, 'destroy', spy);
writer.close();

expect(spy.calledOnce).to.be.true;
});
});
55 changes: 49 additions & 6 deletions test/unit/connection/PipelinedWriterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ const sinon = require('sinon');
const { expect } = require('chai');

const { PipelinedWriter } = require('../../../lib/network/Connection');
const { IOError } = require('../../../lib/core/HazelcastError');
const TestUtil = require('../../TestUtil');
const {
ClientMessage,
Frame
} = require('../../../lib/protocol/ClientMessage');
const { deferredPromise } = require('../../../lib/util/Util');
const sandbox = sinon.createSandbox();

describe('PipelinedWriterTest', function () {
const THRESHOLD = 8192;
Expand All @@ -35,7 +38,7 @@ describe('PipelinedWriterTest', function () {

function setUpWriteSuccess(canWrite) {
mockSocket = new Socket({});
sinon.stub(mockSocket, 'write').callsFake((data, cb) => {
sandbox.stub(mockSocket, 'write').callsFake((data, cb) => {
process.nextTick(cb);
process.nextTick(() => mockSocket.emit('data', data));
return canWrite;
Expand All @@ -47,7 +50,7 @@ describe('PipelinedWriterTest', function () {

function setUpWriteFailure(err) {
mockSocket = new Socket({});
sinon.stub(mockSocket, 'write').callsFake((_, cb) => {
sandbox.stub(mockSocket, 'write').callsFake((_, cb) => {
process.nextTick(() => cb(err));
return false;
});
Expand All @@ -68,6 +71,10 @@ describe('PipelinedWriterTest', function () {
return clientMessage;
}

afterEach(function () {
sandbox.restore();
});

it('increment written bytes correctly', function(done) {
setUpWriteSuccess(true);

Expand Down Expand Up @@ -199,16 +206,17 @@ describe('PipelinedWriterTest', function () {
Promise.all([resolver1.promise, resolver2.promise]).then(() => done());
});

it('rejects single promise on write failure', function(done) {
it('rejects single promise on write failure', async function() {
const err = new Error();
setUpWriteFailure(err);

const resolver = deferredPromise();
writer.write(createMessageFromString('test'), resolver);
resolver.promise.catch((err) => {
expect(err).to.be.equal(err);
done();
const rejReason = await TestUtil.getRejectionReasonOrThrow(async () => {
await resolver.promise;
});
expect(rejReason.cause).to.be.equal(err);
expect(rejReason).to.be.instanceOf(IOError);
});

it('rejects multiple promises on write failure', function(done) {
Expand Down Expand Up @@ -276,4 +284,39 @@ describe('PipelinedWriterTest', function () {
writer.write(msg, deferredPromise());
});
});

it('should not schedule a write if a write() is called when the writer is already closed', async function() {
setUpWriteSuccess(true);

const msg = createMessageFromString('test');
// Pass a IOError so that the same error is used to reject the write() deferred promise
const closeReason = new IOError();
writer.close(closeReason);
const deferred = deferredPromise();

// This is equivalent to a sinon spy
const spy = sandbox.fake(writer.schedule);
sandbox.replace(writer, 'schedule', spy);
writer.write(msg, deferred);

const rejectionReason = await TestUtil.getRejectionReasonOrThrow(async () => await deferred.promise);
expect(rejectionReason).to.be.equal(closeReason);
expect(spy.callCount).to.be.equal(0);
});

it('should not destroy the socket twice upon closing again', async function() {
setUpWriteSuccess(true);

// Pass a IOError so that the same error is used to reject the write() deferred promise
const closeReason = new IOError();

// This is equivalent to a sinon spy
const spy = sandbox.fake(mockSocket.destroy);
sandbox.replace(mockSocket, 'destroy', spy);

writer.close(closeReason);
writer.close(closeReason);

expect(spy.callCount).to.be.equal(1);
});
});

0 comments on commit 2d0b397

Please sign in to comment.