From b5fe05a3115b8077e0105f70eee89edc3635eb2a Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Wed, 25 May 2016 11:51:12 +0530 Subject: [PATCH 1/2] STORM-1864 StormSubmitter should throw respective exceptions and log respective errors forregistered submitter hook invocation. --- .../hive/trident/TridentHiveTopology.java | 29 ++++++++----- .../jvm/org/apache/storm/StormSubmitter.java | 43 +++++++++++++------ .../storm/hooks/SubmitterHookException.java | 41 ++++++++++++++++++ 3 files changed, 90 insertions(+), 23 deletions(-) create mode 100644 storm-core/src/jvm/org/apache/storm/hooks/SubmitterHookException.java diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java index d6c1d65d145..86a35e67047 100644 --- a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java +++ b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -26,6 +26,7 @@ import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; +import org.apache.storm.hooks.SubmitterHookException; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.task.TopologyContext; @@ -41,9 +42,13 @@ import org.apache.storm.trident.TridentState; import org.apache.storm.trident.TridentTopology; import org.apache.storm.trident.state.StateFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TridentHiveTopology { + private static final Logger LOG = LoggerFactory.getLogger(TridentHiveTopology.class); + public static StormTopology buildTopology(String metaStoreURI, String dbName, String tblName, Object keytab, Object principal) { int batchSize = 100; FixedBatchSpout spout = new FixedBatchSpout(batchSize); @@ -93,28 +98,32 @@ public static void main(String[] args) { if(args.length == 3) { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("tridentHiveTopology", conf, buildTopology(metaStoreURI, dbName, tblName,null,null)); - System.out.println("waiting for 60 seconds"); + LOG.info("waiting for 60 seconds"); waitForSeconds(60); - System.out.println("killing topology"); + LOG.info("killing topology"); cluster.killTopology("tridenHiveTopology"); - System.out.println("cluster shutdown"); + LOG.info("cluster shutdown"); cluster.shutdown(); - System.out.println("cluster shutdown"); + LOG.info("cluster shutdown"); System.exit(0); } else if(args.length == 4) { try { StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,null,null)); - } catch(Exception e) { - System.out.println("Failed to submit topology "+e); + } catch(SubmitterHookException e) { + LOG.warn("Topology is submitted but invoking ISubmitterHook failed", e); + } catch (Exception e) { + LOG.warn("Failed to submit topology ", e); } } else if (args.length == 6) { try { StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,args[4],args[5])); - } catch(Exception e) { - System.out.println("Failed to submit topology "+e); + } catch(SubmitterHookException e) { + LOG.warn("Topology is submitted but invoking ISubmitterHook failed", e); + } catch (Exception e) { + LOG.warn("Failed to submit topology ", e); } } else { - System.out.println("Usage: TridentHiveTopology metastoreURI dbName tableName [topologyNamey]"); + LOG.info("Usage: TridentHiveTopology metastoreURI dbName tableName [topologyNamey]"); } } diff --git a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java index c199f97944a..077c5779416 100644 --- a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java +++ b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.storm.hooks.SubmitterHookException; import org.apache.storm.scheduler.resource.ResourceUtils; import org.apache.storm.validation.ConfigValidation; import org.apache.commons.lang.StringUtils; @@ -149,6 +150,7 @@ public static void pushCredentials(String name, Map stormConf, Map Date: Wed, 25 May 2016 12:35:26 +0530 Subject: [PATCH 2/2] STORM-1864 log messages with format --- storm-core/src/jvm/org/apache/storm/StormSubmitter.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java index 077c5779416..97667409e18 100644 --- a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java +++ b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java @@ -273,13 +273,18 @@ private static void invokeSubmitterHook(String name, String asUser, Map stormCon if (stormConf.containsKey(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN)) { submissionNotifierClassName = stormConf.get(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN).toString(); LOG.info("Initializing the registered ISubmitterHook [{}]", submissionNotifierClassName); + + if(submissionNotifierClassName == null || submissionNotifierClassName.isEmpty()) { + throw new IllegalArgumentException(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN + " property must be a non empty string."); + } + ISubmitterHook submitterHook = (ISubmitterHook) Class.forName(submissionNotifierClassName).newInstance(); TopologyInfo topologyInfo = Utils.getTopologyInfo(name, asUser, stormConf); LOG.info("Invoking the registered ISubmitterHook [{}]", submissionNotifierClassName); submitterHook.notify(topologyInfo, stormConf, topology); } } catch (Exception e) { - LOG.warn("Error occurred in invoking submitter hook: "+submissionNotifierClassName, e); + LOG.warn("Error occurred in invoking submitter hook:[{}] ",submissionNotifierClassName, e); throw new SubmitterHookException(e); } }