Skip to content

Commit

Permalink
fix: add timeout method to remote socket (socketio#4558)
Browse files Browse the repository at this point in the history
The RemoteSocket interface, which is returned when the client is
connected on another Socket.IO server of the cluster, was lacking the
`timeout()` method.

Syntax:

```js
const sockets = await io.fetchSockets();

for (const socket of sockets) {
  if (someCondition) {
    socket.timeout(1000).emit("some-event", (err) => {
      if (err) {
        // the client did not acknowledge the event in the given delay
      }
    });
  }
}
```

Related: socketio#4595
  • Loading branch information
walde1024 authored and haneenmahd committed Apr 15, 2023
1 parent 53d6129 commit ef855e6
Showing 1 changed file with 47 additions and 4 deletions.
51 changes: 47 additions & 4 deletions lib/broadcast-operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
EventNames,
EventsMap,
TypedEventBroadcaster,
DecorateAcknowledgements,
DecorateAcknowledgementsWithTimeoutAndMultipleResponses,
AllButLast,
Last,
Expand All @@ -20,7 +21,9 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
private readonly adapter: Adapter,
private readonly rooms: Set<Room> = new Set<Room>(),
private readonly exceptRooms: Set<Room> = new Set<Room>(),
private readonly flags: BroadcastFlags = {}
private readonly flags: BroadcastFlags & {
expectSingleResponse?: boolean;
} = {}
) {}

/**
Expand Down Expand Up @@ -232,7 +235,10 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>

const timer = setTimeout(() => {
timedOut = true;
ack.apply(this, [new Error("operation has timed out"), responses]);
ack.apply(this, [
new Error("operation has timed out"),
this.flags.expectSingleResponse ? null : responses,
]);
}, this.flags.timeout);

let expectedServerCount = -1;
Expand All @@ -246,7 +252,10 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
responses.length === expectedClientCount
) {
clearTimeout(timer);
ack.apply(this, [null, responses]);
ack.apply(this, [
null,
this.flags.expectSingleResponse ? null : responses,
]);
}
};

Expand Down Expand Up @@ -476,10 +485,44 @@ export class RemoteSocket<EmitEvents extends EventsMap, SocketData>
this.data = details.data;
this.operator = new BroadcastOperator<EmitEvents, SocketData>(
adapter,
new Set([this.id])
new Set([this.id]),
new Set(),
{
expectSingleResponse: true, // so that remoteSocket.emit() with acknowledgement behaves like socket.emit()
}
);
}

/**
* Adds a timeout in milliseconds for the next operation.
*
* @example
* const sockets = await io.fetchSockets();
*
* for (const socket of sockets) {
* if (someCondition) {
* socket.timeout(1000).emit("some-event", (err) => {
* if (err) {
* // the client did not acknowledge the event in the given delay
* }
* });
* }
* }
*
* // note: if possible, using a room instead of looping over all sockets is preferable
* io.timeout(1000).to(someConditionRoom).emit("some-event", (err, responses) => {
* // ...
* });
*
* @param timeout
*/
public timeout(timeout: number) {
return this.operator.timeout(timeout) as BroadcastOperator<
DecorateAcknowledgements<EmitEvents>,
SocketData
>;
}

public emit<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: EventParams<EmitEvents, Ev>
Expand Down

0 comments on commit ef855e6

Please sign in to comment.