[SPARK-4671][Streaming] Do not replicate streaming block when WAL is enabled

Currently a streaming block is replicated whenever a storage level with replication is set. Since the write ahead log (WAL) already provides fault tolerance, this replication is needless and hurts the throughput of the streaming application.

Hi tdas, as per our discussion about this issue, I fixed it with this implementation. I'm not sure this is the way you want it; would you mind taking a look? Thanks a lot.
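
For reference, a minimal sketch (not part of this commit) of a driver program that exercises this code path by enabling the receiver write ahead log and requesting a replicated storage level on the input stream. The app name, socket host/port, and checkpoint path are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WALReceiverExample {
  def main(args: Array[String]): Unit = {
    // Enable the receiver write ahead log; received blocks are then also written
    // to the checkpoint directory, which is what makes replication redundant.
    val conf = new SparkConf()
      .setAppName("wal-example")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("hdfs:///tmp/wal-example-checkpoint")

    // MEMORY_AND_DISK_SER_2 asks for 2 replicas; with this patch the WAL-based
    // block handler downgrades it to a single-replica level and logs a warning
    // instead of replicating.
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

With the previous behavior, each received block was both written to the WAL and replicated to another executor; that duplicated work is the overhead this change removes.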

Author: jerryshao <saisai.shao@intel.com>

Closes #3534 from jerryshao/SPARK-4671 and squashes the following commits:

500b456 [jerryshao] Do not replicate streaming block when WAL is enabled
jerryshao authored and tdas committed Dec 23, 2014
1 parent 10d69e9 commit 3f5f4cc
Showing 1 changed file with 19 additions and 1 deletion.
@@ -121,6 +121,24 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
   private val maxFailures = conf.getInt(
     "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
 
+  private val effectiveStorageLevel = {
+    if (storageLevel.deserialized) {
+      logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
+        s" write ahead log is enabled, change to serialization false")
+    }
+    if (storageLevel.replication > 1) {
+      logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
+        s"write ahead log is enabled, change to replication 1")
+    }
+
+    StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
+  }
+
+  if (storageLevel != effectiveStorageLevel) {
+    logWarning(s"User defined storage level $storageLevel is changed to effective storage level " +
+      s"$effectiveStorageLevel when write ahead log is enabled")
+  }
+
   // Manages rolling log files
   private val logManager = new WriteAheadLogManager(
     checkpointDirToLogDir(checkpointDir, streamId),
@@ -156,7 +174,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
       val putResult =
-        blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
+        blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
       if (!putResult.map { _._1 }.contains(blockId)) {
        throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
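
To see what the new effectiveStorageLevel computation does in isolation, a small sketch (assuming a Spark shell or spark-core on the classpath; the helper name effective is only for illustration):

import org.apache.spark.storage.StorageLevel

// Mirror of the transformation above: keep the disk/memory/off-heap flags,
// force serialized storage (deserialized = false) and a single replica.
def effective(level: StorageLevel): StorageLevel =
  StorageLevel(level.useDisk, level.useMemory, level.useOffHeap, false, 1)

// A deserialized, 2-replica level is reduced to a serialized, single-replica one.
val a = effective(StorageLevel.MEMORY_AND_DISK_2)   // memory + disk, serialized, 1 replica
// A level that is already serialized with a single replica is left equivalent.
val b = effective(StorageLevel.MEMORY_AND_DISK_SER) // memory + disk, serialized, 1 replica

Either way, storeBlock now passes effectiveStorageLevel rather than the user-supplied storageLevel to blockManager.putBytes, so the block manager never sees a replication factor above 1 while the WAL is enabled.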
