From 811e9869bbf461cc4db7ec4d89efe9c1281bfc88 Mon Sep 17 00:00:00 2001 From: Sergey Golovko Date: Thu, 6 Jul 2017 11:25:08 -0700 Subject: [PATCH] APEXCORE-754 Add plugin dependency jar-files to application package Included plugin jar-files into the application package and added names of the plugin jar-files to the application classpath. --- .../apache/apex/common/util/JarHelper.java | 76 ++++++++++++++++++- .../com/datatorrent/stram/StramClient.java | 22 ++++-- .../stram/StreamingAppMasterService.java | 2 + 3 files changed, 92 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/org/apache/apex/common/util/JarHelper.java b/common/src/main/java/org/apache/apex/common/util/JarHelper.java index d40cec8a8f..bd75b44212 100644 --- a/common/src/main/java/org/apache/apex/common/util/JarHelper.java +++ b/common/src/main/java/org/apache/apex/common/util/JarHelper.java @@ -27,11 +27,14 @@ import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.security.CodeSource; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.jar.JarOutputStream; @@ -41,6 +44,7 @@ import org.slf4j.LoggerFactory; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -52,6 +56,7 @@ public class JarHelper { private static final Logger logger = LoggerFactory.getLogger(JarHelper.class); + private static final String APEX_DEPENDENCIES = "apex-dependencies"; private final Map sourceToJar = new HashMap<>(); @@ -68,7 +73,7 @@ public static String createJar(String prefix, File dir, boolean deleteOnExit) th return temp.getAbsolutePath(); } - public String getJar(Class jarClass) + public String getJar(Class jarClass, boolean makeJarFromFolder) { String jar = null; final CodeSource codeSource = jarClass.getProtectionDomain().getCodeSource(); @@ -88,6 +93,9 @@ public String getJar(Class jarClass) jar = location.getFile(); final File dir = new File(jar); if (dir.isDirectory()) { + if (!makeJarFromFolder) { + throw new AssertionError("Cannot resolve jar file for " + jarClass + ". URL " + location); + } try { jar = createJar("apex-", dir, false); } catch (IOException e) { @@ -107,6 +115,72 @@ public String getJar(Class jarClass) return jar; } + public String getJar(Class jarClass) + { + return getJar(jarClass, true); + } + + /** + * Returns a full path to the jar-file that contains the given class and all full paths to dependent jar-files + * that are defined in the property "apex-dependencies" of the manifest of the root jar-file. + * If the class is an independent file the method makes jar file from the folder that contains the class + * @param jarClass Class + * @param makeJarFromFolder True if the method should make jar from folder that contains the independent class + * @param addJarDependencies True if the method should include dependent jar files + * @return Set of names of the jar-files + */ + public Set getJars(Class jarClass, boolean makeJarFromFolder, boolean addJarDependencies) + { + String jar = getJar(jarClass, makeJarFromFolder); + Set set = new HashSet<>(); + if (jar != null) { + set.add(jar); + if (addJarDependencies) { + try { + getDependentJarsFromManifest(new JarFile(jar), set); + } catch (IOException ex) { + logger.warn("Cannot open Jar-file {}", jar); + } + } + } + return set; + } + + /** + * Returns a full path to the jar-file that contains the given class and all full paths to dependent jar-files + * that are defined in the property "apex-dependencies" of the manifest of the root jar-file. + * If the class is an independent file the method makes jar file from the folder that contains the class + * @param jarClass Class + * @return Set of names of the jar-files + */ + public Set getJars(Class jarClass) + { + return getJars(jarClass, true, true); + } + + /** + * Adds dependent jar-files from manifest to the target list of jar-files + * @param jarFile Jar file + * @param set Set of target jar-files + * @throws IOException + */ + public void getDependentJarsFromManifest(JarFile jarFile, Set set) throws IOException + { + String value = jarFile.getManifest().getMainAttributes().getValue(APEX_DEPENDENCIES); + if (!StringUtils.isEmpty(value)) { + Path folderPath = Paths.get(jarFile.getName()).getParent(); + for (String jar : value.split(",")) { + File file = folderPath.resolve(jar).toFile(); + if (file.exists()) { + set.add(file.getPath()); + logger.debug("The file {} was added as a dependent of the jar {}", file.getPath(), jarFile.getName()); + } else { + logger.warn("The dependent file {} of the jar {} does not exist", file.getPath(), jarFile.getName()); + } + } + } + } + private static class JarCreator { diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index 96f9daaca3..cf69c35078 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -179,7 +179,7 @@ public void stop() yarnClient.stop(); } - public static LinkedHashSet findJars(LogicalPlan dag, Class[] defaultClasses) + public LinkedHashSet findJars(Class[] defaultClasses) { List> jarClasses = new ArrayList<>(); @@ -211,10 +211,7 @@ public static LinkedHashSet findJars(LogicalPlan dag, Class[] default JarHelper jarHelper = new JarHelper(); for (Class jarClass : jarClasses) { - String jar = jarHelper.getJar(jarClass); - if (jar != null) { - localJarFiles.add(jar); - } + localJarFiles.addAll(jarHelper.getJars(jarClass)); } String libJarsPath = dag.getValue(Context.DAGContext.LIBRARY_JARS); @@ -223,7 +220,18 @@ public static LinkedHashSet findJars(LogicalPlan dag, Class[] default localJarFiles.addAll(Arrays.asList(libJars)); } - LOG.info("Local jar file dependencies: " + localJarFiles); + String pluginClassesPaths = conf.get(StreamingAppMasterService.PLUGINS_CONF_KEY); + if (!StringUtils.isEmpty(pluginClassesPaths)) { + for (String pluginClassPath : StringUtils.splitByWholeSeparator(pluginClassesPaths, StreamingAppMasterService.PLUGINS_CONF_SEP)) { + try { + localJarFiles.addAll(jarHelper.getJars(Thread.currentThread().getContextClassLoader().loadClass(pluginClassPath))); + } catch (ClassNotFoundException ex) { + LOG.error("Cannot find the class {}", pluginClassPath, ex); + } + } + } + + LOG.info("Local jar file dependencies: {}", localJarFiles); return localJarFiles; } @@ -338,7 +346,7 @@ public void startApplication() throws YarnException, IOException throw new IllegalStateException(applicationType + " is not a valid application type."); } - LinkedHashSet localJarFiles = findJars(dag, defaultClasses); + LinkedHashSet localJarFiles = findJars(defaultClasses); if (resources != null) { localJarFiles.addAll(resources); diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 09478ebbc0..762c7cb300 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -604,6 +604,8 @@ protected void serviceInit(Configuration conf) throws Exception } public static final String PLUGINS_CONF_KEY = "apex.plugin.stram.plugins"; + public static final String PLUGINS_CONF_SEP = ","; + private void initApexPluginDispatcher() { PluginLocator locator = new ChainedPluginLocator<>(new ServiceLoaderBasedPluginLocator<>(DAGExecutionPlugin.class),