Use python client in BQ hook create_empty_table/dataset and table_exists (#8377)

* Use python client in BQ hook create_empty_table method

* Refactor table_exists and create_empty_dataset

* Add note in UPDATING
turbaszek committed Apr 22, 2020
1 parent 93ea058 commit 57c8c05
Showing 4 changed files with 202 additions and 258 deletions.
8 changes: 8 additions & 0 deletions UPDATING.md
@@ -62,6 +62,14 @@ https://developers.google.com/style/inclusive-documentation
 -->
 
+### Changes in BigQueryHook
+
+- `create_empty_table` method now accepts a `table_resource` parameter. If provided, all
+  other parameters are ignored.
+- `create_empty_dataset` will now use values from `dataset_reference` instead of raising an error
+  when parameters are passed both in `dataset_reference` and as method arguments. Additionally,
+  validation of `dataset_reference` is done using `Dataset.from_api_repr`. Exception and log
+  messages have been changed.
 
 ### Added mypy plugin to preserve types of decorated functions
 
 Mypy currently doesn't support precise type information for decorated
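For illustration, a minimal sketch of the new hook behaviour; the project and dataset names are hypothetical and a configured default GCP connection is assumed:

```python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

hook = BigQueryHook()  # assumes the default `google_cloud_default` connection

# With the new `table_resource` argument the full table body is passed through
# to the API, and every other keyword (schema_fields, view, labels, ...) is ignored.
hook.create_empty_table(
    project_id="my-project",
    dataset_id="my_dataset",
    table_id="my_table",
    table_resource={
        "tableReference": {
            "projectId": "my-project",
            "datasetId": "my_dataset",
            "tableId": "my_table",
        },
        "schema": {"fields": [{"name": "id", "type": "INTEGER"}]},
    },
    schema_fields=[{"name": "ignored", "type": "STRING"}],  # ignored: table_resource wins
)

# `create_empty_dataset` no longer raises when a value appears both in
# `dataset_reference` and as an argument; the `dataset_reference` value wins
# and the conflict is logged.
hook.create_empty_dataset(
    dataset_id="other_dataset",  # overridden by the value in dataset_reference
    dataset_reference={
        "datasetReference": {"datasetId": "my_dataset", "projectId": "my-project"}
    },
)
```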
@@ -219,10 +219,10 @@
     # [START howto_operator_bigquery_create_view]
     create_view = BigQueryCreateEmptyTableOperator(
         task_id="create_view",
-        dataset_id=LOCATION_DATASET_NAME,
+        dataset_id=DATASET_NAME,
         table_id="test_view",
         view={
-            "query": "SELECT * FROM `{}.test_table`".format(DATASET_NAME),
+            "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
             "useLegacySql": False
         }
     )
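Why this change matters: a view query that references a bare `dataset.table` resolves against the connection's default project, which is not necessarily the project that owns the dataset; fully qualifying the table removes that ambiguity. A standalone sketch of the corrected task, where `PROJECT_ID` and `DATASET_NAME` are placeholders standing in for the example DAG's variables:

```python
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator

PROJECT_ID = "my-project"      # placeholder
DATASET_NAME = "test_dataset"  # placeholder

create_view = BigQueryCreateEmptyTableOperator(
    task_id="create_view",
    dataset_id=DATASET_NAME,
    table_id="test_view",
    view={
        # A fully qualified `project.dataset.table` reference no longer depends
        # on the default project of the BigQuery connection.
        "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "useLegacySql": False,
    },
)
```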
171 changes: 81 additions & 90 deletions airflow/providers/google/cloud/hooks/bigquery.py
@@ -26,6 +26,9 @@
 from copy import deepcopy
 from typing import Any, Dict, Iterable, List, Mapping, NoReturn, Optional, Tuple, Type, Union
 
+from google.api_core.retry import Retry
+from google.cloud.bigquery import DEFAULT_RETRY, Client, Dataset, Table
+from google.cloud.exceptions import NotFound
 from googleapiclient.discovery import build
 from googleapiclient.errors import HttpError
 from pandas import DataFrame
@@ -38,6 +41,7 @@
 from airflow.exceptions import AirflowException
 from airflow.hooks.dbapi_hook import DbApiHook
 from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+from airflow.utils.helpers import convert_camel_to_snake
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 log = logging.getLogger(__name__)
@@ -136,7 +140,8 @@ def get_pandas_df(
             verbose=False,
             credentials=credentials)
 
-    def table_exists(self, project_id: str, dataset_id: str, table_id: str) -> bool:
+    @GoogleBaseHook.fallback_to_default_project_id
+    def table_exists(self, dataset_id: str, table_id: str, project_id: str) -> bool:
         """
         Checks for the existence of a table in Google BigQuery.
@@ -150,28 +155,30 @@ def table_exists(self, project_id: str, dataset_id: str, table_id: str) -> bool:
         :param table_id: The name of the table to check the existence of.
         :type table_id: str
         """
-        service = self.get_service()
+        table_reference = f"{project_id}.{dataset_id}.{table_id}"
 
         try:
-            service.tables().get(  # pylint: disable=no-member
-                projectId=project_id, datasetId=dataset_id,
-                tableId=table_id).execute(num_retries=self.num_retries)
+            Client(client_info=self.client_info).get_table(table_reference)
             return True
-        except HttpError as e:
-            if e.resp['status'] == '404':
-                return False
-            raise
-
-    def create_empty_table(self,  # pylint: disable=too-many-arguments
-                           project_id: str,
-                           dataset_id: str,
-                           table_id: str,
-                           schema_fields: Optional[List] = None,
-                           time_partitioning: Optional[Dict] = None,
-                           cluster_fields: Optional[List[str]] = None,
-                           labels: Optional[Dict] = None,
-                           view: Optional[Dict] = None,
-                           encryption_configuration: Optional[Dict] = None,
-                           num_retries: int = 5) -> None:
+        except NotFound:
+            return False
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_empty_table(  # pylint: disable=too-many-arguments
+        self,
+        project_id: str,
+        dataset_id: str,
+        table_id: str,
+        table_resource: Optional[Dict[str, Any]] = None,
+        schema_fields: Optional[List] = None,
+        time_partitioning: Optional[Dict] = None,
+        cluster_fields: Optional[List[str]] = None,
+        labels: Optional[Dict] = None,
+        view: Optional[Dict] = None,
+        encryption_configuration: Optional[Dict] = None,
+        retry: Optional[Retry] = DEFAULT_RETRY,
+        num_retries: Optional[int] = None
+    ) -> None:
         """
         Creates a new, empty table in the dataset.
         To create a view, which is defined by a SQL query, parse a dictionary to 'view' kwarg
@@ -182,11 +189,17 @@ def create_empty_table(self,  # pylint: disable=too-many-arguments
         :type dataset_id: str
         :param table_id: The Name of the table to be created.
         :type table_id: str
+        :param table_resource: Table resource as described in documentation:
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table
+            If provided all other parameters are ignored.
+        :type table_resource: Dict[str, Any]
         :param schema_fields: If set, the schema field list as defined here:
             https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
         :type schema_fields: list
         :param labels: a dictionary containing labels for the table, passed to BigQuery
         :type labels: dict
+        :param retry: Optional. How to retry the RPC.
+        :type retry: google.api_core.retry.Retry
 
         **Example**: ::
@@ -227,119 +240,97 @@ def create_empty_table(self,  # pylint: disable=too-many-arguments
         :type num_retries: int
 
         :return: None
         """
-        service = self.get_service()
-
-        project_id = project_id if project_id is not None else self.project_id
+        if num_retries:
+            warnings.warn("Parameter `num_retries` is deprecated", DeprecationWarning)
 
-        table_resource = {
+        _table_resource: Dict[str, Any] = {
             'tableReference': {
-                'tableId': table_id
+                'tableId': table_id,
+                'projectId': project_id,
+                'datasetId': dataset_id,
             }
-        }  # type: Dict[str, Any]
+        }
 
         if self.location:
-            table_resource['location'] = self.location
+            _table_resource['location'] = self.location
 
         if schema_fields:
-            table_resource['schema'] = {'fields': schema_fields}
+            _table_resource['schema'] = {'fields': schema_fields}
 
         if time_partitioning:
-            table_resource['timePartitioning'] = time_partitioning
+            _table_resource['timePartitioning'] = time_partitioning
 
         if cluster_fields:
-            table_resource['clustering'] = {
+            _table_resource['clustering'] = {
                 'fields': cluster_fields
             }
 
         if labels:
-            table_resource['labels'] = labels
+            _table_resource['labels'] = labels
 
         if view:
-            table_resource['view'] = view
+            _table_resource['view'] = view
 
         if encryption_configuration:
-            table_resource["encryptionConfiguration"] = encryption_configuration
+            _table_resource["encryptionConfiguration"] = encryption_configuration
 
-        num_retries = num_retries if num_retries else self.num_retries
-
-        service.tables().insert(  # pylint: disable=no-member
-            projectId=project_id,
-            datasetId=dataset_id,
-            body=table_resource).execute(num_retries=num_retries)
+        table_resource = table_resource or _table_resource
+        table = Table.from_api_repr(table_resource)
+        Client(client_info=self.client_info).create_table(table=table, exists_ok=True, retry=retry)
 
+    @GoogleBaseHook.fallback_to_default_project_id
     def create_empty_dataset(self,
-                             dataset_id: str = "",
-                             project_id: str = "",
+                             dataset_id: Optional[str] = None,
+                             project_id: Optional[str] = None,
                              location: Optional[str] = None,
-                             dataset_reference: Optional[Dict] = None) -> None:
+                             dataset_reference: Optional[Dict[str, Any]] = None) -> None:
         """
         Create a new empty dataset:
         https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert
 
         :param project_id: The name of the project where we want to create
             an empty a dataset. Don't need to provide, if projectId in dataset_reference.
         :type project_id: str
-        :param dataset_id: The id of dataset. Don't need to provide,
-            if datasetId in dataset_reference.
+        :param dataset_id: The id of dataset. Don't need to provide, if datasetId in dataset_reference.
         :type dataset_id: str
         :param location: (Optional) The geographic location where the dataset should reside.
             There is no default value but the dataset will be created in US if nothing is provided.
         :type location: str
-        :param dataset_reference: Dataset reference that could be provided
-            with request body. More info:
+        :param dataset_reference: Dataset reference that could be provided with request body. More info:
             https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
         :type dataset_reference: dict
         """
-        service = self.get_service()
-
         if dataset_reference:
             _validate_value('dataset_reference', dataset_reference, dict)
-        else:
-            dataset_reference = {}
-
-        if "datasetReference" not in dataset_reference:
-            dataset_reference["datasetReference"] = {}
+        dataset_reference = dataset_reference or {"datasetReference": {}}
+
+        if self.location:
+            dataset_reference['location'] = dataset_reference.get('location') or self.location
 
-        if not dataset_reference["datasetReference"].get("datasetId") and not dataset_id:
-            raise ValueError(
-                "dataset_id not provided and datasetId not exist in the datasetReference. "
-                "Impossible to create dataset")
-
-        dataset_required_params = [(dataset_id, "datasetId", ""),
-                                   (project_id, "projectId", self.project_id)]
-        for param_tuple in dataset_required_params:
-            param, param_name, param_default = param_tuple
-            if param_name not in dataset_reference['datasetReference']:
-                if param_default and not param:
-                    self.log.info(
-                        "%s was not specified. Will be used default value %s.",
-                        param_name, param_default
-                    )
-                    param = param_default
-                dataset_reference['datasetReference'].update(
-                    {param_name: param})
-            elif param:
-                _api_resource_configs_duplication_check(
-                    param_name, param,
-                    dataset_reference['datasetReference'], 'dataset_reference')
+        for param, value in zip(["datasetId", "projectId"], [dataset_id, project_id]):
+            specified_param = dataset_reference["datasetReference"].get(param)
+            if specified_param:
+                if value:
+                    self.log.info(
+                        "`%s` was provided in both `dataset_reference` and as `%s`. "
+                        "Using value from `dataset_reference`",
+                        param, convert_camel_to_snake(param)
+                    )
+                continue  # use specified value
+            if not value:
+                raise ValueError(
+                    f"Please specify `{param}` either in `dataset_reference` "
+                    f"or by providing `{convert_camel_to_snake(param)}`",
+                )
+            # dataset_reference has no param but we can fallback to default value
+            self.log.info(
+                "%s was not specified in `dataset_reference`. Will use default value %s.",
+                param, value
+            )
+            dataset_reference["datasetReference"][param] = value
 
         location = location or self.location
         if location:
-            if 'location' not in dataset_reference:
-                dataset_reference.update({'location': location})
-            else:
-                _api_resource_configs_duplication_check(
-                    'location', location,
-                    dataset_reference, 'dataset_reference')
+            dataset_reference["location"] = dataset_reference.get("location", location)
 
-        dataset_id = dataset_reference.get("datasetReference").get("datasetId")  # type: ignore
-        dataset_project_id = dataset_reference.get("datasetReference").get("projectId")  # type: ignore
-
-        service.datasets().insert(  # pylint: disable=no-member
-            projectId=dataset_project_id,
-            body=dataset_reference).execute(num_retries=self.num_retries)
+        dataset = Dataset.from_api_repr(dataset_reference)
+        Client(client_info=self.client_info).create_dataset(dataset=dataset, exists_ok=True)
 
     def get_dataset_tables(self, dataset_id: str, project_id: Optional[str] = None,
                            max_results: Optional[int] = None,
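Taken together, a minimal usage sketch of the refactored methods from a caller's point of view; the connection, project, and dataset names below are assumptions, not part of this commit:

```python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

hook = BigQueryHook()  # assumes the default `google_cloud_default` connection

# table_exists now calls google-cloud-bigquery's Client.get_table and treats
# NotFound as "table is absent", instead of inspecting HttpError status codes.
if not hook.table_exists(
    dataset_id="my_dataset", table_id="my_table", project_id="my-project"
):
    # create_empty_table builds a Table via Table.from_api_repr and calls
    # Client.create_table(..., exists_ok=True), so re-running it is harmless.
    hook.create_empty_table(
        project_id="my-project",
        dataset_id="my_dataset",
        table_id="my_table",
        schema_fields=[{"name": "id", "type": "INTEGER", "mode": "REQUIRED"}],
    )
```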
