Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/run-precommit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ on:
push:
paths:
- '**.py'
- '.pre-commit-config.yaml'
- '.github/workflows/run-precommit.yml'
workflow_dispatch:

jobs:
Expand All @@ -17,6 +19,10 @@ jobs:
with:
python-version: '3.8'

# mainly needed so mypy will have the dependencies it needs
- name: Install elementary
run: pip install -e .

- name: Install dev requirements
run: pip install -r dev-requirements.txt

Expand Down
10 changes: 10 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,13 @@ repos:
name: Check for NO_COMMIT marker
entry: bash -c "git diff --cached -U0 | (! grep NO_COMMIT)"
language: system
require_serial: true
pass_filenames: false

- repo: local
hooks:
- id: mypy
name: mypy
entry: mypy --no-error-summary
Comment thread
elongl marked this conversation as resolved.
language: system
files: ^elementary/.*\.py$
2 changes: 1 addition & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1 +1 @@
recursive-include elementary *.py *.sql *.yml *.html *.md .gitkeep .gitignore
recursive-include elementary *.py *.sql *.yml *.html *.md .gitkeep .gitignore py.typed
5 changes: 5 additions & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ boto3-stubs
google-auth-stubs
ratelimit-stubs
types-google-cloud-ndb
types-protobuf
types-pytz
types-jsonschema
types-PyYAML
types-setuptools
5 changes: 5 additions & 0 deletions elementary/cli/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ def recommend_version_upgrade():
try:
latest_version = package.get_latest_package_version()
current_version = package.get_package_version()
Comment thread
haritamar marked this conversation as resolved.

if not latest_version:
# Failed to obtain the latest version, so skip the check
return

if version.parse(current_version) < version.parse(latest_version):
click.secho(
f"You are using Elementary {current_version}, however version {latest_version} is available.\n"
Expand Down
49 changes: 38 additions & 11 deletions elementary/clients/dbt/slim_dbt_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import json
import os
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional, Union, cast

import dbt.adapters.factory
from dbt.adapters.base import BaseAdapter, BaseConnectionManager
from packaging import version

# IMPORTANT: This must be kept before the rest of the dbt imports
Expand Down Expand Up @@ -82,10 +83,12 @@ def __init__(
**kwargs,
):
super().__init__(project_dir, profiles_dir, target, vars, secret_vars)
self.config = None
self.adapter = None
self.adapter_name = None
self.project_parser = None

self.config: Optional[RuntimeConfig] = None
self.adapter: Optional[BaseAdapter] = None
self.adapter_name: Optional[str] = None
self.connections_manager: Optional[BaseConnectionManager] = None
self.project_parser: Optional[ManifestLoader] = None
self.manifest = None

def _load_runner(
Expand Down Expand Up @@ -126,23 +129,41 @@ def _load_config(self):
self.config = RuntimeConfig.from_args(self.args)

def _load_adapter(self):
if not self.config:
raise Exception("Config not loaded")

register_adapter(self.config)
self.adapter_name = self.config.credentials.type
self.adapter = get_adapter_class_by_name(self.adapter_name)(self.config)
self.adapter.connections.set_connection_name()
self.config.adapter = self.adapter
self.adapter = cast(
BaseAdapter, get_adapter_class_by_name(self.adapter_name)(self.config)
)
Comment thread
elongl marked this conversation as resolved.

self.connections_manager = cast(BaseConnectionManager, self.adapter.connections)
self.connections_manager.set_connection_name()

self.config.adapter = self.adapter # type: ignore[attr-defined]

def _load_manifest(self):
if not self.config:
raise Exception("Config not loaded")
if not self.adapter or not self.connections_manager:
raise Exception("Adapter not loaded")

self.project_parser = ManifestLoader(
self.config,
self.config.load_dependencies(),
self.adapter.connections.set_query_header,
self.connections_manager.set_query_header,
)
self.manifest = self.project_parser.load()
if self.manifest is None:
raise Exception("Failed to load manifest!")
self.manifest.build_flat_graph()
self.project_parser.save_macros_to_adapter(self.adapter)

def _execute_macro(self, macro_name, **kwargs):
if not self.adapter:
raise Exception("Adapter not loaded")

if "." in macro_name:
package_name, actual_macro_name = macro_name.split(".", 1)
else:
Expand All @@ -157,18 +178,24 @@ def _execute_macro(self, macro_name, **kwargs):
)

def close_connection(self):
self.adapter.connections.cleanup_all()
if self.connections_manager:
self.connections_manager.cleanup_all()

def run_operation(
self,
macro_name: str,
capture_output: bool = True,
macro_args: dict = dict(),
macro_args: Optional[dict] = None,
log_errors: bool = True,
vars: Optional[dict] = None,
quiet: bool = False,
**kwargs,
) -> list:
if self.profiles_dir is None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't it use dbt's default in this case?
I think this is supported.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recall this was broken in dbt 1.5
The profiles dir is parsed externally, so the config is already assumed to contain it.
I already changed back then the __init__ function to make profiles_dir mandatory (only for the slim runner), but couldn't convince mypy here that it can't be None so added this check

raise Exception("profiles_dir must be passed to SlimDbtRunner")

macro_args = macro_args or {}

all_vars = self._get_all_vars(vars)
self._load_runner(
project_dir=self.project_dir,
Expand Down
22 changes: 17 additions & 5 deletions elementary/clients/slack/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@
class SlackClient(ABC):
def __init__(
self,
token: Optional[str] = None,
webhook: Optional[str] = None,
tracking: Optional[Tracking] = None,
) -> None:
self.token = token
self.webhook = webhook
):
self.client = self._initial_client()
self.tracking = tracking
self._initial_retry_handlers()
Expand Down Expand Up @@ -73,6 +69,14 @@ def get_user_id_from_email(self, email: str) -> Optional[str]:


class SlackWebClient(SlackClient):
def __init__(
self,
token: str,
tracking: Optional[Tracking] = None,
):
self.token = token
super().__init__(tracking)

def _initial_client(self):
return WebClient(token=self.token)

Expand Down Expand Up @@ -203,6 +207,14 @@ def _handle_send_err(self, err: SlackApiError, channel_name: str) -> bool:


class SlackWebhookClient(SlackClient):
def __init__(
self,
webhook: str,
tracking: Optional[Tracking] = None,
):
self.webhook = webhook
super().__init__(tracking)

def _initial_client(self):
return WebhookClient(
url=self.webhook, default_headers={"Content-type": "application/json"}
Expand Down
2 changes: 1 addition & 1 deletion elementary/clients/slack/slack_message_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def get_slack_status_icon(status: Optional[str]) -> str:
icon = ":x:"
return icon

def get_slack_message(self) -> SlackMessageSchema:
def get_slack_message(self, *args, **kwargs) -> SlackMessageSchema:
return SlackMessageSchema(**self.slack_message)

@staticmethod
Expand Down
25 changes: 14 additions & 11 deletions elementary/monitor/alerts/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from elementary.monitor.alerts.schema.alert import AlertSuppressionSchema
from elementary.utils.json_utils import try_load_json
from elementary.utils.log import get_logger
from elementary.utils.time import convert_utc_iso_format_to_datetime
from elementary.utils.time import DATETIME_FORMAT, convert_utc_iso_format_to_datetime

logger = get_logger(__name__)

Expand Down Expand Up @@ -57,14 +57,17 @@ def __init__(
)
except Exception:
logger.error('Failed to parse "detected_at" field.')
self.detected_at_str = (
self.detected_at.strftime(DATETIME_FORMAT) if self.detected_at else "N/A"
)
self.database_name = database_name
self.schema_name = schema_name
self.owners = owners
self.tags = tags
self.owners: List[str] = owners or []
self.tags: List[str] = tags or []
self.meta = test_meta
self.model_meta = try_load_json(model_meta) or {}
self.status = status
self.subscribers = subscribers
self.subscribers: List[str] = subscribers or []
self.slack_channel = slack_channel
self.alert_suppression_interval = alert_suppression_interval
self.alert_fields = alert_fields
Expand All @@ -81,7 +84,7 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema:
raise NotImplementedError

@property
def consice_name(self):
def concise_name(self):
return "Alert"


Expand Down Expand Up @@ -124,24 +127,24 @@ def _create_slack_alert(
result: Optional[SlackBlocksType] = None,
configuration: Optional[SlackBlocksType] = None,
) -> SlackMessageSchema:
self._add_title_to_slack_alert(title)
self._add_preview_to_slack_alert(preview)
self._add_details_to_slack_alert(result, configuration)
self.add_title_to_slack_alert(title)
self.add_preview_to_slack_alert(preview)
self.add_details_to_slack_alert(result, configuration)
return super().get_slack_message()

def _add_title_to_slack_alert(self, title_blocks: Optional[SlackBlocksType] = None):
def add_title_to_slack_alert(self, title_blocks: Optional[SlackBlocksType] = None):
if title_blocks:
title = [*title_blocks, self.create_divider_block()]
self._add_always_displayed_blocks(title)

def _add_preview_to_slack_alert(
def add_preview_to_slack_alert(
self, preview_blocks: Optional[SlackBlocksType] = None
):
if preview_blocks:
validated_preview_blocks = self._validate_preview_blocks(preview_blocks)
self._add_blocks_as_attachments(validated_preview_blocks)

def _add_details_to_slack_alert(
def add_details_to_slack_alert(
self,
result: Optional[SlackBlocksType] = None,
configuration: Optional[SlackBlocksType] = None,
Expand Down
4 changes: 2 additions & 2 deletions elementary/monitor/alerts/alerts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import defaultdict
from dataclasses import dataclass
from typing import Generic, List, Optional, Union
from typing import DefaultDict, Generic, List, Optional, Union

from elementary.monitor.alerts.alert import Alert, AlertType
from elementary.monitor.alerts.malformed import MalformedAlert
Expand Down Expand Up @@ -55,7 +55,7 @@ def get_all(self) -> List[Alert]:
)

def get_elementary_test_count(self):
elementary_test_count = defaultdict(int)
elementary_test_count: DefaultDict[str, int] = defaultdict(int)
for test_result in self.tests.alerts:
if isinstance(test_result, ElementaryTestAlert):
elementary_test_count[test_result.test_name] += 1
Expand Down
30 changes: 18 additions & 12 deletions elementary/monitor/alerts/group_of_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from elementary.monitor.fetchers.alerts.normalized_alert import CHANNEL_KEY
from elementary.utils.json_utils import (
list_of_lists_of_strings_to_comma_delimited_unique_strings,
try_load_json,
)
from elementary.utils.models import get_shortened_model_name

Expand Down Expand Up @@ -109,7 +108,7 @@ def _fill_components_to_alerts(self):
test_errors = []
test_warnings = []
test_failures = []
model_errors = []
model_errors: List[Alert] = []
for alert in self.alerts:
if isinstance(alert, ModelAlert):
model_errors.append(alert)
Expand Down Expand Up @@ -164,14 +163,14 @@ def to_slack(self) -> SlackMessageSchema:
f":{TestErrorComponent.emoji_in_summary}: {TestErrorComponent.name_in_summary}: {len(alert_list)}"
)
title_blocks.append(self._message_builder.create_context_block(fields_summary))
self._message_builder._add_title_to_slack_alert(title_blocks=title_blocks)
self._message_builder.add_title_to_slack_alert(title_blocks=title_blocks)

# attention required : tags, owners, subscribers
preview_blocks = [
self._message_builder.create_text_section_block(block)
for block in self._attention_required_blocks()
] + [self._message_builder.create_empty_section_block()]
self._message_builder._add_preview_to_slack_alert(preview_blocks=preview_blocks)
self._message_builder.add_preview_to_slack_alert(preview_blocks=preview_blocks)

details_blocks = []
for component, alerts_list in self._components_to_alerts.items():
Expand Down Expand Up @@ -237,7 +236,7 @@ def _get_model_error_block_body(self) -> str:
return ""

def _attention_required_blocks(self):
preview_blocks = [f"*{self._db}.{self._schema}.{self._model}*"]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?
(Though I'm not sure I understand this piece of code in general.)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this code is pretty horrible
But self._db, self._schema and self._model are only defined in GroupOfAlertsByTable and not in GroupOfAlerts
So only used them there

preview_blocks = []

for component, val in sorted(
self._components_to_attention_required.items(), key=lambda x: x[0].order
Expand Down Expand Up @@ -299,9 +298,9 @@ def _sort_channel_destination(self, default_channel):
if alert.slack_channel:
model_specific_channel_config = alert.slack_channel
break
model_meta_data = try_load_json(alert.model_meta)
if model_meta_data and isinstance(model_meta_data, dict):
model_specific_channel_config = model_meta_data.get(CHANNEL_KEY)

model_specific_channel_config = alert.model_meta.get(CHANNEL_KEY)
if model_specific_channel_config:
Copy link
Copy Markdown
Collaborator Author

@haritamar haritamar Jul 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit unsure if I'm missing anything here, but it does look like model_meta is always parsed in __init__ and is already a dict at this point
The only place I had to fix is the mock data in the tests

break

if model_specific_channel_config:
Expand All @@ -310,7 +309,14 @@ def _sort_channel_destination(self, default_channel):
self.channel_destination = default_channel

def _get_tabulated_row_from_alert(self, alert: Alert) -> str:
return alert.consice_name
return alert.concise_name

def _attention_required_blocks(self):
preview_blocks = [f"*{self._db}.{self._schema}.{self._model}*"]
preview_blocks.extend(
super(GroupOfAlertsByTable, self)._attention_required_blocks()
)
return preview_blocks


class GroupOfAlertsBySingleAlert(GroupOfAlerts):
Expand All @@ -333,10 +339,10 @@ def to_slack(self):
return self.alerts[0].to_slack()

def set_owners(self, owners: List[str]):
self.alerts[0].owners = ", ".join(owners)
self.alerts[0].owners = owners

def set_subscribers(self, subscribers: List[str]):
self.alerts[0].subscribers = ", ".join(subscribers)
self.alerts[0].subscribers = subscribers

def set_tags(self, tags: List[str]):
self.alerts[0].tags = ", ".join(tags)
self.alerts[0].tags = tags
Loading