
[SPARK-28571][CORE][SHUFFLE] Use the shuffle writer plugin for the SortShuffleWriter #25342

Conversation

@mccheah (Contributor) commented Aug 3, 2019

What changes were proposed in this pull request?

Use the shuffle writer APIs introduced in SPARK-28209 in the sort shuffle writer.

How was this patch tested?

Existing unit tests were changed to use the plugin, backed by the local-disk implementation, to verify that there were no regressions.
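
For orientation, the resulting write path in SortShuffleWriter has roughly the shape sketched below. This is a simplified sketch, not the verbatim diff: member names such as shuffleExecutorComponents, dep, sorter, and blockManager are assumed from the surrounding class, and aggregator/metrics/error handling is omitted.

```scala
// Simplified sketch of the plugin-based write path.
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter.insertAll(records)

  // Ask the shuffle plugin for a per-map-task output writer instead of writing
  // directly to local disk through the block manager.
  val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
    dep.shuffleId, mapId, context.taskAttemptId(), dep.partitioner.numPartitions)
  val partitionLengths = sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
  mapOutputWriter.commitAllPartitions()
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
```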

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-28571][CORE]SHUFFLE] Use the shuffle writer plugin for the SortShuffleWriter [SPARK-28571][CORE][SHUFFLE] Use the shuffle writer plugin for the SortShuffleWriter Aug 3, 2019
SparkQA commented Aug 3, 2019

Test build #108591 has finished for PR 25342 at commit 2f9b4ca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member):

Retest this please.

@dongjoon-hyun (Member):

cc @jerryshao

SparkQA commented Aug 5, 2019

Test build #108635 has finished for PR 25342 at commit 2f9b4ca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mccheah (Contributor, Author) commented Aug 5, 2019

@squito @yifeih also

*
* @param blockId block ID to write to. The index file will be blockId.name + ".index".
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
* TODO remove this, as this is only used by UnsafeRowSerializerSuite in the SQL project.

Reviewer (Member):

nit. Could you file a JIRA and make this an IDed TODO, please?
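
For reference, an IDed TODO ties the comment to a JIRA ticket so it can be tracked; the ticket that was eventually filed appears later in this thread as SPARK-28764:

```scala
// TODO(SPARK-28764): remove this, as it is only used by UnsafeRowSerializerSuite in the SQL project.
```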

* @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
*/
def writePartitionedMapOutput(
shuffleId: Int, mapId: Int, mapOutputWriter: ShuffleMapOutputWriter): Array[Long] = {

Reviewer (Contributor):

nit: multi-line arg style
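
For readers unfamiliar with the convention being referenced: when a Scala signature does not fit on one line, the Spark style puts each parameter on its own line with a four-space indent, roughly:

```scala
def writePartitionedMapOutput(
    shuffleId: Int,
    mapId: Int,
    mapOutputWriter: ShuffleMapOutputWriter): Array[Long] = {
  // ...
}
```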

}

override def close(): Unit = {
if (isOpen) {

Reviewer (Contributor):

Minor, but if there's an error in open() (e.g. when initializing wrappedStream) this will leave the underlying partitionStream opened.

Maybe this flag isn't needed and you can just check whether the fields are initialized?

@whatlulumomo commented Aug 15, 2019:

The worry is unnecessary, because wrappedStream and objOut must have been initialized successfully if partitionStream was opened as an OutputStream without an exception. And I think the isOpen flag makes the code easier to understand.

@vanzin (Contributor) commented Aug 15, 2019:

because wrappedStream and objOut must have been initialized successfully

That's not necessarily a valid assumption. Compression codecs, e.g., may throw exceptions if the file is corrupt.
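
To make the failure mode concrete: if open() has roughly the shape below (field and helper names are illustrative, not the exact PR code), the codec wrap can throw after partitionStream has already been opened, leaving it open while wrappedStream and objOut are still null.

```scala
// Hypothetical shape of open(); the ordering is what matters, not the exact names.
private def open(): Unit = {
  partitionStream = partitionWriter.openStream()        // succeeds; the stream is now open
  wrappedStream = serializerManager.wrapStream(blockId, partitionStream) // may throw, e.g. a bad codec
  objOut = serializerInstance.serializeStream(wrappedStream)             // never reached on failure
  isOpen = true
}
```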

@gcz2022 commented Aug 8, 2019

Will spill be supported in the series of PRs? @mccheah

curNumBytesWritten = numBytesWritten
}

private class CloseShieldOutputStream(delegate: OutputStream)

Reviewer (Contributor):

What is the usage of this class? Sorry, I can only see the definition here.
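
For readers following along: a close-shield stream is usually a thin wrapper whose close() deliberately does not close the delegate, so that serializer/compression streams layered on top can be closed without closing the underlying partition stream they wrap. A minimal sketch of the idea, not necessarily the PR's exact implementation:

```scala
import java.io.OutputStream

// Swallows close() so that closing an outer (e.g. serialization) stream does not
// close the underlying delegate, whose lifecycle is managed elsewhere.
private class CloseShieldOutputStream(delegate: OutputStream) extends OutputStream {
  override def write(b: Int): Unit = delegate.write(b)
  override def write(b: Array[Byte], off: Int, len: Int): Unit = delegate.write(b, off, len)
  override def flush(): Unit = delegate.flush()
  override def close(): Unit = flush()  // intentionally do not close the delegate
}
```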

@squito (Contributor) commented Aug 8, 2019

@gczsjdy

Will spill be supported in the series of PRs?

No, spill still goes to local disk. Trying to generalize local spills was explicitly out of scope for now.

@gcz2022 commented Aug 12, 2019

Thanks @squito


package org.apache.spark.util.collection

private[spark] trait PairsWriter {

Reviewer:

nit: add docs. Where can this be used?

@mccheah (Contributor, Author):

Also done
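
The documented trait ends up being a small abstraction over "something that accepts key-value pairs", roughly the following, with the doc comment paraphrasing the intent discussed here:

```scala
package org.apache.spark.util.collection

/**
 * An abstraction over anything that the sorter can push key-value pairs into, e.g.
 * DiskBlockObjectWriter for local-disk output or ShufflePartitionPairsWriter for
 * plugin-backed output.
 */
private[spark] trait PairsWriter {
  def write(key: Any, value: Any): Unit
}
```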

* A key-value writer inspired by {@link DiskBlockObjectWriter} that pushes the bytes to an
* arbitrary partition writer instead of writing to local disk through the block manager.
*/
private[spark] class ShufflePartitionPairsWriter(

Reviewer:

Should this instead be in the o.a.s.s package?

@@ -46,7 +47,8 @@ private[spark] class DiskBlockObjectWriter(
     writeMetrics: ShuffleWriteMetricsReporter,
     val blockId: BlockId = null)
   extends OutputStream
-  with Logging {
+  with Logging
+  with PairsWriter {

Reviewer:

nit: add override to one function.

@mccheah (Contributor, Author):

I think this should be done now.

@whatlulumomo left a comment:

good job

@mccheah (Contributor, Author) commented Aug 17, 2019

Addressed comments.

SparkQA commented Aug 17, 2019

Test build #109240 has finished for PR 25342 at commit e99274f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mccheah (Contributor, Author) commented Aug 22, 2019

@vanzin @squito or @dongjoon-hyun - is this good to merge?

SparkQA commented Aug 27, 2019

Test build #109760 has finished for PR 25342 at commit 132bba9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mccheah (Contributor, Author) commented Aug 27, 2019

retest this please

SparkQA commented Aug 27, 2019

Test build #109766 has finished for PR 25342 at commit 132bba9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) left a comment:

Other than Marcelo's comment, looks good.

*
* @param blockId block ID to write to. The index file will be blockId.name + ".index".
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
* TODO(SPARK-28764): remove this, as this is only used by UnsafeRowSerializerSuite in the SQL

Reviewer (Contributor):

Can't that test just call sorter.writePartitionedMapOutput(..., new LocalDiskMapOutputWriter(...))? Anyway, fine to leave it for the follow-up JIRA.
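
Sketched out, the suggestion is that the suite could build the default local-disk output writer itself and hand it to the sorter; the class name and constructor arguments below are illustrative only, not verbatim from the PR:

```scala
// Illustrative only: construct the local-disk plugin writer directly in the test
// instead of relying on the helper that the TODO wants to remove.
val mapOutputWriter = new LocalDiskShuffleMapOutputWriter(
  shuffleId, mapId, numPartitions, blockResolver, conf)
val partitionLengths = sorter.writePartitionedMapOutput(shuffleId, mapId, mapOutputWriter)
mapOutputWriter.commitAllPartitions()
```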

presentPrev
}).orElse(Some(e))
}
resolvedException

Reviewer (Contributor):

I can't think of anything wrong here, but it seems safer to be using finally. It's kind of a stretch, but if some (badly implemented) stream throws a RuntimeException instead of an IOException, you wouldn't clean up properly this way. The nesting gets a bit ugly, but you could do this:

def closeIfNonNull[T <: Closeable](x: T): T = {
  if (x != null) x.close()
  null.asInstanceOf[T]
}
Utils.tryWithSafeFinally {
  objOut = closeIfNonNull(objOut)
} {
  // normally closing objOut would close the inner streams as well, but just in case there was
  // an error in initialization etc. we make sure we clean the other streams up too
  Utils.tryWithSafeFinally {
    wrappedStream = closeIfNonNull(wrappedStream)
  } {
    partitionStream = closeIfNonNull(partitionStream)
  }
}

Reviewer (Contributor):

I also prefer Imran's approach. I'm just a tiny bit worried about bad stream implementations that don't have an idempotent close(), since both your code and Imran's are calling it multiple times on certain streams.

Probably ok not to deal with that though.

override def write(key: Any, value: Any): Unit = {
if (!isOpen) {
open()
isOpen = true

Reviewer (Contributor):

Speaking of being nitpicky about error handling, this flag has weird semantics. If you call write and it fails to initialize the streams, and then you call write again, you'll potentially dereference still open streams.

val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
dep.shuffleId, mapId, context.taskAttemptId(), dep.partitioner.numPartitions)
val partitionLengths = sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
mapOutputWriter.commitAllPartitions()

Reviewer (Contributor):

Just checking that you don't need any changes here? Given your other change that made commitAllPartitions return the partition lengths.
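
If that API change were taken into account here, the call site would presumably look more like the sketch below, taking the lengths from commitAllPartitions rather than from writePartitionedMapOutput (see also the SPARK-28607 description further down):

```scala
// Sketch of the call site once commitAllPartitions reports the partition lengths itself.
val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
  dep.shuffleId, mapId, context.taskAttemptId(), dep.partitioner.numPartitions)
sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
val partitionLengths = mapOutputWriter.commitAllPartitions()
```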

SparkQA commented Aug 27, 2019

Test build #109825 has finished for PR 25342 at commit eec363c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

null.asInstanceOf[T]
}

private def tryCloseOrAddSuppressed(

Reviewer (Contributor):

Not used anymore.

partitionStream = closeIfNonNull(partitionStream)
}
}
isOpen = false

Reviewer (Contributor):

One last comment about error handling. I'll just quote the AutoCloseable documentation instead:

It is strongly advised to relinquish the underlying resources and to internally
mark the resource as closed, prior to throwing the exception. 

Meaning, track whether you've closed the object, not whether it's opened. (isOpen can be replaced with objOut != null.) Then in close() do nothing if the stream has already been closed.
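
Concretely, the suggestion amounts to something like the sketch below, combining the closed-state tracking with the closeIfNonNull helper from Imran's snippet earlier in the thread (field names as in the earlier snippets):

```scala
// Track "closed", not "opened": initialization state is visible from the fields
// themselves, and close() becomes a no-op the second time around.
private var closed = false

override def close(): Unit = {
  if (!closed) {
    closed = true
    Utils.tryWithSafeFinally {
      objOut = closeIfNonNull(objOut)
    } {
      Utils.tryWithSafeFinally {
        wrappedStream = closeIfNonNull(wrappedStream)
      } {
        partitionStream = closeIfNonNull(partitionStream)
      }
    }
  }
}
```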

partitionPairsWriter.write(elem._1, elem._2)
}
}
var threwException = false

Reviewer (Contributor):

Shadowed variable. But I wonder if tryWithSafeFinally isn't better here (and in the "mirror" block above for the no-spill case).

@mccheah (Contributor, Author):

I think this mirrors how UnsafeShuffleWriter and BypassMergeSortShuffleWriter approach these cases, but those are written in Java, so it's harder to use tryWithSafeFinally from there.
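
For reference, the tryWithSafeFinally variant being discussed would look roughly like the sketch below; the iterator and writer names are assumptions, and the null check mirrors the cleanup code quoted later in the review:

```scala
// Sketch: close the pairs writer even if a write throws, without hand-rolling
// the threwException bookkeeping (writer construction omitted).
Utils.tryWithSafeFinally {
  while (elements.hasNext) {
    val elem = elements.next()
    partitionPairsWriter.write(elem._1, elem._2)
  }
} {
  if (partitionPairsWriter != null) {
    partitionPairsWriter.close()
  }
}
```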

SparkQA commented Aug 27, 2019

Test build #109829 has finished for PR 25342 at commit 84cfc29.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 28, 2019

Test build #109835 has finished for PR 25342 at commit 2451185.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 28, 2019

Test build #109840 has finished for PR 25342 at commit 5ba53bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) left a comment:

Looks ok with a minor thing. Will wait a bit to see if others have any comments.

if (partitionPairsWriter != null) {
partitionPairsWriter.close()
}
if (partitionWriter != null) {

Reviewer (Contributor):

Dead code? Or missing code?

SparkQA commented Aug 28, 2019

Test build #109884 has finished for PR 25342 at commit d483157.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) commented Aug 30, 2019

Alright, no more comments, so merging to master.

@vanzin vanzin closed this in ea90ea6 Aug 30, 2019
mccheah added a commit to palantir/spark that referenced this pull request Sep 11, 2019
…rtShuffleWriter

Use the shuffle writer APIs introduced in SPARK-28209 in the sort shuffle writer.

Existing unit tests were changed to use the plugin instead, and they used the local disk version to ensure that there were no regressions.

Closes apache#25342 from mccheah/shuffle-writer-refactor-sort-shuffle-writer.

Lead-authored-by: mcheah <mcheah@palantir.com>
Co-authored-by: mccheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
mccheah added a commit to palantir/spark that referenced this pull request Sep 13, 2019
* Bring implementation into closer alignment with upstream.

Step to ease merge conflict resolution and build failure problems when we pull in changes from upstream.

* Cherry-pick BypassMergeSortShuffleWriter changes and shuffle writer API changes

* [SPARK-28607][CORE][SHUFFLE] Don't store partition lengths twice

The shuffle writer API introduced in SPARK-28209 has a flaw that leads to a memory usage regression - we ended up tracking the partition lengths in two places. Here, we modify the API slightly to avoid redundant tracking. The implementation of the shuffle writer plugin is now responsible for tracking the lengths of partitions, and propagating this back up to the higher shuffle writer as part of the commitAllPartitions API.

Existing unit tests.

Closes apache#25341 from mccheah/dont-redundantly-store-part-lengths.

Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* [SPARK-28571][CORE][SHUFFLE] Use the shuffle writer plugin for the SortShuffleWriter

Use the shuffle writer APIs introduced in SPARK-28209 in the sort shuffle writer.

Existing unit tests were changed to use the plugin instead, and they used the local disk version to ensure that there were no regressions.

Closes apache#25342 from mccheah/shuffle-writer-refactor-sort-shuffle-writer.

Lead-authored-by: mcheah <mcheah@palantir.com>
Co-authored-by: mccheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API.

* Resolve build issues and remaining semantic conflicts

* More build fixes

* More build fixes

* Attempt to fix build

* More build fixes

* [SPARK-29072] Put back usage of TimeTrackingOutputStream for UnsafeShuffleWriter and ShufflePartitionPairsWriter.

* Address comments

* Import ordering

* Fix stream reference
@@ -157,7 +157,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
          metrics,
          shuffleExecutorComponents)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
-       new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
+       new SortShuffleWriter(
+         shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)

Reviewer:

shuffleBlockResolver is not needed.
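
In other words, the cleanup being suggested would presumably drop the unused parameter, ending up with something like the following (illustrative sketch, not verbatim from a follow-up change):

```scala
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
  // shuffleBlockResolver is no longer needed once all writes go through the plugin.
  new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)
```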
