/
bigquery.py
3337 lines (2864 loc) · 137 KB
/
bigquery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""BigQuery Hook and a very basic PEP 249 implementation for BigQuery."""
from __future__ import annotations
import json
import logging
import re
import time
import uuid
import warnings
from copy import deepcopy
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NoReturn, Sequence, Union, cast
from aiohttp import ClientSession as ClientSession
from gcloud.aio.bigquery import Job, Table as Table_async
from google.cloud.bigquery import (
DEFAULT_RETRY,
Client,
CopyJob,
ExternalConfig,
ExtractJob,
LoadJob,
QueryJob,
SchemaField,
UnknownJob,
)
from google.cloud.bigquery.dataset import AccessEntry, Dataset, DatasetListItem, DatasetReference
from google.cloud.bigquery.table import EncryptionConfiguration, Row, RowIterator, Table, TableReference
from google.cloud.exceptions import NotFound
from googleapiclient.discovery import Resource, build
from pandas_gbq import read_gbq
from pandas_gbq.gbq import GbqConnector # noqa
from requests import Session
from sqlalchemy import create_engine
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.providers.google.cloud.utils.bigquery import bq_cast
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHook, GoogleBaseHook, get_field
try:
from airflow.utils.hashlib_wrapper import md5
except ModuleNotFoundError:
# Remove when Airflow providers min Airflow version is "2.7.0"
from hashlib import md5
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
import pandas as pd
from google.api_core.page_iterator import HTTPIterator
from google.api_core.retry import Retry
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Type alias: any one of the concrete BigQuery job classes CopyJob, QueryJob, LoadJob or ExtractJob.
BigQueryJob = Union[CopyJob, QueryJob, LoadJob, ExtractJob]
class BigQueryHook(GoogleBaseHook, DbApiHook):
"""Interact with BigQuery.
This hook uses the Google Cloud connection.
:param gcp_conn_id: The Airflow connection used for GCP credentials.
:param use_legacy_sql: This specifies whether to use legacy SQL dialect.
:param location: The location of the BigQuery resource.
:param priority: Specifies a priority for the query.
Possible values include INTERACTIVE and BATCH.
The default value is INTERACTIVE.
:param api_resource_configs: This contains params configuration applied for
Google BigQuery jobs.
:param impersonation_chain: This is the optional service account to
impersonate using short term credentials.
:param labels: The BigQuery resource label.
"""
conn_name_attr = "gcp_conn_id"
default_conn_name = "google_cloud_bigquery_default"
conn_type = "gcpbigquery"
hook_name = "Google Bigquery"
def __init__(
    self,
    gcp_conn_id: str = GoogleBaseHook.default_conn_name,
    use_legacy_sql: bool = True,
    location: str | None = None,
    priority: str = "INTERACTIVE",
    api_resource_configs: dict | None = None,
    impersonation_chain: str | Sequence[str] | None = None,
    labels: dict | None = None,
    **kwargs,
) -> None:
    """Initialise the hook and remember the BigQuery-specific options.

    :param gcp_conn_id: Airflow connection id forwarded to the base Google hook.
    :param use_legacy_sql: Whether queries use the legacy SQL dialect (stored as-is).
    :param location: Default BigQuery location (stored as-is; may be ``None``).
    :param priority: Query priority, stored as-is (default ``"INTERACTIVE"``).
    :param api_resource_configs: Extra job configuration; ``None`` or an empty
        value is replaced with an empty dict.
    :param impersonation_chain: Forwarded to the base Google hook.
    :param labels: BigQuery labels, stored as-is.
    :param kwargs: Only inspected for the removed ``delegate_to`` option; any
        other keyword arguments are accepted but not forwarded anywhere.
    :raises RuntimeError: if ``delegate_to`` is passed with a non-``None`` value.
    """
    # ``delegate_to`` is no longer supported; refuse it explicitly rather than ignoring it.
    delegate_to = kwargs.get("delegate_to")
    if delegate_to is not None:
        raise RuntimeError(
            "The `delegate_to` parameter has been deprecated before and finally removed in this version"
            " of Google Provider. You MUST convert it to `impersonate_chain`"
        )

    super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain)

    self.use_legacy_sql = use_legacy_sql
    self.location = location
    self.priority = priority
    # Id of the job currently tracked by this hook; nothing is running yet.
    self.running_job_id: str | None = None
    self.api_resource_configs: dict = api_resource_configs or {}
    self.labels = labels
    self.credentials_path = "bigquery_hook_credentials.json"
def get_conn(self) -> BigQueryConnection:
    """Return a PEP 249 (DB-API) connection object for BigQuery.

    The connection wraps the discovery service returned by :meth:`get_service`
    (which emits a deprecation warning). It shares this hook's project, SQL
    dialect, location and retry count, and keeps a reference to the hook itself.
    """
    connection_kwargs = {
        "service": self.get_service(),
        "project_id": self.project_id,
        "use_legacy_sql": self.use_legacy_sql,
        "location": self.location,
        "num_retries": self.num_retries,
        "hook": self,
    }
    return BigQueryConnection(**connection_kwargs)
def get_service(self) -> Resource:
    """Get a BigQuery service object. Deprecated.

    Builds a ``bigquery`` ``v2`` discovery resource on top of the hook's
    authorized HTTP object. Prefer :meth:`get_client`, which returns a
    ``google.cloud.bigquery.Client``.

    :return: The discovery ``Resource`` for the BigQuery v2 API.
    """
    warnings.warn(
        "This method will be deprecated. Please use `BigQueryHook.get_client` method",
        AirflowProviderDeprecationWarning,
        # Attribute the warning to the code that called get_service(), not to this line.
        stacklevel=2,
    )
    http_authorized = self._authorize()
    return build("bigquery", "v2", http=http_authorized, cache_discovery=False)
def get_client(self, project_id: str | None = None, location: str | None = None) -> Client:
    """Build a ``google.cloud.bigquery.Client`` using this hook's credentials.

    :param project_id: Project ID for the project which the client acts on behalf of.
        Passed straight to the client; ``None`` is forwarded unchanged.
    :param location: Default location for jobs / datasets / tables.
    :return: A new client carrying the provider's ``CLIENT_INFO``.
    """
    credentials = self.get_credentials()
    return Client(
        project=project_id,
        location=location,
        credentials=credentials,
        client_info=CLIENT_INFO,
    )
def get_uri(self) -> str:
    """Return the SQLAlchemy URI ``bigquery://<project_id>``.

    Overrides ``DbApiHook.get_uri`` so that ``get_sqlalchemy_engine()`` targets
    this hook's project.
    """
    return "bigquery://{}".format(self.project_id)
def get_sqlalchemy_engine(self, engine_kwargs=None):
    """Create an SQLAlchemy engine object.

    Credentials are chosen in this order:

    1. the ``key_path`` connection extra, passed to the engine as ``credentials_path``;
    2. the ``keyfile_dict`` connection extra (a dict, or a JSON string that is
       decoded), passed as ``credentials_info``;
    3. otherwise no explicit credentials are passed, so Application Default
       Credentials are relied upon.

    :param engine_kwargs: Kwargs used in :func:`~sqlalchemy.create_engine`.
    :return: The engine created for :meth:`get_uri`.
    :raises AirflowException: if creating the engine without explicit credentials
        fails; the underlying error is chained as ``__cause__``.
    """
    if engine_kwargs is None:
        engine_kwargs = {}
    extras = self.get_connection(self.gcp_conn_id).extra_dejson

    credentials_path = get_field(extras, "key_path")
    if credentials_path:
        return create_engine(self.get_uri(), credentials_path=credentials_path, **engine_kwargs)

    keyfile_dict = get_field(extras, "keyfile_dict")
    if keyfile_dict:
        # The extra may be stored either as a dict or as its JSON text.
        keyfile_content = keyfile_dict if isinstance(keyfile_dict, dict) else json.loads(keyfile_dict)
        return create_engine(self.get_uri(), credentials_info=keyfile_content, **engine_kwargs)

    try:
        # 1. If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set
        # ADC uses the service account key or configuration file that the variable points to.
        # 2. If the environment variable GOOGLE_APPLICATION_CREDENTIALS isn't set
        # ADC uses the service account that is attached to the resource that is running your code.
        return create_engine(self.get_uri(), **engine_kwargs)
    except Exception as e:
        self.log.error(e)
        # Chain explicitly so the root cause is reported as the direct cause.
        raise AirflowException(
            "For now, we only support instantiating SQLAlchemy engine by"
            " using ADC or extra fields `key_path` and `keyfile_dict`."
        ) from e
def get_records(self, sql, parameters=None):
    """Execute ``sql`` and return all rows through ``DbApiHook.get_records``.

    :param sql: The SQL statement(s) to run.
    :param parameters: Optional parameters forwarded to the base implementation.
    :raises AirflowException: if the hook has no ``location`` set.
    """
    if self.location is not None:
        return super().get_records(sql, parameters=parameters)
    raise AirflowException("Need to specify 'location' to use BigQueryHook.get_records()")
@staticmethod
def _resolve_table_reference(
    table_resource: dict[str, Any],
    project_id: str | None = None,
    dataset_id: str | None = None,
    table_id: str | None = None,
) -> dict[str, Any]:
    """Make sure ``table_resource`` carries a complete ``tableReference``.

    If ``table_resource["tableReference"]`` can already be parsed by
    ``TableReference.from_api_repr``, the resource is returned untouched.
    Otherwise each of ``projectId``, ``datasetId`` and ``tableId`` keeps its
    existing non-empty value from the reference. If the value is missing or
    empty, the corresponding argument is used instead.

    Note: ``table_resource`` is modified in place and also returned.

    :param table_resource: Table resource dict in BigQuery REST API form.
    :param project_id: Fallback value for ``tableReference.projectId``.
    :param dataset_id: Fallback value for ``tableReference.datasetId``.
    :param table_id: Fallback value for ``tableReference.tableId``.
    :return: The same ``table_resource`` dict.
    :raises AirflowException: if some key has neither a non-empty value in the
        reference nor a non-empty fallback argument.
    """
    try:
        # Check if tableReference is present and is valid
        TableReference.from_api_repr(table_resource["tableReference"])
    except KeyError:
        # Something is wrong so we try to build the reference
        reference = table_resource.setdefault("tableReference", {})
        values = [("projectId", project_id), ("tableId", table_id), ("datasetId", dataset_id)]
        for key, value in values:
            # Prefer a non-empty value already in the reference. A key that is present
            # but None/empty must still fall back to the provided argument.
            resolved_value = reference.get(key) or value
            if not resolved_value:
                # If there's no usable value in tableReference and provided one is None raise error
                raise AirflowException(
                    f"Table resource is missing proper `tableReference` and `{key}` is None"
                )
            reference[key] = resolved_value
    return table_resource
def insert_rows(
    self,
    table: Any,
    rows: Any,
    target_fields: Any = None,
    commit_every: Any = 1000,
    replace: Any = False,
    **kwargs,
) -> None:
    """Insert rows -- not supported by this hook.

    The DB-API style row insertion inherited from ``DbApiHook`` is deliberately
    disabled. BigQuery's streaming API could in principle back it, but that has
    not been implemented. Every argument is accepted only for signature
    compatibility and is ignored.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError
def get_pandas_df(
    self,
    sql: str,
    parameters: Iterable | Mapping[str, Any] | None = None,
    dialect: str | None = None,
    **kwargs,
) -> pd.DataFrame:
    """Run a BigQuery query and return its result as a Pandas DataFrame.

    ``DbApiHook.get_pandas_df`` cannot be reused because Pandas does not accept
    PEP 249 connections (SQLite aside). This method delegates to
    ``pandas_gbq.read_gbq`` instead.

    .. seealso::
        https://github.com/pandas-dev/pandas/blob/055d008615272a1ceca9720dc365a2abd316f353/pandas/io/sql.py#L415
        https://github.com/pandas-dev/pandas/issues/6900

    :param sql: The BigQuery SQL to execute.
    :param parameters: Accepted only to match the superclass signature; unused.
    :param dialect: ``"legacy"`` or ``"standard"``. When ``None`` it is derived
        from ``self.use_legacy_sql``.
    :param kwargs: (optional) passed into pandas_gbq.read_gbq method
    """
    if dialect is None:
        dialect = "standard" if not self.use_legacy_sql else "legacy"

    credentials, project_id = self.get_credentials_and_project_id()
    return read_gbq(
        sql,
        project_id=project_id,
        dialect=dialect,
        credentials=credentials,
        **kwargs,
    )
@GoogleBaseHook.fallback_to_default_project_id
def table_exists(self, dataset_id: str, table_id: str, project_id: str) -> bool:
    """Tell whether ``project_id.dataset_id.table_id`` exists in BigQuery.

    :param project_id: The Google cloud project in which to look for the
        table. The connection supplied to the hook must provide access to
        the specified project.
    :param dataset_id: The name of the dataset in which to look for the
        table.
    :param table_id: The name of the table to check the existence of.
    :return: ``True`` when the table can be fetched, ``False`` on ``NotFound``.
    """
    reference = TableReference(DatasetReference(project_id, dataset_id), table_id)
    try:
        self.get_client(project_id=project_id).get_table(reference)
    except NotFound:
        return False
    return True
@GoogleBaseHook.fallback_to_default_project_id
def table_partition_exists(
    self, dataset_id: str, table_id: str, partition_id: str, project_id: str
) -> bool:
    """Tell whether a given partition of a BigQuery table exists.

    :param project_id: The Google cloud project in which to look for the
        table. The connection supplied to the hook must provide access to
        the specified project.
    :param dataset_id: The name of the dataset in which to look for the
        table.
    :param table_id: The name of the table to check the existence of.
    :param partition_id: The name of the partition to check the existence of.
    :return: Whether ``partition_id`` is among the table's partitions;
        ``False`` if the client raises ``NotFound``.
    """
    reference = TableReference(DatasetReference(project_id, dataset_id), table_id)
    try:
        partitions = self.get_client(project_id=project_id).list_partitions(reference)
        return partition_id in partitions
    except NotFound:
        return False
@GoogleBaseHook.fallback_to_default_project_id
def create_empty_table(
    self,
    project_id: str | None = None,
    dataset_id: str | None = None,
    table_id: str | None = None,
    table_resource: dict[str, Any] | None = None,
    schema_fields: list | None = None,
    time_partitioning: dict | None = None,
    cluster_fields: list[str] | None = None,
    labels: dict | None = None,
    view: dict | None = None,
    materialized_view: dict | None = None,
    encryption_configuration: dict | None = None,
    retry: Retry = DEFAULT_RETRY,
    location: str | None = None,
    exists_ok: bool = True,
) -> Table:
    """Create a new, empty table in the dataset.

    To create a view, which is defined by a SQL query, pass a dictionary to
    the *view* argument.

    :param project_id: The project to create the table into.
    :param dataset_id: The dataset to create the table into.
    :param table_id: The Name of the table to be created.
    :param table_resource: Table resource as described in documentation:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table
        If provided, the table-definition arguments (``schema_fields`` through
        ``encryption_configuration``) are ignored. ``project_id``,
        ``dataset_id`` and ``table_id`` are then only used to fill in a
        missing or incomplete ``tableReference``.
    :param schema_fields: If set, the schema field list as defined here:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema

        .. code-block:: python

            schema_fields = [
                {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
            ]

    :param labels: a dictionary containing labels for the table, passed to BigQuery
    :param retry: Optional. How to retry the RPC.
    :param time_partitioning: configure optional time partitioning fields i.e.
        partition by field, type and expiration as per API specifications.

        .. seealso::
            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning
    :param cluster_fields: [Optional] The fields used for clustering.
        BigQuery supports clustering for both partitioned and
        non-partitioned tables.
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#clustering.fields
    :param view: [Optional] A dictionary containing definition for the view.
        If set, it will create a view instead of a table:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition

        .. code-block:: python

            view = {
                "query": "SELECT * FROM `test-project-id.test_dataset_id.test_table_prefix*` LIMIT 1000",
                "useLegacySql": False,
            }

    :param materialized_view: [Optional] The materialized view definition.
    :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).

        .. code-block:: python

            encryption_configuration = {
                "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key",
            }
    :param location: (Optional) Default location of the BigQuery client used to
        create the table (see :meth:`get_client`). The ``location`` key of a
        table resource built from the individual arguments comes from the
        hook's own ``location`` attribute, not from this argument.
    :param exists_ok: If ``True``, ignore "already exists" errors when creating the table.
    :return: Created table
    """
    # Build a resource from the individual arguments; only used when no
    # explicit ``table_resource`` is given.
    _table_resource: dict[str, Any] = {}
    if self.location:
        _table_resource["location"] = self.location
    if schema_fields:
        _table_resource["schema"] = {"fields": schema_fields}
    if time_partitioning:
        _table_resource["timePartitioning"] = time_partitioning
    if cluster_fields:
        _table_resource["clustering"] = {"fields": cluster_fields}
    if labels:
        _table_resource["labels"] = labels
    if view:
        _table_resource["view"] = view
    if materialized_view:
        _table_resource["materializedView"] = materialized_view
    if encryption_configuration:
        _table_resource["encryptionConfiguration"] = encryption_configuration

    table_resource = table_resource or _table_resource
    table_resource = self._resolve_table_reference(
        table_resource=table_resource,
        project_id=project_id,
        dataset_id=dataset_id,
        table_id=table_id,
    )
    table = Table.from_api_repr(table_resource)
    return self.get_client(project_id=project_id, location=location).create_table(
        table=table, exists_ok=exists_ok, retry=retry
    )
@GoogleBaseHook.fallback_to_default_project_id
def create_empty_dataset(
    self,
    dataset_id: str | None = None,
    project_id: str | None = None,
    location: str | None = None,
    dataset_reference: dict[str, Any] | None = None,
    exists_ok: bool = True,
) -> dict[str, Any]:
    """Create a new empty dataset.

    .. seealso:: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert

    :param project_id: The name of the project where we want to create
        an empty dataset. Don't need to provide, if projectId in dataset_reference.
    :param dataset_id: The id of dataset. Don't need to provide, if datasetId in dataset_reference.
    :param location: (Optional) The geographic location where the dataset should reside.
        Falls back to the hook's ``location``. There is no default value but the
        dataset will be created in US if nothing is provided.
    :param dataset_reference: Dataset reference that could be provided with request body. More info:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
        Values inside its ``datasetReference`` take precedence over
        ``dataset_id`` / ``project_id``. The dict is copied and never modified.
    :param exists_ok: If ``True``, ignore "already exists" errors when creating the dataset.
    :return: API representation of the dataset returned by the client's ``create_dataset``.
    :raises ValueError: if ``datasetId`` or ``projectId`` is given neither in
        ``dataset_reference`` nor as an argument.
    """
    # Work on a private deep copy: the nested "datasetReference" dict and the
    # "location" key are filled in below, and that must not leak back into the
    # caller's (possibly reused) dict.
    dataset_reference = deepcopy(dataset_reference) if dataset_reference else {}
    if "datasetReference" not in dataset_reference:
        dataset_reference["datasetReference"] = {}

    for param, value in zip(["datasetId", "projectId"], [dataset_id, project_id]):
        specified_param = dataset_reference["datasetReference"].get(param)
        if specified_param:
            if value:
                self.log.info(
                    "`%s` was provided in both `dataset_reference` and as `%s`. "
                    "Using value from `dataset_reference`",
                    param,
                    convert_camel_to_snake(param),
                )
            continue  # use specified value
        if not value:
            raise ValueError(
                f"Please specify `{param}` either in `dataset_reference` "
                f"or by providing `{convert_camel_to_snake(param)}`",
            )
        # dataset_reference has no param but we can fallback to default value
        self.log.info(
            "%s was not specified in `dataset_reference`. Will use default value %s.", param, value
        )
        dataset_reference["datasetReference"][param] = value

    location = location or self.location
    project_id = project_id or self.project_id
    if location:
        # An explicit "location" in dataset_reference wins over the argument/hook default.
        dataset_reference["location"] = dataset_reference.get("location", location)

    dataset: Dataset = Dataset.from_api_repr(dataset_reference)
    self.log.info("Creating dataset: %s in project: %s ", dataset.dataset_id, dataset.project)
    dataset_object = self.get_client(project_id=project_id, location=location).create_dataset(
        dataset=dataset, exists_ok=exists_ok
    )
    self.log.info("Dataset created successfully.")
    return dataset_object.to_api_repr()
@GoogleBaseHook.fallback_to_default_project_id
def get_dataset_tables(
    self,
    dataset_id: str,
    project_id: str | None = None,
    max_results: int | None = None,
    retry: Retry = DEFAULT_RETRY,
) -> list[dict[str, Any]]:
    """Get the list of tables for a given dataset.

    For more information, see:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list

    :param dataset_id: the dataset ID of the requested dataset.
    :param project_id: (Optional) the project of the requested dataset. If None,
        self.project_id will be used.
    :param max_results: (Optional) the maximum number of tables to return.
    :param retry: How to retry the RPC.
    :return: List of tables associated with the dataset, each given as the API
        representation of its table reference.
    """
    self.log.info("Start getting tables list from dataset: %s.%s", project_id, dataset_id)
    # Create the client for the resolved project, consistent with the other
    # methods of this hook.
    tables = self.get_client(project_id=project_id).list_tables(
        dataset=DatasetReference(project=project_id, dataset_id=dataset_id),
        max_results=max_results,
        retry=retry,
    )
    # Convert to a list (consumes all values)
    return [t.reference.to_api_repr() for t in tables]
@GoogleBaseHook.fallback_to_default_project_id
def delete_dataset(
    self,
    dataset_id: str,
    project_id: str | None = None,
    delete_contents: bool = False,
    retry: Retry = DEFAULT_RETRY,
) -> None:
    """Delete a BigQuery dataset from a project.

    ``not_found_ok=True`` is passed to the client, so deleting a dataset that
    is already gone should not raise.

    :param project_id: The name of the project where we have the dataset.
    :param dataset_id: The dataset to be deleted.
    :param delete_contents: If True, delete all the tables in the dataset.
        If False and the dataset contains tables, the request will fail.
    :param retry: How to retry the RPC.
    """
    self.log.info("Deleting from project: %s Dataset:%s", project_id, dataset_id)
    client = self.get_client(project_id=project_id)
    target = DatasetReference(project=project_id, dataset_id=dataset_id)
    client.delete_dataset(
        dataset=target,
        delete_contents=delete_contents,
        retry=retry,
        not_found_ok=True,
    )
@GoogleBaseHook.fallback_to_default_project_id
def create_external_table(
    self,
    external_project_dataset_table: str,
    schema_fields: list,
    source_uris: list,
    source_format: str = "CSV",
    autodetect: bool = False,
    compression: str = "NONE",
    ignore_unknown_values: bool = False,
    max_bad_records: int = 0,
    skip_leading_rows: int = 0,
    field_delimiter: str = ",",
    quote_character: str | None = None,
    allow_quoted_newlines: bool = False,
    allow_jagged_rows: bool = False,
    encoding: str = "UTF-8",
    src_fmt_configs: dict | None = None,
    labels: dict | None = None,
    description: str | None = None,
    encryption_configuration: dict | None = None,
    location: str | None = None,
    project_id: str | None = None,
) -> Table:
    """Create an external table in the dataset with data from Google Cloud Storage.

    .. seealso:: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource

    This method is deprecated. Please use :func:`.create_empty_table` with
    the ``table_resource`` object. See function documentation for more
    details about these parameters.

    :param external_project_dataset_table:
        The dotted ``(<project>.|<project>:)<dataset>.<table>($<partition>)`` BigQuery
        table name to create external table.
        If ``<project>`` is not included, ``project_id`` is used (which
        defaults to the project defined in the connection).
    :param schema_fields: The schema field list as defined here:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
        Not applied when empty.
    :param source_uris: The source Google Cloud
        Storage URI (e.g. gs://some-bucket/some-file.txt). A single wildcard
        per object name can be used.
    :param source_format: The format of the source data (case-insensitive).
        Format-specific options are only applied for ``CSV`` and ``GOOGLE_SHEETS``.
    :param autodetect: Try to detect schema and format options automatically.
        Any option specified explicitly will be honored.
    :param compression: [Optional] The compression type of the data source
        (case-insensitive). Possible values include GZIP and NONE.
        The default value is NONE.
        This setting is ignored for Google Cloud Bigtable,
        Google Cloud Datastore backups and Avro formats.
    :param ignore_unknown_values: [Optional] Indicates if BigQuery should allow
        extra values that are not represented in the table schema.
        If true, the extra values are ignored. If false, records with extra columns
        are treated as bad records, and if there are too many bad records, an
        invalid error is returned in the job result.
    :param max_bad_records: The maximum number of bad records that BigQuery can
        ignore when running the job. Only applied when non-zero.
    :param skip_leading_rows: Number of rows to skip when loading from a CSV.
    :param field_delimiter: The delimiter to use when loading from a CSV.
    :param quote_character: The value that is used to quote data sections in a CSV
        file.
    :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not
        (false).
    :param allow_jagged_rows: Accept rows that are missing trailing optional columns.
        The missing values are treated as nulls. If false, records with missing
        trailing columns are treated as bad records, and if there are too many bad
        records, an invalid error is returned in the job result. Only applicable when
        source_format is CSV.
    :param encoding: The character encoding of the data.

        .. seealso::
            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.csvOptions.encoding
    :param src_fmt_configs: configure optional fields specific to the source format.
        Per the comment in the body, the CSV-related top-level arguments above
        are meant as backward-compatible fallbacks for keys not set here (the
        merge itself is done by ``_validate_src_fmt_configs``).
    :param labels: A dictionary containing labels for the BigQuery table.
    :param description: A string containing the description for the BigQuery table.
    :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).

        .. code-block:: python

            encryption_configuration = {
                "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key",
            }
    :param location: (Optional) Location passed on to :meth:`create_empty_table`.
        Defaults to the hook's ``location``.
    :param project_id: Project used as the default when
        ``external_project_dataset_table`` has no project part, and passed on
        to :meth:`create_empty_table`.
    :return: The table returned by :meth:`create_empty_table`.
    """
    warnings.warn(
        "This method is deprecated. Please use `BigQueryHook.create_empty_table` method with "
        "passing the `table_resource` object. This gives more flexibility than this method.",
        AirflowProviderDeprecationWarning,
    )
    location = location or self.location
    src_fmt_configs = src_fmt_configs or {}
    source_format = source_format.upper()
    compression = compression.upper()

    external_config_api_repr = {
        "autodetect": autodetect,
        "sourceFormat": source_format,
        "sourceUris": source_uris,
        "compression": compression,
        "ignoreUnknownValues": ignore_unknown_values,
    }

    # if following fields are not specified in src_fmt_configs,
    # honor the top-level params for backward-compatibility
    backward_compatibility_configs = {
        "skipLeadingRows": skip_leading_rows,
        "fieldDelimiter": field_delimiter,
        "quote": quote_character,
        "allowQuotedNewlines": allow_quoted_newlines,
        "allowJaggedRows": allow_jagged_rows,
        "encoding": encoding,
    }
    # Which external-config key carries the options for each source format,
    # and which option names are allowed under that key.
    src_fmt_to_param_mapping = {"CSV": "csvOptions", "GOOGLE_SHEETS": "googleSheetsOptions"}
    src_fmt_to_configs_mapping = {
        "csvOptions": [
            "allowJaggedRows",
            "allowQuotedNewlines",
            "fieldDelimiter",
            "skipLeadingRows",
            "quote",
            "encoding",
        ],
        "googleSheetsOptions": ["skipLeadingRows"],
    }

    if source_format in src_fmt_to_param_mapping:
        options_key = src_fmt_to_param_mapping[source_format]
        valid_configs = src_fmt_to_configs_mapping[options_key]
        src_fmt_configs = _validate_src_fmt_configs(
            source_format, src_fmt_configs, valid_configs, backward_compatibility_configs
        )
        external_config_api_repr[options_key] = src_fmt_configs

    # build external config
    external_config = ExternalConfig.from_api_repr(external_config_api_repr)
    if schema_fields:
        external_config.schema = [SchemaField.from_api_repr(f) for f in schema_fields]
    if max_bad_records:
        external_config.max_bad_records = max_bad_records

    # build table definition
    table = Table(table_ref=TableReference.from_string(external_project_dataset_table, project_id))
    table.external_data_configuration = external_config
    if labels:
        table.labels = labels
    if description:
        table.description = description
    if encryption_configuration:
        table.encryption_configuration = EncryptionConfiguration.from_api_repr(encryption_configuration)

    self.log.info("Creating external table: %s", external_project_dataset_table)
    table_object = self.create_empty_table(
        table_resource=table.to_api_repr(), project_id=project_id, location=location, exists_ok=True
    )
    self.log.info("External table created successfully: %s", external_project_dataset_table)
    return table_object
@GoogleBaseHook.fallback_to_default_project_id
def update_table(
    self,
    table_resource: dict[str, Any],
    fields: list[str] | None = None,
    dataset_id: str | None = None,
    table_id: str | None = None,
    project_id: str | None = None,
) -> dict[str, Any]:
    """Change some fields of a table.

    Use ``fields`` to specify which fields to update. At least one field
    must be provided. If a field is listed in ``fields`` and is ``None``
    in ``table``, the field value will be deleted.

    If ``table.etag`` is not ``None``, the update will only succeed if
    the table on the server has the same ETag. Thus reading a table with
    ``get_table``, changing its fields, and then passing it to
    ``update_table`` will ensure that the changes will only be saved if
    no modifications to the table occurred since the read.

    :param project_id: The project containing the table to update.
    :param dataset_id: The dataset containing the table to update.
    :param table_id: The name of the table to update.
    :param table_resource: Table resource as described in documentation:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table
        The table has to contain ``tableReference`` or ``project_id``, ``dataset_id`` and ``table_id``
        have to be provided. A missing ``tableReference`` is added to this dict in place.
    :param fields: The fields of ``table`` to change, spelled as the Table
        properties (e.g. "friendly_name"). When omitted, the top-level keys of
        ``table_resource`` as passed in are used.
    :return: API representation of the updated table returned by the client.
    """
    # Computed before ``tableReference`` may be added below, so a reference that
    # is filled in from the arguments is not itself listed as a field to update.
    fields = fields or list(table_resource.keys())
    table_resource = self._resolve_table_reference(
        table_resource=table_resource, project_id=project_id, dataset_id=dataset_id, table_id=table_id
    )
    table_reference = table_resource["tableReference"]
    table = Table.from_api_repr(table_resource)
    self.log.info("Updating table: %s", table_reference)
    table_object = self.get_client(project_id=project_id).update_table(table=table, fields=fields)
    # Log the resolved reference: the dataset_id/table_id arguments may be None,
    # and project_id may differ, when the resource carries its own tableReference.
    self.log.info(
        "Table %s.%s.%s updated successfully",
        table_reference["projectId"],
        table_reference["datasetId"],
        table_reference["tableId"],
    )
    return table_object.to_api_repr()
@GoogleBaseHook.fallback_to_default_project_id
def patch_table(
    self,
    dataset_id: str,
    table_id: str,
    project_id: str | None = None,
    description: str | None = None,
    expiration_time: int | None = None,
    external_data_configuration: dict | None = None,
    friendly_name: str | None = None,
    labels: dict | None = None,
    schema: list | None = None,
    time_partitioning: dict | None = None,
    view: dict | None = None,
    require_partition_filter: bool | None = None,
    encryption_configuration: dict | None = None,
) -> None:
    """Patch information in an existing table.

    Only the properties passed to this method are sent in the update; all
    other table properties are left untouched. This method is deprecated:
    use :func:`.update_table` instead.

    Reference: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch

    :param dataset_id: The dataset containing the table to be patched.
    :param table_id: The name of the table to be patched.
    :param project_id: The project containing the table to be patched.
    :param description: [Optional] A user-friendly description of this table.
    :param expiration_time: [Optional] The time when this table expires,
        in milliseconds since the epoch.
    :param external_data_configuration: [Optional] A dictionary containing
        properties of a table stored outside of BigQuery.
    :param friendly_name: [Optional] A descriptive name for this table.
    :param labels: [Optional] A dictionary containing labels associated with this table.
    :param schema: [Optional] If set, the schema field list as defined here:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
        The supported and unsupported schema modifications are listed here:
        https://cloud.google.com/bigquery/docs/managing-table-schemas

        .. code-block:: python

            schema = [
                {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
            ]

    :param time_partitioning: [Optional] A dictionary containing time-based partitioning
        definition for the table.
    :param view: [Optional] A dictionary containing the definition of the view.
        If set, a view is patched instead of a table:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition

        .. code-block:: python

            view = {
                "query": "SELECT * FROM `test-project-id.test_dataset_id.test_table_prefix*` LIMIT 500",
                "useLegacySql": False,
            }

    :param require_partition_filter: [Optional] If true, queries over this table
        require a partition filter. If false, queries over the table do not
        require one.
    :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).

        .. code-block:: python

            encryption_configuration = {
                "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key",
            }
    """
    warnings.warn(
        "This method is deprecated, please use ``BigQueryHook.update_table`` method.",
        AirflowProviderDeprecationWarning,
    )
    # (API key, value to send, whether to send it) in the order the keys are
    # added to the resource. Scalar options are sent whenever explicitly set
    # (even to "" / 0 / False); container options only when non-empty.
    candidates = (
        ("description", description, description is not None),
        ("expirationTime", expiration_time, expiration_time is not None),
        ("externalDataConfiguration", external_data_configuration, bool(external_data_configuration)),
        ("friendlyName", friendly_name, friendly_name is not None),
        ("labels", labels, bool(labels)),
        ("schema", {"fields": schema}, bool(schema)),
        ("timePartitioning", time_partitioning, bool(time_partitioning)),
        ("view", view, bool(view)),
        ("requirePartitionFilter", require_partition_filter, require_partition_filter is not None),
        ("encryptionConfiguration", encryption_configuration, bool(encryption_configuration)),
    )
    patch_body: dict[str, Any] = {key: value for key, value, wanted in candidates if wanted}

    self.update_table(
        table_resource=patch_body,
        fields=list(patch_body),
        project_id=project_id,
        dataset_id=dataset_id,
        table_id=table_id,
    )
@GoogleBaseHook.fallback_to_default_project_id
def insert_all(
    self,
    project_id: str,
    dataset_id: str,
    table_id: str,
    rows: list,
    ignore_unknown_values: bool = False,
    skip_invalid_rows: bool = False,
    fail_on_error: bool = False,
) -> None:
    """Stream data into BigQuery one record at a time without a load job.

    The table is fetched first so that the client can serialize ``rows``
    against its schema, then the rows are sent with ``Client.insert_rows``.

    .. seealso::
        For more information, see:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll

    :param project_id: The name of the project where we have the table
    :param dataset_id: The name of the dataset where we have the table
    :param table_id: The name of the table
    :param rows: The rows to insert. Each row is either a mapping of column
        name to value, or a tuple of values in schema order (the format of
        ``google.cloud.bigquery.Client.insert_rows``, not the raw REST
        ``{"json": ...}`` envelope).

        .. code-block:: python

            rows = [{"a_key": "a_value_0"}, {"a_key": "a_value_1"}]

    :param ignore_unknown_values: [Optional] Accept rows that contain values
        that do not match the schema. The unknown values are ignored.
        The default value is false, which treats unknown values as errors.
    :param skip_invalid_rows: [Optional] Insert all valid rows of a request,
        even if invalid rows exist. The default value is false, which causes
        the entire request to fail if any invalid rows exist.
    :param fail_on_error: [Optional] Force the task to fail if any errors occur.
        The default value is false, which indicates the task should not fail
        even if any insertion errors occur.
    :raises AirflowException: if insertion errors occur and ``fail_on_error`` is true.
    """
    self.log.info("Inserting %s row(s) into table %s:%s.%s", len(rows), project_id, dataset_id, table_id)
    table_ref = TableReference(dataset_ref=DatasetReference(project_id, dataset_id), table_id=table_id)
    bq_client = self.get_client(project_id=project_id)
    # insert_rows needs the table's schema to convert the rows, hence the fetch.
    table = bq_client.get_table(table_ref)
    errors = bq_client.insert_rows(
        table=table,
        rows=rows,
        ignore_unknown_values=ignore_unknown_values,
        skip_invalid_rows=skip_invalid_rows,
    )
    if errors:
        error_msg = f"{len(errors)} insert error(s) occurred. Details: {errors}"
        self.log.error(error_msg)
        if fail_on_error:
            raise AirflowException(f"BigQuery job failed. Error was: {error_msg}")
    else:
        self.log.info("All row(s) inserted successfully: %s:%s.%s", project_id, dataset_id, table_id)
@GoogleBaseHook.fallback_to_default_project_id
def update_dataset(
    self,
    fields: Sequence[str],
    dataset_resource: dict[str, Any],
    dataset_id: str | None = None,
    project_id: str | None = None,
    retry: Retry = DEFAULT_RETRY,
) -> Dataset:
    """Change some fields of a dataset.

    Use ``fields`` to specify which fields to update. At least one field
    must be provided. If a field is listed in ``fields`` and is ``None`` in
    ``dataset``, it will be deleted.

    If ``dataset.etag`` is not ``None``, the update will only
    succeed if the dataset on the server has the same ETag. Thus
    reading a dataset with ``get_dataset``, changing its fields,
    and then passing it to ``update_dataset`` will ensure that the changes
    will only be saved if no modifications to the dataset occurred
    since the read.

    :param dataset_resource: Dataset resource that will be provided
        in request body. It is not modified by this method.
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
    :param dataset_id: The id of the dataset. Used only when
        ``dataset_resource["datasetReference"]`` has no ``datasetId``.
    :param fields: The properties of ``dataset`` to change (e.g. "friendly_name").
    :param project_id: The Google Cloud Project ID. Used only when
        ``dataset_resource["datasetReference"]`` has no ``projectId``.
    :param retry: How to retry the RPC.
    :return: The updated dataset returned by the BigQuery client.
    """
    # Work on shallow copies so the caller's dict and its nested reference are
    # not mutated as a side effect of this call.
    dataset_resource = dict(dataset_resource)
    dataset_reference = dict(dataset_resource.get("datasetReference") or {})
    # Explicit ids already present in the resource take precedence over the arguments.
    for key, value in (("datasetId", dataset_id), ("projectId", project_id)):
        if value and not dataset_reference.get(key):
            dataset_reference[key] = value
    dataset_resource["datasetReference"] = dataset_reference

    self.log.info("Start updating dataset")
    dataset = self.get_client(project_id=project_id).update_dataset(
        dataset=Dataset.from_api_repr(dataset_resource),
        fields=fields,
        retry=retry,
    )
    self.log.info("Dataset successfully updated: %s", dataset)
    return dataset
def patch_dataset(self, dataset_id: str, dataset_resource: dict, project_id: str | None = None) -> dict:
    """Patch information in an existing dataset.

    It only replaces fields that are provided in the submitted dataset resource.
    This method is deprecated. Please use :func:`.update_dataset` instead.

    More info:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/patch

    :param dataset_id: The BigQuery Dataset ID
    :param dataset_resource: Dataset resource that will be provided
        in request body.
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
    :param project_id: The Google Cloud Project ID. Defaults to the hook's project.
    :return: The patched dataset resource returned by the API.
    :raises ValueError: if ``dataset_id`` is empty or not a string.
    """
    # stacklevel=2 attributes the warning to the caller rather than to this hook.
    warnings.warn(
        "This method is deprecated. Please use ``update_dataset``.",
        AirflowProviderDeprecationWarning,
        stacklevel=2,
    )
    # Single fallback to the hook's configured project.
    dataset_project_id = project_id or self.project_id
    if not dataset_id or not isinstance(dataset_id, str):
        raise ValueError(
            f"dataset_id argument must be provided and has a type 'str'. You provided: {dataset_id}"
        )
    service = self.get_service()
    self.log.info("Start patching dataset: %s:%s", dataset_project_id, dataset_id)
    dataset = (
        service.datasets()
        .patch(
            datasetId=dataset_id,
            projectId=dataset_project_id,
            body=dataset_resource,
        )
        .execute(num_retries=self.num_retries)
    )
    self.log.info("Dataset successfully patched: %s", dataset)
    return dataset
def get_dataset_tables_list(
self,
dataset_id: str,
project_id: str | None = None,
table_prefix: str | None = None,
max_results: int | None = None,
) -> list[dict[str, Any]]:
"""List tables of a BigQuery dataset.
If a table prefix is specified, only tables beginning by it are
returned. This method is deprecated. Please use
:func:`.get_dataset_tables` instead.
For more information, see:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list