From a422ac4fb283d11bc8932fc030a0e44720e12580 Mon Sep 17 00:00:00 2001 From: Juntao Hu Date: Mon, 8 Aug 2022 23:23:44 +0800 Subject: [PATCH 1/2] [FLINK-28876][python][format/orc] Support writing RowData into Orc files --- .../docs/connectors/datastream/filesystem.md | 30 ++++ .../docs/connectors/datastream/filesystem.md | 30 ++++ flink-python/pom.xml | 8 + flink-python/pyflink/datastream/__init__.py | 3 + .../pyflink/datastream/formats/orc.py | 104 ++++++++++++ .../datastream/formats/tests/test_orc.py | 156 ++++++++++++++++++ 6 files changed, 331 insertions(+) create mode 100644 flink-python/pyflink/datastream/formats/orc.py create mode 100644 flink-python/pyflink/datastream/formats/tests/test_orc.py diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md index f5468862a5347..5691383469cb1 100644 --- a/docs/content.zh/docs/connectors/datastream/filesystem.md +++ b/docs/content.zh/docs/connectors/datastream/filesystem.md @@ -811,6 +811,36 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) { {{< /tab >}} {{< /tabs >}} +PyFlink 用户可以使用 `OrcBulkWriters.for_row_data_vectorization` 来创建将 `Row` 数据写入 Orc 文件的 `BulkWriterFactory` 。 +注意如果 sink 的前置算子的输出类型为 `RowData` ,例如 CSV source ,则需要先转换为 `Row` 类型。 + +{{< py_download_link "orc" >}} + +```python +row_type = DataTypes.ROW([ + DataTypes.FIELD('name', DataTypes.STRING()), + DataTypes.FIELD('age', DataTypes.INT()), +]) +row_type_info = Types.ROW_NAMED( + ['name', 'age'], + [Types.STRING(), Types.INT()] +) + +sink = FileSink.for_bulk_format( + OUTPUT_DIR, + OrcBulkWriters.for_row_data_vectorization( + row_type=row_type, + writer_properties=Configuration(), + hadoop_config=Configuration(), + ) +).build() + +# 如果 ds 是产生 RowData 的数据源,可以使用一个 map 函数来指定其对应的 Row 类型。 +ds.map(lambda e: e, output_type=row_type_info).sink_to(sink) +# 否则 +ds.sink_to(sink) +``` + ##### Hadoop SequenceFile Format diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md index f5a4517aa95df..f68f270f3f705 100644 --- a/docs/content/docs/connectors/datastream/filesystem.md +++ b/docs/content/docs/connectors/datastream/filesystem.md @@ -816,6 +816,36 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) { {{< /tab >}} {{< /tabs >}} +For PyFlink users, `OrcBulkWriters.for_row_data_vectorization` could be used to create `BulkWriterFactory` to write `Row` records to files in Orc format. +It should be noted that if the preceding operator of sink is an operator producing `RowData` records, e.g. CSV source, it needs to be converted to `Row` records before writing to sink. + +{{< py_download_link "orc" >}} + +```python +row_type = DataTypes.ROW([ + DataTypes.FIELD('name', DataTypes.STRING()), + DataTypes.FIELD('age', DataTypes.INT()), +]) +row_type_info = Types.ROW_NAMED( + ['name', 'age'], + [Types.STRING(), Types.INT()] +) + +sink = FileSink.for_bulk_format( + OUTPUT_DIR, + OrcBulkWriters.for_row_data_vectorization( + row_type=row_type, + writer_properties=Configuration(), + hadoop_config=Configuration(), + ) +).build() + +# If ds is a source stream producing RowData records, a map could be added to help converting RowData records into Row records. 
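+# Note: the identity map below does not change any values; it only re-declares the stream's type
+# information as Row, which is what the factory created by for_row_data_vectorization expects.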
+ds.map(lambda e: e, output_type=row_type_info).sink_to(sink) +# Else +ds.sink_to(sink) +``` + ##### Hadoop SequenceFile format To use the `SequenceFile` bulk encoder in your application you need to add the following dependency: diff --git a/flink-python/pom.xml b/flink-python/pom.xml index f1a61a9eff6a5..5922e98a7e4a3 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -247,6 +247,14 @@ under the License. test + + + org.apache.flink + flink-sql-orc + ${project.version} + test + + org.apache.flink diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py index 4bcf0050446cd..f4d3df35f59b6 100644 --- a/flink-python/pyflink/datastream/__init__.py +++ b/flink-python/pyflink/datastream/__init__.py @@ -234,6 +234,9 @@ - :class:`formats.parquet.AvroParquetWriters`: Convenience builder to create ParquetWriterFactory instances for Avro types. Only GenericRecord is supported in PyFlink. + - :class:`formats.orc.OrcBulkWriters`: + Convenient builder to create a :class:`BulkWriterFactory` that writes Row records with a + defined :class:`RowType` into Orc files. Other important classes: diff --git a/flink-python/pyflink/datastream/formats/orc.py b/flink-python/pyflink/datastream/formats/orc.py new file mode 100644 index 0000000000000..b4794b14c5ce4 --- /dev/null +++ b/flink-python/pyflink/datastream/formats/orc.py @@ -0,0 +1,104 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from typing import Optional + +from pyflink.common import Configuration +from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory +from pyflink.datastream.utils import create_hadoop_configuration +from pyflink.java_gateway import get_gateway +from pyflink.table.types import _to_java_data_type, RowType +from pyflink.util.java_utils import to_jarray + + +class OrcBulkWriters(object): + """ + Convenient builder to create a :class:`BulkWriterFactory` that writes Row records with a defined + :class:`RowType` into Orc files in a batch fashion. + + .. versionadded:: 1.16.0 + """ + + @staticmethod + def for_row_data_vectorization(row_type: RowType, + writer_properties: Optional[Configuration] = None, + hadoop_config: Optional[Configuration] = None) \ + -> BulkWriterFactory: + """ + Create a :class:`RowDataBulkWriterFactory` that writes Row records with a defined + :class:`RowType` into Orc files in a batch fashion. + + Example: + :: + + >>> row_type = DataTypes.ROW([ + ... DataTypes.FIELD('string', DataTypes.STRING()), + ... DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT())) + ... ]) + >>> row_type_info = Types.ROW_NAMED( + ... 
['string', 'int_array'], + ... [Types.STRING(), Types.LIST(Types.INT())] + ... ) + >>> sink = FileSink.for_bulk_format( + ... OUTPUT_DIR, OrcBulkWriters.for_row_data_vectorization( + ... row_type=row_type, + ... writer_properties=Configuration(), + ... hadoop_config=Configuration(), + ... ) + ... ).build() + >>> ds.map(lambda e: e, output_type=row_type_info).sink_to(sink) + + Note that in the above example, an identity map to indicate its :class:`RowTypeInfo` is + necessary before ``sink_to`` when ``ds`` is a source stream producing **RowData** records, + because :class:`RowDataBulkWriterFactory` assumes the input record type is :class:`Row`. + """ + if not isinstance(row_type, RowType): + raise TypeError('row_type must be an instance of RowType') + + j_data_type = _to_java_data_type(row_type) + jvm = get_gateway().jvm + j_row_type = j_data_type.getLogicalType() + orc_types = to_jarray( + jvm.org.apache.flink.table.types.logical.LogicalType, + [i for i in j_row_type.getChildren()] + ) + type_description = jvm.org.apache.flink.orc \ + .OrcSplitReaderUtil.logicalTypeToOrcType(j_row_type) + if writer_properties is None: + writer_properties = Configuration() + if hadoop_config is None: + hadoop_config = Configuration() + + return RowDataBulkWriterFactory( + jvm.org.apache.flink.orc.writer.OrcBulkWriterFactory( + jvm.org.apache.flink.orc.vector.RowDataVectorizer( + type_description.toString(), + orc_types + ), + OrcBulkWriters._create_properties(writer_properties), + create_hadoop_configuration(hadoop_config) + ), + row_type + ) + + @staticmethod + def _create_properties(conf: Configuration): + jvm = get_gateway().jvm + properties = jvm.java.util.Properties() + for k, v in conf.to_dict().items(): + properties.put(k, v) + return properties diff --git a/flink-python/pyflink/datastream/formats/tests/test_orc.py b/flink-python/pyflink/datastream/formats/tests/test_orc.py new file mode 100644 index 0000000000000..24ced7e8fa4ce --- /dev/null +++ b/flink-python/pyflink/datastream/formats/tests/test_orc.py @@ -0,0 +1,156 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +import glob +import os +import tempfile +import unittest +from datetime import date, datetime +from decimal import Decimal +from typing import List, Optional, Tuple + +import pandas as pd + +from pyflink.common import Row +from pyflink.common.typeinfo import RowTypeInfo, Types +from pyflink.datastream import DataStream +from pyflink.datastream.connectors.file_system import FileSink +from pyflink.datastream.formats.orc import OrcBulkWriters +from pyflink.datastream.formats.tests.test_parquet import _create_parquet_array_row_and_data, \ + _check_parquet_array_results, _create_parquet_map_row_and_data, _check_parquet_map_results +from pyflink.java_gateway import get_gateway +from pyflink.table.types import RowType, DataTypes +from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase, to_java_data_structure + + +@unittest.skipIf(os.environ.get('HADOOP_CLASSPATH') is None, + 'Some Hadoop lib is needed for Orc format tests') +class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase): + + def setUp(self): + super().setUp() + self.env.set_parallelism(1) + self.orc_dir_name = tempfile.mkdtemp(dir=self.tempdir) + + def test_orc_basic_write(self): + row_type, row_type_info, data = _create_orc_basic_row_and_data() + self._build_orc_job(row_type, row_type_info, data) + self.env.execute('test_orc_basic_write') + results = self._read_orc_file() + _check_orc_basic_results(self, results) + + def test_orc_array_write(self): + ( + row_type, + row_type_info, + conversion_row_type_info, + data, + ) = _create_parquet_array_row_and_data() + self._build_orc_job(row_type, row_type_info, data, conversion_row_type_info) + self.env.execute() + results = self._read_orc_file() + _check_parquet_array_results(self, results) + + def test_orc_map_write(self): + row_type, row_type_info, data = _create_parquet_map_row_and_data() + self._build_orc_job(row_type, row_type_info, data) + self.env.execute() + results = self._read_orc_file() + _check_parquet_map_results(self, results) + + def _build_orc_job( + self, + row_type: RowType, + row_type_info: RowTypeInfo, + data: List[Row], + conversion_type_info: Optional[RowTypeInfo] = None, + ): + jvm = get_gateway().jvm + sink = FileSink.for_bulk_format( + self.orc_dir_name, OrcBulkWriters.for_row_data_vectorization(row_type) + ).build() + j_list = jvm.java.util.ArrayList() + for d in data: + j_list.add(to_java_data_structure(d)) + ds = DataStream(self.env._j_stream_execution_environment.fromCollection( + j_list, + row_type_info.get_java_type_info() + )) + if conversion_type_info: + ds = ds.map(lambda e: e, output_type=conversion_type_info) + ds.sink_to(sink) + + def _read_orc_file(self): + records = [] + for file in glob.glob(os.path.join(os.path.join(self.orc_dir_name, '**/*'))): + df = pd.read_orc(file) + for i in range(df.shape[0]): + records.append(df.loc[i]) + return records + + +def _create_orc_basic_row_and_data() -> Tuple[RowType, RowTypeInfo, List[Row]]: + jvm = get_gateway().jvm + row_type = DataTypes.ROW([ + DataTypes.FIELD('char', DataTypes.CHAR(10)), + DataTypes.FIELD('varchar', DataTypes.VARCHAR(10)), + DataTypes.FIELD('bytes', DataTypes.BYTES()), + DataTypes.FIELD('boolean', DataTypes.BOOLEAN()), + DataTypes.FIELD('decimal', DataTypes.DECIMAL(2, 0)), + DataTypes.FIELD('int', DataTypes.INT()), + DataTypes.FIELD('bigint', DataTypes.BIGINT()), + DataTypes.FIELD('double', DataTypes.DOUBLE()), + DataTypes.FIELD('date', DataTypes.DATE()), + DataTypes.FIELD('timestamp', 
DataTypes.TIMESTAMP(3)), + ]) + row_type_info = Types.ROW_NAMED( + ['char', 'varchar', 'bytes', 'boolean', 'decimal', 'int', 'bigint', 'double', + 'date', 'timestamp'], + [Types.STRING(), Types.STRING(), Types.PRIMITIVE_ARRAY(Types.BYTE()), Types.BOOLEAN(), + Types.BIG_DEC(), Types.INT(), Types.LONG(), Types.DOUBLE(), + Types.JAVA(jvm.java.time.LocalTime), Types.JAVA(jvm.java.time.LocalDateTime)] + ) + data = [Row( + char='char', + varchar='varchar', + bytes=b'varbinary', + boolean=True, + decimal=Decimal(1.5), + int=2147483647, + bigint=-9223372036854775808, + double=2e-308, + date=date(1970, 1, 1), + timestamp=datetime(1970, 1, 2, 3, 4, 5, 600000), + )] + return row_type, row_type_info, data + + +def _check_orc_basic_results(test, results): + row = results[0] + test.assertEqual(row['char'], b'char ') + test.assertEqual(row['varchar'], 'varchar') + test.assertEqual(row['bytes'], b'varbinary') + test.assertEqual(row['boolean'], True) + test.assertAlmostEqual(row['decimal'], 2) + test.assertEqual(row['int'], 2147483647) + test.assertEqual(row['bigint'], -9223372036854775808) + test.assertAlmostEqual(row['double'], 2e-308, delta=1e-311) + test.assertEqual(row['date'], date(1970, 1, 1)) + test.assertEqual( + row['timestamp'].to_pydatetime(), + datetime(1970, 1, 2, 3, 4, 5, 600000), + ) From 42a3f3481af225dc023485d22c546af2779f5cb9 Mon Sep 17 00:00:00 2001 From: Juntao Hu Date: Tue, 9 Aug 2022 19:31:14 +0800 Subject: [PATCH 2/2] move function to utils --- .../pyflink/datastream/formats/orc.py | 26 +++++++------------ flink-python/pyflink/datastream/utils.py | 8 ++++++ 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/flink-python/pyflink/datastream/formats/orc.py b/flink-python/pyflink/datastream/formats/orc.py index b4794b14c5ce4..1203f82bd3822 100644 --- a/flink-python/pyflink/datastream/formats/orc.py +++ b/flink-python/pyflink/datastream/formats/orc.py @@ -19,7 +19,7 @@ from pyflink.common import Configuration from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory -from pyflink.datastream.utils import create_hadoop_configuration +from pyflink.datastream.utils import create_hadoop_configuration, create_java_properties from pyflink.java_gateway import get_gateway from pyflink.table.types import _to_java_data_type, RowType from pyflink.util.java_utils import to_jarray @@ -27,8 +27,8 @@ class OrcBulkWriters(object): """ - Convenient builder to create a :class:`BulkWriterFactory` that writes Row records with a defined - :class:`RowType` into Orc files in a batch fashion. + Convenient builder to create a :class:`~connectors.file_system.BulkWriterFactory` that writes + Row records with a defined RowType into Orc files in a batch fashion. .. versionadded:: 1.16.0 """ @@ -39,8 +39,8 @@ def for_row_data_vectorization(row_type: RowType, hadoop_config: Optional[Configuration] = None) \ -> BulkWriterFactory: """ - Create a :class:`RowDataBulkWriterFactory` that writes Row records with a defined - :class:`RowType` into Orc files in a batch fashion. + Create a RowDataBulkWriterFactory that writes Row records with a defined RowType into Orc + files in a batch fashion. Example: :: @@ -62,9 +62,9 @@ def for_row_data_vectorization(row_type: RowType, ... 
).build() >>> ds.map(lambda e: e, output_type=row_type_info).sink_to(sink) - Note that in the above example, an identity map to indicate its :class:`RowTypeInfo` is - necessary before ``sink_to`` when ``ds`` is a source stream producing **RowData** records, - because :class:`RowDataBulkWriterFactory` assumes the input record type is :class:`Row`. + Note that in the above example, an identity map to indicate its RowTypeInfo is necessary + before ``sink_to`` when ``ds`` is a source stream producing **RowData** records, + because RowDataBulkWriterFactory assumes the input record type is Row. """ if not isinstance(row_type, RowType): raise TypeError('row_type must be an instance of RowType') @@ -89,16 +89,8 @@ def for_row_data_vectorization(row_type: RowType, type_description.toString(), orc_types ), - OrcBulkWriters._create_properties(writer_properties), + create_java_properties(writer_properties), create_hadoop_configuration(hadoop_config) ), row_type ) - - @staticmethod - def _create_properties(conf: Configuration): - jvm = get_gateway().jvm - properties = jvm.java.util.Properties() - for k, v in conf.to_dict().items(): - properties.put(k, v) - return properties diff --git a/flink-python/pyflink/datastream/utils.py b/flink-python/pyflink/datastream/utils.py index f1bdb18617718..efbdd1b5da1ec 100644 --- a/flink-python/pyflink/datastream/utils.py +++ b/flink-python/pyflink/datastream/utils.py @@ -51,6 +51,14 @@ def create_hadoop_configuration(config: Configuration): return hadoop_config +def create_java_properties(config: Configuration): + jvm = get_gateway().jvm + properties = jvm.java.util.Properties() + for k, v in config.to_dict().items(): + properties.put(k, v) + return properties + + def convert_to_python_obj(data, type_info): if type_info == Types.PICKLED_BYTE_ARRAY(): return pickle.loads(data)
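A minimal end-to-end sketch of the API introduced by these patches, useful for trying the change locally. The sample records, output path, and the `orc.compress` writer property are illustrative assumptions and are not part of the patch:

```python
from pyflink.common import Configuration
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink
from pyflink.datastream.formats.orc import OrcBulkWriters
from pyflink.table.types import DataTypes

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# Logical row type handed to the Orc vectorizer, plus the matching TypeInformation
# used when converting the Python records to Java Rows.
row_type = DataTypes.ROW([
    DataTypes.FIELD('name', DataTypes.STRING()),
    DataTypes.FIELD('age', DataTypes.INT()),
])
row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])

# Writer properties are passed through to the Orc writer; 'orc.compress' is an assumed example key.
writer_properties = Configuration()
writer_properties.set_string('orc.compress', 'SNAPPY')

sink = FileSink.for_bulk_format(
    '/tmp/orc-output',  # illustrative output path
    OrcBulkWriters.for_row_data_vectorization(
        row_type=row_type,
        writer_properties=writer_properties,
        hadoop_config=Configuration(),
    )
).build()

# These elements already carry Row type information, so no extra identity map is needed here.
ds = env.from_collection([('Alice', 30), ('Bob', 25)], type_info=row_type_info)
ds.sink_to(sink)

env.execute('orc_bulk_writer_example')
```

As with the documentation example, the Orc format jar and a Hadoop classpath have to be available at runtime (the tests above are skipped when `HADOOP_CLASSPATH` is unset); the writer properties and Hadoop configuration are forwarded to the underlying `OrcBulkWriterFactory`.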