Fix PostgresToGCSOperator does not allow nested JSON #23063

Merged
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/mssql_to_gcs.py
@@ -80,7 +80,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
}

@classmethod
-def convert_type(cls, value, schema_type):
+def convert_type(cls, value, schema_type, **kwargs):
"""
Takes a value from MSSQL, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
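The `**kwargs` additions in this and the following subclasses are mechanical, but the motivation is easy to miss: the base operator now forwards a `stringify_dict` keyword to every converter, so converters that ignore it still have to accept it. A minimal sketch of that pass-through pattern, using toy classes rather than the real provider classes:

```python
# Toy stand-ins for BaseSQLToGCSOperator and a per-database subclass; the names
# here are illustrative only and not part of the provider package.
class BaseConverter:
    def convert_types(self, values, stringify_dict=False):
        # The base class always forwards stringify_dict to convert_type.
        return [self.convert_type(v, None, stringify_dict=stringify_dict) for v in values]

    def convert_type(self, value, schema_type, **kwargs):
        raise NotImplementedError


class PassthroughConverter(BaseConverter):
    def convert_type(self, value, schema_type, **kwargs):
        # Ignores stringify_dict, but **kwargs lets the uniform call succeed.
        return value


print(PassthroughConverter().convert_types([1, "a"]))  # prints [1, 'a']
```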
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/mysql_to_gcs.py
@@ -92,7 +92,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
'mode': field_mode,
}

-def convert_type(self, value, schema_type: str):
+def convert_type(self, value, schema_type: str, **kwargs):
"""
Takes a value from MySQLdb, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/oracle_to_gcs.py
@@ -85,7 +85,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
'mode': field_mode,
}

-def convert_type(self, value, schema_type):
+def convert_type(self, value, schema_type, **kwargs):
"""
Takes a value from Oracle db, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
8 changes: 6 additions & 2 deletions airflow/providers/google/cloud/transfers/postgres_to_gcs.py
@@ -128,13 +128,17 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
'mode': 'REPEATED' if field[1] in (1009, 1005, 1007, 1016) else 'NULLABLE',
}

-def convert_type(self, value, schema_type):
+def convert_type(self, value, schema_type, stringify_dict=True):
"""
Takes a value from Postgres, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery.
Timezone aware Datetime are converted to UTC seconds.
Unaware Datetime, Date and Time are converted to ISO formatted strings.
Decimals are converted to floats.
+
+:param value: Postgres column value.
+:param schema_type: BigQuery data type.
+:param stringify_dict: Specify whether to convert dict to string.
"""
if isinstance(value, datetime.datetime):
iso_format_value = value.isoformat()
@@ -149,7 +153,7 @@ def convert_type(self, value, schema_type):
hours=formatted_time.tm_hour, minutes=formatted_time.tm_min, seconds=formatted_time.tm_sec
)
return str(time_delta)
-if isinstance(value, dict):
+if stringify_dict and isinstance(value, dict):
return json.dumps(value)
if isinstance(value, Decimal):
return float(value)
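A small usage sketch of the new flag on the Postgres operator (the task_id, SQL, bucket and filename below are placeholders, not values from this PR): with the default `stringify_dict=True` a dict from a JSON column is flattened to a string, while `stringify_dict=False` returns it unchanged so it can be written as real nested JSON.

```python
import json

from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator

# Placeholder task configuration; only convert_type() is exercised here.
op = PostgresToGCSOperator(
    task_id="postgres_to_gcs_example",
    sql="SELECT some_str, some_num, some_json FROM postgres_to_gcs_operator;",
    bucket="example-bucket",
    filename="test_{}.ndjson",
)

value = {"firstname": "John", "nested_dict": {"a": None, "b": "something"}}

# Default behaviour (used for the CSV and parquet paths): dict -> JSON string.
assert op.convert_type(value, "STRING") == json.dumps(value)

# JSON export path: the dict is returned as-is, preserving the nesting.
assert op.convert_type(value, "STRING", stringify_dict=False) == value
```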
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/presto_to_gcs.py
@@ -195,7 +195,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:

return {"name": field[0], "type": new_field_type}

-def convert_type(self, value, schema_type):
+def convert_type(self, value, schema_type, **kwargs):
"""
Do nothing. Presto uses JSON on the transport layer, so types are simple.

16 changes: 9 additions & 7 deletions airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -164,9 +164,12 @@ def execute(self, context: 'Context'):
file_to_upload['file_handle'].close()
counter += 1

-def convert_types(self, schema, col_type_dict, row) -> list:
+def convert_types(self, schema, col_type_dict, row, stringify_dict=False) -> list:
"""Convert values from DBAPI to output-friendly formats."""
-return [self.convert_type(value, col_type_dict.get(name)) for name, value in zip(schema, row)]
+return [
+    self.convert_type(value, col_type_dict.get(name), stringify_dict=stringify_dict)
+    for name, value in zip(schema, row)
+]

def _write_local_data_files(self, cursor):
"""
@@ -200,21 +203,20 @@ def _write_local_data_files(self, cursor):
parquet_writer = self._configure_parquet_file(tmp_file_handle, parquet_schema)

for row in cursor:
-# Convert datetime objects to utc seconds, and decimals to floats.
-# Convert binary type object to string encoded with base64.
-row = self.convert_types(schema, col_type_dict, row)
-
if self.export_format == 'csv':
+row = self.convert_types(schema, col_type_dict, row)
if self.null_marker is not None:
row = [value if value is not None else self.null_marker for value in row]
csv_writer.writerow(row)
elif self.export_format == 'parquet':
+row = self.convert_types(schema, col_type_dict, row)
if self.null_marker is not None:
row = [value if value is not None else self.null_marker for value in row]
row_pydic = {col: [value] for col, value in zip(schema, row)}
tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
parquet_writer.write_table(tbl)
else:
+row = self.convert_types(schema, col_type_dict, row, stringify_dict=False)
row_dict = dict(zip(schema, row))

tmp_file_handle.write(
@@ -287,7 +289,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
"""Convert a DBAPI field to BigQuery schema format."""

@abc.abstractmethod
-def convert_type(self, value, schema_type):
+def convert_type(self, value, schema_type, **kwargs):
"""Convert a value from DBAPI to output-friendly formats."""

def _get_col_type_dict(self):
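The reason only the JSON branch passes `stringify_dict=False` is easiest to see on the serialized output. A rough stand-alone sketch (not the operator's verbatim write loop) of the difference:

```python
import json

schema = ["some_num", "some_json"]
row = [42, {"firstname": "John", "nested_dict": {"a": None, "b": "something"}}]

# stringify_dict=False (JSON export): the dict survives conversion, so dumping
# the whole row yields genuinely nested JSON.
row_dict = dict(zip(schema, row))
print(json.dumps(row_dict, sort_keys=True))
# {"some_json": {"firstname": "John", "nested_dict": {"a": null, "b": "something"}}, "some_num": 42}

# stringify_dict=True (the old JSON behaviour, still used for CSV and parquet):
# the dict is pre-serialized, so the output carries an escaped string instead.
row_dict["some_json"] = json.dumps(row[1])
print(json.dumps(row_dict, sort_keys=True))
# {"some_json": "{\"firstname\": \"John\", \"nested_dict\": {\"a\": null, \"b\": \"something\"}}", "some_num": 42}
```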
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/trino_to_gcs.py
@@ -195,7 +195,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:

return {"name": field[0], "type": new_field_type}

-def convert_type(self, value, schema_type):
+def convert_type(self, value, schema_type, **kwargs):
"""
Do nothing. Trino uses JSON on the transport layer, so types are simple.

25 changes: 17 additions & 8 deletions tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
@@ -35,14 +35,15 @@
FILENAME = 'test_{}.ndjson'

NDJSON_LINES = [
-b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
-b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
-b'{"some_num": 44, "some_str": "mock_row_content_3"}\n',
+b'{"some_json": {"firtname": "John", "lastname": "Smith", "nested_dict": {"a": null, "b": "something"}}, "some_num": 42, "some_str": "mock_row_content_1"}\n', # noqa
+b'{"some_json": {}, "some_num": 43, "some_str": "mock_row_content_2"}\n',
+b'{"some_json": {}, "some_num": 44, "some_str": "mock_row_content_3"}\n',
]
SCHEMA_FILENAME = 'schema_test.json'
SCHEMA_JSON = (
b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, '
-b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}]'
+b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, '
+b'{"mode": "NULLABLE", "name": "some_json", "type": "STRING"}]'
)


@@ -55,16 +56,24 @@ def setUpClass(cls):
with conn.cursor() as cur:
for table in TABLES:
cur.execute(f"DROP TABLE IF EXISTS {table} CASCADE;")
-cur.execute(f"CREATE TABLE {table}(some_str varchar, some_num integer);")
+cur.execute(f"CREATE TABLE {table}(some_str varchar, some_num integer, some_json json);")

cur.execute(
-"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_1', 42)
+"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
+(
+    'mock_row_content_1',
+    42,
+    '{"lastname": "Smith", "firtname": "John", \
+    "nested_dict": {"a": null, "b": "something"}}',
+),
)
cur.execute(
-"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_2', 43)
+"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
+('mock_row_content_2', 43, '{}'),
)
cur.execute(
-"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_3', 44)
+"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
+('mock_row_content_3', 44, '{}'),
)

@classmethod
2 changes: 1 addition & 1 deletion tests/providers/google/cloud/transfers/test_sql_to_gcs.py
@@ -70,7 +70,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
'mode': 'NULLABLE',
}

-def convert_type(self, value, schema_type):
+def convert_type(self, value, schema_type, stringify_dict):
return 'convert_type_return_value'

def query(self):