From 93429e0bdd15d95fbe89491ac8de46485b3c6f43 Mon Sep 17 00:00:00 2001
From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com>
Date: Sun, 14 Jan 2024 12:36:16 -0800
Subject: [PATCH] docs(samples): Add Dataflow to Pub/Sub snippet (#11104)

* docs(samples): Add Dataflow to Pub/Sub snippet

Adds a snippet that shows how to write messages to Pub/Sub from Dataflow.

* Fix region tag

* Skip Python 3.12 test

see https://github.com/apache/beam/issues/29149

* Fix copyright date

* Improve IT by reading messages from Pub/Sub

* Incorporate review feedback

* Fix test for 3.8

* Fix linter error
---
 dataflow/snippets/batch_write_storage.py     |  9 +-
 dataflow/snippets/noxfile_config.py          | 42 ++++++++
 .../tests/test_batch_write_storage.py        |  4 +-
 dataflow/snippets/tests/test_write_pubsub.py | 96 +++++++++++++++++++
 dataflow/snippets/write_pubsub.py            | 75 +++++++++++++++
 5 files changed, 222 insertions(+), 4 deletions(-)
 create mode 100644 dataflow/snippets/noxfile_config.py
 create mode 100644 dataflow/snippets/tests/test_write_pubsub.py
 create mode 100644 dataflow/snippets/write_pubsub.py

diff --git a/dataflow/snippets/batch_write_storage.py b/dataflow/snippets/batch_write_storage.py
index 85cee50cf6d5..b86a1923df8c 100644
--- a/dataflow/snippets/batch_write_storage.py
+++ b/dataflow/snippets/batch_write_storage.py
@@ -14,17 +14,22 @@
 # limitations under the License.
 
 # [START dataflow_batch_write_to_storage]
+import argparse
+from typing import List
+
 import apache_beam as beam
 from apache_beam.io.textio import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions
 
+from typing_extensions import Self
+
 
-def write_to_cloud_storage(argv=None):
+def write_to_cloud_storage(argv: List[str] = None) -> None:
     # Parse the pipeline options passed into the application.
     class MyOptions(PipelineOptions):
         @classmethod
         # Define a custom pipeline option that specifies the Cloud Storage bucket.
-        def _add_argparse_args(cls, parser):
+        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
             parser.add_argument("--output", required=True)
 
     wordsList = ["1", "2", "3", "4"]
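
For reference, the updated batch snippet still takes a single required --output option; a hypothetical invocation (MY_BUCKET is a placeholder, mirroring the path used in its test) would be:

    python batch_write_storage.py --output=gs://MY_BUCKET/output/out-
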
diff --git a/dataflow/snippets/noxfile_config.py b/dataflow/snippets/noxfile_config.py
new file mode 100644
index 000000000000..3c61ecbdff31
--- /dev/null
+++ b/dataflow/snippets/noxfile_config.py
@@ -0,0 +1,42 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Default TEST_CONFIG_OVERRIDE for python repos.
+
+# You can copy this file into your directory, then it will be imported from
+# the noxfile.py.
+
+# The source of truth:
+# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py
+
+TEST_CONFIG_OVERRIDE = {
+    # You can opt out from the test for specific Python versions.
+    "ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.12"],
+    # Old samples are opted out of enforcing Python type hints
+    # All new samples should feature them
+    "enforce_type_hints": True,
+    # An envvar key for determining the project id to use. Change it
+    # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
+    # build specific Cloud project. You can also use your own string
+    # to use your own Cloud project.
+    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
+    # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
+    # If you need to use a specific version of pip,
+    # change pip_version_override to the string representation
+    # of the version number, for example, "20.2.4"
+    "pip_version_override": None,
+    # A dictionary you want to inject into your test. Don't put any
+    # secrets here. These values will override predefined values.
+    "envs": {},
+}
diff --git a/dataflow/snippets/tests/test_batch_write_storage.py b/dataflow/snippets/tests/test_batch_write_storage.py
index a8c2d8d1fc37..9959a7545d5a 100644
--- a/dataflow/snippets/tests/test_batch_write_storage.py
+++ b/dataflow/snippets/tests/test_batch_write_storage.py
@@ -26,7 +26,7 @@
 
 
 @pytest.fixture(scope="function")
-def setup_and_teardown():
+def setup_and_teardown() -> None:
     try:
         bucket = storage_client.create_bucket(bucket_name)
         yield
@@ -34,7 +34,7 @@ def setup_and_teardown():
         bucket.delete(force=True)
 
 
-def test_write_to_cloud_storage(setup_and_teardown):
+def test_write_to_cloud_storage(setup_and_teardown: None) -> None:
     sys.argv = ['', f'--output=gs://{bucket_name}/output/out-']
     write_to_cloud_storage()
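
The noxfile_config.py above points gcloud_project_env at GOOGLE_CLOUD_PROJECT, and the new Pub/Sub test below reads that same variable directly. A hypothetical local run outside of nox (my-project is a placeholder project ID) might look like:

    GOOGLE_CLOUD_PROJECT=my-project pytest tests/test_batch_write_storage.py tests/test_write_pubsub.py
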
diff --git a/dataflow/snippets/tests/test_write_pubsub.py b/dataflow/snippets/tests/test_write_pubsub.py
new file mode 100644
index 000000000000..7c2a819ee129
--- /dev/null
+++ b/dataflow/snippets/tests/test_write_pubsub.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import time
+from typing import List
+from unittest.mock import patch
+import uuid
+
+from google.cloud import pubsub_v1
+
+import pytest
+
+from ..write_pubsub import write_to_pubsub
+
+
+topic_id = f'test-topic-{uuid.uuid4()}'
+subscription_id = f'{topic_id}-sub'
+project_id = os.environ["GOOGLE_CLOUD_PROJECT"]
+
+publisher = pubsub_v1.PublisherClient()
+subscriber = pubsub_v1.SubscriberClient()
+
+NUM_MESSAGES = 4
+TIMEOUT = 60 * 5
+
+
+@pytest.fixture(scope="function")
+def setup_and_teardown() -> None:
+    topic_path = publisher.topic_path(project_id, topic_id)
+    subscription_path = subscriber.subscription_path(project_id, subscription_id)
+
+    try:
+        publisher.create_topic(request={"name": topic_path})
+        subscriber.create_subscription(
+            request={"name": subscription_path, "topic": topic_path}
+        )
+        yield
+    finally:
+        subscriber.delete_subscription(
+            request={"subscription": subscription_path})
+        publisher.delete_topic(request={"topic": topic_path})
+
+
+def read_messages() -> List[pubsub_v1.types.ReceivedMessage]:
+    received_messages = []
+    ack_ids = []
+    subscription_path = subscriber.subscription_path(project_id, subscription_id)
+
+    # Read messages from Pub/Sub. It might be necessary to read multiple
+    # batches. Use a timeout value to avoid potentially looping forever.
+    start_time = time.time()
+    while time.time() - start_time <= TIMEOUT:
+        # Pull messages from Pub/Sub.
+        response = subscriber.pull(
+            request={"subscription": subscription_path, "max_messages": NUM_MESSAGES}
+        )
+        received_messages.extend(response.received_messages)
+
+        for received_message in response.received_messages:
+            ack_ids.append(received_message.ack_id)
+
+        # Acknowledge the received messages so they will not be sent again.
+        # (Skip the call while no messages have been pulled yet.)
+        if ack_ids:
+            subscriber.acknowledge(
+                request={"subscription": subscription_path, "ack_ids": ack_ids}
+            )
+
+        if len(received_messages) >= NUM_MESSAGES:
+            break
+
+        time.sleep(5)
+
+    return received_messages
+
+
+def test_write_to_pubsub(setup_and_teardown: None) -> None:
+    with patch("sys.argv", [
+        "", '--streaming', f'--project={project_id}', f'--topic={topic_id}'
+    ]):
+        write_to_pubsub()
+
+    # Read from Pub/Sub to verify the pipeline successfully wrote messages.
+    # Duplicate reads are possible.
+    messages = read_messages()
+    assert len(messages) >= NUM_MESSAGES
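
The integration test above only asserts on the number of messages read back. A hypothetical extension (not part of this change) could also verify payloads and attributes, using the field names produced by item_to_message() in the snippet below:

    # Sketch only: each ReceivedMessage wraps the product name as the data
    # payload, with 'buyer' and 'timestamp' attached as attributes.
    for received in messages:
        assert received.message.data.decode('utf-8') in {'TV', 'Phone', 'Laptop', 'Video game'}
        assert 'buyer' in received.message.attributes
        assert 'timestamp' in received.message.attributes
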
diff --git a/dataflow/snippets/write_pubsub.py b/dataflow/snippets/write_pubsub.py
new file mode 100644
index 000000000000..cfb19396c064
--- /dev/null
+++ b/dataflow/snippets/write_pubsub.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# [START dataflow_pubsub_write_with_attributes]
+import argparse
+from typing import Any, Dict, List
+
+import apache_beam as beam
+from apache_beam.io import PubsubMessage
+from apache_beam.io import WriteToPubSub
+from apache_beam.options.pipeline_options import PipelineOptions
+
+from typing_extensions import Self
+
+
+def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
+    attributes = {
+        'buyer': item['name'],
+        'timestamp': str(item['ts'])
+    }
+    data = bytes(item['product'], 'utf-8')
+
+    return PubsubMessage(data=data, attributes=attributes)
+
+
+def write_to_pubsub(argv: List[str] = None) -> None:
+
+    # Parse the pipeline options passed into the application. Example:
+    #   --project=$PROJECT_ID --topic=$TOPIC_NAME --streaming
+    # For more information, see
+    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
+    class MyOptions(PipelineOptions):
+        @classmethod
+        # Define custom pipeline options that specify the project ID and Pub/Sub
+        # topic.
+        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
+            parser.add_argument("--project", required=True)
+            parser.add_argument("--topic", required=True)
+
+    example_data = [
+        {'name': 'Robert', 'product': 'TV', 'ts': 1613141590000},
+        {'name': 'Maria', 'product': 'Phone', 'ts': 1612718280000},
+        {'name': 'Juan', 'product': 'Laptop', 'ts': 1611618000000},
+        {'name': 'Rebeca', 'product': 'Video game', 'ts': 1610000000000}
+    ]
+    options = MyOptions()
+
+    with beam.Pipeline(options=options) as pipeline:
+        (
+            pipeline
+            | "Create elements" >> beam.Create(example_data)
+            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
+            | WriteToPubSub(
+                  topic=f'projects/{options.project}/topics/{options.topic}',
+                  with_attributes=True)
+        )
+
+    print('Pipeline ran successfully.')
+# [END dataflow_pubsub_write_with_attributes]
+
+
+if __name__ == "__main__":
+    write_to_pubsub()
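
As the options comment in write_pubsub.py indicates, the snippet is launched with project and topic flags plus --streaming; a hypothetical direct invocation (PROJECT_ID and TOPIC_NAME are placeholders) would be:

    python write_pubsub.py --streaming --project=PROJECT_ID --topic=TOPIC_NAME

Run this way it executes on Beam's local DirectRunner; standard pipeline options such as --runner=DataflowRunner can be supplied to submit it to Dataflow instead.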