Skip to content

Commit

Permalink
fix(datastore): Reconcile batch process optimization bug #2727 (#2749)
Browse files Browse the repository at this point in the history
Co-authored-by: JoonWon Choi <joonwonc@amazon.com>
  • Loading branch information
joon-won and JoonWon Choi committed May 3, 2024
1 parent d124592 commit 863330d
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ internal class Merger(
fun <T : Model> merge(modelWithMetadata: ModelWithMetadata<T>): Completable {
return merge(listOf(modelWithMetadata), NoOpConsumer.create())
}

fun <T : Model> merge(
modelsWithMetadata: List<ModelWithMetadata<T>>,
changeTypeConsumer: Consumer<StorageItemChange.Type>
Expand All @@ -66,8 +65,16 @@ internal class Merger(
return@rxCompletable
}

// Keep the models with the highest version for each ids, abandon the rest
val modelsWithUniqueId = modelsWithMetadata.groupBy { modelWithMetadata ->
modelWithMetadata.model.primaryKeyString
}
.map { group ->
group.value.maxBy { dupModels -> dupModels.syncMetadata.version ?: 0 }
}

// create (key, model metadata) map for quick lookup to re-associate
val modelMetadataMap = modelsWithMetadata.associateBy { it.syncMetadata.primaryKeyString }
val modelMetadataMap = modelsWithUniqueId.associateBy { it.syncMetadata.primaryKeyString }

// Consumer to announce Model merges
val mergedConsumer = Consumer<ModelWithMetadata<T>> {
Expand All @@ -85,7 +92,7 @@ internal class Merger(
}

// fetch a Map of all model versions from Metadata table
val localModelVersions = versionRepository.fetchModelVersions(modelsWithMetadata)
val localModelVersions = versionRepository.fetchModelVersions(modelsWithUniqueId)

/*
Fetch a Set of all pending mutation ids for type T
Expand All @@ -94,13 +101,13 @@ internal class Merger(
whichever comes first
*/
val pendingMutations = mutationOutbox.fetchPendingMutations(
models = modelsWithMetadata.map { it.model },
modelClass = modelsWithMetadata.first().model.javaClass.name,
models = modelsWithUniqueId.map { it.model },
modelClass = modelsWithUniqueId.first().model.javaClass.name,
excludeInFlight = true
)

// Construct a batch of operations to be executed in a single transactions
val operations = modelsWithMetadata.mapNotNull {
val operations = modelsWithUniqueId.mapNotNull {
val incomingVersion = it.syncMetadata.version ?: -1
val localVersion = localModelVersions.getOrDefault(
it.model.getMetadataSQLitePrimaryKey(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import android.database.sqlite.SQLiteConstraintException
import androidx.test.core.app.ApplicationProvider
import com.amplifyframework.AmplifyException
import com.amplifyframework.core.Consumer
import com.amplifyframework.core.NoOpConsumer
import com.amplifyframework.core.model.ModelSchema
import com.amplifyframework.core.model.SchemaRegistry
import com.amplifyframework.core.model.query.Where
Expand All @@ -35,6 +36,7 @@ import com.amplifyframework.testmodels.commentsblog.AmplifyModelProvider
import com.amplifyframework.testmodels.commentsblog.Blog
import com.amplifyframework.testmodels.commentsblog.BlogOwner
import com.amplifyframework.testutils.random.RandomString
import java.util.UUID
import java.util.concurrent.TimeUnit
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
Expand Down Expand Up @@ -667,6 +669,81 @@ class MergerTest {
assertEquals(expectedStorageItemChanges, capturedStorageItemChanges)
}

/**
* When several models with the same identifier, but different versions with different flag for deleted,
* Only the model with the latest version should be considered as it represents the latest known state of the model
* @throws InterruptedException If interrupted while awaiting terminal result in test observer
* @throws
*/
@Test
@Throws(DataStoreException::class, InterruptedException::class)
fun testBatchMergerReconciliation() {
// Random UUID following RFC 4122 version 4 spec
val sameRandomId1 = UUID.randomUUID().toString()
val sameRandomId2 = UUID.randomUUID().toString()

// Models to merge
val blogFirstPost1ToBeDisregarded =
ModelWithMetadata(
Blog.builder().name("Hideo K.").id("DS1").build(),
ModelMetadata(sameRandomId1, false, 1, Temporal.Timestamp.now()),
)
val blogFirstPost2LatestVer =
ModelWithMetadata(
Blog.builder().name("Hideo K.").id("DS1").build(),
ModelMetadata(sameRandomId1, true, 3, Temporal.Timestamp.now()),
)
val blogFirstPost3LatestVerDuplicate =
ModelWithMetadata(
Blog.builder().name("Hideo K.").id("DS1").build(),
ModelMetadata(sameRandomId1, false, 3, Temporal.Timestamp.now()),
)
val blogFirstPost4ToBeDisregarded =
ModelWithMetadata(
Blog.builder().name("Hideo K.").id("DS1").build(),
ModelMetadata(sameRandomId1, false, 2, Temporal.Timestamp.now()),
)
val blogSecondPost1ToSurvive =
ModelWithMetadata(
Blog.builder().name("Hideo K.").id("DS2").build(),
ModelMetadata(sameRandomId2, false, 11, Temporal.Timestamp.now()),
)

// Input list of blog posts on remote storage
val remotemodels =
listOf(
blogFirstPost1ToBeDisregarded,
blogFirstPost2LatestVer,
blogFirstPost3LatestVerDuplicate,
blogFirstPost4ToBeDisregarded,
blogSecondPost1ToSurvive,
)

// Expected Blog table result
val expectedBlogResult =
listOf(
blogSecondPost1ToSurvive.model,
)

// Expected Metadata result
val expectedMetadataResult =
listOf(
blogFirstPost2LatestVer.syncMetadata,
blogSecondPost1ToSurvive.syncMetadata,
)

val observer = merger.merge(remotemodels, NoOpConsumer.create()).test()
assertTrue(observer.await(REASONABLE_WAIT_TIME, TimeUnit.MILLISECONDS))
observer.assertNoErrors().assertComplete()

val blogResult = storageAdapter.query(Blog::class.java)
val metadataResult = storageAdapter.query(ModelMetadata::class.java)
assertEquals(1, blogResult.size)
assertEquals(2, metadataResult.size)
assertEquals(expectedMetadataResult, metadataResult)
assertEquals(expectedBlogResult, blogResult)
}

companion object {
private val REASONABLE_WAIT_TIME = TimeUnit.SECONDS.toMillis(2)
}
Expand Down

0 comments on commit 863330d

Please sign in to comment.