Skip to content

Commit

Permalink
feat(@aws-amplify/datastore): add Selective Sync
Browse files Browse the repository at this point in the history
  • Loading branch information
iartemiev committed Oct 19, 2020
1 parent 377acd0 commit 4c8e9ce
Show file tree
Hide file tree
Showing 16 changed files with 334 additions and 1,463 deletions.
16 changes: 7 additions & 9 deletions packages/core/src/Util/Reachability.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,22 @@ export default class ReachabilityNavigator implements Reachability {
return Observable.from([{ online: true }]);
}

return new Observable(observer => {
const online = isWebWorker()
? self.navigator.onLine
: window.navigator.onLine;
const globalObj = isWebWorker() ? self : window;

observer.next({ online });
return new Observable(observer => {
observer.next({ online: globalObj.navigator.onLine });

const notifyOnline = () => observer.next({ online: true });
const notifyOffline = () => observer.next({ online: false });

window.addEventListener('online', notifyOnline);
window.addEventListener('offline', notifyOffline);
globalObj.addEventListener('online', notifyOnline);
globalObj.addEventListener('offline', notifyOffline);

ReachabilityNavigator._observers.push(observer);

return () => {
window.removeEventListener('online', notifyOnline);
window.removeEventListener('offline', notifyOffline);
globalObj.removeEventListener('online', notifyOnline);
globalObj.removeEventListener('offline', notifyOffline);

ReachabilityNavigator._observers = ReachabilityNavigator._observers.filter(
_observer => _observer !== observer
Expand Down
4 changes: 2 additions & 2 deletions packages/datastore/__tests__/AsyncStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
DataStore as DataStoreType,
initSchema as initSchemaType,
} from '../src/datastore/datastore';
import { default as AsyncStorageAdapterType } from '../src/storage/adapter/asyncstorage';
import { default as AsyncStorageAdapterType } from '../src/storage/adapter/AsyncStorageAdapter';
import { DATASTORE, USER } from '../src/util';
import {
Author as AuthorType,
Expand Down Expand Up @@ -93,7 +93,7 @@ function setUpSchema(beforeSetUp?: Function) {

({
default: AsyncStorageAdapter,
} = require('../src/storage/adapter/asyncstorage'));
} = require('../src/storage/adapter/AsyncStorageAdapter'));

({ initSchema, DataStore } = require('../src/datastore/datastore'));

Expand Down
3 changes: 2 additions & 1 deletion packages/datastore/__tests__/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ describe('DataStore GraphQL generation', () => {
$limit: Int
$nextToken: String
$lastSync: AWSTimestamp
$filter: ModelPostFilterInput
) {
syncPosts(limit: $limit, nextToken: $nextToken, lastSync: $lastSync) {
syncPosts(limit: $limit, nextToken: $nextToken, lastSync: $lastSync, filter: $filter) {
items {
${postSelectionSet}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/datastore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"@aws-amplify/api": "^3.2.7",
"@aws-amplify/core": "^3.7.0",
"@aws-amplify/pubsub": "^3.2.5",
"idb": "5.0.2",
"idb": "5.0.6",
"immer": "6.0.1",
"ulid": "2.3.0",
"uuid": "3.3.2",
Expand Down
102 changes: 96 additions & 6 deletions packages/datastore/src/datastore/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
SyncError,
TypeConstructorMap,
ErrorHandler,
SyncExpression,
} from '../types';
import {
DATASTORE,
Expand Down Expand Up @@ -543,6 +544,7 @@ class DataStore {
private storage: Storage;
private sync: SyncEngine;
private syncPageSize: number;
private syncExpressions: SyncExpression<any>[];

getModuleName() {
return 'DataStore';
Expand Down Expand Up @@ -577,6 +579,8 @@ class DataStore {
if (aws_appsync_graphqlEndpoint) {
logger.debug('GraphQL endpoint available', aws_appsync_graphqlEndpoint);

const syncPredicates = await this.processSyncExpressions();

this.sync = new SyncEngine(
schema,
namespaceResolver,
Expand All @@ -587,7 +591,8 @@ class DataStore {
this.maxRecordsToSync,
this.syncPageSize,
this.conflictHandler,
this.errorHandler
this.errorHandler,
syncPredicates
);

// tslint:disable-next-line:max-line-length
Expand Down Expand Up @@ -679,8 +684,6 @@ class DataStore {
modelDefinition,
idOrCriteria
);

logger.debug('after createFromExisting - predicate', predicate);
}
}

Expand Down Expand Up @@ -979,6 +982,7 @@ class DataStore {
maxRecordsToSync: configMaxRecordsToSync,
syncPageSize: configSyncPageSize,
fullSyncInterval: configFullSyncInterval,
syncExpressions: configSyncExpressions,
...configFromAmplify
} = config;

Expand All @@ -987,20 +991,25 @@ class DataStore {
this.conflictHandler = this.setConflictHandler(config);
this.errorHandler = this.setErrorHandler(config);

this.syncExpressions =
(configDataStore && configDataStore.syncExpressions) ||
this.syncExpressions ||
configSyncExpressions;

this.maxRecordsToSync =
(configDataStore && configDataStore.maxRecordsToSync) ||
this.maxRecordsToSync ||
config.maxRecordsToSync;
configMaxRecordsToSync;

this.syncPageSize =
(configDataStore && configDataStore.syncPageSize) ||
this.syncPageSize ||
config.syncPageSize;
configSyncPageSize;

this.fullSyncInterval =
(configDataStore && configDataStore.fullSyncInterval) ||
this.fullSyncInterval ||
configFullSyncInterval ||
config.fullSyncInterval ||
24 * 60; // 1 day
};

Expand All @@ -1020,6 +1029,15 @@ class DataStore {
this.sync = undefined;
};

stop = async function clear() {
if (syncSubscription && !syncSubscription.closed) {
syncSubscription.unsubscribe();
}

this.initialized = undefined; // Should re-initialize when start() is called.
this.sync = undefined;
};

private processPagination<T extends PersistentModel>(
modelDefinition: SchemaModel,
paginationProducer: ProducerPaginationInput<T>
Expand Down Expand Up @@ -1064,6 +1082,78 @@ class DataStore {
sort: sortPredicate,
};
}

private async processSyncExpressions(): Promise<
WeakMap<SchemaModel, ModelPredicate<any>>
> {
if (!this.syncExpressions || !this.syncExpressions.length) {
return;
}

const syncPredicates = await Promise.all(
this.syncExpressions.map(
async <T extends PersistentModel>(
syncExpression: SyncExpression<T>
): Promise<[SchemaModel, ModelPredicate<any>]> => {
const { modelConstructor, conditionProducer } = await syncExpression;
const modelDefinition = getModelDefinition(modelConstructor);

// conditionProducer is either a predicate, e.g. (c) => c.field('eq', 1)
// OR a function/promise that returns a predicate
const condition = await this.unwrapPromise(conditionProducer);
const predicate = this.createFromCondition(
modelDefinition,
condition
);

return [modelDefinition, predicate];
}
)
);

return this.weakMapFromEntries(syncPredicates);
}

private createFromCondition(modelDefinition, condition) {
try {
return ModelPredicateCreator.createFromExisting(
modelDefinition,
condition
);
} catch (error) {
logger.error('Error creating Sync Predicate');
throw error;
}
}

private async unwrapPromise(conditionProducer) {
try {
const condition = await conditionProducer();
return condition;
} catch (error) {
if (error instanceof TypeError) {
return conditionProducer;
}
}
}

private weakMapFromEntries(
entries: [SchemaModel, ModelPredicate<any>][]
): WeakMap<SchemaModel, ModelPredicate<any>> {
return entries.reduce((map, [modelDefinition, predicate]) => {
if (map.get(modelDefinition)) {
const { name } = modelDefinition;
logger.warn(
`You can only utilize one Sync Expression per model.
Subsequent sync expressions for the ${name} model will be ignored.`
);
return map;
}

map.set(modelDefinition, predicate);
return map;
}, new WeakMap<SchemaModel, ModelPredicate<any>>());
}
}

const instance = new DataStore();
Expand Down
32 changes: 26 additions & 6 deletions packages/datastore/src/storage/adapter/AsyncStorageAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import { ConsoleLogger as Logger } from '@aws-amplify/core';
import AsyncStorageDatabase from './AsyncStorageDatabase';
import { Adapter } from './index';
import { ModelInstanceCreator } from '../../datastore/datastore';
import { ModelPredicateCreator } from '../../predicates';
import {
ModelPredicateCreator,
ModelSortPredicateCreator,
} from '../../predicates';
import {
InternalSchema,
isPredicateObj,
Expand All @@ -24,6 +27,7 @@ import {
isModelConstructor,
traverseModel,
validatePredicate,
sortCompareFunction,
} from '../../util';

const logger = new Logger('DataStore');
Expand Down Expand Up @@ -237,6 +241,7 @@ export class AsyncStorageAdapter implements Adapter {
): Promise<T[]> {
const storeName = this.getStorenameForModel(modelConstructor);
const namespaceName = this.namespaceResolver(modelConstructor);
const sortSpecified = pagination && pagination.sort;

if (predicate) {
const predicates = ModelPredicateCreator.getPredicates(predicate);
Expand Down Expand Up @@ -276,6 +281,15 @@ export class AsyncStorageAdapter implements Adapter {
}
}

if (sortSpecified) {
const all = <T[]>await this.db.getAll(storeName);
return await this.load(
namespaceName,
modelConstructor.name,
this.inMemoryPagination(all, pagination)
);
}

const all = <T[]>await this.db.getAll(storeName, pagination);

return await this.load(namespaceName, modelConstructor.name, all);
Expand All @@ -286,14 +300,23 @@ export class AsyncStorageAdapter implements Adapter {
pagination?: PaginationInput<T>
): T[] {
if (pagination) {
if (pagination.sort) {
const sortPredicates = ModelSortPredicateCreator.getPredicates(
pagination.sort
);

if (sortPredicates.length) {
const compareFn = sortCompareFunction(sortPredicates);
records.sort(compareFn);
}
}
const { page = 0, limit = 0 } = pagination;
const start = Math.max(0, page * limit) || 0;

const end = limit > 0 ? start + limit : records.length;

return records.slice(start, end);
}

return records;
}

Expand Down Expand Up @@ -381,10 +404,7 @@ export class AsyncStorageAdapter implements Adapter {
const isValid = validatePredicate(fromDB, type, predicateObjs);
if (!isValid) {
const msg = 'Conditional update failed';
logger.error(msg, {
model: fromDB,
condition: predicateObjs,
});
logger.error(msg, { model: fromDB, condition: predicateObjs });

throw new Error(msg);
}
Expand Down
28 changes: 27 additions & 1 deletion packages/datastore/src/storage/adapter/IndexedDBAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { ConsoleLogger as Logger } from '@aws-amplify/core';
import * as idb from 'idb';
import { ModelInstanceCreator } from '../../datastore/datastore';
import { ModelPredicateCreator } from '../../predicates';
import {
ModelPredicateCreator,
ModelSortPredicateCreator,
} from '../../predicates';
import {
InternalSchema,
isPredicateObj,
Expand All @@ -24,8 +27,10 @@ import {
isPrivateMode,
traverseModel,
validatePredicate,
sortCompareFunction,
} from '../../util';
import { Adapter } from './index';
import { tsIndexSignature } from '@babel/types';

const logger = new Logger('DataStore');

Expand Down Expand Up @@ -362,6 +367,7 @@ class IndexedDBAdapter implements Adapter {
await this.checkPrivate();
const storeName = this.getStorenameForModel(modelConstructor);
const namespaceName = this.namespaceResolver(modelConstructor);
const sortSpecified = pagination && pagination.sort;

if (predicate) {
const predicates = ModelPredicateCreator.getPredicates(predicate);
Expand Down Expand Up @@ -403,6 +409,15 @@ class IndexedDBAdapter implements Adapter {
}
}

if (sortSpecified) {
const all = <T[]>await this.db.getAll(storeName);
return await this.load(
namespaceName,
modelConstructor.name,
this.inMemoryPagination(all, pagination)
);
}

return await this.load(
namespaceName,
modelConstructor.name,
Expand All @@ -415,6 +430,17 @@ class IndexedDBAdapter implements Adapter {
pagination?: PaginationInput<T>
): T[] {
if (pagination) {
if (pagination.sort) {
const sortPredicates = ModelSortPredicateCreator.getPredicates(
pagination.sort
);

if (sortPredicates.length) {
const compareFn = sortCompareFunction(sortPredicates);
records.sort(compareFn);
}
}

const { page = 0, limit = 0 } = pagination;
const start = Math.max(0, page * limit) || 0;

Expand Down
Loading

0 comments on commit 4c8e9ce

Please sign in to comment.