From 0c7a575f30de97784a76128b2b2566577ea4b89f Mon Sep 17 00:00:00 2001 From: Sergey Golovko Date: Thu, 6 Jul 2017 11:25:08 -0700 Subject: [PATCH] APEXCORE-754 Add plugin dependency jar-files to application package Included plugin jar-files into the application package and added names of the plugin jar-files to the application classpath. --- .../apache/apex/common/util/JarHelper.java | 129 ++++++++++++++---- .../com/datatorrent/stram/StramClient.java | 12 +- .../stram/StreamingAppMasterService.java | 2 + 3 files changed, 111 insertions(+), 32 deletions(-) diff --git a/common/src/main/java/org/apache/apex/common/util/JarHelper.java b/common/src/main/java/org/apache/apex/common/util/JarHelper.java index d40cec8a8f..48cc6ec7bb 100644 --- a/common/src/main/java/org/apache/apex/common/util/JarHelper.java +++ b/common/src/main/java/org/apache/apex/common/util/JarHelper.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.JarURLConnection; import java.net.URL; +import java.net.URLConnection; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; @@ -31,6 +32,8 @@ import java.nio.file.attribute.BasicFileAttributes; import java.security.CodeSource; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.jar.JarEntry; import java.util.jar.JarFile; @@ -41,6 +44,7 @@ import org.slf4j.LoggerFactory; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -70,41 +74,108 @@ public static String createJar(String prefix, File dir, boolean deleteOnExit) th public String getJar(Class jarClass) { - String jar = null; - final CodeSource codeSource = jarClass.getProtectionDomain().getCodeSource(); - if (codeSource != null) { - URL location = codeSource.getLocation(); - jar = sourceToJar.get(location); - if (jar == null) { - // don't create jar file from folders multiple times - if ("jar".equals(location.getProtocol())) { - try { - location = ((JarURLConnection)location.openConnection()).getJarFileURL(); - } catch (IOException e) { - throw new AssertionError("Cannot resolve jar file for " + jarClass, e); - } - } - if ("file".equals(location.getProtocol())) { - jar = location.getFile(); - final File dir = new File(jar); - if (dir.isDirectory()) { - try { - jar = createJar("apex-", dir, false); - } catch (IOException e) { - throw new AssertionError("Cannot resolve jar file for " + jarClass + ". URL " + location, e); - } + List list = getJars(jarClass, true, false); + if (list == null) { + return null; + } else if (list.isEmpty()) { + throw new AssertionError("Cannot resolve jar file for " + jarClass); + } + return list.get(0); + } + + /** + * Returns a full path to the jar-file that contains the given class and all full paths to dependent jar-files + * that are defined in the property "apex-dependencies" of the manifest of the root jar-file. + * If the class is an independent file the method makes jar file from the folder that contains the class + * @param classPath Class path + * @param makeJarFromFolder True if the method should make jar from folder that contains the independent class + * @param addJarDependencies True if the method should include dependent jar files + * @return List of names of the jar-files + */ + public List getJars(String classPath, boolean makeJarFromFolder, boolean addJarDependencies) + { + try { + return getJars(Thread.currentThread() + .getContextClassLoader().loadClass(classPath), makeJarFromFolder, addJarDependencies); + } catch (ClassNotFoundException ex) { + logger.error("Cannot find the class {}", classPath, ex); + throw new RuntimeException("Cannot find the class " + classPath, ex); + } + } + + /** + * Returns a full path to the jar-file that contains the given class and all full paths to dependent jar-files + * that are defined in the property "apex-dependencies" of the manifest of the root jar-file. + * If the class is an independent file the method makes jar file from the folder that contains the class + * @param clazz Class + * @param makeJarFromFolder True if the method should make jar from folder that contains the independent class + * @param addJarDependencies True if the method should include dependent jar files + * @return List of names of the jar-files + */ + public List getJars(Class clazz, boolean makeJarFromFolder, boolean addJarDependencies) + { + CodeSource codeSource = clazz.getProtectionDomain().getCodeSource(); + if (codeSource == null) { + return null; + } + + List list = new LinkedList<>(); + URL location = codeSource.getLocation(); + String jarPath = sourceToJar.get(location); + + if (jarPath == null) { + try { + URLConnection conn = clazz.getResource(clazz.getSimpleName() + ".class").openConnection(); + + if (conn instanceof JarURLConnection) { + jarPath = ((JarURLConnection)conn).getJarFileURL().getFile(); + if (addJarDependencies) { + getDependentJarsFromManifest((JarURLConnection)conn, jarPath, list); + // add the location of the jar-file to cache only if the dependent jars were added to the jar list + sourceToJar.put(location, jarPath); } + } else if (makeJarFromFolder && ("file".equals(location.getProtocol()))) { + jarPath = createJar("apex-", new File(conn.getURL().getFile()).getParentFile(), false); + sourceToJar.put(location, jarPath); } else { - throw new AssertionError("Cannot resolve jar file for " + jarClass + ". URL " + location); + logger.warn("The class {} was loaded from incorrect url connection {}", clazz.getCanonicalName(), conn); + return list; } - sourceToJar.put(location, jar); - logger.debug("added sourceLocation {} as {}", location, jar); + } catch (IOException ex) { + String className = clazz.getCanonicalName(); + logger.error("Cannot resolve jar file for the class {}", className, ex); + throw new RuntimeException("Cannot resolve jar file for the class " + className, ex); } - if (jar == null) { - throw new AssertionError("Cannot resolve jar file for " + jarClass); + } + + list.add(jarPath); + logger.debug("added sourceLocation {} as {}", location, jarPath); + + return list; + } + + /** + * Gets dependent jar-files from manifest + * @param conn Jar connection + * @param jarPath path to jar file + * @param list List of target jar-files + * @throws IOException + */ + private void getDependentJarsFromManifest(JarURLConnection conn, String jarPath, List list) throws IOException + { + String value = ((JarURLConnection)conn).getMainAttributes().getValue("apex-dependencies"); + if (!StringUtils.isEmpty(value)) { + String folderPath = new File(jarPath).getParent(); + for (String jarFile : value.split(",")) { + String file = folderPath + File.separator + jarFile; + if (new File(file).exists()) { + list.add(file); + logger.debug("Jar-file {} was added as a dependent jar", file); + } else { + logger.warn("Jar-file {} does not exist", file); + } } } - return jar; } private static class JarCreator diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index 96f9daaca3..f5deaac332 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -179,7 +179,7 @@ public void stop() yarnClient.stop(); } - public static LinkedHashSet findJars(LogicalPlan dag, Class[] defaultClasses) + public LinkedHashSet findJars(Class[] defaultClasses) { List> jarClasses = new ArrayList<>(); @@ -223,8 +223,14 @@ public static LinkedHashSet findJars(LogicalPlan dag, Class[] default localJarFiles.addAll(Arrays.asList(libJars)); } - LOG.info("Local jar file dependencies: " + localJarFiles); + String pluginClassesPaths = conf.get(StreamingAppMasterService.PLUGINS_CONF_KEY); + if (!StringUtils.isEmpty(pluginClassesPaths)) { + for (String pluginClassPath : StringUtils.splitByWholeSeparator(pluginClassesPaths, StreamingAppMasterService.PLUGINS_CONF_SEP)) { + localJarFiles.addAll(jarHelper.getJars(pluginClassPath, true, true)); + } + } + LOG.info("Local jar file dependencies: " + localJarFiles); return localJarFiles; } @@ -338,7 +344,7 @@ public void startApplication() throws YarnException, IOException throw new IllegalStateException(applicationType + " is not a valid application type."); } - LinkedHashSet localJarFiles = findJars(dag, defaultClasses); + LinkedHashSet localJarFiles = findJars(defaultClasses); if (resources != null) { localJarFiles.addAll(resources); diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 09478ebbc0..762c7cb300 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -604,6 +604,8 @@ protected void serviceInit(Configuration conf) throws Exception } public static final String PLUGINS_CONF_KEY = "apex.plugin.stram.plugins"; + public static final String PLUGINS_CONF_SEP = ","; + private void initApexPluginDispatcher() { PluginLocator locator = new ChainedPluginLocator<>(new ServiceLoaderBasedPluginLocator<>(DAGExecutionPlugin.class),