From f2b3bb3f52d71771b2f009a831924b26bb3f7363 Mon Sep 17 00:00:00 2001 From: Ellison Anne Williams Date: Sun, 11 Sep 2016 11:50:37 -0400 Subject: [PATCH] changes to allow the ResponderDriver to read properties files from hdfs --- .gitignore | 1 + .../responder/wideskies/ResponderCLI.java | 36 +- .../responder/wideskies/ResponderDriver.java | 1 + .../streaming/ComputeStreamingResponse.java | 5 + .../pirk/utils/SystemConfiguration.java | 611 ++++++++++-------- 5 files changed, 393 insertions(+), 261 deletions(-) diff --git a/.gitignore b/.gitignore index 63756168..bf4c4668 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ *.log .DS_Store pom.xml.releaseBackup +/bin/ diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java index 0b306c63..5ba81709 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java @@ -19,6 +19,7 @@ package org.apache.pirk.responder.wideskies; import java.io.File; +import java.io.IOException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -26,6 +27,8 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.pirk.utils.SystemConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +44,8 @@ public class ResponderCLI private CommandLine commandLine = null; private static final String LOCALPROPFILE = "local.responder.properties"; + private static final String HDFSPROPDIR = "hdfsPropertiesDir"; + private static final String HDFSPROPFILE = "hdfsPropertiesFile"; /** * Create and parse allowable options @@ -106,8 +111,9 @@ public String getOptionValue(String option) * Method to parse and validate the options provided * * @return - true if valid, false otherwise + * @throws IOException */ - private boolean parseOptions() + private boolean parseOptions() throws IOException { boolean valid; @@ -116,6 +122,16 @@ private boolean parseOptions() { SystemConfiguration.loadPropsFromFile(new File(getOptionValue(LOCALPROPFILE))); } + else if(hasOption(HDFSPROPDIR)) + { + FileSystem fs = FileSystem.get(new Configuration()); + SystemConfiguration.loadPropsFromHDFSDir(getOptionValue(HDFSPROPDIR), fs); + } + else if(hasOption(HDFSPROPFILE)) + { + FileSystem fs = FileSystem.get(new Configuration()); + SystemConfiguration.loadPropsFromFile(getOptionValue(HDFSPROPFILE), fs); + } else { // Pull options, set as properties @@ -148,16 +164,30 @@ private Options createOptions() optionHelp.setRequired(false); options.addOption(optionHelp); - // local.querier.properties + // local.responder.properties Option optionLocalPropFile = new Option("localPropFile", LOCALPROPFILE, true, "Optional local properties file"); optionLocalPropFile.setRequired(false); optionLocalPropFile.setArgName(LOCALPROPFILE); optionLocalPropFile.setType(String.class); options.addOption(optionLocalPropFile); + + // hdfsPropertiesDir + Option optionHDFSPropDir = new Option("hdfsPropsDir", HDFSPROPDIR, true, "Optional location of directory in hdfs containing properties file(s)"); + optionHDFSPropDir.setRequired(false); + optionHDFSPropDir.setArgName(HDFSPROPDIR); + optionHDFSPropDir.setType(String.class); + options.addOption(optionHDFSPropDir); + + // hdfsPropertiesFile + Option optionHDFSPropFile = new Option("hdfsPropsFile", HDFSPROPFILE, true, "Optional location of properties file(s) in hdfs"); + optionHDFSPropFile.setRequired(false); + optionHDFSPropFile.setArgName(HDFSPROPFILE); + optionHDFSPropFile.setType(String.class); + options.addOption(optionHDFSPropFile); // platform Option optionPlatform = new Option("p", ResponderProps.PLATFORM, true, - "required -- 'mapreduce', 'spark', or 'standalone' : Processing platform technology for the responder"); + "required -- 'mapreduce', 'spark', 'sparkstreaming', 'storm', or 'standalone' : Processing platform technology for the responder"); optionPlatform.setRequired(false); optionPlatform.setArgName(ResponderProps.PLATFORM); optionPlatform.setType(String.class); diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java index 6f34de5e..044012d7 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java @@ -72,6 +72,7 @@ public static void main(String[] args) throws Exception logger.error("platform " + platformString + " not found."); } + logger.info("platform = " + platform); switch (platform) { case MAPREDUCE: diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java index acb86826..b91cc687 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java @@ -191,6 +191,11 @@ private void setup() throws Exception QueryInfo queryInfo = query.getQueryInfo(); bVars.setQuery(query); bVars.setQueryInfo(queryInfo); + + if(query == null) + { + logger.info("query is null for queryInput = " + queryInput); + } if (SystemConfiguration.getBooleanProperty("pir.allowAdHocQuerySchemas", false)) { diff --git a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java index 8cb5d17c..4f2fa035 100755 --- a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java +++ b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java @@ -20,11 +20,16 @@ import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; +import java.util.Enumeration; import java.util.Properties; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,262 +45,352 @@ */ public class SystemConfiguration { - private static final Logger logger = LoggerFactory.getLogger(SystemConfiguration.class); - - private static final Properties props = new Properties(); - - /** - * By default, these files should be found on the root of the classpath - */ - private static final String DEFAULT_PROPERTY_FILE = "pirk.properties"; - private static final String QUERIER_PROPERTIES_FILE = "querier.properties"; - private static final String RESPONDER_PROPERTIES_FILE = "responder.properties"; - - private static final String LOCAL_PROPERTIES_DIR = "local.pirk.properties.dir"; - - static - { - initialize(); - } - - public static void initialize() - { - props.clear(); - - // First try to load the default properties file - loadPropsFromResource(DEFAULT_PROPERTY_FILE); - - // Try to load props from the querier and responder property files, if they exist - loadPropsFromResource(QUERIER_PROPERTIES_FILE); - loadPropsFromResource(RESPONDER_PROPERTIES_FILE); - - // Try to load the local properties files, if they exists - loadPropsFromDir(getProperty(LOCAL_PROPERTIES_DIR)); - } - - /** - * Gets the specified property; returns null if the property isn't found. - * - * @param propertyName - * The name of the requested property. - * @return The value of the property, or null if the property cannot be found. - */ - public static String getProperty(String propertyName) - { - return props.getProperty(propertyName); - } - - /** - * Gets the specified property as a String, or the default value if the property isn't found. - * - * @param propertyName - * The name of the requested string property value. - * @param defaultValue - * The value to return if the property is undefined. - * @return The value of the requested property, or the default value if the property is undefined. - */ - public static String getProperty(String propertyName, String defaultValue) - { - return props.getProperty(propertyName, defaultValue); - } - - /** - * Gets the specified property as an int, or the default value if the property isn't found. - * - * @param propertyName - * The name of the requested int property value. - * @param defaultValue - * The value to return if the property is undefined. - * @return The value of the requested property, or the default value if the property is undefined. - * @throws NumberFormatException - * If the property does not contain a parsable int value. - */ - public static int getIntProperty(String propertyName, int defaultValue) - { - String value = props.getProperty(propertyName); - return (value == null) ? defaultValue : Integer.parseInt(value); - } - - /** - * Gets the specified property as an long, or the default value if the property isn't found. - * - * @param propertyName - * The name of the requested long property value. - * @param defaultValue - * The value to return if the property is undefined. - * @return The value of the requested property, or the default value if the property is undefined. - * @throws NumberFormatException - * If the property does not contain a parsable long value. - */ - public static long getLongProperty(String propertyName, long defaultValue) - { - String value = props.getProperty(propertyName); - return (value == null) ? defaultValue : Long.parseLong(value); - } - - /** - * Gets the specified property as a boolean, or the default value if the property isn't defined. - * - * @param propertyName - * The name of the requested boolean property value. - * @param defaultValue - * The value to return if the property is undefined. - * @return true if the property is defined and has the value "true", otherwise defaultValue. - */ - public static boolean getBooleanProperty(String propertyName, boolean defaultValue) - { - return (isSetTrue(propertyName)) || defaultValue; - } - - /** - * Returns true iff the specified boolean property value is "true". - *

- * If the property is not found, or it's value is not "true" then the method will return false. - * - * @param propertyName - * The name of the requested boolean property value. - * @return true if the property is defined and has the value "true", otherwise false. - */ - public static boolean isSetTrue(String propertyName) - { - String value = props.getProperty(propertyName); - return "true".equals(value); - } - - /** - * Sets the property to the given value. - *

- * Any previous values stored at the same property name are replaced. - * - * @param propertyName - * The name of the property to set. - * @param value - * The property value. - */ - public static void setProperty(String propertyName, String value) - { - props.setProperty(propertyName, value); - } - - /** - * Returns true iff the given property name is defined. - * - * @param propertyName - * The property name to test. - * @return true if the property is found in the configuration, or false otherwise. - */ - public static boolean hasProperty(String propertyName) - { - return props.containsKey(propertyName); - } - - /** - * Appends a property via a comma separated list - *

- * If the property does not exist, it adds it. - * - * @param propertyName - * The property whose value is to be appended with the given value. - * @param value - * The value to be stored, or appended to the current value. - */ - public static void appendProperty(String propertyName, String value) - { - String oldValue = props.getProperty(propertyName); - - if (oldValue != null && !oldValue.equals("none")) - { - oldValue += "," + value; - } - else - { - oldValue = value; - } - props.setProperty(propertyName, oldValue); - } - - /** - * Loads the properties from local properties file in the specified directory. - *

- * All files ending in '.properties' will be loaded. The new properties are added to the current system configuration. - * - * @param dirName - * The directory to search for the new properties files. - */ - public static void loadPropsFromDir(String dirName) - { - logger.info("Loading properties from dirName = " + dirName); - File[] directoryListing = new File(dirName).listFiles(new FilenameFilter() - { - @Override - public boolean accept(File dir, String name) - { - return name.endsWith(".properties"); - } - }); - - if (directoryListing != null) - { - for (File file : directoryListing) - { - loadPropsFromFile(file); - } - } - } - - /** - * Loads the properties from the specified file. - *

- * The new properties are added to the current system configuration. - * - * @param file - * The properties file containing the system properties to add. - */ - public static void loadPropsFromFile(File file) - { - if (file.exists()) - { - try (InputStream stream = new FileInputStream(file)) - { - logger.info("Loading properties file '" + file.getAbsolutePath() + "'"); - props.load(stream); - } catch (IOException e) - { - logger.error("Problem loading properties file '" + file.getAbsolutePath() + "'"); - e.printStackTrace(); - } - } - else - { - logger.warn("Properties file does not exist: '" + file.getAbsolutePath() + "'"); - } - } - - /** - * Loads the properties from the specified resource on the current classloader. - *

- * The new properties are added to the current system configuration. - * - * @param name - * The name of the resource defining the properties. - */ - public static void loadPropsFromResource(String name) - { - try (InputStream stream = SystemConfiguration.class.getClassLoader().getResourceAsStream(name)) - { - if (stream != null) - { - logger.info("Loading file '" + name + "'"); - props.load(stream); - } - else - { - logger.error("No file found '" + name + "'"); - } - } catch (IOException e) - { - logger.error("Problem loading file '" + name + "'"); - e.printStackTrace(); - } - } + private static final Logger logger = LoggerFactory.getLogger(SystemConfiguration.class); + + private static final Properties props = new Properties(); + + /** + * By default, these files should be found on the root of the classpath + */ + private static final String DEFAULT_PROPERTY_FILE = "pirk.properties"; + private static final String QUERIER_PROPERTIES_FILE = "querier.properties"; + private static final String RESPONDER_PROPERTIES_FILE = "responder.properties"; + + private static final String LOCAL_PROPERTIES_DIR = "local.pirk.properties.dir"; + + static + { + initialize(); + } + + public static void initialize() + { + props.clear(); + + // First try to load the default properties file + loadPropsFromResource(DEFAULT_PROPERTY_FILE); + + // Try to load props from the querier and responder property files, if they exist + loadPropsFromResource(QUERIER_PROPERTIES_FILE); + loadPropsFromResource(RESPONDER_PROPERTIES_FILE); + + // Try to load the local properties files, if they exists + loadPropsFromDir(getProperty(LOCAL_PROPERTIES_DIR)); + } + + /** + * Gets the specified property; returns null if the property isn't found. + * + * @param propertyName + * The name of the requested property. + * @return The value of the property, or null if the property cannot be found. + */ + public static String getProperty(String propertyName) + { + return props.getProperty(propertyName); + } + + /** + * Gets the specified property as a String, or the default value if the property isn't found. + * + * @param propertyName + * The name of the requested string property value. + * @param defaultValue + * The value to return if the property is undefined. + * @return The value of the requested property, or the default value if the property is undefined. + */ + public static String getProperty(String propertyName, String defaultValue) + { + return props.getProperty(propertyName, defaultValue); + } + + /** + * Gets the specified property as an int, or the default value if the property isn't found. + * + * @param propertyName + * The name of the requested int property value. + * @param defaultValue + * The value to return if the property is undefined. + * @return The value of the requested property, or the default value if the property is undefined. + * @throws NumberFormatException + * If the property does not contain a parsable int value. + */ + public static int getIntProperty(String propertyName, int defaultValue) + { + String value = props.getProperty(propertyName); + return (value == null) ? defaultValue : Integer.parseInt(value); + } + + /** + * Gets the specified property as an long, or the default value if the property isn't found. + * + * @param propertyName + * The name of the requested long property value. + * @param defaultValue + * The value to return if the property is undefined. + * @return The value of the requested property, or the default value if the property is undefined. + * @throws NumberFormatException + * If the property does not contain a parsable long value. + */ + public static long getLongProperty(String propertyName, long defaultValue) + { + String value = props.getProperty(propertyName); + return (value == null) ? defaultValue : Long.parseLong(value); + } + + /** + * Gets the specified property as a boolean, or the default value if the property isn't defined. + * + * @param propertyName + * The name of the requested boolean property value. + * @param defaultValue + * The value to return if the property is undefined. + * @return true if the property is defined and has the value "true", otherwise defaultValue. + */ + public static boolean getBooleanProperty(String propertyName, boolean defaultValue) + { + return (isSetTrue(propertyName)) || defaultValue; + } + + /** + * Returns true iff the specified boolean property value is "true". + *

+ * If the property is not found, or it's value is not "true" then the method will return false. + * + * @param propertyName + * The name of the requested boolean property value. + * @return true if the property is defined and has the value "true", otherwise false. + */ + public static boolean isSetTrue(String propertyName) + { + String value = props.getProperty(propertyName); + return "true".equals(value); + } + + /** + * Sets the property to the given value. + *

+ * Any previous values stored at the same property name are replaced. + * + * @param propertyName + * The name of the property to set. + * @param value + * The property value. + */ + public static void setProperty(String propertyName, String value) + { + props.setProperty(propertyName, value); + } + + /** + * Returns true iff the given property name is defined. + * + * @param propertyName + * The property name to test. + * @return true if the property is found in the configuration, or false otherwise. + */ + public static boolean hasProperty(String propertyName) + { + return props.containsKey(propertyName); + } + + /** + * Appends a property via a comma separated list + *

+ * If the property does not exist, it adds it. + * + * @param propertyName + * The property whose value is to be appended with the given value. + * @param value + * The value to be stored, or appended to the current value. + */ + public static void appendProperty(String propertyName, String value) + { + String oldValue = props.getProperty(propertyName); + + if (oldValue != null && !oldValue.equals("none")) + { + oldValue += "," + value; + } + else + { + oldValue = value; + } + props.setProperty(propertyName, oldValue); + } + + /** + * Loads the properties from local properties file in the specified directory. + *

+ * All files ending in '.properties' will be loaded. The new properties are added to the current system configuration. + * + * @param dirName + * The directory to search for the new properties files. + */ + public static void loadPropsFromDir(String dirName) + { + logger.info("Loading properties from dirName = " + dirName); + File[] directoryListing = new File(dirName).listFiles(new FilenameFilter() + { + @Override + public boolean accept(File dir, String name) + { + return name.endsWith(".properties"); + } + }); + + if (directoryListing != null) + { + for (File file : directoryListing) + { + loadPropsFromFile(file); + } + } + } + + /** + * Loads the properties from local properties file in the specified directory in hdfs. + *

+ * All files ending in '.properties' will be loaded. The new properties are added to the current system configuration. + * + * @param dirName + * The directory to search for the new properties files. + * @throws IOException + * @throws FileNotFoundException + */ + public static void loadPropsFromHDFSDir(String dirName, FileSystem fs) throws FileNotFoundException, IOException + { + logger.info("Loading properties from dirName = " + dirName); + + Path dirPath = new Path(dirName); + + FileStatus[] status = fs.listStatus(dirPath); + for (int i=0;i + * The new properties are added to the current system configuration. + * + * @param file + * The properties file containing the system properties to add. + */ + public static void loadPropsFromFile(File file) + { + if (file.exists()) + { + try (InputStream stream = new FileInputStream(file)) + { + logger.info("Loading properties file '" + file.getAbsolutePath() + "'"); + loadProperties(stream); + } catch (IOException e) + { + logger.error("Problem loading properties file '" + file.getAbsolutePath() + "'"); + e.printStackTrace(); + } + } + else + { + logger.warn("Properties file does not exist: '" + file.getAbsolutePath() + "'"); + } + } + + /** + * Loads the properties from the specified file in hdfs + *

+ * The new properties are added to the current system configuration. + * + * @param file + * The properties file containing the system properties to add. + * @throws IOException + */ + public static void loadPropsFromFile(String filename, FileSystem fs) throws IOException + { + Path p = new Path(filename); + loadPropsFromFile(p, fs); + } + + /** + * Loads the properties from the specified file in hdfs + *

+ * The new properties are added to the current system configuration. + * + * @param file + * The properties file containing the system properties to add. + * @throws IOException + */ + public static void loadPropsFromFile(Path filePath, FileSystem fs) throws IOException + { + if(fs.exists(filePath)) + { + try (InputStream stream = fs.open(filePath);) + { + logger.info("Loading properties file from hdfs'" + filePath.toString() + "'"); + loadProperties(stream); + } catch (IOException e) + { + logger.error("Problem loading properties file from hdfs '" + filePath.toString() + "'"); + e.printStackTrace(); + } + } + else + { + logger.warn("Properties file does not exist: '" + filePath.toString() + "'"); + } + } + + /** + * Loads the properties from the specified resource on the current classloader. + *

+ * The new properties are added to the current system configuration. + * + * @param name + * The name of the resource defining the properties. + */ + public static void loadPropsFromResource(String name) + { + try (InputStream stream = SystemConfiguration.class.getClassLoader().getResourceAsStream(name)) + { + if (stream != null) + { + logger.info("Loading file '" + name + "'"); + loadProperties(stream); + } + else + { + logger.error("No file found '" + name + "'"); + } + } catch (IOException e) + { + logger.error("Problem loading file '" + name + "'"); + e.printStackTrace(); + } + } + + /** + * Load the properties in the Properties object and then trim any whitespace + *

+ * Properties.load does not do this automatically + * @throws IOException + */ + public static void loadProperties(InputStream stream) throws IOException + { + props.load(stream); + + Enumeration propKeys = props.propertyNames(); + while(propKeys.hasMoreElements()) + { + String tmpKey = (String)propKeys.nextElement(); + String tmpValue = props.getProperty(tmpKey); + tmpValue = tmpValue.trim(); + props.put(tmpKey, tmpValue); + } + } }