Skip to content

Commit

Permalink
docs(samples): Fix Pub/Sub snippet for Dataflow runner (#11147)
Browse files Browse the repository at this point in the history
* docs(samples): Fix Pub/Sub snippet for Dataflow runner

* Add comment about global namespace
  • Loading branch information
VeronicaWasson committed Feb 2, 2024
1 parent f3e83a0 commit f3cbb8e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
5 changes: 2 additions & 3 deletions dataflow/snippets/tests/test_write_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,8 @@ def read_messages() -> None:


def test_write_to_pubsub(setup_and_teardown: None) -> None:
with patch("sys.argv", [
"", '--streaming', f'--project={project_id}', f'--topic={topic_id}'
]):
topic_path = publisher.topic_path(project_id, topic_id)
with patch("sys.argv", ["", '--streaming', f'--topic={topic_path}']):
write_to_pubsub()

# Read from Pub/Sub to verify the pipeline successfully wrote messages.
Expand Down
16 changes: 10 additions & 6 deletions dataflow/snippets/write_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# [START dataflow_pubsub_write_with_attributes]i
# [START dataflow_pubsub_write_with_attributes]
import argparse
from typing import Any, Dict, List

Expand All @@ -26,6 +26,12 @@


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
# Re-import needed types. When using the Dataflow runner, this
# function executes on a worker, where the global namespace is not
# available. For more information, see:
# https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
from apache_beam.io import PubsubMessage

attributes = {
'buyer': item['name'],
'timestamp': str(item['ts'])
Expand All @@ -38,15 +44,13 @@ def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
def write_to_pubsub(argv: List[str] = None) -> None:

# Parse the pipeline options passed into the application. Example:
# --project=$PROJECT_ID --topic=$TOPIC_NAME --streaming
# --topic=$TOPIC_PATH --streaming
# For more information, see
# https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
class MyOptions(PipelineOptions):
@classmethod
# Define custom pipeline options that specify the project ID and Pub/Sub
# topic.
# Define a custom pipeline option to specify the Pub/Sub topic.
def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
parser.add_argument("--project", required=True)
parser.add_argument("--topic", required=True)

example_data = [
Expand All @@ -63,7 +67,7 @@ def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
| "Create elements" >> beam.Create(example_data)
| "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
| WriteToPubSub(
topic=f'projects/{options.project}/topics/{options.topic}',
topic=options.topic,
with_attributes=True)
)

Expand Down

0 comments on commit f3cbb8e

Please sign in to comment.