Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(@aws-amplify/datastore): add Selective Sync #7001

Merged
merged 11 commits into from
Oct 28, 2020
16 changes: 7 additions & 9 deletions packages/core/src/Util/Reachability.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,22 @@ export default class ReachabilityNavigator implements Reachability {
return Observable.from([{ online: true }]);
}

return new Observable(observer => {
const online = isWebWorker()
? self.navigator.onLine
: window.navigator.onLine;
const globalObj = isWebWorker() ? self : window;

observer.next({ online });
return new Observable(observer => {
observer.next({ online: globalObj.navigator.onLine });

const notifyOnline = () => observer.next({ online: true });
const notifyOffline = () => observer.next({ online: false });

window.addEventListener('online', notifyOnline);
window.addEventListener('offline', notifyOffline);
globalObj.addEventListener('online', notifyOnline);
globalObj.addEventListener('offline', notifyOffline);

ReachabilityNavigator._observers.push(observer);

return () => {
window.removeEventListener('online', notifyOnline);
window.removeEventListener('offline', notifyOffline);
globalObj.removeEventListener('online', notifyOnline);
globalObj.removeEventListener('offline', notifyOffline);

ReachabilityNavigator._observers = ReachabilityNavigator._observers.filter(
_observer => _observer !== observer
Expand Down
4 changes: 2 additions & 2 deletions packages/datastore/__tests__/AsyncStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
DataStore as DataStoreType,
initSchema as initSchemaType,
} from '../src/datastore/datastore';
import { default as AsyncStorageAdapterType } from '../src/storage/adapter/asyncstorage';
import { default as AsyncStorageAdapterType } from '../src/storage/adapter/AsyncStorageAdapter';
import { DATASTORE, USER } from '../src/util';
import {
Author as AuthorType,
Expand Down Expand Up @@ -93,7 +93,7 @@ function setUpSchema(beforeSetUp?: Function) {

({
default: AsyncStorageAdapter,
} = require('../src/storage/adapter/asyncstorage'));
} = require('../src/storage/adapter/AsyncStorageAdapter'));

({ initSchema, DataStore } = require('../src/datastore/datastore'));

Expand Down
3 changes: 2 additions & 1 deletion packages/datastore/__tests__/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ describe('DataStore GraphQL generation', () => {
$limit: Int
$nextToken: String
$lastSync: AWSTimestamp
$filter: ModelPostFilterInput
) {
syncPosts(limit: $limit, nextToken: $nextToken, lastSync: $lastSync) {
syncPosts(limit: $limit, nextToken: $nextToken, lastSync: $lastSync, filter: $filter) {
items {
${postSelectionSet}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/datastore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"@aws-amplify/api": "^3.2.7",
"@aws-amplify/core": "^3.7.0",
"@aws-amplify/pubsub": "^3.2.5",
"idb": "5.0.2",
"idb": "5.0.6",
"immer": "6.0.1",
"ulid": "2.3.0",
"uuid": "3.3.2",
Expand Down
102 changes: 96 additions & 6 deletions packages/datastore/src/datastore/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
SyncError,
TypeConstructorMap,
ErrorHandler,
SyncExpression,
} from '../types';
import {
DATASTORE,
Expand Down Expand Up @@ -543,6 +544,7 @@ class DataStore {
private storage: Storage;
private sync: SyncEngine;
private syncPageSize: number;
private syncExpressions: SyncExpression<any>[];

getModuleName() {
return 'DataStore';
Expand Down Expand Up @@ -577,6 +579,8 @@ class DataStore {
if (aws_appsync_graphqlEndpoint) {
logger.debug('GraphQL endpoint available', aws_appsync_graphqlEndpoint);

const syncPredicates = await this.processSyncExpressions();

this.sync = new SyncEngine(
schema,
namespaceResolver,
Expand All @@ -587,7 +591,8 @@ class DataStore {
this.maxRecordsToSync,
this.syncPageSize,
this.conflictHandler,
this.errorHandler
this.errorHandler,
syncPredicates
);

// tslint:disable-next-line:max-line-length
Expand Down Expand Up @@ -679,8 +684,6 @@ class DataStore {
modelDefinition,
idOrCriteria
);

logger.debug('after createFromExisting - predicate', predicate);
}
}

Expand Down Expand Up @@ -979,6 +982,7 @@ class DataStore {
maxRecordsToSync: configMaxRecordsToSync,
syncPageSize: configSyncPageSize,
fullSyncInterval: configFullSyncInterval,
syncExpressions: configSyncExpressions,
...configFromAmplify
} = config;

Expand All @@ -987,20 +991,25 @@ class DataStore {
this.conflictHandler = this.setConflictHandler(config);
this.errorHandler = this.setErrorHandler(config);

this.syncExpressions =
(configDataStore && configDataStore.syncExpressions) ||
this.syncExpressions ||
configSyncExpressions;

this.maxRecordsToSync =
(configDataStore && configDataStore.maxRecordsToSync) ||
this.maxRecordsToSync ||
config.maxRecordsToSync;
configMaxRecordsToSync;

this.syncPageSize =
(configDataStore && configDataStore.syncPageSize) ||
this.syncPageSize ||
config.syncPageSize;
configSyncPageSize;

this.fullSyncInterval =
(configDataStore && configDataStore.fullSyncInterval) ||
this.fullSyncInterval ||
configFullSyncInterval ||
config.fullSyncInterval ||
24 * 60; // 1 day
};

Expand All @@ -1020,6 +1029,15 @@ class DataStore {
this.sync = undefined;
};

stop = async function clear() {
amhinson marked this conversation as resolved.
Show resolved Hide resolved
if (syncSubscription && !syncSubscription.closed) {
elorzafe marked this conversation as resolved.
Show resolved Hide resolved
syncSubscription.unsubscribe();
}

this.initialized = undefined; // Should re-initialize when start() is called.
this.sync = undefined;
};

private processPagination<T extends PersistentModel>(
modelDefinition: SchemaModel,
paginationProducer: ProducerPaginationInput<T>
Expand Down Expand Up @@ -1064,6 +1082,78 @@ class DataStore {
sort: sortPredicate,
};
}

private async processSyncExpressions(): Promise<
WeakMap<SchemaModel, ModelPredicate<any>>
> {
if (!this.syncExpressions || !this.syncExpressions.length) {
return;
}

const syncPredicates = await Promise.all(
this.syncExpressions.map(
async <T extends PersistentModel>(
syncExpression: SyncExpression<T>
): Promise<[SchemaModel, ModelPredicate<any>]> => {
const { modelConstructor, conditionProducer } = await syncExpression;
const modelDefinition = getModelDefinition(modelConstructor);

// conditionProducer is either a predicate, e.g. (c) => c.field('eq', 1)
// OR a function/promise that returns a predicate
const condition = await this.unwrapPromise(conditionProducer);
manueliglesias marked this conversation as resolved.
Show resolved Hide resolved
const predicate = this.createFromCondition(
modelDefinition,
condition
);

return [modelDefinition, predicate];
}
)
);

return this.weakMapFromEntries(syncPredicates);
}

private createFromCondition(modelDefinition, condition) {
try {
return ModelPredicateCreator.createFromExisting(
modelDefinition,
condition
);
} catch (error) {
logger.error('Error creating Sync Predicate');
throw error;
}
}

private async unwrapPromise(conditionProducer) {
try {
const condition = await conditionProducer();
return condition;
} catch (error) {
if (error instanceof TypeError) {
return conditionProducer;
}
}
}

private weakMapFromEntries(
entries: [SchemaModel, ModelPredicate<any>][]
): WeakMap<SchemaModel, ModelPredicate<any>> {
return entries.reduce((map, [modelDefinition, predicate]) => {
if (map.get(modelDefinition)) {
manueliglesias marked this conversation as resolved.
Show resolved Hide resolved
const { name } = modelDefinition;
logger.warn(
`You can only utilize one Sync Expression per model.
Subsequent sync expressions for the ${name} model will be ignored.`
);
return map;
}

map.set(modelDefinition, predicate);
return map;
}, new WeakMap<SchemaModel, ModelPredicate<any>>());
}
}

const instance = new DataStore();
Expand Down
32 changes: 26 additions & 6 deletions packages/datastore/src/storage/adapter/AsyncStorageAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import { ConsoleLogger as Logger } from '@aws-amplify/core';
import AsyncStorageDatabase from './AsyncStorageDatabase';
import { Adapter } from './index';
import { ModelInstanceCreator } from '../../datastore/datastore';
import { ModelPredicateCreator } from '../../predicates';
import {
ModelPredicateCreator,
ModelSortPredicateCreator,
Comment on lines +5 to +7
Copy link
Contributor Author

@iartemiev iartemiev Oct 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just FYI to reviewers, the sort code is showing up in the diff for AsyncStorageAdapter and IndexedDBAdatper due to a file rename and delete. It's not actually new code, it's just that we had duplicate files (e.g., asyncstorage.ts and AsyncStorageAdapter.ts and the latter was out of date).

I deleted AsyncStorageAdapter.ts and then renamed asyncstorage.ts -> AsyncStorageAdapter.ts. Same for IDB

} from '../../predicates';
import {
InternalSchema,
isPredicateObj,
Expand All @@ -24,6 +27,7 @@ import {
isModelConstructor,
traverseModel,
validatePredicate,
sortCompareFunction,
} from '../../util';

const logger = new Logger('DataStore');
Expand Down Expand Up @@ -237,6 +241,7 @@ export class AsyncStorageAdapter implements Adapter {
): Promise<T[]> {
const storeName = this.getStorenameForModel(modelConstructor);
const namespaceName = this.namespaceResolver(modelConstructor);
const sortSpecified = pagination && pagination.sort;

if (predicate) {
const predicates = ModelPredicateCreator.getPredicates(predicate);
Expand Down Expand Up @@ -276,6 +281,15 @@ export class AsyncStorageAdapter implements Adapter {
}
}

if (sortSpecified) {
const all = <T[]>await this.db.getAll(storeName);
return await this.load(
namespaceName,
modelConstructor.name,
this.inMemoryPagination(all, pagination)
);
}

const all = <T[]>await this.db.getAll(storeName, pagination);

return await this.load(namespaceName, modelConstructor.name, all);
Expand All @@ -286,14 +300,23 @@ export class AsyncStorageAdapter implements Adapter {
pagination?: PaginationInput<T>
): T[] {
if (pagination) {
if (pagination.sort) {
const sortPredicates = ModelSortPredicateCreator.getPredicates(
pagination.sort
);

if (sortPredicates.length) {
const compareFn = sortCompareFunction(sortPredicates);
records.sort(compareFn);
}
}
const { page = 0, limit = 0 } = pagination;
const start = Math.max(0, page * limit) || 0;

const end = limit > 0 ? start + limit : records.length;

return records.slice(start, end);
}

return records;
}

Expand Down Expand Up @@ -381,10 +404,7 @@ export class AsyncStorageAdapter implements Adapter {
const isValid = validatePredicate(fromDB, type, predicateObjs);
if (!isValid) {
const msg = 'Conditional update failed';
logger.error(msg, {
model: fromDB,
condition: predicateObjs,
});
logger.error(msg, { model: fromDB, condition: predicateObjs });

throw new Error(msg);
}
Expand Down
28 changes: 27 additions & 1 deletion packages/datastore/src/storage/adapter/IndexedDBAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { ConsoleLogger as Logger } from '@aws-amplify/core';
import * as idb from 'idb';
import { ModelInstanceCreator } from '../../datastore/datastore';
import { ModelPredicateCreator } from '../../predicates';
import {
ModelPredicateCreator,
ModelSortPredicateCreator,
} from '../../predicates';
import {
InternalSchema,
isPredicateObj,
Expand All @@ -24,8 +27,10 @@ import {
isPrivateMode,
traverseModel,
validatePredicate,
sortCompareFunction,
} from '../../util';
import { Adapter } from './index';
import { tsIndexSignature } from '@babel/types';
manueliglesias marked this conversation as resolved.
Show resolved Hide resolved

const logger = new Logger('DataStore');

Expand Down Expand Up @@ -362,6 +367,7 @@ class IndexedDBAdapter implements Adapter {
await this.checkPrivate();
const storeName = this.getStorenameForModel(modelConstructor);
const namespaceName = this.namespaceResolver(modelConstructor);
const sortSpecified = pagination && pagination.sort;

if (predicate) {
const predicates = ModelPredicateCreator.getPredicates(predicate);
Expand Down Expand Up @@ -403,6 +409,15 @@ class IndexedDBAdapter implements Adapter {
}
}

if (sortSpecified) {
const all = <T[]>await this.db.getAll(storeName);
return await this.load(
namespaceName,
modelConstructor.name,
this.inMemoryPagination(all, pagination)
);
}

return await this.load(
namespaceName,
modelConstructor.name,
Expand All @@ -415,6 +430,17 @@ class IndexedDBAdapter implements Adapter {
pagination?: PaginationInput<T>
): T[] {
if (pagination) {
if (pagination.sort) {
const sortPredicates = ModelSortPredicateCreator.getPredicates(
pagination.sort
);

if (sortPredicates.length) {
const compareFn = sortCompareFunction(sortPredicates);
records.sort(compareFn);
}
}

const { page = 0, limit = 0 } = pagination;
const start = Math.max(0, page * limit) || 0;

Expand Down
Loading