
[SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI #6707

Closed
wants to merge 7 commits

Conversation

dibbhatt

@dibbhatt dibbhatt commented Jun 8, 2015

@tdas @zsxwing this is the new PR for Spark-8080

I have merged #6659

Also to mention: for MEMORY_ONLY settings, when a block cannot be safely unrolled to memory because there is not enough space, the BlockManager won't try to put the block, and ReceivedBlockHandler will throw a SparkException since it cannot find the block id in the PutResult. Thus the number of records in the block won't be counted if the block failed to unroll in memory. Which is fine.

For MEMORY_AND_DISK settings, if the BlockManager is not able to unroll the block to memory, the block will still get serialized to disk. The same holds for the WAL-based store. So for those cases (storage level = memory + disk) the number of records will be counted even though the block could not be unrolled to memory.

Thus I added isFullyConsumed to the CountingIterator, but have not used it, since the case of a block not being fully consumed while ReceivedBlockHandler still gets the block id will never happen.

I have also added a few test cases to cover those block unrolling scenarios.
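For context, the counting-iterator idea discussed in this PR can be sketched as follows. This is a minimal, illustrative version, not Spark's exact implementation: it wraps a record iterator, counts elements as they are pulled through, and exposes the count plus an isFullyConsumed flag.

```scala
// Illustrative sketch of a counting iterator: wraps another iterator and
// tracks how many elements have been pulled through it.
class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
  private var _count = 0L

  // True once the underlying iterator has been drained.
  def isFullyConsumed: Boolean = !iterator.hasNext

  override def hasNext: Boolean = iterator.hasNext

  override def next(): T = {
    _count += 1
    iterator.next()
  }

  // Report a count only after the whole block has been consumed.
  def count: Option[Long] = if (isFullyConsumed) Some(_count) else None
}
```

The Option return type mirrors the discussion above: a count is meaningful only once the block has been fully consumed (i.e., after a successful put).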

U-PEROOT\UBHATD1 and others added 2 commits June 8, 2015 19:47
@AmplabJenkins

Can one of the admins verify this patch?

val putResult: Seq[(BlockId, BlockStatus)] = block match {
  case ArrayBufferBlock(arrayBuffer) =>
-   blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
+   blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true)
Member

Could you create CountingIterator here and put numRecords = Some(countIterator.count) after putIterator? Then you could avoid matching block twice.

Author

I think numRecords after putIterator will have an issue if the BlockManager is not able to safely unroll the block to memory. In that case the block id will come back as null and a SparkException will be thrown. We should count the number of records only once the block id is present in the putResult. Do let me know what you think.

Member

If SparkException is thrown later, numRecords won't be used. Right?

Author

that's true..

@dibbhatt
Author

dibbhatt commented Jun 8, 2015

Sure.. Will do the change..

…rrect count at Spark UI

Fixed comments given by @zsxwing
@dibbhatt
Author

dibbhatt commented Jun 8, 2015

Hi @zsxwing, incorporated all your changes ..

// Any implementation of this trait will store a block id
def blockId: StreamBlockId
// Any implementation of this trait will have to return the number of records
def numRecords: Option[Long]
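The trait sketched above might be implemented along these lines. This is a self-contained illustration: StreamBlockId is simplified to a plain case class here, and the names follow the PR's discussion rather than reproducing Spark's exact source.

```scala
// Self-contained illustration of the trait above; StreamBlockId is
// simplified to a plain case class for the example.
case class StreamBlockId(streamId: Int, uniqueId: Long)

trait ReceivedBlockStoreResult {
  // Any implementation of this trait will store a block id
  def blockId: StreamBlockId
  // ...and report the number of records stored, when known
  def numRecords: Option[Long]
}

// The BlockManager-based result now carries the record count alongside the id.
case class BlockManagerBasedStoreResult(
    blockId: StreamBlockId,
    numRecords: Option[Long]) extends ReceivedBlockStoreResult
```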
Contributor

This is just a preference thing I guess, but this would sound better if it was called recordCount, no?

Author

Everywhere else the count is recorded (refer to this PR: https://github.com/apache/spark/pull/6659/files), it is called numRecords. I just wanted to keep the naming consistent across all classes.

Contributor

Ah, ok. I just find the num* method names weird when they could be called *count. But if it is consistent with everything else, then it is fine.

@harishreedharan
Contributor

This looks really good. I posted some comments - mostly just minor (except one, which relates to counts for ByteBufferBlocks)

@dibbhatt
Author

dibbhatt commented Jun 9, 2015

Taken care of a couple of comments given by @harishreedharan.
Not sure what to do with the ByteBuffer case, as there is no way to count the number of messages in a ByteBufferBlock.

…rrect count at Spark UI

Count ByteBufferBlock as 1 count
@dibbhatt
Author

Made the changes as @harishreedharan suggested, to count a ByteBufferBlock as 1. Let me know if this looks fine.

@dibbhatt
Author

hi @tdas @zsxwing @harishreedharan is this PR okay with you? Just a follow-up in case anything needs to be done. I know you all must be super busy with the 1.4 release..

@zsxwing
Member

zsxwing commented Jun 12, 2015

This PR looks good to me. I feel a bit weird about counting ByteBufferBlock as 1, but I cannot find a better solution.
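The "count a ByteBufferBlock as 1" compromise can be illustrated with a small sketch. The S-suffixed names below are stand-ins, not Spark's actual classes; the point is that a raw byte buffer is opaque, so no per-record count can be recovered from it.

```scala
import scala.collection.mutable.ArrayBuffer
import java.nio.ByteBuffer

// Illustrative only: deriving a record count for each receiver block type.
sealed trait ReceivedBlockS
case class ArrayBufferBlockS(buffer: ArrayBuffer[Any]) extends ReceivedBlockS
case class IteratorBlockS(iterator: Iterator[Any]) extends ReceivedBlockS
case class ByteBufferBlockS(bytes: ByteBuffer) extends ReceivedBlockS

def numRecords(block: ReceivedBlockS): Option[Long] = block match {
  case ArrayBufferBlockS(buffer) => Some(buffer.size.toLong)
  case IteratorBlockS(iterator)  => Some(iterator.size.toLong) // drains the iterator
  case ByteBufferBlockS(_)       => Some(1L) // opaque bytes: no way to count records
}
```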

@@ -51,7 +54,8 @@ private[streaming] trait ReceivedBlockHandler {
  * that stores the metadata related to storage of blocks using
  * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
  */
-private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
+private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId,
Contributor

nit (only if there are other changes): can you move both params to the next line.

// test received block with BlockManager based handler
withBlockManagerBasedBlockHandler { handler =>
val blockStoreResult = storeBlock(handler, receivedBlock)
assert(blockStoreResult.numRecords === expectedNumRecords)
Contributor

Can you add the assert message as well, saying "Message count not matches for a TypeX block being inserted in BlockHandlerTypeY with StorageLevelZ". Helps to debug.

Author

Done. The message will come out like this in case of failure:

Some(25) did not equal Some(21) Message count not matches for a org.apache.spark.streaming.receiver.ArrayBufferBlock block being inserted using BlockManagerBasedBlockHandler with StorageLevel(false, true, false, true, 1) (ReceivedBlockHandlerSuite.scala:274)
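The assert-with-message pattern behind that output can be sketched in plain Scala. This is an illustrative helper (the real suite uses ScalaTest's `assert(a === b, clue)` form); the parameter names are mine, not the PR's.

```scala
// Illustrative helper mirroring the failure message above: compare counts and
// fail with a description of the block type, handler, and storage level.
def assertCount(actual: Option[Long], expected: Option[Long],
                blockType: String, handler: String, storageLevel: String): Unit = {
  assert(actual == expected,
    s"$actual did not equal $expected Message count not matches for a $blockType " +
    s"block being inserted using $handler with $storageLevel")
}
```

Embedding the block type, handler, and storage level in the clue is what makes a failing combination identifiable at a glance, which is the debugging help the reviewer asked for.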

@tdas
Contributor

tdas commented Jun 12, 2015

Overall, the code change looks good. The additional tests need a bit more refactoring to be perfect and leak-proof. Thank you very very much for all this extra effort to implement this much needed array of tests. :)

@dibbhatt
Author

hi @tdas kindly let me know how this looks. Just to mention, the global state won't get changed even if individual test cases modify the BlockManager or storage level, as the Before block resets everything back to the default settings before each test case runs. I have tried to incorporate all comments; do let me know if this is fine.

@dibbhatt
Author

hi @tdas, let me know if the latest changes are fine.

if (isBlockManagedBasedBlockHandler) {
// test received block with BlockManager based handler
withBlockManagerBasedBlockHandler { handler =>
val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
Contributor

incorrect indent

Contributor

I can take care of it

@asfgit asfgit closed this in 3eaed87 Jun 19, 2015
@tdas
Contributor

tdas commented Jun 19, 2015

I merged it to master; it will be available in Spark 1.5.

asfgit pushed a commit that referenced this pull request Jun 19, 2015
…orrect count at Spark UI

Author: Dibyendu Bhattacharya <dibyendu.bhattacharya1@pearson.com>
Author: U-PEROOT\UBHATD1 <UBHATD1@PIN-L-PI046.PEROOT.com>

Closes #6707 from dibbhatt/master and squashes the following commits:

f6cb6b5 [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
f37cfd8 [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
5a8344a [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI Count ByteBufferBlock as 1 count
fceac72 [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
0153e7e [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI Fixed comments given by @zsxwing
4c5931d [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
01e6dc8 [U-PEROOT\UBHATD1] A
@tdas
Contributor

tdas commented Jun 19, 2015

Also merged to branch 1.4, available in Spark 1.4.1

nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
@tdas
Contributor

tdas commented Jun 19, 2015

Thank you very much for this patch. This was a very important one, especially the tests.


@JoshRosen
Contributor

I know it's about half a year too late to be complaining about this, but the formatting and control flow in this patch's tests make things really hard to debug, and there are a lot of code formatting issues here. Next time, I think we should push for stronger test code quality on patches like this one, since this one is a giant mess and is very unpleasant to try to understand and edit.

}

private def testCountWithBlockManagerBasedBlockHandler(isBlockManagerBasedBlockHandler: Boolean) {
// ByteBufferBlock-MEMORY_ONLY
Contributor

I think that this is a messy / bad way of writing this test. If we want to test a whole bunch of cases, then I think it would make sense to use a for-loop to build up some test cases. I'd also omit the // comments here, since they're not helpful to me as a reader.
