Skip to content

Commit

Permalink
Add observers for aggregation service
Browse files Browse the repository at this point in the history
The observers will be notified when request storage is modified and
when a report is handled.

AggregatableReportRequest is added to the assembly callback so that
more information can be shown in the internals WebUI.

Bug: 1348029
Change-Id: I4a612a528b792bae7a59906413aa87fe6f306edc
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3810574
Reviewed-by: John Delaney <johnidel@chromium.org>
Commit-Queue: Nan Lin <linnan@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1035215}
  • Loading branch information
linnan-github authored and Chromium LUCI CQ committed Aug 15, 2022
1 parent 1c9f135 commit 55ad2a5
Show file tree
Hide file tree
Showing 18 changed files with 356 additions and 83 deletions.
1 change: 1 addition & 0 deletions content/browser/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ source_set("browser") {
"aggregation_service/aggregation_service_key_fetcher.h",
"aggregation_service/aggregation_service_network_fetcher_impl.cc",
"aggregation_service/aggregation_service_network_fetcher_impl.h",
"aggregation_service/aggregation_service_observer.h",
"aggregation_service/aggregation_service_storage.h",
"aggregation_service/aggregation_service_storage_context.h",
"aggregation_service/aggregation_service_storage_sql.cc",
Expand Down
2 changes: 1 addition & 1 deletion content/browser/aggregation_service/aggregatable_report.cc
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ AggregatableReport::Provider::~Provider() = default;

absl::optional<AggregatableReport>
AggregatableReport::Provider::CreateFromRequestAndPublicKeys(
AggregatableReportRequest report_request,
const AggregatableReportRequest& report_request,
std::vector<PublicKey> public_keys) const {
const size_t num_processing_urls = public_keys.size();
DCHECK_EQ(num_processing_urls, report_request.processing_urls().size());
Expand Down
2 changes: 1 addition & 1 deletion content/browser/aggregation_service/aggregatable_report.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class CONTENT_EXPORT AggregatableReport {
// correspond to `report_request.processing_urls`, which should be
// sorted. Returns `absl::nullopt` if an error occurred during construction.
virtual absl::optional<AggregatableReport> CreateFromRequestAndPublicKeys(
AggregatableReportRequest report_request,
const AggregatableReportRequest& report_request,
std::vector<PublicKey> public_keys) const;

// Sets whether to disable encryption of the payload(s). Should only be used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void AggregatableReportAssembler::AssembleReport(
if (pending_requests_.size() >= kMaxSimultaneousRequests) {
RecordAssemblyStatus(AssemblyStatus::kTooManySimultaneousRequests);

std::move(callback).Run(absl::nullopt,
std::move(callback).Run(std::move(report_request), absl::nullopt,
AssemblyStatus::kTooManySimultaneousRequests);
return;
}
Expand Down Expand Up @@ -182,7 +182,8 @@ void AggregatableReportAssembler::OnAllPublicKeysFetched(
RecordAssemblyStatus(AssemblyStatus::kPublicKeyFetchFailed);

std::move(pending_request.callback)
.Run(absl::nullopt, AssemblyStatus::kPublicKeyFetchFailed);
.Run(std::move(pending_request.report_request), absl::nullopt,
AssemblyStatus::kPublicKeyFetchFailed);
pending_requests_.erase(report_id);
return;
}
Expand All @@ -192,13 +193,14 @@ void AggregatableReportAssembler::OnAllPublicKeysFetched(

absl::optional<AggregatableReport> assembled_report =
report_provider_->CreateFromRequestAndPublicKeys(
std::move(pending_request.report_request), std::move(public_keys));
pending_request.report_request, std::move(public_keys));
AssemblyStatus assembly_status =
assembled_report ? AssemblyStatus::kOk : AssemblyStatus::kAssemblyFailed;
RecordAssemblyStatus(assembly_status);

std::move(pending_request.callback)
.Run(std::move(assembled_report), assembly_status);
.Run(std::move(pending_request.report_request),
std::move(assembled_report), assembly_status);

pending_requests_.erase(report_id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class CONTENT_EXPORT AggregatableReportAssembler {
};

using AssemblyCallback =
base::OnceCallback<void(absl::optional<AggregatableReport>,
base::OnceCallback<void(AggregatableReportRequest,
absl::optional<AggregatableReport>,
AssemblyStatus)>;

// While we shouldn't hit these limits in typical usage, we protect against
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ using AssemblyStatus = AggregatableReportAssembler::AssemblyStatus;
constexpr char kReportAssemblerStatusHistogramName[] =
"PrivacySandbox.AggregationService.ReportAssembler.Status";

auto MoveRequestAndReturnReport(absl::optional<AggregatableReportRequest>* out,
AggregatableReport report) {
auto CloneRequestAndReturnReport(absl::optional<AggregatableReportRequest>* out,
AggregatableReport report) {
return [out, report = std::move(report)](
AggregatableReportRequest report_request,
const AggregatableReportRequest& report_request,
std::vector<PublicKey> public_keys) {
*out = std::move(report_request);
*out = aggregation_service::CloneReportRequest(report_request);
return std::move(report);
};
}
Expand All @@ -74,7 +74,7 @@ class MockAggregatableReportProvider : public AggregatableReport::Provider {
public:
MOCK_METHOD(absl::optional<AggregatableReport>,
CreateFromRequestAndPublicKeys,
(AggregatableReportRequest, std::vector<PublicKey>),
(const AggregatableReportRequest&, std::vector<PublicKey>),
(const, override));
};

Expand Down Expand Up @@ -125,7 +125,7 @@ TEST_F(AggregatableReportAssemblerTest, BothKeyFetchesFail_ErrorReturned) {
.WillOnce(base::test::RunOnceCallback<1>(
absl::nullopt, PublicKeyFetchStatus::kPublicKeyFetchFailed));
EXPECT_CALL(callback(),
Run(Eq(absl::nullopt), AssemblyStatus::kPublicKeyFetchFailed));
Run(_, Eq(absl::nullopt), AssemblyStatus::kPublicKeyFetchFailed));

EXPECT_CALL(*report_provider(), CreateFromRequestAndPublicKeys(_, _))
.Times(0);
Expand All @@ -152,7 +152,7 @@ TEST_F(AggregatableReportAssemblerTest, FirstKeyFetchFails_ErrorReturned) {
aggregation_service::GenerateKey().public_key,
PublicKeyFetchStatus::kOk));
EXPECT_CALL(callback(),
Run(Eq(absl::nullopt), AssemblyStatus::kPublicKeyFetchFailed));
Run(_, Eq(absl::nullopt), AssemblyStatus::kPublicKeyFetchFailed));

EXPECT_CALL(*report_provider(), CreateFromRequestAndPublicKeys(_, _))
.Times(0);
Expand All @@ -179,7 +179,7 @@ TEST_F(AggregatableReportAssemblerTest, SecondKeyFetchFails_ErrorReturned) {
.WillOnce(base::test::RunOnceCallback<1>(
absl::nullopt, PublicKeyFetchStatus::kPublicKeyFetchFailed));
EXPECT_CALL(callback(),
Run(Eq(absl::nullopt), AssemblyStatus::kPublicKeyFetchFailed));
Run(_, Eq(absl::nullopt), AssemblyStatus::kPublicKeyFetchFailed));

EXPECT_CALL(*report_provider(), CreateFromRequestAndPublicKeys(_, _))
.Times(0);
Expand All @@ -205,7 +205,7 @@ TEST_F(AggregatableReportAssemblerTest,

absl::optional<AggregatableReport> report =
AggregatableReport::Provider().CreateFromRequestAndPublicKeys(
aggregation_service::CloneReportRequest(request), public_keys);
request, public_keys);
ASSERT_TRUE(report.has_value());

EXPECT_CALL(*fetcher(), GetPublicKey(processing_urls[0], _))
Expand All @@ -214,13 +214,13 @@ TEST_F(AggregatableReportAssemblerTest,
EXPECT_CALL(*fetcher(), GetPublicKey(processing_urls[1], _))
.WillOnce(base::test::RunOnceCallback<1>(public_keys[1],
PublicKeyFetchStatus::kOk));
EXPECT_CALL(callback(), Run(report, AssemblyStatus::kOk));
EXPECT_CALL(callback(), Run(_, report, AssemblyStatus::kOk));

absl::optional<AggregatableReportRequest> actual_request;
EXPECT_CALL(*report_provider(),
CreateFromRequestAndPublicKeys(_, public_keys))
.WillOnce(MoveRequestAndReturnReport(&actual_request,
std::move(report.value())));
.WillOnce(CloneRequestAndReturnReport(&actual_request,
std::move(report.value())));

assembler()->AssembleReport(aggregation_service::CloneReportRequest(request),
callback().Get());
Expand All @@ -244,19 +244,19 @@ TEST_F(AggregatableReportAssemblerTest,

absl::optional<AggregatableReport> report =
AggregatableReport::Provider().CreateFromRequestAndPublicKeys(
aggregation_service::CloneReportRequest(request), {public_key});
request, {public_key});
ASSERT_TRUE(report.has_value());

EXPECT_CALL(*fetcher(), GetPublicKey)
.WillOnce(base::test::RunOnceCallback<1>(public_key,
PublicKeyFetchStatus::kOk));
EXPECT_CALL(callback(), Run(report, AssemblyStatus::kOk));
EXPECT_CALL(callback(), Run(_, report, AssemblyStatus::kOk));

absl::optional<AggregatableReportRequest> actual_request;
EXPECT_CALL(*report_provider(), CreateFromRequestAndPublicKeys(
_, std::vector<PublicKey>{public_key}))
.WillOnce(MoveRequestAndReturnReport(&actual_request,
std::move(report.value())));
.WillOnce(CloneRequestAndReturnReport(&actual_request,
std::move(report.value())));

assembler()->AssembleReport(aggregation_service::CloneReportRequest(request),
callback().Get());
Expand All @@ -279,7 +279,7 @@ TEST_F(AggregatableReportAssemblerTest, OnlyKeyFetchFails_ErrorReturned) {
.WillOnce(base::test::RunOnceCallback<1>(
absl::nullopt, PublicKeyFetchStatus::kPublicKeyFetchFailed));
EXPECT_CALL(callback(),
Run(Eq(absl::nullopt), AssemblyStatus::kPublicKeyFetchFailed));
Run(_, Eq(absl::nullopt), AssemblyStatus::kPublicKeyFetchFailed));

EXPECT_CALL(*report_provider(), CreateFromRequestAndPublicKeys(_, _))
.Times(0);
Expand All @@ -305,21 +305,21 @@ TEST_F(AggregatableReportAssemblerTest,

absl::optional<AggregatableReport> report =
AggregatableReport::Provider().CreateFromRequestAndPublicKeys(
aggregation_service::CloneReportRequest(request), public_keys);
request, public_keys);
ASSERT_TRUE(report.has_value());

std::vector<FetchCallback> pending_callbacks(2);
EXPECT_CALL(*fetcher(), GetPublicKey(processing_urls[0], _))
.WillOnce(MoveArg<1>(&pending_callbacks.front()));
EXPECT_CALL(*fetcher(), GetPublicKey(processing_urls[1], _))
.WillOnce(MoveArg<1>(&pending_callbacks.back()));
EXPECT_CALL(callback(), Run(report, AssemblyStatus::kOk));
EXPECT_CALL(callback(), Run(_, report, AssemblyStatus::kOk));

absl::optional<AggregatableReportRequest> actual_request;
EXPECT_CALL(*report_provider(),
CreateFromRequestAndPublicKeys(_, public_keys))
.WillOnce(MoveRequestAndReturnReport(&actual_request,
std::move(report.value())));
.WillOnce(CloneRequestAndReturnReport(&actual_request,
std::move(report.value())));

assembler()->AssembleReport(aggregation_service::CloneReportRequest(request),
callback().Get());
Expand Down Expand Up @@ -364,23 +364,23 @@ TEST_F(AggregatableReportAssemblerTest,

absl::optional<AggregatableReport> report =
AggregatableReport::Provider().CreateFromRequestAndPublicKeys(
aggregation_service::CloneReportRequest(request), {public_key});
request, {public_key});
ASSERT_TRUE(report.has_value());

std::vector<FetchCallback> pending_callbacks(2);
EXPECT_CALL(*fetcher(), GetPublicKey(processing_urls[0], _))
.WillOnce(MoveArg<1>(&pending_callbacks.front()))
.WillOnce(MoveArg<1>(&pending_callbacks.back()));

EXPECT_CALL(callback(), Run(report, AssemblyStatus::kOk)).Times(2);
EXPECT_CALL(callback(), Run(_, report, AssemblyStatus::kOk)).Times(2);

absl::optional<AggregatableReportRequest> first_request;
absl::optional<AggregatableReportRequest> second_request;
EXPECT_CALL(*report_provider(), CreateFromRequestAndPublicKeys(
_, std::vector<PublicKey>{public_key}))
.WillOnce(MoveRequestAndReturnReport(&first_request, report.value()))
.WillOnce(MoveRequestAndReturnReport(&second_request,
std::move(report.value())));
.WillOnce(CloneRequestAndReturnReport(&first_request, report.value()))
.WillOnce(CloneRequestAndReturnReport(&second_request,
std::move(report.value())));

assembler()->AssembleReport(aggregation_service::CloneReportRequest(request),
callback().Get());
Expand Down Expand Up @@ -429,11 +429,11 @@ TEST_F(AggregatableReportAssemblerTest,
pending_callbacks.push_back(std::move(callback));
}));

EXPECT_CALL(callback(), Run(_, _)).Times(0);
EXPECT_CALL(callback(), Run).Times(0);

EXPECT_CALL(checkpoint, Call(current_check++));

EXPECT_CALL(callback(), Run(Eq(absl::nullopt),
EXPECT_CALL(callback(), Run(_, Eq(absl::nullopt),
AssemblyStatus::kTooManySimultaneousRequests))
.Times(1);

Expand All @@ -444,7 +444,7 @@ TEST_F(AggregatableReportAssemblerTest,
EXPECT_CALL(checkpoint, Call(current_check++));
EXPECT_CALL(*report_provider(), CreateFromRequestAndPublicKeys)
.WillOnce(Return(report));
EXPECT_CALL(callback(), Run(report, AssemblyStatus::kOk));
EXPECT_CALL(callback(), Run(_, report, AssemblyStatus::kOk));
}
}

Expand Down
5 changes: 5 additions & 0 deletions content/browser/aggregation_service/aggregation_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Value;

namespace content {

class AggregationServiceObserver;
class AggregatableReport;
class AggregatableReportRequest;
class BrowserContext;
Expand Down Expand Up @@ -88,6 +89,10 @@ class AggregationService {
virtual void SendReportsForWebUI(
const std::vector<AggregationServiceStorage::RequestId>& ids,
base::OnceClosure reports_sent_callback) = 0;

virtual void AddObserver(AggregationServiceObserver* observer) = 0;

virtual void RemoveObserver(AggregationServiceObserver* observer) = 0;
};

} // namespace content
Expand Down

0 comments on commit 55ad2a5

Please sign in to comment.