GCSToBigQueryOperator no longer loads DATASTORE_BACKUP formats #28513

@watertree

Description

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

airflow@airflow-worker-XXXXXX-XXXXXX:~$ pip freeze | grep google-cloud 
google-cloud-aiplatform @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_aiplatform-1.16.1-py2.py3-none-any.whl
google-cloud-appengine-logging @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_appengine_logging-1.1.3-py2.py3-none-any.whl
google-cloud-audit-log @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_audit_log-0.2.4-py2.py3-none-any.whl
google-cloud-automl @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_automl-2.8.0-py2.py3-none-any.whl
google-cloud-bigquery @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_bigquery-2.34.4-py2.py3-none-any.whl
google-cloud-bigquery-datatransfer @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_bigquery_datatransfer-3.7.0-py2.py3-none-any.whl
google-cloud-bigquery-storage @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_bigquery_storage-2.14.1-py2.py3-none-any.whl
google-cloud-bigtable @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_bigtable-1.7.3-py2.py3-none-any.whl
google-cloud-build @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_build-3.9.0-py2.py3-none-any.whl
google-cloud-common @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_common-1.0.3-py2.py3-none-any.whl
google-cloud-compute @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_compute-0.7.0-py2.py3-none-any.whl
google-cloud-container @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_container-2.11.1-py2.py3-none-any.whl
google-cloud-core @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_core-2.3.2-py2.py3-none-any.whl
google-cloud-datacatalog @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_datacatalog-3.9.0-py2.py3-none-any.whl
google-cloud-datacatalog-lineage @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_datacatalog_lineage-0.1.6-py3-none-any.whl
google-cloud-datacatalog-lineage-producer-client @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_datacatalog_lineage_producer_client-0.0.9-py3-none-any.whl
google-cloud-dataform @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dataform-0.2.0-py2.py3-none-any.whl
google-cloud-dataplex @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dataplex-1.1.0-py2.py3-none-any.whl
google-cloud-dataproc @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dataproc-5.0.0-py2.py3-none-any.whl
google-cloud-dataproc-metastore @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dataproc_metastore-1.6.0-py2.py3-none-any.whl
google-cloud-datastore @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_datastore-2.8.0-py2.py3-none-any.whl
google-cloud-dlp @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dlp-1.0.2-py2.py3-none-any.whl
google-cloud-filestore @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_filestore-1.2.0-py2.py3-none-any.whl
google-cloud-firestore @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_firestore-2.5.0-py2.py3-none-any.whl
google-cloud-kms @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_kms-2.12.0-py2.py3-none-any.whl
google-cloud-language @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_language-1.3.2-py2.py3-none-any.whl
google-cloud-logging @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_logging-3.2.1-py2.py3-none-any.whl
google-cloud-memcache @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_memcache-1.4.1-py2.py3-none-any.whl
google-cloud-monitoring @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_monitoring-2.11.0-py2.py3-none-any.whl
google-cloud-orchestration-airflow @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_orchestration_airflow-1.4.1-py2.py3-none-any.whl
google-cloud-os-login @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_os_login-2.7.1-py2.py3-none-any.whl
google-cloud-pubsub @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_pubsub-2.13.4-py2.py3-none-any.whl
google-cloud-pubsublite @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_pubsublite-0.6.1-py2.py3-none-any.whl
google-cloud-redis @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_redis-2.9.0-py2.py3-none-any.whl
google-cloud-resource-manager @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_resource_manager-1.6.0-py2.py3-none-any.whl
google-cloud-secret-manager @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_secret_manager-1.0.2-py2.py3-none-any.whl
google-cloud-spanner @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_spanner-1.19.3-py2.py3-none-any.whl
google-cloud-speech @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_speech-1.3.4-py2.py3-none-any.whl
google-cloud-storage @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_storage-2.6.0-py2.py3-none-any.whl
google-cloud-tasks @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_tasks-2.10.1-py2.py3-none-any.whl
google-cloud-texttospeech @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_texttospeech-1.0.3-py2.py3-none-any.whl
google-cloud-translate @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_translate-1.7.2-py2.py3-none-any.whl
google-cloud-videointelligence @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_videointelligence-1.16.3-py2.py3-none-any.whl
google-cloud-vision @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_vision-1.0.2-py2.py3-none-any.whl
google-cloud-workflows @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_workflows-1.7.1-py2.py3-none-any.whl

Apache Airflow version

2.3.4

Operating System

Linux

Deployment

Composer

Deployment details

Cloud Composer: 1.20.2
Airflow: 2.3.4

What happened

GCSToBigQueryOperator was working properly on previous versions of Airflow/Composer, but after the upgrade it started failing with a UnicodeDecodeError raised by an erroneous attempt to read a schema from the source file. The error from the test case attached below:

*** Reading remote log from gs://us-central1-composer-bdcca446-bucket/logs/dag_id=load_datastore_backup_from_gcs_to_bq_bug/run_id=scheduled__2022-12-20T00:00:00+00:00/task_id=load_ds_backup_from_bq/attempt=1.log.
[2022-12-21, 00:10:03 UTC] {taskinstance.py:1172} INFO - Dependencies all met for <TaskInstance: load_datastore_backup_from_gcs_to_bq_bug.load_ds_backup_from_bq scheduled__2022-12-20T00:00:00+00:00 [queued]>
[2022-12-21, 00:10:03 UTC] {taskinstance.py:1172} INFO - Dependencies all met for <TaskInstance: load_datastore_backup_from_gcs_to_bq_bug.load_ds_backup_from_bq scheduled__2022-12-20T00:00:00+00:00 [queued]>
[2022-12-21, 00:10:03 UTC] {taskinstance.py:1369} INFO - 
--------------------------------------------------------------------------------
[2022-12-21, 00:10:03 UTC] {taskinstance.py:1370} INFO - Starting attempt 1 of 3
[2022-12-21, 00:10:03 UTC] {taskinstance.py:1371} INFO - 
--------------------------------------------------------------------------------
[2022-12-21, 00:10:03 UTC] {taskinstance.py:1390} INFO - Executing <Task(GCSToBigQueryOperator): load_ds_backup_from_bq> on 2022-12-20 00:00:00+00:00
[2022-12-21, 00:10:03 UTC] {standard_task_runner.py:52} INFO - Started process 4324 to run task
[2022-12-21, 00:10:03 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'load_datastore_backup_from_gcs_to_bq_bug', 'load_ds_backup_from_bq', 'scheduled__2022-12-20T00:00:00+00:00', '--job-id', '55473', '--raw', '--subdir', 'DAGS_FOLDER/gcs_datastore_bq_bug_dag.py', '--cfg-path', '/tmp/tmpum3kky0y', '--error-file', '/tmp/tmpl4da9d_3']
[2022-12-21, 00:10:03 UTC] {standard_task_runner.py:80} INFO - Job 55473: Subtask load_ds_backup_from_bq
[2022-12-21, 00:10:04 UTC] {task_command.py:375} INFO - Running <TaskInstance: load_datastore_backup_from_gcs_to_bq_bug.load_ds_backup_from_bq scheduled__2022-12-20T00:00:00+00:00 [running]> on host airflow-worker-594959469f-9dwhs
[2022-12-21, 00:10:04 UTC] {taskinstance.py:1583} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=load_datastore_backup_from_gcs_to_bq_bug
AIRFLOW_CTX_TASK_ID=load_ds_backup_from_bq
AIRFLOW_CTX_EXECUTION_DATE=2022-12-20T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-20T00:00:00+00:00
[2022-12-21, 00:10:04 UTC] {base.py:68} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-12-21, 00:10:04 UTC] {base.py:68} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-12-21, 00:10:04 UTC] {gcs_to_bigquery.py:367} INFO - Using existing BigQuery table for storing data...
[2022-12-21, 00:10:04 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-12-21, 00:10:04 UTC] {bigquery.py:2252} INFO - Project is not included in destination_project_dataset_table: ds.boog; using project "*REDACTED_GCP_PROJECT*"
[2022-12-21, 00:10:05 UTC] {base.py:68} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-12-21, 00:10:05 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-12-21, 00:10:05 UTC] {taskinstance.py:1904} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 397, in execute
    self.configuration = self._check_schema_fields(self.configuration)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 542, in _check_schema_fields
    fields, values = [item.split(",") for item in blob.decode("utf-8").splitlines()][:2]
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xb7 in position 20: invalid start byte
[2022-12-21, 00:10:05 UTC] {taskinstance.py:1408} INFO - Marking task as UP_FOR_RETRY. dag_id=load_datastore_backup_from_gcs_to_bq_bug, task_id=load_ds_backup_from_bq, execution_date=20221220T000000, start_date=20221221T001003, end_date=20221221T001005
[2022-12-21, 00:10:05 UTC] {standard_task_runner.py:92} ERROR - Failed to execute job 55473 for task load_ds_backup_from_bq ('utf-8' codec can't decode byte 0xb7 in position 20: invalid start byte; 4324)
[2022-12-21, 00:10:05 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-12-21, 00:10:05 UTC] {local_task_job.py:279} INFO - 0 downstream tasks scheduled from follow-on schedule check
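The decode failure itself is easy to reproduce in isolation: a Datastore `export_metadata` file is binary (protobuf), not UTF-8 text, so any byte that is not a valid UTF-8 start byte (such as the 0xb7 in the traceback) makes `decode("utf-8")` raise. A minimal sketch — the byte string below is illustrative, not the actual file contents:

```python
# Datastore export_metadata files are binary (protobuf), not UTF-8 text.
# Decoding arbitrary binary bytes as UTF-8 raises UnicodeDecodeError,
# which is exactly what _check_schema_fields hits.
blob = b"\x0b\x17some-kind\xb7\x01"  # illustrative binary content

try:
    blob.decode("utf-8")
except UnicodeDecodeError as exc:
    print(exc.reason)  # 0xb7 is a UTF-8 continuation byte: "invalid start byte"
```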

What you think should happen instead

The operator should load the table properly, as it did on previous versions, and as a manual BigQuery load job of the same export still does.
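One plausible shape for a fix (a sketch only, not the actual provider patch — `maybe_check_schema_fields`, `check_schema_fields`, and `SELF_DESCRIBING_FORMATS` are hypothetical names) is to skip the text-based schema sniffing entirely for formats that BigQuery can describe on its own, since reading a binary backup as UTF-8 is what crashes:

```python
# Formats where BigQuery derives the schema from the file itself.
SELF_DESCRIBING_FORMATS = {"DATASTORE_BACKUP", "AVRO", "PARQUET", "ORC"}

def check_schema_fields(configuration: dict) -> dict:
    # Placeholder for the operator's schema sniffing, which reads the
    # source blob as UTF-8 text (and so crashes on binary backups).
    raise UnicodeDecodeError("utf-8", b"\xb7", 0, 1, "invalid start byte")

def maybe_check_schema_fields(configuration: dict, source_format: str) -> dict:
    """Only attempt to sniff a schema for text formats that need one."""
    if source_format in SELF_DESCRIBING_FORMATS:
        # BigQuery infers the schema itself; don't touch the blob.
        return configuration
    return check_schema_fields(configuration)

config = {"load": {"sourceFormat": "DATASTORE_BACKUP"}}
assert maybe_check_schema_fields(config, "DATASTORE_BACKUP") is config
```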

How to reproduce

The sample below can be unzipped and copied into a Cloud Storage bucket, assuming everything sits in an airflow directory:

airflow.zip

Set up a bucket to send the backup to, for example:

gsutil rsync -r airflow gs://bucket-name-here/airflow

Drop in the following DAG and replace the variables marked for replacement:

from datetime import timedelta, datetime
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow import DAG

yesterday = datetime.combine(
    datetime.today() - timedelta(1),
    datetime.min.time())

# Replace these values
BUCKET = 'bucket-name'
GCP_PROJECT='gcp-project'
DATASET_TABLE='dataset.table'

default_args = {
    'start_date': yesterday,
    'project_id': GCP_PROJECT
}

schedule_interval = '@once'

dag = DAG('load_datastore_backup_from_gcs_to_bq_bug',
          default_args=default_args,
          schedule_interval=schedule_interval)

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

bq_load = GCSToBigQueryOperator(
    task_id='load_ds_backup_from_bq',
    source_format='DATASTORE_BACKUP',
    bucket=BUCKET,
    source_objects=[
        'airflow/namespace_airflow/kind_BugEntity/namespace_airflow_kind_BugEntity.export_metadata'],
    destination_project_dataset_table=DATASET_TABLE,
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    dag=dag
)

start >> bq_load
bq_load >> end
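For comparison, the same export loads cleanly as a manual load job; an example using the `bq` CLI (bucket and table names are placeholders, matching the DAG above):

```shell
# Loading the same Datastore export directly via the bq CLI succeeds,
# since BigQuery derives the schema from the backup itself.
bq load \
  --source_format=DATASTORE_BACKUP \
  --replace \
  dataset.table \
  gs://bucket-name/airflow/namespace_airflow/kind_BugEntity/namespace_airflow_kind_BugEntity.export_metadata
```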

Anything else

This happens every time since the upgrade. I believe I was running 2.25 before the Composer-managed upgrade, but I am not sure; several DAGs stopped working as a result of it.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
