From 7e050750bdce5ba89f141e4d2a25ee0de51fc7e9 Mon Sep 17 00:00:00 2001 From: mbalassi Date: Tue, 19 May 2015 15:51:34 +0200 Subject: [PATCH 1/2] [FLINK-2007] [streaming] Proper Delta policy serialization Closes #697 --- .../streaming/api/datastream/DataStream.java | 2 + .../api/datastream/WindowedDataStream.java | 1 + .../streaming/api/windowing/helper/Count.java | 2 +- .../streaming/api/windowing/helper/Delta.java | 43 +++++++++------ .../api/windowing/helper/FullStream.java | 2 +- .../streaming/api/windowing/helper/Time.java | 2 +- .../api/windowing/helper/WindowingHelper.java | 28 ++++++++-- .../api/windowing/policy/DeltaPolicy.java | 54 +++++++++++++++---- .../api/complex/ComplexIntegrationTest.java | 15 ++++-- .../api/windowing/policy/DeltaPolicyTest.java | 51 +++++++++--------- 10 files changed, 140 insertions(+), 60 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index dbb9b05c4bcd4..5165ec7aa7307 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -941,6 +941,7 @@ public StreamJoinOperator join(DataStream dataStreamToJoin) */ @SuppressWarnings({ "rawtypes", "unchecked" }) public WindowedDataStream window(WindowingHelper policyHelper) { + policyHelper.setExecutionConfig(getExecutionConfig()); return new WindowedDataStream(this, policyHelper); } @@ -972,6 +973,7 @@ public WindowedDataStream window(TriggerPolicy trigger, EvictionPolicy */ @SuppressWarnings("rawtypes") public WindowedDataStream every(WindowingHelper policyHelper) { + policyHelper.setExecutionConfig(getExecutionConfig()); return window(FullStream.window()).every(policyHelper); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index fd11d94539c45..a10c79ead9074 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -149,6 +149,7 @@ public WindowedDataStream() { */ @SuppressWarnings({ "unchecked", "rawtypes" }) public WindowedDataStream every(WindowingHelper policyHelper) { + policyHelper.setExecutionConfig(getExecutionConfig()); WindowedDataStream ret = this.copy(); if (ret.evictionHelper == null) { ret.evictionHelper = ret.triggerHelper; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java index 840546f0bac0d..3266a245002db 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java @@ -27,7 +27,7 @@ * {@link Count#of(int)} to get an instance. */ @SuppressWarnings("rawtypes") -public class Count implements WindowingHelper { +public class Count extends WindowingHelper { private int count; private int deleteOnEviction = 1; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java index 5434a4e6f86c4..bcb548f35c085 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java @@ -17,6 +17,9 @@ package org.apache.flink.streaming.api.windowing.helper; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; @@ -30,23 +33,23 @@ * the data type handled by the delta function represented by this * helper. */ -public class Delta implements WindowingHelper { +public class Delta extends WindowingHelper { private DeltaFunction deltaFunction; private DATA initVal; private double threshold; + private TypeSerializer typeSerializer; /** * Creates a delta helper representing a delta count or eviction policy - * * @param deltaFunction - * The delta function which should be used to calculate the delta - * between points. + * The delta function which should be used to calculate the delta + * points. * @param initVal - * The initial value which will be used to calculate the first - * delta. + * The initial value which will be used to calculate the first + * delta. * @param threshold - * The threshold used by the delta function. + * The threshold used by the delta function. */ public Delta(DeltaFunction deltaFunction, DATA initVal, double threshold) { this.deltaFunction = deltaFunction; @@ -56,12 +59,14 @@ public Delta(DeltaFunction deltaFunction, DATA initVal, double threshold) @Override public EvictionPolicy toEvict() { - return new DeltaPolicy(deltaFunction, initVal, threshold); + instantiateTypeSerializer(); + return new DeltaPolicy(deltaFunction, initVal, threshold, typeSerializer); } @Override public TriggerPolicy toTrigger() { - return new DeltaPolicy(deltaFunction, initVal, threshold); + instantiateTypeSerializer(); + return new DeltaPolicy(deltaFunction, initVal, threshold, typeSerializer); } /** @@ -73,19 +78,27 @@ public TriggerPolicy toTrigger() { * buffer and removes all elements from the buffer which have a higher delta * then the threshold. As soon as there is an element with a lower delta, * the eviction stops. - * + * * @param deltaFunction - * The delta function which should be used to calculate the delta - * between points. + * The delta function which should be used to calculate the delta + * points. * @param initVal - * The initial value which will be used to calculate the first - * delta. + * The initial value which will be used to calculate the first + * delta. * @param threshold - * The threshold used by the delta function. + * The threshold used by the delta function. * @return Helper representing a delta trigger or eviction policy */ public static Delta of(double threshold, DeltaFunction deltaFunction, DATA initVal) { return new Delta(deltaFunction, initVal, threshold); } + + private void instantiateTypeSerializer(){ + if (executionConfig == null){ + throw new UnsupportedOperationException("ExecutionConfig has to be set to instantiate TypeSerializer."); + } + TypeInformation typeInformation = TypeExtractor.getForObject(initVal); + typeSerializer = typeInformation.createSerializer(executionConfig); + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java index 3508b26c7847a..7773d9a91bb67 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java @@ -28,7 +28,7 @@ * policy and only with operations that support pre-aggregator such as reduce or * aggregations. */ -public class FullStream implements WindowingHelper, Serializable { +public class FullStream extends WindowingHelper implements Serializable { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java index da8d92943ca37..0089d26a63c0d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java @@ -33,7 +33,7 @@ * The data type which is handled by the time stamp used in the * policy represented by this helper */ -public class Time implements WindowingHelper { +public class Time extends WindowingHelper { protected long length; protected TimeUnit granularity; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java index 9df843279749a..17e142a839a97 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.api.windowing.helper; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; @@ -30,10 +31,31 @@ * @see Time * @see Delta */ -public interface WindowingHelper { +public abstract class WindowingHelper { - public EvictionPolicy toEvict(); + /** + * Provides information for initial value serialization + * in {@link Delta}, unused in other subclasses. + */ + protected ExecutionConfig executionConfig; - public TriggerPolicy toTrigger(); + /** + * Method for encapsulating the {@link EvictionPolicy}. + * @return the eviction policy + */ + public abstract EvictionPolicy toEvict(); + /** + * Method for encapsulating the {@link TriggerPolicy}. + * @return the trigger policy + */ + public abstract TriggerPolicy toTrigger(); + + /** + * Setter for the {@link ExecutionConfig} field. + * @param executionConfig Desired value + */ + public final void setExecutionConfig(ExecutionConfig executionConfig){ + this.executionConfig = executionConfig; + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java index 93fc636bb3eee..69dd66f47ad54 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java @@ -17,9 +17,15 @@ package org.apache.flink.streaming.api.windowing.policy; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.LinkedList; import java.util.List; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.util.DataInputDeserializer; +import org.apache.flink.runtime.util.DataOutputSerializer; import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; /** @@ -47,34 +53,45 @@ public class DeltaPolicy implements CloneableTriggerPolicy, */ private static final long serialVersionUID = -7797538922123394967L; + //Used for serializing the threshold + private final static int INITIAL_SERIALIZER_BYTES = 1024; + protected DeltaFunction deltaFuntion; private List windowBuffer; protected double threshold; - protected DATA triggerDataPoint; + private TypeSerializer typeSerializer; + protected transient DATA triggerDataPoint; /** - * Crates a delta policy which calculates a delta between the data point + * Creates a delta policy which calculates a delta between the data point * which triggered last and the currently arrived data point. It triggers if - * the delta is higher than a specified threshold. - * + * the delta is higher than a specified threshold. As the data may be sent to + * the cluster a {@link TypeSerializer} is needed for the initial value. + * + *

* In case it gets used for eviction, this policy starts from the first * element of the buffer and removes all elements from the buffer which have * a higher delta then the threshold. As soon as there is an element with a * lower delta, the eviction stops. - * + *

+ * * @param deltaFuntion - * The delta function to be used. + * The delta function to be used. * @param init - * The initial to be used for the calculation of a delta before - * the first trigger. + * The initial to be used for the calculation of a delta before + * the first trigger. * @param threshold - * The threshold upon which a triggering should happen. + * The threshold upon which a triggering should happen. + * @param typeSerializer + * TypeSerializer to properly forward the initial value to + * the cluster */ - public DeltaPolicy(DeltaFunction deltaFuntion, DATA init, double threshold) { + public DeltaPolicy(DeltaFunction deltaFuntion, DATA init, double threshold, TypeSerializer typeSerializer) { this.deltaFuntion = deltaFuntion; this.triggerDataPoint = init; this.windowBuffer = new LinkedList(); this.threshold = threshold; + this.typeSerializer = typeSerializer; } @Override @@ -107,7 +124,7 @@ public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) { @Override public DeltaPolicy clone() { - return new DeltaPolicy(deltaFuntion, triggerDataPoint, threshold); + return new DeltaPolicy(deltaFuntion, triggerDataPoint, threshold, typeSerializer); } @Override @@ -131,4 +148,19 @@ public boolean equals(Object other) { public String toString() { return "DeltaPolicy(" + threshold + ", " + deltaFuntion.getClass().getSimpleName() + ")"; } + + private void writeObject(ObjectOutputStream stream) throws IOException{ + stream.defaultWriteObject(); + DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(INITIAL_SERIALIZER_BYTES); + typeSerializer.serialize(triggerDataPoint, dataOutputSerializer); + stream.write(dataOutputSerializer.getByteArray()); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { + stream.defaultReadObject(); + byte[] bytes = new byte[stream.available()]; + stream.readFully(bytes); + triggerDataPoint = typeSerializer.deserialize(new DataInputDeserializer(bytes, 0, bytes.length)); + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java index 7eb263af1ac8a..738654abdf38b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; @@ -35,6 +36,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.WindowMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; import org.apache.flink.streaming.api.windowing.helper.Count; @@ -611,10 +613,15 @@ public void flatMap(Long value, Collector out) throws Exception { } } - private static class RectangleSource implements SourceFunction { + private static class RectangleSource extends RichSourceFunction { private static final long serialVersionUID = 1L; - RectangleClass rectangle = new RectangleClass(100, 100); - int cnt = 0; + private transient RectangleClass rectangle; + private transient int cnt; + + public void open(Configuration parameters) throws Exception { + rectangle = new RectangleClass(100, 100); + cnt = 0; + } @Override public boolean reachedEnd() throws Exception { @@ -764,7 +771,7 @@ public String toString() { } } - public static class RectangleClass implements Serializable { + public static class RectangleClass { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java index 52a4d132d2fc8..448377dcfceb6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java @@ -1,25 +1,25 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ package org.apache.flink.streaming.api.windowing.policy; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; -import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy; import org.junit.Test; import java.util.List; @@ -29,6 +29,9 @@ public class DeltaPolicyTest { + //Dummy serializer, this is not used because the tests are done locally + private final static TypeSerializer> SERIALIZER = null; + @SuppressWarnings({ "serial", "unchecked", "rawtypes" }) @Test public void testDelta() { @@ -38,7 +41,7 @@ public double getDelta(Tuple2 oldDataPoint, Tuple2 newDataPoint) { return (double) newDataPoint.f0 - oldDataPoint.f0; } - }, new Tuple2(0, 0), 2); + }, new Tuple2(0, 0), 2, SERIALIZER); List tuples = Arrays.asList(new Tuple2(1, 0), new Tuple2(2, 0), new Tuple2(3, 0), new Tuple2(6, 0)); @@ -70,16 +73,16 @@ public double getDelta(Tuple2 oldDataPoint, }; assertEquals(new DeltaPolicy>(df, new Tuple2(0, - 0), 2), new DeltaPolicy>(df, new Tuple2( - 0, 0), 2)); + 0), 2, SERIALIZER), new DeltaPolicy>(df, new Tuple2( + 0, 0), 2, SERIALIZER)); assertNotEquals(new DeltaPolicy>(df, new Tuple2( - 0, 1), 2), new DeltaPolicy>(df, - new Tuple2(0, 0), 2)); - + 0, 1), 2, SERIALIZER), new DeltaPolicy>(df, + new Tuple2(0, 0), 2, SERIALIZER)); + assertNotEquals(new DeltaPolicy>(df, new Tuple2(0, - 0), 2), new DeltaPolicy>(df, new Tuple2( - 0, 0), 3)); + 0), 2, SERIALIZER), new DeltaPolicy>(df, new Tuple2( + 0, 0), 3, SERIALIZER)); } } \ No newline at end of file From 31041e344c2b00bdc43822bf7287ff47d67f7284 Mon Sep 17 00:00:00 2001 From: Ajay Bhat Date: Sun, 24 May 2015 11:00:09 +0530 Subject: [PATCH 2/2] [FLINK-2018] Add ParameterUtil.fromGenericOptionsParser() --- .../apache/flink/api/java/utils/ParameterTool.java | 12 ++++++++++++ .../flink/api/java/utils/ParameterToolTest.java | 6 ++++++ 2 files changed, 18 insertions(+) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java index d1c1f492569da..98ad0d01c8c51 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java @@ -18,9 +18,11 @@ package org.apache.flink.api.java.utils; import com.google.common.base.Preconditions; +import org.apache.commons.cli.Option; import org.apache.commons.lang3.math.NumberUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.util.GenericOptionsParser; import java.io.File; import java.io.FileInputStream; @@ -139,6 +141,16 @@ public static ParameterTool fromSystemProperties() { return fromMap((Map) System.getProperties()); } + public static ParameterTool fromGenericOptionsParser(String[] args) throws IOException { + Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions(); + Map map = new HashMap(); + for (Option option : options) { + String[] split = option.getValue().split("="); + map.put(split[0], split[1]); + } + return fromMap(map); + } + // ------------------ ParameterUtil ------------------------ protected final Map data; protected final HashMap defaultData; diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java index c660b7a18871e..8987c97841628 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java @@ -150,6 +150,12 @@ public void testMerged() { validate(parameter); } + @Test + public void testFromGenericOptionsParser() throws IOException { + ParameterTool parameter = ParameterTool.fromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"}); + validate(parameter); + } + private void validate(ParameterTool parameter) { ClosureCleaner.ensureSerializable(parameter); Assert.assertEquals("myInput", parameter.getRequired("input"));