[SPARK-4795][Core] Redesign the "primitive type => Writable" implicit APIs to make them be activated automatically #3642

Closed. Wants to merge 5 commits. Showing changes from 2 commits.
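(Editorial note, not part of the PR: a minimal sketch of what this redesign enables. With the implicits moved into the RDD and WritableFactory companion objects, saveAsSequenceFile resolves without importing SparkContext._. The app name, master, and output path below are made up for illustration.)

import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    val rdd = sc.parallelize(Seq((1, "a"), (2, "b")))
    // Before this PR: `import org.apache.spark.SparkContext._` was required here.
    // After this PR: WritableFactory[Int] and WritableFactory[String] are found
    // automatically in the WritableFactory companion object.
    rdd.saveAsSequenceFile("/tmp/sequence-file-sketch")
    sc.stop()
  }
}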
85 changes: 74 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1675,8 +1675,13 @@ object SparkContext extends Logging {
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
rdd: RDD[(K, V)]) = {
val kf = implicitly[K => Writable]
val vf = implicitly[V => Writable]
implicit val keyWritableFactory = new WritableFactory[K](_ => null, kf)
implicit val valueWritableFactory = new WritableFactory[V](_ => null, vf)
RDD.rddToSequenceFileRDDFunctions(rdd)
}

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
@@ -1693,23 +1698,38 @@ object SparkContext extends Logging {
def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
RDD.numericRDDToDoubleRDDFunctions(rdd)

// Implicit conversions to common Writable types, for saveAsSequenceFile
// The following deprecated functions have already been moved to `object WritableFactory` to
// make the compiler find them automatically. They are still kept here for backward compatibility.

implicit def intToIntWritable(i: Int) = new IntWritable(i)
@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def intToIntWritable(i: Int) = new IntWritable(i)

implicit def longToLongWritable(l: Long) = new LongWritable(l)
@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def longToLongWritable(l: Long) = new LongWritable(l)

implicit def floatToFloatWritable(f: Float) = new FloatWritable(f)
@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def floatToFloatWritable(f: Float) = new FloatWritable(f)

implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d)
@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def doubleToDoubleWritable(d: Double) = new DoubleWritable(d)

implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b)
@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def boolToBoolWritable (b: Boolean) = new BooleanWritable(b)

implicit def bytesToBytesWritable (aob: Array[Byte]) = new BytesWritable(aob)
@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def bytesToBytesWritable (aob: Array[Byte]) = new BytesWritable(aob)

implicit def stringToText(s: String) = new Text(s)
@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def stringToText(s: String) = new Text(s)

private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
private def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
: ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u

@@ -1996,7 +2016,7 @@ object WritableConverter {
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}

// The following implicit functions were in SparkContext before 1.2 and users had to
// The following implicit functions were in SparkContext before 1.3 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, we still keep the old functions in SparkContext for backward
// compatibility and forward to the following functions directly.
@@ -2029,3 +2049,46 @@ object WritableConverter {
implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
}

/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
* The Writable class will be used in `SequenceFileRDDFunctions`.
*/
private[spark] class WritableFactory[T](
val writableClass: ClassTag[T] => Class[_ <: Writable],
val convert: T => Writable) extends Serializable

object WritableFactory {

private[spark] def simpleWritableFactory[T: ClassTag, W <: Writable : ClassTag](convert: T => W)
: WritableFactory[T] = {
val writableClass = implicitly[ClassTag[W]].runtimeClass.asInstanceOf[Class[W]]
new WritableFactory[T](_ => writableClass, convert)
}

implicit def intWritableFactory: WritableFactory[Int] =
simpleWritableFactory(new IntWritable(_))

implicit def longWritableFactory: WritableFactory[Long] =
simpleWritableFactory(new LongWritable(_))

implicit def floatWritableFactory: WritableFactory[Float] =
simpleWritableFactory(new FloatWritable(_))

implicit def doubleWritableFactory: WritableFactory[Double] =
simpleWritableFactory(new DoubleWritable(_))

implicit def booleanWritableFactory: WritableFactory[Boolean] =
simpleWritableFactory(new BooleanWritable(_))

implicit def bytesWritableFactory: WritableFactory[Array[Byte]] =
simpleWritableFactory(new BytesWritable(_))

implicit def stringWritableFactory: WritableFactory[String] =
simpleWritableFactory(new Text(_))

implicit def writableWritableFactory[T <: Writable: ClassTag]: WritableFactory[T] =
simpleWritableFactory(w => w)

}
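(Editorial sketch, not part of the PR: what one of these factories encapsulates when summoned explicitly. Because WritableFactory is private[spark] in this revision, the snippet assumes it is compiled inside the org.apache.spark package; the object name is hypothetical.)

package org.apache.spark

import scala.reflect.classTag

object WritableFactorySketch {
  def main(args: Array[String]): Unit = {
    // The same factory the compiler would pick implicitly for Int keys or values.
    val f: WritableFactory[Int] = implicitly[WritableFactory[Int]]
    println(f.writableClass(classTag[Int]))  // class org.apache.hadoop.io.IntWritable
    println(f.convert(42))                   // 42 (an IntWritable)
  }
}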
core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -107,7 +107,6 @@ private[python] class WritableToDoubleArrayConverter extends Converter[Any, Arra
* given directory (probably a temp directory)
*/
object WriteInputFormatTestDataGenerator {
import SparkContext._

def main(args: Array[String]) {
val path = args(0)
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/package.scala
@@ -27,8 +27,7 @@ package org.apache
* contains operations available only on RDDs of Doubles; and
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can
* be saved as SequenceFiles. These operations are automatically available on any RDD of the right
* type (e.g. RDD[(Int, Int)] through implicit conversions except `saveAsSequenceFile`. You need to
* `import org.apache.spark.SparkContext._` to make `saveAsSequenceFile` work.
* type (e.g. RDD[(Int, Int)] through implicit conversions.
*
* Java programmers should reference the [[org.apache.spark.api.java]] package
* for Spark programming APIs in Java.
22 changes: 12 additions & 10 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -25,11 +25,8 @@ import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.{Writable, BytesWritable, NullWritable, Text}
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.TextOutputFormat

import org.apache.spark._
@@ -57,8 +54,7 @@ import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, Bernoulli
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
* can be saved as SequenceFiles.
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
* through implicit conversions except `saveAsSequenceFile`. You need to
* `import org.apache.spark.SparkContext._` to make `saveAsSequenceFile` work.
* through implicit.
*
* Internally, each RDD is characterized by five main properties:
*
@@ -1407,7 +1403,7 @@ abstract class RDD[T: ClassTag](
*/
object RDD {

// The following implicit functions were in SparkContext before 1.2 and users had to
// The following implicit functions were in SparkContext before 1.3 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, we still keep the old functions in SparkContext for backward
// compatibility and forward to the following functions directly.
@@ -1421,9 +1417,15 @@ object RDD {
new AsyncRDDActions(rdd)
}

implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = {
new SequenceFileRDDFunctions(rdd)
implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V],
keyWritableFactory: WritableFactory[K],
valueWritableFactory: WritableFactory[V])
: SequenceFileRDDFunctions[K, V] = {
implicit val keyConverter = keyWritableFactory.convert
implicit val valueConverter = valueWritableFactory.convert
new SequenceFileRDDFunctions(rdd,
keyWritableFactory.writableClass(kt), valueWritableFactory.writableClass(vt))
}

implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
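(Editorial sketch, not part of the PR: how the new conversion above is filled in for an RDD[(Int, Array[Byte])], mirroring the ImplicitSuite test added below. The desugaring in the comment is approximate; the app name, master, and path are made up.)

import org.apache.spark.{SparkConf, SparkContext}

object PairWithBytesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    val rdd = sc.parallelize(Seq((1, Array[Byte](1, 2, 3))))
    // Roughly desugars to:
    //   RDD.rddToSequenceFileRDDFunctions(rdd)(
    //     classTag[Int], classTag[Array[Byte]],
    //     WritableFactory.intWritableFactory,     // Int         -> IntWritable
    //     WritableFactory.bytesWritableFactory    // Array[Byte] -> BytesWritable
    //   ).saveAsSequenceFile("/tmp/bytes-sketch")
    rdd.saveAsSequenceFile("/tmp/bytes-sketch")
    sc.stop()
  }
}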
core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -30,13 +30,33 @@ import org.apache.spark.Logging
* through an implicit conversion. Note that this can't be part of PairRDDFunctions because
* we need more implicit parameters to convert our keys and values to Writable.
*
* Import `org.apache.spark.SparkContext._` at the top of their program to use these functions.
*/
class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag](
self: RDD[(K, V)])
self: RDD[(K, V)],
_keyWritableClass: Class[_ <: Writable],
_valueWritableClass: Class[_ <: Writable])
extends Logging
with Serializable {

@deprecated("It's used to provide backward compatibility for pre 1.3.0.", "1.3.0")
def this(self: RDD[(K, V)]) {
this(self, null, null)
}

private val keyWritableClass =
if (_keyWritableClass == null) {
getWritableClass[K]()
} else {
_keyWritableClass
}

private val valueWritableClass =
if (_valueWritableClass == null) {
getWritableClass[V]()
} else {
_valueWritableClass
}

private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = {
val c = {
if (classOf[Writable].isAssignableFrom(classTag[T].runtimeClass)) {
@@ -55,6 +75,7 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
c.asInstanceOf[Class[_ <: Writable]]
}


/**
* Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
* and value types. If the key or value are Writable, then we use their classes directly;
@@ -65,26 +86,24 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
def anyToWritable[U <% Writable](u: U): Writable = u

val keyClass = getWritableClass[K]
val valueClass = getWritableClass[V]
val convertKey = !classOf[Writable].isAssignableFrom(self.keyClass)
val convertValue = !classOf[Writable].isAssignableFrom(self.valueClass)
val convertKey = self.keyClass != keyWritableClass
val convertValue = self.valueClass != valueWritableClass

logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," +
valueClass.getSimpleName + ")" )
logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
valueWritableClass.getSimpleName + ")" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
val jobConf = new JobConf(self.context.hadoopConfiguration)
if (!convertKey && !convertValue) {
self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (!convertKey && convertValue) {
self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
path, keyClass, valueClass, format, jobConf, codec)
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (convertKey && !convertValue) {
self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
path, keyClass, valueClass, format, jobConf, codec)
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (convertKey && convertValue) {
self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
path, keyClass, valueClass, format, jobConf, codec)
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
}
}
}
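(Editorial sketch, not part of the PR: the two conversion paths chosen by the comparison above. When the RDD's key/value classes already equal the target Writable classes, no map step is inserted; otherwise records are converted first. Names and paths are made up.)

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}

object ConversionPathsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))

    // keyClass == keyWritableClass and valueClass == valueWritableClass,
    // so the no-conversion branch writes the records as-is.
    val alreadyWritable = sc.parallelize(Seq((new IntWritable(1), new Text("a"))))
    alreadyWritable.saveAsSequenceFile("/tmp/no-convert-sketch")

    // Int != IntWritable and String != Text, so both convertKey and convertValue
    // are true and each record is mapped through the factories' convert functions.
    val plain = sc.parallelize(Seq((1, "a")))
    plain.saveAsSequenceFile("/tmp/convert-both-sketch")

    sc.stop()
  }
}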
1 change: 0 additions & 1 deletion core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.scalatest.FunSuite

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{NewHadoopRDD, HadoopRDD}
import org.apache.spark.util.Utils

14 changes: 11 additions & 3 deletions core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
@@ -44,13 +44,21 @@ class ImplicitSuite {
}

def testRddToSequenceFileRDDFunctions(): Unit = {
// TODO eliminating `import intToIntWritable` needs refactoring SequenceFileRDDFunctions.
// That will be a breaking change.
import org.apache.spark.SparkContext.intToIntWritable
val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
rdd.saveAsSequenceFile("/a/test/path")
}

def testRddToSequenceFileRDDFunctionsWithWritable(): Unit = {
val rdd: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text)]
= mockRDD
rdd.saveAsSequenceFile("/a/test/path")
}

def testRddToSequenceFileRDDFunctionsWithBytesArray(): Unit = {
val rdd: org.apache.spark.rdd.RDD[(Int, Array[Byte])] = mockRDD
rdd.saveAsSequenceFile("/a/test/path")
}

def testRddToOrderedRDDFunctions(): Unit = {
val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
rdd.sortByKey()
graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
@@ -40,7 +40,7 @@ class ShortestPathsSuite extends FunSuite with LocalSparkContext {
val graph = Graph.fromEdgeTuples(edges, 1)
val landmarks = Seq(1, 4).map(_.toLong)
val results = ShortestPaths.run(graph, landmarks).vertices.collect.map {
case (v, spMap) => (v, spMap.mapValues(_.get))
case (v, spMap) => (v, spMap.mapValues(i => i))
Member Author:
This is an example of breaking source compatibility, although the implicit intToIntWritable is only used occasionally. _.get => intToIntWritable(_).get.

Member Author:
Or should we keep xxxToXxxWritable implicit for source compatibility?

Contributor:
I don't think you can keep that and have the new one, because the compiler will complain.

Member Author:
Do you have an example? I added the implicit keyword back to them and the code compiled successfully.

Contributor:
If they are not ambiguous, I'd add the implicits back to make sure we never break.

@ankurdave - why is there a get in this test case? Is the get just redundant?

Contributor:
It does look like the get is redundant, since Int should be sufficient for this purpose.

Member Author:
If they are not ambiguous, I'd add the implicits back to make sure we never break.

I added them back.

}
assert(results.toSet === shortestPaths)
}
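(Editorial sketch of the source-compatibility point discussed in the thread above: the test's _.get only compiled because the old implicit view turned an Int into an IntWritable. Hadoop's IntWritable is real; the object name is made up.)

import scala.language.implicitConversions

import org.apache.hadoop.io.IntWritable

object ImplicitGetSketch {
  // The view that `import SparkContext._` used to bring into scope.
  implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)

  def main(args: Array[String]): Unit = {
    val i: Int = 7
    // Int has no `get` member, so this expands to intToIntWritable(i).get.
    // Remove the implicit above and this line, like spMap.mapValues(_.get), stops compiling.
    println(i.get)  // prints 7
  }
}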