Skip to content

Commit

Permalink
[AIRFLOW-4543] Update slack operator to support slackclient v2 (#5519)
Browse files Browse the repository at this point in the history
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: Kaxil Naik <8811558+kaxil@users.noreply.github.com>
  • Loading branch information
3 people committed May 12, 2020
1 parent 01db738 commit 578fc51
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 110 deletions.
81 changes: 65 additions & 16 deletions airflow/providers/slack/hooks/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
# specific language governing permissions and limitations
# under the License.
"""Hook for Slack"""
from typing import Optional
from typing import Any, Optional

from slackclient import SlackClient
from slack import WebClient
from slack.errors import SlackClientError # pylint: disable=E0611

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
Expand All @@ -27,40 +28,88 @@
# noinspection PyAbstractClass
class SlackHook(BaseHook):
"""
Creates a Slack connection, to be used for calls.
Takes both Slack API token directly and connection that has Slack API token.
If both supplied, Slack API token will be used.
Exposes also the rest of slack.WebClient args.
Examples:
.. code-block:: python
# Create hook
slack_hook = SlackHook(token="xxx") # or slack_hook = SlackHook(slack_conn_id="slack")
# Call generic API with parameters (errors are handled by hook)
# For more details check https://api.slack.com/methods/chat.postMessage
slack_hook.call("chat.postMessage", json={"channel": "#random", "text": "Hello world!"})
# Call method from Slack SDK (you have to handle errors yourself)
# For more details check https://slack.dev/python-slackclient/basic_usage.html#sending-a-message
slack_hook.client.chat_postMessage(channel="#random", text="Hello world!")
:param token: Slack API token
:type token: str
:param slack_conn_id: connection that has Slack API token in the password field
:type slack_conn_id: str
:param use_session: A boolean specifying if the client should take advantage of
connection pooling. Default is True.
:type use_session: bool
:param base_url: A string representing the Slack API base URL. Default is
``https://www.slack.com/api/``
:type base_url: str
:param timeout: The maximum number of seconds the client will wait
to connect and receive a response from Slack. Default is 30 seconds.
:type timeout: int
"""
def __init__(self, token: Optional[str] = None, slack_conn_id: Optional[str] = None) -> None:

def __init__(
self,
token: Optional[str] = None,
slack_conn_id: Optional[str] = None,
**client_args: Any,
) -> None:
super().__init__()
self.token = self.__get_token(token, slack_conn_id)
self.client = WebClient(token, **client_args)

def __get_token(self, token, slack_conn_id):
if token is not None:
return token
elif slack_conn_id is not None:

if slack_conn_id is not None:
conn = self.get_connection(slack_conn_id)

if not getattr(conn, 'password', None):
raise AirflowException('Missing token(password) in Slack connection')
return conn.password
else:
raise AirflowException('Cannot get token: '
'No valid Slack token nor slack_conn_id supplied.')

def call(self, method: str, api_params: dict) -> None:
raise AirflowException('Cannot get token: '
'No valid Slack token nor slack_conn_id supplied.')

def call(self, *args, **kwargs) -> None:
"""
Calls the Slack client.
Calls Slack WebClient `WebClient.api_call` with given arguments.
:param method: method
:param api_params: parameters of the API
:param api_method: The target Slack API method. e.g. 'chat.postMessage'. Required.
:type api_method: str
:param http_verb: HTTP Verb. Optional (defaults to 'POST')
:type http_verb: str
:param files: Files to multipart upload. e.g. {imageORfile: file_objectORfile_path}
:type files: dict
:param data: The body to attach to the request. If a dictionary is provided,
form-encoding will take place. Optional.
:type data: dict or aiohttp.FormData
:param params: The URL parameters to append to the URL. Optional.
:type params: dict
:param json: JSON for the body to attach to the request. Optional.
:type json: dict
"""
slack_client = SlackClient(self.token)
return_code = slack_client.api_call(method, **api_params)

if not return_code['ok']:
msg = "Slack API call failed ({})".format(return_code['error'])
return_code = self.client.api_call(*args, **kwargs)

try:
return_code.validate()
except SlackClientError as exc:
msg = f"Slack API call failed ({exc})"
raise AirflowException(msg)
35 changes: 23 additions & 12 deletions airflow/providers/slack/operators/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import json
from typing import Dict, List, Optional

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.utils.decorators import apply_defaults
Expand All @@ -30,14 +29,17 @@ class SlackAPIOperator(BaseOperator):
Base Slack Operator
The SlackAPIPostOperator is derived from this operator.
In the future additional Slack API Operators will be derived from this class as well
Only one of `slack_conn_id` and `token` is required.
:param slack_conn_id: Slack connection ID which its password is Slack API token
:param slack_conn_id: Slack connection ID which its password is Slack API token. Optional
:type slack_conn_id: str
:param token: Slack API token (https://api.slack.com/web)
:param token: Slack API token (https://api.slack.com/web). Optional
:type token: str
:param method: The Slack API Method to Call (https://api.slack.com/methods)
:param method: The Slack API Method to Call (https://api.slack.com/methods). Optional
:type method: str
:param api_params: API Method call parameters (https://api.slack.com/methods)
:param api_params: API Method call parameters (https://api.slack.com/methods). Optional
:type api_params: dict
:param client_args: Slack Hook parameters. Optional. Check airflow.providers.slack.hooks.SlackHook
:type api_params: dict
"""

Expand All @@ -50,12 +52,6 @@ def __init__(self,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)

if token is None and slack_conn_id is None:
raise AirflowException('No valid Slack token nor slack_conn_id supplied.')
if token is not None and slack_conn_id is not None:
raise AirflowException('Cannot determine Slack credential '
'when both token and slack_conn_id are supplied.')

self.token = token # type: Optional[str]
self.slack_conn_id = slack_conn_id # type: Optional[str]

Expand All @@ -73,6 +69,9 @@ def construct_api_call_params(self):
which sets self.api_call_params with a dict of
API call parameters (https://api.slack.com/methods)
"""
raise NotImplementedError(
"SlackAPIOperator should not be used directly. Chose one of the subclasses instead"
)

def execute(self, **kwargs):
"""
Expand All @@ -82,13 +81,25 @@ def execute(self, **kwargs):
if not self.api_params:
self.construct_api_call_params()
slack = SlackHook(token=self.token, slack_conn_id=self.slack_conn_id)
slack.call(self.method, self.api_params)
slack.call(self.method, json=self.api_params)


class SlackAPIPostOperator(SlackAPIOperator):
"""
Posts messages to a slack channel
Examples:
.. code-block:: python
slack = SlackAPIPostOperator(
task_id="post_hello",
dag=dag,
token="XXX",
text="hello there!",
channel="#random",
)
:param channel: channel in which to post message on slack name (#general) or
ID (C12318391). (templated)
:type channel: str
Expand Down
11 changes: 8 additions & 3 deletions requirements/requirements-python3.6.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Unidecode==1.1.1
WTForms==2.3.1
Werkzeug==0.16.1
adal==1.2.3
aiohttp==3.6.2
alabaster==0.7.12
alembic==1.4.2
amqp==2.5.2
Expand All @@ -45,6 +46,7 @@ argcomplete==1.11.1
asn1crypto==1.3.0
astroid==2.3.3
async-generator==1.10
async-timeout==3.0.1
atlasclient==1.0.0
attrs==19.3.0
aws-sam-translator==1.23.0
Expand Down Expand Up @@ -120,7 +122,7 @@ fastavro==0.23.3
filelock==3.0.12
fissix==19.2b1
flake8-colors==0.1.6
flake8==3.8.0
flake8==3.8.1
flaky==3.6.1
flask-swagger==0.2.13
flower==0.9.4
Expand Down Expand Up @@ -174,6 +176,7 @@ httplib2==0.17.3
humanize==0.5.1
hvac==0.10.1
identify==1.4.15
idna-ssl==1.1.0
idna==2.9
ijson==2.6.1
imagesize==1.2.0
Expand Down Expand Up @@ -216,6 +219,7 @@ msgpack==1.0.0
msrest==0.6.13
msrestazure==0.6.3
multi-key-dict==2.0.3
multidict==4.7.5
mypy-extensions==0.4.3
mypy==0.770
mysql-connector-python==8.0.18
Expand Down Expand Up @@ -314,10 +318,10 @@ setproctitle==1.1.10
sh==1.13.1
simple-salesforce==1.0.0
six==1.14.0
slackclient==1.3.2
slackclient==2.5.0
smmap==3.0.4
snowballstemmer==2.0.0
snowflake-connector-python==2.2.5
snowflake-connector-python==2.2.6
snowflake-sqlalchemy==1.2.3
sortedcontainers==2.1.0
soupsieve==2.0
Expand Down Expand Up @@ -369,6 +373,7 @@ wrapt==1.12.1
xmltodict==0.12.0
yamllint==1.23.0
yandexcloud==0.35.0
yarl==1.4.2
zdesk==2.7.1
zict==2.0.0
zipp==3.1.0
10 changes: 7 additions & 3 deletions requirements/requirements-python3.7.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Unidecode==1.1.1
WTForms==2.3.1
Werkzeug==0.16.1
adal==1.2.3
aiohttp==3.6.2
alabaster==0.7.12
alembic==1.4.2
amqp==2.5.2
Expand All @@ -45,6 +46,7 @@ argcomplete==1.11.1
asn1crypto==1.3.0
astroid==2.3.3
async-generator==1.10
async-timeout==3.0.1
atlasclient==1.0.0
attrs==19.3.0
aws-sam-translator==1.23.0
Expand Down Expand Up @@ -119,7 +121,7 @@ fastavro==0.23.3
filelock==3.0.12
fissix==19.2b1
flake8-colors==0.1.6
flake8==3.8.0
flake8==3.8.1
flaky==3.6.1
flask-swagger==0.2.13
flower==0.9.4
Expand Down Expand Up @@ -213,6 +215,7 @@ msgpack==1.0.0
msrest==0.6.13
msrestazure==0.6.3
multi-key-dict==2.0.3
multidict==4.7.5
mypy-extensions==0.4.3
mypy==0.770
mysql-connector-python==8.0.18
Expand Down Expand Up @@ -310,10 +313,10 @@ setproctitle==1.1.10
sh==1.13.1
simple-salesforce==1.0.0
six==1.14.0
slackclient==1.3.2
slackclient==2.5.0
smmap==3.0.4
snowballstemmer==2.0.0
snowflake-connector-python==2.2.5
snowflake-connector-python==2.2.6
snowflake-sqlalchemy==1.2.3
sortedcontainers==2.1.0
soupsieve==2.0
Expand Down Expand Up @@ -364,6 +367,7 @@ wrapt==1.12.1
xmltodict==0.12.0
yamllint==1.23.0
yandexcloud==0.35.0
yarl==1.4.2
zdesk==2.7.1
zict==2.0.0
zipp==3.1.0
2 changes: 1 addition & 1 deletion requirements/setup-3.6.md5
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4311e187f8fc829ca83a33356a9d1947 /opt/airflow/setup.py
22f14063a514a325525c530b7c30f562 /opt/airflow/setup.py
2 changes: 1 addition & 1 deletion requirements/setup-3.7.md5
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4311e187f8fc829ca83a33356a9d1947 /opt/airflow/setup.py
22f14063a514a325525c530b7c30f562 /opt/airflow/setup.py
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
]
singularity = ['spython>=0.0.56']
slack = [
'slackclient>=1.0.0,<2.0.0',
'slackclient>=2.0.0,<3.0.0',
]
snowflake = [
'snowflake-connector-python>=1.5.2',
Expand Down
Loading

0 comments on commit 578fc51

Please sign in to comment.