[SPARK-25062][SQL] Clean up BlockLocations in InMemoryFileIndex #22603
Conversation
@@ -315,7 +315,12 @@ object InMemoryFileIndex extends Logging {
       // which is very slow on some file system (RawLocalFileSystem, which is launch a
       // subprocess and parse the stdout).
       try {
-        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+        val locations = fs.getFileBlockLocations(f, 0, f.getLen).map(
+          loc => if (loc.getClass == classOf[BlockLocation]) {
`loc.isInstanceOf[BlockLocation]`? Or even better, what about using pattern matching?
Thanks @mgaido91, but `loc` is always an instance of `BlockLocation` (it might be a subclass such as `HdfsBlockLocation`), so `isInstanceOf[BlockLocation]` or pattern matching would always return true. I want to test that the class of `loc` is exactly `BlockLocation`, and if it is, we don't need to convert it.
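The point under discussion can be illustrated with a small self-contained sketch (stand-in classes here, not the real Hadoop ones):

```scala
// Stand-in hierarchy mirroring BlockLocation / HdfsBlockLocation.
class BlockLocation
class HdfsBlockLocation extends BlockLocation

val loc: BlockLocation = new HdfsBlockLocation
// isInstanceOf (and pattern matching) succeed for subclasses too...
val isInstance = loc.isInstanceOf[BlockLocation]          // true
// ...whereas comparing getClass checks the exact runtime class.
val isExactClass = loc.getClass == classOf[BlockLocation] // false
```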
Ah right, sorry @peter-toth. Thanks. Anyway, please move `loc` to the previous line and use curly braces for `map`; I think that is the most widely used syntax in the codebase. Thanks.
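A generic sketch of the requested style (a hypothetical example, not the PR's actual code):

```scala
// Curly braces for a multi-line `map`, with the lambda parameter
// moved up onto the same line as `map {`:
val lengths = Seq("spark", "sql").map { word =>
  word.length
}
```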
Change-Id: I57c862ca076015f36aaee1da02c7fce80d740890
ok to test
Test build #96856 has finished for PR 22603 at commit
@@ -315,7 +315,13 @@ object InMemoryFileIndex extends Logging {
       // which is very slow on some file system (RawLocalFileSystem, which is launch a
       // subprocess and parse the stdout).
       try {
-        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+        val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
Hi, @peter-toth. Could you add a one-line comment to explain this conversion?
val inMemoryFileIndex = new InMemoryFileIndex(
  spark, Seq(new Path(file.getCanonicalPath)), Map.empty, None) {
  def leafFileStatuses = leafFiles.map(_._2)
nit: `def leafFileStatuses = leafFiles.values`?
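The two forms are equivalent on a map; a minimal illustration, with a plain Scala `Map` standing in for the actual `leafFiles` structure:

```scala
// `map(_._2)` extracts the second element of every (key, value) tuple,
// which is exactly what `.values` already provides, more directly.
val leafFiles = Map("part-0" -> 100L, "part-1" -> 200L)
val viaMap    = leafFiles.map(_._2).toSeq.sorted
val viaValues = leafFiles.values.toSeq.sorted
```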
@@ -248,6 +248,25 @@ class FileIndexSuite extends SharedSQLContext {
       assert(spark.read.parquet(path.getAbsolutePath).schema.exists(_.name == colToUnescape))
     }
   }
+
+  test("SPARK-25062 - InMemoryCache stores only simple BlockLocations") {
`InMemoryCache` -> `InMemoryFileIndex`? And, `simple BlockLocations` may look unclear later.
Thanks @dongjoon-hyun for the review. I've fixed your findings.
Test build #96932 has finished for PR 22603 at commit
Could you review this, @cloud-fan, @gatorsmile, @HyukjinKwon?
class SpecialBlockLocationFileSystem extends RawLocalFileSystem {

  class SpecialBlockLocation(
      names: Array[String],
4 spaces indentation
      length: Long) extends BlockLocation(names, hosts, offset, length)

  override def getFileBlockLocations(
      file: FileStatus,
ditto
LGTM
@peter-toth, could you address @cloud-fan's comments?
Change-Id: Ifc1a90ade3938cdaf049d2c0c874f1840f6fcc28
Thanks @cloud-fan for the review. I've fixed your findings.
Test build #97065 has finished for PR 22603 at commit
Congratulations on your first contribution, @peter-toth. And thank you, @cloud-fan and @mgaido91. Merged to master.
@peter-toth, what is your Apache JIRA user id? I need to assign you to the resolved SPARK-25062, but I cannot find your id and user name.
Thanks @dongjoon-hyun,
## What changes were proposed in this pull request?

`InMemoryFileIndex` contains a cache of `LocatedFileStatus` objects. Each `LocatedFileStatus` object can contain several `BlockLocation`s or some subclass of it. Filling up this cache by listing files happens recursively either on the driver or on the executors, depending on the parallel discovery threshold (`spark.sql.sources.parallelPartitionDiscovery.threshold`). If the listing happens on the executors, block location objects are converted to simple `BlockLocation` objects to satisfy serialization requirements. If it happens on the driver, there is no conversion, and depending on the file system a `BlockLocation` object can be a subclass like `HdfsBlockLocation` and consume more memory. This PR adds the conversion to the latter case as well, which decreases memory consumption.

## How was this patch tested?

Added unit test.

Closes apache#22603 from peter-toth/SPARK-25062.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
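The core of the change can be sketched with stand-in classes (the real code uses `org.apache.hadoop.fs.BlockLocation`; the field names and constructor signatures below are simplified assumptions, not the Hadoop API):

```scala
// Simplified stand-ins for the Hadoop classes.
class BlockLocation(val names: Array[String], val hosts: Array[String],
                    val offset: Long, val length: Long)
class HdfsBlockLocation(names: Array[String], hosts: Array[String],
                        offset: Long, length: Long, val extra: String)
  extends BlockLocation(names, hosts, offset, length)

// If the location is already a plain BlockLocation, cache it as-is;
// otherwise copy its fields into a new plain BlockLocation so the cached
// object does not retain subclass state (and stays small).
def toPlainBlockLocation(loc: BlockLocation): BlockLocation =
  if (loc.getClass == classOf[BlockLocation]) {
    loc
  } else {
    new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
  }
```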