[SPARK-15826][CORE] PipedRDD to allow configurable char encoding
## What changes were proposed in this pull request?

Link to the JIRA that describes the problem: https://issues.apache.org/jira/browse/SPARK-15826

The fix in this PR is to allow users to specify the character encoding used by the pipe() operation. For backward compatibility,
the default remains the system's default encoding.
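
For example, with this change a caller can force a specific charset when piping through an external command. A minimal usage sketch (not part of this patch; assumes an existing SparkContext `sc`):

```scala
// Pipe records through `cat`, using UTF-8 for the child process's
// stdin/stdout/stderr instead of the JVM's default charset.
val piped = sc.parallelize(Seq("héllo", "wörld"))
  .pipe(Seq("cat"), encoding = "UTF-8")

piped.collect().foreach(println)
```

If `encoding` is omitted, behaviour is unchanged and the system default charset is used.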

## How was this patch tested?

Ran existing unit tests

Author: Tejas Patil <tejasp@fb.com>

Closes apache#13563 from tejasapatil/pipedrdd_utf8.
tejasapatil authored and zsxwing committed Jun 15, 2016
1 parent 9b234b5 commit 279bd4a
Showing 4 changed files with 36 additions and 22 deletions.
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -284,6 +284,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
}

/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: JList[String],
env: JMap[String, String],
separateWorkingDir: Boolean,
bufferSize: Int,
encoding: String): JavaRDD[String] = {
rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize, encoding)
}

/**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of
22 changes: 5 additions & 17 deletions core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -47,22 +47,10 @@ private[spark] class PipedRDD[T: ClassTag](
printPipeContext: (String => Unit) => Unit,
printRDDElement: (T, String => Unit) => Unit,
separateWorkingDir: Boolean,
bufferSize: Int)
bufferSize: Int,
encoding: String)
extends RDD[String](prev) {

// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
def this(
prev: RDD[T],
command: String,
envVars: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false) =
this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
separateWorkingDir, 8192)


override def getPartitions: Array[Partition] = firstParent[T].partitions

/**
@@ -129,7 +117,7 @@ private[spark] class PipedRDD[T: ClassTag](
override def run(): Unit = {
val err = proc.getErrorStream
try {
for (line <- Source.fromInputStream(err).getLines) {
for (line <- Source.fromInputStream(err)(encoding).getLines) {
// scalastyle:off println
System.err.println(line)
// scalastyle:on println
@@ -147,7 +135,7 @@ private[spark] class PipedRDD[T: ClassTag](
override def run(): Unit = {
TaskContext.setTaskContext(context)
val out = new PrintWriter(new BufferedWriter(
new OutputStreamWriter(proc.getOutputStream), bufferSize))
new OutputStreamWriter(proc.getOutputStream, encoding), bufferSize))
try {
// scalastyle:off println
// input the pipe context firstly
@@ -171,7 +159,7 @@
}.start()

// Return an iterator that read lines from the process's stdout
val lines = Source.fromInputStream(proc.getInputStream).getLines()
val lines = Source.fromInputStream(proc.getInputStream)(encoding).getLines
new Iterator[String] {
def next(): String = {
if (!hasNext()) {
13 changes: 9 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -21,6 +21,7 @@ import java.util.Random

import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.io.Codec
import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}

@@ -698,14 +699,14 @@ abstract class RDD[T: ClassTag](
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: String): RDD[String] = withScope {
new PipedRDD(this, command)
pipe(PipedRDD.tokenize(command))
}

/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: String, env: Map[String, String]): RDD[String] = withScope {
new PipedRDD(this, command, env)
pipe(PipedRDD.tokenize(command), env)
}

/**
@@ -726,6 +727,8 @@
* for (e &lt;- record._2) {f(e)}
* @param separateWorkingDir Use separate working directories for each task.
* @param bufferSize Buffer size for the stdin writer for the piped process.
* @param encoding Char encoding used for interacting (via stdin, stdout and stderr) with
* the piped process
* @return the result RDD
*/
def pipe(
@@ -734,12 +737,14 @@
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false,
bufferSize: Int = 8192): RDD[String] = withScope {
bufferSize: Int = 8192,
encoding: String = Codec.defaultCharsetCodec.name): RDD[String] = withScope {
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null,
separateWorkingDir,
bufferSize)
bufferSize,
encoding)
}

/**
12 changes: 11 additions & 1 deletion core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import java.io.File

import scala.collection.Map
import scala.io.Codec
import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try
@@ -207,7 +208,16 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
}
val hadoopPart1 = generateFakeHadoopPartition()
val pipedRdd = new PipedRDD(nums, "printenv " + varName)
val pipedRdd =
new PipedRDD(
nums,
PipedRDD.tokenize("printenv " + varName),
Map(),
null,
null,
false,
4092,
Codec.defaultCharsetCodec.name)
val tContext = TaskContext.empty()
val rddIter = pipedRdd.compute(hadoopPart1, tContext)
val arr = rddIter.toArray
