Skip to content
This repository was archived by the owner on May 5, 2025. It is now read-only.

Commit 563a6fa

Browse files
committed
feat: add version metadata in ta_cache_rollups
we want to be able to evolve the schema of the rollups, and we can do that by including a version tag in the GCS object metadata. However, even though in this case we're making the change in worker first, in the future we should modify the reading code to handle the new schema before modifying the writing code: if we deploy both reader and writer at the same time, it's possible that a rollup written by a new version of the writer is read by an old version of the reader, which doesn't understand the new format
1 parent 078221e commit 563a6fa

File tree

5 files changed

+70
-12
lines changed

5 files changed

+70
-12
lines changed

services/test_analytics/ta_cache_rollups.py

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from datetime import UTC
22
from io import BytesIO
3+
from typing import cast
34

45
import polars as pl
56
import shared.storage
@@ -24,8 +25,33 @@ def rollup_blob_path(repoid: int, branch: str | None = None) -> str:
2425
)
2526

2627

27-
POLARS_SCHEMA = [
28+
# version number that the cache rollup task will be writing to GCS
29+
# if you're creating a new version of the schema, increment this
30+
VERSION = "1"
31+
32+
# list of schemas; you should leave the old ones here as a reference for now
33+
# old schemas should basically be expired after 60 days, since there would be
34+
# no relevant data included in those files after that amount of time
35+
36+
# so from the time you deprecate an old schema, you only have to keep handling it
37+
# for 60 days
38+
NO_VERSION_POLARS_SCHEMA = [
39+
"computed_name",
40+
("flags", pl.List(pl.String)),
41+
"failing_commits",
42+
"last_duration",
43+
"avg_duration",
44+
"pass_count",
45+
"fail_count",
46+
"flaky_fail_count",
47+
"skip_count",
48+
("updated_at", pl.Datetime(time_zone=UTC)),
49+
"timestamp_bin",
50+
]
51+
52+
V1_POLARS_SCHEMA = [
2853
"computed_name",
54+
"testsuite",
2955
("flags", pl.List(pl.String)),
3056
"failing_commits",
3157
"last_duration",
@@ -40,7 +66,6 @@ def rollup_blob_path(repoid: int, branch: str | None = None) -> str:
4066

4167

4268
def cache_rollups(repoid: int, branch: str | None = None):
43-
storage_service = shared.storage.get_appropriate_storage_service(repoid)
4469
serialized_table: BytesIO
4570

4671
with read_rollups_from_db_summary.labels("new").time():
@@ -55,6 +80,7 @@ def cache_rollups(repoid: int, branch: str | None = None):
5580
data = [
5681
{
5782
"computed_name": summary.computed_name,
83+
"testsuite": summary.testsuite,
5884
"flags": summary.flags,
5985
"failing_commits": summary.failing_commits,
6086
"last_duration": summary.last_duration_seconds,
@@ -69,15 +95,20 @@ def cache_rollups(repoid: int, branch: str | None = None):
6995
for summary in summaries
7096
]
7197

72-
serialized_table = pl.DataFrame(
98+
df = pl.DataFrame(
7399
data,
74-
POLARS_SCHEMA,
100+
V1_POLARS_SCHEMA,
75101
orient="row",
76-
).write_ipc(None)
102+
)
103+
serialized_table = df.write_ipc(None)
77104

78105
serialized_table.seek(0)
79106

107+
storage_service = shared.storage.get_appropriate_storage_service(repoid)
80108
storage_service.write_file(
81-
settings.GCS_BUCKET_NAME, rollup_blob_path(repoid, branch), serialized_table
109+
cast(str, settings.GCS_BUCKET_NAME),
110+
rollup_blob_path(repoid, branch),
111+
serialized_table,
112+
metadata={"version": VERSION},
82113
)
83114
rollup_size_summary.labels("new").observe(serialized_table.tell())

services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups__0.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
"computed_name2",
44
"computed_name"
55
],
6+
"testsuite": [
7+
"testsuite2",
8+
"testsuite"
9+
],
610
"flags": [
711
[
812
"test-rollups2"

services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_branch__0.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
"computed_name",
44
"computed_name2"
55
],
6+
"testsuite": [
7+
"testsuite",
8+
"testsuite2"
9+
],
610
"flags": [
711
[
812
"test-rollups"

services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_main__0.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
"computed_name2",
44
"computed_name"
55
],
6+
"testsuite": [
7+
"testsuite2",
8+
"testsuite"
9+
],
610
"flags": [
711
[
812
"test-rollups2"

services/test_analytics/tests/test_ta_cache_rollups.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import datetime as dt
2+
from typing import cast
23

34
import polars as pl
45
import pytest
@@ -8,14 +9,22 @@
89
TestrunBranchSummary,
910
TestrunSummary,
1011
)
12+
from shared.storage.minio import MinioStorageService
1113

14+
from services.test_analytics.ta_cache_rollups import VERSION
1215
from services.test_analytics.utils import calc_test_id
1316
from tasks.cache_test_rollups import CacheTestRollupsTask
1417

1518

16-
def read_table(storage, storage_path: str):
19+
def read_table(
20+
storage: MinioStorageService,
21+
storage_path: str,
22+
meta_container: dict[str, str] | None = None,
23+
):
1724
decompressed_table: bytes = storage.read_file(
18-
get_config("services", "minio", "bucket", default="archive"), storage_path
25+
cast(str, get_config("services", "minio", "bucket", default="archive")),
26+
storage_path,
27+
metadata_container=meta_container,
1928
)
2029
return pl.read_ipc(decompressed_table)
2130

@@ -82,8 +91,11 @@ def test_cache_test_rollups(storage, snapshot):
8291
branch=None,
8392
impl_type="new",
8493
)
85-
86-
table = read_table(storage, "test_analytics/repo_rollups/1.arrow")
94+
meta = {}
95+
table = read_table(
96+
storage, "test_analytics/repo_rollups/1.arrow", meta_container=meta
97+
)
98+
assert meta["version"] == VERSION
8799
table_dict = table.to_dict(as_series=False)
88100
del table_dict["timestamp_bin"]
89101
del table_dict["updated_at"]
@@ -174,8 +186,11 @@ def test_cache_test_rollups_use_timeseries_main(storage, snapshot):
174186
branch="main",
175187
impl_type="new",
176188
)
177-
178-
table = read_table(storage, "test_analytics/branch_rollups/1/main.arrow")
189+
meta = {}
190+
table = read_table(
191+
storage, "test_analytics/branch_rollups/1/main.arrow", meta_container=meta
192+
)
193+
assert meta["version"] == VERSION
179194
table_dict = table.to_dict(as_series=False)
180195
del table_dict["timestamp_bin"]
181196
del table_dict["updated_at"]

0 commit comments

Comments
 (0)