diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java index 1a4d5eb9ab7..e3e60e7510f 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.cli.CommandLine; @@ -35,7 +36,6 @@ import org.apache.storm.Config; import org.apache.storm.generated.Bolt; import org.apache.storm.generated.BoltStats; -import org.apache.storm.generated.ClusterSummary; import org.apache.storm.generated.ComponentCommon; import org.apache.storm.generated.ExecutorSummary; import org.apache.storm.generated.GlobalStreamId; @@ -75,7 +75,7 @@ private static List extractBoltValues(List summaries, if (data != null) { List subvalues = data.values().stream() .map((subMap) -> subMap.get(id)) - .filter((value) -> value != null) + .filter(Objects::nonNull) .collect(Collectors.toList()); ret.addAll(subvalues); } @@ -104,7 +104,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top LOG.info("with config {}: {}", key, o); } } - //Lets use the number of actually scheduled workers as a way to bridge RAS and non-RAS + //Let's use the number of actually scheduled workers as a way to bridge RAS and non-RAS int numWorkers = tpinfo.get_num_workers(); if (savedTopoConf.containsKey(Config.TOPOLOGY_WORKERS)) { numWorkers = Math.max(numWorkers, ((Number) savedTopoConf.get(Config.TOPOLOGY_WORKERS)).intValue()); @@ -154,20 +154,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top } Map> boltResources = getBoltsResources(topo, topoConf); - for (Map.Entry> entry: boltResources.entrySet()) { - LoadCompConf.Builder bd = boltBuilders.get(entry.getKey()); - if (bd != null) { - Map resources = entry.getValue(); - Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); - if (cpu != null) { - bd.withCpuLoad(cpu); - } - Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); - if (mem != null) { - bd.withMemoryLoad(mem); - } - } - } + addCpuMemToBuilders(boltBuilders, boltResources); } //Spouts @@ -195,31 +182,14 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top } Map> spoutResources = getSpoutsResources(topo, topoConf); - for (Map.Entry> entry: spoutResources.entrySet()) { - LoadCompConf.Builder sd = spoutBuilders.get(entry.getKey()); - if (sd != null) { - Map resources = entry.getValue(); - Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); - if (cpu != null) { - sd.withCpuLoad(cpu); - } - Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); - if (mem != null) { - sd.withMemoryLoad(mem); - } - } - } + addCpuMemToBuilders(spoutBuilders, spoutResources); } //Stats... 
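// A minimal, self-contained illustration of the Map.computeIfAbsent grouping idiom used in
// the executor-stats collection just below (the class name and sample data are invented for
// this example, not taken from the patch):
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupByExample {
    public static void main(String[] args) {
        Map<String, List<String>> byComponent = new HashMap<>();
        for (String executor : new String[] {"bolt-1", "bolt-2", "spout-1"}) {
            String component = executor.split("-")[0];
            // computeIfAbsent replaces the get / null-check / put sequence with one call
            byComponent.computeIfAbsent(component, k -> new ArrayList<>()).add(executor);
        }
        System.out.println(byComponent); // e.g. {bolt=[bolt-1, bolt-2], spout=[spout-1]}
    }
}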
        Map<String, List<ExecutorSummary>> byComponent = new HashMap<>();
        for (ExecutorSummary executor: info.get_executors()) {
            String component = executor.get_component_id();
-           List<ExecutorSummary> list = byComponent.get(component);
-           if (list == null) {
-               list = new ArrayList<>();
-               byComponent.put(component, list);
-           }
+           List<ExecutorSummary> list = byComponent.computeIfAbsent(component, k -> new ArrayList<>());
            list.add(executor);
        }
@@ -238,7 +208,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
        }

        //There is a bug in some versions that returns 0 for the uptime.
-       // To work around it we should get it an alternative (working) way.
+       // To work around it, we should get it in an alternative (working) way.
        Map<String, Integer> workerToUptime = new HashMap<>();
        for (WorkerSummary ws : tpinfo.get_workers()) {
            workerToUptime.put(ws.get_supervisor_id() + ":" + ws.get_port(), ws.get_uptime_secs());
        }
@@ -265,7 +235,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
            String timeWindow = statEntry.getKey();
            long timeSecs = uptime;
            try {
-               timeSecs = Long.valueOf(timeWindow);
+               timeSecs = Long.parseLong(timeWindow);
            } catch (NumberFormatException e) {
                //Ignored...
            }
@@ -291,22 +261,39 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
        }

        List<LoadCompConf> spouts = spoutBuilders.values().stream()
-           .map((b) -> b.build())
+           .map(LoadCompConf.Builder::build)
            .collect(Collectors.toList());

        List<LoadCompConf> bolts = boltBuilders.values().stream()
-           .map((b) -> b.build())
+           .map(LoadCompConf.Builder::build)
            .collect(Collectors.toList());

        return new TopologyLoadConf(topologyName, savedTopoConf, spouts, bolts, streams);
    }

+   private static void addCpuMemToBuilders(Map<String, LoadCompConf.Builder> boltBuilders,
+                                           Map<String, Map<String, Double>> boltResources) {
+       for (Map.Entry<String, Map<String, Double>> entry: boltResources.entrySet()) {
+           LoadCompConf.Builder bd = boltBuilders.get(entry.getKey());
+           if (bd != null) {
+               Map<String, Double> resources = entry.getValue();
+               Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+               if (cpu != null) {
+                   bd.withCpuLoad(cpu);
+               }
+               Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+               if (mem != null) {
+                   bd.withMemoryLoad(mem);
+               }
+           }
+       }
+   }
+
    /**
     * Main entry point for CaptureLoad command.
     * @param args the arguments to the command
-    * @throws Exception on any error
     */
-   public static void main(String[] args) throws Exception {
+   public static void main(String[] args) {
        Options options = new Options();
        options.addOption(Option.builder("a")
            .longOpt("anonymize")
@@ -368,8 +355,8 @@ public static void main(String[] args) throws Exception {
        }
    }

-   //ResourceUtils.java is not a available on the classpath to let us parse out the resources we want.
-   // So we have copied and pasted some of the needed methods here. (with a few changes to logging)
+   //ResourceUtils.java is not available on the classpath to let us parse out the resources we want.
+   // So we have copied and pasted some needed methods here.
+   // (with a few changes to logging)
    static Map<String, Map<String, Double>> getBoltsResources(StormTopology topology, Map<String, Object> topologyConf) {
        Map<String, Map<String, Double>> boltResources = new HashMap<>();
        if (topology.get_bolts() != null) {
@@ -420,7 +407,7 @@ static Map<String, Double> parseResources(String input) {
                LOG.debug("Topology Resources {}", topologyResources);
            }
        } catch (org.json.simple.parser.ParseException e) {
-           LOG.error("Failed to parse component resources is:" + e.toString(), e);
+           LOG.error("Failed to parse component resources: " + e, e);
            return null;
        }
        return topologyResources;
@@ -429,10 +416,9 @@ static Map<String, Double> parseResources(String input) {
    /**
     * Checks if the topology's resource requirements are initialized.
     * Will modify topologyResources by adding the appropriate defaults
-    * @param topologyResources map of resouces requirements
+    * @param topologyResources map of resource requirements
     * @param componentId component for which initialization is being conducted
     * @param topologyConf topology configuration
-    * @throws Exception on any error
     */
    public static void checkInitialization(Map<String, Double> topologyResources, String componentId, Map<String, Object> topologyConf) {
        StringBuilder msgBuilder = new StringBuilder();
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
index 2cbee19e340..86f8516d62a 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
@@ -18,25 +18,22 @@
 package org.apache.storm.perf.queuetest;

-import java.util.Collections;
-import org.apache.storm.metrics2.StormMetricRegistry;
-import org.apache.storm.policy.WaitStrategyPark;
-import org.apache.storm.utils.JCQueue;
+//import java.util.Collections;
+//import org.apache.storm.metrics2.StormMetricRegistry;
+//import org.apache.storm.policy.WaitStrategyPark;
+//import org.apache.storm.utils.JCQueue;

@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class JCQueuePerfTest {
    // Usage: Let it run and then explicitly terminate.
    // Metrics will be printed when application is terminated.
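    // The metrics printing relies on a JVM shutdown hook; a compact sketch of that pattern,
    // mirroring the (commented-out) runAllThds/addShutdownHooks further below (the method name
    // and reporting lines here are illustrative, not part of the original class):
    static void runWithShutdownReport(Thread... workers) {
        for (Thread w : workers) {
            w.start();
        }
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                System.err.println("Stopping");
                for (Thread w : workers) {
                    w.interrupt();
                }
                for (Thread w : workers) {
                    System.err.println("Waiting for " + w.getName());
                    w.join(); // report only after every worker has stopped
                }
            } catch (InterruptedException ignored) {
                // the JVM is shutting down anyway
            }
        }));
    }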
public static void main(String[] args) throws Exception { - // oneProducer1Consumer(1000); // -- measurement 1 - // twoProducer1Consumer(1000); // -- measurement 2 - // threeProducer1Consumer(1); // -- measurement 3 - - // oneProducer2Consumers(); // -- measurement 4 - - // producerFwdConsumer(); // -- measurement 5 - - // ackingProducerSimulation(); // -- measurement 6 + // oneProducer1Consumer(1000); // -- measurement 1 + // twoProducer1Consumer(1000); // -- measurement 2 + // threeProducer1Consumer(1); // -- measurement 3 + // oneProducer2Consumers(); // -- measurement 4 + // producerFwdConsumer(); // -- measurement 5 + // ackingProducerSimulation(); // -- measurement 6 while (true) { Thread.sleep(1000); @@ -44,108 +41,108 @@ public static void main(String[] args) throws Exception { } - private static void ackingProducerSimulation() { - WaitStrategyPark ws = new WaitStrategyPark(100); - StormMetricRegistry registry = new StormMetricRegistry(); - JCQueue spoutQ = new JCQueue("spoutQ", "spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry); - JCQueue ackQ = new JCQueue("ackQ", "ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry); - - final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ); - final Acker acker = new Acker(ackQ, spoutQ); - - runAllThds(ackingProducer, acker); - } - - private static void producerFwdConsumer(int prodBatchSz) { - WaitStrategyPark ws = new WaitStrategyPark(100); - StormMetricRegistry registry = new StormMetricRegistry(); - JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test", - Collections.singletonList(1000), 1000, registry); - JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry); - - final Producer prod = new Producer(q1); - final Forwarder fwd = new Forwarder(q1, q2); - final Consumer cons = new Consumer(q2); - - runAllThds(prod, fwd, cons); - } - - - private static void oneProducer1Consumer(int prodBatchSz) { - JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", - Collections.singletonList(1000), 1000, new StormMetricRegistry()); - - final Producer prod1 = new Producer(q1); - final Consumer cons1 = new Consumer(q1); - - runAllThds(prod1, cons1); - } - - private static void twoProducer1Consumer(int prodBatchSz) { - JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", - Collections.singletonList(1000), 1000, new StormMetricRegistry()); - - final Producer prod1 = new Producer(q1); - final Producer prod2 = new Producer(q1); - final Consumer cons1 = new Consumer(q1); - - runAllThds(prod1, prod2, cons1); - } - - private static void threeProducer1Consumer(int prodBatchSz) { - JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", - Collections.singletonList(1000), 1000, new StormMetricRegistry()); - - final Producer prod1 = new Producer(q1); - final Producer prod2 = new Producer(q1); - final Producer prod3 = new Producer(q1); - final Consumer cons1 = new Consumer(q1); - - runAllThds(prod1, prod2, prod3, cons1); - } - - - private static void oneProducer2Consumers(int prodBatchSz) { - WaitStrategyPark ws = new WaitStrategyPark(100); - StormMetricRegistry registry = new StormMetricRegistry(); - JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry); - JCQueue 
q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry); - - final Producer2 prod1 = new Producer2(q1, q2); - final Consumer cons1 = new Consumer(q1); - final Consumer cons2 = new Consumer(q2); - - runAllThds(prod1, cons1, cons2); - } - - public static void runAllThds(MyThread... threads) { - for (Thread thread : threads) { - thread.start(); - } - addShutdownHooks(threads); - } - - public static void addShutdownHooks(MyThread... threads) { - - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - System.err.println("Stopping"); - for (Thread thread : threads) { - thread.interrupt(); - } - - for (Thread thread : threads) { - System.err.println("Waiting for " + thread.getName()); - thread.join(); - } - - for (MyThread thread : threads) { - System.err.printf("%s : %d, Throughput: %,d \n", thread.getName(), thread.count, thread.throughput()); - } - } catch (InterruptedException e) { - return; - } - })); - - } + //private static void ackingProducerSimulation() { + // WaitStrategyPark ws = new WaitStrategyPark(100); + // StormMetricRegistry registry = new StormMetricRegistry(); + // JCQueue spoutQ = new JCQueue("spoutQ", "spoutQ", 1024, 0, 100, ws, "test", "test", + // Collections.singletonList(1000), 1000, registry); + // JCQueue ackQ = new JCQueue("ackQ", "ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry); + // + // final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ); + // final Acker acker = new Acker(ackQ, spoutQ); + // + // runAllThds(ackingProducer, acker); + //} + + //private static void producerFwdConsumer(int prodBatchSz) { + // WaitStrategyPark ws = new WaitStrategyPark(100); + // StormMetricRegistry registry = new StormMetricRegistry(); + // JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test", + // Collections.singletonList(1000), 1000, registry); + // JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry); + // + // final Producer prod = new Producer(q1); + // final Forwarder fwd = new Forwarder(q1, q2); + // final Consumer cons = new Consumer(q2); + // + // runAllThds(prod, fwd, cons); + //} + + + //private static void oneProducer1Consumer(int prodBatchSz) { + // JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", + // Collections.singletonList(1000), 1000, new StormMetricRegistry()); + // + // final Producer prod1 = new Producer(q1); + // final Consumer cons1 = new Consumer(q1); + // + // runAllThds(prod1, cons1); + //} + + //private static void twoProducer1Consumer(int prodBatchSz) { + // JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", + // Collections.singletonList(1000), 1000, new StormMetricRegistry()); + // + // final Producer prod1 = new Producer(q1); + // final Producer prod2 = new Producer(q1); + // final Consumer cons1 = new Consumer(q1); + // + // runAllThds(prod1, prod2, cons1); + //} + + //private static void threeProducer1Consumer(int prodBatchSz) { + // JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", + // Collections.singletonList(1000), 1000, new StormMetricRegistry()); + // + // final Producer prod1 = new Producer(q1); + // final Producer prod2 = new Producer(q1); + // final Producer prod3 = new Producer(q1); + // final Consumer cons1 = new Consumer(q1); + // + // 
runAllThds(prod1, prod2, prod3, cons1); + //} + + + //private static void oneProducer2Consumers(int prodBatchSz) { + // WaitStrategyPark ws = new WaitStrategyPark(100); + // StormMetricRegistry registry = new StormMetricRegistry(); + // JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry); + // JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry); + // + // final Producer2 prod1 = new Producer2(q1, q2); + // final Consumer cons1 = new Consumer(q1); + // final Consumer cons2 = new Consumer(q2); + // + // runAllThds(prod1, cons1, cons2); + //} + + //public static void runAllThds(MyThread... threads) { + // for (Thread thread : threads) { + // thread.start(); + // } + // addShutdownHooks(threads); + //} + + //public static void addShutdownHooks(MyThread... threads) { + // + // Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // try { + // System.err.println("Stopping"); + // for (Thread thread : threads) { + // thread.interrupt(); + // } + // + // for (Thread thread : threads) { + // System.err.println("Waiting for " + thread.getName()); + // thread.join(); + // } + // + // for (MyThread thread : threads) { + // System.err.printf("%s : %d, Throughput: %,d \n", thread.getName(), thread.count, thread.throughput()); + // } + // } catch (InterruptedException ignored) { + // } + // })); + // + //} } diff --git a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java b/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java index 95c8160f94c..a5df1f08393 100644 --- a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java +++ b/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java @@ -65,7 +65,7 @@ public class JpmmlRunnerTestTopology { private String blobKey; // PMML Model downloaded from Blobstore - null if using File private String tplgyName = "test"; - public static void main(String[] args) throws Exception { + public static void main(String[] args) { try { JpmmlRunnerTestTopology testTopology = new JpmmlRunnerTestTopology(); testTopology.parseArgs(args); @@ -77,10 +77,10 @@ public static void main(String[] args) throws Exception { } private void parseArgs(String[] args) { - if (Arrays.stream(args).anyMatch(option -> option.equals("-h"))) { + if (Arrays.asList(args).contains("-h")) { printUsage(); - } else if (Arrays.stream(args).anyMatch(option -> option.equals("-f")) - && Arrays.stream(args).anyMatch(option -> option.equals("-b"))) { + } else if (Arrays.asList(args).contains("-f") + && Arrays.asList(args).contains("-b")) { System.out.println("Please specify only one option of [-b, -f]"); printUsage(); } else { @@ -116,12 +116,12 @@ private void parseArgs(String[] args) { private void setDefaults() { if (blobKey == null) { // blob key not specified, use file if (pmml == null) { - pmml = loadExample(pmml, PMML_MODEL_FILE); + pmml = loadExample(PMML_MODEL_FILE); } } if (rawInputs == null) { - rawInputs = loadExample(rawInputs, RAW_INPUTS_FILE); + rawInputs = loadExample(RAW_INPUTS_FILE); } if (tplgyName == null) { @@ -129,8 +129,12 @@ private void setDefaults() { } } - private File loadExample(File file, String example) { + private File loadExample(String example) { + File file; try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(example)) { + if 
(stream == null) {
+               throw new RuntimeException("Error loading example=" + example + ", stream is null");
+           }
            file = File.createTempFile("pmml-example", ".tmp");
            IOUtils.copy(stream, new FileOutputStream(file));
        } catch (IOException e) {
@@ -147,8 +151,8 @@ private static void printUsage() {
    }

    private void run() throws Exception {
-       System.out.println(String.format("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]",
-           blobKey != null ? "Blobstore with blob key [" + blobKey + "]" : pmml.getAbsolutePath(), rawInputs.getAbsolutePath()));
+       System.out.printf("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]%n",
+           blobKey != null ? "Blobstore with blob key [" + blobKey + "]" : pmml.getAbsolutePath(), rawInputs.getAbsolutePath());
        submitTopologyRemoteCluster(newTopology(), newConfig());
    }
@@ -171,7 +175,7 @@ private Config newConfig() {
        return config;
    }

-   private IRichBolt newBolt() throws Exception {
+   private IRichBolt newBolt() {
        final List<String> streams = Lists.newArrayList(Utils.DEFAULT_STREAM_ID, NON_DEFAULT_STREAM_ID);
        if (blobKey != null) {    // Load PMML Model from Blob store
            final ModelOutputs outFields = JpmmlModelOutputs.toStreams(blobKey, streams);
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
index d5dec296896..7c800e3b69a 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
@@ -41,10 +41,10 @@ protected int run(String[] args) throws Exception {
        builder.setSpout("spout1", () -> UUID.randomUUID().toString());
        builder.setBolt("bolt1", (tuple, collector) -> {
-           String[] parts = tuple.getStringByField("lambda").split("\\-");
+           String[] parts = tuple.getStringByField("lambda").split("-");
            collector.emit(new Values(prefix + parts[0] + suffix, tag));
        }, "strValue", "intValue").shuffleGrouping("spout1");
        // kept as-is: a bound System.out::println method reference would capture the
        // non-serializable PrintStream and break topology serialization
        builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1");

        Config conf = new Config();
        conf.setDebug(true);
diff --git a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
index e7a3581637d..a05d0ac74e4 100644
--- a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
+++ b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -23,11 +23,8 @@ import java.util.Map; import javax.security.auth.Subject; -import javax.security.auth.login.LoginContext; import org.apache.storm.Config; -import org.apache.storm.blobstore.BlobStore; -import org.apache.storm.blobstore.LocalFsBlobStore; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.KeyAlreadyExistsException; import org.apache.storm.generated.KeyNotFoundException; @@ -47,9 +44,8 @@ protected static void deleteAllBlobStoreKeys(BlobStore bs, Subject who) throws A } } - protected static void copyBlobStoreKeys(BlobStore bsFrom, - Subject whoFrom, - BlobStore bsTo, Subject whoTo) throws AuthorizationException, + protected static void copyBlobStoreKeys(BlobStore bsFrom, Subject whoFrom, BlobStore bsTo, Subject whoTo) + throws AuthorizationException, KeyAlreadyExistsException, IOException, KeyNotFoundException { @@ -63,13 +59,12 @@ protected static void copyBlobStoreKeys(BlobStore bsFrom, System.out.println("DONE CREATING BLOB " + key); } } - - + public static void main(String[] args) throws Exception { Map hdfsConf = Utils.readStormConfig(); if (args.length < 2) { - System.out.println("Need at least 2 arguments, but have " + Integer.toString(args.length)); + System.out.println("Need at least 2 arguments, but have " + args.length); System.out.println("migrate "); System.out.println("Migrates blobs from LocalFsBlobStore to HdfsBlobStore"); System.out.println("Example: migrate '/srv/storm' " @@ -104,8 +99,7 @@ public static void main(String[] args) throws Exception { HdfsBlobStore hdfsBlobStore = new HdfsBlobStore(); hdfsBlobStore.prepare(hdfsConf, null, null, null); - - + /* LOOK AT LOCAL BLOBSTORE */ System.out.println("Listing local blobstore keys."); MigratorMain.listBlobStoreKeys(lfsBlobStore, null); diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java index d7961d85874..eae4766b18e 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at @@ -60,29 +60,29 @@ public ObjectMapperCqlStatementMapper build() { codecs, udtClasses); } - public ObjectMapperCqlStatementMapperBuilder withCodecs(List>> codecProducer) { - this.codecProducers.addAll(codecProducer); - return this; - } - - public ObjectMapperCqlStatementMapperBuilder withUdtCodecs(List> udtClass) { - this.udtClasses.addAll(udtClass); - return this; - } - - public ObjectMapperCqlStatementMapperBuilder withTimestampField(String timestampField) { - this.timestampField = timestampField; - return this; - } - - public ObjectMapperCqlStatementMapperBuilder withTtlField(String ttlField) { - this.ttlField = ttlField; - return this; - } - - public ObjectMapperCqlStatementMapperBuilder withConsistencyLevelField(String consistencyLevelField) { - this.consistencyLevelField = consistencyLevelField; - return this; - } + //public ObjectMapperCqlStatementMapperBuilder withCodecs(List>> codecProducer) { + // this.codecProducers.addAll(codecProducer); + // return this; + //} + // + //public ObjectMapperCqlStatementMapperBuilder withUdtCodecs(List> udtClass) { + // this.udtClasses.addAll(udtClass); + // return this; + //} + // + //public ObjectMapperCqlStatementMapperBuilder withTimestampField(String timestampField) { + // this.timestampField = timestampField; + // return this; + //} + // + //public ObjectMapperCqlStatementMapperBuilder withTtlField(String ttlField) { + // this.ttlField = ttlField; + // return this; + //} + // + //public ObjectMapperCqlStatementMapperBuilder withConsistencyLevelField(String consistencyLevelField) { + // this.consistencyLevelField = consistencyLevelField; + // return this; + //} } diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java index b2e2df91ffe..8709037f4ec 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,18 +18,17 @@ package org.apache.storm.elasticsearch.common; import static org.junit.jupiter.api.Assertions.assertEquals; - -import org.apache.http.HttpHost; +import static org.junit.jupiter.api.Assertions.assertThrows; import com.google.common.testing.NullPointerTester; -import org.junit.jupiter.api.Assertions; +import org.apache.http.HttpHost; import org.junit.jupiter.api.Test; public class EsConfigTest { @Test public void urlsCannotBeEmpty() { - Assertions.assertThrows(IllegalArgumentException.class, () -> new EsConfig(new String[] {})); + assertThrows(IllegalArgumentException.class, () -> new EsConfig(new String[] {})); } @Test diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java index 4c1e0684333..f1b2c1e661e 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - *******************************************************************************/ + */ package org.apache.storm.eventhubs.bolt; @@ -38,23 +38,22 @@ */ public class EventHubBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory - .getLogger(EventHubBolt.class); + private static final Logger logger = LoggerFactory.getLogger(EventHubBolt.class); protected OutputCollector collector; protected PartitionSender sender; protected EventHubClient ehClient; protected EventHubBoltConfig boltConfig; - public EventHubBolt(String connectionString, String entityPath) { - boltConfig = new EventHubBoltConfig(connectionString, entityPath); - } - - public EventHubBolt(String userName, String password, String namespace, - String entityPath, boolean partitionMode) { - boltConfig = new EventHubBoltConfig(userName, password, namespace, - entityPath, partitionMode); - } + //public EventHubBolt(String connectionString, String entityPath) { + // boltConfig = new EventHubBoltConfig(connectionString, entityPath); + //} + // + //public EventHubBolt(String userName, String password, String namespace, + // String entityPath, boolean partitionMode) { + // boltConfig = new EventHubBoltConfig(userName, password, namespace, + // entityPath, partitionMode); + //} public EventHubBolt(EventHubBoltConfig config) { boltConfig = config; @@ -97,12 +96,9 @@ public void execute(Tuple tuple) { throw new EventHubException("ehclient is null"); } collector.ack(tuple); - } catch (EventHubException ex) { + } catch (EventHubException | ServiceBusException ex) { collector.reportError(ex); collector.fail(tuple); - } catch (ServiceBusException e) { - collector.reportError(e); - collector.fail(tuple); } } @@ -113,17 +109,15 @@ public void cleanup() { sender.close().whenComplete((voidargs, error) -> { try { if (error != null) { - logger.error("Exception during sender cleanup phase" 
+ error.toString());
+                       logger.error("Exception during sender cleanup phase: " + error);
                    }
                    ehClient.closeSync();
                } catch (Exception e) {
-                   logger.error("Exception during ehclient cleanup phase" + e.toString());
+                   logger.error("Exception during ehclient cleanup phase: " + e);
                }
            }).get();
-       } catch (InterruptedException e) {
-           logger.error("Exception occured during cleanup phase" + e.toString());
-       } catch (ExecutionException e) {
-           logger.error("Exception occured during cleanup phase" + e.toString());
+       } catch (InterruptedException | ExecutionException e) {
+           logger.error("Exception occurred during cleanup phase: " + e);
        }
        logger.info("Eventhub Bolt cleaned up");
        sender = null;
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
index bcd31e47f7e..82cd95b4dcb 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
@@ -69,8 +69,6 @@ public class AvroGenericRecordBoltTest {
        + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"},"
        + "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" },"
        + "{ \"name\":\"int1\", \"type\":\"int\" }]}";
-   private static Schema schema1;
-   private static Schema schema2;
    private static Tuple tuple1;
    private static Tuple tuple2;
@@ -94,10 +92,10 @@ public class AvroGenericRecordBoltTest {
    @BeforeAll
    public static void setupClass() {
        Schema.Parser parser = new Schema.Parser();
-       schema1 = parser.parse(schemaV1);
+       Schema schema1 = parser.parse(schemaV1);

        parser = new Schema.Parser();
-       schema2 = parser.parse(schemaV2);
+       Schema schema2 = parser.parse(schemaV2);

        GenericRecordBuilder builder1 = new GenericRecordBuilder(schema1);
        builder1.set("foo1", "bar1");
@@ -113,7 +111,8 @@ public static void setupClass() {
    private static Tuple generateTestTuple(GenericRecord record) {
        TopologyBuilder builder = new TopologyBuilder();
        GeneralTopologyContext topologyContext =
-           new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap<>(),
+           new GeneralTopologyContext(builder.createTopology(), new Config(),
+               new HashMap<>(), new HashMap<>(),
                new HashMap<>(), "") {
                @Override
                public Fields getComponentOutputFields(String componentId, String streamId) {
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
index f3fc8080541..d4a1c538c14 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -70,7 +70,7 @@ public class KafkaBolt extends BaseTickTupleAwareRichBolt {
     */
    private boolean fireAndForget = false;
    /**
-    * {@see KafkaBolt#setAsync(boolean)} for more details on this.
+    * {@link KafkaBolt#setAsync(boolean)} for more details on this.
     */
    private boolean async = true;
@@ -116,7 +116,7 @@ public void prepare(Map topoConf, TopologyContext context, Outpu
        //for backward compatibility.
        if (mapper == null) {
            LOG.info("Mapper not specified. 
Setting default mapper to {}", FieldNameBasedTupleToKafkaMapper.class.getSimpleName()); - this.mapper = new FieldNameBasedTupleToKafkaMapper(); + this.mapper = new FieldNameBasedTupleToKafkaMapper<>(); } //for backward compatibility. @@ -169,9 +169,9 @@ private Callback createProducerCallback(final Tuple input) { @Override protected void process(final Tuple input) { - K key = null; - V message = null; - String topic = null; + K key; + V message; + String topic; try { key = mapper.getKeyFromTuple(input); message = mapper.getMessageFromTuple(input); @@ -238,10 +238,10 @@ public void setAsync(boolean async) { @Override public String toString() { - return "KafkaBolt: {mapper: " + mapper + return "KafkaBolt: {mapper: " + mapper + " topicSelector: " + topicSelector - + " fireAndForget: " + fireAndForget - + " async: " + async - + " proerties: " + boltSpecifiedProperties; + + " fireAndForget: " + fireAndForget + + " async: " + async + + " properties: " + boltSpecifiedProperties; } } diff --git a/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java b/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java index 8322a8a86c0..547898eb4ef 100644 --- a/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java +++ b/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java @@ -38,7 +38,7 @@ public interface ModelOutputs extends Serializable { /** * Convenience method that returns a set with all the streams declared by the {@link PMMLPredictorBolt}. - * By default this this method calls {@link #streamFields()}{@code .keySet()}. + * By default, this method calls {@link #streamFields()}{@code .keySet()}. * @return The set with all declared streams */ default Set streams() { diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java index 1a083a2aa97..d48b0e11169 100644 --- a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java index e9adc29d95f..38495c646ba 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java @@ -57,10 +57,10 @@ private FluxParser() { * * @param inputFile source YAML file * @param dumpYaml if true, dump the parsed YAML to stdout - * @param processIncludes whether or not to process includes + * @param processIncludes whether to process includes * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution - * @return resulting topologuy definition + * @param envSub whether to perform environment variable substitution + * @return resulting topology definition * @throws IOException if there is a problem reading file(s) */ public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes, @@ -73,14 +73,14 @@ public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean } /** - * Parse a flux topology definition from a classpath resource.. + * Parse a flux topology definition from a classpath resource. * * @param resource YAML resource * @param dumpYaml if true, dump the parsed YAML to stdout - * @param processIncludes whether or not to process includes + * @param processIncludes whether to process includes * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution - * @return resulting topologuy definition + * @param envSub whether to perform environment variable substitution + * @return resulting topology definition * @throws IOException if there is a problem reading file(s) */ public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes, @@ -97,9 +97,9 @@ public static TopologyDef parseResource(String resource, boolean dumpYaml, boole * * @param inputStream InputStream representation of YAML file * @param dumpYaml if true, dump the parsed YAML to stdout - * @param processIncludes whether or not to process includes + * @param processIncludes whether to process includes * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution + * @param envSub whether to perform environment variable substitution * @return resulting topology definition * @throws IOException if there is a problem reading file(s) */ @@ -119,7 +119,7 @@ public static TopologyDef parseInputStream(InputStream inputStream, boolean dump } if (processIncludes) { - return processIncludes(yaml, topology, properties, envSub); + return processIncludes(topology, properties, envSub); } else { return topology; } @@ -129,7 +129,7 @@ public static TopologyDef parseInputStream(InputStream inputStream, boolean dump * Parse filter properties file. 
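     * <p>Illustrative call (the path here is hypothetical; {@code resource=false} reads the
     * file from the filesystem rather than the classpath):
     * <pre>{@code
     * Properties props = FluxParser.parseProperties("/path/to/dev.properties", false);
     * }</pre>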
* * @param propertiesFile properties file for variable substitution - * @param resource whether or not to load properties file from classpath + * @param resource whether to load properties file from classpath * @return resulting filter properties * @throws IOException if there is a problem reading file */ @@ -138,7 +138,7 @@ public static Properties parseProperties(String propertiesFile, boolean resource if (propertiesFile != null) { properties = new Properties(); - InputStream in = null; + InputStream in; if (resource) { in = FluxParser.class.getResourceAsStream(propertiesFile); } else { @@ -166,7 +166,7 @@ private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties proper : line; }).collect(Collectors.joining(System.lineSeparator())); - return (TopologyDef) yaml.load(conf); + return yaml.load(conf); } } @@ -204,25 +204,23 @@ private static Yaml yaml() { Constructor constructor = new Constructor(TopologyDef.class, new LoaderOptions()); constructor.addTypeDescription(topologyDescription); - Yaml yaml = new Yaml(constructor); - return yaml; + return new Yaml(constructor); } /** * Process includes contained within a yaml file. * - * @param yaml the yaml parser for parsing the include file(s) * @param topologyDef the topology definition containing (possibly zero) includes * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution + * @param envSub whether to perform environment variable substitution * @return The TopologyDef with includes resolved. */ - private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties properties, boolean envSub) + private static TopologyDef processIncludes(TopologyDef topologyDef, Properties properties, boolean envSub) throws IOException { //TODO support multiple levels of includes if (topologyDef.getIncludes() != null) { for (IncludeDef include : topologyDef.getIncludes()) { - TopologyDef includeTopologyDef = null; + TopologyDef includeTopologyDef; if (include.isResource()) { LOG.info("Loading includes from resource: {}", include.getFile()); includeTopologyDef = parseResource(include.getFile(), true, false, properties, envSub); diff --git a/pom.xml b/pom.xml index fd84ad801e0..4a33370d453 100644 --- a/pom.xml +++ b/pom.xml @@ -1285,6 +1285,7 @@ org/apache/storm/generated/** org/apache/storm/sql/parser/impl/** + 1.8 diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java index 5f415843f98..58bcc7cbaef 100644 --- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java +++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at @@ -21,7 +21,6 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; -import java.util.regex.Pattern; import javax.security.auth.Subject; import org.apache.storm.daemon.Shutdownable; import org.apache.storm.generated.AuthorizationException; @@ -33,8 +32,6 @@ import org.apache.storm.nimbus.NimbusInfo; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Provides a way to store blobs that can be downloaded. Blobs must be able to be uploaded and listed from Nimbus, and @@ -51,15 +48,14 @@ */ public abstract class BlobStore implements Shutdownable, AutoCloseable { protected static final String BASE_BLOBS_DIR_NAME = "blobs"; - private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class); - private static final KeyFilter TO_TOPO_ID = (key) -> ConfigUtils.getIdFromBlobKey(key); + private static final KeyFilter TO_TOPO_ID = ConfigUtils::getIdFromBlobKey; /** * Validates key checking for potentially harmful patterns. * * @param key Key for the blob */ - public static final void validateKey(String key) throws IllegalArgumentException { + public static void validateKey(String key) throws IllegalArgumentException { if (!Utils.isValidKey(key)) { throw new IllegalArgumentException(key + " does not appear to be a valid blob key"); } @@ -131,7 +127,7 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec try { out = createBlob(key, meta, who); byte[] buffer = new byte[2048]; - int len = 0; + int len; while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } @@ -153,7 +149,7 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec * Updates the blob data. * * @param key Key for the blob - * @param who Is the subject having the write privilege for the blob + * @param who Is the subject with write privilege for the blob * @return AtomicOutputStream returns a stream into which the data can be written */ public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException; @@ -189,7 +185,7 @@ public void updateBlob(String key, byte[] data, Subject who) throws Authorizatio public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException; /** - * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi. + * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi). */ public abstract void setLeaderElector(ILeaderElector leaderElector); @@ -199,7 +195,7 @@ public void updateBlob(String key, byte[] data, Subject who) throws Authorizatio * * @param key Key for the blob * @param meta Metadata which contains the updated acls information - * @param who Is the subject having the write privilege for the blob + * @param who Is the subject with write privilege for the blob */ public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException; @@ -221,7 +217,7 @@ public void updateBlob(String key, byte[] data, Subject who) throws Authorizatio public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException; /** - * Returns an iterator with all the list of keys currently available on the blob store. + * Returns an iterator with all the list of keys currently available in the blob store. 
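     * <p>For illustration, a typical consumer of this iterator (the {@code store} instance
     * is hypothetical):
     * <pre>{@code
     * Iterator<String> keys = store.listKeys();
     * while (keys.hasNext()) {
     *     System.out.println(keys.next());
     * }
     * }</pre>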
* * @return {@code Iterator} */ @@ -260,7 +256,7 @@ public void close() { * @return Set of filtered keys */ public Set filterAndListKeys(KeyFilter filter) { - Set ret = new HashSet(); + Set ret = new HashSet<>(); Iterator keys = listKeys(); while (keys.hasNext()) { String key = keys.next(); @@ -285,7 +281,7 @@ public void readBlobTo(String key, OutputStream out, Subject who) throws IOExcep throw new IOException("Could not find " + key); } byte[] buffer = new byte[2048]; - int len = 0; + int len; try { while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); @@ -312,7 +308,7 @@ public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundE /** * Get IDs stored in blob store. - * @return a set of all of the topology ids with special data stored in the blob store. + * @return a set of all topology ids with special data stored in the blob store. */ public Set storedTopoIds() { return filterAndListKeys(TO_TOPO_ID); @@ -328,7 +324,7 @@ public void updateLastBlobUpdateTime() throws IOException { } /** - * Validates that the blob update time of the blobstore is up to date with the current existing blobs. + * Validates that the blob update time of the blobstore is up-to-date with the current existing blobs. * * @throws IOException on any error */ @@ -340,9 +336,9 @@ public void validateBlobUpdateTime() throws IOException { * Blob store implements its own version of iterator to list the blobs. */ public static class KeyTranslationIterator implements Iterator { - private Iterator it = null; + private final Iterator it; private String next = null; - private String prefix = null; + private final String prefix; public KeyTranslationIterator(Iterator it, String prefix) throws IOException { this.it = it; @@ -385,9 +381,9 @@ public void remove() { /** * Output stream implementation used for reading the metadata and data information. */ - protected class BlobStoreFileOutputStream extends AtomicOutputStream { - private BlobStoreFile part; - private OutputStream out; + protected static class BlobStoreFileOutputStream extends AtomicOutputStream { + private final BlobStoreFile part; + private final OutputStream out; public BlobStoreFileOutputStream(BlobStoreFile part) throws IOException { this.part = part; @@ -434,9 +430,9 @@ public void write(byte[] b, int offset, int len) throws IOException { /** * Input stream implementation used for writing both the metadata containing the acl information and the blob data. */ - protected class BlobStoreFileInputStream extends InputStreamWithMeta { - private BlobStoreFile part; - private InputStream in; + protected static class BlobStoreFileInputStream extends InputStreamWithMeta { + private final BlobStoreFile part; + private final InputStream in; public BlobStoreFileInputStream(BlobStoreFile part) throws IOException { this.part = part; diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java index 742e2f91560..9ccf076ec1d 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -14,17 +14,16 @@ import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.storm.assignments.ILocalAssignmentsBackend; -import org.apache.storm.callback.ZKStateChangedCallback; import org.apache.storm.generated.Assignment; import org.apache.storm.generated.ClusterWorkerHeartbeat; import org.apache.storm.generated.Credentials; @@ -44,7 +43,6 @@ import org.apache.storm.shade.org.apache.commons.lang.StringUtils; import org.apache.storm.shade.org.apache.curator.framework.state.ConnectionState; import org.apache.storm.shade.org.apache.zookeeper.KeeperException; -import org.apache.storm.shade.org.apache.zookeeper.Watcher; import org.apache.storm.shade.org.apache.zookeeper.data.ACL; import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; @@ -53,34 +51,34 @@ public class StormClusterStateImpl implements IStormClusterState { - private static Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class); private final List defaultAcls; private final String stateId; private final boolean shouldCloseStateStorageOnDisconnect; private final ClusterStateContext context; - private IStateStorage stateStorage; - private ILocalAssignmentsBackend assignmentsBackend; - private ConcurrentHashMap assignmentInfoCallback; - private ConcurrentHashMap assignmentInfoWithVersionCallback; - private ConcurrentHashMap assignmentVersionCallback; - private AtomicReference supervisorsCallback; + private final IStateStorage stateStorage; + private final ILocalAssignmentsBackend assignmentsBackend; + private final ConcurrentHashMap assignmentInfoCallback; + private final ConcurrentHashMap assignmentInfoWithVersionCallback; + private final ConcurrentHashMap assignmentVersionCallback; + private final AtomicReference supervisorsCallback; // we want to register a topo directory getChildren callback for all workers of this dir - private ConcurrentHashMap backPressureCallback; - private AtomicReference leaderInfoCallback; - private AtomicReference assignmentsCallback; - private ConcurrentHashMap stormBaseCallback; - private AtomicReference blobstoreCallback; - private ConcurrentHashMap credentialsCallback; - private ConcurrentHashMap logConfigCallback; - - public StormClusterStateImpl(IStateStorage stateStorage, ILocalAssignmentsBackend assignmentsassignmentsBackend, + private final ConcurrentHashMap backPressureCallback; + private final AtomicReference leaderInfoCallback; + private final AtomicReference assignmentsCallback; + private final ConcurrentHashMap stormBaseCallback; + private final AtomicReference blobstoreCallback; + private final ConcurrentHashMap credentialsCallback; + private final ConcurrentHashMap logConfigCallback; + + public StormClusterStateImpl(IStateStorage stateStorage, ILocalAssignmentsBackend assignmentsBackend, ClusterStateContext context, boolean shouldCloseStateStorageOnDisconnect) throws Exception { this.stateStorage = stateStorage; this.shouldCloseStateStorageOnDisconnect = 
shouldCloseStateStorageOnDisconnect; this.defaultAcls = context.getDefaultZkAcls(); this.context = context; - this.assignmentsBackend = assignmentsassignmentsBackend; + this.assignmentsBackend = assignmentsBackend; assignmentInfoCallback = new ConcurrentHashMap<>(); assignmentInfoWithVersionCallback = new ConcurrentHashMap<>(); assignmentVersionCallback = new ConcurrentHashMap<>(); @@ -93,48 +91,41 @@ public StormClusterStateImpl(IStateStorage stateStorage, ILocalAssignmentsBacken logConfigCallback = new ConcurrentHashMap<>(); blobstoreCallback = new AtomicReference<>(); - stateId = this.stateStorage.register(new ZKStateChangedCallback() { - - @Override - public void changed(Watcher.Event.EventType type, String path) { - List toks = tokenizePath(path); - int size = toks.size(); - if (size >= 1) { - String root = toks.get(0); - if (root.equals(ClusterUtils.ASSIGNMENTS_ROOT)) { - if (size == 1) { - // set null and get the old value - issueCallback(assignmentsCallback); - } else { - issueMapCallback(assignmentInfoCallback, toks.get(1)); - issueMapCallback(assignmentVersionCallback, toks.get(1)); - issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1)); - } - - } else if (root.equals(ClusterUtils.SUPERVISORS_ROOT)) { - issueCallback(supervisorsCallback); - } else if (root.equals(ClusterUtils.BLOBSTORE_ROOT)) { - issueCallback(blobstoreCallback); - } else if (root.equals(ClusterUtils.STORMS_ROOT) && size > 1) { - issueMapCallback(stormBaseCallback, toks.get(1)); - } else if (root.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) { - issueMapCallback(credentialsCallback, toks.get(1)); - } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) { - issueMapCallback(logConfigCallback, toks.get(1)); - } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) { - issueMapCallback(backPressureCallback, toks.get(1)); - } else if (root.equals(ClusterUtils.LEADERINFO_ROOT)) { - issueCallback(leaderInfoCallback); + stateId = this.stateStorage.register((type, path) -> { + List toks = tokenizePath(path); + int size = toks.size(); + if (size >= 1) { + String root = toks.get(0); + if (root.equals(ClusterUtils.ASSIGNMENTS_ROOT)) { + if (size == 1) { + // set null and get the old value + issueCallback(assignmentsCallback); } else { - LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path); - Runtime.getRuntime().exit(30); + issueMapCallback(assignmentInfoCallback, toks.get(1)); + issueMapCallback(assignmentVersionCallback, toks.get(1)); + issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1)); } + } else if (root.equals(ClusterUtils.SUPERVISORS_ROOT)) { + issueCallback(supervisorsCallback); + } else if (root.equals(ClusterUtils.BLOBSTORE_ROOT)) { + issueCallback(blobstoreCallback); + } else if (root.equals(ClusterUtils.STORMS_ROOT) && size > 1) { + issueMapCallback(stormBaseCallback, toks.get(1)); + } else if (root.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) { + issueMapCallback(credentialsCallback, toks.get(1)); + } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) { + issueMapCallback(logConfigCallback, toks.get(1)); + } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) { + issueMapCallback(backPressureCallback, toks.get(1)); + } else if (root.equals(ClusterUtils.LEADERINFO_ROOT)) { + issueCallback(leaderInfoCallback); + } else { + LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path); + Runtime.getRuntime().exit(30); } - 
return; } - }); String[] pathlist = { @@ -248,13 +239,13 @@ public VersionedData assignmentInfoWithVersion(String stormId, Runna assignmentInfoWithVersionCallback.put(stormId, callback); } Assignment assignment = null; - Integer version = 0; + int version = 0; VersionedData dataWithVersion = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null); if (dataWithVersion != null) { assignment = ClusterUtils.maybeDeserialize(dataWithVersion.getData(), Assignment.class); version = dataWithVersion.getVersion(); } - return new VersionedData(version, assignment); + return new VersionedData<>(version, assignment); } @Override @@ -291,7 +282,7 @@ public List nimbuses() { @Override public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) { - // explicit delete for ephmeral node to ensure this session creates the entry. + // explicit delete for ephemeral node to ensure this session creates the entry. stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId)); stateStorage.add_listener((curatorFramework, connectionState) -> { LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState); @@ -532,14 +523,14 @@ public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) { } /** - * Check whether a topology is in throttle-on status or not: if the backpresure/storm-id dir is not empty, this topology has - * throttle-on, otherwise throttle-off. But if the backpresure/storm-id dir is not empty and has not been updated for more than + * Check whether a topology is in throttle-on status or not: if the backpressure/storm-id dir is not empty, this topology has + * throttle-on, otherwise throttle-off. But if the backpressure/storm-id dir is not empty and has not been updated for more than * timeoutMs, we treat it as throttle-off. This will prevent the spouts from getting stuck indefinitely if something wrong happens. * * @param stormId The topology Id * @param timeoutMs How long until the backpressure znode is invalid. * @param callback The callback function - * @return True is backpresure/storm-id dir is not empty and at least one of the backpressure znodes has not timed out; false otherwise. + * @return True only when backpressure/storm-id dir is not empty and at least one of the backpressure znodes has not timed out. 
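*
* <p>For orientation, a minimal caller fragment (editorial sketch; the variable names and the
* 60 second timeout are illustrative, not part of this change):
* <pre>{@code
* boolean throttleOn = state.topologyBackpressure(stormId, 60_000L, () -> {});
* if (throttleOn) {
*     // hold off emitting this round; at least one backpressure znode is still fresh
* }
* }</pre>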
*/ @Override public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback) { @@ -552,7 +543,7 @@ public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable cal List children = stateStorage.get_children(path, callback != null); mostRecentTimestamp = children.stream() .map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false)) - .filter(data -> data != null) + .filter(Objects::nonNull) .mapToLong(data -> ByteBuffer.wrap(data).getLong()) .max() .orElse(0); @@ -607,12 +598,9 @@ public void updateStorm(String stormId, StormBase newElems) { StormBase stormBase = stormBase(stormId, null); if (stormBase.get_component_executors() != null) { - Map newComponentExecutors = new HashMap<>(); Map componentExecutors = newElems.get_component_executors(); // componentExecutors maybe be APersistentMap, which don't support "put" - for (Map.Entry entry : componentExecutors.entrySet()) { - newComponentExecutors.put(entry.getKey(), entry.getValue()); - } + Map newComponentExecutors = new HashMap<>(componentExecutors); for (Map.Entry entry : stormBase.get_component_executors().entrySet()) { if (!componentExecutors.containsKey(entry.getKey())) { newComponentExecutors.put(entry.getKey(), entry.getValue()); @@ -693,7 +681,7 @@ public void removeStormBase(String stormId) { public void setAssignment(String stormId, Assignment info, Map topoConf) { byte[] serAssignment = Utils.serialize(info); stateStorage.mkdirs(ClusterUtils.ASSIGNMENTS_SUBTREE, defaultAcls); - stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), ClusterUtils.mkTopoReadOnlyAcls(topoConf)); + stateStorage.set_data(ClusterUtils.assignmentPath(stormId), serAssignment, ClusterUtils.mkTopoReadOnlyAcls(topoConf)); this.assignmentsBackend.keepOrUpdateAssignment(stormId, info); } @@ -762,12 +750,7 @@ public void reportError(String stormId, String componentId, String node, Long po stateStorage.set_data(lastErrorPath, serData, defaultAcls); List childrens = stateStorage.get_children(path, false); - Collections.sort(childrens, new Comparator() { - @Override - public int compare(String arg0, String arg1) { - return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1))); - } - }); + childrens.sort(Comparator.comparingLong(arg0 -> Long.parseLong(arg0.substring(1)))); while (childrens.size() > 10) { String znodePath = path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0); @@ -798,12 +781,7 @@ public List errors(String stormId, String componentId) { } } } - Collections.sort(errorInfos, new Comparator() { - @Override - public int compare(ErrorInfo arg0, ErrorInfo arg1) { - return Integer.compare(arg1.get_error_time_secs(), arg0.get_error_time_secs()); - } - }); + errorInfos.sort(Comparator.comparingInt(ErrorInfo::get_error_time_secs).reversed()); return errorInfos; } @@ -812,8 +790,7 @@ public int compare(ErrorInfo arg0, ErrorInfo arg1) { public ErrorInfo lastError(String stormId, String componentId) { String path = ClusterUtils.lastErrorPath(stormId, componentId); if (stateStorage.node_exists(path, false)) { - ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), ErrorInfo.class); - return errorInfo; + return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), ErrorInfo.class); } return null; diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java b/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java index 11a1cdf2d5f..5df94a47df1 100644 ---
a/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java @@ -92,7 +92,7 @@ public String toString() { + ", reqId=" + reqId + ", remoteAddr=" + remoteAddr + ", authZPrincipal=" + ((principal() != null) ? principal().getName() : "null") - + ", ThreadId=" + Thread.currentThread().toString() + + ", ThreadId=" + Thread.currentThread() + '}'; } diff --git a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java index ad7a4339e10..24b1e3f7e53 100644 --- a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java +++ b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java @@ -52,7 +52,7 @@ public class SerializationFactory { public static final ServiceLoader loader = ServiceLoader.load(SerializationRegister.class); public static Kryo getKryo(Map conf) { - IKryoFactory kryoFactory = (IKryoFactory) ReflectionUtils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY)); + IKryoFactory kryoFactory = ReflectionUtils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY)); Kryo k = kryoFactory.getKryo(conf); k.register(byte[].class); @@ -108,9 +108,7 @@ public static Kryo getKryo(Map conf) { } else { throw new RuntimeException(e); } - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { + } catch (InstantiationException | IllegalAccessException e) { throw new RuntimeException(e); } } @@ -233,7 +231,7 @@ public IdDictionary(StormTopology topology) { *

Note: Only one key wins if there are duplicate values. Which key wins is indeterminate: "{:a 1 :b 1} -> {1 :a} *or* {1 :b}" */ private static Map simpleReverseMap(Map map) { - Map ret = new HashMap(); + Map ret = new HashMap<>(); for (Map.Entry entry : map.entrySet()) { ret.put(entry.getValue(), entry.getKey()); } diff --git a/storm-client/src/jvm/org/apache/storm/streams/PairStream.java b/storm-client/src/jvm/org/apache/storm/streams/PairStream.java index 14b88a05cb5..cc0ba5990e9 100644 --- a/storm-client/src/jvm/org/apache/storm/streams/PairStream.java +++ b/storm-client/src/jvm/org/apache/storm/streams/PairStream.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -535,7 +535,7 @@ public ArrayList apply(ArrayList aggregate, V value) { @Override public ArrayList merge(ArrayList accum1, ArrayList accum2) { - ArrayList res = new ArrayList(); + ArrayList res = new ArrayList<>(); res.addAll(accum1); res.addAll(accum2); return res; diff --git a/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java b/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java index 96665058b57..99872e77688 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java +++ b/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java index 0ef7afee0e8..82de58c4941 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java +++ b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -94,8 +94,8 @@ public class TopologyBuilder { private final Map sharedMemory = new HashMap<>(); private boolean hasStatefulBolt = false; - private Map stateSpouts = new HashMap<>(); - private List workerHooks = new ArrayList<>(); + private final Map stateSpouts = new HashMap<>(); + private final List workerHooks = new ArrayList<>(); private static String mergeIntoJson(Map into, Map newMap) { Map res = new HashMap<>(into); @@ -251,7 +251,7 @@ public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumen * outputs. 
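*
* <p>A usage fragment for this overload (editorial sketch; the component ids, the concrete
* spout/bolt classes, and the window size are illustrative, not taken from this patch):
* <pre>{@code
* TopologyBuilder builder = new TopologyBuilder();
* builder.setSpout("numbers", new NumberSpout()); // hypothetical spout
* builder.setBolt("rolling-sum",
*                 new SlidingSumBolt().withWindow(BaseWindowedBolt.Count.of(10)), // hypothetical IWindowedBolt
*                 2) // parallelism hint: two tasks
*        .shuffleGrouping("numbers");
* }</pre>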
* @param bolt the windowed bolt * @param parallelismHint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process - * somwehere around the cluster. + * somewhere around the cluster. * @return use the returned object to declare the inputs to this component * * @throws IllegalArgumentException if {@code parallelism_hint} is not positive @@ -292,7 +292,7 @@ public BoltDeclarer setBolt(String id, IStatefulBolt bolt) * outputs. * @param bolt the stateful bolt * @param parallelismHint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process - * somwehere around the cluster. + * somewhere around the cluster. * @return use the returned object to declare the inputs to this component * * @throws IllegalArgumentException if {@code parallelism_hint} is not positive @@ -300,7 +300,7 @@ public BoltDeclarer setBolt(String id, IStatefulBolt bolt) public BoltDeclarer setBolt(String id, IStatefulBolt bolt, Number parallelismHint) throws IllegalArgumentException { hasStatefulBolt = true; - return setBolt(id, new StatefulBoltExecutor(bolt), parallelismHint); + return setBolt(id, new StatefulBoltExecutor<>(bolt), parallelismHint); } /** @@ -330,7 +330,7 @@ public BoltDeclarer setBolt(String id, IStatefulWindowedBolt the type of the state (e.g. {@link org.apache.storm.state.KeyValueState}) * @return use the returned object to declare the inputs to this component * @@ -343,9 +343,9 @@ public BoltDeclarer setBolt(String id, IStatefulWindowedBolt(bolt); } else { - executor = new StatefulWindowedBoltExecutor(bolt); + executor = new StatefulWindowedBoltExecutor<>(bolt); } - return setBolt(id, new StatefulBoltExecutor(executor), parallelismHint); + return setBolt(id, new StatefulBoltExecutor<>(executor), parallelismHint); } /** @@ -553,7 +553,7 @@ private ComponentCommon getComponentCommon(String id, IComponent component) { private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException { ComponentCommon common = new ComponentCommon(); - common.set_inputs(new HashMap()); + common.set_inputs(new HashMap<>()); if (parallelism != null) { int dop = parallelism.intValue(); if (dop < 1) { @@ -646,7 +646,7 @@ public SpoutGetter(String id) { } protected class BoltGetter extends ConfigGetter implements BoltDeclarer { - private String boltId; + private final String boltId; public BoltGetter(String boltId) { super(boltId); @@ -670,7 +670,7 @@ public BoltDeclarer globalGrouping(String componentId) { @Override public BoltDeclarer globalGrouping(String componentId, String streamId) { - return grouping(componentId, streamId, Grouping.fields(new ArrayList())); + return grouping(componentId, streamId, Grouping.fields(new ArrayList<>())); } @Override @@ -723,6 +723,7 @@ public BoltDeclarer directGrouping(String componentId, String streamId) { return grouping(componentId, streamId, Grouping.direct(new NullStruct())); } + /* does not implement or override a super or interface */ private BoltDeclarer grouping(String componentId, String streamId, Grouping grouping) { commons.get(boltId).put_to_inputs(new GlobalStreamId(componentId, streamId), grouping); return this; diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/Aggregator.java b/storm-client/src/jvm/org/apache/storm/trident/operation/Aggregator.java index ecfacbf4062..71ecae61068 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/operation/Aggregator.java +++ 
b/storm-client/src/jvm/org/apache/storm/trident/operation/Aggregator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ReducerAggregatorImpl.java b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ReducerAggregatorImpl.java index bf208cb35b3..7f07f27144c 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ReducerAggregatorImpl.java +++ b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ReducerAggregatorImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java index 9357cc562cf..9cfd432669b 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -79,12 +79,7 @@ public static ConfigUtils setInstance(ConfigUtils u) { } public static Map maskPasswords(final Map conf) { - Maps.EntryTransformer maskPasswords = new Maps.EntryTransformer() { - @Override - public Object transformEntry(String key, Object value) { - return passwordConfigKeys.contains(key) ? "*****" : value; - } - }; + Maps.EntryTransformer maskPasswords = (key, value) -> passwordConfigKeys.contains(key) ? 
"*****" : value; return Maps.transformEntries(conf, maskPasswords); } @@ -110,7 +105,7 @@ public static boolean isLocalMode(Map conf) { */ public static Collection readDirContents(String dir) { Collection ret = readDirFiles(dir); - return ret.stream().map(car -> car.getName()).collect(Collectors.toList()); + return ret.stream().map(File::getName).collect(Collectors.toList()); } /** @@ -123,9 +118,7 @@ public static Collection readDirFiles(String dir) { Collection ret = new HashSet<>(); File[] files = new File(dir).listFiles(); if (files != null) { - for (File f : files) { - ret.add(f); - } + ret.addAll(Arrays.asList(files)); } return ret; } diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java index 547e2d1dc7b..1d7b93ff8c3 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java @@ -117,10 +117,10 @@ public class Utils { private static final Set> defaultAllowedExceptions = Collections.emptySet(); private static final List LOCALHOST_ADDRESSES = Lists.newArrayList("localhost", "127.0.0.1", "0:0:0:0:0:0:0:1"); static SerializationDelegate serializationDelegate; - private static ThreadLocal threadSer = new ThreadLocal(); - private static ThreadLocal threadDes = new ThreadLocal(); + private static final ThreadLocal threadSer = new ThreadLocal<>(); + private static final ThreadLocal threadDes = new ThreadLocal<>(); private static ClassLoader cl = null; - private static Map localConf; + private static final Map localConf; // A singleton instance allows us to mock delegated static methods in our // tests by subclassing. private static Utils _instance = new Utils(); @@ -160,7 +160,7 @@ public static void resetClassLoaderForJavaDeSerialize() { public static List findResources(String name) { try { Enumeration resources = Thread.currentThread().getContextClassLoader().getResources(name); - List ret = new ArrayList(); + List ret = new ArrayList<>(); while (resources.hasMoreElements()) { ret.add(resources.nextElement()); } @@ -177,8 +177,7 @@ public static Map findAndReadConfigFile(String name, boolean mus in = getConfigFileInputStream(name); if (null != in) { Yaml yaml = new Yaml(new SafeConstructor(new LoaderOptions())); - @SuppressWarnings("unchecked") - Map ret = (Map) yaml.load(new InputStreamReader(in)); + Map ret = yaml.load(new InputStreamReader(in)); if (null != ret) { return new HashMap<>(ret); } else { @@ -219,7 +218,7 @@ private static InputStream getConfigFileInputStream(String configFilePath) "Could not find config file, name not specified"); } - HashSet resources = new HashSet(findResources(configFilePath)); + HashSet resources = new HashSet<>(findResources(configFilePath)); if (resources.isEmpty()) { File configFile = new File(configFilePath); if (configFile.exists()) { @@ -386,43 +385,37 @@ public static boolean isSystemId(String id) { public static SmartThread asyncLoop(final Callable afn, boolean isDaemon, final Thread.UncaughtExceptionHandler eh, int priority, final boolean isFactory, boolean startImmediately, String threadName) { - SmartThread thread = new SmartThread(new Runnable() { - @Override - public void run() { - try { - final Callable fn = isFactory ? 
(Callable) afn.call() : afn; - while (true) { - if (Thread.interrupted()) { - throw new InterruptedException(); - } - final Long s = fn.call(); - if (s == null) { // then stop running it - break; - } - if (s > 0) { - Time.sleep(s); - } + SmartThread thread = new SmartThread(() -> { + try { + final Callable fn = (Callable) (isFactory ? afn.call() : afn); + while (true) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + final Long s = fn.call(); + if (s == null) { // then stop running it + break; } - } catch (Throwable t) { - if (Utils.exceptionCauseIsInstanceOf( - InterruptedException.class, t)) { - LOG.info("Async loop interrupted!"); - return; + if (s > 0) { + Time.sleep(s); } - LOG.error("Async loop died!", t); - throw new RuntimeException(t); } + } catch (Throwable t) { + if (Utils.exceptionCauseIsInstanceOf( + InterruptedException.class, t)) { + LOG.info("Async loop interrupted!"); + return; + } + LOG.error("Async loop died!", t); + throw new RuntimeException(t); } }); if (eh != null) { thread.setUncaughtExceptionHandler(eh); } else { - thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - LOG.error("Async loop died!", e); - Utils.exitProcess(1, "Async loop died!"); - } + thread.setUncaughtExceptionHandler((t, e) -> { + LOG.error("Async loop died!", e); + Utils.exitProcess(1, "Async loop died!"); }); } thread.setDaemon(isDaemon); @@ -543,7 +536,7 @@ public static T javaDeserialize(byte[] serialized, Class clazz) { try { ByteArrayInputStream bis = new ByteArrayInputStream(serialized); - ObjectInputStream ois = null; + ObjectInputStream ois; if (null == Utils.cl) { ois = new ObjectInputStream(bis); } else { @@ -553,10 +546,8 @@ public static T javaDeserialize(byte[] serialized, Class clazz) { Object ret = ois.readObject(); ois.close(); return (T) ret; - } catch (IOException ioe) { + } catch (IOException | ClassNotFoundException ioe) { throw new RuntimeException(ioe); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); } } @@ -595,8 +586,8 @@ public static Id parseZkId(String id, String configName) { /** * Get the ACL for nimbus/supervisor. The Super User ACL. This assumes that security is enabled. * - * @param conf the config to get the super User ACL from - * @return the super user ACL. + * @param conf the config to get the superuser ACL from + * @return the superuser ACL. */ @SuppressWarnings("checkstyle:AbbreviationAsWordInName") public static ACL getSuperUserAcl(Map conf) { @@ -674,7 +665,7 @@ public static void handleWorkerUncaughtException(Throwable t) { handleUncaughtException(t, defaultAllowedExceptions, true); } - // Hadoop UserGroupInformation can launch an autorenewal thread that can cause a NullPointerException + // Hadoop UserGroupInformation can launch an auto-renewal thread that can cause a NullPointerException // for workers. See STORM-3606 for an explanation. private static boolean isAllowedWorkerException(Throwable t) { if (t instanceof NullPointerException) { @@ -759,14 +750,14 @@ public static UptimeComputer makeUptimeComputer() { *

Example usage in java: * Map<Integer, String> tasks; Map<String, List<Integer>> componentTasks = Utils.reverse_map(tasks); * - *

The order of he resulting list values depends on the ordering properties of the Map passed in. The caller is + *

The order of the resulting list values depends on the ordering properties of the Map passed in. The caller is * responsible for passing an ordered map if they expect the result to be consistently ordered as well. * * @param map to reverse * @return a reversed map */ public static HashMap> reverseMap(Map map) { - HashMap> rtn = new HashMap>(); + HashMap> rtn = new HashMap<>(); if (map == null) { return rtn; } @@ -775,7 +766,7 @@ public static HashMap> reverseMap(Map map) { V val = entry.getValue(); List list = rtn.get(val); if (list == null) { - list = new ArrayList(); + list = new ArrayList<>(); rtn.put(entry.getValue(), list); } list.add(key); @@ -797,11 +788,7 @@ public static Map> reverseMap(List> listSeq) { for (List listEntry : listSeq) { Object key = listEntry.get(0); Object val = listEntry.get(1); - List list = rtn.get(val); - if (list == null) { - list = new ArrayList<>(); - rtn.put(val, list); - } + List list = rtn.computeIfAbsent(val, k -> new ArrayList<>()); list.add(key); } return rtn; @@ -865,18 +852,13 @@ public static byte[] toByteArray(ByteBuffer buffer) { } public static Runnable mkSuicideFn() { - return new Runnable() { - @Override - public void run() { - exitProcess(1, "Worker died"); - } - }; + return () -> exitProcess(1, "Worker died"); } public static void readAndLogStream(String prefix, InputStream in) { try { BufferedReader r = new BufferedReader(new InputStreamReader(in)); - String line = null; + String line; while ((line = r.readLine()) != null) { LOG.info("{}:{}", prefix, line); } @@ -918,10 +900,8 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String } public static List tuple(Object... values) { - List ret = new ArrayList(); - for (Object v : values) { - ret.add(v); - } + List ret = new ArrayList<>(); + Collections.addAll(ret, values); return ret; } @@ -943,7 +923,7 @@ public static byte[] gunzip(byte[] data) { ByteArrayInputStream bis = new ByteArrayInputStream(data); GZIPInputStream in = new GZIPInputStream(bis); byte[] buffer = new byte[1024]; - int len = 0; + int len; while ((len = in.read(buffer)) >= 0) { bos.write(buffer, 0, len); } @@ -956,8 +936,8 @@ public static byte[] gunzip(byte[] data) { } public static List getRepeat(List list) { - List rtn = new ArrayList(); - Set idSet = new HashSet(); + List rtn = new ArrayList<>(); + Set idSet = new HashSet<>(); for (String id : list) { if (idSet.contains(id)) { @@ -990,7 +970,7 @@ public static Object getSetComponentObject(ComponentObject obj) { /** * A cheap way to deterministically convert a number to a positive value. When the input is positive, the original value is returned. * When the input number is negative, the returned positive value is the original value bit AND against Integer.MAX_VALUE(0x7fffffff) - * which is not its absolutely value. + * which is not its absolute value. * * @param number a given number * @return a positive number. @@ -1025,8 +1005,8 @@ public static Map fromCompressedJsonConf(byte[] serialized) { } /** - * Creates a new map with a string value in the map replaced with an equivalently-lengthed string of '#'. (If the object is not a - * string to string will be called on it and replaced) + * Creates a new map with a string value in the map replaced with an equally long string of '#'. 
(If the object is not a + * string toString() will be called on it and replaced) * * @param m The map that a value will be redacted from * @param key The key pointing to the value to be redacted @@ -1104,7 +1084,7 @@ public static Double parseJvmHeapMemByChildOpts(List options, Double def default: unit = 1; } - Double result = value * unit / 1024.0 / 1024.0; + double result = value * unit / 1024.0 / 1024.0; return (result < 1.0) ? 1.0 : result; } } @@ -1115,7 +1095,7 @@ public static Double parseJvmHeapMemByChildOpts(List options, Double def } public static ClientBlobStore getClientBlobStore(Map conf) { - ClientBlobStore store = (ClientBlobStore) ReflectionUtils.newInstance((String) conf.get(Config.CLIENT_BLOBSTORE)); + ClientBlobStore store = ReflectionUtils.newInstance((String) conf.get(Config.CLIENT_BLOBSTORE)); store.prepare(conf); return store; } @@ -1145,9 +1125,7 @@ private static Map normalizeConf(Map conf) { return new HashMap<>(); } Map ret = new HashMap<>(conf); - for (Map.Entry entry : ret.entrySet()) { - ret.put(entry.getKey(), normalizeConfValue(entry.getValue())); - } + ret.replaceAll((k, v) -> normalizeConfValue(v)); return ret; } @@ -1381,7 +1359,7 @@ public static TreeMap integerDivided(int sum, int numPieces) { int base = sum / numPieces; int numInc = sum % numPieces; int numBases = numPieces - numInc; - TreeMap ret = new TreeMap(); + TreeMap ret = new TreeMap<>(); ret.put(base, numBases); if (numInc != 0) { ret.put(base + 1, numInc); @@ -1409,8 +1387,8 @@ public static List> partitionFixed(int maxNumChunks, Collection c Map parts = integerDivided(coll.size(), maxNumChunks); // Keys sorted in descending order - List sortedKeys = new ArrayList(parts.keySet()); - Collections.sort(sortedKeys, Collections.reverseOrder()); + List sortedKeys = new ArrayList<>(parts.keySet()); + sortedKeys.sort(Collections.reverseOrder()); Iterator it = coll.iterator(); @@ -1667,17 +1645,16 @@ public static T getCompatibleVersion(NavigableMap versione } return defaultValue; } - LOG.warn("Could not find a higer compatible version for {} {}, using {} instead", what, desiredVersion, ret.getKey()); + LOG.warn("Could not find a higher compatible version for {} {}, using {} instead", what, desiredVersion, ret.getKey()); } return ret.getValue(); } - @SuppressWarnings("unchecked") private static Map readConfIgnoreNotFound(Yaml yaml, File f) throws IOException { Map ret = null; if (f.exists()) { try (FileReader fr = new FileReader(f)) { - ret = (Map) yaml.load(fr); + ret = yaml.load(fr); } } return ret; @@ -1853,7 +1830,7 @@ public boolean isSleeping() { } public static class UptimeComputer { - int startTime = 0; + final int startTime; public UptimeComputer() { startTime = Time.currentTimeSecs(); @@ -1865,10 +1842,10 @@ public int upTime() { } private static class JarConfigReader { - private Yaml yaml; + private final Yaml yaml; private Map defaultsConf; private Map stormConf; - private File file; + private final File file; JarConfigReader(Yaml yaml, Map defaultsConf, Map stormConf, File file) { this.yaml = yaml; @@ -1906,13 +1883,13 @@ private void readArchive(ZipFile zipFile) throws IOException { if (!entry.isDirectory()) { if (defaultsConf == null && entry.getName().equals("defaults.yaml")) { try (InputStreamReader isr = new InputStreamReader(zipFile.getInputStream(entry))) { - defaultsConf = (Map) yaml.load(isr); + defaultsConf = yaml.load(isr); } } if (stormConf == null && entry.getName().equals("storm.yaml")) { try (InputStreamReader isr = new InputStreamReader(zipFile.getInputStream(entry))) { - 
stormConf = (Map) yaml.load(isr); + stormConf = yaml.load(isr); } } } @@ -1921,7 +1898,7 @@ } /** - * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in + * Create a map of forward edges for bolts in a topology. Note that spouts can be a source but not a target in * the edge. The mapping contains ids of spouts and bolts. * * @param topology StormTopology to examine. @@ -1931,11 +1908,10 @@ private static Map> getStormTopologyForwardGraph(StormTopolo Map> edgesOut = new HashMap<>(); if (topology.get_bolts() != null) { - topology.get_bolts().entrySet().forEach(entry -> { - if (!Utils.isSystemId(entry.getKey())) { - entry.getValue().get_common().get_inputs().forEach((k, v) -> { - edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey()); - }); + topology.get_bolts().forEach((key, value) -> { + if (!Utils.isSystemId(key)) { + value.get_common().get_inputs().forEach( + (k, v) -> edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(key)); } }); } diff --git a/storm-client/src/jvm/org/apache/storm/windowing/persistence/SimpleWindowPartitionCache.java b/storm-client/src/jvm/org/apache/storm/windowing/persistence/SimpleWindowPartitionCache.java index f52dc128e7d..4fb6a660633 100644 --- a/storm-client/src/jvm/org/apache/storm/windowing/persistence/SimpleWindowPartitionCache.java +++ b/storm-client/src/jvm/org/apache/storm/windowing/persistence/SimpleWindowPartitionCache.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java index 0e4acdc3a0a..9d2ffc15862 100644 --- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java +++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) * under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -60,7 +60,7 @@ public interface AdminCommand { void run(String [] args, Map conf, String command) throws Exception; /** - * Print a help message to out. typically this should be in the form of. + * Print a help message to stdout. Typically, this should be of the form:
* command arguments: * description of command * argument - description @@ -75,7 +75,7 @@ public void run(String[] args, Map conf, String command) throws IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); - Set blobStoreTopologyIds = nimbusBlobStore.filterAndListKeys(key -> ConfigUtils.getIdFromBlobKey(key)); + Set blobStoreTopologyIds = nimbusBlobStore.filterAndListKeys(ConfigUtils::getIdFromBlobKey); Set activeTopologyIds = new HashSet<>(stormClusterState.activeStorms()); Sets.SetView diffTopology = Sets.difference(activeTopologyIds, blobStoreTopologyIds); LOG.info("active-topology-ids [{}] blob-topology-ids [{}] diff-topology [{}]", @@ -119,7 +119,7 @@ public void printCliHelp(String command, PrintStream out) { /** * Print value in a human readable format. * @param value what to print. - * @return a human readable string + * @return a human-readable string */ public static String prettyPrint(TBase value) { StringBuilder builder = new StringBuilder(); diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java index a8f519d6453..7852f027070 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. * See the NOTICE file distributed with this work for additional information regarding copyright ownership. * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); @@ -75,14 +75,13 @@ * 2. The USER sets the ACLs, and the blob access is validated against these ACLs. * 3. The SUPERVISOR interacts with nimbus through the NimbusBlobStore Client API to download the blobs. * The supervisors principal should match the set of users configured into SUPERVISOR_ADMINS. - * Here, the PrincipalToLocalPlugin takes care of mapping the principal to user name before the ACL validation. + * Here, the PrincipalToLocalPlugin takes care of mapping the principal to username before the ACL validation. 
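*
* <p>For orientation, the supervisor-side download in step 3 looks roughly like the fragment
* below (editorial sketch; the blob key is illustrative and exception handling is elided):
* <pre>{@code
* ClientBlobStore store = Utils.getClientBlobStore(conf);
* try (InputStreamWithMeta in = store.getBlob("mytopo-resources")) {
*     long version = in.getVersion(); // compared against the latest version to detect updated blobs
*     // ... stream the blob contents to local disk ...
* }
* }</pre>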
*/ public class LocalFsBlobStore extends BlobStore { public static final Logger LOG = LoggerFactory.getLogger(LocalFsBlobStore.class); private static final String DATA_PREFIX = "data_"; private static final String META_PREFIX = "meta_"; private static final String BLOBSTORE_SUBTREE = "/blobstore/"; - private final int allPermissions = READ | WRITE | ADMIN; protected BlobStoreAclHandler aclHandler; private NimbusInfo nimbusInfo; private FileBlobStoreImpl fbs; @@ -208,6 +207,7 @@ public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject KeyAlreadyExistsException { LOG.debug("Creating Blob for key {}", key); validateKey(key); + int allPermissions = READ | WRITE | ADMIN; aclHandler.normalizeSettableBlobMeta(key, meta, who, allPermissions); BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); aclHandler.hasPermissions(meta.get_acl(), allPermissions, who, key); @@ -222,9 +222,7 @@ public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject outputStream = null; this.stormClusterState.setupBlob(key, this.nimbusInfo, getVersionForKey(key, this.nimbusInfo, zkClient)); return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX + key, true)); - } catch (IOException e) { - throw new RuntimeException(e); - } catch (KeyNotFoundException e) { + } catch (IOException | KeyNotFoundException e) { throw new RuntimeException(e); } finally { if (outputStream != null) { @@ -299,7 +297,7 @@ public ReadableBlobMeta getBlobMeta(String key, Subject who) throws Authorizatio } /** - * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi. + * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi). */ @Override public void setLeaderElector(ILeaderElector leaderElector) { @@ -412,7 +410,7 @@ public void shutdown() { zkClient.close(); } if (timer != null) { - timer.cancel();; + timer.cancel(); } stormClusterState.disconnect(); } @@ -436,9 +434,10 @@ public int getBlobReplication(String key, Subject who) throws Exception { } @Override - public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException { - throw new UnsupportedOperationException("For local file system blob store the update blobs function does not work. " - + "Please use HDFS blob store to make this feature available."); + public int updateBlobReplication(String key, int replication, Subject who) { + throw new UnsupportedOperationException( + "For local file system blob store the update blobs function does not work. " + + "Please use HDFS blob store to make this feature available."); } //This additional check and download is for nimbus high availability in case you have more than one nimbus diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java index a58c24d0784..cdf50ba09ac 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java @@ -185,7 +185,7 @@ protected Iterable sortObjectResources( *
  • * The tie between two nodes with same resource availability is broken by using the node with lower minimum * percentage used. This comparison was used in {@link #sortObjectResourcesDefault(ObjectResourcesSummary, ExistingScheduleFunc)} - * but here it is made subservient to modified resource availbility used in + * but here it is made subservient to modified resource availability used in * {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails, ExistingScheduleFunc)}. * *
  • @@ -384,7 +384,7 @@ private Iterable sortObjectResourcesDefault( *

    2) the subordinate/subservient resource availability percentage of a node in descending - * order We calculate the resource availability percentage by dividing the resource availability that have exhausted or little of one of - * the resources mentioned above will be ranked after on the node by the resource availability of the entire rack By doing this - * calculation, nodes nodes that have more balanced resource availability. So we will be less likely to pick a node that have a lot of - * one resource but a low amount of another. + * order. We calculate the resource availability percentage by dividing the resource availability on the node by the resource + * availability of the entire rack. By doing this calculation, nodes that have exhausted or little of one of the resources mentioned + * above will be ranked after nodes that have more balanced resource availability. So we will be less likely to pick a node that has + * a lot of one resource but a low amount of another. * * @param availHosts a collection of all the hosts we want to sort @@ -431,7 +431,7 @@ private Iterable sortHosts( *

    2) the subordinate/subservient resource availability percentage of a node in descending - * order We calculate the resource availability percentage by dividing the resource availability that have exhausted or little of one of - * the resources mentioned above will be ranked after on the node by the resource availability of the entire rack By doing this - * calculation, nodes nodes that have more balanced resource availability. So we will be less likely to pick a node that have a lot of - * one resource but a low amount of another. + * order. We calculate the resource availability percentage by dividing the resource availability on the node by the resource + * availability of the entire rack. By doing this calculation, nodes that have exhausted or little of one of the resources mentioned + * above will be ranked after nodes that have more balanced resource availability. So we will be less likely to pick a node that has + * a lot of one resource but a low amount of another. * * @param availRasNodes a list of all the nodes we want to sort @@ -640,7 +640,7 @@ private ObjectResourcesSummary createClusterSummarizedResources() { clusterResourcesSummary.getAvailableResourcesOverall(), clusterResourcesSummary.getTotalResourcesOverall(), clusterResourcesSummary.getObjectResources().size(), - rackIdToHosts.values().stream().mapToInt(x -> x.size()).sum()); + rackIdToHosts.values().stream().mapToInt(Set::size).sum()); return clusterResourcesSummary; } @@ -703,7 +703,7 @@ public List hostnameToNodes(String hostname) { } /** - * interface for calculating the number of existing executors scheduled on a object (rack or node). + * interface for calculating the number of existing executors scheduled on an object (rack or node). */ public interface ExistingScheduleFunc { int getNumExistingSchedule(String objectId); diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java index ac0833f4c26..5f7f938bdce 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java @@ -76,7 +76,7 @@ public static Stream data() { private static final Logger LOG = LoggerFactory.getLogger(TestConstraintSolverStrategy.class); private static final int MAX_TRAVERSAL_DEPTH = 2000; private static final int NORMAL_BOLT_PARALLEL = 11; - //Dropping the parallelism of the bolts to 3 instead of 11 so we can find a solution in a reasonable amount of work when backtracking. + //Dropping the parallelism of the bolts to 3 instead of 11, so we can find a solution in a reasonable amount of work when backtracking.
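// Rough editorial estimate, not from this patch: the solver places executors one at a time and
// backtracks over slot choices, so worst-case work grows roughly like b^d for b candidate slots
// and d executors left to place. Cutting the two bolts from 11 executors each to 3 each shrinks
// d by 16, which is what keeps an exhaustive backtracking run tractable inside a unit test.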
private static final int BACKTRACK_BOLT_PARALLEL = 3; private static final int CO_LOCATION_CNT = 2; @@ -183,7 +183,7 @@ private void setConstraintConfig(List> constraints, Map(spreads.keySet())); } } @@ -324,16 +324,12 @@ public void testNewConstraintFormat(boolean consolidatedConfigFlag) { Object jsonValue = JSONValue.parse(s); Map config = Utils.readDefaultConfig(); config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, jsonValue); - Set allComps = new HashSet<>(); - allComps.addAll(Arrays.asList("comp-1", "comp-2", "comp-3", "comp-4", "comp-5")); + Set allComps = new HashSet<>(Arrays.asList("comp-1", "comp-2", "comp-3", "comp-4", "comp-5")); ConstraintSolverConfig constraintSolverConfig = new ConstraintSolverConfig("test-topoid-1", config, allComps); - Set expectedSetComp1 = new HashSet<>(); - expectedSetComp1.addAll(Arrays.asList("comp-2", "comp-3")); - Set expectedSetComp2 = new HashSet<>(); - expectedSetComp2.addAll(Arrays.asList("comp-1", "comp-4")); - Set expectedSetComp3 = new HashSet<>(); - expectedSetComp3.addAll(Arrays.asList("comp-1", "comp-5")); + Set expectedSetComp1 = new HashSet<>(Arrays.asList("comp-2", "comp-3")); + Set expectedSetComp2 = new HashSet<>(Arrays.asList("comp-1", "comp-4")); + Set expectedSetComp3 = new HashSet<>(Arrays.asList("comp-1", "comp-5")); assertEquals(expectedSetComp1, constraintSolverConfig.getIncompatibleComponentSets().get("comp-1"), "comp-1 incompatible components"); assertEquals(expectedSetComp2, constraintSolverConfig.getIncompatibleComponentSets().get("comp-2"), "comp-2 incompatible components"); assertEquals(expectedSetComp3, constraintSolverConfig.getIncompatibleComponentSets().get("comp-3"), "comp-3 incompatible components"); @@ -523,7 +519,7 @@ public void testIntegrationWithRAS(boolean consolidatedConfigFlag) { addConstraints("bolt-1", "bolt-1", constraints); addConstraints("bolt-1", "bolt-2", constraints); - Map spreads = new HashMap(); + Map spreads = new HashMap<>(); spreads.put("spout-0", 1); spreads.put("bolt-1", 10); @@ -583,7 +579,7 @@ public void testZeroExecutorScheduling(boolean consolidatedConfigFlag) { Cluster cluster = makeCluster(new Topologies(topo)); cs.schedule(cluster, topo); LOG.info("********************* Scheduling Zero Unassigned Executors *********************"); - cs.schedule(cluster, topo); // reschedule a fully schedule topology + cs.schedule(cluster, topo); // reschedule a fully scheduled topology LOG.info("********************* End of Scheduling Zero Unassigned Executors *********************"); }
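A recurring refactor in the hunks above replaces the get/check/put idiom with Map.computeIfAbsent
(for example in Utils.reverseMap and getStormTopologyForwardGraph). A self-contained sketch of the
transformation, with illustrative names and data:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupingSketch {
        public static void main(String[] args) {
            Map<String, List<Integer>> byComponent = new HashMap<>();
            // before: List<Integer> list = byComponent.get("bolt-1");
            //         if (list == null) { list = new ArrayList<>(); byComponent.put("bolt-1", list); }
            //         list.add(7);
            // after: the mapping function runs only when the key is absent
            byComponent.computeIfAbsent("bolt-1", k -> new ArrayList<>()).add(7);
            byComponent.computeIfAbsent("bolt-1", k -> new ArrayList<>()).add(9);
            System.out.println(byComponent); // prints {bolt-1=[7, 9]}
        }
    }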