Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ele 2626 support singular tests in test results runs #1449

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions elementary/monitor/api/filters/filters.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List, Optional
from typing import Dict, List

from elementary.clients.api.api_client import APIClient
from elementary.monitor.api.filters.schema import FilterSchema, FiltersSchema
Expand All @@ -20,8 +20,8 @@
class FiltersAPI(APIClient):
def get_filters(
self,
test_results_totals: Dict[Optional[str], TotalsSchema],
test_runs_totals: Dict[Optional[str], TotalsSchema],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keys of test_results, test_runs, test_results_totals, test_runs_totals are no longer nullable

test_results_totals: Dict[str, TotalsSchema],
test_runs_totals: Dict[str, TotalsSchema],
models: Dict[str, NormalizedModelSchema],
sources: Dict[str, NormalizedSourceSchema],
models_runs: List[ModelRunsSchema],
Expand All @@ -39,7 +39,7 @@ def get_filters(

@staticmethod
def _get_test_filters(
totals: Dict[Optional[str], TotalsSchema],
totals: Dict[str, TotalsSchema],
models: Dict[str, NormalizedModelSchema],
sources: Dict[str, NormalizedSourceSchema],
) -> List[FilterSchema]:
Expand Down
40 changes: 18 additions & 22 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ def get_report_data(
models = models_api.get_models(exclude_elementary_models)
sources = models_api.get_sources()
exposures = models_api.get_exposures()
tests = tests_api.get_singular_tests()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename

singular_tests = tests_api.get_singular_tests()

groups = groups_api.get_groups(
artifacts=[
*models.values(),
*sources.values(),
*exposures.values(),
*tests,
*singular_tests,
]
)

Expand Down Expand Up @@ -140,9 +140,11 @@ def get_report_data(

invocations_job_identification = defaultdict(list)
for invocation in invocations:
key = invocation.job_name or invocation.job_id
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename key to invocation_key

if key is not None:
invocations_job_identification[key].append(invocation.invocation_id)
invocation_key = invocation.job_name or invocation.job_id
if invocation_key is not None:
invocations_job_identification[invocation_key].append(
invocation.invocation_id
)

report_data = ReportDataSchema(
creation_time=get_now_utc_iso_format(),
Expand Down Expand Up @@ -193,33 +195,27 @@ def _serialize_models_runs(self, models_runs: List[ModelRunsSchema]) -> List[dic
def _serialize_test_results(
self,
test_results: Dict[
Optional[str], List[Union[TestResultSchema, SourceFreshnessResultSchema]]
str, List[Union[TestResultSchema, SourceFreshnessResultSchema]]
],
) -> Dict[Optional[str], List[dict]]:
) -> Dict[str, List[dict]]:
serializable_test_results = defaultdict(list)
for model_unique_id, test_result in test_results.items():
serializable_test_results[model_unique_id].extend(
for key, test_result in test_results.items():
serializable_test_results[key].extend(
[result.dict() for result in test_result]
)
return serializable_test_results

def _serialize_test_runs(
self,
test_runs: Dict[
Optional[str], List[Union[TestRunSchema, SourceFreshnessRunSchema]]
],
) -> Dict[Optional[str], List[dict]]:
test_runs: Dict[str, List[Union[TestRunSchema, SourceFreshnessRunSchema]]],
) -> Dict[str, List[dict]]:
serializable_test_runs = defaultdict(list)
for model_unique_id, test_run in test_runs.items():
serializable_test_runs[model_unique_id].extend(
[run.dict() for run in test_run]
)
for key, test_run in test_runs.items():
serializable_test_runs[key].extend([run.dict() for run in test_run])
return serializable_test_runs

def _serialize_totals(
self, totals: Dict[Optional[str], TotalsSchema]
) -> Dict[Optional[str], dict]:
def _serialize_totals(self, totals: Dict[str, TotalsSchema]) -> Dict[str, dict]:
serialized_totals = dict()
for model_unique_id, total in totals.items():
serialized_totals[model_unique_id] = total.dict()
for key, total in totals.items():
serialized_totals[key] = total.dict()
return serialized_totals
48 changes: 16 additions & 32 deletions elementary/monitor/api/report/totals_utils.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,40 @@
from collections import defaultdict
from typing import Dict, List, Optional, Union
from typing import Dict, List, Union

from elementary.monitor.api.source_freshnesses.schema import (
SourceFreshnessMetadataSchema,
SourceFreshnessResultSchema,
SourceFreshnessRunSchema,
)
from elementary.monitor.api.tests.schema import (
TestMetadataSchema,
TestResultSchema,
TestRunSchema,
)
from elementary.monitor.api.tests.schema import TestResultSchema, TestRunSchema
from elementary.monitor.api.totals_schema import TotalsSchema


def get_total_test_results(
test_results: Dict[
Optional[str], List[Union[TestResultSchema, SourceFreshnessResultSchema]]
],
) -> Dict[Optional[str], TotalsSchema]:
test_metadatas = []
for test_result in test_results.values():
test_metadatas.extend([result.metadata for result in test_result])
test_results: Dict[str, List[Union[TestResultSchema, SourceFreshnessResultSchema]]],
) -> Dict[str, TotalsSchema]:
totals: Dict[str, TotalsSchema] = defaultdict(TotalsSchema)
for key, test_result in test_results.items():
for result in test_result:
# count by the key of the tests_results
totals[key].add_total(result.metadata.latest_run_status)

return _calculate_test_results_totals(test_metadatas)
return totals


def get_total_test_runs(
tests_runs: Dict[
Optional[str], List[Union[TestRunSchema, SourceFreshnessRunSchema]]
]
) -> Dict[Optional[str], TotalsSchema]:
totals: Dict[Optional[str], TotalsSchema] = defaultdict(TotalsSchema)
for test_runs in tests_runs.values():
tests_runs: Dict[str, List[Union[TestRunSchema, SourceFreshnessRunSchema]]]
) -> Dict[str, TotalsSchema]:
totals: Dict[str, TotalsSchema] = defaultdict(TotalsSchema)
for key, test_runs in tests_runs.items():
for test_run in test_runs:
# It's possible test_runs will be None if we didn't find any invocations associated
# with this test, in that case it also makes sense to skip it.
if not test_run.test_runs:
continue

test_invocations = test_run.test_runs.invocations
model_unique_id = test_run.metadata.model_unique_id

for test_invocation in test_invocations:
totals[model_unique_id].add_total(test_invocation.status)
return totals


def _calculate_test_results_totals(
test_metadatas: List[Union[TestMetadataSchema, SourceFreshnessMetadataSchema]],
) -> Dict[Optional[str], TotalsSchema]:
totals: Dict[Optional[str], TotalsSchema] = defaultdict(TotalsSchema)
for test in test_metadatas:
totals[test.model_unique_id].add_total(test.latest_run_status)
# count by the key of the tests_runs
totals[key].add_total(test_invocation.status)
return totals
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ def _get_source_freshness_results_db_rows(

def get_source_freshness_results(
self,
) -> Dict[Optional[str], List[SourceFreshnessResultSchema]]:
) -> Dict[str, List[SourceFreshnessResultSchema]]:
filtered_source_freshness_results_db_rows = [
test_result
for test_result in self.source_freshness_results_db_rows
if test_result.invocations_rank_index == 1
]
tests_results: DefaultDict[
Optional[str], List[SourceFreshnessResultSchema]
str, List[SourceFreshnessResultSchema]
] = defaultdict(list)

for (
Expand Down
25 changes: 14 additions & 11 deletions elementary/monitor/api/tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def get_test_results(
self,
invocation_id: Optional[str],
disable_samples: bool = False,
) -> Dict[Optional[str], List[TestResultSchema]]:
) -> Dict[str, List[TestResultSchema]]:
filtered_test_results_db_rows = self.test_results_db_rows

if invocation_id:
Expand All @@ -149,9 +149,7 @@ def get_test_results(
if test_result.invocations_rank_index == 1
]

tests_results: DefaultDict[Optional[str], List[TestResultSchema]] = defaultdict(
list
)
tests_results: DefaultDict[str, List[TestResultSchema]] = defaultdict(list)
for test_result_db_row in filtered_test_results_db_rows:
metadata = self._get_test_metadata_from_test_result_db_row(
test_result_db_row
Expand All @@ -163,16 +161,18 @@ def get_test_results(
if inner_test_results is None:
continue

tests_results[test_result_db_row.model_unique_id].append(
TestResultSchema(
metadata=metadata,
test_results=inner_test_results,
)
test_result = TestResultSchema(
metadata=metadata,
test_results=inner_test_results,
)
if test_result_db_row.model_unique_id:
tests_results[test_result_db_row.model_unique_id].append(test_result)
if test_result_db_row.test_sub_type == "singular":
tests_results[test_result_db_row.test_unique_id].append(test_result)

return tests_results

def get_test_runs(self) -> Dict[Optional[str], List[TestRunSchema]]:
def get_test_runs(self) -> Dict[str, List[TestRunSchema]]:
tests_invocations = self._get_invocations(self.test_results_db_rows)
latest_test_results = [
test_result
Expand All @@ -191,7 +191,10 @@ def get_test_runs(self) -> Dict[Optional[str], List[TestRunSchema]]:
),
test_runs=test_invocations,
)
test_runs[test_result_db_row.model_unique_id].append(test_run)
if test_result_db_row.model_unique_id:
test_runs[test_result_db_row.model_unique_id].append(test_run)
if test_result_db_row.test_sub_type == "singular":
test_runs[test_result_db_row.test_unique_id].append(test_run)
return test_runs

def _get_invocations(
Expand Down
Loading