Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CAP-49] no connection serials #1110

Merged
merged 33 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8515af0
http: fix http x-ably-version test ignoring headers
andydunstall Jan 4, 2023
633034e
update ably protocol version to 2 (RSC7a)
andydunstall Jan 4, 2023
b2a27eb
channel: fix channel serial type
andydunstall Jan 4, 2023
6394217
channel: resume with attach
andydunstall Jan 4, 2023
c74a354
channel: remove checkChannelsOnResume check
andydunstall Jan 5, 2023
4fd8561
connection: remove redundant reattach
andydunstall Jan 5, 2023
7d76878
presence: include the member id in re-enter
andydunstall Jan 5, 2023
8b3b487
presence: don't remove member on re-enter
andydunstall Jan 5, 2023
86072b1
presence: store own members by clientId only
andydunstall Jan 5, 2023
58a03a8
presence: re-enter presence members even if in main presence set
andydunstall Jan 5, 2023
21e0f26
presence: re-enter own members when moving into attached state
andydunstall Jan 5, 2023
51dce34
tsconfig: target es5
andydunstall Jan 5, 2023
237c515
connection: add createRecoveryKey
andydunstall Jan 5, 2023
20306f7
channel: fix channel options optional
andydunstall Jan 5, 2023
c156ea0
connection: recovery from new recovery key format
andydunstall Jan 5, 2023
2784464
connection: remove unsed setRecoveryKey
andydunstall Jan 5, 2023
ddf0e7c
upgrade: replace sync with activate
andydunstall Jan 5, 2023
098de12
connection: remove redundant connectionSerial
andydunstall Jan 5, 2023
d6a321d
upgrade: don't wait for channel no pending before activating
andydunstall Jan 5, 2023
eac6c1b
remove redundant duplicate message check
andydunstall Jan 5, 2023
904bdeb
comet: fix comet connection key
andydunstall Oct 4, 2022
0193edb
connection: remove redundant connectionKey test
andydunstall Jan 6, 2023
46ffe64
resume: update tested expected error code
andydunstall Jan 6, 2023
412a96a
upgrade: updated tested expected error code
andydunstall Jan 6, 2023
35a1bdd
upgrade: remove redundant `unresponsive_upgrade_sync` test
andydunstall Jan 6, 2023
a122a2b
upgrade: fix unrecoverableUpgrade upgrading before publishing
andydunstall Jan 6, 2023
8b89cbe
connection: updated tested expected error code
andydunstall Jan 6, 2023
f503f7d
remove redundant duplicateConnectionId test
andydunstall Jan 6, 2023
9af4386
presence: update tests for new presence model
andydunstall Jan 6, 2023
faf621a
ConnectionManager: fix RTN19a2 implementation
andydunstall Jan 6, 2023
4004274
resume: add recover test
andydunstall Jan 6, 2023
fe05647
Tests: fix flake/race in setOptionsCallbackBehaviour caused by protoc…
SimonWoolf Jan 16, 2023
545ddbf
init_usetokenauth_defaulttokenparams_wildcard test: increase ttl to f…
SimonWoolf Jan 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2989,7 +2989,7 @@ declare namespace Types {
/**
* The recovery key string can be used by another client to recover this connection's state in the recover client options property. See [connection state recover options](https://ably.com/docs/realtime/connection#connection-state-recover-options) for more information.
*/
recoveryKey: string;
recoveryKey: string | null;
/**
* The serial number of the last message to be received on this connection, used automatically by the library when recovering or resuming a connection. When recovering a connection explicitly, the `recoveryKey` is used in the recover client options as it contains both the key and the last message serial.
*/
Expand Down
2 changes: 1 addition & 1 deletion src/common/lib/client/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Channel extends EventEmitter {
this.channelOptions = normaliseChannelOptions(channelOptions);
}

setOptions(options: ChannelOptions): void {
setOptions(options?: ChannelOptions): void {
this.channelOptions = normaliseChannelOptions(options);
}

Expand Down
12 changes: 8 additions & 4 deletions src/common/lib/client/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ class Connection extends EventEmitter {
state: string;
key?: string;
id?: string;
serial: undefined;
recoveryKey?: string | null;
errorReason: ErrorInfo | null;

constructor(ably: Realtime, options: NormalisedClientOptions) {
Expand All @@ -27,8 +25,6 @@ class Connection extends EventEmitter {
this.state = this.connectionManager.state.state;
this.key = undefined;
this.id = undefined;
this.serial = undefined;
this.recoveryKey = undefined;
this.errorReason = null;

this.connectionManager.on('connectionstate', (stateChange: ConnectionStateChange) => {
Expand Down Expand Up @@ -74,6 +70,14 @@ class Connection extends EventEmitter {
Logger.logAction(Logger.LOG_MINOR, 'Connection.close()', 'connectionKey = ' + this.key);
this.connectionManager.requestState({ state: 'closing' });
}

get recoveryKey(): string | null {
return this.createRecoveryKey();
}

createRecoveryKey(): string | null {
return this.connectionManager.createRecoveryKey();
}
}

export default Connection;
96 changes: 24 additions & 72 deletions src/common/lib/client/realtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import Defaults from '../util/defaults';
import ErrorInfo from '../types/errorinfo';
import ProtocolMessage from '../types/protocolmessage';
import { ChannelOptions } from '../../types/channel';
import { ErrCallback } from '../../types/utils';
import ClientOptions, { DeprecatedClientOptions } from '../../types/ClientOptions';
import * as API from '../../../../ably';
import ConnectionManager from '../transport/connectionmanager';
Expand Down Expand Up @@ -55,18 +54,35 @@ class Realtime extends Rest {
class Channels extends EventEmitter {
realtime: Realtime;
all: Record<string, RealtimeChannel>;
inProgress: Record<string, RealtimeChannel>;

constructor(realtime: Realtime) {
super();
this.realtime = realtime;
this.all = Object.create(null);
this.inProgress = Object.create(null);
realtime.connection.connectionManager.on('transport.active', () => {
this.onTransportActive();
});
}

channelSerials(): { [name: string]: string } {
let serials: { [name: string]: string } = {};
for (const name of Utils.keysArray(this.all, true)) {
const channel = this.all[name];
if (channel.properties.channelSerial) {
serials[name] = channel.properties.channelSerial;
}
}
return serials;
}

// recoverChannels gets the given channels and sets their channel serials.
recoverChannels(channelSerials: { [name: string]: string }) {
for (const name of Utils.keysArray(channelSerials, true)) {
const channel = this.get(name);
channel.properties.channelSerial = channelSerials[name];
}
}

onChannelMessage(msg: ProtocolMessage) {
const channelName = msg.channel;
if (channelName === undefined) {
Expand All @@ -90,50 +106,17 @@ class Channels extends EventEmitter {
}

/* called when a transport becomes connected; reattempt attach/detach
* for channels that are attaching or detaching.
* Note that this does not use inProgress as inProgress is only channels which have already made
* at least one attempt to attach/detach */
* for channels that are attaching or detaching. */
onTransportActive() {
for (const channelName in this.all) {
const channel = this.all[channelName];
if (channel.state === 'attaching' || channel.state === 'detaching') {
channel.checkPendingState();
} else if (channel.state === 'suspended') {
channel._attach(false, null);
}
}
}

reattach(reason: ErrorInfo) {
for (const channelId in this.all) {
const channel = this.all[channelId];
/* NB this should not trigger for merely attaching channels, as they will
* be reattached anyway through the onTransportActive checkPendingState */
if (channel.state === 'attached') {
channel.requestState('attaching', reason);
}
}
}

resetAttachedMsgIndicators() {
for (const channelId in this.all) {
const channel = this.all[channelId];
if (channel.state === 'attached') {
channel._attachedMsgIndicator = false;
}
}
}

checkAttachedMsgIndicators(connectionId: string) {
for (const channelId in this.all) {
const channel = this.all[channelId];
if (channel.state === 'attached' && channel._attachedMsgIndicator === false) {
const msg =
'30s after a resume, found channel which has not received an attached; channelId = ' +
channelId +
'; connectionId = ' +
connectionId;
Logger.logAction(Logger.LOG_ERROR, 'Channels.checkAttachedMsgIndicators()', msg);
} else if (channel.state === 'attached') {
// Note explicity request the state, channel.attach() would do nothing
// as its already attached.
channel.requestState('attaching');
}
}
Expand All @@ -160,7 +143,7 @@ class Channels extends EventEmitter {
}
}

get(name: string, channelOptions: ChannelOptions) {
get(name: string, channelOptions?: ChannelOptions) {
name = String(name);
let channel = this.all[name];
if (!channel) {
Expand Down Expand Up @@ -191,37 +174,6 @@ class Channels extends EventEmitter {
throw releaseErr;
}
delete this.all[name];
delete this.inProgress[name];
}

/* Records operations currently pending on a transport; used by connectionManager to decide when
* it's safe to upgrade. Note that a channel might be in the attaching state without any pending
* operations (eg if attached while the connection state is connecting) - such a channel must not
* hold up an upgrade, so is not considered inProgress.
* Operation is currently one of either 'statechange' or 'sync' */
setInProgress(channel: RealtimeChannel, operation: string, inProgress: boolean) {
this.inProgress[channel.name] = this.inProgress[channel.name] || {};
(this.inProgress[channel.name] as any)[operation] = inProgress;
if (!inProgress && this.hasNopending()) {
this.emit('nopending');
}
}

onceNopending(listener: ErrCallback) {
if (this.hasNopending()) {
listener();
return;
}
this.once('nopending', listener);
}

hasNopending() {
return Utils.arrEvery(
Utils.valuesArray(this.inProgress, true) as any,
function (operations: Record<string, unknown>) {
return !Utils.containsValue(operations, true);
}
);
}
}

Expand Down
Loading