From 09cfcbf59918fb220310b18587a4d2af9a7d0a86 Mon Sep 17 00:00:00 2001
From: Artem Inzhyyants <36314070+artem1205@users.noreply.github.com>
Date: Wed, 14 Dec 2022 21:53:06 +0100
Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20S3:=20Check=20config=20?=
 =?UTF-8?q?settings=20for=20CSV=20file=20format=20(#20262)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Source S3: get master schema on check connection

* Source S3: bump version

* Source S3: update docs

* Source S3: fix test

* Source S3: add fields validation for CSV source

* Source S3: add test

* Source S3: Refactor config validation

* Source S3: update docs

* Source S3: format

* Source S3: format

* Source S3: fix tests

* Source S3: fix tests

* Source S3: fix tests

* auto-bump connector version

Co-authored-by: Octavia Squidington III
---
 .../resources/seed/source_definitions.yaml    |  2 +-
 .../src/main/resources/seed/source_specs.yaml |  4 +-
 .../connectors/source-s3/Dockerfile           |  2 +-
 .../source-s3/acceptance-test-config.yml      |  2 +
 .../integration_tests/config_minio.json       |  7 ++-
 .../config_minio.template.json                |  5 +-
 .../source-s3/integration_tests/spec.json     |  2 +
 .../formats/abstract_file_parser.py           |  3 ++
 .../formats/csv_parser.py                     | 18 +++++++
 .../source_files_abstract/formats/csv_spec.py |  2 +
 .../source_s3/source_files_abstract/source.py |  5 +-
 .../source-s3/unit_tests/test_source.py       | 48 ++++++++++++++++++-
 docs/integrations/sources/s3.md               |  3 +-
 13 files changed, 92 insertions(+), 11 deletions(-)

diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index dc901dee363976..ca31fc70e4985a 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -1407,7 +1407,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.26
+  dockerImageTag: 0.1.27
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   icon: s3.svg
   sourceType: file
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index 551e606585c3ef..ff0b63dc89e11d 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -12449,7 +12449,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:0.1.26"
+- dockerImage: "airbyte/source-s3:0.1.27"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
@@ -12582,6 +12582,8 @@
             \ during schema detection, increasing this should solve it. Beware\
             \ of raising this too high as you could hit OOM errors."
            default: 10000
+           minimum: 1
+           maximum: 2147483647
            order: 9
            type: "integer"
        - title: "Parquet"
diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile
index c38be4e387ef8d..e97e475cc4a2dd 100644
--- a/airbyte-integrations/connectors/source-s3/Dockerfile
+++ b/airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.26
+LABEL io.airbyte.version=0.1.27
 LABEL io.airbyte.name=airbyte/source-s3
diff --git a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml
index c85b3712cc75c6..e652a72ea4b8ad 100644
--- a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml
+++ b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml
@@ -4,6 +4,8 @@ connector_image: airbyte/source-s3:dev
 tests:
   spec:
     - spec_path: "integration_tests/spec.json"
+      backward_compatibility_tests_config:
+        disable_for_version: "0.1.26"
   connection:
     # for CSV format
     - config_path: "secrets/config.json"
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
index 1a0863be9c8b60..6198d09c452e4e 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
@@ -6,10 +6,13 @@
     "aws_access_key_id": "123456",
     "aws_secret_access_key": "123456key",
     "path_prefix": "",
-    "endpoint": "http://10.0.229.255:9000"
+    "endpoint": "http://10.0.210.197:9000"
   },
   "format": {
-    "filetype": "csv"
+    "filetype": "csv",
+    "delimiter": ",",
+    "quote_char": "'",
+    "encoding": "utf8"
   },
   "path_pattern": "*.csv",
   "schema": "{}"
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.template.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.template.json
index c551bfeb6685be..574f5fb000e53e 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.template.json
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.template.json
@@ -9,7 +9,10 @@
     "endpoint": "http://:9000"
   },
   "format": {
-    "filetype": "csv"
+    "filetype": "csv",
+    "delimiter": ",",
+    "quote_char": "'",
+    "encoding": "utf8"
   },
   "path_pattern": "*.csv",
   "schema": "{}"
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json
index 417c4805850156..2fcd27e2bcf926 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json
@@ -110,6 +110,8 @@
           "title": "Block Size",
           "description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
          "default": 10000,
+         "minimum": 1,
+         "maximum": 2147483647,
          "order": 9,
          "type": "integer"
        }
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py
index 44579b5a099972..84dc292e96b008 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py
@@ -108,3 +108,6 @@ def json_schema_to_pyarrow_schema(cls, schema: Mapping[str, Any], reverse: bool
         :return: converted schema dict
         """
         return {column: cls.json_type_to_pyarrow_type(json_type, reverse=reverse) for column, json_type in schema.items()}
+
+    def _validate_config(self, config: Mapping[str, Any]):
+        pass
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py
index 9b832653b932b1..adb87b4672201c 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py
@@ -2,6 +2,7 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
+import codecs
 import csv
 import json
 import tempfile
@@ -50,6 +51,23 @@ def format(self) -> CsvFormat:
             self.format_model = CsvFormat.parse_obj(self._format)
         return self.format_model
 
+    def _validate_field_len(self, config: Mapping[str, Any], field_name: str):
+        if len(config.get("format", {}).get(field_name)) != 1:
+            raise ValueError(f"{field_name} should contain 1 character only")
+
+    def _validate_config(self, config: Mapping[str, Any]):
+        if config.get("format", {}).get("filetype") == "csv":
+            self._validate_field_len(config, "delimiter")
+            if config.get("format", {}).get("delimiter") in ("\r", "\n"):
+                raise ValueError("Delimiter cannot be \r or \n")
+
+            self._validate_field_len(config, "quote_char")
+
+            if config.get("format", {}).get("escape_char"):
+                self._validate_field_len(config, "escape_char")
+
+            codecs.lookup(config.get("format", {}).get("encoding"))
+
     def _read_options(self) -> Mapping[str, str]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py
index 34da5af3e6df83..f2c82d7a86c089 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py
@@ -73,6 +73,8 @@ class Config:
     )
     block_size: int = Field(
         default=10000,
+        ge=1,
+        le=2_147_483_647,  # int32_t max
         description="The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
         order=9,
     )
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py
index 10aa6d094fdfae..6d4a453f23bdf6 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py
@@ -2,7 +2,6 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
-
 from abc import ABC, abstractmethod
 from traceback import format_exc
 from typing import Any, List, Mapping, Optional, Tuple
@@ -59,7 +58,9 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
         The error object will be cast to string to display the problem to the user.
         """
         try:
-            for file_info in self.stream_class(**config).filepath_iterator():
+            stream = self.stream_class(**config)
+            stream.fileformatparser_class(stream._format)._validate_config(config)
+            for file_info in stream.filepath_iterator():
                 # TODO: will need to split config.get("path_pattern") up by stream once supporting multiple streams
                 # test that matching on the pattern doesn't error
                 globmatch(file_info.key, config.get("path_pattern"), flags=GLOBSTAR | SPLIT)
diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_source.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_source.py
index f556412f77b064..680d303e0a3602 100644
--- a/airbyte-integrations/connectors/source-s3/unit_tests/test_source.py
+++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_source.py
@@ -38,13 +38,57 @@ def test_check_connection_exception(config):
     assert error_msg
 
 
+@pytest.mark.parametrize(
+    "delimiter, quote_char, escape_char, encoding, error_type",
+    [
+        ("string", "'", None, "utf8", ValueError),
+        ("\n", "'", None, "utf8", ValueError),
+        (",", ";,", None, "utf8", ValueError),
+        (",", "'", "escape", "utf8", ValueError),
+        (",", "'", None, "utf888", LookupError)
+    ],
+    ids=[
+        "long_delimiter",
+        "forbidden_delimiter_symbol",
+        "long_quote_char",
+        "long_escape_char",
+        "unknown_encoding"
+    ],
+)
+def test_check_connection_csv_validation_exception(delimiter, quote_char, escape_char, encoding, error_type):
+    config = {
+        "dataset": "test",
+        "provider": {
+            "storage": "S3",
+            "bucket": "test-source-s3",
+            "aws_access_key_id": "key_id",
+            "aws_secret_access_key": "access_key",
+            "path_prefix": ""
+        },
+        "path_pattern": "simple_test*.csv",
+        "schema": "{}",
+        "format": {
+            "filetype": "csv",
+            "delimiter": delimiter,
+            "quote_char": quote_char,
+            "escape_char": escape_char,
+            "encoding": encoding,
+        }
+    }
+    ok, error_msg = SourceS3().check_connection(logger, config=config)
+
+    assert not ok
+    assert error_msg
+    assert isinstance(error_msg, error_type)
+
+
 def test_check_connection(config):
     instance = SourceS3()
     with patch.object(instance.stream_class, "filepath_iterator", MagicMock()):
         ok, error_msg = instance.check_connection(logger, config=config)
 
-    assert ok
-    assert not error_msg
+    assert not ok
+    assert error_msg
 
 
 def test_streams(config):
diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md
index 401288e206e87b..5b039fdd4b9e76 100644
--- a/docs/integrations/sources/s3.md
+++ b/docs/integrations/sources/s3.md
@@ -209,7 +209,8 @@ The Jsonl parser uses pyarrow hence, only the line-delimited JSON format is supported.
 
 | Version | Date       | Pull Request                                                                                                      | Subject                                                                                   |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------| -| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option | +| 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format | +| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option | | 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file | | 0.1.23 | 2022-10-10 | [17991](https://github.com/airbytehq/airbyte/pull/17991) | Fix pyarrow to JSON schema type conversion for arrays | | 0.1.23 | 2022-10-10 | [17800](https://github.com/airbytehq/airbyte/pull/17800) | Deleted `use_ssl` and `verify_ssl_cert` flags and hardcoded to `True` |
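
Reviewer note: the rules that `_validate_config` in csv_parser.py enforces are easiest to sanity-check in isolation. Below is a minimal standalone sketch that mirrors that method, assuming a plain dict config; `validate_csv_config`, `bad_config`, and the lowercase error messages are hypothetical stand-ins, not connector code.

```python
# Hypothetical standalone mirror of CsvParser._validate_config from this patch,
# so the new rules can be exercised without an S3 bucket or the Airbyte CDK.
import codecs
from typing import Any, Mapping


def validate_csv_config(config: Mapping[str, Any]) -> None:
    fmt = config.get("format", {})
    if fmt.get("filetype") != "csv":
        return  # only the CSV format carries these settings
    if len(fmt.get("delimiter")) != 1:
        raise ValueError("delimiter should contain 1 character only")
    if fmt.get("delimiter") in ("\r", "\n"):
        raise ValueError("Delimiter cannot be \r or \n")
    if len(fmt.get("quote_char")) != 1:
        raise ValueError("quote_char should contain 1 character only")
    if fmt.get("escape_char") and len(fmt.get("escape_char")) != 1:
        raise ValueError("escape_char should contain 1 character only")
    # Unknown encodings raise LookupError, matching the "unknown_encoding" test case
    codecs.lookup(fmt.get("encoding"))


bad_config = {"format": {"filetype": "csv", "delimiter": ";;", "quote_char": "'", "encoding": "utf8"}}
try:
    validate_csv_config(bad_config)
except ValueError as err:
    print(err)  # delimiter should contain 1 character only
```

Because `check_connection` in source.py now calls the parser's `_validate_config` inside its try block, these exceptions are caught there and returned as the error object, so a bad delimiter or encoding is reported at `check` time rather than surfacing as a pyarrow failure mid-sync.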
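Reviewer note: the `ge=1` / `le=2_147_483_647` bounds added to `block_size` in csv_spec.py are enforced by pydantic when the format block is parsed, and they are also what produce the new `minimum`/`maximum` keys in the regenerated spec files. A minimal sketch, assuming pydantic v1 semantics (the major version in use at the time) and a stripped-down stand-in model rather than the real `CsvFormat`:

```python
# Stand-in model showing how pydantic's ge/le become both runtime validation
# and JSON-schema "minimum"/"maximum" (assumes pydantic v1).
from pydantic import BaseModel, Field, ValidationError


class BlockSizeOnly(BaseModel):
    # 2_147_483_647 is int32_t max, as noted in the patch
    block_size: int = Field(default=10000, ge=1, le=2_147_483_647)


try:
    BlockSizeOnly(block_size=0)
except ValidationError as err:
    print(err)  # ensure this value is greater than or equal to 1

print(BlockSizeOnly.schema()["properties"]["block_size"])
# {'title': 'Block Size', 'default': 10000, 'minimum': 1, 'maximum': 2147483647, 'type': 'integer'}
```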