From aab2d0482738d2e426f5648458c60513dbdeab67 Mon Sep 17 00:00:00 2001 From: Mina Lee Date: Thu, 31 Dec 2015 23:37:56 -0800 Subject: [PATCH 1/5] [ZEPPELIN-546] Load interpreter from maven repository - Move org.apache.zeppelin.spark.dep package from zeppelin-spark to zeppelin-interpreter - Rename DependencyResolver/DependencyContext to SparkDependencyResolver/SparkDependencyContext - Add general DependencyResolver --- .../apache/zeppelin/spark/DepInterpreter.java | 36 +++- .../zeppelin/spark/PySparkInterpreter.java | 4 +- .../zeppelin/spark/SparkInterpreter.java | 14 +- .../zeppelin/spark/ZeppelinContext.java | 6 +- .../spark/dep/SparkDependencyContext.java | 181 ++++++++++++++++ ...lver.java => SparkDependencyResolver.java} | 16 +- zeppelin-interpreter/pom.xml | 117 +++++++++++ .../java/org/apache/zeppelin}/dep/Booter.java | 4 +- .../org/apache/zeppelin}/dep/Dependency.java | 2 +- .../zeppelin}/dep/DependencyContext.java | 41 +--- .../zeppelin/dep/DependencyResolver.java | 194 ++++++++++++++++++ .../org/apache/zeppelin}/dep/Repository.java | 4 +- .../zeppelin}/dep/RepositoryListener.java | 2 +- .../dep/RepositorySystemFactory.java | 2 +- .../zeppelin}/dep/TransferListener.java | 2 +- .../zeppelin/conf/ZeppelinConfiguration.java | 1 + 16 files changed, 553 insertions(+), 73 deletions(-) create mode 100644 spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java rename spark/src/main/java/org/apache/zeppelin/spark/dep/{DependencyResolver.java => SparkDependencyResolver.java} (94%) rename {spark/src/main/java/org/apache/zeppelin/spark => zeppelin-interpreter/src/main/java/org/apache/zeppelin}/dep/Booter.java (98%) rename {spark/src/main/java/org/apache/zeppelin/spark => zeppelin-interpreter/src/main/java/org/apache/zeppelin}/dep/Dependency.java (98%) rename {spark/src/main/java/org/apache/zeppelin/spark => zeppelin-interpreter/src/main/java/org/apache/zeppelin}/dep/DependencyContext.java (76%) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java rename {spark/src/main/java/org/apache/zeppelin/spark => zeppelin-interpreter/src/main/java/org/apache/zeppelin}/dep/Repository.java (95%) rename {spark/src/main/java/org/apache/zeppelin/spark => zeppelin-interpreter/src/main/java/org/apache/zeppelin}/dep/RepositoryListener.java (99%) rename {spark/src/main/java/org/apache/zeppelin/spark => zeppelin-interpreter/src/main/java/org/apache/zeppelin}/dep/RepositorySystemFactory.java (98%) rename {spark/src/main/java/org/apache/zeppelin/spark => zeppelin-interpreter/src/main/java/org/apache/zeppelin}/dep/TransferListener.java (99%) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java index 7a17aa038d8..a4fdae32395 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java @@ -40,7 +40,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.spark.dep.DependencyContext; +import org.apache.zeppelin.spark.dep.SparkDependencyContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sonatype.aether.resolution.ArtifactResolutionException; @@ -69,7 +69,9 @@ public class DepInterpreter extends Interpreter { "spark", DepInterpreter.class.getName(), new InterpreterPropertyBuilder() - .add("zeppelin.dep.localrepo", "local-repo", "local repository for dependency loader") + .add("zeppelin.dep.localrepo", + getSystemDefault("ZEPPELIN_DEP_LOCALREPO", null, "local-repo"), + "local repository for dependency loader") .add("zeppelin.dep.additionalRemoteRepository", "spark-packages,http://dl.bintray.com/spark-packages/maven,false;", "A list of 'id,remote-repository-URL,is-snapshot;' for each remote repository.") @@ -79,7 +81,7 @@ public class DepInterpreter extends Interpreter { private SparkIMain intp; private ByteArrayOutputStream out; - private DependencyContext depc; + private SparkDependencyContext depc; private SparkJLineCompletion completor; private SparkILoop interpreter; static final Logger LOGGER = LoggerFactory.getLogger(DepInterpreter.class); @@ -88,10 +90,30 @@ public DepInterpreter(Properties property) { super(property); } - public DependencyContext getDependencyContext() { + public SparkDependencyContext getDependencyContext() { return depc; } + public static String getSystemDefault( + String envName, + String propertyName, + String defaultValue) { + + if (envName != null && !envName.isEmpty()) { + String envValue = System.getenv().get(envName); + if (envValue != null) { + return envValue; + } + } + + if (propertyName != null && !propertyName.isEmpty()) { + String propValue = System.getProperty(propertyName); + if (propValue != null) { + return propValue; + } + } + return defaultValue; + } @Override public void close() { @@ -152,16 +174,16 @@ private void createIMain() { intp.setContextClassLoader(); intp.initializeSynchronous(); - depc = new DependencyContext(getProperty("zeppelin.dep.localrepo"), + depc = new SparkDependencyContext(getProperty("zeppelin.dep.localrepo"), getProperty("zeppelin.dep.additionalRemoteRepository")); completor = new SparkJLineCompletion(intp); - intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); Map binder = (Map) getValue("_binder"); binder.put("depc", depc); intp.interpret("@transient val z = " - + "_binder.get(\"depc\").asInstanceOf[org.apache.zeppelin.spark.dep.DependencyContext]"); + + "_binder.get(\"depc\")" + + ".asInstanceOf[org.apache.zeppelin.spark.dep.SparkDependencyContext]"); } diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 0bfad6a0b40..8c4ba877f75 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -54,7 +54,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.WrappedInterpreter; -import org.apache.zeppelin.spark.dep.DependencyContext; +import org.apache.zeppelin.spark.dep.SparkDependencyContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,7 +127,7 @@ public void open() { URL [] urls = new URL[0]; if (depInterpreter != null) { - DependencyContext depc = depInterpreter.getDependencyContext(); + SparkDependencyContext depc = depInterpreter.getDependencyContext(); if (depc != null) { List files = depc.getFiles(); List urlList = new LinkedList(); diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 2bf7a6bd50d..d9757919cb8 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -55,8 +55,8 @@ import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; -import org.apache.zeppelin.spark.dep.DependencyContext; -import org.apache.zeppelin.spark.dep.DependencyResolver; +import org.apache.zeppelin.spark.dep.SparkDependencyContext; +import org.apache.zeppelin.spark.dep.SparkDependencyResolver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,7 +117,7 @@ public class SparkInterpreter extends Interpreter { private SparkContext sc; private ByteArrayOutputStream out; private SQLContext sqlc; - private DependencyResolver dep; + private SparkDependencyResolver dep; private SparkJLineCompletion completor; private JobProgressListener sparkListener; @@ -222,9 +222,9 @@ public SQLContext getSQLContext() { return sqlc; } - public DependencyResolver getDependencyResolver() { + public SparkDependencyResolver getDependencyResolver() { if (dep == null) { - dep = new DependencyResolver(intp, + dep = new SparkDependencyResolver(intp, sc, getProperty("zeppelin.dep.localrepo"), getProperty("zeppelin.dep.additionalRemoteRepository")); @@ -427,7 +427,7 @@ public void open() { // add dependency from DepInterpreter DepInterpreter depInterpreter = getDepInterpreter(); if (depInterpreter != null) { - DependencyContext depc = depInterpreter.getDependencyContext(); + SparkDependencyContext depc = depInterpreter.getDependencyContext(); if (depc != null) { List files = depc.getFiles(); if (files != null) { @@ -536,7 +536,7 @@ public void open() { // add jar if (depInterpreter != null) { - DependencyContext depc = depInterpreter.getDependencyContext(); + SparkDependencyContext depc = depInterpreter.getDependencyContext(); if (depc != null) { List files = depc.getFilesDist(); if (files != null) { diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index 5ec38d494f6..af806bf02f5 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -43,7 +43,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.spark.dep.DependencyResolver; +import org.apache.zeppelin.spark.dep.SparkDependencyResolver; import scala.Tuple2; import scala.Unit; @@ -53,14 +53,14 @@ * Spark context for zeppelin. */ public class ZeppelinContext extends HashMap { - private DependencyResolver dep; + private SparkDependencyResolver dep; private PrintStream out; private InterpreterContext interpreterContext; private int maxResult; public ZeppelinContext(SparkContext sc, SQLContext sql, InterpreterContext interpreterContext, - DependencyResolver dep, PrintStream printStream, + SparkDependencyResolver dep, PrintStream printStream, int maxResult) { this.sc = sc; this.sqlContext = sql; diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java new file mode 100644 index 00000000000..1b20b0fb6f0 --- /dev/null +++ b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark.dep; + +import java.io.File; +import java.net.MalformedURLException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.zeppelin.dep.Booter; +import org.apache.zeppelin.dep.Dependency; +import org.apache.zeppelin.dep.Repository; + +import org.sonatype.aether.RepositorySystem; +import org.sonatype.aether.RepositorySystemSession; +import org.sonatype.aether.artifact.Artifact; +import org.sonatype.aether.collection.CollectRequest; +import org.sonatype.aether.graph.DependencyFilter; +import org.sonatype.aether.repository.RemoteRepository; +import org.sonatype.aether.repository.Authentication; +import org.sonatype.aether.resolution.ArtifactResolutionException; +import org.sonatype.aether.resolution.ArtifactResult; +import org.sonatype.aether.resolution.DependencyRequest; +import org.sonatype.aether.resolution.DependencyResolutionException; +import org.sonatype.aether.util.artifact.DefaultArtifact; +import org.sonatype.aether.util.artifact.JavaScopes; +import org.sonatype.aether.util.filter.DependencyFilterUtils; +import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter; + + +/** + * + */ +public class SparkDependencyContext { + List dependencies = new LinkedList(); + List repositories = new LinkedList(); + + List files = new LinkedList(); + List filesDist = new LinkedList(); + private RepositorySystem system = Booter.newRepositorySystem(); + private RepositorySystemSession session; + private RemoteRepository mavenCentral = Booter.newCentralRepository(); + private RemoteRepository mavenLocal = Booter.newLocalRepository(); + private List additionalRepos = new LinkedList(); + + public SparkDependencyContext(String localRepoPath, String additionalRemoteRepository) { + session = Booter.newRepositorySystemSession(system, localRepoPath); + addRepoFromProperty(additionalRemoteRepository); + } + + public Dependency load(String lib) { + Dependency dep = new Dependency(lib); + + if (dependencies.contains(dep)) { + dependencies.remove(dep); + } + dependencies.add(dep); + return dep; + } + + public Repository addRepo(String name) { + Repository rep = new Repository(name); + repositories.add(rep); + return rep; + } + + public void reset() { + dependencies = new LinkedList(); + repositories = new LinkedList(); + + files = new LinkedList(); + filesDist = new LinkedList(); + } + + private void addRepoFromProperty(String listOfRepo) { + if (listOfRepo != null) { + String[] repos = listOfRepo.split(";"); + for (String repo : repos) { + String[] parts = repo.split(","); + if (parts.length == 3) { + String id = parts[0].trim(); + String url = parts[1].trim(); + boolean isSnapshot = Boolean.parseBoolean(parts[2].trim()); + if (id.length() > 1 && url.length() > 1) { + RemoteRepository rr = new RemoteRepository(id, "default", url); + rr.setPolicy(isSnapshot, null); + additionalRepos.add(rr); + } + } + } + } + } + + /** + * fetch all artifacts + * @return + * @throws MalformedURLException + * @throws ArtifactResolutionException + * @throws DependencyResolutionException + */ + public List fetch() throws MalformedURLException, + DependencyResolutionException, ArtifactResolutionException { + + for (Dependency dep : dependencies) { + if (!dep.isLocalFsArtifact()) { + List artifacts = fetchArtifactWithDep(dep); + for (ArtifactResult artifact : artifacts) { + if (dep.isDist()) { + filesDist.add(artifact.getArtifact().getFile()); + } + files.add(artifact.getArtifact().getFile()); + } + } else { + if (dep.isDist()) { + filesDist.add(new File(dep.getGroupArtifactVersion())); + } + files.add(new File(dep.getGroupArtifactVersion())); + } + } + + return files; + } + + private List fetchArtifactWithDep(Dependency dep) + throws DependencyResolutionException, ArtifactResolutionException { + Artifact artifact = new DefaultArtifact( + SparkDependencyResolver.inferScalaVersion(dep.getGroupArtifactVersion())); + + DependencyFilter classpathFlter = DependencyFilterUtils + .classpathFilter(JavaScopes.COMPILE); + PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter( + SparkDependencyResolver.inferScalaVersion(dep.getExclusions())); + + CollectRequest collectRequest = new CollectRequest(); + collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact, + JavaScopes.COMPILE)); + + collectRequest.addRepository(mavenCentral); + collectRequest.addRepository(mavenLocal); + for (RemoteRepository repo : additionalRepos) { + collectRequest.addRepository(repo); + } + for (Repository repo : repositories) { + RemoteRepository rr = new RemoteRepository(repo.getName(), "default", repo.getUrl()); + rr.setPolicy(repo.isSnapshot(), null); + Authentication auth = repo.getAuthentication(); + if (auth != null) { + rr.setAuthentication(auth); + } + collectRequest.addRepository(rr); + } + + DependencyRequest dependencyRequest = new DependencyRequest(collectRequest, + DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter)); + + return system.resolveDependencies(session, dependencyRequest).getArtifactResults(); + } + + public List getFiles() { + return files; + } + + public List getFilesDist() { + return filesDist; + } +} diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java similarity index 94% rename from spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java rename to spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java index eed3924e863..43a8525fe43 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java @@ -30,6 +30,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.spark.SparkContext; import org.apache.spark.repl.SparkIMain; +import org.apache.zeppelin.dep.Booter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sonatype.aether.RepositorySystem; @@ -58,19 +59,14 @@ * Deps resolver. * Add new dependencies from mvn repo (at runetime) to Zeppelin. */ -public class DependencyResolver { - Logger logger = LoggerFactory.getLogger(DependencyResolver.class); +public class SparkDependencyResolver { + Logger logger = LoggerFactory.getLogger(SparkDependencyResolver.class); private Global global; private SparkIMain intp; private SparkContext sc; private RepositorySystem system = Booter.newRepositorySystem(); private List repos = new LinkedList(); private RepositorySystemSession session; - private DependencyFilter classpathFlter = DependencyFilterUtils.classpathFilter( - JavaScopes.COMPILE, - JavaScopes.PROVIDED, - JavaScopes.RUNTIME, - JavaScopes.SYSTEM); private final String[] exclusions = new String[] {"org.scala-lang:scala-library", "org.scala-lang:scala-compiler", @@ -80,7 +76,7 @@ public class DependencyResolver { "org.apache.zeppelin:zeppelin-spark", "org.apache.zeppelin:zeppelin-server"}; - public DependencyResolver(SparkIMain intp, SparkContext sc, String localRepoPath, + public SparkDependencyResolver(SparkIMain intp, SparkContext sc, String localRepoPath, String additionalRemoteRepository) { this.intp = intp; this.global = intp.global(); @@ -318,7 +314,7 @@ private List loadFromMvn(String artifact, Collection excludes, public List getArtifactsWithDep(String dependency, Collection excludes) throws Exception { Artifact artifact = new DefaultArtifact(inferScalaVersion(dependency)); - DependencyFilter classpathFlter = DependencyFilterUtils.classpathFilter( JavaScopes.COMPILE ); + DependencyFilter classpathFilter = DependencyFilterUtils.classpathFilter(JavaScopes.COMPILE); PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter(inferScalaVersion(excludes)); @@ -331,7 +327,7 @@ public List getArtifactsWithDep(String dependency, } } DependencyRequest dependencyRequest = new DependencyRequest(collectRequest, - DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter)); + DependencyFilterUtils.andFilter(exclusionFilter, classpathFilter)); return system.resolveDependencies(session, dependencyRequest).getArtifactResults(); } diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index 59d2cd0d439..67b4d5fbc9a 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -97,6 +97,123 @@ commons-lang3 ${commons-lang.version} + + + + org.apache.maven + maven-plugin-api + 3.0 + + + org.codehaus.plexus + plexus-utils + + + org.sonatype.sisu + sisu-inject-plexus + + + org.apache.maven + maven-model + + + + + + org.sonatype.aether + aether-api + 1.12 + + + + org.sonatype.aether + aether-util + 1.12 + + + + org.sonatype.aether + aether-impl + 1.12 + + + + org.apache.maven + maven-aether-provider + 3.0.3 + + + org.sonatype.aether + aether-api + + + org.sonatype.aether + aether-spi + + + org.sonatype.aether + aether-util + + + org.sonatype.aether + aether-impl + + + org.codehaus.plexus + plexus-utils + + + + + + org.sonatype.aether + aether-connector-file + 1.12 + + + + org.sonatype.aether + aether-connector-wagon + 1.12 + + + org.apache.maven.wagon + wagon-provider-api + + + + + + org.apache.maven.wagon + wagon-provider-api + 1.0 + + + org.codehaus.plexus + plexus-utils + + + + + + org.apache.maven.wagon + wagon-http-lightweight + 1.0 + + + org.apache.maven.wagon + wagon-http-shared + + + + + + org.apache.maven.wagon + wagon-http + 1.0 + + + diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/Booter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Booter.java similarity index 98% rename from spark/src/main/java/org/apache/zeppelin/spark/dep/Booter.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Booter.java index 1b7a6d653d9..7a487fa9327 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/Booter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Booter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.zeppelin.spark.dep; +package org.apache.zeppelin.dep; import java.io.File; @@ -64,7 +64,7 @@ public static RepositorySystemSession newRepositorySystemSession( public static RemoteRepository newCentralRepository() { return new RemoteRepository("central", "default", "http://repo1.maven.org/maven2/"); } - + public static RemoteRepository newLocalRepository() { return new RemoteRepository("local", "default", "file://" + System.getProperty("user.home") + "/.m2/repository"); diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/Dependency.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Dependency.java similarity index 98% rename from spark/src/main/java/org/apache/zeppelin/spark/dep/Dependency.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Dependency.java index ca928932c64..8f77de49e07 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/Dependency.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Dependency.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.zeppelin.spark.dep; +package org.apache.zeppelin.dep; import java.util.LinkedList; import java.util.List; diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyContext.java similarity index 76% rename from spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyContext.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyContext.java index 834e5186f9c..ab4da28fb2b 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyContext.java @@ -15,14 +15,12 @@ * limitations under the License. */ -package org.apache.zeppelin.spark.dep; +package org.apache.zeppelin.dep; import java.io.File; import java.net.MalformedURLException; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.HashMap; import org.sonatype.aether.RepositorySystem; import org.sonatype.aether.RepositorySystemSession; @@ -30,7 +28,6 @@ import org.sonatype.aether.collection.CollectRequest; import org.sonatype.aether.graph.DependencyFilter; import org.sonatype.aether.repository.RemoteRepository; -import org.sonatype.aether.repository.Authentication; import org.sonatype.aether.resolution.ArtifactResolutionException; import org.sonatype.aether.resolution.ArtifactResult; import org.sonatype.aether.resolution.DependencyRequest; @@ -54,11 +51,9 @@ public class DependencyContext { private RepositorySystemSession session; private RemoteRepository mavenCentral = Booter.newCentralRepository(); private RemoteRepository mavenLocal = Booter.newLocalRepository(); - private List additionalRepos = new LinkedList(); - public DependencyContext(String localRepoPath, String additionalRemoteRepository) { - session = Booter.newRepositorySystemSession(system, localRepoPath); - addRepoFromProperty(additionalRemoteRepository); + public DependencyContext(String localRepoPath) { + session = Booter.newRepositorySystemSession(system, localRepoPath); } public Dependency load(String lib) { @@ -85,24 +80,6 @@ public void reset() { filesDist = new LinkedList(); } - private void addRepoFromProperty(String listOfRepo) { - if (listOfRepo != null) { - String[] repos = listOfRepo.split(";"); - for (String repo : repos) { - String[] parts = repo.split(","); - if (parts.length == 3) { - String id = parts[0].trim(); - String url = parts[1].trim(); - boolean isSnapshot = Boolean.parseBoolean(parts[2].trim()); - if (id.length() > 1 && url.length() > 1) { - RemoteRepository rr = new RemoteRepository(id, "default", url); - rr.setPolicy(isSnapshot, null); - additionalRepos.add(rr); - } - } - } - } - } /** * fetch all artifacts @@ -136,13 +113,12 @@ public List fetch() throws MalformedURLException, private List fetchArtifactWithDep(Dependency dep) throws DependencyResolutionException, ArtifactResolutionException { - Artifact artifact = new DefaultArtifact( - DependencyResolver.inferScalaVersion(dep.getGroupArtifactVersion())); + Artifact artifact = new DefaultArtifact(dep.getGroupArtifactVersion()); DependencyFilter classpathFlter = DependencyFilterUtils .classpathFilter(JavaScopes.COMPILE); PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter( - DependencyResolver.inferScalaVersion(dep.getExclusions())); + dep.getExclusions()); CollectRequest collectRequest = new CollectRequest(); collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact, @@ -150,16 +126,9 @@ private List fetchArtifactWithDep(Dependency dep) collectRequest.addRepository(mavenCentral); collectRequest.addRepository(mavenLocal); - for (RemoteRepository repo : additionalRepos) { - collectRequest.addRepository(repo); - } for (Repository repo : repositories) { RemoteRepository rr = new RemoteRepository(repo.getName(), "default", repo.getUrl()); rr.setPolicy(repo.isSnapshot(), null); - Authentication auth = repo.getAuthentication(); - if (auth != null) { - rr.setAuthentication(auth); - } collectRequest.addRepository(rr); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java new file mode 100644 index 00000000000..f1552c9aca7 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.dep; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sonatype.aether.RepositorySystem; +import org.sonatype.aether.RepositorySystemSession; +import org.sonatype.aether.artifact.Artifact; +import org.sonatype.aether.collection.CollectRequest; +import org.sonatype.aether.graph.Dependency; +import org.sonatype.aether.graph.DependencyFilter; +import org.sonatype.aether.repository.RemoteRepository; +import org.sonatype.aether.resolution.ArtifactResult; +import org.sonatype.aether.resolution.DependencyRequest; +import org.sonatype.aether.util.artifact.DefaultArtifact; +import org.sonatype.aether.util.artifact.JavaScopes; +import org.sonatype.aether.util.filter.DependencyFilterUtils; +import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter; + + +/** + * Deps resolver. + * Add new dependencies from mvn repo (at runetime) to Zeppelin. + * + */ +public class DependencyResolver { + Logger logger = LoggerFactory.getLogger(DependencyResolver.class); + private RepositorySystem system = Booter.newRepositorySystem(); + private List repos = new LinkedList(); + private RepositorySystemSession session; + + private final String[] exclusions = new String[] {"org.apache.zeppelin:zeppelin-zengine", + "org.apache.zeppelin:zeppelin-interpreter", + "org.apache.zeppelin:zeppelin-server"}; + + public DependencyResolver(String localRepoPath) { + session = Booter.newRepositorySystemSession(system, localRepoPath); + repos.add(Booter.newCentralRepository()); // add maven central + repos.add(Booter.newLocalRepository()); + } + + public void addRepo(String id, String url, boolean snapshot) { + synchronized (repos) { + delRepo(id); + RemoteRepository rr = new RemoteRepository(id, "default", url); + rr.setPolicy(snapshot, null); + repos.add(rr); + } + } + + public RemoteRepository delRepo(String id) { + synchronized (repos) { + Iterator it = repos.iterator(); + if (it.hasNext()) { + RemoteRepository repo = it.next(); + if (repo.getId().equals(id)) { + it.remove(); + return repo; + } + } + } + return null; + } + + + public List load(String artifact) throws Exception { + return load(artifact, new LinkedList()); + } + + public List load(String artifact, String destPath) throws Exception { + return load(artifact, new LinkedList(), destPath); + } + + public List load(String artifact, Collection excludes) throws Exception { + if (StringUtils.isBlank(artifact)) { + // Should throw here + throw new RuntimeException("Invalid artifact to load"); + } + + // :[:[:]]: + int numSplits = artifact.split(":").length; + if (numSplits >= 3 && numSplits <= 6) { + return loadFromMvn(artifact, excludes); + } else { + LinkedList libs = new LinkedList(); + libs.add(new File(artifact)); + return libs; + } + } + + public List load(String artifact, Collection excludes, String destPath) + throws Exception { + List libs = load(artifact, excludes); + + // find home dir + String home = System.getenv("ZEPPELIN_HOME"); + if (home == null) { + home = System.getProperty("zeppelin.home"); + } + if (home == null) { + home = ".."; + } + + for (File srcFile: libs) { + File destFile = new File(home + "/" + destPath, srcFile.getName()); + if (!destFile.exists() || !FileUtils.contentEquals(srcFile, destFile)) { + FileUtils.copyFile(srcFile, destFile); + logger.info("copy {} to {}", srcFile.getAbsolutePath(), destPath); + } + } + return libs; + } + + private List loadFromMvn(String artifact, Collection excludes) throws Exception { + Collection allExclusions = new LinkedList(); + allExclusions.addAll(excludes); + allExclusions.addAll(Arrays.asList(exclusions)); + + List listOfArtifact; + listOfArtifact = getArtifactsWithDep(artifact, allExclusions); + + Iterator it = listOfArtifact.iterator(); + while (it.hasNext()) { + Artifact a = it.next().getArtifact(); + String gav = a.getGroupId() + ":" + a.getArtifactId() + ":" + a.getVersion(); + for (String exclude : allExclusions) { + if (gav.startsWith(exclude)) { + it.remove(); + break; + } + } + } + + List files = new LinkedList(); + for (ArtifactResult artifactResult : listOfArtifact) { + files.add(artifactResult.getArtifact().getFile()); + logger.info("load {}", artifactResult.getArtifact().getFile().getAbsolutePath()); + } + + return files; + } + + /** + * + * @param dependency + * @param excludes list of pattern can either be of the form groupId:artifactId + * @return + * @throws Exception + */ + public List getArtifactsWithDep(String dependency, + Collection excludes) throws Exception { + Artifact artifact = new DefaultArtifact(dependency); + DependencyFilter classpathFilter = DependencyFilterUtils.classpathFilter(JavaScopes.COMPILE); + PatternExclusionsDependencyFilter exclusionFilter = + new PatternExclusionsDependencyFilter(excludes); + + CollectRequest collectRequest = new CollectRequest(); + collectRequest.setRoot(new Dependency(artifact, JavaScopes.COMPILE)); + + synchronized (repos) { + for (RemoteRepository repo : repos) { + collectRequest.addRepository(repo); + } + } + DependencyRequest dependencyRequest = new DependencyRequest(collectRequest, + DependencyFilterUtils.andFilter(exclusionFilter, classpathFilter)); + return system.resolveDependencies(session, dependencyRequest).getArtifactResults(); + } +} diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/Repository.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Repository.java similarity index 95% rename from spark/src/main/java/org/apache/zeppelin/spark/dep/Repository.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Repository.java index aee56b569d7..4c2d8670643 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/Repository.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Repository.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.zeppelin.spark.dep; +package org.apache.zeppelin.dep; import org.sonatype.aether.repository.Authentication; /** * @@ -70,7 +70,7 @@ public Repository credentials(String username, String password) { return this; } - protected Authentication getAuthentication() { + public Authentication getAuthentication() { Authentication auth = null; if (this.username != null && this.password != null) { auth = new Authentication(this.username, this.password); diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/RepositoryListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositoryListener.java similarity index 99% rename from spark/src/main/java/org/apache/zeppelin/spark/dep/RepositoryListener.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositoryListener.java index 86b3334c6ea..9f62d5fabca 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/RepositoryListener.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositoryListener.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.zeppelin.spark.dep; +package org.apache.zeppelin.dep; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/RepositorySystemFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositorySystemFactory.java similarity index 98% rename from spark/src/main/java/org/apache/zeppelin/spark/dep/RepositorySystemFactory.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositorySystemFactory.java index cc0740dd3f5..a2246035867 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/RepositorySystemFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/RepositorySystemFactory.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.zeppelin.spark.dep; +package org.apache.zeppelin.dep; import org.apache.maven.repository.internal.DefaultServiceLocator; import org.apache.maven.wagon.Wagon; diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/TransferListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/TransferListener.java similarity index 99% rename from spark/src/main/java/org/apache/zeppelin/spark/dep/TransferListener.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/TransferListener.java index 8a4058ab919..277a3031b50 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/TransferListener.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/TransferListener.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.zeppelin.spark.dep; +package org.apache.zeppelin.dep; import java.io.PrintStream; import java.text.DecimalFormat; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index ca63eefe2d0..edcf5133667 100755 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -428,6 +428,7 @@ public static enum ConfVars { // Decide when new note is created, interpreter settings will be binded automatically or not. ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true), ZEPPELIN_CONF_DIR("zeppelin.conf.dir", "conf"), + ZEPPELIN_DEP_LOCALREPO("zeppelin.dep.localrepo", "local-repo"), // Allows a way to specify a ',' separated list of allowed origins for rest and websockets // i.e. http://localhost:8080 ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"), From 4baf271bbac52de3d37571a8d74f9e7bf28be8c0 Mon Sep 17 00:00:00 2001 From: Mina Lee Date: Fri, 1 Jan 2016 11:04:33 -0800 Subject: [PATCH 2/5] [ZEPPELIN-546] Pass DependencyResolver to InterpreterFactory --- .../org/apache/zeppelin/server/ZeppelinServer.java | 5 ++++- .../zeppelin/interpreter/InterpreterFactory.java | 12 +++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 7ad2b713037..9e7a97cf862 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -30,6 +30,7 @@ import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.repo.NotebookRepo; @@ -73,12 +74,14 @@ public class ZeppelinServer extends Application { private InterpreterFactory replFactory; private NotebookRepo notebookRepo; private SearchService notebookIndex; + private DependencyResolver depResolver; public ZeppelinServer() throws Exception { ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + this.depResolver = new DependencyResolver(conf.getString(ConfVars.ZEPPELIN_DEP_LOCALREPO)); this.schedulerFactory = new SchedulerFactory(); - this.replFactory = new InterpreterFactory(conf, notebookWsServer); + this.replFactory = new InterpreterFactory(conf, notebookWsServer, depResolver); this.notebookRepo = new NotebookRepoSync(conf); this.notebookIndex = new LuceneSearch(); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index fc8cc04fafb..4ff0cc3ad59 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -23,6 +23,7 @@ import org.apache.commons.lang.NullArgumentException; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; @@ -65,19 +66,24 @@ public class InterpreterFactory { AngularObjectRegistryListener angularObjectRegistryListener; + DependencyResolver depResolver; + public InterpreterFactory(ZeppelinConfiguration conf, - AngularObjectRegistryListener angularObjectRegistryListener) + AngularObjectRegistryListener angularObjectRegistryListener, + DependencyResolver depResolver) throws InterpreterException, IOException { - this(conf, new InterpreterOption(true), angularObjectRegistryListener); + this(conf, new InterpreterOption(true), angularObjectRegistryListener, depResolver); } public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption, - AngularObjectRegistryListener angularObjectRegistryListener) + AngularObjectRegistryListener angularObjectRegistryListener, + DependencyResolver depResolver) throws InterpreterException, IOException { this.conf = conf; this.defaultOption = defaultOption; this.angularObjectRegistryListener = angularObjectRegistryListener; + this.depResolver = depResolver; String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS); interpreterClassList = replsConf.split(","); From 039f5fa9ff22c8964ed16fba21f74edb85b9ca7d Mon Sep 17 00:00:00 2001 From: Mina Lee Date: Fri, 1 Jan 2016 11:05:05 -0800 Subject: [PATCH 3/5] [ZEPPELIN-546] Fix tests --- .../spark/dep/DependencyResolverTest.java | 20 +++--- .../interpreter/InterpreterFactoryTest.java | 62 +++++++++---------- .../notebook/NoteInterpreterLoaderTest.java | 2 +- .../zeppelin/notebook/NotebookTest.java | 4 +- .../notebook/repo/NotebookRepoSyncTest.java | 2 +- .../notebook/repo/VFSNotebookRepoTest.java | 2 +- 6 files changed, 46 insertions(+), 46 deletions(-) diff --git a/spark/src/test/java/org/apache/zeppelin/spark/dep/DependencyResolverTest.java b/spark/src/test/java/org/apache/zeppelin/spark/dep/DependencyResolverTest.java index e41de60976b..5cbba662d56 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/dep/DependencyResolverTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/dep/DependencyResolverTest.java @@ -19,7 +19,7 @@ import static org.junit.Assert.assertEquals; -import org.apache.zeppelin.spark.dep.DependencyResolver; +import org.apache.zeppelin.spark.dep.SparkDependencyResolver; import org.junit.Test; public class DependencyResolverTest { @@ -30,23 +30,23 @@ public void testInferScalaVersion() { String scalaVersion = version[0] + "." + version[1]; assertEquals("groupId:artifactId:version", - DependencyResolver.inferScalaVersion("groupId:artifactId:version")); + SparkDependencyResolver.inferScalaVersion("groupId:artifactId:version")); assertEquals("groupId:artifactId_" + scalaVersion + ":version", - DependencyResolver.inferScalaVersion("groupId::artifactId:version")); + SparkDependencyResolver.inferScalaVersion("groupId::artifactId:version")); assertEquals("groupId:artifactId:version::test", - DependencyResolver.inferScalaVersion("groupId:artifactId:version::test")); + SparkDependencyResolver.inferScalaVersion("groupId:artifactId:version::test")); assertEquals("*", - DependencyResolver.inferScalaVersion("*")); + SparkDependencyResolver.inferScalaVersion("*")); assertEquals("groupId:*", - DependencyResolver.inferScalaVersion("groupId:*")); + SparkDependencyResolver.inferScalaVersion("groupId:*")); assertEquals("groupId:artifactId*", - DependencyResolver.inferScalaVersion("groupId:artifactId*")); + SparkDependencyResolver.inferScalaVersion("groupId:artifactId*")); assertEquals("groupId:artifactId_" + scalaVersion, - DependencyResolver.inferScalaVersion("groupId::artifactId")); + SparkDependencyResolver.inferScalaVersion("groupId::artifactId")); assertEquals("groupId:artifactId_" + scalaVersion + "*", - DependencyResolver.inferScalaVersion("groupId::artifactId*")); + SparkDependencyResolver.inferScalaVersion("groupId::artifactId*")); assertEquals("groupId:artifactId_" + scalaVersion + ":*", - DependencyResolver.inferScalaVersion("groupId::artifactId:*")); + SparkDependencyResolver.inferScalaVersion("groupId::artifactId:*")); } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java index 6a69b83b2e9..abd0e3bf11b 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -38,32 +38,32 @@ public class InterpreterFactoryTest { - private InterpreterFactory factory; + private InterpreterFactory factory; private File tmpDir; private ZeppelinConfiguration conf; private InterpreterContext context; @Before - public void setUp() throws Exception { + public void setUp() throws Exception { tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()); tmpDir.mkdirs(); new File(tmpDir, "conf").mkdirs(); MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); - MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); + MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath()); - System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - conf = new ZeppelinConfiguration(); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null); - context = new InterpreterContext("note", "id", "title", "text", null, null, null, null); + System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath()); + System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2"); + conf = new ZeppelinConfiguration(); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); + context = new InterpreterContext("note", "id", "title", "text", null, null, null, null); - } + } - @After - public void tearDown() throws Exception { - delete(tmpDir); - } + @After + public void tearDown() throws Exception { + delete(tmpDir); + } private void delete(File file){ if(file.isFile()) file.delete(); @@ -78,24 +78,24 @@ else if(file.isDirectory()){ } } - @Test - public void testBasic() { - List all = factory.getDefaultInterpreterSettingList(); + @Test + public void testBasic() { + List all = factory.getDefaultInterpreterSettingList(); - // get interpreter - Interpreter repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst(); - assertFalse(((LazyOpenInterpreter) repl1).isOpen()); - repl1.interpret("repl1", context); - assertTrue(((LazyOpenInterpreter) repl1).isOpen()); + // get interpreter + Interpreter repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst(); + assertFalse(((LazyOpenInterpreter) repl1).isOpen()); + repl1.interpret("repl1", context); + assertTrue(((LazyOpenInterpreter) repl1).isOpen()); - // try to get unavailable interpreter - assertNull(factory.get("unknown")); + // try to get unavailable interpreter + assertNull(factory.get("unknown")); - // restart interpreter - factory.restart(all.get(0)); - repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst(); - assertFalse(((LazyOpenInterpreter) repl1).isOpen()); - } + // restart interpreter + factory.restart(all.get(0)); + repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst(); + assertFalse(((LazyOpenInterpreter) repl1).isOpen()); + } @Test public void testFactoryDefaultList() throws IOException { @@ -119,8 +119,8 @@ public void testExceptions() throws IOException { try { factory.add("a mock", "mock2", null, new Properties()); } catch(NullArgumentException e) { - assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage()); - } + assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage()); + } try { factory.add("a mock" , "mock2" , new InterpreterOption(false),null); } catch (NullArgumentException e){ @@ -140,7 +140,7 @@ public void testSaveLoad() throws InterpreterException, IOException { factory.add("newsetting", "mock1", new InterpreterOption(false), new Properties()); assertEquals(3, factory.get().size()); - InterpreterFactory factory2 = new InterpreterFactory(conf, null); + InterpreterFactory factory2 = new InterpreterFactory(conf, null, null); assertEquals(3, factory2.get().size()); } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java index 2e1f5e3e5b2..a0455eb7ccd 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java @@ -58,7 +58,7 @@ public void setUp() throws Exception { MockInterpreter11.register("mock11", "group1", "org.apache.zeppelin.interpreter.mock.MockInterpreter11"); MockInterpreter2.register("mock2", "group2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); } @After diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 917ea6b7d7e..34f7a1bb108 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -85,7 +85,7 @@ public void setUp() throws Exception { MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); @@ -172,7 +172,7 @@ public void testPersist() throws IOException, SchedulerException{ note.persist(); Notebook notebook2 = new Notebook( - conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this, null); + conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null), this, null); assertEquals(1, notebook2.getAllNotes().size()); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java index 4e9e1800b2c..60b3ba35566 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java @@ -87,7 +87,7 @@ public void setUp() throws Exception { MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); SearchService search = mock(SearchService.class); notebookRepoSync = new NotebookRepoSync(conf); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java index 65be61b5bc8..cff086dc843 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java @@ -76,7 +76,7 @@ public void setUp() throws Exception { MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); this.schedulerFactory = new SchedulerFactory(); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); From 72ad12c5b1bc8a5512cdb7e81b0ae33375e12776 Mon Sep 17 00:00:00 2001 From: Mina Lee Date: Fri, 1 Jan 2016 20:37:28 -0800 Subject: [PATCH 4/5] [ZEPPELIN-546] Add test --- ....java => SparkDependencyResolverTest.java} | 2 +- .../zeppelin/dep/DependencyResolverTest.java | 62 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) rename spark/src/test/java/org/apache/zeppelin/spark/dep/{DependencyResolverTest.java => SparkDependencyResolverTest.java} (98%) create mode 100644 zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/DependencyResolverTest.java diff --git a/spark/src/test/java/org/apache/zeppelin/spark/dep/DependencyResolverTest.java b/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java similarity index 98% rename from spark/src/test/java/org/apache/zeppelin/spark/dep/DependencyResolverTest.java rename to spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java index 5cbba662d56..a0271f4471d 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/dep/DependencyResolverTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/dep/SparkDependencyResolverTest.java @@ -22,7 +22,7 @@ import org.apache.zeppelin.spark.dep.SparkDependencyResolver; import org.junit.Test; -public class DependencyResolverTest { +public class SparkDependencyResolverTest { @Test public void testInferScalaVersion() { diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/DependencyResolverTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/DependencyResolverTest.java new file mode 100644 index 00000000000..33b7e5478a1 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/DependencyResolverTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.dep; + +import static org.junit.Assert.assertTrue; + +import java.io.File; + +import org.apache.commons.io.FileUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class DependencyResolverTest { + private static DependencyResolver resolver; + private static String testPath; + private static String testCopyPath; + private static String home; + + @BeforeClass + public static void setUp() throws Exception { + testPath = "test-repo"; + testCopyPath = "test-copy-repo"; + resolver = new DependencyResolver(testPath); + home = System.getenv("ZEPPELIN_HOME"); + if (home == null) { + home = System.getProperty("zeppelin.home"); + } + if (home == null) { + home = ".."; + } + } + + @AfterClass + public static void tearDown() throws Exception { + FileUtils.deleteDirectory(new File(home + "/" + testPath)); + FileUtils.deleteDirectory(new File(home + "/" + testCopyPath)); + } + + @Test + public void testLoad() throws Exception { + resolver.load("org.apache.commons:commons-lang3:3.4", testCopyPath); + + assertTrue(new File(home + "/" + testPath + "/org/apache/commons/commons-lang3/3.4/").exists()); + assertTrue(new File(home + "/" + testCopyPath + "/commons-lang3-3.4.jar").exists()); + } +} \ No newline at end of file From cd190d40cb96540d22191822469b78fb12e816d5 Mon Sep 17 00:00:00 2001 From: Mina Lee Date: Tue, 12 Jan 2016 16:12:38 -0800 Subject: [PATCH 5/5] [ZEPPELIN-546] Refactoring --- .../spark/dep/SparkDependencyResolver.java | 40 ++--------- .../dep/AbstractDependencyResolver.java | 70 +++++++++++++++++++ .../zeppelin/dep/DependencyResolver.java | 43 ++---------- 3 files changed, 81 insertions(+), 72 deletions(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/AbstractDependencyResolver.java diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java index 43a8525fe43..e4881d373be 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java @@ -30,11 +30,9 @@ import org.apache.commons.lang.StringUtils; import org.apache.spark.SparkContext; import org.apache.spark.repl.SparkIMain; -import org.apache.zeppelin.dep.Booter; +import org.apache.zeppelin.dep.AbstractDependencyResolver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.sonatype.aether.RepositorySystem; -import org.sonatype.aether.RepositorySystemSession; import org.sonatype.aether.artifact.Artifact; import org.sonatype.aether.collection.CollectRequest; import org.sonatype.aether.graph.Dependency; @@ -57,16 +55,13 @@ /** * Deps resolver. - * Add new dependencies from mvn repo (at runetime) to Zeppelin. + * Add new dependencies from mvn repo (at runtime) to Spark interpreter group. */ -public class SparkDependencyResolver { +public class SparkDependencyResolver extends AbstractDependencyResolver { Logger logger = LoggerFactory.getLogger(SparkDependencyResolver.class); private Global global; private SparkIMain intp; private SparkContext sc; - private RepositorySystem system = Booter.newRepositorySystem(); - private List repos = new LinkedList(); - private RepositorySystemSession session; private final String[] exclusions = new String[] {"org.scala-lang:scala-library", "org.scala-lang:scala-compiler", @@ -78,38 +73,13 @@ public class SparkDependencyResolver { public SparkDependencyResolver(SparkIMain intp, SparkContext sc, String localRepoPath, String additionalRemoteRepository) { + super(localRepoPath); this.intp = intp; this.global = intp.global(); this.sc = sc; - session = Booter.newRepositorySystemSession(system, localRepoPath); - repos.add(Booter.newCentralRepository()); // add maven central - repos.add(Booter.newLocalRepository()); addRepoFromProperty(additionalRemoteRepository); } - public void addRepo(String id, String url, boolean snapshot) { - synchronized (repos) { - delRepo(id); - RemoteRepository rr = new RemoteRepository(id, "default", url); - rr.setPolicy(snapshot, null); - repos.add(rr); - } - } - - public RemoteRepository delRepo(String id) { - synchronized (repos) { - Iterator it = repos.iterator(); - if (it.hasNext()) { - RemoteRepository repo = it.next(); - if (repo.getId().equals(id)) { - it.remove(); - return repo; - } - } - } - return null; - } - private void addRepoFromProperty(String listOfRepo) { if (listOfRepo != null) { String[] repos = listOfRepo.split(";"); @@ -305,12 +275,12 @@ private List loadFromMvn(String artifact, Collection excludes, } /** - * * @param dependency * @param excludes list of pattern can either be of the form groupId:artifactId * @return * @throws Exception */ + @Override public List getArtifactsWithDep(String dependency, Collection excludes) throws Exception { Artifact artifact = new DefaultArtifact(inferScalaVersion(dependency)); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/AbstractDependencyResolver.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/AbstractDependencyResolver.java new file mode 100644 index 00000000000..ba8ee16d3a9 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/AbstractDependencyResolver.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.dep; + +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.sonatype.aether.RepositorySystem; +import org.sonatype.aether.RepositorySystemSession; +import org.sonatype.aether.repository.RemoteRepository; +import org.sonatype.aether.resolution.ArtifactResult; + +/** + * Abstract dependency resolver. + * Add new dependencies from mvn repo (at runtime) Zeppelin. + */ +public abstract class AbstractDependencyResolver { + protected RepositorySystem system = Booter.newRepositorySystem(); + protected List repos = new LinkedList(); + protected RepositorySystemSession session; + + public AbstractDependencyResolver(String localRepoPath) { + session = Booter.newRepositorySystemSession(system, localRepoPath); + repos.add(Booter.newCentralRepository()); // add maven central + repos.add(Booter.newLocalRepository()); + } + + public void addRepo(String id, String url, boolean snapshot) { + synchronized (repos) { + delRepo(id); + RemoteRepository rr = new RemoteRepository(id, "default", url); + rr.setPolicy(snapshot, null); + repos.add(rr); + } + } + + public RemoteRepository delRepo(String id) { + synchronized (repos) { + Iterator it = repos.iterator(); + if (it.hasNext()) { + RemoteRepository repo = it.next(); + if (repo.getId().equals(id)) { + it.remove(); + return repo; + } + } + } + return null; + } + + public abstract List getArtifactsWithDep(String dependency, + Collection excludes) throws Exception; +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java index f1552c9aca7..cbe88bc8fda 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java @@ -28,8 +28,6 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.sonatype.aether.RepositorySystem; -import org.sonatype.aether.RepositorySystemSession; import org.sonatype.aether.artifact.Artifact; import org.sonatype.aether.collection.CollectRequest; import org.sonatype.aether.graph.Dependency; @@ -45,49 +43,19 @@ /** * Deps resolver. - * Add new dependencies from mvn repo (at runetime) to Zeppelin. - * + * Add new dependencies from mvn repo (at runtime) to Zeppelin. */ -public class DependencyResolver { +public class DependencyResolver extends AbstractDependencyResolver { Logger logger = LoggerFactory.getLogger(DependencyResolver.class); - private RepositorySystem system = Booter.newRepositorySystem(); - private List repos = new LinkedList(); - private RepositorySystemSession session; private final String[] exclusions = new String[] {"org.apache.zeppelin:zeppelin-zengine", "org.apache.zeppelin:zeppelin-interpreter", "org.apache.zeppelin:zeppelin-server"}; public DependencyResolver(String localRepoPath) { - session = Booter.newRepositorySystemSession(system, localRepoPath); - repos.add(Booter.newCentralRepository()); // add maven central - repos.add(Booter.newLocalRepository()); - } - - public void addRepo(String id, String url, boolean snapshot) { - synchronized (repos) { - delRepo(id); - RemoteRepository rr = new RemoteRepository(id, "default", url); - rr.setPolicy(snapshot, null); - repos.add(rr); - } - } - - public RemoteRepository delRepo(String id) { - synchronized (repos) { - Iterator it = repos.iterator(); - if (it.hasNext()) { - RemoteRepository repo = it.next(); - if (repo.getId().equals(id)) { - it.remove(); - return repo; - } - } - } - return null; + super(localRepoPath); } - public List load(String artifact) throws Exception { return load(artifact, new LinkedList()); } @@ -96,7 +64,8 @@ public List load(String artifact, String destPath) throws Exception { return load(artifact, new LinkedList(), destPath); } - public List load(String artifact, Collection excludes) throws Exception { + public synchronized List load(String artifact, Collection excludes) + throws Exception { if (StringUtils.isBlank(artifact)) { // Should throw here throw new RuntimeException("Invalid artifact to load"); @@ -166,12 +135,12 @@ private List loadFromMvn(String artifact, Collection excludes) thr } /** - * * @param dependency * @param excludes list of pattern can either be of the form groupId:artifactId * @return * @throws Exception */ + @Override public List getArtifactsWithDep(String dependency, Collection excludes) throws Exception { Artifact artifact = new DefaultArtifact(dependency);