Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Change logic around closing connections and writers API-1283 (#1417)" #1430

Merged
merged 1 commit into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 13 additions & 26 deletions src/network/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ abstract class Writer extends EventEmitter {

abstract write(message: ClientMessage, resolver: DeferredPromise<void>): void;

abstract close(cause: Error): void;
abstract close(): void;

}

Expand Down Expand Up @@ -81,22 +81,15 @@ export class PipelinedWriter extends Writer {

write(message: ClientMessage, resolver: DeferredPromise<void>): void {
if (this.error) {
// if the socket is closed, it's useless to keep writing to the socket
// if there was a write error, it's useless to keep writing to the socket
return process.nextTick(() => resolver.reject(this.error));
}
this.queue.push({ message, resolver });
this.schedule();
}

close(error: Error): void {
if (this.error) {
return;
}
this.error = this.makeIOError(error);
close(): void {
this.canWrite = false;
// If we pass an error to destroy, an unhandled error will be thrown because we don't handle the error event
// So we don't pass anything to the socket. It is internal anyway.
this.socket.destroy();
// no more items can be added now
this.queue = FROZEN_ARRAY;
}
Expand Down Expand Up @@ -171,22 +164,15 @@ export class PipelinedWriter extends Writer {
});
}

private handleError(err: Error, sentResolvers: OutputQueueItem[]): void {
const error = this.makeIOError(err);
private handleError(err: any, sentResolvers: OutputQueueItem[]): void {
this.error = new IOError(err);
for (const item of sentResolvers) {
item.resolver.reject(error);
item.resolver.reject(this.error);
}
for (const item of this.queue) {
item.resolver.reject(error);
}
this.close(error);
}

private makeIOError(err: Error): IOError {
if (err instanceof IOError) {
return err;
item.resolver.reject(this.error);
}
return new IOError(err.message, err);
this.close();
}
}

Expand All @@ -213,8 +199,8 @@ export class DirectWriter extends Writer {
});
}

close(cause: Error): void {
this.socket.destroy();
close(): void {
// no-op
}
}

Expand Down Expand Up @@ -425,7 +411,7 @@ export class Connection {
/**
* Closes this connection.
*/
close(reason: string | null, cause: Error | null): void {
close(reason: string, cause: Error): void {
if (this.closedTime !== 0) {
return;
}
Expand All @@ -436,7 +422,8 @@ export class Connection {

this.logClose();

this.writer.close(this.closedCause ? this.closedCause : new Error(reason ? reason : 'Connection closed'));
this.writer.close();
this.socket.end();

this.connectionManager.onConnectionClose(this);
}
Expand Down
48 changes: 16 additions & 32 deletions test/unit/connection/DirectWriterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ const {
Frame
} = require('../../../lib/protocol/ClientMessage');
const { deferredPromise } = require('../../../lib/util/Util');
const sandbox = sinon.createSandbox();

describe('DirectWriterTest', function () {
let writer;
let queue;
let mockSocket;
let writtenBytes;

Expand All @@ -41,29 +40,25 @@ describe('DirectWriterTest', function () {

const setUpWriteSuccess = () => {
mockSocket = new Socket({});
sandbox.stub(mockSocket, 'write').callsFake((data, cb) => {
sinon.stub(mockSocket, 'write').callsFake((data, cb) => {
cb();
mockSocket.emit('data', data);
});
writer = new DirectWriter(mockSocket, numberOfBytes => {
queue = new DirectWriter(mockSocket, numberOfBytes => {
writtenBytes += numberOfBytes;
});
};

const setUpWriteFailure = (err) => {
mockSocket = new Socket({});
sandbox.stub(mockSocket, 'write').callsFake((_, cb) => {
sinon.stub(mockSocket, 'write').callsFake((_, cb) => {
cb(err);
});
writer = new DirectWriter(mockSocket, numberOfBytes => {
queue = new DirectWriter(mockSocket, numberOfBytes => {
writtenBytes += numberOfBytes;
});
};

afterEach(function() {
sandbox.restore();
});

it('increment written bytes correctly', function(done) {
setUpWriteSuccess();

Expand All @@ -75,7 +70,7 @@ describe('DirectWriterTest', function () {
done();
});

writer.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
});

it('writes single message into socket', function(done) {
Expand All @@ -87,7 +82,7 @@ describe('DirectWriterTest', function () {
done();
});

writer.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
});

it('writes multiple messages separately into socket', function(done) {
Expand All @@ -102,16 +97,16 @@ describe('DirectWriterTest', function () {
}
});

writer.write(msg, deferredPromise());
writer.write(msg, deferredPromise());
writer.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
queue.write(msg, deferredPromise());
});

it('resolves promise on write success', function(done) {
setUpWriteSuccess();

const resolver = deferredPromise();
writer.write(createMessage('test'), resolver);
queue.write(createMessage('test'), resolver);
resolver.promise.then(done);
});

Expand All @@ -120,7 +115,7 @@ describe('DirectWriterTest', function () {
setUpWriteFailure(err);

const resolver = deferredPromise();
writer.write(createMessage('test'), resolver);
queue.write(createMessage('test'), resolver);
resolver.promise.catch((err) => {
expect(err).to.be.equal(err);
done();
Expand All @@ -130,29 +125,18 @@ describe('DirectWriterTest', function () {
it('emits write event on write success', function(done) {
setUpWriteSuccess();

writer.on('write', done);
writer.write(createMessage('test'), deferredPromise());
queue.on('write', done);
queue.write(createMessage('test'), deferredPromise());
});

it('does not emit write event on write failure', function(done) {
setUpWriteFailure(new Error());

writer.on('write', () => done(new Error()));
queue.on('write', () => done(new Error()));
const resolver = deferredPromise();
writer.write(createMessage('test'), resolver);
queue.write(createMessage('test'), resolver);
resolver.promise.catch(() => {
done();
});
});

it('should close the socket upon being closed', function() {
setUpWriteSuccess();

// This is equivalent to a sinon spy
const spy = sandbox.fake(mockSocket.destroy);
sandbox.replace(mockSocket, 'destroy', spy);
writer.close();

expect(spy.calledOnce).to.be.true;
});
});
55 changes: 6 additions & 49 deletions test/unit/connection/PipelinedWriterTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ const sinon = require('sinon');
const { expect } = require('chai');

const { PipelinedWriter } = require('../../../lib/network/Connection');
const { IOError } = require('../../../lib/core/HazelcastError');
const TestUtil = require('../../TestUtil');
const {
ClientMessage,
Frame
} = require('../../../lib/protocol/ClientMessage');
const { deferredPromise } = require('../../../lib/util/Util');
const sandbox = sinon.createSandbox();

describe('PipelinedWriterTest', function () {
const THRESHOLD = 8192;
Expand All @@ -38,7 +35,7 @@ describe('PipelinedWriterTest', function () {

function setUpWriteSuccess(canWrite) {
mockSocket = new Socket({});
sandbox.stub(mockSocket, 'write').callsFake((data, cb) => {
sinon.stub(mockSocket, 'write').callsFake((data, cb) => {
process.nextTick(cb);
process.nextTick(() => mockSocket.emit('data', data));
return canWrite;
Expand All @@ -50,7 +47,7 @@ describe('PipelinedWriterTest', function () {

function setUpWriteFailure(err) {
mockSocket = new Socket({});
sandbox.stub(mockSocket, 'write').callsFake((_, cb) => {
sinon.stub(mockSocket, 'write').callsFake((_, cb) => {
process.nextTick(() => cb(err));
return false;
});
Expand All @@ -71,10 +68,6 @@ describe('PipelinedWriterTest', function () {
return clientMessage;
}

afterEach(function () {
sandbox.restore();
});

it('increment written bytes correctly', function(done) {
setUpWriteSuccess(true);

Expand Down Expand Up @@ -206,17 +199,16 @@ describe('PipelinedWriterTest', function () {
Promise.all([resolver1.promise, resolver2.promise]).then(() => done());
});

it('rejects single promise on write failure', async function() {
it('rejects single promise on write failure', function(done) {
const err = new Error();
setUpWriteFailure(err);

const resolver = deferredPromise();
writer.write(createMessageFromString('test'), resolver);
const rejReason = await TestUtil.getRejectionReasonOrThrow(async () => {
await resolver.promise;
resolver.promise.catch((err) => {
expect(err).to.be.equal(err);
done();
});
expect(rejReason.cause).to.be.equal(err);
expect(rejReason).to.be.instanceOf(IOError);
});

it('rejects multiple promises on write failure', function(done) {
Expand Down Expand Up @@ -284,39 +276,4 @@ describe('PipelinedWriterTest', function () {
writer.write(msg, deferredPromise());
});
});

it('should not schedule a write if a write() is called when the writer is already closed', async function() {
setUpWriteSuccess(true);

const msg = createMessageFromString('test');
// Pass a IOError so that the same error is used to reject the write() deferred promise
const closeReason = new IOError();
writer.close(closeReason);
const deferred = deferredPromise();

// This is equivalent to a sinon spy
const spy = sandbox.fake(writer.schedule);
sandbox.replace(writer, 'schedule', spy);
writer.write(msg, deferred);

const rejectionReason = await TestUtil.getRejectionReasonOrThrow(async () => await deferred.promise);
expect(rejectionReason).to.be.equal(closeReason);
expect(spy.callCount).to.be.equal(0);
});

it('should not destroy the socket twice upon closing again', async function() {
setUpWriteSuccess(true);

// Pass a IOError so that the same error is used to reject the write() deferred promise
const closeReason = new IOError();

// This is equivalent to a sinon spy
const spy = sandbox.fake(mockSocket.destroy);
sandbox.replace(mockSocket, 'destroy', spy);

writer.close(closeReason);
writer.close(closeReason);

expect(spy.callCount).to.be.equal(1);
});
});