Skip to content

Commit

Permalink
[AIRFLOW-1404] Add 'flatten_results' & 'maximum_bytes_billed' to BQ Operator
Browse files Browse the repository at this point in the history

- Updated BQ hook `run_query` method to add
'flatten_results' & 'maximum_bytes_billed'
parameters
- Added the same in BQ Operator

Closes #3030 from kaxil/AIRFLOW-1404
  • Loading branch information
kaxil authored and Fokko Driesprong committed Feb 11, 2018
1 parent 69334fc commit a289497
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
15 changes: 15 additions & 0 deletions airflow/contrib/hooks/bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,10 @@ def run_query(self,
destination_dataset_table=False,
write_disposition='WRITE_EMPTY',
allow_large_results=False,
flatten_results=False,
udf_config=False,
maximum_billing_tier=None,
maximum_bytes_billed=None,
create_disposition='CREATE_IF_NEEDED',
query_params=None,
schema_update_options=(),
Expand All @@ -488,12 +490,22 @@ def run_query(self,
:type write_disposition: string
:param allow_large_results: Whether to allow large results.
:type allow_large_results: boolean
:param flatten_results: If true and query uses legacy SQL dialect, flattens
all nested and repeated fields in the query results. ``allowLargeResults``
must be true if this is set to false. For standard SQL queries, this
flag is ignored and results are never flattened.
:type flatten_results: boolean
:param udf_config: The User Defined Function configuration for the query.
See https://cloud.google.com/bigquery/user-defined-functions for details.
:type udf_config: list
:param maximum_billing_tier: Positive integer that serves as a
multiplier of the basic price.
:type maximum_billing_tier: integer
:param maximum_bytes_billed: Limits the bytes billed for this job.
Queries that will have bytes billed beyond this limit will fail
(without incurring a charge). If unspecified, this will be
set to your project default.
:type maximum_bytes_billed: float
:param create_disposition: Specifies whether the job is allowed to
create new tables.
:type create_disposition: string
Expand Down Expand Up @@ -528,6 +540,7 @@ def run_query(self,
'query': bql,
'useLegacySql': self.use_legacy_sql,
'maximumBillingTier': maximum_billing_tier,
'maximumBytesBilled': maximum_bytes_billed,
'priority': priority
}
}
Expand All @@ -542,6 +555,8 @@ def run_query(self,
configuration['query'].update({
'allowLargeResults':
allow_large_results,
'flattenResults':
flatten_results,
'writeDisposition':
write_disposition,
'createDisposition':
Expand Down
20 changes: 19 additions & 1 deletion airflow/contrib/operators/bigquery_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ class BigQueryOperator(BaseOperator):
:param create_disposition: Specifies whether the job is allowed to create new tables.
(default: 'CREATE_IF_NEEDED')
:type create_disposition: string
:param allow_large_results: Whether to allow large results.
:type allow_large_results: boolean
:param flatten_results: If true and query uses legacy SQL dialect, flattens
all nested and repeated fields in the query results. ``allow_large_results``
must be ``true`` if this is set to ``false``. For standard SQL queries, this
flag is ignored and results are never flattened.
:type flatten_results: boolean
:param bigquery_conn_id: reference to a specific BigQuery hook.
:type bigquery_conn_id: string
:param delegate_to: The account to impersonate, if any.
Expand All @@ -53,7 +60,12 @@ class BigQueryOperator(BaseOperator):
of the basic price.
Defaults to None, in which case it uses the value set in the project.
:type maximum_billing_tier: integer
:param schema_update_options: Allows the schema of the desitination
:param maximum_bytes_billed: Limits the bytes billed for this job.
Queries that will have bytes billed beyond this limit will fail
(without incurring a charge). If unspecified, this will be
set to your project default.
:type maximum_bytes_billed: float
:param schema_update_options: Allows the schema of the destination
table to be updated as a side effect of the load job.
:type schema_update_options: tuple
:param query_params: a dictionary containing query parameter types and
Expand All @@ -71,11 +83,13 @@ def __init__(self,
destination_dataset_table=False,
write_disposition='WRITE_EMPTY',
allow_large_results=False,
flatten_results=False,
bigquery_conn_id='bigquery_default',
delegate_to=None,
udf_config=False,
use_legacy_sql=True,
maximum_billing_tier=None,
maximum_bytes_billed=None,
create_disposition='CREATE_IF_NEEDED',
schema_update_options=(),
query_params=None,
Expand All @@ -88,11 +102,13 @@ def __init__(self,
self.write_disposition = write_disposition
self.create_disposition = create_disposition
self.allow_large_results = allow_large_results
self.flatten_results = flatten_results
self.bigquery_conn_id = bigquery_conn_id
self.delegate_to = delegate_to
self.udf_config = udf_config
self.use_legacy_sql = use_legacy_sql
self.maximum_billing_tier = maximum_billing_tier
self.maximum_bytes_billed = maximum_bytes_billed
self.schema_update_options = schema_update_options
self.query_params = query_params
self.bq_cursor = None
Expand All @@ -112,8 +128,10 @@ def execute(self, context):
destination_dataset_table=self.destination_dataset_table,
write_disposition=self.write_disposition,
allow_large_results=self.allow_large_results,
flatten_results=self.flatten_results,
udf_config=self.udf_config,
maximum_billing_tier=self.maximum_billing_tier,
maximum_bytes_billed=self.maximum_bytes_billed,
create_disposition=self.create_disposition,
query_params=self.query_params,
schema_update_options=self.schema_update_options,
Expand Down

0 comments on commit a289497

Please sign in to comment.