Skip to content

Commit

Permalink
added Topic params for schema_settings and message_retention_duration. (
Browse files Browse the repository at this point in the history
  • Loading branch information
dmedora committed Nov 22, 2023
1 parent f4e5571 commit 72ba63e
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 1 deletion.
10 changes: 10 additions & 0 deletions airflow/providers/google/cloud/hooks/pubsub.py
Expand Up @@ -57,6 +57,7 @@
PushConfig,
ReceivedMessage,
RetryPolicy,
SchemaSettings,
)


Expand Down Expand Up @@ -182,6 +183,8 @@ def create_topic(
labels: dict[str, str] | None = None,
message_storage_policy: dict | MessageStoragePolicy = None,
kms_key_name: str | None = None,
schema_settings: dict | SchemaSettings = None,
message_retention_duration: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
Expand All @@ -206,6 +209,11 @@ def create_topic(
to be used to protect access to messages published on this topic.
The expected format is
``projects/*/locations/*/keyRings/*/cryptoKeys/*``.
:param schema_settings: (Optional) Settings for validating messages published against an
existing schema. The expected format is ``projects/*/schemas/*``.
:param message_retention_duration: (Optional) Indicates the minimum duration to retain a
message after it is published to the topic. The expected format is a duration in
seconds with up to nine fractional digits, ending with 's'. Example: "3.5s".
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:param timeout: (Optional) The amount of time, in seconds, to wait for the request
Expand All @@ -228,6 +236,8 @@ def create_topic(
"labels": labels,
"message_storage_policy": message_storage_policy,
"kms_key_name": kms_key_name,
"schema_settings": schema_settings,
"message_retention_duration": message_retention_duration,
},
retry=retry,
timeout=timeout,
Expand Down
7 changes: 7 additions & 0 deletions airflow/providers/google/cloud/operators/pubsub.py
Expand Up @@ -35,6 +35,7 @@
PushConfig,
ReceivedMessage,
RetryPolicy,
SchemaSettings,
)

from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
Expand Down Expand Up @@ -130,6 +131,8 @@ def __init__(
labels: dict[str, str] | None = None,
message_storage_policy: dict | MessageStoragePolicy = None,
kms_key_name: str | None = None,
schema_settings: dict | SchemaSettings = None,
message_retention_duration: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
Expand All @@ -144,6 +147,8 @@ def __init__(
self.labels = labels
self.message_storage_policy = message_storage_policy
self.kms_key_name = kms_key_name
self.schema_settings = schema_settings
self.message_retention_duration = message_retention_duration
self.retry = retry
self.timeout = timeout
self.metadata = metadata
Expand All @@ -163,6 +168,8 @@ def execute(self, context: Context) -> None:
labels=self.labels,
message_storage_policy=self.message_storage_policy,
kms_key_name=self.kms_key_name,
schema_settings=self.schema_settings,
message_retention_duration=self.message_retention_duration,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
Expand Down
9 changes: 8 additions & 1 deletion tests/providers/google/cloud/hooks/test_pubsub.py
Expand Up @@ -103,7 +103,14 @@ def test_create_nonexistent_topic(self, mock_service):
create_method = mock_service.return_value.create_topic
self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC)
create_method.assert_called_once_with(
request=dict(name=EXPANDED_TOPIC, labels=LABELS, message_storage_policy=None, kms_key_name=None),
request=dict(
name=EXPANDED_TOPIC,
labels=LABELS,
message_storage_policy=None,
kms_key_name=None,
schema_settings=None,
message_retention_duration=None,
),
retry=DEFAULT,
timeout=None,
metadata=(),
Expand Down
4 changes: 4 additions & 0 deletions tests/providers/google/cloud/operators/test_pubsub.py
Expand Up @@ -59,6 +59,8 @@ def test_failifexists(self, mock_hook):
labels=None,
message_storage_policy=None,
kms_key_name=None,
schema_settings=None,
message_retention_duration=None,
retry=DEFAULT,
timeout=None,
metadata=(),
Expand All @@ -79,6 +81,8 @@ def test_succeedifexists(self, mock_hook):
labels=None,
message_storage_policy=None,
kms_key_name=None,
schema_settings=None,
message_retention_duration=None,
retry=DEFAULT,
timeout=None,
metadata=(),
Expand Down

0 comments on commit 72ba63e

Please sign in to comment.