Skip to content

Commit

Permalink
fix: Reconnect monitor cleanup bug and other comments
Browse files Browse the repository at this point in the history
  • Loading branch information
stocaaro committed Sep 14, 2022
1 parent 86f55ee commit 1a605e9
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 19 deletions.
10 changes: 5 additions & 5 deletions packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ describe('AWSAppSyncRealTimeProvider', () => {
payload: { data: {} },
});
expect(loggerSpy).toBeCalledWith(
'ERROR',
'DEBUG',
'Connection failed: {"data":{}}'
);
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
Expand Down Expand Up @@ -981,7 +981,7 @@ describe('AWSAppSyncRealTimeProvider', () => {
await delay(10);

expect(loggerSpy).toBeCalledWith(
'ERROR',
'DEBUG',
'AppSync Realtime subscription init error: Error: No credentials'
);
});
Expand Down Expand Up @@ -1013,7 +1013,7 @@ describe('AWSAppSyncRealTimeProvider', () => {
await delay(10);

expect(loggerSpy).toBeCalledWith(
'ERROR',
'DEBUG',
'AppSync Realtime subscription init error: Error: No credentials'
);

Expand Down Expand Up @@ -1077,7 +1077,7 @@ describe('AWSAppSyncRealTimeProvider', () => {
await delay(10);

expect(loggerSpy).toBeCalledWith(
'ERROR',
'DEBUG',
'AppSync Realtime subscription init error: Error: No federated jwt'
);
});
Expand Down Expand Up @@ -1171,7 +1171,7 @@ describe('AWSAppSyncRealTimeProvider', () => {
await delay(10);

expect(loggerSpy).toBeCalledWith(
'ERROR',
'DEBUG',
'AppSync Realtime subscription init error: Error: No auth token specified'
);
});
Expand Down
2 changes: 1 addition & 1 deletion packages/pubsub/__tests__/PubSub-unit-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
AWSIoTProvider,
mqttTopicMatch,
} from '../src/Providers';
// import Amplify from '../../src/';

import {
Credentials,
Hub,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {

// Trigger reconnection when the connection is disrupted
if (connectionState === ConnectionState.ConnectionDisrupted) {
this.reconnectionMonitor.record(ReconnectEvent.RECONNECT);
this.reconnectionMonitor.record(ReconnectEvent.START_RECONNECT);
} else if (connectionState !== ConnectionState.Connecting) {
// Trigger connected to halt reconnection attempts
this.reconnectionMonitor.record(ReconnectEvent.HALT_RECONNECT);
Expand Down Expand Up @@ -198,7 +198,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
observer,
subscriptionId,
}).catch<any>(err => {
logger.error(
logger.debug(
`${CONTROL_MSG.REALTIME_SUBSCRIPTION_INIT_ERROR}: ${err}`
);

Expand Down Expand Up @@ -346,7 +346,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
this.connectionState !==
ConnectionState.ConnectionDisruptedPendingNetwork
) {
logger.error(`${CONTROL_MSG.CONNECTION_FAILED}: ${message}`);
logger.debug(`${CONTROL_MSG.CONNECTION_FAILED}: ${message}`);
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);

const { subscriptionFailedCallback } =
Expand Down Expand Up @@ -556,7 +556,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
subscriptionState,
});

logger.error(
logger.debug(
`${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}`
);

Expand Down Expand Up @@ -664,7 +664,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
this.socketStatus = SOCKET_STATUS.READY;
this.promiseArray = [];
} catch (err) {
logger.error(err);
logger.debug('Notifying connection exited with', err);
this.promiseArray.forEach(({ rej }) => rej(err));
this.promiseArray = [];
if (
Expand Down
8 changes: 3 additions & 5 deletions packages/pubsub/src/Providers/MqttOverWSProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export interface MqttProviderOptions extends ProviderOptions {
export type MqttProvidertOptions = MqttProviderOptions;

class ClientsQueue {
private promises: Map<string, Promise<any> | any> = new Map();
private promises: Map<string, Promise<any>> = new Map();

async get(clientId: string, clientFactory?: (input: string) => Promise<any>) {
const cachedPromise = this.promises.get(clientId);
Expand All @@ -67,9 +67,7 @@ class ClientsQueue {
if (clientFactory) {
const newPromise = clientFactory(clientId);
this.promises.set(clientId, newPromise);
newPromise.then(v => this.promises.set(clientId, v));
newPromise.catch(v => this.promises.delete(clientId));
newPromise.finally();
return newPromise;
}

Expand Down Expand Up @@ -116,7 +114,7 @@ export class MqttOverWSProvider extends AbstractPubSubProvider {

// Trigger reconnection when the connection is disrupted
if (connectionStateChange === ConnectionState.ConnectionDisrupted) {
this.reconnectionMonitor.record(ReconnectEvent.RECONNECT);
this.reconnectionMonitor.record(ReconnectEvent.START_RECONNECT);
} else if (connectionStateChange !== ConnectionState.Connecting) {
// Trigger connected to halt reconnection attempts
this.reconnectionMonitor.record(ReconnectEvent.HALT_RECONNECT);
Expand Down Expand Up @@ -338,7 +336,7 @@ export class MqttOverWSProvider extends AbstractPubSubProvider {
});
}
} catch (e) {
logger.error('Error forming connection', e);
logger.debug('Error forming connection', e);
}
};

Expand Down
20 changes: 17 additions & 3 deletions packages/pubsub/src/utils/ReconnectionMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,35 @@ import { Observer } from 'zen-observable-ts';
import { RECONNECT_DELAY, RECONNECT_INTERVAL } from '../Providers/constants';

export enum ReconnectEvent {
RECONNECT = 'RECONNECT',
START_RECONNECT = 'START_RECONNECT',
HALT_RECONNECT = 'HALT_RECONNECT',
}

export class ReconnectionMonitor {
private reconnectObservers: Observer<void>[] = [];
private reconnectIntervalId?: ReturnType<typeof setInterval>;
private reconnectSetTimeoutId?: ReturnType<typeof setTimeout>;

/**
* Add reconnect observer to the list of observers to alert on reconnect
*/
addObserver(reconnectObserver: Observer<void>) {
this.reconnectObservers.push(reconnectObserver);
}

/**
* Given a reconnect event, start the appropriate behavior
*/
record(event: ReconnectEvent) {
if (event === ReconnectEvent.RECONNECT) {
if (event === ReconnectEvent.START_RECONNECT) {
const triggerReconnect = () => {
this.reconnectObservers.forEach(reconnectObserver => {
reconnectObserver.next?.();
});
};
// If the reconnect interval isn't set
if (this.reconnectIntervalId === undefined) {
setTimeout(() => {
this.reconnectSetTimeoutId = setTimeout(() => {
// Reconnect now
triggerReconnect();
// Retry reconnect every periodically until it works
Expand All @@ -39,9 +46,16 @@ export class ReconnectionMonitor {
clearInterval(this.reconnectIntervalId);
this.reconnectIntervalId = undefined;
}
if (this.reconnectSetTimeoutId) {
clearTimeout(this.reconnectSetTimeoutId);
this.reconnectSetTimeoutId = undefined;
}
}
}

/**
* Complete all reconnect observers
*/
close() {
this.reconnectObservers.forEach(reconnectObserver => {
reconnectObserver.complete?.();
Expand Down

0 comments on commit 1a605e9

Please sign in to comment.