Source s3: fix avro discovery (#23198)
* #23197 source s3: fix avro discovery

* #23197 source s3: upd changelog

* #23197 source s3: add allowed hosts

* #23197 source s3: fix tests

* #23197 - fix build: formatting

* auto-bump connector version

---------

Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
3 people committed Feb 24, 2023
1 parent af4c5cc commit e174647
Showing 10 changed files with 57 additions and 39 deletions.
14 changes: 10 additions & 4 deletions .github/ISSUE_TEMPLATE/low-code-component.yaml
@@ -1,25 +1,31 @@
name: Low Code Component
description: Use this when requesting low-code CDK components
title: "Low-code CDK: "
labels: ["team/extensibility", "area/connector-builder", "area/low-code/components", "area/low-code"]
labels:
[
"team/extensibility",
"area/connector-builder",
"area/low-code/components",
"area/low-code",
]
body:
- type: textarea
id: description
attributes:
label: Component Description
description: Please describe the component you would like to see added to the Low-code CDK and why it's valuable
validations:
validations:
required: true
- type: textarea
id: proposed-schema
attributes:
label: Proposed YAML schema
description: If you can wave a magic wand, what would the YAML schema of the component you are suggesting look like?
description: If you can wave a magic wand, what would the YAML schema of the component you are suggesting look like?
value: |
```yaml
...
```
validations:
validations:
required: true
- type: input
id: url
@@ -1590,11 +1590,14 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.32
dockerImageTag: 1.0.0
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
icon: s3.svg
sourceType: file
releaseStage: generally_available
allowedHosts:
hosts:
- "*.s3.amazonaws.com"
- name: SalesLoft
sourceDefinitionId: 41991d12-d4b5-439e-afd0-260a31d4c53f
dockerRepository: airbyte/source-salesloft
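The new `allowedHosts` entry restricts the connector's outbound traffic to hosts matching the listed wildcard patterns. As an illustrative sketch only (not Airbyte's actual enforcement code), a pattern like `*.s3.amazonaws.com` can be checked against a hostname with Python's stdlib `fnmatch`:

```python
from fnmatch import fnmatch

# Pattern taken from the allowedHosts entry in the diff above.
ALLOWED_HOSTS = ["*.s3.amazonaws.com"]

def host_allowed(host: str) -> bool:
    """Return True if the host matches any allowed wildcard pattern."""
    return any(fnmatch(host, pattern) for pattern in ALLOWED_HOSTS)

print(host_allowed("my-bucket.s3.amazonaws.com"))  # bucket endpoint: allowed
print(host_allowed("evil.example.com"))            # anything else: rejected
```

Note that the wildcard matches the whole leading label, so custom endpoints (such as the MinIO endpoint used in the integration tests) would not match this pattern.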
@@ -12810,7 +12810,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-s3:0.1.32"
- dockerImage: "airbyte/source-s3:1.0.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.32
LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.name=airbyte/source-s3
@@ -3,10 +3,7 @@ test_strictness_level: high
acceptance_tests:
spec:
tests:
- backward_compatibility_tests_config:
# skip because allOf gets converted into enum
disable_for_version: 0.1.29
spec_path: integration_tests/spec.json
- spec_path: integration_tests/spec.json

connection:
tests:
@@ -20,8 +17,6 @@ acceptance_tests:
status: succeed
- config_path: secrets/jsonl_newlines_config.json
status: succeed
- config_path: integration_tests/config_minio.json
status: succeed
- config_path: integration_tests/invalid_config.json
status: failed

@@ -30,9 +25,10 @@ acceptance_tests:
- config_path: secrets/config.json
- config_path: secrets/parquet_config.json
- config_path: secrets/avro_config.json
backward_compatibility_tests_config:
disable_for_version: 0.1.32
- config_path: secrets/jsonl_config.json
- config_path: secrets/jsonl_newlines_config.json
- config_path: integration_tests/config_minio.json

basic_read:
tests:
@@ -56,8 +52,6 @@ acceptance_tests:
expect_records:
path: integration_tests/expected_records/jsonl_newlines.jsonl
timeout_seconds: 1800
# - config_path: integration_tests/config_minio.json
# timeout_seconds: 1800

full_refresh:
tests:
@@ -76,9 +70,6 @@ acceptance_tests:
- config_path: secrets/jsonl_newlines_config.json
configured_catalog_path: integration_tests/configured_catalogs/jsonl.json
timeout_seconds: 1800
- config_path: integration_tests/config_minio.json
configured_catalog_path: integration_tests/configured_catalogs/csv.json
timeout_seconds: 1800

incremental:
tests:
@@ -122,11 +113,3 @@ acceptance_tests:
future_state:
future_state_path: integration_tests/abnormal_state.json
timeout_seconds: 1800
- config_path: integration_tests/config_minio.json
configured_catalog_path: integration_tests/configured_catalogs/csv.json
cursor_paths:
test:
- _ab_source_file_last_modified
future_state:
future_state_path: integration_tests/abnormal_state.json
timeout_seconds: 1800
@@ -6,7 +6,7 @@
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
"endpoint": "http://10.0.40.43:9000"
"endpoint": "http://10.0.155.67:9000"
},
"format": {
"filetype": "csv",
@@ -45,12 +45,14 @@ def _parse_data_type(self, data_type_mapping: dict, avro_schema: dict) -> dict:
schema_dict = {}
for i in avro_schema["fields"]:
data_type = i["type"]
# If field is nullable there will be a list of types and we need to make sure to map the whole list according to data_type_mapping
# If field is nullable there will be a list of types and we need to make sure
# to map the whole list according to data_type_mapping
if isinstance(data_type, list):
schema_dict[i["name"]] = [data_type_mapping[dtype] for dtype in data_type]
# TODO: Figure out a better way to handle nested records. Currently a nested record is returned as a string
# TODO: Figure out a better way to handle complex types.
# See https://github.com/airbytehq/airbyte/issues/23327
elif isinstance(data_type, dict):
schema_dict[i["name"]] = "string"
schema_dict[i["name"]] = data_type_mapping[data_type["type"]]
elif data_type in data_type_mapping:
schema_dict[i["name"]] = data_type_mapping[data_type]
else:
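The fix above stops coercing every complex Avro type to `"string"`: nullable unions (a list of types) are mapped element by element, and nested records (a dict with its own `"type"` key) are mapped through `data_type_mapping` like any other type. A self-contained sketch of that logic — the mapping values here are illustrative assumptions, not the connector's exact table:

```python
# Illustrative Avro-type -> JSON-schema-type table (assumed for this sketch).
DATA_TYPE_MAPPING = {
    "null": "null",
    "boolean": "boolean",
    "int": "integer",
    "long": "integer",
    "float": "number",
    "double": "number",
    "string": "string",
    "record": "object",
}

def parse_fields(avro_fields: list) -> dict:
    """Map each Avro field's type to a JSON-schema type.

    Nullable fields arrive as a list of types (a union) and are mapped
    element-wise; nested records arrive as a dict and are mapped via
    their inner "type" key instead of being coerced to "string".
    """
    schema_dict = {}
    for field in avro_fields:
        data_type = field["type"]
        if isinstance(data_type, list):      # nullable union, e.g. ["null", "int"]
            schema_dict[field["name"]] = [DATA_TYPE_MAPPING[t] for t in data_type]
        elif isinstance(data_type, dict):    # nested record / complex type
            schema_dict[field["name"]] = DATA_TYPE_MAPPING[data_type["type"]]
        else:                                # plain primitive type
            schema_dict[field["name"]] = DATA_TYPE_MAPPING[data_type]
    return schema_dict

fields = [
    {"name": "lastname", "type": "string"},
    {"name": "age", "type": ["null", "int"]},
    {"name": "address", "type": {"type": "record", "name": "Address", "fields": []}},
]
print(parse_fields(fields))
```

With this change a nested record such as `address` is discovered as `"object"` rather than `"string"`, which is exactly what the updated unit-test fixture below expects.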
@@ -45,7 +45,7 @@
]
}"""

nested_schema_output = {"lastname": "string", "address": "string"}
nested_schema_output = {"lastname": "string", "address": "object"}

master_schema = {
"name": "string",
@@ -60,15 +60,15 @@ class TestAvroParser(AbstractTestParser):
filetype = "avro"

@classmethod
def generate_avro_file(cls, schema_str: str, out_file, num_rows: int) -> str:
def generate_avro_file(cls, out_file, num_rows: int) -> str:
"""Creates an avro file and saves to tmp folder to be used by test cases
:param schema_str: valid avro schema as a string
:param out_file: name of file to be created
:param num_rows: number of rows to be generated
:return: string with path to the file created
"""
filename = os.path.join(TMP_FOLDER, out_file + "." + cls.filetype)
parsed_schema = schema.parse(schema_str)
parsed_schema = schema.parse(simple_schema_str)
rec_writer = io.DatumWriter(parsed_schema)
file_writer = datafile.DataFileWriter(open(filename, "wb"), rec_writer, parsed_schema)
for _ in range(num_rows):
@@ -82,6 +82,29 @@ def generate_avro_file(cls, schema_str: str, out_file, num_rows: int) -> str:
file_writer.close()
return filename

@classmethod
def generate_nested_avro_file(cls, out_file, num_rows: int) -> str:
"""Creates an avro file and saves to tmp folder to be used by test cases
:param schema_str: valid avro schema as a string
:param out_file: name of file to be created
:param num_rows: number of rows to be generated
:return: string with path to the file created
"""
filename = os.path.join(TMP_FOLDER, out_file + "." + cls.filetype)
parsed_schema = schema.parse(nested_records_schema_str)
rec_writer = io.DatumWriter(parsed_schema)
file_writer = datafile.DataFileWriter(open(filename, "wb"), rec_writer, parsed_schema)
for _ in range(num_rows):
data = {}
data["lastname"] = "".join(random.choice(string.ascii_letters) for i in range(10))
data["address"] = {
"streetaddress": "".join(random.choice(string.ascii_letters) for i in range(10)),
"city": "".join(random.choice(string.ascii_letters) for i in range(10))
}
file_writer.append(data)
file_writer.close()
return filename

@classmethod
def cases(cls) -> Mapping[str, Any]:
"""
@@ -91,7 +114,7 @@ def cases(cls) -> Mapping[str, Any]:
# test basic file with data type conversions
cases["simple_test"] = {
"AbstractFileParser": AvroParser(format=cls.filetype),
"filepath": cls.generate_avro_file(simple_schema_str, "test_file", 1000),
"filepath": cls.generate_avro_file("test_file", 1000),
"num_records": 1000,
"inferred_schema": master_schema,
"line_checks": {},
@@ -100,7 +123,7 @@ def cases(cls) -> Mapping[str, Any]:
# test file with 0 records. Will pass but not ingest anything
cases["test_zero_rows"] = {
"AbstractFileParser": AvroParser(format=cls.filetype),
"filepath": cls.generate_avro_file(simple_schema_str, "test_file_zero_rows", 0),
"filepath": cls.generate_avro_file("test_file_zero_rows", 0),
"num_records": 0,
"inferred_schema": master_schema,
"line_checks": {},
@@ -110,8 +133,8 @@
# test for avro schema with nested records. This will pass as all nested records are returned as one string
cases["test_nested_records"] = {
"AbstractFileParser": AvroParser(format=cls.filetype),
"filepath": cls.generate_avro_file(nested_records_schema_str, "test_nested_records", 0),
"num_records": 0,
"filepath": cls.generate_nested_avro_file("test_nested_records", 10),
"num_records": 10,
"inferred_schema": nested_schema_output,
"line_checks": {},
"fails": [],
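The new `generate_nested_avro_file` helper fills each row with a random top-level string plus a nested `address` record, so the parser's nested-record handling is exercised against real data (10 rows) instead of an empty file. A stdlib-only sketch of that row generation, with fixture names mirroring the test above:

```python
import random
import string

def random_word(length: int = 10) -> str:
    """Random ASCII word, matching the test fixture's 10-letter values."""
    return "".join(random.choice(string.ascii_letters) for _ in range(length))

def make_nested_record() -> dict:
    # Mirrors the rows written by generate_nested_avro_file: one plain
    # string field and one nested record field.
    return {
        "lastname": random_word(),
        "address": {
            "streetaddress": random_word(),
            "city": random_word(),
        },
    }

print(make_nested_record())
```

Each such row conforms to `nested_records_schema_str`, and discovery over the resulting file should infer `{"lastname": "string", "address": "object"}` — the updated `nested_schema_output`.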
2 changes: 1 addition & 1 deletion connectors.md
Expand Up @@ -196,7 +196,7 @@
| **Reply.io** | <img alt="Reply.io icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/reply-io.svg" height="30" height="30"/> | Source | airbyte/source-reply-io:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/reply-io) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-reply-io) | <small>`8cc6537e-f8a6-423c-b960-e927af76116e`</small> |
| **Retently** | <img alt="Retently icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/retently.svg" height="30" height="30"/> | Source | airbyte/source-retently:0.1.3 | alpha | [link](https://docs.airbyte.com/integrations/sources/retently) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-retently) | <small>`db04ecd1-42e7-4115-9cec-95812905c626`</small> |
| **Rocket.chat** | <img alt="Rocket.chat icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/rocket-chat.svg" height="30" height="30"/> | Source | airbyte/source-rocket-chat:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/rocket-chat) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-rocket-chat) | <small>`921d9608-3915-450b-8078-0af18801ea1b`</small> |
| **S3** | <img alt="S3 icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/s3.svg" height="30" height="30"/> | Source | airbyte/source-s3:0.1.32 | generally_available | [link](https://docs.airbyte.com/integrations/sources/s3) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-s3) | <small>`69589781-7828-43c5-9f63-8925b1c1ccc2`</small> |
| **S3** | <img alt="S3 icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/s3.svg" height="30" height="30"/> | Source | airbyte/source-s3:1.0.0 | generally_available | [link](https://docs.airbyte.com/integrations/sources/s3) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-s3) | <small>`69589781-7828-43c5-9f63-8925b1c1ccc2`</small> |
| **SAP Fieldglass** | <img alt="SAP Fieldglass icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/sapfieldglass.svg" height="30" height="30"/> | Source | airbyte/source-sap-fieldglass:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/sap-fieldglass) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sap-fieldglass) | <small>`ec5f3102-fb31-4916-99ae-864faf8e7e25`</small> |
| **SFTP** | <img alt="SFTP icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Source | airbyte/source-sftp:0.1.2 | alpha | [link](https://docs.airbyte.com/integrations/sources/sftp) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp) | <small>`a827c52e-791c-4135-a245-e233c5255199`</small> |
| **SFTP Bulk** | <img alt="SFTP Bulk icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Source | airbyte/source-sftp-bulk:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/sftp-bulk) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp-bulk) | <small>`31e3242f-dee7-4cdc-a4b8-8e06c5458517`</small> |
1 change: 1 addition & 0 deletions docs/integrations/sources/s3.md
@@ -209,6 +209,7 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------|
| 1.0.0 | 2023-02-17 | [23198](https://github.com/airbytehq/airbyte/pull/23198) | Fix Avro schema discovery |
| 0.1.32 | 2023-02-07 | [22500](https://github.com/airbytehq/airbyte/pull/22500) | Speed up discovery |
| 0.1.31 | 2023-02-08 | [22550](https://github.com/airbytehq/airbyte/pull/22550) | Validate CSV read options and convert options |
| 0.1.30 | 2023-01-25 | [21587](https://github.com/airbytehq/airbyte/pull/21587) | Make sure spec works as expected in UI |
