diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index d6cc5ad57abd..bce0e75c21b4 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1711,7 +1711,7 @@ - name: S3 sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 dockerRepository: airbyte/source-s3 - dockerImageTag: 1.0.2 + dockerImageTag: 2.0.0 documentationUrl: https://docs.airbyte.com/integrations/sources/s3 icon: s3.svg sourceType: file diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 3387c9a2ae0a..e976ea1361f0 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -12929,7 +12929,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-s3:1.0.2" +- dockerImage: "airbyte/source-s3:2.0.0" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/s3" changelogUrl: "https://docs.airbyte.com/integrations/sources/s3" diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile index d8e2a25bc51c..efe74dd39b4e 100644 --- a/airbyte-integrations/connectors/source-s3/Dockerfile +++ b/airbyte-integrations/connectors/source-s3/Dockerfile @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=1.0.2 +LABEL io.airbyte.version=2.0.0 LABEL io.airbyte.name=airbyte/source-s3 diff --git a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml index 469aec344e6d..49958a915e3d 100644 --- a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml @@ -1,33 +1,4 @@ -connector_image: airbyte/source-s3:dev -test_strictness_level: high acceptance_tests: - spec: - tests: - - spec_path: integration_tests/spec.json - - connection: - tests: - - config_path: secrets/config.json - status: succeed - - config_path: secrets/parquet_config.json - status: succeed - - config_path: secrets/avro_config.json - status: succeed - - config_path: secrets/jsonl_config.json - status: succeed - - config_path: secrets/jsonl_newlines_config.json - status: succeed - - config_path: integration_tests/invalid_config.json - status: failed - - discovery: - tests: - - config_path: secrets/config.json - - config_path: secrets/parquet_config.json - - config_path: secrets/avro_config.json - - config_path: secrets/jsonl_config.json - - config_path: secrets/jsonl_newlines_config.json - basic_read: tests: - config_path: secrets/config.json @@ -51,6 +22,27 @@ acceptance_tests: path: integration_tests/expected_records/jsonl_newlines.jsonl timeout_seconds: 1800 + connection: + tests: + - config_path: secrets/config.json + status: succeed + - config_path: secrets/parquet_config.json + status: succeed + - config_path: secrets/avro_config.json + status: succeed + - config_path: secrets/jsonl_config.json + status: succeed + - config_path: secrets/jsonl_newlines_config.json + status: succeed + - config_path: integration_tests/invalid_config.json + status: failed + 
discovery: + tests: + - config_path: secrets/config.json + - config_path: secrets/parquet_config.json + - config_path: secrets/avro_config.json + - config_path: secrets/jsonl_config.json + - config_path: secrets/jsonl_newlines_config.json full_refresh: tests: - config_path: secrets/config.json @@ -111,3 +103,8 @@ acceptance_tests: future_state: future_state_path: integration_tests/abnormal_state.json timeout_seconds: 1800 + spec: + tests: + - spec_path: integration_tests/spec.json +connector_image: airbyte/source-s3:dev +test_strictness_level: high diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json index 26a2e199cba6..fa13fb3b9ca0 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json +++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json @@ -6,7 +6,7 @@ "aws_access_key_id": "123456", "aws_secret_access_key": "123456key", "path_prefix": "", - "endpoint": "http://10.0.45.4:9000" + "endpoint": "http://10.0.186.80:9000" }, "format": { "filetype": "csv", diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/integration_test_abstract.py b/airbyte-integrations/connectors/source-s3/integration_tests/integration_test_abstract.py index b969dc82715a..1ae6079e1b5d 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/integration_test_abstract.py +++ b/airbyte-integrations/connectors/source-s3/integration_tests/integration_test_abstract.py @@ -118,7 +118,7 @@ def _stream_records_test_logic( LOGGER.info(f"Testing stream_records() in SyncMode:{sync_mode.value}") # check we return correct schema from get_json_schema() - assert fs._get_schema_map() == full_expected_schema + assert fs._schema == full_expected_schema records = [] for stream_slice in fs.stream_slices(sync_mode=sync_mode, stream_state=current_state): @@ -305,22 +305,6 @@ def _stream_records_test_logic( False, True, ), - # multiple file tests (different but merge-able schemas) - ( # auto-infer - [ - SAMPLE_DIR.joinpath("simple_test.csv"), - SAMPLE_DIR.joinpath("multi_file_diffschema_1.csv"), - SAMPLE_DIR.joinpath("multi_file_diffschema_2.csv"), - ], - "**", - True, - 6, - 17, - {"id": "integer", "name": "string", "valid": "boolean", "location": "string", "percentage": "number", "nullable": "string"}, - None, - False, - False, - ), ( # provided schema, not containing all columns (extra columns should go into FileStream.ab_additional_col) [ SAMPLE_DIR.joinpath("simple_test.csv"), diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py index ae0a1840e82d..85408ee35b03 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py @@ -7,7 +7,7 @@ from typing import Any, List, Mapping, Optional, Tuple from airbyte_cdk.logger import AirbyteLogger -from airbyte_cdk.models import ConnectorSpecification +from airbyte_cdk.models import ConnectorSpecification, SyncMode from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream @@ -65,13 +65,14 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> # test that matching on the pattern doesn't error 
globmatch(file_info.key, config.get("path_pattern"), flags=GLOBSTAR | SPLIT) # just need first file here to test connection and valid patterns - return True, None - + break + for slice_ in stream.stream_slices(sync_mode=SyncMode.full_refresh): + list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_)) + break except Exception as e: logger.error(format_exc()) return False, e - logger.warn("Found 0 files (but connection is valid).") return True, None def streams(self, config: Mapping[str, Any]) -> List[Stream]: diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index 1dac6f9e8c6e..9283d2a652ac 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -3,23 +3,19 @@ # -import concurrent.futures import json -import threading from abc import ABC, abstractmethod from copy import deepcopy from datetime import datetime, timedelta -from functools import lru_cache +from functools import cached_property, lru_cache from traceback import format_exc from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union from airbyte_cdk.logger import AirbyteLogger -from airbyte_cdk.models import FailureType from airbyte_cdk.models.airbyte_protocol import SyncMode from airbyte_cdk.sources.streams import Stream from wcmatch.glob import GLOBSTAR, SPLIT, globmatch -from ..exceptions import S3Exception from .file_info import FileInfo from .formats.abstract_file_parser import AbstractFileParser from .formats.avro_parser import AvroParser @@ -31,7 +27,6 @@ JSON_TYPES = ["string", "number", "integer", "object", "array", "boolean", "null"] LOGGER = AirbyteLogger() -LOCK = threading.Lock() class ConfigurationError(Exception): @@ -55,7 +50,6 @@ def fileformatparser_map(self) -> Mapping[str, type]: ab_file_name_col = "_ab_source_file_url" airbyte_columns = [ab_additional_col, ab_last_mod_col, ab_file_name_col] datetime_format_string = "%Y-%m-%dT%H:%M:%S%z" - parallel_tasks_size = 256 def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str, schema: str = None): """ @@ -69,10 +63,9 @@ def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str self._path_pattern = path_pattern self._provider = provider self._format = format - self._schema: Dict[str, Any] = {} + self._user_input_schema: Dict[str, Any] = {} if schema: - self._schema = self._parse_user_input_schema(schema) - self.master_schema: Dict[str, Any] = None + self._user_input_schema = self._parse_user_input_schema(schema) LOGGER.info(f"initialised stream with format: {format}") @staticmethod @@ -165,145 +158,34 @@ def get_time_ordered_file_infos(self) -> List[FileInfo]: """ return sorted(self.pattern_matched_filepath_iterator(self.filepath_iterator()), key=lambda file_info: file_info.last_modified) - def _get_schema_map(self) -> Mapping[str, Any]: - return_schema: Dict[str, Any] = None - if self._schema != {}: - return_schema = deepcopy(self._schema) - else: # we have no provided schema or schema state from a previous incremental run - return_schema = self._get_master_schema() + @property + def _raw_schema(self) -> Mapping[str, Any]: + if self._user_input_schema and isinstance(self._user_input_schema, dict): + return self._user_input_schema + return self._auto_inferred_schema - 
return_schema[self.ab_additional_col] = "object" - return_schema[self.ab_last_mod_col] = "string" - return_schema[self.ab_file_name_col] = "string" - return return_schema + @property + def _schema(self) -> Mapping[str, Any]: + extra_fields = {self.ab_additional_col: "object", self.ab_last_mod_col: "string", self.ab_file_name_col: "string"} + schema = self._raw_schema + return {**schema, **extra_fields} def get_json_schema(self) -> Mapping[str, Any]: - """ - :return: the JSON schema representing this stream. - """ # note: making every non-airbyte column nullable for compatibility - # TODO: ensure this behaviour still makes sense as we add new file formats properties: Mapping[str, Any] = { - column: {"type": ["null", typ]} if column not in self.airbyte_columns else {"type": typ} - for column, typ in self._get_schema_map().items() + column: {"type": ["null", typ]} if column not in self.airbyte_columns else {"type": typ} for column, typ in self._schema.items() } - properties[self.ab_last_mod_col]["format"] = "date-time" return {"type": "object", "properties": properties} - @staticmethod - def _broadest_type(type_1: str, type_2: str) -> Optional[str]: - non_comparable_types = ["object", "array", "null"] - if type_1 in non_comparable_types or type_2 in non_comparable_types: - return None - types = {type_1, type_2} - if types == {"boolean", "string"}: - return "string" - if types == {"integer", "number"}: - return "number" - if types == {"integer", "string"}: - return "string" - if types == {"number", "string"}: - return "string" - - @staticmethod - def guess_file_schema(storage_file, file_reader, file_info, processed_files, schemas): - try: - with storage_file.open(file_reader.is_binary) as f: - this_schema = file_reader.get_inferred_schema(f, file_info) - with LOCK: - schemas[file_info] = this_schema - processed_files.append(file_info) - except OSError: - pass - - def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]: - """ - In order to auto-infer a schema across many files and/or allow for additional properties (columns), - we need to determine the superset of schemas across all relevant files. - This method iterates through get_time_ordered_file_infos() obtaining the inferred schema (process implemented per file format), - to build up this superset schema (master_schema). - This runs datatype checks to Warn or Error if we find incompatible schemas (e.g. same column is 'date' in one file but 'float' in another). - This caches the master_schema after first run in order to avoid repeated compute and network calls to infer schema on all files. - - :param min_datetime: if passed, will only use files with last_modified >= this to determine master schema - - :raises RuntimeError: if we find datatype mismatches between files or between a file and schema state (provided or from previous inc. batch) - :return: A dict of the JSON schema representing this stream. 
- """ - # TODO: could implement a (user-beware) 'lazy' mode that skips schema checking to improve performance - # TODO: could utilise min_datetime to add a start_date parameter in spec for user - if self.master_schema is None: - master_schema = deepcopy(self._schema) - - file_reader = self.fileformatparser_class(self._format) - - processed_files = [] - schemas = {} - - file_infos = list(self.get_time_ordered_file_infos()) - if min_datetime is not None: - file_infos = [info for info in file_infos if info.last_modified >= min_datetime] - - for i in range(0, len(file_infos), self.parallel_tasks_size): - chunk_infos = file_infos[i : i + self.parallel_tasks_size] - with concurrent.futures.ThreadPoolExecutor() as executor: - list( - executor.map( - lambda args: self.guess_file_schema(*args), - [ - (self.storagefile_class(file_info, self._provider), file_reader, file_info, processed_files, schemas) - for file_info in chunk_infos - ], - ) - ) - - for file_info in file_infos: - this_schema = schemas[file_info] - if this_schema == master_schema: - continue # exact schema match so go to next file - - # creates a superset of columns retaining order of master_schema with any additional columns added to end - column_superset = list(master_schema.keys()) + [c for c in this_schema.keys() if c not in master_schema.keys()] - # this compares datatype of every column that the two schemas have in common - for col in column_superset: - if (col in master_schema.keys()) and (col in this_schema.keys()) and (master_schema[col] != this_schema[col]): - # If this column exists in a provided schema or schema state, we'll WARN here rather than throw an error. - # This is to allow more leniency as we may be able to coerce this datatype mismatch on read according to - # provided schema state. Else we're inferring the schema (or at least this column) from scratch, and therefore - # we try to choose the broadest type among two if possible - broadest_of_types = self._broadest_type(master_schema[col], this_schema[col]) - type_explicitly_defined = col in self._schema.keys() - override_type = broadest_of_types and not type_explicitly_defined - if override_type: - master_schema[col] = broadest_of_types - if override_type or type_explicitly_defined: - LOGGER.warn( - f"Detected mismatched datatype on column '{col}', in file '{file_info}'. " - + f"Should be '{master_schema[col]}', but found '{this_schema[col]}'. " - + f"Airbyte will attempt to coerce this to {master_schema[col]} on read." - ) - continue - # otherwise throw an error on mismatching datatypes - raise S3Exception( - processed_files, - "Column type mismatch", - f"Detected mismatched datatype on column '{col}', in file '{file_info}'. 
" - + f"Should be '{master_schema[col]}', but found '{this_schema[col]}'.", - failure_type=FailureType.config_error, - ) - - # missing columns in this_schema doesn't affect our master_schema, so we don't check for it here - - # add to master_schema any columns from this_schema that aren't already present - for col, datatype in this_schema.items(): - if col not in master_schema.keys(): - master_schema[col] = datatype - - LOGGER.info(f"determined master schema: {master_schema}") - self.master_schema = master_schema - - return self.master_schema + @cached_property + def _auto_inferred_schema(self) -> Dict[str, Any]: + file_reader = self.fileformatparser_class(self._format) + file_info_iterator = iter(list(self.get_time_ordered_file_infos())) + file_info = next(file_info_iterator) + storage_file = self.storagefile_class(file_info, self._provider) + with storage_file.open(file_reader.is_binary) as f: + return file_reader.get_inferred_schema(f, file_info) def stream_slices( self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None @@ -330,7 +212,7 @@ def _match_target_schema(self, record: Dict[str, Any], target_columns: List) -> We start off with a check to see if we're already lined up to target in order to avoid unnecessary iterations (useful if many columns) :param record: json-like representation of a data row {column:value} - :param target_columns: list of column names to mutate this record into (obtained via self._get_schema_map().keys() as of now) + :param target_columns: list of column names to mutate this record into (obtained via self._schema.keys() as of now) :return: mutated record with columns lining up to target_columns """ compare_columns = [c for c in target_columns if c not in [self.ab_last_mod_col, self.ab_file_name_col]] @@ -378,7 +260,7 @@ def _read_from_slice( with storage_file.open(file_reader.is_binary) as f: # TODO: make this more efficient than mutating every record one-by-one as they stream for record in file_reader.stream_records(f, storage_file.file_info): - schema_matched_record = self._match_target_schema(record, list(self._get_schema_map().keys())) + schema_matched_record = self._match_target_schema(record, list(self._schema.keys())) complete_record = self._add_extra_fields_from_map( schema_matched_record, { @@ -402,7 +284,7 @@ def read_records( The heavy lifting sits in _read_from_slice() which is full refresh / incremental agnostic """ if stream_slice: - file_reader = self.fileformatparser_class(self._format, self._get_master_schema()) + file_reader = self.fileformatparser_class(self._format, self._raw_schema) yield from self._read_from_slice(file_reader, stream_slice) @@ -490,8 +372,6 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late ) state_dict[self.cursor_field] = datetime.strftime(max(current_parsed_datetime, latest_record_datetime), self.datetime_format_string) - state_dict["schema"] = self._get_schema_map() - state_date = self._get_datetime_from_stream_state(state_dict).date() if not self.sync_all_files_always: @@ -538,10 +418,6 @@ def stream_slices( yield from super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state) else: - # if necessary and present, let's update this object's schema attribute to the schema stored in state - # TODO: ideally we could do this on __init__ but I'm not sure that's possible without breaking from cdk style implementation - if self._schema == {} and stream_state is not None and "schema" in stream_state.keys(): - 
self._schema = stream_state["schema"] # logic here is to bundle all files with exact same last modified timestamp together in each slice prev_file_last_mod: datetime = None # init variable to hold previous iterations last modified @@ -566,26 +442,3 @@ def stream_slices( else: # in case we have no files yield None - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Mapping[str, Any]]: - """ - The heavy lifting sits in _read_from_slice() which is full refresh / incremental agnostic. - We override this for incremental so we can pass our minimum datetime from state into _get_master_schema(). - This means we only parse the schema of new files on incremental runs rather than all files in the bucket. - """ - if stream_slice: - if sync_mode == SyncMode.full_refresh: - yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state) - - else: - - file_reader = self.fileformatparser_class( - self._format, self._get_master_schema(self._get_datetime_from_stream_state(stream_state)) - ) - yield from self._read_from_slice(file_reader, stream_slice) diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py index 60f7ce5271d7..0c025365f144 100644 --- a/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py +++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py @@ -9,7 +9,6 @@ import pytest from airbyte_cdk import AirbyteLogger from airbyte_cdk.models import SyncMode -from source_s3.exceptions import S3Exception from source_s3.source_files_abstract.file_info import FileInfo from source_s3.source_files_abstract.storagefile import StorageFile from source_s3.source_files_abstract.stream import IncrementalFileStream @@ -516,151 +515,15 @@ def test_filepath_iterator(self, bucket, path_prefix, list_v2_objects, expected_ for file_info in stream_instance.filepath_iterator(): assert file_info == next(expected_info) - @pytest.mark.parametrize( - ("user_schema", "min_datetime", "ordered_file_infos", "file_schemas", "expected_schema", "log_expected", "error_expected"), - ( - ( # first file skipped due to min datetime constraint - """{"id": "string", "first_name": "string", "last_name": "string", "single": "boolean"}""", - datetime(2022, 5, 5, 13, 5, 5), - [ - FileInfo(last_modified=datetime(2022, 1, 1, 13, 5, 5), key="first", size=128), - FileInfo(last_modified=datetime(2022, 6, 7, 8, 9, 10), key="second", size=128), - ], - [{"pets": "array", "hobbies": "array"}], # this is the second file schema as call for the first one will be skipped - {"id": "string", "first_name": "string", "last_name": "string", "single": "boolean", "pets": "array", "hobbies": "array"}, - False, - False, - ), - ( # first file schema == user schema - """{"id": "string", "first_name": "string", "last_name": "string", "single": "boolean"}""", - datetime(2020, 5, 5, 13, 5, 5), - [ - FileInfo(last_modified=datetime(2022, 1, 1, 13, 5, 5), key="first", size=128), - FileInfo(last_modified=datetime(2022, 6, 7, 8, 9, 10), key="second", size=128), - ], - [ - {"id": "string", "first_name": "string", "last_name": "string", "single": "boolean"}, - {"pets": "array", "hobbies": "array"}, - ], - {"id": "string", "first_name": "string", "last_name": "string", "single": "boolean", "pets": "array", "hobbies": "array"}, - False, - False, - ), - ( # warning log expected - """{"id": "string", 
"first_name": "string", "last_name": "string", "single": "boolean"}""", - datetime(2020, 5, 5, 13, 5, 5), - [ - FileInfo(last_modified=datetime(2022, 1, 1, 13, 5, 5), key="first", size=128), - FileInfo(last_modified=datetime(2022, 6, 7, 8, 9, 10), key="second", size=128), - ], - [ - {"id": "integer", "first_name": "string", "last_name": "string", "single": "boolean"}, - {"pets": "array", "hobbies": "array"}, - ], - {"id": "string", "first_name": "string", "last_name": "string", "single": "boolean", "pets": "array", "hobbies": "array"}, - True, - False, - ), - ( # error expected - """{"id": "string", "first_name": "string", "last_name": "string", "single": "boolean"}""", - datetime(2020, 5, 5, 13, 5, 5), - [ - FileInfo(last_modified=datetime(2022, 1, 1, 13, 5, 5), key="first", size=128), - FileInfo(last_modified=datetime(2022, 6, 7, 8, 9, 10), key="second", size=128), - ], - [{"pets": "boolean", "hobbies": "boolean"}, {"pets": "array", "hobbies": "array"}], - {}, - False, - True, - ), - ( # successful merge of 3 schemas - """{"id": "string", "first_name": "string", "last_name": "string", "single": "boolean"}""", - datetime(2020, 5, 5, 13, 5, 5), - [ - FileInfo(last_modified=datetime(2022, 1, 1, 13, 5, 5), key="first", size=128), - FileInfo(last_modified=datetime(2022, 6, 7, 8, 9, 10), key="second", size=128), - ], - [{"company": "string", "gender": "string"}, {"pets": "array", "hobbies": "array"}], - { - "id": "string", - "first_name": "string", - "last_name": "string", - "single": "boolean", - "pets": "array", - "hobbies": "array", - "company": "string", - "gender": "string", - }, - False, - False, - ), - ( # int becomes str in case of type mismatch in different files - "{}", - datetime(2020, 5, 5, 13, 5, 5), - [ - FileInfo(last_modified=datetime(2022, 1, 1, 13, 5, 5), key="first", size=128), - FileInfo(last_modified=datetime(2022, 6, 7, 8, 9, 10), key="second", size=128), - ], - [ - { - "pk": "string", - "full_name": "string", - "street_address": "string", - "customer_code": "integer", - "email": "string", - "dob": "string", - }, - { - "pk": "integer", - "full_name": "string", - "street_address": "string", - "customer_code": "integer", - "email": "string", - "dob": "string", - }, - ], - { - "pk": "string", - "full_name": "string", - "street_address": "string", - "customer_code": "integer", - "email": "string", - "dob": "string", - }, - True, - False, - ), - ), - ) - @patch("source_s3.stream.IncrementalFileStreamS3.storagefile_class", MagicMock()) - def test_master_schema( - self, capsys, user_schema, min_datetime, ordered_file_infos, file_schemas, expected_schema, log_expected, error_expected - ): - file_format_parser_mock = MagicMock(return_value=MagicMock(get_inferred_schema=MagicMock(side_effect=file_schemas))) - with patch.object(IncrementalFileStreamS3, "fileformatparser_class", file_format_parser_mock): - with patch.object(IncrementalFileStreamS3, "get_time_ordered_file_infos", MagicMock(return_value=ordered_file_infos)): - stream_instance = IncrementalFileStreamS3( - dataset="dummy", provider={}, format={"filetype": "csv"}, schema=user_schema, path_pattern="**/prefix*.csv" - ) - if error_expected: - with pytest.raises(S3Exception): - stream_instance._get_master_schema(min_datetime=min_datetime) - else: - assert stream_instance._get_master_schema(min_datetime=min_datetime) == expected_schema - if log_expected: - captured = capsys.readouterr() - assert "Detected mismatched datatype" in captured.out - - @patch.object( - IncrementalFileStreamS3, - "_get_master_schema", - 
MagicMock(return_value={"column_A": "string", "column_B": "integer", "column_C": "boolean"}), - ) - def test_get_schema_map(self): + def test_get_schema(self): stream_instance = IncrementalFileStreamS3( - dataset="dummy", provider={}, format={"filetype": "csv"}, schema={}, path_pattern="**/prefix*.csv" + dataset="dummy", + provider={}, + format={"filetype": "csv"}, + schema="{\"column_A\": \"string\", \"column_B\": \"integer\", \"column_C\": \"boolean\"}", + path_pattern="**/prefix*.csv" ) - assert stream_instance._get_schema_map() == { + assert stream_instance._schema == { "_ab_additional_properties": "object", "_ab_source_file_last_modified": "string", "_ab_source_file_url": "string", @@ -691,23 +554,13 @@ def test_fileformatparser_class(self, file_type, error_expected): else: assert stream_instance.fileformatparser_class - @patch.object( - IncrementalFileStreamS3, - "_get_schema_map", - MagicMock( - return_value={ - "_ab_additional_properties": "object", - "_ab_source_file_last_modified": "string", - "_ab_source_file_url": "string", - "column_A": "string", - "column_B": "integer", - "column_C": "boolean", - } - ), - ) def test_get_json_schema(self): stream_instance = IncrementalFileStreamS3( - dataset="dummy", provider={}, format={"filetype": "csv"}, schema={}, path_pattern="**/prefix*.csv" + dataset="dummy", + provider={}, + format={"filetype": "csv"}, + schema="{\"column_A\": \"string\", \"column_B\": \"integer\", \"column_C\": \"boolean\"}", + path_pattern="**/prefix*.csv" ) assert stream_instance.get_json_schema() == { "properties": { diff --git a/connectors.md b/connectors.md index f4ba0a3cdbd1..0bbef3638fcc 100644 --- a/connectors.md +++ b/connectors.md @@ -196,7 +196,7 @@ | **Reply.io** | Reply.io icon | Source | airbyte/source-reply-io:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/reply-io) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-reply-io) | `8cc6537e-f8a6-423c-b960-e927af76116e` | | **Retently** | Retently icon | Source | airbyte/source-retently:0.1.3 | alpha | [link](https://docs.airbyte.com/integrations/sources/retently) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-retently) | `db04ecd1-42e7-4115-9cec-95812905c626` | | **Rocket.chat** | Rocket.chat icon | Source | airbyte/source-rocket-chat:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/rocket-chat) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-rocket-chat) | `921d9608-3915-450b-8078-0af18801ea1b` | -| **S3** | S3 icon | Source | airbyte/source-s3:1.0.2 | generally_available | [link](https://docs.airbyte.com/integrations/sources/s3) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-s3) | `69589781-7828-43c5-9f63-8925b1c1ccc2` | +| **S3** | S3 icon | Source | airbyte/source-s3:2.0.0 | generally_available | [link](https://docs.airbyte.com/integrations/sources/s3) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-s3) | `69589781-7828-43c5-9f63-8925b1c1ccc2` | | **SAP Fieldglass** | SAP Fieldglass icon | Source | airbyte/source-sap-fieldglass:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/sap-fieldglass) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sap-fieldglass) | `ec5f3102-fb31-4916-99ae-864faf8e7e25` | | **SFTP** | SFTP icon | Source | 
airbyte/source-sftp:0.1.2 | alpha | [link](https://docs.airbyte.com/integrations/sources/sftp) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp) | `a827c52e-791c-4135-a245-e233c5255199` | | **SFTP Bulk** | SFTP Bulk icon | Source | airbyte/source-sftp-bulk:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/sftp-bulk) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp-bulk) | `31e3242f-dee7-4cdc-a4b8-8e06c5458517` | diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 486cf9a44717..837ca66fa7f5 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -120,7 +120,7 @@ As you can probably tell, there are many ways to achieve the same goal with path ## User Schema -Providing a schema allows for more control over the output of this stream. Without a provided schema, columns and datatypes will be inferred from each file and a superset schema created. This will probably be fine in most cases but there may be situations you want to enforce a schema instead, e.g.: +Providing a schema allows for more control over the output of this stream. Without a provided schema, columns and datatypes will be inferred from the first created file in the bucket matching your path pattern and suffix. This will probably be fine in most cases but there may be situations you want to enforce a schema instead, e.g.: * You only care about a specific known subset of the columns. The other columns would all still be included, but packed into the `_ab_additional_properties` map. * Your initial dataset is quite small \(in terms of number of records\), and you think the automatic type inference from this sample might not be representative of the data in the future. @@ -142,6 +142,12 @@ For example: * {"id": "integer", "location": "string", "longitude": "number", "latitude": "number"} * {"username": "string", "friends": "array", "information": "object"} +:::note + +Please note, the S3 Source connector used to infer schemas from all the available files and then merge them to create a superset schema. Starting from version 2.0.0 the schema inference works based on the first file found only. The first file we consider is the oldest one written to the prefix. + +::: + ## S3 Provider Settings @@ -209,7 +215,8 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------| -| 1.0.2 | 2023-03-02 | [23669](https://github.com/airbytehq/airbyte/pull/23669)| Made `Advanced Reader Options` and `Advanced Options` truly `optional` for `CSV` format | +| 2.0.0 | 2023-03-14 | [23189](https://github.com/airbytehq/airbyte/pull/23189) | Infer schema based on one file instead of all the files | +| 1.0.2 | 2023-03-02 | [23669](https://github.com/airbytehq/airbyte/pull/23669) | Made `Advanced Reader Options` and `Advanced Options` truly `optional` for `CSV` format | | 1.0.1 | 2023-02-27 | [23502](https://github.com/airbytehq/airbyte/pull/23502) | Fix error handling | | 1.0.0 | 2023-02-17 | [23198](https://github.com/airbytehq/airbyte/pull/23198) | Fix Avro schema discovery | | 0.1.32 | 2023-02-07 | [22500](https://github.com/airbytehq/airbyte/pull/22500) | Speed up discovery |
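
The core behavioral change in this diff is that, when the user supplies no schema, the stream now infers its schema from the single oldest file matching the path pattern and caches the result, instead of parsing every matching file and merging a superset schema. Below is a condensed, self-contained sketch of that flow; the class name, constructor arguments, and the toy "read the CSV header and type every column as a string" inference are illustrative simplifications, not the connector's real `FileStream`, parser, or storage classes.

```python
from functools import cached_property
from typing import Dict, List, Mapping


class SingleFileSchemaStream:
    """Condensed model of the schema logic introduced in stream.py by this diff."""

    ab_additional_col = "_ab_additional_properties"
    ab_last_mod_col = "_ab_source_file_last_modified"
    ab_file_name_col = "_ab_source_file_url"

    def __init__(self, user_schema: Dict[str, str], time_ordered_csv_paths: List[str]):
        # user_schema is {} when none is provided; paths are assumed to be sorted
        # oldest-first, mirroring get_time_ordered_file_infos() in the real stream.
        self._user_input_schema = user_schema
        self._paths = time_ordered_csv_paths

    @cached_property
    def _auto_inferred_schema(self) -> Dict[str, str]:
        # 2.0.0 behaviour: only the oldest matching file is opened and inferred from.
        # (Pre-2.0.0, every file was parsed and the schemas merged into a superset.)
        oldest = self._paths[0]
        with open(oldest) as f:
            header = f.readline().strip().split(",")
        return {column: "string" for column in header}  # toy inference only

    @property
    def _raw_schema(self) -> Mapping[str, str]:
        # A user-provided schema takes precedence over inference.
        return self._user_input_schema or self._auto_inferred_schema

    @property
    def _schema(self) -> Mapping[str, str]:
        # The Airbyte bookkeeping columns are always layered on top of the raw schema.
        extra = {
            self.ab_additional_col: "object",
            self.ab_last_mod_col: "string",
            self.ab_file_name_col: "string",
        }
        return {**self._raw_schema, **extra}
```

Two design points worth noting: `cached_property` means the inference cost is paid at most once per stream instance, and a non-empty user schema short-circuits inference entirely, so no files are opened for schema purposes when one is provided.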
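
The connection check in `source.py` also changes shape: instead of returning as soon as one matching file is found, it now additionally reads one stream slice end-to-end, so parser and format problems surface at "check" time rather than on the first sync. A self-contained sketch of the new flow is below, with a toy stream standing in for `IncrementalFileStreamS3`; the names `_ToyStream` and `_FileInfo` are illustrative placeholders, while `globmatch` and `SyncMode` are the same helpers used in the diff.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional, Tuple

from airbyte_cdk.models import SyncMode
from wcmatch.glob import GLOBSTAR, SPLIT, globmatch


@dataclass
class _FileInfo:
    key: str


class _ToyStream:
    """Minimal stand-in exposing only what the connection check needs."""

    def filepath_iterator(self) -> Iterable[_FileInfo]:
        yield _FileInfo(key="data/simple_test.csv")

    def stream_slices(self, sync_mode: SyncMode) -> Iterable[Mapping[str, Any]]:
        yield {"files": ["data/simple_test.csv"]}

    def read_records(self, sync_mode: SyncMode, stream_slice: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
        yield {"id": 1, "name": "a"}


def check_connection(stream: _ToyStream, path_pattern: str) -> Tuple[bool, Optional[Exception]]:
    try:
        for file_info in stream.filepath_iterator():
            # one matching key is enough to prove credentials and the glob are valid
            globmatch(file_info.key, path_pattern, flags=GLOBSTAR | SPLIT)
            break
        # new in this diff: pull a single slice so format/schema errors fail the check
        for stream_slice in stream.stream_slices(sync_mode=SyncMode.full_refresh):
            list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
            break
    except Exception as e:
        return False, e
    return True, None


print(check_connection(_ToyStream(), "**"))  # -> (True, None)
```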