Skip to content

Commit

Permalink
Implement parquet parser (#28064)
Browse files Browse the repository at this point in the history
* Implement parquet parser

* move comment

* comments

* Automated Commit - Formatting Changes

* cleanup

* Update

* remove superfluous method

* update

* format

---------

Co-authored-by: girarda <girarda@users.noreply.github.com>
  • Loading branch information
girarda and girarda committed Jul 12, 2023
1 parent 9f12062 commit 40e62fb
Show file tree
Hide file tree
Showing 7 changed files with 705 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,130 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, Dict, Iterable
import json
from typing import Any, Dict, Iterable, Mapping

import pyarrow as pa
import pyarrow.parquet as pq
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from pyarrow import Scalar


class ParquetParser(FileTypeParser):
async def infer_schema(
self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader
) -> Dict[str, Any]:
...
# Pyarrow can detect the schema of a parquet file by reading only its metadata.
# https://github.com/apache/arrow/blob/main/python/pyarrow/_parquet.pyx#L1168-L1243
parquet_file = pq.ParquetFile(stream_reader.open_file(file))
parquet_schema = parquet_file.schema_arrow
schema = {field.name: ParquetParser.parquet_type_to_schema_type(field.type) for field in parquet_schema}
return schema

def parse_records(
self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader
) -> Iterable[Dict[str, Any]]:
...
table = pq.read_table(stream_reader.open_file(file))
for batch in table.to_batches():
for i in range(batch.num_rows):
row_dict = {column: ParquetParser._to_output_value(batch.column(column)[i]) for column in table.column_names}
yield row_dict

@staticmethod
def _to_output_value(parquet_value: Scalar) -> Any:
"""
Convert a pyarrow scalar to a value that can be output by the source.
"""
# Convert date and datetime objects to isoformat strings
if pa.types.is_time(parquet_value.type) or pa.types.is_timestamp(parquet_value.type) or pa.types.is_date(parquet_value.type):
return parquet_value.as_py().isoformat()

# Convert month_day_nano_interval to array
if parquet_value.type == pa.month_day_nano_interval():
return json.loads(json.dumps(parquet_value.as_py()))

# Decode binary strings to utf-8
if ParquetParser._is_binary(parquet_value.type):
return parquet_value.as_py().decode("utf-8")
if pa.types.is_decimal(parquet_value.type):
return str(parquet_value.as_py())

# Dictionaries are stored as two columns: indices and values
# The indices column is an array of integers that maps to the values column
if pa.types.is_dictionary(parquet_value.type):
return {
"indices": parquet_value.indices.tolist(),
"values": parquet_value.dictionary.tolist(),
}

# Convert duration to seconds, then convert to the appropriate unit
if pa.types.is_duration(parquet_value.type):
duration = parquet_value.as_py()
duration_seconds = duration.total_seconds()
if parquet_value.type.unit == "s":
return duration_seconds
elif parquet_value.type.unit == "ms":
return duration_seconds * 1000
elif parquet_value.type.unit == "us":
return duration_seconds * 1_000_000
elif parquet_value.type.unit == "ns":
return duration_seconds * 1_000_000_000 + duration.nanoseconds
else:
raise ValueError(f"Unknown duration unit: {parquet_value.type.unit}")
else:
return parquet_value.as_py()

@staticmethod
def parquet_type_to_schema_type(parquet_type: pa.DataType) -> Mapping[str, str]:
"""
Convert a pyarrow data type to an Airbyte schema type.
Parquet data types are defined at https://arrow.apache.org/docs/python/api/datatypes.html
"""

if pa.types.is_timestamp(parquet_type):
return {"type": "string", "format": "date-time"}
elif pa.types.is_date(parquet_type):
return {"type": "string", "format": "date"}
elif ParquetParser._is_string(parquet_type):
return {"type": "string"}
elif pa.types.is_boolean(parquet_type):
return {"type": "boolean"}
elif ParquetParser._is_integer(parquet_type):
return {"type": "integer"}
elif pa.types.is_floating(parquet_type):
return {"type": "number"}
elif ParquetParser._is_object(parquet_type):
return {"type": "object"}
elif ParquetParser._is_list(parquet_type):
return {"type": "array"}
else:
raise ValueError(f"Unsupported parquet type: {parquet_type}")

@staticmethod
def _is_binary(parquet_type: pa.DataType) -> bool:
return bool(pa.types.is_binary(parquet_type) or pa.types.is_large_binary(parquet_type))

@staticmethod
def _is_integer(parquet_type: pa.DataType) -> bool:
return bool(pa.types.is_integer(parquet_type) or pa.types.is_duration(parquet_type))

@staticmethod
def _is_string(parquet_type: pa.DataType) -> bool:
return bool(
pa.types.is_time(parquet_type)
or pa.types.is_string(parquet_type)
or pa.types.is_large_string(parquet_type)
or pa.types.is_decimal(parquet_type) # Return as a string to ensure no precision is lost
or ParquetParser._is_binary(parquet_type) # Best we can do is return as a string since we do not support binary
)

@staticmethod
def _is_object(parquet_type: pa.DataType) -> bool:
return bool(pa.types.is_dictionary(parquet_type) or pa.types.is_struct(parquet_type))

@staticmethod
def _is_list(parquet_type: pa.DataType) -> bool:
return bool(pa.types.is_list(parquet_type) or pa.types.is_large_list(parquet_type) or parquet_type == pa.month_day_nano_interval())
2 changes: 2 additions & 0 deletions airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"Jinja2~=3.1.2",
"cachetools",
"wcmatch==8.4",
"pyarrow==12.0.1",
],
python_requires=">=3.8",
extras_require={
Expand All @@ -74,6 +75,7 @@
"pytest-mock",
"requests-mock",
"pytest-httpserver",
"pandas==2.0.3",
],
"sphinx-docs": [
"Sphinx~=4.2",
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
import math
from typing import Any, Mapping

import pyarrow as pa
import pytest
from airbyte_cdk.sources.file_based.file_types import ParquetParser
from pyarrow import Scalar


@pytest.mark.parametrize(
"parquet_type, expected_type",
[
pytest.param(pa.bool_(), {"type": "boolean"}, id="test_parquet_bool"),
pytest.param(pa.int8(), {"type": "integer"}, id="test_parquet_int8"),
pytest.param(pa.int16(), {"type": "integer"}, id="test_parquet_int16"),
pytest.param(pa.int32(), {"type": "integer"}, id="test_parquet_int32"),
pytest.param(pa.uint8(), {"type": "integer"}, id="test_parquet_uint8"),
pytest.param(pa.uint16(), {"type": "integer"}, id="test_parquet_uint16"),
pytest.param(pa.uint32(), {"type": "integer"}, id="test_parquet_uint32"),
pytest.param(pa.float16(), {"type": "number"}, id="test_parquet_float16"),
pytest.param(pa.float32(), {"type": "number"}, id="test_parquet_float32"),
pytest.param(pa.float64(), {"type": "number"}, id="test_parquet_float64"),
pytest.param(pa.time32("s"), {"type": "string"}, id="test_parquet_time32s"),
pytest.param(pa.time32("ms"), {"type": "string"}, id="test_parquet_time32ms"),
pytest.param(pa.time64("us"), {"type": "string"}, id="test_parquet_time64us"),
pytest.param(pa.time64("ns"), {"type": "string"}, id="test_parquet_time64us"),
pytest.param(pa.timestamp("s"), {"type": "string", "format": "date-time"}, id="test_parquet_timestamps_s"),
pytest.param(pa.timestamp("ms"), {"type": "string", "format": "date-time"}, id="test_parquet_timestamp_ms"),
pytest.param(pa.timestamp("s", "utc"), {"type": "string", "format": "date-time"}, id="test_parquet_timestamps_s_with_tz"),
pytest.param(pa.timestamp("ms", "est"), {"type": "string", "format": "date-time"}, id="test_parquet_timestamps_ms_with_tz"),
pytest.param(pa.date32(), {"type": "string", "format": "date"}, id="test_parquet_date32"),
pytest.param(pa.date64(), {"type": "string", "format": "date"}, id="test_parquet_date64"),
pytest.param(pa.duration("s"), {"type": "integer"}, id="test_duration_s"),
pytest.param(pa.duration("ms"), {"type": "integer"}, id="test_duration_ms"),
pytest.param(pa.duration("us"), {"type": "integer"}, id="test_duration_us"),
pytest.param(pa.duration("ns"), {"type": "integer"}, id="test_duration_ns"),
pytest.param(pa.month_day_nano_interval(), {"type": "array"}, id="test_parquet_month_day_nano_interval"),
pytest.param(pa.binary(), {"type": "string"}, id="test_binary"),
pytest.param(pa.string(), {"type": "string"}, id="test_parquet_string"),
pytest.param(pa.utf8(), {"type": "string"}, id="test_utf8"),
pytest.param(pa.large_binary(), {"type": "string"}, id="test_large_binary"),
pytest.param(pa.large_string(), {"type": "string"}, id="test_large_string"),
pytest.param(pa.large_utf8(), {"type": "string"}, id="test_large_utf8"),
pytest.param(pa.dictionary(pa.int32(), pa.string()), {"type": "object"}, id="test_dictionary"),
pytest.param(pa.struct([pa.field("field", pa.int32())]), {"type": "object"}, id="test_struct"),
pytest.param(pa.list_(pa.int32()), {"type": "array"}, id="test_list"),
pytest.param(pa.large_list(pa.int32()), {"type": "array"}, id="test_large_list"),
pytest.param(pa.decimal128(2), {"type": "string"}, id="test_decimal128"),
pytest.param(pa.decimal256(2), {"type": "string"}, id="test_decimal256"),
]
)
def test_type_mapping(parquet_type: pa.DataType, expected_type: Mapping[str, str]) -> None:
if expected_type is None:
with pytest.raises(ValueError):
ParquetParser.parquet_type_to_schema_type(parquet_type)
else:
assert ParquetParser.parquet_type_to_schema_type(parquet_type) == expected_type


@pytest.mark.parametrize(
"pyarrow_type, parquet_object, expected_value",
[
pytest.param(pa.bool_(), True, True, id="test_bool"),
pytest.param(pa.int8(), -1, -1, id="test_int8"),
pytest.param(pa.int16(), 2, 2, id="test_int16"),
pytest.param(pa.int32(), 3, 3, id="test_int32"),
pytest.param(pa.uint8(), 4, 4, id="test_parquet_uint8"),
pytest.param(pa.uint16(), 5, 5, id="test_parquet_uint16"),
pytest.param(pa.uint32(), 6, 6, id="test_parquet_uint32"),
pytest.param(pa.float32(), 2.7, 2.7, id="test_parquet_float32"),
pytest.param(pa.float64(), 3.14, 3.14, id="test_parquet_float64"),
pytest.param(pa.time32("s"), datetime.time(1, 2, 3), "01:02:03", id="test_parquet_time32s"),
pytest.param(pa.time32("ms"), datetime.time(3, 4, 5), "03:04:05", id="test_parquet_time32ms"),
pytest.param(pa.time64("us"), datetime.time(6, 7, 8), "06:07:08", id="test_parquet_time64us"),
pytest.param(pa.time64("ns"), datetime.time(9, 10, 11), "09:10:11", id="test_parquet_time64us"),
pytest.param(pa.timestamp("s"), datetime.datetime(2023, 7, 7, 10, 11, 12), "2023-07-07T10:11:12", id="test_parquet_timestamps_s"),
pytest.param(pa.timestamp("ms"), datetime.datetime(2024, 8, 8, 11, 12, 13), "2024-08-08T11:12:13", id="test_parquet_timestamp_ms"),
pytest.param(pa.timestamp("s", "utc"), datetime.datetime(2020, 1, 1, 1, 1, 1, tzinfo=datetime.timezone.utc),
"2020-01-01T01:01:01+00:00", id="test_parquet_timestamps_s_with_tz"),
pytest.param(pa.timestamp("ms", "utc"), datetime.datetime(2021, 2, 3, 4, 5, tzinfo=datetime.timezone.utc),
"2021-02-03T04:05:00+00:00", id="test_parquet_timestamps_ms_with_tz"),
pytest.param(pa.date32(), datetime.date(2023, 7, 7), "2023-07-07", id="test_parquet_date32"),
pytest.param(pa.date64(), datetime.date(2023, 7, 8), "2023-07-08", id="test_parquet_date64"),
pytest.param(pa.duration("s"), 12345, 12345, id="test_duration_s"),
pytest.param(pa.duration("ms"), 12345, 12345, id="test_duration_ms"),
pytest.param(pa.duration("us"), 12345, 12345, id="test_duration_us"),
pytest.param(pa.duration("ns"), 12345, 12345, id="test_duration_ns"),
pytest.param(pa.month_day_nano_interval(), datetime.timedelta(days=3, microseconds=4), [0, 3, 4000],
id="test_parquet_month_day_nano_interval"),
pytest.param(pa.binary(), b"this is a binary string", "this is a binary string", id="test_binary"),
pytest.param(pa.string(), "this is a string", "this is a string", id="test_parquet_string"),
pytest.param(pa.utf8(), "utf8".encode("utf8"), "utf8", id="test_utf8"),
pytest.param(pa.large_binary(), b"large binary string", "large binary string", id="test_large_binary"),
pytest.param(pa.large_string(), "large string", "large string", id="test_large_string"),
pytest.param(pa.large_utf8(), "large utf8", "large utf8", id="test_large_utf8"),
pytest.param(pa.struct([pa.field("field", pa.int32())]), {"field": 1}, {"field": 1}, id="test_struct"),
pytest.param(pa.list_(pa.int32()), [1, 2, 3], [1, 2, 3], id="test_list"),
pytest.param(pa.large_list(pa.int32()), [4, 5, 6], [4, 5, 6], id="test_large_list"),
pytest.param(pa.decimal128(5, 3), 12, "12.000", id="test_decimal128"),
pytest.param(pa.decimal256(8, 2), 13, "13.00", id="test_decimal256"),
]
)
def test_value_transformation(pyarrow_type: pa.DataType, parquet_object: Scalar, expected_value: Any) -> None:
pyarrow_value = pa.array([parquet_object], type=pyarrow_type)[0]
py_value = ParquetParser._to_output_value(pyarrow_value)
if isinstance(py_value, float):
assert math.isclose(py_value, expected_value, abs_tol=0.01)
else:
assert py_value == expected_value


def test_value_dictionary() -> None:
# Setting the dictionary is more involved than other data types so we test it in a separate test
dictionary_values = ["apple", "banana", "cherry"]
indices = [0, 1, 2, 0, 1]
indices_array = pa.array(indices, type=pa.int8())
dictionary = pa.DictionaryArray.from_arrays(indices_array, dictionary_values)
py_value = ParquetParser._to_output_value(dictionary)
assert py_value == {"indices": [0, 1, 2, 0, 1], "values": ["apple", "banana", "cherry"]}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@

import csv
import io
import tempfile
from datetime import datetime
from io import IOBase
from typing import Any, Dict, Iterable, List, Optional

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.file_based.default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
Expand Down Expand Up @@ -54,8 +58,8 @@ class InMemoryFilesStreamReader(AbstractFileBasedStreamReader):
file_write_options: Optional[Dict[str, Any]]

def get_matching_files(
self,
globs: List[str],
self,
globs: List[str],
) -> Iterable[RemoteFile]:
yield from AbstractFileBasedStreamReader.filter_files_by_globs([
RemoteFile(uri=f, last_modified=datetime.strptime(data["last_modified"], "%Y-%m-%dT%H:%M:%S.%fZ"), file_type=self.file_type)
Expand All @@ -82,3 +86,24 @@ def _make_csv_file_contents(self, file_name: str) -> str:
writer = csv.writer(fh)
writer.writerows(self.files[file_name]["contents"])
return fh.getvalue()


class TemporaryParquetFilesStreamReader(InMemoryFilesStreamReader):
"""
A file reader that writes RemoteFiles to a temporary file and then reads them back.
"""

def open_file(self, file: RemoteFile) -> IOBase:
return io.BytesIO(self._make_file_contents(file.uri))

def _make_file_contents(self, file_name: str) -> bytes:
contents = self.files[file_name]["contents"]
schema = self.files[file_name].get("schema")

df = pd.DataFrame(contents[1:], columns=contents[0])
with tempfile.TemporaryFile() as fp:
table = pa.Table.from_pandas(df, schema)
pq.write_table(table, fp)

fp.seek(0)
return fp.read()
Loading

0 comments on commit 40e62fb

Please sign in to comment.