From 83db1db1c93b45943a788a8bd61f023b389d4af2 Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Wed, 25 Mar 2015 00:14:21 +0100 Subject: [PATCH] [FLINK-1775] BarrierBuffer fix to avoid end of stream deadlock Closes #534 --- .../jobgraph/tasks/BarrierTransceiver.java | 2 +- .../minicluster/LocalFlinkMiniCluster.scala | 11 +++- .../runtime/taskmanager/TaskManager.scala | 3 +- .../api/streamvertex/StreamVertex.java | 34 +++++++----- .../flink/streaming/io/BarrierBuffer.java | 52 ++++++++++++++++--- .../flink/streaming/io/SpillReader.java | 31 ++++------- .../streaming/io/SpillingBufferOrEvent.java | 43 +++------------ .../streaming/io/StreamRecordWriter.java | 4 +- .../io/StreamingAbstractRecordReader.java | 4 ++ .../TumblingGroupedPreReducerTest.java | 12 +++-- .../flink/streaming/io/BarrierBufferTest.java | 9 ++++ .../io/SpillingBufferOrEventTest.java | 12 +++-- .../StreamCheckpointingITCase.java | 4 +- 13 files changed, 126 insertions(+), 95 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java index a867b57cbba2b..1dd6a90e6bf5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java @@ -32,7 +32,7 @@ public interface BarrierTransceiver { * A callback for notifying an operator of a new checkpoint barrier. * @param barrierID */ - public void broadcastBarrier(long barrierID); + public void broadcastBarrierFromSource(long barrierID); /** * A callback for confirming that a barrier checkpoint is complete diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 37e41e2e8504b..fa98e2d7081b7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util.EnvironmentInformation import org.slf4j.LoggerFactory +import akka.actor.ExtendedActorSystem /** * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same @@ -117,7 +118,15 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: def getJobClientActorSystem: ActorSystem = jobClientActorSystem def getJobManagerRPCPort: Int = { - configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1) + if (jobManagerActorSystem.isInstanceOf[ExtendedActorSystem]) { + val extActor = jobManagerActorSystem.asInstanceOf[ExtendedActorSystem] + extActor.provider.getDefaultAddress.port match { + case p: Some[Int] => p.get + case _ => -1 + } + } else { + -1 + } } override def shutdown(): Unit = { diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 067ffe8dc27b9..1db4e3e62e475 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -361,7 +361,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, i.getEnvironment.getInvokable match { case barrierTransceiver: BarrierTransceiver => new Thread(new Runnable { - override def run(): Unit = barrierTransceiver.broadcastBarrier(checkpointID); + override def run(): Unit = + barrierTransceiver.broadcastBarrierFromSource(checkpointID); }).start() case _ => log.error("[FT-TaskManager] Received a barrier for the wrong vertex") } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index ae2ebdd619c06..2641bc15551b5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -60,6 +60,7 @@ public class StreamVertex extends AbstractInvokable implements StreamTa private InputHandler inputHandler; protected OutputHandler outputHandler; private StreamInvokable userInvokable; + protected volatile boolean isRunning = false; private StreamingRuntimeContext context; private Map> states; @@ -95,12 +96,14 @@ protected void initialize() { } @Override - public void broadcastBarrier(long id) { - // Only called at input vertices - if (LOG.isDebugEnabled()) { - LOG.debug("Received barrier from jobmanager: " + id); + public void broadcastBarrierFromSource(long id) { + if (this.isRunning) { + // Only called at input vertices + if (LOG.isDebugEnabled()) { + LOG.debug("Received barrier from jobmanager: " + id); + } + actOnBarrier(id); } - actOnBarrier(id); } /** @@ -151,6 +154,7 @@ public StreamingRuntimeContext createRuntimeContext(String taskName, @Override public void invoke() throws Exception { + this.isRunning = true; boolean operatorOpen = false; @@ -190,6 +194,7 @@ public void invoke() throws Exception { // Cleanup outputHandler.flushOutputs(); clearBuffers(); + this.isRunning = false; } } @@ -283,16 +288,17 @@ public EventListener getSuperstepListener() { * * @param id */ - private void actOnBarrier(long id) { - try { - outputHandler.broadcastBarrier(id); - // TODO checkpoint state here - confirmBarrier(id); - if (LOG.isDebugEnabled()) { - LOG.debug("Superstep " + id + " processed: " + StreamVertex.this); + private synchronized void actOnBarrier(long id) { + if (this.isRunning) { + try { + outputHandler.broadcastBarrier(id); + confirmBarrier(id); + if (LOG.isDebugEnabled()) { + LOG.debug("Superstep " + id + " processed: " + StreamVertex.this); + } + } catch (Exception e) { + throw new RuntimeException(e); } - } catch (Exception e) { - throw new RuntimeException("Error while confirming barrier", e); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java index f349ac501ecd9..6c198a7bdd861 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java @@ -24,6 +24,7 @@ import java.util.Queue; import java.util.Set; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.reader.AbstractReader; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; @@ -60,6 +61,10 @@ public class BarrierBuffer { private SpillReader spillReader; private BufferSpiller bufferSpiller; + private boolean inputFinished = false; + + private BufferOrEvent endOfStreamEvent = null; + public BarrierBuffer(InputGate inputGate, AbstractReader reader) { this.inputGate = inputGate; totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); @@ -136,17 +141,36 @@ public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedExceptio if (bufferOrEvent != null) { return bufferOrEvent; + } else if (blockedNonprocessed.isEmpty() && inputFinished) { + return endOfStreamEvent; } else { // If no non-processed, get new from input while (true) { - // We read the next buffer from the inputgate - bufferOrEvent = inputGate.getNextBufferOrEvent(); - if (isBlocked(bufferOrEvent.getChannelIndex())) { - // If channel blocked we just store it - blockedNonprocessed.add(new SpillingBufferOrEvent(bufferOrEvent, bufferSpiller, - spillReader)); + if (!inputFinished) { + // We read the next buffer from the inputgate + bufferOrEvent = inputGate.getNextBufferOrEvent(); + + if (!bufferOrEvent.isBuffer() + && bufferOrEvent.getEvent() instanceof EndOfPartitionEvent) { + if (inputGate.isFinished()) { + // store the event for later if the channel is + // closed + endOfStreamEvent = bufferOrEvent; + inputFinished = true; + } + + } else { + if (isBlocked(bufferOrEvent.getChannelIndex())) { + // If channel blocked we just store it + blockedNonprocessed.add(new SpillingBufferOrEvent(bufferOrEvent, + bufferSpiller, spillReader)); + } else { + return bufferOrEvent; + } + } } else { - return bufferOrEvent; + actOnAllBlocked(); + return getNextNonBlocked(); } } } @@ -208,7 +232,11 @@ protected void actOnAllBlocked() { if (LOG.isDebugEnabled()) { LOG.debug("Publishing barrier to the vertex"); } - reader.publish(currentSuperstep); + + if (currentSuperstep != null) { + reader.publish(currentSuperstep); + } + releaseBlocks(); } @@ -240,4 +268,12 @@ public void cleanup() throws IOException { } } + public String toString() { + return nonprocessed.toString() + blockedNonprocessed.toString(); + } + + public boolean isEmpty() { + return nonprocessed.isEmpty() && blockedNonprocessed.isEmpty(); + } + } \ No newline at end of file diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java index 3cb83d4551016..ce16b8ca15325 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java @@ -24,7 +24,6 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; public class SpillReader { @@ -33,30 +32,18 @@ public class SpillReader { private File spillFile; /** - * Reads the next buffer from the spilled file. If a buffer pool was given, - * uses the buffer pool to request a new buffer to read into. - * + * Reads the next buffer from the spilled file. */ - public Buffer readNextBuffer(int bufferSize, BufferPool bufferPool) throws IOException { + public Buffer readNextBuffer(int bufferSize) throws IOException { try { - Buffer buffer = null; + Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]), + new BufferRecycler() { - // If available tries to request a new buffer from the pool - if (bufferPool != null) { - buffer = bufferPool.requestBuffer(); - } - - // If no bufferpool provided or the pool was empty create a new - // buffer - if (buffer == null) { - buffer = new Buffer(new MemorySegment(new byte[bufferSize]), new BufferRecycler() { - - @Override - public void recycle(MemorySegment memorySegment) { - memorySegment.free(); - } - }); - } + @Override + public void recycle(MemorySegment memorySegment) { + memorySegment.free(); + } + }); spillingChannel.read(buffer.getMemorySegment().wrap(0, bufferSize)); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java index 38cc0c0723f3d..ef3410ece2d9c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java @@ -19,9 +19,6 @@ import java.io.IOException; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferPool; -import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; public class SpillingBufferOrEvent { @@ -31,8 +28,6 @@ public class SpillingBufferOrEvent { private SpillReader spillReader; - private BufferPool bufferPool; - private int channelIndex; private int bufferSize; @@ -43,10 +38,11 @@ public SpillingBufferOrEvent(BufferOrEvent boe, BufferSpiller spiller, SpillRead this.channelIndex = boe.getChannelIndex(); this.spillReader = reader; - if (shouldSpill()) { + if (boe.isBuffer()) { + this.bufferSize = boe.getBuffer().getSize(); spiller.spill(boe.getBuffer()); - isSpilled = true; - boe = null; + this.boe = null; + this.isSpilled = true; } } @@ -56,39 +52,14 @@ public SpillingBufferOrEvent(BufferOrEvent boe, BufferSpiller spiller, SpillRead */ public BufferOrEvent getBufferOrEvent() throws IOException { if (isSpilled) { - return new BufferOrEvent(spillReader.readNextBuffer(bufferSize, bufferPool), - channelIndex); + boe = new BufferOrEvent(spillReader.readNextBuffer(bufferSize), channelIndex); + this.isSpilled = false; + return boe; } else { return boe; } } - /** - * Checks whether a given buffer should be spilled to disk. Currently it - * checks whether the buffer pool from which the buffer was supplied is - * empty and only spills if it is. This avoids out of memory exceptions and - * also blocks at the input gate. - */ - private boolean shouldSpill() throws IOException { - if (boe.isBuffer()) { - Buffer buffer = boe.getBuffer(); - this.bufferSize = buffer.getSize(); - BufferRecycler recycler = buffer.getRecycler(); - - if (recycler instanceof BufferPool) { - bufferPool = (BufferPool) recycler; - Buffer nextBuffer = bufferPool.requestBuffer(); - if (nextBuffer == null) { - return true; - } else { - nextBuffer.recycle(); - } - } - } - - return false; - } - public boolean isSpilled() { return isSpilled; } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java index be9adf4a2c66a..45595dfd31c91 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java @@ -64,10 +64,12 @@ public void close() { } private class OutputFlusher extends Thread { - private boolean running = true; + private volatile boolean running = true; + public void terminate() { running = false; } + @Override public void run() { while (running) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java index c5ffa62ebd314..8e939c633b5f5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java @@ -103,6 +103,10 @@ protected boolean getNextRecord(T target) throws IOException, InterruptedExcepti } else { if (handleEvent(event)) { if (inputGate.isFinished()) { + if (!barrierBuffer.isEmpty()) { + throw new RuntimeException( + "BarrierBuffer should be empty at this point"); + } isFinished = true; return false; } else if (hasReachedEndOfSuperstep()) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java index 55abc72762df6..ee2379f0efeaa 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java @@ -32,8 +32,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.windowing.StreamWindow; -import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer; -import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector; import org.apache.flink.streaming.util.keys.KeySelectorUtil; import org.junit.Test; @@ -97,6 +95,7 @@ public void testEmitWindow() throws Exception { } + @SuppressWarnings("unchecked") @Test public void testEmitWindow2() throws Exception { @@ -116,11 +115,16 @@ public void testEmitWindow2() throws Exception { wb.store(serializer.copy(inputs.get(1))); wb.emitWindow(collector); - System.out.println(collected); - + assertSetEquals(StreamWindow.fromElements(inputs.get(0), inputs.get(1)), collected.get(0)); + wb.store(serializer.copy(inputs.get(0))); wb.store(serializer.copy(inputs.get(1))); wb.store(serializer.copy(inputs.get(2))); + wb.emitWindow(collector); + + assertSetEquals(StreamWindow.fromElements(new Tuple2(2, 0), inputs.get(1)), collected.get(1)); + + } private static void assertSetEquals(Collection first, Collection second) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java index d8749023f44f3..62feb35ccfb5e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java @@ -26,6 +26,7 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.reader.AbstractReader; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; @@ -105,6 +106,10 @@ public void testMultiChannelBarrier() throws IOException, InterruptedException { input.add(createBuffer(1)); input.add(createSuperstep(2, 1)); input.add(createSuperstep(3, 1)); + input.add(createSuperstep(4, 0)); + input.add(createBuffer(0)); + input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1)); + InputGate mockIG1 = new MockInputGate(2, input); AbstractReader mockAR1 = new MockReader(mockIG1); @@ -131,6 +136,10 @@ public void testMultiChannelBarrier() throws IOException, InterruptedException { bb.processSuperstep(nextBoe); check(input.get(6), nextBoe = bb.getNextNonBlocked()); check(input.get(9), nextBoe = bb.getNextNonBlocked()); + check(input.get(13), nextBoe = bb.getNextNonBlocked()); + bb.processSuperstep(nextBoe); + check(input.get(14), nextBoe = bb.getNextNonBlocked()); + check(input.get(15), nextBoe = bb.getNextNonBlocked()); bb.cleanup(); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java index 2f28f90639361..30fc820369612 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.io; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -41,17 +40,17 @@ public void testSpilling() throws IOException, InterruptedException { Buffer b1 = pool1.requestBuffer(); b1.getMemorySegment().putInt(0, 10000); - BufferOrEvent boe1 = new BufferOrEvent(b1, 0); + BufferOrEvent boe1 = new BufferOrEvent(b1, 2); SpillingBufferOrEvent sboe1 = new SpillingBufferOrEvent(boe1, bsp, spr); - assertFalse(sboe1.isSpilled()); + assertTrue(sboe1.isSpilled()); Buffer b2 = pool2.requestBuffer(); b2.getMemorySegment().putInt(0, 10000); - BufferOrEvent boe2 = new BufferOrEvent(b2, 0); + BufferOrEvent boe2 = new BufferOrEvent(b2, 4); SpillingBufferOrEvent sboe2 = new SpillingBufferOrEvent(boe2, bsp, spr); - assertFalse(sboe2.isSpilled()); + assertTrue(sboe2.isSpilled()); Buffer b3 = pool1.requestBuffer(); b3.getMemorySegment().putInt(0, 50000); @@ -73,14 +72,17 @@ public void testSpilling() throws IOException, InterruptedException { Buffer b1ret = sboe1.getBufferOrEvent().getBuffer(); assertEquals(10000, b1ret.getMemorySegment().getInt(0)); + assertEquals(2, sboe1.getBufferOrEvent().getChannelIndex()); b1ret.recycle(); Buffer b2ret = sboe2.getBufferOrEvent().getBuffer(); assertEquals(10000, b2ret.getMemorySegment().getInt(0)); + assertEquals(4, sboe2.getBufferOrEvent().getChannelIndex()); b2ret.recycle(); Buffer b3ret = sboe3.getBufferOrEvent().getBuffer(); assertEquals(50000, b3ret.getMemorySegment().getInt(0)); + assertEquals(0, sboe3.getBufferOrEvent().getChannelIndex()); b3ret.recycle(); Buffer b4ret = sboe4.getBufferOrEvent().getBuffer(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index 12c6b41eb55fa..797555a2d91c6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -97,8 +97,8 @@ public void runCheckpointedProgram() { try { StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( "localhost", cluster.getJobManagerRPCPort()); - env.setDegreeOfParallelism(PARALLELISM); -// env.enableMonitoring(500); + env.setParallelism(PARALLELISM); + env.enableCheckpointing(200); DataStream stream = env.addSource(new RichParallelSourceFunction() {