[ISSUE #28893] code review part #2
maxi297 committed Aug 9, 2023
1 parent ef8f5f5 commit f651d03
Showing 4 changed files with 48 additions and 66 deletions.
@@ -17,6 +17,12 @@ class QuotingBehavior(Enum):
QUOTE_NONE = "Quote None"


class InferenceType(Enum):
NONE = "None"
PRIMITIVE_TYPES_ONLY = "Primitive Types Only"
PRIMITIVE_AND_COMPLEX_TYPES = "Primitive and Complex Types"


DEFAULT_TRUE_VALUES = ["y", "yes", "t", "true", "on", "1"]
DEFAULT_FALSE_VALUES = ["n", "no", "f", "false", "off", "0"]
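
For reference, a sketch of how the removed boolean flags appear to map onto the new enum values. This mapping is inferred from the test renames later in this commit, not stated in the diff itself, and the constant name below is hypothetical:

# (infer_datatypes, infer_datatypes_legacy) -> InferenceType, as suggested by the test changes below
_LEGACY_FLAGS_TO_INFERENCE_TYPE = {
    (False, False): InferenceType.NONE,
    (False, True): InferenceType.PRIMITIVE_TYPES_ONLY,
    (True, False): InferenceType.PRIMITIVE_AND_COMPLEX_TYPES,
}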

@@ -81,16 +87,11 @@ class Config:
default=DEFAULT_FALSE_VALUES,
description="A set of case-sensitive strings that should be interpreted as false values.",
)
infer_datatypes: bool = Field(
inference_type: InferenceType = Field(
title="Infer Datatypes",
default=False,
default=InferenceType.NONE,
description="Whether to autogenerate the schema based the file content.",
)
infer_datatypes_legacy: bool = Field(
title="Infer Datatypes (legacy)",
default=False,
description="Whether to autogenerate the schema based the file content. This inference does not support list and objects.",
)

@validator("delimiter")
def validate_delimiter(cls, v: str) -> str:
@@ -127,11 +128,3 @@ def validate_option_combinations(cls, values: Mapping[str, Any]) -> Mapping[str,
if skip_rows_before_header > 0 and auto_generate_column_names:
raise ValueError("Cannot skip rows before header and autogenerate column names at the same time.")
return values

@root_validator
def validate_option_inference(cls, values: Mapping[str, Any]) -> Mapping[str, Any]:
infer_datatypes = values.get("infer_datatypes", False)
infer_datatypes_legacy = values.get("infer_datatypes_legacy", False)
if infer_datatypes and infer_datatypes_legacy:
raise ValueError("Only one way to infer can be configured but both infer_datatypes and infer_datatypes_legacy are enabled.")
return values
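
A minimal sketch, not part of this diff, of why the removed cross-field validator is no longer needed: with a single enum field the contradictory combination simply cannot be expressed (assuming the remaining CsvFormat fields keep their defaults):

from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat, InferenceType

# Before this change, CsvFormat(infer_datatypes=True, infer_datatypes_legacy=True) had to be
# rejected by validate_option_inference. Now each format carries exactly one inference mode.
csv_format = CsvFormat(inference_type=InferenceType.PRIMITIVE_TYPES_ONLY)
assert csv_format.inference_type == InferenceType.PRIMITIVE_TYPES_ONLY
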
@@ -11,7 +11,7 @@
from io import IOBase
from typing import Any, Callable, Dict, Generator, Iterable, List, Mapping, Optional, Set

from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat, QuotingBehavior
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat, InferenceType, QuotingBehavior
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
@@ -38,7 +38,7 @@ def read_data(
logger: logging.Logger,
file_read_mode: FileReadMode,
) -> Generator[Dict[str, Any], None, None]:
config_format = config.format or CsvFormat()
config_format = _extract_format(config)

# Formats are configured individually per-stream so a unique dialect should be registered for each stream.
# We don't unregister the dialect because we are lazily parsing each csv file to generate records
@@ -123,21 +123,21 @@ async def infer_schema(
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
) -> SchemaType:
if config.input_schema:
# FIXME change type of method to Mapping
return config.input_schema # type: ignore # conversion to mapping is handled by pydantic and we shouldn't have a str here
input_schema = config.get_input_schema()
if input_schema:
return input_schema

# todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
# sources will likely require one. Rather than modify the interface now we can wait until the real use case
config_format = config.format or CsvFormat()
config_format = _extract_format(config)
type_inferrer_by_field: Dict[str, _TypeInferrer] = defaultdict(
lambda: _JsonTypeInferrer(
config_format.true_values,
config_format.false_values,
config_format.null_values,
config_format.infer_datatypes and not config_format.infer_datatypes_legacy,
config_format.inference_type == InferenceType.PRIMITIVE_AND_COMPLEX_TYPES
)
if config_format.infer_datatypes or config_format.infer_datatypes_legacy
if config_format.inference_type != InferenceType.NONE
else _DisabledTypeInferrer()
)
data_generator = self._csv_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
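
The defaultdict above builds one inferrer per column; a simplified sketch of that selection, using the private inferrer classes as they appear in this hunk (the helper name here is hypothetical, not part of the commit):

def _select_type_inferrer(config_format: CsvFormat) -> _TypeInferrer:
    if config_format.inference_type == InferenceType.NONE:
        return _DisabledTypeInferrer()
    return _JsonTypeInferrer(
        config_format.true_values,
        config_format.false_values,
        config_format.null_values,
        # only PRIMITIVE_AND_COMPLEX_TYPES attempts array/object inference
        config_format.inference_type == InferenceType.PRIMITIVE_AND_COMPLEX_TYPES,
    )
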
@@ -164,7 +164,7 @@ def parse_records(
logger: logging.Logger,
discovered_schema: Optional[SchemaType],
) -> Iterable[Dict[str, Any]]:
config_format = config.format or CsvFormat()
config_format = _extract_format(config)
cast_fn = CsvParser._get_cast_function(discovered_schema, config_format, logger)
data_generator = self._csv_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
for row in data_generator:
@@ -411,3 +411,10 @@ def _format_warning(key: str, value: str, expected_type: Optional[Any]) -> str:

def _no_cast(row: Mapping[str, str]) -> Mapping[str, str]:
return row


def _extract_format(config: FileBasedStreamConfig) -> CsvFormat:
config_format = config.format or CsvFormat()
if not isinstance(config_format, CsvFormat):
raise ValueError(f"Invalid format config: {config_format}")
return config_format
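
A usage sketch, not from this diff, of the new helper: it falls back to a default CsvFormat when no format is set and rejects anything that is not a CsvFormat (assuming the imports already present at the top of this file):

from unittest.mock import Mock

config = Mock()
config.format = None
assert isinstance(_extract_format(config), CsvFormat)  # falls back to a default CsvFormat()

config.format = object()  # any non-CsvFormat value
try:
    _extract_format(config)
except ValueError as error:
    assert "Invalid format config" in str(error)
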
@@ -23,20 +23,3 @@ def test_csv_format_skip_rows_and_autogenerate_column_names(skip_rows_before_hea
CsvFormat(skip_rows_before_header=skip_rows_before_header, autogenerate_column_names=autogenerate_column_names)
else:
CsvFormat(skip_rows_before_header=skip_rows_before_header, autogenerate_column_names=autogenerate_column_names)


@pytest.mark.parametrize(
"infer_datatypes, infer_datatypes_legacy, expected_error",
[
pytest.param(True, True, ValueError, id="test_many_inferences_configured"),
pytest.param(True, False, None, id="test_infer_datatypes"),
pytest.param(False, True, None, id="test_infer_datatypes_legacy"),
pytest.param(False, False, None, id="test_no_inference"),
]
)
def test_csv_format_inference(infer_datatypes: bool, infer_datatypes_legacy: bool, expected_error: Type[BaseException]) -> None:
if expected_error:
with pytest.raises(expected_error):
CsvFormat(infer_datatypes=infer_datatypes, infer_datatypes_legacy=infer_datatypes_legacy)
else:
CsvFormat(infer_datatypes=infer_datatypes, infer_datatypes_legacy=infer_datatypes_legacy)
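
The mutual-exclusion test above is deleted because the invalid flag combination can no longer be built. A hypothetical replacement, not part of this commit, would only need to check that each enum value round-trips (assuming InferenceType is imported alongside CsvFormat in this test module):

@pytest.mark.parametrize(
    "inference_type",
    [InferenceType.NONE, InferenceType.PRIMITIVE_TYPES_ONLY, InferenceType.PRIMITIVE_AND_COMPLEX_TYPES],
)
def test_csv_format_inference_type(inference_type: InferenceType) -> None:
    assert CsvFormat(inference_type=inference_type).inference_type == inference_type
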
@@ -12,7 +12,7 @@
from unittest.mock import Mock

import pytest
from airbyte_cdk.sources.file_based.config.csv_format import DEFAULT_FALSE_VALUES, DEFAULT_TRUE_VALUES, CsvFormat
from airbyte_cdk.sources.file_based.config.csv_format import DEFAULT_FALSE_VALUES, DEFAULT_TRUE_VALUES, CsvFormat, InferenceType
from airbyte_cdk.sources.file_based.exceptions import RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.csv_parser import CsvParser, _CsvReader
@@ -99,10 +99,9 @@ def setUp(self) -> None:
self._config_format.true_values = _DEFAULT_TRUE_VALUES
self._config_format.false_values = _DEFAULT_FALSE_VALUES
self._config_format.null_values = {self._A_NULL_VALUE}
self._config_format.infer_datatypes = False
self._config_format.infer_datatypes_legacy = False
self._config_format.inference_type = InferenceType.NONE
self._config = Mock()
self._config.input_schema = None
self._config.get_input_schema.return_value = None
self._config.format = self._config_format

self._file = Mock(spec=RemoteFile)
@@ -112,71 +111,71 @@ def setUp(self) -> None:
self._parser = CsvParser(self._csv_reader)

def test_given_user_schema_defined_when_infer_schema_then_return_user_schema(self) -> None:
self._config.input_schema = {self._HEADER_NAME: {"type": "potato"}}
self._config.get_input_schema.return_value = {self._HEADER_NAME: {"type": "potato"}}
self._test_infer_schema(list(_DEFAULT_TRUE_VALUES.union(_DEFAULT_FALSE_VALUES)), "potato")

def test_given_booleans_only_when_infer_schema_then_type_is_boolean(self) -> None:
self._config_format.infer_datatypes = True
self._config_format.inference_type = InferenceType.PRIMITIVE_AND_COMPLEX_TYPES
self._test_infer_schema(list(_DEFAULT_TRUE_VALUES.union(_DEFAULT_FALSE_VALUES)), "boolean")

def test_given_legacy_and_booleans_only_when_infer_schema_then_type_is_boolean(self) -> None:
self._config_format.infer_datatypes_legacy = True
def test_given_primitive_only_and_booleans_only_when_infer_schema_then_type_is_boolean(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(list(_DEFAULT_TRUE_VALUES.union(_DEFAULT_FALSE_VALUES)), "boolean")

def test_given_integers_only_when_infer_schema_then_type_is_integer(self) -> None:
self._config_format.infer_datatypes = True
self._config_format.inference_type = InferenceType.PRIMITIVE_AND_COMPLEX_TYPES
self._test_infer_schema(["2", "90329", "5645"], "integer")

def test_given_legacy_and_integers_only_when_infer_schema_then_type_is_integer(self) -> None:
self._config_format.infer_datatypes_legacy = True
def test_given_primitive_only_and_integers_only_when_infer_schema_then_type_is_integer(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(["2", "90329", "5645"], "integer")

def test_given_numbers_and_integers_when_infer_schema_then_type_is_number(self) -> None:
self._config_format.infer_datatypes = True
self._config_format.inference_type = InferenceType.PRIMITIVE_AND_COMPLEX_TYPES
self._test_infer_schema(["2", "90329", "2.312"], "number")

def test_given_legacy_and_numbers_and_integers_when_infer_schema_then_type_is_number(self) -> None:
self._config_format.infer_datatypes_legacy = True
def test_given_primitive_only_and_numbers_and_integers_when_infer_schema_then_type_is_number(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(["2", "90329", "2.312"], "number")

def test_given_arrays_only_when_infer_schema_then_type_is_array(self) -> None:
self._config_format.infer_datatypes = True
self._config_format.inference_type = InferenceType.PRIMITIVE_AND_COMPLEX_TYPES
self._test_infer_schema(['["first_item", "second_item"]', '["first_item_again", "second_item_again"]'], "array")

def test_given_arrays_only_when_infer_schema_then_type_is_string(self) -> None:
self._config_format.infer_datatypes_legacy = True
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(['["first_item", "second_item"]', '["first_item_again", "second_item_again"]'], "string")

def test_given_objects_only_when_infer_schema_then_type_is_object(self) -> None:
self._config_format.infer_datatypes = True
self._config_format.inference_type = InferenceType.PRIMITIVE_AND_COMPLEX_TYPES
self._test_infer_schema(['{"object1_key": 1}', '{"object2_key": 2}'], "object")

def test_given_legacy_and_objects_only_when_infer_schema_then_type_is_string(self) -> None:
self._config_format.infer_datatypes_legacy = True
def test_given_primitive_only_and_objects_only_when_infer_schema_then_type_is_string(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(['{"object1_key": 1}', '{"object2_key": 2}'], "string")

def test_given_arrays_and_objects_only_when_infer_schema_then_type_is_object(self) -> None:
self._config_format.infer_datatypes = True
self._config_format.inference_type = InferenceType.PRIMITIVE_AND_COMPLEX_TYPES
self._test_infer_schema(['["first_item", "second_item"]', '{"an_object_key": "an_object_value"}'], "object")

def test_given_strings_and_objects_only_when_infer_schema_then_type_is_object(self) -> None:
self._config_format.infer_datatypes = True
self._config_format.inference_type = InferenceType.PRIMITIVE_AND_COMPLEX_TYPES
self._test_infer_schema(['["first_item", "second_item"]', "this is a string"], "object")

def test_given_strings_only_when_infer_schema_then_type_is_string(self) -> None:
self._config_format.infer_datatypes = True
self._config_format.inference_type = InferenceType.PRIMITIVE_AND_COMPLEX_TYPES
self._test_infer_schema(["a string", "another string"], "string")

def test_given_a_null_value_when_infer_then_ignore_null(self) -> None:
self._config_format.infer_datatypes = True
self._config_format.inference_type = InferenceType.PRIMITIVE_AND_COMPLEX_TYPES
self._test_infer_schema(["2", "90329", "5645", self._A_NULL_VALUE], "integer")

def test_given_only_null_values_when_infer_then_type_is_string(self) -> None:
self._config_format.infer_datatypes = True
self._config_format.inference_type = InferenceType.PRIMITIVE_AND_COMPLEX_TYPES
self._test_infer_schema([self._A_NULL_VALUE, self._A_NULL_VALUE, self._A_NULL_VALUE], "string")

def test_given_big_file_when_infer_schema_then_stop_early(self) -> None:
self._config_format.infer_datatypes = True
self._config_format.inference_type = InferenceType.PRIMITIVE_AND_COMPLEX_TYPES
self._csv_reader.read_data.return_value = ({self._HEADER_NAME: row} for row in ["2." + "2" * 1_000_000] + ["this is a string"])
inferred_schema = self._infer_schema()
# since the type is number, we know the string at the end was not considered