From 9b1e00e2cb934bd8981037c82153d5fbd2b254c9 Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Sun, 22 Mar 2015 13:44:27 +0100 Subject: [PATCH] [FLINK-1763] [streaming] Remove cancel from SinkFunction --- .../streaming/connectors/flume/FlumeSink.java | 7 --- .../connectors/kafka/api/KafkaSink.java | 6 +-- .../connectors/kafka/api/KafkaSource.java | 1 + .../api/simple/offset/BeginningOffset.java | 2 + .../api/simple/offset/CurrentOffset.java | 2 + .../kafka/api/simple/offset/GivenOffset.java | 1 + .../kafka/api/simple/offset/KafkaOffset.java | 2 + .../connectors/rabbitmq/RMQSink.java | 5 --- .../connectors/twitter/TwitterStreaming.java | 4 -- .../connectors/kafka/KafkaITCase.java | 7 ++- .../api/function/sink/FileSinkFunction.java | 9 ---- .../api/function/sink/PrintSinkFunction.java | 6 --- .../api/function/sink/SinkFunction.java | 7 --- .../sink/WriteSinkFunctionByMillis.java | 5 --- .../api/invokable/SinkInvokable.java | 7 --- .../api/streamvertex/StreamVertex.java | 43 ++++++++++++++----- .../flink/streaming/api/IterateTest.java | 5 --- .../api/collector/DirectedOutputTest.java | 6 +-- .../windowing/WindowIntegrationTest.java | 40 ----------------- .../api/streamvertex/StreamVertexTest.java | 10 +---- .../streaming/util/TestListResultSink.java | 6 --- .../streaming/api/scala/DataStream.scala | 1 - .../StreamCheckpointingITCase.java | 4 +- .../classloading/jar/StreamingProgram.java | 4 -- 24 files changed, 49 insertions(+), 141 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java index 86fd1b1b8ea16..8a2f2b8f45ea7 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java @@ -133,13 +133,6 @@ public void close() { client.client.close(); } - @Override - public void cancel() { - if (client != null) { - client.client.close(); - } - } - @Override public void open(Configuration config) { client = new FlinkRpcClientFacade(); diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java index 17535610a670c..2bd0fca4f9899 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java @@ -65,6 +65,7 @@ public class KafkaSink extends RichSinkFunction { * @param serializationSchema * User defined serialization schema. */ + @SuppressWarnings({ "rawtypes", "unchecked" }) public KafkaSink(String zookeeperAddress, String topicId, SerializationSchema serializationSchema) { this(zookeeperAddress, topicId, serializationSchema, (Class) null); @@ -161,9 +162,4 @@ public void close() { } } - @Override - public void cancel() { - close(); - } - } diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java index 4eff8708fa468..d3a62e8526bdb 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java @@ -46,6 +46,7 @@ public class KafkaSource extends ConnectorSource { private static final long serialVersionUID = 1L; + @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); private final String zookeeperAddress; diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java index f7096ad80b24b..15e7b36bfac3c 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java @@ -22,6 +22,8 @@ public class BeginningOffset extends KafkaOffset { + private static final long serialVersionUID = 1L; + @Override public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) { return getLastOffset(consumer, topic, partition, OffsetRequest.EarliestTime(), clientName); diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java index 3555ff9c83744..6119f3269c48f 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java @@ -22,6 +22,8 @@ public class CurrentOffset extends KafkaOffset { + private static final long serialVersionUID = 1L; + @Override public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) { return getLastOffset(consumer, topic, partition, OffsetRequest.LatestTime(), clientName); diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java index 1282125aff6e2..fef6325924aa4 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java @@ -21,6 +21,7 @@ public class GivenOffset extends KafkaOffset { + private static final long serialVersionUID = 1L; private final long offset; public GivenOffset(long offset) { diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java index c048ba103b381..4dfd314dc485b 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java @@ -28,6 +28,8 @@ public abstract class KafkaOffset implements Serializable { + private static final long serialVersionUID = 1L; + public abstract long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName); diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java index dae9c6d4de432..38c4f5fd423f7 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java @@ -108,9 +108,4 @@ public void close() { closeChannel(); } - @Override - public void cancel() { - close(); - } - } diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java index 9be27eb29853c..a32fe1bed124b 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java @@ -47,10 +47,6 @@ public void invoke(Tuple5 tuple) { System.out.println(""); } - @Override - public void cancel() { - } - } public static class SelectDataFlatMap extends diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 134525d130d83..7b1abb45779b9 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -154,14 +154,11 @@ public void invoke(String value) throws Exception { throw new SuccessException(); } } - - @Override - public void cancel() { - } }); // add producing topology DataStream stream = env.addSource(new SourceFunction() { + private static final long serialVersionUID = 1L; boolean running = true; @Override @@ -247,6 +244,8 @@ public void sleep(long ms) { public static class SuccessException extends Exception { + private static final long serialVersionUID = 1L; + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java index 5468494df5e11..24beba1a20024 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java @@ -115,13 +115,4 @@ protected void flush() { */ protected abstract void resetParameters(); - @Override - public void cancel() { - try { - close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java index 0fa37acefccec..9ff8a7f5d9977 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java @@ -93,10 +93,4 @@ public void close() { public String toString() { return "Print to " + (target == STD_OUT ? "System.out" : "System.err"); } - - @Override - public void cancel() { - close(); - } - } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java index ffa5a67c38dc2..d4ce24e6bc680 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java @@ -35,11 +35,4 @@ public interface SinkFunction extends Function, Serializable { * @throws Exception */ public void invoke(IN value) throws Exception; - - /** - * In case another vertex in topology fails this method is called before terminating - * the sink. Make sure to free up any allocated resources here. - */ - public void cancel(); - } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java index 53030f4a0b223..ee6df9495099e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java @@ -47,9 +47,4 @@ protected void resetParameters() { lastTime = System.currentTimeMillis(); } - @Override - public void cancel() { - // No cleanup needed - } - } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java index 01a295a956a8a..29d2ed27eeab7 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java @@ -40,11 +40,4 @@ public void invoke() throws Exception { protected void callUserFunction() throws Exception { sinkFunction.invoke(nextObject); } - - @Override - public void cancel() { - super.cancel(); - sinkFunction.cancel(); - } - } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index dd8a463e9f626..ae2ebdd619c06 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -29,9 +29,9 @@ import org.apache.flink.runtime.jobmanager.BarrierAck; import org.apache.flink.runtime.jobmanager.StateBarrierAck; import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.streaming.api.StreamConfig; import org.apache.flink.streaming.api.invokable.ChainableInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable; @@ -152,32 +152,36 @@ public StreamingRuntimeContext createRuntimeContext(String taskName, @Override public void invoke() throws Exception { + boolean operatorOpen = false; + if (LOG.isDebugEnabled()) { LOG.debug("Task {} invoked with instance id {}", getName(), getInstanceID()); } try { userInvokable.setRuntimeContext(context); - userInvokable.open(getTaskConfiguration()); - for (ChainableInvokable invokable : outputHandler.chainedInvokables) { - invokable.setRuntimeContext(context); - invokable.open(getTaskConfiguration()); - } + operatorOpen = true; + openOperator(); userInvokable.invoke(); - userInvokable.close(); - - for (ChainableInvokable invokable : outputHandler.chainedInvokables) { - invokable.close(); - } + closeOperator(); + operatorOpen = false; if (LOG.isDebugEnabled()) { LOG.debug("Task {} invoke finished instance id {}", getName(), getInstanceID()); } } catch (Exception e) { + + if (operatorOpen) { + try { + closeOperator(); + } catch (Throwable t) { + } + } + if (LOG.isErrorEnabled()) { LOG.error("StreamInvokable failed due to: {}", StringUtils.stringifyException(e)); } @@ -190,6 +194,23 @@ public void invoke() throws Exception { } + protected void openOperator() throws Exception { + userInvokable.open(getTaskConfiguration()); + + for (ChainableInvokable invokable : outputHandler.chainedInvokables) { + invokable.setRuntimeContext(context); + invokable.open(getTaskConfiguration()); + } + } + + protected void closeOperator() throws Exception { + userInvokable.close(); + + for (ChainableInvokable invokable : outputHandler.chainedInvokables) { + invokable.close(); + } + } + protected void clearBuffers() throws IOException { if (outputHandler != null) { outputHandler.clearWriters(); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java index 92d23aad6ad71..3f0c48a0b6876 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java @@ -68,11 +68,6 @@ public static final class MySink implements SinkFunction { @Override public void invoke(Boolean tuple) { } - - @Override - public void cancel() { - } - } @Test diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java index 62872fd15da6d..bc7fe73ce8b49 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java @@ -88,9 +88,6 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, this.list = outputs.get(name); } - @Override - public void cancel() { - } } private static Map> outputs = new HashMap>(); @@ -115,6 +112,7 @@ public void outputSelectorTest() throws Exception { assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult()); assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), evenAndOddSink.getSortedResult()); - assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), allSink.getSortedResult()); + assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), + allSink.getSortedResult()); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java index 34986c810b11c..f0b5500cbb947 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java @@ -286,10 +286,6 @@ public void invoke(StreamWindow value) throws Exception { windows.add(value); } - @Override - public void cancel() { - } - } @SuppressWarnings("serial") @@ -303,10 +299,6 @@ public void invoke(StreamWindow value) throws Exception { windows.add(value); } - @Override - public void cancel() { - } - } @SuppressWarnings("serial") @@ -320,10 +312,6 @@ public void invoke(StreamWindow value) throws Exception { windows.add(value); } - @Override - public void cancel() { - } - } @SuppressWarnings("serial") @@ -337,10 +325,6 @@ public void invoke(StreamWindow value) throws Exception { windows.add(value); } - @Override - public void cancel() { - } - } @SuppressWarnings("serial") @@ -354,10 +338,6 @@ public void invoke(StreamWindow value) throws Exception { windows.add(value); } - @Override - public void cancel() { - } - } @SuppressWarnings("serial") @@ -371,10 +351,6 @@ public void invoke(StreamWindow value) throws Exception { windows.add(value); } - @Override - public void cancel() { - } - } @SuppressWarnings("serial") @@ -388,10 +364,6 @@ public void invoke(StreamWindow value) throws Exception { windows.add(value); } - @Override - public void cancel() { - } - } @SuppressWarnings("serial") @@ -405,10 +377,6 @@ public void invoke(StreamWindow value) throws Exception { windows.add(value); } - @Override - public void cancel() { - } - } @SuppressWarnings("serial") @@ -422,10 +390,6 @@ public void invoke(StreamWindow value) throws Exception { windows.add(value); } - @Override - public void cancel() { - } - } @SuppressWarnings("serial") @@ -439,10 +403,6 @@ public void invoke(StreamWindow value) throws Exception { windows.add(value); } - @Override - public void cancel() { - } - } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java index a88a60d5193ae..0c3ff83084f7b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java @@ -83,9 +83,6 @@ public void invoke(Tuple2 tuple) { data.put(k, v); } - @Override - public void cancel() { - } } @SuppressWarnings("unused") @@ -134,13 +131,13 @@ private static class CoMap implements CoMapFunction { @Override public String map1(String value) { -// System.out.println(value); + // System.out.println(value); return value; } @Override public String map2(Long value) { -// System.out.println(value); + // System.out.println(value); return value.toString(); } } @@ -154,9 +151,6 @@ public void invoke(String value) { result.add(value); } - @Override - public void cancel() { - } } @Test diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java index 8b78a42d7ab37..87f290f08497c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.util; - import java.util.ArrayList; import java.util.List; import java.util.TreeSet; @@ -72,9 +71,4 @@ public List getSortedResult() { return sortedList; } } - - @Override - public void cancel() { - - } } diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 3dc54d62645da..9eee49e25a3fe 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -626,7 +626,6 @@ class DataStream[T](javaStream: JavaStream[T]) { val sinkFunction = new SinkFunction[T] { val cleanFun = clean(fun) def invoke(in: T) = cleanFun(in) - def cancel() = {} } this.addSink(sinkFunction) } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index 3accb117475d5..12c6b41eb55fa 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -178,9 +178,7 @@ public void close() { } } - @Override - public void cancel() {} - }); + }); env.execute(); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java index 18b52c5b418bb..9a244a4164509 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java @@ -98,9 +98,5 @@ public static class NoOpSink implements SinkFunction{ @Override public void invoke(Word value) throws Exception { } - - @Override - public void cancel() { - } } }