diff --git a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java b/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java index 95c8160f94c..a5df1f08393 100644 --- a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java +++ b/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java @@ -65,7 +65,7 @@ public class JpmmlRunnerTestTopology { private String blobKey; // PMML Model downloaded from Blobstore - null if using File private String tplgyName = "test"; - public static void main(String[] args) throws Exception { + public static void main(String[] args) { try { JpmmlRunnerTestTopology testTopology = new JpmmlRunnerTestTopology(); testTopology.parseArgs(args); @@ -77,10 +77,10 @@ public static void main(String[] args) throws Exception { } private void parseArgs(String[] args) { - if (Arrays.stream(args).anyMatch(option -> option.equals("-h"))) { + if (Arrays.asList(args).contains("-h")) { printUsage(); - } else if (Arrays.stream(args).anyMatch(option -> option.equals("-f")) - && Arrays.stream(args).anyMatch(option -> option.equals("-b"))) { + } else if (Arrays.asList(args).contains("-f") + && Arrays.asList(args).contains("-b")) { System.out.println("Please specify only one option of [-b, -f]"); printUsage(); } else { @@ -116,12 +116,12 @@ private void parseArgs(String[] args) { private void setDefaults() { if (blobKey == null) { // blob key not specified, use file if (pmml == null) { - pmml = loadExample(pmml, PMML_MODEL_FILE); + pmml = loadExample(PMML_MODEL_FILE); } } if (rawInputs == null) { - rawInputs = loadExample(rawInputs, RAW_INPUTS_FILE); + rawInputs = loadExample(RAW_INPUTS_FILE); } if (tplgyName == null) { @@ -129,8 +129,12 @@ private void setDefaults() { } } - private File loadExample(File file, String example) { + private File loadExample(String example) { + File file; try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(example)) { + if (stream == null) { + throw new RuntimeException("Error loading example=" + example + ", stream is null"); + } file = File.createTempFile("pmml-example", ".tmp"); IOUtils.copy(stream, new FileOutputStream(file)); } catch (IOException e) { @@ -147,8 +151,8 @@ private static void printUsage() { } private void run() throws Exception { - System.out.println(String.format("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]", - blobKey != null ? "Blobstore with blob key [" + blobKey + "]" : pmml.getAbsolutePath(), rawInputs.getAbsolutePath())); + System.out.printf("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]%n", + blobKey != null ? 
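Review note: the null guard added to loadExample is a good catch, but the FileOutputStream handed to IOUtils.copy in the new version is still never closed. A minimal leak-free sketch, assuming java.nio.file.Files is acceptable in this example topology (nothing below is in the patch itself):

    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;

    private File loadExample(String example) {
        try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(example)) {
            if (stream == null) {
                throw new RuntimeException("Error loading example=" + example + ", stream is null");
            }
            File file = File.createTempFile("pmml-example", ".tmp");
            // Files.copy opens and closes its own output channel, so no
            // FileOutputStream is left dangling on the temp file.
            Files.copy(stream, file.toPath(), StandardCopyOption.REPLACE_EXISTING);
            return file;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }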
"Blobstore with blob key [" + blobKey + "]" : pmml.getAbsolutePath(), rawInputs.getAbsolutePath()); submitTopologyRemoteCluster(newTopology(), newConfig()); } @@ -171,7 +175,7 @@ private Config newConfig() { return config; } - private IRichBolt newBolt() throws Exception { + private IRichBolt newBolt() { final List streams = Lists.newArrayList(Utils.DEFAULT_STREAM_ID, NON_DEFAULT_STREAM_ID); if (blobKey != null) { // Load PMML Model from Blob store final ModelOutputs outFields = JpmmlModelOutputs.toStreams(blobKey, streams); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java index e7c134b58c6..7c800e3b69a 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -12,7 +12,6 @@ package org.apache.storm.starter; -import java.io.Serializable; import java.util.UUID; import org.apache.storm.Config; import org.apache.storm.topology.ConfigurableTopology; @@ -42,10 +41,10 @@ protected int run(String[] args) throws Exception { builder.setSpout("spout1", () -> UUID.randomUUID().toString()); builder.setBolt("bolt1", (tuple, collector) -> { - String[] parts = tuple.getStringByField("lambda").split("\\-"); + String[] parts = tuple.getStringByField("lambda").split("-"); collector.emit(new Values(prefix + parts[0] + suffix, tag)); }, "strValue", "intValue").shuffleGrouping("spout1"); - builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1"); + builder.setBolt("bolt2", System.out::println).shuffleGrouping("bolt1"); Config conf = new Config(); conf.setDebug(true); diff --git a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java index e7a3581637d..a05d0ac74e4 100644 --- a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java +++ b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -23,11 +23,8 @@ import java.util.Map; import javax.security.auth.Subject; -import javax.security.auth.login.LoginContext; import org.apache.storm.Config; -import org.apache.storm.blobstore.BlobStore; -import org.apache.storm.blobstore.LocalFsBlobStore; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.KeyAlreadyExistsException; import org.apache.storm.generated.KeyNotFoundException; @@ -47,9 +44,8 @@ protected static void deleteAllBlobStoreKeys(BlobStore bs, Subject who) throws A } } - protected static void copyBlobStoreKeys(BlobStore bsFrom, - Subject whoFrom, - BlobStore bsTo, Subject whoTo) throws AuthorizationException, + protected static void copyBlobStoreKeys(BlobStore bsFrom, Subject whoFrom, BlobStore bsTo, Subject whoTo) + throws AuthorizationException, KeyAlreadyExistsException, IOException, KeyNotFoundException { @@ -63,13 +59,12 @@ protected static void copyBlobStoreKeys(BlobStore bsFrom, System.out.println("DONE CREATING BLOB " + key); } } - - + public static void main(String[] args) throws Exception { Map hdfsConf = Utils.readStormConfig(); if (args.length < 2) { - System.out.println("Need at least 2 arguments, but have " + Integer.toString(args.length)); + System.out.println("Need at least 2 arguments, but have " + args.length); System.out.println("migrate "); System.out.println("Migrates blobs from LocalFsBlobStore to HdfsBlobStore"); System.out.println("Example: migrate '/srv/storm' " @@ -104,8 +99,7 @@ public static void main(String[] args) throws Exception { HdfsBlobStore hdfsBlobStore = new HdfsBlobStore(); hdfsBlobStore.prepare(hdfsConf, null, null, null); - - + /* LOOK AT LOCAL BLOBSTORE */ System.out.println("Listing local blobstore keys."); MigratorMain.listBlobStoreKeys(lfsBlobStore, null); diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java index d7961d85874..eae4766b18e 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. 
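Review note: dropping Integer.toString in the MigrateBlobs message is safe because string concatenation applies String.valueOf to non-String operands itself. Illustrative only:

    int argCount = 1;  // hypothetical value, for demonstration
    // Both evaluate to "Need at least 2 arguments, but have 1"
    String before = "Need at least 2 arguments, but have " + Integer.toString(argCount);
    String after  = "Need at least 2 arguments, but have " + argCount;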
You may obtain a copy of the License at @@ -60,29 +60,29 @@ public ObjectMapperCqlStatementMapper build() { codecs, udtClasses); } - public ObjectMapperCqlStatementMapperBuilder withCodecs(List>> codecProducer) { - this.codecProducers.addAll(codecProducer); - return this; - } - - public ObjectMapperCqlStatementMapperBuilder withUdtCodecs(List> udtClass) { - this.udtClasses.addAll(udtClass); - return this; - } - - public ObjectMapperCqlStatementMapperBuilder withTimestampField(String timestampField) { - this.timestampField = timestampField; - return this; - } - - public ObjectMapperCqlStatementMapperBuilder withTtlField(String ttlField) { - this.ttlField = ttlField; - return this; - } - - public ObjectMapperCqlStatementMapperBuilder withConsistencyLevelField(String consistencyLevelField) { - this.consistencyLevelField = consistencyLevelField; - return this; - } + //public ObjectMapperCqlStatementMapperBuilder withCodecs(List>> codecProducer) { + // this.codecProducers.addAll(codecProducer); + // return this; + //} + // + //public ObjectMapperCqlStatementMapperBuilder withUdtCodecs(List> udtClass) { + // this.udtClasses.addAll(udtClass); + // return this; + //} + // + //public ObjectMapperCqlStatementMapperBuilder withTimestampField(String timestampField) { + // this.timestampField = timestampField; + // return this; + //} + // + //public ObjectMapperCqlStatementMapperBuilder withTtlField(String ttlField) { + // this.ttlField = ttlField; + // return this; + //} + // + //public ObjectMapperCqlStatementMapperBuilder withConsistencyLevelField(String consistencyLevelField) { + // this.consistencyLevelField = consistencyLevelField; + // return this; + //} } diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java index 23f00ca5dcd..8709037f4ec 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
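Review note: commenting out the unused fluent setters in ObjectMapperCqlStatementMapperBuilder leaves dead code in the file. A suggestion rather than what the patch does: delete them outright, since version control keeps the history, or deprecate them if downstream builds may still reference them. Sketched for one of the setters:

    // Hypothetical alternative to commenting out:
    @Deprecated
    public ObjectMapperCqlStatementMapperBuilder withTtlField(String ttlField) {
        this.ttlField = ttlField;
        return this;
    }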
See the NOTICE file * distributed with this work for additional information @@ -17,23 +17,22 @@ */ package org.apache.storm.elasticsearch.common; -import static org.junit.Assert.assertEquals; - -import org.apache.http.HttpHost; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import com.google.common.testing.NullPointerTester; -import org.junit.jupiter.api.Assertions; +import org.apache.http.HttpHost; import org.junit.jupiter.api.Test; public class EsConfigTest { @Test - public void urlsCannotBeEmpty() throws Exception { - Assertions.assertThrows(IllegalArgumentException.class, () -> new EsConfig(new String[] {})); + public void urlsCannotBeEmpty() { + assertThrows(IllegalArgumentException.class, () -> new EsConfig(new String[] {})); } @Test - public void constructorThrowsOnNull() throws Exception { + public void constructorThrowsOnNull() { new NullPointerTester().testAllPublicConstructors(EsConfig.class); } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java index 4c1e0684333..f1b2c1e661e 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
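Review note: the EsConfigTest move to a statically imported assertThrows is the idiomatic JUnit 5 form, and unlike JUnit 4's @Test(expected = ...) it hands back the thrown exception for further assertions. Illustrative sketch:

    import static org.junit.jupiter.api.Assertions.assertThrows;

    IllegalArgumentException e =
            assertThrows(IllegalArgumentException.class, () -> new EsConfig(new String[] {}));
    // e.getMessage() and friends can now be asserted on as well.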
- *******************************************************************************/ + */ package org.apache.storm.eventhubs.bolt; @@ -38,23 +38,22 @@ */ public class EventHubBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory - .getLogger(EventHubBolt.class); + private static final Logger logger = LoggerFactory.getLogger(EventHubBolt.class); protected OutputCollector collector; protected PartitionSender sender; protected EventHubClient ehClient; protected EventHubBoltConfig boltConfig; - public EventHubBolt(String connectionString, String entityPath) { - boltConfig = new EventHubBoltConfig(connectionString, entityPath); - } - - public EventHubBolt(String userName, String password, String namespace, - String entityPath, boolean partitionMode) { - boltConfig = new EventHubBoltConfig(userName, password, namespace, - entityPath, partitionMode); - } + //public EventHubBolt(String connectionString, String entityPath) { + // boltConfig = new EventHubBoltConfig(connectionString, entityPath); + //} + // + //public EventHubBolt(String userName, String password, String namespace, + // String entityPath, boolean partitionMode) { + // boltConfig = new EventHubBoltConfig(userName, password, namespace, + // entityPath, partitionMode); + //} public EventHubBolt(EventHubBoltConfig config) { boltConfig = config; @@ -97,12 +96,9 @@ public void execute(Tuple tuple) { throw new EventHubException("ehclient is null"); } collector.ack(tuple); - } catch (EventHubException ex) { + } catch (EventHubException | ServiceBusException ex) { collector.reportError(ex); collector.fail(tuple); - } catch (ServiceBusException e) { - collector.reportError(e); - collector.fail(tuple); } } @@ -113,17 +109,15 @@ public void cleanup() { sender.close().whenComplete((voidargs, error) -> { try { if (error != null) { - logger.error("Exception during sender cleanup phase" + error.toString()); + logger.error("Exception during sender cleanup phase" + error); } ehClient.closeSync(); } catch (Exception e) { - logger.error("Exception during ehclient cleanup phase" + e.toString()); + logger.error("Exception during ehclient cleanup phase" + e); } }).get(); - } catch (InterruptedException e) { - logger.error("Exception occured during cleanup phase" + e.toString()); - } catch (ExecutionException e) { - logger.error("Exception occured during cleanup phase" + e.toString()); + } catch (InterruptedException | ExecutionException e) { + logger.error("Exception occurred during cleanup phase" + e); } logger.info("Eventhub Bolt cleaned up"); sender = null; diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java index a919c921e73..9d28876f48b 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. 
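Review note: the multi-catch collapses in EventHubBolt are correct since both branches ran identical recovery code, but concatenating the exception into the log message still discards the stack trace. SLF4J accepts a Throwable as a trailing argument; a suggestion, not part of this patch:

    try {
        sender.close().get();
    } catch (InterruptedException | ExecutionException e) {
        // Passing the exception separately logs the full stack trace,
        // which "Exception occurred during cleanup phase" + e does not.
        logger.error("Exception occurred during cleanup phase", e);
    }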
You may obtain a copy of the License at @@ -68,10 +68,9 @@ public class AvroGenericRecordBoltTest { + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" }," + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; - private static Schema schema1; - private static Schema schema2; private static Tuple tuple1; private static Tuple tuple2; + @Rule public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(() -> { Configuration conf = new Configuration(); @@ -92,10 +91,10 @@ public class AvroGenericRecordBoltTest { @BeforeClass public static void setupClass() { Schema.Parser parser = new Schema.Parser(); - schema1 = parser.parse(schemaV1); + Schema schema1 = parser.parse(schemaV1); parser = new Schema.Parser(); - schema2 = parser.parse(schemaV2); + Schema schema2 = parser.parse(schemaV2); GenericRecordBuilder builder1 = new GenericRecordBuilder(schema1); builder1.set("foo1", "bar1"); @@ -110,8 +109,8 @@ public static void setupClass() { private static Tuple generateTestTuple(GenericRecord record) { TopologyBuilder builder = new TopologyBuilder(); - GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), - new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), + new HashMap<>(), new HashMap<>(), new HashMap<>(), "") { @Override public Fields getComponentOutputFields(String componentId, String streamId) { return new Fields("record"); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java index f3fc8080541..d4a1c538c14 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -70,7 +70,7 @@ public class KafkaBolt extends BaseTickTupleAwareRichBolt { */ private boolean fireAndForget = false; /** - * {@see KafkaBolt#setAsync(boolean)} for more details on this. + * {@see KafkaBolt#setAsync(boolean)} for more details on this. */ private boolean async = true; @@ -116,7 +116,7 @@ public void prepare(Map topoConf, TopologyContext context, Outpu //for backward compatibility. if (mapper == null) { LOG.info("Mapper not specified. Setting default mapper to {}", FieldNameBasedTupleToKafkaMapper.class.getSimpleName()); - this.mapper = new FieldNameBasedTupleToKafkaMapper(); + this.mapper = new FieldNameBasedTupleToKafkaMapper<>(); } //for backward compatibility. 
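Review note: one nit the KafkaBolt hunk reflows but leaves intact: {@see ...} is not a valid inline javadoc tag; @see exists only as a block tag, and the inline cross-reference form is {@link}. A possible follow-up, not in the patch:

    /**
     * See {@link KafkaBolt#setAsync(boolean)} for more details on this.
     */
    private boolean async = true;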
@@ -169,9 +169,9 @@ private Callback createProducerCallback(final Tuple input) { @Override protected void process(final Tuple input) { - K key = null; - V message = null; - String topic = null; + K key; + V message; + String topic; try { key = mapper.getKeyFromTuple(input); message = mapper.getMessageFromTuple(input); @@ -238,10 +238,10 @@ public void setAsync(boolean async) { @Override public String toString() { - return "KafkaBolt: {mapper: " + mapper + return "KafkaBolt: {mapper: " + mapper + " topicSelector: " + topicSelector - + " fireAndForget: " + fireAndForget - + " async: " + async - + " proerties: " + boltSpecifiedProperties; + + " fireAndForget: " + fireAndForget + + " async: " + async + + " properties: " + boltSpecifiedProperties; } } diff --git a/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java b/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java index 8322a8a86c0..547898eb4ef 100644 --- a/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java +++ b/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java @@ -38,7 +38,7 @@ public interface ModelOutputs extends Serializable { /** * Convenience method that returns a set with all the streams declared by the {@link PMMLPredictorBolt}. - * By default this this method calls {@link #streamFields()}{@code .keySet()}. + * By default, this method calls {@link #streamFields()}{@code .keySet()}. * @return The set with all declared streams */ default Set streams() { diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java index 1a083a2aa97..d48b0e11169 100644 --- a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. 
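Review note: removing the "= null" initializers in process is more than cosmetic. With no dummy initializer, Java's definite-assignment analysis rejects any path that could read key, message, or topic before assignment, whereas the null defaults would have turned such a path into a runtime NPE. A sketch of the shape involved (the early return in the catch is hypothetical, shown only to make the fragment compile):

    K key;
    V message;
    String topic;
    try {
        key = mapper.getKeyFromTuple(input);
        message = mapper.getMessageFromTuple(input);
        topic = topicSelector.getTopic(input);
    } catch (Exception ex) {
        collector.reportError(ex);
        collector.fail(input);
        return;  // without an exit here, any later read of the three
                 // variables would now be a compile-time error
    }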
You may obtain a copy of the License at diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java index 50570e11d06..7f75072d788 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java @@ -56,10 +56,10 @@ private FluxParser() { * * @param inputFile source YAML file * @param dumpYaml if true, dump the parsed YAML to stdout - * @param processIncludes whether or not to process includes + * @param processIncludes whether to process includes * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution - * @return resulting topologuy definition + * @param envSub whether to perform environment variable substitution + * @return resulting topology definition * @throws IOException if there is a problem reading file(s) */ public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes, @@ -72,14 +72,14 @@ public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean } /** - * Parse a flux topology definition from a classpath resource.. + * Parse a flux topology definition from a classpath resource. * * @param resource YAML resource * @param dumpYaml if true, dump the parsed YAML to stdout - * @param processIncludes whether or not to process includes + * @param processIncludes whether to process includes * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution - * @return resulting topologuy definition + * @param envSub whether to perform environment variable substitution + * @return resulting topology definition * @throws IOException if there is a problem reading file(s) */ public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes, @@ -96,9 +96,9 @@ public static TopologyDef parseResource(String resource, boolean dumpYaml, boole * * @param inputStream InputStream representation of YAML file * @param dumpYaml if true, dump the parsed YAML to stdout - * @param processIncludes whether or not to process includes + * @param processIncludes whether to process includes * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution + * @param envSub whether to perform environment variable substitution * @return resulting topology definition * @throws IOException if there is a problem reading file(s) */ @@ -118,7 +118,7 @@ public static TopologyDef parseInputStream(InputStream inputStream, boolean dump } if (processIncludes) { - return processIncludes(yaml, topology, properties, envSub); + return processIncludes(topology, properties, envSub); } else { return topology; } @@ -128,7 +128,7 @@ public static TopologyDef parseInputStream(InputStream inputStream, boolean dump * Parse filter properties file. 
* * @param propertiesFile properties file for variable substitution - * @param resource whether or not to load properties file from classpath + * @param resource whether to load properties file from classpath * @return resulting filter properties * @throws IOException if there is a problem reading file */ @@ -137,7 +137,7 @@ public static Properties parseProperties(String propertiesFile, boolean resource if (propertiesFile != null) { properties = new Properties(); - InputStream in = null; + InputStream in; if (resource) { in = FluxParser.class.getResourceAsStream(propertiesFile); } else { @@ -165,7 +165,7 @@ private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties proper : line; }).collect(Collectors.joining(System.lineSeparator())); - return (TopologyDef) yaml.load(conf); + return yaml.load(conf); } } @@ -203,25 +203,23 @@ private static Yaml yaml() { Constructor constructor = new Constructor(TopologyDef.class); constructor.addTypeDescription(topologyDescription); - Yaml yaml = new Yaml(constructor); - return yaml; + return new Yaml(constructor); } /** * Process includes contained within a yaml file. * - * @param yaml the yaml parser for parsing the include file(s) * @param topologyDef the topology definition containing (possibly zero) includes * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution + * @param envSub whether to perform environment variable substitution * @return The TopologyDef with includes resolved. */ - private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties properties, boolean envSub) + private static TopologyDef processIncludes(TopologyDef topologyDef, Properties properties, boolean envSub) throws IOException { //TODO support multiple levels of includes if (topologyDef.getIncludes() != null) { for (IncludeDef include : topologyDef.getIncludes()) { - TopologyDef includeTopologyDef = null; + TopologyDef includeTopologyDef; if (include.isResource()) { LOG.info("Loading includes from resource: {}", include.getFile()); includeTopologyDef = parseResource(include.getFile(), true, false, properties, envSub); diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java index 1a67f9323bf..58bcc7cbaef 100644 --- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java +++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java @@ -149,7 +149,7 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec * Updates the blob data. * * @param key Key for the blob - * @param who Is the subject having the write privilege for the blob + * @param who Is the subject with write privilege for the blob * @return AtomicOutputStream returns a stream into which the data can be written */ public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException; @@ -185,7 +185,7 @@ public void updateBlob(String key, byte[] data, Subject who) throws Authorizatio public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException; /** - * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi. + * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi). 
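Review note: the FluxParser cast removal works because SnakeYAML declares load generically (<T> T load(String)), so the assignment target drives inference, and the typed Constructor already controls which root object gets built. A minimal sketch, assuming the SnakeYAML 1.x API used here (conf stands for the substituted YAML text):

    Yaml yaml = new Yaml(new Constructor(TopologyDef.class));
    // No explicit (TopologyDef) cast needed; the Constructor fixes the
    // instantiated root type, and load's return type is inferred.
    TopologyDef topology = yaml.load(conf);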
*/ public abstract void setLeaderElector(ILeaderElector leaderElector); @@ -195,7 +195,7 @@ public void updateBlob(String key, byte[] data, Subject who) throws Authorizatio * * @param key Key for the blob * @param meta Metadata which contains the updated acls information - * @param who Is the subject having the write privilege for the blob + * @param who Is the subject with write privilege for the blob */ public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException; @@ -217,7 +217,7 @@ public void updateBlob(String key, byte[] data, Subject who) throws Authorizatio public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException; /** - * Returns an iterator with all the list of keys currently available on the blob store. + * Returns an iterator with all the list of keys currently available in the blob store. * * @return {@code Iterator} */ @@ -308,7 +308,7 @@ public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundE /** * Get IDs stored in blob store. - * @return a set of all of the topology ids with special data stored in the blob store. + * @return a set of all topology ids with special data stored in the blob store. */ public Set storedTopoIds() { return filterAndListKeys(TO_TOPO_ID); @@ -324,7 +324,7 @@ public void updateLastBlobUpdateTime() throws IOException { } /** - * Validates that the blob update time of the blobstore is up to date with the current existing blobs. + * Validates that the blob update time of the blobstore is up-to-date with the current existing blobs. * * @throws IOException on any error */ diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java index 0e4acdc3a0a..9d2ffc15862 100644 --- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java +++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) * under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -60,7 +60,7 @@ public interface AdminCommand { void run(String [] args, Map conf, String command) throws Exception; /** - * Print a help message to out. typically this should be in the form of. + * Print a help message to stdout. Typically, this should be in the form of. * command arguments: * description of command * argument - description @@ -75,7 +75,7 @@ public void run(String[] args, Map conf, String command) throws IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); - Set blobStoreTopologyIds = nimbusBlobStore.filterAndListKeys(key -> ConfigUtils.getIdFromBlobKey(key)); + Set blobStoreTopologyIds = nimbusBlobStore.filterAndListKeys(ConfigUtils::getIdFromBlobKey); Set activeTopologyIds = new HashSet<>(stormClusterState.activeStorms()); Sets.SetView diffTopology = Sets.difference(activeTopologyIds, blobStoreTopologyIds); LOG.info("active-topology-ids [{}] blob-topology-ids [{}] diff-topology [{}]", @@ -119,7 +119,7 @@ public void printCliHelp(String command, PrintStream out) { /** * Print value in a human readable format. * @param value what to print. 
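Review note: in AdminCommands, key -> ConfigUtils.getIdFromBlobKey(key) and ConfigUtils::getIdFromBlobKey compile to equivalent behavior; a lambda that only forwards its argument is exactly what a method reference expresses. Illustrative:

    // Both forms produce the same set:
    Set<String> viaLambda = nimbusBlobStore.filterAndListKeys(key -> ConfigUtils.getIdFromBlobKey(key));
    Set<String> viaRef    = nimbusBlobStore.filterAndListKeys(ConfigUtils::getIdFromBlobKey);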
- * @return a human readable string + * @return a human-readable string */ public static String prettyPrint(TBase value) { StringBuilder builder = new StringBuilder(); diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java index a8f519d6453..7852f027070 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. * See the NOTICE file distributed with this work for additional information regarding copyright ownership. * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); @@ -75,14 +75,13 @@ * 2. The USER sets the ACLs, and the blob access is validated against these ACLs. * 3. The SUPERVISOR interacts with nimbus through the NimbusBlobStore Client API to download the blobs. * The supervisors principal should match the set of users configured into SUPERVISOR_ADMINS. - * Here, the PrincipalToLocalPlugin takes care of mapping the principal to user name before the ACL validation. + * Here, the PrincipalToLocalPlugin takes care of mapping the principal to username before the ACL validation. */ public class LocalFsBlobStore extends BlobStore { public static final Logger LOG = LoggerFactory.getLogger(LocalFsBlobStore.class); private static final String DATA_PREFIX = "data_"; private static final String META_PREFIX = "meta_"; private static final String BLOBSTORE_SUBTREE = "/blobstore/"; - private final int allPermissions = READ | WRITE | ADMIN; protected BlobStoreAclHandler aclHandler; private NimbusInfo nimbusInfo; private FileBlobStoreImpl fbs; @@ -208,6 +207,7 @@ public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject KeyAlreadyExistsException { LOG.debug("Creating Blob for key {}", key); validateKey(key); + int allPermissions = READ | WRITE | ADMIN; aclHandler.normalizeSettableBlobMeta(key, meta, who, allPermissions); BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); aclHandler.hasPermissions(meta.get_acl(), allPermissions, who, key); @@ -222,9 +222,7 @@ public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject outputStream = null; this.stormClusterState.setupBlob(key, this.nimbusInfo, getVersionForKey(key, this.nimbusInfo, zkClient)); return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX + key, true)); - } catch (IOException e) { - throw new RuntimeException(e); - } catch (KeyNotFoundException e) { + } catch (IOException | KeyNotFoundException e) { throw new RuntimeException(e); } finally { if (outputStream != null) { @@ -299,7 +297,7 @@ public ReadableBlobMeta getBlobMeta(String key, Subject who) throws Authorizatio } /** - * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi. + * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi). 
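Review note: moving allPermissions from an instance field into createBlob narrows it to its single use site, which reads well. If another method ever needs the same mask, a named constant would fit better than re-widening to a field; a hypothetical alternative, not in the patch:

    private static final int ALL_PERMISSIONS = READ | WRITE | ADMIN;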
*/ @Override public void setLeaderElector(ILeaderElector leaderElector) { @@ -412,7 +410,7 @@ public void shutdown() { zkClient.close(); } if (timer != null) { - timer.cancel();; + timer.cancel(); } stormClusterState.disconnect(); } @@ -436,9 +434,10 @@ public int getBlobReplication(String key, Subject who) throws Exception { } @Override - public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException { - throw new UnsupportedOperationException("For local file system blob store the update blobs function does not work. " - + "Please use HDFS blob store to make this feature available."); + public int updateBlobReplication(String key, int replication, Subject who) { + throw new UnsupportedOperationException( + "For local file system blob store the update blobs function does not work. " + + "Please use HDFS blob store to make this feature available."); } //This additional check and download is for nimbus high availability in case you have more than one nimbus
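Review note: dropping AuthorizationException and KeyNotFoundException from updateBlobReplication is legal because an overriding method may declare fewer checked exceptions than the method it overrides; callers going through the BlobStore supertype are unaffected. The hunk in isolation:

    @Override
    public int updateBlobReplication(String key, int replication, Subject who) {
        // A narrower throws clause than the overridden declaration is permitted;
        // only this unchecked exception is ever thrown here.
        throw new UnsupportedOperationException(
            "For local file system blob store the update blobs function does not work. "
                + "Please use HDFS blob store to make this feature available.");
    }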