
Scrunch jobs can be launched from a REPL. #42

Closed

3 participants

@lockoff

This commit modifies the Scrunch project so that Scrunch jobs can be run from
a Scala REPL. Users can run a Scala REPL capable of launching Scrunch jobs by
building Scrunch with `mvn package` and running `bin/scrunch` from the
resulting distribution directory. Several changes have been made to the
project to accomplish this:

  1. The project has been modified to produce a release distribution. The
    distribution is created by Maven when `mvn package` is run. A distribution
    folder and tarball are created. The distribution folder contains a bin dir with
    scripts, a lib dir with all library jars, and a conf dir with a log4j
    configuration file.

  2. A modified Scala REPL was added to the project. An object InterpreterRunner
    was created that launches a Scala REPL. It's a modification of Scala's
    MainGenericRunner. The new Scrunch version allows client code to determine if a
    REPL is actually running, and includes methods for creating a jar from the code
    compiled from REPL input. A script named "scrunch" was added to the project
    that, when run, launches this modified Scala REPL. The script is a modification
    of the script distributed with Scala that launches the Scala REPL.

  3. Scrunch's Pipeline class was modified so that any MapReduce pipeline
    constructed automatically adds the Scrunch lib jars to the job's Distributed
    Cache and to the classpaths of the tasks it runs.

  4. Methods on PCollection/PTable/etc. that result in a job being launched were
    modified to check whether the REPL is running and, if so, create a jar of the
    code compiled from REPL input and ship that jar with the job so that it is on
    the classpath of the tasks that run.

  5. To facilitate extensions, the From/To/At objects were changed to traits, with
    identically named singleton objects created that extend those traits.

  6. The examples in the examples directory, and the scrunch.py script for running
    those examples, are included in the project distribution. The scrunch.py script
    was renamed to scrunch-job.py and modified to work with the new distribution
    structure and to take advantage of the Scrunch lib jars now being added
    automatically to the classpath of launched jobs.
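
The jar-gathering behavior in (3) can be sketched with a small stdlib-only snippet. The real change uses Crunch's `DistCache` helper in Java; the file names below are illustrative assumptions, and the comma-joined result mirrors the format Hadoop uses for a job's extra-classpath jar list:

```python
import os
import tempfile

def collect_lib_jars(lib_dir):
    """Gather absolute paths of all jars in a lib directory, comma-joined
    (the format Hadoop expects for a job's extra jar list)."""
    jars = sorted(
        os.path.join(lib_dir, name)
        for name in os.listdir(lib_dir)
        if name.endswith(".jar")
    )
    return ",".join(jars)

# Demo: a fake distribution lib dir with two jars and one non-jar file.
lib_dir = tempfile.mkdtemp()
for name in ("scrunch.jar", "scala-library.jar", "README.txt"):
    open(os.path.join(lib_dir, name), "w").close()

tmpjars = collect_lib_jars(lib_dir)
print(len(tmpjars.split(",")))  # → 2
```

Only the `.jar` entries are picked up, so stray files in the lib directory never reach the task classpath.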

I started an integration test for actually launching jobs but the MiniMRCluster
testing framework does not behave properly when jars are added to the
distributed cache. The problem is related to MAPREDUCE-2884. I have verified
that jobs can be launched from the REPL using an actual cluster.
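
The REPL-code jar described in (2) and (4) is essentially an archive of the interpreter's virtual directory of compiled classes. A minimal sketch of that packaging step (in Python rather than the project's Scala, with hypothetical file names; a jar is a zip archive, so `zipfile` suffices):

```python
import os
import tempfile
import zipfile

def jar_directory(class_dir, jar_path):
    """Package every .class file under class_dir into a jar, preserving the
    package-directory structure as entry paths."""
    with zipfile.ZipFile(jar_path, "w") as jar:
        for root, _dirs, files in os.walk(class_dir):
            for name in files:
                if name.endswith(".class"):
                    full = os.path.join(root, name)
                    entry = os.path.relpath(full, class_dir).replace(os.sep, "/")
                    jar.write(full, entry)

# Demo: fake compiled-REPL output with one class file in a package directory.
class_dir = tempfile.mkdtemp()
pkg = os.path.join(class_dir, "line1")
os.makedirs(pkg)
open(os.path.join(pkg, "$read.class"), "w").close()

jar_path = os.path.join(tempfile.mkdtemp(), "replJar.jar")
jar_directory(class_dir, jar_path)
with zipfile.ZipFile(jar_path) as jar:
    print(jar.namelist())  # → ['line1/$read.class']
```

The resulting jar can then be shipped to the cluster alongside the job, which is what puts REPL-defined classes on the task classpath.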

@kiyan kiyan Scrunch jobs can be launched from a REPL.
6bcf329
@jwills
Owner

Okay to revert this one, I think.

@jwills
Owner

Integrated into Apache Crunch (incubating)

@jwills jwills closed this
Commits on Jun 28, 2012
  1. @kiyan

    Scrunch jobs can be launched from a REPL.

    kiyan authored
Showing with 708 additions and 35 deletions.
  1. +3 −0  .gitignore
  2. +45 −6 scrunch/pom.xml
  3. +75 −0 scrunch/src/main/assembly/release.xml
  4. +11 −0 scrunch/src/main/conf/log4j.properties
  5. 0  scrunch/{ → src/main}/examples/ClassyPageRank.scala
  6. 0  scrunch/{ → src/main}/examples/PageRank.scala
  7. 0  scrunch/{ → src/main}/examples/WordCount.scala
  8. +9 −3 scrunch/src/main/scala/com/cloudera/scrunch/IO.scala
  9. +9 −5 scrunch/src/main/scala/com/cloudera/scrunch/PCollection.scala
  10. +4 −2 scrunch/src/main/scala/com/cloudera/scrunch/PTable.scala
  11. +25 −2 scrunch/src/main/scala/com/cloudera/scrunch/Pipeline.scala
  12. +5 −1 scrunch/src/main/scala/com/cloudera/scrunch/PipelineLike.scala
  13. +201 −0 scrunch/src/main/scala/com/cloudera/scrunch/interpreter/InterpreterRunner.scala
  14. +16 −0 scrunch/src/main/scripts/imports.scala
  15. +163 −0 scrunch/src/main/scripts/scrunch
  16. +11 −11 scrunch/{scripts/scrunch.py → src/main/scripts/scrunch-job.py}
  17. +11 −0 scrunch/src/test/resources/log4j.properties
  18. +67 −0 scrunch/src/test/scala/com/cloudera/scrunch/interpreter/InterpreterJarTest.scala
  19. +5 −5 src/main/java/com/cloudera/crunch/impl/mr/plan/JobPrototype.java
  20. +37 −0 src/main/java/com/cloudera/crunch/util/DistCache.java
  21. +11 −0 src/test/resources/log4j.properties
3  .gitignore
@@ -3,3 +3,6 @@
.settings
.cache
target
+*.iml
+.idea
+
51 scrunch/pom.xml
@@ -1,3 +1,4 @@
+<!--suppress ALL -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cloudera.crunch</groupId>
@@ -8,6 +9,7 @@
<properties>
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
+ <scala.version>2.9.2</scala.version>
</properties>
<repositories>
@@ -22,7 +24,17 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <version>2.9.2</version>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>jline</artifactId>
+ <version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.cloudera.crunch</groupId>
@@ -68,6 +80,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.6.1</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
@@ -85,11 +102,33 @@
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>jar-with-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ </execution>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+ <descriptor>${basedir}/src/main/assembly/release.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
75 scrunch/src/main/assembly/release.xml
@@ -0,0 +1,75 @@
+<!--
+ Assembly configuration for the release bundle.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>release</id>
+ <formats>
+ <format>dir</format>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>true</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <!-- readme -->
+ <useDefaultExcludes>false</useDefaultExcludes>
+ <outputDirectory>/</outputDirectory>
+ <fileMode>0644</fileMode>
+ <includes>
+ <include>README.md</include>
+ </includes>
+ <filtered>true</filtered>
+ </fileSet>
+ <fileSet>
+ <!-- scripts -->
+ <useDefaultExcludes>false</useDefaultExcludes>
+ <outputDirectory>bin</outputDirectory>
+ <directory>src/main/scripts</directory>
+ <fileMode>0755</fileMode>
+ <excludes>
+ <exclude>*~</exclude>
+ <exclude>*.swp</exclude>
+ </excludes>
+ <filtered>true</filtered>
+ </fileSet>
+ <fileSet>
+ <!-- conf dir -->
+ <useDefaultExcludes>false</useDefaultExcludes>
+ <outputDirectory>conf</outputDirectory>
+ <directory>src/main/conf</directory>
+ <fileMode>0644</fileMode>
+ <excludes>
+ <exclude>*~</exclude>
+ <exclude>*.swp</exclude>
+ </excludes>
+ <filtered>true</filtered>
+ </fileSet>
+ <fileSet>
+ <!-- examples dir -->
+ <useDefaultExcludes>false</useDefaultExcludes>
+ <outputDirectory>examples</outputDirectory>
+ <directory>src/main/examples</directory>
+ <fileMode>0644</fileMode>
+ <excludes>
+ <exclude>*~</exclude>
+ <exclude>*.swp</exclude>
+ </excludes>
+ <filtered>true</filtered>
+ </fileSet>
+ </fileSets>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>lib</outputDirectory>
+ <scope>runtime</scope>
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+ <fileMode>0644</fileMode>
+ <!--
+ <excludes>
+ <exclude>org.apache.hadoop:hadoop-core</exclude>
+ <exclude>org.apache.hbase:hbase</exclude>
+ </excludes>
+ -->
+ </dependencySet>
+ </dependencySets>
+</assembly>
11 scrunch/src/main/conf/log4j.properties
@@ -0,0 +1,11 @@
+log4j.rootLogger=${scrunch.logger}
+
+# By default, log INFO to the console.
+scrunch.logger=INFO,console
+
+# Define the console appender.
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
+
0  scrunch/examples/ClassyPageRank.scala → scrunch/src/main/examples/ClassyPageRank.scala
File renamed without changes
0  scrunch/examples/PageRank.scala → scrunch/src/main/examples/PageRank.scala
File renamed without changes
0  scrunch/examples/WordCount.scala → scrunch/src/main/examples/WordCount.scala
File renamed without changes
12 scrunch/src/main/scala/com/cloudera/scrunch/IO.scala
@@ -18,23 +18,29 @@ import com.cloudera.crunch.io.{From => from, To => to, At => at}
import com.cloudera.crunch.types.avro.AvroType
import org.apache.hadoop.fs.Path;
-object From {
+trait From {
def avroFile[T](path: String, atype: AvroType[T]) = from.avroFile(path, atype)
def avroFile[T](path: Path, atype: AvroType[T]) = from.avroFile(path, atype)
def textFile(path: String) = from.textFile(path)
def textFile(path: Path) = from.textFile(path)
}
-object To {
+object From extends From
+
+trait To {
def avroFile[T](path: String) = to.avroFile(path)
def avroFile[T](path: Path) = to.avroFile(path)
def textFile(path: String) = to.textFile(path)
def textFile(path: Path) = to.textFile(path)
}
-object At {
+object To extends To
+
+trait At {
def avroFile[T](path: String, atype: AvroType[T]) = at.avroFile(path, atype)
def avroFile[T](path: Path, atype: AvroType[T]) = at.avroFile(path, atype)
def textFile(path: String) = at.textFile(path)
def textFile(path: Path) = at.textFile(path)
}
+
+object At extends At
14 scrunch/src/main/scala/com/cloudera/scrunch/PCollection.scala
@@ -18,6 +18,7 @@ import com.cloudera.crunch.{DoFn, Emitter, FilterFn, MapFn}
import com.cloudera.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, Target}
import com.cloudera.crunch.lib.Aggregate
import com.cloudera.scrunch.Conversions._
+import interpreter.InterpreterRunner
import scala.collection.JavaConversions
class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]] {
@@ -42,20 +43,23 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
def by[K: PTypeH](f: S => K): PTable[K, S] = {
val ptype = getTypeFamily().tableOf(implicitly[PTypeH[K]].get(getTypeFamily()), native.getPType())
- parallelDo(mapKeyFn[S, K](f), ptype)
+ parallelDo(mapKeyFn[S, K](f), ptype)
}
def groupBy[K: PTypeH](f: S => K): PGroupedTable[K, S] = {
by(f).groupByKey
}
-
- def materialize() = JavaConversions.iterableAsScalaIterable[S](native.materialize)
+
+ def materialize() = {
+ InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
+ JavaConversions.iterableAsScalaIterable[S](native.materialize)
+ }
def wrap(newNative: AnyRef) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
-
+
def count() = {
val count = new PTable[S, java.lang.Long](Aggregate.count(native))
- count.mapValues(_.longValue())
+ count.mapValues(_.longValue())
}
def max() = wrap(Aggregate.max(native))
6 scrunch/src/main/scala/com/cloudera/scrunch/PTable.scala
@@ -17,6 +17,7 @@ package com.cloudera.scrunch
import com.cloudera.crunch.{DoFn, Emitter, FilterFn, MapFn}
import com.cloudera.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
import com.cloudera.crunch.lib.{Join, Aggregate, Cogroup, PTables}
+import interpreter.InterpreterRunner
import java.util.{Collection => JCollect}
import scala.collection.JavaConversions._
@@ -107,10 +108,11 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
def wrap(newNative: AnyRef) = {
new PTable[K, V](newNative.asInstanceOf[JTable[K, V]])
}
-
+
def unwrap(sc: PTable[K, V]): JTable[K, V] = sc.native
-
+
def materialize(): Iterable[(K, V)] = {
+ InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
native.materialize.view.map(x => (x.first, x.second))
}
27 scrunch/src/main/scala/com/cloudera/scrunch/Pipeline.scala
@@ -14,6 +14,8 @@
*/
package com.cloudera.scrunch
+import interpreter.InterpreterRunner
+import java.io.File
import java.lang.Class
import org.apache.hadoop.conf.Configuration
@@ -21,6 +23,8 @@ import org.apache.hadoop.conf.Configuration
import com.cloudera.crunch.{Pipeline => JPipeline}
import com.cloudera.crunch.impl.mem.MemPipeline
import com.cloudera.crunch.impl.mr.MRPipeline
+import com.cloudera.crunch.util.DistCache
+import org.slf4j.LoggerFactory
/**
* Manages the state of a pipeline execution.
@@ -70,14 +74,33 @@ class Pipeline(val jpipeline: JPipeline) extends PipelineLike {
* Companion object. Contains subclasses of Pipeline.
*/
object Pipeline {
+ val log = LoggerFactory.getLogger(classOf[Pipeline])
+
/**
* Pipeline for running jobs on a hadoop cluster.
*
* @param clazz Type of the class using the pipeline.
* @param configuration Hadoop configuration to use.
*/
- class MapReducePipeline (clazz: Class[_], configuration: Configuration)
- extends Pipeline(new MRPipeline(clazz, configuration))
+ class MapReducePipeline (clazz: Class[_], configuration: Configuration) extends Pipeline(
+ {
+ // Attempt to add all jars in the Scrunch distribution lib directory to the job that will
+ // be run.
+ val jarPath = DistCache.findContainingJar(classOf[com.cloudera.scrunch.Pipeline])
+ if (jarPath != null) {
+ val scrunchJarFile = new File(jarPath)
+ DistCache.addJarDirToDistributedCache(configuration, scrunchJarFile.getParent())
+ } else {
+ log.warn("Could not locate Scrunch jar file, so could not add Scrunch jars to the " +
+ "job(s) about to be run.")
+ }
+ if (InterpreterRunner.repl == null) {
+ new MRPipeline(clazz, configuration)
+ } else {
+ // We're running in the REPL, so we'll use the crunch jar as the job jar.
+ new MRPipeline(classOf[com.cloudera.crunch.Pipeline], configuration)
+ }
+ })
/**
* Pipeline for running jobs in memory.
6 scrunch/src/main/scala/com/cloudera/scrunch/PipelineLike.scala
@@ -20,6 +20,7 @@ import com.cloudera.crunch.{Pipeline => JPipeline}
import com.cloudera.crunch.Source
import com.cloudera.crunch.TableSource
import com.cloudera.crunch.Target
+import com.cloudera.scrunch.interpreter.InterpreterRunner
trait PipelineLike {
def jpipeline: JPipeline
@@ -68,7 +69,10 @@ trait PipelineLike {
* Constructs and executes a series of MapReduce jobs in order
* to write data to the output targets.
*/
- def run(): Unit = jpipeline.run()
+ def run(): Unit = {
+ InterpreterRunner.addReplJarsToJob(getConfiguration())
+ jpipeline.run()
+ }
/**
* Run any remaining jobs required to generate outputs and then
201 scrunch/src/main/scala/com/cloudera/scrunch/interpreter/InterpreterRunner.scala
@@ -0,0 +1,201 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.scrunch.interpreter
+
+
+import java.io.File
+import java.io.FileOutputStream
+import java.util.jar.JarEntry
+import java.util.jar.JarOutputStream
+
+import scala.tools.nsc.GenericRunnerCommand
+import scala.tools.nsc.Global
+import scala.tools.nsc.MainGenericRunner
+import scala.tools.nsc.ObjectRunner
+import scala.tools.nsc.Properties
+import scala.tools.nsc.ScriptRunner
+import scala.tools.nsc.interpreter.ILoop
+import scala.tools.nsc.io.Jar
+import scala.tools.nsc.io.VirtualDirectory
+
+import com.google.common.io.Files
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+
+import com.cloudera.crunch.util.DistCache
+
+/**
+ * An object used to run a Scala REPL with modifications to facilitate Scrunch jobs running
+ * within the REPL.
+ */
+object InterpreterRunner extends MainGenericRunner {
+
+ // The actual Scala repl.
+ var repl: ILoop = null
+
+ /**
+ * Checks whether or not the Scala repl has been started.
+ *
+ * @return <code>true</code> if the repl is running, <code>false</code> otherwise.
+ */
+ def isReplRunning() = repl != null
+
+ /**
+ * The main entry point for the REPL. This method is lifted from
+ * {@link scala.tools.nsc.MainGenericRunner} and modified to facilitate testing whether or not
+ * the REPL is actually running.
+ *
+ * @param args Arguments used on the command line to start the REPL.
+ * @return <code>true</code> if execution was successful, <code>false</code> otherwise.
+ */
+ override def process(args: Array[String]): Boolean = {
+ val command = new GenericRunnerCommand(args.toList, (x: String) => errorFn(x))
+ import command.{settings, howToRun, thingToRun}
+ // Defines a nested function to retrieve a sample compiler if necessary.
+ def sampleCompiler = new Global(settings)
+
+ import Properties.{versionString, copyrightString}
+ if (!command.ok) {
+ return errorFn("\n" + command.shortUsageMsg)
+ } else if (settings.version.value) {
+ return errorFn("Scala code runner %s -- %s".format(versionString, copyrightString))
+ } else if (command.shouldStopWithInfo) {
+ return errorFn(command getInfoMessage sampleCompiler)
+ }
+
+ // Functions to retrieve settings values that were passed to REPL invocation.
+ // The -e argument provides a Scala statement to execute.
+ // The -i option requests that a file be preloaded into the interactive shell.
+ def isE = !settings.execute.isDefault
+ def dashe = settings.execute.value
+ def isI = !settings.loadfiles.isDefault
+ def dashi = settings.loadfiles.value
+
+ // Function to retrieve code passed by -e and -i options to REPL.
+ def combinedCode = {
+ val files = if (isI) dashi map (file => scala.tools.nsc.io.File(file).slurp()) else Nil
+ val str = if (isE) List(dashe) else Nil
+ files ++ str mkString "\n\n"
+ }
+
+ import GenericRunnerCommand._
+
+ // Function for running the target command. It can run an object with main, a script, or
+ // an interactive REPL.
+ def runTarget(): Either[Throwable, Boolean] = howToRun match {
+ case AsObject =>
+ ObjectRunner.runAndCatch(settings.classpathURLs, thingToRun, command.arguments)
+ case AsScript =>
+ ScriptRunner.runScriptAndCatch(settings, thingToRun, command.arguments)
+ case AsJar =>
+ ObjectRunner.runAndCatch(
+ scala.tools.nsc.io.File(thingToRun).toURL +: settings.classpathURLs,
+ new Jar(thingToRun).mainClass getOrElse sys.error("Cannot find main class for jar: " +
+ thingToRun),
+ command.arguments
+ )
+ case Error =>
+ Right(false)
+ case _ =>
+ // We start the shell when no arguments are given.
+ repl = new ILoop
+ Right(repl.process(settings))
+ }
+
+ /**If -e and -i were both given, we want to execute the -e code after the
+ * -i files have been included, so they are read into strings and prepended to
+ * the code given in -e. The -i option is documented to only make sense
+ * interactively so this is a pretty reasonable assumption.
+ *
+ * This all needs a rewrite though.
+ */
+ if (isE) {
+ ScriptRunner.runCommand(settings, combinedCode, thingToRun +: command.arguments)
+ }
+ else runTarget() match {
+ case Left(ex) => errorFn(ex)
+ case Right(b) => b
+ }
+ }
+
+ def main(args: Array[String]) {
+ val retVal = process(args)
+ if (!retVal)
+ sys.exit(1)
+ }
+
+ /**
+ * Creates a jar file containing the code thus far compiled by the REPL in a temporary directory.
+ *
+ * @return A file object representing the jar file created.
+ */
+ def createReplCodeJar(): File = {
+ var jarStream: JarOutputStream = null
+ try {
+ val virtualDirectory = repl.virtualDirectory
+ val tempDir = Files.createTempDir()
+ val tempJar = new File(tempDir, "replJar.jar")
+ jarStream = new JarOutputStream(new FileOutputStream(tempJar))
+ addVirtualDirectoryToJar(virtualDirectory, "", jarStream)
+ return tempJar
+ } finally {
+ IOUtils.closeQuietly(jarStream)
+ }
+ }
+
+ /**
+ * Add the contents of the specified virtual directory to a jar. This method will recursively
+ * descend into subdirectories to add their contents.
+ *
+ * @param dir The virtual directory whose contents should be added.
+ * @param entryPath The entry path for classes found in the virtual directory.
+ * @param jarStream An output stream for writing the jar file.
+ */
+ def addVirtualDirectoryToJar(dir: VirtualDirectory, entryPath: String, jarStream:
+ JarOutputStream): Unit = {
+ dir.foreach { file =>
+ if (file.isDirectory) {
+ // Recursively descend into subdirectories, adjusting the package name as we do.
+ addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory],
+ entryPath + file.name + "/", jarStream)
+ } else if (file.hasExtension("class")) {
+ // Add class files as an entry in the jar file and write the class to the jar.
+ val entry: JarEntry = new JarEntry(entryPath + file.name)
+ jarStream.putNextEntry(entry)
+ jarStream.write(file.toByteArray)
+ }
+ }
+ }
+
+ /**
+ * Generates a jar containing the code thus far compiled by the REPL,
+ * and adds that jar file to the distributed cache of jobs using the specified configuration.
+ * Also adds any jars added with the :cp command to the user's job.
+ *
+ * @param configuration The configuration of jobs that should use the REPL code jar.
+ */
+ def addReplJarsToJob(configuration: Configuration): Unit = {
+ if (repl != null) {
+ // Generate a jar of REPL code and add to the distributed cache.
+ val replJarFile = createReplCodeJar()
+ DistCache.addJarToDistributedCache(configuration, replJarFile)
+ // Get the paths to jars added with the :cp command.
+ val addedJarPaths = repl.addedClasspath.split(':')
+ addedJarPaths.foreach {
+ path => if (path.endsWith(".jar")) DistCache.addJarToDistributedCache(configuration, path)
+ }
+ }
+ }
+}
16 scrunch/src/main/scripts/imports.scala
@@ -0,0 +1,16 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+import com.cloudera.scrunch._
+
163 scrunch/src/main/scripts/scrunch
@@ -0,0 +1,163 @@
+#!/bin/bash --posix
+#
+##############################################################################
+# Copyright 2002-2011, LAMP/EPFL
+#
+# This is free software; see the distribution for copying conditions.
+# There is NO warranty; not even for MERCHANTABILITY or FITNESS FOR A
+# PARTICULAR PURPOSE.
+##############################################################################
+
+# Identify the bin dir in the distribution from which this script is running.
+bin=`dirname $0`
+bin=`cd ${bin} && pwd`
+
+# Set the directory where libraries for scrunch shell live.
+SCRUNCH_LIB_DIR="${bin}/../lib"
+# Set the conf directory for the scrunch distribution.
+SCRUNCH_CONF_DIR="${bin}/../conf"
+# Set the main class used to run scrunch shell.
+MAIN_CLASS="com.cloudera.scrunch.interpreter.InterpreterRunner"
+
+# Not sure what the right default is here: trying nonzero.
+scala_exit_status=127
+saved_stty=""
+
+# restore stty settings (echo in particular)
+function restoreSttySettings() {
+ if [[ -n $SCALA_RUNNER_DEBUG ]]; then
+ echo "restoring stty: $saved_stty"
+ fi
+
+ stty $saved_stty
+ saved_stty=""
+}
+
+function onExit() {
+ if [[ "$saved_stty" != "" ]]; then
+ restoreSttySettings
+ exit $scala_exit_status
+ fi
+}
+
+# to reenable echo if we are interrupted before completing.
+trap onExit INT
+
+# save terminal settings
+saved_stty=$(stty -g 2>/dev/null)
+# clear on error so we don't later try to restore them
+if [[ ! $? ]]; then
+ saved_stty=""
+fi
+if [[ -n $SCALA_RUNNER_DEBUG ]]; then
+ echo "saved stty: $saved_stty"
+fi
+
+cygwin=false;
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+esac
+
+# Constructing scrunch shell classpath.
+SCRUNCH_SHELL_CLASSPATH=""
+# Add files in conf dir.
+for ext in "$SCRUNCH_CONF_DIR"/* ; do
+ if [ -z "$SCRUNCH_SHELL_CLASSPATH" ] ; then
+ SCRUNCH_SHELL_CLASSPATH="$ext"
+ else
+ SCRUNCH_SHELL_CLASSPATH="$SCRUNCH_SHELL_CLASSPATH:$ext"
+ fi
+done
+# Add files in lib dir.
+for ext in "$SCRUNCH_LIB_DIR"/*.jar ; do
+ if [ -z "$SCRUNCH_SHELL_CLASSPATH" ] ; then
+ SCRUNCH_SHELL_CLASSPATH="$ext"
+ else
+ SCRUNCH_SHELL_CLASSPATH="$SCRUNCH_SHELL_CLASSPATH:$ext"
+ fi
+done
+
+# Constructing Hadoop classpath.
+if [ -z "$HADOOP_HOME" ]; then
+ echo "HADOOP_HOME must be set to run the Scrunch shell."
+ exit 1
+fi
+HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
+
+CYGWIN_JLINE_TERMINAL=
+if $cygwin; then
+ if [ "$OS" = "Windows_NT" ] && cygpath -m .>/dev/null 2>/dev/null ; then
+ format=mixed
+ else
+ format=windows
+ fi
+ SCRUNCH_SHELL_CLASSPATH=`cygpath --path --$format "$SCRUNCH_SHELL_CLASSPATH"`
+ case "$TERM" in
+ rxvt* | xterm*)
+ stty -icanon min 1 -echo
+ CYGWIN_JLINE_TERMINAL="-Djline.terminal=scala.tools.jline.UnixTerminal"
+ ;;
+ esac
+fi
+
+[ -n "$JAVA_OPTS" ] || JAVA_OPTS="-Xmx256M -Xms32M"
+
+# break out -D and -J options and add them to JAVA_OPTS as well
+# so they reach the underlying JVM in time to do some good. The
+# -D options will be available as system properties.
+declare -a java_args
+declare -a scala_args
+
+# Don't use the bootstrap classloader.
+CPSELECT="-classpath "
+
+while [ $# -gt 0 ]; do
+  case "$1" in
+    -D*)
+      # pass to scala as well: otherwise we lose it sometimes when we
+      # need it, e.g. communicating with a server compiler.
+      java_args=("${java_args[@]}" "$1")
+      scala_args=("${scala_args[@]}" "$1")
+      shift
+      ;;
+    -J*)
+      # as with -D, pass to scala even though it will almost
+      # never be used.
+      java_args=("${java_args[@]}" "${1:2}")
+      scala_args=("${scala_args[@]}" "$1")
+      shift
+      ;;
+    -toolcp)
+      TOOL_CLASSPATH="$TOOL_CLASSPATH:$2"
+      shift 2
+      ;;
+    *)
+      scala_args=("${scala_args[@]}" "$1")
+      shift
+      ;;
+  esac
+done
+
+# reset "$@" to the remaining args
+set -- "${scala_args[@]}"
+
+if [ -z "$JAVACMD" -a -n "$JAVA_HOME" -a -x "$JAVA_HOME/bin/java" ]; then
+  JAVACMD="$JAVA_HOME/bin/java"
+fi
+
+"${JAVACMD:=java}" \
+  $JAVA_OPTS \
+  "${java_args[@]}" \
+  ${CPSELECT}${TOOL_CLASSPATH}":"${HADOOP_CLASSPATH}":"${SCRUNCH_SHELL_CLASSPATH} \
+  -Dscala.usejavacp=true \
+  -Denv.emacs="$EMACS" \
+  $CYGWIN_JLINE_TERMINAL \
+  $MAIN_CLASS "$@" \
+  -i ${bin}/imports.scala \
+  -Yrepl-sync
+# The -Yrepl-sync option is a fix for the 2.9.1 REPL. This should probably not be necessary in the future.
+
+# record the exit status lest it be overwritten:
+# then reenable echo and propagate the code.
+scala_exit_status=$?
+onExit
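The option-routing loop in the launcher script above is easy to get wrong, so here is the same logic sketched in Python (a hypothetical helper for illustration only, not part of the change): `-D` options go to both the JVM and Scala, `-J` options are stripped of their prefix for the JVM, and `-toolcp` consumes the following argument.

```python
def split_launcher_args(argv):
    """Mirrors the launcher's while/case loop: returns the JVM args,
    the Scala args, and any -toolcp classpath entries."""
    java_args, scala_args, tool_classpath = [], [], []
    it = iter(argv)
    for arg in it:
        if arg.startswith("-D"):
            # Pass to both the JVM and Scala.
            java_args.append(arg)
            scala_args.append(arg)
        elif arg.startswith("-J"):
            # Strip the -J prefix for the JVM; Scala gets it verbatim.
            java_args.append(arg[2:])
            scala_args.append(arg)
        elif arg == "-toolcp":
            # -toolcp consumes the next argument as a classpath entry.
            tool_classpath.append(next(it))
        else:
            scala_args.append(arg)
    return java_args, scala_args, tool_classpath
```

Everything left in the Scala list corresponds to the script's `set -- "${scala_args[@]}"` at the end of the loop.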
22 scrunch/scripts/scrunch.py → scrunch/src/main/scripts/scrunch-job.py
@@ -26,8 +26,8 @@
else:
  ORIGINAL_FILE = __file__
-SCIENCE_ROOT = os.path.abspath(os.path.dirname(ORIGINAL_FILE)+"/../")
-JARFILE = SCIENCE_ROOT + "/target/scrunch-0.2.0-jar-with-dependencies.jar" #what jar has all the depencies for this job
+DIST_ROOT = os.path.abspath(os.path.dirname(ORIGINAL_FILE)+"/../")
+LIB_DIR = DIST_ROOT + "/lib" # Dir with all scrunch dependencies.
TMPDIR = "/tmp"
BUILDDIR = TMPDIR + "/script-build"
COMPILE_CMD = "java -cp %s/scala-library.jar:%s/scala-compiler.jar -Dscala.home=%s scala.tools.nsc.Main" % (SCALA_LIB, SCALA_LIB, SCALA_LIB)
@@ -77,12 +77,13 @@ def get_job_name(file):
  else:
    return file
-JARPATH = os.path.abspath(JARFILE)
-if not os.path.exists(JARPATH):
-  sys.stderr.write("Scrunch assembly jar not found; run mvn assembly:assembly to construct it.\n")
+LIB_PATH = os.path.abspath(LIB_DIR)
+if not os.path.exists(LIB_PATH):
+  sys.stderr.write("Scrunch distribution lib directory not found; run mvn package to construct a distribution to run examples from.\n")
  sys.exit(1)
-
-JARBASE = os.path.basename(JARFILE)
+LIB_JARS = glob.glob(os.path.join(LIB_PATH, "*.jar"))
+LIB_CP = ":".join(LIB_JARS)
+
JOBPATH = os.path.abspath(JOBFILE)
JOB = get_job_name(JOBFILE)
JOBJAR = JOB + ".jar"
@@ -96,19 +97,18 @@ def build_job_jar():
  if os.path.exists(BUILDDIR):
    shutil.rmtree(BUILDDIR)
  os.makedirs(BUILDDIR)
-  cmd = "%s -classpath %s:%s -d %s %s" % (COMPILE_CMD, JARPATH, HADOOP_JARS, BUILDDIR, JOBFILE)
+  cmd = "%s -classpath %s:%s -d %s %s" % (COMPILE_CMD, LIB_CP, HADOOP_JARS, BUILDDIR, JOBFILE)
  print cmd
  if subprocess.call(cmd, shell=True):
    shutil.rmtree(BUILDDIR)
    sys.exit(1)
-  shutil.copy(JARPATH, JOBJARPATH)
-  jar_cmd = "jar uf %s -C %s ." % (JOBJARPATH, BUILDDIR)
+  jar_cmd = "jar cf %s -C %s ." % (JOBJARPATH, BUILDDIR)
  subprocess.call(jar_cmd, shell=True)
  shutil.rmtree(BUILDDIR)
def hadoop_command():
-  return "%s/bin/hadoop jar %s %s %s" % (HADOOP_HOME, JOBJARPATH, JOB, " ".join(argv))
+  return "HADOOP_CLASSPATH=%s %s/bin/hadoop jar %s %s %s" % (LIB_CP, HADOOP_HOME, JOBJARPATH, JOB, " ".join(argv))
if is_file() and needs_rebuild():
  build_job_jar()
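A note on the `hadoop_command` change: the goal is to have `HADOOP_CLASSPATH` set in the environment of the spawned `hadoop` process. A robust way to do that from Python is to build the child environment explicitly instead of splicing the assignment into a shell command string. A minimal sketch, with hypothetical names (this is not code from the PR):

```python
import os

def hadoop_launch(hadoop_home, lib_cp, job_jar_path, job, argv):
    """Returns the hadoop invocation as an argv list plus an environment
    dict, so HADOOP_CLASSPATH is exported to the child process without
    any shell quoting or export pitfalls."""
    env = dict(os.environ, HADOOP_CLASSPATH=lib_cp)
    cmd = [os.path.join(hadoop_home, "bin", "hadoop"), "jar", job_jar_path, job]
    cmd.extend(argv)
    return cmd, env
```

The returned pair could then be handed to `subprocess.call(cmd, env=env)` with `shell=False`, avoiding string interpolation of user-supplied arguments entirely.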
11 scrunch/src/test/resources/log4j.properties
@@ -0,0 +1,11 @@
+log4j.rootLogger=${scrunch.logger}
+
+# By default, log INFO to the console.
+scrunch.logger=INFO,console
+
+# Define the console appender.
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
+
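The `${scrunch.logger}` reference above relies on log4j 1.x variable substitution, which consults Java system properties before falling back to values defined in the properties file itself, so `-Dscrunch.logger=DEBUG,console` on the command line can override the file's default. A rough Python sketch of that lookup order (hypothetical helper, for illustration only):

```python
import re

def resolve_log4j_value(raw, file_props, sys_props):
    """Expands ${name} references the way log4j 1.x does: system
    properties win, then the properties file, else empty string."""
    def lookup(match):
        name = match.group(1)
        if name in sys_props:
            return sys_props[name]
        return file_props.get(name, "")
    return re.sub(r"\$\{([^}]+)\}", lookup, raw)
```

So with the file above, `log4j.rootLogger` resolves to `INFO,console` unless a `scrunch.logger` system property is set.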
67 scrunch/src/test/scala/com/cloudera/scrunch/interpreter/InterpreterJarTest.scala
@@ -0,0 +1,67 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.scrunch.interpreter
+
+import java.io.File
+import java.io.FileOutputStream
+import java.util.jar.JarFile
+import java.util.jar.JarOutputStream
+
+import scala.tools.nsc.io.VirtualDirectory
+
+import com.google.common.io.Files
+import org.junit.Assert.assertNotNull
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Tests creating jars from {@link scala.tools.nsc.io.VirtualDirectory}.
+ */
+class InterpreterJarTest extends JUnitSuite {
+
+  /**
+   * Tests transforming a virtual directory into a temporary jar file.
+   */
+  @Test def virtualDirToJar: Unit = {
+    // Create a virtual directory and populate it with some mock content.
+    val root = new VirtualDirectory("testDir", None)
+    // Add some subdirectories to the root.
+    (1 to 10).foreach { i =>
+      val subdir = root.subdirectoryNamed("subdir" + i).asInstanceOf[VirtualDirectory]
+      // Add some classfiles to each subdirectory.
+      (1 to 10).foreach { j =>
+        subdir.fileNamed("MyClass" + j + ".class")
+      }
+    }
+
+    // Now generate a jar file from the virtual directory.
+    val tempDir = Files.createTempDir()
+    tempDir.deleteOnExit()
+    val tempJar = new File(tempDir, "replJar.jar")
+    val jarStream = new JarOutputStream(new FileOutputStream(tempJar))
+    InterpreterRunner.addVirtualDirectoryToJar(root, "top/pack/name/", jarStream)
+    jarStream.close()
+
+    // Verify the contents of the jar.
+    val jarFile = new JarFile(tempJar)
+    (1 to 10).foreach { i =>
+      (1 to 10).foreach { j =>
+        val entryName = "top/pack/name/subdir" + i + "/MyClass" + j + ".class"
+        val entry = jarFile.getEntry(entryName)
+        assertNotNull("Jar entry " + entryName + " not found in generated jar.", entry)
+      }
+    }
+  }
+}
10 src/main/java/com/cloudera/crunch/impl/mr/plan/JobPrototype.java
@@ -61,7 +61,7 @@ public static JobPrototype createMapOnlyJob(
private final Set<JobPrototype> dependencies = Sets.newHashSet();
private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
private final Path workingPath;
-
+
private HashMultimap<Target, NodePath> targetsToNodePaths;
private DoTableImpl<?,?> combineFnTable;
@@ -109,7 +109,7 @@ private CrunchJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline
conf = job.getConfiguration();
conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
job.setJarByClass(jarClass);
-
+
Set<DoNode> outputNodes = Sets.newHashSet();
Set<Target> targets = targetsToNodePaths.keySet();
Path outputPath = new Path(workingPath, "output");
@@ -148,7 +148,7 @@ private CrunchJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline
group.configureShuffle(job);
- DoNode mapOutputNode = group.getGroupingNode();
+ DoNode mapOutputNode = group.getGroupingNode();
Set<DoNode> mapNodes = Sets.newHashSet();
for (NodePath nodePath : mapNodePaths) {
// Advance these one step, since we've already configured
@@ -176,7 +176,7 @@ private CrunchJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline
job.setInputFormatClass(CrunchInputFormat.class);
}
job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode));
-
+
return new CrunchJob(job, outputPath, outputHandler);
}
@@ -198,7 +198,7 @@ private String createJobName(String pipelineName, List<DoNode> mapNodes, DoNode
}
return builder.build();
}
-
+
private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {
while (iter.hasNext()) {
PCollectionImpl<?> collect = iter.next();
37 src/main/java/com/cloudera/crunch/util/DistCache.java
@@ -19,6 +19,9 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.URI;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.Enumeration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -118,6 +121,40 @@ public static void addJarToDistributedCache(Configuration conf, String jarFile)
}
/**
+   * Finds the path to a jar that contains the class provided, if any. There is no guarantee that
+   * the jar returned will be the first on the classpath to contain the file. This method is
+   * basically lifted out of Hadoop's {@link org.apache.hadoop.mapred.JobConf} class.
+   *
+   * @param jarClass The class the jar file should contain.
+   * @return The path to a jar file that contains the class, or <code>null</code> if no such jar
+   *     exists.
+   * @throws IOException If there is a problem searching for the jar file.
+   */
+  public static String findContainingJar(Class jarClass) throws IOException {
+    ClassLoader loader = jarClass.getClassLoader();
+    String classFile = jarClass.getName().replaceAll("\\.", "/") + ".class";
+    for (Enumeration itr = loader.getResources(classFile); itr.hasMoreElements();) {
+      URL url = (URL) itr.nextElement();
+      if ("jar".equals(url.getProtocol())) {
+        String toReturn = url.getPath();
+        if (toReturn.startsWith("file:")) {
+          toReturn = toReturn.substring("file:".length());
+        }
+        // URLDecoder is a misnamed class, since it actually decodes
+        // x-www-form-urlencoded MIME type rather than actual
+        // URL encoding (which the file path has). Therefore it would
+        // decode +s to ' 's which is incorrect (spaces are actually
+        // either unencoded or encoded as "%20"). Replace +s first, so
+        // that they are kept sacred during the decoding process.
+        toReturn = toReturn.replaceAll("\\+", "%2B");
+        toReturn = URLDecoder.decode(toReturn, "UTF-8");
+        return toReturn.replaceAll("!.*$", "");
+      }
+    }
+    return null;
+  }
+
+ /**
* Adds all jars under the specified directory to the distributed cache of jobs using the
* provided configuration. The jars will be placed on the classpath of tasks run by the job.
* This method does not descend into subdirectories when adding jars.
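The "+s kept sacred" trick in `findContainingJar` above is easy to verify in isolation: percent-escape literal plus signs before form-style decoding so they survive, then strip the `!/entry` suffix of the jar URL. Here is the same transformation sketched in Python (a hypothetical helper mirroring the Java code, not part of the PR; note Java's `URLDecoder` decodes `+` as a space, which is the behavior being worked around):

```python
import re
from urllib.parse import unquote

def containing_jar_path(url_path):
    """Given the path component of a jar: URL, e.g.
    "file:/tmp/lib%20dir/foo+bar.jar!/com/Foo.class",
    returns the filesystem path of the jar itself."""
    path = url_path
    if path.startswith("file:"):
        path = path[len("file:"):]
    # Protect literal '+' so decoding cannot turn it into a space.
    path = path.replace("+", "%2B")
    path = unquote(path)
    # Drop the "!/entry/inside/jar" suffix.
    return re.sub(r"!.*$", "", path)
```

Without the protection step, a jar named `foo+bar.jar` would come back as `foo bar.jar` under form-style decoding and the distributed-cache upload would fail to find the file.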
11 src/test/resources/log4j.properties
@@ -0,0 +1,11 @@
+log4j.rootLogger=${crunch.logger}
+
+# By default, log INFO to the console.
+crunch.logger=INFO,console
+
+# Define the console appender.
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
+