From 06a47a9a9676aae7f34f6e5d14ef29fd2c59b422 Mon Sep 17 00:00:00 2001 From: Aaron Dixon Date: Sun, 15 Feb 2015 11:42:45 -0600 Subject: [PATCH 1/6] exhibitor support --- storm-core/src/jvm/backtype/storm/Config.java | 43 ++++++++++++ .../src/jvm/backtype/storm/utils/Utils.java | 66 +++++++++++++++++-- 2 files changed, 104 insertions(+), 5 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 9db99773f82..dd488c4f8bb 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -152,6 +152,19 @@ public class Config extends HashMap { @isPositiveNumber public static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port"; + /** + * A list of hosts of Exhibitor servers used to discover/maintain connection to ZooKeeper cluster. + * Any configured ZooKeeper servers will be used for the curator/exhibitor backup connection string. + */ + public static final String STORM_EXHIBITOR_SERVERS = "storm.exhibitor.servers"; + public static final Object STORM_EXHIBITOR_SERVERS_SCHEMA = ConfigValidation.StringsValidator; + + /** + * The port Storm will use to connect to each of the exhibitor servers. + */ + public static final String STORM_EXHIBITOR_PORT = "storm.exhibitor.port"; + public static final Object STORM_EXHIBITOR_PORT_SCHEMA = ConfigValidation.IntegerValidator; + /** * A directory on the local filesystem used by Storm for any local * filesystem usage it needs. The directory must exist and the Storm daemons must @@ -342,6 +355,36 @@ public class Config extends HashMap { @isString public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD="storm.zookeeper.topology.auth.payload"; + /* + * How often to poll Exhibitor cluster in millis. + */ + public static final String STORM_EXHIBITOR_URIPATH="storm.exhibitor.poll.uripath"; + public static final Object STORM_EXHIBITOR_URIPATH_SCHEMA = String.class; + + /** + * How often to poll Exhibitor cluster in millis. + */ + public static final String STORM_EXHIBITOR_POLL="storm.exhibitor.poll.millis"; + public static final Object STORM_EXHIBITOR_POLL_SCHEMA = ConfigValidation.IntegerValidator; + + /** + * The number of times to retry an Exhibitor operation. + */ + public static final String STORM_EXHIBITOR_RETRY_TIMES="storm.exhibitor.retry.times"; + public static final Object STORM_EXHIBITOR_RETRY_TIMES_SCHEMA = ConfigValidation.IntegerValidator; + + /** + * The interval between retries of an Exhibitor operation. + */ + public static final String STORM_EXHIBITOR_RETRY_INTERVAL="storm.exhibitor.retry.interval"; + public static final Object STORM_EXHIBITOR_RETRY_INTERVAL_SCHEMA = ConfigValidation.IntegerValidator; + + /** + * The ceiling of the interval between retries of an Exhibitor operation. + */ + public static final String STORM_EXHIBITOR_RETRY_INTERVAL_CEILING="storm.exhibitor.retry.intervalceiling.millis"; + public static final Object STORM_EXHIBITOR_RETRY_INTERVAL_CEILING_SCHEMA = ConfigValidation.IntegerValidator; + /** * The id assigned to a running topology. The id is the storm name with a unique nonce appended. */ diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index 0d9140f3365..2296209aff5 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -35,6 +35,25 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.io.input.ClassLoaderObjectInputStream; import org.apache.commons.lang.StringUtils; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.URL; +import java.net.URLDecoder; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.*; + +import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient; +import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; +import org.apache.curator.ensemble.exhibitor.Exhibitors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.thrift.TBase; @@ -621,6 +640,22 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String throw new IllegalArgumentException("Could not find component with id " + id); } + public static List getStrings(final Object o) { + if (o == null) { + return Collections.emptyList(); + } else if (o instanceof String) { + return new ArrayList() {{ add((String) o); }}; + } else if (o instanceof Collection) { + List answer = new ArrayList(); + for (Object v : (Collection) o) { + answer.add(v.toString()); + } + return answer; + } else { + throw new IllegalArgumentException("Don't know how to convert to string list"); + } + } + public static Integer getInt(Object o) { Integer result = getInt(o, null); if (null == result) { @@ -982,12 +1017,33 @@ public static CuratorFramework newCurator(Map conf, List servers, Object return builder.build(); } - protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth) + protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, final String zkStr, Map conf, ZookeeperAuthInfo auth) { - builder.connectString(zkStr) - .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT))) - .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT))) - .retryPolicy(new StormBoundedExponentialBackoffRetry( + List exhibitorServers = getStrings(conf.get(Config.STORM_EXHIBITOR_SERVERS)); + if (!exhibitorServers.isEmpty()) { + // use exhibitor servers + builder.ensembleProvider(new ExhibitorEnsembleProvider( + new Exhibitors(exhibitorServers, Utils.getInt(conf.get(Config.STORM_EXHIBITOR_PORT), 8080), + new Exhibitors.BackupConnectionStringProvider() { + @Override + public String getBackupConnectionString() throws Exception { + // use zk servers as backup if they exist + return zkStr; + }}), + new DefaultExhibitorRestClient(), + (String) Utils.get(conf, Config.STORM_EXHIBITOR_URIPATH, "/exhibitor/v1/cluster/list"), + Utils.getInt(conf.get(Config.STORM_EXHIBITOR_POLL)), + new StormBoundedExponentialBackoffRetry( + Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_INTERVAL)), + Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_INTERVAL_CEILING)), + Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_TIMES))))); + } else { + builder.connectString(zkStr); + } + builder + .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT))) + .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT))) + .retryPolicy(new StormBoundedExponentialBackoffRetry( Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)))); From 29bf22a2c2d81d4c3d9db7035d42460d3eddc1a7 Mon Sep 17 00:00:00 2001 From: Aaron Dixon Date: Sun, 15 Feb 2015 12:00:51 -0600 Subject: [PATCH 2/6] clean up package imports (minimizing diff for pull req) --- storm-core/src/jvm/backtype/storm/utils/Utils.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index 2296209aff5..4d2ecf1afb0 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -49,7 +49,17 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; -import java.util.*; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import java.util.Collection; +import java.util.Collections; import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient; import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; From c53ae51e4511e940fe37fa778bafe7d122371bd8 Mon Sep 17 00:00:00 2001 From: Aaron Dixon Date: Tue, 17 Mar 2015 15:11:04 -0500 Subject: [PATCH 3/6] moved exhibitor in-code defaults to defaults.yaml --- conf/defaults.yaml | 2 ++ .../src/jvm/backtype/storm/utils/Utils.java | 35 +++++++++++++++---- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index fb87e8e8fa1..b5c8b47bd72 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -35,6 +35,8 @@ storm.zookeeper.retry.interval: 1000 storm.zookeeper.retry.intervalceiling.millis: 30000 storm.zookeeper.auth.user: null storm.zookeeper.auth.password: null +storm.exhibitor.port: 8080 +storm.exhibitor.poll.uripath: "/exhibitor/v1/cluster/list" storm.cluster.mode: "distributed" # can be distributed or local storm.local.mode.zmq: false storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin" diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index 4d2ecf1afb0..f7c6d5a90af 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -35,21 +35,34 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.io.input.ClassLoaderObjectInputStream; import org.apache.commons.lang.StringUtils; +import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient; +import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; +import org.apache.curator.ensemble.exhibitor.Exhibitors; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.thrift.TException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; + import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.net.URL; import java.net.URLDecoder; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; @@ -58,6 +71,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.UUID; + import java.util.Collection; import java.util.Collections; @@ -666,6 +680,13 @@ public static List getStrings(final Object o) { } } + public static String getString(Object o) { + if (null == o) { + throw new IllegalArgumentException("Don't know how to convert null to String"); + } + return o.toString(); + } + public static Integer getInt(Object o) { Integer result = getInt(o, null); if (null == result) { @@ -1016,7 +1037,7 @@ public static CuratorFramework newCurator(Map conf, List servers, Object public static CuratorFramework newCurator(Map conf, List servers, Object port, String root, ZookeeperAuthInfo auth) { List serverPorts = new ArrayList(); - for (String zkServer : (List) servers) { + for (String zkServer: servers) { serverPorts.add(zkServer + ":" + Utils.getInt(port)); } String zkStr = StringUtils.join(serverPorts, ",") + root; @@ -1033,7 +1054,7 @@ protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, fina if (!exhibitorServers.isEmpty()) { // use exhibitor servers builder.ensembleProvider(new ExhibitorEnsembleProvider( - new Exhibitors(exhibitorServers, Utils.getInt(conf.get(Config.STORM_EXHIBITOR_PORT), 8080), + new Exhibitors(exhibitorServers, Utils.getInt(conf.get(Config.STORM_EXHIBITOR_PORT)), new Exhibitors.BackupConnectionStringProvider() { @Override public String getBackupConnectionString() throws Exception { @@ -1041,7 +1062,7 @@ public String getBackupConnectionString() throws Exception { return zkStr; }}), new DefaultExhibitorRestClient(), - (String) Utils.get(conf, Config.STORM_EXHIBITOR_URIPATH, "/exhibitor/v1/cluster/list"), + Utils.getString(conf.get(Config.STORM_EXHIBITOR_URIPATH)), Utils.getInt(conf.get(Config.STORM_EXHIBITOR_POLL)), new StormBoundedExponentialBackoffRetry( Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_INTERVAL)), From 2e88a2fc2b52c43498d5d0df8b177c691c5bfa77 Mon Sep 17 00:00:00 2001 From: Aaron Dixon Date: Tue, 17 Mar 2015 15:15:10 -0500 Subject: [PATCH 4/6] 'un-intellij-optimizing' imports to minimize pull request diff --- .../src/jvm/backtype/storm/utils/Utils.java | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index f7c6d5a90af..2af8616f84e 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -61,8 +61,6 @@ import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; @@ -71,6 +69,35 @@ import java.util.Map; import java.util.TreeMap; import java.util.UUID; +import java.util.Collection; +import java.util.Collections; + +import backtype.storm.serialization.DefaultSerializationDelegate; +import backtype.storm.serialization.SerializationDelegate; +import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient; +import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; +import org.apache.curator.ensemble.exhibitor.Exhibitors; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.commons.lang.StringUtils; +import org.apache.thrift.TException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; + +import backtype.storm.Config; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.ComponentObject; +import backtype.storm.generated.StormTopology; +import backtype.storm.generated.AuthorizationException; + +import clojure.lang.IFn; +import clojure.lang.RT; import java.util.Collection; import java.util.Collections; From c9cc96a1766ae5ff423a32886c3ce58075816671 Mon Sep 17 00:00:00 2001 From: Aaron Dixon Date: Tue, 5 Jan 2016 16:57:30 -0600 Subject: [PATCH 5/6] fixes after rebase --- storm-core/src/jvm/backtype/storm/Config.java | 15 +- .../src/jvm/backtype/storm/utils/Utils.java | 139 +++++------------- 2 files changed, 48 insertions(+), 106 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index dd488c4f8bb..932d2a7eee0 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -156,14 +156,15 @@ public class Config extends HashMap { * A list of hosts of Exhibitor servers used to discover/maintain connection to ZooKeeper cluster. * Any configured ZooKeeper servers will be used for the curator/exhibitor backup connection string. */ + @isStringList public static final String STORM_EXHIBITOR_SERVERS = "storm.exhibitor.servers"; - public static final Object STORM_EXHIBITOR_SERVERS_SCHEMA = ConfigValidation.StringsValidator; /** * The port Storm will use to connect to each of the exhibitor servers. */ + @isInteger + @isPositiveNumber public static final String STORM_EXHIBITOR_PORT = "storm.exhibitor.port"; - public static final Object STORM_EXHIBITOR_PORT_SCHEMA = ConfigValidation.IntegerValidator; /** * A directory on the local filesystem used by Storm for any local @@ -358,32 +359,32 @@ public class Config extends HashMap { /* * How often to poll Exhibitor cluster in millis. */ + @isString public static final String STORM_EXHIBITOR_URIPATH="storm.exhibitor.poll.uripath"; - public static final Object STORM_EXHIBITOR_URIPATH_SCHEMA = String.class; /** * How often to poll Exhibitor cluster in millis. */ + @isInteger public static final String STORM_EXHIBITOR_POLL="storm.exhibitor.poll.millis"; - public static final Object STORM_EXHIBITOR_POLL_SCHEMA = ConfigValidation.IntegerValidator; /** * The number of times to retry an Exhibitor operation. */ + @isInteger public static final String STORM_EXHIBITOR_RETRY_TIMES="storm.exhibitor.retry.times"; - public static final Object STORM_EXHIBITOR_RETRY_TIMES_SCHEMA = ConfigValidation.IntegerValidator; /** * The interval between retries of an Exhibitor operation. */ + @isInteger public static final String STORM_EXHIBITOR_RETRY_INTERVAL="storm.exhibitor.retry.interval"; - public static final Object STORM_EXHIBITOR_RETRY_INTERVAL_SCHEMA = ConfigValidation.IntegerValidator; /** * The ceiling of the interval between retries of an Exhibitor operation. */ + @isInteger public static final String STORM_EXHIBITOR_RETRY_INTERVAL_CEILING="storm.exhibitor.retry.intervalceiling.millis"; - public static final Object STORM_EXHIBITOR_RETRY_INTERVAL_CEILING_SCHEMA = ConfigValidation.IntegerValidator; /** * The id assigned to a running topology. The id is the storm name with a unique nonce appended. diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index 2af8616f84e..eaa6e853f3d 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -23,7 +23,18 @@ import backtype.storm.blobstore.ClientBlobStore; import backtype.storm.blobstore.InputStreamWithMeta; import backtype.storm.blobstore.LocalFsBlobStore; -import backtype.storm.generated.*; +import backtype.storm.generated.AccessControl; +import backtype.storm.generated.AccessControlType; +import backtype.storm.generated.AuthorizationException; +import backtype.storm.generated.ClusterSummary; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.ComponentObject; +import backtype.storm.generated.KeyNotFoundException; +import backtype.storm.generated.ReadableBlobMeta; +import backtype.storm.generated.SettableBlobMeta; +import backtype.storm.generated.StormTopology; +import backtype.storm.generated.TopologyInfo; +import backtype.storm.generated.TopologySummary; import backtype.storm.localizer.Localizer; import backtype.storm.nimbus.NimbusInfo; import backtype.storm.serialization.DefaultSerializationDelegate; @@ -40,127 +51,61 @@ import org.apache.curator.ensemble.exhibitor.Exhibitors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.json.simple.JSONValue; +import org.json.simple.parser.ParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.SafeConstructor; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintStream; +import java.io.RandomAccessFile; +import java.io.Serializable; import java.net.URL; import java.net.URLDecoder; import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFilePermission; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.UUID; -import java.util.Collection; -import java.util.Collections; - -import backtype.storm.serialization.DefaultSerializationDelegate; -import backtype.storm.serialization.SerializationDelegate; -import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient; -import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; -import org.apache.curator.ensemble.exhibitor.Exhibitors; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.commons.lang.StringUtils; -import org.apache.thrift.TException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Id; -import org.json.simple.JSONValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.yaml.snakeyaml.Yaml; -import org.yaml.snakeyaml.constructor.SafeConstructor; - -import backtype.storm.Config; -import backtype.storm.generated.ComponentCommon; -import backtype.storm.generated.ComponentObject; -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.AuthorizationException; - -import clojure.lang.IFn; -import clojure.lang.RT; - -import java.util.Collection; -import java.util.Collections; - -import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient; -import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; -import org.apache.curator.ensemble.exhibitor.Exhibitors; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.thrift.TBase; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Id; -import org.json.simple.JSONValue; -import org.json.simple.parser.ParseException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.yaml.snakeyaml.Yaml; -import org.yaml.snakeyaml.constructor.SafeConstructor; -import java.net.URL; -import java.net.URLDecoder; -import java.nio.ByteBuffer; -import java.nio.file.FileSystems; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.nio.file.attribute.PosixFilePermission; - -import java.io.File; -import java.io.FileReader; -import java.io.FileInputStream; -import java.io.ByteArrayOutputStream; -import java.io.ObjectOutputStream; -import java.io.ObjectInputStream; -import java.io.ByteArrayInputStream; -import java.io.OutputStreamWriter; -import java.io.InputStreamReader; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.FileOutputStream; -import java.io.BufferedReader; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.PrintStream; -import java.io.RandomAccessFile; -import java.io.Serializable; -import java.io.IOException; - import java.util.jar.JarEntry; import java.util.jar.JarFile; -import java.util.Map; -import java.util.Set; -import java.util.Iterator; -import java.util.Enumeration; -import java.util.List; -import java.util.Arrays; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.HashMap; -import java.util.TreeMap; -import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; @@ -168,10 +113,6 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipFile; -import org.apache.thrift.TBase; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TSerializer; - public class Utils { private static final Logger LOG = LoggerFactory.getLogger(Utils.class); public static final String DEFAULT_STREAM_ID = "default"; @@ -1064,7 +1005,7 @@ public static CuratorFramework newCurator(Map conf, List servers, Object public static CuratorFramework newCurator(Map conf, List servers, Object port, String root, ZookeeperAuthInfo auth) { List serverPorts = new ArrayList(); - for (String zkServer: servers) { + for (String zkServer : servers) { serverPorts.add(zkServer + ":" + Utils.getInt(port)); } String zkStr = StringUtils.join(serverPorts, ",") + root; @@ -1107,7 +1048,7 @@ public String getBackupConnectionString() throws Exception { Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)))); if (auth != null && auth.scheme != null && auth.payload != null) { - builder = builder.authorization(auth.scheme, auth.payload); + builder.authorization(auth.scheme, auth.payload); } } From ab769cba028b5db4eb00540da6254bb67be216d6 Mon Sep 17 00:00:00 2001 From: Aaron Dixon Date: Tue, 5 Jan 2016 16:59:45 -0600 Subject: [PATCH 6/6] per cr, answering mutable empty list instead of immutable in Utils.getStrings() --- storm-core/src/jvm/backtype/storm/utils/Utils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index eaa6e853f3d..890451e031c 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -634,7 +634,7 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String public static List getStrings(final Object o) { if (o == null) { - return Collections.emptyList(); + return new ArrayList(); } else if (o instanceof String) { return new ArrayList() {{ add((String) o); }}; } else if (o instanceof Collection) {