diff --git a/demo/src/main/java/com/google/android/fhir/demo/data/DemoFhirSyncWorker.kt b/demo/src/main/java/com/google/android/fhir/demo/data/DemoFhirSyncWorker.kt index 21d24c0977..d284a98601 100644 --- a/demo/src/main/java/com/google/android/fhir/demo/data/DemoFhirSyncWorker.kt +++ b/demo/src/main/java/com/google/android/fhir/demo/data/DemoFhirSyncWorker.kt @@ -34,6 +34,4 @@ class DemoFhirSyncWorker(appContext: Context, workerParams: WorkerParameters) : override fun getConflictResolver() = AcceptLocalConflictResolver override fun getUploadStrategy(): UploadStrategy = UploadStrategy.AllChangesSquashedBundlePut - - override fun getFhirEngine() = FhirApplication.fhirEngine(applicationContext) } diff --git a/engine/src/main/java/com/google/android/fhir/FhirEngine.kt b/engine/src/main/java/com/google/android/fhir/FhirEngine.kt index 91f777fa95..2ed2f5fc9c 100644 --- a/engine/src/main/java/com/google/android/fhir/FhirEngine.kt +++ b/engine/src/main/java/com/google/android/fhir/FhirEngine.kt @@ -18,12 +18,7 @@ package com.google.android.fhir import com.google.android.fhir.db.ResourceNotFoundException import com.google.android.fhir.search.Search -import com.google.android.fhir.sync.ConflictResolver -import com.google.android.fhir.sync.upload.LocalChangesFetchMode -import com.google.android.fhir.sync.upload.SyncUploadProgress -import com.google.android.fhir.sync.upload.UploadRequestResult import java.time.OffsetDateTime -import kotlinx.coroutines.flow.Flow import org.hl7.fhir.r4.model.Resource import org.hl7.fhir.r4.model.ResourceType @@ -50,31 +45,6 @@ interface FhirEngine { */ suspend fun search(search: Search): List> - /** - * Synchronizes the upload results in the database. - * - * The [upload] function may initiate multiple server calls. Each call's result can then be used - * to emit [UploadSyncResult]. The caller should collect these results using [Flow.collect]. - * - * @param localChangesFetchMode Specifies the mode to fetch local changes. - * @param upload A suspend function that takes a list of [LocalChange] and returns an - * [UploadSyncResult]. - * @return A [Flow] that emits the progress of the synchronization process. - */ - suspend fun syncUpload( - localChangesFetchMode: LocalChangesFetchMode, - upload: (suspend (List) -> Flow), - ): Flow - - /** - * Synchronizes the [download] result in the database. The database will be updated to reflect the - * result of the [download] operation. - */ - suspend fun syncDownload( - conflictResolver: ConflictResolver, - download: suspend () -> Flow>, - ) - /** * Returns the total count of entities available for given search. * diff --git a/engine/src/main/java/com/google/android/fhir/FhirEngineProvider.kt b/engine/src/main/java/com/google/android/fhir/FhirEngineProvider.kt index b3775f8165..2ee8aa94c5 100644 --- a/engine/src/main/java/com/google/android/fhir/FhirEngineProvider.kt +++ b/engine/src/main/java/com/google/android/fhir/FhirEngineProvider.kt @@ -18,6 +18,7 @@ package com.google.android.fhir import android.content.Context import com.google.android.fhir.DatabaseErrorStrategy.UNSPECIFIED +import com.google.android.fhir.db.Database import com.google.android.fhir.sync.DataSource import com.google.android.fhir.sync.FhirDataStore import com.google.android.fhir.sync.HttpAuthenticator @@ -67,6 +68,12 @@ object FhirEngineProvider { return getOrCreateFhirService(context).fhirDataStore } + @PublishedApi + @Synchronized + internal fun getFhirDatabase(context: Context): Database { + return getOrCreateFhirService(context).database + } + @Synchronized private fun getOrCreateFhirService(context: Context): FhirServices { if (fhirServices == null) { diff --git a/engine/src/main/java/com/google/android/fhir/FhirSyncDbInteractor.kt b/engine/src/main/java/com/google/android/fhir/FhirSyncDbInteractor.kt new file mode 100644 index 0000000000..ca32cf554e --- /dev/null +++ b/engine/src/main/java/com/google/android/fhir/FhirSyncDbInteractor.kt @@ -0,0 +1,158 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.android.fhir + +import com.google.android.fhir.db.Database +import com.google.android.fhir.sync.ConflictResolver +import com.google.android.fhir.sync.Resolved +import com.google.android.fhir.sync.SyncJobStatus +import com.google.android.fhir.sync.SyncOperation +import com.google.android.fhir.sync.download.DownloadState +import com.google.android.fhir.sync.upload.LocalChangeFetcher +import com.google.android.fhir.sync.upload.ResourceConsolidator +import com.google.android.fhir.sync.upload.UploadRequestResult +import org.hl7.fhir.r4.model.Resource + +internal interface FhirSyncDbInteractor { + suspend fun getLocalChanges(): List + + suspend fun consolidateUploadResult(uploadRequestResult: UploadRequestResult) + + suspend fun consolidateDownloadResult(downloadState: DownloadState) + + suspend fun updateSyncJobStatus( + previousSyncJobStatus: SyncJobStatus, + downloadState: DownloadState, + ): SyncJobStatus + + suspend fun updateSyncJobStatus( + previousSyncJobStatus: SyncJobStatus, + uploadRequestResult: UploadRequestResult, + ): SyncJobStatus +} + +internal class FhirSyncDbInteractorImpl( + private val database: Database, + private val localChangeFetcher: LocalChangeFetcher, + private val resourceConsolidator: ResourceConsolidator, + private val conflictResolver: ConflictResolver, +) : FhirSyncDbInteractor { + override suspend fun getLocalChanges() = localChangeFetcher.next() + + override suspend fun consolidateUploadResult(uploadRequestResult: UploadRequestResult) { + resourceConsolidator.consolidate(uploadRequestResult) + } + + override suspend fun consolidateDownloadResult(downloadState: DownloadState) { + when (downloadState) { + is DownloadState.Started -> { + /** */ + } + is DownloadState.Success -> { + database.withTransaction { + val resolved = + resolveConflictingResources( + downloadState.resources, + getConflictingResourceIds(downloadState.resources), + conflictResolver, + ) + database.insertSyncedResources(downloadState.resources) + saveResolvedResourcesToDatabase(resolved) + } + } + is DownloadState.Failure -> {} + } + } + + override suspend fun updateSyncJobStatus( + previousSyncJobStatus: SyncJobStatus, + downloadState: DownloadState, + ): SyncJobStatus { + return with(previousSyncJobStatus as SyncJobStatus.InProgress) { + when (downloadState) { + is DownloadState.Started -> { + SyncJobStatus.InProgress( + syncOperation = SyncOperation.DOWNLOAD, + total = downloadState.total, + completed = 0, + ) + } + is DownloadState.Success -> { + SyncJobStatus.InProgress( + syncOperation = syncOperation, + total = downloadState.total, + completed = downloadState.completed, + ) + } + is DownloadState.Failure -> { + SyncJobStatus.Failed( + exceptions = listOf(downloadState.syncError), + ) + } + } + } + } + + override suspend fun updateSyncJobStatus( + previousSyncJobStatus: SyncJobStatus, + uploadRequestResult: UploadRequestResult, + ): SyncJobStatus { + return when (uploadRequestResult) { + is UploadRequestResult.Success -> { + val localChangesCount = + uploadRequestResult.successfulUploadResponseMappings.flatMap { it.localChanges }.size + with(previousSyncJobStatus as SyncJobStatus.InProgress) { + SyncJobStatus.InProgress( + syncOperation = syncOperation, + completed = completed + localChangesCount, + total = total, + ) + } + } + is UploadRequestResult.Failure -> { + SyncJobStatus.Failed( + exceptions = listOf(uploadRequestResult.uploadError), + ) + } + } + } + + private suspend fun saveResolvedResourcesToDatabase(resolved: List?) { + resolved?.let { + database.deleteUpdates(it) + database.update(*it.toTypedArray()) + } + } + + private suspend fun resolveConflictingResources( + resources: List, + conflictingResourceIds: Set, + conflictResolver: ConflictResolver, + ) = + resources + .filter { conflictingResourceIds.contains(it.logicalId) } + .map { conflictResolver.resolve(database.select(it.resourceType, it.logicalId), it) } + .filterIsInstance() + .map { it.resolved } + .takeIf { it.isNotEmpty() } + + private suspend fun getConflictingResourceIds(resources: List) = + resources + .map { it.logicalId } + .toSet() + .intersect(database.getAllLocalChanges().map { it.resourceId }.toSet()) +} diff --git a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt index b8524c1b6f..cc11052bc8 100644 --- a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt +++ b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt @@ -22,22 +22,10 @@ import com.google.android.fhir.FhirEngineProvider import com.google.android.fhir.LocalChange import com.google.android.fhir.SearchResult import com.google.android.fhir.db.Database -import com.google.android.fhir.logicalId import com.google.android.fhir.search.Search import com.google.android.fhir.search.count import com.google.android.fhir.search.execute -import com.google.android.fhir.sync.ConflictResolver -import com.google.android.fhir.sync.Resolved -import com.google.android.fhir.sync.upload.DefaultResourceConsolidator -import com.google.android.fhir.sync.upload.LocalChangeFetcherFactory -import com.google.android.fhir.sync.upload.LocalChangesFetchMode -import com.google.android.fhir.sync.upload.SyncUploadProgress -import com.google.android.fhir.sync.upload.UploadRequestResult import java.time.OffsetDateTime -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.firstOrNull -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.onEach import org.hl7.fhir.r4.model.Resource import org.hl7.fhir.r4.model.ResourceType @@ -83,83 +71,4 @@ internal class FhirEngineImpl(private val database: Database, private val contex override suspend fun purge(type: ResourceType, id: String, forcePurge: Boolean) { database.purge(type, id, forcePurge) } - - override suspend fun syncDownload( - conflictResolver: ConflictResolver, - download: suspend () -> Flow>, - ) { - download().collect { resources -> - database.withTransaction { - val resolved = - resolveConflictingResources( - resources, - getConflictingResourceIds(resources), - conflictResolver, - ) - database.insertSyncedResources(resources) - saveResolvedResourcesToDatabase(resolved) - } - } - } - - private suspend fun saveResolvedResourcesToDatabase(resolved: List?) { - resolved?.let { - database.deleteUpdates(it) - database.update(*it.toTypedArray()) - } - } - - private suspend fun resolveConflictingResources( - resources: List, - conflictingResourceIds: Set, - conflictResolver: ConflictResolver, - ) = - resources - .filter { conflictingResourceIds.contains(it.logicalId) } - .map { conflictResolver.resolve(database.select(it.resourceType, it.logicalId), it) } - .filterIsInstance() - .map { it.resolved } - .takeIf { it.isNotEmpty() } - - private suspend fun getConflictingResourceIds(resources: List) = - resources - .map { it.logicalId } - .toSet() - .intersect(database.getAllLocalChanges().map { it.resourceId }.toSet()) - - override suspend fun syncUpload( - localChangesFetchMode: LocalChangesFetchMode, - upload: (suspend (List) -> Flow), - ): Flow = flow { - val resourceConsolidator = DefaultResourceConsolidator(database) - val localChangeFetcher = LocalChangeFetcherFactory.byMode(localChangesFetchMode, database) - - emit( - SyncUploadProgress( - remaining = localChangeFetcher.total, - initialTotal = localChangeFetcher.total, - ), - ) - - while (localChangeFetcher.hasNext()) { - val localChanges = localChangeFetcher.next() - val uploadRequestResult = - upload(localChanges) - .onEach { result -> - resourceConsolidator.consolidate(result) - val newProgress = - when (result) { - is UploadRequestResult.Success -> localChangeFetcher.getProgress() - is UploadRequestResult.Failure -> - localChangeFetcher.getProgress().copy(uploadError = result.uploadError) - } - emit(newProgress) - } - .firstOrNull { it is UploadRequestResult.Failure } - - if (uploadRequestResult is UploadRequestResult.Failure) { - break - } - } - } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt index bc07ddc12a..fb6af16966 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt @@ -21,10 +21,12 @@ import androidx.work.CoroutineWorker import androidx.work.Data import androidx.work.WorkerParameters import androidx.work.workDataOf -import com.google.android.fhir.FhirEngine import com.google.android.fhir.FhirEngineProvider +import com.google.android.fhir.FhirSyncDbInteractorImpl import com.google.android.fhir.OffsetDateTimeTypeAdapter import com.google.android.fhir.sync.download.DownloaderImpl +import com.google.android.fhir.sync.upload.DefaultResourceConsolidator +import com.google.android.fhir.sync.upload.LocalChangeFetcherFactory import com.google.android.fhir.sync.upload.UploadStrategy import com.google.android.fhir.sync.upload.Uploader import com.google.android.fhir.sync.upload.patch.PatchGeneratorFactory @@ -33,17 +35,12 @@ import com.google.gson.ExclusionStrategy import com.google.gson.FieldAttributes import com.google.gson.GsonBuilder import java.time.OffsetDateTime -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.cancel -import kotlinx.coroutines.launch -import timber.log.Timber +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.onEach /** A WorkManager Worker that handles periodic sync. */ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameters) : CoroutineWorker(appContext, workerParams) { - abstract fun getFhirEngine(): FhirEngine - abstract fun getDownloadWorkManager(): DownloadWorkManager abstract fun getConflictResolver(): ConflictResolver @@ -70,61 +67,55 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter ), ) - val synchronizer = - FhirSynchronizer( - getFhirEngine(), - UploadConfiguration( - Uploader( - dataSource = dataSource, - patchGenerator = PatchGeneratorFactory.byMode(getUploadStrategy().patchGeneratorMode), - requestGenerator = - UploadRequestGeneratorFactory.byMode(getUploadStrategy().requestGeneratorMode), + val database = FhirEngineProvider.getFhirDatabase(applicationContext) + val fhirDataStore = FhirEngineProvider.getFhirDataStore(applicationContext) + + val fhirSyncDbInteractor = + FhirSyncDbInteractorImpl( + database = database, + localChangeFetcher = + LocalChangeFetcherFactory.byMode( + getUploadStrategy().localChangesFetchMode, + database, ), - ), - DownloadConfiguration( - DownloaderImpl(dataSource, getDownloadWorkManager()), - getConflictResolver(), - ), - FhirEngineProvider.getFhirDataStore(applicationContext), + resourceConsolidator = DefaultResourceConsolidator(database), + conflictResolver = getConflictResolver(), ) + val uploader = + Uploader( + dataSource = dataSource, + patchGenerator = PatchGeneratorFactory.byMode(getUploadStrategy().patchGeneratorMode), + requestGenerator = + UploadRequestGeneratorFactory.byMode(getUploadStrategy().requestGeneratorMode), + ) + val downloader = DownloaderImpl(dataSource, getDownloadWorkManager()) - val job = - CoroutineScope(Dispatchers.IO).launch { - val fhirDataStore = FhirEngineProvider.getFhirDataStore(applicationContext) - synchronizer.syncState.collect { syncJobStatus -> - val uniqueWorkerName = inputData.getString(UNIQUE_WORK_NAME) - when (syncJobStatus) { - is SyncJobStatus.Succeeded, - is SyncJobStatus.Failed, -> { - // While creating periodicSync request if - // putString(SYNC_STATUS_PREFERENCES_DATASTORE_KEY, uniqueWorkName) is not present, - // then inputData.getString(SYNC_STATUS_PREFERENCES_DATASTORE_KEY) can be null. - if (uniqueWorkerName != null) { - fhirDataStore.writeTerminalSyncJobStatus(uniqueWorkerName, syncJobStatus) - } - cancel() - } - else -> { - setProgress(buildWorkData(syncJobStatus)) - } - } - } - } - - val result = synchronizer.synchronize() - val output = buildWorkData(result) - - // await/join is needed to collect states completely - kotlin.runCatching { job.join() }.onFailure(Timber::w) - - Timber.d("Received result from worker $result and sending output $output") + val terminalSyncJobStatus = + FhirSynchronizer( + uploader = uploader, + downloader = downloader, + fhirSyncDbInteractor = fhirSyncDbInteractor, + ) + .synchronize() + .onEach { setProgress(buildWorkData(it)) } + .first { it is SyncJobStatus.Failed || it is SyncJobStatus.Succeeded } + + fhirDataStore.writeLastSyncTimestamp(terminalSyncJobStatus.timestamp) + val uniqueWorkerName = inputData.getString(UNIQUE_WORK_NAME) + // While creating periodicSync request if + // putString(SYNC_STATUS_PREFERENCES_DATASTORE_KEY, uniqueWorkName) is not present, + // then inputData.getString(SYNC_STATUS_PREFERENCES_DATASTORE_KEY) can be null. + if (uniqueWorkerName != null) { + fhirDataStore.writeTerminalSyncJobStatus(uniqueWorkerName, terminalSyncJobStatus) + } /** * In case of failure, we can check if its worth retrying and do retry based on * [RetryConfiguration.maxRetries] set by user. */ val retries = inputData.getInt(MAX_RETRIES_ALLOWED, 0) - return when (result) { + val output = buildWorkData(terminalSyncJobStatus) + return when (terminalSyncJobStatus) { is SyncJobStatus.Succeeded -> Result.success(output) else -> { if (retries > runAttemptCount) Result.retry() else Result.failure(output) diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index 6f5232e311..984bec544b 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -16,15 +16,16 @@ package com.google.android.fhir.sync -import com.google.android.fhir.FhirEngine -import com.google.android.fhir.sync.download.DownloadState +import com.google.android.fhir.FhirSyncDbInteractor import com.google.android.fhir.sync.download.Downloader -import com.google.android.fhir.sync.upload.LocalChangesFetchMode import com.google.android.fhir.sync.upload.Uploader -import java.time.OffsetDateTime -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.drop +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.runningFold import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import org.hl7.fhir.r4.model.ResourceType @@ -34,120 +35,61 @@ enum class SyncOperation { UPLOAD, } -private sealed class SyncResult { - val timestamp: OffsetDateTime = OffsetDateTime.now() - - class Success : SyncResult() - - data class Error(val exceptions: List) : SyncResult() -} - data class ResourceSyncException(val resourceType: ResourceType, val exception: Exception) -internal data class UploadConfiguration( - val uploader: Uploader, -) - -internal class DownloadConfiguration( - val downloader: Downloader, - val conflictResolver: ConflictResolver, -) - /** Class that helps synchronize the data source and save it in the local database */ internal class FhirSynchronizer( - private val fhirEngine: FhirEngine, - private val uploadConfiguration: UploadConfiguration, - private val downloadConfiguration: DownloadConfiguration, - private val datastoreUtil: FhirDataStore, + private val uploader: Uploader, + private val downloader: Downloader, + private val fhirSyncDbInteractor: FhirSyncDbInteractor, ) { - - private val _syncState = MutableSharedFlow() - val syncState: SharedFlow = _syncState - - private suspend fun setSyncState(state: SyncJobStatus) = _syncState.emit(state) - - private suspend fun setSyncState(result: SyncResult): SyncJobStatus { - // todo: emit this properly instead of using datastore? - datastoreUtil.writeLastSyncTimestamp(result.timestamp) - - val state = - when (result) { - is SyncResult.Success -> SyncJobStatus.Succeeded() - is SyncResult.Error -> SyncJobStatus.Failed(result.exceptions) - } - - setSyncState(state) - return state - } - /** * Manages the sequential execution of downloading and uploading for coordinated operation. This * function is coroutine-safe, ensuring that multiple invocations will not interfere with each * other. */ - suspend fun synchronize(): SyncJobStatus { + suspend fun synchronize(): Flow = flow { mutex.withLock { - setSyncState(SyncJobStatus.Started()) - - return listOf(download(), upload()) - .filterIsInstance() - .flatMap { it.exceptions } - .let { - if (it.isEmpty()) { - setSyncState(SyncResult.Success()) - } else { - setSyncState(SyncResult.Error(it)) - } - } + emit(SyncJobStatus.Started()) + emitAll(download()) + emitAll(upload()) + emit(SyncJobStatus.Succeeded()) } } - private suspend fun download(): SyncResult { - val exceptions = mutableListOf() - fhirEngine.syncDownload(downloadConfiguration.conflictResolver) { - flow { - downloadConfiguration.downloader.download().collect { - when (it) { - is DownloadState.Started -> { - setSyncState(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, it.total)) - } - is DownloadState.Success -> { - setSyncState(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, it.total, it.completed)) - emit(it.resources) - } - is DownloadState.Failure -> { - exceptions.add(it.syncError) - } - } - } - } - } - return if (exceptions.isEmpty()) { - SyncResult.Success() - } else { - SyncResult.Error(exceptions) - } + private suspend fun download(): Flow = flow { + // Following is to bootstrap new state calculation based on previous "InProgress" states + val initialSyncJobStatus = + SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, 0, 0) as SyncJobStatus + downloader + .download() + .onEach { fhirSyncDbInteractor.consolidateDownloadResult(it) } + .runningFold(initialSyncJobStatus, fhirSyncDbInteractor::updateSyncJobStatus) + // initialSyncJobStatus is dropped + .drop(1) + .onEach { emit(it) } + .firstOrNull { it is SyncJobStatus.Failed } } - private suspend fun upload(): SyncResult { - val exceptions = mutableListOf() - val localChangesFetchMode = LocalChangesFetchMode.AllChanges - fhirEngine.syncUpload(localChangesFetchMode, uploadConfiguration.uploader::upload).collect { - progress -> - progress.uploadError?.let { exceptions.add(it) } - ?: setSyncState( - SyncJobStatus.InProgress( - SyncOperation.UPLOAD, - progress.initialTotal, - progress.initialTotal - progress.remaining, - ), - ) - } - - return if (exceptions.isEmpty()) { - SyncResult.Success() - } else { - SyncResult.Error(exceptions) + private suspend fun upload(): Flow = flow { + var localChanges = fhirSyncDbInteractor.getLocalChanges() + // Following is to bootstrap new state calculation based on previous "InProgress" states + val initialSyncJobStatus = + SyncJobStatus.InProgress(SyncOperation.UPLOAD, 0, localChanges.size) as SyncJobStatus + while (localChanges.isNotEmpty()) { + val failedOrNullSyncJobStatus = + uploader + .upload(localChanges) + .onEach { fhirSyncDbInteractor.consolidateUploadResult(it) } + .runningFold(initialSyncJobStatus, fhirSyncDbInteractor::updateSyncJobStatus) + // initialSyncJobStatus is dropped + .drop(1) + .onEach { emit(it) } + .firstOrNull { it is SyncJobStatus.Failed } + if (failedOrNullSyncJobStatus is SyncJobStatus.Failed) { + return@flow + } + localChanges = fhirSyncDbInteractor.getLocalChanges() } }