Skip to content

Commit

Permalink
Source S3: support parquet dataset (#25937)
Browse files Browse the repository at this point in the history
* Source S3: support parquet dataset

* Source S3: update docs

* Source S3: Fix expected records

* Source S3: Fix expected records

* Source S3: update sem version

* auto-bump connector version

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
artem1205 and octavia-squidington-iii committed May 10, 2023
1 parent f5321d1 commit f74d96f
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22774,7 +22774,7 @@
"sourceDefinitionId": "69589781-7828-43c5-9f63-8925b1c1ccc2",
"name": "S3",
"dockerRepository": "airbyte/source-s3",
"dockerImageTag": "2.1.4",
"dockerImageTag": "2.2.0",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/s3",
"icon": "s3.svg",
"sourceType": "file",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1825,7 +1825,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 2.1.4
dockerImageTag: 2.2.0
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
icon: s3.svg
sourceType: file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13450,7 +13450,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-s3:2.1.4"
- dockerImage: "airbyte/source-s3:2.2.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.1.4
LABEL io.airbyte.version=2.2.0
LABEL io.airbyte.name=airbyte/source-s3
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ acceptance_tests:
expect_records:
path: integration_tests/expected_records/parquet.jsonl
timeout_seconds: 1800
- config_path: secrets/parquet_dataset_config.json
expect_records:
path: integration_tests/expected_records/parquet_dataset.jsonl
timeout_seconds: 1800
- config_path: secrets/avro_config.json
expect_records:
path: integration_tests/expected_records/avro.jsonl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
"endpoint": "http://10.0.77.160:9000"
"endpoint": "http://10.0.103.193:9000"
},
"format": {
"filetype": "csv",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"SCHWARTZ","First_Name":"CHANA","Mid_Init":"H","Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"*ATTORNEY AT LAW","Base_Salary":77015,"Regular_Hours":1046.25,"Regular_Gross_Paid":47316.74,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":8230.31,"Fiscal_Year":"2021","Leave_Status_as_of_June_30":"ON LEAVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:28Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2021/Leave_Status_as_of_June_30=ON%20LEAVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668637642}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"WASHINGTON","First_Name":"DOROTHY","Mid_Init":null,"Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"ADM MANAGER-NON-MGRL FROM M1/M2","Base_Salary":53373,"Regular_Hours":1825,"Regular_Gross_Paid":47436.44,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":1723.17,"Fiscal_Year":"2021","Leave_Status_as_of_June_30":"ON LEAVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:28Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2021/Leave_Status_as_of_June_30=ON%20LEAVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668637643}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"SAMUEL","First_Name":"GRACE","Mid_Init":"Y","Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"ADM MANAGER-NON-MGRL FROM M1/M2","Base_Salary":55337,"Regular_Hours":1825,"Regular_Gross_Paid":55185.52,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":0,"Fiscal_Year":"2022","Leave_Status_as_of_June_30":"ON LEAVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:28Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2022/Leave_Status_as_of_June_30=ON%20LEAVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668639019}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"BIEBEL","First_Name":"ANN","Mid_Init":"M","Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"*ATTORNEY AT LAW","Base_Salary":77015,"Regular_Hours":1825,"Regular_Gross_Paid":76804,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":13750.36,"Fiscal_Year":"2021","Leave_Status_as_of_June_30":"ACTIVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:29Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2021/Leave_Status_as_of_June_30=ACTIVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668640406}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"CARROLL","First_Name":"FRAN","Mid_Init":null,"Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"*ATTORNEY AT LAW","Base_Salary":77015,"Regular_Hours":1825,"Regular_Gross_Paid":76804,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":13750.36,"Fiscal_Year":"2021","Leave_Status_as_of_June_30":"ACTIVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:29Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2021/Leave_Status_as_of_June_30=ACTIVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668640407}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"BROWNSTEIN","First_Name":"ELFREDA","Mid_Init":"G","Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"*ATTORNEY AT LAW","Base_Salary":83504,"Regular_Hours":1825,"Regular_Gross_Paid":83275.15,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":13750.36,"Fiscal_Year":"2021","Leave_Status_as_of_June_30":"ACTIVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:29Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2021/Leave_Status_as_of_June_30=ACTIVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668640407}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"WARD","First_Name":"RENEE","Mid_Init":null,"Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"ADM MANAGER-NON-MGRL FROM M1/M2","Base_Salary":53373,"Regular_Hours":1825,"Regular_Gross_Paid":46588.76,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":3409.69,"Fiscal_Year":"2021","Leave_Status_as_of_June_30":"ACTIVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:29Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2021/Leave_Status_as_of_June_30=ACTIVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668640408}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"SPIVEY","First_Name":"NATASHA","Mid_Init":"L","Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"ADM MANAGER-NON-MGRL FROM M1/M2","Base_Salary":53436,"Regular_Hours":1825,"Regular_Gross_Paid":53289.6,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":0,"Fiscal_Year":"2021","Leave_Status_as_of_June_30":"ACTIVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:29Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2021/Leave_Status_as_of_June_30=ACTIVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668640408}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"DU","First_Name":"MARK","Mid_Init":null,"Agency_Start_Date":"03/24/2014","Work_Location_Borough":null,"Title_Description":"HEARING OFFICER","Base_Salary":36.6,"Regular_Hours":188.75,"Regular_Gross_Paid":5334.45,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":0,"Fiscal_Year":"2021","Leave_Status_as_of_June_30":"ACTIVE","Pay_Basis":"per Hour","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:29Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2021/Leave_Status_as_of_June_30=ACTIVE/Pay_Basis=per%20Hour/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668641811}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"THEIL","First_Name":"JOANNE","Mid_Init":"F","Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"*ATTORNEY AT LAW","Base_Salary":80438,"Regular_Hours":1825,"Regular_Gross_Paid":80217.55,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":13635.42,"Fiscal_Year":"2022","Leave_Status_as_of_June_30":"ACTIVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:29Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2022/Leave_Status_as_of_June_30=ACTIVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668643311}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"DEMAIO","First_Name":"DEIRDRE","Mid_Init":null,"Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"ADM MANAGER-NON-MGRL FROM M1/M2","Base_Salary":53512,"Regular_Hours":1780,"Regular_Gross_Paid":48727.47,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":3318.35,"Fiscal_Year":"2022","Leave_Status_as_of_June_30":"ACTIVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:29Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2022/Leave_Status_as_of_June_30=ACTIVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668643311}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"MCLAURIN TRAPP","First_Name":"CELESTINE","Mid_Init":"T","Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"ADM MANAGER-NON-MGRL FROM M1/M2","Base_Salary":58951,"Regular_Hours":1818,"Regular_Gross_Paid":58563.27,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":8.25,"Fiscal_Year":"2022","Leave_Status_as_of_June_30":"ACTIVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:29Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2022/Leave_Status_as_of_June_30=ACTIVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668643312}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"BUNDRANT","First_Name":"TROY","Mid_Init":null,"Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"ADM MANAGER-NON-MGRL FROM M1/M2","Base_Salary":64769,"Regular_Hours":1825,"Regular_Gross_Paid":61817.94,"OT_Hours":62,"Total_OT_Paid":2576.58,"Total_Other_Pay":106.68,"Fiscal_Year":"2022","Leave_Status_as_of_June_30":"ACTIVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:29Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2022/Leave_Status_as_of_June_30=ACTIVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668643312}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"CHASE JONES","First_Name":"DIANA","Mid_Init":null,"Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"ADM MANAGER-NON-MGRL FROM M1/M2","Base_Salary":66000,"Regular_Hours":1825,"Regular_Gross_Paid":65819.25,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":0,"Fiscal_Year":"2022","Leave_Status_as_of_June_30":"ACTIVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:29Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2022/Leave_Status_as_of_June_30=ACTIVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668643312}
{"stream":"test","data":{"Payroll_Number":820,"Last_Name":"JORDAN","First_Name":"REGINALD","Mid_Init":null,"Agency_Start_Date":"07/05/2010","Work_Location_Borough":null,"Title_Description":"ADM MANAGER-NON-MGRL FROM M1/M2","Base_Salary":75000,"Regular_Hours":1825,"Regular_Gross_Paid":74794.46,"OT_Hours":0,"Total_OT_Paid":0,"Total_Other_Pay":0,"Fiscal_Year":"2022","Leave_Status_as_of_June_30":"ACTIVE","Pay_Basis":"per Annum","_ab_additional_properties":{},"_ab_source_file_last_modified":"2023-05-09T20:16:29Z","_ab_source_file_url":"test_payroll/Fiscal_Year=2022/Leave_Status_as_of_June_30=ACTIVE/Pay_Basis=per%20Annum/4e0ea65c5a074c0592e43f7b950f3ce8-0.parquet"},"emitted_at":1683668643312}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerImageTag: 2.1.4
dockerImageTag: 2.1.5
dockerRepository: airbyte/source-s3
githubIssueLabel: source-s3
icon: s3.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import os
from typing import Any, BinaryIO, Iterator, List, Mapping, TextIO, Tuple, Union
from urllib.parse import unquote

import pyarrow.parquet as pq
from airbyte_cdk.models import FailureType
Expand Down Expand Up @@ -97,7 +99,7 @@ def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo
reader = self._init_reader(file)
schema_dict = {
field.name: self.parse_field_type(field.logical_type.type.lower(), field.physical_type)[0] for field in reader.schema
}
} | {x: "string" for x in self.get_partition_columns(file_info.key)}
if not schema_dict:
# pyarrow can parse empty parquet files but a connector can't generate dynamic schema
raise S3Exception(file_info, "empty Parquet file", "The .parquet file is empty!", FailureType.config_error)
Expand All @@ -123,7 +125,7 @@ def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) ->

args = self._select_options("columns", "batch_size") # type: ignore[arg-type]
self.logger.debug(f"Found the {reader.num_row_groups} Parquet groups")

partition_columns = self.get_partition_columns(file_info.key)
# load batches per page
for num_row_group in range(reader.num_row_groups):
args["row_groups"] = [num_row_group]
Expand All @@ -142,7 +144,16 @@ def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) ->
yield {
batch_columns[i]: self.convert_field_data(logical_types[batch_columns[i]], record_values[i])
for i in range(len(batch_columns))
}
} | partition_columns

@staticmethod
def get_partition_columns(file_path: str) -> Mapping[str, Any]:
"""
Parse file path and return dict of partitioned columns names with values, example:
/payroll/Year=2014/Agency_Name=ADMIN/file.parquet -> {"Year": "2014", Agency_Name: "ADMIN"}
"""
partitions_in_path = (unquote(x) for x in file_path.split(os.sep) if "=" in x)
return {x.split("=")[0]: x.split("=")[1] for x in partitions_in_path}

@classmethod
def set_minimal_block_size(cls, format: Mapping[str, Any]):
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------|
| 2.2.0 | 2023-05-10 | [25937](https://github.com/airbytehq/airbyte/pull/25937) | Add support for Parquet Dataset |
| 2.1.4 | 2023-05-01 | [25361](https://github.com/airbytehq/airbyte/pull/25361) | Parse nested avro schemas |
| 2.1.3 | 2023-05-01 | [25706](https://github.com/airbytehq/airbyte/pull/25706) | Remove minimum block size for CSV check |
| 2.1.2 | 2023-04-18 | [25067](https://github.com/airbytehq/airbyte/pull/25067) | Handle block size related errors; fix config validator |
Expand Down

0 comments on commit f74d96f

Please sign in to comment.