Skip to content

Commit

Permalink
Store every incoming invalidation to ModelTypeState storage.
Browse files Browse the repository at this point in the history
Added new method in ModelTypeWorker which sends all pending
invalidations to ModelTypeProcessor and store them.

Bug: 1365290
Change-Id: I0c62900446aad803a811110f0334488eaf78a98f
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3974531
Reviewed-by: Rushan Suleymanov <rushans@google.com>
Reviewed-by: Marc Treib <treib@chromium.org>
Commit-Queue: Shabdan Batyrkulov <shabdan@google.com>
Cr-Commit-Position: refs/heads/main@{#1067507}
  • Loading branch information
Shabdan Batyrkulov authored and Chromium LUCI CQ committed Nov 4, 2022
1 parent be987ff commit 94ec444
Show file tree
Hide file tree
Showing 21 changed files with 249 additions and 11 deletions.
5 changes: 5 additions & 0 deletions components/sync/engine/forwarding_model_type_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ void ForwardingModelTypeProcessor::OnUpdateReceived(
std::move(gc_directive));
}

void ForwardingModelTypeProcessor::StorePendingInvalidations(
std::vector<sync_pb::ModelTypeState::Invalidation> invalidations_to_store) {
processor_->StorePendingInvalidations(std::move(invalidations_to_store));
}

} // namespace syncer
4 changes: 4 additions & 0 deletions components/sync/engine/forwarding_model_type_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define COMPONENTS_SYNC_ENGINE_FORWARDING_MODEL_TYPE_PROCESSOR_H_

#include <memory>
#include <vector>

#include "base/memory/raw_ptr.h"
#include "components/sync/engine/model_type_processor.h"
Expand Down Expand Up @@ -40,6 +41,9 @@ class ForwardingModelTypeProcessor : public ModelTypeProcessor {
UpdateResponseDataList updates,
absl::optional<sync_pb::GarbageCollectionDirective>
gc_directive) override;
void StorePendingInvalidations(
std::vector<sync_pb::ModelTypeState::Invalidation> invalidations_to_store)
override;

private:
const raw_ptr<ModelTypeProcessor> processor_;
Expand Down
7 changes: 7 additions & 0 deletions components/sync/engine/model_type_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define COMPONENTS_SYNC_ENGINE_MODEL_TYPE_PROCESSOR_H_

#include <memory>
#include <vector>

#include "base/callback_forward.h"
#include "components/sync/engine/commit_and_get_updates_types.h"
Expand Down Expand Up @@ -58,6 +59,12 @@ class ModelTypeProcessor {
const sync_pb::ModelTypeState& type_state,
UpdateResponseDataList updates,
absl::optional<sync_pb::GarbageCollectionDirective> gc_directive) = 0;

// Informs this object that it should handle new invalidations to store,
// replacing any previously-stored invalidations.
virtual void StorePendingInvalidations(
std::vector<sync_pb::ModelTypeState::Invalidation>
invalidations_to_store) = 0;
};

} // namespace syncer
Expand Down
7 changes: 7 additions & 0 deletions components/sync/engine/model_type_processor_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,11 @@ void ModelTypeProcessorProxy::OnUpdateReceived(
type_state, std::move(updates), std::move(gc_directive)));
}

void ModelTypeProcessorProxy::StorePendingInvalidations(
std::vector<sync_pb::ModelTypeState::Invalidation> invalidations_to_store) {
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&ModelTypeProcessor::StorePendingInvalidations,
processor_, std::move(invalidations_to_store)));
}

} // namespace syncer
4 changes: 4 additions & 0 deletions components/sync/engine/model_type_processor_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define COMPONENTS_SYNC_ENGINE_MODEL_TYPE_PROCESSOR_PROXY_H_

#include <memory>
#include <vector>

#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
Expand Down Expand Up @@ -34,6 +35,9 @@ class ModelTypeProcessorProxy : public ModelTypeProcessor {
UpdateResponseDataList updates,
absl::optional<sync_pb::GarbageCollectionDirective>
gc_directive) override;
void StorePendingInvalidations(
std::vector<sync_pb::ModelTypeState::Invalidation> invalidations_to_store)
override;

private:
base::WeakPtr<ModelTypeProcessor> processor_;
Expand Down
38 changes: 27 additions & 11 deletions components/sync/engine/model_type_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -573,17 +573,7 @@ void ModelTypeWorker::ApplyUpdates(StatusController* status) {
}
}
if (base::FeatureList::IsEnabled(kSyncPersistInvalidations)) {
// Store invalidations to |model_type_state_|.
model_type_state_.clear_invalidations();
for (const auto& inv : pending_invalidations_) {
SyncInvalidation* invalidation = inv.pending_invalidation.get();
sync_pb::ModelTypeState_Invalidation* invalidation_to_store =
model_type_state_.add_invalidations();
invalidation_to_store->set_hint(invalidation->GetPayload());
if (!invalidation->IsUnknownVersion()) {
invalidation_to_store->set_version(invalidation->GetVersion());
}
}
UpdateModelTypeStateInvalidations();
}

has_dropped_invalidation_ = false;
Expand Down Expand Up @@ -1120,6 +1110,9 @@ void ModelTypeWorker::RecordRemoteInvalidation(
pending_invalidations_.erase(pending_invalidations_.begin());
}
nudge_handler_->SetHasPendingInvalidations(type_, HasPendingInvalidations());
if (base::FeatureList::IsEnabled(kSyncPersistInvalidations)) {
SendPendingInvalidationsToProcessor();
}
}

void ModelTypeWorker::CollectPendingInvalidations(
Expand All @@ -1146,6 +1139,29 @@ bool ModelTypeWorker::HasPendingInvalidations() const {
return !pending_invalidations_.empty() || has_dropped_invalidation_;
}

void ModelTypeWorker::SendPendingInvalidationsToProcessor() {
DCHECK(base::FeatureList::IsEnabled(kSyncPersistInvalidations));
UpdateModelTypeStateInvalidations();
model_type_processor_->StorePendingInvalidations(
std::vector<sync_pb::ModelTypeState::Invalidation>(
model_type_state_.invalidations().begin(),
model_type_state_.invalidations().end()));
}

void ModelTypeWorker::UpdateModelTypeStateInvalidations() {
DCHECK(base::FeatureList::IsEnabled(kSyncPersistInvalidations));
model_type_state_.clear_invalidations();
for (const auto& inv : pending_invalidations_) {
SyncInvalidation* invalidation = inv.pending_invalidation.get();
sync_pb::ModelTypeState_Invalidation* invalidation_to_store =
model_type_state_.add_invalidations();
invalidation_to_store->set_hint(invalidation->GetPayload());
if (!invalidation->IsUnknownVersion()) {
invalidation_to_store->set_version(invalidation->GetVersion());
}
}
}

GetLocalChangesRequest::GetLocalChangesRequest(
CancelationSignal* cancelation_signal)
: cancelation_signal_(cancelation_signal),
Expand Down
7 changes: 7 additions & 0 deletions components/sync/engine/model_type_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,13 @@ class ModelTypeWorker : public UpdateHandler,
// the definition of an unknown key, and returns their info.
std::vector<UnknownEncryptionKeyInfo> RemoveKeysNoLongerUnknown();

// Sends copy of |pending_invalidations_| vector to |model_type_processor_|
// to store them in storage along |model_type_state_|.
void SendPendingInvalidationsToProcessor();

// Copies |pending_invalidations_| vector to |model_type_state_|.
void UpdateModelTypeStateInvalidations();

// The (up to kMaxPayloads) most recent invalidations received since the last
// successful sync cycle.
std::vector<PendingInvalidation> pending_invalidations_;
Expand Down
13 changes: 13 additions & 0 deletions components/sync/engine/model_type_worker_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2648,6 +2648,19 @@ TEST_F(ModelTypeWorkerPasswordsTestWithNotes, ShouldEmitNotesBackupCorrupted) {
syncer::PasswordNotesStateForUMA::kSetOnlyInBackupButCorrupted, 1);
}

// Verifies StorePendingInvalidations() calls for every incoming invalidation.
TEST_F(ModelTypeWorkerTest, StoreInvalidationsCallCount) {
base::test::ScopedFeatureList feature;
feature.InitAndEnableFeature(kSyncPersistInvalidations);

NormalInitialize();
for (size_t i = 0; i < ModelTypeWorker::kMaxPendingInvalidations + 2u; ++i) {
worker()->RecordRemoteInvalidation(BuildInvalidation(i + 1, "hint"));
EXPECT_EQ(static_cast<int>(i + 1),
processor()->GetStoreInvalidationsCallCount());
}
}

// Verifies the management of invalidation hints and GU trigger fields.
TEST_F(ModelTypeWorkerTest, HintCoalescing) {
// Easy case: record one hint.
Expand Down
16 changes: 16 additions & 0 deletions components/sync/model/client_tag_based_model_type_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,22 @@ void ClientTagBasedModelTypeProcessor::OnUpdateReceived(
NudgeForCommitIfNeeded();
}

void ClientTagBasedModelTypeProcessor::StorePendingInvalidations(
std::vector<sync_pb::ModelTypeState::Invalidation> invalidations_to_store) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!model_error_);
DCHECK(IsConnected());
std::unique_ptr<MetadataChangeList> metadata_changes =
bridge_->CreateMetadataChangeList();
sync_pb::ModelTypeState model_type_state =
entity_tracker_->model_type_state();
model_type_state.mutable_invalidations()->Assign(
invalidations_to_store.begin(), invalidations_to_store.end());
metadata_changes->UpdateModelTypeState(model_type_state);
entity_tracker_->set_model_type_state(model_type_state);
bridge_->ApplySyncChanges(std::move(metadata_changes), EntityChangeList());
}

bool ClientTagBasedModelTypeProcessor::ValidateUpdate(
const sync_pb::ModelTypeState& model_type_state,
const UpdateResponseDataList& updates,
Expand Down
3 changes: 3 additions & 0 deletions components/sync/model/client_tag_based_model_type_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ class ClientTagBasedModelTypeProcessor : public ModelTypeProcessor,
UpdateResponseDataList updates,
absl::optional<sync_pb::GarbageCollectionDirective>
gc_directive) override;
void StorePendingInvalidations(
std::vector<sync_pb::ModelTypeState::Invalidation> invalidations_to_store)
override;

// ModelTypeControllerDelegate implementation.
// |start_callback| will never be called synchronously.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,15 @@ class ClientTagBasedModelTypeProcessorTest : public ::testing::Test {
bridge()->change_processor());
}

sync_pb::ModelTypeState::Invalidation BuildInvalidation(
int64_t version,
const std::string& payload) {
sync_pb::ModelTypeState::Invalidation inv;
inv.set_version(version);
inv.set_hint(payload);
return inv;
}

protected:
void CheckPostConditions() { EXPECT_FALSE(expect_error_); }

Expand Down Expand Up @@ -583,6 +592,25 @@ TEST_F(ClientTagBasedModelTypeProcessorTest,
EXPECT_EQ("NewAccountId", type_processor()->TrackedAccountId());
}

TEST_F(ClientTagBasedModelTypeProcessorTest,
ShouldExposeNewlyAddedInvalidations) {
// Populate the bridge's metadata with some non-empty values for us to later
// check that it hasn't been cleared.
sync_pb::ModelTypeState::Invalidation inv_1 = BuildInvalidation(1, "hint_1");
sync_pb::ModelTypeState::Invalidation inv_2 = BuildInvalidation(2, "hint_2");
InitializeToReadyState();
type_processor()->StorePendingInvalidations({inv_1, inv_2});

ModelTypeState model_type_state(db()->model_type_state());
EXPECT_EQ(2, model_type_state.invalidations_size());

EXPECT_EQ(inv_1.hint(), model_type_state.invalidations(0).hint());
EXPECT_EQ(inv_1.version(), model_type_state.invalidations(0).version());

EXPECT_EQ(inv_2.hint(), model_type_state.invalidations(1).hint());
EXPECT_EQ(inv_2.version(), model_type_state.invalidations(1).version());
}

TEST_F(ClientTagBasedModelTypeProcessorTest,
ShouldExposeNewlyTrackedCacheGuid) {
ModelReadyToSync();
Expand Down
12 changes: 12 additions & 0 deletions components/sync/nigori/nigori_model_type_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "components/sync/nigori/nigori_model_type_processor.h"

#include <vector>

#include "base/logging.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "components/sync/base/client_tag_hash.h"
Expand All @@ -13,6 +15,7 @@
#include "components/sync/engine/data_type_activation_response.h"
#include "components/sync/engine/forwarding_model_type_processor.h"
#include "components/sync/engine/model_type_processor_metrics.h"
#include "components/sync/engine/model_type_worker.h"
#include "components/sync/model/processor_entity.h"
#include "components/sync/model/type_entities_count.h"
#include "components/sync/nigori/nigori_sync_bridge.h"
Expand Down Expand Up @@ -192,6 +195,15 @@ void NigoriModelTypeProcessor::OnUpdateReceived(
NudgeForCommitIfNeeded();
}

void NigoriModelTypeProcessor::StorePendingInvalidations(
std::vector<sync_pb::ModelTypeState::Invalidation> invalidations_to_store) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
model_type_state_.mutable_invalidations()->Assign(
invalidations_to_store.begin(), invalidations_to_store.end());
// ApplySyncChanges does actually query and persist the |model_type_state_|.
bridge_->ApplySyncChanges(/*data=*/absl::nullopt);
}

void NigoriModelTypeProcessor::OnSyncStarting(
const DataTypeActivationRequest& request,
StartCallback callback) {
Expand Down
4 changes: 4 additions & 0 deletions components/sync/nigori/nigori_model_type_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <memory>
#include <utility>
#include <vector>

#include "base/memory/raw_ptr.h"
#include "components/sync/engine/model_type_processor.h"
Expand Down Expand Up @@ -44,6 +45,9 @@ class NigoriModelTypeProcessor : public ModelTypeProcessor,
UpdateResponseDataList updates,
absl::optional<sync_pb::GarbageCollectionDirective>
gc_directive) override;
void StorePendingInvalidations(
std::vector<sync_pb::ModelTypeState::Invalidation> invalidations_to_store)
override;

// ModelTypeControllerDelegate implementation.
void OnSyncStarting(const DataTypeActivationRequest& request,
Expand Down
32 changes: 32 additions & 0 deletions components/sync/nigori/nigori_model_type_processor_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ class NigoriModelTypeProcessorTest : public testing::Test {
return count.non_tombstone_entities > 0;
}

sync_pb::ModelTypeState::Invalidation BuildInvalidation(
int64_t version,
const std::string& payload) {
sync_pb::ModelTypeState::Invalidation inv;
inv.set_version(version);
inv.set_hint(payload);
return inv;
}

private:
testing::NiceMock<MockNigoriSyncBridge> mock_nigori_sync_bridge_;
std::unique_ptr<testing::NiceMock<MockCommitQueue>> mock_commit_queue_;
Expand Down Expand Up @@ -608,6 +617,29 @@ TEST_F(NigoriModelTypeProcessorTest,
processor()->OnSyncStarting(request, base::DoNothing());
}

TEST_F(NigoriModelTypeProcessorTest,
ShouldUpdateModelTypeStateUponHandlingInvalidations) {
SimulateModelReadyToSync(/*initial_sync_done=*/true);
// Build invalidations.
sync_pb::ModelTypeState::Invalidation inv_1 = BuildInvalidation(1, "hint_1");
sync_pb::ModelTypeState::Invalidation inv_2 = BuildInvalidation(2, "hint_2");

processor()->StorePendingInvalidations({inv_1, inv_2});

// The model type state and the metadata should have been stored in the
// processor.
NigoriMetadataBatch processor_metadata_batch = processor()->GetMetadata();
sync_pb::ModelTypeState model_type_state =
processor_metadata_batch.model_type_state;
EXPECT_EQ(2, model_type_state.invalidations_size());

EXPECT_EQ(inv_1.hint(), model_type_state.invalidations(0).hint());
EXPECT_EQ(inv_1.version(), model_type_state.invalidations(0).version());

EXPECT_EQ(inv_2.hint(), model_type_state.invalidations(1).hint());
EXPECT_EQ(inv_2.version(), model_type_state.invalidations(1).version());
}

} // namespace

} // namespace syncer
4 changes: 4 additions & 0 deletions components/sync/test/fake_model_type_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "components/sync/test/fake_model_type_processor.h"

#include <utility>
#include <vector>

#include "base/callback.h"
#include "components/sync/engine/commit_queue.h"
Expand All @@ -31,5 +32,8 @@ void FakeModelTypeProcessor::OnUpdateReceived(
const sync_pb::ModelTypeState& type_state,
UpdateResponseDataList updates,
absl::optional<sync_pb::GarbageCollectionDirective> gc_directive) {}
void FakeModelTypeProcessor::StorePendingInvalidations(
std::vector<sync_pb::ModelTypeState::Invalidation> invalidations_to_store) {
}

} // namespace syncer
4 changes: 4 additions & 0 deletions components/sync/test/fake_model_type_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define COMPONENTS_SYNC_TEST_FAKE_MODEL_TYPE_PROCESSOR_H_

#include <memory>
#include <vector>

#include "components/sync/engine/model_type_processor.h"

Expand All @@ -29,6 +30,9 @@ class FakeModelTypeProcessor : public ModelTypeProcessor {
UpdateResponseDataList updates,
absl::optional<sync_pb::GarbageCollectionDirective>
gc_directive) override;
void StorePendingInvalidations(
std::vector<sync_pb::ModelTypeState::Invalidation> invalidations_to_store)
override;
};

} // namespace syncer
Expand Down

0 comments on commit 94ec444

Please sign in to comment.