
[SPARK-15074][Shuffle] Cache shuffle index file to speedup shuffle fetch #12944

Closed (wants to merge 1 commit)

Conversation

sitalkedia

What changes were proposed in this pull request?

Shuffle fetch on a large intermediate dataset is slow because the shuffle service opens and closes the index file for each shuffle fetch. This change introduces a cache for the index information so that we can avoid accessing the index files for each block fetch.
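For context, a minimal sketch of this kind of index cache, based on Guava's LoadingCache (which the patch imports); the class name, method names, and file-format handling below are illustrative assumptions, not the exact code in this PR:

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

// Illustrative cache keyed by the index file; each entry holds all offsets
// read from that file, so later block fetches never reopen it.
class IndexCacheSketch {
  private final LoadingCache<File, long[]> indexCache;

  IndexCacheSketch(int maxEntries) {
    indexCache = CacheBuilder.newBuilder()
        .maximumSize(maxEntries)
        .build(new CacheLoader<File, long[]>() {
          @Override
          public long[] load(File indexFile) throws IOException {
            // A shuffle index file is a sequence of longs: one offset per
            // partition plus a final end offset.
            long[] offsets = new long[(int) (indexFile.length() / 8)];
            try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
              for (int i = 0; i < offsets.length; i++) {
                offsets[i] = in.readLong();
              }
            }
            return offsets;
          }
        });
  }

  // Offset and length of one reducer's block, served from memory on cache hits.
  long[] getOffsetAndLength(File indexFile, int reduceId) throws ExecutionException {
    long[] offsets = indexCache.get(indexFile);
    return new long[] { offsets[reduceId], offsets[reduceId + 1] - offsets[reduceId] };
  }
}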

How was this patch tested?

Tested by running a job on the cluster and the shuffle read time was reduced by 50%.

@AmplabJenkins

Can one of the admins verify this patch?

@holdenk (Contributor) commented May 6, 2016

So a very minor style thing: it seems like the rest of the configuration values are exposed through accessor methods on TransportConf rather than calling getInt directly; it might be better to expose this in the same way as serverThreads() or numConnectionsPerPeer()?
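For illustration, the accessor-method style being suggested might look like the following; the method name, config key, and default are assumptions, not code from this patch:

// Hypothetical accessor on TransportConf, in the style of serverThreads()
// or numConnectionsPerPeer(); the key and default below are illustrative.
public int indexCacheEntries() {
  return getInt("spark.shuffle.service.index.cache.size", 1024);
}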

public long getLength() {
  return length;
}
}
@HyukjinKwon (Member) commented May 6, 2016


And a newline at the end of this file maybe.

@sitalkedia (Author)

will fix, thanks

Contributor

The convention seems to be to not have newlines at the end of files.

@HyukjinKwon (Member) commented Jul 19, 2016

It seems it was rebased but I guess I meant below:

-}
+}
\ No newline at end of file

meaning

[screenshot of the diff showing no newline at the end of the file]

EDITED: Oh, I just found https://github.com/apache/spark/blob/master/scalastyle-config.xml#L281-L282 and https://github.com/apache/spark/blob/master/scalastyle-config.xml#L117

@sitalkedia (Author)

fixed.

@sitalkedia (Author)

@holdenk - TransportConf is not specific to the shuffle service; it is used to create the transport client in other modules as well. Since the number of index cache entries is very specific to the ShuffleService, I did not want to expose that as an API in TransportConf. Let me know what you think about it.

@sitalkedia (Author)

cc - @rxin

@sitalkedia (Author)

cc - @srowen

@sitalkedia (Author)

Can anyone take a look at it?
cc - @rxin

ByteBuffer buffer = ByteBuffer.allocate(size);
DataInputStream dis = new DataInputStream(new FileInputStream(indexFile));
dis.readFully(buffer.array());
dis.close();
Contributor

close() in finally block?

@sitalkedia (Author)

done
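For reference, a minimal sketch of the suggested fix, assuming the same variables as in the snippet above; try-with-resources would work equally well:

ByteBuffer buffer = ByteBuffer.allocate(size);
DataInputStream dis = new DataInputStream(new FileInputStream(indexFile));
try {
  dis.readFully(buffer.array());
} finally {
  dis.close();  // closed even if readFully throws
}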

@@ -95,6 +109,9 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
       Executor directoryCleaner) throws IOException {
     this.conf = conf;
     this.registeredExecutorFile = registeredExecutorFile;
+    int indexCacheEntries = conf.getInt(SPARK_SHUFFLE_SERVICE_INDEX_CACHE_ENTRIES,
+        DEFAULT_SPARK_SHUFFLE_SERVICE_INDEX_CACHE_ENTRIES);
+    this.shuffleIndexCache = new ShuffleIndexCache(indexCacheEntries);
Contributor

= new ShuffleIndexCache(conf.getInt("spark.shuffle.service.index.cache.size", 1024))

@sitalkedia (Author)

done.

@@ -53,7 +53,7 @@ private[spark] class CoarseGrainedExecutorBackend(
   private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()

   override def onStart() {
-    logInfo("Connecting to driver: " + driverUrl)
+    logInfo("Connecting to driver, skedia1: " + driverUrl)
Contributor

?

@sitalkedia (Author)

my bad, removed it.

@ericl (Contributor) commented Jul 20, 2016

lgtm @JoshRosen

@sitalkedia (Author)

@JoshRosen, can you take a look?

-      long offset = in.readLong();
-      long nextOffset = in.readLong();
+      ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
+      ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
Contributor

It turns out that this call will fail with ArrayIndexOutOfBoundsException if the reduceId is too large. In the old code, an invalid reduceId would lead to an IOException because we'd skip past the end of the input stream and try to read.

However, I don't think that this subtle change in behavior is going to necessarily cause problems from the caller's perspective since ArrayIndexOutOfBoundsException is also a RuntimeException and this code was already throwing RuntimeException in the "index file is missing" error case. Therefore, this looks good to me!
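For illustration, getIndex presumably does an array lookup over the cached offsets along these lines, which is why an out-of-range reduceId now surfaces as ArrayIndexOutOfBoundsException rather than an IOException; the offsets field and the ShuffleIndexRecord constructor signature below are assumptions:

// Hypothetical lookup into the in-memory offsets array; offsets[reduceId] and
// offsets[reduceId + 1] bound the block, so a reduceId past the end of the
// array throws ArrayIndexOutOfBoundsException instead of failing while
// reading past the end of a stream.
public ShuffleIndexRecord getIndex(int reduceId) {
  long offset = offsets[reduceId];
  long nextOffset = offsets[reduceId + 1];
  return new ShuffleIndexRecord(offset, nextOffset - offset);
}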

@JoshRosen (Contributor)

LGTM. I suppose we could also add similar functionality to the non-shuffle-service version of IndexShuffleBlockResolver, but I think that's a much lower priority because I suspect that most folks who want to optimize production shuffle performance will be using the external shuffle service anyways.

I've re-tested this locally and have confirmed that it still compiles and passes relevant tests, so I'm going to merge this to master. Thanks @sitalkedia!

@asfgit closed this in 9c15d07 on Aug 4, 2016
import com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.nio.ch.IOUtil;
Contributor

Doh, it looks like this unused import is breaking things. Let me hotfix to remove it.

JoshRosen added a commit to JoshRosen/spark that referenced this pull request Aug 4, 2016
@JoshRosen (Contributor)

Hotfixing in #14499 to fix the build. My bad.

asfgit pushed a commit that referenced this pull request Aug 4, 2016
Author: Josh Rosen <joshrosen@databricks.com>

Closes #14499 from JoshRosen/hotfix.
@sitalkedia (Author)

Thanks @JoshRosen !
