Support partition_columns in BaseSQLToGCSOperator #28677

Merged (4 commits) on Jan 10, 2023

Conversation

@vchiapaikeo (Contributor) commented on Jan 2, 2023

closes: #21537

Context

This PR adds support for partition_columns in BaseSQLToGCSOperator. Files are partitioned using a Hive-style partitioning structure for all export types (csv, parquet, json). The data is assumed to be presorted (the user has provided an ORDER BY clause on the partition columns in their SQL statement), and a new file is started whenever the values of those partition columns change. A minimal sketch of the idea follows.
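To illustrate the approach, here is a minimal, hypothetical sketch (not the operator's actual code) of splitting presorted rows into per-partition chunks with Hive-style prefixes; the rows, column names, and file naming below are invented for the example:

from itertools import groupby

# Rows are assumed to arrive already sorted on the partition columns,
# e.g. because the SQL statement contains "ORDER BY conn_id".
rows = [
    {"conn_id": "airflow_db", "conn_type": "postgres"},
    {"conn_id": "aws_default", "conn_type": "aws"},
    {"conn_id": "aws_default", "conn_type": "aws"},
]
partition_columns = ["conn_id"]

def partition_key(row):
    # Tuple of partition column values; a change in this tuple marks a new file.
    return tuple(row[col] for col in partition_columns)

for file_no, (key, chunk) in enumerate(groupby(rows, key=partition_key)):
    # Hive-style prefix such as "conn_id=aws_default"
    prefix = "/".join(f"{col}={value}" for col, value in zip(partition_columns, key))
    print(f"{prefix}/file_{file_no}.csv", list(chunk))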

Testing

As a simple test case, I created a test DAG that writes data from the Airflow metadata DB (Postgres) to GCS:

from airflow import DAG

from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "retries": 1,
    "retry_delay": 10,
    "start_date": "2022-08-01",
}

with DAG(
    max_active_runs=1,
    concurrency=2,
    catchup=False,
    schedule_interval="@daily",
    dag_id="test_postgres",
    default_args=DEFAULT_TASK_ARGS,
) as dag:

    test_postgres_to_gcs = PostgresToGCSOperator(
        task_id="test_postgres_to_gcs",
        postgres_conn_id="postgres_default",
        sql="""
        SELECT
            conn_id,
            conn_type
        FROM connection
        ORDER BY conn_id
        LIMIT 10
        """,
        export_format="csv",
        gcp_conn_id="google_cloud_default",
        bucket="my-bucket",
        filename="vchiapaikeo/sql-to-gcs/connections/csv/file_{}.csv",
        stringify_dict=True,
        partition_columns=["conn_id"],
    )

Ran the DAG:

[Screenshot: successful DAG run in the Airflow UI]

Task Logs:

[2023-01-02, 14:45:04 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_postgres.test_postgres_to_gcs scheduled__2023-01-01T00:00:00+00:00 [queued]>
[2023-01-02, 14:45:04 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_postgres.test_postgres_to_gcs scheduled__2023-01-01T00:00:00+00:00 [queued]>
[2023-01-02, 14:45:04 UTC] {taskinstance.py:1282} INFO - 
--------------------------------------------------------------------------------
[2023-01-02, 14:45:04 UTC] {taskinstance.py:1283} INFO - Starting attempt 9 of 10
[2023-01-02, 14:45:04 UTC] {taskinstance.py:1284} INFO - 
--------------------------------------------------------------------------------
[2023-01-02, 14:45:04 UTC] {taskinstance.py:1303} INFO - Executing <Task(PostgresToGCSOperator): test_postgres_to_gcs> on 2023-01-01 00:00:00+00:00
[2023-01-02, 14:45:04 UTC] {standard_task_runner.py:55} INFO - Started process 339 to run task
[2023-01-02, 14:45:04 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'test_postgres', 'test_postgres_to_gcs', 'scheduled__2023-01-01T00:00:00+00:00', '--job-id', '13', '--raw', '--subdir', 'DAGS_FOLDER/test_postgres.py', '--cfg-path', '/tmp/tmp2yqzsfli']
[2023-01-02, 14:45:04 UTC] {standard_task_runner.py:83} INFO - Job 13: Subtask test_postgres_to_gcs
[2023-01-02, 14:45:04 UTC] {task_command.py:389} INFO - Running <TaskInstance: test_postgres.test_postgres_to_gcs scheduled__2023-01-01T00:00:00+00:00 [running]> on host 29f49d0114bf
[2023-01-02, 14:45:04 UTC] {taskinstance.py:1512} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
AIRFLOW_CTX_DAG_ID=test_postgres
AIRFLOW_CTX_TASK_ID=test_postgres_to_gcs
AIRFLOW_CTX_EXECUTION_DATE=2023-01-01T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=9
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2023-01-01T00:00:00+00:00
[2023-01-02, 14:45:04 UTC] {sql_to_gcs.py:150} INFO - Found partition columns: conn_id. Assuming the SQL statement is properly sorted by these columns in ascending or descending order.
[2023-01-02, 14:45:04 UTC] {sql_to_gcs.py:155} INFO - Executing query
[2023-01-02, 14:45:04 UTC] {base.py:73} INFO - Using connection ID 'postgres_default' for task execution.
[2023-01-02, 14:45:04 UTC] {sql_to_gcs.py:174} INFO - Writing local data files
[2023-01-02, 14:45:04 UTC] {sql_to_gcs.py:180} INFO - Uploading chunk file #0 to GCS.
[2023-01-02, 14:45:04 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-01-02, 14:45:04 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-01-02, 14:45:04 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2023-01-02, 14:45:05 UTC] {gcs.py:520} INFO - File /tmp/tmp8peeg79v uploaded to vchiapaikeo/sql-to-gcs/connections/csv/conn_id=***_db/file_0.csv in my-bucket bucket
[2023-01-02, 14:45:05 UTC] {sql_to_gcs.py:183} INFO - Removing local file
[2023-01-02, 14:45:05 UTC] {sql_to_gcs.py:180} INFO - Uploading chunk file #1 to GCS.
[2023-01-02, 14:45:05 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-01-02, 14:45:05 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-01-02, 14:45:05 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2023-01-02, 14:45:05 UTC] {gcs.py:520} INFO - File /tmp/tmpofe2t8yt uploaded to vchiapaikeo/sql-to-gcs/connections/csv/conn_id=aws_default/file_1.csv in my-bucket bucket
[2023-01-02, 14:45:05 UTC] {sql_to_gcs.py:183} INFO - Removing local file
[2023-01-02, 14:45:05 UTC] {sql_to_gcs.py:180} INFO - Uploading chunk file #2 to GCS.
[2023-01-02, 14:45:05 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-01-02, 14:45:05 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-01-02, 14:45:05 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2023-01-02, 14:45:06 UTC] {gcs.py:520} INFO - File /tmp/tmpcouq2z3s uploaded to vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_batch_default/file_2.csv in my-bucket bucket
[2023-01-02, 14:45:06 UTC] {sql_to_gcs.py:183} INFO - Removing local file
[2023-01-02, 14:45:06 UTC] {sql_to_gcs.py:180} INFO - Uploading chunk file #3 to GCS.
[2023-01-02, 14:45:06 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-01-02, 14:45:06 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-01-02, 14:45:06 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2023-01-02, 14:45:06 UTC] {gcs.py:520} INFO - File /tmp/tmp1dfyd7zb uploaded to vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_cosmos_default/file_3.csv in my-bucket bucket
[2023-01-02, 14:45:06 UTC] {sql_to_gcs.py:183} INFO - Removing local file
[2023-01-02, 14:45:06 UTC] {sql_to_gcs.py:180} INFO - Uploading chunk file #4 to GCS.
[2023-01-02, 14:45:06 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-01-02, 14:45:06 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-01-02, 14:45:06 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2023-01-02, 14:45:07 UTC] {gcs.py:520} INFO - File /tmp/tmpdeseikz8 uploaded to vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_data_explorer_default/file_4.csv in my-bucket bucket
[2023-01-02, 14:45:07 UTC] {sql_to_gcs.py:183} INFO - Removing local file
[2023-01-02, 14:45:07 UTC] {sql_to_gcs.py:180} INFO - Uploading chunk file #5 to GCS.
[2023-01-02, 14:45:07 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-01-02, 14:45:07 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-01-02, 14:45:07 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2023-01-02, 14:45:07 UTC] {gcs.py:520} INFO - File /tmp/tmpsrcp1til uploaded to vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_data_lake_default/file_5.csv in my-bucket bucket
[2023-01-02, 14:45:07 UTC] {sql_to_gcs.py:183} INFO - Removing local file
[2023-01-02, 14:45:07 UTC] {sql_to_gcs.py:180} INFO - Uploading chunk file #6 to GCS.
[2023-01-02, 14:45:07 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-01-02, 14:45:07 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-01-02, 14:45:07 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2023-01-02, 14:45:08 UTC] {gcs.py:520} INFO - File /tmp/tmptaiv3xrk uploaded to vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_default/file_6.csv in my-bucket bucket
[2023-01-02, 14:45:08 UTC] {sql_to_gcs.py:183} INFO - Removing local file
[2023-01-02, 14:45:08 UTC] {sql_to_gcs.py:180} INFO - Uploading chunk file #7 to GCS.
[2023-01-02, 14:45:08 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-01-02, 14:45:08 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-01-02, 14:45:08 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2023-01-02, 14:45:08 UTC] {gcs.py:520} INFO - File /tmp/tmpzc7qd0q6 uploaded to vchiapaikeo/sql-to-gcs/connections/csv/conn_id=cassandra_default/file_7.csv in my-bucket bucket
[2023-01-02, 14:45:08 UTC] {sql_to_gcs.py:183} INFO - Removing local file
[2023-01-02, 14:45:08 UTC] {sql_to_gcs.py:180} INFO - Uploading chunk file #8 to GCS.
[2023-01-02, 14:45:08 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-01-02, 14:45:08 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-01-02, 14:45:08 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2023-01-02, 14:45:08 UTC] {gcs.py:520} INFO - File /tmp/tmpdwo4672n uploaded to vchiapaikeo/sql-to-gcs/connections/csv/conn_id=databricks_default/file_8.csv in my-bucket bucket
[2023-01-02, 14:45:08 UTC] {sql_to_gcs.py:183} INFO - Removing local file
[2023-01-02, 14:45:08 UTC] {sql_to_gcs.py:180} INFO - Uploading chunk file #9 to GCS.
[2023-01-02, 14:45:08 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-01-02, 14:45:08 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-01-02, 14:45:08 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2023-01-02, 14:45:09 UTC] {gcs.py:520} INFO - File /tmp/tmp4gj83eg5 uploaded to vchiapaikeo/sql-to-gcs/connections/csv/conn_id=dingding_default/file_9.csv in my-bucket bucket
[2023-01-02, 14:45:09 UTC] {sql_to_gcs.py:183} INFO - Removing local file
[2023-01-02, 14:45:09 UTC] {taskinstance.py:1326} INFO - Marking task as SUCCESS. dag_id=test_postgres, task_id=test_postgres_to_gcs, execution_date=20230101T000000, start_date=20230102T144504, end_date=20230102T144509
[2023-01-02, 14:45:09 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
[2023-01-02, 14:45:09 UTC] {taskinstance.py:2598} INFO - 0 downstream tasks scheduled from follow-on schedule check

Check GCS:

vchiapaikeo@7676:airflow-src (vchiapaikeo/sql-paritions-v2 =)$ gsutil ls -R gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/
gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/:

gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=airflow_db/:
gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=airflow_db/file_0.csv

gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=aws_default/:
gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=aws_default/file_1.csv

gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_batch_default/:
gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_batch_default/file_2.csv

gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_cosmos_default/:
gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_cosmos_default/file_3.csv

gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_data_explorer_default/:
gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_data_explorer_default/file_4.csv

gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_data_lake_default/:
gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_data_lake_default/file_5.csv

gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_default/:
gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=azure_default/file_6.csv

gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=cassandra_default/:
gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=cassandra_default/file_7.csv

gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=databricks_default/:
gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=databricks_default/file_8.csv

gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=dingding_default/:
gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=dingding_default/file_9.csv
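The layout above shows the Hive-style partition prefix being inserted just before the final component of the configured filename template. As a rough, hypothetical reconstruction of that naming (not necessarily the exact logic in sql_to_gcs.py):

import os

filename = "vchiapaikeo/sql-to-gcs/connections/csv/file_{}.csv"  # operator's filename template
partition_values = {"conn_id": "airflow_db"}  # assumed to come from the current chunk's rows

# Build the Hive-style prefix and splice it in front of the file name.
prefix = "/".join(f"{col}={value}" for col, value in partition_values.items())
head, tail = os.path.split(filename.format(0))
print(f"{head}/{prefix}/{tail}")
# vchiapaikeo/sql-to-gcs/connections/csv/conn_id=airflow_db/file_0.csv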

Inspect a file:

vchiapaikeo@7676:airflow-src (vchiapaikeo/sql-paritions-v2 =)$ gsutil cat gs://my-bucket/vchiapaikeo/sql-to-gcs/connections/csv/conn_id=dingding_default/file_9.csv
conn_id,conn_type
dingding_default,http

cc @eladkal



@boring-cyborg (bot) added the area:providers and provider:google (Google, including GCP, related issues) labels on Jan 2, 2023
@vchiapaikeo force-pushed the vchiapaikeo/sql-paritions-v2 branch 2 times, most recently from 6d52ade to fb3fd4a, on January 2, 2023 at 14:53
@vchiapaikeo marked this pull request as ready for review on January 2, 2023 at 14:54