Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
049f5f7
Flink: Implement data statistics coordinator to aggregate data statis…
Apr 13, 2023
f0f84d0
fix CICD error
Apr 17, 2023
e550396
handle comments to add javadoc for all classes and rename AggregateDa…
Apr 19, 2023
c0800be
rename lastCompleteAggregator to lastCompletedAggregator
Apr 19, 2023
8f9ec54
log msg minor change
Apr 19, 2023
61b21e2
log message minor change
Apr 19, 2023
9a3f598
update EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE from 0.8 to 80 an…
Apr 21, 2023
c6fe1b4
move executor close function to context
Apr 23, 2023
3b441e7
remove coordinator context
Apr 30, 2023
ed0cb09
remove unused comments
May 3, 2023
9e6a672
use RowData as the type to collect data statistics
May 17, 2023
dfc0a2f
fix style in coordinator
May 17, 2023
7a068b9
fix unit test to only send bytes in DataStatisticsEvent
May 21, 2023
fc36623
fix style
May 23, 2023
333bd8f
fix CoordinatorProvider unit test
May 24, 2023
ad0203e
Remove final and update function receiveDataStatisticEventAndCheckCom…
Jun 16, 2023
86ca09c
Create DataStatisticsUtil to serialize and deserialize statistics. Ad…
Jul 24, 2023
de9f629
handle comments to update log message and unit tests
Aug 15, 2023
9ce20c5
remove subtask set in GlobalStatistics and use GlobalStatisticsTracke…
Aug 17, 2023
e5b6dc6
fix style to use Set.newHashSet to create set
Aug 23, 2023
baf838e
update comments log messages
Aug 28, 2023
6107ac3
rename GlobalStatistics GlobalStatisticsTracker to AggregatedStatisti…
Aug 28, 2023
7bdb2b0
fix style
Aug 28, 2023
02f3df1
update log message and comments
Sep 22, 2023
aab164f
delete GlobalStatisticsTracker
Sep 22, 2023
8f6cc7a
use assertj in unit test
Sep 23, 2023
bfc9e05
handle comments to update log message
Sep 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.shuffle;

import java.io.Serializable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* AggregatedStatistics is used by {@link DataStatisticsCoordinator} to collect {@link
* DataStatistics} from {@link DataStatisticsOperator} subtasks for specific checkpoint. It stores
* the merged {@link DataStatistics} result from all reported subtasks.
*/
class AggregatedStatistics<D extends DataStatistics<D, S>, S> implements Serializable {

private final long checkpointId;
private final DataStatistics<D, S> dataStatistics;

AggregatedStatistics(long checkpoint, TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
this.checkpointId = checkpoint;
this.dataStatistics = statisticsSerializer.createInstance();
}

AggregatedStatistics(long checkpoint, DataStatistics<D, S> dataStatistics) {
this.checkpointId = checkpoint;
this.dataStatistics = dataStatistics;
}

long checkpointId() {
return checkpointId;
}

DataStatistics<D, S> dataStatistics() {
return dataStatistics;
}

void mergeDataStatistic(String operatorName, long eventCheckpointId, D eventDataStatistics) {
Preconditions.checkArgument(
checkpointId == eventCheckpointId,
"Received unexpected event from operator %s checkpoint %s. Expected checkpoint %s",
operatorName,
eventCheckpointId,
checkpointId);
dataStatistics.merge(eventDataStatistics);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("checkpointId", checkpointId)
.add("dataStatistics", dataStatistics)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Set;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* AggregatedStatisticsTracker is used by {@link DataStatisticsCoordinator} to track the in progress
* {@link AggregatedStatistics} received from {@link DataStatisticsOperator} subtasks for specific
* checkpoint.
*/
class AggregatedStatisticsTracker<D extends DataStatistics<D, S>, S> {
private static final Logger LOG = LoggerFactory.getLogger(AggregatedStatisticsTracker.class);
private static final double ACCEPT_PARTIAL_AGGR_THRESHOLD = 90;
private final String operatorName;
private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
private final int parallelism;
private final Set<Integer> inProgressSubtaskSet;
private volatile AggregatedStatistics<D, S> inProgressStatistics;

AggregatedStatisticsTracker(
String operatorName,
TypeSerializer<DataStatistics<D, S>> statisticsSerializer,
int parallelism) {
this.operatorName = operatorName;
this.statisticsSerializer = statisticsSerializer;
this.parallelism = parallelism;
this.inProgressSubtaskSet = Sets.newHashSet();
}

AggregatedStatistics<D, S> updateAndCheckCompletion(
int subtask, DataStatisticsEvent<D, S> event) {
long checkpointId = event.checkpointId();

if (inProgressStatistics != null && inProgressStatistics.checkpointId() > checkpointId) {
LOG.info(
"Expect data statistics for operator {} checkpoint {}, but receive event from older checkpoint {}. Ignore it.",
operatorName,
inProgressStatistics.checkpointId(),
checkpointId);
return null;
}

AggregatedStatistics<D, S> completedStatistics = null;
if (inProgressStatistics != null && inProgressStatistics.checkpointId() < checkpointId) {
if ((double) inProgressSubtaskSet.size() / parallelism * 100
>= ACCEPT_PARTIAL_AGGR_THRESHOLD) {
completedStatistics = inProgressStatistics;
LOG.info(
"Received data statistics from {} subtasks out of total {} for operator {} at checkpoint {}. "
+ "Complete data statistics aggregation at checkpoint {} as it is more than the threshold of {} percentage",
inProgressSubtaskSet.size(),
parallelism,
operatorName,
checkpointId,
inProgressStatistics.checkpointId(),
ACCEPT_PARTIAL_AGGR_THRESHOLD);
} else {
LOG.info(
"Received data statistics from {} subtasks out of total {} for operator {} at checkpoint {}. "
+ "Aborting the incomplete aggregation for checkpoint {}",
inProgressSubtaskSet.size(),
parallelism,
operatorName,
checkpointId,
inProgressStatistics.checkpointId());
}

inProgressStatistics = null;
inProgressSubtaskSet.clear();
}

if (inProgressStatistics == null) {
LOG.info("Starting a new data statistics for checkpoint {}", checkpointId);
inProgressStatistics = new AggregatedStatistics<>(checkpointId, statisticsSerializer);
inProgressSubtaskSet.clear();
}

if (!inProgressSubtaskSet.add(subtask)) {
LOG.debug(
"Ignore duplicated data statistics from operator {} subtask {} for checkpoint {}.",
operatorName,
subtask,
checkpointId);
} else {
inProgressStatistics.mergeDataStatistic(
operatorName,
event.checkpointId(),
DataStatisticsUtil.deserializeDataStatistics(
event.statisticsBytes(), statisticsSerializer));
}

if (inProgressSubtaskSet.size() == parallelism) {
completedStatistics = inProgressStatistics;
LOG.info(
"Received data statistics from all {} operators {} for checkpoint {}. Return last completed aggregator {}.",
parallelism,
operatorName,
inProgressStatistics.checkpointId(),
completedStatistics.dataStatistics());
inProgressStatistics = new AggregatedStatistics<>(checkpointId + 1, statisticsSerializer);
inProgressSubtaskSet.clear();
}

return completedStatistics;
}

@VisibleForTesting
AggregatedStatistics<D, S> inProgressStatistics() {
return inProgressStatistics;
}
}
Loading