Skip to content

Commit

Permalink
Support aggregation coordinator selection for Private Aggregation
Browse files Browse the repository at this point in the history
Adds a new aggregation_coordinator_origin field to the Private
Aggregation layer. This field is validated and then sent on to the
aggregation_service layer. This change is not web visible as the
integrations with Shared Storage and Protected Audience will be added in
separate cls.

See the related spec change:
patcg-individual-drafts/private-aggregation-api#106

Note that this also modifies behavior of the database to only skip
reports that do not deserialize instead of skipping all reports if
there is any that do not deserialize. This may change behavior in case
of database corruption.

Bug: 1481254
Change-Id: Ie90eafc208f94ffd98ad95642b8f8c9b0af85fac
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4885966
Reviewed-by: Yao Xiao <yaoxia@chromium.org>
Reviewed-by: Russ Hamilton <behamilton@google.com>
Commit-Queue: Alex Turner <alexmt@chromium.org>
Reviewed-by: Nan Lin <linnan@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1212827}
  • Loading branch information
alexmturner authored and Chromium LUCI CQ committed Oct 20, 2023
1 parent dda15c4 commit ca88519
Show file tree
Hide file tree
Showing 20 changed files with 600 additions and 63 deletions.
19 changes: 14 additions & 5 deletions content/browser/aggregation_service/aggregatable_report.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,12 @@ ConvertPayloadContentsFromProto(
return absl::nullopt;
}

absl::optional<url::Origin> aggregation_coordinator_origin;
if (proto.has_aggregation_coordinator_origin()) {
aggregation_coordinator_origin =
url::Origin::Create(GURL(proto.aggregation_coordinator_origin()));
}

int max_contributions_allowed = proto.max_contributions_allowed();
if (max_contributions_allowed < 0) {
return absl::nullopt;
Expand All @@ -346,8 +352,7 @@ ConvertPayloadContentsFromProto(
// Report storage doesn't support multiple aggregation coordinators.
return AggregationServicePayloadContents(
operation, std::move(contributions), aggregation_mode,
/*aggregation_coordinator_origin=*/absl::nullopt,
max_contributions_allowed);
std::move(aggregation_coordinator_origin), max_contributions_allowed);
}

absl::optional<AggregatableReportSharedInfo> ConvertSharedInfoFromProto(
Expand Down Expand Up @@ -442,11 +447,15 @@ void ConvertPayloadContentsToProto(
break;
}

if (base::FeatureList::IsEnabled(
aggregation_service::kAggregationServiceMultipleCloudProviders) &&
payload_contents.aggregation_coordinator_origin.has_value()) {
out->set_aggregation_coordinator_origin(
payload_contents.aggregation_coordinator_origin->Serialize());
}

out->set_max_contributions_allowed(
payload_contents.max_contributions_allowed);

// Report storage doesn't support multiple aggregation coordinators.
CHECK(!payload_contents.aggregation_coordinator_origin.has_value());
}

void ConvertSharedInfoToProto(const AggregatableReportSharedInfo& shared_info,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,85 @@ TEST_P(AggregatableReportTest, MaxContributionsAllowed) {
20);
}

TEST_P(AggregatableReportTest, AggregationCoordinatorOrigin) {
const struct {
absl::optional<url::Origin> aggregation_coordinator_origin;
bool creation_should_succeed;
const char* description;
} kTestCases[] = {
{absl::nullopt, true, "default coordinator"},
{url::Origin::Create(GURL("https://aws.example.test")), true,
"valid coordinator"},
{url::Origin::Create(GURL("https://a.test")), false,
"invalid coordinator"},
};

for (const auto& test_case : kTestCases) {
SCOPED_TRACE(test_case.description);
AggregatableReportRequest example_request =
aggregation_service::CreateExampleRequest();

AggregationServicePayloadContents payload_contents =
example_request.payload_contents();
payload_contents.aggregation_coordinator_origin =
test_case.aggregation_coordinator_origin;

absl::optional<AggregatableReportRequest> request =
AggregatableReportRequest::Create(
payload_contents, example_request.shared_info().Clone());

EXPECT_EQ(request.has_value(), test_case.creation_should_succeed);

if (!request.has_value()) {
continue;
}

// The coordinator origin is correctly serialized and deserialized
std::vector<uint8_t> proto = request->Serialize();
absl::optional<AggregatableReportRequest> parsed_request =
AggregatableReportRequest::Deserialize(proto);
EXPECT_EQ(parsed_request.value()
.payload_contents()
.aggregation_coordinator_origin,
test_case.aggregation_coordinator_origin);
}
}

TEST_P(AggregatableReportTest, AggregationCoordinatorOriginAllowlistChanged) {
base::test::ScopedFeatureList scoped_feature_list;
scoped_feature_list.InitAndEnableFeatureWithParameters(
::aggregation_service::kAggregationServiceMultipleCloudProviders,
{{"aws_cloud", "https://aws.example.test"},
{"gcp_cloud", "https://gcp.example.test"}});

AggregatableReportRequest example_request =
aggregation_service::CreateExampleRequest();

AggregationServicePayloadContents payload_contents =
example_request.payload_contents();
payload_contents.aggregation_coordinator_origin =
url::Origin::Create(GURL("https://aws.example.test"));

AggregatableReportRequest request =
AggregatableReportRequest::Create(payload_contents,
example_request.shared_info().Clone())
.value();

std::vector<uint8_t> proto = request.Serialize();

// Change the allowlist between serializing and deserializing
scoped_feature_list.Reset();
scoped_feature_list.InitAndEnableFeatureWithParameters(
::aggregation_service::kAggregationServiceMultipleCloudProviders,
{{"aws_cloud", "https://aws2.example.test"},
{"gcp_cloud", "https://gcp2.example.test"}});

// Expect the report to fail to be recreated.
absl::optional<AggregatableReportRequest> parsed_request =
AggregatableReportRequest::Deserialize(proto);
EXPECT_FALSE(parsed_request.has_value());
}

TEST_P(AggregatableReportTest, ReportingPathEmpty_NotSetInRequest) {
AggregatableReportRequest example_request =
aggregation_service::CreateExampleRequest(
Expand Down Expand Up @@ -1005,9 +1084,10 @@ TEST(AggregatableReportProtoMigrationTest, NegativeDebugKey_ParsesCorrectly) {
deserialized_request.value(), expected_request));
}

TEST(AggregatableReportProtoMigrationTest, NoAdditionalFields_ParsesCorrectly) {
// An `AggregatableReport` serialized before `additional_fields` was added to
// the proto definition.
TEST(AggregatableReportProtoMigrationTest,
NoAdditionalFieldsOrAggregationCoordinatorOrigin_ParsesCorrectly) {
// An `AggregatableReport` serialized before `additional_fields` and
// `aggregataion_coordinator_origin` were added to the proto definition.
const char kHexEncodedOldProto[] =
"0A071205107B18C803126208D0DA8693FDBECF17122431323334353637382D393061622D"
"346364652D386631322D3334353637383930616263641A1368747470733A2F2F6578616D"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,11 @@ AggregationServiceStorageSql::GetRequestsReportingOnOrBefore(
if (!EnsureDatabaseOpen(DbCreationPolicy::kFailIfAbsent))
return {};

sql::Transaction transaction(&db_);
if (!transaction.Begin()) {
return {};
}

static constexpr char kGetRequestsSql[] =
"SELECT request_id,report_time,request_proto FROM report_requests "
"WHERE report_time<=? ORDER BY report_time LIMIT ?";
Expand All @@ -616,26 +621,41 @@ AggregationServiceStorageSql::GetRequestsReportingOnOrBefore(
// See https://www.sqlite.org/lang_select.html.
get_requests_statement.BindInt(1, limit.value_or(-1));

// Partial results are not returned in case of any error.
// TODO(crbug.com/1340046): Limit the total number of results that can be
// returned in one query.
std::vector<AggregationServiceStorage::RequestAndId> result;
std::vector<AggregationServiceStorage::RequestId> failures;
while (get_requests_statement.Step()) {
AggregationServiceStorage::RequestId request_id{
get_requests_statement.ColumnInt64(0)};
absl::optional<AggregatableReportRequest> parsed_request =
AggregatableReportRequest::Deserialize(
get_requests_statement.ColumnBlob(2));
if (!parsed_request)
return {};
if (!parsed_request) {
failures.push_back(request_id);
continue;
}

result.push_back(AggregationServiceStorage::RequestAndId{
.request = std::move(parsed_request.value()),
.id = AggregationServiceStorage::RequestId(
get_requests_statement.ColumnInt64(0))});
.request = std::move(parsed_request.value()), .id = request_id});
}

if (!get_requests_statement.Succeeded())
return {};

// In case of deserialization failures, remove the request from storage. This
// could occur if the coordinator chosen is no longer on the allowlist. It is
// also possible in case of database corruption.
for (AggregationServiceStorage::RequestId request_id : failures) {
if (!DeleteRequestImpl(request_id)) {
return {};
}
}

if (!transaction.Commit()) {
return {};
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include "base/strings/string_piece.h"
#include "base/test/bind.h"
#include "base/test/metrics/histogram_tester.h"
#include "base/test/scoped_feature_list.h"
#include "base/test/simple_test_clock.h"
#include "base/time/time.h"
#include "components/aggregation_service/features.h"
#include "content/browser/aggregation_service/aggregatable_report.h"
#include "content/browser/aggregation_service/aggregation_service.h"
#include "content/browser/aggregation_service/aggregation_service_storage.h"
Expand Down Expand Up @@ -1161,6 +1163,45 @@ TEST_F(AggregationServiceStorageSqlTest,
stored_requests_and_ids[0].request, request));
}

TEST_F(AggregationServiceStorageSqlTest,
StoreRequestWithCoordinatorOrigin_DeserializedWithOrigin) {
base::test::ScopedFeatureList scoped_feature_list;
scoped_feature_list.InitAndEnableFeatureWithParameters(
::aggregation_service::kAggregationServiceMultipleCloudProviders,
{{"aws_cloud", "https://coordinator.example"}});

OpenDatabase();

EXPECT_FALSE(storage_->NextReportTimeAfter(base::Time::Min()).has_value());
EXPECT_TRUE(
storage_->GetRequestsReportingOnOrBefore(base::Time::Max()).empty());

AggregatableReportRequest example_request =
aggregation_service::CreateExampleRequest();

AggregationServicePayloadContents payload_contents =
example_request.payload_contents();
payload_contents.aggregation_coordinator_origin =
url::Origin::Create(GURL("https://coordinator.example"));

AggregatableReportRequest request =
AggregatableReportRequest::Create(payload_contents,
example_request.shared_info().Clone(),
/*reporting_path=*/std::string(),
/*debug_key=*/absl::nullopt,
/*additional_fields=*/{})
.value();

storage_->StoreRequest(aggregation_service::CloneReportRequest(request));

std::vector<AggregationServiceStorage::RequestAndId> stored_requests_and_ids =
storage_->GetRequestsReportingOnOrBefore(base::Time::Max());

ASSERT_EQ(stored_requests_and_ids.size(), 1u);
EXPECT_TRUE(aggregation_service::ReportRequestsEqual(
stored_requests_and_ids[0].request, request));
}

TEST_F(AggregationServiceStorageSqlInMemoryTest,
DatabaseInMemoryReopened_RequestsNotPersisted) {
OpenDatabase();
Expand All @@ -1181,6 +1222,114 @@ TEST_F(AggregationServiceStorageSqlInMemoryTest,
storage_->GetRequestsReportingOnOrBefore(base::Time::Max()).empty());
}

TEST_F(AggregationServiceStorageSqlTest,
AggregationCoordinatorFeatureModifiedBetweenStorageAndLoading_Success) {
base::test::ScopedFeatureList scoped_feature_list;
scoped_feature_list.InitAndDisableFeature(
::aggregation_service::kAggregationServiceMultipleCloudProviders);
OpenDatabase();

AggregatableReportRequest example_request =
aggregation_service::CreateExampleRequest();

storage_->StoreRequest(
aggregation_service::CloneReportRequest(example_request));
EXPECT_EQ(storage_->GetRequestsReportingOnOrBefore(base::Time::Max()).size(),
1u);

// Turning the feature on should not affect the report loading.
scoped_feature_list.Reset();
scoped_feature_list.InitAndEnableFeatureWithParameters(
::aggregation_service::kAggregationServiceMultipleCloudProviders,
{{"aws_cloud", "https://aws.example.test"},
{"gcp_cloud", "https://gcp.example.test"}});

ASSERT_EQ(storage_->GetRequestsReportingOnOrBefore(base::Time::Max()).size(),
1u);
EXPECT_FALSE(storage_->GetRequestsReportingOnOrBefore(base::Time::Max())[0]
.request.payload_contents()
.aggregation_coordinator_origin.has_value());

storage_->ClearDataBetween(base::Time(), base::Time(), base::NullCallback());

AggregationServicePayloadContents payload_contents =
example_request.payload_contents();
payload_contents.aggregation_coordinator_origin =
url::Origin::Create(GURL("https://aws.example.test"));

storage_->StoreRequest(
AggregatableReportRequest::Create(payload_contents,
example_request.shared_info().Clone())
.value());
ASSERT_EQ(storage_->GetRequestsReportingOnOrBefore(base::Time::Max()).size(),
1u);
EXPECT_EQ(storage_->GetRequestsReportingOnOrBefore(base::Time::Max())[0]
.request.payload_contents()
.aggregation_coordinator_origin.value()
.GetURL()
.spec(),
"https://aws.example.test/");

// Turning the feature off should also not affect the report loading.
scoped_feature_list.Reset();
scoped_feature_list.InitAndDisableFeature(
::aggregation_service::kAggregationServiceMultipleCloudProviders);

ASSERT_EQ(storage_->GetRequestsReportingOnOrBefore(base::Time::Max()).size(),
1u);
EXPECT_EQ(storage_->GetRequestsReportingOnOrBefore(base::Time::Max())[0]
.request.payload_contents()
.aggregation_coordinator_origin.value()
.GetURL()
.spec(),
"https://aws.example.test/");
}

TEST_F(AggregationServiceStorageSqlTest,
AggregationCoordinatorAllowlistChanges_ReportDeleted) {
base::test::ScopedFeatureList scoped_feature_list;
scoped_feature_list.InitAndEnableFeatureWithParameters(
::aggregation_service::kAggregationServiceMultipleCloudProviders,
{{"aws_cloud", "https://aws.example.test"},
{"gcp_cloud", "https://gcp.example.test"}});
OpenDatabase();

AggregatableReportRequest example_request =
aggregation_service::CreateExampleRequest();

AggregationServicePayloadContents payload_contents =
example_request.payload_contents();
payload_contents.aggregation_coordinator_origin =
url::Origin::Create(GURL("https://aws.example.test"));

storage_->StoreRequest(
AggregatableReportRequest::Create(payload_contents,
example_request.shared_info().Clone())
.value());
EXPECT_EQ(storage_->GetRequestsReportingOnOrBefore(base::Time::Max()).size(),
1u);

// If the origin is removed from the allowlist, the report is dropped.
scoped_feature_list.Reset();
scoped_feature_list.InitAndEnableFeatureWithParameters(
::aggregation_service::kAggregationServiceMultipleCloudProviders,
{{"aws_cloud", "https://aws2.example.test"},
{"gcp_cloud", "https://gcp.example.test"}});

EXPECT_TRUE(
storage_->GetRequestsReportingOnOrBefore(base::Time::Max()).empty());

// Check that the report is not just ignored, but actually deleted.
scoped_feature_list.Reset();
scoped_feature_list.InitAndEnableFeatureWithParameters(
::aggregation_service::kAggregationServiceMultipleCloudProviders,
{{"aws_cloud", "https://aws.example.test"},
{"gcp_cloud", "https://gcp.example.test"}});

EXPECT_TRUE(
storage_->GetRequestsReportingOnOrBefore(base::Time::Max()).empty());
}

class AggregationServiceStorageSqlMigrationsTest
: public AggregationServiceStorageSqlTest {
public:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ message AggregationServicePayloadContents {
Operation operation = 1;
repeated AggregatableReportHistogramContribution contributions = 2;
AggregationServiceMode aggregation_mode = 3;
// aggregation_coordinator_origin is assumed to be the default (for now).
int32 max_contributions_allowed = 5;
optional string aggregation_coordinator_origin = 6;

reserved 4;
reserved "aggregation_coordinator";
Expand Down

0 comments on commit ca88519

Please sign in to comment.