Skip to content
This repository was archived by the owner on May 5, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions services/test_analytics/ta_cache_rollups.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import UTC
from io import BytesIO
from typing import cast

import polars as pl
import shared.storage
Expand All @@ -24,8 +25,33 @@ def rollup_blob_path(repoid: int, branch: str | None = None) -> str:
)


POLARS_SCHEMA = [
# version number that the cache rollup task will be writing to GCS
# if you're creating a new version of the schema, increment this
VERSION = "1"

# List of schemas. Leave the old ones here as a reference for now.
# Old schemas should effectively expire after 60 days, since those files would
# contain no relevant data after that amount of time.

# So, from the time you deprecate an old schema, you only have to keep handling
# it for 60 days.
NO_VERSION_POLARS_SCHEMA = [
"computed_name",
("flags", pl.List(pl.String)),
"failing_commits",
"last_duration",
"avg_duration",
"pass_count",
"fail_count",
"flaky_fail_count",
"skip_count",
("updated_at", pl.Datetime(time_zone=UTC)),
"timestamp_bin",
]

V1_POLARS_SCHEMA = [
"computed_name",
"testsuite",
("flags", pl.List(pl.String)),
"failing_commits",
"last_duration",
Expand All @@ -40,7 +66,6 @@ def rollup_blob_path(repoid: int, branch: str | None = None) -> str:


def cache_rollups(repoid: int, branch: str | None = None):
storage_service = shared.storage.get_appropriate_storage_service(repoid)
serialized_table: BytesIO

with read_rollups_from_db_summary.labels("new").time():
Expand All @@ -55,6 +80,7 @@ def cache_rollups(repoid: int, branch: str | None = None):
data = [
{
"computed_name": summary.computed_name,
"testsuite": summary.testsuite,
"flags": summary.flags,
"failing_commits": summary.failing_commits,
"last_duration": summary.last_duration_seconds,
Expand All @@ -69,15 +95,20 @@ def cache_rollups(repoid: int, branch: str | None = None):
for summary in summaries
]

serialized_table = pl.DataFrame(
df = pl.DataFrame(
data,
POLARS_SCHEMA,
V1_POLARS_SCHEMA,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are we going to serialize data for schemas older than this V1 during the next 60 days? They won't have the testsuite field, right? So wouldn't this cause line 83 to fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schema describes the format of what we're writing (the keys in data), so as long as data includes a value for each field in the schema, the schema will be respected:

schema = ["field1", "field2"]
data = {"field1": value1, "field2": value2} # this is good
data = {"field3": value1} # bad

orient="row",
).write_ipc(None)
)
serialized_table = df.write_ipc(None)

serialized_table.seek(0)

storage_service = shared.storage.get_appropriate_storage_service(repoid)
storage_service.write_file(
settings.GCS_BUCKET_NAME, rollup_blob_path(repoid, branch), serialized_table
cast(str, settings.GCS_BUCKET_NAME),
rollup_blob_path(repoid, branch),
serialized_table,
metadata={"version": VERSION},
)
rollup_size_summary.labels("new").observe(serialized_table.tell())
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
"computed_name2",
"computed_name"
],
"testsuite": [
"testsuite2",
"testsuite"
],
"flags": [
[
"test-rollups2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
"computed_name",
"computed_name2"
],
"testsuite": [
"testsuite",
"testsuite2"
],
"flags": [
[
"test-rollups"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
"computed_name2",
"computed_name"
],
"testsuite": [
"testsuite2",
"testsuite"
],
"flags": [
[
"test-rollups2"
Expand Down
27 changes: 21 additions & 6 deletions services/test_analytics/tests/test_ta_cache_rollups.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime as dt
from typing import cast

import polars as pl
import pytest
Expand All @@ -8,14 +9,22 @@
TestrunBranchSummary,
TestrunSummary,
)
from shared.storage.minio import MinioStorageService

from services.test_analytics.ta_cache_rollups import VERSION
from services.test_analytics.utils import calc_test_id
from tasks.cache_test_rollups import CacheTestRollupsTask


def read_table(storage, storage_path: str):
def read_table(
storage: MinioStorageService,
storage_path: str,
meta_container: dict[str, str] | None = None,
):
decompressed_table: bytes = storage.read_file(
get_config("services", "minio", "bucket", default="archive"), storage_path
cast(str, get_config("services", "minio", "bucket", default="archive")),
storage_path,
metadata_container=meta_container,
)
return pl.read_ipc(decompressed_table)

Expand Down Expand Up @@ -82,8 +91,11 @@ def test_cache_test_rollups(storage, snapshot):
branch=None,
impl_type="new",
)

table = read_table(storage, "test_analytics/repo_rollups/1.arrow")
meta = {}
table = read_table(
storage, "test_analytics/repo_rollups/1.arrow", meta_container=meta
)
assert meta["version"] == VERSION
table_dict = table.to_dict(as_series=False)
del table_dict["timestamp_bin"]
del table_dict["updated_at"]
Expand Down Expand Up @@ -174,8 +186,11 @@ def test_cache_test_rollups_use_timeseries_main(storage, snapshot):
branch="main",
impl_type="new",
)

table = read_table(storage, "test_analytics/branch_rollups/1/main.arrow")
meta = {}
table = read_table(
storage, "test_analytics/branch_rollups/1/main.arrow", meta_container=meta
)
assert meta["version"] == VERSION
table_dict = table.to_dict(as_series=False)
del table_dict["timestamp_bin"]
del table_dict["updated_at"]
Expand Down
Loading