Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-XXX] Add How-To-Guide to GCP PubSub #401

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ It is required now to pass key-word only arguments to `PubSub` hook.
These changes are not backward compatible.

Affected components:
* airflow.gcp.hooks.pubsub.PubSubHook
* airflow.gcp.operators.pubsub.PubSubTopicCreateOperator
* airflow.gcp.operators.pubsub.PubSubSubscriptionCreateOperator
* airflow.gcp.operators.pubsub.PubSubTopicDeleteOperator
* airflow.gcp.operators.pubsub.PubSubSubscriptionDeleteOperator
* airflow.gcp.operators.pubsub.PubSubPublishOperator
* airflow.gcp.sensors.pubsub.PubSubPullSensor
* airflow.providers.google.cloud.hooks.pubsub.PubSubHook
* airflow.providers.google.cloud.operators.pubsub.PubSubTopicCreateOperator
* airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionCreateOperator
* airflow.providers.google.cloud.operators.pubsub.PubSubTopicDeleteOperator
* airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionDeleteOperator
* airflow.providers.google.cloud.operators.pubsub.PubSubPublishOperator
* airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor

### Changes to `aws_default` Connection's default region

Expand Down Expand Up @@ -225,7 +225,7 @@ The following table shows changes in import paths.
|airflow.contrib.hooks.gcp_kms_hook.GoogleCloudKMSHook |airflow.gcp.hooks.kms.GoogleCloudKMSHook |
|airflow.contrib.hooks.gcp_mlengine_hook.MLEngineHook |airflow.gcp.hooks.mlengine.MLEngineHook |
|airflow.contrib.hooks.gcp_natural_language_hook.CloudNaturalLanguageHook |airflow.providers.google.cloud.hooks.natural_language.CloudNaturalLanguageHook |
|airflow.contrib.hooks.gcp_pubsub_hook.PubSubHook |airflow.gcp.hooks.pubsub.PubSubHook |
|airflow.contrib.hooks.gcp_pubsub_hook.PubSubHook |airflow.providers.google.cloud.hooks.pubsub.PubSubHook |
|airflow.contrib.hooks.gcp_speech_to_text_hook.GCPSpeechToTextHook |airflow.gcp.hooks.speech_to_text.CloudSpeechToTextHook |
|airflow.contrib.hooks.gcp_spanner_hook.CloudSpannerHook |airflow.gcp.hooks.spanner.SpannerHook |
|airflow.contrib.hooks.gcp_speech_to_text_hook.GCPSpeechToTextHook |airflow.gcp.hooks.speech_to_text.GCPSpeechToTextHook |
Expand Down Expand Up @@ -377,19 +377,19 @@ The following table shows changes in import paths.
|airflow.contrib.operators.mssql_to_gcs.MsSqlToGoogleCloudStorageOperator |airflow.operators.mssql_to_gcs.MsSqlToGoogleCloudStorageOperator |
|airflow.contrib.operators.mysql_to_gcs.MySqlToGoogleCloudStorageOperator |airflow.operators.mysql_to_gcs.MySqlToGoogleCloudStorageOperator |
|airflow.contrib.operators.postgres_to_gcs_operator.PostgresToGoogleCloudStorageOperator |airflow.operators.postgres_to_gcs.PostgresToGoogleCloudStorageOperator |
|airflow.contrib.operators.pubsub_operator.PubSubPublishOperator |airflow.gcp.operators.pubsub.PubSubPublishOperator |
|airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator |airflow.gcp.operators.pubsub.PubSubSubscriptionCreateOperator |
|airflow.contrib.operators.pubsub_operator.PubSubSubscriptionDeleteOperator |airflow.gcp.operators.pubsub.PubSubSubscriptionDeleteOperator |
|airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator |airflow.gcp.operators.pubsub.PubSubTopicCreateOperator |
|airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator |airflow.gcp.operators.pubsub.PubSubTopicDeleteOperator |
|airflow.contrib.operators.pubsub_operator.PubSubPublishOperator |airflow.providers.google.cloud.operators.pubsub.PubSubPublishOperator |
|airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator |airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionCreateOperator |
|airflow.contrib.operators.pubsub_operator.PubSubSubscriptionDeleteOperator |airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionDeleteOperator |
|airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator |airflow.providers.google.cloud.operators.pubsub.PubSubTopicCreateOperator |
|airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator |airflow.providers.google.cloud.operators.pubsub.PubSubTopicDeleteOperator |
|airflow.contrib.operators.sql_to_gcs.BaseSQLToGoogleCloudStorageOperator |airflow.operators.sql_to_gcs.BaseSQLToGoogleCloudStorageOperator |
|airflow.contrib.sensors.bigquery_sensor.BigQueryTableSensor |airflow.gcp.sensors.bigquery.BigQueryTableSensor |
|airflow.contrib.sensors.gcp_transfer_sensor.GCPTransferServiceWaitForJobStatusSensor |airflow.gcp.sensors.cloud_storage_transfer_service.GCPTransferServiceWaitForJobStatusSensor |
|airflow.contrib.sensors.gcs_sensor.GoogleCloudStorageObjectSensor |airflow.gcp.sensors.gcs.GoogleCloudStorageObjectSensor |
|airflow.contrib.sensors.gcs_sensor.GoogleCloudStorageObjectUpdatedSensor |airflow.gcp.sensors.gcs.GoogleCloudStorageObjectUpdatedSensor |
|airflow.contrib.sensors.gcs_sensor.GoogleCloudStoragePrefixSensor |airflow.gcp.sensors.gcs.GoogleCloudStoragePrefixSensor |
|airflow.contrib.sensors.gcs_sensor.GoogleCloudStorageUploadSessionCompleteSensor |airflow.gcp.sensors.gcs.GoogleCloudStorageUploadSessionCompleteSensor |
|airflow.contrib.sensors.pubsub_sensor.PubSubPullSensor |airflow.gcp.sensors.pubsub.PubSubPullSensor |
|airflow.contrib.sensors.pubsub_sensor.PubSubPullSensor |airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor |


### Remove provide_context
Expand Down
6 changes: 3 additions & 3 deletions airflow/contrib/hooks/gcp_pubsub_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module is deprecated. Please use `airflow.gcp.hooks.pubsub`."""
"""This module is deprecated. Please use `airflow.providers.google.cloud.hooks.pubsub`."""

import warnings

# pylint: disable=unused-import
from airflow.gcp.hooks.pubsub import PubSubException, PubSubHook # noqa
from airflow.providers.google.cloud.hooks.pubsub import PubSubException, PubSubHook # noqa

warnings.warn(
"This module is deprecated. Please use `airflow.gcp.hooks.pubsub`.",
"This module is deprecated. Please use `airflow.providers.google.cloud.hooks.pubsub`.",
DeprecationWarning, stacklevel=2
)
6 changes: 3 additions & 3 deletions airflow/contrib/operators/pubsub_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module is deprecated. Please use `airflow.gcp.operators.pubsub`."""
"""This module is deprecated. Please use `airflow.providers.google.cloud.operators.pubsub`."""

import warnings

# pylint: disable=unused-import
from airflow.gcp.operators.pubsub import ( # noqa
from airflow.providers.google.cloud.operators.pubsub import ( # noqa
PubSubPublishOperator, PubSubSubscriptionCreateOperator, PubSubSubscriptionDeleteOperator,
PubSubTopicCreateOperator, PubSubTopicDeleteOperator,
)

warnings.warn(
"This module is deprecated. Please use `airflow.gcp.operators.pubsub`.",
"This module is deprecated. Please use `airflow.providers.google.cloud.operators.pubsub`.",
DeprecationWarning, stacklevel=2
)
6 changes: 3 additions & 3 deletions airflow/contrib/sensors/pubsub_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module is deprecated. Please use `airflow.gcp.sensors.pubsub`."""
"""This module is deprecated. Please use `airflow.providers.google.cloud.sensors.pubsub`."""

import warnings

# pylint: disable=unused-import
from airflow.gcp.sensors.pubsub import PubSubPullSensor # noqa
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor # noqa

warnings.warn(
"This module is deprecated. Please use `airflow.gcp.sensors.pubsub`.",
"This module is deprecated. Please use `airflow.providers.google.cloud.sensors.pubsub`.",
DeprecationWarning, stacklevel=2
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,45 @@

import airflow
from airflow import models
from airflow.gcp.operators.pubsub import (
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.operators.pubsub import (
PubSubPublishOperator, PubSubSubscriptionCreateOperator, PubSubSubscriptionDeleteOperator,
PubSubTopicCreateOperator, PubSubTopicDeleteOperator,
)
from airflow.gcp.sensors.pubsub import PubSubPullSensor
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
TOPIC = "PubSubTestTopic"
MESSAGE = {"data": b"Tool", "attributes": {"name": "wrench", "mass": "1.3kg", "count": "3"}}

default_args = {"start_date": airflow.utils.dates.days_ago(1)}

# [START howto_operator_gcp_pubsub_pull_messages_result_cmd]
echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""
# [END howto_operator_gcp_pubsub_pull_messages_result_cmd]

with models.DAG(
"example_gcp_pubsub",
default_args=default_args,
schedule_interval=None, # Override to match your needs
) as example_dag:
# [START howto_operator_gcp_pubsub_create_topic]
create_topic = PubSubTopicCreateOperator(
task_id="create_topic", topic=TOPIC, project_id=GCP_PROJECT_ID
)
# [END howto_operator_gcp_pubsub_create_topic]

# [START howto_operator_gcp_pubsub_create_subscription]
subscribe_task = PubSubSubscriptionCreateOperator(
task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC
)
# [END howto_operator_gcp_pubsub_create_subscription]

# [START howto_operator_gcp_pubsub_pull_message]
subscription = "{{ task_instance.xcom_pull('subscribe_task') }}"

pull_messages = PubSubPullSensor(
Expand All @@ -63,26 +71,36 @@
project_id=GCP_PROJECT_ID,
subscription=subscription,
)
# [END howto_operator_gcp_pubsub_pull_message]

# [START howto_operator_gcp_pubsub_pull_messages_result]
pull_messages_result = BashOperator(
task_id="pull_messages_result", bash_command=echo_cmd
)
# [END howto_operator_gcp_pubsub_pull_messages_result]

# [START howto_operator_gcp_pubsub_publish]
publish_task = PubSubPublishOperator(
task_id="publish_task",
project_id=GCP_PROJECT_ID,
topic=TOPIC,
messages=[MESSAGE, MESSAGE, MESSAGE],
)
# [END howto_operator_gcp_pubsub_publish]

# [START howto_operator_gcp_pubsub_unsubscribe]
unsubscribe_task = PubSubSubscriptionDeleteOperator(
task_id="unsubscribe_task",
project_id=GCP_PROJECT_ID,
subscription="{{ task_instance.xcom_pull('subscribe_task') }}",
)
# [END howto_operator_gcp_pubsub_unsubscribe]

# [START howto_operator_gcp_pubsub_delete_topic]
delete_topic = PubSubTopicDeleteOperator(
task_id="delete_topic", topic=TOPIC, project_id=GCP_PROJECT_ID
)
# [END howto_operator_gcp_pubsub_delete_topic]

create_topic >> subscribe_task >> publish_task
subscribe_task >> pull_messages >> pull_messages_result >> unsubscribe_task >> delete_topic
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@
from google.api_core.retry import Retry
from google.cloud.pubsub_v1.types import Duration, MessageStoragePolicy, PushConfig

from airflow.gcp.hooks.pubsub import PubSubHook
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
from airflow.utils.decorators import apply_defaults


class PubSubTopicCreateOperator(BaseOperator):
"""Create a PubSub topic.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubTopicCreateOperator`

By default, if the topic already exists, this operator will
not cause the DAG to fail. ::

Expand Down Expand Up @@ -166,6 +170,10 @@ def execute(self, context):
class PubSubSubscriptionCreateOperator(BaseOperator):
"""Create a PubSub subscription.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubSubscriptionCreateOperator`

By default, the subscription will be created in ``topic_project``. If
``subscription_project`` is specified and the GCP credentials allow, the
Subscription can be created in a different project from its topic.
Expand Down Expand Up @@ -354,6 +362,10 @@ def execute(self, context):
class PubSubTopicDeleteOperator(BaseOperator):
"""Delete a PubSub topic.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubTopicDeleteOperator`

By default, if the topic does not exist, this operator will
not cause the DAG to fail. ::

Expand Down Expand Up @@ -462,6 +474,10 @@ def execute(self, context):
class PubSubSubscriptionDeleteOperator(BaseOperator):
"""Delete a PubSub subscription.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubSubscriptionDeleteOperator`

By default, if the subscription does not exist, this operator will
not cause the DAG to fail. ::

Expand Down Expand Up @@ -572,6 +588,10 @@ def execute(self, context):
class PubSubPublishOperator(BaseOperator):
"""Publish messages to a PubSub topic.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubPublishOperator`

Each Task publishes all provided messages to the same topic
in a single GCP project. If the topic does not exist, this
task will fail. ::
Expand Down
16 changes: 16 additions & 0 deletions airflow/providers/google/cloud/sensors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@

from google.protobuf.json_format import MessageToDict

from airflow.gcp.hooks.pubsub import PubSubHook
from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class PubSubPullSensor(BaseSensorOperator):
"""Pulls messages from a PubSub subscription and passes them through XCom.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubPullSensor`

This sensor operator will pull up to ``max_messages`` messages from the
specified PubSub subscription. When the subscription returns messages,
the poke method's criteria will be fulfilled and the messages will be
Expand Down
2 changes: 1 addition & 1 deletion docs/autoapi_templates/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ All operators are in the following packages:

airflow/providers/google/marketing_platform/operators/index

airflow/providers/google/cloud/operators/index
airflow/providers/google/cloud/sensors/index

airflow/providers/google/marketing_platform/sensors/index

Expand Down
Loading