[SPARK-14540][Core] Fix remaining major issues for Scala 2.12 Support #21930

Closed · wants to merge 1 commit
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -123,7 +123,10 @@ abstract class TaskContext extends Serializable {
*
* Exceptions thrown by the listener will result in failure of the task.
*/
- def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+ def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
Contributor:
Do we need to change this? I don't think it is a problem binary-compatibility-wise, but it seems a bit weird since we don't use the result of the function.

Member:
This is to work around scala/bug#11016, right? I'd prefer any solution that doesn't involve changing all the callers, but it looks like both workarounds require something to be done. At the least, I'd document the purpose of U here.

That said, user code can call this, right? And it would have to make a similar change to work with 2.12? That's probably OK in the sense that any user app must make several changes to be compatible with 2.12.

I don't think 2.11 users would see any change to the binary API. But would a 2.11 user need to change their calls to specify a type for U with this change? It looks like it's not optional, given that Spark's own code has to change its calls. Is that not a source incompatibility?

If it's not, then I wonder whether you can avoid changing all the call sites in Spark.

Contributor Author (@skonto, Jul 31, 2018):
Yes, this covers that bug. If you build with 2.11 you don't need to specify the type Unit when you make the call (I tried that), since there is no ambiguity and the compiler does not hit an overload-resolution problem. So at the source level there shouldn't be an issue. Binary compatibility is also described in the doc.
With 2.12, both addTaskCompletionListener methods end up being SAM types, and the Unit adaptation causes this issue. This is specific to 2.12; without the change you get compilation errors for that version. I am not sure we can do anything more here. @retronym or @lrytz may add more context. I certainly should document this.
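To illustrate the 2.12 behaviour, here is a minimal, self-contained sketch. The names (Ctx, Listener, addListener) are hypothetical stand-ins for TaskContext, TaskCompletionListener and addTaskCompletionListener, not Spark's actual API; it only mirrors the shape of the overload pair and the polymorphic workaround described above.

```scala
// Hypothetical stand-ins illustrating the overload pair affected by scala/bug#11016.
trait Listener {
  def onDone(ctx: String): Unit
}

class Ctx {
  // Overload 1: takes a listener object. Under Scala 2.12 this trait is a SAM
  // type, so a lambda can also be converted to it.
  def addListener(l: Listener): Ctx = { l.onDone("done"); this }

  // Overload 2: the polymorphic workaround. It delegates to overload 1 and
  // discards the lambda's result, mirroring the change in this PR.
  def addListener[U](f: String => U): Ctx =
    addListener(new Listener {
      override def onDone(ctx: String): Unit = f(ctx)
    })
}

object Demo extends App {
  val seen = scala.collection.mutable.ArrayBuffer.empty[String]
  val ctx = new Ctx

  // With a non-polymorphic `addListener(f: String => Unit)` overload, a call
  // like this runs into the overloaded-method resolution problem on 2.12,
  // because the lambda is convertible both to String => Unit (via value
  // discarding) and to Listener (via SAM conversion). The explicit [Unit]
  // type argument selects the function overload and compiles on 2.11 and 2.12.
  ctx.addListener[Unit] { s => seen += s }

  println(seen) // ArrayBuffer(done)
}
```

On 2.11, where lambdas are not SAM-converted by default, the type argument is not required, which is why existing user code keeps compiling.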

Member:
OK, if it's binary- and source-compatible with existing 2.11 user programs, that's fine. Bets are off for 2.12 users anyway.

When the release notes are crafted for 2.4, we'll want to mention this JIRA (I'll tag it) and issues like this.

+ // Note that due to this scala bug: https://github.com/scala/bug/issues/11016, we need to make
+ // this function polymorphic for every scala version >= 2.12, otherwise an overloaded method
+ // resolution error occurs at compile time.
addTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f(context)
})
@@ -94,7 +94,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
// Start a thread to feed the process input from our parent's iterator
val writerThread = newWriterThread(env, worker, inputIterator, partitionIndex, context)

- context.addTaskCompletionListener { _ =>
+ context.addTaskCompletionListener[Unit] { _ =>
writerThread.shutdownOnTaskCompletion()
if (!reuseWorker || !released.get) {
try {
@@ -262,7 +262,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
val blockManager = SparkEnv.get.blockManager
Option(TaskContext.get()) match {
case Some(taskContext) =>
- taskContext.addTaskCompletionListener(_ => blockManager.releaseLock(blockId))
+ taskContext.addTaskCompletionListener[Unit](_ => blockManager.releaseLock(blockId))
case None =>
// This should only happen on the driver, where broadcast variables may be accessed
// outside of running tasks (e.g. when computing rdd.partitions()). In order to allow
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -278,7 +278,7 @@ class HadoopRDD[K, V](
null
}
// Register an on-task-completion callback to close the input stream.
- context.addTaskCompletionListener { context =>
+ context.addTaskCompletionListener[Unit] { context =>
// Update the bytes read before closing is to make sure lingering bytesRead statistics in
// this thread get correctly added.
updateBytesRead()
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -77,7 +77,7 @@ class JdbcRDD[T: ClassTag](

override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T]
{
- context.addTaskCompletionListener{ context => closeIfNeeded() }
+ context.addTaskCompletionListener[Unit]{ context => closeIfNeeded() }
val part = thePart.asInstanceOf[JdbcPartition]
val conn = getConnection()
val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
@@ -214,7 +214,7 @@ class NewHadoopRDD[K, V](
}

// Register an on-task-completion callback to close the input stream.
- context.addTaskCompletionListener { context =>
+ context.addTaskCompletionListener[Unit] { context =>
// Update the bytesRead before closing is to make sure lingering bytesRead statistics in
// this thread get correctly added.
updateBytesRead()
@@ -300,7 +300,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val deserializeStream = serializer.deserializeStream(fileInputStream)

// Register an on-task-completion callback to close the input stream.
- context.addTaskCompletionListener(context => deserializeStream.close())
+ context.addTaskCompletionListener[Unit](context => deserializeStream.close())

deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}
@@ -104,7 +104,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
// Use completion callback to stop sorter if task was finished/cancelled.
- context.addTaskCompletionListener(_ => {
+ context.addTaskCompletionListener[Unit](_ => {
sorter.stop()
})
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
@@ -346,7 +346,7 @@ final class ShuffleBlockFetcherIterator(

private[this] def initialize(): Unit = {
// Add a task completion callback (called in both success case and failure case) to cleanup.
- context.addTaskCompletionListener(_ => cleanup())
+ context.addTaskCompletionListener[Unit](_ => cleanup())

// Split local and remote blocks.
val remoteRequests = splitLocalRemoteBlocks()
@@ -827,7 +827,7 @@ private[storage] class PartiallySerializedBlock[T](
// completion listener here in order to ensure that `unrolled.dispose()` is called at least once.
// The dispose() method is idempotent, so it's safe to call it unconditionally.
Option(TaskContext.get()).foreach { taskContext =>
- taskContext.addTaskCompletionListener { _ =>
+ taskContext.addTaskCompletionListener[Unit] { _ =>
// When a task completes, its unroll memory will automatically be freed. Thus we do not call
// releaseUnrollMemoryForThisTask() here because we want to avoid double-freeing.
unrolledBuffer.dispose()