From 3f3830dd0a26b0b051e9e59e6fdaa0bac3be66f8 Mon Sep 17 00:00:00 2001 From: mbalassi Date: Wed, 22 Apr 2015 18:05:16 +0200 Subject: [PATCH 1/2] [streaming] StreamConfig now uses InstantiationUtils for serialization --- .../streaming/api/graph/StreamConfig.java | 127 +++++++++++------- 1 file changed, 79 insertions(+), 48 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 24819900e5843..fdfec0032e392 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -17,14 +17,13 @@ package org.apache.flink.streaming.api.graph; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.SerializationException; -import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; import org.apache.flink.streaming.api.operators.StreamOperator; @@ -113,7 +112,7 @@ public StreamRecordSerializer getTypeSerializerIn1(ClassLoader cl) { return (StreamRecordSerializer) InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_IN_1, cl); } catch (Exception e) { - throw new RuntimeException("Could not instantiate serializer."); + throw new StreamTaskException("Could not instantiate serializer.", e); } } @@ -123,7 +122,7 @@ public StreamRecordSerializer getTypeSerializerIn2(ClassLoader cl) { return (StreamRecordSerializer) InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_IN_2, cl); } catch (Exception e) { - throw new RuntimeException("Could not instantiate serializer."); + throw new StreamTaskException("Could not instantiate serializer.", e); } } @@ -133,7 +132,7 @@ public StreamRecordSerializer getTypeSerializerOut1(ClassLoader cl) { return (StreamRecordSerializer) InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl); } catch (Exception e) { - throw new RuntimeException("Could not instantiate serializer."); + throw new StreamTaskException("Could not instantiate serializer.", e); } } @@ -143,12 +142,16 @@ public StreamRecordSerializer getTypeSerializerOut2(ClassLoader cl) { return (StreamRecordSerializer) InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_2, cl); } catch (Exception e) { - throw new RuntimeException("Could not instantiate serializer."); + throw new StreamTaskException("Could not instantiate serializer.", e); } } private void setTypeSerializer(String key, StreamRecordSerializer typeWrapper) { - config.setBytes(key, SerializationUtils.serialize(typeWrapper)); + try { + InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key); + } catch (IOException e) { + throw new StreamTaskException("Could not serialize type serializer.", e); + } } public void setBufferTimeout(long timeout) { @@ -164,10 +167,10 @@ public void setStreamOperator(StreamOperator operator) { config.setClass(USER_FUNCTION, operator.getClass()); try { - config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(operator)); - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize operator object " - + operator.getClass(), e); + InstantiationUtil.writeObjectToConfig(operator, this.config, SERIALIZEDUDF); + } catch (IOException e) { + throw new StreamTaskException("Cannot serialize operator object " + + operator.getClass() + ".", e); } } } @@ -177,15 +180,15 @@ public T getStreamOperator(ClassLoader cl) { try { return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl); } catch (Exception e) { - throw new StreamTaskException("Cannot instantiate user function", e); + throw new StreamTaskException("Cannot instantiate user function.", e); } } public void setOutputSelectorWrapper(OutputSelectorWrapper outputSelectorWrapper) { try { - config.setBytes(OUTPUT_SELECTOR_WRAPPER, SerializationUtils.serialize(outputSelectorWrapper)); - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize OutputSelectorWrapper"); + InstantiationUtil.writeObjectToConfig(outputSelectorWrapper, this.config, OUTPUT_SELECTOR_WRAPPER); + } catch (IOException e) { + throw new StreamTaskException("Cannot serialize OutputSelectorWrapper.", e); } } @@ -195,7 +198,7 @@ public OutputSelectorWrapper getOutputSelectorWrapper(ClassLoader cl) { return (OutputSelectorWrapper) InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, cl); } catch (Exception e) { - throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper", e); + throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper.", e); } } @@ -216,19 +219,26 @@ public long getIterationWaitTime() { } public void setSelectedNames(Integer output, List selected) { - if (selected != null) { - config.setBytes(OUTPUT_NAME + output, - SerializationUtils.serialize((Serializable) selected)); - } else { - config.setBytes(OUTPUT_NAME + output, - SerializationUtils.serialize(new ArrayList())); + if (selected == null) { + selected = new ArrayList(); + } + + try { + InstantiationUtil.writeObjectToConfig(selected, this.config, OUTPUT_NAME + output); + } catch (IOException e) { + throw new StreamTaskException("Cannot serialize OutputSelector for name \"" + output+ "\".", e); } } @SuppressWarnings("unchecked") - public List getSelectedNames(Integer output) { - return (List) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output, - null)); + public List getSelectedNames(Integer output, ClassLoader cl) { + List selectedNames; + try { + selectedNames = (List) InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_NAME + output, cl); + } catch (Exception e) { + throw new StreamTaskException("Cannot deserialize OutputSelector for name \"" + output + "\".", e); + } + return selectedNames == null ? new ArrayList() : selectedNames; } public void setNumberOfInputs(int numberOfInputs) { @@ -248,7 +258,11 @@ public int getNumberOfOutputs() { } public void setNonChainedOutputs(List outputvertexIDs) { - config.setBytes(NONCHAINED_OUTPUTS, SerializationUtils.serialize((Serializable) outputvertexIDs)); + try { + InstantiationUtil.writeObjectToConfig(outputvertexIDs, this.config, NONCHAINED_OUTPUTS); + } catch (IOException e) { + throw new StreamTaskException("Cannot serialize non chained outputs.", e); + } } @SuppressWarnings("unchecked") @@ -257,13 +271,16 @@ public List getNonChainedOutputs(ClassLoader cl) { List nonChainedOutputs = (List) InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl); return nonChainedOutputs == null ? new ArrayList() : nonChainedOutputs; } catch (Exception e) { - throw new RuntimeException("Could not instantiate outputs."); + throw new StreamTaskException("Could not instantiate non chained outputs.", e); } } public void setChainedOutputs(List chainedOutputs) { - config.setBytes(CHAINED_OUTPUTS, - SerializationUtils.serialize((Serializable) chainedOutputs)); + try { + InstantiationUtil.writeObjectToConfig(chainedOutputs, this.config, CHAINED_OUTPUTS); + } catch (IOException e) { + throw new StreamTaskException("Cannot serialize chained outputs.", e); + } } @SuppressWarnings("unchecked") @@ -272,78 +289,92 @@ public List getChainedOutputs(ClassLoader cl) { List chainedOutputs = (List) InstantiationUtil.readObjectFromConfig(this.config, CHAINED_OUTPUTS, cl); return chainedOutputs == null ? new ArrayList() : chainedOutputs; } catch (Exception e) { - throw new RuntimeException("Could not instantiate chained outputs."); + throw new StreamTaskException("Could not instantiate chained outputs.", e); } } public void setOutEdges(List outEdges) { - config.setBytes(OUT_STREAM_EDGES, SerializationUtils.serialize((Serializable) outEdges)); + try { + InstantiationUtil.writeObjectToConfig(outEdges, this.config, OUT_STREAM_EDGES); + } catch (IOException e) { + throw new StreamTaskException("Cannot serialize outward edges.", e); + } } @SuppressWarnings("unchecked") public List getOutEdges(ClassLoader cl) { try { - return (List) InstantiationUtil.readObjectFromConfig( + List outEdges = (List) InstantiationUtil.readObjectFromConfig( this.config, OUT_STREAM_EDGES, cl); + return outEdges == null ? new ArrayList() : outEdges; } catch (Exception e) { - throw new RuntimeException("Could not instantiate outputs."); + throw new StreamTaskException("Could not instantiate outputs.", e); } } public void setInPhysicalEdges(List inEdges) { - config.setBytes(IN_STREAM_EDGES, SerializationUtils.serialize((Serializable) inEdges)); + try { + InstantiationUtil.writeObjectToConfig(inEdges, this.config, IN_STREAM_EDGES); + } catch (IOException e) { + throw new StreamTaskException("Cannot serialize inward edges.", e); + } } @SuppressWarnings("unchecked") public List getInPhysicalEdges(ClassLoader cl) { try { - return (List) InstantiationUtil.readObjectFromConfig( + List inEdges = (List) InstantiationUtil.readObjectFromConfig( this.config, IN_STREAM_EDGES, cl); + return inEdges == null ? new ArrayList() : inEdges; } catch (Exception e) { - throw new RuntimeException("Could not instantiate inputs."); + throw new StreamTaskException("Could not instantiate inputs.", e); } } public void setStateMonitoring(boolean stateMonitoring) { - config.setBoolean(STATE_MONITORING, stateMonitoring); - } - public boolean getStateMonitoring() - { + public boolean getStateMonitoring() { return config.getBoolean(STATE_MONITORING, false); } public void setOutEdgesInOrder(List outEdgeList) { - config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList)); + try { + InstantiationUtil.writeObjectToConfig(outEdgeList, this.config, EDGES_IN_ORDER); + } catch (IOException e) { + throw new StreamTaskException("Could not serialize outputs in order.", e); + } } @SuppressWarnings("unchecked") public List getOutEdgesInOrder(ClassLoader cl) { try { - return (List) InstantiationUtil.readObjectFromConfig( + List outEdgesInOrder = (List) InstantiationUtil.readObjectFromConfig( this.config, EDGES_IN_ORDER, cl); + return outEdgesInOrder == null ? new ArrayList() : outEdgesInOrder; } catch (Exception e) { - throw new RuntimeException("Could not instantiate outputs."); + throw new StreamTaskException("Could not instantiate outputs in order.", e); } } public void setTransitiveChainedTaskConfigs(Map chainedTaskConfigs) { - config.setBytes(CHAINED_TASK_CONFIG, - SerializationUtils.serialize((Serializable) chainedTaskConfigs)); + + try { + InstantiationUtil.writeObjectToConfig(chainedTaskConfigs, this.config, CHAINED_TASK_CONFIG); + } catch (IOException e) { + throw new StreamTaskException("Could not serialize configuration.", e); + } } @SuppressWarnings("unchecked") public Map getTransitiveChainedTaskConfigs(ClassLoader cl) { try { - Map confs = (Map) InstantiationUtil .readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl); - return confs == null ? new HashMap() : confs; } catch (Exception e) { - throw new RuntimeException("Could not instantiate configuration."); + throw new StreamTaskException("Could not instantiate configuration.", e); } } From 046f39ea448050ea52cb2389b0acad9d58ddafda Mon Sep 17 00:00:00 2001 From: mbalassi Date: Fri, 24 Apr 2015 11:04:32 +0200 Subject: [PATCH 2/2] [streaming] Stream operators robustness improved for serilization This closes #620 --- .../apache/flink/streaming/api/graph/StreamNode.java | 2 +- .../flink/streaming/api/operators/StreamFilter.java | 5 ++--- .../flink/streaming/api/operators/StreamFlatMap.java | 6 ++---- .../flink/streaming/api/operators/StreamFold.java | 7 ++----- .../streaming/api/operators/StreamGroupedFold.java | 3 +++ .../streaming/api/operators/StreamGroupedReduce.java | 3 ++- .../flink/streaming/api/operators/StreamOperator.java | 2 +- .../flink/streaming/api/operators/StreamReduce.java | 5 ++--- .../flink/streaming/api/operators/StreamSink.java | 6 ++---- .../flink/streaming/api/operators/StreamSource.java | 9 ++++----- .../streaming/api/operators/co/CoStreamFlatMap.java | 9 ++++----- .../api/operators/co/CoStreamGroupedReduce.java | 11 ++++++++--- .../streaming/api/operators/co/CoStreamOperator.java | 4 ++-- .../streaming/api/operators/co/CoStreamReduce.java | 6 ++++-- .../streaming/api/operators/co/CoStreamWindow.java | 5 ++--- .../api/operators/windowing/WindowMerger.java | 2 +- .../api/windowing/policy/MultiEvictionPolicy.java | 2 +- .../runtime/partitioner/FieldsPartitioner.java | 2 +- .../runtime/partitioner/StreamPartitioner.java | 2 +- 19 files changed, 45 insertions(+), 46 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index cb07f428fab3e..576150e20c99c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -46,7 +46,7 @@ public class StreamNode implements Serializable { private Long bufferTimeout = null; private String operatorName; - private StreamOperator operator; + private transient StreamOperator operator; private List> outputSelectors; private StreamRecordSerializer typeSerializerIn1; private StreamRecordSerializer typeSerializerIn2; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java index d2cddf6ec33e7..898f5ef74bf4c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java @@ -23,12 +23,10 @@ public class StreamFilter extends ChainableStreamOperator { private static final long serialVersionUID = 1L; - FilterFunction filterFunction; private boolean collect; public StreamFilter(FilterFunction filterFunction) { super(filterFunction); - this.filterFunction = filterFunction; } @Override @@ -39,8 +37,9 @@ public void run() throws Exception { } @Override + @SuppressWarnings("unchecked") protected void callUserFunction() throws Exception { - collect = filterFunction.filter(nextObject); + collect = ((FilterFunction) userFunction).filter(nextObject); if (collect) { collector.collect(nextObject); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java index a17b1625a0913..2b8a3a880e17b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java @@ -22,11 +22,8 @@ public class StreamFlatMap extends ChainableStreamOperator { private static final long serialVersionUID = 1L; - private FlatMapFunction flatMapper; - public StreamFlatMap(FlatMapFunction flatMapper) { super(flatMapper); - this.flatMapper = flatMapper; } @Override @@ -37,8 +34,9 @@ public void run() throws Exception { } @Override + @SuppressWarnings("unchecked") protected void callUserFunction() throws Exception { - flatMapper.flatMap(nextObject, collector); + ((FlatMapFunction) userFunction).flatMap(nextObject, collector); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java index fc5f187840e4f..542f65c257cfe 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java @@ -24,14 +24,12 @@ public class StreamFold extends ChainableStreamOperator { private static final long serialVersionUID = 1L; - protected FoldFunction folder; private OUT accumulator; protected TypeSerializer outTypeSerializer; public StreamFold(FoldFunction folder, OUT initialValue, TypeInformation outTypeInformation) { super(folder); - this.folder = folder; this.accumulator = initialValue; this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig); } @@ -44,10 +42,9 @@ public void run() throws Exception { } @Override + @SuppressWarnings("unchecked") protected void callUserFunction() throws Exception { - - accumulator = folder.fold(outTypeSerializer.copy(accumulator), nextObject); + accumulator = ((FoldFunction) userFunction).fold(outTypeSerializer.copy(accumulator), nextObject); collector.collect(accumulator); - } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java index 303f1b348b841..88c75dfac9ae5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java @@ -40,9 +40,12 @@ public StreamGroupedFold(FoldFunction folder, KeySelector keySel } @Override + @SuppressWarnings("unchecked") protected void callUserFunction() throws Exception { Object key = nextRecord.getKey(keySelector); OUT accumulator = values.get(key); + FoldFunction folder = ((FoldFunction) userFunction); + if (accumulator != null) { OUT folded = folder.fold(outTypeSerializer.copy(accumulator), nextObject); values.put(key, folded); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java index f5c8f2105128e..d254fd401f945 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java @@ -36,11 +36,12 @@ public StreamGroupedReduce(ReduceFunction reducer, KeySelector keySel } @Override + @SuppressWarnings("unchecked") protected void callUserFunction() throws Exception { Object key = keySelector.getKey(nextObject); IN currentValue = values.get(key); if (currentValue != null) { - IN reduced = reducer.reduce(copy(currentValue), nextObject); + IN reduced = ((ReduceFunction) userFunction).reduce(copy(currentValue), nextObject); values.put(key, reduced); collector.collect(reduced); } else { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index 6d6c7934dad95..5cb3ec9d5cba6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -199,7 +199,7 @@ public ChainingStrategy getChainingStrategy() { } public static enum ChainingStrategy { - ALWAYS, NEVER, HEAD; + ALWAYS, NEVER, HEAD } public Function getUserFunction() { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java index 179d690ae56e1..fdf12845b3278 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java @@ -22,12 +22,10 @@ public class StreamReduce extends ChainableStreamOperator { private static final long serialVersionUID = 1L; - protected ReduceFunction reducer; private IN currentValue; public StreamReduce(ReduceFunction reducer) { super(reducer); - this.reducer = reducer; currentValue = null; } @@ -39,10 +37,11 @@ public void run() throws Exception { } @Override + @SuppressWarnings("unchecked") protected void callUserFunction() throws Exception { if (currentValue != null) { - currentValue = reducer.reduce(copy(currentValue), nextObject); + currentValue = ((ReduceFunction) userFunction).reduce(copy(currentValue), nextObject); } else { currentValue = nextObject; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java index d1f93d112c487..26e37fad0b3f7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java @@ -22,11 +22,8 @@ public class StreamSink extends ChainableStreamOperator { private static final long serialVersionUID = 1L; - private SinkFunction sinkFunction; - public StreamSink(SinkFunction sinkFunction) { super(sinkFunction); - this.sinkFunction = sinkFunction; } @Override @@ -37,7 +34,8 @@ public void run() throws Exception { } @Override + @SuppressWarnings("unchecked") protected void callUserFunction() throws Exception { - sinkFunction.invoke(nextObject); + ((SinkFunction) userFunction).invoke(nextObject); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 8c834f55cdbba..ef253ac3d1b28 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -25,11 +25,8 @@ public class StreamSource extends StreamOperator implements Seria private static final long serialVersionUID = 1L; - private SourceFunction sourceFunction; - public StreamSource(SourceFunction sourceFunction) { super(sourceFunction); - this.sourceFunction = sourceFunction; } @Override @@ -38,13 +35,15 @@ public void run() { } @Override + @SuppressWarnings("unchecked") protected void callUserFunction() throws Exception { - sourceFunction.run(collector); + ((SourceFunction) userFunction).run(collector); } @Override + @SuppressWarnings("unchecked") public void cancel() { super.cancel(); - sourceFunction.cancel(); + ((SourceFunction) userFunction).cancel(); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java index 004a17ae64acf..95f089c084021 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java @@ -22,11 +22,8 @@ public class CoStreamFlatMap extends CoStreamOperator { private static final long serialVersionUID = 1L; - private CoFlatMapFunction flatMapper; - public CoStreamFlatMap(CoFlatMapFunction flatMapper) { super(flatMapper); - this.flatMapper = flatMapper; } @Override @@ -40,14 +37,16 @@ public void handleStream2() throws Exception { } @Override + @SuppressWarnings("unchecked") protected void callUserFunction1() throws Exception { - flatMapper.flatMap1(reuse1.getObject(), collector); + ((CoFlatMapFunction) userFunction).flatMap1(reuse1.getObject(), collector); } @Override + @SuppressWarnings("unchecked") protected void callUserFunction2() throws Exception { - flatMapper.flatMap2(reuse2.getObject(), collector); + ((CoFlatMapFunction) userFunction).flatMap2(reuse2.getObject(), collector); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java index 2ed3b2ee79808..0a763920c1de7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java @@ -36,7 +36,6 @@ public class CoStreamGroupedReduce extends CoStreamReduce coReducer, KeySelector keySelector1, KeySelector keySelector2) { super(coReducer); - this.coReducer = coReducer; this.keySelector1 = keySelector1; this.keySelector2 = keySelector2; values1 = new HashMap(); @@ -44,7 +43,9 @@ public CoStreamGroupedReduce(CoReduceFunction coReducer, } @Override + @SuppressWarnings("unchecked") public void handleStream1() throws Exception { + CoReduceFunction coReducer = (CoReduceFunction) userFunction; Object key = reuse1.getKey(keySelector1); currentValue1 = values1.get(key); nextValue1 = reuse1.getObject(); @@ -59,7 +60,9 @@ public void handleStream1() throws Exception { } @Override + @SuppressWarnings("unchecked") public void handleStream2() throws Exception { + CoReduceFunction coReducer = (CoReduceFunction) userFunction; Object key = reuse2.getKey(keySelector2); currentValue2 = values2.get(key); nextValue2 = reuse2.getObject(); @@ -74,14 +77,16 @@ public void handleStream2() throws Exception { } @Override + @SuppressWarnings("unchecked") protected void callUserFunction1() throws Exception { - reduced1 = coReducer.reduce1(currentValue1, nextValue1); + reduced1 = ((CoReduceFunction) userFunction).reduce1(currentValue1, nextValue1); } @Override + @SuppressWarnings("unchecked") protected void callUserFunction2() throws Exception { - reduced2 = coReducer.reduce2(currentValue2, nextValue2); + reduced2 = ((CoReduceFunction) userFunction).reduce2(currentValue2, nextValue2); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java index 214cb177f837d..5e764aba1425f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java @@ -122,11 +122,11 @@ public void run() throws Exception { protected void initialize1() { - }; + } protected void initialize2() { - }; + } protected void callUserFunctionAndLogException1() { try { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java index c2801745af757..90aecc760d9a2 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java @@ -22,7 +22,6 @@ public class CoStreamReduce extends CoStreamOperator { private static final long serialVersionUID = 1L; - protected CoReduceFunction coReducer; protected IN1 currentValue1 = null; protected IN2 currentValue2 = null; protected IN1 nextValue1 = null; @@ -30,7 +29,6 @@ public class CoStreamReduce extends CoStreamOperator coReducer) { super(coReducer); - this.coReducer = coReducer; currentValue1 = null; currentValue2 = null; } @@ -48,7 +46,9 @@ public void handleStream2() throws Exception { } @Override + @SuppressWarnings("unchecked") protected void callUserFunction1() throws Exception { + CoReduceFunction coReducer = (CoReduceFunction) userFunction; if (currentValue1 != null) { currentValue1 = coReducer.reduce1(currentValue1, nextValue1); } else { @@ -58,7 +58,9 @@ protected void callUserFunction1() throws Exception { } @Override + @SuppressWarnings("unchecked") protected void callUserFunction2() throws Exception { + CoReduceFunction coReducer = (CoReduceFunction) userFunction; if (currentValue2 != null) { currentValue2 = coReducer.reduce2(currentValue2, nextValue2); } else { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java index 0875b7e00a020..78371cc236b84 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java @@ -30,7 +30,6 @@ public class CoStreamWindow extends CoStreamOperator { private static final long serialVersionUID = 1L; - protected CoWindowFunction coWindowFunction; protected long windowSize; protected long slideSize; protected CircularFifoList> circularList1; @@ -46,7 +45,6 @@ public class CoStreamWindow extends CoStreamOperator coWindowFunction, long windowSize, long slideInterval, TimestampWrapper timeStamp1, TimestampWrapper timeStamp2) { super(coWindowFunction); - this.coWindowFunction = coWindowFunction; this.windowSize = windowSize; this.slideSize = slideInterval; this.circularList1 = new CircularFifoList>(); @@ -69,6 +67,7 @@ protected void handleStream2() throws Exception { } @Override + @SuppressWarnings("unchecked") protected void callUserFunction() throws Exception { List first = new ArrayList(); @@ -82,7 +81,7 @@ protected void callUserFunction() throws Exception { } if (!window.circularList1.isEmpty() || !window.circularList2.isEmpty()) { - coWindowFunction.coWindow(first, second, collector); + ((CoWindowFunction) userFunction).coWindow(first, second, collector); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java index fc03780262636..e69257b6909f3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java @@ -47,8 +47,8 @@ public void run() throws Exception { } } - @SuppressWarnings("unchecked") @Override + @SuppressWarnings("unchecked") protected void callUserFunction() throws Exception { StreamWindow nextWindow = nextObject; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java index 0ad1605daa8c7..79e81193b3887 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java @@ -53,7 +53,7 @@ public class MultiEvictionPolicy implements ActiveEvictionPolicy { * */ public enum EvictionStrategy { - MIN, MAX, SUM, PRIORITY; + MIN, MAX, SUM, PRIORITY } private List> allEvictionPolicies; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java index f44bd12f1fa5a..08c431b5b2295 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java @@ -31,7 +31,7 @@ public class FieldsPartitioner extends StreamPartitioner { private static final long serialVersionUID = 1L; - private int[] returnArray = new int[1];; + private int[] returnArray = new int[1]; KeySelector keySelector; public FieldsPartitioner(KeySelector keySelector) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java index cd5b9c22dbf9f..3af7c7a6704af 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java @@ -27,7 +27,7 @@ public abstract class StreamPartitioner implements public enum PartitioningStrategy { - FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY; + FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY }