[BEAM-12778] Prevent unnecessary dry run requests to BQ (#15356)
* Prevent unnecessary dry run requests to BQ when temp dataset is provided by the user

* Document that the temp dataset name prefix is reserved for Beam

* User-provided temporary datasets should not be deleted.

* Raise an error if the reserved dataset prefix is used

* updates
chamikaramj committed Aug 25, 2021
1 parent b236713 commit efbbf25
Showing 3 changed files with 46 additions and 34 deletions.
14 changes: 11 additions & 3 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -822,6 +822,9 @@ def read(self, range_tracker):

@check_accessible(['query'])
def _setup_temporary_dataset(self, bq):
if self.temp_dataset:
# Temp dataset was provided by the user, so we can just return.
return
location = bq.get_query_location(
self._get_project(), self.query.get(), self.use_legacy_sql)
bq.create_temporary_dataset(self._get_project(), location)
@@ -2200,9 +2203,14 @@ class ReadFromBigQuery(PTransform):
#avro_conversions
temp_dataset (``apache_beam.io.gcp.internal.clients.bigquery.\
DatasetReference``):
The dataset in which to create temporary tables when performing file
loads. By default, a new dataset is created in the execution project for
temporary tables.
Temporary dataset reference to use when reading from BigQuery using a
query. When reading using a query, the BigQuery source will create a
temporary dataset and a temporary table to store the results of the
query. With this option, you can provide an existing dataset in which
to create the temporary table. The BigQuery source will create a
temporary table in that dataset and will remove it once it is no longer
needed. The job needs access to create and delete tables within the
given dataset. The dataset name should *not* start with the reserved
prefix `beam_temp_dataset_`.
"""
class Method(object):
EXPORT = 'EXPORT' # This is currently the default.
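For context, here is a minimal usage sketch of the `temp_dataset` option documented above (the project, dataset, and query names are hypothetical; the dataset must already exist, and the job needs permission to create and delete tables in it):

import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery

# Hypothetical, pre-existing dataset; its name must not use the
# reserved prefix 'beam_temp_dataset_'.
temp_dataset = bigquery.DatasetReference(
    projectId='my-project', datasetId='my_temp_dataset')

with beam.Pipeline() as p:
  rows = p | 'Read' >> beam.io.ReadFromBigQuery(
      query='SELECT * FROM `my-project.my_dataset.my_table`',
      use_standard_sql=True,
      temp_dataset=temp_dataset)

With temp_dataset set, the source skips the dry-run request that would otherwise be issued to pick a location for a new temporary dataset, and during cleanup only the temporary table created inside the given dataset is deleted.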
32 changes: 9 additions & 23 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -451,15 +451,16 @@ def test_get_destination_uri_fallback_temp_location(self):
'empty, using temp_location instead'
])

@mock.patch.object(BigQueryWrapper, '_delete_table')
@mock.patch.object(BigQueryWrapper, '_delete_dataset')
@mock.patch('apache_beam.io.gcp.internal.clients.bigquery.BigqueryV2')
def test_temp_dataset_location_is_configurable(self, api, delete_dataset):
def test_temp_dataset_is_configurable(
self, api, delete_dataset, delete_table):
temp_dataset = bigquery.DatasetReference(
projectId='temp-project', datasetId='bq_dataset')
bq = BigQueryWrapper(client=api, temp_dataset_id=temp_dataset.datasetId)
gcs_location = 'gs://gcs_location'

# bq.get_or_create_dataset.return_value = temp_dataset
c = beam.io.gcp.bigquery._CustomBigQuerySource(
query='select * from test_table',
gcs_location=gcs_location,
@@ -470,30 +471,15 @@ def test_temp_dataset_location_is_configurable(self, api, delete_dataset):
project='execution_project',
**{'temp_dataset': temp_dataset})

api.datasets.Get.side_effect = HttpError({
'status_code': 404, 'status': 404
},
'',
'')

c._setup_temporary_dataset(bq)
api.datasets.Insert.assert_called_with(
bigquery.BigqueryDatasetsInsertRequest(
dataset=bigquery.Dataset(datasetReference=temp_dataset),
projectId=temp_dataset.projectId))
api.datasets.assert_not_called()

api.datasets.Get.return_value = temp_dataset
api.datasets.Get.side_effect = None
# The user-provided temporary dataset should not be deleted, but the
# temporary table created by Beam should be deleted.
bq.clean_up_temporary_dataset(temp_dataset.projectId)
delete_dataset.assert_called_with(
temp_dataset.projectId, temp_dataset.datasetId, True)

self.assertEqual(
bq._get_temp_table(temp_dataset.projectId),
bigquery.TableReference(
projectId=temp_dataset.projectId,
datasetId=temp_dataset.datasetId,
tableId=BigQueryWrapper.TEMP_TABLE + bq._temporary_table_suffix))
delete_dataset.assert_not_called()
delete_table.assert_called_with(
temp_dataset.projectId, temp_dataset.datasetId, mock.ANY)


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
34 changes: 26 additions & 8 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -292,8 +292,10 @@ class BigQueryWrapper(object):
(e.g., find and create tables, query a table, etc.).
"""

TEMP_TABLE = 'temp_table_'
TEMP_DATASET = 'temp_dataset_'
# If updating following names, also update the corresponding pydocs in
# bigquery.py.
TEMP_TABLE = 'beam_temp_table_'
TEMP_DATASET = 'beam_temp_dataset_'

HISTOGRAM_METRIC_LOGGER = MetricLogger()

@@ -313,6 +315,10 @@ def __init__(self, client=None, temp_dataset_id=None):
'latency_histogram_ms',
LinearBucket(0, 20, 3000),
BigQueryWrapper.HISTOGRAM_METRIC_LOGGER)
if temp_dataset_id and temp_dataset_id.startswith(self.TEMP_DATASET):
raise ValueError(
'User provided temp dataset ID cannot start with %r' %
self.TEMP_DATASET)
self.temp_dataset_id = temp_dataset_id or self._get_temp_dataset()
self.created_temp_dataset = False

@@ -799,18 +805,22 @@ def get_table_location(self, project_id, dataset_id, table_id):
table = self.get_table(project_id, dataset_id, table_id)
return table.location

# Returns true if the temporary dataset was provided by the user.
def is_user_configured_dataset(self):
return (
self.temp_dataset_id and
not self.temp_dataset_id.startswith(self.TEMP_DATASET))

@retry.with_exponential_backoff(
num_retries=MAX_RETRIES,
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def create_temporary_dataset(self, project_id, location):
is_user_configured_dataset = \
not self.temp_dataset_id.startswith(self.TEMP_DATASET)
# Check if dataset exists to make sure that the temporary id is unique
try:
self.client.datasets.Get(
bigquery.BigqueryDatasetsGetRequest(
projectId=project_id, datasetId=self.temp_dataset_id))
if project_id is not None and not is_user_configured_dataset:
if project_id is not None and not self.is_user_configured_dataset():
# Unittests don't pass projectIds so they can be run without error
# User configured datasets are allowed to pre-exist.
raise RuntimeError(
@@ -846,7 +856,13 @@ def clean_up_temporary_dataset(self, project_id):
else:
raise
try:
self._delete_dataset(temp_table.projectId, temp_table.datasetId, True)
# We do not want to delete temporary datasets configured by the user, so
# we just delete the temporary table in that case.
if not self.is_user_configured_dataset():
self._delete_dataset(temp_table.projectId, temp_table.datasetId, True)
else:
self._delete_table(
temp_table.projectId, temp_table.datasetId, temp_table.tableId)
self.created_temp_dataset = False
except HttpError as exn:
if exn.status_code == 403:
@@ -1305,8 +1321,10 @@ def _get_source_location(self):

def __enter__(self):
self.client = BigQueryWrapper(client=self.test_bigquery_client)
self.client.create_temporary_dataset(
self.executing_project, location=self._get_source_location())
if not self.client.is_user_configured_dataset():
# Create a temporary dataset only if the user did not provide one.
self.client.create_temporary_dataset(
self.executing_project, location=self._get_source_location())
return self

def __exit__(self, exception_type, exception_value, traceback):
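As a quick illustration of the new reserved-prefix check, here is a sketch using a mocked client (not part of this commit):

from unittest import mock
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper

# A user-provided dataset ID without the reserved prefix is accepted.
bq = BigQueryWrapper(client=mock.Mock(), temp_dataset_id='my_temp_dataset')
assert bq.is_user_configured_dataset()

# IDs starting with the reserved prefix 'beam_temp_dataset_' are rejected.
try:
  BigQueryWrapper(client=mock.Mock(), temp_dataset_id='beam_temp_dataset_x')
except ValueError as e:
  print(e)  # User provided temp dataset ID cannot start with ...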
