Skip to content

Commit

Permalink
✨ Source S3: v4 rollout/feature parity (#29753)
Browse files: browse the repository at this point in the history
  • Loading branch information
maxi297 committed Aug 23, 2023
1 parent 34e7144 commit 40b76a7
Show file tree
Hide file tree
Showing 16 changed files with 340 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def parse_records(
deduped_property_types = CsvParser._pre_propcess_property_types(property_types)
else:
deduped_property_types = {}
cast_fn = CsvParser._get_cast_function(deduped_property_types, config_format, logger)
cast_fn = CsvParser._get_cast_function(deduped_property_types, config_format, logger, config.schemaless)
data_generator = self._csv_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
for row in data_generator:
yield CsvParser._to_nullable(cast_fn(row), deduped_property_types, config_format.null_values, config_format.strings_can_be_null)
Expand All @@ -170,10 +170,10 @@ def file_read_mode(self) -> FileReadMode:

@staticmethod
def _get_cast_function(
deduped_property_types: Mapping[str, str], config_format: CsvFormat, logger: logging.Logger
deduped_property_types: Mapping[str, str], config_format: CsvFormat, logger: logging.Logger, schemaless: bool
) -> Callable[[Mapping[str, str]], Mapping[str, str]]:
# Only cast values if the schema is provided
if deduped_property_types:
if deduped_property_types and not schemaless:
return partial(CsvParser._cast_types, deduped_property_types=deduped_property_types, config_format=config_format, logger=logger)
else:
# If no schema is provided, yield the rows as they are
Expand Down Expand Up @@ -275,11 +275,10 @@ def _cast_types(
except ValueError:
warnings.append(_format_warning(key, value, prop_type))

result[key] = cast_value
else:
warnings.append(_format_warning(key, value, prop_type))

result[key] = cast_value

if warnings:
logger.warning(
f"{FileBasedSourceError.ERROR_CASTING_VALUE.value}: {','.join([w for w in warnings])}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
pytest.param(
{"col9": "['a', 'b']"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col9": "['a', 'b']"}, id="cannot-cast-to-list-of-objects"
),
pytest.param({"col11": "x"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col11": "x"}, id="item-not-in-props-doesn't-error"),
pytest.param({"col11": "x"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {}, id="item-not-in-props-doesn't-error"),
],
)
def test_cast_to_python_type(row: Dict[str, str], true_values: Set[str], false_values: Set[str], expected_output: Dict[str, Any]) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@
"data": {
"col1": "val11b",
"col2": "val12b",
"col3": "val13b",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "b.csv",
},
Expand All @@ -491,7 +490,6 @@
"data": {
"col1": "val21b",
"col2": "val22b",
"col3": "val23b",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "b.csv",
},
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=3.1.8
LABEL io.airbyte.version=3.1.9
LABEL io.airbyte.name=airbyte/source-s3
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,42 @@
{
"stream": {
"name": "test",
"json_schema": {},
"json_schema": {
"type": "object",
"properties": {
"id": {
"type": [
"integer",
"null"
]
},
"fullname_and_valid": {
"type": [
"object",
"null"
],
"fullname": {
"type": [
"string",
"null"
]
},
"valid": {
"type": [
"boolean",
"null"
]
}
},
"_ab_source_file_last_modified": {
"type": "string",
"format": "date-time"
},
"_ab_source_file_url": {
"type": "string"
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,36 @@
{
"stream": {
"name": "test",
"json_schema": {},
"json_schema": {
"type": "object",
"properties": {
"id": {
"type": [
"null",
"integer"
]
},
"name": {
"type": [
"null",
"string"
]
},
"valid": {
"type": [
"null",
"boolean"
]
},
"_ab_source_file_last_modified": {
"type": "string",
"format": "date-time"
},
"_ab_source_file_url": {
"type": "string"
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,56 @@
{
"stream": {
"name": "test",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"json_schema": {
"type": "object",
"properties": {
"id": {
"type": [
"null",
"integer"
]
},
"name": {
"type": [
"null",
"string"
]
},
"valid": {
"type": [
"null",
"boolean"
]
},
"value": {
"type": [
"null",
"number"
]
},
"event_date": {
"type": [
"null",
"string"
]
},
"_ab_source_file_last_modified": {
"type": "string",
"format": "date-time"
},
"_ab_source_file_url": {
"type": "string"
}
}
},
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
"default_cursor_field": [
"_ab_source_file_last_modified"
]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,122 @@
{
"stream": {
"name": "test",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"json_schema": {
"type" : "object",
"properties" : {
"Payroll_Number" : {
"type" : [
"null",
"number"
]
},
"Last_Name" : {
"type" : [
"null",
"string"
]
},
"First_Name" : {
"type" : [
"null",
"string"
]
},
"Mid_Init" : {
"type" : [
"null",
"string"
]
},
"Agency_Start_Date" : {
"type" : [
"null",
"string"
]
},
"Work_Location_Borough" : {
"type" : [
"null",
"number"
]
},
"Title_Description" : {
"type" : [
"null",
"string"
]
},
"Base_Salary" : {
"type" : [
"null",
"number"
]
},
"Regular_Hours" : {
"type" : [
"null",
"number"
]
},
"Regular_Gross_Paid" : {
"type" : [
"null",
"number"
]
},
"OT_Hours" : {
"type" : [
"null",
"number"
]
},
"Total_OT_Paid" : {
"type" : [
"null",
"number"
]
},
"Total_Other_Pay" : {
"type" : [
"null",
"number"
]
},
"Fiscal_Year" : {
"type" : [
"null",
"string"
]
},
"Leave_Status_as_of_June_30" : {
"type" : [
"null",
"string"
]
},
"Pay_Basis" : {
"type" : [
"null",
"string"
]
},
"_ab_source_file_last_modified" : {
"type" : "string",
"format" : "date-time"
},
"_ab_source_file_url" : {
"type" : "string"
}
}
},
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
"default_cursor_field": [
"_ab_source_file_last_modified"
]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerImageTag: 3.1.8
dockerImageTag: 3.1.9
dockerRepository: airbyte/source-s3
githubIssueLabel: source-s3
icon: s3.svg
Expand Down
14 changes: 4 additions & 10 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,8 @@ def documentation_url(cls) -> AnyUrl:
def validate_optional_args(cls, values):
aws_access_key_id = values.get("aws_access_key_id")
aws_secret_access_key = values.get("aws_secret_access_key")
endpoint = values.get("endpoint")
if aws_access_key_id or aws_secret_access_key:
if not (aws_access_key_id and aws_secret_access_key):
raise ValidationError(
"`aws_access_key_id` and `aws_secret_access_key` are both required to authenticate with AWS.", model=Config
)
if endpoint:
raise ValidationError(
"Either `aws_access_key_id` and `aws_secret_access_key` or `endpoint` must be set, but not both.", model=Config
)
if (aws_access_key_id or aws_secret_access_key) and not (aws_access_key_id and aws_secret_access_key):
raise ValidationError(
"`aws_access_key_id` and `aws_secret_access_key` are both required to authenticate with AWS.", model=Config
)
return values
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,31 @@ def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat,
"delimiter": format_options.delimiter,
"quote_char": format_options.quote_char,
"double_quote": format_options.double_quote,
"null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
"true_values": ["y", "yes", "t", "true", "on", "1"],
"false_values": ["n", "no", "f", "false", "off", "0"],
# values taken from https://github.com/apache/arrow/blob/43c05c56b37daa93e76b94bc3e6952d56d1ea3f2/cpp/src/arrow/csv/options.cc#L41-L45
"null_values": additional_reader_options.pop(
"null_values",
[
"",
"#N/A",
"#N/A N/A",
"#NA",
"-1.#IND",
"-1.#QNAN",
"-NaN",
"-nan",
"1.#IND",
"1.#QNAN",
"N/A",
"NA",
"NULL",
"NaN",
"n/a",
"nan",
"null",
],
),
"true_values": additional_reader_options.pop("true_values", ["1", "True", "TRUE", "true"]),
"false_values": additional_reader_options.pop("false_values", ["0", "False", "FALSE", "false"]),
"inference_type": "Primitive Types Only" if format_options.infer_datatypes else "None",
"strings_can_be_null": additional_reader_options.pop("strings_can_be_null", False),
}
Expand Down

0 comments on commit 40b76a7

Please sign in to comment.