From e1c04a92f2be31e185dc388293f86fda8ea1d258 Mon Sep 17 00:00:00 2001 From: Andrew Dunstall Date: Fri, 6 Jan 2023 12:40:39 +0000 Subject: [PATCH] resume: add recover test --- test/realtime/resume.test.js | 82 ++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/test/realtime/resume.test.js b/test/realtime/resume.test.js index 7d57e6bf6f..05bee83cda 100644 --- a/test/realtime/resume.test.js +++ b/test/realtime/resume.test.js @@ -624,5 +624,87 @@ define(['shared_helper', 'async', 'chai'], function (helper, async, chai) { closeAndFinish(done, [sender_realtime, receiver_realtime, resumed_receiver_realtime], err); } }); + + // Tests recovering multiple channels only receives the expected messages. + it('recover multiple channels', async function () { + const NUM_MSGS = 5; + + const txRest = helper.AblyRest(); + const rxRealtime = helper.AblyRealtime( + { + transports: [helper.bestTransport], + promises: true, + }, + true + ); + + const channelNames = Array(5) + .fill() + .map(() => String(Math.random())); + const rxChannels = channelNames.map((name) => rxRealtime.channels.get(name)); + + await Promise.all(rxChannels.map((channel) => channel.attach())); + + // Do a few publishes on each channel so the channelSerial is set. + await Promise.all( + channelNames.map(async (name) => { + const tx = txRest.channels.get(name); + const rx = rxRealtime.channels.get(name); + + for (let i = 0; i < NUM_MSGS; i++) { + const pSubscribe = rx.subscriptions.once(); + await tx.publish(null, null); + await pSubscribe; + } + }) + ); + + const connectionId = rxRealtime.connection.id; + const connectionKey = rxRealtime.connection.key; + + // Get the recovery key before becoming suspended as it will be reset. + const recoveryKey = rxRealtime.connection.recoveryKey; + + await new Promise((resolve) => helper.becomeSuspended(rxRealtime, resolve)); + + // Send some messages after we've detached that should be recovered. + for (const name of channelNames) { + const tx = txRest.channels.get(name); + for (let i = 0; i < NUM_MSGS; i++) { + await tx.publish('sentWhileDisconnected', null); + } + } + + const rxRealtimeRecover = helper.AblyRealtime({ recover: recoveryKey }); + const rxRecoverChannels = channelNames.map((name) => rxRealtimeRecover.channels.get(name)); + + // Attach recovered channels and check we receive the expected messages. + await Promise.all( + rxRecoverChannels.map(async (channel) => { + await channel.attach(); + + await new Promise((resolve) => { + let recoveredCount = 0; + channel.subscribe((msg) => { + // Check we don't receive unexpected messages. + expect(msg.name).to.equal('sentWhileDisconnected'); + + recoveredCount++; + if (recoveredCount === NUM_MSGS) { + resolve(); + } + }); + }); + }) + ); + + // RTN16d: After recovery expect the connection ID to be the same but the + // key should have updated. + expect(rxRealtimeRecover.connection.id).to.equal(connectionId); + expect(rxRealtimeRecover.connection.key).to.not.equal(connectionKey); + + await rxRealtime.close(); + await rxRealtimeRecover.close(); + }); }); });