Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
fix: still streamlining events
Browse files Browse the repository at this point in the history
  • Loading branch information
Benjamin Reed committed Sep 16, 2020
1 parent dd3cb38 commit f3dbb1a
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 104 deletions.
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"cordova-plugin-ionic": "5.4.7",
"cordova-plugin-whitelist": "1.3.4",
"hash.js": "^1.1.7",
"rfc6902": "^4.0.0",
"rxjs": "^6.6.3",
"tslib": "^2.0.0",
"zone.js": "~0.10.3"
Expand Down
5 changes: 3 additions & 2 deletions src/app/app.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ export class AppComponent {
initializeApp() {
this.platform.ready().then(async () => {
console.debug('AppComponent.initializeApp(): starting stream');
const observable = await this.stream.start();
const subscription = observable.subscribe((evt: any) => {
const subscription = await this.stream.subscribe((evt: any) => {
console.debug('AppComponent.initializeApp(): got message, hiding splash screen');
// once we get a real message, hide the splash screen
SplashScreen.hide();
subscription.unsubscribe();
}, (err: any) => {
console.debug('AppComponent.initializeApp(): got error, hiding splash screen');
// even if we get an error, hide the splash screen
SplashScreen.hide();
subscription.unsubscribe();
Expand Down
165 changes: 63 additions & 102 deletions src/lib/api/stream.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { OnInit, Injectable } from '@angular/core';
import { Injectable } from '@angular/core';
import { Observable, Observer } from 'rxjs';
import { createPatch } from 'rfc6902/dist/rfc6902'

import { Plugins, DeviceInfo, PluginListenerHandle } from '@capacitor/core';
const { Device, EventSource } = Plugins;

import 'capacitor-eventsource';
import { MessageResult, ErrorResult, EventSourcePlugin /*, EventSourceWeb */ } from 'capacitor-eventsource';
import { MessageResult, ErrorResult, EventSourcePlugin } from 'capacitor-eventsource';
import { StreamData } from '../model/streamData';
import { Platform } from '@ionic/angular';

Expand Down Expand Up @@ -36,7 +37,7 @@ export class APIStream {
/**
* whether or not the user has started the service
*/
public isStarted = false;
public ready: Promise<void>;

/**
* whether or not we are connected to a network
Expand Down Expand Up @@ -80,7 +81,6 @@ export class APIStream {
this.defaultCheckIntervalMillis = 2 * SECOND;
this.defaultRetryFallback = 1.2;
this.retryMillis = this.defaultRetryMillis;
this.isStarted = false;

console.debug(`APIStream(): default retry: ${this.defaultRetryMillis}ms`);
console.debug(`APIStream(): default check interval: ${this.defaultCheckIntervalMillis}ms`);
Expand All @@ -89,20 +89,23 @@ export class APIStream {
this.observable = new Observable((observer: Observer<StreamData|ErrorEvent>) => {
this.observer = observer;
});
}

async init() {
await this.platform.ready();

if (this.url) {
console.debug('APIStream.init(): already initialized.');
return this.url;
}
this.ready = new Promise(async (resolve) => {
await this.platform.ready();
await this.start();
resolve();
});
}

console.debug('APIStream.ngOnInit(): initializing.');
this.isStarted = false;
/**
* Start listening on the event stream.
*
* @returns an {@link Observable} that can be subscribed to.
*/
private async start(): Promise<void> {
console.info('APIStream.start()');

return this.url = Device.getInfo().then(info => {
this.url = Device.getInfo().then(info => {
this.deviceInfo = info;
if (this.deviceInfo.platform !== 'web') {
return 'https://www.blaseball.com/events/streamData';
Expand All @@ -115,80 +118,24 @@ export class APIStream {
} as DeviceInfo;
return 'https://cors-proxy.blaseball-reference.com/events/streamData';
});
}

async handleSystemChange(retrigger?: boolean) {
console.debug(`APIStream.handleSystemChange(): retrigger=${retrigger}`);

if (this.isStarted) {
if (retrigger || !this.source) {
console.debug('APIStream.handleSystemChange(): (re)creating connection');
await this.createSource();
this.startCheckingLastUpdated();
}
} else {
console.debug('APIStream.handleSystemChange(): shutting down');
// disable checker
if (this.retryChecker) {
clearInterval(this.retryChecker);
this.retryChecker = null;
}

// close the event source
await this.closeSource();
this.source = null;
}
return this.source;
}

/**
* Start listening on the event stream.
*
* @returns an {@link Observable} that can be subscribed to.
*/
async start(): Promise<Observable<StreamData|ErrorEvent>> {
console.info('APIStream.start()');

await this.init();
this.isStarted = true;
await this.url;
this.handleSystemChange();

return this.observable;
}

/**
* Subscribe to the ongoing event stream. Triggers a new event.
*/
async subscribe(next?: (value: StreamData|ErrorEvent) => void, error?: (error: any) => void, complete?: () => void) {
await this.init();
public async subscribe(next?: (value: StreamData|ErrorEvent) => void, error?: (error: any) => void, complete?: () => void) {
await this.ready;
console.info('APIStream.subscribe()');
const subscription = this.observable.subscribe(next, error, complete);
if (this.streamData) {
this.observer.next(this.streamData);
}
return subscription;
}

/**
* Stop listening on the event stream.
*
* Completes the {@link Observable} and closes all resources.
*/
async stop() {
console.info('APIStream.stop()');

this.isStarted = false;
await this.handleSystemChange();

// clean up observable
this.observer.complete();
this.observer = null;
this.observable = null;

// reset retry state
this.retryMillis = this.defaultRetryMillis;
this.lastUpdated = 0;
}

/**
* Get the last update returned by the stream.
*/
Expand Down Expand Up @@ -222,23 +169,39 @@ export class APIStream {
});
}

private async handleSystemChange(retrigger?: boolean) {
console.debug(`APIStream.handleSystemChange(): retrigger=${retrigger}`);

if (retrigger || !this.source) {
await this.ready;
console.debug('APIStream.handleSystemChange(): (re)creating connection');
await this.createSource();
this.startCheckingLastUpdated();
}
}

private onMessage(data: any) {
console.debug('APIStream.onMessage()');

if (this.lastUpdate !== data) {
// console.debug('APIStream.onMessage(): change:', this.lastUpdate, data);
if (!data) {
console.error('APIStream.onMessage(): missing data?!?');
return;
}

const parsed = JSON.parse(data).value;

const diff = createPatch(this.lastUpdate, parsed);
// console.debug(`APIStream.onMessage(): ${diff.length} change(s)`);

if (diff.length > 0) {
// successful/new message, reset retry and last updated
if (this.retryMillis !== this.defaultRetryMillis) {
console.debug(`APIStream.onMessage(): ${this.retryMillis} -> ${this.defaultRetryMillis}`);
this.retryMillis = this.defaultRetryMillis;
}

this.lastUpdated = Date.now();

this.lastUpdate = data;

const parsed = JSON.parse(this.lastUpdate).value;
this.lastUpdate = parsed;

if (!this.streamData) {
this.streamData = new StreamData({});
Expand All @@ -258,11 +221,11 @@ export class APIStream {
protected async createSource() {
console.debug('APIStream.createSource()');

return new Promise(async (resolve, reject) => {
// const es = new EventSourceWeb();
const es = EventSource;
this.source = es as EventSourcePlugin;
// const es = new EventSourceWeb();
const es = EventSource;
this.source = es as EventSourcePlugin;

return new Promise(async (resolve, reject) => {
const url = await this.url;

// clean up existing and create new event source
Expand Down Expand Up @@ -312,18 +275,13 @@ export class APIStream {
}

protected startCheckingLastUpdated() {
this.stopCheckingLastUpdated();
setTimeout(() => {
this.retryChecker = setInterval(() => {
this.checkLastUpdated();
}, this.defaultCheckIntervalMillis) as unknown as number;
}, this.defaultCheckIntervalMillis);
}

protected stopCheckingLastUpdated() {
console.debug('APIStream.startCheckingLastUpdated()');
if (this.retryChecker) {
clearInterval(this.retryChecker);
if (!this.retryChecker) {
console.debug('APIStream.startCheckingLastUpdated()');
setTimeout(() => {
this.retryChecker = setInterval(() => {
this.checkLastUpdated();
}, this.defaultCheckIntervalMillis) as unknown as number;
}, this.defaultCheckIntervalMillis);
}
}

Expand All @@ -333,13 +291,16 @@ export class APIStream {
const now = Date.now();

if (!this.hourChecker) {
const remainder = now % ONE_HOUR;
const nextHour = now - remainder + ONE_HOUR;
const msAfterHour = now % ONE_HOUR;
const nextHour = now - msAfterHour + ONE_HOUR;
const remaining = nextHour - now;
console.debug(`APIStream.checkLastUpdated(): now=${now}, remainder=${msAfterHour}, nextHour=${nextHour}`);
console.debug(`APIStream.checkLastUpdated(): checking in ~${Math.round((remaining) / 1000 / 60)}m`);

this.hourChecker = setTimeout(() => {
console.debug('APIStream.checkLastUpdate(): new hour, force a check.');
console.debug('APIStream.checkLastUpdated(): new hour, force a check.');
this.checkLastUpdated();
}, nextHour) as unknown as number;
}, remaining + (1 * SECOND)) as unknown as number; // give it an extra second to update
}

// check how far through the hour we are
Expand All @@ -356,7 +317,7 @@ export class APIStream {

const threshold = lastCheck + this.retryMillis;
if (now > threshold) {
console.debug(`APIStream.checkLastUpdated(): threshold reached: now=${this.formatDate(now)}, lastUpdated=${this.formatDate(this.lastUpdated)}, lastRetry=${this.formatDate(this.lastRetry)}, threshold=${this.formatDate(threshold)}, retryMillis=${(this.retryMillis / SECOND / 1.0).toPrecision(2)}s`);
console.debug(`APIStream.checkLastUpdated(): threshold (retryMillis=~${Math.round(this.retryMillis / SECOND / 1.0)}s) reached: now=${this.formatDate(now)}, lastUpdated=${this.formatDate(this.lastUpdated)}, lastRetry=${this.formatDate(this.lastRetry)}, threshold=${this.formatDate(threshold)}`);
this.retry();
} else {
// console.debug(`APIStream.checkLastUpdated(): ${this.formatDate(now)} < ${this.formatDate(threshold)}`);
Expand Down

0 comments on commit f3dbb1a

Please sign in to comment.