diff --git a/packages/firestore/package.json b/packages/firestore/package.json index 8647a767000..2dceb6c68bb 100644 --- a/packages/firestore/package.json +++ b/packages/firestore/package.json @@ -6,6 +6,7 @@ "dev": "gulp dev", "test": "run-p test:browser test:node", "test:browser": "karma start --single-run", + "test:browser:debug" : "karma start --browsers=Chrome --auto-watch", "test:node": "mocha 'test/{,!(integration|browser)/**/}*.test.ts' --compilers ts:ts-node/register -r src/platform_node/node_init.ts --retries 5 --timeout 5000", "prepare": "gulp build" }, diff --git a/packages/firestore/src/api/database.ts b/packages/firestore/src/api/database.ts index 4614d4452de..5bcb2547ea2 100644 --- a/packages/firestore/src/api/database.ts +++ b/packages/firestore/src/api/database.ts @@ -187,7 +187,9 @@ export class Firestore implements firestore.Firestore, FirebaseService { // // Operations on the _firestoreClient don't block on _firestoreReady. Those // are already set to synchronize on the async queue. - private _firestoreClient: FirestoreClient | undefined; + // + // This is public for testing. + public _firestoreClient: FirestoreClient | undefined; public _dataConverter: UserDataConverter; constructor(databaseIdOrApp: FirestoreDatabase | FirebaseApp) { diff --git a/packages/firestore/src/core/firestore_client.ts b/packages/firestore/src/core/firestore_client.ts index 54d2faf813a..08a7e4648e6 100644 --- a/packages/firestore/src/core/firestore_client.ts +++ b/packages/firestore/src/core/firestore_client.ts @@ -160,6 +160,13 @@ export class FirestoreClient { return persistenceResult.promise; } + /** Enables the network connection and requeues all pending operations. */ + public enableNetwork(): Promise { + return this.asyncQueue.schedule(() => { + return this.remoteStore.enableNetwork(); + }); + } + /** * Initializes persistent storage, attempting to use IndexedDB if * usePersistence is true or memory-only if false. @@ -314,6 +321,13 @@ export class FirestoreClient { return this.syncEngine.handleUserChange(user); } + /** Disables the network connection. Pending operations will not complete. */ + public disableNetwork(): Promise { + return this.asyncQueue.schedule(() => { + return this.remoteStore.disableNetwork(); + }); + } + shutdown(): Promise { return this.asyncQueue .schedule(() => { diff --git a/packages/firestore/src/remote/remote_store.ts b/packages/firestore/src/remote/remote_store.ts index 330f82cc551..cdb753e814e 100644 --- a/packages/firestore/src/remote/remote_store.ts +++ b/packages/firestore/src/remote/remote_store.ts @@ -116,8 +116,8 @@ export class RemoteStore { private accumulatedWatchChanges: WatchChange[] = []; - private watchStream: PersistentListenStream; - private writeStream: PersistentWriteStream; + private watchStream: PersistentListenStream = null; + private writeStream: PersistentWriteStream = null; /** * The online state of the watch stream. The state is set to healthy if and @@ -149,10 +149,7 @@ export class RemoteStore { * LocalStore, etc. */ start(): Promise { - return this.setupStreams().then(() => { - // Resume any writes - return this.fillWritePipeline(); - }); + return this.enableNetwork(); } private setOnlineStateToHealthy(): void { @@ -192,7 +189,26 @@ export class RemoteStore { } } - private setupStreams(): Promise { + private isNetworkEnabled(): boolean { + assert( + (this.watchStream == null) == (this.writeStream == null), + 'WatchStream and WriteStream should both be null or non-null' + ); + return this.watchStream != null; + } + + /** Re-enables the network. Only to be called as the counterpart to disableNetwork(). */ + enableNetwork(): Promise { + assert( + this.watchStream == null, + 'enableNetwork() called with non-null watchStream.' + ); + assert( + this.writeStream == null, + 'enableNetwork() called with non-null writeStream.' + ); + + // Create new streams (but note they're not started yet). this.watchStream = this.datastore.newPersistentWatchStream({ onOpen: this.onWatchStreamOpen.bind(this), onClose: this.onWatchStreamClose.bind(this), @@ -208,15 +224,38 @@ export class RemoteStore { // Load any saved stream token from persistent storage return this.localStore.getLastStreamToken().then(token => { this.writeStream.lastStreamToken = token; + + if (this.shouldStartWatchStream()) { + this.startWatchStream(); + } + + this.updateAndBroadcastOnlineState(OnlineState.Unknown); + + return this.fillWritePipeline(); // This may start the writeStream. }); } - shutdown(): Promise { - log.debug(LOG_TAG, 'RemoteStore shutting down.'); - this.cleanupWatchStreamState(); - this.writeStream.stop(); + /** Temporarily disables the network. The network can be re-enabled using enableNetwork(). */ + disableNetwork(): Promise { + this.updateAndBroadcastOnlineState(OnlineState.Failed); + + // NOTE: We're guaranteed not to get any further events from these streams (not even a close + // event). this.watchStream.stop(); + this.writeStream.stop(); + + this.cleanUpWatchStreamState(); + this.cleanUpWriteStreamState(); + this.writeStream = null; + this.watchStream = null; + + return Promise.resolve(); + } + + shutdown(): Promise { + log.debug(LOG_TAG, 'RemoteStore shutting down.'); + this.disableNetwork(); return Promise.resolve(undefined); } @@ -228,11 +267,12 @@ export class RemoteStore { ); // Mark this as something the client is currently listening for. this.listenTargets[queryData.targetId] = queryData; - if (this.watchStream.isOpen()) { - this.sendWatchRequest(queryData); - } else if (!this.watchStream.isStarted()) { + + if (this.shouldStartWatchStream()) { // The listen will be sent in onWatchStreamOpen this.startWatchStream(); + } else if (this.isNetworkEnabled() && this.watchStream.isOpen()) { + this.sendWatchRequest(queryData); } } @@ -244,7 +284,7 @@ export class RemoteStore { ); const queryData = this.listenTargets[targetId]; delete this.listenTargets[targetId]; - if (this.watchStream.isOpen()) { + if (this.isNetworkEnabled() && this.watchStream.isOpen()) { this.sendUnwatchRequest(targetId); } } @@ -279,10 +319,9 @@ export class RemoteStore { } private startWatchStream(): void { - assert(!this.watchStream.isStarted(), "Can't restart started watch stream"); assert( this.shouldStartWatchStream(), - 'Tried to start watch stream even though it should not be started' + 'startWriteStream() called when shouldStartWatchStream() is false.' ); this.watchStream.start(); } @@ -292,10 +331,14 @@ export class RemoteStore { * active targets trying to be listened too */ private shouldStartWatchStream(): boolean { - return !objUtils.isEmpty(this.listenTargets); + return ( + this.isNetworkEnabled() && + !this.watchStream.isStarted() && + !objUtils.isEmpty(this.listenTargets) + ); } - private cleanupWatchStreamState(): void { + private cleanUpWatchStreamState(): void { // If the connection is closed then we'll never get a snapshot version for // the accumulated changes and so we'll never be able to complete the batch. // When we start up again the server is going to resend these changes @@ -314,7 +357,12 @@ export class RemoteStore { } private onWatchStreamClose(error: FirestoreError | null): Promise { - this.cleanupWatchStreamState(); + assert( + this.isNetworkEnabled(), + 'onWatchStreamClose() should only be called when the network is enabled' + ); + + this.cleanUpWatchStreamState(); // If there was an error, retry the connection. if (this.shouldStartWatchStream()) { @@ -510,6 +558,11 @@ export class RemoteStore { return promiseChain; } + cleanUpWriteStreamState() { + this.lastBatchSeen = BATCHID_UNKNOWN; + this.pendingWrites = []; + } + /** * Notifies that there are new mutations to process in the queue. This is * typically called by SyncEngine after it has sent mutations to LocalStore. @@ -543,7 +596,9 @@ export class RemoteStore { * writes complete the backend will be able to accept more. */ canWriteMutations(): boolean { - return this.pendingWrites.length < MAX_PENDING_WRITES; + return ( + this.isNetworkEnabled() && this.pendingWrites.length < MAX_PENDING_WRITES + ); } // For testing @@ -565,15 +620,26 @@ export class RemoteStore { this.pendingWrites.push(batch); - if (!this.writeStream.isStarted()) { + if (this.shouldStartWriteStream()) { this.startWriteStream(); - } else if (this.writeStream.handshakeComplete) { + } else if (this.isNetworkEnabled() && this.writeStream.handshakeComplete) { this.writeStream.writeMutations(batch.mutations); } } + private shouldStartWriteStream(): boolean { + return ( + this.isNetworkEnabled() && + !this.writeStream.isStarted() && + this.pendingWrites.length > 0 + ); + } + private startWriteStream(): void { - assert(!this.writeStream.isStarted(), "Can't restart started write stream"); + assert( + this.shouldStartWriteStream(), + 'startWriteStream() called when shouldStartWriteStream() is false.' + ); this.writeStream.start(); } @@ -632,6 +698,11 @@ export class RemoteStore { } private onWriteStreamClose(error?: FirestoreError): Promise { + assert( + this.isNetworkEnabled(), + 'onWriteStreamClose() should only be called when the network is enabled' + ); + // Ignore close if there are no pending writes. if (this.pendingWrites.length > 0) { assert( @@ -653,7 +724,7 @@ export class RemoteStore { return errorHandling.then(() => { // The write stream might have been started by refilling the write // pipeline for failed writes - if (this.pendingWrites.length > 0 && !this.writeStream.isStarted()) { + if (this.shouldStartWriteStream()) { this.startWriteStream(); } }); @@ -713,33 +784,10 @@ export class RemoteStore { handleUserChange(user: User): Promise { log.debug(LOG_TAG, 'RemoteStore changing users: uid=', user.uid); - // Clear pending writes because those are per-user. Watched targets - // persist across users so don't clear those. - this.lastBatchSeen = BATCHID_UNKNOWN; - this.pendingWrites = []; - - // Stop the streams. They promise not to call us back. - this.watchStream.stop(); - this.writeStream.stop(); - - this.cleanupWatchStreamState(); - - // Create new streams (but note they're not started yet). - return this.setupStreams() - .then(() => { - // If there are any watchedTargets, properly handle the stream - // restart now that RemoteStore is ready to handle them. - if (this.shouldStartWatchStream()) { - this.startWatchStream(); - } - - // Resume any writes - return this.fillWritePipeline(); - }) - .then(() => { - // User change moves us back to the unknown state because we might - // not want to re-open the stream - this.setOnlineStateToUnknown(); - }); + // Tear down and re-create our network streams. This will ensure we get a fresh auth token + // for the new user and re-fill the write pipeline with new mutations from the LocalStore + // (since mutations are per-user). + this.disableNetwork(); + return this.enableNetwork(); } } diff --git a/packages/firestore/test/integration/api/database.test.ts b/packages/firestore/test/integration/api/database.test.ts index b4f1eb099ef..5068d44c8f7 100644 --- a/packages/firestore/test/integration/api/database.test.ts +++ b/packages/firestore/test/integration/api/database.test.ts @@ -21,6 +21,7 @@ import { Deferred } from '../../../src/util/promise'; import { asyncIt } from '../../util/helpers'; import firebase from '../util/firebase_export'; import { apiDescribe, withTestCollection, withTestDb } from '../util/helpers'; +import { Firestore } from '../../../src/api/database'; apiDescribe('Database', persistence => { asyncIt('can set a document', () => { @@ -514,4 +515,47 @@ apiDescribe('Database', persistence => { return Promise.resolve(); }); }); + + asyncIt('can queue writes while offline', () => { + return withTestDb(persistence, db => { + const docRef = db.collection('rooms').doc(); + const firestoreClient = (docRef.firestore as Firestore)._firestoreClient; + + return firestoreClient + .disableNetwork() + .then(() => { + return Promise.all([ + docRef.set({ foo: 'bar' }), + firestoreClient.enableNetwork() + ]); + }) + .then(() => docRef.get()) + .then(doc => { + expect(doc.data()).to.deep.equal({ foo: 'bar' }); + }); + }); + }); + + asyncIt('can get documents while offline', () => { + return withTestDb(persistence, db => { + const docRef = db.collection('rooms').doc(); + const firestoreClient = (docRef.firestore as Firestore)._firestoreClient; + + return firestoreClient.disableNetwork().then(() => { + const writePromise = docRef.set({ foo: 'bar' }); + + return docRef.get().then(snapshot => { + expect(snapshot.metadata.fromCache).to.be.true; + return firestoreClient.enableNetwork().then(() => { + return writePromise.then(() => { + docRef.get().then(doc => { + expect(snapshot.metadata.fromCache).to.be.false; + expect(doc.data()).to.deep.equal({ foo: 'bar' }); + }); + }); + }); + }); + }); + }); + }); }); diff --git a/packages/firestore/test/integration/api/query.test.ts b/packages/firestore/test/integration/api/query.test.ts index 47d55078f94..0e5f04d6b2a 100644 --- a/packages/firestore/test/integration/api/query.test.ts +++ b/packages/firestore/test/integration/api/query.test.ts @@ -21,6 +21,8 @@ import { addEqualityMatcher } from '../../util/equality_matcher'; import { asyncIt, EventsAccumulator, toDataArray } from '../../util/helpers'; import firebase from '../util/firebase_export'; import { apiDescribe, withTestCollection, withTestDbs } from '../util/helpers'; +import { Firestore } from '../../../src/api/database'; +import { Deferred } from '../../../src/util/promise'; apiDescribe('Queries', persistence => { addEqualityMatcher(); @@ -451,4 +453,28 @@ apiDescribe('Queries', persistence => { }); }); }); + + asyncIt('can query while reconnecting to network', () => { + return withTestCollection(persistence, /* docs= */ {}, coll => { + const firestoreClient = (coll.firestore as Firestore)._firestoreClient; + + const deferred = new Deferred(); + + const unregister = coll.onSnapshot( + { includeQueryMetadataChanges: true }, + snapshot => { + if (!snapshot.empty && !snapshot.metadata.fromCache) { + deferred.resolve(); + } + } + ); + + firestoreClient.disableNetwork().then(() => { + coll.doc().set({ a: 1 }); + firestoreClient.enableNetwork(); + }); + + return deferred.promise.then(unregister); + }); + }); });