Skip to content
This repository has been archived by the owner on Jun 7, 2021. It is now read-only.

Commit

Permalink
APEXCORE-754 Add plugin dependency jar-files to application package
Browse files Browse the repository at this point in the history
Included plugin jar-files into the application package and added names of the plugin jar-files to the application classpath.
  • Loading branch information
Sergey Golovko committed Jul 12, 2017
1 parent 2ba6084 commit 0c7a575
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 32 deletions.
129 changes: 100 additions & 29 deletions common/src/main/java/org/apache/apex/common/util/JarHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import java.io.IOException;
import java.net.JarURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.security.CodeSource;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
Expand All @@ -41,6 +44,7 @@
import org.slf4j.LoggerFactory;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

Expand Down Expand Up @@ -70,41 +74,108 @@ public static String createJar(String prefix, File dir, boolean deleteOnExit) th

public String getJar(Class<?> jarClass)
{
String jar = null;
final CodeSource codeSource = jarClass.getProtectionDomain().getCodeSource();
if (codeSource != null) {
URL location = codeSource.getLocation();
jar = sourceToJar.get(location);
if (jar == null) {
// don't create jar file from folders multiple times
if ("jar".equals(location.getProtocol())) {
try {
location = ((JarURLConnection)location.openConnection()).getJarFileURL();
} catch (IOException e) {
throw new AssertionError("Cannot resolve jar file for " + jarClass, e);
}
}
if ("file".equals(location.getProtocol())) {
jar = location.getFile();
final File dir = new File(jar);
if (dir.isDirectory()) {
try {
jar = createJar("apex-", dir, false);
} catch (IOException e) {
throw new AssertionError("Cannot resolve jar file for " + jarClass + ". URL " + location, e);
}
List<String> list = getJars(jarClass, true, false);
if (list == null) {
return null;
} else if (list.isEmpty()) {
throw new AssertionError("Cannot resolve jar file for " + jarClass);
}
return list.get(0);
}

/**
* Returns a full path to the jar-file that contains the given class and all full paths to dependent jar-files
* that are defined in the property "apex-dependencies" of the manifest of the root jar-file.
* If the class is an independent file the method makes jar file from the folder that contains the class
* @param classPath Class path
* @param makeJarFromFolder True if the method should make jar from folder that contains the independent class
* @param addJarDependencies True if the method should include dependent jar files
* @return List of names of the jar-files
*/
public List<String> getJars(String classPath, boolean makeJarFromFolder, boolean addJarDependencies)
{
try {
return getJars(Thread.currentThread()
.getContextClassLoader().loadClass(classPath), makeJarFromFolder, addJarDependencies);
} catch (ClassNotFoundException ex) {
logger.error("Cannot find the class {}", classPath, ex);
throw new RuntimeException("Cannot find the class " + classPath, ex);
}
}

/**
* Returns a full path to the jar-file that contains the given class and all full paths to dependent jar-files
* that are defined in the property "apex-dependencies" of the manifest of the root jar-file.
* If the class is an independent file the method makes jar file from the folder that contains the class
* @param clazz Class
* @param makeJarFromFolder True if the method should make jar from folder that contains the independent class
* @param addJarDependencies True if the method should include dependent jar files
* @return List of names of the jar-files
*/
public List<String> getJars(Class<?> clazz, boolean makeJarFromFolder, boolean addJarDependencies)
{
CodeSource codeSource = clazz.getProtectionDomain().getCodeSource();
if (codeSource == null) {
return null;
}

List<String> list = new LinkedList<>();
URL location = codeSource.getLocation();
String jarPath = sourceToJar.get(location);

if (jarPath == null) {
try {
URLConnection conn = clazz.getResource(clazz.getSimpleName() + ".class").openConnection();

if (conn instanceof JarURLConnection) {
jarPath = ((JarURLConnection)conn).getJarFileURL().getFile();
if (addJarDependencies) {
getDependentJarsFromManifest((JarURLConnection)conn, jarPath, list);
// add the location of the jar-file to cache only if the dependent jars were added to the jar list
sourceToJar.put(location, jarPath);
}
} else if (makeJarFromFolder && ("file".equals(location.getProtocol()))) {
jarPath = createJar("apex-", new File(conn.getURL().getFile()).getParentFile(), false);
sourceToJar.put(location, jarPath);
} else {
throw new AssertionError("Cannot resolve jar file for " + jarClass + ". URL " + location);
logger.warn("The class {} was loaded from incorrect url connection {}", clazz.getCanonicalName(), conn);
return list;
}
sourceToJar.put(location, jar);
logger.debug("added sourceLocation {} as {}", location, jar);
} catch (IOException ex) {
String className = clazz.getCanonicalName();
logger.error("Cannot resolve jar file for the class {}", className, ex);
throw new RuntimeException("Cannot resolve jar file for the class " + className, ex);
}
if (jar == null) {
throw new AssertionError("Cannot resolve jar file for " + jarClass);
}

list.add(jarPath);
logger.debug("added sourceLocation {} as {}", location, jarPath);

return list;
}

/**
* Gets dependent jar-files from manifest
* @param conn Jar connection
* @param jarPath path to jar file
* @param list List of target jar-files
* @throws IOException
*/
private void getDependentJarsFromManifest(JarURLConnection conn, String jarPath, List<String> list) throws IOException
{
String value = ((JarURLConnection)conn).getMainAttributes().getValue("apex-dependencies");
if (!StringUtils.isEmpty(value)) {
String folderPath = new File(jarPath).getParent();
for (String jarFile : value.split(",")) {
String file = folderPath + File.separator + jarFile;
if (new File(file).exists()) {
list.add(file);
logger.debug("Jar-file {} was added as a dependent jar", file);
} else {
logger.warn("Jar-file {} does not exist", file);
}
}
}
return jar;
}

private static class JarCreator
Expand Down
12 changes: 9 additions & 3 deletions engine/src/main/java/com/datatorrent/stram/StramClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void stop()
yarnClient.stop();
}

public static LinkedHashSet<String> findJars(LogicalPlan dag, Class<?>[] defaultClasses)
public LinkedHashSet<String> findJars(Class<?>[] defaultClasses)
{
List<Class<?>> jarClasses = new ArrayList<>();

Expand Down Expand Up @@ -223,8 +223,14 @@ public static LinkedHashSet<String> findJars(LogicalPlan dag, Class<?>[] default
localJarFiles.addAll(Arrays.asList(libJars));
}

LOG.info("Local jar file dependencies: " + localJarFiles);
String pluginClassesPaths = conf.get(StreamingAppMasterService.PLUGINS_CONF_KEY);
if (!StringUtils.isEmpty(pluginClassesPaths)) {
for (String pluginClassPath : StringUtils.splitByWholeSeparator(pluginClassesPaths, StreamingAppMasterService.PLUGINS_CONF_SEP)) {
localJarFiles.addAll(jarHelper.getJars(pluginClassPath, true, true));
}
}

LOG.info("Local jar file dependencies: " + localJarFiles);
return localJarFiles;
}

Expand Down Expand Up @@ -338,7 +344,7 @@ public void startApplication() throws YarnException, IOException
throw new IllegalStateException(applicationType + " is not a valid application type.");
}

LinkedHashSet<String> localJarFiles = findJars(dag, defaultClasses);
LinkedHashSet<String> localJarFiles = findJars(defaultClasses);

if (resources != null) {
localJarFiles.addAll(resources);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,8 @@ protected void serviceInit(Configuration conf) throws Exception
}

public static final String PLUGINS_CONF_KEY = "apex.plugin.stram.plugins";
public static final String PLUGINS_CONF_SEP = ",";

private void initApexPluginDispatcher()
{
PluginLocator<DAGExecutionPlugin> locator = new ChainedPluginLocator<>(new ServiceLoaderBasedPluginLocator<>(DAGExecutionPlugin.class),
Expand Down

0 comments on commit 0c7a575

Please sign in to comment.