Skip to content

Commit

Permalink
Sync ModelTypeState: Extend initial_sync_done from bool to enum
Browse files Browse the repository at this point in the history
Sync's ModelTypeState already contained a flag `initial_sync_done` which
tracked whether the initial sync (download+merge) was completed for a
data type.
There are two special cases that aren't well covered by this:
* CommitOnlyTypes: For these types, nothing gets downloaded and no merge
  happens.
* ApplyUpdatesImmediatelyTypes: For these types, downloaded updates get
  applied immediately, without first waiting for the download cycle to
  complete. For these types, initial_sync_done is set to true after the
  first GetUpdates request returns.

This CL adds a new enum field `initial_sync_state`, which has entries
for the two special cases mentioned above, and adds logic to populate
it, including migrating from the existing `initial_sync_done` flag.

So far, the new enum/field isn't used yet - that will come in followup
CLs.

Bug: 1423338
Change-Id: Icaa913cfd1e8ade5241c118f9298fcd2aba76c39
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4329031
Commit-Queue: Marc Treib <treib@chromium.org>
Reviewed-by: Mikel Astiz <mastiz@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1116474}
  • Loading branch information
Marc Treib authored and Chromium LUCI CQ committed Mar 13, 2023
1 parent cdc299e commit ec5f57a
Show file tree
Hide file tree
Showing 20 changed files with 257 additions and 55 deletions.
1 change: 1 addition & 0 deletions components/sync/BUILD.gn
Expand Up @@ -268,6 +268,7 @@ source_set("unit_tests") {
"nigori/nigori_storage_impl_unittest.cc",
"nigori/nigori_sync_bridge_impl_unittest.cc",
"protocol/entity_data_unittest.cc",
"protocol/model_type_state_helper_unittest.cc",
"protocol/proto_enum_conversions_unittest.cc",
"protocol/proto_value_conversions_unittest.cc",
]
Expand Down
2 changes: 1 addition & 1 deletion components/sync/engine/get_updates_processor.cc
Expand Up @@ -362,7 +362,7 @@ void GetUpdatesProcessor::ApplyUpdates(const ModelTypeSet& gu_types,
StatusController* status_controller) {
for (const auto& [type, update_handler] : *update_handler_map_) {
if (gu_types.Has(type)) {
update_handler->ApplyUpdates(status_controller);
update_handler->ApplyUpdates(status_controller, /*cycle_done=*/true);
}
}
}
Expand Down
22 changes: 16 additions & 6 deletions components/sync/engine/model_type_worker.cc
Expand Up @@ -32,7 +32,6 @@
#include "components/sync/base/model_type.h"
#include "components/sync/base/sync_invalidation_adapter.h"
#include "components/sync/base/time.h"
#include "components/sync/base/unique_position.h"
#include "components/sync/engine/bookmark_update_preprocessing.h"
#include "components/sync/engine/cancelation_signal.h"
#include "components/sync/engine/commit_contribution.h"
Expand Down Expand Up @@ -503,7 +502,7 @@ void ModelTypeWorker::ProcessGetUpdatesResponse(
// remote data first. Instead, apply updates as they come in. This saves the
// need to accumulate all data in memory.
if (ApplyUpdatesImmediatelyTypes().Has(type_)) {
ApplyUpdates(status);
ApplyUpdates(status, /*cycle_done=*/false);
}
}

Expand Down Expand Up @@ -587,12 +586,23 @@ ModelTypeWorker::DecryptionStatus ModelTypeWorker::PopulateUpdateResponseData(
return SUCCESS;
}

void ModelTypeWorker::ApplyUpdates(StatusController* status) {
void ModelTypeWorker::ApplyUpdates(StatusController* status, bool cycle_done) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Indicate to the processor that the initial download is done. The initial
// sync technically isn't done yet but by the time this value is persisted to
// disk on the model thread it will be.
// Indicate the new initial-sync state to the processor: If the current sync
// cycle was completed, the initial sync must be done. Otherwise, it's started
// now. The latter can only happen for ApplyUpdatesImmediatelyTypes(), since
// other types wait for the cycle to complete before applying any updates.
// Note that the initial sync technically isn't started/done yet but by the
// time this value is persisted to disk on the model thread it will be.
model_type_state_.set_initial_sync_done(true);
if (cycle_done) {
model_type_state_.set_initial_sync_state(
sync_pb::ModelTypeState_InitialSyncState_INITIAL_SYNC_DONE);
} else {
DCHECK(ApplyUpdatesImmediatelyTypes().Has(type_));
model_type_state_.set_initial_sync_state(
sync_pb::ModelTypeState_InitialSyncState_INITIAL_SYNC_PARTIALLY_DONE);
}

if (!entries_pending_decryption_.empty() &&
(!encryption_enabled_ || cryptographer_->CanEncrypt())) {
Expand Down
2 changes: 1 addition & 1 deletion components/sync/engine/model_type_worker.h
Expand Up @@ -176,7 +176,7 @@ class ModelTypeWorker : public UpdateHandler,
const sync_pb::DataTypeContext& mutated_context,
const SyncEntityList& applicable_updates,
StatusController* status) override;
void ApplyUpdates(StatusController* status) override;
void ApplyUpdates(StatusController* status, bool cycle_done) override;
void RecordRemoteInvalidation(
std::unique_ptr<SyncInvalidation> incoming) override;
void CollectPendingInvalidations(sync_pb::GetUpdateTriggers* msg) override;
Expand Down
68 changes: 35 additions & 33 deletions components/sync/engine/model_type_worker_unittest.cc
Expand Up @@ -306,14 +306,14 @@ class ModelTypeWorkerTest : public ::testing::Test {
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
&status_controller_);
worker()->ApplyUpdates(&status_controller_);
worker()->ApplyUpdates(&status_controller_, /*cycle_done=*/true);
}

void TriggerEmptyUpdateFromServer() {
worker()->ProcessGetUpdatesResponse(
server()->GetProgress(), server()->GetContext(),
/*applicable_updates=*/{}, &status_controller_);
worker()->ApplyUpdates(&status_controller_);
worker()->ApplyUpdates(&status_controller_, /*cycle_done=*/true);
}

void TriggerPartialUpdateFromServer(int64_t version_offset,
Expand Down Expand Up @@ -361,7 +361,7 @@ class ModelTypeWorkerTest : public ::testing::Test {
const std::string& tag,
const std::string& value) {
TriggerPartialUpdateFromServer(version_offset, tag, value);
worker()->ApplyUpdates(&status_controller_);
worker()->ApplyUpdates(&status_controller_, /*cycle_done=*/true);
}

void TriggerTombstoneFromServer(int64_t version_offset,
Expand All @@ -377,12 +377,14 @@ class ModelTypeWorkerTest : public ::testing::Test {
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
&status_controller_);
worker()->ApplyUpdates(&status_controller_);
worker()->ApplyUpdates(&status_controller_, /*cycle_done=*/true);
}

// Simulates the end of a GU sync cycle and tells the worker to flush changes
// to the processor.
void ApplyUpdates() { worker()->ApplyUpdates(&status_controller_); }
void ApplyUpdates() {
worker()->ApplyUpdates(&status_controller_, /*cycle_done=*/true);
}

// Delivers specified protos as updates.
//
Expand All @@ -393,7 +395,7 @@ class ModelTypeWorkerTest : public ::testing::Test {
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), list,
&status_controller_);
worker()->ApplyUpdates(&status_controller_);
worker()->ApplyUpdates(&status_controller_, /*cycle_done=*/true);
}

// By default, this harness behaves as if all tasks posted to the model
Expand Down Expand Up @@ -1405,7 +1407,7 @@ TEST_F(ModelTypeWorkerTest, DecryptUpdateIfPossibleDespiteEncryptionDisabled) {
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&update},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

// Even though encryption is disabled for this worker, it should decrypt the
// update and pass it on to the processor.
Expand Down Expand Up @@ -1640,7 +1642,7 @@ TEST_F(ModelTypeWorkerTest, ShouldCleanUpPendingUpdatesOnGcDirective) {

// Only the entities from the second GetUpdates should have made it to the
// processor.
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);
EXPECT_EQ(1u, processor()->GetNumUpdateResponses());
EXPECT_THAT(processor()->GetNthUpdateResponse(0),
UnorderedElementsAre(HasPreferenceClientTag(kTag2),
Expand Down Expand Up @@ -2050,7 +2052,7 @@ TEST_F(ModelTypeWorkerPasswordsTest, ReceiveDecryptablePasswordEntities) {
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

// Test its basic features and the value of encryption_key_name.
ASSERT_TRUE(processor()->HasUpdateResponse(kHash1));
Expand Down Expand Up @@ -2081,7 +2083,7 @@ TEST_F(ModelTypeWorkerPasswordsTest,
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

// Worker cannot decrypt it.
EXPECT_FALSE(processor()->HasUpdateResponse(kHash1));
Expand Down Expand Up @@ -2120,7 +2122,7 @@ TEST_F(ModelTypeWorkerPasswordsTest, ReceiveUndecryptablePasswordEntries) {
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

// At this point, the cryptographer does not have access to the key, so the
// updates will be undecryptable. This will block all updates.
Expand Down Expand Up @@ -2160,7 +2162,7 @@ TEST_F(ModelTypeWorkerPasswordsTest, ReceiveCorruptedPasswordEntities) {
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

// No updates should have reached the processor and worker is blocked for
// encryption because the cryptographer isn't ready yet.
Expand Down Expand Up @@ -2213,7 +2215,7 @@ TEST_F(ModelTypeWorkerBookmarksTest, CanDecryptUpdateWithMissingBookmarkGUID) {
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

EXPECT_EQ(2U, processor()->GetNumUpdateResponses());

Expand Down Expand Up @@ -2264,7 +2266,7 @@ TEST_F(ModelTypeWorkerBookmarksTest,
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

EXPECT_EQ(2U, processor()->GetNumUpdateResponses());

Expand Down Expand Up @@ -2309,7 +2311,7 @@ TEST_F(ModelTypeWorkerBookmarksTest,
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

DecryptPendingKey();
EXPECT_EQ(1U, processor()->GetNumUpdateResponses());
Expand Down Expand Up @@ -2351,7 +2353,7 @@ TEST_F(ModelTypeWorkerBookmarksTest,
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

DecryptPendingKey();
EXPECT_EQ(1U, processor()->GetNumUpdateResponses());
Expand Down Expand Up @@ -2533,7 +2535,7 @@ TEST_F(ModelTypeWorkerPasswordsTestWithNotes,
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

ASSERT_TRUE(processor()->HasUpdateResponse(kHash1));
const UpdateResponseData& update = processor()->GetUpdateResponse(kHash1);
Expand Down Expand Up @@ -2578,7 +2580,7 @@ TEST_F(ModelTypeWorkerPasswordsTestWithNotes,
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

ASSERT_TRUE(processor()->HasUpdateResponse(kHash1));
const UpdateResponseData& update = processor()->GetUpdateResponse(kHash1);
Expand Down Expand Up @@ -2614,7 +2616,7 @@ TEST_F(ModelTypeWorkerPasswordsTestWithNotes,
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

ASSERT_TRUE(processor()->HasUpdateResponse(kHash1));
histogram_tester.ExpectUniqueSample("Sync.PasswordNotesStateInUpdate",
Expand Down Expand Up @@ -2656,7 +2658,7 @@ TEST_F(ModelTypeWorkerPasswordsTestWithNotes, ShouldEmitNotesBackupCorrupted) {
worker()->ProcessGetUpdatesResponse(server()->GetProgress(),
server()->GetContext(), {&entity},
status_controller());
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

histogram_tester.ExpectUniqueSample(
"Sync.PasswordNotesStateInUpdate",
Expand Down Expand Up @@ -2748,7 +2750,7 @@ TEST_F(ModelTypeWorkerTest, ModelTypeStateAfterApplyUpdates) {

// The GetUpdates request finishes. This should delete the processed
// invalidations.
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

// Unprocessed invalidations after ApplyUpdates are in ModelTypeState.
EXPECT_EQ(2, processor()->GetNthUpdateState(0).invalidations_size());
Expand Down Expand Up @@ -2806,7 +2808,7 @@ TEST_F(ModelTypeWorkerTest, DropHintsAtServer_Alone) {
}

// Clear status then verify.
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);
{
sync_pb::GetUpdateTriggers gu_trigger;
worker()->CollectPendingInvalidations(&gu_trigger);
Expand Down Expand Up @@ -2834,7 +2836,7 @@ TEST_F(ModelTypeWorkerTest, DropHintsAtServer_WithOtherInvalidations) {
}

// Clear status then verify.
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);
{
sync_pb::GetUpdateTriggers gu_trigger;
worker()->CollectPendingInvalidations(&gu_trigger);
Expand Down Expand Up @@ -2906,7 +2908,7 @@ TEST_F(ModelTypeWorkerAckTrackingTest, SimpleAcknowledgement) {
sync_pb::GetUpdateTriggers gu_trigger;
worker()->CollectPendingInvalidations(&gu_trigger);

worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);
EXPECT_TRUE(IsInvalidationAcknowledged(inv_id));

EXPECT_TRUE(AllInvalidationsAccountedFor());
Expand All @@ -2924,7 +2926,7 @@ TEST_F(ModelTypeWorkerAckTrackingTest, ManyAcknowledgements) {
sync_pb::GetUpdateTriggers gu_trigger;
worker()->CollectPendingInvalidations(&gu_trigger);

worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);
EXPECT_TRUE(IsInvalidationAcknowledged(inv1_id));
EXPECT_TRUE(IsInvalidationAcknowledged(inv2_id));

Expand Down Expand Up @@ -2958,7 +2960,7 @@ TEST_F(ModelTypeWorkerAckTrackingTest, OverflowAndRecover) {
worker()->CollectPendingInvalidations(&gu_trigger);

// This should recover from the drop and bring us back into sync.
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

for (int id : invalidation_ids)
EXPECT_TRUE(IsInvalidationAcknowledged(id));
Expand All @@ -2975,7 +2977,7 @@ TEST_F(ModelTypeWorkerAckTrackingTest, UnknownVersionFromServer_Simple) {
EXPECT_TRUE(IsInvalidationUnacknowledged(inv_id));
sync_pb::GetUpdateTriggers gu_trigger;
worker()->CollectPendingInvalidations(&gu_trigger);
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);
EXPECT_TRUE(IsInvalidationAcknowledged(inv_id));
EXPECT_TRUE(AllInvalidationsAccountedFor());
}
Expand All @@ -3002,7 +3004,7 @@ TEST_F(ModelTypeWorkerAckTrackingTest, UnknownVersionFromServer_Complex) {
worker()->CollectPendingInvalidations(&gu_trigger);

// Finish the sync cycle and expect all remaining invalidations to be acked.
worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);
EXPECT_TRUE(IsInvalidationAcknowledged(inv1_id));
EXPECT_TRUE(IsInvalidationAcknowledged(inv2_id));
EXPECT_TRUE(IsInvalidationAcknowledged(inv3_id));
Expand All @@ -3019,7 +3021,7 @@ TEST_F(ModelTypeWorkerAckTrackingTest, AckInvalidationsAddedDuringSyncCycle) {
int inv1_id = SendInvalidation(10, "hint");
int inv2_id = SendInvalidation(14, "hint2");

worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

EXPECT_FALSE(IsInvalidationAcknowledged(inv1_id));
EXPECT_FALSE(IsInvalidationAcknowledged(inv2_id));
Expand All @@ -3031,7 +3033,7 @@ TEST_F(ModelTypeWorkerAckTrackingTest, AckInvalidationsAddedDuringSyncCycle) {

int inv3_id = SendInvalidation(100, "hint3");

worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

EXPECT_TRUE(IsInvalidationAcknowledged(inv1_id));
EXPECT_TRUE(IsInvalidationAcknowledged(inv2_id));
Expand All @@ -3045,7 +3047,7 @@ TEST_F(ModelTypeWorkerAckTrackingTest, AckInvalidationsAddedDuringSyncCycle) {
worker()->CollectPendingInvalidations(&gu_trigger_2);
ASSERT_EQ(1, gu_trigger_2.notification_hint_size());

worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);
EXPECT_TRUE(AllInvalidationsAccountedFor());
}

Expand All @@ -3055,7 +3057,7 @@ TEST_F(ModelTypeWorkerAckTrackingTest, MultipleGetUpdates) {
int inv1_id = SendInvalidation(1, "hint1");
int inv2_id = SendInvalidation(2, "hint2");

worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);

EXPECT_FALSE(IsInvalidationAcknowledged(inv1_id));
EXPECT_FALSE(IsInvalidationAcknowledged(inv2_id));
Expand All @@ -3075,7 +3077,7 @@ TEST_F(ModelTypeWorkerAckTrackingTest, MultipleGetUpdates) {
worker()->CollectPendingInvalidations(&gu_trigger_2);
ASSERT_EQ(3, gu_trigger_2.notification_hint_size());

worker()->ApplyUpdates(status_controller());
worker()->ApplyUpdates(status_controller(), /*cycle_done=*/true);
EXPECT_TRUE(AllInvalidationsAccountedFor());
}

Expand Down
7 changes: 5 additions & 2 deletions components/sync/engine/update_handler.h
Expand Up @@ -64,8 +64,11 @@ class UpdateHandler {
const SyncEntityList& applicable_updates,
StatusController* status) = 0;

// Called at the end of a GetUpdates loop to apply any unapplied updates.
virtual void ApplyUpdates(StatusController* status) = 0;
// Called whenever any unapplied updates should be applied. This is usually
// at the end of a sync cycle, but for data types in
// ApplyUpdatesImmediatelyTypes() it already happens while the download loop
// is still ongoing.
virtual void ApplyUpdates(StatusController* status, bool cycle_done) = 0;
};

} // namespace syncer
Expand Down

0 comments on commit ec5f57a

Please sign in to comment.