Skip to content

Commit

Permalink
Add snowflake to slack operator (#9023)
Browse files Browse the repository at this point in the history
Addition of a new `snowflake_to_slack` operator. The operator
allows you to run a SQL statement against Snowflake and render
the results into a Slack message.

* Add snowflake_to_slack operator to example dag
* Add type hints to operator and clean example dag
  • Loading branch information
simond committed Jun 3, 2020
1 parent 2b45d8f commit 1c9374d
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 12 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ opsgenie http
postgres amazon
sftp ssh
slack http
snowflake slack
========================== ===========================

.. END PACKAGE DEPENDENCIES HERE
Expand Down
3 changes: 3 additions & 0 deletions airflow/providers/dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,8 @@
],
"slack": [
"http"
],
"snowflake": [
"slack"
]
}
32 changes: 21 additions & 11 deletions airflow/providers/snowflake/example_dags/example_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,24 @@
from airflow import DAG
from airflow.providers.snowflake.operators.s3_to_snowflake import S3ToSnowflakeTransferOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.snowflake.operators.snowflake_to_slack import SnowflakeToSlackOperator
from airflow.utils.dates import days_ago

# Connection ids and Snowflake object names used by the example DAG. Every value can be overridden
# through the environment variable named in the lookup; the second argument is the fallback.
SNOWFLAKE_CONN_ID = os.getenv('SNOWFLAKE_CONN_ID', 'snowflake_default')
SLACK_CONN_ID = os.getenv('SLACK_CONN_ID', 'slack_default')
# TODO: should be able to rely on connection's schema, but currently param required by S3ToSnowflakeTransfer
SNOWFLAKE_SCHEMA = os.getenv('SNOWFLAKE_SCHEMA', 'public')
SNOWFLAKE_STAGE = os.getenv('SNOWFLAKE_STAGE', 'airflow')
SNOWFLAKE_SAMPLE_TABLE = os.getenv('SNOWFLAKE_SAMPLE_TABLE', 'snowflake_sample_data.tpch_sf001.orders')
SNOWFLAKE_LOAD_TABLE = os.getenv('SNOWFLAKE_LOAD_TABLE', 'airflow_example')
SNOWFLAKE_LOAD_JSON_PATH = os.getenv('SNOWFLAKE_LOAD_PATH', 'example.json')

# SQL statements and the Slack message template, built once from the names above.
SNOWFLAKE_SELECT_SQL = "SELECT * FROM {} LIMIT 100;".format(SNOWFLAKE_SAMPLE_TABLE)
SNOWFLAKE_SLACK_SQL = "SELECT O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS FROM {} LIMIT 10;".format(
    SNOWFLAKE_SAMPLE_TABLE
)
# The double braces are literal: this is a JINJA template, not a Python format string.
SNOWFLAKE_SLACK_MESSAGE = (
    "Results in an ASCII table:\n"
    "```{{ results_df | tabulate(tablefmt='pretty', headers='keys') }}```"
)
SNOWFLAKE_CREATE_TABLE_SQL = "CREATE TRANSIENT TABLE IF NOT EXISTS {}(data VARIANT);".format(
    SNOWFLAKE_LOAD_TABLE
)

default_args = {
'owner': 'airflow',
'start_date': days_ago(2),
Expand All @@ -47,22 +55,23 @@
select = SnowflakeOperator(
task_id='select',
snowflake_conn_id=SNOWFLAKE_CONN_ID,
sql="""
SELECT *
FROM {0}
LIMIT 100;
""".format(SNOWFLAKE_SAMPLE_TABLE),
sql=SNOWFLAKE_SELECT_SQL,
dag=dag,
)

slack_report = SnowflakeToSlackOperator(
task_id="slack_report",
sql=SNOWFLAKE_SLACK_SQL,
slack_message=SNOWFLAKE_SLACK_MESSAGE,
snowflake_conn_id=SNOWFLAKE_CONN_ID,
slack_conn_id=SLACK_CONN_ID,
dag=dag
)

create_table = SnowflakeOperator(
task_id='create_table',
snowflake_conn_id='snowflake_conn_id',
sql="""
CREATE TRANSIENT TABLE IF NOT EXISTS {0} (
data VARIANT
);
""".format(SNOWFLAKE_LOAD_TABLE),
snowflake_conn_id=SNOWFLAKE_CONN_ID,
sql=SNOWFLAKE_CREATE_TABLE_SQL,
schema=SNOWFLAKE_SCHEMA,
dag=dag,
)
Expand All @@ -78,4 +87,5 @@
dag=dag,
)

select >> slack_report
create_table >> copy_into_table
152 changes: 152 additions & 0 deletions airflow/providers/snowflake/operators/snowflake_to_slack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Iterable, Mapping, Optional, Union

from pandas import DataFrame
from tabulate import tabulate

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.utils.decorators import apply_defaults


class SnowflakeToSlackOperator(BaseOperator):
    """
    Executes an SQL statement in Snowflake and sends the results to Slack. The results of the query are
    rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{
    results_df }}'. The 'results_df' variable name can be changed by specifying a different
    'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to
    allow the dataframe to be rendered nicely. For example, set 'slack_message' to {{ results_df |
    tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an ascii rendered table.

    :param sql: The SQL statement to execute on Snowflake (templated)
    :type sql: str
    :param slack_message: The templated Slack message to send with the data returned from Snowflake.
        You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the
        SQL results
    :type slack_message: str
    :param snowflake_conn_id: The Snowflake connection id
    :type snowflake_conn_id: str
    :param slack_conn_id: The connection id for Slack
    :type slack_conn_id: str
    :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df'
    :type results_df_name: str
    :param parameters: The parameters to pass to the SQL query
    :type parameters: Optional[Union[Iterable, Mapping]]
    :param warehouse: The Snowflake virtual warehouse to use to run the SQL query
    :type warehouse: Optional[str]
    :param database: The Snowflake database to use for the SQL query
    :type database: Optional[str]
    :param schema: The schema to run the SQL against in Snowflake
    :type schema: Optional[str]
    :param role: The role to use when connecting to Snowflake
    :type role: Optional[str]
    :param slack_token: The token to use to authenticate to Slack. If this is not provided, the
        'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id
    :type slack_token: Optional[str]
    """
    template_fields = ['sql', 'slack_message']
    template_ext = ['.sql', '.jinja', '.j2']
    # Counts calls to render_template_fields; the first call deliberately skips 'slack_message'.
    times_rendered = 0

    @apply_defaults
    def __init__(  # pylint: disable=too-many-arguments
        self,
        sql: str,
        slack_message: str,
        snowflake_conn_id: str = 'snowflake_default',
        slack_conn_id: str = 'slack_default',
        results_df_name: str = 'results_df',
        parameters: Optional[Union[Iterable, Mapping]] = None,
        warehouse: Optional[str] = None,
        database: Optional[str] = None,
        schema: Optional[str] = None,
        role: Optional[str] = None,
        slack_token: Optional[str] = None,
        *args, **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)

        self.snowflake_conn_id = snowflake_conn_id
        self.sql = sql
        self.parameters = parameters
        self.warehouse = warehouse
        self.database = database
        self.schema = schema
        self.role = role
        self.slack_conn_id = slack_conn_id
        self.slack_token = slack_token
        self.slack_message = slack_message
        self.results_df_name = results_df_name

    @staticmethod
    def _validate_text_param(name: str, value) -> None:
        """
        Raise AirflowException unless ``value`` is a non-blank string.

        ``None`` and blank strings are reported as "missing"; any other non-string value is
        reported as having the wrong type.

        :param name: The operator parameter name used in the error message
        :param value: The parameter value to check
        """
        # Check for None first: it is not a str, so testing the type first would hide the
        # more helpful "missing" message.
        if value is None:
            raise AirflowException("Expected '{}' parameter is missing.".format(name))
        if not isinstance(value, str):
            raise AirflowException("Expected '{}' parameter should be a string.".format(name))
        if value.strip() == "":
            raise AirflowException("Expected '{}' parameter is missing.".format(name))

    def _get_query_results(self) -> DataFrame:
        """Run ``self.sql`` (with ``self.parameters``) through the Snowflake hook and return the dataframe."""
        snowflake_hook = self._get_snowflake_hook()

        self.log.info('Running SQL query: %s', self.sql)
        return snowflake_hook.get_pandas_df(self.sql, parameters=self.parameters)

    def _render_and_send_slack_message(self, context, df) -> None:
        """
        Render the template fields with the query results in the context and send the Slack message.

        :param context: The task context; the dataframe is added to it under ``results_df_name``
        :param df: The query results to expose to the ``slack_message`` template
        """
        # Put the dataframe into the context and render the JINJA template fields
        context[self.results_df_name] = df
        self.render_template_fields(context)

        # The hook must be built after rendering: it captures self.slack_message at construction.
        slack_hook = self._get_slack_hook()
        self.log.info('Sending slack message: %s', self.slack_message)
        slack_hook.execute()

    def _get_snowflake_hook(self) -> SnowflakeHook:
        """Build a SnowflakeHook from this operator's connection id, warehouse, database, role and schema."""
        return SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id,
                             warehouse=self.warehouse, database=self.database,
                             role=self.role, schema=self.schema)

    def _get_slack_hook(self) -> SlackWebhookHook:
        """Build a SlackWebhookHook carrying the current ``slack_message`` and ``slack_token``."""
        return SlackWebhookHook(http_conn_id=self.slack_conn_id, message=self.slack_message,
                                webhook_token=self.slack_token)

    def render_template_fields(self, context, jinja_env=None) -> None:
        """
        Render the template fields, skipping ``slack_message`` on the first call.

        :param context: The context to render the templates with
        :param jinja_env: The JINJA environment to use; when falsy, ``get_template_env()`` supplies one
        """
        # If this is the first render of the template fields, exclude slack_message from rendering since
        # the snowflake results haven't been retrieved yet. Later calls render every template field.
        if self.times_rendered == 0:
            fields_to_render: Iterable[str] = [
                field for field in self.template_fields if field != 'slack_message'
            ]
        else:
            fields_to_render = self.template_fields

        if not jinja_env:
            jinja_env = self.get_template_env()

        # Add the tabulate library into the JINJA environment
        jinja_env.filters['tabulate'] = tabulate

        self._do_render_template_fields(self, fields_to_render, context, jinja_env, set())
        self.times_rendered += 1

    def execute(self, context) -> None:
        """
        Validate the parameters, query Snowflake and send the rendered message to Slack.

        :param context: The task context, passed on to template rendering
        :raises AirflowException: If ``sql`` or ``slack_message`` is None, blank or not a string
        """
        self._validate_text_param('sql', self.sql)
        self._validate_text_param('slack_message', self.slack_message)

        df = self._get_query_results()
        self._render_and_send_slack_message(context, df)

        self.log.debug('Finished sending Snowflake data to Slack')
3 changes: 2 additions & 1 deletion docs/operators-and-hooks-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,8 @@ These integrations allow you to perform various operations within various servic
* - `Snowflake <https://www.snowflake.com/>`__
-
- :mod:`airflow.providers.snowflake.hooks.snowflake`
- :mod:`airflow.providers.snowflake.operators.snowflake`
- :mod:`airflow.providers.snowflake.operators.snowflake`,
:mod:`airflow.providers.snowflake.operators.snowflake_to_slack`
-

* - `Vertica <https://www.vertica.com/>`__
Expand Down
80 changes: 80 additions & 0 deletions tests/providers/snowflake/operators/test_snowflake_to_slack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
from unittest import mock

from airflow.models import DAG
from airflow.providers.snowflake.operators.snowflake_to_slack import SnowflakeToSlackOperator
from airflow.utils import timezone

# Passed as the task_id of the operator under test (despite the DAG-flavoured name).
TEST_DAG_ID = 'snowflake_to_slack_unit_test'
# Fixed date used as the DAG start_date and as both ends of the run window, so '{{ ds }}' is stable.
DEFAULT_DATE = timezone.datetime(2017, 1, 1)


class TestSnowflakeToSlackOperator(unittest.TestCase):
    """Checks that SnowflakeToSlackOperator wires both hooks correctly and renders its templates."""

    def setUp(self):
        self.example_dag = DAG('unit_test_dag_snowflake_to_slack', start_date=DEFAULT_DATE)

    @staticmethod
    def _construct_operator(**kwargs):
        """Create the operator under test with the shared task id plus the given keyword arguments."""
        return SnowflakeToSlackOperator(task_id=TEST_DAG_ID, **kwargs)

    @mock.patch('airflow.providers.snowflake.operators.snowflake_to_slack.SnowflakeHook')
    @mock.patch('airflow.providers.snowflake.operators.snowflake_to_slack.SlackWebhookHook')
    def test_hooks_and_rendering(self, mock_slack_hook_class, mock_snowflake_hook_class):
        # Patches are applied bottom-up, so the Slack mock is the first injected argument.
        task = self._construct_operator(
            snowflake_conn_id='snowflake_connection',
            slack_conn_id='slack_connection',
            sql="sql {{ ds }}",
            results_df_name='xxxx',
            warehouse='test_warehouse',
            database='test_database',
            role='test_role',
            schema='test_schema',
            parameters=['1', '2', '3'],
            slack_message='message: {{ ds }}, {{ xxxx }}',
            slack_token='test_token',
            dag=self.example_dag,
        )

        # The query "result" is a plain string so the rendered message is easy to assert on.
        snowflake_hook_instance = mock_snowflake_hook_class.return_value
        snowflake_hook_instance.get_pandas_df.return_value = '1234'
        slack_hook_instance = mock_slack_hook_class.return_value

        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

        # The Snowflake hook must be created exactly once with the operator's connection settings
        mock_snowflake_hook_class.assert_called_once_with(
            snowflake_conn_id='snowflake_connection',
            warehouse='test_warehouse',
            database='test_database',
            role='test_role',
            schema='test_schema',
        )

        # The query must run once, with the rendered sql and the untouched parameters
        snowflake_hook_instance.get_pandas_df.assert_called_once_with(
            'sql 2017-01-01', parameters=['1', '2', '3']
        )

        # The Slack hook must be created exactly once with the fully rendered message
        mock_slack_hook_class.assert_called_once_with(
            http_conn_id='slack_connection',
            message='message: 2017-01-01, 1234',
            webhook_token='test_token',
        )

        # The message must be sent exactly once
        slack_hook_instance.execute.assert_called_once()

0 comments on commit 1c9374d

Please sign in to comment.