Skip to content

Commit

Permalink
Source s3 error on unsupported config (#29541)
Browse files Browse the repository at this point in the history
  • Loading branch information
clnoll committed Aug 18, 2023
1 parent 25d7624 commit cfdc274
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 10 deletions.
Expand Up @@ -4,7 +4,7 @@

import json
from datetime import datetime
from typing import Any, List, Mapping, Optional, Union
from typing import Any, Dict, List, Mapping, Optional, Union

from source_s3.source import SourceS3Spec
from source_s3.source_files_abstract.formats.avro_spec import AvroFormat
Expand Down Expand Up @@ -84,19 +84,36 @@ def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat,
"true_values": ["y", "yes", "t", "true", "on", "1"],
"false_values": ["n", "no", "f", "false", "off", "0"],
"inference_type": "Primitive Types Only" if format_options.infer_datatypes else "None",
"strings_can_be_null": additional_reader_options.get("strings_can_be_null", False),
"strings_can_be_null": additional_reader_options.pop("strings_can_be_null", False),
}

if format_options.escape_char:
csv_options["escape_char"] = format_options.escape_char
if format_options.encoding:
csv_options["encoding"] = format_options.encoding
if "skip_rows" in advanced_options:
csv_options["skip_rows_before_header"] = advanced_options["skip_rows"]
if "skip_rows_after_names" in advanced_options:
csv_options["skip_rows_after_header"] = advanced_options["skip_rows_after_names"]
if "autogenerate_column_names" in advanced_options:
csv_options["autogenerate_column_names"] = advanced_options["autogenerate_column_names"]
if skip_rows := advanced_options.pop("skip_rows", None):
csv_options["skip_rows_before_header"] = skip_rows
if skip_rows_after_names := advanced_options.pop(
"skip_rows_after_names", None
):
csv_options["skip_rows_after_header"] = skip_rows_after_names
if autogenerate_column_names := advanced_options.pop(
"autogenerate_column_names", None
):
csv_options["autogenerate_column_names"] = autogenerate_column_names

cls._filter_legacy_noops(advanced_options)

if advanced_options or additional_reader_options:
raise ValueError(
f"The config options you selected are no longer supported.\n"
+ f"advanced_options={advanced_options}"
if advanced_options
else "" + f"additional_reader_options={additional_reader_options}"
if additional_reader_options
else ""
)

return csv_options

elif isinstance(format_options, JsonlFormat):
Expand All @@ -105,12 +122,26 @@ def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat,
return {"filetype": "parquet", "decimal_as_float": True}
else:
# This should never happen because it would fail schema validation
raise ValueError(f"Format filetype {format_options} is not a supported file type")
raise ValueError(
f"Format filetype {format_options} is not a supported file type"
)

@classmethod
def parse_config_options_str(cls, options_field: str, options_value: Optional[str]) -> Mapping[str, Any]:
def parse_config_options_str(cls, options_field: str, options_value: Optional[str]) -> Dict[str, Any]:
options_str = options_value or "{}"
try:
return json.loads(options_str)
except json.JSONDecodeError as error:
raise ValueError(f"Malformed {options_field} config json: {error}. Please ensure that it is a valid JSON.")

@staticmethod
def _filter_legacy_noops(advanced_options: Dict[str, Any]):
ignore_all = ("auto_dict_encode", "timestamp_parsers")
ignore_by_value = (("check_utf8", False),)

for option in ignore_all:
advanced_options.pop(option, None)

for option, value_to_ignore in ignore_by_value:
if advanced_options.get(option) == value_to_ignore:
advanced_options.pop(option)
Expand Up @@ -200,6 +200,15 @@ def test_convert_legacy_config(legacy_config, expected_config):
ValueError,
id="test_malformed_additional_reader_options",
),
pytest.param(
"csv",
{
"additional_reader_options": '{"include_columns": ""}',
},
None,
ValueError,
id="test_unsupported_additional_reader_options",
),
pytest.param(
"csv",
{
Expand All @@ -209,6 +218,64 @@ def test_convert_legacy_config(legacy_config, expected_config):
ValueError,
id="test_malformed_advanced_options",
),
pytest.param(
"csv",
{
"advanced_options": '{"column_names": ""}',
},
None,
ValueError,
id="test_unsupported_advanced_options",
),
pytest.param(
"csv",
{
"advanced_options": '{"check_utf8": false}',
},
{
"filetype": "csv",
"delimiter": ",",
"quote_char": '"',
"encoding": "utf8",
"double_quote": True,
"null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
"true_values": ["y", "yes", "t", "true", "on", "1"],
"false_values": ["n", "no", "f", "false", "off", "0"],
"inference_type": "Primitive Types Only",
"strings_can_be_null": False,
},
None,
id="test_unsupported_advanced_options_by_value_succeeds_if_value_matches_ignored_values",
),
pytest.param(
"csv",
{
"advanced_options": '{"check_utf8": true}',
},
None,
ValueError,
id="test_unsupported_advanced_options_by_value_fails_if_value_doesnt_match_ignored_values",
),
pytest.param(
"csv",
{
"advanced_options": '{"auto_dict_encode": ""}',
},
{
"filetype": "csv",
"delimiter": ",",
"quote_char": '"',
"encoding": "utf8",
"double_quote": True,
"null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
"true_values": ["y", "yes", "t", "true", "on", "1"],
"false_values": ["n", "no", "f", "false", "off", "0"],
"inference_type": "Primitive Types Only",
"strings_can_be_null": False,
},
None,
id="test_ignored_advanced_options",
),
pytest.param(
"jsonl",
{
Expand Down

0 comments on commit cfdc274

Please sign in to comment.