[STORM-3850] Collapse exceptions, remove unused variables/params, fix spelling, etc
bipinprasad committed Apr 5, 2022
1 parent 659629d commit 30bd741
Showing 36 changed files with 272 additions and 287 deletions.
@@ -271,7 +271,8 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
return new TopologyLoadConf(topologyName, savedTopoConf, spouts, bolts, streams);
}

private static void addCpuMemToBuilders(Map<String, LoadCompConf.Builder> boltBuilders, Map<String, Map<String, Double>> boltResources) {
private static void addCpuMemToBuilders(Map<String, LoadCompConf.Builder> boltBuilders,
Map<String, Map<String, Double>> boltResources) {
for (Map.Entry<String, Map<String, Double>> entry: boltResources.entrySet()) {
LoadCompConf.Builder bd = boltBuilders.get(entry.getKey());
if (bd != null) {
@@ -44,7 +44,8 @@ public static void main(String[] args) throws Exception {
//private static void ackingProducerSimulation() {
// WaitStrategyPark ws = new WaitStrategyPark(100);
// StormMetricRegistry registry = new StormMetricRegistry();
// JCQueue spoutQ = new JCQueue("spoutQ", "spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
// JCQueue spoutQ = new JCQueue("spoutQ", "spoutQ", 1024, 0, 100, ws, "test", "test",
// Collections.singletonList(1000), 1000, registry);
// JCQueue ackQ = new JCQueue("ackQ", "ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
//
// final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
@@ -65,7 +65,7 @@ public class JpmmlRunnerTestTopology {
private String blobKey; // PMML Model downloaded from Blobstore - null if using File
private String tplgyName = "test";

public static void main(String[] args) throws Exception {
public static void main(String[] args) {
try {
JpmmlRunnerTestTopology testTopology = new JpmmlRunnerTestTopology();
testTopology.parseArgs(args);
@@ -77,10 +77,10 @@ public static void main(String[] args) throws Exception {
}

private void parseArgs(String[] args) {
if (Arrays.stream(args).anyMatch(option -> option.equals("-h"))) {
if (Arrays.asList(args).contains("-h")) {
printUsage();
} else if (Arrays.stream(args).anyMatch(option -> option.equals("-f"))
&& Arrays.stream(args).anyMatch(option -> option.equals("-b"))) {
} else if (Arrays.asList(args).contains("-f")
&& Arrays.asList(args).contains("-b")) {
System.out.println("Please specify only one option of [-b, -f]");
printUsage();
} else {
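A note on the option parsing above: for a plain membership test, Arrays.asList(args).contains(...) says the same thing as streaming with anyMatch and is shorter. A minimal, standalone sketch of the two forms (the "-v" flag is a hypothetical example):

    import java.util.Arrays;

    public class ArgCheck {
        public static void main(String[] args) {
            // Stream form the commit replaces
            boolean viaStream = Arrays.stream(args).anyMatch(option -> option.equals("-v"));
            // Equivalent List.contains form it switches to
            boolean viaList = Arrays.asList(args).contains("-v");
            System.out.println(viaStream == viaList); // always true
        }
    }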
@@ -116,21 +116,25 @@ private void parseArgs(String[] args) {
private void setDefaults() {
if (blobKey == null) { // blob key not specified, use file
if (pmml == null) {
pmml = loadExample(pmml, PMML_MODEL_FILE);
pmml = loadExample(PMML_MODEL_FILE);
}
}

if (rawInputs == null) {
rawInputs = loadExample(rawInputs, RAW_INPUTS_FILE);
rawInputs = loadExample(RAW_INPUTS_FILE);
}

if (tplgyName == null) {
tplgyName = "pmmlPredictorLocal";
}
}

private File loadExample(File file, String example) {
private File loadExample(String example) {
File file;
try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(example)) {
if (stream == null) {
throw new RuntimeException("Error loading example=" + example + ", stream is null");
}
file = File.createTempFile("pmml-example", ".tmp");
IOUtils.copy(stream, new FileOutputStream(file));
} catch (IOException e) {
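The null check added above matters because ClassLoader.getResourceAsStream signals a missing resource by returning null rather than throwing. A self-contained sketch of the same load-resource-to-temp-file pattern, with java.nio Files.copy standing in for the commons-io IOUtils.copy used in the real code, and a hypothetical resource name:

    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;

    public class ResourceToTempFile {
        static File loadExample(String example) {
            try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(example)) {
                if (stream == null) {
                    // getResourceAsStream returns null for a missing resource instead of throwing
                    throw new RuntimeException("Error loading example=" + example + ", stream is null");
                }
                File file = File.createTempFile("example", ".tmp");
                // Files.copy keeps the sketch dependency-free; REPLACE_EXISTING is
                // needed because createTempFile has already created the file
                Files.copy(stream, file.toPath(), StandardCopyOption.REPLACE_EXISTING);
                return file;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public static void main(String[] args) {
            // "example.txt" is a hypothetical classpath resource name
            System.out.println(loadExample("example.txt").getAbsolutePath());
        }
    }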
@@ -147,8 +151,8 @@ private static void printUsage() {
}

private void run() throws Exception {
System.out.println(String.format("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]",
blobKey != null ? "Blobstore with blob key [" + blobKey + "]" : pmml.getAbsolutePath(), rawInputs.getAbsolutePath()));
System.out.printf("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]%n",
blobKey != null ? "Blobstore with blob key [" + blobKey + "]" : pmml.getAbsolutePath(), rawInputs.getAbsolutePath());
submitTopologyRemoteCluster(newTopology(), newConfig());
}
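The printf change above folds formatting and printing into one call, and %n emits the platform line separator that println previously supplied. A small sketch contrasting the two (the values are hypothetical):

    public class PrintfDemo {
        public static void main(String[] args) {
            String model = "/tmp/model.pmml"; // hypothetical values for illustration
            String input = "/tmp/input.csv";
            // Before: format, then print
            System.out.println(String.format("model=[%s] input=[%s]", model, input));
            // After: one call; %n is the platform line separator
            System.out.printf("model=[%s] input=[%s]%n", model, input);
        }
    }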

@@ -171,7 +175,7 @@ private Config newConfig() {
return config;
}

private IRichBolt newBolt() throws Exception {
private IRichBolt newBolt() {
final List<String> streams = Lists.newArrayList(Utils.DEFAULT_STREAM_ID, NON_DEFAULT_STREAM_ID);
if (blobKey != null) { // Load PMML Model from Blob store
final ModelOutputs outFields = JpmmlModelOutputs.toStreams(blobKey, streams);
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
@@ -12,7 +12,6 @@

package org.apache.storm.starter;

import java.io.Serializable;
import java.util.UUID;
import org.apache.storm.Config;
import org.apache.storm.topology.ConfigurableTopology;
@@ -42,10 +41,10 @@ protected int run(String[] args) throws Exception {

builder.setSpout("spout1", () -> UUID.randomUUID().toString());
builder.setBolt("bolt1", (tuple, collector) -> {
String[] parts = tuple.getStringByField("lambda").split("\\-");
String[] parts = tuple.getStringByField("lambda").split("-");
collector.emit(new Values(prefix + parts[0] + suffix, tag));
}, "strValue", "intValue").shuffleGrouping("spout1");
builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1");
builder.setBolt("bolt2", System.out::println).shuffleGrouping("bolt1");

Config conf = new Config();
conf.setDebug(true);
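Two cleanups above generalize: inside split's regex argument the hyphen has no special meaning, so "\\-" and "-" compile to the identical pattern, and a lambda that merely forwards its argument to println collapses to a method reference. A runnable sketch:

    import java.util.Arrays;
    import java.util.function.Consumer;

    public class LambdaCleanups {
        public static void main(String[] args) {
            String id = "ab-cd-ef";
            // "\\-" and "-" are the same regex: the escape was redundant
            System.out.println(Arrays.equals(id.split("\\-"), id.split("-"))); // true
            // tuple -> System.out.println(tuple) collapses to a method reference
            Consumer<String> printLambda = s -> System.out.println(s);
            Consumer<String> printRef = System.out::println;
            printLambda.accept(id);
            printRef.accept(id);
        }
    }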
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -23,11 +23,8 @@
import java.util.Map;

import javax.security.auth.Subject;
import javax.security.auth.login.LoginContext;

import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.LocalFsBlobStore;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
@@ -47,9 +44,8 @@ protected static void deleteAllBlobStoreKeys(BlobStore bs, Subject who) throws A
}
}

protected static void copyBlobStoreKeys(BlobStore bsFrom,
Subject whoFrom,
BlobStore bsTo, Subject whoTo) throws AuthorizationException,
protected static void copyBlobStoreKeys(BlobStore bsFrom, Subject whoFrom, BlobStore bsTo, Subject whoTo)
throws AuthorizationException,
KeyAlreadyExistsException,
IOException,
KeyNotFoundException {
@@ -63,13 +59,12 @@ protected static void copyBlobStoreKeys(BlobStore bsFrom,
System.out.println("DONE CREATING BLOB " + key);
}
}



public static void main(String[] args) throws Exception {
Map<String, Object> hdfsConf = Utils.readStormConfig();

if (args.length < 2) {
System.out.println("Need at least 2 arguments, but have " + Integer.toString(args.length));
System.out.println("Need at least 2 arguments, but have " + args.length);
System.out.println("migrate <local_blobstore_dir> <hdfs_blobstore_path> <hdfs_principal> <keytab>");
System.out.println("Migrates blobs from LocalFsBlobStore to HdfsBlobStore");
System.out.println("Example: migrate '/srv/storm' "
@@ -104,8 +99,7 @@ public static void main(String[] args) throws Exception {

HdfsBlobStore hdfsBlobStore = new HdfsBlobStore();
hdfsBlobStore.prepare(hdfsConf, null, null, null);



/* LOOK AT LOCAL BLOBSTORE */
System.out.println("Listing local blobstore keys.");
MigratorMain.listBlobStoreKeys(lfsBlobStore, null);
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
@@ -60,29 +60,29 @@ public ObjectMapperCqlStatementMapper build() {
codecs, udtClasses);
}

public ObjectMapperCqlStatementMapperBuilder withCodecs(List<SerializableCallable<TypeCodec<?>>> codecProducer) {
this.codecProducers.addAll(codecProducer);
return this;
}

public ObjectMapperCqlStatementMapperBuilder withUdtCodecs(List<Class<?>> udtClass) {
this.udtClasses.addAll(udtClass);
return this;
}

public ObjectMapperCqlStatementMapperBuilder withTimestampField(String timestampField) {
this.timestampField = timestampField;
return this;
}

public ObjectMapperCqlStatementMapperBuilder withTtlField(String ttlField) {
this.ttlField = ttlField;
return this;
}

public ObjectMapperCqlStatementMapperBuilder withConsistencyLevelField(String consistencyLevelField) {
this.consistencyLevelField = consistencyLevelField;
return this;
}
//public ObjectMapperCqlStatementMapperBuilder withCodecs(List<SerializableCallable<TypeCodec<?>>> codecProducer) {
// this.codecProducers.addAll(codecProducer);
// return this;
//}
//
//public ObjectMapperCqlStatementMapperBuilder withUdtCodecs(List<Class<?>> udtClass) {
// this.udtClasses.addAll(udtClass);
// return this;
//}
//
//public ObjectMapperCqlStatementMapperBuilder withTimestampField(String timestampField) {
// this.timestampField = timestampField;
// return this;
//}
//
//public ObjectMapperCqlStatementMapperBuilder withTtlField(String ttlField) {
// this.ttlField = ttlField;
// return this;
//}
//
//public ObjectMapperCqlStatementMapperBuilder withConsistencyLevelField(String consistencyLevelField) {
// this.consistencyLevelField = consistencyLevelField;
// return this;
//}
}

@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,23 +17,22 @@
*/
package org.apache.storm.elasticsearch.common;

import static org.junit.Assert.assertEquals;

import org.apache.http.HttpHost;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.google.common.testing.NullPointerTester;
import org.junit.jupiter.api.Assertions;
import org.apache.http.HttpHost;
import org.junit.jupiter.api.Test;

public class EsConfigTest {

@Test
public void urlsCannotBeEmpty() throws Exception {
Assertions.assertThrows(IllegalArgumentException.class, () -> new EsConfig(new String[] {}));
public void urlsCannotBeEmpty() {
assertThrows(IllegalArgumentException.class, () -> new EsConfig(new String[] {}));
}

@Test
public void constructorThrowsOnNull() throws Exception {
public void constructorThrowsOnNull() {
new NullPointerTester().testAllPublicConstructors(EsConfig.class);
}

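The test changes above complete a JUnit 4 to JUnit 5 migration: Assertions.assertThrows is statically imported, and the now-pointless throws Exception clauses are dropped. A minimal JUnit 5 sketch of the same shape, with a hypothetical stand-in for the class under test:

    import static org.junit.jupiter.api.Assertions.assertThrows;

    import org.junit.jupiter.api.Test;

    class ConfigTest {
        // Hypothetical stand-in for a class whose constructor validates its input
        static class Config {
            Config(String[] urls) {
                if (urls.length == 0) {
                    throw new IllegalArgumentException("urls cannot be empty");
                }
            }
        }

        @Test
        void urlsCannotBeEmpty() {
            // No throws clause needed: assertThrows handles the exception itself
            assertThrows(IllegalArgumentException.class, () -> new Config(new String[] {}));
        }
    }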
@@ -1,4 +1,4 @@
/*******************************************************************************
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
*/

package org.apache.storm.eventhubs.bolt;

@@ -38,23 +38,22 @@
*/
public class EventHubBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory
.getLogger(EventHubBolt.class);
private static final Logger logger = LoggerFactory.getLogger(EventHubBolt.class);

protected OutputCollector collector;
protected PartitionSender sender;
protected EventHubClient ehClient;
protected EventHubBoltConfig boltConfig;

public EventHubBolt(String connectionString, String entityPath) {
boltConfig = new EventHubBoltConfig(connectionString, entityPath);
}

public EventHubBolt(String userName, String password, String namespace,
String entityPath, boolean partitionMode) {
boltConfig = new EventHubBoltConfig(userName, password, namespace,
entityPath, partitionMode);
}
//public EventHubBolt(String connectionString, String entityPath) {
// boltConfig = new EventHubBoltConfig(connectionString, entityPath);
//}
//
//public EventHubBolt(String userName, String password, String namespace,
// String entityPath, boolean partitionMode) {
// boltConfig = new EventHubBoltConfig(userName, password, namespace,
// entityPath, partitionMode);
//}

public EventHubBolt(EventHubBoltConfig config) {
boltConfig = config;
@@ -97,12 +96,9 @@ public void execute(Tuple tuple) {
throw new EventHubException("ehclient is null");
}
collector.ack(tuple);
} catch (EventHubException ex) {
} catch (EventHubException | ServiceBusException ex) {
collector.reportError(ex);
collector.fail(tuple);
} catch (ServiceBusException e) {
collector.reportError(e);
collector.fail(tuple);
}
}

@@ -113,17 +109,15 @@ public void cleanup() {
sender.close().whenComplete((voidargs, error) -> {
try {
if (error != null) {
logger.error("Exception during sender cleanup phase" + error.toString());
logger.error("Exception during sender cleanup phase" + error);
}
ehClient.closeSync();
} catch (Exception e) {
logger.error("Exception during ehclient cleanup phase" + e.toString());
logger.error("Exception during ehclient cleanup phase" + e);
}
}).get();
} catch (InterruptedException e) {
logger.error("Exception occured during cleanup phase" + e.toString());
} catch (ExecutionException e) {
logger.error("Exception occured during cleanup phase" + e.toString());
} catch (InterruptedException | ExecutionException e) {
logger.error("Exception occurred during cleanup phase" + e);
}
logger.info("Eventhub Bolt cleaned up");
sender = null;
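Both changes above follow the commit title: adjacent catch blocks with identical bodies collapse into one multi-catch clause, and the explicit .toString() calls go away because string concatenation invokes toString() implicitly. A standalone sketch of the multi-catch collapse:

    import java.io.IOException;

    public class MultiCatchDemo {
        static void risky(boolean io) throws IOException, InterruptedException {
            if (io) {
                throw new IOException("io failure");
            }
            throw new InterruptedException("interrupted");
        }

        public static void main(String[] args) {
            try {
                risky(true);
            } catch (IOException | InterruptedException e) {
                // One block for both types; the variable is implicitly final.
                // Concatenation calls e.toString() for us, so "+ e" suffices.
                System.err.println("Exception occurred during cleanup phase" + e);
            }
        }
    }

The commit applies this same collapse twice in the bolt, in execute() and in cleanup().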
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
@@ -68,10 +68,9 @@ public class AvroGenericRecordBoltTest {
+ "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"},"
+ "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" },"
+ "{ \"name\":\"int1\", \"type\":\"int\" }]}";
private static Schema schema1;
private static Schema schema2;
private static Tuple tuple1;
private static Tuple tuple2;

@Rule
public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(() -> {
Configuration conf = new Configuration();
@@ -92,10 +91,10 @@ public class AvroGenericRecordBoltTest {
@BeforeClass
public static void setupClass() {
Schema.Parser parser = new Schema.Parser();
schema1 = parser.parse(schemaV1);
Schema schema1 = parser.parse(schemaV1);

parser = new Schema.Parser();
schema2 = parser.parse(schemaV2);
Schema schema2 = parser.parse(schemaV2);

GenericRecordBuilder builder1 = new GenericRecordBuilder(schema1);
builder1.set("foo1", "bar1");
@@ -110,8 +109,8 @@ public static void setupClass() {

private static Tuple generateTestTuple(GenericRecord record) {
TopologyBuilder builder = new TopologyBuilder();
GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(),
new HashMap<>(), new HashMap<>(), new HashMap<>(), "") {
@Override
public Fields getComponentOutputFields(String componentId, String streamId) {
return new Fields("record");
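The setup changes above demote write-once static fields to locals and replace raw new HashMap() arguments with the diamond form, letting the compiler infer the type parameters instead of falling back to raw types. A small sketch of the raw-versus-diamond difference:

    import java.util.HashMap;
    import java.util.Map;

    public class DiamondDemo {
        static int count(Map<String, Integer> m) {
            return m.size();
        }

        public static void main(String[] args) {
            // Raw type: compiles, but with an "unchecked" warning
            System.out.println(count(new HashMap()));
            // Diamond: the compiler infers <String, Integer>; no warning
            System.out.println(count(new HashMap<>()));
        }
    }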
