diff --git a/services/test_analytics/ta_cache_rollups.py b/services/test_analytics/ta_cache_rollups.py index fdca2c7a4..fdffcc110 100644 --- a/services/test_analytics/ta_cache_rollups.py +++ b/services/test_analytics/ta_cache_rollups.py @@ -1,5 +1,6 @@ from datetime import UTC from io import BytesIO +from typing import cast import polars as pl import shared.storage @@ -24,8 +25,33 @@ def rollup_blob_path(repoid: int, branch: str | None = None) -> str: ) -POLARS_SCHEMA = [ +# Version number that the cache rollup task writes to GCS. +# Increment this whenever you create a new version of the schema. +VERSION = "1" + +# List of schemas. Leave the old ones here as a reference for now. +# Old schemas effectively expire after 60 days, since files written with them +# no longer contain any relevant data after that amount of time. + +# So once you deprecate an old schema, you only have to keep handling it +# for another 60 days. +NO_VERSION_POLARS_SCHEMA = [ + "computed_name", + ("flags", pl.List(pl.String)), + "failing_commits", + "last_duration", + "avg_duration", + "pass_count", + "fail_count", + "flaky_fail_count", + "skip_count", + ("updated_at", pl.Datetime(time_zone=UTC)), + "timestamp_bin", +] + +V1_POLARS_SCHEMA = [ "computed_name", + "testsuite", ("flags", pl.List(pl.String)), "failing_commits", "last_duration", @@ -40,7 +66,6 @@ def rollup_blob_path(repoid: int, branch: str | None = None) -> str: def cache_rollups(repoid: int, branch: str | None = None): - storage_service = shared.storage.get_appropriate_storage_service(repoid) serialized_table: BytesIO with read_rollups_from_db_summary.labels("new").time(): @@ -55,6 +80,7 @@ def cache_rollups(repoid: int, branch: str | None = None): data = [ { "computed_name": summary.computed_name, + "testsuite": summary.testsuite, "flags": summary.flags, "failing_commits": summary.failing_commits, "last_duration": summary.last_duration_seconds, @@ -69,15 +95,20 @@ def cache_rollups(repoid: int, branch: str
| None = None): for summary in summaries ] - serialized_table = pl.DataFrame( + df = pl.DataFrame( data, - POLARS_SCHEMA, + V1_POLARS_SCHEMA, orient="row", - ).write_ipc(None) + ) + serialized_table = df.write_ipc(None) serialized_table.seek(0) + storage_service = shared.storage.get_appropriate_storage_service(repoid) storage_service.write_file( - settings.GCS_BUCKET_NAME, rollup_blob_path(repoid, branch), serialized_table + cast(str, settings.GCS_BUCKET_NAME), + rollup_blob_path(repoid, branch), + serialized_table, + metadata={"version": VERSION}, ) rollup_size_summary.labels("new").observe(serialized_table.tell()) diff --git a/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups__0.json b/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups__0.json index 987b14e08..916b0ce54 100644 --- a/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups__0.json +++ b/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups__0.json @@ -3,6 +3,10 @@ "computed_name2", "computed_name" ], + "testsuite": [ + "testsuite2", + "testsuite" + ], "flags": [ [ "test-rollups2" diff --git a/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_branch__0.json b/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_branch__0.json index 6798c8832..714804777 100644 --- a/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_branch__0.json +++ b/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_branch__0.json @@ -3,6 +3,10 @@ "computed_name", "computed_name2" ], + "testsuite": [ + "testsuite", + "testsuite2" + ], "flags": [ [ "test-rollups" diff --git a/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_main__0.json 
b/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_main__0.json index 987b14e08..916b0ce54 100644 --- a/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_main__0.json +++ b/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_main__0.json @@ -3,6 +3,10 @@ "computed_name2", "computed_name" ], + "testsuite": [ + "testsuite2", + "testsuite" + ], "flags": [ [ "test-rollups2" diff --git a/services/test_analytics/tests/test_ta_cache_rollups.py b/services/test_analytics/tests/test_ta_cache_rollups.py index c8cf3b7b0..4d0c15b2a 100644 --- a/services/test_analytics/tests/test_ta_cache_rollups.py +++ b/services/test_analytics/tests/test_ta_cache_rollups.py @@ -1,4 +1,5 @@ import datetime as dt +from typing import cast import polars as pl import pytest @@ -8,14 +9,22 @@ TestrunBranchSummary, TestrunSummary, ) +from shared.storage.minio import MinioStorageService +from services.test_analytics.ta_cache_rollups import VERSION from services.test_analytics.utils import calc_test_id from tasks.cache_test_rollups import CacheTestRollupsTask -def read_table(storage, storage_path: str): +def read_table( + storage: MinioStorageService, + storage_path: str, + meta_container: dict[str, str] | None = None, +): decompressed_table: bytes = storage.read_file( - get_config("services", "minio", "bucket", default="archive"), storage_path + cast(str, get_config("services", "minio", "bucket", default="archive")), + storage_path, + metadata_container=meta_container, ) return pl.read_ipc(decompressed_table) @@ -82,8 +91,11 @@ def test_cache_test_rollups(storage, snapshot): branch=None, impl_type="new", ) - - table = read_table(storage, "test_analytics/repo_rollups/1.arrow") + meta = {} + table = read_table( + storage, "test_analytics/repo_rollups/1.arrow", meta_container=meta + ) + assert meta["version"] == VERSION table_dict = table.to_dict(as_series=False) del 
table_dict["timestamp_bin"] del table_dict["updated_at"] @@ -174,8 +186,11 @@ def test_cache_test_rollups_use_timeseries_main(storage, snapshot): branch="main", impl_type="new", ) - - table = read_table(storage, "test_analytics/branch_rollups/1/main.arrow") + meta = {} + table = read_table( + storage, "test_analytics/branch_rollups/1/main.arrow", meta_container=meta + ) + assert meta["version"] == VERSION table_dict = table.to_dict(as_series=False) del table_dict["timestamp_bin"] del table_dict["updated_at"]