Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
fix(stream): subject, not observable :/
Browse files Browse the repository at this point in the history
  • Loading branch information
Benjamin Reed committed Sep 25, 2020
1 parent e20d4be commit d134a61
Showing 1 changed file with 9 additions and 12 deletions.
21 changes: 9 additions & 12 deletions src/lib/api/stream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Injectable } from '@angular/core';
import { Observable, Observer } from 'rxjs';
import { Observable, Subject } from 'rxjs';
import { createPatch } from 'rfc6902/dist/rfc6902'

import { AppState, Plugins, DeviceInfo, PluginListenerHandle } from '@capacitor/core';
Expand Down Expand Up @@ -62,8 +62,7 @@ export class APIStream {
// max out at 10 minutes for a retry interval
private maxRetryMillis = 10 * ONE_MINUTE;

private observable: Observable<StreamData|ErrorEvent> | null;
private observer: Observer<StreamData|ErrorEvent> | null;
private subject: Subject<StreamData|ErrorEvent> | null;
private deviceInfo: DeviceInfo | null;

private handles = {} as { [key: string]: PluginListenerHandle };
Expand All @@ -86,9 +85,7 @@ export class APIStream {
console.debug(`APIStream(): default check interval: ${this.defaultCheckIntervalMillis}ms`);
console.debug(`APIStream(): default retry fallback: ${this.defaultRetryFallback}x`);

this.observable = new Observable((observer: Observer<StreamData|ErrorEvent>) => {
this.observer = observer;
});
this.subject = new Subject<StreamData|ErrorEvent>();

App.addListener('appStateChange', (state: AppState) => {
if (this.deviceInfo?.platform !== 'web') {
Expand Down Expand Up @@ -142,9 +139,9 @@ export class APIStream {
public async subscribe(next?: (value: StreamData|ErrorEvent) => void, error?: (error: any) => void, complete?: () => void) {
await this.ready;
console.info('APIStream.subscribe()');
const subscription = this.observable.subscribe(next, error, complete);
const subscription = this.subject.subscribe(next, error, complete);
if (this.streamData) {
this.observer.next(this.streamData);
this.subject.next(this.streamData);
}
return subscription;
}
Expand Down Expand Up @@ -226,8 +223,8 @@ export class APIStream {
}

// always publish the latest, so things refresh
if (this.observer) {
this.observer.next(this.streamData);
if (this.subject) {
this.subject.next(this.streamData);
}
}

Expand Down Expand Up @@ -259,11 +256,11 @@ export class APIStream {
this.handles.error = es.addListener('error', (res: ErrorResult) => {
console.error('APIStream.createSource(): An error occurred reading from the event source. Resetting.', res.error);
reject(true);
if (this.observer) {
if (this.subject) {
const ev = new ErrorEvent('error', {
message: res.error
});
this.observer.next(ev);
this.subject.next(ev);
} else {
console.debug('APIStream.createSource(): No observer?');
}
Expand Down

0 comments on commit d134a61

Please sign in to comment.