Skip to content

Commit

Permalink
feat: Datastore use configurable syncMaxRecords and syncPageSize (#388)
Browse files Browse the repository at this point in the history
  • Loading branch information
wooj2 committed Apr 22, 2020
1 parent b02c3b7 commit ca15e88
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 20 deletions.
Expand Up @@ -20,7 +20,14 @@ final class InitialSyncOperation: AsynchronousOperation {
private let modelType: Model.Type
private let completion: AWSInitialSyncOrchestrator.SyncOperationResultHandler

private var lastSyncTime: Int?
private var recordsReceived: UInt

private var syncMaxRecords: UInt {
return dataStoreConfiguration.syncMaxRecords
}
private var syncPageSize: UInt {
return dataStoreConfiguration.syncPageSize
}

init(modelType: Model.Type,
api: APICategoryGraphQLBehavior?,
Expand All @@ -34,6 +41,7 @@ final class InitialSyncOperation: AsynchronousOperation {
self.storageAdapter = storageAdapter
self.dataStoreConfiguration = dataStoreConfiguration
self.completion = completion
self.recordsReceived = 0
}

override func main() {
Expand All @@ -43,19 +51,19 @@ final class InitialSyncOperation: AsynchronousOperation {
}

log.info("Beginning sync for \(modelType.modelName)")
setUpLastSyncTime()
query()
let lastSyncTime = getLastSyncTime()
query(lastSyncTime: lastSyncTime)
}

private func setUpLastSyncTime() {
private func getLastSyncTime() -> Int? {
guard !isCancelled else {
super.finish()
return
return nil
}

let lastSyncMetadata = getLastSyncMetadata()
guard let lastSync = lastSyncMetadata?.lastSync else {
return
return nil
}

//TODO: Update to use TimeInterval.milliseconds when it is pushed to master branch
Expand All @@ -64,14 +72,11 @@ final class InitialSyncOperation: AsynchronousOperation {
let secondsSinceLastSync = (lastSyncDate.timeIntervalSinceNow * -1)
if secondsSinceLastSync < 0 {
log.info("lastSyncTime was in the future, assuming base query")
lastSyncTime = nil
return
return nil
}

let shouldDoDeltaQuery = secondsSinceLastSync < dataStoreConfiguration.syncInterval
if shouldDoDeltaQuery {
lastSyncTime = lastSync
}
return shouldDoDeltaQuery ? lastSync : nil
}

private func getLastSyncMetadata() -> ModelSyncMetadata? {
Expand All @@ -95,7 +100,7 @@ final class InitialSyncOperation: AsynchronousOperation {

}

private func query(nextToken: String? = nil) {
private func query(lastSyncTime: Int?, nextToken: String? = nil) {
guard !isCancelled else {
super.finish()
return
Expand All @@ -105,8 +110,10 @@ final class InitialSyncOperation: AsynchronousOperation {
finish(result: .failure(DataStoreError.nilAPIHandle()))
return
}

let minSyncPageSize = Int(min(syncMaxRecords - recordsReceived, syncPageSize))
let limit = minSyncPageSize < 0 ? Int(syncPageSize) : minSyncPageSize
let request = GraphQLRequest<SyncQueryResult>.syncQuery(modelType: modelType,
limit: limit,
nextToken: nextToken,
lastSync: lastSyncTime)

Expand All @@ -116,7 +123,7 @@ final class InitialSyncOperation: AsynchronousOperation {
// TODO: Retry query on error
self.finish(result: .failure(DataStoreError.api(apiError)))
case .completed(let graphQLResult):
self.handleQueryResults(graphQLResult: graphQLResult)
self.handleQueryResults(lastSyncTime: lastSyncTime, graphQLResult: graphQLResult)
default:
break
}
Expand All @@ -125,7 +132,7 @@ final class InitialSyncOperation: AsynchronousOperation {

/// Disposes of the query results: Stops if error, reconciles results if success, and kick off a new query if there
/// is a next token
private func handleQueryResults(graphQLResult: Result<SyncQueryResult, GraphQLResponseError<SyncQueryResult>>) {
private func handleQueryResults(lastSyncTime: Int?, graphQLResult: Result<SyncQueryResult, GraphQLResponseError<SyncQueryResult>>) {
guard !isCancelled else {
super.finish()
return
Expand All @@ -146,23 +153,23 @@ final class InitialSyncOperation: AsynchronousOperation {
}

let items = syncQueryResult.items
lastSyncTime = syncQueryResult.startedAt
recordsReceived += UInt(items.count)

for item in items {
reconciliationQueue.offer(item)
}

if let nextToken = syncQueryResult.nextToken {
if let nextToken = syncQueryResult.nextToken, recordsReceived < syncMaxRecords {
DispatchQueue.global().async {
self.query(nextToken: nextToken)
self.query(lastSyncTime: lastSyncTime, nextToken: nextToken)
}
} else {
updateModelSyncMetadata()
updateModelSyncMetadata(lastSyncTime: syncQueryResult.startedAt)
}

}

private func updateModelSyncMetadata() {
private func updateModelSyncMetadata(lastSyncTime: Int?) {
guard !isCancelled else {
super.finish()
return
Expand Down
Expand Up @@ -358,4 +358,40 @@ class InitialSyncOperationTests: XCTestCase {

wait(for: [apiWasQueried], timeout: 1.0)
}

func testBaseQueryWithCustomSyncPageSize() throws {
let storageAdapter = try SQLiteStorageEngineAdapter(connection: Connection(.inMemory))
try storageAdapter.setUp(models: StorageEngine.systemModels + [MockSynced.self])

let apiWasQueried = expectation(description: "API was queried for a PaginatedList of AnyModel")
let responder = QueryRequestListenerResponder<PaginatedList<AnyModel>> { request, listener in
let lastSync = request.variables?["lastSync"] as? Int
XCTAssertNil(lastSync)
XCTAssert(request.document.contains("limit: Int"))
let limitValue = request.variables?["limit"] as? Int
XCTAssertEqual(10, limitValue)

let list = PaginatedList<AnyModel>(items: [], nextToken: nil, startedAt: nil)
let event: GraphQLOperation<PaginatedList<AnyModel>>.Event = .completed(.success(list))
listener?(event)
apiWasQueried.fulfill()
return nil
}

let apiPlugin = MockAPICategoryPlugin()
apiPlugin.responders[.queryRequestListener] = responder

let reconciliationQueue = MockReconciliationQueue()
let configuration = DataStoreConfiguration.custom(syncPageSize: 10)
let operation = InitialSyncOperation(
modelType: MockSynced.self,
api: apiPlugin,
reconciliationQueue: reconciliationQueue,
storageAdapter: storageAdapter,
dataStoreConfiguration: configuration) {_ in }

operation.main()

wait(for: [apiWasQueried], timeout: 1.0)
}
}

0 comments on commit ca15e88

Please sign in to comment.