Skip to content

Commit

Permalink
Fixing null set with RTDB, better typings, and more (#1264)
Browse files Browse the repository at this point in the history
* Once now closes out the subscription
* Handle multiple subscriptions to the AngularFireDatabase
* Empty set should return null
* Better batching of first load, using `once` (needed step to support Universal)
* Handle ordered child_added events
* Don't error out on unsubscribe of a `once`
* Clean up the tests to use `take(...)` and `add(...)`
* Cleaning up the types

Closes #1220, closes #1246.
  • Loading branch information
jamesdaniels committed Oct 13, 2017
1 parent 2ff8d1d commit eda1c41
Show file tree
Hide file tree
Showing 24 changed files with 347 additions and 315 deletions.
5 changes: 2 additions & 3 deletions src/database/database.ts
Expand Up @@ -2,7 +2,7 @@ import { Injectable } from '@angular/core';
import { database } from 'firebase/app';
import 'firebase/database';
import { FirebaseApp } from 'angularfire2';
import { PathReference, DatabaseQuery, DatabaseReference, DatabaseSnapshot, ChildEvent, ListenEvent, SnapshotChange, QueryFn, AngularFireList, AngularFireObject } from './interfaces';
import { PathReference, DatabaseQuery, DatabaseReference, DatabaseSnapshot, ChildEvent, ListenEvent, QueryFn, AngularFireList, AngularFireObject } from './interfaces';
import { getRef } from './utils';
import { createListReference } from './list/create-reference';
import { createObjectReference } from './object/create-reference';
Expand Down Expand Up @@ -41,8 +41,7 @@ export {
DatabaseReference,
DatabaseSnapshot,
ChildEvent,
ListenEvent,
SnapshotChange,
ListenEvent,
QueryFn,
AngularFireList,
AngularFireObject,
Expand Down
33 changes: 11 additions & 22 deletions src/database/interfaces.ts
Expand Up @@ -12,51 +12,40 @@ export interface AngularFireList<T> {
update(item: FirebaseOperation, data: T): Promise<void>;
set(item: FirebaseOperation, data: T): Promise<void>;
push(data: T): firebase.database.ThenableReference;
remove(item?: FirebaseOperation): Promise<any>;
remove(item?: FirebaseOperation): Promise<void>;
}

export interface AngularFireObject<T> {
query: DatabaseQuery;
valueChanges<T>(): Observable<T | null>;
snapshotChanges<T>(): Observable<SnapshotAction>;
update(data: T): Promise<any>;
snapshotChanges(): Observable<SnapshotAction>;
update(data: Partial<T>): Promise<void>;
set(data: T): Promise<void>;
remove(): Promise<any>;
remove(): Promise<void>;
}

export interface FirebaseOperationCases {
stringCase: () => Promise<void | any>;
firebaseCase?: () => Promise<void | any>;
snapshotCase?: () => Promise<void | any>;
unwrappedSnapshotCase?: () => Promise<void | any>;
stringCase: () => Promise<void>;
firebaseCase?: () => Promise<void>;
snapshotCase?: () => Promise<void>;
unwrappedSnapshotCase?: () => Promise<void>;
}

export type QueryFn = (ref: DatabaseReference) => DatabaseQuery;
export type ChildEvent = 'child_added' | 'child_removed' | 'child_changed' | 'child_moved';
export type ListenEvent = 'value' | ChildEvent;

export type SnapshotChange = {
event: string;
snapshot: DatabaseSnapshot | null;
prevKey: string | undefined;
}

export interface Action<T> {
type: string;
type: ListenEvent;
payload: T;
};

export interface AngularFireAction<T> extends Action<T> {
prevKey: string | undefined;
prevKey: string | null | undefined;
key: string | null;
}

export interface SnapshotPrevKey {
snapshot: DatabaseSnapshot | null;
prevKey: string | undefined;
}

export type SnapshotAction = AngularFireAction<DatabaseSnapshot | null>;
export type SnapshotAction = AngularFireAction<DatabaseSnapshot>;

export type Primitive = number | string | boolean;

Expand Down
4 changes: 2 additions & 2 deletions src/database/list/audit-trail.spec.ts
Expand Up @@ -9,7 +9,7 @@ import 'rxjs/add/operator/skip';
const rando = () => (Math.random() + 1).toString(36).substring(7);
const FIREBASE_APP_NAME = rando();

describe('stateChanges', () => {
describe('auditTrail', () => {
let app: FirebaseApp;
let db: AngularFireDatabase;
let createRef: (path: string) => firebase.database.Reference;
Expand Down Expand Up @@ -56,7 +56,7 @@ describe('stateChanges', () => {

const { changes } = prepareAuditTrail();
changes.subscribe(actions => {
const data = actions.map(a => a.payload!.val());
const data = actions.map(a => a.payload.val());
expect(data).toEqual(items);
done();
});
Expand Down
50 changes: 48 additions & 2 deletions src/database/list/audit-trail.ts
@@ -1,8 +1,10 @@
import { DatabaseQuery, ChildEvent, AngularFireAction, SnapshotAction } from '../interfaces';
import { DatabaseQuery, ChildEvent, DatabaseSnapshot, AngularFireAction, SnapshotAction } from '../interfaces';
import { stateChanges } from './state-changes';
import { waitForLoaded } from './loaded';
import { Observable } from 'rxjs/Observable';
import { database } from 'firebase/app';
import { fromRef } from '../observable/fromRef';


import 'rxjs/add/operator/skipWhile';
import 'rxjs/add/operator/withLatestFrom';
import 'rxjs/add/operator/map';
Expand All @@ -16,3 +18,47 @@ export function auditTrail(query: DatabaseQuery, events?: ChildEvent[]): Observa
.scan((current, action) => [...current, action], []);
return waitForLoaded(query, auditTrail$);
}

interface LoadedMetadata {
data: AngularFireAction<database.DataSnapshot>;
lastKeyToLoad: any;
}

function loadedData(query: DatabaseQuery): Observable<LoadedMetadata> {
// Create an observable of loaded values to retrieve the
// known dataset. This will allow us to know what key to
// emit the "whole" array at when listening for child events.
return fromRef(query, 'value')
.map(data => {
// Store the last key in the data set
let lastKeyToLoad;
// Loop through loaded dataset to find the last key
data.payload.forEach(child => {
lastKeyToLoad = child.key; return false;
});
// return data set and the current last key loaded
return { data, lastKeyToLoad };
});
}

function waitForLoaded(query: DatabaseQuery, action$: Observable<SnapshotAction[]>) {
const loaded$ = loadedData(query);
return loaded$
.withLatestFrom(action$)
// Get the latest values from the "loaded" and "child" datasets
// We can use both datasets to form an array of the latest values.
.map(([loaded, actions]) => {
// Store the last key in the data set
let lastKeyToLoad = loaded.lastKeyToLoad;
// Store all child keys loaded at this point
const loadedKeys = actions.map(snap => snap.key);
return { actions, lastKeyToLoad, loadedKeys }
})
// This is the magical part, only emit when the last load key
// in the dataset has been loaded by a child event. At this point
// we can assume the dataset is "whole".
.skipWhile(meta => meta.loadedKeys.indexOf(meta.lastKeyToLoad) === -1)
// Pluck off the meta data because the user only cares
// to iterate through the snapshots
.map(meta => meta.actions);
}
99 changes: 67 additions & 32 deletions src/database/list/changes.spec.ts
Expand Up @@ -43,69 +43,104 @@ describe('listChanges', () => {

describe('events', () => {

it('should stream child_added events', (done) => {
it('should stream value at first', (done) => {
const someRef = ref(rando());
someRef.set(batch);
const obs = listChanges(someRef, ['child_added']);
const sub = obs.skip(2).subscribe(changes => {
const data = changes.map(change => change.payload!.val());
const sub = obs.take(1).subscribe(changes => {
const data = changes.map(change => change.payload.val());
expect(data).toEqual(items);
done();
});
}).add(done);
someRef.set(batch);
});

it('should process a new child_added event', (done) => {
it('should process a new child_added event', done => {
const aref = ref(rando());
aref.set(batch);
const obs = listChanges(aref, ['child_added']);
const sub = obs.skip(3).subscribe(changes => {
const data = changes.map(change => change.payload!.val());
const sub = obs.skip(1).take(1).subscribe(changes => {
const data = changes.map(change => change.payload.val());
expect(data[3]).toEqual({ name: 'anotha one' });
done();
});
}).add(done);
aref.set(batch);
aref.push({ name: 'anotha one' });
});

it('should process a new child_removed event', (done) => {
it('should stream in order events', (done) => {
const aref = ref(rando());
const obs = listChanges(aref.orderByChild('name'), ['child_added']);
const sub = obs.take(1).subscribe(changes => {
const names = changes.map(change => change.payload.val().name);
expect(names[0]).toEqual('one');
expect(names[1]).toEqual('two');
expect(names[2]).toEqual('zero');
}).add(done);
aref.set(batch);
const obs = listChanges(aref, ['child_added','child_removed'])
});

const sub = obs.skip(3).subscribe(changes => {
const data = changes.map(change => change.payload!.val());
it('should stream in order events w/child_added', (done) => {
const aref = ref(rando());
const obs = listChanges(aref.orderByChild('name'), ['child_added']);
const sub = obs.skip(1).take(1).subscribe(changes => {
const names = changes.map(change => change.payload.val().name);
expect(names[0]).toEqual('anotha one');
expect(names[1]).toEqual('one');
expect(names[2]).toEqual('two');
expect(names[3]).toEqual('zero');
}).add(done);
aref.set(batch);
aref.push({ name: 'anotha one' });
});

it('should stream events filtering', (done) => {
const aref = ref(rando());
const obs = listChanges(aref.orderByChild('name').equalTo('zero'), ['child_added']);
obs.skip(1).take(1).subscribe(changes => {
const names = changes.map(change => change.payload.val().name);
expect(names[0]).toEqual('zero');
expect(names[1]).toEqual('zero');
}).add(done);
aref.set(batch);
aref.push({ name: 'zero' });
});

it('should process a new child_removed event', done => {
const aref = ref(rando());
const obs = listChanges(aref, ['child_added','child_removed']);
const sub = obs.skip(1).take(1).subscribe(changes => {
const data = changes.map(change => change.payload.val());
expect(data.length).toEqual(items.length - 1);
done();
}).add(done);
app.database().goOnline();
aref.set(batch).then(() => {
aref.child(items[0].key).remove();
});
const childR = aref.child(items[0].key);
childR.remove().then(console.log);
});

it('should process a new child_changed event', (done) => {
const aref = ref(rando());
aref.set(batch);
const obs = listChanges(aref, ['child_added','child_changed'])
const sub = obs.skip(3).subscribe(changes => {
const data = changes.map(change => change.payload!.val());
expect(data[0].name).toEqual('lol');
done();
const sub = obs.skip(1).take(1).subscribe(changes => {
const data = changes.map(change => change.payload.val());
expect(data[1].name).toEqual('lol');
}).add(done);
app.database().goOnline();
aref.set(batch).then(() => {
aref.child(items[1].key).update({ name: 'lol'});
});
const childR = aref.child(items[0].key);
childR.update({ name: 'lol'});
});

it('should process a new child_moved event', (done) => {
const aref = ref(rando());
aref.set(batch);
const obs = listChanges(aref, ['child_added','child_moved'])
const sub = obs.skip(3).subscribe(changes => {
const data = changes.map(change => change.payload!.val());
const sub = obs.skip(1).take(1).subscribe(changes => {
const data = changes.map(change => change.payload.val());
// We moved the first item to the last item, so we check that
// the new result is now the last result
expect(data[data.length - 1]).toEqual(items[0]);
done();
}).add(done);
app.database().goOnline();
aref.set(batch).then(() => {
aref.child(items[0].key).setPriority('a', () => {});
});
const childR = aref.child(items[0].key);
childR.setPriority('a', () => {});
});

});
Expand Down

0 comments on commit eda1c41

Please sign in to comment.