Skip to content

Commit

Permalink
✨Source S3 (v4): Set decimal_as_float to True for parquet files (#29342)
Browse files Browse the repository at this point in the history
* [ISSUE #28893] infer csv schema

* [ISSUE #28893] align with pyarrow

* Automated Commit - Formatting Changes

* [ISSUE #28893] legacy inference and infer only when needed

* [ISSUE #28893] fix scenario tests

* [ISSUE #28893] using discovered schema as part of read

* [ISSUE #28893] self-review + cleanup

* [ISSUE #28893] fix test

* [ISSUE #28893] code review part #1

* [ISSUE #28893] code review part #2

* Fix test

* formatcdk

* [ISSUE #28893] code review

* FIX test log level

* Re-adding failing tests

* [ISSUE #28893] improve inference to consider multiple types per value

* set decimal_as_float to True

* update

* Automated Commit - Formatting Changes

* add file adapters for avro, csv, jsonl, and parquet

* fix try catch

* update

* format

* pr feedback with a few additional default options set

---------

Co-authored-by: maxi297 <maxime@airbyte.io>
Co-authored-by: maxi297 <maxi297@users.noreply.github.com>
Co-authored-by: brianjlai <brian.lai@airbyte.io>
  • Loading branch information
4 people committed Aug 15, 2023
1 parent cde2c1b commit 690479d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat,
if "autogenerate_column_names" in advanced_options:
csv_options["autogenerate_column_names"] = advanced_options["autogenerate_column_names"]
return csv_options

elif isinstance(format_options, JsonlFormat):
return {"filetype": "jsonl"}
elif isinstance(format_options, ParquetFormat):
return {"filetype": "parquet"}
return {"filetype": "parquet", "decimal_as_float": True}
else:
# This should never happen because it would fail schema validation
raise ValueError(f"Format filetype {format_options} is not a supported file type")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
"aws_secret_access_key": "some_secret",
"endpoint": "https://external-s3.com",
"path_prefix": "a_folder/",
"start_date": "2022-01-01T01:02:03Z"
"start_date": "2022-01-01T01:02:03Z",
},
"format": {
"filetype": "avro",
},
"path_pattern": "**/*.avro",
"schema": '{"col1": "string", "col2": "integer"}'
"schema": '{"col1": "string", "col2": "integer"}',
},
{
"bucket": "test_bucket",
Expand All @@ -42,13 +41,11 @@
"globs": ["a_folder/**/*.avro"],
"validation_policy": "Emit Record",
"input_schema": '{"col1": "string", "col2": "integer"}',
"format": {
"filetype": "avro"
}
"format": {"filetype": "avro"},
}
]
}
, id="test_convert_legacy_config"
],
},
id="test_convert_legacy_config",
),
pytest.param(
{
Expand All @@ -70,15 +67,13 @@
"file_type": "avro",
"globs": ["**/*.avro"],
"validation_policy": "Emit Record",
"format": {
"filetype": "avro"
}
"format": {"filetype": "avro"},
}
]
}
, id="test_convert_no_optional_fields"
],
},
id="test_convert_no_optional_fields",
),
]
],
)
def test_convert_legacy_config(legacy_config, expected_config):
parsed_legacy_config = SourceS3Spec(**legacy_config)
Expand All @@ -101,8 +96,8 @@ def test_convert_legacy_config(legacy_config, expected_config):
"encoding": "ansi",
"double_quote": False,
"newlines_in_values": True,
"additional_reader_options": "{\"strings_can_be_null\": true}",
"advanced_options": "{\"skip_rows\": 3, \"skip_rows_after_names\": 5, \"autogenerate_column_names\": true}",
"additional_reader_options": '{"strings_can_be_null": true}',
"advanced_options": '{"skip_rows": 3, "skip_rows_after_names": 5, "autogenerate_column_names": true}',
"blocksize": 20000,
},
{
Expand All @@ -122,7 +117,8 @@ def test_convert_legacy_config(legacy_config, expected_config):
"autogenerate_column_names": True,
},
None,
id="test_csv_all_legacy_options_set"),
id="test_csv_all_legacy_options_set",
),
pytest.param(
"csv",
{
Expand All @@ -145,14 +141,15 @@ def test_convert_legacy_config(legacy_config, expected_config):
"strings_can_be_null": False,
},
None,
id="test_csv_only_required_options"),
id="test_csv_only_required_options",
),
pytest.param(
"csv",
{},
{
"filetype": "csv",
"delimiter": ",",
"quote_char": "\"",
"quote_char": '"',
"encoding": "utf8",
"double_quote": True,
"null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
Expand All @@ -162,23 +159,26 @@ def test_convert_legacy_config(legacy_config, expected_config):
"strings_can_be_null": False,
},
None,
id="test_csv_empty_format"),
id="test_csv_empty_format",
),
pytest.param(
"csv",
{
"additional_reader_options": "{\"not_valid\": \"at all}",
"additional_reader_options": '{"not_valid": "at all}',
},
None,
ValueError,
id="test_malformed_additional_reader_options"),
id="test_malformed_additional_reader_options",
),
pytest.param(
"csv",
{
"advanced_options": "{\"not_valid\": \"at all}",
"advanced_options": '{"not_valid": "at all}',
},
None,
ValueError,
id="test_malformed_advanced_options"),
id="test_malformed_advanced_options",
),
pytest.param(
"jsonl",
{
Expand All @@ -187,11 +187,10 @@ def test_convert_legacy_config(legacy_config, expected_config):
"unexpected_field_behavior": "ignore",
"block_size": 0,
},
{
"filetype": "jsonl"
},
{"filetype": "jsonl"},
None,
id="test_jsonl_format"),
id="test_jsonl_format",
),
pytest.param(
"parquet",
{
Expand All @@ -200,22 +199,20 @@ def test_convert_legacy_config(legacy_config, expected_config):
"batch_size": 65536,
"buffer_size": 100,
},
{
"filetype": "parquet"
},
{"filetype": "parquet", "decimal_as_float": True},
None,
id="test_parquet_format"),
id="test_parquet_format",
),
pytest.param(
"avro",
{
"filetype": "avro",
},
{
"filetype": "avro"
},
{"filetype": "avro"},
None,
id="test_avro_format"),
]
id="test_avro_format",
),
],
)
def test_convert_file_format(file_type, legacy_format_config, expected_format_config, expected_error):
legacy_config = {
Expand All @@ -225,7 +222,6 @@ def test_convert_file_format(file_type, legacy_format_config, expected_format_co
"bucket": "test_bucket",
"aws_access_key_id": "some_access_key",
"aws_secret_access_key": "some_secret",

},
"format": legacy_format_config,
"path_pattern": f"**/*.{file_type}",
Expand All @@ -241,9 +237,9 @@ def test_convert_file_format(file_type, legacy_format_config, expected_format_co
"file_type": file_type,
"globs": [f"**/*.{file_type}"],
"validation_policy": "Emit Record",
"format": expected_format_config
"format": expected_format_config,
}
]
],
}

parsed_legacy_config = SourceS3Spec(**legacy_config)
Expand Down

0 comments on commit 690479d

Please sign in to comment.