Skip to content

Commit

Permalink
file-based CDK: Configurable strings_can_be_null (#29298)
Browse files Browse the repository at this point in the history
* [ISSUE #28893] infer csv schema

* [ISSUE #28893] align with pyarrow

* Automated Commit - Formatting Changes

* [ISSUE #28893] legacy inference and infer only when needed

* [ISSUE #28893] fix scenario tests

* [ISSUE #28893] using discovered schema as part of read

* [ISSUE #28893] self-review + cleanup

* [ISSUE #28893] fix test

* [ISSUE #28893] code review part #1

* [ISSUE #28893] code review part #2

* Fix test

* formatcdk

* first pass

* [ISSUE #28893] code review

* fix mypy issues

* comment

* rename for clarity

* Add a scenario test case

* this isn't optional anymore

* FIX test log level

* Re-adding failing tests

* [ISSUE #28893] improve inference to consider multiple types per value

* Automated Commit - Formatting Changes

* [ISSUE #28893] remove InferenceType.PRIMITIVE_AND_COMPLEX_TYPES

* Code review

* Automated Commit - Formatting Changes

* fix unit tests

---------

Co-authored-by: maxi297 <maxime@airbyte.io>
Co-authored-by: maxi297 <maxi297@users.noreply.github.com>
  • Loading branch information
3 people committed Aug 14, 2023
1 parent 12f1304 commit b512fa4
Show file tree
Hide file tree
Showing 5 changed files with 652 additions and 513 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ class Config:
default=[],
description="A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
)
strings_can_be_null: bool = Field(
title="Strings Can Be Null",
default=True,
description="Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.",
)
skip_rows_before_header: int = Field(
title="Skip Rows Before Header",
default=0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,15 @@ def parse_records(
discovered_schema: Optional[Mapping[str, SchemaType]],
) -> Iterable[Dict[str, Any]]:
config_format = _extract_format(config)
cast_fn = CsvParser._get_cast_function(discovered_schema, config_format, logger)
if discovered_schema:
property_types = {col: prop["type"] for col, prop in discovered_schema["properties"].items()} # type: ignore # discovered_schema["properties"] is known to be a mapping
deduped_property_types = CsvParser._pre_propcess_property_types(property_types)
else:
deduped_property_types = {}
cast_fn = CsvParser._get_cast_function(deduped_property_types, config_format, logger)
data_generator = self._csv_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
for row in data_generator:
yield CsvParser._to_nullable(cast_fn(row), config_format.null_values)
yield CsvParser._to_nullable(cast_fn(row), deduped_property_types, config_format.null_values, config_format.strings_can_be_null)
data_generator.close()

@property
Expand All @@ -172,24 +177,62 @@ def file_read_mode(self) -> FileReadMode:

@staticmethod
def _get_cast_function(
schema: Optional[Mapping[str, SchemaType]], config_format: CsvFormat, logger: logging.Logger
deduped_property_types: Mapping[str, str], config_format: CsvFormat, logger: logging.Logger
) -> Callable[[Mapping[str, str]], Mapping[str, str]]:
# Only cast values if the schema is provided
if schema:
property_types = {col: prop["type"] for col, prop in schema["properties"].items()}
return partial(CsvParser._cast_types, property_types=property_types, config_format=config_format, logger=logger)
if deduped_property_types:
return partial(CsvParser._cast_types, deduped_property_types=deduped_property_types, config_format=config_format, logger=logger)
else:
# If no schema is provided, yield the rows as they are
return _no_cast

@staticmethod
def _to_nullable(row: Mapping[str, str], null_values: Set[str]) -> Dict[str, Optional[str]]:
nullable = row | {k: None if v in null_values else v for k, v in row.items()}
def _to_nullable(
row: Mapping[str, str], deduped_property_types: Mapping[str, str], null_values: Set[str], strings_can_be_null: bool
) -> Dict[str, Optional[str]]:
nullable = row | {
k: None if CsvParser._value_is_none(v, deduped_property_types.get(k), null_values, strings_can_be_null) else v
for k, v in row.items()
}
return nullable

@staticmethod
def _value_is_none(value: Any, deduped_property_type: Optional[str], null_values: Set[str], strings_can_be_null: bool) -> bool:
    """
    Tell whether `value` must be replaced by None.

    A value is considered null when it is one of `null_values`, except when the column type is "string" and
    `strings_can_be_null` is false: in that case the string is kept as is.

    :param value: the cell value to check
    :param deduped_property_type: the single non-null type of the column, or None when the type is unknown
    :param null_values: the strings that represent null
    :param strings_can_be_null: whether columns of type "string" may be turned into None
    :return: True if the value should be emitted as None, False otherwise
    """
    try:
        is_null_marker = value in null_values
    except TypeError:
        # Unhashable values (e.g. a list or a dict) cannot be looked up in a set; they can never be a null marker
        return False
    return is_null_marker and (strings_can_be_null or deduped_property_type != "string")

@staticmethod
def _pre_propcess_property_types(property_types: Dict[str, Any]) -> Mapping[str, str]:
    """
    Transform the property types to be non-nullable and remove duplicate types if any.

    Sample input:
    {
        "col1": ["string", "null"],
        "col2": ["string", "string", "null"],
        "col3": "integer",
        "col4": ["string"]
    }
    Sample output:
    {
        "col1": "string",
        "col2": "string",
        "col3": "integer",
        "col4": "string",
    }

    :param property_types: mapping of property name to its type, given either as a single value or as a list of types
    :return: mapping of property name to its single non-null type; values that are not lists are passed through as is
    :raises ValueError: if a list of types does not reduce to exactly one distinct non-null type
    """
    output = {}
    for prop, prop_type in property_types.items():
        if isinstance(prop_type, list):
            prop_type_distinct = set(prop_type)
            # discard instead of remove: a list without "null" (e.g. ["string"]) is valid and must not raise KeyError
            prop_type_distinct.discard("null")
            if len(prop_type_distinct) != 1:
                raise ValueError(f"Could not get non nullable type from {prop_type}")
            output[prop] = next(iter(prop_type_distinct))
        else:
            output[prop] = prop_type
    return output

@staticmethod
def _cast_types(
row: Dict[str, str], property_types: Dict[str, Any], config_format: CsvFormat, logger: logging.Logger
row: Dict[str, str], deduped_property_types: Dict[str, str], config_format: CsvFormat, logger: logging.Logger
) -> Dict[str, Any]:
"""
Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.
Expand All @@ -202,17 +245,10 @@ def _cast_types(
result = {}

for key, value in row.items():
prop_type = property_types.get(key)
prop_type = deduped_property_types.get(key)
cast_value: Any = value

if isinstance(prop_type, list):
prop_type_distinct = set(prop_type)
prop_type_distinct.remove("null")
if len(prop_type_distinct) != 1:
raise ValueError(f"Could not get non nullable type from {prop_type}")
(prop_type,) = prop_type_distinct

if prop_type in TYPE_PYTHON_MAPPING:
if prop_type in TYPE_PYTHON_MAPPING and prop_type is not None:
_, python_type = TYPE_PYTHON_MAPPING[prop_type]

if python_type is None:
Expand Down
Loading

0 comments on commit b512fa4

Please sign in to comment.