
Conversation

@shahidki31
Contributor

What changes were proposed in this pull request?

Test steps to reproduce this:

  1. bin/spark-shell --master local-cluster[2,1,1024]
scala> import org.apache.spark.storage.StorageLevel
scala> val rdd = sc.parallelize(1 to 10, 1).persist(StorageLevel.MEMORY_ONLY_2)
scala> rdd.count

The generated events are shown below:

event: SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(1, 10.8.132.160, 65473, None),rdd_0_0,StorageLevel(memory, deserialized, 2 replicas),56,0))
event: SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(0, 10.8.132.160, 65474, None),rdd_0_0,StorageLevel(memory, deserialized, 1 replicas),56,0))

But in the Storage tab of the UI, the description shows
"Memory Deserialized 1x Replicated", even though we requested a replication of 2.

The root cause is that block update events for the replicated copies carry a replication factor of 1. In the AppStatusListener class, the storage level is overwritten by whichever event arrives last, so if a replica's event arrives later, the replication factor is recorded as 1.

In this PR, the fix is on the AppStatusListener side, where we need to detect whether an event is a replication update or not; otherwise we would need to update the RDD store itself.
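
To illustrate the idea, here is a minimal sketch (an illustration under stated assumptions, not the actual patch): the listener-side check only needs to recognize an incoming level that looks like a replica copy of the level already recorded.

import org.apache.spark.storage.StorageLevel

// Hypothetical sketch: `storedLevel` stands for the level the listener already
// tracks for the RDD. An update whose flags match but whose replication is lower
// comes from a replicated copy and should not overwrite the stored level.
def shouldOverwrite(incoming: StorageLevel, storedLevel: StorageLevel): Boolean = {
  val looksLikeReplicaEvent =
    incoming.replication < storedLevel.replication &&
      incoming.useMemory == storedLevel.useMemory &&
      incoming.useDisk == storedLevel.useDisk &&
      incoming.deserialized == storedLevel.deserialized
  !looksLikeReplicaEvent
}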

How was this patch tested?

Added a unit test and tested manually.

Before patch:

Screenshot from 2019-04-18 14-50-06

After patch:
Screenshot from 2019-04-18 14-51-04

@SparkQA

SparkQA commented Apr 18, 2019

Test build #104692 has finished for PR 24398 at commit 86d109d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shahidki31
Contributor Author

Jenkins, retest this please

@SparkQA

SparkQA commented Apr 18, 2019

Test build #104694 has finished for PR 24398 at commit 8d3c32e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shahidki31
Contributor Author

cc @vanzin @srowen @zsxwing, kindly review

var storageLevel: String = weakIntern(info.storageLevel.description)
var memoryUsed = 0L
var diskUsed = 0L
var storageInfo: StorageLevel = new StorageLevel()
Member


I don't know this part well, but is it redundant with storageLevel above?

Contributor Author

@shahidki31 Apr 18, 2019


The field above is just a string representation of the storage level. From storageInfo we can get the individual parameters, including replication.
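
For reference, the flags are plain accessors on StorageLevel, so the replication factor can be read directly (a quick REPL illustration, not part of the PR):

import org.apache.spark.storage.StorageLevel

val level = StorageLevel.MEMORY_ONLY_2
level.useMemory     // true
level.useDisk       // false
level.deserialized  // true
level.replication   // 2
level.description   // "Memory Deserialized 2x Replicated"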

Member


I see, but should we not just replace the field above with this richer object? Or should this not use info.storageLevel as the initial value? Maybe not; it just jumped out at me as a question.

Contributor Author


Yes, we can initialize storageInfo = info.storageLevel. But I'm not sure we can get rid of storageLevel, as there is a public method that sets its value. I have updated the code.
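
Roughly, the fields would then look like this (a sketch of the shape only; the real LiveRDD also interns the string and carries other members):

import org.apache.spark.storage.{RDDInfo, StorageLevel}

// Sketch: keep the writable string field (a public method still sets it) and
// initialize the richer object from the RDD's requested level.
class LiveRDDSketch(info: RDDInfo) {
  var storageLevel: String = info.storageLevel.description
  var memoryUsed = 0L
  var diskUsed = 0L
  var storageInfo: StorageLevel = info.storageLevel
}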


if (updatedStorageLevel.isDefined) {
rdd.setStorageLevel(updatedStorageLevel.get)
// Replicated block update events will have `storageLevel.replication=1`.
Member


Is this a bug itself?

Contributor Author


This needs more investigation, including its impact. Currently the fix is on the UI side.

@SparkQA

SparkQA commented Apr 18, 2019

Test build #104708 has finished for PR 24398 at commit fbcc0c7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shahidki31
Contributor Author

Retest this please.

@SparkQA

SparkQA commented Apr 19, 2019

Test build #104727 has finished for PR 24398 at commit fbcc0c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// The default value of `storageInfo.replication` is 1, hence if
// `storageLevel.replication = 2`, the replicated block events won't overwrite it in the store.
val storageInfo = rdd.storageInfo
val isReplicatedBlockUpdateEvent = storageLevel.replication < storageInfo.replication &&
Member


check if (storageLevel.isValid) before accessing storageLevel.*?

Contributor Author


Hi, this line checks whether the storageLevel is valid or not:

val updatedStorageLevel = if (storageLevel.isValid) {
  Some(storageLevel.description)
} else {
  None
}
If it is not valid, then updatedStorageLevel will be None, so execution won't reach this line (L-928).
Thanks
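
As an aside, a rough stand-in for that guard (using an approximation of what a valid level means here, namely data kept in memory or on disk, rather than the real isValid check):

import org.apache.spark.storage.StorageLevel

// Hypothetical stand-in for the guard above: an unusable level yields None,
// so the replication comparison further down is never reached for it.
def toUpdatedDescription(storageLevel: StorageLevel): Option[String] =
  if (storageLevel.useMemory || storageLevel.useDisk) Some(storageLevel.description)
  else None

toUpdatedDescription(StorageLevel.NONE)           // None
toUpdatedDescription(StorageLevel.MEMORY_ONLY_2)  // Some("Memory Deserialized 2x Replicated")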

@vanzin
Contributor

vanzin commented May 6, 2019

After reading more of the storage code lately, I wonder if this code shouldn't just report the original storage level always. i.e., LiveRDD shouldn't have a writable storageLevel field at all, and instead the UI should always use the storage level from the respective RDDInfo.
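
A rough sketch of that suggestion (placeholder names, not the eventual change): the live entity would expose the level read-only, derived from its RDDInfo, so block updates could never clobber it.

import org.apache.spark.storage.RDDInfo

// Hypothetical shape: no writable storageLevel field at all; the UI always sees
// the level the user requested when persisting the RDD.
class LiveRDDReadOnlySketch(val info: RDDInfo) {
  def storageLevel: String = info.storageLevel.description
}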

@shahidki31
Contributor Author

Thanks @vanzin. I have updated the code.

@SparkQA

SparkQA commented May 7, 2019

Test build #105198 has finished for PR 24398 at commit a22cd68.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shahidki31
Contributor Author

Retest this please

@SparkQA

SparkQA commented May 7, 2019

Test build #105205 has finished for PR 24398 at commit a22cd68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shahidki31
Contributor Author

Test results after updating the code:

 bin/spark-shell --master local-cluster[2,1,1024]

scala> import org.apache.spark.storage.StorageLevel
scala> val rdd = sc.parallelize(1 to 10, 1).persist(StorageLevel.MEMORY_ONLY_2)
scala> rdd.count

Screenshot from 2019-05-07 15-48-23

@vanzin
Contributor

vanzin commented May 7, 2019

So now the RDD storage level is what the user requested, which is fine. But what about the per-partition storage level? With your change it's just the same as the RDD level. The right thing to do would be to look at the behavior in Spark 2.2 and see how per-partition storage levels worked (unless someone remembers without looking at the code). You may have to propagate the block update's storage level to the partition.
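
A sketch of that propagation with hypothetical stand-in entities (the real listener tracks LiveRDD/LiveRDDPartition objects): the RDD keeps the requested level while each partition records the level carried by its own block update.

import scala.collection.mutable
import org.apache.spark.storage.StorageLevel

// Hypothetical sketch, not the listener's real data structures.
class PartitionEntry(var storageLevel: String)

class RddEntry(val requestedLevel: StorageLevel) {
  val partitions = mutable.Map.empty[String, PartitionEntry]

  // Per-partition level comes from the block update event, as in the 2.2 code path.
  def onBlockUpdate(blockName: String, blockLevel: StorageLevel): Unit = {
    partitions.getOrElseUpdate(blockName, new PartitionEntry(blockLevel.description))
      .storageLevel = blockLevel.description
  }

  // The RDD-level description stays the user-requested one.
  def rddLevelDescription: String = requestedLevel.description
}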

@vanzin
Contributor

vanzin commented May 7, 2019

Yes, as I thought, in 2.2 the partition storage level comes from the block update:
https://github.com/apache/spark/blob/branch-2.2/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala#L235

@shahidki31
Contributor Author

@vanzin Yes. The behavior seems different compared to the 2.2 branch. I will update the PR.

Screenshot from 2019-05-08 00-27-45
Screenshot from 2019-05-08 00-27-56

@vanzin
Contributor

vanzin commented Sep 12, 2019

I created #25779 with a more complete fix for this, so closing this one.

@vanzin vanzin closed this Sep 12, 2019
