Skip to content

Commit

Permalink
Fix PostgresToGCSOperator does not allow nested JSON (#23063)
Browse files Browse the repository at this point in the history
* Avoid double json.dumps for json data export in PostgresToGCSOperator.

* Fix CI
  • Loading branch information
pierrejeambrun committed May 8, 2022
1 parent ca3fbbb commit 766726f
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 23 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/mssql_to_gcs.py
Expand Up @@ -80,7 +80,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
}

@classmethod
def convert_type(cls, value, schema_type):
def convert_type(cls, value, schema_type, **kwargs):
"""
Takes a value from MSSQL, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/mysql_to_gcs.py
Expand Up @@ -92,7 +92,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
'mode': field_mode,
}

def convert_type(self, value, schema_type: str):
def convert_type(self, value, schema_type: str, **kwargs):
"""
Takes a value from MySQLdb, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/oracle_to_gcs.py
Expand Up @@ -85,7 +85,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
'mode': field_mode,
}

def convert_type(self, value, schema_type):
def convert_type(self, value, schema_type, **kwargs):
"""
Takes a value from Oracle db, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
Expand Down
8 changes: 6 additions & 2 deletions airflow/providers/google/cloud/transfers/postgres_to_gcs.py
Expand Up @@ -128,13 +128,17 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
'mode': 'REPEATED' if field[1] in (1009, 1005, 1007, 1016) else 'NULLABLE',
}

def convert_type(self, value, schema_type):
def convert_type(self, value, schema_type, stringify_dict=True):
"""
Takes a value from Postgres, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
Timezone aware Datetime are converted to UTC seconds.
Unaware Datetime, Date and Time are converted to ISO formatted strings.
Decimals are converted to floats.
:param value: Postgres column value.
:param schema_type: BigQuery data type.
:param stringify_dict: Specify whether to convert dict to string.
"""
if isinstance(value, datetime.datetime):
iso_format_value = value.isoformat()
Expand All @@ -149,7 +153,7 @@ def convert_type(self, value, schema_type):
hours=formatted_time.tm_hour, minutes=formatted_time.tm_min, seconds=formatted_time.tm_sec
)
return str(time_delta)
if isinstance(value, dict):
if stringify_dict and isinstance(value, dict):
return json.dumps(value)
if isinstance(value, Decimal):
return float(value)
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/presto_to_gcs.py
Expand Up @@ -195,7 +195,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:

return {"name": field[0], "type": new_field_type}

def convert_type(self, value, schema_type):
def convert_type(self, value, schema_type, **kwargs):
"""
Do nothing. Presto uses JSON on the transport layer, so types are simple.
Expand Down
16 changes: 9 additions & 7 deletions airflow/providers/google/cloud/transfers/sql_to_gcs.py
Expand Up @@ -150,9 +150,12 @@ def execute(self, context: 'Context'):
file_to_upload['file_handle'].close()
counter += 1

def convert_types(self, schema, col_type_dict, row) -> list:
def convert_types(self, schema, col_type_dict, row, stringify_dict=False) -> list:
"""Convert values from DBAPI to output-friendly formats."""
return [self.convert_type(value, col_type_dict.get(name)) for name, value in zip(schema, row)]
return [
self.convert_type(value, col_type_dict.get(name), stringify_dict=stringify_dict)
for name, value in zip(schema, row)
]

def _write_local_data_files(self, cursor):
"""
Expand Down Expand Up @@ -186,21 +189,20 @@ def _write_local_data_files(self, cursor):
parquet_writer = self._configure_parquet_file(tmp_file_handle, parquet_schema)

for row in cursor:
# Convert datetime objects to utc seconds, and decimals to floats.
# Convert binary type object to string encoded with base64.
row = self.convert_types(schema, col_type_dict, row)

if self.export_format == 'csv':
row = self.convert_types(schema, col_type_dict, row)
if self.null_marker is not None:
row = [value if value is not None else self.null_marker for value in row]
csv_writer.writerow(row)
elif self.export_format == 'parquet':
row = self.convert_types(schema, col_type_dict, row)
if self.null_marker is not None:
row = [value if value is not None else self.null_marker for value in row]
row_pydic = {col: [value] for col, value in zip(schema, row)}
tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
parquet_writer.write_table(tbl)
else:
row = self.convert_types(schema, col_type_dict, row, stringify_dict=False)
row_dict = dict(zip(schema, row))

tmp_file_handle.write(
Expand Down Expand Up @@ -273,7 +275,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
"""Convert a DBAPI field to BigQuery schema format."""

@abc.abstractmethod
def convert_type(self, value, schema_type):
def convert_type(self, value, schema_type, **kwargs):
"""Convert a value from DBAPI to output-friendly formats."""

def _get_col_type_dict(self):
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/trino_to_gcs.py
Expand Up @@ -195,7 +195,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:

return {"name": field[0], "type": new_field_type}

def convert_type(self, value, schema_type):
def convert_type(self, value, schema_type, **kwargs):
"""
Do nothing. Trino uses JSON on the transport layer, so types are simple.
Expand Down
25 changes: 17 additions & 8 deletions tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
Expand Up @@ -35,14 +35,15 @@
FILENAME = 'test_{}.ndjson'

NDJSON_LINES = [
b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
b'{"some_num": 44, "some_str": "mock_row_content_3"}\n',
b'{"some_json": {"firtname": "John", "lastname": "Smith", "nested_dict": {"a": null, "b": "something"}}, "some_num": 42, "some_str": "mock_row_content_1"}\n', # noqa
b'{"some_json": {}, "some_num": 43, "some_str": "mock_row_content_2"}\n',
b'{"some_json": {}, "some_num": 44, "some_str": "mock_row_content_3"}\n',
]
SCHEMA_FILENAME = 'schema_test.json'
SCHEMA_JSON = (
b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, '
b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}]'
b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, '
b'{"mode": "NULLABLE", "name": "some_json", "type": "STRING"}]'
)


Expand All @@ -55,16 +56,24 @@ def setUpClass(cls):
with conn.cursor() as cur:
for table in TABLES:
cur.execute(f"DROP TABLE IF EXISTS {table} CASCADE;")
cur.execute(f"CREATE TABLE {table}(some_str varchar, some_num integer);")
cur.execute(f"CREATE TABLE {table}(some_str varchar, some_num integer, some_json json);")

cur.execute(
"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_1', 42)
"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
(
'mock_row_content_1',
42,
'{"lastname": "Smith", "firtname": "John", \
"nested_dict": {"a": null, "b": "something"}}',
),
)
cur.execute(
"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_2', 43)
"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
('mock_row_content_2', 43, '{}'),
)
cur.execute(
"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_3', 44)
"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
('mock_row_content_3', 44, '{}'),
)

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion tests/providers/google/cloud/transfers/test_sql_to_gcs.py
Expand Up @@ -70,7 +70,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
'mode': 'NULLABLE',
}

def convert_type(self, value, schema_type):
def convert_type(self, value, schema_type, stringify_dict):
return 'convert_type_return_value'

def query(self):
Expand Down

0 comments on commit 766726f

Please sign in to comment.