Skip to content
Permalink
Browse files
docs: add scheduled query samples (#83)
* docs: add scheduled query samples

* test: opt-out of type annotations for now

* test: use environment variable for project ID

* set quota project

* consolidate config creation to conserve quota
  • Loading branch information
tswast committed Dec 15, 2020
1 parent 3e2bbef commit cd519709228cda3bbcf2fd978d37ccd04ef27c82
@@ -12,34 +12,93 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import os
import uuid

from google.api_core import client_options
import google.api_core.exceptions
import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_datatransfer
import pytest


def temp_suffix():
now = datetime.datetime.now()
return f"{now.strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex[:8]}"


@pytest.fixture(scope="session")
def bigquery_client(default_credentials):
credentials, project_id = default_credentials
return bigquery.Client(credentials=credentials, project=project_id)


@pytest.fixture(scope="session")
def dataset_id(bigquery_client, project_id):
dataset_id = f"bqdts_{temp_suffix()}"
bigquery_client.create_dataset(f"{project_id}.{dataset_id}")
yield dataset_id
bigquery_client.delete_dataset(dataset_id, delete_contents=True)


@pytest.fixture(scope="session")
def default_credentials():
return google.auth.default(["https://www.googleapis.com/auth/cloud-platform"])


@pytest.fixture(scope="session")
def project_id(default_credentials):
_, project_id = default_credentials
return project_id
def project_id():
return os.environ["GOOGLE_CLOUD_PROJECT"]


@pytest.fixture(scope="session")
def bigquery_client(default_credentials):
credentials, project_id = default_credentials
return bigquery.Client(credentials=credentials, project=project_id)
def service_account_name(default_credentials):
credentials, _ = default_credentials
# Note: this property is not available when running with user account
# credentials, but only service account credentials are used in our test
# infrastructure.
return credentials.service_account_email


@pytest.fixture(scope="session")
def transfer_client(default_credentials):
def transfer_client(default_credentials, project_id):
credentials, _ = default_credentials
return bigquery_datatransfer.DataTransferServiceClient(credentials=credentials)
options = client_options.ClientOptions(quota_project_id=project_id)

transfer_client = bigquery_datatransfer.DataTransferServiceClient(
credentials=credentials, client_options=options
)

# Ensure quota is always attributed to the correct project.
bigquery_datatransfer.DataTransferServiceClient = lambda: transfer_client

return transfer_client


@pytest.fixture(scope="session")
def transfer_config_name(transfer_client, project_id, dataset_id, service_account_name):
from . import manage_transfer_configs, scheduled_query

# Use the transfer_client fixture so we know quota is attributed to the
# correct project.
assert transfer_client is not None

# To conserve limited BQ-DTS quota, this fixture creates only one transfer
# config for a whole session and is used to test the scheduled_query.py and
# the delete operation in manage_transfer_configs.py.
transfer_config = scheduled_query.create_scheduled_query(
{
"project_id": project_id,
"dataset_id": dataset_id,
"service_account_name": service_account_name,
}
)
yield transfer_config.name
manage_transfer_configs.delete_config(
{"transfer_config_name": transfer_config.name}
)


@pytest.fixture
@@ -42,8 +42,17 @@ def source_dataset_id(bigquery_client, project_id):


def test_copy_dataset(
capsys, project_id, destination_dataset_id, source_dataset_id, to_delete_configs
capsys,
transfer_client,
project_id,
destination_dataset_id,
source_dataset_id,
to_delete_configs,
):
# Use the transfer_client fixture so we know quota is attributed to the
# correct project.
assert transfer_client is not None

transfer_config = copy_dataset.copy_dataset(
{
"destination_project_id": project_id,
@@ -0,0 +1,171 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def list_configs(override_values={}):
# [START bigquerydatatransfer_list_configs]
from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

project_id = "my-project"
# [END bigquerydatatransfer_list_configs]
# To facilitate testing, we replace values with alternatives
# provided by the testing harness.
project_id = override_values.get("project_id", project_id)
# [START bigquerydatatransfer_list_configs]
parent = transfer_client.common_project_path(project_id)

configs = transfer_client.list_transfer_configs(parent=parent)
print("Got the following configs:")
for config in configs:
print(f"\tID: {config.name}, Schedule: {config.schedule}")
# [END bigquerydatatransfer_list_configs]


def update_config(override_values={}):
# [START bigquerydatatransfer_update_config]
from google.cloud import bigquery_datatransfer
from google.protobuf import field_mask_pb2

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
new_display_name = "My Transfer Config"
# [END bigquerydatatransfer_update_config]
# To facilitate testing, we replace values with alternatives
# provided by the testing harness.
new_display_name = override_values.get("new_display_name", new_display_name)
transfer_config_name = override_values.get(
"transfer_config_name", transfer_config_name
)
# [START bigquerydatatransfer_update_config]

transfer_config = bigquery_datatransfer.TransferConfig(name=transfer_config_name)
transfer_config.display_name = new_display_name

transfer_config = transfer_client.update_transfer_config(
{
"transfer_config": transfer_config,
"update_mask": field_mask_pb2.FieldMask(paths=["display_name"]),
}
)

print(f"Updated config: '{transfer_config.name}'")
print(f"New display name: '{transfer_config.display_name}'")
# [END bigquerydatatransfer_update_config]
# Return the config name for testing purposes, so that it can be deleted.
return transfer_config


def update_credentials_with_service_account(override_values={}):
# [START bigquerydatatransfer_update_credentials]
from google.cloud import bigquery_datatransfer
from google.protobuf import field_mask_pb2

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

service_account_name = "abcdef-test-sa@abcdef-test.iam.gserviceaccount.com"
transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
# [END bigquerydatatransfer_update_credentials]
# To facilitate testing, we replace values with alternatives
# provided by the testing harness.
service_account_name = override_values.get(
"service_account_name", service_account_name
)
transfer_config_name = override_values.get(
"transfer_config_name", transfer_config_name
)
# [START bigquerydatatransfer_update_credentials]

transfer_config = bigquery_datatransfer.TransferConfig(name=transfer_config_name)

transfer_config = transfer_client.update_transfer_config(
{
"transfer_config": transfer_config,
"update_mask": field_mask_pb2.FieldMask(paths=["service_account_name"]),
"service_account_name": service_account_name,
}
)

print("Updated config: '{}'".format(transfer_config.name))
# [END bigquerydatatransfer_update_credentials]
# Return the config name for testing purposes, so that it can be deleted.
return transfer_config


def schedule_backfill(override_values={}):
# [START bigquerydatatransfer_schedule_backfill]
import datetime

from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
# [END bigquerydatatransfer_schedule_backfill]
# To facilitate testing, we replace values with alternatives
# provided by the testing harness.
transfer_config_name = override_values.get(
"transfer_config_name", transfer_config_name
)
# [START bigquerydatatransfer_schedule_backfill]
now = datetime.datetime.now(datetime.timezone.utc)
start_time = now - datetime.timedelta(days=5)
end_time = now - datetime.timedelta(days=2)

# Some data sources, such as scheduled_query only support daily run.
# Truncate start_time and end_time to midnight time (00:00AM UTC).
start_time = datetime.datetime(
start_time.year, start_time.month, start_time.day, tzinfo=datetime.timezone.utc
)
end_time = datetime.datetime(
end_time.year, end_time.month, end_time.day, tzinfo=datetime.timezone.utc
)

response = transfer_client.schedule_transfer_runs(
parent=transfer_config_name,
start_time=start_time,
end_time=end_time,
)

print("Started transfer runs:")
for run in response.runs:
print(f"backfill: {run.run_time} run: {run.name}")
# [END bigquerydatatransfer_schedule_backfill]
return response.runs


def delete_config(override_values={}):
# [START bigquerydatatransfer_delete_transfer]
import google.api_core.exceptions
from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
# [END bigquerydatatransfer_delete_transfer]
# To facilitate testing, we replace values with alternatives
# provided by the testing harness.
transfer_config_name = override_values.get(
"transfer_config_name", transfer_config_name
)
# [START bigquerydatatransfer_delete_transfer]
try:
transfer_client.delete_transfer_config(name=transfer_config_name)
except google.api_core.exceptions.NotFound:
print("Transfer config not found.")
else:
print(f"Deleted transfer config: {transfer_config_name}")
# [END bigquerydatatransfer_delete_transfer]
@@ -0,0 +1,70 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from . import manage_transfer_configs


def test_list_configs(capsys, project_id, transfer_config_name):
manage_transfer_configs.list_configs({"project_id": project_id})
out, _ = capsys.readouterr()
assert "Got the following configs:" in out
assert transfer_config_name in out


def test_update_config(capsys, transfer_config_name):
manage_transfer_configs.update_config(
{
"new_display_name": "name from test_update_config",
"transfer_config_name": transfer_config_name,
}
)
out, _ = capsys.readouterr()
assert "Updated config:" in out
assert transfer_config_name in out
assert "name from test_update_config" in out


def test_update_credentials_with_service_account(
capsys, project_id, service_account_name, transfer_config_name
):
manage_transfer_configs.update_credentials_with_service_account(
{
"project_id": project_id,
"service_account_name": service_account_name,
"transfer_config_name": transfer_config_name,
}
)
out, _ = capsys.readouterr()
assert "Updated config:" in out
assert transfer_config_name in out


def test_schedule_backfill(capsys, transfer_config_name):
runs = manage_transfer_configs.schedule_backfill(
{
"transfer_config_name": transfer_config_name,
}
)
out, _ = capsys.readouterr()
assert "Started transfer runs:" in out
# Run IDs should include the transfer name in their path.
assert transfer_config_name in out
# Check that there are runs for 5, 4, 3, and 2 days ago.
assert len(runs) == 4


def test_delete_config(capsys, transfer_config_name):
# transfer_config_name fixture in conftest.py calls the delete config
# sample. To conserve limited BQ-DTS quota we only make basic checks.
assert len(transfer_config_name) != 0
Loading

0 comments on commit cd51970

Please sign in to comment.