From f2a66df49ecd9d4fe780d303b1aaa55ee86ca76f Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 11 May 2015 16:56:00 +0200 Subject: [PATCH 1/8] Initial implementation of interpreter for Apache Flink --- conf/zeppelin-site.xml.template | 2 +- flink/pom.xml | 390 ++++++++++++++++++ .../zeppelin/flink/FlinkEnvironment.java | 130 ++++++ .../org/apache/zeppelin/flink/FlinkIMain.java | 74 ++++ .../zeppelin/flink/FlinkInterpreter.java | 317 ++++++++++++++ .../org/apache/zeppelin/flink/JarHelper.java | 199 +++++++++ .../zeppelin/flink/FlinkInterpreterTest.java | 64 +++ pom.xml | 1 + .../zeppelin/conf/ZeppelinConfiguration.java | 3 +- 9 files changed, 1178 insertions(+), 2 deletions(-) create mode 100644 flink/pom.xml create mode 100644 flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java create mode 100644 flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java create mode 100644 flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java create mode 100644 flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java create mode 100644 flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 9f773d50add..e10c85e953c 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -66,7 +66,7 @@ zeppelin.interpreters - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter Comma separated interpreter configurations. First interpreter become a default diff --git a/flink/pom.xml b/flink/pom.xml new file mode 100644 index 00000000000..8dcd236c3d8 --- /dev/null +++ b/flink/pom.xml @@ -0,0 +1,390 @@ + + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.5.0-incubating-SNAPSHOT + + + org.apache.zeppelin + zeppelin-flink + jar + 0.5.0-incubating-SNAPSHOT + Zeppelin: Flink + Zeppelin flink support + http://zeppelin.incubator.apache.org + + + 0.9.0-milestone-1 + 2.3.7 + 2.10 + 2.10.4 + + + + + org.slf4j + slf4j-api + + + + org.slf4j + slf4j-log4j12 + + + + ${project.groupId} + zeppelin-interpreter + ${project.version} + provided + + + + com.google.code.gson + gson + + + + commons-collections + commons-collections + + + + org.apache.flink + flink-core + ${flink.version} + + + + org.apache.flink + flink-clients + ${flink.version} + + + + org.apache.flink + flink-runtime + ${flink.version} + + + com.typesafe.akka + akka-actor_2.10 + + + com.typesafe.akka + akka-remote_2.10 + + + com.typesafe.akka + akka-slf4j_2.10 + + + + + + org.apache.flink + flink-scala + ${flink.version} + + + + com.typesafe.akka + akka-actor_${flink.scala.binary.version} + ${flink.akka.version} + + + + com.typesafe.akka + akka-remote_${flink.scala.binary.version} + ${flink.akka.version} + + + + com.typesafe.akka + akka-slf4j_${flink.scala.binary.version} + ${flink.akka.version} + + + + com.typesafe.akka + akka-testkit_${flink.scala.binary.version} + ${flink.akka.version} + + + + + org.scala-lang + scala-library + ${flink.scala.version} + + + + org.scala-lang + scala-compiler + ${flink.scala.version} + + + + org.scala-lang + scala-reflect + ${flink.scala.version} + + + + junit + junit + test + + + + + + + org.apache.rat + apache-rat-plugin + + + **/.idea/ + **/*.iml + .gitignore + **/.settings/* + **/.classpath + **/.project + **/target/** + **/README.md + dependency-reduced-pom.xml + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.1.4 + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} + + + + + + + + org.apache.maven.plugins + maven-eclipse-plugin + 2.8 + + true + + org.scala-ide.sdt.core.scalanature + org.eclipse.jdt.core.javanature + + + org.scala-ide.sdt.core.scalabuilder + + + org.scala-ide.sdt.launching.SCALA_CONTAINER + org.eclipse.jdt.launching.JRE_CONTAINER + + + + **/*.scala + **/*.java + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + + add-source + generate-sources + + add-source + + + + src/main/scala + + + + + + add-test-source + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + + + + org.scalastyle + scalastyle-maven-plugin + 0.5.0 + + + + check + + + + + false + true + true + false + ${basedir}/src/main/scala + ${basedir}/src/test/scala + ${project.basedir}/../_tools/scalastyle.xml + ${project.basedir}/target/scalastyle-output.xml + UTF-8 + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.17 + + 1 + false + -Xmx1024m -XX:MaxPermSize=256m + + + + + org.apache.maven.plugins + maven-dependency-plugin + 2.4 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/flink + false + false + true + runtime + + + + + + + maven-dependency-plugin + 2.8 + + + package + + copy + + + ${project.build.directory}/../../interpreter/flink + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java new file mode 100644 index 00000000000..ed3e8915046 --- /dev/null +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.flink; + +import java.io.File; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.PlanExecutor; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.translation.JavaPlan; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class FlinkEnvironment extends ExecutionEnvironment { + Logger logger = LoggerFactory.getLogger(FlinkEnvironment.class); + + private String host; + private int port; + + private FlinkIMain imain; + + public FlinkEnvironment(String host, int port, FlinkIMain imain) { + this.host = host; + this.port = port; + this.imain = imain; + + logger.info("jobManager host={}, port={}", host, port); + } + + @Override + public JobExecutionResult execute(String jobName) throws Exception { + JavaPlan plan = createProgramPlan(jobName); + + File jarFile = imain.jar(); + PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, + jarFile.getAbsolutePath()); + + JobExecutionResult result = executor.executePlan(plan); + + if (jarFile.isFile()) { + jarFile.delete(); + } + + return result; + } + + @Override + public String getExecutionPlan() throws Exception { + JavaPlan plan = createProgramPlan("unnamed", false); + plan.setDefaultParallelism(getParallelism()); + registerCachedFilesWithPlan(plan); + + File jarFile = imain.jar(); + PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, + jarFile.getAbsolutePath()); + String jsonPlan = executor.getOptimizerPlanAsJSON(plan); + + if (jarFile != null && jarFile.isFile()) { + jarFile.delete(); + } + + return jsonPlan; + } + +/* + private File createJar() throws IOException { + // create execution environment + File jarFile = new File(System.getProperty("java.io.tmpdir") + + "/ZeppelinFlinkJar_" + System.currentTimeMillis() + ".jar"); + + + File[] classFiles = classDir.listFiles(); + if (classFiles == null) { + return null; + } + + byte buffer[] = new byte[10240]; + // Open archive file + FileOutputStream stream = new FileOutputStream(jarFile); + JarOutputStream out = new JarOutputStream(stream, new Manifest()); + + for (int i = 0; i < classFiles.length; i++) { + File classFile = classFiles[i]; + if (classFiles == null || !classFile.exists() + || classFile.isDirectory()) + continue; + + + // Add class + JarEntry jarAdd = new JarEntry(classFile.getName()); + jarAdd.setTime(classFile.lastModified()); + out.putNextEntry(jarAdd); + logger.info("add class {} into jar", classFile); + + // Write file to archive + FileInputStream in = new FileInputStream(classFile); + while (true) { + int nRead = in.read(buffer, 0, buffer.length); + if (nRead <= 0) + break; + out.write(buffer, 0, nRead); + } + in.close(); + } + + out.close(); + stream.close(); + return jarFile; + } + */ + +} diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java new file mode 100644 index 00000000000..57654a4ba0e --- /dev/null +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java @@ -0,0 +1,74 @@ +package org.apache.zeppelin.flink; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.collection.Iterator; +import scala.reflect.io.AbstractFile; +import scala.reflect.io.VirtualDirectory; +import scala.tools.nsc.Settings; +import scala.tools.nsc.interpreter.IMain; + +/** + */ +public class FlinkIMain extends IMain { + Logger logger = LoggerFactory.getLogger(FlinkIMain.class); + + public FlinkIMain(Settings setting, PrintWriter out) { + super(setting, out); + } + + public File jar() throws IOException { + VirtualDirectory classDir = virtualDirectory(); + // create execution environment + File jarBuildDir = new File(System.getProperty("java.io.tmpdir") + + "/ZeppelinFlinkJarBiuldDir_" + System.currentTimeMillis()); + jarBuildDir.mkdirs(); + + File jarFile = new File(System.getProperty("java.io.tmpdir") + + "/ZeppelinFlinkJarFile_" + System.currentTimeMillis() + ".jar"); + + + Iterator vdIt = classDir.iterator(); + while (vdIt.hasNext()) { + AbstractFile fi = vdIt.next(); + if (fi.isDirectory()) { + Iterator fiIt = fi.iterator(); + while (fiIt.hasNext()) { + AbstractFile f = fiIt.next(); + + // directory for compiled line + File lineDir = new File(jarBuildDir.getAbsolutePath(), fi.name()); + lineDir.mkdirs(); + + // compiled classes for commands from shell + File writeFile = new File(lineDir.getAbsolutePath(), f.name()); + FileOutputStream outputStream = new FileOutputStream(writeFile); + InputStream inputStream = f.input(); + + // copy file contents + org.apache.commons.io.IOUtils.copy(inputStream, outputStream); + + inputStream.close(); + outputStream.close(); + } + } + } + + // jarr up + JarHelper jh = new JarHelper(); + jh.jarDir(jarBuildDir, jarFile); + + FileUtils.deleteDirectory(jarBuildDir); + return jarFile; + } + + +} diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java new file mode 100644 index 00000000000..76c43900270 --- /dev/null +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.flink; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.client.program.Client; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.InterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Console; +import scala.None; +import scala.Some; +import scala.tools.nsc.Settings; +import scala.tools.nsc.settings.MutableSettings.BooleanSetting; +import scala.tools.nsc.settings.MutableSettings.PathSetting; + +/** + * Interpreter for Apache Flink (http://flink.apache.org) + */ +public class FlinkInterpreter extends Interpreter { + Logger logger = LoggerFactory.getLogger(FlinkInterpreter.class); + private Settings settings; + private ByteArrayOutputStream out; + private FlinkIMain imain; + private File classDir; + private Map binder; + private ExecutionEnvironment env; + private Configuration flinkConf; + private LocalFlinkMiniCluster localFlinkCluster; + private Client client; + + public FlinkInterpreter(Properties property) { + super(property); + } + + static { + Interpreter.register( + "flink", + "flink", + FlinkInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add("local", "true", "Run flink locally") + .add("jobmanager.rpc.address", "localhost", "Flink cluster") + .add("jobmanager.rpc.port", "6123", "Flink cluster") + .build() + ); + } + + @Override + public void open() { + URL[] urls = getClassloaderUrls(); + this.settings = new Settings(); + + // set classpath + PathSetting pathSettings = settings.classpath(); + String classpath = ""; + List paths = currentClassPath(); + for (File f : paths) { + if (classpath.length() > 0) { + classpath += File.pathSeparator; + } + classpath += f.getAbsolutePath(); + } + + if (urls != null) { + for (URL u : urls) { + if (classpath.length() > 0) { + classpath += File.pathSeparator; + } + classpath += u.getFile(); + } + } + + pathSettings.v_$eq(classpath); + settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); + settings.explicitParentLoader_$eq(new Some(Thread.currentThread() + .getContextClassLoader())); + BooleanSetting b = (BooleanSetting) settings.usejavacp(); + b.v_$eq(true); + settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); + + out = new ByteArrayOutputStream(); + imain = new FlinkIMain(settings, new PrintWriter(out)); + + initializeFlinkEnv(); + } + + private boolean localMode() { + return Boolean.parseBoolean(getProperty("local")); + } + + private String getRpcAddress() { + if (localMode()) { + return "localhost"; + } else { + return getProperty("jobmanager.rpc.address"); + } + } + + private int getRpcPort() { + if (localMode()) { + return localFlinkCluster.getJobManagerRPCPort(); + } else { + return Integer.parseInt(getProperty("jobmanager.rpc.port")); + } + } + + private void initializeFlinkEnv() { + // prepare bindings + imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); + binder = (Map) getValue("_binder"); + + flinkConf = new org.apache.flink.configuration.Configuration(); + Properties intpProperty = getProperty(); + for (Object k : intpProperty.keySet()) { + String key = (String) k; + String val = toString(intpProperty.get(key)); + flinkConf.setString(key, val); + } + + if (localMode()) { + startFlinkMiniCluster(); + } + + env = new FlinkEnvironment(getRpcAddress(), getRpcPort(), imain); + binder.put("env", new org.apache.flink.api.scala.ExecutionEnvironment(env)); + + // do import and create val + imain.interpret("@transient val env = " + + "_binder.get(\"env\")" + + ".asInstanceOf[org.apache.flink.api.scala.ExecutionEnvironment]"); + + imain.interpret("import org.apache.flink.api.scala._"); + } + + + private List currentClassPath() { + List paths = classPath(Thread.currentThread().getContextClassLoader()); + String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); + if (cps != null) { + for (String cp : cps) { + paths.add(new File(cp)); + } + } + return paths; + } + + private List classPath(ClassLoader cl) { + List paths = new LinkedList(); + if (cl == null) { + return paths; + } + + if (cl instanceof URLClassLoader) { + URLClassLoader ucl = (URLClassLoader) cl; + URL[] urls = ucl.getURLs(); + if (urls != null) { + for (URL url : urls) { + paths.add(new File(url.getFile())); + } + } + } + return paths; + } + + public Object getValue(String name) { + Object ret = imain.valueOfTerm(name); + if (ret instanceof None) { + return null; + } else if (ret instanceof Some) { + return ((Some) ret).get(); + } else { + return ret; + } + } + + @Override + public void close() { + imain.close(); + + if (localMode()) { + stopFlinkMiniCluster(); + } + } + + @Override + public InterpreterResult interpret(String line, InterpreterContext context) { + if (line == null || line.trim().length() == 0) { + return new InterpreterResult(Code.SUCCESS); + } + + InterpreterResult result = interpret(line.split("\n"), context); + return result; + } + + public InterpreterResult interpret(String[] lines, InterpreterContext context) { + String[] linesToRun = new String[lines.length + 1]; + for (int i = 0; i < lines.length; i++) { + linesToRun[i] = lines[i]; + } + linesToRun[lines.length] = "print(\"\")"; + + Console.setOut(out); + System.setOut(new PrintStream(out)); + out.reset(); + Code r = null; + + String incomplete = ""; + for (String s : linesToRun) { + scala.tools.nsc.interpreter.Results.Result res = null; + try { + res = imain.interpret(incomplete + s); + } catch (Exception e) { + logger.info("Interpreter exception", e); + return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); + } + + r = getResultCode(res); + + if (r == Code.ERROR) { + return new InterpreterResult(r, out.toString()); + } else if (r == Code.INCOMPLETE) { + incomplete += s + "\n"; + } else { + incomplete = ""; + } + } + + if (r == Code.INCOMPLETE) { + return new InterpreterResult(r, "Incomplete expression"); + } else { + return new InterpreterResult(r, out.toString()); + } + } + + private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) { + if (r instanceof scala.tools.nsc.interpreter.Results.Success$) { + return Code.SUCCESS; + } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) { + return Code.INCOMPLETE; + } else { + return Code.ERROR; + } + } + + + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List completion(String buf, int cursor) { + return new LinkedList(); + } + + private void startFlinkMiniCluster() { + localFlinkCluster = new LocalFlinkMiniCluster(flinkConf, false); + localFlinkCluster.waitForTaskManagersToBeRegistered(); + } + + private void stopFlinkMiniCluster() { + if (localFlinkCluster != null) { + localFlinkCluster.shutdown(); + localFlinkCluster = null; + } + } + + static final String toString(Object o) { + return (o instanceof String) ? (String) o : ""; + } + +} diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java new file mode 100644 index 00000000000..e924d6d53be --- /dev/null +++ b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java @@ -0,0 +1,199 @@ +package org.apache.zeppelin.flink; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.jar.JarEntry; +import java.util.jar.JarInputStream; +import java.util.jar.JarOutputStream; + +/** + * Provides utility services for jarring and unjarring files and directories. + * Note that a given instance of JarHelper is not threadsafe with respect to + * multiple jar operations. + * + * Copied from + * http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans + * /xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source + * + * @author Patrick Calahan + */ +public class JarHelper { + // ======================================================================== + // Constants + + private static final int BUFFER_SIZE = 2156; + + // ======================================================================== + // Variables + + private byte[] mBuffer = new byte[BUFFER_SIZE]; + private int mByteCount = 0; + private boolean mVerbose = false; + private String mDestJarName = ""; + + // ======================================================================== + // Constructor + + /** + * Instantiates a new JarHelper. + */ + public JarHelper() { + } + + // ======================================================================== + // Public methods + + /** + * Jars a given directory or single file into a JarOutputStream. + */ + public void jarDir(File dirOrFile2Jar, File destJar) throws IOException { + + if (dirOrFile2Jar == null || destJar == null) { + throw new IllegalArgumentException(); + } + + mDestJarName = destJar.getCanonicalPath(); + FileOutputStream fout = new FileOutputStream(destJar); + JarOutputStream jout = new JarOutputStream(fout); + // jout.setLevel(0); + try { + jarDir(dirOrFile2Jar, jout, null); + } catch (IOException ioe) { + throw ioe; + } finally { + jout.close(); + fout.close(); + } + } + + /** + * Unjars a given jar file into a given directory. + */ + public void unjarDir(File jarFile, File destDir) throws IOException { + BufferedOutputStream dest = null; + FileInputStream fis = new FileInputStream(jarFile); + unjar(fis, destDir); + } + + /** + * Given an InputStream on a jar file, unjars the contents into the given + * directory. + */ + public void unjar(InputStream in, File destDir) throws IOException { + BufferedOutputStream dest = null; + JarInputStream jis = new JarInputStream(in); + JarEntry entry; + while ((entry = jis.getNextJarEntry()) != null) { + if (entry.isDirectory()) { + File dir = new File(destDir, entry.getName()); + dir.mkdir(); + if (entry.getTime() != -1) { + dir.setLastModified(entry.getTime()); + } + continue; + } + int count; + byte[] data = new byte[BUFFER_SIZE]; + File destFile = new File(destDir, entry.getName()); + if (mVerbose) { + System.out + .println("unjarring " + destFile + " from " + entry.getName()); + } + FileOutputStream fos = new FileOutputStream(destFile); + dest = new BufferedOutputStream(fos, BUFFER_SIZE); + while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) { + dest.write(data, 0, count); + } + dest.flush(); + dest.close(); + if (entry.getTime() != -1) { + destFile.setLastModified(entry.getTime()); + } + } + jis.close(); + } + + public void setVerbose(boolean b) { + mVerbose = b; + } + + // ======================================================================== + // Private methods + + private static final char SEP = '/'; + + /** + * Recursively jars up the given path under the given directory. + */ + private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path) + throws IOException { + if (mVerbose) { + System.out.println("checking " + dirOrFile2jar); + } + if (dirOrFile2jar.isDirectory()) { + String[] dirList = dirOrFile2jar.list(); + String subPath = (path == null) ? "" + : (path + dirOrFile2jar.getName() + SEP); + if (path != null) { + JarEntry je = new JarEntry(subPath); + je.setTime(dirOrFile2jar.lastModified()); + jos.putNextEntry(je); + jos.flush(); + jos.closeEntry(); + } + for (int i = 0; i < dirList.length; i++) { + File f = new File(dirOrFile2jar, dirList[i]); + jarDir(f, jos, subPath); + } + } else { + if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) { + if (mVerbose) { + System.out.println("skipping " + dirOrFile2jar.getPath()); + } + return; + } + + if (mVerbose) { + System.out.println("adding " + dirOrFile2jar.getPath()); + } + FileInputStream fis = new FileInputStream(dirOrFile2jar); + try { + JarEntry entry = new JarEntry(path + dirOrFile2jar.getName()); + entry.setTime(dirOrFile2jar.lastModified()); + jos.putNextEntry(entry); + while ((mByteCount = fis.read(mBuffer)) != -1) { + jos.write(mBuffer, 0, mByteCount); + if (mVerbose) { + System.out.println("wrote " + mByteCount + " bytes"); + } + } + jos.flush(); + jos.closeEntry(); + } catch (IOException ioe) { + throw ioe; + } finally { + fis.close(); + } + } + } + + // for debugging + public static void main(String[] args) throws IOException { + if (args.length < 2) { + System.err.println("Usage: JarHelper jarname.jar directory"); + return; + } + + JarHelper jarHelper = new JarHelper(); + jarHelper.mVerbose = true; + + File destJar = new File(args[0]); + File dirOrFile2Jar = new File(args[1]); + + jarHelper.jarDir(dirOrFile2Jar, destJar); + } +} diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java new file mode 100644 index 00000000000..091c4f3118a --- /dev/null +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.flink; + +import static org.junit.Assert.assertEquals; + +import java.util.Properties; + +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class FlinkInterpreterTest { + + private FlinkInterpreter flink; + private InterpreterContext context; + + @Before + public void setUp() { + Properties p = new Properties(); + flink = new FlinkInterpreter(p); + flink.open(); + context = new InterpreterContext(null, null, null, null, null, null, null); + } + + @After + public void tearDown() { + flink.close(); + flink.destroy(); + } + + @Test + public void testSimpleStatement() { + InterpreterResult result = flink.interpret("val a=1", context); + result = flink.interpret("print(a)", context); + assertEquals("1", result.message()); + } + + @Test + public void testWordCount() { + flink.interpret("val text = env.fromElements(\"To be or not to be\")", context); + flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context); + flink.interpret("counts.print()", context); + InterpreterResult result = flink.interpret("env.execute(\"WordCount Example\")", context); + assertEquals("", result.message()); + } +} diff --git a/pom.xml b/pom.xml index bbde0846c67..dd3c2a3684d 100644 --- a/pom.xml +++ b/pom.xml @@ -92,6 +92,7 @@ shell hive tajo + flink zeppelin-web zeppelin-server zeppelin-distribution diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index bbf46fc88d4..78a463cc196 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -389,7 +389,8 @@ public static enum ConfVars { + "org.apache.zeppelin.angular.AngularInterpreter," + "org.apache.zeppelin.shell.ShellInterpreter," + "org.apache.zeppelin.hive.HiveInterpreter," - + "org.apache.zeppelin.tajo.TajoInterpreter"), + + "org.apache.zeppelin.tajo.TajoInterpreter," + + "org.apache.zeppelin.flink.FlinkInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), From 27fc306a35e01cd5680236f75fce869b9e35d6b4 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 11 May 2015 19:43:52 +0200 Subject: [PATCH 2/8] Cleaning up --- .../java/org/apache/zeppelin/flink/FlinkInterpreter.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index 76c43900270..b342f4e4bf4 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -19,7 +19,6 @@ import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.PrintStream; import java.io.PrintWriter; import java.net.URL; import java.net.URLClassLoader; @@ -29,7 +28,6 @@ import java.util.Properties; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.client.program.Client; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.zeppelin.interpreter.Interpreter; @@ -56,12 +54,10 @@ public class FlinkInterpreter extends Interpreter { private Settings settings; private ByteArrayOutputStream out; private FlinkIMain imain; - private File classDir; private Map binder; private ExecutionEnvironment env; private Configuration flinkConf; private LocalFlinkMiniCluster localFlinkCluster; - private Client client; public FlinkInterpreter(Properties property) { super(property); @@ -235,7 +231,6 @@ public InterpreterResult interpret(String[] lines, InterpreterContext context) { linesToRun[lines.length] = "print(\"\")"; Console.setOut(out); - System.setOut(new PrintStream(out)); out.reset(); Code r = null; From ebbd0dabc88b5be64b3eaafbda612c814c819b56 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 11 May 2015 19:55:53 +0200 Subject: [PATCH 3/8] Fix unittest and update comment --- .../zeppelin/flink/FlinkEnvironment.java | 51 +------------------ .../org/apache/zeppelin/flink/FlinkIMain.java | 1 + .../org/apache/zeppelin/flink/JarHelper.java | 3 ++ .../zeppelin/flink/FlinkInterpreterTest.java | 20 ++++---- 4 files changed, 17 insertions(+), 58 deletions(-) diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java index ed3e8915046..629932b6667 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java @@ -27,7 +27,8 @@ import org.slf4j.LoggerFactory; /** - * + * The class override execute() method to create an PlanExecutor with + * jar file that packages classes from scala compiler. */ public class FlinkEnvironment extends ExecutionEnvironment { Logger logger = LoggerFactory.getLogger(FlinkEnvironment.class); @@ -79,52 +80,4 @@ public String getExecutionPlan() throws Exception { return jsonPlan; } - -/* - private File createJar() throws IOException { - // create execution environment - File jarFile = new File(System.getProperty("java.io.tmpdir") - + "/ZeppelinFlinkJar_" + System.currentTimeMillis() + ".jar"); - - - File[] classFiles = classDir.listFiles(); - if (classFiles == null) { - return null; - } - - byte buffer[] = new byte[10240]; - // Open archive file - FileOutputStream stream = new FileOutputStream(jarFile); - JarOutputStream out = new JarOutputStream(stream, new Manifest()); - - for (int i = 0; i < classFiles.length; i++) { - File classFile = classFiles[i]; - if (classFiles == null || !classFile.exists() - || classFile.isDirectory()) - continue; - - - // Add class - JarEntry jarAdd = new JarEntry(classFile.getName()); - jarAdd.setTime(classFile.lastModified()); - out.putNextEntry(jarAdd); - logger.info("add class {} into jar", classFile); - - // Write file to archive - FileInputStream in = new FileInputStream(classFile); - while (true) { - int nRead = in.read(buffer, 0, buffer.length); - if (nRead <= 0) - break; - out.write(buffer, 0, nRead); - } - in.close(); - } - - out.close(); - stream.close(); - return jarFile; - } - */ - } diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java index 57654a4ba0e..7e9552df1a4 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java @@ -17,6 +17,7 @@ import scala.tools.nsc.interpreter.IMain; /** + * Scala compiler */ public class FlinkIMain extends IMain { Logger logger = LoggerFactory.getLogger(FlinkIMain.class); diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java index e924d6d53be..85227bb19df 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java @@ -11,6 +11,9 @@ import java.util.jar.JarOutputStream; /** + * This class copied from flink-scala-shell. Once the flink-0.9 is published in + * the maven repository, this class can be removed + * * Provides utility services for jarring and unjarring files and directories. * Note that a given instance of JarHelper is not threadsafe with respect to * multiple jar operations. diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index 091c4f3118a..264008adb21 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -23,25 +23,26 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.junit.After; -import org.junit.Before; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; public class FlinkInterpreterTest { - private FlinkInterpreter flink; - private InterpreterContext context; + private static FlinkInterpreter flink; + private static InterpreterContext context; - @Before - public void setUp() { + @BeforeClass + public static void setUp() { Properties p = new Properties(); flink = new FlinkInterpreter(p); flink.open(); context = new InterpreterContext(null, null, null, null, null, null, null); } - @After - public void tearDown() { + @AfterClass + public static void tearDown() { flink.close(); flink.destroy(); } @@ -53,12 +54,13 @@ public void testSimpleStatement() { assertEquals("1", result.message()); } + @Test public void testWordCount() { flink.interpret("val text = env.fromElements(\"To be or not to be\")", context); flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context); flink.interpret("counts.print()", context); InterpreterResult result = flink.interpret("env.execute(\"WordCount Example\")", context); - assertEquals("", result.message()); + assertEquals(Code.SUCCESS, result.code()); } } From 7be1f907777a75e537b46e2b4bb7580d024ade00 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 18 May 2015 12:23:24 +0900 Subject: [PATCH 4/8] Add apache snapshot repo --- flink/pom.xml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/flink/pom.xml b/flink/pom.xml index 8dcd236c3d8..cbb61035a6b 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -40,6 +40,21 @@ 2.10.4 + + + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + false + + + true + + + + org.slf4j From e69e5ba9041c42beec99afa0b03bb380c47f6d37 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 18 May 2015 14:36:41 +0900 Subject: [PATCH 5/8] Add license --- .../org/apache/zeppelin/flink/FlinkIMain.java | 17 +++++++++++++++++ .../org/apache/zeppelin/flink/JarHelper.java | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java index 7e9552df1a4..257e6fe16e7 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.flink; import java.io.File; diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java index 85227bb19df..efc495118d7 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.flink; import java.io.BufferedOutputStream; From 501efb309a5acc6cf20f85a45ed2052777e93474 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 18 May 2015 14:47:50 +0900 Subject: [PATCH 6/8] Add scalastyle --- _tools/scalastyle.xml | 146 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 _tools/scalastyle.xml diff --git a/_tools/scalastyle.xml b/_tools/scalastyle.xml new file mode 100644 index 00000000000..f7bb0d4819c --- /dev/null +++ b/_tools/scalastyle.xml @@ -0,0 +1,146 @@ + + + + + + + + + + + + Scalastyle standard configuration + + + + + + + + + + + + + + + + + + + true + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + From 460cf46f7cc75aebb7ed587d0dd163fde2f07342 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 18 May 2015 18:04:30 +0900 Subject: [PATCH 7/8] jarr up -> jar up --- flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java index 257e6fe16e7..ee6516c8ca4 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java @@ -80,7 +80,7 @@ public File jar() throws IOException { } } - // jarr up + // jar up JarHelper jh = new JarHelper(); jh.jarDir(jarBuildDir, jarFile); From f08bd259c371a953ad7c1e0c6910919f26258713 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 8 Jun 2015 00:44:38 -0700 Subject: [PATCH 8/8] Update pom.xml after https://github.com/apache/incubator-zeppelin/pull/88 --- flink/pom.xml | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/flink/pom.xml b/flink/pom.xml index cbb61035a6b..68aa62d5e79 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -38,6 +38,7 @@ 2.3.7 2.10 2.10.4 + 2.0.1 @@ -78,11 +79,6 @@ gson - - commons-collections - commons-collections - - org.apache.flink flink-core @@ -99,20 +95,6 @@ org.apache.flink flink-runtime ${flink.version} - - - com.typesafe.akka - akka-actor_2.10 - - - com.typesafe.akka - akka-remote_2.10 - - - com.typesafe.akka - akka-slf4j_2.10 - - @@ -145,7 +127,6 @@ ${flink.akka.version} - org.scala-lang scala-library @@ -225,7 +206,7 @@ org.scalamacros - paradise_${scala.version} + paradise_${flink.scala.version} ${scala.macros.version}