Commit 031aa90

Ele 2626 support singular tests in test results runs (#1449)
* rename

* test results and runs includes keys for singular as well

* freshness api types

* change test runs totals

* fix type

* types cleanup + fix test results totals
NoyaArie authored Mar 6, 2024
1 parent 9f47652 commit 031aa90
Showing 5 changed files with 54 additions and 71 deletions.
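
At a high level, the per-test dictionaries that feed the report (test results, test runs, and their totals) are now keyed by plain str instead of Optional[str]: tests attached to a model are still grouped under the model's unique id, while singular tests are grouped under their own test_unique_id. A rough before/after sketch of the shape; the keys are invented for illustration, and the real values are schema objects rather than strings:

# Before (judging from the removed Optional[str] annotations), a standalone
# singular test had no model_unique_id, so its entries could only land under None.
before = {
    "model.acme.orders": ["<results of tests attached to the orders model>"],
    None: ["<results of the assert_positive_revenue singular test>"],
}

# After this commit every key is a str, and singular tests are keyed by their own
# test_unique_id, so they are represented in test results, test runs and totals.
after = {
    "model.acme.orders": ["<results of tests attached to the orders model>"],
    "test.acme.assert_positive_revenue": ["<results of the assert_positive_revenue singular test>"],
}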
8 changes: 4 additions & 4 deletions elementary/monitor/api/filters/filters.py
@@ -1,4 +1,4 @@
-from typing import Dict, List, Optional
+from typing import Dict, List

 from elementary.clients.api.api_client import APIClient
 from elementary.monitor.api.filters.schema import FilterSchema, FiltersSchema
@@ -20,8 +20,8 @@
 class FiltersAPI(APIClient):
     def get_filters(
         self,
-        test_results_totals: Dict[Optional[str], TotalsSchema],
-        test_runs_totals: Dict[Optional[str], TotalsSchema],
+        test_results_totals: Dict[str, TotalsSchema],
+        test_runs_totals: Dict[str, TotalsSchema],
         models: Dict[str, NormalizedModelSchema],
         sources: Dict[str, NormalizedSourceSchema],
         models_runs: List[ModelRunsSchema],
@@ -39,7 +39,7 @@ def get_filters(

     @staticmethod
     def _get_test_filters(
-        totals: Dict[Optional[str], TotalsSchema],
+        totals: Dict[str, TotalsSchema],
         models: Dict[str, NormalizedModelSchema],
         sources: Dict[str, NormalizedSourceSchema],
     ) -> List[FilterSchema]:
40 changes: 18 additions & 22 deletions elementary/monitor/api/report/report.py
@@ -67,14 +67,14 @@ def get_report_data(
         models = models_api.get_models(exclude_elementary_models)
         sources = models_api.get_sources()
         exposures = models_api.get_exposures()
-        tests = tests_api.get_singular_tests()
+        singular_tests = tests_api.get_singular_tests()

         groups = groups_api.get_groups(
             artifacts=[
                 *models.values(),
                 *sources.values(),
                 *exposures.values(),
-                *tests,
+                *singular_tests,
             ]
         )

@@ -140,9 +140,11 @@ def get_report_data(

         invocations_job_identification = defaultdict(list)
         for invocation in invocations:
-            key = invocation.job_name or invocation.job_id
-            if key is not None:
-                invocations_job_identification[key].append(invocation.invocation_id)
+            invocation_key = invocation.job_name or invocation.job_id
+            if invocation_key is not None:
+                invocations_job_identification[invocation_key].append(
+                    invocation.invocation_id
+                )

         report_data = ReportDataSchema(
             creation_time=get_now_utc_iso_format(),
@@ -193,33 +195,27 @@ def _serialize_models_runs(self, models_runs: List[ModelRunsSchema]) -> List[dic
     def _serialize_test_results(
         self,
         test_results: Dict[
-            Optional[str], List[Union[TestResultSchema, SourceFreshnessResultSchema]]
+            str, List[Union[TestResultSchema, SourceFreshnessResultSchema]]
         ],
-    ) -> Dict[Optional[str], List[dict]]:
+    ) -> Dict[str, List[dict]]:
         serializable_test_results = defaultdict(list)
-        for model_unique_id, test_result in test_results.items():
-            serializable_test_results[model_unique_id].extend(
+        for key, test_result in test_results.items():
+            serializable_test_results[key].extend(
                 [result.dict() for result in test_result]
             )
         return serializable_test_results

     def _serialize_test_runs(
         self,
-        test_runs: Dict[
-            Optional[str], List[Union[TestRunSchema, SourceFreshnessRunSchema]]
-        ],
-    ) -> Dict[Optional[str], List[dict]]:
+        test_runs: Dict[str, List[Union[TestRunSchema, SourceFreshnessRunSchema]]],
+    ) -> Dict[str, List[dict]]:
         serializable_test_runs = defaultdict(list)
-        for model_unique_id, test_run in test_runs.items():
-            serializable_test_runs[model_unique_id].extend(
-                [run.dict() for run in test_run]
-            )
+        for key, test_run in test_runs.items():
+            serializable_test_runs[key].extend([run.dict() for run in test_run])
         return serializable_test_runs

-    def _serialize_totals(
-        self, totals: Dict[Optional[str], TotalsSchema]
-    ) -> Dict[Optional[str], dict]:
+    def _serialize_totals(self, totals: Dict[str, TotalsSchema]) -> Dict[str, dict]:
         serialized_totals = dict()
-        for model_unique_id, total in totals.items():
-            serialized_totals[model_unique_id] = total.dict()
+        for key, total in totals.items():
+            serialized_totals[key] = total.dict()
         return serialized_totals
48 changes: 16 additions & 32 deletions elementary/monitor/api/report/totals_utils.py
@@ -1,56 +1,40 @@
 from collections import defaultdict
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Union

 from elementary.monitor.api.source_freshnesses.schema import (
-    SourceFreshnessMetadataSchema,
     SourceFreshnessResultSchema,
     SourceFreshnessRunSchema,
 )
-from elementary.monitor.api.tests.schema import (
-    TestMetadataSchema,
-    TestResultSchema,
-    TestRunSchema,
-)
+from elementary.monitor.api.tests.schema import TestResultSchema, TestRunSchema
 from elementary.monitor.api.totals_schema import TotalsSchema


 def get_total_test_results(
-    test_results: Dict[
-        Optional[str], List[Union[TestResultSchema, SourceFreshnessResultSchema]]
-    ],
-) -> Dict[Optional[str], TotalsSchema]:
-    test_metadatas = []
-    for test_result in test_results.values():
-        test_metadatas.extend([result.metadata for result in test_result])
+    test_results: Dict[str, List[Union[TestResultSchema, SourceFreshnessResultSchema]]],
+) -> Dict[str, TotalsSchema]:
+    totals: Dict[str, TotalsSchema] = defaultdict(TotalsSchema)
+    for key, test_result in test_results.items():
+        for result in test_result:
+            # count by the key of the tests_results
+            totals[key].add_total(result.metadata.latest_run_status)

-    return _calculate_test_results_totals(test_metadatas)
+    return totals


 def get_total_test_runs(
-    tests_runs: Dict[
-        Optional[str], List[Union[TestRunSchema, SourceFreshnessRunSchema]]
-    ]
-) -> Dict[Optional[str], TotalsSchema]:
-    totals: Dict[Optional[str], TotalsSchema] = defaultdict(TotalsSchema)
-    for test_runs in tests_runs.values():
+    tests_runs: Dict[str, List[Union[TestRunSchema, SourceFreshnessRunSchema]]]
+) -> Dict[str, TotalsSchema]:
+    totals: Dict[str, TotalsSchema] = defaultdict(TotalsSchema)
+    for key, test_runs in tests_runs.items():
         for test_run in test_runs:
             # It's possible test_runs will be None if we didn't find any invocations associated
             # with this test, in that case it also makes sense to skip it.
             if not test_run.test_runs:
                 continue

             test_invocations = test_run.test_runs.invocations
-            model_unique_id = test_run.metadata.model_unique_id

             for test_invocation in test_invocations:
-                totals[model_unique_id].add_total(test_invocation.status)
-    return totals
-
-
-def _calculate_test_results_totals(
-    test_metadatas: List[Union[TestMetadataSchema, SourceFreshnessMetadataSchema]],
-) -> Dict[Optional[str], TotalsSchema]:
-    totals: Dict[Optional[str], TotalsSchema] = defaultdict(TotalsSchema)
-    for test in test_metadatas:
-        totals[test.model_unique_id].add_total(test.latest_run_status)
+                # count by the key of the tests_runs
+                totals[key].add_total(test_invocation.status)
     return totals
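
To make the new totals path concrete, here is a minimal runnable sketch. TotalsSchema and its add_total(status) method come from elementary/monitor/api/totals_schema.py (imported above); the Totals class below is only a counter-like stand-in, and the keys and statuses are invented for the example:

from collections import defaultdict
from typing import Dict, List


class Totals:
    """Counter-like stand-in for TotalsSchema (illustration only)."""

    def __init__(self) -> None:
        self.counts: Dict[str, int] = defaultdict(int)

    def add_total(self, status: str) -> None:
        self.counts[status] += 1


def get_totals(results_by_key: Dict[str, List[str]]) -> Dict[str, Totals]:
    # Same idea as get_total_test_results / get_total_test_runs above:
    # totals accumulate under exactly the same keys as the input dict,
    # so singular-test keys flow through to the totals as well.
    totals: Dict[str, Totals] = defaultdict(Totals)
    for key, statuses in results_by_key.items():
        for status in statuses:
            totals[key].add_total(status)
    return totals


totals = get_totals(
    {
        "model.acme.orders": ["pass", "fail"],
        "test.acme.assert_positive_revenue": ["pass"],
    }
)
print({key: dict(total.counts) for key, total in totals.items()})
# {'model.acme.orders': {'pass': 1, 'fail': 1},
#  'test.acme.assert_positive_revenue': {'pass': 1}}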
4 changes: 2 additions & 2 deletions elementary/monitor/api/source_freshnesses/source_freshnesses.py
@@ -51,14 +51,14 @@ def _get_source_freshness_results_db_rows(

     def get_source_freshness_results(
         self,
-    ) -> Dict[Optional[str], List[SourceFreshnessResultSchema]]:
+    ) -> Dict[str, List[SourceFreshnessResultSchema]]:
         filtered_source_freshness_results_db_rows = [
             test_result
             for test_result in self.source_freshness_results_db_rows
             if test_result.invocations_rank_index == 1
         ]
         tests_results: DefaultDict[
-            Optional[str], List[SourceFreshnessResultSchema]
+            str, List[SourceFreshnessResultSchema]
         ] = defaultdict(list)

         for (
25 changes: 14 additions & 11 deletions elementary/monitor/api/tests/tests.py
@@ -133,7 +133,7 @@ def get_test_results(
         self,
         invocation_id: Optional[str],
         disable_samples: bool = False,
-    ) -> Dict[Optional[str], List[TestResultSchema]]:
+    ) -> Dict[str, List[TestResultSchema]]:
         filtered_test_results_db_rows = self.test_results_db_rows

         if invocation_id:
@@ -149,9 +149,7 @@ def get_test_results(
             if test_result.invocations_rank_index == 1
         ]

-        tests_results: DefaultDict[Optional[str], List[TestResultSchema]] = defaultdict(
-            list
-        )
+        tests_results: DefaultDict[str, List[TestResultSchema]] = defaultdict(list)
         for test_result_db_row in filtered_test_results_db_rows:
             metadata = self._get_test_metadata_from_test_result_db_row(
                 test_result_db_row
@@ -163,16 +161,18 @@ def get_test_results(
             if inner_test_results is None:
                 continue

-            tests_results[test_result_db_row.model_unique_id].append(
-                TestResultSchema(
-                    metadata=metadata,
-                    test_results=inner_test_results,
-                )
-            )
+            test_result = TestResultSchema(
+                metadata=metadata,
+                test_results=inner_test_results,
+            )
+            if test_result_db_row.model_unique_id:
+                tests_results[test_result_db_row.model_unique_id].append(test_result)
+            if test_result_db_row.test_sub_type == "singular":
+                tests_results[test_result_db_row.test_unique_id].append(test_result)

         return tests_results

-    def get_test_runs(self) -> Dict[Optional[str], List[TestRunSchema]]:
+    def get_test_runs(self) -> Dict[str, List[TestRunSchema]]:
         tests_invocations = self._get_invocations(self.test_results_db_rows)
         latest_test_results = [
             test_result
@@ -191,7 +191,10 @@ def get_test_runs(self) -> Dict[Optional[str], List[TestRunSchema]]:
                 ),
                 test_runs=test_invocations,
             )
-            test_runs[test_result_db_row.model_unique_id].append(test_run)
+            if test_result_db_row.model_unique_id:
+                test_runs[test_result_db_row.model_unique_id].append(test_run)
+            if test_result_db_row.test_sub_type == "singular":
+                test_runs[test_result_db_row.test_unique_id].append(test_run)
         return test_runs

     def _get_invocations(
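
The keying decision itself is easiest to see in isolation. Below is a self-contained sketch of the logic added to get_test_results and get_test_runs; Row is a simplified stand-in for the test result DB rows, and the unique ids and the "generic" sub-type value are illustrative only. The two checks are independent, mirroring the diff above.

from collections import defaultdict
from typing import Dict, List, Optional


class Row:
    """Simplified stand-in for a test result DB row (illustration only)."""

    def __init__(
        self,
        test_unique_id: str,
        test_sub_type: str,
        model_unique_id: Optional[str] = None,
    ):
        self.test_unique_id = test_unique_id
        self.test_sub_type = test_sub_type
        self.model_unique_id = model_unique_id


def group_by_key(rows: List[Row]) -> Dict[str, List[Row]]:
    """Mirrors the keying decision added in this commit (simplified)."""
    grouped: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        # Tests attached to a model keep the model_unique_id key.
        if row.model_unique_id:
            grouped[row.model_unique_id].append(row)
        # Singular tests are also filed under their own test_unique_id
        # instead of being dropped under a None key.
        if row.test_sub_type == "singular":
            grouped[row.test_unique_id].append(row)
    return grouped


rows = [
    Row("test.acme.not_null_orders_id", "generic", "model.acme.orders"),
    Row("test.acme.assert_positive_revenue", "singular"),
]
print({key: len(value) for key, value in group_by_key(rows).items()})
# {'model.acme.orders': 1, 'test.acme.assert_positive_revenue': 1}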
