diff --git a/src/socket.ts b/src/socket.ts index 42f1cef..9e6a4a1 100644 --- a/src/socket.ts +++ b/src/socket.ts @@ -10,7 +10,6 @@ import { } from "./error"; import * as protocol from "./protocol"; import {PassThrough, Duplex} from "stream"; -import {awaitNextTick} from "./util"; /** * Reconnection settings for the socket @@ -345,7 +344,9 @@ export class FluentSocket extends EventEmitter { ) { if (this.state === SocketState.DISCONNECTING) { // Try again once the socket has fully closed - await awaitNextTick(); + await new Promise(resolve => + this.once(FluentSocketEvent.CLOSE, resolve) + ); return await this.connect(); } else { // noop, we're connected diff --git a/test/test.client.ts b/test/test.client.ts index 76e01d7..69b16ea 100644 --- a/test/test.client.ts +++ b/test/test.client.ts @@ -16,6 +16,7 @@ import { } from "../src/error"; import {awaitNextTick, awaitTimeout} from "../src/util"; import {FluentSocketEvent} from "../src/socket"; +import {FluentServer} from "../src"; chai.use(chaiAsPromised); const expect = chai.expect; @@ -518,6 +519,33 @@ describe("FluentClient", () => { sinon.assert.calledTwice(spy); }); + it("should allow multiple connects and disconnects in succession", async () => { + const server = new FluentServer(); + await server.listen(); + + try { + const client = new FluentClient("abc", { + socket: { + port: server.port, + }, + disableAutoconnect: true, + }); + + await client.connect(); + try { + const firstEvent = client.emit("a", {event: "foo bar"}); + + await client.disconnect(); + await client.connect(); + await expect(firstEvent).to.eventually.be.fulfilled; + } finally { + await client.shutdown(); + } + } finally { + await server.close(); + } + }); + it("should reject pending events after shutdown", async () => { const {client, socket} = createFluentClient("test"); socket.isWritable = false;