
Flink: change sink shuffle to use RowData as data type and statistics key type #7494

Merged
merged 2 commits on May 15, 2023

Conversation

stevenzwu
Contributor

Change sink shuffle to use RowData as the data type and statistics key type, because FlinkSink normalizes the data type to RowData before it reaches the writer. Also added custom type serializers for MapDataStatistics and DataStatisticsOrRecord.

@github-actions github-actions bot added the flink label May 1, 2023
… key type, because FlinkSink normalizes the data type to RowData before it reaches the writer. Also added custom type serializers for MapDataStatistics and DataStatisticsOrRecord.
@stevenzwu
Contributor Author

This addresses some of the comments from issue #7393.

@huyuanfeng2018 @hililiwei @yegangy0718 can you help review?

@yegangy0718 will follow up with a separate PR on the JMH benchmark.

Contributor Author

Factory is replaced by TypeSerializer

@@ -28,10 +29,10 @@
* (sketching) can be used.
*/
@Internal
interface DataStatistics<K> {
interface DataStatistics<D extends DataStatistics, S> {
Contributor Author

Used a generics trick for stronger type checking. It shouldn't matter to users, since all of these are internal classes.
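For illustration, a minimal sketch of the recursive-generics pattern (method names follow the snippets quoted in this thread, but this is not the exact PR code):

```java
import org.apache.flink.table.data.RowData;

// D is the concrete statistics implementation, S is the underlying statistics type.
// Because merge(D) takes the concrete type, merging two different statistics
// implementations fails at compile time instead of at runtime.
interface DataStatistics<D extends DataStatistics<D, S>, S> {
  boolean isEmpty();

  void add(RowData key);

  void merge(D otherStatistics);

  S dataStatistics();
}
```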


@Internal
class MapDataStatisticsSerializer
extends TypeSerializer<DataStatistics<MapDataStatistics, Map<RowData, Long>>> {
Contributor Author

use the base interface DataStatistics so that it can be used by DataStatisticsOrRecordSerializer.

DataStatisticsOrRecordSerializer(
      TypeSerializer<DataStatistics<D, S>> statisticsSerializer,
      TypeSerializer<RowData> recordSerializer)

assertTrue(mapDataStatistics.dataStatistics().containsKey("b"));
assertEquals(2L, (long) mapDataStatistics.dataStatistics().get("a"));
assertEquals(1L, (long) mapDataStatistics.dataStatistics().get("b"));
try (OneInputStreamOperatorTestHarness<
Contributor Author

Need to wrap the block in try-with-resources with the test harness. Otherwise, Flink doesn't know how to serialize the output type DataStatisticsOrRecord<>. The test harness has the proper setup(...) call for the output type serializer.
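A rough sketch of that pattern (the operator and the output serializer are assumed to be built elsewhere in the test):

```java
// Sketch: without harness.setup(outputSerializer), Flink cannot serialize the
// DataStatisticsOrRecord<> output emitted by the operator under test.
try (OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<RowData, Long>>>
    harness = new OneInputStreamOperatorTestHarness<>(operator)) {
  harness.setup(outputSerializer);  // registers the output type serializer
  harness.open();
  harness.processElement(new StreamRecord<>(GenericRowData.of(StringData.fromString("a"))));
  // results can then be read back via harness.extractOutputValues()
}
```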

@@ -42,12 +43,19 @@
*
* @param key generate from data by applying key selector
*/
void add(K key);
void add(RowData key);
Contributor Author

we will use RowDataProjection to extract the key. PR #7493 is related.
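As a hedged sketch of that key extraction (the exact RowDataProjection signatures are assumptions here; PR #7493 shows the real approach):

```java
// Hedged sketch, not the PR code: project the key columns out of the full row and
// use the projected RowData as the statistics key. RowDataProjection comes from the
// Iceberg Flink module; the create(...)/wrap(...) signatures are assumed.
class KeyExtractor {
  private final RowDataProjection keyProjection;

  KeyExtractor(Schema rowSchema, Schema keySchema) {
    // keySchema would typically be rowSchema.select(<key columns>)
    this.keyProjection = RowDataProjection.create(rowSchema, keySchema);
  }

  RowData extractKey(RowData row) {
    return keyProjection.wrap(row);
  }
}
```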

@@ -42,12 +43,19 @@
*
* @param key generate from data by applying key selector
*/
void add(K key);
void add(RowData key);
Contributor

Is it necessary to add a method here to collect statistics with values? For example, data under different keys may have different sizes. Should that also be considered when balancing downstream traffic, e.g. add(RowData key, V v), where v could represent the record bytes of the row for the current key? What do you think?

Contributor Author

We are only counting records per key. Getting the bytes would require serialization or some other estimation trick. Agreed that bytes would be best, but record count is probably also good enough.
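A rough sketch of that per-key record counting (method names follow the snippets quoted in this thread; not the exact PR code):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.data.RowData;

// Rough sketch: MapDataStatistics only tracks a record count per key, not bytes.
class MapDataStatistics implements DataStatistics<MapDataStatistics, Map<RowData, Long>> {
  private final Map<RowData, Long> statistics = new HashMap<>();

  @Override
  public boolean isEmpty() {
    return statistics.isEmpty();
  }

  @Override
  public void add(RowData key) {
    // one more record observed for this key
    statistics.merge(key, 1L, Long::sum);
  }

  @Override
  public void merge(MapDataStatistics otherStatistics) {
    otherStatistics.dataStatistics()
        .forEach((key, count) -> statistics.merge(key, count, Long::sum));
  }

  @Override
  public Map<RowData, Long> dataStatistics() {
    return statistics;
  }
}
```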

@@ -40,50 +40,49 @@
* shuffle record to improve data clustering while maintaining relative balanced traffic
* distribution to downstream subtasks.
*/
class DataStatisticsOperator<T, K> extends AbstractStreamOperator<DataStatisticsOrRecord<T, K>>
implements OneInputStreamOperator<T, DataStatisticsOrRecord<T, K>>, OperatorEventHandler {
class DataStatisticsOperator<D extends DataStatistics<D, S>, S>
Contributor

OK, I think this DataStatisticsOperator is good for collecting statistics. It seems we also need an operator to determine the partition ID, such as a PartitionIdAssignerOperator, which would then feed org.apache.flink.api.java.functions.IdPartitioner for custom data distribution. I haven't seen the design for this piece in the design document; can you briefly introduce the follow-up implementation?

Contributor Author

Yes, this is only for collecting statistics to guide the partitioner decision. We will implement a custom Flink range partitioner that splits the values into ranges (one for each writer subtask) based on the statistics.

Contributor

According to my understanding, each writer will be responsible for one or more partitions, and we will distribute the data arriving from upstream to the corresponding writers, right?

Contributor Author

@hililiwei that is correct. The custom range partitioner for the Flink DataStream will distribute the data to writer subtasks with good clustering, based on the data statistics.
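For illustration only, a hedged sketch of what such a range partitioner could look like (the real implementation may differ; rangeBounds here is a precomputed sorted list of key upper bounds, one per downstream writer subtask):

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.table.data.RowData;

// Hedged sketch of a statistics-driven range partitioner; rangeBounds would be
// derived from the per-key counts in MapDataStatistics so that each writer subtask
// receives a roughly equal number of records.
class SketchRangePartitioner implements Partitioner<RowData> {
  private final List<RowData> rangeBounds;      // sorted upper bounds, one per subtask
  private final Comparator<RowData> comparator; // sort-key comparator for RowData keys

  SketchRangePartitioner(List<RowData> rangeBounds, Comparator<RowData> comparator) {
    this.rangeBounds = rangeBounds;
    this.comparator = comparator;
  }

  @Override
  public int partition(RowData key, int numPartitions) {
    int pos = Collections.binarySearch(rangeBounds, key, comparator);
    int subtask = pos >= 0 ? pos : -pos - 1;  // index of first bound >= key
    return Math.min(subtask, numPartitions - 1);
  }
}
```

It would be wired in with DataStream#partitionCustom(partitioner, keySelector) in front of the writer.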

@pvary
Contributor

pvary commented May 8, 2023

@stevenzwu: I like that with a correct reader we can make sure that the IcebergSource could read whatever data structure we want. It would be good to have the possibility for the same thing for the FlinkSink (or IcebergSink later). Wouldn't this PR close this possibility by forcing the Sink to use RowData?

@stevenzwu
Contributor Author

> @stevenzwu: I like that with a correct reader we can make sure that the IcebergSource could read whatever data structure we want. It would be good to have the possibility for the same thing for the FlinkSink (or IcebergSink later). Wouldn't this PR close this possibility by forcing the Sink to use RowData?

@pvary FlinkSink can still support other input types (like Avro GenericRecord). The behavior (internally) is that all input types are converted to Flink RowData, which is the only record format supported by the underlying file writer in Iceberg.

That is the reason @huyuanfeng2018 was asking whether RowData is the only key type we need to support. If, in the future, the Flink sink supports other record formats for the underlying file writer, maybe we can revisit this decision. For now, RowData is the only type the Flink file writer supports.
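For context, a conceptual sketch (not the FlinkSink internals) of how a non-RowData stream ends up as RowData before writing; genericRecordStream and tableLoader are assumed to exist in the surrounding job:

```java
// Conceptual sketch: any input type is mapped to RowData before the Iceberg file
// writer sees it, so converting up front is equivalent to what the sink does internally.
DataStream<RowData> rowDataStream =
    genericRecordStream.map(
        new MapFunction<GenericRecord, RowData>() {
          @Override
          public RowData map(GenericRecord record) {
            // illustrative field mapping; real jobs would map the full schema
            return GenericRowData.of(
                StringData.fromString(String.valueOf(record.get("name"))),
                (Long) record.get("count"));
          }
        });

FlinkSink.forRowData(rowDataStream)
    .tableLoader(tableLoader)
    .append();
```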


@Override
public int getLength() {
return -1;
Contributor

We only care about the amount of data, not the size of data, right?

Contributor Author

@hililiwei not sure if I understand the question here. can you elaborate?

here is the javadoc for this method from the TypeSerializer interface.

Returns:
The length of the data type, or -1 for variable length data types.

@pvary
Contributor

pvary commented May 9, 2023

> @pvary FlinkSink can still support other input types (like Avro GenericRecord). The behavior (internally) is that all input types are converted to Flink RowData, which is the only record format supported by the underlying file writer in Iceberg.
>
> That is the reason @huyuanfeng2018 was asking whether RowData is the only key type we need to support. If, in the future, the Flink sink supports other record formats for the underlying file writer, maybe we can revisit this decision. For now, RowData is the only type the Flink file writer supports.

Thanks @stevenzwu for the answer!

@stevenzwu stevenzwu requested review from hililiwei and yegangy0718 and removed request for hililiwei and yegangy0718 May 12, 2023 15:53
Contributor

@yegangy0718 yegangy0718 left a comment

One more question, how do we plan to register the DataStatisticsOrRecordSerializer? Do we plan to define TypeInfoFactory or use env config to register it?

@stevenzwu
Contributor Author

stevenzwu commented May 12, 2023

> One naive question, how do we plan to register the DataStatisticsOrRecordSerializer? Do we plan to define TypeInfoFactory or use env config to register it?

@yegangy0718 good question. We probably also need to implement a TypeInformation class when adding the DataStatisticsOperator; that way, Flink would know which type serializer to use. The TypeInformation implementation can be followed up as a separate PR.

Here is the DataStream API.

    /**
     * Method for passing user defined operators along with the type information that will transform
     * the DataStream.
     *
     * @param operatorName name of the operator, for logging purposes
     * @param outTypeInfo the output type of the operator
     * @param operator the object containing the transformation logic
     * @param <R> type of the return stream
     * @return the data stream constructed
     * @see #transform(String, TypeInformation, OneInputStreamOperatorFactory)
     */
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperator<T, R> operator) {

        return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
    }
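A rough sketch of what such a TypeInformation could look like (hypothetical class name; the follow-up PR may structure this differently):

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;

// Hypothetical sketch: a TypeInformation whose main job is to hand Flink the
// custom DataStatisticsOrRecordSerializer for the shuffle operator's output type.
class DataStatisticsOrRecordTypeInformation<D extends DataStatistics<D, S>, S>
    extends TypeInformation<DataStatisticsOrRecord<D, S>> {

  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
  private final TypeSerializer<RowData> recordSerializer;

  DataStatisticsOrRecordTypeInformation(
      TypeSerializer<DataStatistics<D, S>> statisticsSerializer,
      TypeSerializer<RowData> recordSerializer) {
    this.statisticsSerializer = statisticsSerializer;
    this.recordSerializer = recordSerializer;
  }

  @Override public boolean isBasicType() { return false; }
  @Override public boolean isTupleType() { return false; }
  @Override public int getArity() { return 1; }
  @Override public int getTotalFields() { return 1; }
  @Override public boolean isKeyType() { return false; }

  @Override
  @SuppressWarnings("unchecked")
  public Class<DataStatisticsOrRecord<D, S>> getTypeClass() {
    return (Class<DataStatisticsOrRecord<D, S>>) (Class<?>) DataStatisticsOrRecord.class;
  }

  @Override
  public TypeSerializer<DataStatisticsOrRecord<D, S>> createSerializer(ExecutionConfig config) {
    // the hook that makes Flink use the custom serializer for this operator's output
    return new DataStatisticsOrRecordSerializer<>(statisticsSerializer, recordSerializer);
  }

  @Override public String toString() { return "DataStatisticsOrRecord"; }
  @Override public boolean equals(Object obj) { return obj instanceof DataStatisticsOrRecordTypeInformation; }
  @Override public int hashCode() { return getClass().hashCode(); }
  @Override public boolean canEqual(Object obj) { return obj instanceof DataStatisticsOrRecordTypeInformation; }
}
```

It would then be passed as the outTypeInfo argument to transform(...) above, so Flink picks up the custom serializer.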

@stevenzwu stevenzwu merged commit 203b8db into apache:master May 15, 2023
13 checks passed
stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request May 17, 2023
stevenzwu added a commit that referenced this pull request May 18, 2023