Skip to content

Commit

Permalink
feat: add ChannelStateChange.hasBacklog
Browse files Browse the repository at this point in the history
Exposes the `hasBacklog` flag on `ChannelStateChange`. This may be used
in combination with rewind to check whether to expect a backlog of
messages upon attachment.
  • Loading branch information
owenpearson committed Jun 29, 2023
1 parent 8a50060 commit b75e0e4
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 7 deletions.
4 changes: 4 additions & 0 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,10 @@ declare namespace Types {
* Indicates whether message continuity on this channel is preserved, see [Nonfatal channel errors](https://ably.com/docs/realtime/channels#nonfatal-errors) for more info.
*/
resumed: boolean;
/**
* Indicates whether the client can expect a backlog of messages from a rewind or resume.
*/
hasBacklog?: boolean;
}

/**
Expand Down
14 changes: 12 additions & 2 deletions src/common/lib/client/channelstatechange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,21 @@ class ChannelStateChange {
current: string;
resumed?: boolean;
reason?: string | Error | ErrorInfo;
hasBacklog?: boolean;

constructor(previous: string, current: string, resumed?: boolean, reason?: string | Error | ErrorInfo | null) {
constructor(
previous: string,
current: string,
resumed?: boolean,
hasBacklog?: boolean,
reason?: string | Error | ErrorInfo | null
) {
this.previous = previous;
this.current = current;
if (current === 'attached') this.resumed = resumed;
if (current === 'attached') {
this.resumed = resumed;
this.hasBacklog = hasBacklog;
}
if (reason) this.reason = reason;
}
}
Expand Down
10 changes: 6 additions & 4 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -622,12 +622,13 @@ class RealtimeChannel extends Channel {
this.modes = (modesFromFlags && Utils.allToLowerCase(modesFromFlags)) || undefined;
const resumed = message.hasFlag('RESUMED');
const hasPresence = message.hasFlag('HAS_PRESENCE');
const hasBacklog = message.hasFlag('HAS_BACKLOG');
if (this.state === 'attached') {
if (!resumed) {
/* On a loss of continuity, the presence set needs to be re-synced */
this.presence.onAttached(hasPresence);
}
const change = new ChannelStateChange(this.state, this.state, resumed, message.error);
const change = new ChannelStateChange(this.state, this.state, resumed, hasBacklog, message.error);
this._allChannelChanges.emit('update', change);
if (!resumed || this.channelOptions.updateOnAttached) {
this.emit('update', change);
Expand All @@ -636,7 +637,7 @@ class RealtimeChannel extends Channel {
/* RTL5i: re-send DETACH and remain in the 'detaching' state */
this.checkPendingState();
} else {
this.notifyState('attached', message.error, resumed, hasPresence);
this.notifyState('attached', message.error, resumed, hasPresence, hasBacklog);
}
break;
}
Expand Down Expand Up @@ -797,7 +798,8 @@ class RealtimeChannel extends Channel {
state: API.Types.ChannelState,
reason?: ErrorInfo | null,
resumed?: boolean,
hasPresence?: boolean
hasPresence?: boolean,
hasBacklog?: boolean
): void {
Logger.logAction(
Logger.LOG_MICRO,
Expand All @@ -823,7 +825,7 @@ class RealtimeChannel extends Channel {
if (reason) {
this.errorReason = reason;
}
const change = new ChannelStateChange(this.state, state, resumed, reason);
const change = new ChannelStateChange(this.state, state, resumed, hasBacklog, reason);
const logLevel = state === 'failed' ? Logger.LOG_ERROR : Logger.LOG_MAJOR;
Logger.logAction(
logLevel,
Expand Down
2 changes: 1 addition & 1 deletion src/common/lib/client/realtimepresence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ class RealtimePresence extends Presence {
const msg = 'Presence auto-re-enter failed: ' + err.toString();
const wrappedErr = new ErrorInfo(msg, 91004, 400);
Logger.logAction(Logger.LOG_ERROR, 'RealtimePresence._ensureMyMembersPresent()', msg);
const change = new ChannelStateChange(this.channel.state, this.channel.state, true, wrappedErr);
const change = new ChannelStateChange(this.channel.state, this.channel.state, true, false, wrappedErr);
this.channel.emit('update', change);
}
};
Expand Down
54 changes: 54 additions & 0 deletions test/realtime/channel.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1630,5 +1630,59 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async
}
);
});

it('rewind_has_backlog_0', function (done) {
var realtime = helper.AblyRealtime();
var channelName = 'rewind_has_backlog_0';
var channelOpts = { params: { rewind: '1' } };
var channel = realtime.channels.get(channelName, channelOpts);

// attach with rewind but no channel history - hasBacklog should be false
channel.attach(function (err, stateChange) {
if (err) {
closeAndFinish(done, realtime, err);
return;
}

try {
expect(!stateChange.hasBacklog).to.be.ok;
} catch (err) {
closeAndFinish(done, realtime, err);
return;
}
closeAndFinish(done, realtime);
});
});

it('rewind_has_backlog_1', function (done) {
var realtime = helper.AblyRealtime();
var rest = helper.AblyRest();
var channelName = 'rewind_has_backlog_1';
var channelOpts = { params: { rewind: '1' } };
var rtChannel = realtime.channels.get(channelName, channelOpts);
var restChannel = rest.channels.get(channelName);

// attach with rewind after publishing - hasBacklog should be true
restChannel.publish('foo', 'bar', function (err) {
if (err) {
closeAndFinish(done, realtime, err);
return;
}
rtChannel.attach(function (err, stateChange) {
if (err) {
closeAndFinish(done, realtime, err);
return;
}

try {
expect(stateChange.hasBacklog).to.be.ok;
} catch (err) {
closeAndFinish(done, realtime, err);
return;
}
closeAndFinish(done, realtime);
});
});
});
});
});

0 comments on commit b75e0e4

Please sign in to comment.