Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,17 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa

CheckpointCommittableManagerImpl(
int subtaskId, int numberOfSubtasks, @Nullable Long checkpointId) {
this.subtaskId = subtaskId;
this.numberOfSubtasks = numberOfSubtasks;
this.checkpointId = checkpointId;
this.subtasksCommittableManagers = new HashMap<>();
this(new HashMap<>(), subtaskId, numberOfSubtasks, checkpointId);
}

CheckpointCommittableManagerImpl(
Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers,
int subtaskId,
int numberOfSubtasks,
@Nullable Long checkpointId) {
this.subtasksCommittableManagers = checkNotNull(subtasksCommittableManagers);
this.subtaskId = 0;
this.numberOfSubtasks = 1;
this.subtaskId = subtaskId;
this.numberOfSubtasks = numberOfSubtasks;
this.checkpointId = checkpointId;
}

Expand Down Expand Up @@ -158,6 +157,8 @@ CheckpointCommittableManagerImpl<CommT> copy() {
return new CheckpointCommittableManagerImpl<>(
subtasksCommittableManagers.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, (e) -> e.getValue().copy())),
subtaskId,
numberOfSubtasks,
checkpointId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ public CheckpointCommittableManagerImpl<CommT> deserialize(int version, byte[] s
.collect(
Collectors.toMap(
SubtaskCommittableManager::getSubtaskId, e -> e)),
subtaskId,
numberOfSubtasks,
checkpointId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public CommittableSummaryAssert isEqualTo(CommittableSummary<?> summary) {
isNotNull();
assertThat(actual.getSubtaskId()).isEqualTo(summary.getSubtaskId());
assertThat(actual.getCheckpointId()).isEqualTo(summary.getCheckpointId());
assertThat(actual.getNumberOfSubtasks()).isEqualTo(summary.getNumberOfSubtasks());
assertThat(actual.getNumberOfCommittables()).isEqualTo(summary.getNumberOfCommittables());
assertThat(actual.getNumberOfPendingCommittables())
.isEqualTo(summary.getNumberOfPendingCommittables());
Expand All @@ -44,6 +45,18 @@ public CommittableSummaryAssert isEqualTo(CommittableSummary<?> summary) {
return this;
}

public CommittableSummaryAssert hasSubtaskId(int subtaskId) {
isNotNull();
assertThat(actual.getSubtaskId()).isEqualTo(subtaskId);
return this;
}

public CommittableSummaryAssert hasNumberOfSubtasks(int numberOfSubtasks) {
isNotNull();
assertThat(actual.getNumberOfSubtasks()).isEqualTo(numberOfSubtasks);
return this;
}

public CommittableSummaryAssert hasOverallCommittables(int committableNumber) {
isNotNull();
assertThat(actual.getNumberOfCommittables()).isEqualTo(committableNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,27 @@ void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception {

@Test
void testStateRestore() throws Exception {

final int originalSubtaskId = 0;
final int subtaskIdAfterRecovery = 9;

final OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
testHarness = createTestHarness(new TestSink.RetryOnceCommitter(), false, true);
testHarness =
createTestHarness(
new TestSink.RetryOnceCommitter(),
false,
true,
1,
1,
originalSubtaskId);
testHarness.open();

final CommittableSummary<String> committableSummary =
new CommittableSummary<>(1, 1, 0L, 1, 1, 0);
new CommittableSummary<>(originalSubtaskId, 1, 0L, 1, 1, 0);
testHarness.processElement(new StreamRecord<>(committableSummary));
final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 0L, 1);
final CommittableWithLineage<String> first =
new CommittableWithLineage<>("1", 0L, originalSubtaskId);
testHarness.processElement(new StreamRecord<>(first));

final OperatorSubtaskState snapshot = testHarness.snapshot(0L, 2L);
Expand All @@ -239,9 +251,14 @@ void testStateRestore() throws Exception {
testHarness.close();

final ForwardingCommitter committer = new ForwardingCommitter();

// create new testHarness but with different parallelism level and subtaskId that original
// one.
// we will make sure that new subtaskId was used during committable recovery.
final OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
restored = createTestHarness(committer, false, true);
restored =
createTestHarness(committer, false, true, 10, 10, subtaskIdAfterRecovery);

restored.initializeState(snapshot);
restored.open();
Expand All @@ -256,7 +273,9 @@ void testStateRestore() throws Exception {
.hasPendingCommittables(0);

SinkV2Assertions.assertThat(toCommittableWithLinage(output.get(1)))
.isEqualTo(new CommittableWithLineage<>(first.getCommittable(), 1L, 0));
.isEqualTo(
new CommittableWithLineage<>(
first.getCommittable(), 1L, subtaskIdAfterRecovery));
restored.close();
}

Expand Down Expand Up @@ -344,6 +363,33 @@ CommittableWithLineage<?> copyCommittableWithDifferentOrigin(
isCheckpointingEnabled));
}

private OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
createTestHarness(
Committer<String> committer,
boolean isBatchMode,
boolean isCheckpointingEnabled,
int maxParallelism,
int parallelism,
int subtaskId)
throws Exception {
return new OneInputStreamOperatorTestHarness<>(
new CommitterOperatorFactory<>(
(TwoPhaseCommittingSink<?, String>)
TestSink.newBuilder()
.setCommitter(committer)
.setDefaultGlobalCommitter()
.setCommittableSerializer(
TestSink.StringCommittableSerializer.INSTANCE)
.build()
.asV2(),
isBatchMode,
isCheckpointingEnabled),
maxParallelism,
parallelism,
subtaskId);
}

private static class ForwardingCommitter extends TestSink.DefaultCommitter {
private int successfulCommits = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -93,6 +96,26 @@ void testUpdateCommittableSummary() {
.hasMessageContaining("FLINK-25920");
}

// check different values of subtaskId and numberOfSubtasks to make sure that no value is
// hardcoded.
@ParameterizedTest(name = "subtaskId = {0}, numberOfSubtasks = {1}, checkpointId = {2}")
@CsvSource({"1, 10, 100", "2, 20, 200", "3, 30, 300"})
public void testCopy(int subtaskId, int numberOfSubtasks, long checkpointId) {

final CheckpointCommittableManagerImpl<Integer> original =
new CheckpointCommittableManagerImpl<>(subtaskId, numberOfSubtasks, checkpointId);
original.upsertSummary(
new CommittableSummary<>(subtaskId, numberOfSubtasks, checkpointId, 1, 0, 0));

CheckpointCommittableManagerImpl<Integer> copy = original.copy();

assertThat(copy.getCheckpointId()).isEqualTo(checkpointId);
SinkV2Assertions.assertThat(copy.getSummary())
.hasNumberOfSubtasks(numberOfSubtasks)
.hasSubtaskId(subtaskId)
.hasCheckpointId(checkpointId);
}

private static class NoOpCommitter implements Committer<Integer> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.IntegerSerializer;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;

import org.junit.jupiter.api.Test;

Expand All @@ -35,6 +36,7 @@
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertAll;

class CommittableCollectorSerializerTest {

Expand Down Expand Up @@ -72,38 +74,88 @@ void testCommittableCollectorV1SerDe() throws IOException {

@Test
void testCommittableCollectorV2SerDe() throws IOException {
final CommittableCollector<Integer> committableCollector = new CommittableCollector<>(2, 3);
committableCollector.addMessage(new CommittableSummary<>(2, 3, 1L, 1, 1, 0));
committableCollector.addMessage(new CommittableSummary<>(2, 3, 2L, 1, 1, 0));
committableCollector.addMessage(new CommittableWithLineage<>(1, 1L, 2));
committableCollector.addMessage(new CommittableWithLineage<>(2, 2L, 2));
int subtaskId = 2;
int numberOfSubtasks = 3;

final CommittableCollectorSerializer<Integer> ccSerializer =
new CommittableCollectorSerializer<>(
COMMITTABLE_SERIALIZER, subtaskId, numberOfSubtasks);

final CommittableCollector<Integer> committableCollector =
new CommittableCollector<>(subtaskId, numberOfSubtasks);
committableCollector.addMessage(
new CommittableSummary<>(subtaskId, numberOfSubtasks, 1L, 1, 1, 0));
committableCollector.addMessage(
new CommittableSummary<>(subtaskId, numberOfSubtasks, 2L, 1, 1, 0));
committableCollector.addMessage(new CommittableWithLineage<>(1, 1L, subtaskId));
committableCollector.addMessage(new CommittableWithLineage<>(2, 2L, subtaskId));

final CommittableCollector<Integer> copy =
SERIALIZER.deserialize(2, SERIALIZER.serialize(committableCollector));
ccSerializer.deserialize(2, SERIALIZER.serialize(committableCollector));

// Expect the subtask Id equal to the origin of the collector
assertThat(copy.getSubtaskId()).isEqualTo(1);
assertThat(copy.getSubtaskId()).isEqualTo(subtaskId);
assertThat(copy.isFinished()).isFalse();
assertThat(copy.getNumberOfSubtasks()).isEqualTo(1);
final Collection<CheckpointCommittableManagerImpl<Integer>> checkpointCommittables =
committableCollector.getCheckpointCommittables();
assertThat(checkpointCommittables).hasSize(2);
final Iterator<CheckpointCommittableManagerImpl<Integer>> committablesIterator =
checkpointCommittables.iterator();
final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint1 =
committablesIterator.next().getSubtaskCommittableManager(2);
assertThat(
subtaskCommittableManagerCheckpoint1
.getPendingRequests()
.map(CommitRequestImpl::getCommittable)
.collect(Collectors.toList()))
.containsExactly(1);
final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint2 =
committablesIterator.next().getSubtaskCommittableManager(2);
assertThat(
subtaskCommittableManagerCheckpoint2
.getPendingRequests()
.map(CommitRequestImpl::getCommittable)
.collect(Collectors.toList()))
.containsExactly(2);
assertThat(copy.getNumberOfSubtasks()).isEqualTo(numberOfSubtasks);

// assert original CommittableCollector
assertCommittableCollector(
"Original CommittableCollector", subtaskId, numberOfSubtasks, committableCollector);

// assert deserialized CommittableCollector
assertCommittableCollector(
"Deserialized CommittableCollector", subtaskId, numberOfSubtasks, copy);
}

private void assertCommittableCollector(
String assertMessageHeading,
int subtaskId,
int numberOfSubtasks,
CommittableCollector<Integer> committableCollector) {

assertAll(
assertMessageHeading,
() -> {
final Collection<CheckpointCommittableManagerImpl<Integer>>
checkpointCommittables =
committableCollector.getCheckpointCommittables();
assertThat(checkpointCommittables).hasSize(2);

final Iterator<CheckpointCommittableManagerImpl<Integer>> committablesIterator =
checkpointCommittables.iterator();
final CheckpointCommittableManagerImpl<Integer> checkpointCommittableManager1 =
committablesIterator.next();
final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint1 =
checkpointCommittableManager1.getSubtaskCommittableManager(subtaskId);

SinkV2Assertions.assertThat(checkpointCommittableManager1.getSummary())
.hasSubtaskId(subtaskId)
.hasNumberOfSubtasks(numberOfSubtasks);
assertThat(
subtaskCommittableManagerCheckpoint1
.getPendingRequests()
.map(CommitRequestImpl::getCommittable)
.collect(Collectors.toList()))
.containsExactly(1);
assertThat(subtaskCommittableManagerCheckpoint1.getSubtaskId())
.isEqualTo(subtaskId);

final CheckpointCommittableManagerImpl<Integer> checkpointCommittableManager2 =
committablesIterator.next();
final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint2 =
checkpointCommittableManager2.getSubtaskCommittableManager(subtaskId);

SinkV2Assertions.assertThat(checkpointCommittableManager2.getSummary())
.hasSubtaskId(subtaskId)
.hasNumberOfSubtasks(numberOfSubtasks);
assertThat(
subtaskCommittableManagerCheckpoint2
.getPendingRequests()
.map(CommitRequestImpl::getCommittable)
.collect(Collectors.toList()))
.containsExactly(2);
assertThat(subtaskCommittableManagerCheckpoint2.getSubtaskId())
.isEqualTo(subtaskId);
});
}
}