diff --git a/packages/core/src/Util/Reachability.ts b/packages/core/src/Util/Reachability.ts index f71bae65cfd..e1f0e32dbba 100644 --- a/packages/core/src/Util/Reachability.ts +++ b/packages/core/src/Util/Reachability.ts @@ -15,24 +15,22 @@ export default class ReachabilityNavigator implements Reachability { return Observable.from([{ online: true }]); } - return new Observable(observer => { - const online = isWebWorker() - ? self.navigator.onLine - : window.navigator.onLine; + const globalObj = isWebWorker() ? self : window; - observer.next({ online }); + return new Observable(observer => { + observer.next({ online: globalObj.navigator.onLine }); const notifyOnline = () => observer.next({ online: true }); const notifyOffline = () => observer.next({ online: false }); - window.addEventListener('online', notifyOnline); - window.addEventListener('offline', notifyOffline); + globalObj.addEventListener('online', notifyOnline); + globalObj.addEventListener('offline', notifyOffline); ReachabilityNavigator._observers.push(observer); return () => { - window.removeEventListener('online', notifyOnline); - window.removeEventListener('offline', notifyOffline); + globalObj.removeEventListener('online', notifyOnline); + globalObj.removeEventListener('offline', notifyOffline); ReachabilityNavigator._observers = ReachabilityNavigator._observers.filter( _observer => _observer !== observer diff --git a/packages/datastore/__tests__/AsyncStorage.ts b/packages/datastore/__tests__/AsyncStorage.ts index 28e73ebc587..c083d75613f 100644 --- a/packages/datastore/__tests__/AsyncStorage.ts +++ b/packages/datastore/__tests__/AsyncStorage.ts @@ -3,7 +3,7 @@ import { DataStore as DataStoreType, initSchema as initSchemaType, } from '../src/datastore/datastore'; -import { default as AsyncStorageAdapterType } from '../src/storage/adapter/asyncstorage'; +import { default as AsyncStorageAdapterType } from '../src/storage/adapter/AsyncStorageAdapter'; import { DATASTORE, USER } from '../src/util'; import { Author as AuthorType, @@ -93,7 +93,7 @@ function setUpSchema(beforeSetUp?: Function) { ({ default: AsyncStorageAdapter, - } = require('../src/storage/adapter/asyncstorage')); + } = require('../src/storage/adapter/AsyncStorageAdapter')); ({ initSchema, DataStore } = require('../src/datastore/datastore')); diff --git a/packages/datastore/__tests__/graphql.ts b/packages/datastore/__tests__/graphql.ts index e3f2c569ec9..89591df313d 100644 --- a/packages/datastore/__tests__/graphql.ts +++ b/packages/datastore/__tests__/graphql.ts @@ -4,7 +4,9 @@ import { buildGraphQLOperation, buildSubscriptionGraphQLOperation, TransformerMutationType, + predicateToGraphQLFilter, } from '../src/sync/utils'; +import { PredicatesGroup } from '../src/types'; import { newSchema } from './schema'; const postSelectionSet = ` @@ -39,8 +41,9 @@ describe('DataStore GraphQL generation', () => { $limit: Int $nextToken: String $lastSync: AWSTimestamp + $filter: ModelPostFilterInput ) { - syncPosts(limit: $limit, nextToken: $nextToken, lastSync: $lastSync) { + syncPosts(limit: $limit, nextToken: $nextToken, lastSync: $lastSync, filter: $filter) { items { ${postSelectionSet} } @@ -169,3 +172,108 @@ describe('DataStore GraphQL generation', () => { } ); }); + +describe('DataStore PredicateGroups to GraphQL filter', () => { + test('Single field', () => { + const group: PredicatesGroup = { + type: 'and', + predicates: [{ field: 'someField', operator: 'eq', operand: 'value' }], + }; + + const groupExpected = { and: [{ someField: { eq: 'value' } }] }; + + const gqlResult = predicateToGraphQLFilter(group); + + // stringifying to normalize whitespace and escape chars + expect(JSON.stringify(gqlResult)).toStrictEqual( + JSON.stringify(groupExpected) + ); + }); + + test('Multiple field', () => { + const group: PredicatesGroup = { + type: 'and', + predicates: [ + { field: 'someField', operator: 'eq', operand: 'value' }, + { field: 'someOtherField', operator: 'gt', operand: 'value2' }, + ], + }; + + const groupExpected = { + and: [ + { someField: { eq: 'value' } }, + { someOtherField: { gt: 'value2' } }, + ], + }; + + const gqlResult = predicateToGraphQLFilter(group); + + expect(JSON.stringify(gqlResult)).toStrictEqual( + JSON.stringify(groupExpected) + ); + }); + + test('Nested field', () => { + const group: PredicatesGroup = { + type: 'and', + predicates: [ + { field: 'someField', operator: 'eq', operand: 'value' }, + { + type: 'or', + predicates: [ + { field: 'someOtherField', operator: 'gt', operand: 'value2' }, + { field: 'orField', operator: 'contains', operand: 'str' }, + ], + }, + ], + }; + + const groupExpected = { + and: [ + { someField: { eq: 'value' } }, + { + or: [ + { someOtherField: { gt: 'value2' } }, + { orField: { contains: 'str' } }, + ], + }, + ], + }; + + const gqlResult = predicateToGraphQLFilter(group); + + expect(JSON.stringify(gqlResult)).toStrictEqual( + JSON.stringify(groupExpected) + ); + }); + + test('Nested not', () => { + const group: PredicatesGroup = { + type: 'not', + predicates: [ + { + type: 'or', + predicates: [ + { field: 'someOtherField', operator: 'gt', operand: 'value2' }, + { field: 'orField', operator: 'contains', operand: 'str' }, + ], + }, + ], + }; + + const groupExpected = { + not: { + or: [ + { someOtherField: { gt: 'value2' } }, + { orField: { contains: 'str' } }, + ], + }, + }; + + const gqlResult = predicateToGraphQLFilter(group); + + expect(JSON.stringify(gqlResult)).toStrictEqual( + JSON.stringify(groupExpected) + ); + }); +}); diff --git a/packages/datastore/package.json b/packages/datastore/package.json index beb25f4c4ba..606da23adfc 100644 --- a/packages/datastore/package.json +++ b/packages/datastore/package.json @@ -51,7 +51,7 @@ "@aws-amplify/api": "^3.2.7", "@aws-amplify/core": "^3.7.0", "@aws-amplify/pubsub": "^3.2.5", - "idb": "5.0.2", + "idb": "5.0.6", "immer": "6.0.1", "ulid": "2.3.0", "uuid": "3.3.2", diff --git a/packages/datastore/src/datastore/datastore.ts b/packages/datastore/src/datastore/datastore.ts index f6c0c6e6438..91e25a13657 100644 --- a/packages/datastore/src/datastore/datastore.ts +++ b/packages/datastore/src/datastore/datastore.ts @@ -38,6 +38,7 @@ import { SyncError, TypeConstructorMap, ErrorHandler, + SyncExpression, } from '../types'; import { DATASTORE, @@ -545,6 +546,9 @@ class DataStore { private storage: Storage; private sync: SyncEngine; private syncPageSize: number; + private syncExpressions: SyncExpression[]; + private syncPredicates: WeakMap>; + private syncModelsUpdated: Set = new Set(); getModuleName() { return 'DataStore'; @@ -579,6 +583,8 @@ class DataStore { if (aws_appsync_graphqlEndpoint) { logger.debug('GraphQL endpoint available', aws_appsync_graphqlEndpoint); + this.syncPredicates = await this.processSyncExpressions(); + this.sync = new SyncEngine( schema, namespaceResolver, @@ -589,7 +595,9 @@ class DataStore { this.maxRecordsToSync, this.syncPageSize, this.conflictHandler, - this.errorHandler + this.errorHandler, + this.syncPredicates, + this.syncModelsUpdated ); // tslint:disable-next-line:max-line-length @@ -681,8 +689,6 @@ class DataStore { modelDefinition, idOrCriteria ); - - logger.debug('after createFromExisting - predicate', predicate); } } @@ -981,6 +987,7 @@ class DataStore { maxRecordsToSync: configMaxRecordsToSync, syncPageSize: configSyncPageSize, fullSyncInterval: configFullSyncInterval, + syncExpressions: configSyncExpressions, ...configFromAmplify } = config; @@ -989,20 +996,25 @@ class DataStore { this.conflictHandler = this.setConflictHandler(config); this.errorHandler = this.setErrorHandler(config); + this.syncExpressions = + (configDataStore && configDataStore.syncExpressions) || + this.syncExpressions || + configSyncExpressions; + this.maxRecordsToSync = (configDataStore && configDataStore.maxRecordsToSync) || this.maxRecordsToSync || - config.maxRecordsToSync; + configMaxRecordsToSync; this.syncPageSize = (configDataStore && configDataStore.syncPageSize) || this.syncPageSize || - config.syncPageSize; + configSyncPageSize; this.fullSyncInterval = (configDataStore && configDataStore.fullSyncInterval) || + this.fullSyncInterval || configFullSyncInterval || - config.fullSyncInterval || 24 * 60; // 1 day }; @@ -1020,6 +1032,20 @@ class DataStore { this.initialized = undefined; // Should re-initialize when start() is called. this.storage = undefined; this.sync = undefined; + this.syncPredicates = undefined; + }; + + stop = async function stop() { + if (this.initialized !== undefined) { + await this.start(); + } + + if (syncSubscription && !syncSubscription.closed) { + syncSubscription.unsubscribe(); + } + + this.initialized = undefined; // Should re-initialize when start() is called. + this.sync = undefined; }; private processPagination( @@ -1066,6 +1092,113 @@ class DataStore { sort: sortPredicate, }; } + + private async processSyncExpressions(): Promise< + WeakMap> + > { + if (!this.syncExpressions || !this.syncExpressions.length) { + return; + } + + const syncPredicates = await Promise.all( + this.syncExpressions.map( + async ( + syncExpression: SyncExpression + ): Promise<[SchemaModel, ModelPredicate]> => { + const { modelConstructor, conditionProducer } = await syncExpression; + const modelDefinition = getModelDefinition(modelConstructor); + + // conditionProducer is either a predicate, e.g. (c) => c.field('eq', 1) + // OR a function/promise that returns a predicate + const condition = await this.unwrapPromise(conditionProducer); + const predicate = this.createFromCondition( + modelDefinition, + condition + ); + + return [modelDefinition, predicate]; + } + ) + ); + + this.compareSyncPredicates(syncPredicates); + + return this.weakMapFromEntries(syncPredicates); + } + + private compareSyncPredicates( + syncPredicates: [SchemaModel, ModelPredicate][] + ) { + if (!this.syncPredicates) { + return; + } + + this.syncModelsUpdated = new Set(); + + syncPredicates.forEach(([modelDefinition, predicate]) => { + const previousPredicate = ModelPredicateCreator.getPredicates( + this.syncPredicates.get(modelDefinition), + false + ); + + const newPredicate = ModelPredicateCreator.getPredicates( + predicate, + false + ); + + const predicateChanged = + JSON.stringify(previousPredicate) !== JSON.stringify(newPredicate); + + predicateChanged && this.syncModelsUpdated.add(modelDefinition.name); + }); + } + + private createFromCondition( + modelDefinition: SchemaModel, + condition: ProducerModelPredicate + ) { + try { + return ModelPredicateCreator.createFromExisting( + modelDefinition, + condition + ); + } catch (error) { + logger.error('Error creating Sync Predicate'); + throw error; + } + } + + private async unwrapPromise( + conditionProducer + ): Promise> { + try { + const condition = await conditionProducer(); + return condition; + } catch (error) { + if (error instanceof TypeError) { + return conditionProducer; + } + throw error; + } + } + + private weakMapFromEntries( + entries: [SchemaModel, ModelPredicate][] + ): WeakMap> { + return entries.reduce((map, [modelDefinition, predicate]) => { + if (map.has(modelDefinition)) { + const { name } = modelDefinition; + logger.warn( + `You can only utilize one Sync Expression per model. + Subsequent sync expressions for the ${name} model will be ignored.` + ); + return map; + } + + map.set(modelDefinition, predicate); + return map; + }, new WeakMap>()); + } } const instance = new DataStore(); diff --git a/packages/datastore/src/storage/adapter/AsyncStorageAdapter.ts b/packages/datastore/src/storage/adapter/AsyncStorageAdapter.ts index 92e3ce58e0e..631c49370c2 100644 --- a/packages/datastore/src/storage/adapter/AsyncStorageAdapter.ts +++ b/packages/datastore/src/storage/adapter/AsyncStorageAdapter.ts @@ -2,7 +2,10 @@ import { ConsoleLogger as Logger } from '@aws-amplify/core'; import AsyncStorageDatabase from './AsyncStorageDatabase'; import { Adapter } from './index'; import { ModelInstanceCreator } from '../../datastore/datastore'; -import { ModelPredicateCreator } from '../../predicates'; +import { + ModelPredicateCreator, + ModelSortPredicateCreator, +} from '../../predicates'; import { InternalSchema, isPredicateObj, @@ -24,6 +27,7 @@ import { isModelConstructor, traverseModel, validatePredicate, + sortCompareFunction, } from '../../util'; const logger = new Logger('DataStore'); @@ -237,6 +241,7 @@ export class AsyncStorageAdapter implements Adapter { ): Promise { const storeName = this.getStorenameForModel(modelConstructor); const namespaceName = this.namespaceResolver(modelConstructor); + const sortSpecified = pagination && pagination.sort; if (predicate) { const predicates = ModelPredicateCreator.getPredicates(predicate); @@ -276,6 +281,15 @@ export class AsyncStorageAdapter implements Adapter { } } + if (sortSpecified) { + const all = await this.db.getAll(storeName); + return await this.load( + namespaceName, + modelConstructor.name, + this.inMemoryPagination(all, pagination) + ); + } + const all = await this.db.getAll(storeName, pagination); return await this.load(namespaceName, modelConstructor.name, all); @@ -286,6 +300,16 @@ export class AsyncStorageAdapter implements Adapter { pagination?: PaginationInput ): T[] { if (pagination) { + if (pagination.sort) { + const sortPredicates = ModelSortPredicateCreator.getPredicates( + pagination.sort + ); + + if (sortPredicates.length) { + const compareFn = sortCompareFunction(sortPredicates); + records.sort(compareFn); + } + } const { page = 0, limit = 0 } = pagination; const start = Math.max(0, page * limit) || 0; @@ -293,7 +317,6 @@ export class AsyncStorageAdapter implements Adapter { return records.slice(start, end); } - return records; } @@ -381,10 +404,7 @@ export class AsyncStorageAdapter implements Adapter { const isValid = validatePredicate(fromDB, type, predicateObjs); if (!isValid) { const msg = 'Conditional update failed'; - logger.error(msg, { - model: fromDB, - condition: predicateObjs, - }); + logger.error(msg, { model: fromDB, condition: predicateObjs }); throw new Error(msg); } diff --git a/packages/datastore/src/storage/adapter/IndexedDBAdapter.ts b/packages/datastore/src/storage/adapter/IndexedDBAdapter.ts index 149b1bd3fcc..2eb2c6c4766 100644 --- a/packages/datastore/src/storage/adapter/IndexedDBAdapter.ts +++ b/packages/datastore/src/storage/adapter/IndexedDBAdapter.ts @@ -1,7 +1,10 @@ import { ConsoleLogger as Logger } from '@aws-amplify/core'; import * as idb from 'idb'; import { ModelInstanceCreator } from '../../datastore/datastore'; -import { ModelPredicateCreator } from '../../predicates'; +import { + ModelPredicateCreator, + ModelSortPredicateCreator, +} from '../../predicates'; import { InternalSchema, isPredicateObj, @@ -24,6 +27,7 @@ import { isPrivateMode, traverseModel, validatePredicate, + sortCompareFunction, } from '../../util'; import { Adapter } from './index'; @@ -362,6 +366,7 @@ class IndexedDBAdapter implements Adapter { await this.checkPrivate(); const storeName = this.getStorenameForModel(modelConstructor); const namespaceName = this.namespaceResolver(modelConstructor); + const sortSpecified = pagination && pagination.sort; if (predicate) { const predicates = ModelPredicateCreator.getPredicates(predicate); @@ -403,6 +408,15 @@ class IndexedDBAdapter implements Adapter { } } + if (sortSpecified) { + const all = await this.db.getAll(storeName); + return await this.load( + namespaceName, + modelConstructor.name, + this.inMemoryPagination(all, pagination) + ); + } + return await this.load( namespaceName, modelConstructor.name, @@ -415,6 +429,17 @@ class IndexedDBAdapter implements Adapter { pagination?: PaginationInput ): T[] { if (pagination) { + if (pagination.sort) { + const sortPredicates = ModelSortPredicateCreator.getPredicates( + pagination.sort + ); + + if (sortPredicates.length) { + const compareFn = sortCompareFunction(sortPredicates); + records.sort(compareFn); + } + } + const { page = 0, limit = 0 } = pagination; const start = Math.max(0, page * limit) || 0; diff --git a/packages/datastore/src/storage/adapter/asyncstorage.ts b/packages/datastore/src/storage/adapter/asyncstorage.ts deleted file mode 100644 index 631c49370c2..00000000000 --- a/packages/datastore/src/storage/adapter/asyncstorage.ts +++ /dev/null @@ -1,590 +0,0 @@ -import { ConsoleLogger as Logger } from '@aws-amplify/core'; -import AsyncStorageDatabase from './AsyncStorageDatabase'; -import { Adapter } from './index'; -import { ModelInstanceCreator } from '../../datastore/datastore'; -import { - ModelPredicateCreator, - ModelSortPredicateCreator, -} from '../../predicates'; -import { - InternalSchema, - isPredicateObj, - ModelInstanceMetadata, - ModelPredicate, - NamespaceResolver, - OpType, - PaginationInput, - PersistentModel, - PersistentModelConstructor, - PredicateObject, - QueryOne, - RelationType, -} from '../../types'; -import { - exhaustiveCheck, - getIndex, - getIndexFromAssociation, - isModelConstructor, - traverseModel, - validatePredicate, - sortCompareFunction, -} from '../../util'; - -const logger = new Logger('DataStore'); - -export class AsyncStorageAdapter implements Adapter { - private schema: InternalSchema; - private namespaceResolver: NamespaceResolver; - private modelInstanceCreator: ModelInstanceCreator; - private getModelConstructorByModelName: ( - namsespaceName: string, - modelName: string - ) => PersistentModelConstructor; - private db: AsyncStorageDatabase; - private initPromise: Promise; - private resolve: (value?: any) => void; - private reject: (value?: any) => void; - - private getStorenameForModel( - modelConstructor: PersistentModelConstructor - ) { - const namespace = this.namespaceResolver(modelConstructor); - const { name: modelName } = modelConstructor; - - return this.getStorename(namespace, modelName); - } - - private getStorename(namespace: string, modelName: string) { - const storeName = `${namespace}_${modelName}`; - - return storeName; - } - - async setUp( - theSchema: InternalSchema, - namespaceResolver: NamespaceResolver, - modelInstanceCreator: ModelInstanceCreator, - getModelConstructorByModelName: ( - namsespaceName: string, - modelName: string - ) => PersistentModelConstructor - ) { - if (!this.initPromise) { - this.initPromise = new Promise((res, rej) => { - this.resolve = res; - this.reject = rej; - }); - } else { - await this.initPromise; - return; - } - this.schema = theSchema; - this.namespaceResolver = namespaceResolver; - this.modelInstanceCreator = modelInstanceCreator; - this.getModelConstructorByModelName = getModelConstructorByModelName; - try { - if (!this.db) { - this.db = new AsyncStorageDatabase(); - await this.db.init(); - this.resolve(); - } - } catch (error) { - this.reject(error); - } - } - - async save( - model: T, - condition?: ModelPredicate - ): Promise<[T, OpType.INSERT | OpType.UPDATE][]> { - const modelConstructor = Object.getPrototypeOf(model) - .constructor as PersistentModelConstructor; - const storeName = this.getStorenameForModel(modelConstructor); - const connectedModels = traverseModel( - modelConstructor.name, - model, - this.schema.namespaces[this.namespaceResolver(modelConstructor)], - this.modelInstanceCreator, - this.getModelConstructorByModelName - ); - const namespaceName = this.namespaceResolver(modelConstructor); - const set = new Set(); - const connectionStoreNames = Object.values(connectedModels).map( - ({ modelName, item, instance }) => { - const storeName = this.getStorename(namespaceName, modelName); - set.add(storeName); - return { storeName, item, instance }; - } - ); - const fromDB = await this.db.get(model.id, storeName); - - if (condition && fromDB) { - const predicates = ModelPredicateCreator.getPredicates(condition); - const { predicates: predicateObjs, type } = predicates; - - const isValid = validatePredicate(fromDB, type, predicateObjs); - - if (!isValid) { - const msg = 'Conditional update failed'; - logger.error(msg, { model: fromDB, condition: predicateObjs }); - - throw new Error(msg); - } - } - - const result: [T, OpType.INSERT | OpType.UPDATE][] = []; - - for await (const resItem of connectionStoreNames) { - const { storeName, item, instance } = resItem; - - const { id } = item; - - const opType: OpType = (await this.db.get(id, storeName)) - ? OpType.UPDATE - : OpType.INSERT; - - if (id === model.id) { - await this.db.save(item, storeName); - - result.push([instance, opType]); - } else { - if (opType === OpType.INSERT) { - await this.db.save(item, storeName); - - result.push([instance, opType]); - } - } - } - - return result; - } - - private async load( - namespaceName: string, - srcModelName: string, - records: T[] - ): Promise { - const namespace = this.schema.namespaces[namespaceName]; - const relations = namespace.relationships[srcModelName].relationTypes; - const connectionStoreNames = relations.map(({ modelName }) => { - return this.getStorename(namespaceName, modelName); - }); - const modelConstructor = this.getModelConstructorByModelName( - namespaceName, - srcModelName - ); - - if (connectionStoreNames.length === 0) { - return records.map(record => - this.modelInstanceCreator(modelConstructor, record) - ); - } - - for await (const relation of relations) { - const { fieldName, modelName, targetName, relationType } = relation; - const storeName = this.getStorename(namespaceName, modelName); - const modelConstructor = this.getModelConstructorByModelName( - namespaceName, - modelName - ); - - switch (relationType) { - case 'HAS_ONE': - for await (const recordItem of records) { - if (recordItem[fieldName]) { - const connectionRecord = await this.db.get( - recordItem[fieldName], - storeName - ); - - recordItem[fieldName] = - connectionRecord && - this.modelInstanceCreator(modelConstructor, connectionRecord); - } - } - - break; - case 'BELONGS_TO': - for await (const recordItem of records) { - if (recordItem[targetName]) { - const connectionRecord = await this.db.get( - recordItem[targetName], - storeName - ); - - recordItem[fieldName] = - connectionRecord && - this.modelInstanceCreator(modelConstructor, connectionRecord); - delete recordItem[targetName]; - } - } - - break; - case 'HAS_MANY': - // TODO: Lazy loading - break; - default: - exhaustiveCheck(relationType); - break; - } - } - - return records.map(record => - this.modelInstanceCreator(modelConstructor, record) - ); - } - - async query( - modelConstructor: PersistentModelConstructor, - predicate?: ModelPredicate, - pagination?: PaginationInput - ): Promise { - const storeName = this.getStorenameForModel(modelConstructor); - const namespaceName = this.namespaceResolver(modelConstructor); - const sortSpecified = pagination && pagination.sort; - - if (predicate) { - const predicates = ModelPredicateCreator.getPredicates(predicate); - if (predicates) { - const { predicates: predicateObjs, type } = predicates; - const idPredicate = - predicateObjs.length === 1 && - (predicateObjs.find( - p => isPredicateObj(p) && p.field === 'id' && p.operator === 'eq' - ) as PredicateObject); - - if (idPredicate) { - const { operand: id } = idPredicate; - - const record = await this.db.get(id, storeName); - - if (record) { - const [x] = await this.load(namespaceName, modelConstructor.name, [ - record, - ]); - return [x]; - } - return []; - } - - const all = await this.db.getAll(storeName); - - const filtered = predicateObjs - ? all.filter(m => validatePredicate(m, type, predicateObjs)) - : all; - - return await this.load( - namespaceName, - modelConstructor.name, - this.inMemoryPagination(filtered, pagination) - ); - } - } - - if (sortSpecified) { - const all = await this.db.getAll(storeName); - return await this.load( - namespaceName, - modelConstructor.name, - this.inMemoryPagination(all, pagination) - ); - } - - const all = await this.db.getAll(storeName, pagination); - - return await this.load(namespaceName, modelConstructor.name, all); - } - - private inMemoryPagination( - records: T[], - pagination?: PaginationInput - ): T[] { - if (pagination) { - if (pagination.sort) { - const sortPredicates = ModelSortPredicateCreator.getPredicates( - pagination.sort - ); - - if (sortPredicates.length) { - const compareFn = sortCompareFunction(sortPredicates); - records.sort(compareFn); - } - } - const { page = 0, limit = 0 } = pagination; - const start = Math.max(0, page * limit) || 0; - - const end = limit > 0 ? start + limit : records.length; - - return records.slice(start, end); - } - return records; - } - - async queryOne( - modelConstructor: PersistentModelConstructor, - firstOrLast: QueryOne = QueryOne.FIRST - ): Promise { - const storeName = this.getStorenameForModel(modelConstructor); - const result = await this.db.getOne(firstOrLast, storeName); - return result && this.modelInstanceCreator(modelConstructor, result); - } - - async delete( - modelOrModelConstructor: T | PersistentModelConstructor, - condition?: ModelPredicate - ): Promise<[T[], T[]]> { - const deleteQueue: { storeName: string; items: T[] }[] = []; - - if (isModelConstructor(modelOrModelConstructor)) { - const modelConstructor = modelOrModelConstructor; - const nameSpace = this.namespaceResolver(modelConstructor); - - // models to be deleted. - const models = await this.query(modelConstructor, condition); - // TODO: refactor this to use a function like getRelations() - const relations = this.schema.namespaces[nameSpace].relationships[ - modelConstructor.name - ].relationTypes; - - if (condition !== undefined) { - await this.deleteTraverse( - relations, - models, - modelConstructor.name, - nameSpace, - deleteQueue - ); - - await this.deleteItem(deleteQueue); - - const deletedModels = deleteQueue.reduce( - (acc, { items }) => acc.concat(items), - [] - ); - return [models, deletedModels]; - } else { - await this.deleteTraverse( - relations, - models, - modelConstructor.name, - nameSpace, - deleteQueue - ); - - await this.deleteItem(deleteQueue); - - const deletedModels = deleteQueue.reduce( - (acc, { items }) => acc.concat(items), - [] - ); - - return [models, deletedModels]; - } - } else { - const model = modelOrModelConstructor; - - const modelConstructor = Object.getPrototypeOf(model) - .constructor as PersistentModelConstructor; - const nameSpace = this.namespaceResolver(modelConstructor); - - const storeName = this.getStorenameForModel(modelConstructor); - if (condition) { - const fromDB = await this.db.get(model.id, storeName); - - if (fromDB === undefined) { - const msg = 'Model instance not found in storage'; - logger.warn(msg, { model }); - - return [[model], []]; - } - - const predicates = ModelPredicateCreator.getPredicates(condition); - const { predicates: predicateObjs, type } = predicates; - - const isValid = validatePredicate(fromDB, type, predicateObjs); - if (!isValid) { - const msg = 'Conditional update failed'; - logger.error(msg, { model: fromDB, condition: predicateObjs }); - - throw new Error(msg); - } - - const relations = this.schema.namespaces[nameSpace].relationships[ - modelConstructor.name - ].relationTypes; - await this.deleteTraverse( - relations, - [model], - modelConstructor.name, - nameSpace, - deleteQueue - ); - } else { - const relations = this.schema.namespaces[nameSpace].relationships[ - modelConstructor.name - ].relationTypes; - - await this.deleteTraverse( - relations, - [model], - modelConstructor.name, - nameSpace, - deleteQueue - ); - } - - await this.deleteItem(deleteQueue); - - const deletedModels = deleteQueue.reduce( - (acc, { items }) => acc.concat(items), - [] - ); - - return [[model], deletedModels]; - } - } - - private async deleteItem( - deleteQueue?: { storeName: string; items: T[] | IDBValidKey[] }[] - ) { - for await (const deleteItem of deleteQueue) { - const { storeName, items } = deleteItem; - - for await (const item of items) { - if (item) { - if (typeof item === 'object') { - const id = item['id']; - await this.db.delete(id, storeName); - } - } - } - } - } - /** - * Populates the delete Queue with all the items to delete - * @param relations - * @param models - * @param srcModel - * @param nameSpace - * @param deleteQueue - */ - private async deleteTraverse( - relations: RelationType[], - models: T[], - srcModel: string, - nameSpace: string, - deleteQueue: { storeName: string; items: T[] }[] - ): Promise { - for await (const rel of relations) { - const { relationType, modelName } = rel; - const storeName = this.getStorename(nameSpace, modelName); - - const index: string = - getIndex( - this.schema.namespaces[nameSpace].relationships[modelName] - .relationTypes, - srcModel - ) || - // if we were unable to find an index via relationTypes - // i.e. for keyName connections, attempt to find one by the - // associatedWith property - getIndexFromAssociation( - this.schema.namespaces[nameSpace].relationships[modelName].indexes, - rel.associatedWith - ); - - switch (relationType) { - case 'HAS_ONE': - for await (const model of models) { - const allRecords = await this.db.getAll(storeName); - const recordToDelete = allRecords.filter( - childItem => childItem[index] === model.id - ); - - await this.deleteTraverse( - this.schema.namespaces[nameSpace].relationships[modelName] - .relationTypes, - recordToDelete, - modelName, - nameSpace, - deleteQueue - ); - } - break; - case 'HAS_MANY': - for await (const model of models) { - const allRecords = await this.db.getAll(storeName); - const childrenArray = allRecords.filter( - childItem => childItem[index] === model.id - ); - - await this.deleteTraverse( - this.schema.namespaces[nameSpace].relationships[modelName] - .relationTypes, - childrenArray, - modelName, - nameSpace, - deleteQueue - ); - } - break; - case 'BELONGS_TO': - // Intentionally blank - break; - default: - exhaustiveCheck(relationType); - break; - } - } - - deleteQueue.push({ - storeName: this.getStorename(nameSpace, srcModel), - items: models.map(record => - this.modelInstanceCreator( - this.getModelConstructorByModelName(nameSpace, srcModel), - record - ) - ), - }); - } - - async clear(): Promise { - await this.db.clear(); - - this.db = undefined; - this.initPromise = undefined; - } - - async batchSave( - modelConstructor: PersistentModelConstructor, - items: ModelInstanceMetadata[] - ): Promise<[T, OpType][]> { - const { name: modelName } = modelConstructor; - const namespaceName = this.namespaceResolver(modelConstructor); - const storeName = this.getStorename(namespaceName, modelName); - - const batch: ModelInstanceMetadata[] = []; - - for (const item of items) { - const { id } = item; - - const connectedModels = traverseModel( - modelConstructor.name, - this.modelInstanceCreator(modelConstructor, item), - this.schema.namespaces[this.namespaceResolver(modelConstructor)], - this.modelInstanceCreator, - this.getModelConstructorByModelName - ); - - const { instance } = connectedModels.find( - ({ instance }) => instance.id === id - ); - - batch.push(instance); - } - - return await this.db.batchSave(storeName, batch); - } -} - -export default new AsyncStorageAdapter(); diff --git a/packages/datastore/src/storage/adapter/getDefaultAdapter/index.native.ts b/packages/datastore/src/storage/adapter/getDefaultAdapter/index.native.ts index 2a6ed7f7062..9ea3fa169ae 100644 --- a/packages/datastore/src/storage/adapter/getDefaultAdapter/index.native.ts +++ b/packages/datastore/src/storage/adapter/getDefaultAdapter/index.native.ts @@ -1,5 +1,5 @@ import { Adapter } from '..'; -import AsyncStorageAdapter from '../asyncstorage'; +import AsyncStorageAdapter from '../AsyncStorageAdapter'; const getDefaultAdapter: () => Adapter = () => { return AsyncStorageAdapter; diff --git a/packages/datastore/src/storage/adapter/getDefaultAdapter/index.ts b/packages/datastore/src/storage/adapter/getDefaultAdapter/index.ts index 421860e4231..c80c1619a0a 100644 --- a/packages/datastore/src/storage/adapter/getDefaultAdapter/index.ts +++ b/packages/datastore/src/storage/adapter/getDefaultAdapter/index.ts @@ -5,10 +5,10 @@ const getDefaultAdapter: () => Adapter = () => { const { isBrowser } = browserOrNode(); if ((isBrowser && window.indexedDB) || (isWebWorker() && self.indexedDB)) { - return require('../indexeddb').default; + return require('../IndexedDBAdapter').default; } - const { AsyncStorageAdapter } = require('../asyncstorage'); + const { AsyncStorageAdapter } = require('../AsyncStorageAdapter'); return new AsyncStorageAdapter(); }; diff --git a/packages/datastore/src/storage/adapter/indexeddb.ts b/packages/datastore/src/storage/adapter/indexeddb.ts deleted file mode 100644 index fcff153f6a1..00000000000 --- a/packages/datastore/src/storage/adapter/indexeddb.ts +++ /dev/null @@ -1,823 +0,0 @@ -import { ConsoleLogger as Logger } from '@aws-amplify/core'; -import * as idb from 'idb'; -import { ModelInstanceCreator } from '../../datastore/datastore'; -import { - ModelPredicateCreator, - ModelSortPredicateCreator, -} from '../../predicates'; -import { - InternalSchema, - isPredicateObj, - ModelInstanceMetadata, - ModelPredicate, - NamespaceResolver, - OpType, - PaginationInput, - PersistentModel, - PersistentModelConstructor, - PredicateObject, - QueryOne, - RelationType, -} from '../../types'; -import { - exhaustiveCheck, - getIndex, - getIndexFromAssociation, - isModelConstructor, - isPrivateMode, - traverseModel, - validatePredicate, - sortCompareFunction, -} from '../../util'; -import { Adapter } from './index'; -import { tsIndexSignature } from '@babel/types'; - -const logger = new Logger('DataStore'); - -const DB_NAME = 'amplify-datastore'; - -class IndexedDBAdapter implements Adapter { - private schema: InternalSchema; - private namespaceResolver: NamespaceResolver; - private modelInstanceCreator: ModelInstanceCreator; - private getModelConstructorByModelName: ( - namsespaceName: string, - modelName: string - ) => PersistentModelConstructor; - private db: idb.IDBPDatabase; - private initPromise: Promise; - private resolve: (value?: any) => void; - private reject: (value?: any) => void; - - private async checkPrivate() { - const isPrivate = await isPrivateMode().then(isPrivate => { - return isPrivate; - }); - if (isPrivate) { - logger.error("IndexedDB not supported in this browser's private mode"); - return Promise.reject( - "IndexedDB not supported in this browser's private mode" - ); - } else { - return Promise.resolve(); - } - } - - private getStorenameForModel( - modelConstructor: PersistentModelConstructor - ) { - const namespace = this.namespaceResolver(modelConstructor); - const { name: modelName } = modelConstructor; - - return this.getStorename(namespace, modelName); - } - - private getStorename(namespace: string, modelName: string) { - const storeName = `${namespace}_${modelName}`; - - return storeName; - } - - async setUp( - theSchema: InternalSchema, - namespaceResolver: NamespaceResolver, - modelInstanceCreator: ModelInstanceCreator, - getModelConstructorByModelName: ( - namsespaceName: string, - modelName: string - ) => PersistentModelConstructor - ) { - await this.checkPrivate(); - if (!this.initPromise) { - this.initPromise = new Promise((res, rej) => { - this.resolve = res; - this.reject = rej; - }); - } else { - await this.initPromise; - } - - this.schema = theSchema; - this.namespaceResolver = namespaceResolver; - this.modelInstanceCreator = modelInstanceCreator; - this.getModelConstructorByModelName = getModelConstructorByModelName; - - try { - if (!this.db) { - const VERSION = 2; - this.db = await idb.openDB(DB_NAME, VERSION, { - upgrade: async (db, oldVersion, newVersion, txn) => { - if (oldVersion === 0) { - Object.keys(theSchema.namespaces).forEach(namespaceName => { - const namespace = theSchema.namespaces[namespaceName]; - - Object.keys(namespace.models).forEach(modelName => { - const storeName = this.getStorename(namespaceName, modelName); - const store = db.createObjectStore(storeName, { - autoIncrement: true, - }); - - const indexes = this.schema.namespaces[namespaceName] - .relationships[modelName].indexes; - indexes.forEach(index => store.createIndex(index, index)); - - store.createIndex('byId', 'id', { unique: true }); - }); - }); - - return; - } - - if (oldVersion === 1 && newVersion === 2) { - try { - for (const storeName of txn.objectStoreNames) { - const origStore = txn.objectStore(storeName); - - // rename original store - const tmpName = `tmp_${storeName}`; - origStore.name = tmpName; - - // create new store with original name - const newStore = db.createObjectStore(storeName, { - keyPath: undefined, - autoIncrement: true, - }); - - newStore.createIndex('byId', 'id', { unique: true }); - - let cursor = await origStore.openCursor(); - let count = 0; - - // Copy data from original to new - while (cursor && cursor.value) { - // we don't pass key, since they are all new entries in the new store - await newStore.put(cursor.value); - - cursor = await cursor.continue(); - count++; - } - - // delete original - db.deleteObjectStore(tmpName); - - logger.debug(`${count} ${storeName} records migrated`); - } - } catch (error) { - logger.error('Error migrating IndexedDB data', error); - txn.abort(); - throw error; - } - - return; - } - }, - }); - - this.resolve(); - } - } catch (error) { - this.reject(error); - } - } - - private async _get( - storeOrStoreName: idb.IDBPObjectStore | string, - id: string - ): Promise { - let index: idb.IDBPIndex; - - if (typeof storeOrStoreName === 'string') { - const storeName = storeOrStoreName; - index = this.db.transaction(storeName, 'readonly').store.index('byId'); - } else { - const store = storeOrStoreName; - index = store.index('byId'); - } - - const result = await index.get(id); - - return result; - } - - async save( - model: T, - condition?: ModelPredicate - ): Promise<[T, OpType.INSERT | OpType.UPDATE][]> { - await this.checkPrivate(); - const modelConstructor = Object.getPrototypeOf(model) - .constructor as PersistentModelConstructor; - const storeName = this.getStorenameForModel(modelConstructor); - const connectedModels = traverseModel( - modelConstructor.name, - model, - this.schema.namespaces[this.namespaceResolver(modelConstructor)], - this.modelInstanceCreator, - this.getModelConstructorByModelName - ); - const namespaceName = this.namespaceResolver(modelConstructor); - - const set = new Set(); - const connectionStoreNames = Object.values(connectedModels).map( - ({ modelName, item, instance }) => { - const storeName = this.getStorename(namespaceName, modelName); - set.add(storeName); - return { storeName, item, instance }; - } - ); - const tx = this.db.transaction( - [storeName, ...Array.from(set.values())], - 'readwrite' - ); - const store = tx.objectStore(storeName); - - const fromDB = await this._get(store, model.id); - - if (condition && fromDB) { - const predicates = ModelPredicateCreator.getPredicates(condition); - const { predicates: predicateObjs, type } = predicates; - - const isValid = validatePredicate(fromDB, type, predicateObjs); - - if (!isValid) { - const msg = 'Conditional update failed'; - logger.error(msg, { model: fromDB, condition: predicateObjs }); - - throw new Error(msg); - } - } - - const result: [T, OpType.INSERT | OpType.UPDATE][] = []; - - for await (const resItem of connectionStoreNames) { - const { storeName, item, instance } = resItem; - const store = tx.objectStore(storeName); - - const { id } = item; - - const opType: OpType = - (await this._get(store, id)) === undefined - ? OpType.INSERT - : OpType.UPDATE; - - // It is me - if (id === model.id) { - const key = await store.index('byId').getKey(item.id); - await store.put(item, key); - - result.push([instance, opType]); - } else { - if (opType === OpType.INSERT) { - // Even if the parent is an INSERT, the child might not be, so we need to get its key - const key = await store.index('byId').getKey(item.id); - await store.put(item, key); - - result.push([instance, opType]); - } - } - } - - await tx.done; - - return result; - } - - private async load( - namespaceName: string, - srcModelName: string, - records: T[] - ): Promise { - const namespace = this.schema.namespaces[namespaceName]; - const relations = namespace.relationships[srcModelName].relationTypes; - const connectionStoreNames = relations.map(({ modelName }) => { - return this.getStorename(namespaceName, modelName); - }); - const modelConstructor = this.getModelConstructorByModelName( - namespaceName, - srcModelName - ); - - if (connectionStoreNames.length === 0) { - return records.map(record => - this.modelInstanceCreator(modelConstructor, record) - ); - } - - const tx = this.db.transaction([...connectionStoreNames], 'readonly'); - - for await (const relation of relations) { - const { fieldName, modelName, targetName } = relation; - const storeName = this.getStorename(namespaceName, modelName); - const store = tx.objectStore(storeName); - const modelConstructor = this.getModelConstructorByModelName( - namespaceName, - modelName - ); - - switch (relation.relationType) { - case 'HAS_ONE': - for await (const recordItem of records) { - if (recordItem[fieldName]) { - const connectionRecord = await this._get( - store, - recordItem[fieldName] - ); - - recordItem[fieldName] = - connectionRecord && - this.modelInstanceCreator(modelConstructor, connectionRecord); - } - } - - break; - case 'BELONGS_TO': - for await (const recordItem of records) { - if (recordItem[targetName]) { - const connectionRecord = await this._get( - store, - recordItem[targetName] - ); - - recordItem[fieldName] = - connectionRecord && - this.modelInstanceCreator(modelConstructor, connectionRecord); - delete recordItem[targetName]; - } - } - - break; - case 'HAS_MANY': - // TODO: Lazy loading - break; - default: - exhaustiveCheck(relation.relationType); - break; - } - } - - return records.map(record => - this.modelInstanceCreator(modelConstructor, record) - ); - } - - async query( - modelConstructor: PersistentModelConstructor, - predicate?: ModelPredicate, - pagination?: PaginationInput - ): Promise { - await this.checkPrivate(); - const storeName = this.getStorenameForModel(modelConstructor); - const namespaceName = this.namespaceResolver(modelConstructor); - const sortSpecified = pagination && pagination.sort; - - if (predicate) { - const predicates = ModelPredicateCreator.getPredicates(predicate); - if (predicates) { - const { predicates: predicateObjs, type } = predicates; - const idPredicate = - predicateObjs.length === 1 && - (predicateObjs.find( - p => isPredicateObj(p) && p.field === 'id' && p.operator === 'eq' - ) as PredicateObject); - - if (idPredicate) { - const { operand: id } = idPredicate; - - const record = await this._get(storeName, id); - - if (record) { - const [x] = await this.load(namespaceName, modelConstructor.name, [ - record, - ]); - - return [x]; - } - return []; - } - - // TODO: Use indices if possible - const all = await this.db.getAll(storeName); - - const filtered = predicateObjs - ? all.filter(m => validatePredicate(m, type, predicateObjs)) - : all; - - return await this.load( - namespaceName, - modelConstructor.name, - this.inMemoryPagination(filtered, pagination) - ); - } - } - - if (sortSpecified) { - const all = await this.db.getAll(storeName); - return await this.load( - namespaceName, - modelConstructor.name, - this.inMemoryPagination(all, pagination) - ); - } - - return await this.load( - namespaceName, - modelConstructor.name, - await this.enginePagination(storeName, pagination) - ); - } - - private inMemoryPagination( - records: T[], - pagination?: PaginationInput - ): T[] { - if (pagination) { - if (pagination.sort) { - const sortPredicates = ModelSortPredicateCreator.getPredicates( - pagination.sort - ); - - if (sortPredicates.length) { - const compareFn = sortCompareFunction(sortPredicates); - records.sort(compareFn); - } - } - - const { page = 0, limit = 0 } = pagination; - const start = Math.max(0, page * limit) || 0; - - const end = limit > 0 ? start + limit : records.length; - - return records.slice(start, end); - } - - return records; - } - - private async enginePagination( - storeName: string, - pagination?: PaginationInput - ): Promise { - let result: T[]; - - if (pagination) { - const { page = 0, limit = 0 } = pagination; - const initialRecord = Math.max(0, page * limit) || 0; - - let cursor = await this.db - .transaction(storeName) - .objectStore(storeName) - .openCursor(); - - if (cursor && initialRecord > 0) { - await cursor.advance(initialRecord); - } - - const pageResults: T[] = []; - - const hasLimit = typeof limit === 'number' && limit > 0; - let moreRecords = true; - let itemsLeft = limit; - while (moreRecords && cursor && cursor.value) { - pageResults.push(cursor.value); - - cursor = await cursor.continue(); - - if (hasLimit) { - itemsLeft--; - moreRecords = itemsLeft > 0 && cursor !== null; - } else { - moreRecords = cursor !== null; - } - } - - result = pageResults; - } else { - result = await this.db.getAll(storeName); - } - - return result; - } - - async queryOne( - modelConstructor: PersistentModelConstructor, - firstOrLast: QueryOne = QueryOne.FIRST - ): Promise { - await this.checkPrivate(); - const storeName = this.getStorenameForModel(modelConstructor); - - const cursor = await this.db - .transaction([storeName], 'readonly') - .objectStore(storeName) - .openCursor(undefined, firstOrLast === QueryOne.FIRST ? 'next' : 'prev'); - - const result = cursor ? cursor.value : undefined; - - return result && this.modelInstanceCreator(modelConstructor, result); - } - - async delete( - modelOrModelConstructor: T | PersistentModelConstructor, - condition?: ModelPredicate - ): Promise<[T[], T[]]> { - await this.checkPrivate(); - const deleteQueue: { storeName: string; items: T[] }[] = []; - - if (isModelConstructor(modelOrModelConstructor)) { - const modelConstructor = modelOrModelConstructor; - const nameSpace = this.namespaceResolver(modelConstructor); - - const storeName = this.getStorenameForModel(modelConstructor); - - const models = await this.query(modelConstructor, condition); - const relations = this.schema.namespaces[nameSpace].relationships[ - modelConstructor.name - ].relationTypes; - - if (condition !== undefined) { - await this.deleteTraverse( - relations, - models, - modelConstructor.name, - nameSpace, - deleteQueue - ); - - await this.deleteItem(deleteQueue); - - const deletedModels = deleteQueue.reduce( - (acc, { items }) => acc.concat(items), - [] - ); - - return [models, deletedModels]; - } else { - await this.deleteTraverse( - relations, - models, - modelConstructor.name, - nameSpace, - deleteQueue - ); - - // Delete all - await this.db - .transaction([storeName], 'readwrite') - .objectStore(storeName) - .clear(); - - const deletedModels = deleteQueue.reduce( - (acc, { items }) => acc.concat(items), - [] - ); - - return [models, deletedModels]; - } - } else { - const model = modelOrModelConstructor; - - const modelConstructor = Object.getPrototypeOf(model) - .constructor as PersistentModelConstructor; - const nameSpace = this.namespaceResolver(modelConstructor); - - const storeName = this.getStorenameForModel(modelConstructor); - - if (condition) { - const tx = this.db.transaction([storeName], 'readwrite'); - const store = tx.objectStore(storeName); - - const fromDB = await this._get(store, model.id); - - if (fromDB === undefined) { - const msg = 'Model instance not found in storage'; - logger.warn(msg, { model }); - - return [[model], []]; - } - - const predicates = ModelPredicateCreator.getPredicates(condition); - const { predicates: predicateObjs, type } = predicates; - - const isValid = validatePredicate(fromDB, type, predicateObjs); - - if (!isValid) { - const msg = 'Conditional update failed'; - logger.error(msg, { model: fromDB, condition: predicateObjs }); - - throw new Error(msg); - } - await tx.done; - - const relations = this.schema.namespaces[nameSpace].relationships[ - modelConstructor.name - ].relationTypes; - - await this.deleteTraverse( - relations, - [model], - modelConstructor.name, - nameSpace, - deleteQueue - ); - } else { - const relations = this.schema.namespaces[nameSpace].relationships[ - modelConstructor.name - ].relationTypes; - - await this.deleteTraverse( - relations, - [model], - modelConstructor.name, - nameSpace, - deleteQueue - ); - } - - await this.deleteItem(deleteQueue); - - const deletedModels = deleteQueue.reduce( - (acc, { items }) => acc.concat(items), - [] - ); - - return [[model], deletedModels]; - } - } - - private async deleteItem( - deleteQueue?: { storeName: string; items: T[] | IDBValidKey[] }[] - ) { - const connectionStoreNames = deleteQueue.map(({ storeName }) => { - return storeName; - }); - - const tx = this.db.transaction([...connectionStoreNames], 'readwrite'); - for await (const deleteItem of deleteQueue) { - const { storeName, items } = deleteItem; - const store = tx.objectStore(storeName); - - for await (const item of items) { - if (item) { - let key: IDBValidKey; - - if (typeof item === 'object') { - key = await store.index('byId').getKey(item['id']); - } else { - key = await store.index('byId').getKey(item.toString()); - } - - if (key !== undefined) { - await store.delete(key); - } - } - } - } - } - - private async deleteTraverse( - relations: RelationType[], - models: T[], - srcModel: string, - nameSpace: string, - deleteQueue: { storeName: string; items: T[] }[] - ): Promise { - for await (const rel of relations) { - const { relationType, fieldName, modelName } = rel; - const storeName = this.getStorename(nameSpace, modelName); - - const index: string = - getIndex( - this.schema.namespaces[nameSpace].relationships[modelName] - .relationTypes, - srcModel - ) || - // if we were unable to find an index via relationTypes - // i.e. for keyName connections, attempt to find one by the - // associatedWith property - getIndexFromAssociation( - this.schema.namespaces[nameSpace].relationships[modelName].indexes, - rel.associatedWith - ); - - switch (relationType) { - case 'HAS_ONE': - for await (const model of models) { - const recordToDelete = await this.db - .transaction(storeName, 'readwrite') - .objectStore(storeName) - .index(index) - .get(model.id); - - await this.deleteTraverse( - this.schema.namespaces[nameSpace].relationships[modelName] - .relationTypes, - recordToDelete ? [recordToDelete] : [], - modelName, - nameSpace, - deleteQueue - ); - } - break; - case 'HAS_MANY': - for await (const model of models) { - const childrenArray = await this.db - .transaction(storeName, 'readwrite') - .objectStore(storeName) - .index(index) - .getAll(model['id']); - - await this.deleteTraverse( - this.schema.namespaces[nameSpace].relationships[modelName] - .relationTypes, - childrenArray, - modelName, - nameSpace, - deleteQueue - ); - } - break; - case 'BELONGS_TO': - // Intentionally blank - break; - default: - exhaustiveCheck(relationType); - break; - } - } - - deleteQueue.push({ - storeName: this.getStorename(nameSpace, srcModel), - items: models.map(record => - this.modelInstanceCreator( - this.getModelConstructorByModelName(nameSpace, srcModel), - record - ) - ), - }); - } - - async clear(): Promise { - await this.checkPrivate(); - - this.db.close(); - - await idb.deleteDB(DB_NAME); - - this.db = undefined; - this.initPromise = undefined; - } - - async batchSave( - modelConstructor: PersistentModelConstructor, - items: ModelInstanceMetadata[] - ): Promise<[T, OpType][]> { - if (items.length === 0) { - return []; - } - - await this.checkPrivate(); - - const result: [T, OpType][] = []; - - const storeName = this.getStorenameForModel(modelConstructor); - - const txn = this.db.transaction(storeName, 'readwrite'); - const store = txn.store; - - for (const item of items) { - const connectedModels = traverseModel( - modelConstructor.name, - this.modelInstanceCreator(modelConstructor, item), - this.schema.namespaces[this.namespaceResolver(modelConstructor)], - this.modelInstanceCreator, - this.getModelConstructorByModelName - ); - - const { id, _deleted } = item; - const index = store.index('byId'); - const key = await index.getKey(id); - - if (!_deleted) { - const { instance } = connectedModels.find( - ({ instance }) => instance.id === id - ); - - result.push([ - (instance), - key ? OpType.UPDATE : OpType.INSERT, - ]); - await store.put(instance, key); - } else { - result.push([(item), OpType.DELETE]); - - if (key) { - await store.delete(key); - } - } - } - - await txn.done; - - return result; - } -} - -export default new IndexedDBAdapter(); diff --git a/packages/datastore/src/sync/index.ts b/packages/datastore/src/sync/index.ts index 249533eaa53..9eef1cdebf7 100644 --- a/packages/datastore/src/sync/index.ts +++ b/packages/datastore/src/sync/index.ts @@ -19,6 +19,7 @@ import { SchemaModel, SchemaNamespace, TypeConstructorMap, + ModelPredicate, } from '../types'; import { exhaustiveCheck, getNow, SYNC } from '../util'; import DataStoreConnectivity from './datastoreConnectivity'; @@ -102,7 +103,9 @@ export class SyncEngine { private readonly maxRecordsToSync: number, private readonly syncPageSize: number, conflictHandler: ConflictHandler, - errorHandler: ErrorHandler + errorHandler: ErrorHandler, + private readonly syncPredicates: WeakMap>, + private readonly syncModelsUpdated: ReadonlySet ) { const MutationEvent = this.modelClasses[ 'MutationEvent' @@ -120,9 +123,13 @@ export class SyncEngine { this.syncQueriesProcessor = new SyncProcessor( this.schema, this.maxRecordsToSync, - this.syncPageSize + this.syncPageSize, + this.syncPredicates + ); + this.subscriptionsProcessor = new SubscriptionProcessor( + this.schema, + this.syncPredicates ); - this.subscriptionsProcessor = new SubscriptionProcessor(this.schema); this.mutationsProcessor = new MutationProcessor( this.schema, this.storage, @@ -714,11 +721,19 @@ export class SyncEngine { ownSymbol ); } else { + const syncPredicateUpdated = this.syncModelsUpdated.has(model); + [[savedModel]] = await this.storage.save( (this.modelClasses.ModelMetadata as PersistentModelConstructor< any >).copyOf(modelMetadata, draft => { draft.fullSyncInterval = fullSyncInterval; + // perform a base sync if the syncPredicate changed in between calls to DataStore.start + // ensures that the local store contains all the data specified by the syncExpression + if (syncPredicateUpdated) { + draft.lastSync = null; + draft.lastFullSync = null; + } }) ); } diff --git a/packages/datastore/src/sync/processors/subscription.ts b/packages/datastore/src/sync/processors/subscription.ts index 888e4859ba9..687a6a782bf 100644 --- a/packages/datastore/src/sync/processors/subscription.ts +++ b/packages/datastore/src/sync/processors/subscription.ts @@ -9,12 +9,16 @@ import { PersistentModel, SchemaModel, SchemaNamespace, + PredicatesGroup, + ModelPredicate, } from '../../types'; import { buildSubscriptionGraphQLOperation, getAuthorizationRules, TransformerMutationType, } from '../utils'; +import { ModelPredicateCreator } from '../../predicates'; +import { validatePredicate } from '../../util'; const logger = new Logger('DataStore'); @@ -40,7 +44,10 @@ class SubscriptionProcessor { ][] = []; private dataObserver: ZenObservable.Observer; - constructor(private readonly schema: InternalSchema) {} + constructor( + private readonly schema: InternalSchema, + private readonly syncPredicates: WeakMap> + ) {} private buildSubscription( namespace: SchemaNamespace, @@ -352,14 +359,29 @@ class SubscriptionProcessor { return; } - const { [opName]: record } = data; - - this.pushToBuffer( - transformerMutationType, - modelDefinition, - record + const predicatesGroup = ModelPredicateCreator.getPredicates( + this.syncPredicates.get(modelDefinition), + false ); + const { [opName]: record } = data; + + // checking incoming subscription against syncPredicate. + // once AppSync implements filters on subscriptions, we'll be + // able to set these when establishing the subscription instead. + // Until then, we'll need to filter inbound + if ( + this.passesPredicateValidation( + record, + predicatesGroup + ) + ) { + this.pushToBuffer( + transformerMutationType, + modelDefinition, + record + ); + } this.drainBuffer(); }, error: subscriptionError => { @@ -424,6 +446,19 @@ class SubscriptionProcessor { return [ctlObservable, dataObservable]; } + private passesPredicateValidation( + record: PersistentModel, + predicatesGroup: PredicatesGroup + ): boolean { + if (!predicatesGroup) { + return true; + } + + const { predicates, type } = predicatesGroup; + + return validatePredicate(record, type, predicates); + } + private pushToBuffer( transformerMutationType: TransformerMutationType, modelDefinition: SchemaModel, diff --git a/packages/datastore/src/sync/processors/sync.ts b/packages/datastore/src/sync/processors/sync.ts index bb72325538f..e8e577ecfeb 100644 --- a/packages/datastore/src/sync/processors/sync.ts +++ b/packages/datastore/src/sync/processors/sync.ts @@ -4,9 +4,16 @@ import { InternalSchema, ModelInstanceMetadata, SchemaModel, + ModelPredicate, + PredicatesGroup, + GraphQLFilter, } from '../../types'; -import { buildGraphQLOperation } from '../utils'; -import { jitteredExponentialRetry, ConsoleLogger as Logger } from '@aws-amplify/core'; +import { buildGraphQLOperation, predicateToGraphQLFilter } from '../utils'; +import { + jitteredExponentialRetry, + ConsoleLogger as Logger, +} from '@aws-amplify/core'; +import { ModelPredicateCreator } from '../../predicates'; const logger = new Logger('DataStore'); @@ -19,7 +26,8 @@ class SyncProcessor { constructor( private readonly schema: InternalSchema, private readonly maxRecordsToSync: number = DEFAULT_MAX_RECORDS_TO_SYNC, - private readonly syncPageSize: number = DEFAULT_PAGINATION_LIMIT + private readonly syncPageSize: number = DEFAULT_PAGINATION_LIMIT, + private readonly syncPredicates: WeakMap> ) { this.generateQueries(); } @@ -40,13 +48,30 @@ class SyncProcessor { }); } + private graphqlFilterFromPredicate(model: SchemaModel): GraphQLFilter { + if (!this.syncPredicates) { + return null; + } + const predicatesGroup: PredicatesGroup = ModelPredicateCreator.getPredicates( + this.syncPredicates.get(model), + false + ); + + if (!predicatesGroup) { + return null; + } + + return predicateToGraphQLFilter(predicatesGroup); + } + private async retrievePage< T extends ModelInstanceMetadata = ModelInstanceMetadata >( modelDefinition: SchemaModel, lastSync: number, nextToken: string, - limit: number = null + limit: number = null, + filter: GraphQLFilter ): Promise<{ nextToken: string; startedAt: number; items: T[] }> { const [opName, query] = this.typeQuery.get(modelDefinition); @@ -54,6 +79,7 @@ class SyncProcessor { limit, nextToken, lastSync, + filter, }; const { data } = < @@ -95,11 +121,18 @@ class SyncProcessor { }); } catch (error) { // If the error is unauthorized, filter out unauthorized items and return accessible items - const unauthorized = (error.errors as [any]).some(err => err.errorType === 'Unauthorized'); + const unauthorized = (error.errors as [any]).some( + err => err.errorType === 'Unauthorized' + ); if (unauthorized) { const result = error; - result.data[opName].items = result.data[opName].items.filter(item => item !== null); - logger.warn('queryError', 'User is unauthorized, some items could not be returned.'); + result.data[opName].items = result.data[opName].items.filter( + item => item !== null + ); + logger.warn( + 'queryError', + 'User is unauthorized, some items could not be returned.' + ); return result; } else { throw error; @@ -150,6 +183,7 @@ class SyncProcessor { let items: ModelInstanceMetadata[] = null; let recordsReceived = 0; + const filter = this.graphqlFilterFromPredicate(modelDefinition); const parents = this.schema.namespaces[ namespace @@ -175,7 +209,8 @@ class SyncProcessor { modelDefinition, lastSync, nextToken, - limit + limit, + filter )); recordsReceived += items.length; diff --git a/packages/datastore/src/sync/utils.ts b/packages/datastore/src/sync/utils.ts index 312a705ebd1..d8d812b96f8 100644 --- a/packages/datastore/src/sync/utils.ts +++ b/packages/datastore/src/sync/utils.ts @@ -2,6 +2,8 @@ import { ModelInstanceCreator } from '../datastore/datastore'; import { AuthorizationRule, GraphQLCondition, + GraphQLFilter, + GraphQLField, isEnumFieldType, isGraphQLScalarType, isPredicateObj, @@ -271,9 +273,9 @@ export function buildGraphQLOperation( switch (graphQLOpType) { case 'LIST': operation = `sync${pluralTypeName}`; - documentArgs = `($limit: Int, $nextToken: String, $lastSync: AWSTimestamp)`; + documentArgs = `($limit: Int, $nextToken: String, $lastSync: AWSTimestamp, $filter: Model${typeName}FilterInput)`; operationArgs = - '(limit: $limit, nextToken: $nextToken, lastSync: $lastSync)'; + '(limit: $limit, nextToken: $nextToken, lastSync: $lastSync, filter: $filter)'; selectionSet = `items { ${selectionSet} } @@ -388,3 +390,38 @@ export function predicateToGraphQLCondition( return result; } + +export function predicateToGraphQLFilter( + predicatesGroup: PredicatesGroup +): GraphQLFilter { + const result: GraphQLFilter = {}; + + if (!predicatesGroup || !Array.isArray(predicatesGroup.predicates)) { + return result; + } + + const { type, predicates } = predicatesGroup; + const isList = type === 'and' || type === 'or'; + + result[type] = isList ? [] : {}; + + const appendToFilter = value => + isList ? result[type].push(value) : (result[type] = value); + + predicates.forEach(predicate => { + if (isPredicateObj(predicate)) { + const { field, operator, operand } = predicate; + + const gqlField: GraphQLField = { + [field]: { [operator]: operand }, + }; + + appendToFilter(gqlField); + return; + } + + appendToFilter(predicateToGraphQLFilter(predicate)); + }); + + return result; +} diff --git a/packages/datastore/src/types.ts b/packages/datastore/src/types.ts index 4324831957e..efd636f4ed7 100644 --- a/packages/datastore/src/types.ts +++ b/packages/datastore/src/types.ts @@ -326,13 +326,14 @@ export enum QueryOne { FIRST, LAST, } +export type GraphQLField = { + [field: string]: { + [operator: string]: string | number | [number, number]; + }; +}; export type GraphQLCondition = Partial< - | { - [field: string]: { - [operator: string]: string | number | [number, number]; - }; - } + | GraphQLField | { and: [GraphQLCondition]; or: [GraphQLCondition]; @@ -340,6 +341,19 @@ export type GraphQLCondition = Partial< } >; +export type GraphQLFilter = Partial< + | GraphQLField + | { + and: GraphQLFilter[]; + } + | { + or: GraphQLFilter[]; + } + | { + not: GraphQLFilter; + } +>; + //#endregion //#region Pagination @@ -434,13 +448,57 @@ export type DataStoreConfig = { maxRecordsToSync?: number; // merge syncPageSize?: number; fullSyncInterval?: number; + syncExpressions?: SyncExpression[]; }; conflictHandler?: ConflictHandler; // default : retry until client wins up to x times errorHandler?: (error: SyncError) => void; // default : logger.warn maxRecordsToSync?: number; // merge syncPageSize?: number; fullSyncInterval?: number; -}; + syncExpressions?: SyncExpression[]; +}; + +export type SyncExpression = Promise<{ + modelConstructor: PersistentModelConstructor; + conditionProducer: + | ProducerModelPredicate + | (() => ProducerModelPredicate); +}>; + +/* +Adds Intellisense when passing a function | promise that returns a predicate +Or just a predicate. E.g., + +syncExpressions: [ + syncExpression(Post, c => c.rating('gt', 5)), + + OR + + syncExpression(Post, async () => { + return c => c.rating('gt', 5) + }), +] +*/ +export async function syncExpression( + modelConstructor: PersistentModelConstructor, + conditionProducer: ( + condition: P | ModelPredicate + ) => P extends ModelPredicate + ? ModelPredicate + : ProducerModelPredicate +): Promise<{ + modelConstructor: PersistentModelConstructor; + conditionProducer: ( + condition: P | ModelPredicate + ) => P extends ModelPredicate + ? ModelPredicate + : ProducerModelPredicate; +}> { + return { + modelConstructor, + conditionProducer, + }; +} export type SyncConflict = { modelConstructor: PersistentModelConstructor;