Skip to content

Commit

Permalink
fix(afs): mutiple subscribes/unsubscribes could still get confused
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesdaniels committed Oct 5, 2017
1 parent 86e2c24 commit 28aef60
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 18 deletions.
26 changes: 17 additions & 9 deletions src/firestore/collection/changes.ts
Expand Up @@ -16,6 +16,7 @@ import { DocumentChangeAction, Action } from '../interfaces';
*/
export function docChanges(query: firebase.firestore.Query): Observable<DocumentChangeAction[]> {
return fromCollectionRef(query)
.filter(action => !!action)
.map(action =>
action.payload.docChanges
.map(change => ({ type: change.type, payload: change })));
Expand All @@ -27,7 +28,7 @@ export function docChanges(query: firebase.firestore.Query): Observable<Document
*/
export function sortedChanges(query: firebase.firestore.Query, events: firebase.firestore.DocumentChangeType[]): Observable<DocumentChangeAction[]> {
return fromCollectionRef(query)
.map(changes => changes.payload.docChanges)
.map(changes => changes && changes.payload.docChanges)
.scan((current, changes) => combineChanges(current, changes, events), [])
.map(changes => changes.map(c => ({ type: c.type, payload: c })))
.filter(changes => changes.length > 0)
Expand All @@ -41,14 +42,21 @@ export function sortedChanges(query: firebase.firestore.Query, events: firebase.
* @param changes
* @param events
*/
export function combineChanges(current: firebase.firestore.DocumentChange[], changes: firebase.firestore.DocumentChange[], events: firebase.firestore.DocumentChangeType[]) {
changes.forEach(change => {
// skip unwanted change types
if(events.indexOf(change.type) > -1) {
current = combineChange(current, change);
}
});
return current;
export function combineChanges(current: firebase.firestore.DocumentChange[], changes: firebase.firestore.DocumentChange[] | undefined, events: firebase.firestore.DocumentChangeType[]) {
if (changes) {
changes.forEach(change => {
// skip unwanted change types
if(events.indexOf(change.type) > -1) {
current = combineChange(current, change);
}
});
return current;
} else {
// in the case of undefined, empty current
// if you do odd things with the subscribes/unsubscrbes you can mess things
// up and get double or tripled results
return [];
}
}

/**
Expand Down
35 changes: 27 additions & 8 deletions src/firestore/collection/collection.spec.ts
Expand Up @@ -74,28 +74,45 @@ describe('AngularFirestoreCollection', () => {

});

it('should handle multiple subscriptions', async (done: any) => {
it('should handle multiple subscriptions (hot)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
const sub = changes.subscribe(() => {}).add(
changes.subscribe(data => {
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
sub.unsubscribe();
})
).add(done);
).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});

it('should handle multiple subscriptions (warm)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
changes.take(1).subscribe(() => {}).add(() => {
const sub = changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
}).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});
});

it('should handle multiple subscriptions + cold observer', async (done: any) => {
it('should handle multiple subscriptions (cold)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
const sub = changes.take(1).subscribe(() => {
const sub = changes.subscribe(() => {
sub.unsubscribe();
}).add(() => {
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
}).add(done);
}).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});
});

Expand Down Expand Up @@ -132,11 +149,13 @@ describe('AngularFirestoreCollection', () => {
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.snapshotChanges();
const sub = changes.subscribe(() => {}).add(
changes.subscribe(data => {
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
sub.unsubscribe();
})
).add(done);
).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});

it('should update order on queries', async (done) => {
Expand Down
4 changes: 3 additions & 1 deletion src/firestore/observable/fromRef.ts
Expand Up @@ -11,6 +11,7 @@ import 'rxjs/add/operator/share';

function _fromRef<T, R>(ref: Reference<T>): Observable<R> {
const ref$ = new Observable(subscriber => {
subscriber.next(undefined); // fire an undefined to let subcribers know this is a new Observable
const unsubscribe = ref.onSnapshot(subscriber);
return { unsubscribe };
});
Expand All @@ -23,9 +24,10 @@ export function fromRef<R>(ref: firebase.firestore.DocumentReference | firebase.

export function fromDocRef(ref: firebase.firestore.DocumentReference): Observable<Action<firebase.firestore.DocumentSnapshot>>{
return fromRef<firebase.firestore.DocumentSnapshot>(ref)
.filter(payload => !!payload)
.map(payload => ({ payload, type: 'value' }));
}

export function fromCollectionRef(ref: firebase.firestore.Query): Observable<Action<firebase.firestore.QuerySnapshot>> {
return fromRef<firebase.firestore.QuerySnapshot>(ref).map(payload => ({ payload, type: 'query' }))
return fromRef<firebase.firestore.QuerySnapshot>(ref).map(payload => payload && ({ payload, type: 'query' }))
}

0 comments on commit 28aef60

Please sign in to comment.