Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/firestore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"dev": "gulp dev",
"test": "run-p test:browser test:node",
"test:browser": "karma start --single-run",
"test:browser:debug" : "karma start --browsers=Chrome --auto-watch",
"test:node": "mocha 'test/{,!(integration|browser)/**/}*.test.ts' --compilers ts:ts-node/register -r src/platform_node/node_init.ts --retries 5 --timeout 5000",
"prepare": "gulp build"
},
Expand Down
4 changes: 3 additions & 1 deletion packages/firestore/src/api/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ export class Firestore implements firestore.Firestore, FirebaseService {
//
// Operations on the _firestoreClient don't block on _firestoreReady. Those
// are already set to synchronize on the async queue.
private _firestoreClient: FirestoreClient | undefined;
//
// This is public for testing.
public _firestoreClient: FirestoreClient | undefined;
public _dataConverter: UserDataConverter;

constructor(databaseIdOrApp: FirestoreDatabase | FirebaseApp) {
Expand Down
14 changes: 14 additions & 0 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ export class FirestoreClient {
return persistenceResult.promise;
}

/** Enables the network connection and requeues all pending operations. */
public enableNetwork(): Promise<void> {
return this.asyncQueue.schedule(() => {
return this.remoteStore.enableNetwork();
});
}

/**
* Initializes persistent storage, attempting to use IndexedDB if
* usePersistence is true or memory-only if false.
Expand Down Expand Up @@ -314,6 +321,13 @@ export class FirestoreClient {
return this.syncEngine.handleUserChange(user);
}

/** Disables the network connection. Pending operations will not complete. */
public disableNetwork(): Promise<void> {
return this.asyncQueue.schedule(() => {
return this.remoteStore.disableNetwork();
});
}

shutdown(): Promise<void> {
return this.asyncQueue
.schedule(() => {
Expand Down
154 changes: 101 additions & 53 deletions packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ export class RemoteStore {

private accumulatedWatchChanges: WatchChange[] = [];

private watchStream: PersistentListenStream;
private writeStream: PersistentWriteStream;
private watchStream: PersistentListenStream = null;
private writeStream: PersistentWriteStream = null;

/**
* The online state of the watch stream. The state is set to healthy if and
Expand Down Expand Up @@ -149,10 +149,7 @@ export class RemoteStore {
* LocalStore, etc.
*/
start(): Promise<void> {
return this.setupStreams().then(() => {
// Resume any writes
return this.fillWritePipeline();
});
return this.enableNetwork();
}

private setOnlineStateToHealthy(): void {
Expand Down Expand Up @@ -192,7 +189,26 @@ export class RemoteStore {
}
}

private setupStreams(): Promise<void> {
private isNetworkEnabled(): boolean {
assert(
(this.watchStream == null) == (this.writeStream == null),
'WatchStream and WriteStream should both be null or non-null'
);
return this.watchStream != null;
}

/** Re-enables the network. Only to be called as the counterpart to disableNetwork(). */
enableNetwork(): Promise<void> {
assert(
this.watchStream == null,
'enableNetwork() called with non-null watchStream.'
);
assert(
this.writeStream == null,
'enableNetwork() called with non-null writeStream.'
);

// Create new streams (but note they're not started yet).
this.watchStream = this.datastore.newPersistentWatchStream({
onOpen: this.onWatchStreamOpen.bind(this),
onClose: this.onWatchStreamClose.bind(this),
Expand All @@ -208,15 +224,38 @@ export class RemoteStore {
// Load any saved stream token from persistent storage
return this.localStore.getLastStreamToken().then(token => {
this.writeStream.lastStreamToken = token;

if (this.shouldStartWatchStream()) {
this.startWatchStream();
}

this.updateAndBroadcastOnlineState(OnlineState.Unknown);

return this.fillWritePipeline(); // This may start the writeStream.
});
}

shutdown(): Promise<void> {
log.debug(LOG_TAG, 'RemoteStore shutting down.');
this.cleanupWatchStreamState();
this.writeStream.stop();
/** Temporarily disables the network. The network can be re-enabled using enableNetwork(). */
disableNetwork(): Promise<void> {
this.updateAndBroadcastOnlineState(OnlineState.Failed);

// NOTE: We're guaranteed not to get any further events from these streams (not even a close
// event).
this.watchStream.stop();
this.writeStream.stop();

this.cleanUpWatchStreamState();
this.cleanUpWriteStreamState();

this.writeStream = null;
this.watchStream = null;

return Promise.resolve();
}

shutdown(): Promise<void> {
log.debug(LOG_TAG, 'RemoteStore shutting down.');
this.disableNetwork();
return Promise.resolve(undefined);
}

Expand All @@ -228,11 +267,12 @@ export class RemoteStore {
);
// Mark this as something the client is currently listening for.
this.listenTargets[queryData.targetId] = queryData;
if (this.watchStream.isOpen()) {
this.sendWatchRequest(queryData);
} else if (!this.watchStream.isStarted()) {

if (this.shouldStartWatchStream()) {
// The listen will be sent in onWatchStreamOpen
this.startWatchStream();
} else if (this.isNetworkEnabled() && this.watchStream.isOpen()) {
this.sendWatchRequest(queryData);
}
}

Expand All @@ -244,7 +284,7 @@ export class RemoteStore {
);
const queryData = this.listenTargets[targetId];
delete this.listenTargets[targetId];
if (this.watchStream.isOpen()) {
if (this.isNetworkEnabled() && this.watchStream.isOpen()) {
this.sendUnwatchRequest(targetId);
}
}
Expand Down Expand Up @@ -279,10 +319,9 @@ export class RemoteStore {
}

private startWatchStream(): void {
assert(!this.watchStream.isStarted(), "Can't restart started watch stream");
assert(
this.shouldStartWatchStream(),
'Tried to start watch stream even though it should not be started'
'startWriteStream() called when shouldStartWatchStream() is false.'
);
this.watchStream.start();
}
Expand All @@ -292,10 +331,14 @@ export class RemoteStore {
* active targets trying to be listened too
*/
private shouldStartWatchStream(): boolean {
return !objUtils.isEmpty(this.listenTargets);
return (
this.isNetworkEnabled() &&
!this.watchStream.isStarted() &&
!objUtils.isEmpty(this.listenTargets)
);
}

private cleanupWatchStreamState(): void {
private cleanUpWatchStreamState(): void {
// If the connection is closed then we'll never get a snapshot version for
// the accumulated changes and so we'll never be able to complete the batch.
// When we start up again the server is going to resend these changes
Expand All @@ -314,7 +357,12 @@ export class RemoteStore {
}

private onWatchStreamClose(error: FirestoreError | null): Promise<void> {
this.cleanupWatchStreamState();
assert(
this.isNetworkEnabled(),
'onWatchStreamClose() should only be called when the network is enabled'
);

this.cleanUpWatchStreamState();

// If there was an error, retry the connection.
if (this.shouldStartWatchStream()) {
Expand Down Expand Up @@ -510,6 +558,11 @@ export class RemoteStore {
return promiseChain;
}

cleanUpWriteStreamState() {
this.lastBatchSeen = BATCHID_UNKNOWN;
this.pendingWrites = [];
}

/**
* Notifies that there are new mutations to process in the queue. This is
* typically called by SyncEngine after it has sent mutations to LocalStore.
Expand Down Expand Up @@ -543,7 +596,9 @@ export class RemoteStore {
* writes complete the backend will be able to accept more.
*/
canWriteMutations(): boolean {
return this.pendingWrites.length < MAX_PENDING_WRITES;
return (
this.isNetworkEnabled() && this.pendingWrites.length < MAX_PENDING_WRITES
);
}

// For testing
Expand All @@ -565,15 +620,26 @@ export class RemoteStore {

this.pendingWrites.push(batch);

if (!this.writeStream.isStarted()) {
if (this.shouldStartWriteStream()) {
this.startWriteStream();
} else if (this.writeStream.handshakeComplete) {
} else if (this.isNetworkEnabled() && this.writeStream.handshakeComplete) {
this.writeStream.writeMutations(batch.mutations);
}
}

private shouldStartWriteStream(): boolean {
return (
this.isNetworkEnabled() &&
!this.writeStream.isStarted() &&
this.pendingWrites.length > 0
);
}

private startWriteStream(): void {
assert(!this.writeStream.isStarted(), "Can't restart started write stream");
assert(
this.shouldStartWriteStream(),
'startWriteStream() called when shouldStartWriteStream() is false.'
);
this.writeStream.start();
}

Expand Down Expand Up @@ -632,6 +698,11 @@ export class RemoteStore {
}

private onWriteStreamClose(error?: FirestoreError): Promise<void> {
assert(
this.isNetworkEnabled(),
'onWriteStreamClose() should only be called when the network is enabled'
);

// Ignore close if there are no pending writes.
if (this.pendingWrites.length > 0) {
assert(
Expand All @@ -653,7 +724,7 @@ export class RemoteStore {
return errorHandling.then(() => {
// The write stream might have been started by refilling the write
// pipeline for failed writes
if (this.pendingWrites.length > 0 && !this.writeStream.isStarted()) {
if (this.shouldStartWriteStream()) {
this.startWriteStream();
}
});
Expand Down Expand Up @@ -713,33 +784,10 @@ export class RemoteStore {
handleUserChange(user: User): Promise<void> {
log.debug(LOG_TAG, 'RemoteStore changing users: uid=', user.uid);

// Clear pending writes because those are per-user. Watched targets
// persist across users so don't clear those.
this.lastBatchSeen = BATCHID_UNKNOWN;
this.pendingWrites = [];

// Stop the streams. They promise not to call us back.
this.watchStream.stop();
this.writeStream.stop();

this.cleanupWatchStreamState();

// Create new streams (but note they're not started yet).
return this.setupStreams()
.then(() => {
// If there are any watchedTargets, properly handle the stream
// restart now that RemoteStore is ready to handle them.
if (this.shouldStartWatchStream()) {
this.startWatchStream();
}

// Resume any writes
return this.fillWritePipeline();
})
.then(() => {
// User change moves us back to the unknown state because we might
// not want to re-open the stream
this.setOnlineStateToUnknown();
});
// Tear down and re-create our network streams. This will ensure we get a fresh auth token
// for the new user and re-fill the write pipeline with new mutations from the LocalStore
// (since mutations are per-user).
this.disableNetwork();
return this.enableNetwork();
}
}
44 changes: 44 additions & 0 deletions packages/firestore/test/integration/api/database.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { Deferred } from '../../../src/util/promise';
import { asyncIt } from '../../util/helpers';
import firebase from '../util/firebase_export';
import { apiDescribe, withTestCollection, withTestDb } from '../util/helpers';
import { Firestore } from '../../../src/api/database';

apiDescribe('Database', persistence => {
asyncIt('can set a document', () => {
Expand Down Expand Up @@ -514,4 +515,47 @@ apiDescribe('Database', persistence => {
return Promise.resolve();
});
});

asyncIt('can queue writes while offline', () => {
return withTestDb(persistence, db => {
const docRef = db.collection('rooms').doc();
const firestoreClient = (docRef.firestore as Firestore)._firestoreClient;

return firestoreClient
.disableNetwork()
.then(() => {
return Promise.all([
docRef.set({ foo: 'bar' }),
firestoreClient.enableNetwork()
]);
})
.then(() => docRef.get())
.then(doc => {
expect(doc.data()).to.deep.equal({ foo: 'bar' });
});
});
});

asyncIt('can get documents while offline', () => {
return withTestDb(persistence, db => {
const docRef = db.collection('rooms').doc();
const firestoreClient = (docRef.firestore as Firestore)._firestoreClient;

return firestoreClient.disableNetwork().then(() => {
const writePromise = docRef.set({ foo: 'bar' });

return docRef.get().then(snapshot => {
expect(snapshot.metadata.fromCache).to.be.true;
return firestoreClient.enableNetwork().then(() => {
return writePromise.then(() => {
docRef.get().then(doc => {
expect(snapshot.metadata.fromCache).to.be.false;
expect(doc.data()).to.deep.equal({ foo: 'bar' });
});
});
});
});
});
});
});
});
Loading