Skip to content

Commit

Permalink
docs: add sample to include run notification (#173)
Browse files Browse the repository at this point in the history
* docs: add sample to include run notification

* blacken samples

* fix sample lint
  • Loading branch information
tswast authored and leahecole committed Jul 11, 2023
1 parent fa3df71 commit b5b2934
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 10 deletions.
55 changes: 51 additions & 4 deletions bigquery-datatransfer/snippets/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,48 @@

import datetime
import os
import random
import uuid

from google.api_core import client_options
import google.api_core.exceptions
import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_datatransfer
from google.cloud import pubsub_v1
import pytest


# Prefix shared by every resource these tests create, so that stale
# resources left behind by earlier runs can be recognized and cleaned up.
RESOURCE_PREFIX = "python_bigquery_datatransfer_samples_snippets"
# strftime/strptime layout of the timestamp embedded in resource names,
# e.g. "20210711130455".
RESOURCE_DATE_FORMAT = "%Y%m%d%H%M%S"
# Width of the formatted timestamp: 4 (year) + 2 each for month, day,
# hour, minute, and second.
RESOURCE_DATE_LENGTH = 4 + 2 + 2 + 2 + 2 + 2


def resource_prefix() -> str:
    """Build a unique resource name: shared prefix, UTC timestamp, random hex."""
    stamp = datetime.datetime.utcnow().strftime(RESOURCE_DATE_FORMAT)
    nonce = "{:x}".format(random.randrange(1000000))
    return "_".join((RESOURCE_PREFIX, stamp, nonce))


def resource_name_to_date(resource_name: str):
    """Extract and parse the creation timestamp embedded in a resource name.

    Assumes *resource_name* starts with ``RESOURCE_PREFIX`` followed by an
    underscore and a ``RESOURCE_DATE_FORMAT`` timestamp.
    """
    # Skip the prefix plus the "_" separator to reach the timestamp.
    offset = len(RESOURCE_PREFIX) + 1
    stamp = resource_name[offset : offset + RESOURCE_DATE_LENGTH]
    return datetime.datetime.strptime(stamp, RESOURCE_DATE_FORMAT)


@pytest.fixture(scope="session", autouse=True)
def cleanup_pubsub_topics(pubsub_client: pubsub_v1.PublisherClient, project_id):
    """Delete Pub/Sub topics created by test runs more than a day old.

    Runs automatically once per session (before the tests, since it does
    not yield) and only touches topics carrying our resource prefix.
    """
    cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=1)
    for topic in pubsub_client.list_topics(project=f"projects/{project_id}"):
        topic_id = topic.name.split("/")[-1]
        if not topic_id.startswith(RESOURCE_PREFIX):
            continue
        if resource_name_to_date(topic_id) < cutoff:
            pubsub_client.delete_topic(topic=topic.name)


def temp_suffix():
    """Return a "<timestamp>_<8-hex-chars>" suffix for unique resource IDs."""
    stamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    nonce = uuid.uuid4().hex[:8]
    return "{}_{}".format(stamp, nonce)
Expand All @@ -35,6 +67,21 @@ def bigquery_client(default_credentials):
return bigquery.Client(credentials=credentials, project=project_id)


@pytest.fixture(scope="session")
def pubsub_client(default_credentials):
    """Session-wide Pub/Sub publisher client built from the default credentials."""
    creds, _project = default_credentials
    return pubsub_v1.PublisherClient(credentials=creds)


@pytest.fixture(scope="session")
def pubsub_topic(pubsub_client: pubsub_v1.PublisherClient, project_id):
    """Create a uniquely named Pub/Sub topic; delete it when the session ends."""
    topic_path = pubsub_v1.PublisherClient.topic_path(
        project_id, resource_prefix()
    )
    pubsub_client.create_topic(name=topic_path)
    yield topic_path
    pubsub_client.delete_topic(topic=topic_path)


@pytest.fixture(scope="session")
def dataset_id(bigquery_client, project_id):
dataset_id = f"bqdts_{temp_suffix()}"
Expand All @@ -56,10 +103,10 @@ def project_id():
@pytest.fixture(scope="session")
def service_account_name(default_credentials):
    """Return the e-mail of the active service account, or None.

    The ``service_account_email`` attribute is not available when running
    with user account credentials, but should be available when running
    from our continuous integration tests, hence the ``getattr`` fallback.
    """
    # NOTE: the diff rendering had left both the old implementation
    # (``return credentials.service_account_email``) and the new one
    # interleaved here; only the post-change version is kept.
    credentials, _ = default_credentials
    return getattr(credentials, "service_account_email", None)


@pytest.fixture(scope="session")
Expand Down
4 changes: 1 addition & 3 deletions bigquery-datatransfer/snippets/manage_transfer_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ def schedule_backfill(override_values={}):
)

response = transfer_client.schedule_transfer_runs(
parent=transfer_config_name,
start_time=start_time,
end_time=end_time,
parent=transfer_config_name, start_time=start_time, end_time=end_time,
)

print("Started transfer runs:")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ def test_update_credentials_with_service_account(

def test_schedule_backfill(capsys, transfer_config_name):
runs = manage_transfer_configs.schedule_backfill(
{
"transfer_config_name": transfer_config_name,
}
{"transfer_config_name": transfer_config_name}
)
out, _ = capsys.readouterr()
assert "Started transfer runs:" in out
Expand Down
1 change: 1 addition & 0 deletions bigquery-datatransfer/snippets/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
google-cloud-bigquery==2.20.0
google-cloud-pubsub==2.6.0
pytest==6.2.4
mock==4.0.3
44 changes: 44 additions & 0 deletions bigquery-datatransfer/snippets/run_notification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def run_notification(transfer_config_name, pubsub_topic):
    """Attach a Pub/Sub run-notification topic to a transfer config.

    Args:
        transfer_config_name:
            Full resource name of an existing transfer config, e.g.
            ``projects/1234/locations/us/transferConfigs/abcd``.
        pubsub_topic:
            Full resource name of the Pub/Sub topic to notify, e.g.
            ``projects/PROJECT-ID/topics/TOPIC-ID``.

    Returns:
        The updated ``TransferConfig`` returned by the API.
    """
    # Save the caller's real values: the region-tagged sample below assigns
    # placeholder values so the published docs show self-contained code.
    orig_transfer_config_name = transfer_config_name
    orig_pubsub_topic = pubsub_topic
    # [START bigquerydatatransfer_run_notification]
    transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
    pubsub_topic = "projects/PROJECT-ID/topics/TOPIC-ID"
    # [END bigquerydatatransfer_run_notification]
    # Restore the real values before performing the API call.
    transfer_config_name = orig_transfer_config_name
    pubsub_topic = orig_pubsub_topic

    # [START bigquerydatatransfer_run_notification]
    from google.cloud import bigquery_datatransfer
    from google.protobuf import field_mask_pb2

    transfer_client = bigquery_datatransfer.DataTransferServiceClient()

    # Only the notification topic is updated; the field mask keeps every
    # other property of the transfer config unchanged.
    transfer_config = bigquery_datatransfer.TransferConfig(name=transfer_config_name)
    transfer_config.notification_pubsub_topic = pubsub_topic
    update_mask = field_mask_pb2.FieldMask(paths=["notification_pubsub_topic"])

    transfer_config = transfer_client.update_transfer_config(
        {"transfer_config": transfer_config, "update_mask": update_mask}
    )

    print(f"Updated config: '{transfer_config.name}'")
    print(f"Notification Pub/Sub topic: '{transfer_config.notification_pubsub_topic}'")
    # [END bigquerydatatransfer_run_notification]
    # Return the updated config for testing purposes, so that it can be
    # inspected and deleted.
    return transfer_config
26 changes: 26 additions & 0 deletions bigquery-datatransfer/snippets/run_notification_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from . import run_notification


def test_run_notification(capsys, transfer_config_name, pubsub_topic):
    """The sample prints the updated config name and the notification topic."""
    run_notification.run_notification(
        transfer_config_name=transfer_config_name, pubsub_topic=pubsub_topic,
    )
    captured, _ = capsys.readouterr()
    expected_fragments = (
        "Updated config:",
        transfer_config_name,
        "Notification Pub/Sub topic:",
        pubsub_topic,
    )
    for fragment in expected_fragments:
        assert fragment in captured

0 comments on commit b5b2934

Please sign in to comment.