Commit
🐛 Source S3: Check config settings for CSV file format (#20262)
* Source S3: get master schema on check connection

* Source S3: bump version

* Source S3: update docs

* Source S3: fix test

* Source S3: add fields validation for CSV source

* Source S3: add test

* Source S3: Refactor config validation

* Source S3: update docs

* Source S3: format

* Source S3: format

* Source S3: fix tests

* Source S3: fix tests

* Source S3: fix tests

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
artem1205 and octavia-squidington-iii committed Dec 14, 2022
1 parent 3dae1f1 commit 09cfcbf
Showing 13 changed files with 92 additions and 11 deletions.
@@ -1407,7 +1407,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.26
+  dockerImageTag: 0.1.27
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   icon: s3.svg
   sourceType: file
@@ -12449,7 +12449,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:0.1.26"
+- dockerImage: "airbyte/source-s3:0.1.27"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
@@ -12582,6 +12582,8 @@
             \ during schema detection, increasing this should solve it. Beware\
             \ of raising this too high as you could hit OOM errors."
           default: 10000
+          minimum: 1
+          maximum: 2147483647
           order: 9
           type: "integer"
       - title: "Parquet"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.26
+LABEL io.airbyte.version=0.1.27
 LABEL io.airbyte.name=airbyte/source-s3
@@ -4,6 +4,8 @@ connector_image: airbyte/source-s3:dev
 tests:
   spec:
     - spec_path: "integration_tests/spec.json"
+      backward_compatibility_tests_config:
+        disable_for_version: "0.1.26"
   connection:
     # for CSV format
     - config_path: "secrets/config.json"
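(The backward-compatibility check is disabled against 0.1.26 presumably because this release tightens the published spec — the new `minimum`/`maximum` bounds on `block_size` added in this commit would otherwise be flagged as a spec change.)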
@@ -6,10 +6,13 @@
     "aws_access_key_id": "123456",
     "aws_secret_access_key": "123456key",
     "path_prefix": "",
-    "endpoint": "http://10.0.229.255:9000"
+    "endpoint": "http://10.0.210.197:9000"
   },
   "format": {
-    "filetype": "csv"
+    "filetype": "csv",
+    "delimiter": ",",
+    "quote_char": "'",
+    "encoding": "utf8"
   },
   "path_pattern": "*.csv",
   "schema": "{}"
@@ -9,7 +9,10 @@
     "endpoint": "http://<local_ip>:9000"
   },
   "format": {
-    "filetype": "csv"
+    "filetype": "csv",
+    "delimiter": ",",
+    "quote_char": "'",
+    "encoding": "utf8"
   },
   "path_pattern": "*.csv",
   "schema": "{}"
@@ -110,6 +110,8 @@
           "title": "Block Size",
           "description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
           "default": 10000,
+          "minimum": 1,
+          "maximum": 2147483647,
           "order": 9,
           "type": "integer"
         }
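The `minimum`/`maximum` keywords above are plain JSON Schema constraints. As a minimal sketch of the behaviour they add — not connector code, and assuming the third-party `jsonschema` package — an out-of-range `block_size` is now rejected at validation time:

from jsonschema import ValidationError, validate

# Trimmed-down stand-in for the "Block Size" property shown above.
schema = {
    "type": "object",
    "properties": {"block_size": {"type": "integer", "minimum": 1, "maximum": 2147483647}},
}

validate(instance={"block_size": 10000}, schema=schema)  # default value: passes

try:
    validate(instance={"block_size": 0}, schema=schema)  # below the new lower bound
except ValidationError as err:
    print(err.message)  # 0 is less than the minimum of 1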
@@ -108,3 +108,6 @@ def json_schema_to_pyarrow_schema(cls, schema: Mapping[str, Any], reverse: bool
         :return: converted schema dict
         """
         return {column: cls.json_type_to_pyarrow_type(json_type, reverse=reverse) for column, json_type in schema.items()}
+
+    def _validate_config(self, config: Mapping[str, Any]):
+        pass
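This gives every file-format parser a no-op `_validate_config` hook, so `check_connection` (see the hunk further down) can call it unconditionally; only parsers that override it — currently the CSV parser below — perform real validation.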
@@ -2,6 +2,7 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
+import codecs
 import csv
 import json
 import tempfile
@@ -50,6 +51,23 @@ def format(self) -> CsvFormat:
             self.format_model = CsvFormat.parse_obj(self._format)
         return self.format_model
 
+    def _validate_field_len(self, config: Mapping[str, Any], field_name: str):
+        if len(config.get("format", {}).get(field_name)) != 1:
+            raise ValueError(f"{field_name} should contain 1 character only")
+
+    def _validate_config(self, config: Mapping[str, Any]):
+        if config.get("format", {}).get("filetype") == "csv":
+            self._validate_field_len(config, "delimiter")
+            if config.get("format", {}).get("delimiter") in ("\r", "\n"):
+                raise ValueError("Delimiter cannot be \r or \n")
+
+            self._validate_field_len(config, "quote_char")
+
+            if config.get("format", {}).get("escape_char"):
+                self._validate_field_len(config, "escape_char")
+
+            codecs.lookup(config.get("format", {}).get("encoding"))
+
     def _read_options(self) -> Mapping[str, str]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html
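A minimal standard-library sketch (not connector code) of the two failure modes `_validate_config` introduces — multi-character format fields and unknown encodings:

import codecs

fmt = {"filetype": "csv", "delimiter": ";;", "quote_char": "'", "encoding": "utf888"}

# Length check, mirroring _validate_field_len above: the parser raises ValueError here.
if len(fmt["delimiter"]) != 1:
    print("delimiter should contain 1 character only")

# Encoding check: codecs.lookup raises LookupError for codecs Python does not know.
try:
    codecs.lookup(fmt["encoding"])
except LookupError as err:
    print(err)  # unknown encoding: utf888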
@@ -73,6 +73,8 @@ class Config:
     )
     block_size: int = Field(
         default=10000,
+        ge=1,
+        le=2_147_483_647,  # int32_t max
         description="The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
         order=9,
     )
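The same bounds are enforced on the pydantic model; a minimal sketch with a hypothetical `CsvFormatSketch` model (a stand-in for the connector's `CsvFormat`, assuming pydantic is installed):

from pydantic import BaseModel, Field, ValidationError

class CsvFormatSketch(BaseModel):
    # Same constraints as the block_size field above.
    block_size: int = Field(default=10000, ge=1, le=2_147_483_647)

print(CsvFormatSketch().block_size)  # 10000 -- the default still passes

try:
    CsvFormatSketch(block_size=0)  # violates ge=1
except ValidationError as err:
    print(err)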
@@ -2,7 +2,6 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
-
 from abc import ABC, abstractmethod
 from traceback import format_exc
 from typing import Any, List, Mapping, Optional, Tuple
@@ -59,7 +58,9 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
         The error object will be cast to string to display the problem to the user.
         """
         try:
-            for file_info in self.stream_class(**config).filepath_iterator():
+            stream = self.stream_class(**config)
+            stream.fileformatparser_class(stream._format)._validate_config(config)
+            for file_info in stream.filepath_iterator():
                 # TODO: will need to split config.get("path_pattern") up by stream once supporting multiple streams
                 # test that matching on the pattern doesn't error
                 globmatch(file_info.key, config.get("path_pattern"), flags=GLOBSTAR | SPLIT)
@@ -38,13 +38,57 @@ def test_check_connection_exception(config):
     assert error_msg
 
 
+@pytest.mark.parametrize(
+    "delimiter, quote_char, escape_char, encoding, error_type",
+    [
+        ("string", "'", None, "utf8", ValueError),
+        ("\n", "'", None, "utf8", ValueError),
+        (",", ";,", None, "utf8", ValueError),
+        (",", "'", "escape", "utf8", ValueError),
+        (",", "'", None, "utf888", LookupError)
+    ],
+    ids=[
+        "long_delimiter",
+        "forbidden_delimiter_symbol",
+        "long_quote_char",
+        "long_escape_char",
+        "unknown_encoding"
+    ],
+)
+def test_check_connection_csv_validation_exception(delimiter, quote_char, escape_char, encoding, error_type):
+    config = {
+        "dataset": "test",
+        "provider": {
+            "storage": "S3",
+            "bucket": "test-source-s3",
+            "aws_access_key_id": "key_id",
+            "aws_secret_access_key": "access_key",
+            "path_prefix": ""
+        },
+        "path_pattern": "simple_test*.csv",
+        "schema": "{}",
+        "format": {
+            "filetype": "csv",
+            "delimiter": delimiter,
+            "quote_char": quote_char,
+            "escape_char": escape_char,
+            "encoding": encoding,
+        }
+    }
+    ok, error_msg = SourceS3().check_connection(logger, config=config)
+
+    assert not ok
+    assert error_msg
+    assert isinstance(error_msg, error_type)
+
+
 def test_check_connection(config):
     instance = SourceS3()
     with patch.object(instance.stream_class, "filepath_iterator", MagicMock()):
         ok, error_msg = instance.check_connection(logger, config=config)
 
-        assert ok
-        assert not error_msg
+        assert not ok
+        assert error_msg
 
 
 def test_streams(config):
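The new parametrized cases can be run on their own with something like `python -m pytest unit_tests -k csv_validation` from the connector directory (hypothetical invocation; adjust paths to the local checkout).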
3 changes: 2 additions & 1 deletion docs/integrations/sources/s3.md
@@ -209,7 +209,8 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo
 
 | Version | Date | Pull Request | Subject |
 |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------|
-| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option |
+| 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format |
+| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option |
 | 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file |
 | 0.1.23 | 2022-10-10 | [17991](https://github.com/airbytehq/airbyte/pull/17991) | Fix pyarrow to JSON schema type conversion for arrays |
 | 0.1.23 | 2022-10-10 | [17800](https://github.com/airbytehq/airbyte/pull/17800) | Deleted `use_ssl` and `verify_ssl_cert` flags and hardcoded to `True` |
