Skip to content

Commit

Permalink
Add exists_ok flag to BigQueryCreateEmptyTable(Dataset)Operator (#1…
Browse files Browse the repository at this point in the history
…4026)

Co-authored-by: uma6 <>
Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
  • Loading branch information
ysktir and ephraimbuddy committed Feb 13, 2021
1 parent 9536953 commit 59c94c6
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/bigquery.py
Expand Up @@ -411,7 +411,7 @@ def create_empty_dataset(
:param dataset_reference: Dataset reference that could be provided with request body. More info:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
:type dataset_reference: dict
:param exists_ok: If ``True``, ignore "already exists" errors when creating the DATASET.
:param exists_ok: If ``True``, ignore "already exists" errors when creating the dataset.
:type exists_ok: bool
"""
dataset_reference = dataset_reference or {"datasetReference": {}}
Expand Down
12 changes: 10 additions & 2 deletions airflow/providers/google/cloud/operators/bigquery.py
Expand Up @@ -868,6 +868,8 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:type impersonation_chain: Union[str, Sequence[str]]
:param exists_ok: If ``True``, ignore "already exists" errors when creating the table.
:type exists_ok: bool
"""

template_fields = (
Expand Down Expand Up @@ -905,6 +907,7 @@ def __init__(
location: Optional[str] = None,
cluster_fields: Optional[List[str]] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
exists_ok: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -926,6 +929,7 @@ def __init__(
self.cluster_fields = cluster_fields
self.table_resource = table_resource
self.impersonation_chain = impersonation_chain
self.exists_ok = exists_ok

def execute(self, context) -> None:
bq_hook = BigQueryHook(
Expand Down Expand Up @@ -960,7 +964,7 @@ def execute(self, context) -> None:
materialized_view=self.materialized_view,
encryption_configuration=self.encryption_configuration,
table_resource=self.table_resource,
exists_ok=False,
exists_ok=self.exists_ok,
)
self.log.info(
'Table %s.%s.%s created successfully', table.project, table.dataset_id, table.table_id
Expand Down Expand Up @@ -1357,6 +1361,8 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:type impersonation_chain: Union[str, Sequence[str]]
:param exists_ok: If ``True``, ignore "already exists" errors when creating the dataset.
:type exists_ok: bool
**Example**: ::
create_new_dataset = BigQueryCreateEmptyDatasetOperator(
Expand Down Expand Up @@ -1389,6 +1395,7 @@ def __init__(
bigquery_conn_id: Optional[str] = None,
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
exists_ok: bool = False,
**kwargs,
) -> None:

Expand All @@ -1408,6 +1415,7 @@ def __init__(
self.dataset_reference = dataset_reference if dataset_reference else {}
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
self.exists_ok = exists_ok

super().__init__(**kwargs)

Expand All @@ -1425,7 +1433,7 @@ def execute(self, context) -> None:
dataset_id=self.dataset_id,
dataset_reference=self.dataset_reference,
location=self.location,
exists_ok=False,
exists_ok=self.exists_ok,
)
except Conflict:
dataset_id = self.dataset_reference.get("datasetReference", {}).get("datasetId", self.dataset_id)
Expand Down

0 comments on commit 59c94c6

Please sign in to comment.