don't convert date to iso string format if export format is parquet in PostgresToGCSOperator#25691

Closed
sanxore wants to merge 1 commit into apache:main from sanxore:fix/PostgresToGCSOperator-date-type-for-parquet-export-format

Conversation


@sanxore sanxore commented Aug 12, 2022

  • Issue description:
    In the PostgresToGCSOperator we can't write a DATE column when export_format="parquet": the convert_type method converts the date values coming from Postgres to ISO strings, so pyarrow raises pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int

  • Solution:
    Don't convert date to string when export format is parquet.

  • Here is my solution and a screenshot of the DagRun:

import datetime
import json
import os
import time
from decimal import Decimal

import pendulum
from airflow import DAG
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from common_v2.settings import GCS_BUCKET

DEFAULT_ARGS = {
    "start_date": datetime.datetime(2022, 8, 12),
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=1),
}


class PostgresToGCSFixedOperator(PostgresToGCSOperator):

    def convert_type(self, value, schema_type, stringify_dict=True):
        """
        Takes a value from Postgres, and converts it to a value that's safe for
        JSON/Google Cloud Storage/BigQuery.
        Timezone aware Datetime are converted to UTC seconds.
        Unaware Datetime, Date and Time are converted to ISO formatted strings.
        Decimals are converted to floats.

        :param value: Postgres column value.
        :param schema_type: BigQuery data type.
        :param stringify_dict: Specify whether to convert dict to string.
        """
        if isinstance(value, datetime.datetime):
            iso_format_value = value.isoformat()
            if value.tzinfo is None:
                return iso_format_value
            return pendulum.parse(iso_format_value).float_timestamp

        if self.export_format != "parquet":
            if isinstance(value, datetime.date):
                return value.isoformat()
            if isinstance(value, datetime.time):
                formatted_time = time.strptime(str(value), "%H:%M:%S")
                time_delta = datetime.timedelta(
                    hours=formatted_time.tm_hour, minutes=formatted_time.tm_min, seconds=formatted_time.tm_sec
                )
                return str(time_delta)

        if stringify_dict and isinstance(value, dict):
            return json.dumps(value)
        if isinstance(value, Decimal):
            return float(value)
        return value


with DAG(
    "date_parquet_issue",
    default_args=DEFAULT_ARGS,
    schedule_interval=None,
) as dag:
    date_pg_to_gcs_issue = PostgresToGCSOperator(
        task_id=f"date_pg_to_gcs_issue",
        postgres_conn_id="postgres",
        sql="SELECT CURRENT_DATE;",
        bucket=GCS_BUCKET,
        filename=os.path.join("data/date_pg_to_gcs_issue", "part_{}.parquet"),
        export_format="parquet",
    )

    date_pg_to_gcs_fixed = PostgresToGCSFixedOperator(
        task_id=f"date_pg_to_gcs_fixed",
        postgres_conn_id="postgres",
        sql="SELECT CURRENT_DATE;",
        bucket=GCS_BUCKET,
        filename=os.path.join("data/date_pg_to_gcs_fixed", "part_{}.parquet"),
        export_format="parquet",
    )

[screenshot: DagRun]

  • The fixed Operator writes data to Parquet with the right format (physical_type: INT32, and logical_type: Date):
    [screenshot: parquet schema]

@sanxore sanxore requested a review from turbaszek as a code owner August 12, 2022 14:30
@boring-cyborg boring-cyborg bot added area:providers provider:google Google (including GCP) related issues labels Aug 12, 2022

boring-cyborg bot commented Aug 12, 2022

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
Apache Airflow is a community-driven project and together we are making it better 🚀.
In case of doubts contact the developers at:
Mailing List: dev@airflow.apache.org
Slack: https://s.apache.org/airflow-slack

Contributor

@pingzh pingzh left a comment


can you write a corresponding test for it? thanks

@pierrejeambrun
Member

pierrejeambrun commented Aug 18, 2022

If I remember correctly we had to modify things to be able to export them to BigQuery with the correct field type.

  • You are updating the date case; do we also want to skip the isoformat for datetime in this case too? (here)
  • Is it possible to confirm that these fields are exported to BigQuery with the correct type with this modification? (DATETIME, DATE and TIME)

@sanxore
Author

sanxore commented Aug 18, 2022

  • We have the issue with datetime too (I will add the fix to the PR).

  • This Operator has no interaction with BigQuery. Its goal is to write data to the GCS file system with the right type when writing a binary format (like parquet), or with the right string representation when writing a text format (like csv).

Converting time, datetime and date values to strings had broken the Operator for the parquet export format.
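The proposed behaviour can be sketched in isolation (an editor's simplified stand-in for convert_type, not the provider's actual code):

```python
import datetime


def convert_value(value, export_format):
    """Simplified stand-in for the patched convert_type: serialize dates
    to ISO strings only for text formats; keep native objects for parquet."""
    # datetime.datetime is a subclass of datetime.date, so exclude it here;
    # the real method handles datetimes in a separate branch.
    if isinstance(value, datetime.date) and not isinstance(value, datetime.datetime):
        if export_format != "parquet":
            return value.isoformat()
    return value


d = datetime.date(2022, 8, 12)
print(repr(convert_value(d, "csv")))      # '2022-08-12' (string for text formats)
print(repr(convert_value(d, "parquet")))  # datetime.date(2022, 8, 12) (native, so pyarrow can map it)
```

With the native date preserved, pyarrow can cast it to the date32 column declared in the parquet schema instead of failing on a string.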

@pierrejeambrun
Member

pierrejeambrun commented Aug 19, 2022

The schema generated by this operator uses types that are safe for BigQuery (_write_local_schema_file will for instance use field_to_bigquery, which relies on the underlying type_map mapping db types to BigQuery types). This makes sure we can load all data exported with a BaseSQLToGCSOperator into BigQuery.

Parquet has an additional mapping on top of that, from BigQuery types to pyarrow types (see _convert_parquet_schema).

I would expect the parquet export to succeed when columns are dates, but also to be loadable into BigQuery with a correct schema definition. (This is how it works for the csv and json export formats.)

Note: I took a quick look and couldn't find what changed. On my bucket I found a working extract from 19 April 2022, produced by PostgresToGCSOperator in parquet format with date and datetime in the schema. 🤔

@sanxore
Author

sanxore commented Aug 19, 2022

Yes, I confirm this field will have the right type in BigQuery. As my terminal screenshot shows, I parsed the schema of the parquet file I exported.
The physical type is INT32 and the logical type is DATE, which is exactly what BigQuery expects for a DATE column when using bq load for managed or external tables on parquet files.
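For reference, the INT32 that Parquet stores for a DATE is just the day count since the Unix epoch, which the stdlib can show (an editor's sketch; pyarrow performs this conversion internally when handed a native date):

```python
import datetime

# Parquet's DATE logical type is an INT32 counting days since 1970-01-01.
# A native datetime.date maps to this directly; an ISO string does not,
# which is the ArrowTypeError this PR works around.
epoch = datetime.date(1970, 1, 1)
value = datetime.date(2022, 8, 12)
print((value - epoch).days)  # 19216
```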

@sanxore
Author

sanxore commented Aug 19, 2022

But unfortunately we don't have unit tests on parquet export format which can confirm that parquet format was working :(
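A test along the lines pingzh asked for could look like this (an editor's sketch against a simplified stand-in; a real test would call PostgresToGCSOperator.convert_type with export_format="parquet"):

```python
import datetime
import unittest


def convert_value(value, export_format):
    """Simplified stand-in for the patched convert_type."""
    if isinstance(value, datetime.date) and not isinstance(value, datetime.datetime):
        if export_format != "parquet":
            return value.isoformat()
    return value


class TestParquetDateHandling(unittest.TestCase):
    def test_date_kept_native_for_parquet(self):
        # Parquet needs the raw date so pyarrow can map it to date32/INT32.
        d = datetime.date(2022, 8, 12)
        self.assertIs(convert_value(d, "parquet"), d)

    def test_date_stringified_for_csv(self):
        # Text formats still get the ISO string representation.
        d = datetime.date(2022, 8, 12)
        self.assertEqual(convert_value(d, "csv"), "2022-08-12")


suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestParquetDateHandling)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```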

@sanxore
Author

sanxore commented Aug 19, 2022

We should not compare how the json and csv formats behave with the parquet format, because they are different kinds of files: json and csv are string-serialized formats, while parquet is a binary, type-aware format.

For example:

  • in csv:
    • a timestamp is a STRING
    • a date is a STRING
  • in json:
    • a timestamp is a STRING
    • a date is a STRING
  • in parquet:
    • a timestamp is an INT64
    • a date is an INT32

@potiuk
Member

potiuk commented Aug 26, 2022

needs rebase and test fixes.

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Oct 11, 2022
@github-actions github-actions bot closed this Oct 16, 2022