
[SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI #6614

Closed
wants to merge 10 commits

Conversation


@dibbhatt dibbhatt commented Jun 3, 2015

In a custom receiver, if I call store with the Iterator type (store(dataIterator: Iterator[T]): Unit), the Spark UI does not show the correct count of records in the block, which leads to wrong values for Input Rate, Scheduling Delay and Input Size. This pull request fixes that issue.
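
For reference, a minimal sketch (not the reporter's actual receiver) of the code path being described: a custom receiver handing Spark a batch of records as an Iterator via the store(Iterator[T]) overload.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal sketch of a custom receiver that stores data via the Iterator
// overload; the data source here is just a placeholder.
class MyIteratorReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {
  def onStart(): Unit = {
    // In a real receiver this would run on its own thread and loop.
    val batch: Iterator[String] = Iterator("a", "b", "c")
    // store(dataIterator: Iterator[T]) is the overload whose record count
    // was not being reported correctly to the UI.
    store(batch)
  }
  def onStop(): Unit = {}
}
```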

@@ -139,6 +139,7 @@ private[streaming] class ReceiverSupervisorImpl(
val blockId = blockIdOption.getOrElse(nextBlockId)
val numRecords = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
case IteratorBlock(iterator) => iterator.length
Member

Er, but this consumes the iterator. You can't do this, right?
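
For context, the Scala behaviour being pointed at: Iterator.length walks the iterator to its end, so the records would be gone before the block is ever stored. A tiny self-contained check:

```scala
val it = Iterator(1, 2, 3)
val n = it.length   // 3, but computing it exhausts the iterator
it.hasNext          // false: nothing is left to hand to the BlockManager
```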

@dibbhatt
Author

dibbhatt commented Jun 3, 2015

I am sorry, my bad. Do let me know if the latest change looks fine.

val blockId = blockIdOption.getOrElse(nextBlockId)
val numRecords = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
case IteratorBlock(iterator) =>
var arrayBuffer = ArrayBuffer(iterator.toArray : _*)
Member

I don't know that it's safe to copy the iterator contents into memory here. I don't think it's worth it for the UI.

Author

Agree, but a custom receiver storing blocks using an iterator cannot utilize the UI stats. Not sure if there is any wide usage of Iterator, but in my Kafka consumer (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) I earlier used Iterator and have now changed it to ArrayBuffer. People who use Iterator will still have this issue.

@andrewor14
Contributor

ok to test

@andrewor14
Contributor

@tdas @zsxwing

@andrewor14
Contributor

@dibbhatt could you add [STREAMING] to the title of the PR?

@SparkQA

SparkQA commented Jun 4, 2015

Test build #34131 has finished for PR 6614 at commit 20b9374.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dibbhatt dibbhatt changed the title [SPARK-8080] Receiver.store with Iterator does not give correct count at Spark UI [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI Jun 4, 2015
@dibbhatt
Author

dibbhatt commented Jun 4, 2015

added [STREAMING] to the title of the PR

val blockId = blockIdOption.getOrElse(nextBlockId)
val numRecords = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
case IteratorBlock(iterator) =>
var arrayBuffer = ArrayBuffer(iterator.toArray : _*)
Contributor

This is quite inefficient, as the data will first be copied into the ArrayBuffer, and then the BlockManager will again use the ArrayBuffer's iterator to store the data. This is precisely why this wasn't done till now. :(
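
A condensed, self-contained sketch of the two passes being described; storeIterator is just a stand-in for the BlockManager call that walks the data to store it, not an actual Spark API:

```scala
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

// Illustrative only: count by copying, then store, which traverses the data twice.
def countThenStore[T: ClassTag](iterator: Iterator[T], storeIterator: Iterator[T] => Unit): Int = {
  val arrayBuffer = ArrayBuffer(iterator.toArray: _*) // pass 1: copy everything into memory
  storeIterator(arrayBuffer.iterator)                 // pass 2: the same data is walked again
  arrayBuffer.size
}
```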

@tdas
Contributor

tdas commented Jun 4, 2015

I think we need a different design. There is a way to count the elements in the iterator without putting it into some intermediate buffer. Rather, the iterator is going to be consumed anyway (assuming the StorageLevel is serialized) by the block manager. The counting can be done while that is happening. To do this you have to construct a special CountingIterator that wraps the original iterator.

class CountingIterator[T: Manifest](iterator: Iterator[T]) extends Iterator[T] {
  var count = 0
  def hasNext: Boolean = iterator.hasNext
  def next(): T = {
    count += 1
    iterator.next()
  }
}

After iterating over the original iterator via this counting iterator, you can get the count of the records it iterated through. This number can then be returned through the BlockStoreResult object.

How does that sound?
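
For what it's worth, a quick self-contained illustration of that contract (the downstream consumer here just stands in for the BlockManager consuming the iterator while storing the block):

```scala
val countingIterator = new CountingIterator(Iterator("a", "b", "c"))
val stored = countingIterator.toArray  // stands in for the BlockManager draining the iterator
assert(countingIterator.count == 3)    // the count is only meaningful after consumption
```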

@dibbhatt
Author

dibbhatt commented Jun 4, 2015

That sounds good, Tathagata. I will update the PR with the changes.

@dibbhatt
Author

dibbhatt commented Jun 4, 2015

Hi @tdas. Let me know how this looks...

@SparkQA

SparkQA commented Jun 4, 2015

Test build #34192 has finished for PR 6614 at commit f9aaa9f.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class CountingIterator[T: Manifest](iterator: Iterator[T]) extends Iterator[T]

…rrect count at Spark UI

Fixed Scala Style check issue
@SparkQA

SparkQA commented Jun 4, 2015

Test build #34197 has finished for PR 6614 at commit c631a26.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -79,7 +87,10 @@ private[streaming] class BlockManagerBasedBlockHandler(
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}
BlockManagerBasedStoreResult(blockId)
if(countIterator !=null) {
Contributor

missing spaces.

Contributor

I'm surprised the scala checks didn't catch this. @rxin know what's wrong?

@tdas
Contributor

tdas commented Jun 4, 2015

This is getting a little complicated to reason about how this will work for all combinations of ReceivedBlock types, StorageLevels and ReceivedBlockHandler types. This must be done with unit tests. Mind adding unit tests for these A x B x C combinations in the ReceivedBlockHandlerSuite? These tests will allow us to clearly verify for which combinations we can do this correctly and which combinations we cannot do at all.

@dibbhatt
Author

dibbhatt commented Jun 5, 2015

Sure Tathagata, will add unit tests.

@tdas
Contributor

tdas commented Jun 5, 2015

I am going to merge #6659 to prevent negative numbers in the UI. Could you please merge with master after that PR is merged, to verify that there are no negative numbers? Also, if possible, your unit test should verify this as well.

@dibbhatt
Author

dibbhatt commented Jun 5, 2015

Sure

@dibbhatt
Author

dibbhatt commented Jun 5, 2015

Just to understand when the message count may go wrong, here is my understanding. Please let me know if I am wrong.
As I see the logic, if WAL is enabled we are fine, as WriteAheadLogBasedBlockHandler serializes the block. If WAL is not set, BlockManagerBasedBlockHandler uses BlockManager's putIterator call for both Iterator and ArrayBuffer. If the storage level is ExternalStore or DiskStore, both will serialize the iterator and the count comes out fine. For MemoryStore, it also does an unrollSafely of the iterator if enough memory is there. The issue may come if MemoryStore fails to do a safe unroll (or does a partial unroll) due to lack of free memory space. Then the reported count will show less than the actual number of records in the block. This issue will happen irrespective of the replication factor.

Will this be a very common behavior? In my environment, for every storage level I get the correct count of the number of Kafka messages I pumped. Probably I need to hit the memory limit of MemoryStore's unrollSafely to run out of memory, which seems to be a rare scenario.

Let me know what you think.

@SparkQA

SparkQA commented Jun 5, 2015

Test build #34293 timed out for PR 6614 at commit 51de69e after a configured wait of 175m.

@tdas
Contributor

tdas commented Jun 5, 2015

Good point. I think the solution is to make the CountingIterator verify whether the end of the internal iterator was reached or not (that is, whether hasNext has returned false). Accordingly we will know whether the count is correct or not. In cases where the count is incomplete (e.g. MEMORY_ONLY with not enough memory), the count can be ignored, because in those cases the block was not successfully put into memory, and so the count can be considered to be zero. In fact, ideally we should be throwing exceptions saying that the block could not be inserted into the block manager.

Does that make sense?
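
One possible shape of the completeness check described above, as a sketch; this is not necessarily the exact code that ends up in the patch:

```scala
// Wraps an iterator, counts the records handed out, and only reports the
// count if the underlying iterator was driven to exhaustion.
class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
  private var _count = 0L

  // True only once the wrapped iterator has been fully consumed.
  private def isFullyConsumed: Boolean = !iterator.hasNext

  def hasNext: Boolean = iterator.hasNext

  def next(): T = {
    _count += 1
    iterator.next()
  }

  // If the block was only partially consumed (e.g. a failed unroll), the
  // count would understate the records in the block, so report nothing.
  def count(): Option[Long] = if (isFullyConsumed) Some(_count) else None
}
```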

}

test("BlockManagerBasedBlockHandler - count messages MEMORY_AND_DISK") {
handler = makeBlockManagerBasedBlockHandler(blockManager, StorageLevel.MEMORY_AND_DISK)
Contributor

This is repeated code! Can't these be made into a function which takes the storage level as a parameter? Each of these unit tests then just calls that function with appropriate parameters. In fact, the ReceivedBlockHandler type can also be a parameter, leading to further code reduction.
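
Roughly the factoring being suggested, as a sketch inside ReceivedBlockHandlerSuite; the helper name, the record count, and the assumption that the store result exposes numRecords as an Option are illustrative, not the actual test code:

```scala
// One helper parameterized by storage level (the handler type could be a
// parameter too); each test case then becomes a one-liner.
def testRecordCount(storageLevel: StorageLevel): Unit = {
  val handler = makeBlockManagerBasedBlockHandler(blockManager, storageLevel)
  val data = (1 to 100).map(_.toString)
  val storeResult = handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(data.iterator))
  // assumes the result carries the record count added in this PR
  assert(storeResult.numRecords === Some(100))
}

test("BlockManagerBasedBlockHandler - count messages MEMORY_AND_DISK") {
  testRecordCount(StorageLevel.MEMORY_AND_DISK)
}
```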

Dibyendu Bhattacharya added 2 commits June 7, 2015 16:04
Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
	streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@dibbhatt
Author

dibbhatt commented Jun 7, 2015

What I observed is, when a block is not able to unrollSafely to memory because enough space is not there, the BlockManager won't try to put the block, and ReceivedBlockHandler will throw a SparkException as it could not find the block id in the PutResult. Thus the block count won't go wrong if the block is not able to unroll safely for MEMORY_ONLY settings. So I was wrong earlier.

For MEMORY_AND_DISK settings, if the BlockManager is not able to unroll the block to memory, the block will still get serialized to disk. Same for the WAL-based store. So for those cases (storage level = memory + disk) the count will also come out fine if the block is not able to unroll to memory.

Thus I added isFullyConsumed in the CountingIterator, but have not used it, as the case will never happen that the block is not fully consumed and ReceivedBlockHandler still gets the block ID.

I have added a few test cases to cover those block unrolling scenarios as well.

@SparkQA

SparkQA commented Jun 7, 2015

Test build #34391 timed out for PR 6614 at commit c250fb5 after a configured wait of 150m.

@dibbhatt
Author

dibbhatt commented Jun 7, 2015

Hi @tdas. I can see the build passed all test cases, but it timed out.

@zsxwing
Member

zsxwing commented Jun 7, 2015

@dibbhatt could you resolve the conflicts again? Thank you.

@dibbhatt
Author

dibbhatt commented Jun 7, 2015

Hi @zsxwing, I resolved the conflicts. This PR modified ReceivedBlockStoreResult, which now takes the number of records, and that required modifying ReceiverSupervisorImpl and ReceivedBlockTrackerSuite from your PR to take the numRecords parameter. Is there any issue you see in the merge? Even though the build failed due to the timeout issue, I can see all test cases passed. Do let me know if I need to do anything.

@zsxwing
Member

zsxwing commented Jun 8, 2015

@dibbhatt did you modify any commit by accident, or is it a GitHub bug? There are 219 files changed and it still contains conflicts.

@SparkQA

SparkQA commented Jun 8, 2015

Test build #34403 has finished for PR 6614 at commit 28225d5.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@dibbhatt
Author

dibbhatt commented Jun 8, 2015

Hi @zsxwing, no, I have made only the commits for my changes. Not sure why it says 219 files changed. If you look at the commit details, you can see only files related to this PR have changed. Just now I modified comments in a file to trigger the build once again.

These 219 changed files appeared after I merged my repo from upstream/master to take your PR changes. And I merged only your changes with mine and committed those.

Below are my changes since the merge, and I have not committed all these 219 files :(

dibbhatt@0892156
dibbhatt@c250fb5
28225d5

@dibbhatt
Author

dibbhatt commented Jun 8, 2015

@tdas @zsxwing, if this PR is screwed up, can we close this one and create a fresh one? How should we proceed further?

@zsxwing
Member

zsxwing commented Jun 8, 2015

I've no idea why it's screwed up. Feel free to open a new one.

@dibbhatt
Author

dibbhatt commented Jun 8, 2015

Closing this PR; 6614 got an issue with merging from upstream/master and a lot of unwanted files have come into it. Will open a new one.

@dibbhatt dibbhatt closed this Jun 8, 2015