Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): Automated reconnect for IoT and API #10235

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5a7eb71
chore: Rebase onto latest change
stocaaro Aug 19, 2022
8df7264
fix: Cleaning up PR
stocaaro Aug 19, 2022
908254e
fix: More cleaning up PR
stocaaro Aug 19, 2022
dcbbfbc
feat: Add reconnection delay. Factor out reconnect event management f…
stocaaro Aug 31, 2022
56518e0
feat: When loaded offline, the reconnect logic triggers once the netw…
stocaaro Aug 31, 2022
a920333
Fix comment accuracy
stocaaro Aug 31, 2022
fe6d33d
Comment for clarity
stocaaro Aug 31, 2022
e52c180
Rolling back unnecessary string casting
stocaaro Aug 31, 2022
85e0725
Moving the initial startSubscription to after the reconnection setup
stocaaro Aug 31, 2022
9dfe416
Merge branch 'next-major-version/5' into GH-9824/pubsub/reconnect-pr
stocaaro Aug 31, 2022
fe7cd5e
Merge branch 'GH-9824/pubsub/reconnect-pr' of github.com:stocaaro/amp…
stocaaro Aug 31, 2022
fe2ead6
Fix LGTM issues
stocaaro Aug 31, 2022
f24b276
feat: Add retry interval around reconnect and remove observer error c…
stocaaro Sep 9, 2022
8953703
fix: Stop reconnecting unless in Connecting or ConnectionDisrupted
stocaaro Sep 9, 2022
4db1fd7
fix: Remove unused variable
stocaaro Sep 9, 2022
e41f1e3
fix: Missed variable rename
stocaaro Sep 9, 2022
b3792cc
fix: Tests broken in earlier change
stocaaro Sep 9, 2022
98d3875
fix: LGTM warnings
stocaaro Sep 10, 2022
b3ca0e8
Merge branch 'next-major-version/5' into GH-9824/pubsub/reconnect-pr
stocaaro Sep 13, 2022
86f55ee
fix(pubsub): Duplicate import
stocaaro Sep 13, 2022
1a605e9
fix: Reconnect monitor cleanup bug and other comments
stocaaro Sep 14, 2022
a44e9f3
fix: Error out subscriptions for non-retriable errors such as 400-403
stocaaro Sep 14, 2022
9d4259a
refactor: Changes and comments from PR feedback
stocaaro Sep 22, 2022
6bc69dc
Merge branch 'next-major-version/5' into GH-9824/pubsub/reconnect-pr
stocaaro Sep 23, 2022
e6bae74
fix: Add imports missed by merge
stocaaro Sep 23, 2022
3c49956
Fix
stocaaro Sep 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 137 additions & 50 deletions packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,18 @@ describe('AWSAppSyncRealTimeProvider', () => {
);

let provider: AWSAppSyncRealTimeProvider;
let reachabilityObserver: ZenObservable.Observer<{ online: boolean }>;

beforeEach(async () => {
// Set the network to "online" for these tests
jest
.spyOn(Reachability.prototype, 'networkMonitor')
.mockImplementationOnce(() => {
return new Observable(observer => {
reachabilityObserver = observer;
});
});

fakeWebSocketInterface = new FakeWebSocketInterface();
provider = new AWSAppSyncRealTimeProvider();

Expand All @@ -96,19 +106,10 @@ describe('AWSAppSyncRealTimeProvider', () => {
Object.defineProperty(constants, 'MAX_DELAY_MS', {
value: 100,
});

// Set the network to "online" for these tests
const spyon = jest
.spyOn(Reachability.prototype, 'networkMonitor')
.mockImplementationOnce(
() =>
new Observable(observer => {
observer.next?.({ online: true });
})
);
});

afterEach(async () => {
provider?.close();
await fakeWebSocketInterface?.closeInterface();
fakeWebSocketInterface?.teardown();
loggerSpy.mockClear();
Expand Down Expand Up @@ -383,15 +384,9 @@ describe('AWSAppSyncRealTimeProvider', () => {
error: () => {},
});
await fakeWebSocketInterface?.standardConnectionHandshake();
await fakeWebSocketInterface?.sendMessage(
new MessageEvent('start_ack', {
data: JSON.stringify({
type: MESSAGE_TYPES.GQL_START_ACK,
payload: { connectionTimeoutMs: 100 },
id: fakeWebSocketInterface?.webSocket.subscriptionId,
}),
})
);
await fakeWebSocketInterface?.startAckMessage({
connectionTimeoutMs: 100,
});
await fakeWebSocketInterface?.sendDataMessage({
type: MESSAGE_TYPES.GQL_DATA,
payload: { data: {} },
Expand All @@ -415,15 +410,9 @@ describe('AWSAppSyncRealTimeProvider', () => {
error: () => {},
});
await fakeWebSocketInterface?.standardConnectionHandshake();
await fakeWebSocketInterface?.sendMessage(
new MessageEvent('start_ack', {
data: JSON.stringify({
type: MESSAGE_TYPES.GQL_START_ACK,
payload: { connectionTimeoutMs: 100 },
id: fakeWebSocketInterface?.webSocket.subscriptionId,
}),
})
);
await fakeWebSocketInterface?.startAckMessage({
connectionTimeoutMs: 100,
});
await fakeWebSocketInterface?.sendDataMessage({
type: MESSAGE_TYPES.GQL_DATA,
payload: { data: {} },
Expand Down Expand Up @@ -573,29 +562,13 @@ describe('AWSAppSyncRealTimeProvider', () => {
async () => {
await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.sendMessage(
new MessageEvent('connection_ack', {
data: JSON.stringify({
type: constants.MESSAGE_TYPES.GQL_CONNECTION_ACK,
payload: { connectionTimeoutMs: 100 },
}),
})
);

await fakeWebSocketInterface?.sendMessage(
new MessageEvent('start_ack', {
data: JSON.stringify({
type: MESSAGE_TYPES.GQL_START_ACK,
payload: {},
id: fakeWebSocketInterface?.webSocket.subscriptionId,
}),
})
);

await fakeWebSocketInterface?.sendDataMessage({
type: MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE,
payload: { data: {} },
await fakeWebSocketInterface?.handShakeMessage({
connectionTimeoutMs: 100,
});

await fakeWebSocketInterface?.startAckMessage();

await fakeWebSocketInterface?.keepAlive();
}
);

Expand All @@ -618,6 +591,120 @@ describe('AWSAppSyncRealTimeProvider', () => {
);
});

test('subscription connection disruption triggers automatic reconnection', async () => {
expect.assertions(1);

const observer = provider.subscribe('test', {
appSyncGraphqlEndpoint: 'ws://localhost:8080',
});

const subscription = observer.subscribe({ error: () => {} });
// Resolve the message delivery actions

await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.handShakeMessage({
connectionTimeoutMs: 100,
});
await fakeWebSocketInterface?.startAckMessage();
await fakeWebSocketInterface.keepAlive();

await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);

// Wait until the socket is automatically disconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectionDisrupted,
]);

await fakeWebSocketInterface?.triggerOpen();

await fakeWebSocketInterface?.handShakeMessage({
connectionTimeoutMs: 100,
});
fakeWebSocketInterface?.startAckMessage();
await fakeWebSocketInterface.keepAlive();

// Wait until the socket is automatically reconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);

expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([
CS.Disconnected,
CS.Connecting,
CS.Connected,
CS.ConnectionDisrupted,
CS.Connecting,
CS.Connected,
]);
});

test('subscription connection disruption by network outage triggers automatic reconnection once network recovers', async () => {
expect.assertions(1);

const observer = provider.subscribe('test', {
appSyncGraphqlEndpoint: 'ws://localhost:8080',
});

const subscription = observer.subscribe({ error: () => {} });
// Resolve the message delivery actions

await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.handShakeMessage({
connectionTimeoutMs: 100,
});
await fakeWebSocketInterface?.startAckMessage();
await fakeWebSocketInterface.keepAlive();

await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);

reachabilityObserver?.next?.({ online: false });

await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectedPendingNetwork,
]);

fakeWebSocketInterface?.closeInterface();

// Wait until the socket is automatically disconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectionDisruptedPendingNetwork,
]);

reachabilityObserver?.next?.({ online: true });

// Wait until the socket is automatically disconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectionDisrupted,
]);

await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.handShakeMessage();

await fakeWebSocketInterface?.startAckMessage();

// Wait until the socket is automatically reconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);

expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([
CS.Disconnected,
CS.Connecting,
CS.Connected,
CS.ConnectedPendingNetwork,
CS.ConnectionDisruptedPendingNetwork,
CS.ConnectionDisrupted,
CS.Connecting,
CS.Connected,
]);
});

test('socket is closed when subscription is closed', async () => {
expect.assertions(1);

Expand Down
Loading