diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index f418907860a4d3..3b817cbd13e677 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -1549,7 +1549,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.31
+  dockerImageTag: 0.1.32
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   icon: s3.svg
   sourceType: file
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index b46b8e97cf1ca6..77cc371e0f17db 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -12775,7 +12775,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:0.1.31"
+- dockerImage: "airbyte/source-s3:0.1.32"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile
index 2a9757d5a0d5b3..a46487bc2fea13 100644
--- a/airbyte-integrations/connectors/source-s3/Dockerfile
+++ b/airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.31
+LABEL io.airbyte.version=0.1.32
 LABEL io.airbyte.name=airbyte/source-s3
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
index 5c1de77e535137..64444a2fe638c9 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
@@ -3,7 +3,9 @@
 #
 
+import concurrent.futures
 import json
+import threading
 from abc import ABC, abstractmethod
 from copy import deepcopy
 from datetime import datetime, timedelta
@@ -29,6 +31,7 @@
 
 JSON_TYPES = ["string", "number", "integer", "object", "array", "boolean", "null"]
 LOGGER = AirbyteLogger()
+LOCK = threading.Lock()
 
 
 class ConfigurationError(Exception):
@@ -52,6 +55,7 @@ def fileformatparser_map(self) -> Mapping[str, type]:
     ab_file_name_col = "_ab_source_file_url"
     airbyte_columns = [ab_additional_col, ab_last_mod_col, ab_file_name_col]
     datetime_format_string = "%Y-%m-%dT%H:%M:%S%z"
+    parallel_tasks_size = 256
 
     def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str, schema: str = None):
         """
@@ -202,6 +206,17 @@ def _broadest_type(type_1: str, type_2: str) -> Optional[str]:
         if types == {"number", "string"}:
             return "string"
 
+    @staticmethod
+    def guess_file_schema(storage_file, file_reader, file_info, processed_files, schemas):
+        try:
+            with storage_file.open(file_reader.is_binary) as f:
+                this_schema = file_reader.get_inferred_schema(f, file_info)
+            with LOCK:
+                schemas[file_info] = this_schema
+                processed_files.append(file_info)
+        except OSError:
+            pass
+
     def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
         """
         In order to auto-infer a schema across many files and/or allow for additional properties (columns),
@@ -224,19 +239,27 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
 
         file_reader = self.fileformatparser_class(self._format)
 
         processed_files = []
-        for file_info in self.get_time_ordered_file_infos():
-            # skip this file if it's earlier than min_datetime
-            if (min_datetime is not None) and (file_info.last_modified < min_datetime):
-                continue
-
-            storagefile = self.storagefile_class(file_info, self._provider)
-            try:
-                with storagefile.open(file_reader.is_binary) as f:
-                    this_schema = file_reader.get_inferred_schema(f, file_info)
-                    processed_files.append(file_info)
-            except OSError:
-                continue
-
+        schemas = {}
+
+        file_infos = list(self.get_time_ordered_file_infos())
+        if min_datetime is not None:
+            file_infos = [info for info in file_infos if info.last_modified >= min_datetime]
+
+        for i in range(0, len(file_infos), self.parallel_tasks_size):
+            chunk_infos = file_infos[i : i + self.parallel_tasks_size]
+            with concurrent.futures.ThreadPoolExecutor() as executor:
+                executor.map(
+                    lambda args: self.guess_file_schema(*args),
+                    [
+                        (self.storagefile_class(file_info, self._provider), file_reader, file_info, processed_files, schemas)
+                        for file_info in chunk_infos
+                    ],
+                )
+
+        for file_info in file_infos:
+            if file_info not in schemas:
+                continue  # file was skipped (e.g. OSError on open), matching the old behavior
+            this_schema = schemas[file_info]
             if this_schema == master_schema:
                 continue  # exact schema match so go to next file
diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md
index 57747c1ae96929..38710549491fd1 100644
--- a/docs/integrations/sources/s3.md
+++ b/docs/integrations/sources/s3.md
@@ -209,6 +209,7 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo
 
 | Version | Date       | Pull Request                                             | Subject                                                                                     |
 |:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------|
+| 0.1.32  | 2023-02-07 | [22500](https://github.com/airbytehq/airbyte/pull/22500) | Speed up discovery                                                                          |
 | 0.1.31  | 2023-02-08 | [22550](https://github.com/airbytehq/airbyte/pull/22550) | Validate CSV read options and convert options                                               |
 | 0.1.30  | 2023-01-25 | [21587](https://github.com/airbytehq/airbyte/pull/21587) | Make sure spec works as expected in UI                                                      |
 | 0.1.29  | 2023-01-19 | [21604](https://github.com/airbytehq/airbyte/pull/21604) | Handle OSError: skip unreachable keys and keep working on accessible ones. Warn a customer  |
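
Note on the change above: `_get_master_schema` now fans schema inference out across a thread pool in fixed-size chunks (`parallel_tasks_size`), collecting results into a shared dict guarded by a module-level lock. Below is a minimal, self-contained sketch of that chunked fan-out pattern; `FileInfo` and `infer_schema` are hypothetical stand-ins for the connector's real `StorageFile` and `FileFormatParser` classes, not the actual connector code:

```python
import concurrent.futures
import threading
from dataclasses import dataclass

LOCK = threading.Lock()
PARALLEL_TASKS_SIZE = 4  # the connector defaults to 256


@dataclass(frozen=True)
class FileInfo:
    """Hypothetical stand-in for the connector's file metadata object."""
    key: str


def infer_schema(file_info: FileInfo) -> dict:
    """Stand-in for FileFormatParser.get_inferred_schema(); may raise OSError."""
    return {"id": "integer", "_source": file_info.key}


def guess_file_schema(file_info: FileInfo, schemas: dict, processed: list) -> None:
    try:
        schema = infer_schema(file_info)
        with LOCK:  # serialize mutation of the shared dict/list, as in the connector
            schemas[file_info] = schema
            processed.append(file_info)
    except OSError:
        pass  # unreachable file: leave it out of `schemas` and move on


file_infos = [FileInfo(f"s3://bucket/file_{i}.csv") for i in range(10)]
schemas: dict = {}
processed: list = []

# Fan out in fixed-size chunks so at most PARALLEL_TASKS_SIZE files are in
# flight at once; exiting the `with` block joins all threads in the chunk.
for i in range(0, len(file_infos), PARALLEL_TASKS_SIZE):
    chunk = file_infos[i : i + PARALLEL_TASKS_SIZE]
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # list(...) drains the map iterator so worker exceptions re-raise here
        list(executor.map(lambda fi: guess_file_schema(fi, schemas, processed), chunk))

print(f"inferred {len(schemas)} of {len(file_infos)} schemas")
```

One design caveat worth noting: `Executor.map` only re-raises a worker's exception when its result iterator is consumed. The sketch drains it with `list(...)`; the connector code above discards the iterator and handles only `OSError` inside the task, so any other worker exception would pass silently.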