[SPARK-4795][Core] Redesign the "primitive type => Writable" implicit APIs to make them be activated automatically

Redesign the "primitive type => Writable" implicit APIs so that they are activated automatically, without breaking binary compatibility.

However, this PR does break source compatibility for code that happens to use the `xxxToXxxWritable` conversions. See the unit test in `graphx`.
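In user terms, the point of the redesign is that `saveAsSequenceFile` on an RDD of primitive keys and values should compile with no extra imports, because the new `WritableFactory` implicits live in their companion object and are found by the compiler automatically. A minimal sketch of the intended usage (the app name and output path are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SaveSequenceFileExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("writable-factory-demo").setMaster("local[2]"))

    // Before this change, the call below needed `import org.apache.spark.SparkContext._`
    // to bring the Int/String => Writable conversions into scope. With WritableFactory
    // implicits in their companion object, the compiler resolves them on its own.
    val rdd = sc.parallelize(Seq((1, "one"), (2, "two")))
    rdd.saveAsSequenceFile("/tmp/writable-factory-demo") // placeholder output path

    sc.stop()
  }
}
```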

Author: zsxwing <zsxwing@gmail.com>

Closes #3642 from zsxwing/SPARK-4795 and squashes the following commits:

914b2d6 [zsxwing] Add implicit back to the Writables methods
0b9017f [zsxwing] Add some docs
a0e8509 [zsxwing] Merge branch 'master' into SPARK-4795
39343de [zsxwing] Fix the unit test
64853af [zsxwing] Reorganize the rest 'implicit' methods in SparkContext
zsxwing authored and rxin committed Feb 4, 2015
1 parent 1077f2e commit d37978d
Showing 8 changed files with 129 additions and 33 deletions.
70 changes: 67 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1749,8 +1749,14 @@ object SparkContext extends Logging {
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
rdd: RDD[(K, V)]) = {
val kf = implicitly[K => Writable]
val vf = implicitly[V => Writable]
// Set the Writable class to null and `SequenceFileRDDFunctions` will use Reflection to get it
implicit val keyWritableFactory = new WritableFactory[K](_ => null, kf)
implicit val valueWritableFactory = new WritableFactory[V](_ => null, vf)
RDD.rddToSequenceFileRDDFunctions(rdd)
}

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
@@ -1767,20 +1773,35 @@
def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
RDD.numericRDDToDoubleRDDFunctions(rdd)

// Implicit conversions to common Writable types, for saveAsSequenceFile
// The following deprecated functions have already been moved to `object WritableFactory` to
// make the compiler find them automatically. They are still kept here for backward compatibility.

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def longToLongWritable(l: Long): LongWritable = new LongWritable(l)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def floatToFloatWritable(f: Float): FloatWritable = new FloatWritable(f)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def doubleToDoubleWritable(d: Double): DoubleWritable = new DoubleWritable(d)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def boolToBoolWritable (b: Boolean): BooleanWritable = new BooleanWritable(b)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def bytesToBytesWritable (aob: Array[Byte]): BytesWritable = new BytesWritable(aob)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def stringToText(s: String): Text = new Text(s)

private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
@@ -2070,7 +2091,7 @@ object WritableConverter {
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}

// The following implicit functions were in SparkContext before 1.2 and users had to
// The following implicit functions were in SparkContext before 1.3 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, we still keep the old functions in SparkContext for backward
// compatibility and forward to the following functions directly.
@@ -2103,3 +2124,46 @@ object WritableConverter {
implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
}

/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
* The Writable class will be used in `SequenceFileRDDFunctions`.
*/
private[spark] class WritableFactory[T](
val writableClass: ClassTag[T] => Class[_ <: Writable],
val convert: T => Writable) extends Serializable

object WritableFactory {

private[spark] def simpleWritableFactory[T: ClassTag, W <: Writable : ClassTag](convert: T => W)
: WritableFactory[T] = {
val writableClass = implicitly[ClassTag[W]].runtimeClass.asInstanceOf[Class[W]]
new WritableFactory[T](_ => writableClass, convert)
}

implicit def intWritableFactory: WritableFactory[Int] =
simpleWritableFactory(new IntWritable(_))

implicit def longWritableFactory: WritableFactory[Long] =
simpleWritableFactory(new LongWritable(_))

implicit def floatWritableFactory: WritableFactory[Float] =
simpleWritableFactory(new FloatWritable(_))

implicit def doubleWritableFactory: WritableFactory[Double] =
simpleWritableFactory(new DoubleWritable(_))

implicit def booleanWritableFactory: WritableFactory[Boolean] =
simpleWritableFactory(new BooleanWritable(_))

implicit def bytesWritableFactory: WritableFactory[Array[Byte]] =
simpleWritableFactory(new BytesWritable(_))

implicit def stringWritableFactory: WritableFactory[String] =
simpleWritableFactory(new Text(_))

implicit def writableWritableFactory[T <: Writable: ClassTag]: WritableFactory[T] =
simpleWritableFactory(w => w)

}
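Why does moving the implicits into the companion object activate them automatically? Scala's implicit search includes the companion object of the requested type, so `WritableFactory[Int]`, `WritableFactory[String]`, and the rest are found with no wildcard import. A self-contained sketch of that mechanism (the `Factory` and `describeAll` names are invented for illustration, not Spark APIs):

```scala
// Stand-alone illustration of companion-object implicit scope, the mechanism
// this commit relies on. All names here are made up for the example.
class Factory[T](val describe: T => String)

object Factory {
  // Lives in the companion object, so no import is needed at the call site.
  implicit val intFactory: Factory[Int] = new Factory[Int](i => s"Int($i)")
}

object CompanionScopeDemo {
  def describeAll[T](xs: Seq[T])(implicit f: Factory[T]): Seq[String] =
    xs.map(f.describe)

  def main(args: Array[String]): Unit = {
    // Factory[Int] is resolved from object Factory without any import.
    println(describeAll(Seq(1, 2, 3))) // List(Int(1), Int(2), Int(3))
  }
}
```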
@@ -107,7 +107,6 @@ private[python] class WritableToDoubleArrayConverter extends Converter[Any, Arra
* given directory (probably a temp directory)
*/
object WriteInputFormatTestDataGenerator {
import SparkContext._

def main(args: Array[String]) {
val path = args(0)
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/package.scala
@@ -27,8 +27,7 @@ package org.apache
* contains operations available only on RDDs of Doubles; and
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can
* be saved as SequenceFiles. These operations are automatically available on any RDD of the right
* type (e.g. RDD[(Int, Int)] through implicit conversions except `saveAsSequenceFile`. You need to
* `import org.apache.spark.SparkContext._` to make `saveAsSequenceFile` work.
* type (e.g. RDD[(Int, Int)] through implicit conversions.
*
* Java programmers should reference the [[org.apache.spark.api.java]] package
* for Spark programming APIs in Java.
22 changes: 12 additions & 10 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -25,11 +25,8 @@ import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.{Writable, BytesWritable, NullWritable, Text}
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.TextOutputFormat

import org.apache.spark._
@@ -57,8 +54,7 @@ import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, Bernoulli
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
* can be saved as SequenceFiles.
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
* through implicit conversions except `saveAsSequenceFile`. You need to
* `import org.apache.spark.SparkContext._` to make `saveAsSequenceFile` work.
* through implicit conversions.
*
* Internally, each RDD is characterized by five main properties:
*
@@ -1527,7 +1523,7 @@ abstract class RDD[T: ClassTag](
*/
object RDD {

// The following implicit functions were in SparkContext before 1.2 and users had to
// The following implicit functions were in SparkContext before 1.3 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, we still keep the old functions in SparkContext for backward
// compatibility and forward to the following functions directly.
@@ -1541,9 +1537,15 @@ object RDD {
new AsyncRDDActions(rdd)
}

implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = {
new SequenceFileRDDFunctions(rdd)
implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V],
keyWritableFactory: WritableFactory[K],
valueWritableFactory: WritableFactory[V])
: SequenceFileRDDFunctions[K, V] = {
implicit val keyConverter = keyWritableFactory.convert
implicit val valueConverter = valueWritableFactory.convert
new SequenceFileRDDFunctions(rdd,
keyWritableFactory.writableClass(kt), valueWritableFactory.writableClass(vt))
}

implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
@@ -30,13 +30,35 @@ import org.apache.spark.Logging
* through an implicit conversion. Note that this can't be part of PairRDDFunctions because
* we need more implicit parameters to convert our keys and values to Writable.
*
* Import `org.apache.spark.SparkContext._` at the top of their program to use these functions.
*/
class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag](
self: RDD[(K, V)])
self: RDD[(K, V)],
_keyWritableClass: Class[_ <: Writable],
_valueWritableClass: Class[_ <: Writable])
extends Logging
with Serializable {

@deprecated("It's used to provide backward compatibility for pre 1.3.0.", "1.3.0")
def this(self: RDD[(K, V)]) {
this(self, null, null)
}

private val keyWritableClass =
if (_keyWritableClass == null) {
// pre 1.3.0, we need to use Reflection to get the Writable class
getWritableClass[K]()
} else {
_keyWritableClass
}

private val valueWritableClass =
if (_valueWritableClass == null) {
// pre 1.3.0, we need to use Reflection to get the Writable class
getWritableClass[V]()
} else {
_valueWritableClass
}

private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = {
val c = {
if (classOf[Writable].isAssignableFrom(classTag[T].runtimeClass)) {
@@ -55,6 +77,7 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
c.asInstanceOf[Class[_ <: Writable]]
}


/**
* Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
* and value types. If the key or value are Writable, then we use their classes directly;
@@ -65,26 +88,28 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
def anyToWritable[U <% Writable](u: U): Writable = u

val keyClass = getWritableClass[K]
val valueClass = getWritableClass[V]
val convertKey = !classOf[Writable].isAssignableFrom(self.keyClass)
val convertValue = !classOf[Writable].isAssignableFrom(self.valueClass)
// TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and
// valueWritableClass at the compile time. To implement that, we need to add type parameters to
// SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a
// breaking change.
val convertKey = self.keyClass != keyWritableClass
val convertValue = self.valueClass != valueWritableClass

logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," +
valueClass.getSimpleName + ")" )
logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
valueWritableClass.getSimpleName + ")" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
val jobConf = new JobConf(self.context.hadoopConfiguration)
if (!convertKey && !convertValue) {
self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (!convertKey && convertValue) {
self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
path, keyClass, valueClass, format, jobConf, codec)
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (convertKey && !convertValue) {
self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
path, keyClass, valueClass, format, jobConf, codec)
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (convertKey && convertValue) {
self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
path, keyClass, valueClass, format, jobConf, codec)
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
}
}
}
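To make the convert-key/convert-value logic above concrete: a `map` step is inserted only when the RDD's element class differs from the target Writable class, so keys or values that are already the right Writable type are written as-is. A standalone sketch of that check (the demo object is hypothetical; the Writable classes come from hadoop-common):

```scala
import org.apache.hadoop.io.{IntWritable, Text, Writable}

object ConversionDecisionDemo {
  // Mirrors the convertKey/convertValue checks in saveAsSequenceFile: convert
  // only when the element class is not already the target Writable class.
  def needsConversion(elementClass: Class[_], writableClass: Class[_ <: Writable]): Boolean =
    elementClass != writableClass

  def main(args: Array[String]): Unit = {
    // For an RDD[(Int, Text)]: keys are plain Ints, values are already Writable.
    println(needsConversion(classOf[Int], classOf[IntWritable])) // true  -> map the keys
    println(needsConversion(classOf[Text], classOf[Text]))       // false -> keep the values
  }
}
```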
1 change: 0 additions & 1 deletion core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.scalatest.FunSuite

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{NewHadoopRDD, HadoopRDD}
import org.apache.spark.util.Utils

14 changes: 11 additions & 3 deletions core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
@@ -44,13 +44,21 @@ class ImplicitSuite {
}

def testRddToSequenceFileRDDFunctions(): Unit = {
// TODO eliminating `import intToIntWritable` needs refactoring SequenceFileRDDFunctions.
// That will be a breaking change.
import org.apache.spark.SparkContext.intToIntWritable
val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
rdd.saveAsSequenceFile("/a/test/path")
}

def testRddToSequenceFileRDDFunctionsWithWritable(): Unit = {
val rdd: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text)]
= mockRDD
rdd.saveAsSequenceFile("/a/test/path")
}

def testRddToSequenceFileRDDFunctionsWithBytesArray(): Unit = {
val rdd: org.apache.spark.rdd.RDD[(Int, Array[Byte])] = mockRDD
rdd.saveAsSequenceFile("/a/test/path")
}

def testRddToOrderedRDDFunctions(): Unit = {
val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
rdd.sortByKey()
@@ -40,7 +40,7 @@ class ShortestPathsSuite extends FunSuite with LocalSparkContext {
val graph = Graph.fromEdgeTuples(edges, 1)
val landmarks = Seq(1, 4).map(_.toLong)
val results = ShortestPaths.run(graph, landmarks).vertices.collect.map {
case (v, spMap) => (v, spMap.mapValues(_.get))
case (v, spMap) => (v, spMap.mapValues(i => i))
}
assert(results.toSet === shortestPaths)
}
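The `graphx` change above is the source-compatibility break called out in the commit message: `spMap.mapValues(_.get)` only compiled because an implicit Int => IntWritable conversion happened to be in scope, giving each Int value a `get` method. With those conversions deprecated and no longer meant to be imported wholesale, the test stops relying on that accident and maps values through the identity function instead. A standalone sketch of the accidental conversion (the demo object and map contents are hypothetical):

```scala
import scala.language.implicitConversions

import org.apache.hadoop.io.IntWritable

object AccidentalConversionDemo {
  // Analogue of the (now deprecated) SparkContext.intToIntWritable conversion.
  implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)

  def main(args: Array[String]): Unit = {
    val spMap = Map(1L -> 3, 4L -> 0)
    // `get` is not a member of Int; this compiles only because the implicit view
    // rewrites each value to `intToIntWritable(value).get`.
    val roundTripped = spMap.mapValues(_.get).toMap
    println(roundTripped) // Map(1 -> 3, 4 -> 0)
  }
}
```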