Skip to content

Commit

Permalink
Revert "Change logic around closing connections and writers API-1283 (h…
Browse files Browse the repository at this point in the history
…azelcast#1417)" (hazelcast#1430)

This reverts commit 2d0b397.
  • Loading branch information
srknzl authored Dec 5, 2022
1 parent e1df29b commit f75acdc
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 107 deletions.
39 changes: 13 additions & 26 deletions src/network/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ abstract class Writer extends EventEmitter {

abstract write(message: ClientMessage, resolver: DeferredPromise<void>): void;

abstract close(cause: Error): void;
abstract close(): void;

}

Expand Down Expand Up @@ -81,22 +81,15 @@ export class PipelinedWriter extends Writer {

write(message: ClientMessage, resolver: DeferredPromise<void>): void {
if (this.error) {
// if the socket is closed, it's useless to keep writing to the socket
// if there was a write error, it's useless to keep writing to the socket
return process.nextTick(() => resolver.reject(this.error));
}
this.queue.push({ message, resolver });
this.schedule();
}

close(error: Error): void {
if (this.error) {
return;
}
this.error = this.makeIOError(error);
close(): void {
this.canWrite = false;
// If we pass an error to destroy, an unhandled error will be thrown because we don't handle the error event
// So we don't pass anything to the socket. It is internal anyway.
this.socket.destroy();
// no more items can be added now
this.queue = FROZEN_ARRAY;
}
Expand Down Expand Up @@ -171,22 +164,15 @@ export class PipelinedWriter extends Writer {
});
}

private handleError(err: Error, sentResolvers: OutputQueueItem[]): void {
const error = this.makeIOError(err);
private handleError(err: any, sentResolvers: OutputQueueItem[]): void {
this.error = new IOError(err);
for (const item of sentResolvers) {
item.resolver.reject(error);
item.resolver.reject(this.error);
}
for (const item of this.queue) {
item.resolver.reject(error);
}
this.close(error);
}

private makeIOError(err: Error): IOError {
if (err instanceof IOError) {
return err;
item.resolver.reject(this.error);
}
return new IOError(err.message, err);
this.close();
}
}

Expand All @@ -213,8 +199,8 @@ export class DirectWriter extends Writer {
});
}

close(cause: Error): void {
this.socket.destroy();
close(): void {
// no-op
}
}

Expand Down Expand Up @@ -425,7 +411,7 @@ export class Connection {
/**
* Closes this connection.
*/
close(reason: string | null, cause: Error | null): void {
close(reason: string, cause: Error): void {
if (this.closedTime !== 0) {
return;
}
Expand All @@ -436,7 +422,8 @@ export class Connection {

this.logClose();

this.writer.close(this.closedCause ? this.closedCause : new Error(reason ? reason : 'Connection closed'));
this.writer.close();
this.socket.end();

this.connectionManager.onConnectionClose(this);
}
Expand Down
48 changes: 16 additions & 32 deletions test/unit/connection/DirectWriterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ const {
Frame
} = require('../../../lib/protocol/ClientMessage');
const { deferredPromise } = require('../../../lib/util/Util');
const sandbox = sinon.createSandbox();

describe('DirectWriterTest', function () {
let writer;
let queue;
let mockSocket;
let writtenBytes;

Expand All @@ -41,29 +40,25 @@ describe('DirectWriterTest', function () {

const setUpWriteSuccess = () => {
mockSocket = new Socket({});
sandbox.stub(mockSocket, 'write').callsFake((data, cb) => {
sinon.stub(mockSocket, 'write').callsFake((data, cb) => {
cb();
mockSocket.emit('data', data);
});
writer = new DirectWriter(mockSocket, numberOfBytes => {
queue = new DirectWriter(mockSocket, numberOfBytes => {
writtenBytes += numberOfBytes;
});
};

const setUpWriteFailure = (err) => {
mockSocket = new Socket({});
sandbox.stub(mockSocket, 'write').callsFake((_, cb) => {
sinon.stub(mockSocket, 'write').callsFake((_, cb) => {
cb(err);
});
writer = new DirectWriter(mockSocket, numberOfBytes => {
queue = new DirectWriter(mockSocket, numberOfBytes => {
writtenBytes += numberOfBytes;
});
};

afterEach(function() {
sandbox.restore();
});

it('increment written bytes correctly', function(done) {
setUpWriteSuccess();

Expand All @@ -75,7 +70,7 @@ describe('DirectWriterTest', function () {
done();
});

writer.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
});

it('writes single message into socket', function(done) {
Expand All @@ -87,7 +82,7 @@ describe('DirectWriterTest', function () {
done();
});

writer.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
});

it('writes multiple messages separately into socket', function(done) {
Expand All @@ -102,16 +97,16 @@ describe('DirectWriterTest', function () {
}
});

writer.write(msg, deferredPromise());
writer.write(msg, deferredPromise());
writer.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
});

it('resolves promise on write success', function(done) {
setUpWriteSuccess();

const resolver = deferredPromise();
writer.write(createMessage('test'), resolver);
queue.write(createMessage('test'), resolver);
resolver.promise.then(done);
});

Expand All @@ -120,7 +115,7 @@ describe('DirectWriterTest', function () {
setUpWriteFailure(err);

const resolver = deferredPromise();
writer.write(createMessage('test'), resolver);
queue.write(createMessage('test'), resolver);
resolver.promise.catch((err) => {
expect(err).to.be.equal(err);
done();
Expand All @@ -130,29 +125,18 @@ describe('DirectWriterTest', function () {
it('emits write event on write success', function(done) {
setUpWriteSuccess();

writer.on('write', done);
writer.write(createMessage('test'), deferredPromise());
queue.on('write', done);
queue.write(createMessage('test'), deferredPromise());
});

it('does not emit write event on write failure', function(done) {
setUpWriteFailure(new Error());

writer.on('write', () => done(new Error()));
queue.on('write', () => done(new Error()));
const resolver = deferredPromise();
writer.write(createMessage('test'), resolver);
queue.write(createMessage('test'), resolver);
resolver.promise.catch(() => {
done();
});
});

it('should close the socket upon being closed', function() {
setUpWriteSuccess();

// This is equivalent to a sinon spy
const spy = sandbox.fake(mockSocket.destroy);
sandbox.replace(mockSocket, 'destroy', spy);
writer.close();

expect(spy.calledOnce).to.be.true;
});
});
55 changes: 6 additions & 49 deletions test/unit/connection/PipelinedWriterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ const sinon = require('sinon');
const { expect } = require('chai');

const { PipelinedWriter } = require('../../../lib/network/Connection');
const { IOError } = require('../../../lib/core/HazelcastError');
const TestUtil = require('../../TestUtil');
const {
ClientMessage,
Frame
} = require('../../../lib/protocol/ClientMessage');
const { deferredPromise } = require('../../../lib/util/Util');
const sandbox = sinon.createSandbox();

describe('PipelinedWriterTest', function () {
const THRESHOLD = 8192;
Expand All @@ -38,7 +35,7 @@ describe('PipelinedWriterTest', function () {

function setUpWriteSuccess(canWrite) {
mockSocket = new Socket({});
sandbox.stub(mockSocket, 'write').callsFake((data, cb) => {
sinon.stub(mockSocket, 'write').callsFake((data, cb) => {
process.nextTick(cb);
process.nextTick(() => mockSocket.emit('data', data));
return canWrite;
Expand All @@ -50,7 +47,7 @@ describe('PipelinedWriterTest', function () {

function setUpWriteFailure(err) {
mockSocket = new Socket({});
sandbox.stub(mockSocket, 'write').callsFake((_, cb) => {
sinon.stub(mockSocket, 'write').callsFake((_, cb) => {
process.nextTick(() => cb(err));
return false;
});
Expand All @@ -71,10 +68,6 @@ describe('PipelinedWriterTest', function () {
return clientMessage;
}

afterEach(function () {
sandbox.restore();
});

it('increment written bytes correctly', function(done) {
setUpWriteSuccess(true);

Expand Down Expand Up @@ -206,17 +199,16 @@ describe('PipelinedWriterTest', function () {
Promise.all([resolver1.promise, resolver2.promise]).then(() => done());
});

it('rejects single promise on write failure', async function() {
it('rejects single promise on write failure', function(done) {
const err = new Error();
setUpWriteFailure(err);

const resolver = deferredPromise();
writer.write(createMessageFromString('test'), resolver);
const rejReason = await TestUtil.getRejectionReasonOrThrow(async () => {
await resolver.promise;
resolver.promise.catch((err) => {
expect(err).to.be.equal(err);
done();
});
expect(rejReason.cause).to.be.equal(err);
expect(rejReason).to.be.instanceOf(IOError);
});

it('rejects multiple promises on write failure', function(done) {
Expand Down Expand Up @@ -284,39 +276,4 @@ describe('PipelinedWriterTest', function () {
writer.write(msg, deferredPromise());
});
});

it('should not schedule a write if a write() is called when the writer is already closed', async function() {
setUpWriteSuccess(true);

const msg = createMessageFromString('test');
// Pass a IOError so that the same error is used to reject the write() deferred promise
const closeReason = new IOError();
writer.close(closeReason);
const deferred = deferredPromise();

// This is equivalent to a sinon spy
const spy = sandbox.fake(writer.schedule);
sandbox.replace(writer, 'schedule', spy);
writer.write(msg, deferred);

const rejectionReason = await TestUtil.getRejectionReasonOrThrow(async () => await deferred.promise);
expect(rejectionReason).to.be.equal(closeReason);
expect(spy.callCount).to.be.equal(0);
});

it('should not destroy the socket twice upon closing again', async function() {
setUpWriteSuccess(true);

// Pass a IOError so that the same error is used to reject the write() deferred promise
const closeReason = new IOError();

// This is equivalent to a sinon spy
const spy = sandbox.fake(mockSocket.destroy);
sandbox.replace(mockSocket, 'destroy', spy);

writer.close(closeReason);
writer.close(closeReason);

expect(spy.callCount).to.be.equal(1);
});
});

0 comments on commit f75acdc

Please sign in to comment.