From 86096adb688d9233c77986c3012298ae00bfbba0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Hermann?= Date: Thu, 6 Aug 2015 15:42:54 +0200 Subject: [PATCH] [FLINK-2286] [streaming] Wrapped ParallelMerge into stream operator --- .../api/datastream/DiscretizedStream.java | 8 ++-- .../api/operators/co/CoStreamFlatMap.java | 4 ++ .../operators/windowing/ParallelMerge.java | 3 ++ .../windowing/ParallelMergeOperator.java | 43 +++++++++++++++++++ 4 files changed, 53 insertions(+), 5 deletions(-) create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java index ba28fa4f1946f..e35592e60f32d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java @@ -30,11 +30,9 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.functions.WindowMapFunction; -import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamFilter; import org.apache.flink.streaming.api.operators.StreamFlatMap; -import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap; import org.apache.flink.streaming.api.operators.windowing.EmptyWindowFilter; import org.apache.flink.streaming.api.operators.windowing.ParallelGroupedMerge; import org.apache.flink.streaming.api.operators.windowing.ParallelMerge; @@ -45,6 +43,7 @@ import org.apache.flink.streaming.api.operators.windowing.WindowPartExtractor; import org.apache.flink.streaming.api.operators.windowing.WindowPartitioner; import org.apache.flink.streaming.api.operators.windowing.WindowReducer; +import org.apache.flink.streaming.api.operators.windowing.ParallelMergeOperator; import org.apache.flink.streaming.api.windowing.StreamWindow; import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo; import org.apache.flink.streaming.api.windowing.WindowUtils.WindowKey; @@ -147,7 +146,7 @@ protected DiscretizedStream timeReduce(ReduceFunction reduceFunction) DataStream> numOfParts, DiscretizedStream reduced, ReduceFunction reduceFunction) { - CoFlatMapFunction, Tuple2, StreamWindow> parallelMerger = isGrouped() ? new ParallelGroupedMerge() + ParallelMerge parallelMerger = isGrouped() ? new ParallelGroupedMerge() : new ParallelMerge(reduceFunction); return reduced.discretizedStream @@ -156,8 +155,7 @@ protected DiscretizedStream timeReduce(ReduceFunction reduceFunction) .transform( "CoFlatMap", reduced.discretizedStream.getType(), - new CoStreamFlatMap, Tuple2, StreamWindow>( - parallelMerger)); + new ParallelMergeOperator(parallelMerger)); } @Override diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java index d2bd107d55cfd..1448ab8ebed3f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java @@ -81,4 +81,8 @@ public void processWatermark2(Watermark mark) throws Exception { output.emitWatermark(new Watermark(combinedWatermark)); } } + + protected TimestampedCollector getCollector() { + return collector; + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java index cd239fc66c106..ce7d887530d5e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java @@ -139,4 +139,7 @@ public void open(Configuration conf) { this.numberOfDiscretizers = getRuntimeContext().getNumberOfParallelSubtasks(); } + Map, Integer>> getReceivedWindows() { + return receivedWindows; + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java new file mode 100644 index 0000000000000..74df3ad21b10b --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.windowing; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap; +import org.apache.flink.streaming.api.windowing.StreamWindow; + +public class ParallelMergeOperator extends CoStreamFlatMap, Tuple2, StreamWindow> { + + private ParallelMerge parallelMerge; + + public ParallelMergeOperator(ParallelMerge parallelMerge) { + super(parallelMerge); + this.parallelMerge = parallelMerge; + } + + @Override + public void close() throws Exception { + // emit remaining (partial) windows + + for (Tuple2, Integer> receivedWindow : parallelMerge.getReceivedWindows().values()) { + getCollector().collect(receivedWindow.f0); + } + + super.close(); + } +}