From 047d8fd36773608a3d2cf6445881173e7d26377c Mon Sep 17 00:00:00 2001 From: Tom White Date: Thu, 13 Apr 2017 16:10:23 +0100 Subject: [PATCH] =?UTF-8?q?CRUNCH-618:=20Run=20on=20Spark=202.=20Contribut?= =?UTF-8?q?ed=20by=20Gerg=C5=91=20P=C3=A1sztor.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crunch-kafka/pom.xml | 2 +- crunch-scrunch/pom.xml | 2 +- .../interpreter/InterpreterJarTest.scala | 23 ++++++++------- .../apache/crunch/scrunch/PTypeFamily.scala | 4 +-- .../interpreter/InterpreterRunner.scala | 27 +++++++++--------- .../apache/crunch/fn/IterableIterator.java | 28 +++++++++++++++++++ .../crunch/fn/SDoubleFlatMapFunction.java | 2 +- .../apache/crunch/fn/SFlatMapFunction.java | 2 +- .../apache/crunch/fn/SFlatMapFunction2.java | 2 +- .../java/org/apache/crunch/fn/SFunctions.java | 8 ++++-- .../crunch/fn/SPairFlatMapFunction.java | 2 +- .../impl/spark/fn/CombineMapsideFunction.java | 4 +-- .../impl/spark/fn/CrunchPairTuple2.java | 9 ++---- .../crunch/impl/spark/fn/FlatMapPairDoFn.java | 4 +-- .../crunch/impl/spark/fn/PairFlatMapDoFn.java | 4 +-- .../impl/spark/fn/ReduceGroupingFunction.java | 9 ++---- pom.xml | 10 +++---- 17 files changed, 83 insertions(+), 59 deletions(-) create mode 100644 crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java diff --git a/crunch-kafka/pom.xml b/crunch-kafka/pom.xml index 961b106a..32429f5b 100644 --- a/crunch-kafka/pom.xml +++ b/crunch-kafka/pom.xml @@ -40,7 +40,7 @@ under the License. org.apache.kafka - kafka_2.10 + kafka_2.11 org.scala-lang diff --git a/crunch-scrunch/pom.xml b/crunch-scrunch/pom.xml index 51925fbb..93760590 100644 --- a/crunch-scrunch/pom.xml +++ b/crunch-scrunch/pom.xml @@ -38,7 +38,7 @@ under the License. scala-compiler - org.scala-lang + jline jline diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala index 5ebc3033..bc9bd0fb 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala @@ -22,8 +22,6 @@ import java.io.FileOutputStream import java.util.jar.JarFile import java.util.jar.JarOutputStream -import scala.tools.nsc.io.VirtualDirectory - import com.google.common.io.Files import org.junit.Assert.assertNotNull import org.junit.Test @@ -31,30 +29,35 @@ import org.apache.crunch.test.CrunchTestSupport import org.scalatest.junit.JUnitSuite import org.apache.crunch.scrunch.CrunchSuite +import scala.tools.nsc.interpreter.{ReplDir, ReplOutput} +import scala.tools.nsc.settings.MutableSettings + /** - * Tests creating jars from a {@link scala.tools.nsc.io.VirtualDirectory}. + * Tests creating jars from a {@link scala.tools.nsc.interpreter.ReplDir}. */ class InterpreterJarTest extends CrunchSuite { /** - * Tests transforming a virtual directory into a temporary jar file. + * Tests transforming an output directory into a temporary jar file. */ - @Test def virtualDirToJar: Unit = { - // Create a virtual directory and populate with some mock content. - val root = new VirtualDirectory("testDir", None) + @Test def outputDirToJar: Unit = { + // Create an output directory and populate with some mock content. + val settings = new MutableSettings(e => println("ERROR: "+e)) + val dirSetting = settings.StringSetting("-Yrepl-outdir", "path", "Test path", "") + val root: ReplDir = new ReplOutput(dirSetting).dir // Add some subdirectories to the root. (1 to 10).foreach { i => - val subdir = root.subdirectoryNamed("subdir" + i).asInstanceOf[VirtualDirectory] + val subdir = root.subdirectoryNamed("subdir" + i) // Add some classfiles to each sub directory. (1 to 10).foreach { j => subdir.fileNamed("MyClass" + j + ".class") } } - // Now generate a jar file from the virtual directory. + // Now generate a jar file from the output directory. val tempJar = new File(tempDir.getRootFile(), "replJar.jar") val jarStream = new JarOutputStream(new FileOutputStream(tempJar)) - InterpreterRunner.addVirtualDirectoryToJar(root, "top/pack/name/", jarStream) + InterpreterRunner.addOutputDirectoryToJar(root, "top/pack/name/", jarStream) jarStream.close() // Verify the contents of the jar. diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala index 47cf637d..a140acdf 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala @@ -264,8 +264,8 @@ trait PTypeFamily extends GeneratedTuplePTypeFamily { } private def products[T <: Product](tpe: Type, mirror: Mirror): PType[T] = { - val ctor = tpe.member(nme.CONSTRUCTOR).asMethod - val args = ctor.paramss.head.map(x => (x.name.toString, + val ctor = tpe.member(termNames.CONSTRUCTOR).asMethod + val args = ctor.paramLists.head.map(x => (x.name.toString, typeToPType(x.typeSignature, mirror))) val out = (x: Product) => TupleN.of(x.productIterator.toArray.asInstanceOf[Array[Object]] : _*) val rtc = mirror.runtimeClass(tpe) diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala index 0d843815..9416f1fc 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala @@ -29,11 +29,9 @@ import scala.tools.nsc.ObjectRunner import scala.tools.nsc.Properties import scala.tools.nsc.ScriptRunner import scala.tools.nsc.interpreter.ILoop -import scala.tools.nsc.io.Jar -import scala.tools.nsc.io.VirtualDirectory +import scala.tools.nsc.io.{AbstractFile, Jar} import com.google.common.io.Files import org.apache.hadoop.conf.Configuration - import org.apache.crunch.util.DistCache import org.apache.commons.io.IOUtils @@ -126,7 +124,7 @@ object InterpreterRunner extends MainGenericRunner { ScriptRunner.runCommand(settings, combinedCode, thingToRun +: command.arguments) } else runTarget() match { - case Left(ex) => errorFn(ex.toString()) + case Left(ex) => errorFn(ex.getMessage(), Some(ex)) case Right(b) => b } } @@ -145,11 +143,11 @@ object InterpreterRunner extends MainGenericRunner { def createReplCodeJar(): File = { var jarStream: JarOutputStream = null try { - val virtualDirectory = repl.intp.virtualDirectory.asInstanceOf[VirtualDirectory] + val outputDirectory = repl.replOutput.dir val tempDir = Files.createTempDir() val tempJar = new File(tempDir, "replJar.jar") jarStream = new JarOutputStream(new FileOutputStream(tempJar)) - addVirtualDirectoryToJar(virtualDirectory, "", jarStream) + addOutputDirectoryToJar(outputDirectory, "", jarStream) return tempJar } finally { IOUtils.closeQuietly(jarStream) @@ -157,14 +155,14 @@ object InterpreterRunner extends MainGenericRunner { } /** - * Add the contents of the specified virtual directory to a jar. This method will recursively + * Add the contents of the specified output directory to a jar. This method will recursively * descend into subdirectories to add their contents. * - * @param dir The virtual directory whose contents should be added. - * @param entryPath The entry path for classes found in the virtual directory. + * @param dir The output directory whose contents should be added. + * @param entryPath The entry path for classes found in the output directory. * @param jarStream An output stream for writing the jar file. */ - def addVirtualDirectoryToJar(dir: VirtualDirectory, entryPath: String, jarStream: + def addOutputDirectoryToJar(dir: AbstractFile, entryPath: String, jarStream: JarOutputStream): Unit = { dir.foreach { file => if (file.isDirectory) { @@ -173,8 +171,7 @@ object InterpreterRunner extends MainGenericRunner { val entry: JarEntry = new JarEntry(dirPath) jarStream.putNextEntry(entry) jarStream.closeEntry() - addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory], - dirPath, jarStream) + addOutputDirectoryToJar(file, dirPath, jarStream) } else if (file.hasExtension("class")) { // Add class files as an entry in the jar file and write the class to the jar. val entry: JarEntry = new JarEntry(entryPath + file.name) @@ -197,7 +194,11 @@ object InterpreterRunner extends MainGenericRunner { // Generate a jar of REPL code and add to the distributed cache. val replJarFile = createReplCodeJar() DistCache.addJarToDistributedCache(configuration, replJarFile) - // Get the paths to jars added with the :cp command. + /** + * Get the paths to jars added with the :cp command. + * The next line will cause a Deprecation Warning, because of the 'repl.addedClasspath', but + * we can safely ignore it as we are not using it to modify the classpath. + */ val addedJarPaths = repl.addedClasspath.split(':') addedJarPaths.foreach { path => if (path.endsWith(".jar")) DistCache.addJarToDistributedCache(configuration, path) diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java b/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java new file mode 100644 index 00000000..3c06c13b --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import java.util.Iterator; + +class IterableIterator implements Iterable { + private final Iterator itr; + IterableIterator(Iterator itr) { + this.itr = itr; + } + public Iterator iterator() { return itr;} +} diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java index f3f67cca..01b0b4c4 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java @@ -30,7 +30,7 @@ public abstract class SDoubleFlatMapFunction extends SparkDoFn @Override public void process(T input, Emitter emitter) { try { - for (Double d : call(input)) { + for (Double d : new IterableIterator(call(input))) { emitter.emit(d); } } catch (Exception e) { diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java index 1fecb768..9ee4d9ff 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java @@ -30,7 +30,7 @@ public abstract class SFlatMapFunction extends SparkDoFn @Override public void process(T input, Emitter emitter) { try { - for (R r : call(input)) { + for (R r : new IterableIterator(call(input))) { emitter.emit(r); } } catch (Exception e) { diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java index 0798f630..d7c6514b 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java @@ -32,7 +32,7 @@ public abstract class SFlatMapFunction2 extends DoFn, R> @Override public void process(Pair input, Emitter emitter) { try { - for (R r : call(input.first(), input.second())) { + for (R r : new IterableIterator(call(input.first(), input.second()))) { emitter.emit(r); } } catch (Exception e) { diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java index cc597462..0ba7a378 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java @@ -17,6 +17,8 @@ */ package org.apache.crunch.fn; +import java.util.Iterator; + import org.apache.spark.api.java.function.DoubleFlatMapFunction; import org.apache.spark.api.java.function.DoubleFunction; import org.apache.spark.api.java.function.FlatMapFunction; @@ -62,7 +64,7 @@ public Tuple2 call(T t) throws Exception { public static SFlatMapFunction wrap(final FlatMapFunction f) { return new SFlatMapFunction() { @Override - public Iterable call(T t) throws Exception { + public Iterator call(T t) throws Exception { return f.call(t); } }; @@ -71,7 +73,7 @@ public Iterable call(T t) throws Exception { public static SFlatMapFunction2 wrap(final FlatMapFunction2 f) { return new SFlatMapFunction2() { @Override - public Iterable call(K k, V v) throws Exception { + public Iterator call(K k, V v) throws Exception { return f.call(k, v); } }; @@ -89,7 +91,7 @@ public double call(T t) throws Exception { public static SDoubleFlatMapFunction wrap(final DoubleFlatMapFunction f) { return new SDoubleFlatMapFunction() { @Override - public Iterable call(T t) throws Exception { + public Iterator call(T t) throws Exception { return f.call(t); } }; diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java index 3b8e75ae..2becd481 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java @@ -32,7 +32,7 @@ public abstract class SPairFlatMapFunction extends SparkDoFn> emitter) { try { - for (Tuple2 kv : call(input)) { + for (Tuple2 kv : new IterableIterator>(call(input))) { emitter.emit(Pair.of(kv._1(), kv._2())); } } catch (Exception e) { diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java index 1bea08d7..231de77d 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java @@ -45,7 +45,7 @@ public CombineMapsideFunction(CombineFn combineFn, SparkRuntimeContext ctx } @Override - public Iterable> call(Iterator> iter) throws Exception { + public Iterator> call(Iterator> iter) throws Exception { ctxt.initialize(combineFn, null); Map> cache = Maps.newHashMap(); int cnt = 0; @@ -63,7 +63,7 @@ public Iterable> call(Iterator> iter) throws Exception } } - return new Flattener(cache); + return new Flattener(cache).iterator(); } private Map> reduce(Map> cache) { diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java index d6c544c0..ca3011f9 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java @@ -27,12 +27,7 @@ public class CrunchPairTuple2 implements PairFlatMapFunction>, K, V> { @Override - public Iterable> call(final Iterator> iterator) throws Exception { - return new Iterable>() { - @Override - public Iterator> iterator() { - return Iterators.transform(iterator, GuavaUtils.pair2tupleFunc()); - } - }; + public Iterator> call(final Iterator> iterator) throws Exception { + return Iterators.transform(iterator, GuavaUtils.pair2tupleFunc()); } } diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java index 8ec28349..aca59f35 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java @@ -37,9 +37,9 @@ public FlatMapPairDoFn(DoFn, T> fn, SparkRuntimeContext ctxt) { } @Override - public Iterable call(Iterator> input) throws Exception { + public Iterator call(Iterator> input) throws Exception { ctxt.initialize(fn, null); return new CrunchIterable, T>(fn, - Iterators.transform(input, GuavaUtils.tuple2PairFunc())); + Iterators.transform(input, GuavaUtils.tuple2PairFunc())).iterator(); } } diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java index 7f289cce..c012e965 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java @@ -37,10 +37,10 @@ public PairFlatMapDoFn(DoFn> fn, SparkRuntimeContext ctxt) { } @Override - public Iterable> call(Iterator input) throws Exception { + public Iterator> call(Iterator input) throws Exception { ctxt.initialize(fn, null); return Iterables.transform( new CrunchIterable>(fn, input), - GuavaUtils.pair2tupleFunc()); + GuavaUtils.pair2tupleFunc()).iterator(); } } diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java index d3dd69e6..eb14dfe4 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java @@ -50,14 +50,9 @@ public ReduceGroupingFunction(GroupingOptions options, } @Override - public Iterable>> call( + public Iterator>> call( final Iterator>> iter) throws Exception { - return new Iterable>>() { - @Override - public Iterator>> iterator() { - return new GroupingIterator(iter, rawComparator()); - } - }; + return new GroupingIterator(iter, rawComparator()); } private RawComparator rawComparator() { diff --git a/pom.xml b/pom.xml index 7a570c0c..5c45568d 100644 --- a/pom.xml +++ b/pom.xml @@ -105,11 +105,11 @@ under the License. hadoop2 0.10.0.1 - 2.10 - 2.10.4 + 2.11 + 2.11.8 2.2.4 - 1.3.1 - 2.10.4 + 2.0.0 + 2.12.1 1.3.9 @@ -456,7 +456,7 @@ under the License. - org.scala-lang + jline jline ${jline.version}