From 8161beab46b5d489aa37db524b980c1676dd08fe Mon Sep 17 00:00:00 2001 From: zhuol Date: Thu, 22 Oct 2015 12:43:37 -0500 Subject: [PATCH] [STORM-1122] Fix the format issue in Utils.java --- .../src/jvm/backtype/storm/utils/Utils.java | 262 +++++++++--------- 1 file changed, 132 insertions(+), 130 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index 7f02b567f8c..c8523068e39 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -40,7 +40,6 @@ import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.SafeConstructor; -import java.io.*; import java.net.URL; import java.net.URLDecoder; import java.nio.ByteBuffer; @@ -48,7 +47,26 @@ import java.nio.channels.WritableByteChannel; import java.io.File; import java.io.FileInputStream; -import java.util.*; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectInputStream; +import java.io.ByteArrayInputStream; +import java.io.OutputStreamWriter; +import java.io.InputStreamReader; +import java.io.InputStream; +import java.io.FileOutputStream; +import java.io.BufferedReader; +import java.io.Serializable; +import java.io.IOException; +import java.util.Map; +import java.util.Iterator; +import java.util.Enumeration; +import java.util.List; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.HashMap; +import java.util.TreeMap; +import java.util.UUID; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -99,9 +117,9 @@ public static T javaDeserialize(byte[] serialized, Class clazz) { Object ret = ois.readObject(); ois.close(); return (T)ret; - } catch(IOException ioe) { + } catch (IOException ioe) { throw new RuntimeException(ioe); - } catch(ClassNotFoundException e) { + } catch (ClassNotFoundException e) { throw new RuntimeException(e); } } @@ -155,9 +173,9 @@ public static Map fromCompressedJsonConf(byte[] serialized) { Object ret = JSONValue.parseWithException(in); in.close(); return (Map)ret; - } catch(IOException ioe) { + } catch (IOException ioe) { throw new RuntimeException(ioe); - } catch(ParseException e) { + } catch (ParseException e) { throw new RuntimeException(e); } } @@ -186,7 +204,7 @@ public static List findResources(String name) { try { Enumeration resources = Thread.currentThread().getContextClassLoader().getResources(name); List ret = new ArrayList(); - while(resources.hasMoreElements()) { + while (resources.hasMoreElements()) { ret.add(resources.nextElement()); } return ret; @@ -259,7 +277,7 @@ private static InputStream getConfigFileInputStream(String configFilePath) public static Map findAndReadConfigFile(String name) { - return findAndReadConfigFile(name, true); + return findAndReadConfigFile(name, true); } public static Map readDefaultConfig() { @@ -269,7 +287,7 @@ public static Map readDefaultConfig() { public static Map readCommandLineOpts() { Map ret = new HashMap(); String commandOptions = System.getProperty("storm.options"); - if(commandOptions != null) { + if (commandOptions != null) { String[] configs = commandOptions.split(","); for (String config : configs) { config = URLDecoder.decode(config); @@ -292,7 +310,7 @@ public static Map readStormConfig() { Map ret = readDefaultConfig(); String confFile = System.getProperty("storm.conf.file"); Map storm; - if (confFile==null || confFile.equals("")) { + if (confFile == null || confFile.equals("")) { storm = findAndReadConfigFile("storm.yaml", false); } else { storm = findAndReadConfigFile(confFile, true); @@ -303,24 +321,24 @@ public static Map readStormConfig() { } private static Object normalizeConf(Object conf) { - if(conf==null) return new HashMap(); - if(conf instanceof Map) { + if (conf == null) return new HashMap(); + if (conf instanceof Map) { Map confMap = new HashMap((Map) conf); - for(Object key: confMap.keySet()) { + for (Object key : confMap.keySet()) { Object val = confMap.get(key); confMap.put(key, normalizeConf(val)); } return confMap; - } else if(conf instanceof List) { + } else if (conf instanceof List) { List confList = new ArrayList((List) conf); - for(int i=0; i stormConf) { } public static Object getSetComponentObject(ComponentObject obj) { - if(obj.getSetField()==ComponentObject._Fields.SERIALIZED_JAVA) { + if (obj.getSetField() == ComponentObject._Fields.SERIALIZED_JAVA) { return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class); - } else if(obj.getSetField()==ComponentObject._Fields.JAVA_OBJECT) { + } else if (obj.getSetField() == ComponentObject._Fields.JAVA_OBJECT) { return obj.get_java_object(); } else { return obj.get_shell(); @@ -343,7 +361,7 @@ public static Object getSetComponentObject(ComponentObject obj) { public static T get(Map m, S key, T def) { T ret = m.get(key); - if(ret==null) { + if (ret == null) { ret = def; } return ret; @@ -351,7 +369,7 @@ public static T get(Map m, S key, T def) { public static List tuple(Object... values) { List ret = new ArrayList(); - for(Object v: values) { + for (Object v : values) { ret.add(v); } return ret; @@ -360,18 +378,18 @@ public static List tuple(Object... values) { public static void downloadFromMaster(Map conf, String file, String localFile) throws AuthorizationException, IOException, TException { NimbusClient client = NimbusClient.getConfiguredClient(conf); try { - download(client, file, localFile); + download(client, file, localFile); } finally { - client.close(); + client.close(); } } public static void downloadFromHost(Map conf, String file, String localFile, String host, int port) throws IOException, TException, AuthorizationException { NimbusClient client = new NimbusClient (conf, host, port, null); try { - download(client, file, localFile); + download(client, file, localFile); } finally { - client.close(); + client.close(); } } @@ -379,21 +397,21 @@ private static void download(NimbusClient client, String file, String localFile) WritableByteChannel out = Channels.newChannel(new FileOutputStream(localFile)); try { String id = client.getClient().beginFileDownload(file); - while(true) { - ByteBuffer chunk = client.getClient().downloadChunk(id); - int written = out.write(chunk); - if(written==0) break; - } + while (true) { + ByteBuffer chunk = client.getClient().downloadChunk(id); + int written = out.write(chunk); + if (written == 0) break; + } } finally { - out.close(); + out.close(); } } public static IFn loadClojureFn(String namespace, String name) { try { - clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")")); + clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")")); } catch (Exception e) { - //if playing from the repl and defining functions, file won't exist + //if playing from the repl and defining functions, file won't exist } return (IFn) RT.var(namespace, name).deref(); } @@ -404,52 +422,52 @@ public static boolean isSystemId(String id) { public static Map reverseMap(Map map) { Map ret = new HashMap(); - for(K key: map.keySet()) { + for (K key : map.keySet()) { ret.put(map.get(key), key); } return ret; } public static ComponentCommon getComponentCommon(StormTopology topology, String id) { - if(topology.get_spouts().containsKey(id)) { + if (topology.get_spouts().containsKey(id)) { return topology.get_spouts().get(id).get_common(); } - if(topology.get_bolts().containsKey(id)) { + if (topology.get_bolts().containsKey(id)) { return topology.get_bolts().get(id).get_common(); } - if(topology.get_state_spouts().containsKey(id)) { + if (topology.get_state_spouts().containsKey(id)) { return topology.get_state_spouts().get(id).get_common(); } throw new IllegalArgumentException("Could not find component with id " + id); } public static Integer getInt(Object o) { - Integer result = getInt(o, null); - if (null == result) { - throw new IllegalArgumentException("Don't know how to convert null to int"); - } - return result; + Integer result = getInt(o, null); + if (null == result) { + throw new IllegalArgumentException("Don't know how to convert null to int"); + } + return result; } public static Integer getInt(Object o, Integer defaultValue) { - if (null == o) { - return defaultValue; - } - - if (o instanceof Integer || - o instanceof Short || - o instanceof Byte) { - return ((Number) o).intValue(); - } else if (o instanceof Long) { - final long l = (Long) o; - if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) { - return (int) l; - } - } else if (o instanceof String) { - return Integer.parseInt((String) o); - } - - throw new IllegalArgumentException("Don't know how to convert " + o + " to int"); + if (null == o) { + return defaultValue; + } + + if (o instanceof Integer || + o instanceof Short || + o instanceof Byte) { + return ((Number) o).intValue(); + } else if (o instanceof Long) { + final long l = (Long) o; + if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) { + return (int) l; + } + } else if (o instanceof String) { + return Integer.parseInt((String) o); + } + + throw new IllegalArgumentException("Don't know how to convert " + o + " to int"); } public static Double getDouble(Object o) { @@ -472,15 +490,14 @@ public static Double getDouble(Object o, Double defaultValue) { } public static boolean getBoolean(Object o, boolean defaultValue) { - if (null == o) { - return defaultValue; - } - - if(o instanceof Boolean) { - return (Boolean) o; - } else { - throw new IllegalArgumentException("Don't know how to convert " + o + " + to boolean"); - } + if (null == o) { + return defaultValue; + } + if (o instanceof Boolean) { + return (Boolean) o; + } else { + throw new IllegalArgumentException("Don't know how to convert " + o + " + to boolean"); + } } public static long secureRandomLong() { @@ -493,7 +510,7 @@ public static CuratorFramework newCurator(Map conf, List servers, Object public static CuratorFramework newCurator(Map conf, List servers, Object port, String root, ZookeeperAuthInfo auth) { List serverPorts = new ArrayList(); - for(String zkServer: (List) servers) { + for (String zkServer : (List) servers) { serverPorts.add(zkServer + ":" + Utils.getInt(port)); } String zkStr = StringUtils.join(serverPorts, ",") + root; @@ -507,14 +524,14 @@ public static CuratorFramework newCurator(Map conf, List servers, Object protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth) { builder.connectString(zkStr) - .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT))) - .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT))) - .retryPolicy(new StormBoundedExponentialBackoffRetry( + .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT))) + .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT))) + .retryPolicy(new StormBoundedExponentialBackoffRetry( Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)))); - if(auth!=null && auth.scheme!=null && auth.payload!=null) { + if (auth != null && auth.scheme != null && auth.payload != null) { builder = builder.authorization(auth.scheme, auth.payload); } } @@ -535,28 +552,13 @@ public static CuratorFramework newCuratorStarted(Map conf, List servers, return ret; } - /** - * -(defn integer-divided [sum num-pieces] - (let [base (int (/ sum num-pieces)) - num-inc (mod sum num-pieces) - num-bases (- num-pieces num-inc)] - (if (= num-inc 0) - {base num-bases} - {base num-bases (inc base) num-inc} - ))) - * @param sum - * @param numPieces - * @return - */ - public static TreeMap integerDivided(int sum, int numPieces) { int base = sum / numPieces; int numInc = sum % numPieces; int numBases = numPieces - numInc; TreeMap ret = new TreeMap(); ret.put(base, numBases); - if(numInc!=0) { + if (numInc != 0) { ret.put(base+1, numInc); } return ret; @@ -572,7 +574,7 @@ public static void readAndLogStream(String prefix, InputStream in) { try { BufferedReader r = new BufferedReader(new InputStreamReader(in)); String line = null; - while ((line = r.readLine())!= null) { + while ((line = r.readLine()) != null) { LOG.info("{}:{}", prefix, line); } } catch (IOException e) { @@ -582,8 +584,8 @@ public static void readAndLogStream(String prefix, InputStream in) { public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) { Throwable t = throwable; - while(t != null) { - if(klass.isInstance(t)) { + while (t != null) { + if (klass.isInstance(t)) { return true; } t = t.getCause(); @@ -599,9 +601,9 @@ public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwabl */ public static boolean isZkAuthenticationConfiguredStormServer(Map conf) { return null != System.getProperty("java.security.auth.login.config") - || (conf != null + || (conf != null && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null - && ! ((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty()); + && !((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty()); } /** @@ -612,7 +614,7 @@ public static boolean isZkAuthenticationConfiguredStormServer(Map conf) { public static boolean isZkAuthenticationConfiguredTopology(Map conf) { return (conf != null && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null - && ! ((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty()); + && !((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty()); } public static List getWorkerACL(Map conf) { @@ -622,37 +624,37 @@ public static List getWorkerACL(Map conf) { } String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL); if (stormZKUser == null) { - throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set"); + throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set"); } String[] split = stormZKUser.split(":",2); if (split.length != 2) { - throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user"); + throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user"); } ArrayList ret = new ArrayList(ZooDefs.Ids.CREATOR_ALL_ACL); ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1]))); return ret; } - public static String threadDump() { - final StringBuilder dump = new StringBuilder(); - final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean(); - final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100); - for (java.lang.management.ThreadInfo threadInfo : threadInfos) { - dump.append('"'); - dump.append(threadInfo.getThreadName()); - dump.append("\" "); - final Thread.State state = threadInfo.getThreadState(); - dump.append("\n java.lang.Thread.State: "); - dump.append(state); - final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace(); - for (final StackTraceElement stackTraceElement : stackTraceElements) { - dump.append("\n at "); - dump.append(stackTraceElement); - } - dump.append("\n\n"); - } - return dump.toString(); - } + public static String threadDump() { + final StringBuilder dump = new StringBuilder(); + final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean(); + final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100); + for (java.lang.management.ThreadInfo threadInfo : threadInfos) { + dump.append('"'); + dump.append(threadInfo.getThreadName()); + dump.append("\" "); + final Thread.State state = threadInfo.getThreadState(); + dump.append("\n java.lang.Thread.State: "); + dump.append(state); + final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace(); + for (final StackTraceElement stackTraceElement : stackTraceElements) { + dump.append("\n at "); + dump.append(stackTraceElement); + } + dump.append("\n\n"); + } + return dump.toString(); + } // Assumes caller is synchronizing private static SerializationDelegate getSerializationDelegate(Map stormConf) { @@ -675,20 +677,20 @@ private static SerializationDelegate getSerializationDelegate(Map stormConf) { return delegate; } - public static void handleUncaughtException(Throwable t) { - if (t != null && t instanceof Error) { - if (t instanceof OutOfMemoryError) { - try { - System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName()); - } catch (Throwable err) { - //Again we don't want to exit because of logging issues. + public static void handleUncaughtException(Throwable t) { + if (t != null && t instanceof Error) { + if (t instanceof OutOfMemoryError) { + try { + System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName()); + } catch (Throwable err) { + //Again we don't want to exit because of logging issues. + } + Runtime.getRuntime().halt(-1); + } else { + //Running in daemon mode, we would pass Error to calling thread. + throw (Error) t; + } } - Runtime.getRuntime().halt(-1); - } else { - //Running in daemon mode, we would pass Error to calling thread. - throw (Error) t; - } } - } }