Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Review comments are addressed.
Browse files Browse the repository at this point in the history
  • Loading branch information
erenavsarogullari committed Nov 1, 2018
1 parent 2142aa6 commit 4d91ba6
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 29 deletions.
13 changes: 3 additions & 10 deletions heron/api/src/java/BUILD
Expand Up @@ -33,8 +33,7 @@ java_library(
javacopts = DOCLINT_HTML_AND_SYNTAX,
deps = api_deps_files + [
":api-java-low-level",
"//third_party/java:kryo-neverlink",
"//third_party/java:guava"
"//third_party/java:kryo-neverlink"
]
)

Expand All @@ -43,19 +42,13 @@ java_library(
name = "api-java-low-level-functional",
javacopts = DOCLINT_HTML_AND_SYNTAX,
srcs = glob(["org/apache/heron/api/**/*.java", "org/apache/heron/streamlet/**/*.java"]),
deps = api_deps_files + [
"//third_party/java:kryo-neverlink",
"//third_party/java:guava"
]
deps = api_deps_files + ["//third_party/java:kryo-neverlink"]
)

java_binary(
name = "api-unshaded",
srcs = glob(["org/apache/heron/api/**/*.java", "org/apache/heron/streamlet/**/*.java"]),
deps = api_deps_files + [
"//third_party/java:kryo-neverlink",
"//third_party/java:guava"
]
deps = api_deps_files + ["//third_party/java:kryo-neverlink"]
)

jarjar_binary(
Expand Down
Expand Up @@ -24,13 +24,12 @@
import java.util.List;
import java.util.Set;

import com.google.common.base.Preconditions;

import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.Builder;
import org.apache.heron.streamlet.SerializableSupplier;
import org.apache.heron.streamlet.Source;
import org.apache.heron.streamlet.Streamlet;
import org.apache.heron.streamlet.impl.utils.StreamletUtils;

/**
* BuilderImpl implements the Builder interface.
Expand All @@ -46,7 +45,7 @@ public BuilderImpl() {

@Override
public <R> Streamlet<R> newSource(SerializableSupplier<R> supplier) {
Preconditions.checkNotNull(supplier, "supplier must not be null.");
StreamletUtils.require(supplier != null, "supplier must not be null.");
StreamletImpl<R> retval = StreamletImpl.createSupplierStreamlet(supplier);
retval.setNumPartitions(1);
sources.add(retval);
Expand All @@ -55,7 +54,7 @@ public <R> Streamlet<R> newSource(SerializableSupplier<R> supplier) {

@Override
public <R> Streamlet<R> newSource(Source<R> generator) {
Preconditions.checkNotNull(generator, "source must not be null.");
StreamletUtils.require(generator != null, "source must not be null.");
StreamletImpl<R> retval = StreamletImpl.createGeneratorStreamlet(generator);
retval.setNumPartitions(1);
sources.add(retval);
Expand Down
Expand Up @@ -61,6 +61,7 @@
import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
import org.apache.heron.streamlet.impl.utils.StreamletUtils;

/**
* A Streamlet is a (potentially unbounded) ordered collection of tuples.
Expand Down Expand Up @@ -151,7 +152,7 @@ public List<StreamletImpl<?>> getChildren() {
*/
@Override
public Streamlet<R> setName(String sName) {
require(sName != null && !sName.trim().isEmpty(),
StreamletUtils.require(sName != null && !sName.trim().isEmpty(),
"Streamlet name cannot be null/blank");
this.name = sName;
return this;
Expand Down Expand Up @@ -190,7 +191,7 @@ protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set<String> stag
*/
@Override
public Streamlet<R> setNumPartitions(int numPartitions) {
require(numPartitions > 0,
StreamletUtils.require(numPartitions > 0,
"Streamlet's partitions number should be > 0");
this.nPartitions = numPartitions;
return this;
Expand All @@ -216,7 +217,7 @@ protected StreamletImpl() {

public void build(TopologyBuilder bldr, Set<String> stageNames) {
if (built) {
throw new RuntimeException("Logic Error While building stage: " + getName());
throw new RuntimeException("Logic Error While building " + getName());
}
if (doBuild(bldr, stageNames)) {
built = true;
Expand Down Expand Up @@ -329,7 +330,8 @@ public Streamlet<R> repartition(int numPartitions,
*/
@Override
public List<Streamlet<R>> clone(int numClones) {
require(numClones > 0, "Streamlet's clone number should be > 0");
StreamletUtils.require(numClones > 0,
"Streamlet's clone number should be > 0");
List<Streamlet<R>> retval = new ArrayList<>(numClones);
for (int i = 0; i < numClones; ++i) {
retval.add(repartition(getNumPartitions()));
Expand Down Expand Up @@ -533,15 +535,4 @@ public <T> Streamlet<T> applyOperator(IStreamletWindowOperator<R, T> operator) {
return customStreamlet;
}

/**
* Verifies the requirement as the utility function.
* @param requirement The requirement to verify
* @param errorMessage The error message
* @throws IllegalArgumentException if the requirement fails
*/
private void require(Boolean requirement, String errorMessage) {
if (!requirement) {
throw new IllegalArgumentException(errorMessage);
}
}
}
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.heron.streamlet.impl.utils;

public final class StreamletUtils {

private StreamletUtils() {
}

/**
* Verifies the requirement as the utility function.
* @param requirement The requirement to verify
* @param errorMessage The error message
* @throws IllegalArgumentException if the requirement fails
*/
public static void require(Boolean requirement, String errorMessage) {
if (!requirement) {
throw new IllegalArgumentException(errorMessage);
}
}

}
1 change: 1 addition & 0 deletions heron/api/tests/java/BUILD
Expand Up @@ -31,6 +31,7 @@ java_tests(
"org.apache.heron.streamlet.impl.operators.JoinOperatorTest",
"org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperatorTest",
"org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperatorTest",
"org.apache.heron.streamlet.impl.utils.StreamletUtilsTest",
"org.apache.heron.api.ConfigTest",
"org.apache.heron.api.HeronSubmitterTest",
"org.apache.heron.api.utils.UtilsTest"
Expand Down
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.heron.streamlet.impl.utils;

import org.junit.Test;

public class StreamletUtilsTest {

@Test
public void testRequire() {
String text = "test_text";
StreamletUtils.require(!text.isEmpty(), "text should not be blank");
}

@Test(expected = IllegalArgumentException.class)
public void testRequireWithNegativeCase() {
String text = "";
StreamletUtils.require(!text.isEmpty(), "text should not be blank");
}

}

0 comments on commit 4d91ba6

Please sign in to comment.