Skip to content

Commit

Permalink
Merge 1a1e571 into 0eeb71f
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Aug 15, 2019
2 parents 0eeb71f + 1a1e571 commit b2b2907
Show file tree
Hide file tree
Showing 10 changed files with 317 additions and 409 deletions.
48 changes: 15 additions & 33 deletions packages/firestore/src/local/indexeddb_persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1235,54 +1235,36 @@ export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate {
txn: PersistenceTransaction,
upperBound: ListenSequenceNumber
): PersistencePromise<number> {
let count = 0;
let bytesRemoved = 0;
const documentCache = this.db.getRemoteDocumentCache();
const changeBuffer = documentCache.newChangeBuffer();

const promises: Array<PersistencePromise<void>> = [];
let documentCount = 0;

const iteration = this.forEachOrphanedDocument(
txn,
(docKey, sequenceNumber) => {
if (sequenceNumber <= upperBound) {
const p = this.isPinned(txn, docKey).next(isPinned => {
if (!isPinned) {
count++;
return this.removeOrphanedDocument(txn, docKey).next(
documentBytes => {
bytesRemoved += documentBytes;
}
);
documentCount++;
// Our size accounting requires us to read all documents before
// removing them.
return changeBuffer.getEntry(txn, docKey).next(() => {
changeBuffer.removeEntry(docKey);
return documentTargetStore(txn).delete(sentinelKey(docKey));
});
}
});
promises.push(p);
}
}
);
// Wait for iteration first to make sure we have a chance to add all of the
// removal promises to the array.

return iteration
.next(() => PersistencePromise.waitFor(promises))
.next(() =>
this.db.getRemoteDocumentCache().updateSize(txn, -bytesRemoved)
)
.next(() => count);
}

/**
* Clears a document from the cache. The document is assumed to be orphaned, so target-document
* associations are not queried. We remove it from the remote document cache, as well as remove
* its sentinel row.
*/
private removeOrphanedDocument(
txn: PersistenceTransaction,
docKey: DocumentKey
): PersistencePromise<number> {
let totalBytesRemoved = 0;
const documentCache = this.db.getRemoteDocumentCache();
return PersistencePromise.waitFor([
documentTargetStore(txn).delete(sentinelKey(docKey)),
documentCache.removeEntry(txn, docKey).next(bytesRemoved => {
totalBytesRemoved += bytesRemoved;
})
]).next(() => totalBytesRemoved);
.next(() => changeBuffer.apply(txn))
.next(() => documentCount);
}

removeTarget(
Expand Down
239 changes: 130 additions & 109 deletions packages/firestore/src/local/indexeddb_remote_document_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import { PersistencePromise } from './persistence_promise';
import { RemoteDocumentCache } from './remote_document_cache';
import { RemoteDocumentChangeBuffer } from './remote_document_change_buffer';
import { SimpleDb, SimpleDbStore, SimpleDbTransaction } from './simple_db';
import { ObjectMap } from '../util/obj_map';

const REMOTE_DOCUMENT_CHANGE_MISSING_ERR_MSG =
'The remote document changelog no longer contains all changes for all ' +
Expand Down Expand Up @@ -95,66 +96,59 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
}

/**
* Adds the supplied entries to the cache. Adds the given size delta to the cached size.
* Adds the supplied entries to the cache.
*
* All calls of `addEntry` are required to go through the RemoteDocumentChangeBuffer
* returned by `newChangeBuffer()` to ensure proper accounting of metadata.
*/
addEntries(
private addEntry(
transaction: PersistenceTransaction,
entries: Array<{ key: DocumentKey; doc: DbRemoteDocument }>,
sizeDelta: number
key: DocumentKey,
doc: DbRemoteDocument
): PersistencePromise<void> {
const promises: Array<PersistencePromise<void>> = [];

if (entries.length > 0) {
const documentStore = remoteDocumentsStore(transaction);
let changedKeys = documentKeySet();
for (const { key, doc } of entries) {
promises.push(documentStore.put(dbKey(key), doc));
changedKeys = changedKeys.add(key);

promises.push(
this.indexManager.addToCollectionParentIndex(
transaction,
key.path.popLast()
)
);
}

if (this.keepDocumentChangeLog) {
promises.push(
documentChangesStore(transaction).put({
changes: this.serializer.toDbResourcePaths(changedKeys)
})
);
}

promises.push(this.updateSize(transaction, sizeDelta));
}

return PersistencePromise.waitFor(promises);
const documentStore = remoteDocumentsStore(transaction);
return documentStore.put(dbKey(key), doc).next(() => {
this.indexManager.addToCollectionParentIndex(
transaction,
key.path.popLast()
);
});
}

/**
* Removes a document from the cache. Note that this method does *not* do any
* size accounting. It is the responsibility of the caller to count the bytes removed
* and issue a final updateSize() call after removing documents.
* Removes a document from the cache.
*
* @param documentKey The key of the document to remove
* @return The size of the document that was removed.
* All calls of `removeEntry` are required to go through the RemoteDocumentChangeBuffer
* returned by `newChangeBuffer()` to ensure proper accounting of metadata.
*/
removeEntry(
private removeEntry(
transaction: PersistenceTransaction,
documentKey: DocumentKey
): PersistencePromise<number> {
// We don't need to keep changelog for these removals since `removeEntry` is
// only used for garbage collection.
): PersistencePromise<void> {
const store = remoteDocumentsStore(transaction);
const key = dbKey(documentKey);
return store.get(key).next(document => {
if (document) {
return store.delete(key).next(() => dbDocumentSize(document));
} else {
return PersistencePromise.resolve(0);
}
return store.delete(key);
}

/**
* Updates the document change log and adds the given delta to the cached current size.
* Callers to `addEntry()` and `removeEntry()` *must* call this afterwards to update the
* cache's metadata.
*/
private updateMetadata(
transaction: PersistenceTransaction,
changedKeys: DocumentKeySet,
sizeDelta: number
): PersistencePromise<void> {
return this.getMetadata(transaction).next(metadata => {
metadata.byteSize += sizeDelta;
return this.setMetadata(transaction, metadata).next(() => {
if (this.keepDocumentChangeLog) {
return documentChangesStore(transaction).put({
changes: this.serializer.toDbResourcePaths(changedKeys)
});
}
});
});
}

Expand Down Expand Up @@ -425,7 +419,7 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
}

newChangeBuffer(): RemoteDocumentChangeBuffer {
return new IndexedDbRemoteDocumentChangeBuffer(this);
return new IndexedDbRemoteDocumentCache.RemoteDocumentChangeBuffer(this);
}

getSize(txn: PersistenceTransaction): PersistencePromise<number> {
Expand All @@ -451,20 +445,95 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
}

/**
* Adds the given delta to the cached current size. Callers to removeEntry *must* call this
* afterwards to update the size of the cache.
* Handles the details of adding and updating documents in the IndexedDbRemoteDocumentCache.
*
* @param sizeDelta
* Unlike the MemoryRemoteDocumentChangeBuffer, the IndexedDb implementation computes the size
* delta for all submitted changes. This avoids having to re-read all documents from IndexedDb
* when we apply the changes.
*/
updateSize(
txn: PersistenceTransaction,
sizeDelta: number
): PersistencePromise<void> {
return this.getMetadata(txn).next(metadata => {
metadata.byteSize += sizeDelta;
return this.setMetadata(txn, metadata);
});
}
private static RemoteDocumentChangeBuffer = class extends RemoteDocumentChangeBuffer {
// A map of document sizes prior to applying the changes in this buffer.
protected documentSizes: ObjectMap<DocumentKey, number> = new ObjectMap(
key => key.toString()
);

constructor(private readonly documentCache: IndexedDbRemoteDocumentCache) {
super();
}

protected applyChanges(
transaction: PersistenceTransaction
): PersistencePromise<void> {
const promises: Array<PersistencePromise<void>> = [];

let sizeDelta = 0;
let changedKeys = documentKeySet();

this.changes.forEach((key, maybeDocument) => {
const previousSize = this.documentSizes.get(key);
assert(
previousSize !== undefined,
`Cannot modify a document that wasn't read (for ${key})`
);
if (maybeDocument) {
const doc = this.documentCache.serializer.toDbRemoteDocument(
maybeDocument
);
const size = dbDocumentSize(doc);
sizeDelta += size - previousSize!;
promises.push(this.documentCache.addEntry(transaction, key, doc));
} else {
sizeDelta -= previousSize!;
promises.push(this.documentCache.removeEntry(transaction, key));
}

changedKeys = changedKeys.add(key);
});

promises.push(
this.documentCache.updateMetadata(transaction, changedKeys, sizeDelta)
);

return PersistencePromise.waitFor(promises);
}

protected getFromCache(
transaction: PersistenceTransaction,
documentKey: DocumentKey
): PersistencePromise<MaybeDocument | null> {
// Record the size of everything we load from the cache so we can compute a delta later.
return this.documentCache
.getSizedEntry(transaction, documentKey)
.next(getResult => {
if (getResult === null) {
this.documentSizes.set(documentKey, 0);
return null;
} else {
this.documentSizes.set(documentKey, getResult.size);
return getResult.maybeDocument;
}
});
}

protected getAllFromCache(
transaction: PersistenceTransaction,
documentKeys: DocumentKeySet
): PersistencePromise<NullableMaybeDocumentMap> {
// Record the size of everything we load from the cache so we can compute
// a delta later.
return this.documentCache
.getSizedEntries(transaction, documentKeys)
.next(({ maybeDocuments, sizeMap }) => {
// Note: `getAllFromCache` returns two maps instead of a single map from
// keys to `DocumentSizeEntry`s. This is to allow returning the
// `NullableMaybeDocumentMap` directly, without a conversion.
sizeMap.forEach((documentKey, size) => {
this.documentSizes.set(documentKey, size);
});
return maybeDocuments;
});
}
};
}

function documentGlobalStore(
Expand All @@ -476,54 +545,6 @@ function documentGlobalStore(
>(txn, DbRemoteDocumentGlobal.store);
}

/**
* Handles the details of adding and updating documents in the IndexedDbRemoteDocumentCache
*/
class IndexedDbRemoteDocumentChangeBuffer extends RemoteDocumentChangeBuffer {
constructor(private readonly documentCache: IndexedDbRemoteDocumentCache) {
super();
}

protected applyChanges(
transaction: PersistenceTransaction
): PersistencePromise<void> {
const changes = this.assertChanges();
let delta = 0;
const toApply: Array<{ doc: DbRemoteDocument; key: DocumentKey }> = [];
changes.forEach((key, maybeDocument) => {
const doc = this.documentCache.serializer.toDbRemoteDocument(
maybeDocument
);
const previousSize = this.documentSizes.get(key);
// NOTE: if we ever decide we need to support doing writes without
// reading first, this assert will need to change to do the read automatically.
assert(
previousSize !== undefined,
`Attempting to change document ${key.toString()} without having read it first`
);
const size = dbDocumentSize(doc);
delta += size - previousSize!;
toApply.push({ key, doc });
});

return this.documentCache.addEntries(transaction, toApply, delta);
}

protected getFromCache(
transaction: PersistenceTransaction,
documentKey: DocumentKey
): PersistencePromise<DocumentSizeEntry | null> {
return this.documentCache.getSizedEntry(transaction, documentKey);
}

protected getAllFromCache(
transaction: PersistenceTransaction,
documentKeys: DocumentKeySet
): PersistencePromise<DocumentSizeEntries> {
return this.documentCache.getSizedEntries(transaction, documentKeys);
}
}

export function isDocumentChangeMissingError(err: FirestoreError): boolean {
return (
err.code === Code.DATA_LOSS &&
Expand Down
14 changes: 12 additions & 2 deletions packages/firestore/src/local/local_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
maybeDocumentMap,
MaybeDocumentMap
} from '../model/collections';
import { MaybeDocument } from '../model/document';
import { MaybeDocument, NoDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { Mutation, PatchMutation, Precondition } from '../model/mutation';
import {
Expand Down Expand Up @@ -521,13 +521,23 @@ export class LocalStore {
// resolution failing).
if (
existingDoc == null ||
doc.version.isEqual(SnapshotVersion.MIN) ||
(authoritativeUpdates.has(doc.key) &&
!existingDoc.hasPendingWrites) ||
doc.version.compareTo(existingDoc.version) >= 0
) {
// If a document update isn't authoritative, make sure we don't apply an old document
// version to the remote cache.
documentBuffer.addEntry(doc);
changedDocs = changedDocs.insert(key, doc);
} else if (
doc instanceof NoDocument &&
doc.version.isEqual(SnapshotVersion.MIN)
) {
// NoDocuments with SnapshotVersion.MIN are used in manufactured events (e.g. in the
// case of a limbo document resolution failing). We remove these documents from cache
// since we lost access.
documentBuffer.removeEntry(key);
changedDocs = changedDocs.insert(key, doc);
} else {
log.debug(
LOG_TAG,
Expand Down
Loading

0 comments on commit b2b2907

Please sign in to comment.