Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Source S3: v4 rollout/feature parity #29753

Merged
merged 10 commits on Aug 23, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,10 @@ def _cast_types(
except ValueError:
warnings.append(_format_warning(key, value, prop_type))

result[key] = cast_value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

else:
warnings.append(_format_warning(key, value, prop_type))

result[key] = cast_value

if warnings:
logger.warning(
f"{FileBasedSourceError.ERROR_CASTING_VALUE.value}: {','.join([w for w in warnings])}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
pytest.param(
{"col9": "['a', 'b']"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col9": "['a', 'b']"}, id="cannot-cast-to-list-of-objects"
),
pytest.param({"col11": "x"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col11": "x"}, id="item-not-in-props-doesn't-error"),
pytest.param({"col11": "x"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {}, id="item-not-in-props-doesn't-error"),
],
)
def test_cast_to_python_type(row: Dict[str, str], true_values: Set[str], false_values: Set[str], expected_output: Dict[str, Any]) -> None:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=3.1.8
LABEL io.airbyte.version=3.1.9
LABEL io.airbyte.name=airbyte/source-s3
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,42 @@
{
"stream": {
"name": "test",
"json_schema": {},
"json_schema": {
"type": "object",
"properties": {
"id": {
"type": [
"integer",
"null"
]
},
"fullname_and_valid": {
"type": [
"object",
"null"
],
"fullname": {
"type": [
"string",
"null"
]
},
"valid": {
"type": [
"boolean",
"null"
]
}
},
"_ab_source_file_last_modified": {
"type": "string",
"format": "date-time"
},
"_ab_source_file_url": {
"type": "string"
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,36 @@
{
"stream": {
"name": "test",
"json_schema": {},
"json_schema": {
"type": "object",
"properties": {
"id": {
"type": [
"null",
"integer"
]
},
"name": {
"type": [
"null",
"string"
]
},
"valid": {
"type": [
"null",
"boolean"
]
},
"_ab_source_file_last_modified": {
"type": "string",
"format": "date-time"
},
"_ab_source_file_url": {
"type": "string"
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,56 @@
{
"stream": {
"name": "test",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"json_schema": {
"type": "object",
"properties": {
"id": {
"type": [
"null",
"integer"
]
},
"name": {
"type": [
"null",
"string"
]
},
"valid": {
"type": [
"null",
"boolean"
]
},
"value": {
"type": [
"null",
"number"
]
},
"event_date": {
"type": [
"null",
"string"
]
},
"_ab_source_file_last_modified": {
"type": "string",
"format": "date-time"
},
"_ab_source_file_url": {
"type": "string"
}
}
},
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
"default_cursor_field": [
"_ab_source_file_last_modified"
]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,122 @@
{
"stream": {
"name": "test",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"json_schema": {
"type" : "object",
"properties" : {
"Payroll_Number" : {
"type" : [
"null",
"number"
]
},
"Last_Name" : {
"type" : [
"null",
"string"
]
},
"First_Name" : {
"type" : [
"null",
"string"
]
},
"Mid_Init" : {
"type" : [
"null",
"string"
]
},
"Agency_Start_Date" : {
"type" : [
"null",
"string"
]
},
"Work_Location_Borough" : {
"type" : [
"null",
"number"
]
},
"Title_Description" : {
"type" : [
"null",
"string"
]
},
"Base_Salary" : {
"type" : [
"null",
"number"
]
},
"Regular_Hours" : {
"type" : [
"null",
"number"
]
},
"Regular_Gross_Paid" : {
"type" : [
"null",
"number"
]
},
"OT_Hours" : {
"type" : [
"null",
"number"
]
},
"Total_OT_Paid" : {
"type" : [
"null",
"number"
]
},
"Total_Other_Pay" : {
"type" : [
"null",
"number"
]
},
"Fiscal_Year" : {
"type" : [
"null",
"string"
]
},
"Leave_Status_as_of_June_30" : {
"type" : [
"null",
"string"
]
},
"Pay_Basis" : {
"type" : [
"null",
"string"
]
},
"_ab_source_file_last_modified" : {
"type" : "string",
"format" : "date-time"
},
"_ab_source_file_url" : {
"type" : "string"
}
}
},
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
"default_cursor_field": [
"_ab_source_file_last_modified"
]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerImageTag: 3.1.8
dockerImageTag: 3.1.9
dockerRepository: airbyte/source-s3
githubIssueLabel: source-s3
icon: s3.svg
Expand Down
14 changes: 4 additions & 10 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,8 @@ def documentation_url(cls) -> AnyUrl:
def validate_optional_args(cls, values):
aws_access_key_id = values.get("aws_access_key_id")
aws_secret_access_key = values.get("aws_secret_access_key")
endpoint = values.get("endpoint")
if aws_access_key_id or aws_secret_access_key:
if not (aws_access_key_id and aws_secret_access_key):
raise ValidationError(
"`aws_access_key_id` and `aws_secret_access_key` are both required to authenticate with AWS.", model=Config
)
if endpoint:
raise ValidationError(
"Either `aws_access_key_id` and `aws_secret_access_key` or `endpoint` must be set, but not both.", model=Config
)
if (aws_access_key_id or aws_secret_access_key) and not (aws_access_key_id and aws_secret_access_key):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

raise ValidationError(
"`aws_access_key_id` and `aws_secret_access_key` are both required to authenticate with AWS.", model=Config
)
return values
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,28 @@ def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat,
"delimiter": format_options.delimiter,
"quote_char": format_options.quote_char,
"double_quote": format_options.double_quote,
"null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
"true_values": ["y", "yes", "t", "true", "on", "1"],
"false_values": ["n", "no", "f", "false", "off", "0"],
# values taken from https://github.com/apache/arrow/blob/43c05c56b37daa93e76b94bc3e6952d56d1ea3f2/cpp/src/arrow/csv/options.cc#L41-L45
"null_values": additional_reader_options.pop("null_values", [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

"",
"#N/A",
"#N/A N/A",
"#NA",
"-1.#IND",
"-1.#QNAN",
"-NaN",
"-nan",
"1.#IND",
"1.#QNAN",
"N/A",
"NA",
"NULL",
"NaN",
"n/a",
"nan",
"null",
]),
"true_values": additional_reader_options.pop("true_values", ["1", "True", "TRUE", "true"]),
"false_values": additional_reader_options.pop("false_values", ["0", "False", "FALSE", "false"]),
"inference_type": "Primitive Types Only" if format_options.infer_datatypes else "None",
"strings_can_be_null": additional_reader_options.pop("strings_can_be_null", False),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def _page(
total_n_keys_for_prefix = 0
kwargs = {"Bucket": bucket}
while True:
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix) if prefix else s3.list_objects_v2(Bucket=bucket)
response = s3.list_objects_v2(Prefix=prefix, **kwargs) if prefix else s3.list_objects_v2(**kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

key_count = response.get("KeyCount")
total_n_keys_for_prefix += key_count
logger.info(f"Received {key_count} objects from S3 for prefix '{prefix}'.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
pytest.param({"bucket": "test", "streams": []}, None, id="required-fields"),
pytest.param({"bucket": "test", "streams": [], "aws_access_key_id": "access_key", "aws_secret_access_key": "secret_access_key"}, None, id="config-created-with-aws-info"),
pytest.param({"bucket": "test", "streams": [], "endpoint": "http://test.com"}, None, id="config-created-with-endpoint"),
pytest.param({"bucket": "test", "streams": [], "aws_access_key_id": "access_key", "aws_secret_access_key": "secret_access_key", "endpoint": "http://test.com"}, ValidationError, id="cannot-have-endpoint-and-aws-info"),
pytest.param({"bucket": "test", "streams": [], "aws_access_key_id": "access_key", "aws_secret_access_key": "secret_access_key", "endpoint": "http://test.com"}, None, id="config-created-with-endpoint-and-aws-info"),
pytest.param({"streams": []}, ValidationError, id="missing-bucket"),
]
)
Expand Down