From 76d4fbffcf79ecf53b5bd2c666186e3a6d41ab34 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Sun, 24 May 2015 12:43:30 +0200 Subject: [PATCH] [streaming] Fast calculation of medians of windows --- .../api/datastream/WindowedDataStream.java | 51 ++++ .../streaming/api/windowing/WindowUtils.java | 2 +- .../windowbuffer/MedianGroupedPreReducer.java | 124 +++++++++ .../windowbuffer/MedianPreReducer.java | 241 ++++++++++++++++++ .../MedianGroupedPreReducerTest.java | 127 +++++++++ .../api/scala/WindowedDataStream.scala | 26 ++ 6 files changed, 570 insertions(+), 1 deletion(-) create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/MedianGroupedPreReducer.java create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/MedianPreReducer.java create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/MedianGroupedPreReducerTest.java diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index 9565f4b773fc73..2f043dd7fbe114 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -71,6 +71,8 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer; import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer; import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; +import org.apache.flink.streaming.api.windowing.windowbuffer.MedianPreReducer; +import org.apache.flink.streaming.api.windowing.windowbuffer.MedianGroupedPreReducer; import org.apache.flink.streaming.util.keys.KeySelectorUtil; /** @@ -590,6 +592,55 @@ private WindowBuffer getWindowBuffer(WindowTransformation transformation) { } } + /** + * Gives the median of the current window at the specified field at every trigger. + * The type of the field can only be Double (as the median of integers might be a fractional number). + * + * The median is updated online as the window changes, and the runtime of + * one update is logarithmic with the current window size. + * + * @param pos + * The position in the tuple/array to calculate the median of + * @return The transformed DataStream. + */ + @SuppressWarnings("unchecked") + public DiscretizedStream median(int pos) { + WindowBuffer windowBuffer; + if (groupByKey == null) { + windowBuffer = new MedianPreReducer(pos, getType(), getExecutionConfig()); + } else { + windowBuffer = new MedianGroupedPreReducer(pos, getType(), getExecutionConfig(), groupByKey); + } + return discretize(WindowTransformation.MEDIAN, windowBuffer); + } + + /** + * Gives the median of the current window at the specified field at every trigger. + * The type of the field can only be Double (as the median of integers might be a fractional number). + * + * The field is given by a field expression that is either + * the name of a public field or a getter method with parentheses of the + * stream's underlying type. A dot can be used to drill down into objects, + * as in {@code "field1.getInnerField2()" }. + * + * The median is updated online as the window changes, and the runtime of + * one update is logarithmic with the current window size. + * + * @param field + * The field to calculate the median of + * @return The transformed DataStream. + */ + @SuppressWarnings("unchecked") + public DiscretizedStream median(String field) { + WindowBuffer windowBuffer; + if (groupByKey == null) { + windowBuffer = new MedianPreReducer(field, getType(), getExecutionConfig()); + } else { + windowBuffer = new MedianGroupedPreReducer(field, getType(), getExecutionConfig(), groupByKey); + } + return discretize(WindowTransformation.MEDIAN, windowBuffer); + } + /** * Applies an aggregation that sums every window of the data stream at the * given position. diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java index dd62a4438283e9..c18fad9a37fc32 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java @@ -32,7 +32,7 @@ public class WindowUtils { public enum WindowTransformation { - REDUCEWINDOW, MAPWINDOW, FOLDWINDOW, NONE; + REDUCEWINDOW, MAPWINDOW, FOLDWINDOW, MEDIAN, NONE; private Function UDF; public WindowTransformation with(Function UDF) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/MedianGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/MedianGroupedPreReducer.java new file mode 100644 index 00000000000000..e82b468bcb24bb --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/MedianGroupedPreReducer.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.windowbuffer; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.StreamWindow; +import org.apache.flink.streaming.util.FieldAccessor; +import org.apache.flink.util.Collector; + +import java.io.Serializable; +import java.util.Map; +import java.util.TreeMap; +import java.util.Queue; +import java.util.ArrayDeque; +import java.util.Iterator; + +/** + * Grouped pre-reducer for calculating median with any eviction policy. + * + * It stores a MedianPreReducer for every group. + */ +public class MedianGroupedPreReducer extends WindowBuffer implements PreAggregator, Serializable { + + private static final long serialVersionUID = 1L; + + // This is for getting and setting the field specified in the parameters to the median call. + private FieldAccessor fieldAccessor; + + private TypeSerializer serializer; + + private KeySelector keySelector; + + // PreReducers for the individual groups + private Map> groupPreReducers = new TreeMap>(); + + // Holds a reference for the groupPreReducer belonging to each element currently in the window. + // (This is used at evict, so that we have to index into groupPreReducers only on store.) + private Queue> groupPreReducerPerElement = new ArrayDeque>(); + + public MedianGroupedPreReducer(FieldAccessor fieldAccessor, TypeSerializer serializer, + KeySelector keySelector) { + this.fieldAccessor = fieldAccessor; + this.serializer = serializer; + this.keySelector = keySelector; + } + + public MedianGroupedPreReducer(int pos, TypeInformation typeInfo, ExecutionConfig config, + KeySelector keySelector) { + this.fieldAccessor = FieldAccessor.create(pos, typeInfo, config); + this.serializer = typeInfo.createSerializer(config); + this.keySelector = keySelector; + } + + public MedianGroupedPreReducer(String field, TypeInformation typeInfo, ExecutionConfig config, + KeySelector keySelector) { + this.fieldAccessor = FieldAccessor.create(field, typeInfo, config); + this.serializer = typeInfo.createSerializer(config); + this.keySelector = keySelector; + } + + @Override + public void store(T elem) throws Exception { + Object key = keySelector.getKey(elem); + MedianPreReducer groupPreReducer = groupPreReducers.get(key); + if(groupPreReducer == null) { // Group doesn't exist yet, create it. + groupPreReducer = new MedianPreReducer(fieldAccessor, serializer); + groupPreReducers.put(key, groupPreReducer); + } + groupPreReducer.store(elem); + groupPreReducerPerElement.add(groupPreReducer); + } + + @Override + public void evict(int n) { + for (int i = 0; i < n; i++) { + MedianPreReducer groupPreReducer = groupPreReducerPerElement.poll(); + if(groupPreReducer == null) { + break; + } + groupPreReducer.evict(1); + } + } + + @Override + public void emitWindow(Collector> collector) { + StreamWindow currentWindow = createEmptyWindow(); + for(Iterator> it = groupPreReducers.values().iterator(); it.hasNext(); ) { + MedianPreReducer groupPreReducer = it.next(); + T groupMedian = groupPreReducer.getMedian(); + if(groupMedian != null) { + currentWindow.add(groupMedian); + } else { + it.remove(); // Remove groups that don't contain elements, to not leak memory for long ago seen groups. + } + } + if(!currentWindow.isEmpty() || emitEmpty) { + collector.collect(currentWindow); + } + } + + @Override + public WindowBuffer clone() { + return new MedianGroupedPreReducer(fieldAccessor, serializer, keySelector); + } + +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/MedianPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/MedianPreReducer.java new file mode 100644 index 00000000000000..8a7c8ddce07b30 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/MedianPreReducer.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.windowbuffer; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.streaming.api.windowing.StreamWindow; +import org.apache.flink.util.Collector; +import org.apache.flink.streaming.util.FieldAccessor; + +import java.util.TreeMap; +import java.util.Map.Entry; +import java.util.Deque; +import java.util.ArrayDeque; +import java.util.Comparator; +import java.io.Serializable; + +/** + * Non-grouped pre-reducer for calculating median with any eviction policy. + */ +public class MedianPreReducer extends WindowBuffer implements PreAggregator, Serializable { + + private static final long serialVersionUID = 1L; + + // This is for getting and setting the field specified in the parameters to the median call. + private FieldAccessor fieldAccessor; + + private TypeSerializer serializer; + + // These contain lower-than-median and higher-than-median elements. + // Elements that compare equal to the median can be in either. + // The invariant is that low always contains the same amount or one more element than high. + // (see the assert in updateMedian) + TreeMultiset + low = new TreeMultiset(new CompareOnField()), + high = new TreeMultiset(new CompareOnField()); + Deque elements = new ArrayDeque(); + + T median; + + public T getMedian() { + return median; + } + + public MedianPreReducer(FieldAccessor fieldAccessor, TypeSerializer serializer) { + this.fieldAccessor = fieldAccessor; + this.serializer = serializer; + } + + public MedianPreReducer(int pos, TypeInformation typeInfo, ExecutionConfig config) { + this.fieldAccessor = FieldAccessor.create(pos, typeInfo, config); + this.serializer = typeInfo.createSerializer(config); + } + + public MedianPreReducer(String field, TypeInformation typeInfo, ExecutionConfig config) { + this.fieldAccessor = FieldAccessor.create(field, typeInfo, config); + this.serializer = typeInfo.createSerializer(config); + } + + private void updateMedian() { + assert low.size() == high.size() || low.size() == high.size() + 1; + if(low.size() == 0) { + median = null; + } else if(low.size() == high.size()) { + // This is essentially (low.last + high.first) / 2, but we have to drill down to the double field + median = serializer.copy(elements.getLast()); + median = fieldAccessor.set(median, (fieldAccessor.get(low.last()) + fieldAccessor.get(high.first())) / 2); + } else { + // low.last + median = serializer.copy(elements.getLast()); + median = fieldAccessor.set(median, fieldAccessor.get(low.last())); + } + } + + private void moveUpIfNeccessary() { + if(low.size() > high.size() + 1) { + high.add(low.pollLast()); + } + } + private void moveDownIfNeccessary() { + if(low.size() < high.size()) { + low.add(high.pollFirst()); + } + } + + @Override + public void store(T elem) throws Exception { + if(median == null) { + low.add(elem); + } else if(fieldAccessor.get(elem) <= fieldAccessor.get(median)) { + low.add(elem); + moveUpIfNeccessary(); + } else if(fieldAccessor.get(elem) > fieldAccessor.get(median)) { + high.add(elem); + moveDownIfNeccessary(); + } + elements.addLast(elem); + updateMedian(); + } + + @Override + public void evict(int n) { + for(int i = 0; i < n; i++) { + T elem = elements.pollFirst(); + if(elem == null) { + break; + } + if(low.contains(elem)) { + low.removeOne(elem); + moveDownIfNeccessary(); + } else if(high.contains(elem)){ + high.removeOne(elem); + moveUpIfNeccessary(); + } else { + throw new RuntimeException("Internal error in MedianPreReducer"); + } + } + updateMedian(); + } + + @Override + public void emitWindow(Collector> collector) { + if (median != null) { + StreamWindow currentWindow = createEmptyWindow(); + currentWindow.add(median); + collector.collect(currentWindow); + } else if (emitEmpty) { + collector.collect(createEmptyWindow()); + } + } + + @Override + public WindowBuffer clone() { + return new MedianPreReducer(fieldAccessor, serializer); + } + + private class CompareOnField implements Comparator, Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public int compare(T a, T b) { + try { + return fieldAccessor.get(a).compareTo(fieldAccessor.get(b)); + } catch (ClassCastException e) { + throw new RuntimeException("ClassCastException in MedianPreReducer: " + e.getMessage() + + "\nWindowedDataStream.median only supports Double fields!"); + } + } + } + + // This multiset class is implemented by storing the counts of the elements in a TreeMap. + static class TreeMultiset extends TreeMap { + + int size = 0; + + @Override + public int size() { + return size; + } + + TreeMultiset(Comparator comparator) { + super(comparator); + } + + T first() { + return firstKey(); + } + + T last() { + return lastKey(); + } + + T pollFirst() { + if(!isEmpty()) { + size--; + } + Entry first = firstEntry(); + if(first.getValue() > 1) { + put(first.getKey(), first.getValue() - 1); + return first.getKey(); + } else { + return pollFirstEntry().getKey(); + } + } + + T pollLast() { + if(!isEmpty()) { + size--; + } + Entry last = lastEntry(); + if(last.getValue() > 1) { + put(last.getKey(), last.getValue() - 1); + return last.getKey(); + } else { + return pollLastEntry().getKey(); + } + } + + void add(T elem) { + size++; + Integer oldCount = get(elem); + if(oldCount == null) { + oldCount = 0; + } + put(elem, oldCount + 1); + } + + Boolean contains(T elem) { + return get(elem) != null; + } + + void removeOne(T elem) { + assert contains(elem); + + size--; + Integer oldCount = get(elem); + if(oldCount > 1) { + put(elem, oldCount - 1); + } else { + remove(elem); + } + } + } +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/MedianGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/MedianGroupedPreReducerTest.java new file mode 100644 index 00000000000000..22d79d24a8937f --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/MedianGroupedPreReducerTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.windowbuffer; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.windowing.StreamWindow; +import org.junit.Test; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Random; +import java.util.ArrayList; +import java.util.Collections; +import java.util.TreeMap; +import java.util.Map.Entry; + +import static org.junit.Assert.assertEquals; + +// As the MedianGroupedPreReducer uses the non-grouped MedianPreReducer, this also tests that class. +public class MedianGroupedPreReducerTest { + + TypeInformation> typeInfo = TypeExtractor.getForObject(new Tuple2(1, 1.0)); + + @SuppressWarnings("unchecked") + @Test + public void testMedianPreReducer() throws Exception { + Random rnd = new Random(42); + + ArrayDeque > inputs = new ArrayDeque >(); + + KeySelector, Integer> keySelector = new KeySelector, Integer>() { + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + MedianGroupedPreReducer> preReducer = + new MedianGroupedPreReducer>(1, typeInfo, null, keySelector); + + TestCollector>> collector = + new TestCollector>>(); + + List>> collected = collector.getCollected(); + + // We do a bunch of random operations, and check the results by also naively calculating the median. + int N = 10000; + for(int i = 0; i < N; i++) { + switch (rnd.nextInt(3)) { + case 0: // store + Tuple2 elem; + Integer group = rnd.nextInt(5); + // We sometimes add doubles and sometimes add small integers. + // The latter is for ensuring that it happens that there are duplicate elements, + // so that we test TreeMultiset properly. + if(rnd.nextDouble() < 0.5) { + elem = new Tuple2(group, (double)rnd.nextInt(5)); + } else { + elem = new Tuple2(group, rnd.nextDouble()); + } + inputs.addLast(elem); + preReducer.store(elem); + break; + case 1: // evict + int howMany = rnd.nextInt(2) + 1; + int howMany2 = Math.min(howMany, inputs.size()); + for(int j = 0; j < howMany2; j++) { + inputs.removeFirst(); + } + preReducer.evict(howMany); + break; + case 2: // emitWindow + if(inputs.size() > 0) { + // Calculate the correct medians: + // The inputs are split into groups into inputDoublesPerGroup, + // then the medians are calculated per group into correctMedians. + TreeMap> inputDoublesPerGroup = new TreeMap>(); + for (Tuple2 e : inputs) { + Integer key = keySelector.getKey(e); + ArrayList a = inputDoublesPerGroup.get(key); + if(a == null) { // Group doesn't exist yet, create it. + a = new ArrayList(); + inputDoublesPerGroup.put(key, a); + } + a.add(e.f1); + } + TreeMap correctMedians = new TreeMap(); + for(Entry> e: inputDoublesPerGroup.entrySet()) { + ArrayList doublesInGroup = e.getValue(); + Collections.sort(doublesInGroup); + int half = doublesInGroup.size() / 2; + correctMedians.put( + e.getKey(), + doublesInGroup.size() % 2 == 1 ? + doublesInGroup.get(half) : + (doublesInGroup.get(half) + doublesInGroup.get(half - 1)) / 2); + } + + preReducer.emitWindow(collector); + for(Tuple2 e: collected.get(collected.size() - 1)){ + assertEquals( + correctMedians.get(keySelector.getKey(e)), + e.f1); + } + } + break; + } + } + } +} diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala index f4d21548982e68..cd76ab5b85adaa 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala @@ -335,4 +335,30 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { javaStream.getDiscretizedStream.getExecutionEnvironment).scalaClean(f) } + /** + * Gives the median of the current window at the specified field at every trigger. + * The type of the field can only be Double (as the median of integers might be a + * fractional number). + * + * The median is updated online as the window changes, and the runtime of + * one update is logarithmic with the current window size. + * + */ + def median(pos: Int): WindowedDataStream[T] = { + javaStream.median(pos) + } + + /** + * Gives the median of the current window at the specified field at every trigger. + * The type of the field can only be Double (as the median of integers might be a + * fractional number). + * + * The median is updated online as the window changes, and the runtime of + * one update is logarithmic with the current window size. + * + */ + def median(field: String): WindowedDataStream[T] = { + javaStream.median(field) + } + }