Skip to content
This repository has been archived by the owner on Aug 28, 2023. It is now read-only.

Commit

Permalink
Fixed and Updated Plugin (#4)
Browse files Browse the repository at this point in the history
* Updated the Plugin, included Flask integration, fixed ambiguity in breadcrumbs.
  • Loading branch information
tiopi committed Jun 19, 2019
1 parent 184dc8f commit d4c7f11
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 31 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -1,4 +1,6 @@
[![Build Status](https://travis-ci.com/getsentry/sentry-airflow.svg?branch=master)](https://travis-ci.com/getsentry/sentry-airflow)
[![License](http://img.shields.io/:license-Apache%202-blue.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
[![Sentry Status](https://img.shields.io/badge/sentry-sign_up-white.svg?logo=sentry&style=social)](https://docs.sentry.io)

# Sentry Airflow Plugin

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,3 +1,4 @@
apache-airflow>=1.10.2
sentry-sdk>=0.5.4
celery>=4.3.0
blinker>=1.1
39 changes: 22 additions & 17 deletions sentry_plugin/hooks/sentry_hook.py
Expand Up @@ -6,13 +6,14 @@

from sentry_sdk import configure_scope, add_breadcrumb, init
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.integrations.flask import FlaskIntegration
from sqlalchemy import exc

SCOPE_TAGS = frozenset(("task_id", "dag_id", "execution_date", "operator"))


@provide_session
def get_task_instance_attr(self, task_id, attr, session=None):
def get_task_instance(task, execution_date, session=None):
"""
Retrieve attribute from task.
"""
Expand All @@ -22,17 +23,13 @@ def get_task_instance_attr(self, task_id, attr, session=None):
ti = (
session.query(TI)
.filter(
TI.dag_id == self.dag_id,
TI.task_id == task_id,
TI.execution_date == self.execution_date,
TI.dag_id == task.dag_id,
TI.task_id == task.task_id,
TI.execution_date == execution_date,
)
.all()
.first()
)
if ti:
attr = getattr(ti[0], attr)
else:
attr = None
return attr
return ti


def add_sentry(self, *args, **kwargs):
Expand All @@ -47,14 +44,18 @@ def add_sentry(self, *args, **kwargs):

original_pre_execute = self.task.pre_execute

def add_breadcrumbs(self, context):
for task in self.task.get_flat_relatives(upstream=True):
state = get_task_instance_attr(self, task.task_id, "state")
operation = task.__class__.__name__
task_copy = self.task
execution_date_copy = self.execution_date

def add_breadcrumbs(self=task_copy, context=None):
for task in task_copy.get_flat_relatives(upstream=True):
task_instance = get_task_instance(task, execution_date_copy)
operator = task.__class__.__name__
add_breadcrumb(
category="data",
message="Upstream Task: {}, State: {}, Operation: {}".format(
task.task_id, state, operation
category="upstream_tasks",
message="Upstream Task: {ti.dag_id}.{ti.task_id}, "
"Execution: {ti.execution_date}, State:[{ti.state}], Operation: {operator}".format(
ti=task_instance, operator=operator
),
level="info",
)
Expand All @@ -73,6 +74,8 @@ def __init__(self, sentry_conn_id=None):
ignore_logger("airflow.task")
executor_name = configuration.conf.get("core", "EXECUTOR")

sentry_flask = FlaskIntegration()

if executor_name == "CeleryExecutor":
from sentry_sdk.integrations.celery import CeleryIntegration

Expand All @@ -87,6 +90,8 @@ def __init__(self, sentry_conn_id=None):
)
integrations = [sentry_logging]

integrations.append(sentry_flask)

self.conn_id = None
self.dsn = None

Expand Down
31 changes: 17 additions & 14 deletions tests/test_sentry_hook.py
@@ -1,3 +1,4 @@
import copy
import datetime
import unittest
from unittest import mock
Expand All @@ -11,7 +12,7 @@

from sentry_sdk import configure_scope

from sentry_plugin.hooks.sentry_hook import SentryHook, get_task_instance_attr
from sentry_plugin.hooks.sentry_hook import SentryHook, get_task_instance

EXECUTION_DATE = timezone.utcnow()
DAG_ID = "test_dag"
Expand All @@ -25,19 +26,20 @@
"ds": EXECUTION_DATE.strftime("%Y-%m-%d"),
"operator": OPERATOR,
}
TASK_REPR = "Upstream Task: {}.{}, Execution: {}, State:[{}], Operation: {}".format(
DAG_ID, TASK_ID, EXECUTION_DATE, STATE, OPERATOR
)
CRUMB_DATE = datetime.datetime(2019, 5, 15)
CRUMB = {
"timestamp": CRUMB_DATE,
"type": "default",
"category": "data",
"message": "Upstream Task: {}, State: {}, Operation: {}".format(
TASK_ID, STATE, OPERATOR
),
"category": "upstream_tasks",
"message": TASK_REPR,
"level": "info",
}


class MockQuery(object):
class MockQuery:
"""
Mock Query for when session is called.
"""
Expand All @@ -52,6 +54,9 @@ def filter(self, *args, **kwargs):
def all(self):
return self.arr

def first(self):
return self.arr[0]

def delete(self):
pass

Expand Down Expand Up @@ -82,22 +87,20 @@ def test_add_sentry(self):
for key, value in scope._tags.items():
self.assertEqual(TEST_SCOPE[key], value)

def test_get_task_instance_attr(self):
def test_get_task_instance(self):
"""
Test getting object attributes.
Test adding tags.
"""

state = get_task_instance_attr(self.ti, TASK_ID, "state", self.session)
operator = get_task_instance_attr(self.ti, TASK_ID, "operator", self.session)
self.assertEqual(state, STATE)
self.assertEqual(operator, OPERATOR)
ti = get_task_instance(self.task, EXECUTION_DATE, self.session)
self.assertEqual(ti, self.ti)

@freeze_time(CRUMB_DATE.isoformat())
def test_pre_execute(self):
"""
Test adding breadcrumbs.
"""
self.task.pre_execute(self.ti, context=None)
task_copy = copy.copy(self.task)
task_copy.pre_execute(self.ti, context=None)
self.task.get_flat_relatives.assert_called_once()

with configure_scope() as scope:
Expand Down

0 comments on commit d4c7f11

Please sign in to comment.