Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions elementary/monitor/api/invocations/invocations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Dict, List

from elementary.clients.api.api_client import APIClient
from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
from elementary.monitor.fetchers.invocations.invocations import InvocationsFetcher
Expand Down Expand Up @@ -37,3 +39,11 @@ def get_invocation_by_id(
)
else:
raise NotImplementedError

def get_invocations_by_ids(
    self, invocations_ids: List[str]
) -> List[DbtInvocationSchema]:
    """Return the invocations matching ``invocations_ids``.

    Delegates to the invocations fetcher and returns its result unchanged.
    """
    fetcher = self.invocations_fetcher
    matching_invocations = fetcher.get_invocations_by_ids(invocations_ids)
    return matching_invocations

def get_resources_latest_invocation(self) -> Dict[str, str]:
    """Return the mapping produced by the fetcher's ``get_resources_latest_invocation``.

    Delegates to the invocations fetcher and returns its result unchanged.
    """
    fetcher = self.invocations_fetcher
    latest_invocation_by_resource = fetcher.get_resources_latest_invocation()
    return latest_invocation_by_resource
11 changes: 11 additions & 0 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from elementary.clients.api.api_client import APIClient
from elementary.monitor.api.filters.filters import FiltersAPI
from elementary.monitor.api.invocations.invocations import InvocationsAPI
from elementary.monitor.api.lineage.lineage import LineageAPI
from elementary.monitor.api.models.models import ModelsAPI
from elementary.monitor.api.models.schema import (
Expand Down Expand Up @@ -44,6 +45,7 @@ def get_report_data(
sidebar_api = SidebarAPI(dbt_runner=self.dbt_runner)
lineage_api = LineageAPI(dbt_runner=self.dbt_runner)
filters_api = FiltersAPI(dbt_runner=self.dbt_runner)
invocations_api = InvocationsAPI(dbt_runner=self.dbt_runner)

models = models_api.get_models(exclude_elementary_models)
sources = models_api.get_sources()
Expand Down Expand Up @@ -87,6 +89,13 @@ def get_report_data(
serializable_filters = filters.dict()
serializable_lineage = lineage.dict()

resources_latest_invocation = (
invocations_api.get_resources_latest_invocation()
)
invocations = invocations_api.get_invocations_by_ids(
invocations_ids=list(set(resources_latest_invocation.values()))
)

report_data = ReportDataSchema(
creation_time=get_now_utc_iso_format(),
days_back=days_back,
Expand All @@ -102,6 +111,8 @@ def get_report_data(
model_runs_totals=serializable_model_runs_totals,
filters=serializable_filters,
lineage=serializable_lineage,
invocations=invocations,
resources_latest_invocation=resources_latest_invocation,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need both of the new objects at the front?

env=dict(project_name=project_name, env=env),
)
return report_data, None
Expand Down
2 changes: 2 additions & 0 deletions elementary/monitor/api/report/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ class ReportDataSchema(BaseModel):
model_runs_totals: dict = dict()
filters: dict = dict()
lineage: dict = dict()
invocations: list = list()
resources_latest_invocation: dict = dict()
env: dict = dict()
tracking: Optional[dict] = None
940 changes: 538 additions & 402 deletions elementary/monitor/data_monitoring/report/index.html

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions elementary/monitor/dbt_project/macros/get_invocations_by_ids.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{% macro get_invocations_by_ids(ids) %}
    {#
        Select the rows of the `dbt_invocations` table whose invocation_id is in `ids`
        and return them converted with elementary.agate_to_dicts.
        Returns nothing when the table is not found in the target database/schema.
    #}
    {% set database, schema = elementary.target_database(), target.schema %}
    {% set invocations_relation = adapter.get_relation(database, schema, 'dbt_invocations') %}

    {% if invocations_relation %}
        {# Probe columns only after the relation is known to exist, so a missing table is never inspected. #}
        {# `job_url` is selected only when the column is present in the table. #}
        {% set column_exists = elementary.column_exists_in_relation(invocations_relation, 'job_url') %}

        {% set get_invocations_query %}
            select
                invocation_id,
                command,
                selected,
                full_refresh,
                {% if column_exists %}
                    job_url,
                {% endif %}
                job_name,
                job_id,
                orchestrator
            from {{ invocations_relation }}
            where invocation_id in {{ elementary.strings_list_to_tuple(ids) }}
        {% endset %}
        {% set result = elementary.run_query(get_invocations_query) %}
        {% do return(elementary.agate_to_dicts(result)) %}
    {% endif %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{% macro get_resources_latest_invocation() %}
    {#
        For every unique_id in dbt_run_results, pick the row with the most recent
        generated_at and return (unique_id, invocation_id) pairs converted with
        elementary.agate_to_dicts.
    #}
    {% set dbt_run_results = ref('dbt_run_results') %}
    {% set get_resources_latest_invocation_query %}
        with ordered_run_results as (
            select
                *,
                {# Alias deliberately differs from the ROW_NUMBER function name to avoid shadowing it. #}
                row_number() over (partition by unique_id order by generated_at desc) as run_recency_rank
            from {{ dbt_run_results }}
        ),

        latest_run_results as (
            select *
            from ordered_run_results
            where run_recency_rank = 1
        )

        select unique_id, invocation_id from latest_run_results
    {% endset %}
    {# Use the namespaced query helper, matching the get_invocations_by_ids macro. #}
    {% set run_invocations_agate = elementary.run_query(get_resources_latest_invocation_query) %}
    {% do return(elementary.agate_to_dicts(run_invocations_agate)) %}
{% endmacro %}
34 changes: 33 additions & 1 deletion elementary/monitor/fetchers/invocations/invocations.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from typing import Optional
from typing import Dict, List, Optional

from elementary.clients.fetcher.fetcher import FetcherClient
from elementary.monitor.fetchers.invocations.schema import DbtInvocationSchema
Expand All @@ -21,3 +21,35 @@ def get_test_last_invocation(
else:
logger.warning(f"Could not find invocation by filter: {macro_args}")
return DbtInvocationSchema()

def get_invocations_by_ids(
    self, invocations_ids: List[str]
) -> List[DbtInvocationSchema]:
    """Fetch the invocations whose ids are in ``invocations_ids``.

    Runs the ``get_invocations_by_ids`` dbt macro and parses the first
    element of its response as a JSON list of invocation records.

    Args:
        invocations_ids: Invocation ids to look up.

    Returns:
        One ``DbtInvocationSchema`` per record returned by the macro; an
        empty list when no ids are given or the macro yields no output.
    """
    # Nothing to look up: skip the warehouse round-trip (and an empty
    # ``in`` filter) entirely.
    if not invocations_ids:
        return []

    invocations_response = self.dbt_runner.run_operation(
        macro_name="get_invocations_by_ids",
        macro_args={
            "ids": invocations_ids,
        },
    )
    if not invocations_response:
        return []

    # A JSON ``null`` payload would otherwise break the iteration below.
    invocation_records = json.loads(invocations_response[0]) or []
    return [DbtInvocationSchema(**record) for record in invocation_records]

def get_resources_latest_invocation(self) -> Dict[str, str]:
    """Map each ``unique_id`` to the ``invocation_id`` reported by the macro.

    Runs the ``get_resources_latest_invocation`` dbt macro and parses the
    first element of its response as a JSON list of records. An empty
    response yields an empty mapping.
    """
    raw_output = self.dbt_runner.run_operation(
        macro_name="get_resources_latest_invocation"
    )
    if raw_output:
        records = json.loads(raw_output[0])
    else:
        records = []

    latest_by_resource = {}
    for record in records:
        latest_by_resource[record["unique_id"]] = record["invocation_id"]
    return latest_by_resource
4 changes: 4 additions & 0 deletions elementary/monitor/fetchers/invocations/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class DbtInvocationSchema(BaseModel):
command: Optional[str] = None
selected: Optional[str] = None
full_refresh: Optional[bool] = None
job_url: Optional[str] = None
job_name: Optional[str] = None
job_id: Optional[str] = None
orchestrator: Optional[str] = None

@validator("detected_at", pre=True)
def format_detected_at(cls, detected_at):
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/report/fixtures/elementary_output.json

Large diffs are not rendered by default.