diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile index 0e03ad049bbca..2a9757d5a0d5b 100644 --- a/airbyte-integrations/connectors/source-s3/Dockerfile +++ b/airbyte-integrations/connectors/source-s3/Dockerfile @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.30 +LABEL io.airbyte.version=0.1.31 LABEL io.airbyte.name=airbyte/source-s3 diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index 939c1b6dd7dd1..98d79edcc6d3b 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -3,7 +3,9 @@ # +import concurrent.futures import json +import threading from abc import ABC, abstractmethod from copy import deepcopy from datetime import datetime, timedelta @@ -29,6 +31,7 @@ JSON_TYPES = ["string", "number", "integer", "object", "array", "boolean", "null"] LOGGER = AirbyteLogger() +LOCK = threading.Lock() class ConfigurationError(Exception): @@ -52,6 +55,7 @@ def fileformatparser_map(self) -> Mapping[str, type]: ab_file_name_col = "_ab_source_file_url" airbyte_columns = [ab_additional_col, ab_last_mod_col, ab_file_name_col] datetime_format_string = "%Y-%m-%dT%H:%M:%S%z" + parallel_tasks_size = 256 def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str, schema: str = None): """ @@ -202,6 +206,17 @@ def _broadest_type(type_1: str, type_2: str) -> Optional[str]: if types == {"number", "string"}: return "string" + @staticmethod + def guess_file_schema(storage_file, file_reader, file_info, processed_files, schemas): + try: + with storage_file.open(file_reader.is_binary) as f: + 
this_schema = file_reader.get_inferred_schema(f, file_info) + with LOCK: + schemas[file_info] = this_schema + processed_files.append(file_info) + except OSError: + pass + def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]: """ In order to auto-infer a schema across many files and/or allow for additional properties (columns), @@ -224,19 +239,25 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]: file_reader = self.fileformatparser_class(self._format) processed_files = [] - for file_info in self.get_time_ordered_file_infos(): - # skip this file if it's earlier than min_datetime - if (min_datetime is not None) and (file_info.last_modified < min_datetime): - continue - - storagefile = self.storagefile_class(file_info, self._provider) - try: - with storagefile.open(file_reader.is_binary) as f: - this_schema = file_reader.get_inferred_schema(f, file_info) - processed_files.append(file_info) - except OSError: - continue - + schemas = {} + + file_infos = list(self.get_time_ordered_file_infos()) + if min_datetime is not None: + file_infos = [info for info in file_infos if info.last_modified >= min_datetime] + + for i in range(0, len(file_infos), self.parallel_tasks_size): + chunk_infos = file_infos[i : i + self.parallel_tasks_size] + with concurrent.futures.ThreadPoolExecutor() as executor: + executor.map( + lambda args: self.guess_file_schema(*args), + [ + (self.storagefile_class(file_info, self._provider), file_reader, file_info, processed_files, schemas) + for file_info in chunk_infos + ], + ) + + for file_info in file_infos: + this_schema = schemas[file_info] if this_schema == master_schema: continue # exact schema match so go to next file diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index cdd3333b720c4..09e8c853ce6c1 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -207,35 +207,36 @@ The Jsonl parser uses pyarrow hence,only the line-delimited 
JSON format is suppo ## Changelog 21210 -| Version | Date | Pull Request | Subject | -|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------| -| 0.1.30 | 2023-01-25 | [21587](https://github.com/airbytehq/airbyte/pull/21587) | Make sure spec works as expected in UI | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------| +| 0.1.31 | 2023-02-07 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Speed up discovery by inferring file schemas in parallel | +| 0.1.30 | 2023-01-25 | [21587](https://github.com/airbytehq/airbyte/pull/21587) | Make sure spec works as expected in UI | | 0.1.29 | 2023-01-19 | [21604](https://github.com/airbytehq/airbyte/pull/21604) | Handle OSError: skip unreachable keys and keep working on accessible ones. 
Warn a customer | -| 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format | -| 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format | -| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option | -| 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file | -| 0.1.23 | 2022-10-10 | [17991](https://github.com/airbytehq/airbyte/pull/17991) | Fix pyarrow to JSON schema type conversion for arrays | -| 0.1.23 | 2022-10-10 | [17800](https://github.com/airbytehq/airbyte/pull/17800) | Deleted `use_ssl` and `verify_ssl_cert` flags and hardcoded to `True` | -| 0.1.22 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state | -| 0.1.21 | 2022-09-20 | [16921](https://github.com/airbytehq/airbyte/pull/16921) | Upgrade pyarrow | -| 0.1.20 | 2022-09-12 | [16607](https://github.com/airbytehq/airbyte/pull/16607) | Fix for reading jsonl files containing nested structures | -| 0.1.19 | 2022-09-13 | [16631](https://github.com/airbytehq/airbyte/pull/16631) | Adjust column type to a broadest one when merging two or more json schemas | -| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. 
| -| 0.1.17 | 2022-07-21 | [14911](https://github.com/airbytehq/airbyte/pull/14911) | "decimal" type added for parquet | -| 0.1.16 | 2022-07-13 | [14669](https://github.com/airbytehq/airbyte/pull/14669) | Fixed bug when extra columns apeared to be non-present in master schema | -| 0.1.15 | 2022-05-31 | [12568](https://github.com/airbytehq/airbyte/pull/12568) | Fixed possible case of files being missed during incremental syncs | -| 0.1.14 | 2022-05-23 | [11967](https://github.com/airbytehq/airbyte/pull/11967) | Increase unit test coverage up to 90% | -| 0.1.13 | 2022-05-11 | [12730](https://github.com/airbytehq/airbyte/pull/12730) | Fixed empty options issue | -| 0.1.12 | 2022-05-11 | [12602](https://github.com/airbytehq/airbyte/pull/12602) | Added support for Avro file format | -| 0.1.11 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy | -| 0.1.10 | 2022-01-28 | [8252](https://github.com/airbytehq/airbyte/pull/8252) | Refactoring of files' metadata | -| 0.1.9 | 2022-01-06 | [9163](https://github.com/airbytehq/airbyte/pull/9163) | Work-around for web-UI, `backslash - t` converts to `tab` for `format.delimiter` field. | -| 0.1.7 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies | -| 0.1.6 | 2021-10-15 | [6615](https://github.com/airbytehq/airbyte/pull/6615) & [7058](https://github.com/airbytehq/airbyte/pull/7058) | Memory and performance optimisation. Advanced options for CSV parsing. 
| -| 0.1.5 | 2021-09-24 | [6398](https://github.com/airbytehq/airbyte/pull/6398) | Support custom non Amazon S3 services | -| 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Support of Parquet format | -| 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference | -| 0.1.2 | 2021-08-02 | [5135](https://github.com/airbytehq/airbyte/pull/5135) | Fixed bug in spec so it displays in UI correctly | -| 0.1.1 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990/commits/ff5f70662c5f84eabc03526cddfcc9d73c58c0f4) | Fixed documentation url in source definition | -| 0.1.0 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990) | Created S3 source connector | +| 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format | +| 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format | +| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option | +| 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file | +| 0.1.23 | 2022-10-10 | [17991](https://github.com/airbytehq/airbyte/pull/17991) | Fix pyarrow to JSON schema type conversion for arrays | +| 0.1.23 | 2022-10-10 | [17800](https://github.com/airbytehq/airbyte/pull/17800) | Deleted `use_ssl` and `verify_ssl_cert` flags and hardcoded to `True` | +| 0.1.22 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state | +| 0.1.21 | 2022-09-20 | [16921](https://github.com/airbytehq/airbyte/pull/16921) | Upgrade pyarrow | +| 0.1.20 | 2022-09-12 | [16607](https://github.com/airbytehq/airbyte/pull/16607) | Fix for reading jsonl files containing nested structures | +| 0.1.19 | 
2022-09-13 | [16631](https://github.com/airbytehq/airbyte/pull/16631) | Adjust column type to a broadest one when merging two or more json schemas | +| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. | +| 0.1.17 | 2022-07-21 | [14911](https://github.com/airbytehq/airbyte/pull/14911) | "decimal" type added for parquet | +| 0.1.16 | 2022-07-13 | [14669](https://github.com/airbytehq/airbyte/pull/14669) | Fixed bug when extra columns appeared to be non-present in master schema | +| 0.1.15 | 2022-05-31 | [12568](https://github.com/airbytehq/airbyte/pull/12568) | Fixed possible case of files being missed during incremental syncs | +| 0.1.14 | 2022-05-23 | [11967](https://github.com/airbytehq/airbyte/pull/11967) | Increase unit test coverage up to 90% | +| 0.1.13 | 2022-05-11 | [12730](https://github.com/airbytehq/airbyte/pull/12730) | Fixed empty options issue | +| 0.1.12 | 2022-05-11 | [12602](https://github.com/airbytehq/airbyte/pull/12602) | Added support for Avro file format | +| 0.1.11 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy | +| 0.1.10 | 2022-01-28 | [8252](https://github.com/airbytehq/airbyte/pull/8252) | Refactoring of files' metadata | +| 0.1.9 | 2022-01-06 | [9163](https://github.com/airbytehq/airbyte/pull/9163) | Work-around for web-UI, `backslash - t` converts to `tab` for `format.delimiter` field. | +| 0.1.7 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies | +| 0.1.6 | 2021-10-15 | [6615](https://github.com/airbytehq/airbyte/pull/6615) & [7058](https://github.com/airbytehq/airbyte/pull/7058) | Memory and performance optimisation. Advanced options for CSV parsing. 
| +| 0.1.5 | 2021-09-24 | [6398](https://github.com/airbytehq/airbyte/pull/6398) | Support custom non Amazon S3 services | +| 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Support of Parquet format | +| 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference | +| 0.1.2 | 2021-08-02 | [5135](https://github.com/airbytehq/airbyte/pull/5135) | Fixed bug in spec so it displays in UI correctly | +| 0.1.1 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990/commits/ff5f70662c5f84eabc03526cddfcc9d73c58c0f4) | Fixed documentation url in source definition | +| 0.1.0 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990) | Created S3 source connector |