From 15ddd14f9414701b32125012662893bac2ded0d0 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Mon, 14 Aug 2017 16:40:45 +0200 Subject: [PATCH] [hotifx][streaming] Simplify state of TwoPhaseCommitSinkFunction --- .../sink/TwoPhaseCommitSinkFunction.java | 31 +++++++------------ .../sink/TwoPhaseCommitSinkFunctionTest.java | 3 +- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index 85ddc5c40b0b3..58532f5672b7b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -58,7 +57,7 @@ public abstract class TwoPhaseCommitSinkFunction private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); - protected final ListStateDescriptor>> pendingCommitTransactionsDescriptor; + protected final ListStateDescriptor> pendingCommitTransactionsDescriptor; protected final ListStateDescriptor pendingTransactionsDescriptor; protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>(); @@ -66,7 +65,7 @@ public abstract class TwoPhaseCommitSinkFunction @Nullable protected TXN currentTransaction; protected ListState pendingTransactionsState; - protected ListState>> pendingCommitTransactionsState; + protected ListState> pendingCommitTransactionsState; /** * Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this @@ -76,17 +75,17 @@ public abstract class TwoPhaseCommitSinkFunction * {@code * TwoPhaseCommitSinkFunction( * TypeInformation.of(TXN.class), - * TypeInformation.of(new TypeHint>() {})); + * TypeInformation.of(new TypeHint>() {})); * } * * @param txnTypeInformation {@link TypeInformation} for transaction POJO. - * @param checkpointToTxnTypeInformation {@link TypeInformation} for mapping between checkpointId and transaction. + * @param txnListTypeInformation {@link TypeInformation} for mapping between checkpointId and transaction. */ public TwoPhaseCommitSinkFunction( TypeInformation txnTypeInformation, - TypeInformation>> checkpointToTxnTypeInformation) { + TypeInformation> txnListTypeInformation) { this(new ListStateDescriptor<>("pendingTransactions", txnTypeInformation), - new ListStateDescriptor<>("pendingCommitTransactions", checkpointToTxnTypeInformation)); + new ListStateDescriptor<>("pendingCommitTransactions", txnListTypeInformation)); } /** @@ -97,7 +96,7 @@ public TwoPhaseCommitSinkFunction( */ public TwoPhaseCommitSinkFunction( ListStateDescriptor pendingTransactionsDescriptor, - ListStateDescriptor>> pendingCommitTransactionsDescriptor) { + ListStateDescriptor> pendingCommitTransactionsDescriptor) { this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null"); this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null"); } @@ -236,7 +235,7 @@ public final void snapshotState(FunctionSnapshotContext context) throws Exceptio LOG.debug("{} - started new transaction '{}'", name(), currentTransaction); pendingCommitTransactionsState.clear(); - pendingCommitTransactionsState.add(toTuple2List(pendingCommitTransactions)); + pendingCommitTransactionsState.add(new ArrayList<>(pendingCommitTransactions.values())); pendingTransactionsState.clear(); // in case of failure we might not be able to abort currentTransaction. Let's store it into the state @@ -266,10 +265,10 @@ public final void initializeState(FunctionInitializationContext context) throws if (context.isRestored()) { LOG.info("{} - restoring state", name()); - for (List> recoveredTransactions : pendingCommitTransactionsState.get()) { - for (Tuple2 recoveredTransaction : recoveredTransactions) { + for (List recoveredTransactions : pendingCommitTransactionsState.get()) { + for (TXN recoveredTransaction : recoveredTransactions) { // If this fails, there is actually a data loss - recoverAndCommit(recoveredTransaction.f1); + recoverAndCommit(recoveredTransaction); LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction); } } @@ -305,12 +304,4 @@ private String name() { getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); } - - private List> toTuple2List(LinkedHashMap transactions) { - List> result = new ArrayList<>(); - for (Map.Entry entry : transactions.entrySet()) { - result.add(Tuple2.of(entry.getKey(), entry.getValue())); - } - return result; - } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java index 7d3abc2c3ff91..e5bb630b27c4b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -131,7 +130,7 @@ private static class FileBasedSinkFunction extends TwoPhaseCommitSinkFunction>>() {})); + TypeInformation.of(new TypeHint>() {})); if (!tmpDirectory.isDirectory() || !targetDirectory.isDirectory()) { throw new IllegalArgumentException();