Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[AIRFLOW-1932] Add GCP Pub/Sub Pull and Ack
Adds the necessary hooks to support pulling and acknowledging Pub/Sub messages. This is implemented by adding a PubSubPullSensor operator that will attempt to retrieve messages from a specified subscription and will meet its criteria when one or more messages are available. The configuration allows those messages to be acknowledged immediately. In addition, the messages are passed to downstream workers via the return value of the operator's execute method. An end-to-end example is included showing topic and subscription creation, parallel tasks to publish and pull messages, and a downstream chain to echo the contents of each message before cleaning up. Closes #2885 from prodonjs/airflow-1932-pr
- Loading branch information
1 parent
33b3f6d
commit 6645218
Showing
8 changed files
with
449 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
""" | ||
This example DAG demonstrates how the PubSub*Operators and PubSubPullSensor | ||
can be used to trigger dependent tasks upon receipt of a Pub/Sub message. | ||
NOTE: project_id must be updated to a GCP project ID accessible with the | ||
Google Default Credentials on the machine running the workflow | ||
""" | ||
from __future__ import unicode_literals | ||
from base64 import b64encode | ||
|
||
import datetime | ||
|
||
from airflow import DAG | ||
from airflow.operators.bash_operator import BashOperator | ||
from airflow.contrib.operators.pubsub_operator import ( | ||
PubSubTopicCreateOperator, PubSubSubscriptionCreateOperator, | ||
PubSubPublishOperator, PubSubTopicDeleteOperator, | ||
PubSubSubscriptionDeleteOperator | ||
) | ||
from airflow.contrib.sensors.pubsub_sensor import PubSubPullSensor | ||
from airflow.utils import dates | ||
|
||
project = 'your-project-id'  # Change this to your own GCP project_id
topic = 'example-topic'  # Cloud Pub/Sub topic
subscription = 'subscription-to-example-topic'  # Cloud Pub/Sub subscription
# Sample messages to push/pull.
# The ``data`` field carries the base64-encoded payload. b64encode() returns
# bytes, which cannot be JSON-serialized under Python 3, so the encoded value
# is decoded to ASCII text (base64 output is always plain ASCII).
messages = [
    {'data': b64encode(b'Hello World').decode('ascii')},
    {'data': b64encode(b'Another message').decode('ascii')},
    {'data': b64encode(b'A final message').decode('ascii')},
]
|
||
# Arguments handed to the DAG as ``default_args``. Besides the usual owner,
# scheduling and e-mail settings, the Pub/Sub identifiers (project, topic,
# subscription) are included here; the tasks below are created without
# passing them explicitly, so they are presumably filled in from these
# defaults -- confirm against the operators' ``apply_defaults`` handling.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=dates.days_ago(2),
    email=['airflow@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    project=project,
    topic=topic,
    subscription=subscription,
)
|
||
|
||
# Jinja-templated shell script used as the ``bash_command`` of the echo task.
# It loops over the value pulled from XCom for the 'pull-messages' task and
# echoes the 'ackId' and 'message' entries of each element.
echo_template = '''
{% for m in task_instance.xcom_pull(task_ids='pull-messages') %}
echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
'''
|
||
# End-to-end example: create a topic and a subscription, then publish and
# pull messages on two branches that both start after the subscription
# exists, echo what was pulled, and finally delete the subscription and topic.
with DAG('pubsub-end-to-end', default_args=default_args,
         schedule_interval=datetime.timedelta(days=1)) as dag:
    t1 = PubSubTopicCreateOperator(task_id='create-topic')
    t2 = PubSubSubscriptionCreateOperator(
        task_id='create-subscription',
        topic_project=project,
        subscription=subscription,
    )
    t3 = PubSubPublishOperator(task_id='publish-messages', messages=messages)
    t4 = PubSubPullSensor(task_id='pull-messages', ack_messages=True)
    t5 = BashOperator(
        task_id='echo-pulled-messages', bash_command=echo_template)
    t6 = PubSubSubscriptionDeleteOperator(task_id='delete-subscription')
    t7 = PubSubTopicDeleteOperator(task_id='delete-topic')

    # Publishing branch: topic -> subscription -> publish.
    t1.set_downstream(t2)
    t2.set_downstream(t3)

    # Pulling branch: subscription -> pull -> echo -> cleanup.
    t2.set_downstream(t4)
    t4.set_downstream(t5)
    t5.set_downstream(t6)
    t6.set_downstream(t7)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook | ||
from airflow.operators.sensors import BaseSensorOperator | ||
from airflow.utils.decorators import apply_defaults | ||
|
||
|
||
class PubSubPullSensor(BaseSensorOperator):
    """Pulls messages from a PubSub subscription and passes them through XCom.

    This sensor operator will pull up to ``max_messages`` messages from the
    specified PubSub subscription. When the subscription returns messages,
    the poke method's criteria will be fulfilled and the messages will be
    returned from the operator and passed through XCom for downstream tasks.

    If ``ack_messages`` is set to True, messages will be immediately
    acknowledged before being returned, otherwise, downstream tasks will be
    responsible for acknowledging them.

    ``project`` and ``subscription`` are templated so you can use
    variables in them.
    """
    template_fields = ['project', 'subscription']
    ui_color = '#ff7f50'

    @apply_defaults
    def __init__(
            self,
            project,
            subscription,
            max_messages=5,
            return_immediately=False,
            ack_messages=False,
            gcp_conn_id='google_cloud_default',
            delegate_to=None,
            *args,
            **kwargs):
        """
        :param project: the GCP project ID for the subscription (templated)
        :type project: string
        :param subscription: the Pub/Sub subscription name. Do not include the
            full subscription path.
        :type subscription: string
        :param max_messages: The maximum number of messages to retrieve per
            PubSub pull request
        :type max_messages: int
        :param return_immediately: If True, instruct the PubSub API to return
            immediately if no messages are available for delivery.
        :type return_immediately: bool
        :param ack_messages: If True, each message will be acknowledged
            immediately rather than by any downstream tasks
        :type ack_messages: bool
        :param gcp_conn_id: The connection ID to use connecting to
            Google Cloud Platform.
        :type gcp_conn_id: string
        :param delegate_to: The account to impersonate, if any.
            For this to work, the service account making the request
            must have domain-wide delegation enabled.
        :type delegate_to: string
        """
        super(PubSubPullSensor, self).__init__(*args, **kwargs)

        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.project = project
        self.subscription = subscription
        self.max_messages = max_messages
        self.return_immediately = return_immediately
        self.ack_messages = ack_messages

        # Result of the most recent pull; set by poke(), returned by execute().
        self._messages = None

    def execute(self, context):
        """Run the sensor, then return the messages stored by ``poke``.

        Overridden to allow messages to be passed: the value returned here
        is whatever the last ``poke`` call stored in ``self._messages``.
        """
        super(PubSubPullSensor, self).execute(context)
        return self._messages

    def poke(self, context):
        """Pull from the subscription and optionally acknowledge the result.

        :param context: the task execution context (not used here)
        :return: the pulled messages; a falsy value (no messages) means the
            sensor criteria have not been met yet
        """
        hook = PubSubHook(gcp_conn_id=self.gcp_conn_id,
                          delegate_to=self.delegate_to)
        self._messages = hook.pull(
            self.project, self.subscription, self.max_messages,
            self.return_immediately)
        if self._messages and self.ack_messages:
            ack_ids = [m['ackId'] for m in self._messages if m.get('ackId')]
            # Skip the request when no message carried an ack ID: the
            # acknowledge call needs at least one ID to be meaningful.
            if ack_ids:
                hook.acknowledge(self.project, self.subscription, ack_ids)
        return self._messages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.