Skip to content

Commit

Permalink
#1470 source S3: speed up discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
davydov-d committed Feb 7, 2023
1 parent e441d5c commit c254d94
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 45 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.30
LABEL io.airbyte.version=0.1.31
LABEL io.airbyte.name=airbyte/source-s3
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
#


import concurrent.futures
import json
import threading
from abc import ABC, abstractmethod
from copy import deepcopy
from datetime import datetime, timedelta
Expand All @@ -29,6 +31,7 @@
JSON_TYPES = ["string", "number", "integer", "object", "array", "boolean", "null"]

LOGGER = AirbyteLogger()
LOCK = threading.Lock()


class ConfigurationError(Exception):
Expand All @@ -52,6 +55,7 @@ def fileformatparser_map(self) -> Mapping[str, type]:
ab_file_name_col = "_ab_source_file_url"
airbyte_columns = [ab_additional_col, ab_last_mod_col, ab_file_name_col]
datetime_format_string = "%Y-%m-%dT%H:%M:%S%z"
parallel_tasks_size = 256

def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str, schema: str = None):
"""
Expand Down Expand Up @@ -202,6 +206,17 @@ def _broadest_type(type_1: str, type_2: str) -> Optional[str]:
if types == {"number", "string"}:
return "string"

@staticmethod
def guess_file_schema(storage_file, file_reader, file_info, processed_files, schemas):
try:
with storage_file.open(file_reader.is_binary) as f:
this_schema = file_reader.get_inferred_schema(f, file_info)
with LOCK:
schemas[file_info] = this_schema
processed_files.append(file_info)
except OSError:
pass

def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
"""
In order to auto-infer a schema across many files and/or allow for additional properties (columns),
Expand All @@ -224,19 +239,25 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
file_reader = self.fileformatparser_class(self._format)

processed_files = []
for file_info in self.get_time_ordered_file_infos():
# skip this file if it's earlier than min_datetime
if (min_datetime is not None) and (file_info.last_modified < min_datetime):
continue

storagefile = self.storagefile_class(file_info, self._provider)
try:
with storagefile.open(file_reader.is_binary) as f:
this_schema = file_reader.get_inferred_schema(f, file_info)
processed_files.append(file_info)
except OSError:
continue

schemas = {}

file_infos = list(self.get_time_ordered_file_infos())
if min_datetime is not None:
file_infos = [info for info in file_infos if info.last_modified >= min_datetime]

for i in range(0, len(file_infos), self.parallel_tasks_size):
chunk_infos = file_infos[i : i + self.parallel_tasks_size]
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(
lambda args: self.guess_file_schema(*args),
[
(self.storagefile_class(file_info, self._provider), file_reader, file_info, processed_files, schemas)
for file_info in chunk_infos
],
)

for file_info in file_infos:
this_schema = schemas[file_info]
if this_schema == master_schema:
continue # exact schema match so go to next file

Expand Down
63 changes: 32 additions & 31 deletions docs/integrations/sources/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,35 +207,36 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo

## Changelog 21210

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------|
| 0.1.30 | 2023-01-25 | [21587](https://github.com/airbytehq/airbyte/pull/21587) | Make sure spec works as expected in UI |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------|
| 0.1.31 | 2023-02-07 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Make sure spec works as expected in UI |
| 0.1.30 | 2023-01-25 | [21587](https://github.com/airbytehq/airbyte/pull/21587) | Make sure spec works as expected in UI |
| 0.1.29 | 2023-01-19 | [21604](https://github.com/airbytehq/airbyte/pull/21604) | Handle OSError: skip unreachable keys and keep working on accessible ones. Warn a customer |
| 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format |
| 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format |
| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option |
| 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file |
| 0.1.23 | 2022-10-10 | [17991](https://github.com/airbytehq/airbyte/pull/17991) | Fix pyarrow to JSON schema type conversion for arrays |
| 0.1.23 | 2022-10-10 | [17800](https://github.com/airbytehq/airbyte/pull/17800) | Deleted `use_ssl` and `verify_ssl_cert` flags and hardcoded to `True` |
| 0.1.22 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state |
| 0.1.21 | 2022-09-20 | [16921](https://github.com/airbytehq/airbyte/pull/16921) | Upgrade pyarrow |
| 0.1.20 | 2022-09-12 | [16607](https://github.com/airbytehq/airbyte/pull/16607) | Fix for reading jsonl files containing nested structures |
| 0.1.19 | 2022-09-13 | [16631](https://github.com/airbytehq/airbyte/pull/16631) | Adjust column type to a broadest one when merging two or more json schemas |
| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. |
| 0.1.17 | 2022-07-21 | [14911](https://github.com/airbytehq/airbyte/pull/14911) | "decimal" type added for parquet |
| 0.1.16 | 2022-07-13 | [14669](https://github.com/airbytehq/airbyte/pull/14669) | Fixed bug when extra columns apeared to be non-present in master schema |
| 0.1.15 | 2022-05-31 | [12568](https://github.com/airbytehq/airbyte/pull/12568) | Fixed possible case of files being missed during incremental syncs |
| 0.1.14 | 2022-05-23 | [11967](https://github.com/airbytehq/airbyte/pull/11967) | Increase unit test coverage up to 90% |
| 0.1.13 | 2022-05-11 | [12730](https://github.com/airbytehq/airbyte/pull/12730) | Fixed empty options issue |
| 0.1.12 | 2022-05-11 | [12602](https://github.com/airbytehq/airbyte/pull/12602) | Added support for Avro file format |
| 0.1.11 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy |
| 0.1.10 | 2022-01-28 | [8252](https://github.com/airbytehq/airbyte/pull/8252) | Refactoring of files' metadata |
| 0.1.9 | 2022-01-06 | [9163](https://github.com/airbytehq/airbyte/pull/9163) | Work-around for web-UI, `backslash - t` converts to `tab` for `format.delimiter` field. |
| 0.1.7 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies |
| 0.1.6 | 2021-10-15 | [6615](https://github.com/airbytehq/airbyte/pull/6615) & [7058](https://github.com/airbytehq/airbyte/pull/7058) | Memory and performance optimisation. Advanced options for CSV parsing. |
| 0.1.5 | 2021-09-24 | [6398](https://github.com/airbytehq/airbyte/pull/6398) | Support custom non Amazon S3 services |
| 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Support of Parquet format |
| 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference |
| 0.1.2 | 2021-08-02 | [5135](https://github.com/airbytehq/airbyte/pull/5135) | Fixed bug in spec so it displays in UI correctly |
| 0.1.1 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990/commits/ff5f70662c5f84eabc03526cddfcc9d73c58c0f4) | Fixed documentation url in source definition |
| 0.1.0 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990) | Created S3 source connector |
| 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format |
| 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format |
| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option |
| 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file |
| 0.1.23 | 2022-10-10 | [17991](https://github.com/airbytehq/airbyte/pull/17991) | Fix pyarrow to JSON schema type conversion for arrays |
| 0.1.23 | 2022-10-10 | [17800](https://github.com/airbytehq/airbyte/pull/17800) | Deleted `use_ssl` and `verify_ssl_cert` flags and hardcoded to `True` |
| 0.1.22 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state |
| 0.1.21 | 2022-09-20 | [16921](https://github.com/airbytehq/airbyte/pull/16921) | Upgrade pyarrow |
| 0.1.20 | 2022-09-12 | [16607](https://github.com/airbytehq/airbyte/pull/16607) | Fix for reading jsonl files containing nested structures |
| 0.1.19 | 2022-09-13 | [16631](https://github.com/airbytehq/airbyte/pull/16631) | Adjust column type to a broadest one when merging two or more json schemas |
| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. |
| 0.1.17 | 2022-07-21 | [14911](https://github.com/airbytehq/airbyte/pull/14911) | "decimal" type added for parquet |
| 0.1.16 | 2022-07-13 | [14669](https://github.com/airbytehq/airbyte/pull/14669) | Fixed bug when extra columns apeared to be non-present in master schema |
| 0.1.15 | 2022-05-31 | [12568](https://github.com/airbytehq/airbyte/pull/12568) | Fixed possible case of files being missed during incremental syncs |
| 0.1.14 | 2022-05-23 | [11967](https://github.com/airbytehq/airbyte/pull/11967) | Increase unit test coverage up to 90% |
| 0.1.13 | 2022-05-11 | [12730](https://github.com/airbytehq/airbyte/pull/12730) | Fixed empty options issue |
| 0.1.12 | 2022-05-11 | [12602](https://github.com/airbytehq/airbyte/pull/12602) | Added support for Avro file format |
| 0.1.11 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy |
| 0.1.10 | 2022-01-28 | [8252](https://github.com/airbytehq/airbyte/pull/8252) | Refactoring of files' metadata |
| 0.1.9 | 2022-01-06 | [9163](https://github.com/airbytehq/airbyte/pull/9163) | Work-around for web-UI, `backslash - t` converts to `tab` for `format.delimiter` field. |
| 0.1.7 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies |
| 0.1.6 | 2021-10-15 | [6615](https://github.com/airbytehq/airbyte/pull/6615) & [7058](https://github.com/airbytehq/airbyte/pull/7058) | Memory and performance optimisation. Advanced options for CSV parsing. |
| 0.1.5 | 2021-09-24 | [6398](https://github.com/airbytehq/airbyte/pull/6398) | Support custom non Amazon S3 services |
| 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Support of Parquet format |
| 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference |
| 0.1.2 | 2021-08-02 | [5135](https://github.com/airbytehq/airbyte/pull/5135) | Fixed bug in spec so it displays in UI correctly |
| 0.1.1 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990/commits/ff5f70662c5f84eabc03526cddfcc9d73c58c0f4) | Fixed documentation url in source definition |
| 0.1.0 | 2021-07-30 | [4990](https://github.com/airbytehq/airbyte/pull/4990) | Created S3 source connector |

0 comments on commit c254d94

Please sign in to comment.