From fcb44ce68a384b50b2be827f7bb607b7c02c282b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 17 Aug 2015 16:26:41 -0700 Subject: [PATCH 1/2] Moved blocking call to ArrayBlockingQueue outside the synchronized to prevent deadlock. --- .../streaming/receiver/BlockGenerator.scala | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index 300e820d01777..71a9b610f10d8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -227,16 +227,21 @@ private[streaming] class BlockGenerator( def isStopped(): Boolean = state == StoppedAll /** Change the buffer to which single records are added to. */ - private def updateCurrentBuffer(time: Long): Unit = synchronized { + private def updateCurrentBuffer(time: Long): Unit = { try { - val newBlockBuffer = currentBuffer - currentBuffer = new ArrayBuffer[Any] - if (newBlockBuffer.size > 0) { - val blockId = StreamBlockId(receiverId, time - blockIntervalMs) - val newBlock = new Block(blockId, newBlockBuffer) - listener.onGenerateBlock(blockId) + var newBlock: Block = null + synchronized { + if (currentBuffer.nonEmpty) { + val newBlockBuffer = currentBuffer + currentBuffer = new ArrayBuffer[Any] + val blockId = StreamBlockId(receiverId, time - blockIntervalMs) + listener.onGenerateBlock(blockId) + newBlock = new Block(blockId, newBlockBuffer) + } + } + + if (newBlock != null) { blocksForPushing.put(newBlock) // put is blocking when queue is full - logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => From cb7bba2f3ba1f3af87a55f2fc4f38da142099206 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 18 Aug 2015 00:34:38 -0700 Subject: [PATCH 2/2] Addressed comments. --- .../apache/spark/streaming/receiver/BlockGenerator.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index 71a9b610f10d8..421d60ae359f8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -255,9 +255,13 @@ private[streaming] class BlockGenerator( private def keepPushingBlocks() { logInfo("Started block pushing thread") - def isGeneratingBlocks = synchronized { state == Active || state == StoppedAddingData } + def areBlocksBeingGenerated: Boolean = synchronized { + state != StoppedGeneratingBlocks + } + try { - while (isGeneratingBlocks) { + // While blocks are being generated, keep polling for to-be-pushed blocks and push them. + while (areBlocksBeingGenerated) { Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match { case Some(block) => pushBlock(block) case None =>