CRUNCH-618: Run on Spark 2. Contributed by Gergő Pásztor.
tomwhite committed Apr 13, 2017
1 parent ce9aaa3 commit 047d8fd
Showing 17 changed files with 83 additions and 59 deletions.
2 changes: 1 addition & 1 deletion crunch-kafka/pom.xml
@@ -40,7 +40,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
2 changes: 1 addition & 1 deletion crunch-scrunch/pom.xml
@@ -38,7 +38,7 @@ under the License.
<artifactId>scala-compiler</artifactId>
</dependency>
<dependency>
- <groupId>org.scala-lang</groupId>
+ <groupId>jline</groupId>
<artifactId>jline</artifactId>
</dependency>
<dependency>
@@ -22,39 +22,42 @@ import java.io.FileOutputStream
import java.util.jar.JarFile
import java.util.jar.JarOutputStream

- import scala.tools.nsc.io.VirtualDirectory

import com.google.common.io.Files
import org.junit.Assert.assertNotNull
import org.junit.Test
import org.apache.crunch.test.CrunchTestSupport
import org.scalatest.junit.JUnitSuite
import org.apache.crunch.scrunch.CrunchSuite

+ import scala.tools.nsc.interpreter.{ReplDir, ReplOutput}
+ import scala.tools.nsc.settings.MutableSettings

/**
- * Tests creating jars from a {@link scala.tools.nsc.io.VirtualDirectory}.
+ * Tests creating jars from a {@link scala.tools.nsc.interpreter.ReplDir}.
*/
class InterpreterJarTest extends CrunchSuite {

/**
- * Tests transforming a virtual directory into a temporary jar file.
+ * Tests transforming an output directory into a temporary jar file.
*/
- @Test def virtualDirToJar: Unit = {
- // Create a virtual directory and populate with some mock content.
- val root = new VirtualDirectory("testDir", None)
+ @Test def outputDirToJar: Unit = {
+ // Create an output directory and populate with some mock content.
+ val settings = new MutableSettings(e => println("ERROR: "+e))
+ val dirSetting = settings.StringSetting("-Yrepl-outdir", "path", "Test path", "")
+ val root: ReplDir = new ReplOutput(dirSetting).dir
// Add some subdirectories to the root.
(1 to 10).foreach { i =>
val subdir = root.subdirectoryNamed("subdir" + i).asInstanceOf[VirtualDirectory]
val subdir = root.subdirectoryNamed("subdir" + i)
// Add some classfiles to each sub directory.
(1 to 10).foreach { j =>
subdir.fileNamed("MyClass" + j + ".class")
}
}

- // Now generate a jar file from the virtual directory.
+ // Now generate a jar file from the output directory.
val tempJar = new File(tempDir.getRootFile(), "replJar.jar")
val jarStream = new JarOutputStream(new FileOutputStream(tempJar))
- InterpreterRunner.addVirtualDirectoryToJar(root, "top/pack/name/", jarStream)
+ InterpreterRunner.addOutputDirectoryToJar(root, "top/pack/name/", jarStream)
jarStream.close()

// Verify the contents of the jar.
@@ -264,8 +264,8 @@ trait PTypeFamily extends GeneratedTuplePTypeFamily {
}

private def products[T <: Product](tpe: Type, mirror: Mirror): PType[T] = {
- val ctor = tpe.member(nme.CONSTRUCTOR).asMethod
- val args = ctor.paramss.head.map(x => (x.name.toString,
+ val ctor = tpe.member(termNames.CONSTRUCTOR).asMethod
+ val args = ctor.paramLists.head.map(x => (x.name.toString,
typeToPType(x.typeSignature, mirror)))
val out = (x: Product) => TupleN.of(x.productIterator.toArray.asInstanceOf[Array[Object]] : _*)
val rtc = mirror.runtimeClass(tpe)
@@ -29,11 +29,9 @@ import scala.tools.nsc.ObjectRunner
import scala.tools.nsc.Properties
import scala.tools.nsc.ScriptRunner
import scala.tools.nsc.interpreter.ILoop
- import scala.tools.nsc.io.Jar
- import scala.tools.nsc.io.VirtualDirectory
+ import scala.tools.nsc.io.{AbstractFile, Jar}
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration

import org.apache.crunch.util.DistCache
import org.apache.commons.io.IOUtils

@@ -126,7 +124,7 @@ object InterpreterRunner extends MainGenericRunner {
ScriptRunner.runCommand(settings, combinedCode, thingToRun +: command.arguments)
}
else runTarget() match {
- case Left(ex) => errorFn(ex.toString())
+ case Left(ex) => errorFn(ex.getMessage(), Some(ex))
case Right(b) => b
}
}
@@ -145,26 +143,26 @@ object InterpreterRunner extends MainGenericRunner {
def createReplCodeJar(): File = {
var jarStream: JarOutputStream = null
try {
- val virtualDirectory = repl.intp.virtualDirectory.asInstanceOf[VirtualDirectory]
+ val outputDirectory = repl.replOutput.dir
val tempDir = Files.createTempDir()
val tempJar = new File(tempDir, "replJar.jar")
jarStream = new JarOutputStream(new FileOutputStream(tempJar))
- addVirtualDirectoryToJar(virtualDirectory, "", jarStream)
+ addOutputDirectoryToJar(outputDirectory, "", jarStream)
return tempJar
} finally {
IOUtils.closeQuietly(jarStream)
}
}

/**
- * Add the contents of the specified virtual directory to a jar. This method will recursively
+ * Add the contents of the specified output directory to a jar. This method will recursively
* descend into subdirectories to add their contents.
*
- * @param dir The virtual directory whose contents should be added.
- * @param entryPath The entry path for classes found in the virtual directory.
+ * @param dir The output directory whose contents should be added.
+ * @param entryPath The entry path for classes found in the output directory.
* @param jarStream An output stream for writing the jar file.
*/
- def addVirtualDirectoryToJar(dir: VirtualDirectory, entryPath: String, jarStream:
+ def addOutputDirectoryToJar(dir: AbstractFile, entryPath: String, jarStream:
JarOutputStream): Unit = {
dir.foreach { file =>
if (file.isDirectory) {
@@ -173,8 +171,7 @@ object InterpreterRunner extends MainGenericRunner {
val entry: JarEntry = new JarEntry(dirPath)
jarStream.putNextEntry(entry)
jarStream.closeEntry()
- addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory],
- dirPath, jarStream)
+ addOutputDirectoryToJar(file, dirPath, jarStream)
} else if (file.hasExtension("class")) {
// Add class files as an entry in the jar file and write the class to the jar.
val entry: JarEntry = new JarEntry(entryPath + file.name)
@@ -197,7 +194,11 @@ object InterpreterRunner extends MainGenericRunner {
// Generate a jar of REPL code and add to the distributed cache.
val replJarFile = createReplCodeJar()
DistCache.addJarToDistributedCache(configuration, replJarFile)
- // Get the paths to jars added with the :cp command.
+ /**
+ * Get the paths to jars added with the :cp command.
+ * The next line will cause a Deprecation Warning, because of the 'repl.addedClasspath', but
+ * we can safely ignore it as we are not using it to modify the classpath.
+ */
val addedJarPaths = repl.addedClasspath.split(':')
addedJarPaths.foreach {
path => if (path.endsWith(".jar")) DistCache.addJarToDistributedCache(configuration, path)
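Aside: the recursion in addOutputDirectoryToJar is what ships REPL-generated classes to the cluster — each subdirectory becomes a jar entry ending in "/" and each .class file is copied in under its entry path. Below is a rough Java analogue of the same pattern; it is illustrative only, not part of this commit, and walks a plain java.io.File tree rather than Scala's AbstractFile abstraction.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

class JarDirWriter {
  // Recursively add the contents of dir to the jar under entryPath.
  static void addDirToJar(File dir, String entryPath, JarOutputStream jar) throws IOException {
    for (File f : dir.listFiles()) {
      if (f.isDirectory()) {
        // Directory entries in a jar must end with '/'.
        String dirPath = entryPath + f.getName() + "/";
        jar.putNextEntry(new JarEntry(dirPath));
        jar.closeEntry();
        addDirToJar(f, dirPath, jar);
      } else if (f.getName().endsWith(".class")) {
        // Copy each class file into the jar under its entry path.
        jar.putNextEntry(new JarEntry(entryPath + f.getName()));
        FileInputStream in = new FileInputStream(f);
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
          jar.write(buf, 0, n);
        }
        in.close();
        jar.closeEntry();
      }
    }
  }
}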
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.fn;

import java.util.Iterator;

class IterableIterator<T> implements Iterable<T> {
private final Iterator<T> itr;
IterableIterator(Iterator<T> itr) {
this.itr = itr;
}
public Iterator<T> iterator() { return itr;}
}
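The new IterableIterator adapter exists because Spark 2 changed its FlatMapFunction family to return Iterator rather than Iterable, while the process() methods in the wrappers below consume the result with a for-each loop, which only accepts arrays and Iterables. A minimal usage sketch follows; the demo class is hypothetical and assumes it lives in org.apache.crunch.fn, since IterableIterator is package-private. Note the adapter is single-pass: iterator() always hands back the same underlying Iterator.

package org.apache.crunch.fn;

import java.util.Arrays;
import java.util.Iterator;

class IterableIteratorDemo {
  public static void main(String[] args) {
    Iterator<String> itr = Arrays.asList("a", "b", "c").iterator();
    // The wrapper lets a for-each loop drain the Iterator once.
    for (String s : new IterableIterator<String>(itr)) {
      System.out.println(s);
    }
    // A second for-each over the same wrapper would see nothing:
    // iterator() returns the already-exhausted Iterator.
  }
}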
@@ -30,7 +30,7 @@ public abstract class SDoubleFlatMapFunction<T> extends SparkDoFn<T, Double>
@Override
public void process(T input, Emitter<Double> emitter) {
try {
- for (Double d : call(input)) {
+ for (Double d : new IterableIterator<Double>(call(input))) {
emitter.emit(d);
}
} catch (Exception e) {
@@ -30,7 +30,7 @@ public abstract class SFlatMapFunction<T, R> extends SparkDoFn<T, R>
@Override
public void process(T input, Emitter<R> emitter) {
try {
- for (R r : call(input)) {
+ for (R r : new IterableIterator<R>(call(input))) {
emitter.emit(r);
}
} catch (Exception e) {
@@ -32,7 +32,7 @@ public abstract class SFlatMapFunction2<K, V, R> extends DoFn<Pair<K, V>, R>
@Override
public void process(Pair<K, V> input, Emitter<R> emitter) {
try {
- for (R r : call(input.first(), input.second())) {
+ for (R r : new IterableIterator<R>(call(input.first(), input.second()))) {
emitter.emit(r);
}
} catch (Exception e) {
@@ -17,6 +17,8 @@
*/
package org.apache.crunch.fn;

+ import java.util.Iterator;
+
import org.apache.spark.api.java.function.DoubleFlatMapFunction;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -62,7 +64,7 @@ public Tuple2<K, V> call(T t) throws Exception {
public static <T, R> SFlatMapFunction<T, R> wrap(final FlatMapFunction<T, R> f) {
return new SFlatMapFunction<T, R>() {
@Override
- public Iterable<R> call(T t) throws Exception {
+ public Iterator<R> call(T t) throws Exception {
return f.call(t);
}
};
@@ -71,7 +73,7 @@ public Iterable<R> call(T t) throws Exception {
public static <K, V, R> SFlatMapFunction2<K, V, R> wrap(final FlatMapFunction2<K, V, R> f) {
return new SFlatMapFunction2<K, V, R>() {
@Override
- public Iterable<R> call(K k, V v) throws Exception {
+ public Iterator<R> call(K k, V v) throws Exception {
return f.call(k, v);
}
};
@@ -89,7 +91,7 @@ public double call(T t) throws Exception {
public static <T> SDoubleFlatMapFunction<T> wrap(final DoubleFlatMapFunction<T> f) {
return new SDoubleFlatMapFunction<T>() {
@Override
- public Iterable<Double> call(T t) throws Exception {
+ public Iterator<Double> call(T t) throws Exception {
return f.call(t);
}
};
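The signature changes in these wrap() helpers track Spark's own API break: in Spark 1.x, org.apache.spark.api.java.function.FlatMapFunction<T, R> declared Iterable<R> call(T t), while in Spark 2.x it declares Iterator<R> call(T t). A minimal sketch of what the migration looks like for user code compiled against Spark 2 — the word-splitting example is illustrative, not from this commit:

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;

public class SplitWords implements FlatMapFunction<String, String> {
  @Override
  public Iterator<String> call(String line) throws Exception {
    // Spark 1.x accepted the Iterable directly:
    //   return Arrays.asList(line.split(" "));
    // Spark 2.x requires an Iterator instead:
    return Arrays.asList(line.split(" ")).iterator();
  }
}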
@@ -32,7 +32,7 @@ public abstract class SPairFlatMapFunction<T, K, V> extends SparkDoFn<T, Pair<K,
@Override
public void process(T input, Emitter<Pair<K, V>> emitter) {
try {
- for (Tuple2<K, V> kv : call(input)) {
+ for (Tuple2<K, V> kv : new IterableIterator<Tuple2<K, V>>(call(input))) {
emitter.emit(Pair.of(kv._1(), kv._2()));
}
} catch (Exception e) {
@@ -45,7 +45,7 @@ public CombineMapsideFunction(CombineFn<K, V> combineFn, SparkRuntimeContext ctx
}

@Override
- public Iterable<Tuple2<K, V>> call(Iterator<Tuple2<K, V>> iter) throws Exception {
+ public Iterator<Tuple2<K, V>> call(Iterator<Tuple2<K, V>> iter) throws Exception {
ctxt.initialize(combineFn, null);
Map<K, List<V>> cache = Maps.newHashMap();
int cnt = 0;
@@ -63,7 +63,7 @@ public Iterable<Tuple2<K, V>> call(Iterator<Tuple2<K, V>> iter) throws Exception
}
}

- return new Flattener<K, V>(cache);
+ return new Flattener<K, V>(cache).iterator();
}

private Map<K, List<V>> reduce(Map<K, List<V>> cache) {
@@ -27,12 +27,7 @@

public class CrunchPairTuple2<K, V> implements PairFlatMapFunction<Iterator<Pair<K, V>>, K, V> {
@Override
- public Iterable<Tuple2<K, V>> call(final Iterator<Pair<K, V>> iterator) throws Exception {
- return new Iterable<Tuple2<K, V>>() {
- @Override
- public Iterator<Tuple2<K, V>> iterator() {
- return Iterators.transform(iterator, GuavaUtils.<K, V>pair2tupleFunc());
- }
- };
+ public Iterator<Tuple2<K, V>> call(final Iterator<Pair<K, V>> iterator) throws Exception {
+ return Iterators.transform(iterator, GuavaUtils.<K, V>pair2tupleFunc());
}
}
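Dropping the anonymous Iterable wrapper here is safe because Guava's Iterators.transform already returns a lazy view: elements are converted one at a time as the consumer pulls them, so the streaming behavior is unchanged. A standalone sketch of the same pattern, with illustrative names:

import java.util.Arrays;
import java.util.Iterator;

import com.google.common.base.Function;
import com.google.common.collect.Iterators;

class TransformDemo {
  public static void main(String[] args) {
    Iterator<Integer> ints = Arrays.asList(1, 2, 3).iterator();
    // Lazy: apply() runs only when next() is called; nothing is materialized up front.
    Iterator<String> strings = Iterators.transform(ints, new Function<Integer, String>() {
      @Override
      public String apply(Integer i) {
        return "value-" + i;
      }
    });
    while (strings.hasNext()) {
      System.out.println(strings.next());
    }
  }
}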
@@ -37,9 +37,9 @@ public FlatMapPairDoFn(DoFn<Pair<K, V>, T> fn, SparkRuntimeContext ctxt) {
}

@Override
- public Iterable<T> call(Iterator<Tuple2<K, V>> input) throws Exception {
+ public Iterator<T> call(Iterator<Tuple2<K, V>> input) throws Exception {
ctxt.initialize(fn, null);
return new CrunchIterable<Pair<K, V>, T>(fn,
- Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc()));
+ Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc())).iterator();
}
}
@@ -37,10 +37,10 @@ public PairFlatMapDoFn(DoFn<T, Pair<K, V>> fn, SparkRuntimeContext ctxt) {
}

@Override
- public Iterable<Tuple2<K, V>> call(Iterator<T> input) throws Exception {
+ public Iterator<Tuple2<K, V>> call(Iterator<T> input) throws Exception {
ctxt.initialize(fn, null);
return Iterables.transform(
new CrunchIterable<T, Pair<K, V>>(fn, input),
- GuavaUtils.<K, V>pair2tupleFunc());
+ GuavaUtils.<K, V>pair2tupleFunc()).iterator();
}
}
@@ -50,14 +50,9 @@ public ReduceGroupingFunction(GroupingOptions options,
}

@Override
- public Iterable<Tuple2<ByteArray, List<byte[]>>> call(
+ public Iterator<Tuple2<ByteArray, List<byte[]>>> call(
final Iterator<Tuple2<ByteArray, List<byte[]>>> iter) throws Exception {
- return new Iterable<Tuple2<ByteArray, List<byte[]>>>() {
- @Override
- public Iterator<Tuple2<ByteArray, List<byte[]>>> iterator() {
- return new GroupingIterator(iter, rawComparator());
- }
- };
+ return new GroupingIterator(iter, rawComparator());
}

private RawComparator<?> rawComparator() {
10 changes: 5 additions & 5 deletions pom.xml
@@ -105,11 +105,11 @@ under the License.
<avro.classifier>hadoop2</avro.classifier>

<kafka.version>0.10.0.1</kafka.version>
- <scala.base.version>2.10</scala.base.version>
- <scala.version>2.10.4</scala.version>
+ <scala.base.version>2.11</scala.base.version>
+ <scala.version>2.11.8</scala.version>
<scalatest.version>2.2.4</scalatest.version>
- <spark.version>1.3.1</spark.version>
- <jline.version>2.10.4</jline.version>
+ <spark.version>2.0.0</spark.version>
+ <jline.version>2.12.1</jline.version>
<jsr305.version>1.3.9</jsr305.version>
</properties>

@@ -456,7 +456,7 @@ under the License.
</dependency>

<dependency>
- <groupId>org.scala-lang</groupId>
+ <groupId>jline</groupId>
<artifactId>jline</artifactId>
<version>${jline.version}</version>
</dependency>
