GCSToBigQueryOperator - allow upload to existing table without specifying schema_fields/schema_object #12329

Closed
aKumpan opened this issue Nov 12, 2020 · 5 comments · Fixed by #28564
Labels: area:providers, good first issue, kind:feature, provider:google

Comments

@aKumpan (Contributor) commented Nov 12, 2020

Description

We would like to be able to load data into existing BigQuery tables without having to specify schema_fields/schema_object in GCSToBigQueryOperator, since the table already exists.

Use case / motivation

BigQuery load job usage details and problem explanation

We create BigQuery tables/datasets through a CI process (Terraform managed), and with the help of Airflow we update those tables with data.
To update the tables with data we can use:
Airflow 2.0 operator: GCSToBigQueryOperator
Airflow 1.* operator (deprecated): GoogleCloudStorageToBigQueryOperator
However, those operators require specifying one of three things:

  1. schema_fields - fields that define the table schema
  2. schema_object - a GCS object path pointing to a .json file that contains the schema for the table
  3. autodetect=True

Otherwise it will raise:

raise ValueError('At least one of `schema_fields`, `schema_object`, or `autodetect=True` must be passed.')

Note: the exception message does not actually say that autodetect must be True (the '=True' above is emphasis added) - but according to the code it must be set to True, otherwise a schema has to be used.

But we have already created the table, and we can update it using the bq load command (which the Airflow operators mentioned above use internally).

When using bq load you also have the option to specify a schema. The schema can be a local JSON file, or it can be typed inline as part of the command. You can also use the --autodetect flag instead of supplying a schema definition.
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv#bq

When you specify --autodetect as True, BigQuery will assign generic names to your columns, e.g. 'string_field_0', 'int_field_1', and if you are trying to load into an existing table, bq load will fail with the error:
'Cannot add fields (field: string_field_0)'
Airflow operators like 'GCSToBigQueryOperator' will fail in the same way.

However, there is also the option NOT to specify --autodetect (or to specify --autodetect=False), and in this case bq load will load from Cloud Storage into the existing BQ table without problems.

Proposal/TL;DR:
Add an option to not specify --autodetect (or to specify --autodetect=False) when write_disposition='WRITE_APPEND' is used in GCSToBigQueryOperator. This will allow the operator to update an existing BigQuery table without having to specify a schema within the operator itself (it will simply append data to the existing table).
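
For illustration, a minimal sketch of what the proposed usage could look like (bucket, object, and table names below are made up, and the exact parameter handling would depend on the implementation):

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Hypothetical example: append to a pre-existing, Terraform-managed table
# without passing schema_fields/schema_object and without schema autodetection.
load_to_existing_table = GCSToBigQueryOperator(
    task_id="load_to_existing_table",
    bucket="my-bucket",                              # made-up bucket name
    source_objects=["exports/2020-11-12/data.csv"],  # made-up object path
    source_format="CSV",
    destination_project_dataset_table="my-project.my_dataset.my_table",
    write_disposition="WRITE_APPEND",
    autodetect=False,  # proposed: allowed, since the existing table already defines the schema
)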

@aKumpan added the kind:feature label Nov 12, 2020
@boring-cyborg (bot) commented Nov 12, 2020

Thanks for opening your first issue here! Be sure to follow the issue template!

@turbaszek (Member) commented Nov 12, 2020

While this sounds like a good idea, I would recommend using BigQueryInsertJobOperator with the load job type (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). This operator is the closest to the BigQuery API and allows the full range of customisations (via the job config).
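
As a rough sketch of this suggestion (project, dataset, and bucket names are placeholders; the configuration dict maps directly onto the JobConfigurationLoad resource linked above):

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# Sketch: load from GCS into an existing table via a plain BigQuery load job.
# No schema is passed, so the existing destination table's schema is used.
load_job = BigQueryInsertJobOperator(
    task_id="load_csv_into_existing_table",
    configuration={
        "load": {
            "sourceUris": ["gs://my-bucket/exports/data.csv"],  # placeholder URI
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "my_dataset",
                "tableId": "my_table",
            },
            "sourceFormat": "CSV",
            "writeDisposition": "WRITE_APPEND",
            "skipLeadingRows": 1,  # assumes a CSV header row
        }
    },
)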

@aKumpan (Contributor, Author) commented Nov 12, 2020

BigQueryInsertJobOperator

That's a good suggestion; with BashOperator and the bq load command it works well too (on older versions of Airflow).
I'd still like to fix BQ dedicated operator for this purpose.
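
For reference, a rough sketch of that BashOperator workaround, with placeholder names, assuming the bq CLI is installed and authenticated on the worker (Airflow 2 import path shown):

from airflow.operators.bash import BashOperator

# Workaround sketch: call `bq load` directly; with no schema argument and no
# --autodetect flag, the existing destination table's schema is used.
bq_load_workaround = BashOperator(
    task_id="bq_load_workaround",
    bash_command=(
        "bq load --source_format=CSV "
        "my_dataset.my_table gs://my-bucket/exports/data.csv"
    ),
)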

@turbaszek (Member)

I'd still like to fix BQ dedicated operator for this purpose.

That would be good, but it will probably require refactoring this operator to use the new methods, not the deprecated ones; see #10288.

@vchiapaikeo (Contributor) commented Dec 23, 2022

Hi @eladkal, I took a look at this issue and it seems like, with this commit from @VladaZakharova a couple of days ago, this is mostly working as expected. There is a small nit causing it not to work perfectly, due to a check for self.autodetect being falsy as opposed to it being explicitly set to None. In fact, the Job docs from Google allude to it working this way, but you kind of need to read between the lines a bit:

schema: object (TableSchema) Optional. The schema for the destination table. The schema can be omitted if the destination table already exists, or if you're loading data from Google Cloud Datastore.
autodetect: boolean Optional. Indicates if we should automatically infer the options and schema for CSV and JSON sources.
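
To make the distinction concrete, here is a small hypothetical illustration (not the actual provider code) of how a falsy check and an explicit `is None` check classify the possible autodetect values differently:

# Hypothetical sketch: how a falsy check vs. an `is None` check treat autodetect.

def falsy_check(autodetect):
    # True for both None and False, so autodetect=False still looks "missing".
    return not autodetect


def none_check(autodetect):
    # True only when autodetect is None; False is treated as an explicit value.
    return autodetect is None


for value in (None, False, True):
    print(value, falsy_check(value), none_check(value))
# None True True
# False True False
# True False False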

Once I patch this with PR 28564, it works fine. To verify, I tried this on my local setup with a simple DAG:

from airflow import DAG

from etsy.operators.gcs_to_bigquery import GCSToBigQueryOperator

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "retries": 1,
    "retry_delay": 10,
    "start_date": "2022-08-01",
}

with DAG(
    max_active_runs=1,
    concurrency=2,
    catchup=False,
    schedule_interval="@daily",
    dag_id="test_os_patch_gcs_to_bigquery",
    default_args=DEFAULT_TASK_ARGS,
) as dag:

    test_gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="test_gcs_to_bigquery",
        create_disposition="CREATE_IF_NEEDED",
        # Need to explicitly set autodetect to None
        autodetect=None,
        write_disposition="WRITE_TRUNCATE",
        destination_project_dataset_table="my-project.vchiapaikeo.test1",
        bucket="my-bucket",
        source_format="CSV",
        source_objects=["vchiapaikeo/file.csv"],
    )

I then created a simple table in BigQuery and ran the DAG (screenshots of the table and the successful run omitted).

Task logs:

[2022-12-23, 20:30:32 UTC] {taskinstance.py:1087} INFO - Dependencies all met for <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery scheduled__2022-12-22T00:00:00+00:00 [queued]>
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1087} INFO - Dependencies all met for <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery scheduled__2022-12-22T00:00:00+00:00 [queued]>
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1283} INFO - 
--------------------------------------------------------------------------------
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1284} INFO - Starting attempt 15 of 16
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1285} INFO - 
--------------------------------------------------------------------------------
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1304} INFO - Executing <Task(GCSToBigQueryOperator): test_gcs_to_bigquery> on 2022-12-22 00:00:00+00:00
[2022-12-23, 20:30:32 UTC] {standard_task_runner.py:55} INFO - Started process 5611 to run task
[2022-12-23, 20:30:32 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_os_patch_gcs_to_bigquery', 'test_gcs_to_bigquery', 'scheduled__2022-12-22T00:00:00+00:00', '--job-id', '17', '--raw', '--subdir', 'DAGS_FOLDER/dataeng/batch/test_os_patch_gcs_to_bigquery.py', '--cfg-path', '/tmp/tmpoxitwl1m']
[2022-12-23, 20:30:32 UTC] {standard_task_runner.py:83} INFO - Job 17: Subtask test_gcs_to_bigquery
[2022-12-23, 20:30:32 UTC] {task_command.py:389} INFO - Running <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery scheduled__2022-12-22T00:00:00+00:00 [running]> on host f3b7042f4dc5
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1511} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
AIRFLOW_CTX_DAG_ID=test_os_patch_gcs_to_bigquery
AIRFLOW_CTX_TASK_ID=test_gcs_to_bigquery
AIRFLOW_CTX_EXECUTION_DATE=2022-12-22T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=15
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-22T00:00:00+00:00
[2022-12-23, 20:30:32 UTC] {metastore.py:45} INFO - Default connection request. Checking conn_id google_cloud_gcp_data_platform
[2022-12-23, 20:30:32 UTC] {connection.py:210} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2022-12-23, 20:30:32 UTC] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.
[2022-12-23, 20:30:32 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-12-23, 20:30:32 UTC] {gcs_to_bigquery.py:370} INFO - Using existing BigQuery table for storing data...
[2022-12-23, 20:30:32 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-12-23, 20:30:34 UTC] {gcs_to_bigquery.py:374} INFO - Executing: {'load': {'autodetect': None, 'createDisposition': 'CREATE_IF_NEEDED', 'destinationTable': {'projectId': 'my-project', 'datasetId': 'vchiapaikeo', 'tableId': 'test1'}, 'sourceFormat': 'CSV', 'sourceUris': ['gs://my-bucket/vchiapaikeo/file.csv'], 'writeDisposition': 'WRITE_TRUNCATE', 'ignoreUnknownValues': False, 'skipLeadingRows': None, 'fieldDelimiter': ',', 'quote': None, 'allowQuotedNewlines': False, 'encoding': 'UTF-8'}}
[2022-12-23, 20:30:34 UTC] {bigquery.py:1539} INFO - Inserting job airflow_test_os_patch_gcs_to_bigquery_test_gcs_to_bigquery_2022_12_22T00_00_00_00_00_8c90b0141a25c185bab829d91cc9a474
[2022-12-23, 20:30:37 UTC] {taskinstance.py:1322} INFO - Marking task as SUCCESS. dag_id=test_os_patch_gcs_to_bigquery, task_id=test_gcs_to_bigquery, execution_date=20221222T000000, start_date=20221223T203032, end_date=20221223T203037
[2022-12-23, 20:30:37 UTC] {connection.py:210} WARNING - Connection schemes (type: datahub_rest) shall not contain '_' according to RFC3986.
[2022-12-23, 20:30:40 UTC] {local_task_job.py:159} INFO - Task exited with return code 0
[2022-12-23, 20:30:40 UTC] {taskinstance.py:2582} INFO - 0 downstream tasks scheduled from follow-on schedule check

^ omitted some redundant log lines

PR: #28564
