Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 12 additions & 22 deletions src/modules/SignallingClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
ConnectionClosedCode,
} from '../types';
import { TalkMessageStreamPayload } from '../types/signalling/TalkMessageStreamPayload';
import { toUnencodedMessage } from '../types/signalling/SignalMessage';

export class SignallingClient {
private publicEventEmitter: PublicEventEmitter;
Expand All @@ -19,7 +20,6 @@ export class SignallingClient {
private realtime: Ably.Realtime | null = null;
private channel: Ably.RealtimeChannel | null = null;
private stopSignal = false;
private isConnected = false;

constructor(
sessionId: string,
Expand Down Expand Up @@ -65,6 +65,11 @@ export class SignallingClient {
this.realtime = new Ably.Realtime({
token: this.ablyToken,
echoMessages: false,
disconnectedRetryTimeout: 1000, // Retry after 1 second
suspendedRetryTimeout: 2000, // Retry after 2 seconds if suspended (this comes after repeated disconnection and failed reconnects)
transportParams: {
heartbeatInterval: 5000, // this is the minimum heartbeat interval, we want it low so we can quickly detect disconnections.
},
});
// Initialize Ably Realtime client
this.realtime = new Ably.Realtime(this.ablyToken);
Expand All @@ -74,15 +79,13 @@ export class SignallingClient {
params: { rewind: '100' },
});

this.channel.presence.enter();

// Set up connection state listeners
this.realtime.connection.on('connected', () => {
this.onConnected();
});

this.realtime.connection.on('disconnected', () => {
this.onDisconnected();
});

this.realtime.connection.on('failed', () => {
this.onConnectionFailed();
});
Expand All @@ -91,6 +94,7 @@ export class SignallingClient {
this.channel.subscribe((message) => {
this.onMessage(message);
});
this.realtime.connect();
}

public async sendOffer(localDescription: RTCSessionDescription) {
Expand Down Expand Up @@ -134,12 +138,6 @@ export class SignallingClient {
);
}

if (!this.isConnected) {
throw new Error(
'SignallingClient - sendSignalMessage: Cannot send message, connection not established yet.',
);
}

try {
this.channel.publish('signal', message);
} catch (error) {
Expand Down Expand Up @@ -171,13 +169,11 @@ export class SignallingClient {
this.realtime.close();
this.realtime = null;
this.channel = null;
this.isConnected = false;
}
}

private onConnected(): void {
try {
this.isConnected = true;
this.internalEventEmitter.emit(InternalEvent.WEB_SOCKET_OPEN);
} catch (e) {
console.error('SignallingClient - onConnected: error', e);
Expand All @@ -188,14 +184,6 @@ export class SignallingClient {
}
}

private onDisconnected() {
this.isConnected = false;
if (this.stopSignal) {
return;
}
// Ably handles reconnection automatically
}

private onConnectionFailed() {
if (this.stopSignal) {
return;
Expand All @@ -208,7 +196,9 @@ export class SignallingClient {

private onMessage(message: Ably.Message) {
// Extract the SignalMessage from Ably message data
const signalMessage: SignalMessage = message.data;
let signalMessage: SignalMessage = message.data;
// Messages coming back from the server may have an encoded payload, convert it to unencoded for cosumption elsewhere in the SDK
signalMessage = toUnencodedMessage(signalMessage);
this.internalEventEmitter.emit(
InternalEvent.SIGNAL_MESSAGE_RECEIVED,
signalMessage,
Expand Down
13 changes: 13 additions & 0 deletions src/types/signalling/SignalMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,17 @@ export interface SignalMessage {
actionType: SignalMessageAction;
sessionId: string;
payload: object | string;
payloadFormat?: 'json-string' | 'unencoded';
}

export function toUnencodedMessage(message: SignalMessage): SignalMessage {
if (message.payloadFormat === 'json-string') {
return {
...message,
payload: JSON.parse(message.payload as string),
payloadFormat: 'unencoded',
};
}
// Already raw or undefined format (assume raw)
return message;
}