diff --git a/packages/core/src/Util/Retry.ts b/packages/core/src/Util/Retry.ts index 8b509036af4..c0c08247274 100644 --- a/packages/core/src/Util/Retry.ts +++ b/packages/core/src/Util/Retry.ts @@ -9,7 +9,7 @@ export class NonRetryableError extends Error { } } -const isNonRetryableError = (obj: any): obj is NonRetryableError => { +export const isNonRetryableError = (obj: any): obj is NonRetryableError => { const key: keyof NonRetryableError = 'nonRetryable'; return obj && obj[key]; }; diff --git a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts index e0479427bc4..d17c2819db4 100644 --- a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -21,6 +21,7 @@ import { delay, FakeWebSocketInterface, replaceConstant } from './helpers'; import { ConnectionState as CS } from '../src'; import { AWSAppSyncRealTimeProvider } from '../src/Providers/AWSAppSyncRealTimeProvider'; +import { loggers } from 'winston'; describe('AWSAppSyncRealTimeProvider', () => { describe('isCustomDomain()', () => { @@ -79,8 +80,24 @@ describe('AWSAppSyncRealTimeProvider', () => { ); let provider: AWSAppSyncRealTimeProvider; + let reachabilityObserver: ZenObservable.Observer<{ online: boolean }>; beforeEach(async () => { + // Set the network to "online" for these tests + jest + .spyOn(Reachability.prototype, 'networkMonitor') + .mockImplementationOnce(() => { + return new Observable(observer => { + reachabilityObserver = observer; + }); + }) + // Twice because we subscribe to get the initial state then again to monitor reachability + .mockImplementationOnce(() => { + return new Observable(observer => { + reachabilityObserver = observer; + }); + }); + fakeWebSocketInterface = new FakeWebSocketInterface(); provider = new AWSAppSyncRealTimeProvider(); @@ -96,19 +113,14 @@ describe('AWSAppSyncRealTimeProvider', () => { Object.defineProperty(constants, 'MAX_DELAY_MS', { value: 100, }); - - // Set the network to "online" for these tests - const spyon = jest - .spyOn(Reachability.prototype, 'networkMonitor') - .mockImplementationOnce( - () => - new Observable(observer => { - observer.next?.({ online: true }); - }) - ); + // Reduce retry delay for tests to 100ms + Object.defineProperty(constants, 'RECONNECT_DELAY', { + value: 100, + }); }); afterEach(async () => { + provider?.close(); await fakeWebSocketInterface?.closeInterface(); fakeWebSocketInterface?.teardown(); loggerSpy.mockClear(); @@ -274,7 +286,7 @@ describe('AWSAppSyncRealTimeProvider', () => { ); }); - test('subscription fails when onclose triggered while waiting for onopen', async () => { + test('subscription disrupted triggering reconnect when onclose triggered while waiting for onopen', async () => { expect.assertions(1); provider @@ -287,7 +299,7 @@ describe('AWSAppSyncRealTimeProvider', () => { await fakeWebSocketInterface?.triggerClose(); await fakeWebSocketInterface?.waitUntilConnectionStateIn([ - CS.Disconnected, + CS.ConnectionDisrupted, ]); // Watching for raised exception to be caught and logged expect(loggerSpy).toBeCalledWith( @@ -297,6 +309,68 @@ describe('AWSAppSyncRealTimeProvider', () => { message: expect.stringMatching('Connection handshake error'), }) ); + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connecting, + ]); + }); + + test('subscription reconnects when onclose triggered while offline and waiting for onopen', async () => { + expect.assertions(1); + reachabilityObserver?.next?.({ online: false }); + + provider + .subscribe('test', { + appSyncGraphqlEndpoint: 'ws://localhost:8080', + }) + .subscribe({ error: () => {} }); + reachabilityObserver?.next?.({ online: false }); + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connecting, + ]); + await fakeWebSocketInterface?.readyForUse; + + await fakeWebSocketInterface?.triggerClose(); + + // Wait until the socket is disrupted pending network + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectionDisruptedPendingNetwork, + ]); + + reachabilityObserver?.next?.({ online: true }); + + // Wait until the socket is disrupted + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectionDisrupted, + ]); + + // Wait until we've started connecting the second time + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connecting, + ]); + + await fakeWebSocketInterface?.readyForUse; + + await fakeWebSocketInterface?.triggerOpen(); + + fakeWebSocketInterface?.handShakeMessage({ + connectionTimeoutMs: 100, + }); + + await fakeWebSocketInterface?.startAckMessage(); + + // Wait until the socket is automatically reconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connected, + ]); + + expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([ + CS.Disconnected, + CS.Connecting, + CS.ConnectionDisruptedPendingNetwork, + CS.ConnectionDisrupted, + CS.Connecting, + CS.Connected, + ]); }); test('subscription fails when onerror triggered while waiting for handshake', async () => { @@ -353,7 +427,7 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ + observer.subscribe({ // Succeed only when the first message comes through next: mockNext, // Closing a hot connection (for cleanup) makes it blow up the test stack @@ -376,22 +450,16 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ + observer.subscribe({ // Succeed only when the first message comes through next: mockNext, // Closing a hot connection (for cleanup) makes it blow up the test stack error: () => {}, }); await fakeWebSocketInterface?.standardConnectionHandshake(); - await fakeWebSocketInterface?.sendMessage( - new MessageEvent('start_ack', { - data: JSON.stringify({ - type: MESSAGE_TYPES.GQL_START_ACK, - payload: { connectionTimeoutMs: 100 }, - id: fakeWebSocketInterface?.webSocket.subscriptionId, - }), - }) - ); + await fakeWebSocketInterface?.startAckMessage({ + connectionTimeoutMs: 100, + }); await fakeWebSocketInterface?.sendDataMessage({ type: MESSAGE_TYPES.GQL_DATA, payload: { data: {} }, @@ -408,22 +476,16 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ + observer.subscribe({ // Succeed only when the first message comes through next: mockNext, // Closing a hot connection (for cleanup) makes it blow up the test stack error: () => {}, }); await fakeWebSocketInterface?.standardConnectionHandshake(); - await fakeWebSocketInterface?.sendMessage( - new MessageEvent('start_ack', { - data: JSON.stringify({ - type: MESSAGE_TYPES.GQL_START_ACK, - payload: { connectionTimeoutMs: 100 }, - id: fakeWebSocketInterface?.webSocket.subscriptionId, - }), - }) - ); + await fakeWebSocketInterface?.startAckMessage({ + connectionTimeoutMs: 100, + }); await fakeWebSocketInterface?.sendDataMessage({ type: MESSAGE_TYPES.GQL_DATA, payload: { data: {} }, @@ -431,18 +493,16 @@ describe('AWSAppSyncRealTimeProvider', () => { expect(mockNext).toBeCalled(); }); - test('subscription observer error is triggered when a connection is formed and a error data message is received', async () => { + test('subscription observer error is triggered when a connection is formed the error is logged and reconnect is triggered', async () => { // Test for error message path message receipt has nothing to assert (only passes when error triggers error subscription method) expect.assertions(1); - const mockError = jest.fn(); const observer = provider.subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ - // Succeed only when the first message comes through - error: mockError, + observer.subscribe({ + error: () => {}, }); await fakeWebSocketInterface?.standardConnectionHandshake(); @@ -450,11 +510,17 @@ describe('AWSAppSyncRealTimeProvider', () => { type: MESSAGE_TYPES.GQL_ERROR, payload: { data: {} }, }); - expect(mockError).toBeCalled(); + expect(loggerSpy).toBeCalledWith( + 'DEBUG', + 'Connection failed: {"data":{}}' + ); + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connecting, + ]); }); - test('subscription observer error is triggered when a connection is formed and a non-retriable connection_error data message is received', async () => { - expect.assertions(2); + test('subscription observer error is triggered when a connection is formed and a non-retriable connection_error data message is received', async done => { + expect.assertions(3); const socketCloseSpy = jest.spyOn( fakeWebSocketInterface.webSocket, @@ -466,8 +532,13 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ - error: x => {}, + observer.subscribe({ + error: e => { + expect(e.errors[0].message).toEqual( + 'Connection failed: Non-retriable Test' + ); + done(); + }, }); await fakeWebSocketInterface?.readyForUse; @@ -507,7 +578,7 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ + observer.subscribe({ error: x => {}, }); @@ -519,34 +590,41 @@ describe('AWSAppSyncRealTimeProvider', () => { ); }); - test('subscription observer error is triggered when a connection is formed and a retriable connection_error data message is received', async () => { - expect.assertions(1); + test('subscription observer error is not triggered when a connection is formed and a retriable connection_error data message is received', async () => { + expect.assertions(2); const observer = provider.subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ + observer.subscribe({ error: x => {}, }); - await fakeWebSocketInterface?.readyForUse; - await fakeWebSocketInterface?.triggerOpen(); + const openSocketAttempt = async () => { + await fakeWebSocketInterface?.readyForUse; + await fakeWebSocketInterface?.triggerOpen(); - // Resolve the message delivery actions - await Promise.resolve( - fakeWebSocketInterface?.sendDataMessage({ - type: MESSAGE_TYPES.GQL_CONNECTION_ERROR, - payload: { - errors: [ - { - errorType: 'Retriable Test', - errorCode: 408, // Request timed out - retriable - }, - ], - }, - }) - ); + // Resolve the message delivery actions + await Promise.resolve( + fakeWebSocketInterface?.sendDataMessage({ + type: MESSAGE_TYPES.GQL_CONNECTION_ERROR, + payload: { + errors: [ + { + errorType: 'Retriable Test', + errorCode: 408, // Request timed out - retriable + }, + ], + }, + }) + ); + await fakeWebSocketInterface?.resetWebsocket(); + }; + + // Go through two connection attempts to excercise backoff and retriable raise + await openSocketAttempt(); + await openSocketAttempt(); // Watching for raised exception to be caught and logged expect(loggerSpy).toBeCalledWith( @@ -556,6 +634,15 @@ describe('AWSAppSyncRealTimeProvider', () => { message: expect.stringMatching('Retriable Test'), }) ); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectionDisrupted, + ]); + + expect(loggerSpy).toBeCalledWith( + 'DEBUG', + 'Connection failed: Retriable Test' + ); }); test('subscription observer error is triggered when a connection is formed and an ack data message is received then ka timeout prompts disconnect', async () => { @@ -565,7 +652,7 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ error: () => {} }); + observer.subscribe({ error: () => {} }); // Resolve the message delivery actions await replaceConstant( 'DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT', @@ -573,29 +660,13 @@ describe('AWSAppSyncRealTimeProvider', () => { async () => { await fakeWebSocketInterface?.readyForUse; await fakeWebSocketInterface?.triggerOpen(); - await fakeWebSocketInterface?.sendMessage( - new MessageEvent('connection_ack', { - data: JSON.stringify({ - type: constants.MESSAGE_TYPES.GQL_CONNECTION_ACK, - payload: { connectionTimeoutMs: 100 }, - }), - }) - ); + await fakeWebSocketInterface?.handShakeMessage({ + connectionTimeoutMs: 100, + }); - await fakeWebSocketInterface?.sendMessage( - new MessageEvent('start_ack', { - data: JSON.stringify({ - type: MESSAGE_TYPES.GQL_START_ACK, - payload: {}, - id: fakeWebSocketInterface?.webSocket.subscriptionId, - }), - }) - ); + await fakeWebSocketInterface?.startAckMessage(); - await fakeWebSocketInterface?.sendDataMessage({ - type: MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE, - payload: { data: {} }, - }); + await fakeWebSocketInterface?.keepAlive(); } ); @@ -618,6 +689,128 @@ describe('AWSAppSyncRealTimeProvider', () => { ); }); + test('subscription connection disruption triggers automatic reconnection', async () => { + expect.assertions(1); + + const observer = provider.subscribe('test', { + appSyncGraphqlEndpoint: 'ws://localhost:8080', + }); + + observer.subscribe({ error: () => {} }); + // Resolve the message delivery actions + + await fakeWebSocketInterface?.readyForUse; + await fakeWebSocketInterface?.triggerOpen(); + await fakeWebSocketInterface?.handShakeMessage({ + connectionTimeoutMs: 100, + }); + await fakeWebSocketInterface?.startAckMessage(); + await fakeWebSocketInterface.keepAlive(); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connected, + ]); + + // Wait until the socket is automatically disconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectionDisrupted, + ]); + + await fakeWebSocketInterface?.triggerOpen(); + + await fakeWebSocketInterface?.handShakeMessage({ + connectionTimeoutMs: 100, + }); + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connecting, + ]); + fakeWebSocketInterface?.startAckMessage(); + await fakeWebSocketInterface.keepAlive(); + + // Wait until the socket is automatically reconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connected, + ]); + + expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([ + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectionDisrupted, + CS.Connecting, + CS.Connected, + ]); + }); + + test('subscription connection disruption by network outage triggers automatic reconnection once network recovers', async () => { + expect.assertions(1); + + const observer = provider.subscribe('test', { + appSyncGraphqlEndpoint: 'ws://localhost:8080', + }); + + observer.subscribe({ error: () => {} }); + // Resolve the message delivery actions + + await fakeWebSocketInterface?.readyForUse; + await fakeWebSocketInterface?.triggerOpen(); + await fakeWebSocketInterface?.handShakeMessage({ + connectionTimeoutMs: 100, + }); + + await fakeWebSocketInterface?.startAckMessage(); + await fakeWebSocketInterface.keepAlive(); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connected, + ]); + + reachabilityObserver?.next?.({ online: false }); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectedPendingNetwork, + ]); + + fakeWebSocketInterface?.closeInterface(); + + // Wait until the socket is automatically disconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectionDisruptedPendingNetwork, + ]); + + reachabilityObserver?.next?.({ online: true }); + + // Wait until the socket is automatically disconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectionDisrupted, + ]); + + await fakeWebSocketInterface?.triggerOpen(); + await fakeWebSocketInterface?.handShakeMessage(); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connecting, + ]); + + await fakeWebSocketInterface?.startAckMessage(); + + // Wait until the socket is automatically reconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connected, + ]); + + expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([ + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingNetwork, + CS.ConnectionDisruptedPendingNetwork, + CS.ConnectionDisrupted, + CS.Connecting, + CS.Connected, + ]); + }); + test('socket is closed when subscription is closed', async () => { expect.assertions(1); @@ -646,13 +839,13 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ error: () => {} }); + observer.subscribe({ error: () => {} }); await fakeWebSocketInterface?.standardConnectionHandshake(); // Wait until the socket is automatically disconnected await fakeWebSocketInterface?.waitForConnectionState([ - CS.Disconnected, + CS.ConnectionDisrupted, ]); expect(loggerSpy).toBeCalledWith( @@ -670,7 +863,7 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ error: () => {} }); + observer.subscribe({ error: () => {} }); await fakeWebSocketInterface?.readyForUse; Promise.resolve(); @@ -708,7 +901,7 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ error: () => {} }); + observer.subscribe({ error: () => {} }); await fakeWebSocketInterface?.readyForUse; Promise.resolve(); @@ -739,7 +932,7 @@ describe('AWSAppSyncRealTimeProvider', () => { test('authenticating with API_KEY', async () => { expect.assertions(1); - const subscription = provider + provider .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'API_KEY', @@ -768,7 +961,7 @@ describe('AWSAppSyncRealTimeProvider', () => { }; }); - const subscription = provider + provider .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'AWS_IAM', @@ -799,23 +992,20 @@ describe('AWSAppSyncRealTimeProvider', () => { }; }); - const subscription = provider + provider .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'AWS_IAM', }) - .subscribe({ - error: e => { - expect(e).toEqual({ - errors: [ - { - message: - 'AppSync Realtime subscription init error: Error: No credentials', - }, - ], - }); - }, - }); + .subscribe({ error: () => {} }); + + // TODO Find a better way to give the catch stack time to resolve + await delay(10); + + expect(loggerSpy).toBeCalledWith( + 'DEBUG', + 'AppSync Realtime subscription init error: Error: No credentials' + ); }); test('authenticating with AWS_IAM with credentials exception', async () => { @@ -834,23 +1024,20 @@ describe('AWSAppSyncRealTimeProvider', () => { }; }); - const subscription = provider + provider .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'AWS_IAM', }) - .subscribe({ - error: e => { - expect(e).toEqual({ - errors: [ - { - message: - 'AppSync Realtime subscription init error: Error: No credentials', - }, - ], - }); - }, - }); + .subscribe({ error: () => {} }); + + // TODO Find a better way to give the catch stack time to resolve + await delay(10); + + expect(loggerSpy).toBeCalledWith( + 'DEBUG', + 'AppSync Realtime subscription init error: Error: No credentials' + ); // Wait until the socket is automatically disconnected await fakeWebSocketInterface?.waitUntilConnectionStateIn([ @@ -875,7 +1062,7 @@ describe('AWSAppSyncRealTimeProvider', () => { }); }); - const subscription = provider + provider .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'OPENID_CONNECT', @@ -893,7 +1080,7 @@ describe('AWSAppSyncRealTimeProvider', () => { test('authenticating with OPENID_CONNECT with empty token', async () => { expect.assertions(1); - const userSpy = jest + jest .spyOn(Auth, 'currentAuthenticatedUser') .mockImplementation(() => { return Promise.resolve({ @@ -901,37 +1088,32 @@ describe('AWSAppSyncRealTimeProvider', () => { }); }); - const subscription = provider + provider .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'OPENID_CONNECT', }) - .subscribe({ - error: e => { - expect(e).toEqual({ - errors: [ - { - message: - 'AppSync Realtime subscription init error: Error: No federated jwt', - }, - ], - }); - }, - }); + .subscribe({ error: () => {} }); + + // TODO Find a better way to give the catch stack time to resolve + await delay(10); + + expect(loggerSpy).toBeCalledWith( + 'DEBUG', + 'AppSync Realtime subscription init error: Error: No federated jwt' + ); }); test('authenticating with OPENID_CONNECT from cached token', async () => { expect.assertions(1); - const userSpy = jest - .spyOn(Cache, 'getItem') - .mockImplementation(() => { - return Promise.resolve({ - token: 'test', - }); + jest.spyOn(Cache, 'getItem').mockImplementation(() => { + return Promise.resolve({ + token: 'test', }); + }); - const subscription = provider + provider .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'OPENID_CONNECT', @@ -948,19 +1130,17 @@ describe('AWSAppSyncRealTimeProvider', () => { test('authenticating with AMAZON_COGNITO_USER_POOLS', async () => { expect.assertions(1); - const sessionSpy = jest - .spyOn(Auth, 'currentSession') - .mockImplementation(() => { - return Promise.resolve({ - getAccessToken: () => { - return { - getJwtToken: () => {}, - }; - }, - } as any); - }); + jest.spyOn(Auth, 'currentSession').mockImplementation(() => { + return Promise.resolve({ + getAccessToken: () => { + return { + getJwtToken: () => {}, + }; + }, + } as any); + }); - const subscription = provider + provider .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'AMAZON_COGNITO_USER_POOLS', @@ -978,7 +1158,7 @@ describe('AWSAppSyncRealTimeProvider', () => { test('authenticating with AWS_LAMBDA', async () => { expect.assertions(1); - const subscription = provider + provider .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'AWS_LAMBDA', @@ -999,7 +1179,7 @@ describe('AWSAppSyncRealTimeProvider', () => { test('authenticating with AWS_LAMBDA without Authorization', async () => { expect.assertions(1); - const subscription = provider + provider .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'AWS_LAMBDA', @@ -1007,18 +1187,15 @@ describe('AWSAppSyncRealTimeProvider', () => { Authorization: '', }, }) - .subscribe({ - error: e => { - expect(e).toEqual({ - errors: [ - { - message: - 'AppSync Realtime subscription init error: Error: No auth token specified', - }, - ], - }); - }, - }); + .subscribe({ error: () => {} }); + + // TODO Find a better way to give the catch stack time to resolve + await delay(10); + + expect(loggerSpy).toBeCalledWith( + 'DEBUG', + 'AppSync Realtime subscription init error: Error: No auth token specified' + ); }); }); }); diff --git a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts index fc5d4f53650..4149a70b5e4 100644 --- a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts +++ b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts @@ -39,6 +39,12 @@ describe('ConnectionStateMonitor', () => { beforeEach(() => { const spyon = jest .spyOn(Reachability.prototype, 'networkMonitor') + .mockImplementationOnce(() => { + return new Observable(observer => { + reachabilityObserver = observer; + }); + }) + // Twice because we subscribe to get the initial state then again to monitor reachability .mockImplementationOnce(() => { return new Observable(observer => { reachabilityObserver = observer; diff --git a/packages/pubsub/__tests__/PubSub-unit-test.ts b/packages/pubsub/__tests__/PubSub-unit-test.ts index c24f3a62646..3e9ea141719 100644 --- a/packages/pubsub/__tests__/PubSub-unit-test.ts +++ b/packages/pubsub/__tests__/PubSub-unit-test.ts @@ -16,7 +16,7 @@ import { AWSIoTProvider, mqttTopicMatch, } from '../src/Providers'; -// import Amplify from '../../src/'; + import { Credentials, Hub, @@ -25,13 +25,10 @@ import { Reachability, } from '@aws-amplify/core'; import * as Paho from 'paho-mqtt'; -import { - ConnectionState, - ConnectionState, - CONNECTION_STATE_CHANGE, -} from '../src'; +import { ConnectionState, CONNECTION_STATE_CHANGE } from '../src'; import { HubConnectionListener } from './helpers'; import Observable from 'zen-observable-ts'; +import * as constants from '../src/Providers/constants'; const pahoClientMockCache = {}; @@ -73,7 +70,8 @@ const credentials = { }; const testPubSubAsync = (pubsub, topic, message, options?) => - new Promise((resolve, reject) => { + new Promise(async (resolve, reject) => { + let hubConnectionListener = new HubConnectionListener('pubsub'); const obs = pubsub.subscribe(topic, options).subscribe({ next: data => { expect(data.value).toEqual(message); @@ -83,7 +81,9 @@ const testPubSubAsync = (pubsub, topic, message, options?) => close: () => console.log('close'), error: reject, }); - + await hubConnectionListener.waitUntilConnectionStateIn([ + ConnectionState.Connected, + ]); pubsub.publish(topic, message, options); }); @@ -132,6 +132,11 @@ beforeEach(() => { res(credentials); }); }); + + // Reduce retry delay for tests to 100ms + Object.defineProperty(constants, 'RECONNECT_DELAY', { + value: 100, + }); }); afterEach(() => { @@ -179,6 +184,8 @@ describe('PubSub', () => { describe('AWSIoTProvider', () => { test('subscribe and publish to the same topic using AWSIoTProvider', async done => { + let hubConnectionListener = new HubConnectionListener('pubsub'); + const config = { PubSub: { aws_pubsub_region: 'region', @@ -206,6 +213,10 @@ describe('PubSub', () => { error: error => console.log('error', error), }); + await hubConnectionListener.waitUntilConnectionStateIn([ + ConnectionState.Connected, + ]); + await pubsub.publish('topicA', 'my message'); }); @@ -279,7 +290,8 @@ describe('PubSub', () => { }); }); - test('trigger observer error when disconnected', done => { + test('trigger reconnection when disconnected', async () => { + let hubConnectionListener = new HubConnectionListener('pubsub'); const pubsub = new PubSub(); const awsIotProvider = new AWSIoTProvider({ @@ -288,11 +300,26 @@ describe('PubSub', () => { }); pubsub.addPluggable(awsIotProvider); - pubsub.subscribe('topic', { clientId: '123' }).subscribe({ - error: () => done(), - }); + pubsub.subscribe('topic', { clientId: '123' }).subscribe({}); + await hubConnectionListener.waitUntilConnectionStateIn([ + ConnectionState.Connected, + ]); awsIotProvider.onDisconnect({ errorCode: 1, clientId: '123' }); + await hubConnectionListener.waitUntilConnectionStateIn([ + ConnectionState.ConnectionDisrupted, + ]); + await hubConnectionListener.waitUntilConnectionStateIn([ + ConnectionState.Connected, + ]); + expect(hubConnectionListener.observedConnectionStates).toEqual([ + ConnectionState.Disconnected, + ConnectionState.Connecting, + ConnectionState.Connected, + ConnectionState.ConnectionDisrupted, + ConnectionState.Connecting, + ConnectionState.Connected, + ]); }); test('should remove MqttOverWSProvider', () => { @@ -330,6 +357,13 @@ describe('PubSub', () => { // Setup a mock of the reachability monitor where the initial value is online. const spyon = jest .spyOn(Reachability.prototype, 'networkMonitor') + .mockImplementationOnce( + () => + new Observable(observer => { + reachabilityObserver = observer; + }) + ) + // Twice because we subscribe to get the initial state then again to monitor reachability .mockImplementationOnce( () => new Observable(observer => { @@ -352,18 +386,20 @@ describe('PubSub', () => { error: () => {}, }); - await hubConnectionListener.waitUntilConnectionStateIn(['Connected']); + await hubConnectionListener.waitUntilConnectionStateIn([ + ConnectionState.Connected, + ]); sub.unsubscribe(); awsIotProvider.onDisconnect({ errorCode: 1, clientId: '123' }); await hubConnectionListener.waitUntilConnectionStateIn([ - 'Disconnected', + ConnectionState.Disconnected, ]); expect(hubConnectionListener.observedConnectionStates).toEqual([ - 'Disconnected', - 'Connecting', - 'Connected', - 'ConnectedPendingDisconnect', - 'Disconnected', + ConnectionState.Disconnected, + ConnectionState.Connecting, + ConnectionState.Connected, + ConnectionState.ConnectedPendingDisconnect, + ConnectionState.Disconnected, ]); }); @@ -380,22 +416,26 @@ describe('PubSub', () => { error: () => {}, }); - await hubConnectionListener.waitUntilConnectionStateIn(['Connected']); + await hubConnectionListener.waitUntilConnectionStateIn([ + ConnectionState.Connected, + ]); reachabilityObserver?.next?.({ online: false }); await hubConnectionListener.waitUntilConnectionStateIn([ - 'ConnectedPendingNetwork', + ConnectionState.ConnectedPendingNetwork, ]); reachabilityObserver?.next?.({ online: true }); - await hubConnectionListener.waitUntilConnectionStateIn(['Connected']); + await hubConnectionListener.waitUntilConnectionStateIn([ + ConnectionState.Connected, + ]); expect(hubConnectionListener.observedConnectionStates).toEqual([ - 'Disconnected', - 'Connecting', - 'Connected', - 'ConnectedPendingNetwork', - 'Connected', + ConnectionState.Disconnected, + ConnectionState.Connecting, + ConnectionState.Connected, + ConnectionState.ConnectedPendingNetwork, + ConnectionState.Connected, ]); }); @@ -412,24 +452,26 @@ describe('PubSub', () => { error: () => {}, }); - await hubConnectionListener.waitUntilConnectionStateIn(['Connected']); + await hubConnectionListener.waitUntilConnectionStateIn([ + ConnectionState.Connected, + ]); reachabilityObserver?.next?.({ online: false }); await hubConnectionListener.waitUntilConnectionStateIn([ - 'ConnectedPendingNetwork', + ConnectionState.ConnectedPendingNetwork, ]); awsIotProvider.onDisconnect({ errorCode: 1, clientId: '123' }); await hubConnectionListener.waitUntilConnectionStateIn([ - 'Disconnected', + ConnectionState.ConnectionDisruptedPendingNetwork, ]); expect(hubConnectionListener.observedConnectionStates).toEqual([ - 'Disconnected', - 'Connecting', - 'Connected', - 'ConnectedPendingNetwork', - 'Disconnected', + ConnectionState.Disconnected, + ConnectionState.Connecting, + ConnectionState.Connected, + ConnectionState.ConnectedPendingNetwork, + ConnectionState.ConnectionDisruptedPendingNetwork, ]); }); }); @@ -552,6 +594,7 @@ describe('PubSub', () => { }); test('On unsubscribe when is the last observer it should disconnect the websocket', async () => { + const hubConnectionListener = new HubConnectionListener('pubsub'); const pubsub = new PubSub(); const spyDisconnect = jest.spyOn( @@ -571,14 +614,16 @@ describe('PubSub', () => { error: error => console.log('error', error), }); - // TODO: we should now when the connection is established to wait for that first - await (() => { - return new Promise(res => { - setTimeout(res, 100); - }); - })(); + await hubConnectionListener.waitUntilConnectionStateIn([ + ConnectionState.Connected, + ]); subscription1.unsubscribe(); + + await hubConnectionListener.waitUntilConnectionStateIn([ + ConnectionState.Disconnected, + ]); + expect(spyDisconnect).toHaveBeenCalled(); spyDisconnect.mockClear(); }); diff --git a/packages/pubsub/__tests__/helpers.ts b/packages/pubsub/__tests__/helpers.ts index 54870614329..f916341d958 100644 --- a/packages/pubsub/__tests__/helpers.ts +++ b/packages/pubsub/__tests__/helpers.ts @@ -86,7 +86,7 @@ export class HubConnectionListener { } export class FakeWebSocketInterface { - readonly webSocket: FakeWebSocket; + webSocket: FakeWebSocket; readyForUse: Promise; hasClosed: Promise; hubConnectionListener: HubConnectionListener; @@ -95,6 +95,10 @@ export class FakeWebSocketInterface { constructor() { this.hubConnectionListener = new HubConnectionListener('api'); + this.resetWebsocket(); + } + + resetWebsocket() { this.readyForUse = new Promise((res, rej) => { this.readyResolve = res; }); @@ -128,6 +132,7 @@ export class FakeWebSocketInterface { await this.readyForUse; await this.triggerOpen(); await this.handShakeMessage(); + await this.keepAlive(); } /** @@ -159,13 +164,9 @@ export class FakeWebSocketInterface { */ async closeInterface() { await this.triggerClose(); - // Wait for either hasClosed or a half second has passed - await new Promise(async res => { - // The interface is closed when the socket "hasClosed" - this.hasClosed.then(() => res(undefined)); - await this.waitUntilConnectionStateIn([CS.Disconnected]); - res(undefined); - }); + + // Wait for the connection to be Disconnected + await this.waitUntilConnectionStateIn([CS.Disconnected]); } /** @@ -190,12 +191,38 @@ export class FakeWebSocketInterface { /** * Send a connection_ack */ - async handShakeMessage() { + async handShakeMessage(payload = { connectionTimeoutMs: 100_000 }) { await this.sendMessage( - new MessageEvent('connection_ack', { + new MessageEvent(constants.MESSAGE_TYPES.GQL_CONNECTION_ACK, { data: JSON.stringify({ type: constants.MESSAGE_TYPES.GQL_CONNECTION_ACK, - payload: { connectionTimeoutMs: 100_000 }, + payload: payload, + }), + }) + ); + } + + /** + * Send a connection_ack + */ + async keepAlive(payload = {}) { + await this.sendMessage( + new MessageEvent(constants.MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE, { + data: JSON.stringify({ + type: constants.MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE, + payload: payload, + }), + }) + ); + } + + async startAckMessage(payload = {}) { + await this.sendMessage( + new MessageEvent(constants.MESSAGE_TYPES.GQL_START_ACK, { + data: JSON.stringify({ + type: constants.MESSAGE_TYPES.GQL_START_ACK, + payload: payload, + id: this.webSocket.subscriptionId, }), }) ); @@ -226,7 +253,7 @@ export class FakeWebSocketInterface { } /** - * Run a gicommand and resolve to allow internal behavior to execute + * Run a command and resolve to allow internal behavior to execute */ async runAndResolve(fn) { fn(); diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index bc2149bcfb6..a7b3baffd49 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -26,11 +26,12 @@ import { jitteredExponentialRetry, NonRetryableError, ICredentials, + isNonRetryableError, } from '@aws-amplify/core'; import Cache from '@aws-amplify/cache'; import Auth, { GRAPHQL_AUTH_MODE } from '@aws-amplify/auth'; import { AbstractPubSubProvider } from '../PubSubProvider'; -import { CONTROL_MSG } from '../../types/PubSub'; +import { CONTROL_MSG, ConnectionState } from '../../types/PubSub'; import { AMPLIFY_SYMBOL, @@ -50,6 +51,10 @@ import { ConnectionStateMonitor, CONNECTION_CHANGE, } from '../../utils/ConnectionStateMonitor'; +import { + ReconnectEvent, + ReconnectionMonitor, +} from '../../utils/ReconnectionMonitor'; const logger = new Logger('AWSAppSyncRealTimeProvider'); @@ -99,23 +104,63 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { private keepAliveAlertTimeoutId?: ReturnType; private subscriptionObserverMap: Map = new Map(); private promiseArray: Array<{ res: Function; rej: Function }> = []; + private connectionState: ConnectionState; private readonly connectionStateMonitor = new ConnectionStateMonitor(); + private readonly reconnectionMonitor = new ReconnectionMonitor(); + private connectionStateMonitorSubscription: ZenObservable.Subscription; constructor(options: ProviderOptions = {}) { super(options); // Monitor the connection state and pass changes along to Hub - this.connectionStateMonitor.connectionStateObservable.subscribe( - ConnectionState => { - dispatchApiEvent( - CONNECTION_STATE_CHANGE, - { - provider: this, - connectionState: ConnectionState, - }, - `Connection state is ${ConnectionState}` - ); - } - ); + this.connectionStateMonitorSubscription = + this.connectionStateMonitor.connectionStateObservable.subscribe( + connectionState => { + dispatchApiEvent( + CONNECTION_STATE_CHANGE, + { + provider: this, + connectionState, + }, + `Connection state is ${connectionState}` + ); + this.connectionState = connectionState; + + // Trigger START_RECONNECT when the connection is disrupted + if (connectionState === ConnectionState.ConnectionDisrupted) { + this.reconnectionMonitor.record(ReconnectEvent.START_RECONNECT); + } + + // Trigger HALT_RECONNECT to halt reconnection attempts when the state is anything other than + // ConnectionDisrupted or Connecting + if ( + [ + ConnectionState.Connected, + ConnectionState.ConnectedPendingDisconnect, + ConnectionState.ConnectedPendingKeepAlive, + ConnectionState.ConnectedPendingNetwork, + ConnectionState.ConnectedPendingNetwork, + ConnectionState.ConnectionDisruptedPendingNetwork, + ConnectionState.Disconnected, + ].includes(connectionState) + ) { + this.reconnectionMonitor.record(ReconnectEvent.HALT_RECONNECT); + } + } + ); + } + + /** + * Mark the socket closed and release all active listeners + */ + close() { + // Mark the socket closed both in status and the connection monitor + this.socketStatus = SOCKET_STATUS.CLOSED; + this.connectionStateMonitor.record(CONNECTION_CHANGE.CONNECTION_FAILED); + + // Turn off the subscription monitor Hub publishing + this.connectionStateMonitorSubscription.unsubscribe(); + // Complete all reconnect observers + this.reconnectionMonitor.close(); } getNewWebSocket(url, protocol) { @@ -158,26 +203,44 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { }); observer.complete(); } else { + let subscriptionStartActive = false; const subscriptionId = uuid(); - this._startSubscriptionWithAWSAppSyncRealTime({ - options, - observer, - subscriptionId, - }).catch(err => { - observer.error({ - errors: [ - { - ...new GraphQLError( + const startSubscription = () => { + if (!subscriptionStartActive) { + subscriptionStartActive = true; + const startSubscriptionPromise = + this._startSubscriptionWithAWSAppSyncRealTime({ + options, + observer, + subscriptionId, + }).catch(err => { + logger.debug( `${CONTROL_MSG.REALTIME_SUBSCRIPTION_INIT_ERROR}: ${err}` - ), - }, - ], - }); - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); - observer.complete(); + ); + + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + }); + startSubscriptionPromise.finally(() => { + subscriptionStartActive = false; + }); + } + }; + + let reconnectSubscription: ZenObservable.Subscription; + + // Add an observable to the reconnection list to manage reconnection for this subscription + reconnectSubscription = new Observable(observer => { + this.reconnectionMonitor.addObserver(observer); + }).subscribe(() => { + startSubscription(); }); + startSubscription(); + return async () => { + // Cleanup reconnection subscription + reconnectSubscription?.unsubscribe(); + // Cleanup after unsubscribing or observer.complete was called after _startSubscriptionWithAWSAppSyncRealTime try { // Waiting that subscription has been connected before trying to unsubscribe @@ -288,24 +351,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { additionalHeaders, }); } catch (err) { - logger.debug({ err }); - const message = err['message'] ?? ''; - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); - observer.error({ - errors: [ - { - ...new GraphQLError(`${CONTROL_MSG.CONNECTION_FAILED}: ${message}`), - }, - ], - }); - observer.complete(); - const { subscriptionFailedCallback } = - this.subscriptionObserverMap.get(subscriptionId) || {}; - - // Notify concurrent unsubscription - if (typeof subscriptionFailedCallback === 'function') { - subscriptionFailedCallback(); - } + this._logStartSubscriptionError(subscriptionId, observer, err); return; } @@ -333,6 +379,44 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { } } + // Log logic for start subscription failures + private _logStartSubscriptionError(subscriptionId, observer, err) { + logger.debug({ err }); + const message = err['message'] ?? ''; + // Resolving to give the state observer time to propogate the update + Promise.resolve( + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED) + ); + + // Capture the error only when the network didn't cause disruption + if ( + this.connectionState !== ConnectionState.ConnectionDisruptedPendingNetwork + ) { + // When the error is non-retriable, error out the observable + if (isNonRetryableError(err)) { + observer.error({ + errors: [ + { + ...new GraphQLError( + `${CONTROL_MSG.CONNECTION_FAILED}: ${message}` + ), + }, + ], + }); + } else { + logger.debug(`${CONTROL_MSG.CONNECTION_FAILED}: ${message}`); + } + + const { subscriptionFailedCallback } = + this.subscriptionObserverMap.get(subscriptionId) || {}; + + // Notify concurrent unsubscription + if (typeof subscriptionFailedCallback === 'function') { + subscriptionFailedCallback(); + } + } + } + // Waiting that subscription has been connected before trying to unsubscribe private async _waitForSubscriptionToBeConnected(subscriptionId: string) { const subscriptionObserver = @@ -505,18 +589,12 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { subscriptionState, }); - observer.error({ - errors: [ - { - ...new GraphQLError( - `${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}` - ), - }, - ], - }); + logger.debug( + `${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}` + ); + if (startAckTimeoutId) clearTimeout(startAckTimeoutId); - observer.complete(); if (typeof subscriptionFailedCallback === 'function') { subscriptionFailedCallback(); } @@ -526,14 +604,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { private _errorDisconnect(msg: string) { logger.debug(`Disconnect error: ${msg}`); - this.subscriptionObserverMap.forEach(({ observer }) => { - if (observer && !observer.closed) { - observer.error({ - errors: [{ ...new GraphQLError(msg) }], - }); - } - }); - this.subscriptionObserverMap.clear(); + if (this.awsRealTimeSocket) { this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); this.awsRealTimeSocket.close(); @@ -557,22 +628,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { subscriptionState: SUBSCRIPTION_STATUS.FAILED, }); - if (observer && !observer.closed) { - observer.error({ - errors: [ - { - ...new GraphQLError( - `Subscription timeout ${JSON.stringify({ - query, - variables, - })}` - ), - }, - ], - }); - // Cleanup will be automatically executed - observer.complete(); - } + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); logger.debug( 'timeoutStartSubscription', JSON.stringify({ query, variables }) @@ -641,6 +697,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { this.socketStatus = SOCKET_STATUS.READY; this.promiseArray = []; } catch (err) { + logger.debug('Connection exited with', err); this.promiseArray.forEach(({ rej }) => rej(err)); this.promiseArray = []; if ( @@ -677,9 +734,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { logger.debug(`WebSocket connection error`); }; newSocket.onclose = () => { - this.connectionStateMonitor.record( - CONNECTION_CHANGE.CONNECTION_FAILED - ); rej(new Error('Connection handshake error')); }; newSocket.onopen = () => { @@ -688,7 +742,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { }; }); })(); - // Step 2: wait for ack from AWS AppSyncReaTime after sending init await (() => { return new Promise((res, rej) => { diff --git a/packages/pubsub/src/Providers/MqttOverWSProvider.ts b/packages/pubsub/src/Providers/MqttOverWSProvider.ts index e6c2492f99e..805b8917559 100644 --- a/packages/pubsub/src/Providers/MqttOverWSProvider.ts +++ b/packages/pubsub/src/Providers/MqttOverWSProvider.ts @@ -12,16 +12,20 @@ */ import * as Paho from 'paho-mqtt'; import { v4 as uuid } from 'uuid'; -import Observable from 'zen-observable-ts'; +import Observable, { ZenObservable } from 'zen-observable-ts'; import { AbstractPubSubProvider } from './PubSubProvider'; -import { SubscriptionObserver } from '../types/PubSub'; +import { SubscriptionObserver, ConnectionState } from '../types/PubSub'; import { ProviderOptions } from '../types/Provider'; import { ConsoleLogger as Logger, Hub } from '@aws-amplify/core'; import { ConnectionStateMonitor, CONNECTION_CHANGE, } from '../utils/ConnectionStateMonitor'; +import { + ReconnectEvent, + ReconnectionMonitor, +} from '../utils/ReconnectionMonitor'; import { AMPLIFY_SYMBOL, CONNECTION_STATE_CHANGE } from './constants'; const logger = new Logger('MqttOverWSProvider'); @@ -55,17 +59,15 @@ class ClientsQueue { async get(clientId: string, clientFactory?: (input: string) => Promise) { const cachedPromise = this.promises.get(clientId); - if (cachedPromise) { - return cachedPromise; - } + if (cachedPromise) return cachedPromise; if (clientFactory) { const newPromise = clientFactory(clientId); - this.promises.set(clientId, newPromise); - + newPromise.catch(v => this.promises.delete(clientId)); return newPromise; } + return undefined; } @@ -86,7 +88,9 @@ const topicSymbol = typeof Symbol !== 'undefined' ? Symbol('topic') : '@@topic'; export class MqttOverWSProvider extends AbstractPubSubProvider { private _clientsQueue = new ClientsQueue(); + private connectionState: ConnectionState; private readonly connectionStateMonitor = new ConnectionStateMonitor(); + private readonly reconnectionMonitor = new ReconnectionMonitor(); constructor(options: MqttProviderOptions = {}) { super({ ...options, clientId: options.clientId || uuid() }); @@ -102,6 +106,16 @@ export class MqttOverWSProvider extends AbstractPubSubProvider { }, `Connection state is ${connectionStateChange}` ); + + this.connectionState = connectionStateChange; + + // Trigger reconnection when the connection is disrupted + if (connectionStateChange === ConnectionState.ConnectionDisrupted) { + this.reconnectionMonitor.record(ReconnectEvent.START_RECONNECT); + } else if (connectionStateChange !== ConnectionState.Connecting) { + // Trigger connected to halt reconnection attempts + this.reconnectionMonitor.record(ReconnectEvent.HALT_RECONNECT); + } } ); } @@ -142,7 +156,6 @@ export class MqttOverWSProvider extends AbstractPubSubProvider { if (errorCode !== 0) { logger.warn(clientId, JSON.stringify({ errorCode, ...args }, null, 2)); - const topicsToDelete: string[] = []; if (!clientId) { return; } @@ -150,24 +163,7 @@ export class MqttOverWSProvider extends AbstractPubSubProvider { if (!clientIdObservers) { return; } - clientIdObservers.forEach(observer => { - observer.error('Disconnected, error code: ' + errorCode); - // removing observers for disconnected clientId - this._topicObservers.forEach((observerForTopic, observerTopic) => { - observerForTopic.delete(observer); - if (observerForTopic.size === 0) { - topicsToDelete.push(observerTopic); - } - }); - }); - - // forgiving any trace of clientId - this._clientIdObservers.delete(clientId); - - // Removing topics that are not listen by an observer - topicsToDelete.forEach(topic => { - this._topicObservers.delete(topic); - }); + this.disconnect(clientId); } } @@ -177,7 +173,7 @@ export class MqttOverWSProvider extends AbstractPubSubProvider { this.connectionStateMonitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); // @ts-ignore const client = new Paho.Client(url, clientId); - // client.trace = (args) => logger.debug(clientId, JSON.stringify(args, null, 2)); + client.onMessageArrived = ({ destinationName: topic, payloadString: msg, @@ -197,23 +193,24 @@ export class MqttOverWSProvider extends AbstractPubSubProvider { this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); }; - await new Promise((resolve, reject) => { + const connected = await new Promise((resolve, reject) => { client.connect({ useSSL: this.isSSLEnabled, mqttVersion: 3, - onSuccess: () => resolve(client), - onFailure: () => { - reject(); - this.connectionStateMonitor.record( - CONNECTION_CHANGE.CONNECTION_FAILED - ); + onSuccess: () => resolve(true), + onFailure: x => { + if (clientId) this._clientsQueue.remove(clientId); + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + resolve(false); }, }); }); - this.connectionStateMonitor.record( - CONNECTION_CHANGE.CONNECTION_ESTABLISHED - ); + if (connected) { + this.connectionStateMonitor.record( + CONNECTION_CHANGE.CONNECTION_ESTABLISHED + ); + } return client; } @@ -222,9 +219,19 @@ export class MqttOverWSProvider extends AbstractPubSubProvider { clientId: string, options: MqttProviderOptions = {} ): Promise { - return await this.clientsQueue.get(clientId, clientId => - this.newClient({ ...options, clientId }) - ); + return await this.clientsQueue.get(clientId, async clientId => { + const client = await this.newClient({ ...options, clientId }); + + if (client) { + // Once connected, subscribe to all topics registered observers + this._topicObservers.forEach( + (_value: Set>, key: string) => { + client.subscribe(key); + } + ); + } + return client; + }); } protected async disconnect(clientId: string): Promise { @@ -232,21 +239,27 @@ export class MqttOverWSProvider extends AbstractPubSubProvider { if (client && client.isConnected()) { client.disconnect(); - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); } this.clientsQueue.remove(clientId); + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); } async publish(topics: string[] | string, msg: any) { const targetTopics = ([] as string[]).concat(topics); const message = JSON.stringify(msg); - const url = await this.endpoint; - - const client = await this.connect(this.clientId, { url }); - - logger.debug('Publishing to topic(s)', targetTopics.join(','), message); - targetTopics.forEach(topic => client.send(topic, message)); + const client = await this.clientsQueue.get(this.clientId); + + if (client) { + logger.debug('Publishing to topic(s)', targetTopics.join(','), message); + targetTopics.forEach(topic => client.send(topic, message)); + } else { + logger.debug( + 'Publishing to topic(s) failed', + targetTopics.join(','), + message + ); + } } protected _topicObservers: Map>> = @@ -283,6 +296,7 @@ export class MqttOverWSProvider extends AbstractPubSubProvider { ): Observable { const targetTopics = ([] as string[]).concat(topics); logger.debug('Subscribing to topic(s)', targetTopics.join(',')); + let reconnectSubscription: ZenObservable.Subscription; return new Observable(observer => { targetTopics.forEach(topic => { @@ -298,8 +312,6 @@ export class MqttOverWSProvider extends AbstractPubSubProvider { observersForTopic.add(observer); }); - // @ts-ignore - let client: Paho.Client; const { clientId = this.clientId } = options; // this._clientIdObservers is used to close observers when client gets disconnected @@ -311,30 +323,44 @@ export class MqttOverWSProvider extends AbstractPubSubProvider { this._clientIdObservers.set(clientId, observersForClientId); (async () => { - const { url = await this.endpoint } = options; + const getClient = async () => { + try { + const { url = await this.endpoint } = options; + const client = await this.connect(clientId, { url }); + if (client !== undefined) { + targetTopics.forEach(topic => { + client.subscribe(topic); + }); + } + } catch (e) { + logger.debug('Error forming connection', e); + } + }; - try { - client = await this.connect(clientId, { url }); - targetTopics.forEach(topic => { - client.subscribe(topic); - }); - } catch (e) { - observer.error(e); - } + // Establish the initial connection + await getClient(); + + // Add an observable to the reconnection list to manage reconnection for this subscription + reconnectSubscription = new Observable(observer => { + this.reconnectionMonitor.addObserver(observer); + }).subscribe(() => { + getClient(); + }); })(); - return () => { - logger.debug('Unsubscribing from topic(s)', targetTopics.join(',')); + return async () => { + const client = await this.clientsQueue.get(clientId); + + reconnectSubscription?.unsubscribe(); if (client) { this._clientIdObservers.get(clientId)?.delete(observer); // No more observers per client => client not needed anymore if (this._clientIdObservers.get(clientId)?.size === 0) { + this.disconnect(clientId); this.connectionStateMonitor.record( CONNECTION_CHANGE.CLOSING_CONNECTION ); - - this.disconnect(clientId); this._clientIdObservers.delete(clientId); } diff --git a/packages/pubsub/src/Providers/constants.ts b/packages/pubsub/src/Providers/constants.ts index 775e6581eb6..238c583dd18 100644 --- a/packages/pubsub/src/Providers/constants.ts +++ b/packages/pubsub/src/Providers/constants.ts @@ -100,3 +100,13 @@ export const DEFAULT_KEEP_ALIVE_TIMEOUT = 5 * 60 * 1000; * Default Time in milleseconds to alert for missed GQL_CONNECTION_KEEP_ALIVE message */ export const DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT = 65 * 1000; + +/** + * Default delay time in milleseconds between when reconnect is triggered vs when it is attempted + */ +export const RECONNECT_DELAY = 5 * 1000; + +/** + * Default interval time in milleseconds between when reconnect is re-attempted + */ +export const RECONNECT_INTERVAL = 60 * 1000; diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index 678798acdbb..904f82998ef 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -63,6 +63,7 @@ export class ConnectionStateMonitor { private _linkedConnectionStateObservable: Observable; private _linkedConnectionStateObserver: ZenObservable.SubscriptionObserver; private _networkMonitoringSubscription?: ZenObservable.Subscription; + private _initialNetworkStateSubscription?: ZenObservable.Subscription; constructor() { this._networkMonitoringSubscription = undefined; @@ -73,6 +74,16 @@ export class ConnectionStateMonitor { keepAliveState: 'healthy', }; + // Attempt to update the state with the current actual network state + this._initialNetworkStateSubscription = ReachabilityMonitor().subscribe( + ({ online }) => { + this.record( + online ? CONNECTION_CHANGE.ONLINE : CONNECTION_CHANGE.OFFLINE + ); + this._initialNetworkStateSubscription?.unsubscribe(); + } + ); + this._linkedConnectionStateObservable = new Observable(connectionStateObserver => { connectionStateObserver.next(this._linkedConnectionState); @@ -84,6 +95,9 @@ export class ConnectionStateMonitor { * Turn network state monitoring on if it isn't on already */ private enableNetworkMonitoring() { + // If no initial network state was discovered, stop trying + this._initialNetworkStateSubscription?.unsubscribe(); + // Maintain the network state based on the reachability monitor if (this._networkMonitoringSubscription === undefined) { this._networkMonitoringSubscription = ReachabilityMonitor().subscribe( diff --git a/packages/pubsub/src/utils/ReconnectionMonitor.ts b/packages/pubsub/src/utils/ReconnectionMonitor.ts new file mode 100644 index 00000000000..662d7826754 --- /dev/null +++ b/packages/pubsub/src/utils/ReconnectionMonitor.ts @@ -0,0 +1,74 @@ +import { Observer } from 'zen-observable-ts'; +import { RECONNECT_DELAY, RECONNECT_INTERVAL } from '../Providers/constants'; + +export enum ReconnectEvent { + START_RECONNECT = 'START_RECONNECT', + HALT_RECONNECT = 'HALT_RECONNECT', +} + +/** + * Captures the reconnect event logic used to determine when to reconnect to PubSub providers. + * Reconnnect attempts are delayed by 5 seconds to let the interface settle. + * Attempting to reconnect only once creates unrecoverable states when the network state isn't + * supported by the browser, so this keeps retrying every minute until halted. + */ +export class ReconnectionMonitor { + private reconnectObservers: Observer[] = []; + private reconnectIntervalId?: ReturnType; + private reconnectSetTimeoutId?: ReturnType; + + /** + * Add reconnect observer to the list of observers to alert on reconnect + */ + addObserver(reconnectObserver: Observer) { + this.reconnectObservers.push(reconnectObserver); + } + + /** + * Given a reconnect event, start the appropriate behavior + */ + record(event: ReconnectEvent) { + if (event === ReconnectEvent.START_RECONNECT) { + // If the reconnection hasn't been started + if ( + this.reconnectSetTimeoutId === undefined && + this.reconnectIntervalId === undefined + ) { + this.reconnectSetTimeoutId = setTimeout(() => { + // Reconnect now + this._triggerReconnect(); + // Retry reconnect every periodically until it works + this.reconnectIntervalId = setInterval(() => { + this._triggerReconnect(); + }, RECONNECT_INTERVAL); + }, RECONNECT_DELAY); + } + } + + if (event === ReconnectEvent.HALT_RECONNECT) { + if (this.reconnectIntervalId) { + clearInterval(this.reconnectIntervalId); + this.reconnectIntervalId = undefined; + } + if (this.reconnectSetTimeoutId) { + clearTimeout(this.reconnectSetTimeoutId); + this.reconnectSetTimeoutId = undefined; + } + } + } + + /** + * Complete all reconnect observers + */ + close() { + this.reconnectObservers.forEach(reconnectObserver => { + reconnectObserver.complete?.(); + }); + } + + private _triggerReconnect() { + this.reconnectObservers.forEach(reconnectObserver => { + reconnectObserver.next?.(); + }); + } +}