
[SPARK-24296][CORE] Replicate large blocks as a stream. #21451

Closed
wants to merge 6 commits

Conversation

@squito
Contributor

squito commented May 29, 2018

When replicating large cached RDD blocks, it can be helpful to replicate
them as a stream, to avoid using large amounts of memory during the
transfer. This also allows blocks larger than 2GB to be replicated.

Added unit tests in DistributedSuite. Also ran tests on a cluster for
blocks > 2gb.
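To make the idea concrete, here is a minimal sketch (illustrative only, not this PR's actual code; all names below are hypothetical) of receiving a replicated block as a stream of chunks and spooling it to a temporary file, so memory use stays bounded regardless of block size:

import java.io.{File, FileOutputStream}
import java.nio.ByteBuffer
import java.nio.channels.{Channels, WritableByteChannel}

// Hypothetical callback: each network chunk is written straight to disk as it
// arrives, so the receiver never buffers the whole block in memory and the
// block is free to exceed 2GB.
class SpoolToDiskCallback(tmpFile: File) {
  private val channel: WritableByteChannel =
    Channels.newChannel(new FileOutputStream(tmpFile))

  def onData(chunk: ByteBuffer): Unit = {
    while (chunk.hasRemaining) {
      channel.write(chunk)
    }
  }

  def onComplete(): Unit = channel.close() // the block is now fully on disk
}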

@squito
Contributor Author

squito commented May 29, 2018

EDIT: no longer a WIP, as all the dependencies are in.

@SparkQA

SparkQA commented May 29, 2018

Test build #91266 has finished for PR 21451 at commit 7e517e4.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class UploadBlockStream extends BlockTransferMessage

@SparkQA

SparkQA commented May 29, 2018

Test build #91268 has finished for PR 21451 at commit 6ca6f8d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class UploadBlockStream extends BlockTransferMessage

@SparkQA

SparkQA commented May 29, 2018

Test build #91271 has finished for PR 21451 at commit 68c5d5f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

 * @param callback Callback which should be invoked exactly once upon success or failure of the
 *                 RPC.
 */
public abstract void receive(
    TransportClient client,
    ByteBuffer message,
    StreamData streamData,
Contributor

It's not necessary to add a parameter. Change the message parameter to InputStream.

Contributor Author

Yes, there are other ways to do this, but I wanted to leave the old code paths as close to untouched as possible to minimize the behavior change / risk of bugs. I also think it's helpful to clearly separate the portion that is read entirely into memory from the streaming portion; it makes it easier to work with. Also, InputStream suggests the data is getting pulled instead of pushed.

your earlier approach definitely gave a lot of inspiration for this change. I'm hoping that making it a more isolated change helps us make progress here.
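To illustrate the push-vs-pull distinction being drawn here, a purely illustrative pair of Scala interfaces (not the actual transport API):

import java.io.InputStream
import java.nio.ByteBuffer

// Pull style: the handler drives the reads itself, deciding when to consume data.
trait PullStyleHandler {
  def handle(blockData: InputStream): Unit
}

// Push style: the transport layer invokes callbacks as chunks arrive, which is
// closer to how the network layer actually delivers data.
trait PushStyleHandler {
  def onData(chunk: ByteBuffer): Unit
  def onComplete(): Unit
  def onFailure(cause: Throwable): Unit
}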

Contributor

What about incorporating the message parameter into the streamData parameter?

Contributor Author

I'm gonna move the discussion to #21346 since that is the PR that will introduce this API.

Contributor

@vanzin left a comment

Didn't see any red flags but definitely would like another look after the other change goes in - but not sure I'll have time for that.

/**
 * A request to Upload a block, which the destintation should receive as a stream.
 *
 * The actual block data is not contained here. It is in the streamData in the RpcHandler.receive()
Contributor

Need to update to match API.

 * Put the given block that will be received as a stream.
 *
 * When this method is called, the data itself is not available -- it needs to be handled within
 * the callbacks of <code>streamData</code>.
Contributor

Need to update comment.

val message = BlockTransferMessage.Decoder.fromByteBuffer(messageHeader)
message match {
case uploadBlockStream: UploadBlockStream =>
val (level: StorageLevel, classTag: ClassTag[_]) = {
Contributor

Indentation is off here.

Using .asInstanceOf[UploadBlockStream] would achieve the same goal here with less indentation, just with a different exception...
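For reference, the two forms being compared, sketched here with stand-in types (the real ones live in this diff); the match fails with a MatchError on an unexpected message, the cast with a ClassCastException:

// Stand-ins for the real message types in this PR.
sealed trait BlockTransferMessage
final class UploadBlockStream extends BlockTransferMessage

// Pattern-match form: one extra level of nesting for the body.
def handleViaMatch(message: BlockTransferMessage): Unit = message match {
  case u: UploadBlockStream =>
    println(s"handling $u as a stream")
}

// Cast form: flatter, but fails with a different exception type.
def handleViaCast(message: BlockTransferMessage): Unit = {
  val u = message.asInstanceOf[UploadBlockStream]
  println(s"handling $u as a stream")
}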

// TODO if we change this method to return the ManagedBuffer, then getRemoteValues
// could just use the inputStream on the temp file, rather than memory-mapping the file.
// Until then, replication can cause the process to use too much memory and get killed
// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though
Contributor

it's

@@ -723,7 +770,9 @@ private[spark] class BlockManager(
}

if (data != null) {
return Some(new ChunkedByteBuffer(data))
val chunkSize =
conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
Contributor

Want to turn this into a config constant? I'm seeing it in a bunch of places.

@@ -1341,12 +1390,16 @@ private[spark] class BlockManager(
try {
val onePeerStartTime = System.nanoTime
logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
// This thread keeps a lock on the block, so we do not want the netty thread to unlock
// block when it finishes sending the message.
val mb = new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false,
Contributor

s/mb/buffer

Confusing in a place that deals with sizes all over.

@SparkQA

SparkQA commented Jun 28, 2018

Test build #92398 has finished for PR 21451 at commit 1cc0f3f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class UploadBlockStream extends BlockTransferMessage

@SparkQA

SparkQA commented Jun 28, 2018

Test build #92407 has finished for PR 21451 at commit fa1928a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 28, 2018

Test build #92427 has finished for PR 21451 at commit bdfa6ff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor Author

squito commented Jul 18, 2018

@mridulm @jerryshao @felixcheung This is the last one in the 2GB block limit series. Just rebased to include the updates to #21440. I will also run my tests on a cluster with this: https://github.com/squito/spark_2gb_test/blob/master/src/main/scala/com/cloudera/sparktest/LargeBlocks.scala
I will report the results from that, probably tomorrow.

thanks for all the reviews!

@SparkQA

SparkQA commented Jul 19, 2018

Test build #93250 has finished for PR 21451 at commit 335e26d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class UploadBlockStream extends BlockTransferMessage

@squito
Contributor Author

squito commented Jul 19, 2018

retest this please

@SparkQA

SparkQA commented Jul 19, 2018

Test build #93255 has finished for PR 21451 at commit 335e26d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class UploadBlockStream extends BlockTransferMessage

@SparkQA

SparkQA commented Jul 25, 2018

Test build #93549 has finished for PR 21451 at commit fe31a7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class UploadBlockStream extends BlockTransferMessage

@squito
Contributor Author

squito commented Jul 25, 2018

FYI, I did finally run my scale tests again on a cluster, and shuffles, remote reads, and replication all worked for blocks over 2GB (sorry, got sidetracked with a few other things in the meantime).

@squito changed the title from "[SPARK-24296][CORE][WIP] Replicate large blocks as a stream." to "[SPARK-24296][CORE] Replicate large blocks as a stream." on Jul 26, 2018
Contributor

@tgravescs left a comment

Made one pass through; need to look at some things in more depth to make sure I understand.

@@ -73,10 +73,32 @@ class NettyBlockRpcServer(
}
val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
val blockId = BlockId(uploadBlock.blockId)
logInfo(s"Receiving replicated block $blockId with level ${level} " +
Contributor

this seems like it could be pretty verbose, put at debug or trace?


private[spark] val MEMORY_MAP_LIMIT_FOR_TESTS =
  ConfigBuilder("spark.storage.memoryMapLimitForTests")
    .internal()
Contributor

add a .doc that says it is for testing only
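Something along these lines would address that, sketched from the snippet above (the doc text, byte unit, and default value are assumptions on my part):

import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

// Sketch: the same constant as above, with a doc string marking it as test-only.
// (It would live alongside the other constants in the config object.)
private[spark] val MEMORY_MAP_LIMIT_FOR_TESTS =
  ConfigBuilder("spark.storage.memoryMapLimitForTests")
    .internal()
    .doc("For testing only, controls the size of chunks when memory mapping a file.")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefault(Int.MaxValue)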

override def receiveStream(
client: TransportClient,
messageHeader: ByteBuffer,
responseContext: RpcResponseCallback): StreamCallbackWithID = {
Contributor

fix spacing

.asInstanceOf[(StorageLevel, ClassTag[_])]
}
val blockId = BlockId(message.blockId)
logInfo(s"Receiving replicated block $blockId with level ${level} as stream " +
Contributor

debug?

// stream.
channel.close()
// TODO Even if we're only going to write the data to disk after this, we end up using a lot
// of memory here. We wont' get a jvm OOM, but might get killed by the OS / cluster
Contributor

spelling won't

Contributor

Yeah, agree this could be an issue with YARN since overhead memory might not be big enough. Can we file a JIRA to specifically track this?

Contributor Author

filed SPARK-25035

@rezasafi
Contributor

rezasafi commented Aug 3, 2018

LGTM (after applying the @tgravescs comments). Great job on the whole issue.

@SparkQA

SparkQA commented Aug 7, 2018

Test build #94319 has finished for PR 21451 at commit 6d059f2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;

/**
* A request to Upload a block, which the destintation should receive as a stream.
Contributor

nit: spelling destination

// we just write to a temp file, and call putBytes on the data in that file.
val tmpFile = diskBlockManager.createTempLocalBlock()._2
new StreamCallbackWithID {
  val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile))
Contributor

Do we need to honor spark.io.encryption.enabled here to encrypt the file on local disk?

Contributor

yeah sure looks like it.
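A minimal sketch of what honoring that flag could look like at this spot, with the encryption wrapper passed in as a function (in Spark this would typically be the serializer manager's wrapForEncryption, which passes the stream through unchanged when spark.io.encryption.enabled is off); an illustration rather than the exact change that was made:

import java.io.{File, FileOutputStream, OutputStream}
import java.nio.channels.{Channels, WritableByteChannel}

// Route the temp-file writes through the encrypting wrapper so the spooled
// block is encrypted on local disk whenever encryption is enabled.
def openSpoolChannel(
    tmpFile: File,
    wrapForEncryption: OutputStream => OutputStream): WritableByteChannel = {
  Channels.newChannel(wrapForEncryption(new FileOutputStream(tmpFile)))
}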

@SparkQA

SparkQA commented Aug 13, 2018

Test build #94695 has finished for PR 21451 at commit 034acb4.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 14, 2018

Test build #94725 has finished for PR 21451 at commit c45e702.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

test this please

Contributor

@vanzin left a comment

Looks sane to me.

@@ -28,11 +28,15 @@ trait EncryptionFunSuite {
* for the test to modify the provided SparkConf.
*/
final protected def encryptionTest(name: String)(fn: SparkConf => Unit) {
encryptionTestHelper(name) { case (name, conf) =>
test(name)(fn(conf))
Contributor

nit: indentation

case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
Contributor

toChunkedByteBuffer is also pretty memory-hungry, right? You'll end up needing enough memory to hold the entire file in memory, if I read the code right.

This is probably ok for now, but should probably mention it in your TODO above.

Contributor Author

Yeah, you store the entire file in memory (after decrypting). It's not memory-mapped either, so it'll probably be a regular OOM (depending on memory mode). Updated the comment.

@SparkQA

SparkQA commented Aug 14, 2018

Test build #94762 has finished for PR 21451 at commit c45e702.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 15, 2018

Test build #94778 has finished for PR 21451 at commit 44149a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor Author

squito commented Aug 21, 2018

@tgravescs @vanzin any more comments? I think I've addressed everything

@vanzin
Contributor

vanzin commented Aug 21, 2018

LGTM. Merging to master.

@asfgit closed this in 99d2e4e on Aug 21, 2018
@gatorsmile
Member

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, localhost, executor 1): java.io.IOException: org.apache.spark.SparkException: corrupt remote block broadcast_0_piece0 of broadcast_0: 1651574976 != 1165629262
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1320)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:84)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:367)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1347)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:373)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: corrupt remote block broadcast_0_piece0 of broadcast_0: 1651574976 != 1165629262
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:167)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:151)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:151)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:151)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$apply$2.apply(TorrentBroadcast.scala:231)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:211)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1313)
	... 13 more

Is it possible this is a bug introduced by this PR? After merging this PR, I saw this error multiple times. https://issues.apache.org/jira/browse/SPARK-25422

cc @squito @cloud-fan @vanzin @tgravescs

@squito
Contributor Author

squito commented Sep 17, 2018

Looking. So far this seems unrelated to me, but as you've said it's failed in a few builds, so I'm gonna keep digging. The error is occurring before any RDDs are getting replicated via the new code path, and this change mostly does not touch the path involved in sending a broadcast.

I've been unable to repro so far despite running the test hundreds of times, but I might need to run more tests or put in some pauses or something. Gonna compare with other test runs with the failure as well.

@gatorsmile
Member

@squito Thanks for digging into it!

This PR introduced the failing test case. We need to know whether it exposes any serious bug (even if the bug was not introduced by this PR) and whether it impacts our 2.4 release.

@squito
Contributor Author

squito commented Sep 19, 2018

Still looking -- will put comments on the JIRA so it's more visible.
