From a81021a763afe5bd5e36f90b46466cfd4736b9e6 Mon Sep 17 00:00:00 2001 From: Eren Avsarogullari Date: Mon, 19 Nov 2018 16:05:37 +0000 Subject: [PATCH] [Java Streamlet API] Extend Validations Part II (#3111) * Extends Streamlet Java API Validations Part II * Set otherStreamlet parameter name as more meaningful --- heron/api/src/java/BUILD | 13 ++- .../heron/streamlet/impl/BuilderImpl.java | 8 +- .../heron/streamlet/impl/StreamletImpl.java | 80 +++++++++++++++---- .../impl/streamlets/CustomStreamlet.java | 3 +- .../streamlet/impl/utils/StreamletUtils.java | 29 +++++++ .../impl/utils/StreamletUtilsTest.java | 39 ++++++++- 6 files changed, 145 insertions(+), 27 deletions(-) diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD index cdab255fcfe..0ae795f8e32 100644 --- a/heron/api/src/java/BUILD +++ b/heron/api/src/java/BUILD @@ -33,7 +33,8 @@ java_library( javacopts = DOCLINT_HTML_AND_SYNTAX, deps = api_deps_files + [ ":api-java-low-level", - "//third_party/java:kryo-neverlink" + "//third_party/java:kryo-neverlink", + "@org_apache_commons_commons_lang3//jar" ] ) @@ -42,13 +43,19 @@ java_library( name = "api-java-low-level-functional", javacopts = DOCLINT_HTML_AND_SYNTAX, srcs = glob(["org/apache/heron/api/**/*.java", "org/apache/heron/streamlet/**/*.java"]), - deps = api_deps_files + ["//third_party/java:kryo-neverlink"] + deps = api_deps_files + [ + "//third_party/java:kryo-neverlink", + "@org_apache_commons_commons_lang3//jar" + ] ) java_binary( name = "api-unshaded", srcs = glob(["org/apache/heron/api/**/*.java", "org/apache/heron/streamlet/**/*.java"]), - deps = api_deps_files + ["//third_party/java:kryo-neverlink"] + deps = api_deps_files + [ + "//third_party/java:kryo-neverlink", + "@org_apache_commons_commons_lang3//jar" + ] ) jarjar_binary( diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java index 61715e8528e..029b49e7969 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java @@ -30,7 +30,8 @@ import org.apache.heron.streamlet.SerializableSupplier; import org.apache.heron.streamlet.Source; import org.apache.heron.streamlet.Streamlet; -import org.apache.heron.streamlet.impl.utils.StreamletUtils; + +import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull; /** * BuilderImpl implements the Builder interface. @@ -46,7 +47,6 @@ public BuilderImpl() { @Override public Streamlet newSource(SerializableSupplier supplier) { - StreamletUtils.require(supplier != null, "supplier must not be null."); StreamletImpl retval = StreamletImpl.createSupplierStreamlet(supplier); sources.add(retval); return retval; @@ -54,7 +54,6 @@ public Streamlet newSource(SerializableSupplier supplier) { @Override public Streamlet newSource(Source generator) { - StreamletUtils.require(generator != null, "source must not be null."); StreamletImpl retval = StreamletImpl.createGeneratorStreamlet(generator); sources.add(retval); return retval; @@ -62,7 +61,6 @@ public Streamlet newSource(Source generator) { @Override public Streamlet newSource(IRichSpout spout) { - StreamletUtils.require(spout != null, "spout must not be null."); StreamletImpl retval = StreamletImpl.createSpoutStreamlet(spout); sources.add(retval); return retval; @@ -78,6 +76,8 @@ public TopologyBuilder build() { } public TopologyBuilder build(TopologyBuilder builder) { + checkNotNull(builder, "builder cannot not be null"); + Set stageNames = new HashSet<>(); for (StreamletImpl streamlet : sources) { streamlet.build(builder, stageNames); diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java index a05040b6a95..f382bcd4cf2 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java @@ -17,7 +17,6 @@ * under the License. */ - package org.apache.heron.streamlet.impl; import java.util.ArrayList; @@ -59,7 +58,10 @@ import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet; import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet; import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet; -import org.apache.heron.streamlet.impl.utils.StreamletUtils; + +import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank; +import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull; +import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require; /** * A Streamlet is a (potentially unbounded) ordered collection of tuples. @@ -151,8 +153,8 @@ public List> getChildren() { */ @Override public Streamlet setName(String sName) { - StreamletUtils.require(sName != null && !sName.trim().isEmpty(), - "Streamlet name cannot be null/blank"); + checkNotBlank(sName, "Streamlet name cannot be null/blank"); + this.name = sName; return this; } @@ -190,8 +192,8 @@ protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set stag */ @Override public Streamlet setNumPartitions(int numPartitions) { - StreamletUtils.require(numPartitions > 0, - "Streamlet's partitions number should be > 0"); + require(numPartitions > 0, "Streamlet's partitions number should be > 0"); + this.nPartitions = numPartitions; return this; } @@ -255,6 +257,8 @@ private String defaultNameCalculator(StreamletNamePrefix prefix, Set sta * @param supplier The Supplier function to generate the elements */ static StreamletImpl createSupplierStreamlet(SerializableSupplier supplier) { + checkNotNull(supplier, "supplier cannot not be null"); + return new SupplierStreamlet(supplier); } @@ -263,6 +267,8 @@ static StreamletImpl createSupplierStreamlet(SerializableSupplier supp * @param generator The Generator function to generate the elements */ static StreamletImpl createGeneratorStreamlet(Source generator) { + checkNotNull(generator, "generator cannot not be null"); + return new SourceStreamlet(generator); } @@ -271,6 +277,8 @@ static StreamletImpl createGeneratorStreamlet(Source generator) { * @param spout The Spout function to generate the elements */ static StreamletImpl createSpoutStreamlet(IRichSpout spout) { + checkNotNull(spout, "spout cannot not be null"); + return new SpoutStreamlet(spout); } @@ -280,6 +288,8 @@ static StreamletImpl createSpoutStreamlet(IRichSpout spout) { */ @Override public Streamlet map(SerializableFunction mapFn) { + checkNotNull(mapFn, "mapFn cannot be null"); + MapStreamlet retval = new MapStreamlet<>(this, mapFn); addChild(retval); return retval; @@ -293,6 +303,8 @@ public Streamlet map(SerializableFunction mapFn) { @Override public Streamlet flatMap( SerializableFunction> flatMapFn) { + checkNotNull(flatMapFn, "flatMapFn cannot be null"); + FlatMapStreamlet retval = new FlatMapStreamlet<>(this, flatMapFn); addChild(retval); return retval; @@ -305,6 +317,8 @@ public Streamlet flatMap( */ @Override public Streamlet filter(SerializablePredicate filterFn) { + checkNotNull(filterFn, "filterFn cannot be null"); + FilterStreamlet retval = new FilterStreamlet<>(this, filterFn); addChild(retval); return retval; @@ -325,6 +339,8 @@ public Streamlet repartition(int numPartitions) { @Override public Streamlet repartition(int numPartitions, SerializableBiFunction> partitionFn) { + checkNotNull(partitionFn, "partitionFn cannot be null"); + RemapStreamlet retval = new RemapStreamlet<>(this, partitionFn); retval.setNumPartitions(numPartitions); addChild(retval); @@ -338,7 +354,7 @@ public Streamlet repartition(int numPartitions, */ @Override public List> clone(int numClones) { - StreamletUtils.require(numClones > 0, + require(numClones > 0, "Streamlet's clone number should be > 0"); List> retval = new ArrayList<>(numClones); for (int i = 0; i < numClones; ++i) { @@ -352,7 +368,7 @@ public List> clone(int numClones) { * The join is done over elements accumulated over a time window defined by windowCfg. * The elements are compared using the thisKeyExtractor for this streamlet with the * otherKeyExtractor for the other streamlet. On each matching pair, the joinFunction is applied. - * @param other The Streamlet that we are joining with. + * @param otherStreamlet The Streamlet that we are joining with. * @param thisKeyExtractor The function applied to a tuple of this streamlet to get the key * @param otherKeyExtractor The function applied to a tuple of the other streamlet to get the key * @param windowCfg This is a specification of what kind of windowing strategy you like to @@ -361,10 +377,16 @@ public List> clone(int numClones) { */ @Override public Streamlet, T>> - join(Streamlet other, SerializableFunction thisKeyExtractor, + join(Streamlet otherStreamlet, SerializableFunction thisKeyExtractor, SerializableFunction otherKeyExtractor, WindowConfig windowCfg, SerializableBiFunction joinFunction) { - return join(other, thisKeyExtractor, otherKeyExtractor, + checkNotNull(otherStreamlet, "otherStreamlet cannot be null"); + checkNotNull(thisKeyExtractor, "thisKeyExtractor cannot be null"); + checkNotNull(otherKeyExtractor, "otherKeyExtractor cannot be null"); + checkNotNull(windowCfg, "windowCfg cannot be null"); + checkNotNull(joinFunction, "joinFunction cannot be null"); + + return join(otherStreamlet, thisKeyExtractor, otherKeyExtractor, windowCfg, JoinType.INNER, joinFunction); } @@ -375,7 +397,7 @@ public List> clone(int numClones) { * The elements are compared using the thisKeyExtractor for this streamlet with the * otherKeyExtractor for the other streamlet. On each matching pair, the joinFunction is applied. * Types of joins {@link JoinType} - * @param other The Streamlet that we are joining with. + * @param otherStreamlet The Streamlet that we are joining with. * @param thisKeyExtractor The function applied to a tuple of this streamlet to get the key * @param otherKeyExtractor The function applied to a tuple of the other streamlet to get the key * @param windowCfg This is a specification of what kind of windowing strategy you like to @@ -385,11 +407,17 @@ public List> clone(int numClones) { */ @Override public Streamlet, T>> - join(Streamlet other, SerializableFunction thisKeyExtractor, + join(Streamlet otherStreamlet, SerializableFunction thisKeyExtractor, SerializableFunction otherKeyExtractor, WindowConfig windowCfg, JoinType joinType, SerializableBiFunction joinFunction) { - - StreamletImpl joinee = (StreamletImpl) other; + checkNotNull(otherStreamlet, "otherStreamlet cannot be null"); + checkNotNull(thisKeyExtractor, "thisKeyExtractor cannot be null"); + checkNotNull(otherKeyExtractor, "otherKeyExtractor cannot be null"); + checkNotNull(windowCfg, "windowCfg cannot be null"); + checkNotNull(joinType, "joinType cannot be null"); + checkNotNull(joinFunction, "joinFunction cannot be null"); + + StreamletImpl joinee = (StreamletImpl) otherStreamlet; JoinStreamlet retval = JoinStreamlet.createJoinStreamlet( this, joinee, thisKeyExtractor, otherKeyExtractor, windowCfg, joinType, joinFunction); addChild(retval); @@ -411,6 +439,11 @@ public List> clone(int numClones) { public Streamlet, V>> reduceByKeyAndWindow( SerializableFunction keyExtractor, SerializableFunction valueExtractor, WindowConfig windowCfg, SerializableBinaryOperator reduceFn) { + checkNotNull(keyExtractor, "keyExtractor cannot be null"); + checkNotNull(valueExtractor, "valueExtractor cannot be null"); + checkNotNull(windowCfg, "windowCfg cannot be null"); + checkNotNull(reduceFn, "reduceFn cannot be null"); + ReduceByKeyAndWindowStreamlet retval = new ReduceByKeyAndWindowStreamlet<>(this, keyExtractor, valueExtractor, windowCfg, reduceFn); @@ -435,6 +468,11 @@ public Streamlet, V>> reduceByKeyAndWindow( public Streamlet, T>> reduceByKeyAndWindow( SerializableFunction keyExtractor, WindowConfig windowCfg, T identity, SerializableBiFunction reduceFn) { + checkNotNull(keyExtractor, "keyExtractor cannot be null"); + checkNotNull(windowCfg, "windowCfg cannot be null"); + checkNotNull(identity, "identity cannot be null"); + checkNotNull(reduceFn, "reduceFn cannot be null"); + GeneralReduceByKeyAndWindowStreamlet retval = new GeneralReduceByKeyAndWindowStreamlet<>(this, keyExtractor, windowCfg, identity, reduceFn); @@ -447,8 +485,10 @@ public Streamlet, T>> reduceByKeyAndWindow( * the new streamlet will contain tuples belonging to both Streamlets */ @Override - public Streamlet union(Streamlet other) { - StreamletImpl joinee = (StreamletImpl) other; + public Streamlet union(Streamlet otherStreamlet) { + checkNotNull(otherStreamlet, "otherStreamlet cannot be null"); + + StreamletImpl joinee = (StreamletImpl) otherStreamlet; UnionStreamlet retval = new UnionStreamlet<>(this, joinee); addChild(retval); joinee.addChild(retval); @@ -472,6 +512,8 @@ public void log() { */ @Override public void consume(SerializableConsumer consumer) { + checkNotNull(consumer, "consumer cannot be null"); + ConsumerStreamlet consumerStreamlet = new ConsumerStreamlet<>(this, consumer); addChild(consumerStreamlet); } @@ -482,6 +524,8 @@ public void consume(SerializableConsumer consumer) { */ @Override public void toSink(Sink sink) { + checkNotNull(sink, "sink cannot be null"); + SinkStreamlet sinkStreamlet = new SinkStreamlet<>(this, sink); addChild(sinkStreamlet); } @@ -497,6 +541,8 @@ public void toSink(Sink sink) { @Override public Streamlet transform( SerializableTransformer serializableTransformer) { + checkNotNull(serializableTransformer, "serializableTransformer cannot be null"); + TransformStreamlet transformStreamlet = new TransformStreamlet<>(this, serializableTransformer); addChild(transformStreamlet); @@ -511,6 +557,8 @@ public Streamlet transform( */ @Override public Streamlet applyOperator(IStreamletOperator operator) { + checkNotNull(operator, "operator cannot be null"); + StreamletImpl customStreamlet = new CustomStreamlet<>(this, operator); addChild(customStreamlet); return customStreamlet; diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java index b0f71c0050d..dbeea14a2eb 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java @@ -17,7 +17,6 @@ * under the License. */ - package org.apache.heron.streamlet.impl.streamlets; import java.util.Set; @@ -40,7 +39,7 @@ public class CustomStreamlet extends StreamletImpl { /** * Create a custom streamlet from user defined CustomOperator object. * @param parent The parent(upstream) streamlet object - * @param operator The user defined CustomeOperator + * @param operator The user defined CustomOperator */ public CustomStreamlet(StreamletImpl parent, IStreamletOperator operator) { diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java b/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java index 79104394c7d..76792fcabe6 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java @@ -19,6 +19,8 @@ package org.apache.heron.streamlet.impl.utils; +import org.apache.commons.lang3.StringUtils; + public final class StreamletUtils { private StreamletUtils() { @@ -36,4 +38,31 @@ public static void require(Boolean requirement, String errorMessage) { } } + /** + * Verifies not blank text as the utility function. + * @param text The text to verify + * @param errorMessage The error message + * @throws IllegalArgumentException if the requirement fails + */ + public static String checkNotBlank(String text, String errorMessage) { + if (StringUtils.isBlank(text)) { + throw new IllegalArgumentException(errorMessage); + } else { + return text; + } + } + + /** + * Verifies not null reference as the utility function. + * @param reference The reference to verify + * @param errorMessage The error message + * @throws NullPointerException if the requirement fails + */ + public static T checkNotNull(T reference, String errorMessage) { + if (reference == null) { + throw new NullPointerException(errorMessage); + } + return reference; + } + } diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java index c1b93ce478c..9f13c1670d9 100644 --- a/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java @@ -21,18 +21,53 @@ import org.junit.Test; +import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank; +import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull; +import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require; + public class StreamletUtilsTest { @Test public void testRequire() { String text = "test_text"; - StreamletUtils.require(!text.isEmpty(), "text should not be blank"); + require(!text.isEmpty(), "text should not be blank"); } @Test(expected = IllegalArgumentException.class) public void testRequireWithNegativeCase() { String text = ""; - StreamletUtils.require(!text.isEmpty(), "text should not be blank"); + require(!text.isEmpty(), "text should not be blank"); + } + + @Test + public void testCheckNotBlank() { + checkNotBlank("test_text", "text should not be blank"); + } + + @Test(expected = IllegalArgumentException.class) + public void testCheckNotBlankWithNullReference() { + checkNotBlank(null, "text should not be blank"); + } + + @Test(expected = IllegalArgumentException.class) + public void testCheckNotBlankWithEmptyString() { + checkNotBlank("", "text should not be blank"); + } + + @Test(expected = IllegalArgumentException.class) + public void testCheckNotBlankWithBlankString() { + checkNotBlank(" ", "text should not be blank"); + } + + @Test + public void testCheckNotNull() { + checkNotNull(new String(), "text should not be null"); + } + + @Test(expected = NullPointerException.class) + public void testCheckNotNullWithNullReference() { + String text = null; + checkNotNull(text, "text should not be null"); } }