Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
[Java Streamlet API] Extend Validations Part II (#3111)
Browse files Browse the repository at this point in the history
* Extends Streamlet Java API Validations Part II

* Set otherStreamlet parameter name as more meaningful
  • Loading branch information
erenavsarogullari authored and nwangtw committed Nov 19, 2018
1 parent 57b308c commit a81021a
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 27 deletions.
13 changes: 10 additions & 3 deletions heron/api/src/java/BUILD
Expand Up @@ -33,7 +33,8 @@ java_library(
javacopts = DOCLINT_HTML_AND_SYNTAX,
deps = api_deps_files + [
":api-java-low-level",
"//third_party/java:kryo-neverlink"
"//third_party/java:kryo-neverlink",
"@org_apache_commons_commons_lang3//jar"
]
)

Expand All @@ -42,13 +43,19 @@ java_library(
name = "api-java-low-level-functional",
javacopts = DOCLINT_HTML_AND_SYNTAX,
srcs = glob(["org/apache/heron/api/**/*.java", "org/apache/heron/streamlet/**/*.java"]),
deps = api_deps_files + ["//third_party/java:kryo-neverlink"]
deps = api_deps_files + [
"//third_party/java:kryo-neverlink",
"@org_apache_commons_commons_lang3//jar"
]
)

java_binary(
name = "api-unshaded",
srcs = glob(["org/apache/heron/api/**/*.java", "org/apache/heron/streamlet/**/*.java"]),
deps = api_deps_files + ["//third_party/java:kryo-neverlink"]
deps = api_deps_files + [
"//third_party/java:kryo-neverlink",
"@org_apache_commons_commons_lang3//jar"
]
)

jarjar_binary(
Expand Down
Expand Up @@ -30,7 +30,8 @@
import org.apache.heron.streamlet.SerializableSupplier;
import org.apache.heron.streamlet.Source;
import org.apache.heron.streamlet.Streamlet;
import org.apache.heron.streamlet.impl.utils.StreamletUtils;

import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;

/**
* BuilderImpl implements the Builder interface.
Expand All @@ -46,23 +47,20 @@ public BuilderImpl() {

@Override
public <R> Streamlet<R> newSource(SerializableSupplier<R> supplier) {
StreamletUtils.require(supplier != null, "supplier must not be null.");
StreamletImpl<R> retval = StreamletImpl.createSupplierStreamlet(supplier);
sources.add(retval);
return retval;
}

@Override
public <R> Streamlet<R> newSource(Source<R> generator) {
StreamletUtils.require(generator != null, "source must not be null.");
StreamletImpl<R> retval = StreamletImpl.createGeneratorStreamlet(generator);
sources.add(retval);
return retval;
}

@Override
public <R> Streamlet<R> newSource(IRichSpout spout) {
StreamletUtils.require(spout != null, "spout must not be null.");
StreamletImpl<R> retval = StreamletImpl.createSpoutStreamlet(spout);
sources.add(retval);
return retval;
Expand All @@ -78,6 +76,8 @@ public TopologyBuilder build() {
}

public TopologyBuilder build(TopologyBuilder builder) {
checkNotNull(builder, "builder cannot not be null");

Set<String> stageNames = new HashSet<>();
for (StreamletImpl<?> streamlet : sources) {
streamlet.build(builder, stageNames);
Expand Down
Expand Up @@ -17,7 +17,6 @@
* under the License.
*/


package org.apache.heron.streamlet.impl;

import java.util.ArrayList;
Expand Down Expand Up @@ -59,7 +58,10 @@
import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
import org.apache.heron.streamlet.impl.utils.StreamletUtils;

import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank;
import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require;

/**
* A Streamlet is a (potentially unbounded) ordered collection of tuples.
Expand Down Expand Up @@ -151,8 +153,8 @@ public List<StreamletImpl<?>> getChildren() {
*/
@Override
public Streamlet<R> setName(String sName) {
StreamletUtils.require(sName != null && !sName.trim().isEmpty(),
"Streamlet name cannot be null/blank");
checkNotBlank(sName, "Streamlet name cannot be null/blank");

this.name = sName;
return this;
}
Expand Down Expand Up @@ -190,8 +192,8 @@ protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set<String> stag
*/
@Override
public Streamlet<R> setNumPartitions(int numPartitions) {
StreamletUtils.require(numPartitions > 0,
"Streamlet's partitions number should be > 0");
require(numPartitions > 0, "Streamlet's partitions number should be > 0");

this.nPartitions = numPartitions;
return this;
}
Expand Down Expand Up @@ -255,6 +257,8 @@ private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> sta
* @param supplier The Supplier function to generate the elements
*/
static <T> StreamletImpl<T> createSupplierStreamlet(SerializableSupplier<T> supplier) {
checkNotNull(supplier, "supplier cannot not be null");

return new SupplierStreamlet<T>(supplier);
}

Expand All @@ -263,6 +267,8 @@ static <T> StreamletImpl<T> createSupplierStreamlet(SerializableSupplier<T> supp
* @param generator The Generator function to generate the elements
*/
static <T> StreamletImpl<T> createGeneratorStreamlet(Source<T> generator) {
checkNotNull(generator, "generator cannot not be null");

return new SourceStreamlet<T>(generator);
}

Expand All @@ -271,6 +277,8 @@ static <T> StreamletImpl<T> createGeneratorStreamlet(Source<T> generator) {
* @param spout The Spout function to generate the elements
*/
static <T> StreamletImpl<T> createSpoutStreamlet(IRichSpout spout) {
checkNotNull(spout, "spout cannot not be null");

return new SpoutStreamlet<T>(spout);
}

Expand All @@ -280,6 +288,8 @@ static <T> StreamletImpl<T> createSpoutStreamlet(IRichSpout spout) {
*/
@Override
public <T> Streamlet<T> map(SerializableFunction<R, ? extends T> mapFn) {
checkNotNull(mapFn, "mapFn cannot be null");

MapStreamlet<R, T> retval = new MapStreamlet<>(this, mapFn);
addChild(retval);
return retval;
Expand All @@ -293,6 +303,8 @@ public <T> Streamlet<T> map(SerializableFunction<R, ? extends T> mapFn) {
@Override
public <T> Streamlet<T> flatMap(
SerializableFunction<R, ? extends Iterable<? extends T>> flatMapFn) {
checkNotNull(flatMapFn, "flatMapFn cannot be null");

FlatMapStreamlet<R, T> retval = new FlatMapStreamlet<>(this, flatMapFn);
addChild(retval);
return retval;
Expand All @@ -305,6 +317,8 @@ public <T> Streamlet<T> flatMap(
*/
@Override
public Streamlet<R> filter(SerializablePredicate<R> filterFn) {
checkNotNull(filterFn, "filterFn cannot be null");

FilterStreamlet<R> retval = new FilterStreamlet<>(this, filterFn);
addChild(retval);
return retval;
Expand All @@ -325,6 +339,8 @@ public Streamlet<R> repartition(int numPartitions) {
@Override
public Streamlet<R> repartition(int numPartitions,
SerializableBiFunction<R, Integer, List<Integer>> partitionFn) {
checkNotNull(partitionFn, "partitionFn cannot be null");

RemapStreamlet<R> retval = new RemapStreamlet<>(this, partitionFn);
retval.setNumPartitions(numPartitions);
addChild(retval);
Expand All @@ -338,7 +354,7 @@ public Streamlet<R> repartition(int numPartitions,
*/
@Override
public List<Streamlet<R>> clone(int numClones) {
StreamletUtils.require(numClones > 0,
require(numClones > 0,
"Streamlet's clone number should be > 0");
List<Streamlet<R>> retval = new ArrayList<>(numClones);
for (int i = 0; i < numClones; ++i) {
Expand All @@ -352,7 +368,7 @@ public List<Streamlet<R>> clone(int numClones) {
* The join is done over elements accumulated over a time window defined by windowCfg.
* The elements are compared using the thisKeyExtractor for this streamlet with the
* otherKeyExtractor for the other streamlet. On each matching pair, the joinFunction is applied.
* @param other The Streamlet that we are joining with.
* @param otherStreamlet The Streamlet that we are joining with.
* @param thisKeyExtractor The function applied to a tuple of this streamlet to get the key
* @param otherKeyExtractor The function applied to a tuple of the other streamlet to get the key
* @param windowCfg This is a specification of what kind of windowing strategy you like to
Expand All @@ -361,10 +377,16 @@ public List<Streamlet<R>> clone(int numClones) {
*/
@Override
public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor,
join(Streamlet<S> otherStreamlet, SerializableFunction<R, K> thisKeyExtractor,
SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg,
SerializableBiFunction<R, S, ? extends T> joinFunction) {
return join(other, thisKeyExtractor, otherKeyExtractor,
checkNotNull(otherStreamlet, "otherStreamlet cannot be null");
checkNotNull(thisKeyExtractor, "thisKeyExtractor cannot be null");
checkNotNull(otherKeyExtractor, "otherKeyExtractor cannot be null");
checkNotNull(windowCfg, "windowCfg cannot be null");
checkNotNull(joinFunction, "joinFunction cannot be null");

return join(otherStreamlet, thisKeyExtractor, otherKeyExtractor,
windowCfg, JoinType.INNER, joinFunction);
}

Expand All @@ -375,7 +397,7 @@ public List<Streamlet<R>> clone(int numClones) {
* The elements are compared using the thisKeyExtractor for this streamlet with the
* otherKeyExtractor for the other streamlet. On each matching pair, the joinFunction is applied.
* Types of joins {@link JoinType}
* @param other The Streamlet that we are joining with.
* @param otherStreamlet The Streamlet that we are joining with.
* @param thisKeyExtractor The function applied to a tuple of this streamlet to get the key
* @param otherKeyExtractor The function applied to a tuple of the other streamlet to get the key
* @param windowCfg This is a specification of what kind of windowing strategy you like to
Expand All @@ -385,11 +407,17 @@ public List<Streamlet<R>> clone(int numClones) {
*/
@Override
public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor,
join(Streamlet<S> otherStreamlet, SerializableFunction<R, K> thisKeyExtractor,
SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg,
JoinType joinType, SerializableBiFunction<R, S, ? extends T> joinFunction) {

StreamletImpl<S> joinee = (StreamletImpl<S>) other;
checkNotNull(otherStreamlet, "otherStreamlet cannot be null");
checkNotNull(thisKeyExtractor, "thisKeyExtractor cannot be null");
checkNotNull(otherKeyExtractor, "otherKeyExtractor cannot be null");
checkNotNull(windowCfg, "windowCfg cannot be null");
checkNotNull(joinType, "joinType cannot be null");
checkNotNull(joinFunction, "joinFunction cannot be null");

StreamletImpl<S> joinee = (StreamletImpl<S>) otherStreamlet;
JoinStreamlet<K, R, S, T> retval = JoinStreamlet.createJoinStreamlet(
this, joinee, thisKeyExtractor, otherKeyExtractor, windowCfg, joinType, joinFunction);
addChild(retval);
Expand All @@ -411,6 +439,11 @@ public List<Streamlet<R>> clone(int numClones) {
public <K, V> Streamlet<KeyValue<KeyedWindow<K>, V>> reduceByKeyAndWindow(
SerializableFunction<R, K> keyExtractor, SerializableFunction<R, V> valueExtractor,
WindowConfig windowCfg, SerializableBinaryOperator<V> reduceFn) {
checkNotNull(keyExtractor, "keyExtractor cannot be null");
checkNotNull(valueExtractor, "valueExtractor cannot be null");
checkNotNull(windowCfg, "windowCfg cannot be null");
checkNotNull(reduceFn, "reduceFn cannot be null");

ReduceByKeyAndWindowStreamlet<K, V, R> retval =
new ReduceByKeyAndWindowStreamlet<>(this, keyExtractor, valueExtractor,
windowCfg, reduceFn);
Expand All @@ -435,6 +468,11 @@ public <K, V> Streamlet<KeyValue<KeyedWindow<K>, V>> reduceByKeyAndWindow(
public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
SerializableFunction<R, K> keyExtractor, WindowConfig windowCfg,
T identity, SerializableBiFunction<T, R, ? extends T> reduceFn) {
checkNotNull(keyExtractor, "keyExtractor cannot be null");
checkNotNull(windowCfg, "windowCfg cannot be null");
checkNotNull(identity, "identity cannot be null");
checkNotNull(reduceFn, "reduceFn cannot be null");

GeneralReduceByKeyAndWindowStreamlet<K, R, T> retval =
new GeneralReduceByKeyAndWindowStreamlet<>(this, keyExtractor, windowCfg,
identity, reduceFn);
Expand All @@ -447,8 +485,10 @@ public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
* the new streamlet will contain tuples belonging to both Streamlets
*/
@Override
public Streamlet<R> union(Streamlet<? extends R> other) {
StreamletImpl<? extends R> joinee = (StreamletImpl<? extends R>) other;
public Streamlet<R> union(Streamlet<? extends R> otherStreamlet) {
checkNotNull(otherStreamlet, "otherStreamlet cannot be null");

StreamletImpl<? extends R> joinee = (StreamletImpl<? extends R>) otherStreamlet;
UnionStreamlet<R> retval = new UnionStreamlet<>(this, joinee);
addChild(retval);
joinee.addChild(retval);
Expand All @@ -472,6 +512,8 @@ public void log() {
*/
@Override
public void consume(SerializableConsumer<R> consumer) {
checkNotNull(consumer, "consumer cannot be null");

ConsumerStreamlet<R> consumerStreamlet = new ConsumerStreamlet<>(this, consumer);
addChild(consumerStreamlet);
}
Expand All @@ -482,6 +524,8 @@ public void consume(SerializableConsumer<R> consumer) {
*/
@Override
public void toSink(Sink<R> sink) {
checkNotNull(sink, "sink cannot be null");

SinkStreamlet<R> sinkStreamlet = new SinkStreamlet<>(this, sink);
addChild(sinkStreamlet);
}
Expand All @@ -497,6 +541,8 @@ public void toSink(Sink<R> sink) {
@Override
public <T> Streamlet<T> transform(
SerializableTransformer<R, ? extends T> serializableTransformer) {
checkNotNull(serializableTransformer, "serializableTransformer cannot be null");

TransformStreamlet<R, T> transformStreamlet =
new TransformStreamlet<>(this, serializableTransformer);
addChild(transformStreamlet);
Expand All @@ -511,6 +557,8 @@ public <T> Streamlet<T> transform(
*/
@Override
public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
checkNotNull(operator, "operator cannot be null");

StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator);
addChild(customStreamlet);
return customStreamlet;
Expand Down
Expand Up @@ -17,7 +17,6 @@
* under the License.
*/


package org.apache.heron.streamlet.impl.streamlets;

import java.util.Set;
Expand All @@ -40,7 +39,7 @@ public class CustomStreamlet<R, T> extends StreamletImpl<T> {
/**
* Create a custom streamlet from user defined CustomOperator object.
* @param parent The parent(upstream) streamlet object
* @param operator The user defined CustomeOperator
* @param operator The user defined CustomOperator
*/
public CustomStreamlet(StreamletImpl<R> parent,
IStreamletOperator<R, T> operator) {
Expand Down
Expand Up @@ -19,6 +19,8 @@

package org.apache.heron.streamlet.impl.utils;

import org.apache.commons.lang3.StringUtils;

public final class StreamletUtils {

private StreamletUtils() {
Expand All @@ -36,4 +38,31 @@ public static void require(Boolean requirement, String errorMessage) {
}
}

/**
* Verifies not blank text as the utility function.
* @param text The text to verify
* @param errorMessage The error message
* @throws IllegalArgumentException if the requirement fails
*/
public static String checkNotBlank(String text, String errorMessage) {
if (StringUtils.isBlank(text)) {
throw new IllegalArgumentException(errorMessage);
} else {
return text;
}
}

/**
* Verifies not null reference as the utility function.
* @param reference The reference to verify
* @param errorMessage The error message
* @throws NullPointerException if the requirement fails
*/
public static <T> T checkNotNull(T reference, String errorMessage) {
if (reference == null) {
throw new NullPointerException(errorMessage);
}
return reference;
}

}

0 comments on commit a81021a

Please sign in to comment.