From ab7ee2d83775f7f18d9a8c0e4d20691f6bfe829e Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Sun, 26 Jun 2016 00:03:06 +0200 Subject: [PATCH 01/21] beam interpreter --- beam/pom.xml | 418 ++++++++++++++++++ .../apache/zeppelin/beam/BeamInterpreter.java | 88 ++++ .../zeppelin/beam/CompileSourceInMemory.java | 159 +++++++ 3 files changed, 665 insertions(+) create mode 100644 beam/pom.xml create mode 100644 beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java create mode 100644 beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java diff --git a/beam/pom.xml b/beam/pom.xml new file mode 100644 index 00000000000..d65368feab1 --- /dev/null +++ b/beam/pom.xml @@ -0,0 +1,418 @@ + + 4.0.0 + + + + + zeppelin + org.apache.zeppelin + 0.6.0-incubating-SNAPSHOT + .. + + + org.apache.zeppelin + zeppelin-beam + 0.6.0-incubating-SNAPSHOT + + + + + + repo.bodar.com + http://repo.bodar.com + + + + + + + + + + + + org.apache.spark + spark-core_2.10 + 1.4.1 + + + slf4j-log4j12 + org.slf4j + + + netty-all + io.netty + + + akka-actor_2.10 + org.spark-project.akka + + + akka-remote_2.10 + org.spark-project.akka + + + akka-slf4j_2.10 + org.spark-project.akka + + + + + org.apache.spark + spark-streaming_2.10 + 1.4.1 + + + + org.apache.hadoop + hadoop-mapreduce-client-core + 2.3.0 + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.hadoop + hadoop-common + 2.3.0 + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.hadoop + hadoop-hdfs + 2.3.0 + + + org.apache.hadoop + hadoop-client + 2.3.0 + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.hadoop + hadoop-annotations + 2.3.0 + + + org.apache.hadoop + hadoop-yarn-common + 2.3.0 + + + org.apache.hadoop + hadoop-mapreduce-client-common + 2.3.0 + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.hadoop + hadoop-core + 1.2.1 + + + + + com.thoughtworks.qdox + qdox + 2.0-M3 + + + + + org.mdkt.compiler + InMemoryJavaCompiler + 1.2 + + + + com.ning + async-http-client + 1.9.31 + + + + + com.google.code.gson + 
gson + 2.6.2 + + + jline + jline + 2.12 + + + + com.googlecode.totallylazy + totallylazy + 1.83 + + + + org.apache.beam + beam-runners-parent + 0.2.0-incubating-SNAPSHOT + pom + + + + org.apache.beam + beam-runners-core-java + 0.2.0-incubating-SNAPSHOT + + + google-http-client-jackson2 + com.google.http-client + + + + + org.apache.beam + beam-runners-direct-java + 0.2.0-incubating-SNAPSHOT + + + + org.apache.beam + beam-runners-flink_2.10 + 0.2.0-incubating-SNAPSHOT + + + + slf4j-log4j12 + org.slf4j + + + netty-all + io.netty + + + + + + org.apache.beam + beam-runners-flink_2.10-examples + 0.2.0-incubating-SNAPSHOT + + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.beam + beam-runners-spark + 0.2.0-incubating-SNAPSHOT + jar + + + + + + ${project.groupId} + zeppelin-interpreter + ${project.version} + provided + + + + org.apache.commons + commons-exec + 1.3 + + + + junit + junit + test + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.1 + + + package + + shade + + + + + org.apache.beam.runners.flink.examples.WordCount + + + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/beam + + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/beam + + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + \ No newline at end of file diff --git a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java new file mode 100644 index 00000000000..0bef52f18a4 --- /dev/null +++ b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java @@ -0,0 +1,88 @@ +package org.apache.zeppelin.beam; + + 
+import java.io.File; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +import org.apache.beam.examples.MinimalWordCount; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; + +import com.google.gson.Gson; + +/** +* +*/ +public class BeamInterpreter extends Interpreter { + + private String host = "http://localhost:8001"; + private InterpreterContext context; + + public BeamInterpreter(Properties property) { + super(property); + } + + static { + Interpreter.register("beam", "beam", BeamInterpreter.class.getName(), + new InterpreterPropertyBuilder().build()); + } + + public static void main(String[] args) { + + } + + @Override + public void open() { + + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + + String uuid = "C" + UUID.randomUUID().toString().replace("-", ""); + + try { + String msg = CompileSourceInMemory.execute(uuid, st); + return new InterpreterResult(InterpreterResult.Code.SUCCESS, msg); + } catch (Exception e) { + e.printStackTrace(); + return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); + + } + + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List completion(String buf, int cursor) { + return null; + } + +} diff --git a/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java b/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java new file mode 100644 index 00000000000..6be493180a9 
--- /dev/null +++ b/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java @@ -0,0 +1,159 @@ +package org.apache.zeppelin.beam; + +import javax.tools.Diagnostic; +import javax.tools.DiagnosticCollector; +import javax.tools.JavaCompiler; +import javax.tools.JavaCompiler.CompilationTask; +import javax.tools.JavaFileObject; +import javax.tools.SimpleJavaFileObject; +import javax.tools.ToolProvider; + +import org.apache.log4j.Category; +import org.apache.log4j.Priority; + +import com.thoughtworks.qdox.JavaProjectBuilder; +import com.thoughtworks.qdox.model.JavaClass; +import com.thoughtworks.qdox.model.JavaSource; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.io.StringReader; +import java.io.StringWriter; +import java.lang.reflect.InvocationTargetException; +import java.net.URI; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.List; + +/** + * @author admin + * + */ +public class CompileSourceInMemory { + public static String execute(String className, String code) throws Exception { + + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + DiagnosticCollector diagnostics = new DiagnosticCollector(); + + JavaProjectBuilder builder = new JavaProjectBuilder(); + JavaSource src = builder.addSource(new StringReader(code)); + +// List imports = src.getImports(); +// String importsString = ""; +// +// for (int i = 0; i < imports.size(); i++) { +// importsString += "import " + imports.get(i) + ";\n"; +// } + + List classes = src.getClasses(); + String classesSt = ""; + String classMain = "", classMainName = ""; + for (int i = 0; i < classes.size(); i++) { + boolean hasMain = false; + for (int j = 0; j < classes.get(i).getMethods().size(); j++) { + if (classes.get(i).getMethods().get(j).getName().equals("main")) { + hasMain = true; + break; + } + } + if (hasMain == true) { + classMain = 
classes.get(i).getCodeBlock() + "\n"; + classMainName = classes.get(i).getName(); + } else + classesSt += classes.get(i).getCodeBlock() + "\n"; + + } + code = code.replace(classMainName, className); + + StringWriter writer = new StringWriter(); + PrintWriter out = new PrintWriter(writer); + + out.println(code); + out.close(); + + System.out.println(writer.toString()); + + JavaFileObject file = new JavaSourceFromString(className, writer.toString()); + + Iterable compilationUnits = Arrays.asList(file); + + ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); + ByteArrayOutputStream baosErr = new ByteArrayOutputStream(); + + PrintStream newOut = new PrintStream(baosOut); + PrintStream newErr = new PrintStream(baosErr); + // IMPORTANT: Save the old System.out! + PrintStream oldOut = System.out; + PrintStream oldErr = System.err; + // Tell Java to use your special stream + System.setOut(newOut); + System.setErr(newErr); + + + CompilationTask task = compiler.getTask(null, null, diagnostics, null, null, compilationUnits); + + boolean success = task.call(); + if (!success) { + for (Diagnostic diagnostic : diagnostics.getDiagnostics()) { + System.out.println(diagnostic.getMessage(null)); + } + } + if (success) { + try { + URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File("").toURI() + .toURL() }); + Class.forName(className, true, classLoader) + .getDeclaredMethod("main", new Class[] { String[].class }) + .invoke(null, new Object[] { null }); + + System.out.flush(); + System.err.flush(); + + System.setOut(oldOut); + System.setErr(oldErr); + + + classLoader.clearAssertionStatus(); + + return baosOut.toString(); + } catch (ClassNotFoundException e) { + e.printStackTrace(newErr); + System.err.println("Class not found: " + e); + throw new Exception(baosErr.toString()); + } catch (NoSuchMethodException e) { + e.printStackTrace(newErr); + System.err.println("No such method: " + e); + throw new Exception(baosErr.toString()); + } catch 
(IllegalAccessException e) { + e.printStackTrace(newErr); + System.err.println("Illegal access: " + e); + throw new Exception(baosErr.toString()); + } catch (InvocationTargetException e) { + e.printStackTrace(newErr); + System.err.println("Invocation target: " + e); + throw new Exception(baosErr.toString()); + } + } else { + throw new Exception(baosOut.toString()); + } + } + +} + +class JavaSourceFromString extends SimpleJavaFileObject { + final String code; + + JavaSourceFromString(String name, String code) { + super(URI.create("string:///" + name.replace('.', '/') + Kind.SOURCE.extension), Kind.SOURCE); + this.code = code; + } + + @Override + public CharSequence getCharContent(boolean ignoreEncodingErrors) { + return code; + } +} From 3a2bd852980679a32a873b99c039600e0c373eff Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Wed, 20 Jul 2016 16:26:49 +0200 Subject: [PATCH 02/21] Adding the beam to zeppelin 7 --- beam/pom.xml | 41 +++---------------- .../apache/zeppelin/beam/BeamInterpreter.java | 8 +--- .../zeppelin/beam/CompileSourceInMemory.java | 27 +++++------- pom.xml | 1 + 4 files changed, 19 insertions(+), 58 deletions(-) diff --git a/beam/pom.xml b/beam/pom.xml index d65368feab1..48122a689ef 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -7,23 +7,16 @@ zeppelin org.apache.zeppelin - 0.6.0-incubating-SNAPSHOT + 0.7.0-SNAPSHOT .. 
org.apache.zeppelin zeppelin-beam - 0.6.0-incubating-SNAPSHOT + 0.7.0-SNAPSHOT - - - repo.bodar.com - http://repo.bodar.com - - - - + + org.apache.commons commons-exec @@ -327,26 +316,6 @@ - - org.apache.maven.plugins - maven-shade-plugin - 2.4.1 - - - package - - shade - - - - - org.apache.beam.runners.flink.examples.WordCount - - - - - - diff --git a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java index 0bef52f18a4..d8ea9da9938 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java @@ -10,11 +10,11 @@ import java.util.Properties; import java.util.UUID; -import org.apache.beam.examples.MinimalWordCount; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import com.google.gson.Gson; @@ -30,10 +30,6 @@ public BeamInterpreter(Properties property) { super(property); } - static { - Interpreter.register("beam", "beam", BeamInterpreter.class.getName(), - new InterpreterPropertyBuilder().build()); - } public static void main(String[] args) { @@ -81,7 +77,7 @@ public int getProgress(InterpreterContext context) { } @Override - public List completion(String buf, int cursor) { + public List completion(String buf, int cursor) { return null; } diff --git a/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java b/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java index 6be493180a9..f96374b2630 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java @@ -8,8 +8,6 @@ import javax.tools.SimpleJavaFileObject; import 
javax.tools.ToolProvider; -import org.apache.log4j.Category; -import org.apache.log4j.Priority; import com.thoughtworks.qdox.JavaProjectBuilder; import com.thoughtworks.qdox.model.JavaClass; @@ -17,7 +15,6 @@ import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.IOException; import java.io.PrintStream; import java.io.PrintWriter; import java.io.StringReader; @@ -35,20 +32,20 @@ */ public class CompileSourceInMemory { public static String execute(String className, String code) throws Exception { - + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); DiagnosticCollector diagnostics = new DiagnosticCollector(); JavaProjectBuilder builder = new JavaProjectBuilder(); JavaSource src = builder.addSource(new StringReader(code)); -// List imports = src.getImports(); -// String importsString = ""; -// -// for (int i = 0; i < imports.size(); i++) { -// importsString += "import " + imports.get(i) + ";\n"; -// } - + // List imports = src.getImports(); + // String importsString = ""; + // + // for (int i = 0; i < imports.size(); i++) { + // importsString += "import " + imports.get(i) + ";\n"; + // } + List classes = src.getClasses(); String classesSt = ""; String classMain = "", classMainName = ""; @@ -80,10 +77,10 @@ public static String execute(String className, String code) throws Exception { JavaFileObject file = new JavaSourceFromString(className, writer.toString()); Iterable compilationUnits = Arrays.asList(file); - + ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); ByteArrayOutputStream baosErr = new ByteArrayOutputStream(); - + PrintStream newOut = new PrintStream(baosOut); PrintStream newErr = new PrintStream(baosErr); // IMPORTANT: Save the old System.out! 
@@ -93,7 +90,6 @@ public static String execute(String className, String code) throws Exception { System.setOut(newOut); System.setErr(newErr); - CompilationTask task = compiler.getTask(null, null, diagnostics, null, null, compilationUnits); boolean success = task.call(); @@ -112,11 +108,10 @@ public static String execute(String className, String code) throws Exception { System.out.flush(); System.err.flush(); - + System.setOut(oldOut); System.setErr(oldErr); - classLoader.clearAssertionStatus(); return baosOut.toString(); diff --git a/pom.xml b/pom.xml index d0f43885514..38c5e86b2f1 100644 --- a/pom.xml +++ b/pom.xml @@ -61,6 +61,7 @@ angular shell livy + beam hbase postgresql jdbc From 26fc59bc53eff9a774bb0a2edcd07bc98a45486c Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Mon, 1 Aug 2016 10:53:12 +0200 Subject: [PATCH 03/21] Refactoring of the code --- beam/pom.xml | 36 ++++++++-------- .../apache/zeppelin/beam/BeamInterpreter.java | 21 ++++------ .../zeppelin/beam/CompileSourceInMemory.java | 41 +++++++------------ conf/interpreter-list | 1 + conf/zeppelin-site.xml.template | 4 +- .../zeppelin/conf/ZeppelinConfiguration.java | 5 ++- 6 files changed, 50 insertions(+), 58 deletions(-) diff --git a/beam/pom.xml b/beam/pom.xml index 48122a689ef..5e206419a6d 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -4,6 +4,7 @@ + zeppelin org.apache.zeppelin @@ -15,22 +16,26 @@ zeppelin-beam 0.7.0-SNAPSHOT - - + + + apache-beam + https://repository.apache.org/content/repositories/snapshots/org/apache/beam/ + + + 0.1.0-incubating --> - org.apache.commons commons-exec diff --git a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java index d8ea9da9938..cd5c07bdb2b 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java @@ -1,6 +1,5 @@ package org.apache.zeppelin.beam; - import java.io.File; import 
java.io.PrintWriter; import java.io.StringWriter; @@ -23,18 +22,10 @@ */ public class BeamInterpreter extends Interpreter { - private String host = "http://localhost:8001"; - private InterpreterContext context; - public BeamInterpreter(Properties property) { super(property); } - - public static void main(String[] args) { - - } - @Override public void open() { @@ -42,16 +33,22 @@ public void open() { @Override public void close() { - + File dir = new File("."); + for (int i = 0; i < dir.list().length; i++) { + File f = dir.listFiles()[i]; + System.out.println(f.getAbsolutePath()); + if (f.getAbsolutePath().contains(".class")) + f.delete(); + } } @Override public InterpreterResult interpret(String st, InterpreterContext context) { - String uuid = "C" + UUID.randomUUID().toString().replace("-", ""); + String className = "C" + UUID.randomUUID().toString().replace("-", ""); try { - String msg = CompileSourceInMemory.execute(uuid, st); + String msg = CompileSourceInMemory.execute(className, st); return new InterpreterResult(InterpreterResult.Code.SUCCESS, msg); } catch (Exception e) { e.printStackTrace(); diff --git a/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java b/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java index f96374b2630..ccf77c2fab6 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java @@ -14,20 +14,17 @@ import com.thoughtworks.qdox.model.JavaSource; import java.io.ByteArrayOutputStream; -import java.io.File; import java.io.PrintStream; import java.io.PrintWriter; import java.io.StringReader; import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.net.URI; -import java.net.URL; -import java.net.URLClassLoader; import java.util.Arrays; import java.util.List; /** - * @author admin + * @author Mahmoud * */ public class CompileSourceInMemory { @@ -39,31 +36,25 @@ public 
static String execute(String className, String code) throws Exception { JavaProjectBuilder builder = new JavaProjectBuilder(); JavaSource src = builder.addSource(new StringReader(code)); - // List imports = src.getImports(); - // String importsString = ""; - // - // for (int i = 0; i < imports.size(); i++) { - // importsString += "import " + imports.get(i) + ";\n"; - // } - List classes = src.getClasses(); - String classesSt = ""; - String classMain = "", classMainName = ""; + String classMainName = null; for (int i = 0; i < classes.size(); i++) { boolean hasMain = false; for (int j = 0; j < classes.get(i).getMethods().size(); j++) { if (classes.get(i).getMethods().get(j).getName().equals("main")) { + classMainName = classes.get(i).getName(); hasMain = true; break; } } - if (hasMain == true) { - classMain = classes.get(i).getCodeBlock() + "\n"; - classMainName = classes.get(i).getName(); - } else - classesSt += classes.get(i).getCodeBlock() + "\n"; + if (hasMain == true) + break; } + + if (classMainName == null) + throw new Exception("There isn't any class containing Main method."); + code = code.replace(classMainName, className); StringWriter writer = new StringWriter(); @@ -71,11 +62,11 @@ public static String execute(String className, String code) throws Exception { out.println(code); out.close(); + - System.out.println(writer.toString()); JavaFileObject file = new JavaSourceFromString(className, writer.toString()); - + Iterable compilationUnits = Arrays.asList(file); ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); @@ -100,11 +91,9 @@ public static String execute(String className, String code) throws Exception { } if (success) { try { - URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File("").toURI() - .toURL() }); - Class.forName(className, true, classLoader) - .getDeclaredMethod("main", new Class[] { String[].class }) - .invoke(null, new Object[] { null }); + + Class.forName(className).getDeclaredMethod("main", new Class[] 
{ String[].class }) + .invoke(null, new Object[] { null }); System.out.flush(); System.err.flush(); @@ -112,8 +101,8 @@ public static String execute(String className, String code) throws Exception { System.setOut(oldOut); System.setErr(oldErr); - classLoader.clearAssertionStatus(); + return baosOut.toString(); } catch (ClassNotFoundException e) { e.printStackTrace(newErr); diff --git a/conf/interpreter-list b/conf/interpreter-list index 17a6f1e4a36..77c4c4001a4 100644 --- a/conf/interpreter-list +++ b/conf/interpreter-list @@ -24,6 +24,7 @@ cassandra org.apache.zeppelin:zeppelin-cassandra_2.11:0.6.1 Cassandr elasticsearch org.apache.zeppelin:zeppelin-elasticsearch:0.6.1 Elasticsearch interpreter file org.apache.zeppelin:zeppelin-file:0.6.1 HDFS file interpreter flink org.apache.zeppelin:zeppelin-flink_2.11:0.6.1 Flink interpreter built with Scala 2.11 +beam org.apache.zeppelin:zeppelin-beam:0.6.1 Beam interpreter hbase org.apache.zeppelin:zeppelin-hbase:0.6.1 Hbase interpreter ignite org.apache.zeppelin:zeppelin-ignite_2.11:0.6.1 Ignite interpreter built with Scala 2.11 jdbc org.apache.zeppelin:zeppelin-jdbc:0.6.1 Jdbc interpreter diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 77e0b1f3bcd..9386fd78eba 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -184,13 +184,13 @@ zeppelin.interpreters - 
org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter + 
org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter Comma separated interpreter configurations. 
First interpreter become a default zeppelin.interpreter.group.order - spark,md,angular,sh,livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,scalding,jdbc,hbase,bigquery + spark,md,angular,sh,livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,scalding,jdbc,hbase,bigquery,beam diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 75efe3956f8..eb8f27b78d6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -529,7 +529,8 @@ public static enum ConfVars { + "org.apache.zeppelin.scalding.ScaldingInterpreter," + "org.apache.zeppelin.jdbc.JDBCInterpreter," + "org.apache.zeppelin.hbase.HbaseInterpreter," - + "org.apache.zeppelin.bigquery.BigQueryInterpreter"), + + "org.apache.zeppelin.bigquery.BigQueryInterpreter", + + "org.apache.zeppelin.beam.BeamInterpreter"), ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"), @@ -537,7 +538,7 @@ public static enum ConfVars { ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10), ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh," + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch," - + "scalding,jdbc,hbase,bigquery"), + + "scalding,jdbc,hbase,bigquery,beam"), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), // use specified notebook (id) as homescreen From 5695077d13b68df113072ed67097e6ffe72e1d69 Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Mon, 1 Aug 2016 12:23:48 
+0200 Subject: [PATCH 04/21] Modifying pom file and Making documentation --- beam/pom.xml | 235 ++++++------------ .../main/resources/interpreter-setting.json | 11 + docs/interpreter/beam.md | 108 ++++++++ 3 files changed, 198 insertions(+), 156 deletions(-) create mode 100644 beam/src/main/resources/interpreter-setting.json create mode 100644 docs/interpreter/beam.md diff --git a/beam/pom.xml b/beam/pom.xml index 5e206419a6d..88d2e57142e 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -16,47 +16,13 @@ zeppelin-beam 0.7.0-SNAPSHOT - - - apache-beam - https://repository.apache.org/content/repositories/snapshots/org/apache/beam/ - - + - - - - - + + + org.apache.spark spark-core_2.10 1.4.1 @@ -89,94 +55,82 @@ 1.4.1 - - org.apache.hadoop - hadoop-mapreduce-client-core - 2.3.0 - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.hadoop - hadoop-common - 2.3.0 - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.hadoop - hadoop-hdfs - 2.3.0 - - - org.apache.hadoop - hadoop-client - 2.3.0 - - - slf4j-log4j12 - org.slf4j - - - - - org.apache.hadoop - hadoop-annotations - 2.3.0 - - - org.apache.hadoop - hadoop-yarn-common - 2.3.0 - - - org.apache.hadoop - hadoop-mapreduce-client-common - 2.3.0 - - - slf4j-log4j12 - org.slf4j - - - - - org.apache.hadoop - hadoop-core - 1.2.1 - - - - com.thoughtworks.qdox - qdox - 2.0-M3 + org.apache.hadoop + hadoop-mapreduce-client-core + 2.3.0 + + + slf4j-log4j12 + org.slf4j + + - - org.mdkt.compiler - InMemoryJavaCompiler - 1.2 + org.apache.hadoop + hadoop-common + 2.3.0 + + + slf4j-log4j12 + org.slf4j + + - com.ning - async-http-client - 1.9.31 + org.apache.hadoop + hadoop-hdfs + 2.3.0 + + + org.apache.hadoop + hadoop-client + 2.3.0 + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.hadoop + hadoop-annotations + 2.3.0 + + + org.apache.hadoop + hadoop-yarn-common + 2.3.0 + + + org.apache.hadoop + hadoop-mapreduce-client-common + 2.3.0 + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.hadoop + hadoop-core + 1.2.1 + + + + 
com.thoughtworks.qdox + qdox + 2.0-M3 + + com.google.code.gson gson @@ -195,20 +149,7 @@ 0.1.0-incubating pom - org.apache.beam beam-runners-core-java @@ -240,7 +181,7 @@ netty-all io.netty - + @@ -255,7 +196,7 @@ - - + + org.apache.beam beam-runners-spark 0.1.0-incubating jar - - - - + + + + ${project.groupId} zeppelin-interpreter @@ -306,9 +231,7 @@ commons-exec 1.3 - + junit junit diff --git a/beam/src/main/resources/interpreter-setting.json b/beam/src/main/resources/interpreter-setting.json new file mode 100644 index 00000000000..7cf57d24835 --- /dev/null +++ b/beam/src/main/resources/interpreter-setting.json @@ -0,0 +1,11 @@ +[ + { + "group": "beam", + "name": "beam", + "className": "org.apache.zeppelin.beam.BeamInterpreter", + "defaultInterpreter": true, + "properties": { + + } + } +] diff --git a/docs/interpreter/beam.md b/docs/interpreter/beam.md new file mode 100644 index 00000000000..374aa06ab0d --- /dev/null +++ b/docs/interpreter/beam.md @@ -0,0 +1,108 @@ +--- +layout: page +title: "Beam Interpreter" +description: "" +group: interpreter +--- +{% include JB/setup %} + +# Beam interpreter for Apache Zeppelin + +
+ +## Overview +[Apache Beam](http://beam.incubator.apache.org) is an open source, unified programming model that you can use to create a data processing pipeline. You start by building a program that defines the pipeline using one of the open source Beam SDKs. The pipeline is then executed by one of Beam’s supported distributed processing back-ends, which include Apache Flink, Apache Spark, and Google Cloud Dataflow. + +Beam is particularly useful for Embarrassingly Parallel data processing tasks, in which the problem can be decomposed into many smaller bundles of data that can be processed independently and in parallel. You can also use Beam for Extract, Transform, and Load (ETL) tasks and pure data integration. These tasks are useful for moving data between different storage media and data sources, transforming data into a more desirable format, or loading data onto a new system. + +# Apache Beam Pipeline Runners +The Beam Pipeline Runners translate the data processing pipeline you define with your Beam program into the API compatible with the distributed processing back-end of your choice. When you run your Beam program, you’ll need to specify the appropriate runner for the back-end where you want to execute your pipeline. + +* Beam currently supports Runners that work with the following distributed processing back-ends: +- Google Cloud Dataflow +- Apache Flink +- Apache Spark + +## How to use +Basically, You can write normal java code and determine the runner inside the code. +You should write the main method inside the main class beacuase the interpreter invoke this main to execute pipline. +Each paragraph is considered as separate job, there isn't any relate to any another job, Beacuse the interpreter is a static repl of java, we compile and run each paragraph apart. 
+ +**Example for Flink Runner** + +``` +%beam + +import java.util.ArrayList; +import java.util.List; +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.SparkConf; +import org.apache.spark.streaming.*; +import org.apache.spark.SparkContext; +import org.apache.beam.runners.direct.*; +import org.apache.beam.sdk.runners.*; +import org.apache.beam.sdk.options.*; +import org.apache.beam.runners.spark.*; +import org.apache.beam.runners.spark.io.ConsoleIO; +import org.apache.beam.runners.flink.*; +import org.apache.beam.runners.flink.examples.WordCount.Options; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; + + +public class MinimalWordCount { + static List s = new ArrayList<>(); + public static void main(String[] args) { + + Options options = PipelineOptionsFactory.create().as(Options.class); + + options.setRunner(FlinkPipelineRunner.class); + + + Pipeline p = Pipeline.create(options); + + p.apply(TextIO.Read.from("/home/admin/mahmoud/work/bigdata/beam/shakespeare/input/file1.txt")) + + .apply(ParDo.named("ExtractWords").of(new DoFn() { + @Override + public void processElement(ProcessContext c) { + + for (String word : c.element().split("[^a-zA-Z']+")) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + })) + + .apply(Count. 
perElement()) + + .apply("FormatResults", ParDo.of(new DoFn, String>() { + + @Override + public void processElement(DoFn, String>.ProcessContext arg0) + throws Exception { + s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue()); + + } + })); + + p.run(); + + System.out.println("%table word\tcount"); + for (int i = 0; i < s.size(); i++) { + System.out.print(s.get(i)); + } + + } +} + +``` From 3c5038f779899327965a22c2f90b837202c38c3f Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Thu, 4 Aug 2016 00:55:32 +0200 Subject: [PATCH 05/21] Modifying the documentation --- docs/interpreter/beam.md | 56 ++++------------------------------------ 1 file changed, 5 insertions(+), 51 deletions(-) diff --git a/docs/interpreter/beam.md b/docs/interpreter/beam.md index 374aa06ab0d..87490e094eb 100644 --- a/docs/interpreter/beam.md +++ b/docs/interpreter/beam.md @@ -11,70 +11,29 @@ group: interpreter
 
 ## Overview
-[Apache Beam](http://beam.incubator.apache.org) is an open source, unified programming model that you can use to create a data processing pipeline. You start by building a program that defines the pipeline using one of the open source Beam SDKs. The pipeline is then executed by one of Beam’s supported distributed processing back-ends, which include Apache Flink, Apache Spark, and Google Cloud Dataflow.
-
-Beam is particularly useful for Embarrassingly Parallel data processing tasks, in which the problem can be decomposed into many smaller bundles of data that can be processed independently and in parallel. You can also use Beam for Extract, Transform, and Load (ETL) tasks and pure data integration. These tasks are useful for moving data between different storage media and data sources, transforming data into a more desirable format, or loading data onto a new system.
-
-# Apache Beam Pipeline Runners
-The Beam Pipeline Runners translate the data processing pipeline you define with your Beam program into the API compatible with the distributed processing back-end of your choice. When you run your Beam program, you’ll need to specify the appropriate runner for the back-end where you want to execute your pipeline.
-
-* Beam currently supports Runners that work with the following distributed processing back-ends:
-- Google Cloud Dataflow
-- Apache Flink
-- Apache Spark
+[Apache Beam](http://beam.incubator.apache.org) is an open source unified platform for data processing pipelines. A pipeline can be built using one of the Beam SDKs.
+The execution of the pipeline is done by different Runners. Currently, Beam supports the Apache Flink Runner, the Apache Spark Runner, and the Google Dataflow Runner.
 
 ## How to use
-Basically, You can write normal java code and determine the runner inside the code.
-You should write the main method inside the main class beacuase the interpreter invoke this main to execute pipline.
-Each paragraph is considered as separate job, there isn't any relate to any another job, Beacuse the interpreter is a static repl of java, we compile and run each paragraph apart. +Basically, you can write normal Beam java code where you can determine the Runner. You should write the main method inside a class becuase the interpreter invoke this main to execute the pipline. Unlike Zeppelin normal pattern, each paragraph is considered a separate job, there isn't any relation to any other paragraph. -**Example for Flink Runner** +The following is a demonstration of a word count example ``` %beam -import java.util.ArrayList; -import java.util.List; -import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.*; -import org.apache.spark.SparkContext; -import org.apache.beam.runners.direct.*; -import org.apache.beam.sdk.runners.*; -import org.apache.beam.sdk.options.*; -import org.apache.beam.runners.spark.*; -import org.apache.beam.runners.spark.io.ConsoleIO; -import org.apache.beam.runners.flink.*; -import org.apache.beam.runners.flink.examples.WordCount.Options; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.values.KV; - +// imports are omitted to save space public class MinimalWordCount { static List s = new ArrayList<>(); public static void main(String[] args) { - Options options = PipelineOptionsFactory.create().as(Options.class); - options.setRunner(FlinkPipelineRunner.class); - - Pipeline p = Pipeline.create(options); - 
p.apply(TextIO.Read.from("/home/admin/mahmoud/work/bigdata/beam/shakespeare/input/file1.txt")) - .apply(ParDo.named("ExtractWords").of(new DoFn() { @Override public void processElement(ProcessContext c) { - for (String word : c.element().split("[^a-zA-Z']+")) { if (!word.isEmpty()) { c.output(word); @@ -82,11 +41,8 @@ public class MinimalWordCount { } } })) - .apply(Count. perElement()) - .apply("FormatResults", ParDo.of(new DoFn, String>() { - @Override public void processElement(DoFn, String>.ProcessContext arg0) throws Exception { @@ -94,9 +50,7 @@ public class MinimalWordCount { } })); - p.run(); - System.out.println("%table word\tcount"); for (int i = 0; i < s.size(); i++) { System.out.print(s.get(i)); From 7cf25fb72762495a21eec231dff58ac52fbb1e66 Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Mon, 8 Aug 2016 10:37:52 +0200 Subject: [PATCH 06/21] Adding some tests --- beam/pom.xml | 6 +- .../apache/zeppelin/beam/BeamInterpreter.java | 4 +- .../zeppelin/beam/CompileSourceInMemory.java | 27 ++++++- .../zeppelin/beam/BeamInterpreterTest.java | 80 +++++++++++++++++++ 4 files changed, 110 insertions(+), 7 deletions(-) create mode 100644 beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java diff --git a/beam/pom.xml b/beam/pom.xml index 88d2e57142e..b7823a67a54 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -21,7 +21,11 @@ - + + com.javax0 + jscc + 1.0.1 + org.apache.spark spark-core_2.10 diff --git a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java index cd5c07bdb2b..aa4e2f5813e 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java @@ -36,7 +36,7 @@ public void close() { File dir = new File("."); for (int i = 0; i < dir.list().length; i++) { File f = dir.listFiles()[i]; - System.out.println(f.getAbsolutePath()); +// System.out.println(f.getAbsolutePath()); if 
(f.getAbsolutePath().contains(".class")) f.delete(); } @@ -51,7 +51,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { String msg = CompileSourceInMemory.execute(className, st); return new InterpreterResult(InterpreterResult.Code.SUCCESS, msg); } catch (Exception e) { - e.printStackTrace(); +// e.printStackTrace(); return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); } diff --git a/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java b/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java index ccf77c2fab6..d5f5b065490 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java @@ -8,18 +8,20 @@ import javax.tools.SimpleJavaFileObject; import javax.tools.ToolProvider; - import com.thoughtworks.qdox.JavaProjectBuilder; import com.thoughtworks.qdox.model.JavaClass; import com.thoughtworks.qdox.model.JavaSource; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.PrintStream; import java.io.PrintWriter; import java.io.StringReader; import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.net.URI; +import java.net.URL; +import java.net.URLClassLoader; import java.util.Arrays; import java.util.List; @@ -86,14 +88,20 @@ public static String execute(String className, String code) throws Exception { boolean success = task.call(); if (!success) { for (Diagnostic diagnostic : diagnostics.getDiagnostics()) { - System.out.println(diagnostic.getMessage(null)); + if (diagnostic.getLineNumber() == -1) continue; + System.out.println("line "+ diagnostic.getLineNumber()+ " : " +diagnostic.getMessage(null)); } } if (success) { try { - Class.forName(className).getDeclaredMethod("main", new Class[] { String[].class }) - .invoke(null, new Object[] { null }); + URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new 
File("").toURI().toURL() }); + //URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File("").toURI().toURL() }); + Class.forName(className, true, classLoader).getDeclaredMethod("main", new Class[] { String[].class }).invoke(null, new Object[] { null }); + + +// Class.forName(className).getDeclaredMethod("main", new Class[] { String[].class }) +// .invoke(null, new Object[] { null }); System.out.flush(); System.err.flush(); @@ -120,8 +128,19 @@ public static String execute(String className, String code) throws Exception { e.printStackTrace(newErr); System.err.println("Invocation target: " + e); throw new Exception(baosErr.toString()); + } finally{ + System.out.flush(); + System.err.flush(); + + System.setOut(oldOut); + System.setErr(oldErr); } } else { + System.out.flush(); + System.err.flush(); + + System.setOut(oldOut); + System.setErr(oldErr); throw new Exception(baosOut.toString()); } } diff --git a/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java b/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java new file mode 100644 index 00000000000..f0a5047a34d --- /dev/null +++ b/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java @@ -0,0 +1,80 @@ +package org.apache.zeppelin.beam; + +import static org.junit.Assert.assertEquals; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Properties; + +import org.apache.zeppelin.beam.BeamInterpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class BeamInterpreterTest { + + private static BeamInterpreter beam; + private static InterpreterContext context; + + @BeforeClass + public static void setUp() { + Properties p = new Properties(); + beam = new BeamInterpreter(p); + beam.open(); + context = new InterpreterContext(null, null, null, null, null, null, + null, 
null, null, null, null); + } + + @AfterClass + public static void tearDown() { + beam.close(); + } + + @Test + public void testStaticRepl() { + + StringWriter writer = new StringWriter(); + PrintWriter out = new PrintWriter(writer); + out.println("public class HelloWorld {"); + out.println(" public static void main(String args[]) {"); + out.println(" System.out.println(\"This is in another java file\");"); + out.println(" }"); + out.println("}"); + out.close(); + + InterpreterResult res = beam.interpret(writer.toString(), context); + + assertEquals(InterpreterResult.Code.SUCCESS, res.code()); + } + + @Test + public void testStaticReplWithoutMain() { + + StringBuffer sourceCode = new StringBuffer(); + sourceCode.append("package org.mdkt;\n"); + sourceCode.append("public class HelloClass {\n"); + sourceCode.append(" public String hello() { return \"hello\"; }"); + sourceCode.append("}"); + InterpreterResult res = beam.interpret(sourceCode.toString(), context); + assertEquals(InterpreterResult.Code.ERROR, res.code()); + } + + @Test + public void testStaticReplWithSyntaxError() { + + StringWriter writer = new StringWriter(); + PrintWriter out = new PrintWriter(writer); + out.println("public class HelloWorld {"); + out.println(" public static void main(String args[]) {"); + out.println(" System.out.prin(\"This is in another java file\");"); + out.println(" }"); + out.println("}"); + out.close(); + InterpreterResult res = beam.interpret(writer.toString(), context); + + assertEquals(InterpreterResult.Code.ERROR, res.code()); + } + +} From 2aa6d6593c33309c4f3d66219811a87f5735b6dd Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Mon, 8 Aug 2016 15:10:27 +0200 Subject: [PATCH 07/21] changing class name to StaticRepl and adding some modifications --- beam/pom.xml | 5 - .../apache/zeppelin/beam/BeamInterpreter.java | 7 +- ...ileSourceInMemory.java => StaticRepl.java} | 45 +++--- .../zeppelin/beam/BeamInterpreterTest.java | 129 +++++++++--------- zeppelin-web/src/index.html 
| 2 +- zeppelin-web/test/karma.conf.js | 2 +- 6 files changed, 92 insertions(+), 98 deletions(-) rename beam/src/main/java/org/apache/zeppelin/beam/{CompileSourceInMemory.java => StaticRepl.java} (82%) diff --git a/beam/pom.xml b/beam/pom.xml index b7823a67a54..e5abe5ed19e 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -21,11 +21,6 @@ - - com.javax0 - jscc - 1.0.1 - org.apache.spark spark-core_2.10 diff --git a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java index aa4e2f5813e..ca37edff2c4 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java @@ -36,22 +36,21 @@ public void close() { File dir = new File("."); for (int i = 0; i < dir.list().length; i++) { File f = dir.listFiles()[i]; -// System.out.println(f.getAbsolutePath()); if (f.getAbsolutePath().contains(".class")) f.delete(); } } @Override - public InterpreterResult interpret(String st, InterpreterContext context) { + public InterpreterResult interpret(String code, InterpreterContext context) { String className = "C" + UUID.randomUUID().toString().replace("-", ""); try { - String msg = CompileSourceInMemory.execute(className, st); + String msg = StaticRepl.execute(className, code); return new InterpreterResult(InterpreterResult.Code.SUCCESS, msg); } catch (Exception e) { -// e.printStackTrace(); + // e.printStackTrace(); return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); } diff --git a/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java similarity index 82% rename from beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java rename to beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java index d5f5b065490..6fbeadf6927 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java +++ 
b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java @@ -27,9 +27,9 @@ /** * @author Mahmoud - * + * */ -public class CompileSourceInMemory { +public class StaticRepl { public static String execute(String className, String code) throws Exception { JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); @@ -64,11 +64,9 @@ public static String execute(String className, String code) throws Exception { out.println(code); out.close(); - - JavaFileObject file = new JavaSourceFromString(className, writer.toString()); - + Iterable compilationUnits = Arrays.asList(file); ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); @@ -88,20 +86,21 @@ public static String execute(String className, String code) throws Exception { boolean success = task.call(); if (!success) { for (Diagnostic diagnostic : diagnostics.getDiagnostics()) { - if (diagnostic.getLineNumber() == -1) continue; - System.out.println("line "+ diagnostic.getLineNumber()+ " : " +diagnostic.getMessage(null)); + if (diagnostic.getLineNumber() == -1) + continue; + System.out.println("line " + diagnostic.getLineNumber() + " : " + + diagnostic.getMessage(null)); } } if (success) { try { - URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File("").toURI().toURL() }); - //URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File("").toURI().toURL() }); - Class.forName(className, true, classLoader).getDeclaredMethod("main", new Class[] { String[].class }).invoke(null, new Object[] { null }); - + URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File("").toURI() + .toURL() }); + Class.forName(className, true, classLoader) + .getDeclaredMethod("main", new Class[] { String[].class }) + .invoke(null, new Object[] { null }); -// Class.forName(className).getDeclaredMethod("main", new Class[] { String[].class }) -// .invoke(null, new Object[] { null }); System.out.flush(); System.err.flush(); @@ -109,8 +108,6 @@ public static String 
execute(String className, String code) throws Exception { System.setOut(oldOut); System.setErr(oldErr); - - return baosOut.toString(); } catch (ClassNotFoundException e) { e.printStackTrace(newErr); @@ -128,19 +125,19 @@ public static String execute(String className, String code) throws Exception { e.printStackTrace(newErr); System.err.println("Invocation target: " + e); throw new Exception(baosErr.toString()); - } finally{ - System.out.flush(); - System.err.flush(); - - System.setOut(oldOut); - System.setErr(oldErr); - } - } else { - System.out.flush(); + } finally { + System.out.flush(); System.err.flush(); System.setOut(oldOut); System.setErr(oldErr); + } + } else { + System.out.flush(); + System.err.flush(); + + System.setOut(oldOut); + System.setErr(oldErr); throw new Exception(baosOut.toString()); } } diff --git a/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java b/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java index f0a5047a34d..42caf925e46 100644 --- a/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java +++ b/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java @@ -5,76 +5,79 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.util.Properties; - -import org.apache.zeppelin.beam.BeamInterpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +/** + * + * @author admin + * + */ public class BeamInterpreterTest { - private static BeamInterpreter beam; - private static InterpreterContext context; - - @BeforeClass - public static void setUp() { - Properties p = new Properties(); - beam = new BeamInterpreter(p); - beam.open(); - context = new InterpreterContext(null, null, null, null, null, null, - null, null, null, null, null); - } - - @AfterClass - public static void tearDown() { - beam.close(); - } - - @Test - public 
void testStaticRepl() { - - StringWriter writer = new StringWriter(); - PrintWriter out = new PrintWriter(writer); - out.println("public class HelloWorld {"); - out.println(" public static void main(String args[]) {"); - out.println(" System.out.println(\"This is in another java file\");"); - out.println(" }"); - out.println("}"); - out.close(); - - InterpreterResult res = beam.interpret(writer.toString(), context); - - assertEquals(InterpreterResult.Code.SUCCESS, res.code()); - } - - @Test - public void testStaticReplWithoutMain() { - - StringBuffer sourceCode = new StringBuffer(); - sourceCode.append("package org.mdkt;\n"); - sourceCode.append("public class HelloClass {\n"); - sourceCode.append(" public String hello() { return \"hello\"; }"); - sourceCode.append("}"); - InterpreterResult res = beam.interpret(sourceCode.toString(), context); - assertEquals(InterpreterResult.Code.ERROR, res.code()); - } - - @Test - public void testStaticReplWithSyntaxError() { - - StringWriter writer = new StringWriter(); - PrintWriter out = new PrintWriter(writer); - out.println("public class HelloWorld {"); - out.println(" public static void main(String args[]) {"); - out.println(" System.out.prin(\"This is in another java file\");"); - out.println(" }"); - out.println("}"); - out.close(); - InterpreterResult res = beam.interpret(writer.toString(), context); - - assertEquals(InterpreterResult.Code.ERROR, res.code()); - } + private static BeamInterpreter beam; + private static InterpreterContext context; + + @BeforeClass + public static void setUp() { + Properties p = new Properties(); + beam = new BeamInterpreter(p); + beam.open(); + context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null, + null); + } + + @AfterClass + public static void tearDown() { + beam.close(); + } + + @Test + public void testStaticRepl() { + + StringWriter writer = new StringWriter(); + PrintWriter out = new PrintWriter(writer); + out.println("public class HelloWorld 
{"); + out.println(" public static void main(String args[]) {"); + out.println(" System.out.println(\"This is in another java file\");"); + out.println(" }"); + out.println("}"); + out.close(); + + InterpreterResult res = beam.interpret(writer.toString(), context); + + assertEquals(InterpreterResult.Code.SUCCESS, res.code()); + } + + @Test + public void testStaticReplWithoutMain() { + + StringBuffer sourceCode = new StringBuffer(); + sourceCode.append("package org.mdkt;\n"); + sourceCode.append("public class HelloClass {\n"); + sourceCode.append(" public String hello() { return \"hello\"; }"); + sourceCode.append("}"); + InterpreterResult res = beam.interpret(sourceCode.toString(), context); + assertEquals(InterpreterResult.Code.ERROR, res.code()); + } + + @Test + public void testStaticReplWithSyntaxError() { + + StringWriter writer = new StringWriter(); + PrintWriter out = new PrintWriter(writer); + out.println("public class HelloWorld {"); + out.println(" public static void main(String args[]) {"); + out.println(" System.out.prin(\"This is in another java file\");"); + out.println(" }"); + out.println("}"); + out.close(); + InterpreterResult res = beam.interpret(writer.toString(), context); + + assertEquals(InterpreterResult.Code.ERROR, res.code()); + } } diff --git a/zeppelin-web/src/index.html b/zeppelin-web/src/index.html index 9b049552d59..3d5b19bbbd5 100644 --- a/zeppelin-web/src/index.html +++ b/zeppelin-web/src/index.html @@ -137,7 +137,7 @@ - + diff --git a/zeppelin-web/test/karma.conf.js b/zeppelin-web/test/karma.conf.js index 64e66c25bab..33f3fe6ab51 100644 --- a/zeppelin-web/test/karma.conf.js +++ b/zeppelin-web/test/karma.conf.js @@ -55,7 +55,7 @@ module.exports = function(config) { 'bower_components/angular-xeditable/dist/js/xeditable.js', 'bower_components/highlightjs/highlight.pack.js', 'bower_components/lodash/lodash.js', - 'bower_components/angular-filter/dist/angular-filter.js', + 'bower_components/angular-filter/dist/angular-filter.min.js', 
'bower_components/ngtoast/dist/ngToast.js', 'bower_components/ng-focus-if/focusIf.js', 'bower_components/bootstrap3-dialog/dist/js/bootstrap-dialog.min.js', From 9c1e25d68b2d99fd4e62c3385b4fa3adf0e5692c Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Wed, 17 Aug 2016 00:10:25 +0200 Subject: [PATCH 08/21] Adding changes like logging and conventions and license --- beam/pom.xml | 87 ++++++++-------- .../apache/zeppelin/beam/BeamInterpreter.java | 39 +++++--- .../org/apache/zeppelin/beam/StaticRepl.java | 98 +++++++++++++------ .../zeppelin/beam/BeamInterpreterTest.java | 19 +++- docs/interpreter/beam.md | 20 ++++ zeppelin-distribution/src/bin_license/LICENSE | 2 + zeppelin-web/src/index.html | 2 +- zeppelin-web/test/karma.conf.js | 2 +- .../zeppelin/conf/ZeppelinConfiguration.java | 2 +- 9 files changed, 185 insertions(+), 86 deletions(-) diff --git a/beam/pom.xml b/beam/pom.xml index e5abe5ed19e..09ac0d275cf 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -1,3 +1,16 @@ + + + 4.0.0 @@ -14,17 +27,22 @@ org.apache.zeppelin zeppelin-beam + jar 0.7.0-SNAPSHOT + Zeppelin: Beam interpreter - + + 2.3.0 + 1.4.1 + 0.1.0-incubating + - org.apache.spark spark-core_2.10 - 1.4.1 + ${beam.spark.version} slf4j-log4j12 @@ -48,16 +66,17 @@ + org.apache.spark spark-streaming_2.10 - 1.4.1 + ${beam.spark.version} org.apache.hadoop hadoop-mapreduce-client-core - 2.3.0 + ${beam.hadoop.version} slf4j-log4j12 @@ -69,7 +88,7 @@ org.apache.hadoop hadoop-common - 2.3.0 + ${beam.hadoop.version} slf4j-log4j12 @@ -81,12 +100,13 @@ org.apache.hadoop hadoop-hdfs - 2.3.0 + ${beam.hadoop.version} + org.apache.hadoop hadoop-client - 2.3.0 + ${beam.hadoop.version} slf4j-log4j12 @@ -94,20 +114,23 @@ + org.apache.hadoop hadoop-annotations - 2.3.0 + ${beam.hadoop.version} + org.apache.hadoop hadoop-yarn-common - 2.3.0 + ${beam.hadoop.version} + org.apache.hadoop hadoop-mapreduce-client-common - 2.3.0 + ${beam.hadoop.version} slf4j-log4j12 @@ -115,44 +138,30 @@ + org.apache.hadoop hadoop-core 1.2.1 - 
com.thoughtworks.qdox qdox 2.0-M3 - - - - com.google.code.gson - gson - 2.6.2 - - - jline - jline - 2.12 - - - org.apache.beam beam-runners-parent - 0.1.0-incubating + ${beam.beam.version} pom org.apache.beam beam-runners-core-java - 0.1.0-incubating + ${beam.beam.version} google-http-client-jackson2 @@ -160,16 +169,17 @@ + org.apache.beam beam-runners-direct-java - 0.1.0-incubating - + ${beam.beam.version} + org.apache.beam beam-runners-flink_2.10 - 0.1.0-incubating + ${beam.beam.version} @@ -183,10 +193,11 @@ + org.apache.beam beam-runners-flink_2.10-examples - 0.1.0-incubating + ${beam.beam.version} @@ -199,7 +210,7 @@ org.apache.beam beam-runners-google-cloud-dataflow-java - 0.1.0-incubating + ${beam.beam.version} @@ -208,23 +219,21 @@ - + org.apache.beam beam-runners-spark - 0.1.0-incubating + ${beam.beam.version} jar - - - ${project.groupId} zeppelin-interpreter ${project.version} provided + org.apache.commons commons-exec @@ -242,8 +251,6 @@ - - org.apache.maven.plugins maven-deploy-plugin diff --git a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java index ca37edff2c4..2b2ab4ecd74 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java @@ -1,27 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zeppelin.beam; import java.io.File; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.Date; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.UUID; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; - -import com.google.gson.Gson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** +* Beam interpreter * */ public class BeamInterpreter extends Interpreter { + Logger LOGGER = LoggerFactory.getLogger(BeamInterpreter.class); + public BeamInterpreter(Properties property) { super(property); } @@ -34,6 +49,7 @@ public void open() { @Override public void close() { File dir = new File("."); + // delete all .class files created while compilation process for (int i = 0; i < dir.list().length; i++) { File f = dir.listFiles()[i]; if (f.getAbsolutePath().contains(".class")) @@ -44,13 +60,14 @@ public void close() { @Override public InterpreterResult interpret(String code, InterpreterContext context) { - String className = "C" + UUID.randomUUID().toString().replace("-", ""); + // choosing new name to class containing Main method + String generatedClassName = "C" + UUID.randomUUID().toString().replace("-", ""); try { - String msg = StaticRepl.execute(className, code); - return new 
InterpreterResult(InterpreterResult.Code.SUCCESS, msg); + String res = StaticRepl.execute(generatedClassName, code); + return new InterpreterResult(InterpreterResult.Code.SUCCESS, res); } catch (Exception e) { - // e.printStackTrace(); + LOGGER.error("Exception in Interpreter while interpret", e); return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); } diff --git a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java index 6fbeadf6927..e0a8e32f7e7 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.zeppelin.beam; import javax.tools.Diagnostic; @@ -8,6 +25,9 @@ import javax.tools.SimpleJavaFileObject; import javax.tools.ToolProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.thoughtworks.qdox.JavaProjectBuilder; import com.thoughtworks.qdox.model.JavaClass; import com.thoughtworks.qdox.model.JavaSource; @@ -26,55 +46,60 @@ import java.util.List; /** - * @author Mahmoud + * StaticRepl for compling the java code in memory * */ public class StaticRepl { - public static String execute(String className, String code) throws Exception { + static Logger LOGGER = LoggerFactory.getLogger(StaticRepl.class); + + public static String execute(String generatedClassName, String code) throws Exception { JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); DiagnosticCollector diagnostics = new DiagnosticCollector(); + // Java parasing JavaProjectBuilder builder = new JavaProjectBuilder(); JavaSource src = builder.addSource(new StringReader(code)); + // get all classes in code (paragraph) List classes = src.getClasses(); - String classMainName = null; + String mainClassName = null; + + // Searching for class containing Main method for (int i = 0; i < classes.size(); i++) { boolean hasMain = false; + for (int j = 0; j < classes.get(i).getMethods().size(); j++) { + if (classes.get(i).getMethods().get(j).getName().equals("main")) { - classMainName = classes.get(i).getName(); + mainClassName = classes.get(i).getName(); hasMain = true; break; } + } if (hasMain == true) break; } - if (classMainName == null) + // if there isn't Main method, will retuen error + if (mainClassName == null) throw new Exception("There isn't any class containing Main method."); - code = code.replace(classMainName, className); - - StringWriter writer = new StringWriter(); - PrintWriter out = new PrintWriter(writer); - - out.println(code); - out.close(); - - JavaFileObject file = new JavaSourceFromString(className, writer.toString()); + // 
replace name of class containing Main method with generated name + code = code.replace(mainClassName, generatedClassName); + JavaFileObject file = new JavaSourceFromString(generatedClassName, code.toString()); Iterable compilationUnits = Arrays.asList(file); ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); ByteArrayOutputStream baosErr = new ByteArrayOutputStream(); + // Creating new stream to get the output data PrintStream newOut = new PrintStream(baosOut); PrintStream newErr = new PrintStream(baosErr); - // IMPORTANT: Save the old System.out! + // Save the old System.out! PrintStream oldOut = System.out; PrintStream oldErr = System.err; // Tell Java to use your special stream @@ -83,63 +108,74 @@ public static String execute(String className, String code) throws Exception { CompilationTask task = compiler.getTask(null, null, diagnostics, null, null, compilationUnits); + // executing the compilation process boolean success = task.call(); + + // if success is false will get error if (!success) { for (Diagnostic diagnostic : diagnostics.getDiagnostics()) { if (diagnostic.getLineNumber() == -1) continue; - System.out.println("line " + diagnostic.getLineNumber() + " : " + System.err.println("line " + diagnostic.getLineNumber() + " : " + diagnostic.getMessage(null)); } - } - if (success) { + System.out.flush(); + System.err.flush(); + + System.setOut(oldOut); + System.setErr(oldErr); + LOGGER.error("Exception in Interpreter while compilation", baosErr.toString()); + throw new Exception(baosErr.toString()); + } else { try { + // creating new class loader URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File("").toURI() .toURL() }); - Class.forName(className, true, classLoader) + // execute the Main method + Class.forName(generatedClassName, true, classLoader) .getDeclaredMethod("main", new Class[] { String[].class }) .invoke(null, new Object[] { null }); - System.out.flush(); System.err.flush(); + // set the stream to old stream 
System.setOut(oldOut); System.setErr(oldErr); return baosOut.toString(); + } catch (ClassNotFoundException e) { - e.printStackTrace(newErr); + LOGGER.error("Exception in Interpreter while Class not found", e); System.err.println("Class not found: " + e); throw new Exception(baosErr.toString()); + } catch (NoSuchMethodException e) { - e.printStackTrace(newErr); + LOGGER.error("Exception in Interpreter while No such method", e); System.err.println("No such method: " + e); throw new Exception(baosErr.toString()); + } catch (IllegalAccessException e) { - e.printStackTrace(newErr); + LOGGER.error("Exception in Interpreter while Illegal access", e); System.err.println("Illegal access: " + e); throw new Exception(baosErr.toString()); + } catch (InvocationTargetException e) { - e.printStackTrace(newErr); + LOGGER.error("Exception in Interpreter while Invocation target", e); System.err.println("Invocation target: " + e); throw new Exception(baosErr.toString()); + } finally { + System.out.flush(); System.err.flush(); System.setOut(oldOut); System.setErr(oldErr); } - } else { - System.out.flush(); - System.err.flush(); - - System.setOut(oldOut); - System.setErr(oldErr); - throw new Exception(baosOut.toString()); } + } } diff --git a/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java b/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java index 42caf925e46..c24ed41ddb9 100644 --- a/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java +++ b/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.zeppelin.beam; import static org.junit.Assert.assertEquals; @@ -13,7 +30,7 @@ /** * - * @author admin + * BeamInterpreterTest * */ public class BeamInterpreterTest { diff --git a/docs/interpreter/beam.md b/docs/interpreter/beam.md index 87490e094eb..17d34eb8fa6 100644 --- a/docs/interpreter/beam.md +++ b/docs/interpreter/beam.md @@ -4,6 +4,20 @@ title: "Beam Interpreter" description: "" group: interpreter --- + + {% include JB/setup %} # Beam interpreter for Apache Zeppelin @@ -60,3 +74,9 @@ public class MinimalWordCount { } ``` + +## Contributors +1- Mahmoud Fathy Elgamal (@mfelgamal) +2- Fouad Ali Al-Sayadi (@FouadMA) + + diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE index e599084904b..99fd8dfb07c 100644 --- a/zeppelin-distribution/src/bin_license/LICENSE +++ b/zeppelin-distribution/src/bin_license/LICENSE @@ -42,10 +42,12 @@ The following components are provided under Apache License. 
(Apache 2.0) Apache Kylin (http://kylin.apache.org/) (Apache 2.0) Apache Lens (http://lens.apache.org/) (Apache 2.0) Apache Flink (http://flink.apache.org/) + (Apache 2.0) Apache Beam (http://beam.apache.org/) (Apache 2.0) Apache Thrift (http://thrift.apache.org/) (Apache 2.0) Apache Lucene (https://lucene.apache.org/) (Apache 2.0) Apache Zookeeper (org.apache.zookeeper:zookeeper:jar:3.4.5 - http://zookeeper.apache.org/) (Apache 2.0) Chill (com.twitter:chill-java:jar:0.8.0 - https://github.com/twitter/chill/) + (Apache 2.0) QDox (com.thoughtworks.qdox:qdox:jar:2.0-M3 - https://github.com/paul-hammant/qdox/) (Apache 2.0) Codehaus Plexus (org.codehaus.plexus:plexus:jar:1.5.6 - https://codehaus-plexus.github.io/) (Apache 2.0) findbugs jsr305 (com.google.code.findbugs:jsr305:jar:1.3.9 - http://findbugs.sourceforge.net/) (Apache 2.0) Google Guava (com.google.guava:guava:15.0 - https://code.google.com/p/guava-libraries/) diff --git a/zeppelin-web/src/index.html b/zeppelin-web/src/index.html index 3d5b19bbbd5..9b049552d59 100644 --- a/zeppelin-web/src/index.html +++ b/zeppelin-web/src/index.html @@ -137,7 +137,7 @@ - + diff --git a/zeppelin-web/test/karma.conf.js b/zeppelin-web/test/karma.conf.js index 33f3fe6ab51..64e66c25bab 100644 --- a/zeppelin-web/test/karma.conf.js +++ b/zeppelin-web/test/karma.conf.js @@ -55,7 +55,7 @@ module.exports = function(config) { 'bower_components/angular-xeditable/dist/js/xeditable.js', 'bower_components/highlightjs/highlight.pack.js', 'bower_components/lodash/lodash.js', - 'bower_components/angular-filter/dist/angular-filter.min.js', + 'bower_components/angular-filter/dist/angular-filter.js', 'bower_components/ngtoast/dist/ngToast.js', 'bower_components/ng-focus-if/focusIf.js', 'bower_components/bootstrap3-dialog/dist/js/bootstrap-dialog.min.js', diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 
eb8f27b78d6..4a80c7efaf5 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -529,7 +529,7 @@ public static enum ConfVars { + "org.apache.zeppelin.scalding.ScaldingInterpreter," + "org.apache.zeppelin.jdbc.JDBCInterpreter," + "org.apache.zeppelin.hbase.HbaseInterpreter," - + "org.apache.zeppelin.bigquery.BigQueryInterpreter", + + "org.apache.zeppelin.bigquery.BigQueryInterpreter," + "org.apache.zeppelin.beam.BeamInterpreter"), ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), From 9b1b385b7ce39f182db8c322b29096127d113155 Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Wed, 17 Aug 2016 12:45:59 +0200 Subject: [PATCH 09/21] put beam in alphabetical order --- conf/interpreter-list | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/interpreter-list b/conf/interpreter-list index 77c4c4001a4..098b3c6c188 100644 --- a/conf/interpreter-list +++ b/conf/interpreter-list @@ -19,12 +19,12 @@ alluxio org.apache.zeppelin:zeppelin-alluxio:0.6.1 Alluxio interpreter angular org.apache.zeppelin:zeppelin-angular:0.6.1 HTML and AngularJS view rendering +beam org.apache.zeppelin:zeppelin-beam:0.6.1 Beam interpreter bigquery org.apache.zeppelin:zeppelin-bigquery:0.6.1 BigQuery interpreter cassandra org.apache.zeppelin:zeppelin-cassandra_2.11:0.6.1 Cassandra interpreter built with Scala 2.11 elasticsearch org.apache.zeppelin:zeppelin-elasticsearch:0.6.1 Elasticsearch interpreter file org.apache.zeppelin:zeppelin-file:0.6.1 HDFS file interpreter flink org.apache.zeppelin:zeppelin-flink_2.11:0.6.1 Flink interpreter built with Scala 2.11 -beam org.apache.zeppelin:zeppelin-beam:0.6.1 Beam interpreter hbase org.apache.zeppelin:zeppelin-hbase:0.6.1 Hbase interpreter ignite org.apache.zeppelin:zeppelin-ignite_2.11:0.6.1 Ignite 
interpreter built with Scala 2.11 jdbc org.apache.zeppelin:zeppelin-jdbc:0.6.1 Jdbc interpreter From 75fc4f7eeb6dcc36d10ff61a3489fcd694e7129c Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Wed, 17 Aug 2016 22:15:24 +0200 Subject: [PATCH 10/21] add interpreter to navigation.html and remove extra spaces and lines --- beam/pom.xml | 5 +---- .../java/org/apache/zeppelin/beam/StaticRepl.java | 12 +++++++----- docs/_includes/themes/zeppelin/_navigation.html | 1 + docs/interpreter/beam.md | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/beam/pom.xml b/beam/pom.xml index 09ac0d275cf..3156c172a66 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -14,10 +14,7 @@ 4.0.0 - - - - + zeppelin org.apache.zeppelin diff --git a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java index e0a8e32f7e7..8a57f006616 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java @@ -84,8 +84,10 @@ public static String execute(String generatedClassName, String code) throws Exce } // if there isn't Main method, will retuen error - if (mainClassName == null) + if (mainClassName == null) { + LOGGER.error("Exception for Main method", "There isn't any class containing Main method."); throw new Exception("There isn't any class containing Main method."); + } // replace name of class containing Main method with generated name code = code.replace(mainClassName, generatedClassName); @@ -150,22 +152,22 @@ public static String execute(String generatedClassName, String code) throws Exce LOGGER.error("Exception in Interpreter while Class not found", e); System.err.println("Class not found: " + e); throw new Exception(baosErr.toString()); - + } catch (NoSuchMethodException e) { LOGGER.error("Exception in Interpreter while No such method", e); System.err.println("No such method: " + e); throw new Exception(baosErr.toString()); - + } 
catch (IllegalAccessException e) { LOGGER.error("Exception in Interpreter while Illegal access", e); System.err.println("Illegal access: " + e); throw new Exception(baosErr.toString()); - + } catch (InvocationTargetException e) { LOGGER.error("Exception in Interpreter while Invocation target", e); System.err.println("Invocation target: " + e); throw new Exception(baosErr.toString()); - + } finally { System.out.flush(); diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html index 9bd9967244a..e86ffb789bb 100644 --- a/docs/_includes/themes/zeppelin/_navigation.html +++ b/docs/_includes/themes/zeppelin/_navigation.html @@ -47,6 +47,7 @@
  • Available Interpreters
  • Alluxio
  • +
  • Beam
  • BigQuery
  • Cassandra
  • Elasticsearch
  • diff --git a/docs/interpreter/beam.md b/docs/interpreter/beam.md index 17d34eb8fa6..1ac3e4b7b31 100644 --- a/docs/interpreter/beam.md +++ b/docs/interpreter/beam.md @@ -26,12 +26,12 @@ limitations under the License. ## Overview [Apache Beam](http://beam.incubator.apache.org) is an open source unified platform for data processing pipelines. A pipeline can be build using one of the Beam SDKs. -The execution of the pipeline is done by different Runners . Currently, Beam supports Apache Flink Runner, Apache Spark Runner, and Google Dataflow Runner. +The execution of the pipeline is done by different Runners. Currently, Beam supports Apache Flink Runner, Apache Spark Runner, and Google Dataflow Runner. ## How to use -Basically, you can write normal Beam java code where you can determine the Runner. You should write the main method inside a class becuase the interpreter invoke this main to execute the pipline. Unlike Zeppelin normal pattern, each paragraph is considered a separate job, there isn't any relation to any other paragraph. +Basically, you can write normal Beam java code where you can determine the Runner. You should write the main method inside a class becuase the interpreter invoke this main to execute the pipline. Unlike Zeppelin normal pattern, each paragraph is considered a separate job, there isn't any relation to any other paragraph. 
-The following is a demonstration of a word count example +The following is a demonstration of a word count example ``` %beam From 5cb7c7b1a2e81d985dd4f890d273dde102082cce Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Sun, 21 Aug 2016 19:26:24 +0200 Subject: [PATCH 11/21] Add some changes to doc and pom file --- beam/pom.xml | 600 +++++++++++++++++++-------------------- docs/interpreter/beam.md | 13 +- 2 files changed, 299 insertions(+), 314 deletions(-) diff --git a/beam/pom.xml b/beam/pom.xml index 3156c172a66..e59ab094127 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -10,308 +10,298 @@ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing permissions and ~ limitations under the License. --> - - - 4.0.0 - - - zeppelin - org.apache.zeppelin - 0.7.0-SNAPSHOT - .. - - - org.apache.zeppelin - zeppelin-beam - jar - 0.7.0-SNAPSHOT - Zeppelin: Beam interpreter - - - 2.3.0 - 1.4.1 - 0.1.0-incubating - - - - - - org.apache.spark - spark-core_2.10 - ${beam.spark.version} - - - slf4j-log4j12 - org.slf4j - - - netty-all - io.netty - - - akka-actor_2.10 - org.spark-project.akka - - - akka-remote_2.10 - org.spark-project.akka - - - akka-slf4j_2.10 - org.spark-project.akka - - - - - - org.apache.spark - spark-streaming_2.10 - ${beam.spark.version} - - - - org.apache.hadoop - hadoop-mapreduce-client-core - ${beam.hadoop.version} - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.hadoop - hadoop-common - ${beam.hadoop.version} - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.hadoop - hadoop-hdfs - ${beam.hadoop.version} - - - - org.apache.hadoop - hadoop-client - ${beam.hadoop.version} - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.hadoop - hadoop-annotations - ${beam.hadoop.version} - - - - org.apache.hadoop - hadoop-yarn-common - ${beam.hadoop.version} - - - - org.apache.hadoop - hadoop-mapreduce-client-common - ${beam.hadoop.version} - - - slf4j-log4j12 - org.slf4j - - - - - - 
org.apache.hadoop - hadoop-core - 1.2.1 - - - - com.thoughtworks.qdox - qdox - 2.0-M3 - - - - org.apache.beam - beam-runners-parent - ${beam.beam.version} - pom - - - - org.apache.beam - beam-runners-core-java - ${beam.beam.version} - - - google-http-client-jackson2 - com.google.http-client - - - - - - org.apache.beam - beam-runners-direct-java - ${beam.beam.version} - - - - org.apache.beam - beam-runners-flink_2.10 - ${beam.beam.version} - - - - slf4j-log4j12 - org.slf4j - - - netty-all - io.netty - - - - - - - org.apache.beam - beam-runners-flink_2.10-examples - ${beam.beam.version} - - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.beam - beam-runners-google-cloud-dataflow-java - ${beam.beam.version} - - - - google-http-client-jackson2 - com.google.http-client - - - - - - org.apache.beam - beam-runners-spark - ${beam.beam.version} - jar - - - - ${project.groupId} - zeppelin-interpreter - ${project.version} - provided - - - - org.apache.commons - commons-exec - 1.3 - - - - junit - junit - test - - - - - - - - - org.apache.maven.plugins - maven-deploy-plugin - 2.7 - - true - - - - - maven-enforcer-plugin - 1.3.1 - - - enforce - none - - - - - - maven-dependency-plugin - 2.8 - - - copy-dependencies - package - - copy-dependencies - - - ${project.build.directory}/../../interpreter/beam - - false - false - true - runtime - - - - copy-artifact - package - - copy - - - ${project.build.directory}/../../interpreter/beam - - false - false - true - runtime - - - ${project.groupId} - ${project.artifactId} - ${project.version} - ${project.packaging} - - - - - - - - - \ No newline at end of file + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.7.0-SNAPSHOT + .. 
+ + + org.apache.zeppelin + zeppelin-beam + jar + 0.7.0-SNAPSHOT + Zeppelin: Beam interpreter + + + 2.3.0 + 1.4.1 + 0.1.0-incubating + + + + + org.apache.spark + spark-core_2.10 + ${beam.spark.version} + + + slf4j-log4j12 + org.slf4j + + + netty-all + io.netty + + + akka-actor_2.10 + org.spark-project.akka + + + akka-remote_2.10 + org.spark-project.akka + + + akka-slf4j_2.10 + org.spark-project.akka + + + + + + org.apache.spark + spark-streaming_2.10 + ${beam.spark.version} + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.hadoop + hadoop-common + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.hadoop + hadoop-hdfs + ${beam.hadoop.version} + + + + org.apache.hadoop + hadoop-client + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.hadoop + hadoop-annotations + ${beam.hadoop.version} + + + + org.apache.hadoop + hadoop-yarn-common + ${beam.hadoop.version} + + + + org.apache.hadoop + hadoop-mapreduce-client-common + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.hadoop + hadoop-core + 1.2.1 + + + + com.thoughtworks.qdox + qdox + 2.0-M3 + + + + org.apache.beam + beam-runners-parent + ${beam.beam.version} + pom + + + + org.apache.beam + beam-runners-core-java + ${beam.beam.version} + + + google-http-client-jackson2 + com.google.http-client + + + + + + org.apache.beam + beam-runners-direct-java + ${beam.beam.version} + + + + org.apache.beam + beam-runners-flink_2.10 + ${beam.beam.version} + + + slf4j-log4j12 + org.slf4j + + + netty-all + io.netty + + + + + + org.apache.beam + beam-runners-flink_2.10-examples + ${beam.beam.version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + ${beam.beam.version} + + + google-http-client-jackson2 + com.google.http-client + + + + + + org.apache.beam + beam-runners-spark + ${beam.beam.version} + jar + + + + 
${project.groupId} + zeppelin-interpreter + ${project.version} + provided + + + + org.apache.commons + commons-exec + 1.3 + + + + junit + junit + test + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/beam + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/beam + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + diff --git a/docs/interpreter/beam.md b/docs/interpreter/beam.md index 1ac3e4b7b31..1f66edbe3f4 100644 --- a/docs/interpreter/beam.md +++ b/docs/interpreter/beam.md @@ -1,7 +1,7 @@ --- layout: page -title: "Beam Interpreter" -description: "" +title: Beam interpreter in Apache Zeppelin +description: Apache Beam is an open source, unified programming model that you can use to create a data processing pipeline. group: interpreter --- - - 4.0.0 - - - zeppelin - org.apache.zeppelin - 0.7.0-SNAPSHOT - .. 
- - - org.apache.zeppelin - zeppelin-beam - jar - 0.7.0-SNAPSHOT - Zeppelin: Beam interpreter - - - 2.3.0 - 1.4.1 - 0.2.0-incubating - - - - - org.apache.spark - spark-core_2.10 - ${beam.spark.version} - - - slf4j-log4j12 - org.slf4j - - - netty-all - io.netty - - - akka-actor_2.10 - org.spark-project.akka - - - akka-remote_2.10 - org.spark-project.akka - - - akka-slf4j_2.10 - org.spark-project.akka - - - - - - org.apache.spark - spark-streaming_2.10 - ${beam.spark.version} - + + 4.0.0 - - org.apache.hadoop - hadoop-mapreduce-client-core - ${beam.hadoop.version} - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.hadoop - hadoop-common - ${beam.hadoop.version} - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.hadoop - hadoop-hdfs - ${beam.hadoop.version} - - - - org.apache.hadoop - hadoop-client - ${beam.hadoop.version} - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.hadoop - hadoop-annotations - ${beam.hadoop.version} - - - - org.apache.hadoop - hadoop-yarn-common - ${beam.hadoop.version} - - - - org.apache.hadoop - hadoop-mapreduce-client-common - ${beam.hadoop.version} - - - slf4j-log4j12 - org.slf4j - - - + + zeppelin + org.apache.zeppelin + 0.7.0-SNAPSHOT + .. 
+ - - com.thoughtworks.qdox - qdox - 2.0-M3 - - - - org.apache.beam - beam-runners-parent - ${beam.beam.version} - pom - - - - org.apache.beam - beam-runners-core-java - ${beam.beam.version} - - - google-http-client-jackson2 - com.google.http-client - - - - - - org.apache.beam - beam-runners-direct-java - ${beam.beam.version} - - - - org.apache.beam - beam-runners-flink_2.10 - ${beam.beam.version} - - - slf4j-log4j12 - org.slf4j - - - netty-all - io.netty - - - - - - org.apache.beam - beam-runners-flink_2.10-examples - ${beam.beam.version} - - - slf4j-log4j12 - org.slf4j - - - - - javax.servlet - javax.servlet-api - 3.1.0 - + org.apache.zeppelin + zeppelin-beam + jar + 0.7.0-SNAPSHOT + Zeppelin: Beam interpreter - - - org.apache.beam - beam-runners-google-cloud-dataflow-java - ${beam.beam.version} - - - google-http-client-jackson2 - com.google.http-client - - - - - - org.apache.beam - beam-runners-spark - ${beam.beam.version} - jar - - - - ${project.groupId} - zeppelin-interpreter - ${project.version} - provided - - - - org.apache.commons - commons-exec - 1.3 - - - - junit - junit - test - - - - - - - - org.apache.maven.plugins - maven-deploy-plugin - 2.7 - - true - - - - - maven-enforcer-plugin - 1.3.1 - - - enforce - none - - - - - - maven-dependency-plugin - 2.8 - - - copy-dependencies - package - - copy-dependencies - - - ${project.build.directory}/../../interpreter/beam - false - false - true - runtime - - - - copy-artifact - package - - copy - - - ${project.build.directory}/../../interpreter/beam - false - false - true - runtime - - - ${project.groupId} - ${project.artifactId} - ${project.version} - ${project.packaging} - - - - - - - - + + 2.3.0 + 1.4.1 + 0.2.0-incubating + + + + + org.apache.spark + spark-core_2.10 + ${beam.spark.version} + + + slf4j-log4j12 + org.slf4j + + + netty-all + io.netty + + + akka-actor_2.10 + org.spark-project.akka + + + akka-remote_2.10 + org.spark-project.akka + + + akka-slf4j_2.10 + org.spark-project.akka + + + + + + 
org.apache.spark + spark-streaming_2.10 + ${beam.spark.version} + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.hadoop + hadoop-common + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.hadoop + hadoop-hdfs + ${beam.hadoop.version} + + + + org.apache.hadoop + hadoop-client + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.hadoop + hadoop-annotations + ${beam.hadoop.version} + + + + org.apache.hadoop + hadoop-yarn-common + ${beam.hadoop.version} + + + + org.apache.hadoop + hadoop-mapreduce-client-common + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + + com.thoughtworks.qdox + qdox + 2.0-M3 + + + + org.apache.beam + beam-runners-parent + ${beam.beam.version} + pom + + + + org.apache.beam + beam-runners-core-java + ${beam.beam.version} + + + google-http-client-jackson2 + com.google.http-client + + + + + + org.apache.beam + beam-runners-direct-java + ${beam.beam.version} + + + + org.apache.beam + beam-runners-flink_2.10 + ${beam.beam.version} + + + slf4j-log4j12 + org.slf4j + + + netty-all + io.netty + + + + + + org.apache.beam + beam-runners-flink_2.10-examples + ${beam.beam.version} + + + slf4j-log4j12 + org.slf4j + + + + + javax.servlet + javax.servlet-api + 3.1.0 + + + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + ${beam.beam.version} + + + google-http-client-jackson2 + com.google.http-client + + + + + + org.apache.beam + beam-runners-spark + ${beam.beam.version} + jar + + + + ${project.groupId} + zeppelin-interpreter + ${project.version} + provided + + + + org.apache.commons + commons-exec + 1.3 + + + + junit + junit + test + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + 
${project.build.directory}/../../interpreter/beam + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/beam + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + diff --git a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java index 2b2ab4ecd74..415fd30bf4f 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.beam; import java.io.File; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.UUID; @@ -30,12 +31,12 @@ import org.slf4j.LoggerFactory; /** -* Beam interpreter -* -*/ + * Beam interpreter + * + */ public class BeamInterpreter extends Interpreter { - Logger LOGGER = LoggerFactory.getLogger(BeamInterpreter.class); + Logger logger = LoggerFactory.getLogger(BeamInterpreter.class); public BeamInterpreter(Properties property) { super(property); @@ -52,8 +53,9 @@ public void close() { // delete all .class files created while compilation process for (int i = 0; i < dir.list().length; i++) { File f = dir.listFiles()[i]; - if (f.getAbsolutePath().contains(".class")) + if (f.getAbsolutePath().contains(".class")) { f.delete(); + } } } @@ -67,7 +69,7 @@ public InterpreterResult interpret(String code, InterpreterContext context) { String res = StaticRepl.execute(generatedClassName, code); return new InterpreterResult(InterpreterResult.Code.SUCCESS, res); } catch (Exception e) { - LOGGER.error("Exception in Interpreter while interpret", e); + logger.error("Exception in Interpreter while interpret", e); return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); } @@ -91,7 +93,7 @@ public int getProgress(InterpreterContext context) { 
@Override public List completion(String buf, int cursor) { - return null; + return Collections.emptyList(); } } diff --git a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java index 8ed355fc9a1..670992f5713 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java @@ -35,9 +35,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.PrintStream; -import java.io.PrintWriter; import java.io.StringReader; -import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.net.URL; @@ -46,11 +44,12 @@ import java.util.List; /** + * * StaticRepl for compling the java code in memory * */ public class StaticRepl { - static Logger LOGGER = LoggerFactory.getLogger(StaticRepl.class); + static Logger logger = LoggerFactory.getLogger(StaticRepl.class); public static String execute(String generatedClassName, String code) throws Exception { @@ -70,22 +69,21 @@ public static String execute(String generatedClassName, String code) throws Exce boolean hasMain = false; for (int j = 0; j < classes.get(i).getMethods().size(); j++) { - if (classes.get(i).getMethods().get(j).getName().equals("main")) { mainClassName = classes.get(i).getName(); hasMain = true; break; } - } - if (hasMain == true) + if (hasMain == true) { break; + } } // if there isn't Main method, will retuen error if (mainClassName == null) { - LOGGER.error("Exception for Main method", "There isn't any class containing Main method."); + logger.error("Exception for Main method", "There isn't any class containing Main method."); throw new Exception("There isn't any class containing Main method."); } @@ -116,8 +114,9 @@ public static String execute(String generatedClassName, String code) throws Exce // if success is false will get error if (!success) { for (Diagnostic diagnostic : 
diagnostics.getDiagnostics()) { - if (diagnostic.getLineNumber() == -1) + if (diagnostic.getLineNumber() == -1) { continue; + } System.err.println("line " + diagnostic.getLineNumber() + " : " + diagnostic.getMessage(null)); } @@ -126,7 +125,7 @@ public static String execute(String generatedClassName, String code) throws Exce System.setOut(oldOut); System.setErr(oldErr); - LOGGER.error("Exception in Interpreter while compilation", baosErr.toString()); + logger.error("Exception in Interpreter while compilation", baosErr.toString()); throw new Exception(baosErr.toString()); } else { try { @@ -149,25 +148,25 @@ public static String execute(String generatedClassName, String code) throws Exce return baosOut.toString(); } catch (ClassNotFoundException e) { - LOGGER.error("Exception in Interpreter while Class not found", e); + logger.error("Exception in Interpreter while Class not found", e); System.err.println("Class not found: " + e); e.printStackTrace(newErr); throw new Exception(baosErr.toString()); } catch (NoSuchMethodException e) { - LOGGER.error("Exception in Interpreter while No such method", e); + logger.error("Exception in Interpreter while No such method", e); System.err.println("No such method: " + e); e.printStackTrace(newErr); throw new Exception(baosErr.toString()); } catch (IllegalAccessException e) { - LOGGER.error("Exception in Interpreter while Illegal access", e); + logger.error("Exception in Interpreter while Illegal access", e); System.err.println("Illegal access: " + e); e.printStackTrace(newErr); throw new Exception(baosErr.toString()); } catch (InvocationTargetException e) { - LOGGER.error("Exception in Interpreter while Invocation target", e); + logger.error("Exception in Interpreter while Invocation target", e); System.err.println("Invocation target: " + e); e.printStackTrace(newErr); throw new Exception(baosErr.toString()); diff --git a/docs/interpreter/beam.md b/docs/interpreter/beam.md index 1f66edbe3f4..cbcd5e37d51 100644 --- 
a/docs/interpreter/beam.md +++ b/docs/interpreter/beam.md @@ -31,38 +31,85 @@ The execution of the pipeline is done by different Runners. Currently, Beam supp ## How to use Basically, you can write normal Beam java code where you can determine the Runner. You should write the main method inside a class becuase the interpreter invoke this main to execute the pipeline. Unlike Zeppelin normal pattern, each paragraph is considered as a separate job, there isn't any relation to any other paragraph. -The following is a demonstration of a word count example +The following is a demonstration of a word count example with data represented in array of strings +But it can read data from files by replacing `Create.of(SENTENCES).withCoder(StringUtf8Coder.of())` with `TextIO.Read.from("path/to/filename.txt")` ```java %beam -// imports are omitted to save space +// most used imports +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.Create; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.SparkConf; +import org.apache.spark.streaming.*; +import org.apache.spark.SparkContext; +import org.apache.beam.runners.direct.*; +import org.apache.beam.sdk.runners.*; +import org.apache.beam.sdk.options.*; +import org.apache.beam.runners.spark.*; +import org.apache.beam.runners.spark.io.ConsoleIO; +import org.apache.beam.runners.flink.*; +import org.apache.beam.runners.flink.examples.WordCount.Options; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import 
org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.options.PipelineOptions; public class MinimalWordCount { static List s = new ArrayList<>(); + + static final String[] SENTENCES_ARRAY = new String[] { + "Hadoop is the Elephant King!", + "A yellow and elegant thing.", + "He never forgets", + "Useful data, or lets", + "An extraneous element cling!", + "A wonderful king is Hadoop.", + "The elephant plays well with Sqoop.", + "But what helps him to thrive", + "Are Impala, and Hive,", + "And HDFS in the group.", + "Hadoop is an elegant fellow.", + "An elephant gentle and mellow.", + "He never gets mad,", + "Or does anything bad,", + "Because, at his core, he is yellow", + }; + static final List SENTENCES = Arrays.asList(SENTENCES_ARRAY); public static void main(String[] args) { Options options = PipelineOptionsFactory.create().as(Options.class); - options.setRunner(FlinkPipelineRunner.class); + options.setRunner(FlinkRunner.class); Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.from("/home/admin/mahmoud/work/bigdata/beam/shakespeare/input/file1.txt")) - .apply(ParDo.named("ExtractWords").of(new DoFn() { - @Override - public void processElement(ProcessContext c) { - for (String word : c.element().split("[^a-zA-Z']+")) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - })) + p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of())) + .apply("ExtractWords", ParDo.of(new DoFn() { + @Override + public void processElement(ProcessContext c) { + for (String word : c.element().split("[^a-zA-Z']+")) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + })) .apply(Count. 
perElement()) .apply("FormatResults", ParDo.of(new DoFn, String>() { @Override public void processElement(DoFn, String>.ProcessContext arg0) - throws Exception { + throws Exception { s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue()); - - } + } })); p.run(); System.out.println("%table word\tcount"); From 3d654271c93dd324d3cec85a0c2505e9214a4c07 Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Wed, 7 Sep 2016 15:48:48 +0200 Subject: [PATCH 16/21] update .travis.yml file --- .travis.yml | 4 +- beam/pom.xml | 563 ++++++++++++++++++++++++--------------------------- 2 files changed, 269 insertions(+), 298 deletions(-) diff --git a/.travis.yml b/.travis.yml index 27a0e393cbf..4470ab83526 100644 --- a/.travis.yml +++ b/.travis.yml @@ -40,7 +40,7 @@ matrix: # Test all modules with spark 2.0.0 and scala 2.11 - jdk: "oraclejdk7" - env: SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.3" PROFILE="-Pspark-2.0 -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" + env: SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.3" PROFILE="-Pspark-2.0 -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -pl \!beam -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" # Test all modules with scala 2.10 - jdk: "oraclejdk7" @@ -48,7 +48,7 @@ matrix: # Test all modules with scala 2.11 - jdk: "oraclejdk7" - env: SCALA_VER="2.11" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" + env: SCALA_VER="2.11" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -pl \!beam -Pbuild-distr 
-DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" # Test spark module for 1.5.2 - jdk: "oraclejdk7" diff --git a/beam/pom.xml b/beam/pom.xml index d22690df45d..7c31ada8c62 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -10,299 +10,270 @@ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing permissions and ~ limitations under the License. --> - - 4.0.0 - - - zeppelin - org.apache.zeppelin - 0.7.0-SNAPSHOT - .. - - - org.apache.zeppelin - zeppelin-beam - jar - 0.7.0-SNAPSHOT - Zeppelin: Beam interpreter - - - 2.3.0 - 1.4.1 - 0.2.0-incubating - - - - - org.apache.spark - spark-core_2.10 - ${beam.spark.version} - - - slf4j-log4j12 - org.slf4j - - - netty-all - io.netty - - - akka-actor_2.10 - org.spark-project.akka - - - akka-remote_2.10 - org.spark-project.akka - - - akka-slf4j_2.10 - org.spark-project.akka - - - - - - org.apache.spark - spark-streaming_2.10 - ${beam.spark.version} - - - - org.apache.hadoop - hadoop-mapreduce-client-core - ${beam.hadoop.version} - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.hadoop - hadoop-common - ${beam.hadoop.version} - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.hadoop - hadoop-hdfs - ${beam.hadoop.version} - - - - org.apache.hadoop - hadoop-client - ${beam.hadoop.version} - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.hadoop - hadoop-annotations - ${beam.hadoop.version} - - - - org.apache.hadoop - hadoop-yarn-common - ${beam.hadoop.version} - - - - org.apache.hadoop - hadoop-mapreduce-client-common - ${beam.hadoop.version} - - - slf4j-log4j12 - org.slf4j - - - - - - com.thoughtworks.qdox - qdox - 2.0-M3 - - - - org.apache.beam - beam-runners-parent - ${beam.beam.version} - pom - - - - org.apache.beam - beam-runners-core-java - ${beam.beam.version} - - - google-http-client-jackson2 - com.google.http-client - - - - - - org.apache.beam - beam-runners-direct-java - ${beam.beam.version} - - - - org.apache.beam - 
beam-runners-flink_2.10 - ${beam.beam.version} - - - slf4j-log4j12 - org.slf4j - - - netty-all - io.netty - - - - - - org.apache.beam - beam-runners-flink_2.10-examples - ${beam.beam.version} - - - slf4j-log4j12 - org.slf4j - - - - - javax.servlet - javax.servlet-api - 3.1.0 - - - - - org.apache.beam - beam-runners-google-cloud-dataflow-java - ${beam.beam.version} - - - google-http-client-jackson2 - com.google.http-client - - - - - - org.apache.beam - beam-runners-spark - ${beam.beam.version} - jar - - - - ${project.groupId} - zeppelin-interpreter - ${project.version} - provided - - - - org.apache.commons - commons-exec - 1.3 - - - - junit - junit - test - - - - - - - - org.apache.maven.plugins - maven-deploy-plugin - 2.7 - - true - - - - - maven-enforcer-plugin - 1.3.1 - - - enforce - none - - - - - - maven-dependency-plugin - 2.8 - - - copy-dependencies - package - - copy-dependencies - - - ${project.build.directory}/../../interpreter/beam - false - false - true - runtime - - - - copy-artifact - package - - copy - - - ${project.build.directory}/../../interpreter/beam - false - false - true - runtime - - - ${project.groupId} - ${project.artifactId} - ${project.version} - ${project.packaging} - - - - - - - - - + + 4.0.0 + + zeppelin + org.apache.zeppelin + 0.7.0-SNAPSHOT + .. 
+ + org.apache.zeppelin + zeppelin-beam + jar + 0.7.0-SNAPSHOT + Zeppelin: Beam interpreter + + 2.3.0 + 1.4.1 + 0.2.0-incubating + + + + org.apache.spark + spark-core_2.10 + ${beam.spark.version} + + + slf4j-log4j12 + org.slf4j + + + netty-all + io.netty + + + akka-actor_2.10 + org.spark-project.akka + + + akka-remote_2.10 + org.spark-project.akka + + + akka-slf4j_2.10 + org.spark-project.akka + + + + + org.apache.spark + spark-streaming_2.10 + ${beam.spark.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.hadoop + hadoop-common + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.hadoop + hadoop-hdfs + ${beam.hadoop.version} + + + org.apache.hadoop + hadoop-client + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.hadoop + hadoop-annotations + ${beam.hadoop.version} + + + org.apache.hadoop + hadoop-yarn-common + ${beam.hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-common + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + com.thoughtworks.qdox + qdox + 2.0-M3 + + + org.apache.beam + beam-runners-parent + ${beam.beam.version} + pom + + + org.apache.beam + beam-runners-core-java + ${beam.beam.version} + + + google-http-client-jackson2 + com.google.http-client + + + + + org.apache.beam + beam-runners-direct-java + ${beam.beam.version} + + + org.apache.beam + beam-runners-flink_2.10 + ${beam.beam.version} + + + slf4j-log4j12 + org.slf4j + + + netty-all + io.netty + + + + + org.apache.beam + beam-runners-flink_2.10-examples + ${beam.beam.version} + + + slf4j-log4j12 + org.slf4j + + + + + javax.servlet + javax.servlet-api + 3.1.0 + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + ${beam.beam.version} + + + google-http-client-jackson2 + com.google.http-client + + + + + org.apache.beam + beam-runners-spark + ${beam.beam.version} + jar + + + ${project.groupId} + zeppelin-interpreter 
+ ${project.version} + provided + + + org.apache.commons + commons-exec + 1.3 + + + junit + junit + test + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/beam + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/beam + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + \ No newline at end of file From ca88f942f863181b15a3bb11ea02c82f37129bcd Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Thu, 8 Sep 2016 11:34:26 +0200 Subject: [PATCH 17/21] edit pom file and .travis.yml --- .travis.yml | 4 ++-- beam/pom.xml | 59 +++++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 49 insertions(+), 14 deletions(-) diff --git a/.travis.yml b/.travis.yml index 4470ab83526..9936994d78d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -40,7 +40,7 @@ matrix: # Test all modules with spark 2.0.0 and scala 2.11 - jdk: "oraclejdk7" - env: SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.3" PROFILE="-Pspark-2.0 -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -pl \!beam -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" + env: SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.3" PROFILE="-Pspark-2.0 -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -pl !beam -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" # Test all modules with scala 2.10 - jdk: "oraclejdk7" @@ -48,7 +48,7 @@ matrix: # Test all modules with scala 2.11 - jdk: "oraclejdk7" - env: SCALA_VER="2.11" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr 
-Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -pl \!beam -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" + env: SCALA_VER="2.11" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -pl !beam -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" # Test spark module for 1.5.2 - jdk: "oraclejdk7" diff --git a/beam/pom.xml b/beam/pom.xml index 7c31ada8c62..03a81e70ce5 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -1,33 +1,43 @@ - + + 4.0.0 + zeppelin org.apache.zeppelin 0.7.0-SNAPSHOT .. + org.apache.zeppelin zeppelin-beam jar 0.7.0-SNAPSHOT Zeppelin: Beam interpreter + 2.3.0 1.4.1 0.2.0-incubating + org.apache.spark @@ -56,11 +66,13 @@
    + org.apache.spark spark-streaming_2.10 ${beam.spark.version} + org.apache.hadoop hadoop-mapreduce-client-core @@ -72,6 +84,7 @@
    + org.apache.hadoop hadoop-common @@ -83,11 +96,13 @@
    + org.apache.hadoop hadoop-hdfs ${beam.hadoop.version} + org.apache.hadoop hadoop-client @@ -99,16 +114,19 @@ + org.apache.hadoop hadoop-annotations ${beam.hadoop.version} + org.apache.hadoop hadoop-yarn-common ${beam.hadoop.version} + org.apache.hadoop hadoop-mapreduce-client-common @@ -120,17 +138,20 @@ + com.thoughtworks.qdox qdox 2.0-M3 + org.apache.beam beam-runners-parent ${beam.beam.version} pom + org.apache.beam beam-runners-core-java @@ -142,11 +163,13 @@ + org.apache.beam beam-runners-direct-java ${beam.beam.version} + org.apache.beam beam-runners-flink_2.10 @@ -162,6 +185,7 @@ + org.apache.beam beam-runners-flink_2.10-examples @@ -173,11 +197,13 @@ + javax.servlet javax.servlet-api 3.1.0 + org.apache.beam beam-runners-google-cloud-dataflow-java @@ -189,31 +215,37 @@ + org.apache.beam beam-runners-spark ${beam.beam.version} jar + ${project.groupId} zeppelin-interpreter ${project.version} provided + org.apache.commons commons-exec 1.3 + junit junit test +
    + org.apache.maven.plugins maven-deploy-plugin @@ -222,6 +254,7 @@ true + maven-enforcer-plugin 1.3.1 @@ -232,6 +265,7 @@ + maven-dependency-plugin 2.8 @@ -274,6 +308,7 @@ + -
    \ No newline at end of file + From 750041cbf8c8697fef2a4a1416b697f260e3091a Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Fri, 9 Sep 2016 18:48:28 +0200 Subject: [PATCH 18/21] Add readme file and modify pom file and travis.yml --- .travis.yml | 6 +++--- beam/README.md | 25 +++++++++++++++++++++++++ pom.xml | 8 +++++++- 3 files changed, 35 insertions(+), 4 deletions(-) create mode 100644 beam/README.md diff --git a/.travis.yml b/.travis.yml index 9936994d78d..e2a115cc89f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -40,15 +40,15 @@ matrix: # Test all modules with spark 2.0.0 and scala 2.11 - jdk: "oraclejdk7" - env: SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.3" PROFILE="-Pspark-2.0 -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -pl !beam -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" + env: SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.3" PROFILE="-Pspark-2.0 -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" # Test all modules with scala 2.10 - jdk: "oraclejdk7" - env: SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.10" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" + env: SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pbeam -Pexamples -Pscala-2.10" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" # Test all modules with scala 2.11 - jdk: "oraclejdk7" - env: SCALA_VER="2.11" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -pl !beam 
-Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" + env: SCALA_VER="2.11" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" # Test spark module for 1.5.2 - jdk: "oraclejdk7" diff --git a/beam/README.md b/beam/README.md new file mode 100644 index 00000000000..57150a0208a --- /dev/null +++ b/beam/README.md @@ -0,0 +1,25 @@ +# Overview +Beam interpreter for Apache Zeppelin + +# Architecture +Current interpreter implementation supports the static repl. It compiles the code in memory, execute it and redirect the output to zeppelin. + +## Building the Beam Interpreter +You have to first build the Beam interpreter by enable the **beam** profile as follows: + +``` +mvn clean package -Pbeam -DskipTests +``` + +### Notice +- Flink runner comes with binary compiled for scala 2.10. So, currently we support only Scala 2.10 + +### Technical overview + + * Upon starting an interpreter, an instance of `JavaCompiler` is created. + + * When the user runs commands with beam, the `JavaParser` go through the code to get a class that contains the main method. + + * Then it replaces the class name with random class name to avoid overriding while compilation. it creates new out & err stream to get the data in new stream instead of the console, to redirect output to zeppelin. + + * If there is any error during compilation, it can catch and redirect to zeppelin. 
diff --git a/pom.xml b/pom.xml index 38c5e86b2f1..c93f4b8d782 100644 --- a/pom.xml +++ b/pom.xml @@ -61,7 +61,6 @@ angular shell livy - beam hbase postgresql jdbc @@ -580,6 +579,13 @@ + + beam + + beam + + + examples From 27d769005ec7bc075b42b10485c7d69dfff96063 Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Sat, 17 Sep 2016 23:02:32 +0200 Subject: [PATCH 19/21] set spark version to 1.6.1 and some modifications --- beam/pom.xml | 8 +++++- .../apache/zeppelin/beam/BeamInterpreter.java | 2 +- .../org/apache/zeppelin/beam/StaticRepl.java | 25 +++---------------- 3 files changed, 12 insertions(+), 23 deletions(-) diff --git a/beam/pom.xml b/beam/pom.xml index 03a81e70ce5..221da6d545a 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -34,11 +34,17 @@ 2.3.0 - 1.4.1 + 1.6.1 0.2.0-incubating + + io.netty + netty-all + 4.1.1.Final + + org.apache.spark spark-core_2.10 diff --git a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java index 415fd30bf4f..caa91c3e4cb 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java @@ -53,7 +53,7 @@ public void close() { // delete all .class files created while compilation process for (int i = 0; i < dir.list().length; i++) { File f = dir.listFiles()[i]; - if (f.getAbsolutePath().contains(".class")) { + if (f.getAbsolutePath().endsWith(".class")) { f.delete(); } } diff --git a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java index 670992f5713..65e481b7118 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java @@ -147,27 +147,10 @@ public static String execute(String generatedClassName, String code) throws Exce return baosOut.toString(); - } catch (ClassNotFoundException e) { - logger.error("Exception in 
Interpreter while Class not found", e); - System.err.println("Class not found: " + e); - e.printStackTrace(newErr); - throw new Exception(baosErr.toString()); - - } catch (NoSuchMethodException e) { - logger.error("Exception in Interpreter while No such method", e); - System.err.println("No such method: " + e); - e.printStackTrace(newErr); - throw new Exception(baosErr.toString()); - - } catch (IllegalAccessException e) { - logger.error("Exception in Interpreter while Illegal access", e); - System.err.println("Illegal access: " + e); - e.printStackTrace(newErr); - throw new Exception(baosErr.toString()); - - } catch (InvocationTargetException e) { - logger.error("Exception in Interpreter while Invocation target", e); - System.err.println("Invocation target: " + e); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException + | InvocationTargetException e) { + logger.error("Exception in Interpreter while execution", e); + System.err.println(e); e.printStackTrace(newErr); throw new Exception(baosErr.toString()); From 55c1322356bc23349be2d42dd81a15d8320de82a Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Sun, 18 Sep 2016 15:18:48 +0200 Subject: [PATCH 20/21] set spark version to 1.6.2 and throw original exception --- beam/pom.xml | 2 +- .../main/java/org/apache/zeppelin/beam/StaticRepl.java | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/beam/pom.xml b/beam/pom.xml index 221da6d545a..b0f165647b6 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -34,7 +34,7 @@ 2.3.0 - 1.6.1 + 1.6.2 0.2.0-incubating diff --git a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java index 65e481b7118..d14151dea3f 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java @@ -69,7 +69,8 @@ public static String execute(String generatedClassName, String code) throws Exce boolean hasMain = 
false; for (int j = 0; j < classes.get(i).getMethods().size(); j++) { - if (classes.get(i).getMethods().get(j).getName().equals("main")) { + if (classes.get(i).getMethods().get(j).getName().equals("main") && classes.get(i) + .getMethods().get(j).getModifiers().toString().contains("static")) { mainClassName = classes.get(i).getName(); hasMain = true; break; @@ -83,8 +84,9 @@ public static String execute(String generatedClassName, String code) throws Exce // if there isn't Main method, will retuen error if (mainClassName == null) { - logger.error("Exception for Main method", "There isn't any class containing Main method."); - throw new Exception("There isn't any class containing Main method."); + logger.error("Exception for Main method", "There isn't any class " + + "containing static main method."); + throw new Exception("There isn't any class containing static main method."); } // replace name of class containing Main method with generated name @@ -152,7 +154,7 @@ public static String execute(String generatedClassName, String code) throws Exce logger.error("Exception in Interpreter while execution", e); System.err.println(e); e.printStackTrace(newErr); - throw new Exception(baosErr.toString()); + throw new Exception(baosErr.toString(), e); } finally { From da66c2782799220e38017d7d91713e2e125b9056 Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Tue, 20 Sep 2016 15:59:07 +0200 Subject: [PATCH 21/21] Modify condition of checking static modifier --- beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java index d14151dea3f..ed81146bb1c 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java @@ -70,7 +70,7 @@ public static String execute(String generatedClassName, String code) throws Exce for (int 
j = 0; j < classes.get(i).getMethods().size(); j++) { if (classes.get(i).getMethods().get(j).getName().equals("main") && classes.get(i) - .getMethods().get(j).getModifiers().toString().contains("static")) { + .getMethods().get(j).isStatic()) { mainClassName = classes.get(i).getName(); hasMain = true; break;