Source S3: infer schema of the first file only (#23189)
* #1470 Source S3: infer schema of the first file

* #1470 source s3: upd changelog

* #1470 source s3: review fixes

* #1470 source s3: review fixes

* #1470 source s3: bump version

* #1470 source s3: review fixes

* auto-bump connector version

---------

Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
3 people committed Mar 14, 2023
1 parent f6e6ae6 commit 3eecf54
Showing 11 changed files with 83 additions and 388 deletions.
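The headline change: stream discovery now infers the JSON schema from the first matched file only, instead of inferring and merging schemas across every file the path pattern matches. A minimal sketch of the new behavior (hypothetical helper names, not the connector's actual code):

    from typing import Any, Callable, Dict, Iterable

    def infer_stream_schema(
        file_keys: Iterable[str],
        infer_file_schema: Callable[[str], Dict[str, Any]],
    ) -> Dict[str, Any]:
        # 2.0.0 behavior: sample only the first matched file.
        # Previously every matched file was sampled and the per-file
        # schemas merged, which was slow on buckets with many files.
        for key in file_keys:
            return infer_file_schema(key)
        return {}  # no files matched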
@@ -1711,7 +1711,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 1.0.2
+  dockerImageTag: 2.0.0
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   icon: s3.svg
   sourceType: file
@@ -12929,7 +12929,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:1.0.2"
+- dockerImage: "airbyte/source-s3:2.0.0"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=1.0.2
+LABEL io.airbyte.version=2.0.0
 LABEL io.airbyte.name=airbyte/source-s3
@@ -1,33 +1,4 @@
-connector_image: airbyte/source-s3:dev
-test_strictness_level: high
 acceptance_tests:
-  spec:
-    tests:
-      - spec_path: integration_tests/spec.json
-
-  connection:
-    tests:
-      - config_path: secrets/config.json
-        status: succeed
-      - config_path: secrets/parquet_config.json
-        status: succeed
-      - config_path: secrets/avro_config.json
-        status: succeed
-      - config_path: secrets/jsonl_config.json
-        status: succeed
-      - config_path: secrets/jsonl_newlines_config.json
-        status: succeed
-      - config_path: integration_tests/invalid_config.json
-        status: failed
-
-  discovery:
-    tests:
-      - config_path: secrets/config.json
-      - config_path: secrets/parquet_config.json
-      - config_path: secrets/avro_config.json
-      - config_path: secrets/jsonl_config.json
-      - config_path: secrets/jsonl_newlines_config.json
-
   basic_read:
     tests:
       - config_path: secrets/config.json
@@ -51,6 +22,27 @@ acceptance_tests:
           path: integration_tests/expected_records/jsonl_newlines.jsonl
         timeout_seconds: 1800

+  connection:
+    tests:
+      - config_path: secrets/config.json
+        status: succeed
+      - config_path: secrets/parquet_config.json
+        status: succeed
+      - config_path: secrets/avro_config.json
+        status: succeed
+      - config_path: secrets/jsonl_config.json
+        status: succeed
+      - config_path: secrets/jsonl_newlines_config.json
+        status: succeed
+      - config_path: integration_tests/invalid_config.json
+        status: failed
+  discovery:
+    tests:
+      - config_path: secrets/config.json
+      - config_path: secrets/parquet_config.json
+      - config_path: secrets/avro_config.json
+      - config_path: secrets/jsonl_config.json
+      - config_path: secrets/jsonl_newlines_config.json
   full_refresh:
     tests:
       - config_path: secrets/config.json
@@ -111,3 +103,8 @@ acceptance_tests:
         future_state:
           future_state_path: integration_tests/abnormal_state.json
         timeout_seconds: 1800
+  spec:
+    tests:
+      - spec_path: integration_tests/spec.json
+connector_image: airbyte/source-s3:dev
+test_strictness_level: high
@@ -6,7 +6,7 @@
     "aws_access_key_id": "123456",
     "aws_secret_access_key": "123456key",
     "path_prefix": "",
-    "endpoint": "http://10.0.45.4:9000"
+    "endpoint": "http://10.0.186.80:9000"
   },
   "format": {
     "filetype": "csv",
@@ -118,7 +118,7 @@ def _stream_records_test_logic(
         LOGGER.info(f"Testing stream_records() in SyncMode:{sync_mode.value}")

         # check we return correct schema from get_json_schema()
-        assert fs._get_schema_map() == full_expected_schema
+        assert fs._schema == full_expected_schema

         records = []
         for stream_slice in fs.stream_slices(sync_mode=sync_mode, stream_state=current_state):
@@ -305,22 +305,6 @@ def _stream_records_test_logic(
             False,
             True,
         ),
-        # multiple file tests (different but merge-able schemas)
-        ( # auto-infer
-            [
-                SAMPLE_DIR.joinpath("simple_test.csv"),
-                SAMPLE_DIR.joinpath("multi_file_diffschema_1.csv"),
-                SAMPLE_DIR.joinpath("multi_file_diffschema_2.csv"),
-            ],
-            "**",
-            True,
-            6,
-            17,
-            {"id": "integer", "name": "string", "valid": "boolean", "location": "string", "percentage": "number", "nullable": "string"},
-            None,
-            False,
-            False,
-        ),
         ( # provided schema, not containing all columns (extra columns should go into FileStream.ab_additional_col)
             [
                 SAMPLE_DIR.joinpath("simple_test.csv"),
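The parametrize case removed above exercised auto-inference across multiple files with different but merge-able schemas; since the schema is now inferred from the first file only, that merging path is gone and the case no longer applies.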
@@ -7,7 +7,7 @@
 from typing import Any, List, Mapping, Optional, Tuple

 from airbyte_cdk.logger import AirbyteLogger
-from airbyte_cdk.models import ConnectorSpecification
+from airbyte_cdk.models import ConnectorSpecification, SyncMode
 from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode
 from airbyte_cdk.sources import AbstractSource
 from airbyte_cdk.sources.streams import Stream
@@ -65,13 +65,14 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
                 # test that matching on the pattern doesn't error
                 globmatch(file_info.key, config.get("path_pattern"), flags=GLOBSTAR | SPLIT)
                 # just need first file here to test connection and valid patterns
-                return True, None
-
+                break
+            for slice_ in stream.stream_slices(sync_mode=SyncMode.full_refresh):
+                list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_))
+                break
         except Exception as e:
             logger.error(format_exc())
             return False, e
-
         logger.warn("Found 0 files (but connection is valid).")
         return True, None

     def streams(self, config: Mapping[str, Any]) -> List[Stream]:
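Taken together, check_connection now stops at the first matched file and then pulls one stream slice through the full read path, so schema-inference and parsing errors surface at check time instead of mid-sync. A condensed sketch of that probe (hypothetical probe_first_slice helper and stream argument, error handling elided; it mirrors the diff above rather than quoting it):

    from airbyte_cdk.models import SyncMode

    def probe_first_slice(stream) -> None:
        # Materialize one slice of records; any schema or read error
        # raises here rather than during the first real sync.
        for slice_ in stream.stream_slices(sync_mode=SyncMode.full_refresh):
            list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_))
            break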
