
Slightly revised patch for CRUNCH-9, submitted by Kiyan Ahmadizadeh

commit 1ed57904e095620aaee95fc9cb96773446f473e8 (1 parent: a3f8e4a), committed by jwills on Jul 10, 2012
.gitignore
@@ -3,3 +3,6 @@
.settings
.cache
target
+*.iml
+.idea
+
pom.xml
@@ -89,6 +89,16 @@ under the License.
<version>${scala.version}</version>
</dependency>
<dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>jline</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch</artifactId>
<version>0.3.0</version>
@@ -132,6 +142,11 @@ under the License.
</exclusions>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.6.1</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
@@ -157,11 +172,33 @@ under the License.
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>jar-with-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ </execution>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+ <descriptor>${basedir}/src/main/assembly/release.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
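
Two notes on the pom changes above: scala-compiler and jline are what the new Scrunch shell needs at runtime (the Scala interpreter and its console line reader), and slf4j-log4j12 supplies a concrete log4j binding for slf4j. The assembly plugin now binds two executions to the package phase, so a single mvn package run produces both the jar-with-dependencies and the release bundle described by src/main/assembly/release.xml below.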
src/main/assembly/release.xml (new file)
@@ -0,0 +1,75 @@
+<!--
+ Assembly configuration for the release bundle.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>release</id>
+ <formats>
+ <format>dir</format>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>true</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <!-- readme -->
+ <useDefaultExcludes>false</useDefaultExcludes>
+ <outputDirectory>/</outputDirectory>
+ <fileMode>0644</fileMode>
+ <includes>
+ <include>README.md</include>
+ </includes>
+ <filtered>true</filtered>
+ </fileSet>
+ <fileSet>
+ <!-- scripts -->
+ <useDefaultExcludes>false</useDefaultExcludes>
+ <outputDirectory>bin</outputDirectory>
+ <directory>src/main/scripts</directory>
+ <fileMode>0755</fileMode>
+ <excludes>
+ <exclude>*~</exclude>
+ <exclude>*.swp</exclude>
+ </excludes>
+ <filtered>true</filtered>
+ </fileSet>
+ <fileSet>
+ <!-- conf dir -->
+ <useDefaultExcludes>false</useDefaultExcludes>
+ <outputDirectory>conf</outputDirectory>
+ <directory>src/main/conf</directory>
+ <fileMode>0644</fileMode>
+ <excludes>
+ <exclude>*~</exclude>
+ <exclude>*.swp</exclude>
+ </excludes>
+ <filtered>true</filtered>
+ </fileSet>
+ <fileSet>
+ <!-- examples dir -->
+ <useDefaultExcludes>false</useDefaultExcludes>
+ <outputDirectory>examples</outputDirectory>
+ <directory>src/main/examples</directory>
+ <fileMode>0644</fileMode>
+ <excludes>
+ <exclude>*~</exclude>
+ <exclude>*.swp</exclude>
+ </excludes>
+ <filtered>true</filtered>
+ </fileSet>
+ </fileSets>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>lib</outputDirectory>
+ <scope>runtime</scope>
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+ <fileMode>0644</fileMode>
+ <!--
+ <excludes>
+ <exclude>org.apache.hadoop:hadoop-core</exclude>
+ <exclude>org.apache.hbase:hbase</exclude>
+ </excludes>
+ -->
+ </dependencySet>
+ </dependencySets>
+</assembly>
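
For orientation, this descriptor unpacks to roughly the following layout (the top-level directory name comes from the build and is shown here as a placeholder):

    scrunch-<version>-release/
      README.md
      bin/        (from src/main/scripts, mode 0755)
      conf/       (from src/main/conf)
      examples/   (from src/main/examples)
      lib/        (runtime dependencies)

The commented-out excludes sketch how hadoop-core and hbase could be kept out of lib when the target cluster already provides them.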
src/main/conf/log4j.properties (new file)
@@ -0,0 +1,8 @@
+# ***** Set the org.apache.scrunch logger level to INFO and its only appender to A.
+log4j.logger.org.apache.scrunch=info, A
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
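
With that ConversionPattern (elapsed milliseconds, thread, level, category, NDC, message), a line of Scrunch output looks roughly like this illustrative example:

    1523 [main] INFO  org.apache.scrunch.Pipeline  - starting MapReduce job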
src/main/examples/WordCount.scala (new file)
@@ -0,0 +1,27 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+import org.apache.scrunch.PipelineApp
+
+object WordCount extends PipelineApp {
+
+ def countWords(file: String) = {
+ read(from.textFile(file))
+ .flatMap(_.split("\\W+").filter(!_.isEmpty()))
+ .count
+ }
+
+ val counts = join(countWords(args(0)), countWords(args(1)))
+ write(counts, to.textFile(args(2)))
+}
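
As a rough guide to the types involved (a sketch, assuming the Scrunch API shown elsewhere in this patch): countWords yields a PTable[String, Long] mapping each word to its count, and join pairs the counts for words present in both inputs. Inside the app, results can also be inspected without writing them out, using the materialize support this patch extends:

    // Sketch only; the file name is a placeholder. materialize() runs the
    // pipeline and returns the table as Scala (word, count) tuples.
    val words = countWords("input.txt")
    words.materialize.take(5).foreach(println)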
src/main/scala/org/apache/scrunch/IO.scala
@@ -21,23 +21,30 @@ import org.apache.crunch.io.{From => from, To => to, At => at}
import org.apache.crunch.types.avro.AvroType
import org.apache.hadoop.fs.Path;
-object From {
+trait From {
def avroFile[T](path: String, atype: AvroType[T]) = from.avroFile(path, atype)
def avroFile[T](path: Path, atype: AvroType[T]) = from.avroFile(path, atype)
def textFile(path: String) = from.textFile(path)
def textFile(path: Path) = from.textFile(path)
}
-object To {
+object From extends From
+
+trait To {
def avroFile[T](path: String) = to.avroFile(path)
def avroFile[T](path: Path) = to.avroFile(path)
def textFile(path: String) = to.textFile(path)
def textFile(path: Path) = to.textFile(path)
}
-object At {
+object To extends To
+
+trait At {
def avroFile[T](path: String, atype: AvroType[T]) = at.avroFile(path, atype)
def avroFile[T](path: Path, atype: AvroType[T]) = at.avroFile(path, atype)
def textFile(path: String) = at.textFile(path)
def textFile(path: Path) = at.textFile(path)
}
+
+object At extends At
+
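
Turning From, To, and At into traits with singleton companions is behavior-preserving for existing callers, but it lets other types mix the factory methods in under their own names. A minimal sketch of a hypothetical consumer (the actual PipelineApp wiring is not part of this diff):

    // Hypothetical trait: exposes the factories as the lowercase
    // from/to/at handles that the WordCount example uses.
    trait IOSupport {
      object from extends From
      object to extends To
      object at extends At
    }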
src/main/scala/org/apache/scrunch/PCollection.scala
@@ -17,11 +17,13 @@
*/
package org.apache.scrunch
+import scala.collection.JavaConversions
+
import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, Target}
import org.apache.crunch.lib.Aggregate
import org.apache.scrunch.Conversions._
-import scala.collection.JavaConversions
+import org.apache.scrunch.interpreter.InterpreterRunner
class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]] {
import PCollection._
@@ -45,20 +47,23 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
def by[K: PTypeH](f: S => K): PTable[K, S] = {
val ptype = getTypeFamily().tableOf(implicitly[PTypeH[K]].get(getTypeFamily()), native.getPType())
- parallelDo(mapKeyFn[S, K](f), ptype)
+ parallelDo(mapKeyFn[S, K](f), ptype)
}
def groupBy[K: PTypeH](f: S => K): PGroupedTable[K, S] = {
by(f).groupByKey
}
-
- def materialize() = JavaConversions.iterableAsScalaIterable[S](native.materialize)
+
+ def materialize() = {
+ InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
+ JavaConversions.iterableAsScalaIterable[S](native.materialize)
+ }
def wrap(newNative: AnyRef) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
-
+
def count() = {
val count = new PTable[S, java.lang.Long](Aggregate.count(native))
- count.mapValues(_.longValue())
+ count.mapValues(_.longValue())
}
def max() = wrap(Aggregate.max(native))
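
The materialize change above is the heart of the REPL support: before the job runs, InterpreterRunner.addReplJarsToJob puts the jars holding interactively defined classes onto the job configuration, so closures written at the shell prompt can be deserialized and run inside MapReduce tasks. The same call guards PTable.materialize and PipelineLike.run below.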
src/main/scala/org/apache/scrunch/PTable.scala
@@ -17,11 +17,14 @@
*/
package org.apache.scrunch
+import java.util.{Collection => JCollect}
+
+import scala.collection.JavaConversions._
+
import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
import org.apache.crunch.lib.{Join, Aggregate, Cogroup, PTables}
-import java.util.{Collection => JCollect}
-import scala.collection.JavaConversions._
+import org.apache.scrunch.interpreter.InterpreterRunner
class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] {
import PTable._
@@ -110,10 +113,11 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
def wrap(newNative: AnyRef) = {
new PTable[K, V](newNative.asInstanceOf[JTable[K, V]])
}
-
+
def unwrap(sc: PTable[K, V]): JTable[K, V] = sc.native
-
+
def materialize(): Iterable[(K, V)] = {
+ InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
native.materialize.view.map(x => (x.first, x.second))
}
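
Note that this materialize converts Crunch Pair objects to Scala tuples through a lazy view, so the conversion happens per element during iteration rather than in an extra upfront pass.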
src/main/scala/org/apache/scrunch/Pipeline.scala
@@ -17,13 +17,16 @@
*/
package org.apache.scrunch
-import java.lang.Class
+import java.io.File
import org.apache.hadoop.conf.Configuration
+import org.slf4j.LoggerFactory
import org.apache.crunch.{Pipeline => JPipeline}
import org.apache.crunch.impl.mem.MemPipeline
import org.apache.crunch.impl.mr.MRPipeline
+import org.apache.crunch.util.DistCache
+import org.apache.scrunch.interpreter.InterpreterRunner
/**
* Manages the state of a pipeline execution.
@@ -73,14 +76,33 @@ class Pipeline(val jpipeline: JPipeline) extends PipelineLike {
* Companion object. Contains subclasses of Pipeline.
*/
object Pipeline {
+ val log = LoggerFactory.getLogger(classOf[Pipeline])
+
/**
* Pipeline for running jobs on a hadoop cluster.
*
* @param clazz Type of the class using the pipeline.
* @param configuration Hadoop configuration to use.
*/
- class MapReducePipeline (clazz: Class[_], configuration: Configuration)
- extends Pipeline(new MRPipeline(clazz, configuration))
+ class MapReducePipeline (clazz: Class[_], configuration: Configuration) extends Pipeline(
+ {
+ // Attempt to add all jars in the Scrunch distribution lib directory to the job that will
+ // be run.
+ val jarPath = DistCache.findContainingJar(classOf[org.apache.scrunch.Pipeline])
+ if (jarPath != null) {
+ val scrunchJarFile = new File(jarPath)
+ DistCache.addJarDirToDistributedCache(configuration, scrunchJarFile.getParent())
+ } else {
+ log.warn("Could not locate Scrunch jar file, so could not add Scrunch jars to the " +
+ "job(s) about to be run.")
+ }
+ if (InterpreterRunner.repl == null) {
+ new MRPipeline(clazz, configuration)
+ } else {
+ // We're running in the REPL, so we'll use the crunch jar as the job jar.
+ new MRPipeline(classOf[org.apache.scrunch.Pipeline], configuration)
+ }
+ })
/**
* Pipeline for running jobs in memory.
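
The block passed to the Pipeline constructor above is an ordinary expression: it first ships every jar sitting alongside the Scrunch jar through the distributed cache, then picks the job jar, which is the caller's class normally, or the Scrunch jar itself when running under the REPL (InterpreterRunner.repl is non-null). A hedged usage sketch with placeholder names:

    import org.apache.hadoop.conf.Configuration

    // Hypothetical driver class; outside the REPL, MapReducePipeline only
    // needs a class whose containing jar can serve as the job jar.
    class MyJob
    val pipeline = new Pipeline.MapReducePipeline(classOf[MyJob], new Configuration())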
src/main/scala/org/apache/scrunch/PipelineLike.scala
@@ -23,6 +23,7 @@ import org.apache.crunch.{Pipeline => JPipeline}
import org.apache.crunch.Source
import org.apache.crunch.TableSource
import org.apache.crunch.Target
+import org.apache.scrunch.interpreter.InterpreterRunner
trait PipelineLike {
def jpipeline: JPipeline
@@ -71,7 +72,10 @@ trait PipelineLike {
* Constructs and executes a series of MapReduce jobs in order
* to write data to the output targets.
*/
- def run(): Unit = jpipeline.run()
+ def run(): Unit = {
+ InterpreterRunner.addReplJarsToJob(getConfiguration())
+ jpipeline.run()
+ }
/**
* Run any remaining jobs required to generate outputs and then