From e1c21769748c39027e948ef188af9edea6934752 Mon Sep 17 00:00:00 2001 From: Mukund Ananthu Date: Wed, 13 Mar 2024 20:27:42 +0000 Subject: [PATCH 1/3] feat: Add Create Topic with Kinesis IngestionDataSourceSettings Sample --- samples/snippets/publisher.py | 55 ++++++++++++++++++++++++++++++ samples/snippets/publisher_test.py | 29 ++++++++++++++++ samples/snippets/requirements.txt | 2 +- 3 files changed, 85 insertions(+), 1 deletion(-) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index e2c63556c..9df1af788 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -60,6 +60,45 @@ def create_topic(project_id: str, topic_id: str) -> None: # [END pubsub_create_topic] +def create_topic_kinesis_ingestion(project_id: str, topic_id: str, stream_arn: str, + consumer_arn: str, aws_role_arn: str, gcp_service_account: str) -> None: + """Create a new Pub/Sub topic with AWS Kinesis Ingestion Settings.""" + # [START pubsub_quickstart_create_topic] + # [START pubsub_create_topic] + from google.cloud import pubsub_v1 + from google.pubsub_v1.types import Topic + from google.pubsub_v1.types import IngestionDataSourceSettings + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # stream_arn = "your-stream-arn" + # consumer_arn = "your-consumer-arn" + # aws_role_arn = "your-aws-role-arn" + # gcp_service_account = "your-gcp-service-account" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + request = Topic( + name=topic_path, + ingestion_data_source_settings=IngestionDataSourceSettings( + aws_kinesis=IngestionDataSourceSettings.AwsKinesis( + stream_arn=stream_arn, + consumer_arn=consumer_arn, + aws_role_arn=aws_role_arn, + gcp_service_account=gcp_service_account, + ) + ) + ) + + topic = publisher.create_topic(request=request) + + print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings") + # [END pubsub_quickstart_create_topic] + # [END pubsub_create_topic] + + def delete_topic(project_id: str, topic_id: str) -> None: """Deletes an existing Pub/Sub topic.""" # [START pubsub_delete_topic] @@ -430,6 +469,13 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: create_parser = subparsers.add_parser("create", help=create_topic.__doc__) create_parser.add_argument("topic_id") + create_topic_kinesis_ingestion_parser = subparsers.add_parser("create_kinesis_ingestion", help=create_topic_kinesis_ingestion.__doc__) + create_topic_kinesis_ingestion_parser.add_argument("topic_id") + create_topic_kinesis_ingestion_parser.add_argument("stream_arn") + create_topic_kinesis_ingestion_parser.add_argument("consumer_arn") + create_topic_kinesis_ingestion_parser.add_argument("aws_role_arn") + create_topic_kinesis_ingestion_parser.add_argument("gcp_service_account") + delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) delete_parser.add_argument("topic_id") @@ -490,6 +536,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: list_topics(args.project_id) elif args.command == "create": create_topic(args.project_id, args.topic_id) + elif args.command == "create_kinesis_ingestion": + create_topic_kinesis_ingestion( + args.project_id, + args.topic_id, + args.stream_arn, + args.consumer_arn, + args.aws_role_arn, + args.gcp_service_account + ) elif args.command == "delete": delete_topic(args.project_id, args.topic_id) elif args.command == "publish": diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index 0a6311308..a18033fc1 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -124,6 +124,35 @@ def test_create( assert f"Created topic: {topic_path}" in out +def test_create_kinesis_ingestion( + publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str] +) -> None: + # The scope of `topic_path` is limited to this function. + topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID) + + # Outside of automated CI tests, these values must be of actual AWS resources for the test to pass. + stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name" + consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/consumer/consumer-1:1111111111" + aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name" + gcp_service_account = "fake-service-account@fake-gcp-project.iam.gserviceaccount.com" + + try: + publisher_client.delete_topic(request={"topic": topic_path}) + except NotFound: + pass + + publisher.create_topic_kinesis_ingestion( + PROJECT_ID, + TOPIC_ID, + stream_arn, + consumer_arn, + aws_role_arn, + gcp_service_account + ) + + out, _ = capsys.readouterr() + assert f"Created topic: {topic_path} with AWS Kinesis Ingestion Settings" in out + def test_list(topic_path: str, capsys: CaptureFixture[str]) -> None: publisher.list_topics(PROJECT_ID) out, _ = capsys.readouterr() diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index 3fb4e0a69..aba41c7d7 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1,4 +1,4 @@ -google-cloud-pubsub==2.19.0 +google-cloud-pubsub==2.20.1 avro==1.11.3 protobuf===4.24.4; python_version == '3.7' protobuf==4.25.1; python_version >= '3.8' From 11b317e0f118b56396af14859fab6e13401e442f Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 13 Mar 2024 20:48:25 +0000 Subject: [PATCH 2/3] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- samples/snippets/publisher.py | 18 +++++++++++++----- samples/snippets/publisher_test.py | 9 ++++++--- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 9df1af788..8b632ee85 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -60,8 +60,14 @@ def create_topic(project_id: str, topic_id: str) -> None: # [END pubsub_create_topic] -def create_topic_kinesis_ingestion(project_id: str, topic_id: str, stream_arn: str, - consumer_arn: str, aws_role_arn: str, gcp_service_account: str) -> None: +def create_topic_kinesis_ingestion( + project_id: str, + topic_id: str, + stream_arn: str, + consumer_arn: str, + aws_role_arn: str, + gcp_service_account: str, +) -> None: """Create a new Pub/Sub topic with AWS Kinesis Ingestion Settings.""" # [START pubsub_quickstart_create_topic] # [START pubsub_create_topic] @@ -89,7 +95,7 @@ def create_topic_kinesis_ingestion(project_id: str, topic_id: str, stream_arn: s aws_role_arn=aws_role_arn, gcp_service_account=gcp_service_account, ) - ) + ), ) topic = publisher.create_topic(request=request) @@ -469,7 +475,9 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: create_parser = subparsers.add_parser("create", help=create_topic.__doc__) create_parser.add_argument("topic_id") - create_topic_kinesis_ingestion_parser = subparsers.add_parser("create_kinesis_ingestion", help=create_topic_kinesis_ingestion.__doc__) + create_topic_kinesis_ingestion_parser = subparsers.add_parser( + "create_kinesis_ingestion", help=create_topic_kinesis_ingestion.__doc__ + ) create_topic_kinesis_ingestion_parser.add_argument("topic_id") create_topic_kinesis_ingestion_parser.add_argument("stream_arn") create_topic_kinesis_ingestion_parser.add_argument("consumer_arn") @@ -543,7 +551,7 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: args.stream_arn, args.consumer_arn, args.aws_role_arn, - args.gcp_service_account + args.gcp_service_account, ) elif args.command == "delete": delete_topic(args.project_id, args.topic_id) diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index a18033fc1..fa31a74cf 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -134,7 +134,9 @@ def test_create_kinesis_ingestion( stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name" consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/consumer/consumer-1:1111111111" aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name" - gcp_service_account = "fake-service-account@fake-gcp-project.iam.gserviceaccount.com" + gcp_service_account = ( + "fake-service-account@fake-gcp-project.iam.gserviceaccount.com" + ) try: publisher_client.delete_topic(request={"topic": topic_path}) @@ -147,12 +149,13 @@ def test_create_kinesis_ingestion( stream_arn, consumer_arn, aws_role_arn, - gcp_service_account - ) + gcp_service_account, + ) out, _ = capsys.readouterr() assert f"Created topic: {topic_path} with AWS Kinesis Ingestion Settings" in out + def test_list(topic_path: str, capsys: CaptureFixture[str]) -> None: publisher.list_topics(PROJECT_ID) out, _ = capsys.readouterr() From 37a8d1516e6a13671af3bfe72bd26ed1968f0298 Mon Sep 17 00:00:00 2001 From: mukund-ananthu <83691193+mukund-ananthu@users.noreply.github.com> Date: Wed, 13 Mar 2024 17:39:04 -0400 Subject: [PATCH 3/3] Update publisher.py --- samples/snippets/publisher.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 8b632ee85..282bbf4db 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -69,8 +69,8 @@ def create_topic_kinesis_ingestion( gcp_service_account: str, ) -> None: """Create a new Pub/Sub topic with AWS Kinesis Ingestion Settings.""" - # [START pubsub_quickstart_create_topic] - # [START pubsub_create_topic] + # [START pubsub_quickstart_create_topic_kinesis_ingestion] + # [START pubsub_create_topic_kinesis_ingestion] from google.cloud import pubsub_v1 from google.pubsub_v1.types import Topic from google.pubsub_v1.types import IngestionDataSourceSettings @@ -101,8 +101,8 @@ def create_topic_kinesis_ingestion( topic = publisher.create_topic(request=request) print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings") - # [END pubsub_quickstart_create_topic] - # [END pubsub_create_topic] + # [END pubsub_quickstart_create_topic_kinesis_ingestion] + # [END pubsub_create_topic_kinesis_ingestion] def delete_topic(project_id: str, topic_id: str) -> None: