Added pipe() operation on RDDs for mapping through a shell command.
mateiz committed Jun 20, 2011
1 parent a413b8e commit 23b1c30
Showing 2 changed files with 67 additions and 0 deletions.
61 changes: 61 additions & 0 deletions core/src/main/scala/spark/PipedRDD.scala
@@ -0,0 +1,61 @@
package spark

import java.io.PrintWriter
import java.util.StringTokenizer

import scala.collection.mutable.ArrayBuffer
import scala.io.Source


/**
 * An RDD that pipes the contents of each parent partition through an external command
 * (printing them one per line) and returns the output as a collection of strings.
 */
class PipedRDD[T: ClassManifest](parent: RDD[T], command: Seq[String])
  extends RDD[String](parent.context) {
  // Similar to Runtime.exec(), if we are given a single string, split it into words
  // using a standard StringTokenizer (i.e. by spaces)
  def this(parent: RDD[T], command: String) = this(parent, PipedRDD.tokenize(command))

  override def splits = parent.splits

  override val dependencies = List(new OneToOneDependency(parent))

  override def compute(split: Split): Iterator[String] = {
    val proc = Runtime.getRuntime.exec(command.toArray)
    val env = SparkEnv.get

    // Start a thread to print the process's stderr to ours
    new Thread("stderr reader for " + command) {
      override def run() {
        for (line <- Source.fromInputStream(proc.getErrorStream).getLines)
          System.err.println(line)
      }
    }.start()

    // Start a thread to feed the process input from our parent's iterator
    new Thread("stdin writer for " + command) {
      override def run() {
        SparkEnv.set(env)
        val out = new PrintWriter(proc.getOutputStream)
        for (elem <- parent.iterator(split))
          out.println(elem)
        out.close()
      }
    }.start()

    // Return an iterator that reads lines from the process's stdout
    Source.fromInputStream(proc.getInputStream).getLines
  }
}

object PipedRDD {
  // Split a string into words using a standard StringTokenizer
  def tokenize(command: String): Seq[String] = {
    val buf = new ArrayBuffer[String]
    val tok = new StringTokenizer(command)
    while (tok.hasMoreElements)
      buf += tok.nextToken()
    buf
  }
}
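
For context, here is a brief usage sketch. It is hypothetical: the master URL, application name, sample data, and commands are illustrative, and it assumes tr and grep are available on the PATH. Each element of a parent partition is written to the command's stdin, one per line, and the command's stdout lines become the elements of the resulting RDD[String].

import spark.SparkContext

object PipeExample {
  def main(args: Array[String]) {
    // Hypothetical local context; master URL and app name are made up.
    val sc = new SparkContext("local", "PipeExample")
    val fruits = sc.parallelize(Seq("apple", "banana", "cherry"), 2)

    // String form: the command is split on whitespace by PipedRDD.tokenize,
    // so it suits simple commands with no quoted arguments.
    val upper = fruits.pipe("tr a-z A-Z")

    // Seq form: pass the command and its arguments explicitly when
    // whitespace splitting would mangle them (tokenize is not shell-aware).
    val matched = fruits.pipe(Seq("grep", "an"))

    println(upper.collect().mkString(", "))    // APPLE, BANANA, CHERRY
    println(matched.collect().mkString(", "))  // banana
  }
}

Note that Runtime.exec() runs the command directly rather than through a shell, so shell syntax such as pipelines, redirection, or quoting is not interpreted.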
6 changes: 6 additions & 0 deletions core/src/main/scala/spark/RDD.scala
@@ -78,6 +78,12 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
  def groupBy[K](func: T => K): RDD[(K, Seq[T])] =
    groupBy[K](func, sc.numCores)

  def pipe(command: String): RDD[String] =
    new PipedRDD(this, command)

  def pipe(command: Seq[String]): RDD[String] =
    new PipedRDD(this, command)

  // Parallel operations

  def foreach(f: T => Unit) {
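To make the delegation concrete: the String overload simply routes through PipedRDD.tokenize, so the two calls in the sketch below (hypothetical; rdd stands for any existing RDD) construct equivalent PipedRDDs.

// Equivalent ways to run the same command; `rdd` is any existing RDD.
val a = rdd.pipe("wc -l")          // tokenized to Seq("wc", "-l")
val b = rdd.pipe(Seq("wc", "-l"))  // passed through unchanged

// The helper splits on whitespace only:
// PipedRDD.tokenize("wc -l") == Seq("wc", "-l")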
