Skip to content

Commit

Permalink
Merge pull request #1110 from ably/feature/CAP-49-no-connection-serials
Browse files Browse the repository at this point in the history
[CAP-49] no connection serials
  • Loading branch information
SimonWoolf committed Jan 30, 2023
2 parents da87954 + 545ddbf commit 590f292
Show file tree
Hide file tree
Showing 23 changed files with 511 additions and 775 deletions.
2 changes: 1 addition & 1 deletion ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2989,7 +2989,7 @@ declare namespace Types {
/**
* The recovery key string can be used by another client to recover this connection's state in the recover client options property. See [connection state recover options](https://ably.com/docs/realtime/connection#connection-state-recover-options) for more information.
*/
recoveryKey: string;
recoveryKey: string | null;
/**
* The serial number of the last message to be received on this connection, used automatically by the library when recovering or resuming a connection. When recovering a connection explicitly, the `recoveryKey` is used in the recover client options as it contains both the key and the last message serial.
*/
Expand Down
2 changes: 1 addition & 1 deletion src/common/lib/client/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Channel extends EventEmitter {
this.channelOptions = normaliseChannelOptions(channelOptions);
}

setOptions(options: ChannelOptions): void {
setOptions(options?: ChannelOptions): void {
this.channelOptions = normaliseChannelOptions(options);
}

Expand Down
12 changes: 8 additions & 4 deletions src/common/lib/client/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ class Connection extends EventEmitter {
state: string;
key?: string;
id?: string;
serial: undefined;
recoveryKey?: string | null;
errorReason: ErrorInfo | null;

constructor(ably: Realtime, options: NormalisedClientOptions) {
Expand All @@ -27,8 +25,6 @@ class Connection extends EventEmitter {
this.state = this.connectionManager.state.state;
this.key = undefined;
this.id = undefined;
this.serial = undefined;
this.recoveryKey = undefined;
this.errorReason = null;

this.connectionManager.on('connectionstate', (stateChange: ConnectionStateChange) => {
Expand Down Expand Up @@ -74,6 +70,14 @@ class Connection extends EventEmitter {
Logger.logAction(Logger.LOG_MINOR, 'Connection.close()', 'connectionKey = ' + this.key);
this.connectionManager.requestState({ state: 'closing' });
}

get recoveryKey(): string | null {
return this.createRecoveryKey();
}

createRecoveryKey(): string | null {
return this.connectionManager.createRecoveryKey();
}
}

export default Connection;
96 changes: 24 additions & 72 deletions src/common/lib/client/realtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import Defaults from '../util/defaults';
import ErrorInfo from '../types/errorinfo';
import ProtocolMessage from '../types/protocolmessage';
import { ChannelOptions } from '../../types/channel';
import { ErrCallback } from '../../types/utils';
import ClientOptions, { DeprecatedClientOptions } from '../../types/ClientOptions';
import * as API from '../../../../ably';
import ConnectionManager from '../transport/connectionmanager';
Expand Down Expand Up @@ -55,18 +54,35 @@ class Realtime extends Rest {
class Channels extends EventEmitter {
realtime: Realtime;
all: Record<string, RealtimeChannel>;
inProgress: Record<string, RealtimeChannel>;

constructor(realtime: Realtime) {
super();
this.realtime = realtime;
this.all = Object.create(null);
this.inProgress = Object.create(null);
realtime.connection.connectionManager.on('transport.active', () => {
this.onTransportActive();
});
}

channelSerials(): { [name: string]: string } {
let serials: { [name: string]: string } = {};
for (const name of Utils.keysArray(this.all, true)) {
const channel = this.all[name];
if (channel.properties.channelSerial) {
serials[name] = channel.properties.channelSerial;
}
}
return serials;
}

// recoverChannels gets the given channels and sets their channel serials.
recoverChannels(channelSerials: { [name: string]: string }) {
for (const name of Utils.keysArray(channelSerials, true)) {
const channel = this.get(name);
channel.properties.channelSerial = channelSerials[name];
}
}

onChannelMessage(msg: ProtocolMessage) {
const channelName = msg.channel;
if (channelName === undefined) {
Expand All @@ -90,50 +106,17 @@ class Channels extends EventEmitter {
}

/* called when a transport becomes connected; reattempt attach/detach
* for channels that are attaching or detaching.
* Note that this does not use inProgress as inProgress is only channels which have already made
* at least one attempt to attach/detach */
* for channels that are attaching or detaching. */
onTransportActive() {
for (const channelName in this.all) {
const channel = this.all[channelName];
if (channel.state === 'attaching' || channel.state === 'detaching') {
channel.checkPendingState();
} else if (channel.state === 'suspended') {
channel._attach(false, null);
}
}
}

reattach(reason: ErrorInfo) {
for (const channelId in this.all) {
const channel = this.all[channelId];
/* NB this should not trigger for merely attaching channels, as they will
* be reattached anyway through the onTransportActive checkPendingState */
if (channel.state === 'attached') {
channel.requestState('attaching', reason);
}
}
}

resetAttachedMsgIndicators() {
for (const channelId in this.all) {
const channel = this.all[channelId];
if (channel.state === 'attached') {
channel._attachedMsgIndicator = false;
}
}
}

checkAttachedMsgIndicators(connectionId: string) {
for (const channelId in this.all) {
const channel = this.all[channelId];
if (channel.state === 'attached' && channel._attachedMsgIndicator === false) {
const msg =
'30s after a resume, found channel which has not received an attached; channelId = ' +
channelId +
'; connectionId = ' +
connectionId;
Logger.logAction(Logger.LOG_ERROR, 'Channels.checkAttachedMsgIndicators()', msg);
} else if (channel.state === 'attached') {
// Note explicity request the state, channel.attach() would do nothing
// as its already attached.
channel.requestState('attaching');
}
}
Expand All @@ -160,7 +143,7 @@ class Channels extends EventEmitter {
}
}

get(name: string, channelOptions: ChannelOptions) {
get(name: string, channelOptions?: ChannelOptions) {
name = String(name);
let channel = this.all[name];
if (!channel) {
Expand Down Expand Up @@ -191,37 +174,6 @@ class Channels extends EventEmitter {
throw releaseErr;
}
delete this.all[name];
delete this.inProgress[name];
}

/* Records operations currently pending on a transport; used by connectionManager to decide when
* it's safe to upgrade. Note that a channel might be in the attaching state without any pending
* operations (eg if attached while the connection state is connecting) - such a channel must not
* hold up an upgrade, so is not considered inProgress.
* Operation is currently one of either 'statechange' or 'sync' */
setInProgress(channel: RealtimeChannel, operation: string, inProgress: boolean) {
this.inProgress[channel.name] = this.inProgress[channel.name] || {};
(this.inProgress[channel.name] as any)[operation] = inProgress;
if (!inProgress && this.hasNopending()) {
this.emit('nopending');
}
}

onceNopending(listener: ErrCallback) {
if (this.hasNopending()) {
listener();
return;
}
this.once('nopending', listener);
}

hasNopending() {
return Utils.arrEvery(
Utils.valuesArray(this.inProgress, true) as any,
function (operations: Record<string, unknown>) {
return !Utils.containsValue(operations, true);
}
);
}
}

Expand Down
Loading

0 comments on commit 590f292

Please sign in to comment.