
[SPARK-14065]Increase probability of using cached serialized status #11886

Closed
wants to merge 5 commits into from

Conversation

viper-kun
Contributor

Increase probability of using cached serialized status

@viper-kun viper-kun changed the title Increase probability of using cached serialized status [SPARK-14065]Increase probability of using cached serialized status Mar 22, 2016
// Add them into the table only if the epoch hasn't changed while we were working
epochLock.synchronized {

// If we got here, we failed to find the serialized locations in the cache, so we pulled
Member

Are you sure this is related to your problem? You've put the serialization into code holding the lock now, which has a downside. I'm not clear from your description what the problem is. What do you mean by "serialize Mapstatus is in serial model from different shuffle stage."

Contributor Author

The problem description:
Execute the query listed in the JIRA (https://issues.apache.org/jira/browse/SPARK-14065), and all tasks on some executors are slow.
[screenshots of the slow tasks]

The logs of the slow executors show that the executor gets an RpcTimeoutException:

Error sending message [message = GetMapOutputStatuses(1)] in 1 attempts | org.apache.spark.Logging$class.logWarning(Logging.scala:92)
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.network.timeout

The driver logs show that serializing the map statuses is slow:

16/03/22 11:47:07 INFO [dispatcher-event-loop-36] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:14 INFO [dispatcher-event-loop-30] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:21 INFO [dispatcher-event-loop-3] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:27 INFO [dispatcher-event-loop-32] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:34 INFO [dispatcher-event-loop-31] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:41 INFO [dispatcher-event-loop-38] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:47 INFO [dispatcher-event-loop-4] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:54 INFO [dispatcher-event-loop-37] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:48:00 INFO [dispatcher-event-loop-28] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes

When the reduce tasks start, all executors fetch the map statuses from the driver. Because "MapOutputTracker.serializeMapStatuses(statuses)" is outside epochLock.synchronized {}, every request thread performs MapOutputTracker.serializeMapStatuses(statuses) itself. serializeMapStatuses synchronizes on statuses, so the serializations run one after another. Each serialization costs about 7 seconds; with 80 executors that totals about 560 seconds, and as a result some executors time out while fetching the map statuses.
This patch puts "MapOutputTracker.serializeMapStatuses(statuses)" into epochLock.synchronized {}, which increases the probability of using the cached serialized status.
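
For illustration, a minimal sketch of the proposed placement (simplified, not the exact diff in this PR): serialization and the cache update happen inside epochLock.synchronized, in the branch where the cache lookup misses, so later callers for the same shuffle return the cached bytes instead of re-serializing. Method and field names follow the code quoted elsewhere in this review; the epoch bookkeeping is abbreviated.

// Sketch only: a simplified view of getSerializedMapOutputStatuses inside
// MapOutputTrackerMaster, with the serialization moved under epochLock.
def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
  epochLock.synchronized {
    // Drop stale cached entries if the epoch has advanced.
    if (epoch > cacheEpoch) {
      cachedSerializedStatuses.clear()
      cacheEpoch = epoch
    }
    cachedSerializedStatuses.get(shuffleId) match {
      case Some(bytes) =>
        // Later callers hit the cache and return immediately.
        bytes
      case None =>
        // Only the first caller for this shuffle serializes; it still holds
        // epochLock, so concurrent callers wait here rather than each
        // serializing the same statuses themselves.
        val statuses = mapStatuses.getOrElse(shuffleId, Array.empty[MapStatus])
        val bytes = MapOutputTracker.serializeMapStatuses(statuses)
        logInfo("Size of output statuses for shuffle %d is %d bytes"
          .format(shuffleId, bytes.length))
        cachedSerializedStatuses(shuffleId) = bytes
        bytes
    }
  }
}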

I have tested the listed SQL on a 30 TB TPC-DS dataset. The results show it is faster than before.
[benchmark screenshot]

What do you mean by "serialize Mapstatus is in serial model from different shuffle stage."
--- Currently, the map statuses of different shuffle stages are serialized in parallel. If we put "MapOutputTracker.serializeMapStatuses(statuses)" into epochLock.synchronized {}, the map statuses of different stages will be serialized one at a time. Could that be a problem?

Member

OK, to paraphrase: everyone finds the cache is empty one by one very quickly, and everyone proceeds to load the cached value. Isn't it simpler to put this load in the cachedSerializedStatuses.get(shuffleId) match ... None => block then?

This code is pretty old but CC @mateiz or @JoshRosen in case they have an opinion since they touched it last

@srowen
Member

srowen commented Mar 31, 2016

@viper-kun what do you think of my last comment about moving the load? is that simpler?

@viper-kun
Contributor Author

@srowen The logic is the same to me. If necessary, I will put serializeMapStatuses into the match ... None block.

@srowen
Member

srowen commented Apr 6, 2016

I personally would move this into the block above. The logic seems to be more obvious that way, since the logic to fill the missing cache entry is then in the branch where there is nothing in the cache. It should be equivalent logically.

@srowen
Member

srowen commented Apr 11, 2016

Ping @viper-kun

@srowen
Member

srowen commented Apr 19, 2016

Jenkins retest this please

@SparkQA

SparkQA commented Apr 19, 2016

Test build #56222 has finished for PR 11886 at commit 3b2e8e9.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Apr 23, 2016

Jenkins retest this please

@srowen
Member

srowen commented Apr 23, 2016

Jenkins add to whitelist

@SparkQA

SparkQA commented Apr 23, 2016

Test build #56806 has finished for PR 11886 at commit 8f8133c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -431,6 +431,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
var statuses: Array[MapStatus] = null
var epochGotten: Long = -1
var byteArr: Array[Byte] = null
Member

I think this is OK, but this can still go in the synchronized block and a few lines down, to keep the scope narrower. I think it's fairly OK as it is, though.

This certainly solves the problem you identified. It does mean more work is done while holding the lock, so that no other threads can retrieve cached statuses while this is happening. I assume that's why it was written that way to begin with, even if it causes the problem you identify.

I wonder if there is any decent way to rewrite this to manage that better? It would require some object to synchronize on per serialized status so that only one thread can retrieve it. Easily solved with a bit of indirection, like holding a simple AtomicReference to the byte array and locking on that reference.

Worth it, or over-thinking it? I wouldn't mind hearing a thought from @andrewor14 or @vanzin if they have instincts on this one.
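
As a rough illustration of that per-status indirection (hypothetical names, not code from this patch): keep one small lock object per shuffle id, so only the first caller for a given shuffle serializes, while callers for other shuffles are never blocked.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference

object PerShuffleSerializedCache {
  // Hypothetical helper: one AtomicReference per shuffle id. Threads lock on
  // the reference for their own shuffle, so serializing shuffle A does not
  // block a thread that only wants the cached bytes for shuffle B.
  private val serializedStatusRefs =
    new ConcurrentHashMap[Int, AtomicReference[Array[Byte]]]()

  def getOrSerialize(shuffleId: Int, serialize: () => Array[Byte]): Array[Byte] = {
    var ref = serializedStatusRefs.get(shuffleId)
    if (ref == null) {
      serializedStatusRefs.putIfAbsent(shuffleId, new AtomicReference[Array[Byte]](null))
      ref = serializedStatusRefs.get(shuffleId)
    }
    val cached = ref.get()
    if (cached != null) return cached
    ref.synchronized {
      // Re-check under the per-shuffle lock: another thread may have filled it.
      if (ref.get() != null) {
        ref.get()
      } else {
        val bytes = serialize() // only one thread per shuffle pays this cost
        ref.set(bytes)
        bytes
      }
    }
  }
}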

Contributor

I left a comment on the main thread

@srowen
Member

srowen commented May 1, 2016

@viper-kun what do you think? I'm not as sure of the impact if this now prevents two cached statuses from being read at once.

@andrewor14
Contributor

andrewor14 commented May 3, 2016

The high-level fix makes sense. I think the right approach here would be to have a read-write lock, where all reads are concurrent but the writes block everything. However, that's probably overkill in this case, so I'm OK with the patch as is.

Are there any potential sources of regressions? Now we hold a lock while serializing, which could take a while. @viper-kun can you do a benchmark on other queries to see if this has any negative performance impacts?
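
Purely as an illustration of that read-write-lock idea (hypothetical names, not code from this patch or from #12113): lookups take the read lock, so they stay concurrent, and filling a missing entry takes the write lock, which blocks everything else.

import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

object ReadWriteCachedStatuses {
  private val cacheLock = new ReentrantReadWriteLock()
  private val cache = mutable.Map.empty[Int, Array[Byte]]

  def getOrCompute(shuffleId: Int, serialize: () => Array[Byte]): Array[Byte] = {
    // Fast path: any number of threads may read the cache concurrently.
    cacheLock.readLock().lock()
    try {
      val cached = cache.get(shuffleId)
      if (cached.isDefined) return cached.get
    } finally {
      cacheLock.readLock().unlock()
    }
    // Slow path: take the write lock, re-check, then serialize exactly once.
    cacheLock.writeLock().lock()
    try {
      cache.getOrElseUpdate(shuffleId, serialize())
    } finally {
      cacheLock.writeLock().unlock()
    }
  }
}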

// out a snapshot of the locations as "statuses"; let's serialize and return that
byteArr = MapOutputTracker.serializeMapStatuses(statuses)
logInfo("Size of output statuses for shuffle %d is %d bytes"
.format(shuffleId, byteArr.length))
Contributor

This actually is not OK because the synchronized block is going to block the dispatcher threads, which could then cause heartbeats and other messages to not be processed. For small things it's fine, but once you get to larger ones you would have issues.

See PR #12113, which fixes issues with large map output statuses and, I believe, fixes this same issue because only one thread will serialize and the rest will use the cached version. Note that there are still improvements we could make with regard to caching. We could be more proactive and cache things up front, but since the cached statuses are all cleared when something changes, you have to know which ones to cache again.
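
To illustrate the "only one thread serializes, the rest reuse the result" behavior described above, here is a generic single-flight sketch (hypothetical names; this is not the actual #12113 implementation): the first caller for a shuffle registers a promise and serializes, and later callers wait on that promise instead of serializing again or holding a shared lock.

import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object SingleFlightSerializer {
  // Hypothetical illustration only; error handling (failing the promise if
  // serialization throws) is omitted for brevity.
  private val inFlight = new ConcurrentHashMap[Int, Promise[Array[Byte]]]()

  def serializeOnce(shuffleId: Int, serialize: () => Array[Byte]): Array[Byte] = {
    val fresh = Promise[Array[Byte]]()
    val existing = inFlight.putIfAbsent(shuffleId, fresh)
    if (existing != null) {
      // Another thread is already serializing this shuffle: wait for its result.
      Await.result(existing.future, Duration.Inf)
    } else {
      // This thread won the race: serialize without holding any shared lock.
      val bytes = serialize()
      fresh.success(bytes)
      bytes
    }
  }
}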

@tgravescs
Contributor

Note this is fixed by #12113 which does a lot more for the map output statuses, it would be great to get that in.

@viper-kun
Contributor Author

@tgravescs OK, I will close it.

@viper-kun viper-kun closed this May 11, 2016
@viper-kun viper-kun deleted the patch-2 branch January 18, 2017 09:20