[SPARK-4397][Core] Reorganize 'implicit's to improve the API convenience #3262
Conversation
/cc @rxin
Test build #23354 has started for PR 3262 at commit

Test build #23354 has finished for PR 3262 at commit

Test FAILed.

Test build #23356 has started for PR 3262 at commit
```scala
def testRddToPairRDDFunctions(): Unit = {
  val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
  rdd.groupByKey
```
can u add parentheses to groupByKey?
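For context on this review comment: by Scala convention, a method that performs work or has side effects is declared and called with parentheses, while a pure parameterless accessor omits them. A minimal sketch of the convention (hypothetical code, not from this PR):

```scala
// Hypothetical sketch of the parentheses convention behind the review comment:
// side-effecting methods take (), pure accessors do not.
class Counter {
  private var n = 0

  // Declared with (): mutates state, so callers should write increment().
  def increment(): Int = { n += 1; n }

  // Declared without (): a pure accessor, so callers write current.
  def current: Int = n
}

object ParenConventionDemo {
  def main(args: Array[String]): Unit = {
    val c = new Counter
    c.increment() // parentheses signal "this does something"
    println(c.current)
  }
}
```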
Test build #23358 has started for PR 3262 at commit
What's the distinction for intToIntWritable/writableConverters?
Test build #23356 has finished for PR 3262 at commit

Test PASSed.

Test build #23358 has finished for PR 3262 at commit

Test PASSed.

Test build #23367 has started for PR 3262 at commit
```scala
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object ImplicitBackforwardCompatibilityApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ImplicitBackforwardCompatibilityApp")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 100).map(i => (i, i))
    val rdd2 = rdd.groupByKey() // rddToPairRDDFunctions
    val rdd3 = rdd2.sortByKey() // rddToOrderedRDDFunctions
    val s1 = rdd3.map(_._1).stats() // numericRDDToDoubleRDDFunctions
    println(s1)
    val s2 = rdd3.map(_._1.toDouble).stats() // doubleRDDToDoubleRDDFunctions
    println(s2)
    val f = rdd2.countAsync() // rddToAsyncRDDActions
    println(f.get())
    rdd2.map { case (k, v) => (k, v.size) }.saveAsSequenceFile("/tmp/implicit_test_path") // rddToSequenceFileRDDFunctions

    val a1 = sc.accumulator(123.4) // DoubleAccumulatorParam
    a1.add(1.0)
    println(a1.value)
    val a2 = sc.accumulator(123) // IntAccumulatorParam
    a2.add(3)
    println(a2.value)
    val a3 = sc.accumulator(123L) // LongAccumulatorParam
    a3.add(11L)
    println(a3.value)
    val a4 = sc.accumulator(123F) // FloatAccumulatorParam
    a4.add(1.1F)
    println(a4.value)

    {
      sc.parallelize(1 to 10).map(i => (i, i)).saveAsSequenceFile("/tmp/implicit_test_int")
      val r = sc.sequenceFile[Int, Int]("/tmp/implicit_test_int")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (i.toLong, i.toLong)).saveAsSequenceFile("/tmp/implicit_test_long")
      val r = sc.sequenceFile[Long, Long]("/tmp/implicit_test_long")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (i.toDouble, i.toDouble)).saveAsSequenceFile("/tmp/implicit_test_double")
      val r = sc.sequenceFile[Double, Double]("/tmp/implicit_test_double")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (i.toFloat, i.toFloat)).saveAsSequenceFile("/tmp/implicit_test_float")
      val r = sc.sequenceFile[Float, Float]("/tmp/implicit_test_float")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (i.toString, i.toString)).saveAsSequenceFile("/tmp/implicit_test_string")
      val r = sc.sequenceFile[String, String]("/tmp/implicit_test_string")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (true, false)).saveAsSequenceFile("/tmp/implicit_test_boolean")
      val r = sc.sequenceFile[Boolean, Boolean]("/tmp/implicit_test_boolean")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (Array(i.toByte), Array(i.toByte))).saveAsSequenceFile("/tmp/implicit_test_bytes")
      val r = sc.sequenceFile[Array[Byte], Array[Byte]]("/tmp/implicit_test_bytes")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (i.toString, i.toString)).saveAsSequenceFile("/tmp/implicit_test_writable")
      val r = sc.sequenceFile[org.apache.hadoop.io.Text, org.apache.hadoop.io.Text]("/tmp/implicit_test_writable")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }

    sc.stop()
  }
}
```
I compiled the above code with Spark 1.1.0 and ran it with the new Spark built from this PR, and it works correctly. For
Ref: http://eed3si9n.com/revisiting-implicits-without-import-tax

A possible solution is creating a new class:

`class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)])(implicit keyConverter: NewWritableConverter[K], valueConverter: NewWritableConverter[V])`

However, since it's a breaking change (of course, we could also add a new SequenceFileRDDFunctions class to avoid breaking the old code), I don't think it's worth changing.
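The linked post describes resolving implicits through a type's companion object instead of an explicit `import`. A minimal sketch of that pattern with hypothetical names (`Box` and `Doubler` are illustrations, not Spark classes):

```scala
// Hypothetical sketch of the "implicits without the import tax" pattern from
// the linked post: an implicit placed in a companion object is found through
// the implicit scope, so call sites need no explicit import.
trait Doubler[A] { def twice(a: A): A }

object Doubler {
  // Lives in the companion object, so scalac finds it automatically whenever
  // a Doubler[Int] is required -- no `import Doubler._` at the call site.
  implicit val intDoubler: Doubler[Int] = new Doubler[Int] {
    def twice(a: Int): Int = a + a
  }
}

class Box[A](val value: A) {
  def doubled(implicit ev: Doubler[A]): A = ev.twice(value)
}

object CompanionImplicitDemo {
  def main(args: Array[String]): Unit = {
    println(new Box(21).doubled) // prints 42; intDoubler resolved from the companion
  }
}
```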
Test build #23367 has finished for PR 3262 at commit

Test PASSed.
```scala
def addInPlace(t1: Float, t2: Float) = t1 + t2
def zero(initialValue: Float) = 0f
}

// TODO: Add AccumulatorParams for other types, e.g. lists and strings

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
@deprecated("An API for backforward compatibility", "1.2.0")
```
update these accordingly too
@zsxwing just curious, with the old conversions being deprecated, is there any chance they'll create compiler warnings in common uses of the code? In any case this seems pretty cool if it doesn't actually break binary compatibility. I guess one risk is if it adds new implicits that cause something to compile differently, but it seems unlikely from a first glance.
Ok I finally went through the code. I like the change and it is pretty clever. I believe it should preserve both source compatibility and binary compatibility. To summarize, the changes are:

It is still a tricky change so it'd be great to get more eyes.
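The shape of the change being summarized can be sketched in three parts (a hypothetical illustration; `MyRDD` and `PairFunctions` are stand-ins, not the actual Spark source): the old conversion stays as a deprecated plain method so previously compiled code still links, it loses its `implicit` keyword so it cannot conflict, and the real implicit moves into the companion object of the enriched type, where scalac searches automatically.

```scala
// Illustrative sketch (not actual Spark code) of the reorganization:
// old implicit conversions become deprecated plain methods that forward to
// new implicits living in the companion object of the source type.
class MyRDD[T](val data: Seq[T])

class PairFunctions[K, V](rdd: MyRDD[(K, V)]) {
  def keys: Seq[K] = rdd.data.map(_._1)
}

object MyRDD {
  // New home: the implicit scope of MyRDD. Found with no import at all.
  implicit def rddToPairFunctions[K, V](rdd: MyRDD[(K, V)]): PairFunctions[K, V] =
    new PairFunctions(rdd)
}

object LegacyContext {
  // Old entry point: no longer `implicit`, kept so code compiled against the
  // old API still links; it simply forwards to the new implicit.
  @deprecated("Use the implicit in the MyRDD companion object", "1.3.0")
  def rddToPairFunctions[K, V](rdd: MyRDD[(K, V)]): PairFunctions[K, V] =
    MyRDD.rddToPairFunctions(rdd)
}

object ReorganizeDemo {
  def main(args: Array[String]): Unit = {
    val rdd = new MyRDD(Seq((1, "a"), (2, "b")))
    println(rdd.keys) // enriched via MyRDD.rddToPairFunctions, no import needed
  }
}
```

Because the deprecated method is no longer implicit, ordinary call sites that rely on the new companion-object implicit never touch it, which is why common uses should not produce deprecation warnings.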
Test build #23425 has started for PR 3262 at commit
@rxin Thank you for the great summary and review. I've already updated it accordingly.
@heathermiller @gzm0 - do you think this PR is good to merge now?
```scala
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
@deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " +
```
All these comments are outdated (they still refer to the package object, but should refer to the `RDD` companion object).
Thank you. Fixed it.
Otherwise LGTM

LGTM
Test build #23716 has started for PR 3262 at commit

Test build #23716 has finished for PR 3262 at commit

Test PASSed.
I'm merging this into master. Thanks for working on this @zsxwing, and everybody else for reviewing.

Yeah, merging to master sounds fine; it's too late to put it in 1.2.

Thanks for the patch @zsxwing, this is very cool.
We reverted #3459 in branch-1.2 due to missing `import o.a.s.SparkContext._`, which is no longer needed in master (#3262). This PR adds #3459 back to branch-1.2 with correct imports. Github is out-of-sync now. The real changes are the last two commits. Author: Xiangrui Meng <meng@databricks.com> Closes #3473 from mengxr/SPARK-4604-1.2 and squashes the following commits: a7638a5 [Xiangrui Meng] add import o.a.s.SparkContext._ for v1.2 b749000 [Xiangrui Meng] [SPARK-4604][MLLIB] make MatrixFactorizationModel public
…+ doc updates We reverted #3439 in branch-1.2 due to missing `import o.a.s.SparkContext._`, which is no longer needed in master (#3262). This PR adds #3439 back to branch-1.2 with correct imports. Github is out-of-sync now. The real changes are the last two commits. Author: Joseph K. Bradley <joseph@databricks.com> Author: Xiangrui Meng <meng@databricks.com> Closes #3474 from mengxr/SPARK-4583-1.2 and squashes the following commits: aca2abb [Xiangrui Meng] add import o.a.s.SparkContext._ for v1.2 6b5564a [Joseph K. Bradley] [SPARK-4583] [mllib] LogLoss for GradientBoostedTrees fix + doc updates
This PR cleans up `import SparkContext._` in core for SPARK-4397(#3262) to prove it really works well. Author: zsxwing <zsxwing@gmail.com> Closes #3530 from zsxwing/SPARK-4397-cleanup and squashes the following commits: 04e2273 [zsxwing] Cleanup 'import SparkContext._' in core
As #3262 wasn't merged to branch 1.2, the `since` value of `deprecated` should be '1.3.0'. Author: zsxwing <zsxwing@gmail.com> Closes #3573 from zsxwing/SPARK-4397-version and squashes the following commits: 1daa03c [zsxwing] Change the 'since' value to '1.3.0'
This PR moved `implicit`s to `package object` and `companion object` to enable the Scala compiler to search for them automatically without explicit importing. It should not break any API. A test project for backward compatibility is [here](https://github.com/zsxwing/SPARK-4397-Backforward-Compatibility). It proves that code compiled with Spark 1.1.0 can run with this PR. To summarize, the changes are: * Deprecated the old implicit conversion functions: this preserves binary compatibility for code compiled against earlier versions of Spark. * Removed "implicit" from them so they are just normal functions: this makes sure the compiler doesn't get confused and warn about multiple implicits in scope. * Created new implicit functions in the rdd package object, which is part of the scope that scalac will search when looking for implicit conversions on various RDD objects. The disadvantage is that there is duplicated code in SparkContext for backward compatibility. Author: zsxwing <zsxwing@gmail.com> Closes apache#3262 from zsxwing/SPARK-4397 and squashes the following commits: fc30314 [zsxwing] Update the comments 9c27aff [zsxwing] Move implicit functions to object RDD and forward old functions to new implicit ones directly 2b5f5a4 [zsxwing] Comments for the deprecated functions 52353de [zsxwing] Remove private[spark] from object WritableConverter 34641d4 [zsxwing] Move ImplicitSuite to org.apache.sparktest 7266218 [zsxwing] Add comments to warn the duplicate codes in SparkContext 185c12f [zsxwing] Remove simpleWritableConverter from SparkContext 3bdcae2 [zsxwing] Move WritableConverter implicits to object WritableConverter 9b73188 [zsxwing] Fix the code style issue 3ac4f07 [zsxwing] Add license header 1eda9e4 [zsxwing] Reorganize 'implicit's to improve the API convenience
This PR moved `implicit`s to `package object` and `companion object` to enable the Scala compiler to search for them automatically without explicit importing.

It should not break any API. A test project for backward compatibility is here. It proves that code compiled with Spark 1.1.0 can run with this PR.

To summarize, the changes are:

The disadvantage is that there is duplicated code in SparkContext for backward compatibility.