Skip to content

Commit bd741fa

Browse files
author
Sital Kedia
committed
[SPARK-17839][CORE] UnsafeSorterSpillReader should use Nio's directbuffer to read the spill files in order to avoid additional copy
1 parent 8a6bbe0 commit bd741fa

File tree

2 files changed

+80
-2
lines changed

2 files changed

+80
-2
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.apache.spark.io;
15+
16+
import java.io.File;
17+
import java.io.FileInputStream;
18+
import java.io.IOException;
19+
import java.io.InputStream;
20+
import java.nio.ByteBuffer;
21+
import java.nio.channels.FileChannel;
22+
23+
/**
24+
* {@link InputStream} implementation which uses direct buffer
25+
* to read a file to avoid extra copy of data between Java and
26+
* native memory which happens when using {@link java.io.BufferedInputStream}
27+
*
28+
*/
29+
public final class NioBasedBufferedFileInputStream extends InputStream {
30+
31+
ByteBuffer bb;
32+
33+
FileChannel ch;
34+
35+
public NioBasedBufferedFileInputStream(File file, int bufferSize) throws IOException {
36+
bb = ByteBuffer.allocateDirect(bufferSize);
37+
FileInputStream f = new FileInputStream(file);
38+
ch = f.getChannel();
39+
ch.read(bb);
40+
bb.flip();
41+
}
42+
43+
public boolean refill() throws IOException {
44+
if (!bb.hasRemaining()) {
45+
bb.clear();
46+
int nRead = ch.read(bb);
47+
if (nRead == -1) {
48+
return false;
49+
}
50+
bb.flip();
51+
}
52+
return true;
53+
}
54+
55+
@Override
56+
public int read() throws IOException {
57+
if (!refill()) {
58+
return -1;
59+
}
60+
return bb.get();
61+
}
62+
63+
@Override
64+
public int read(byte[] b, int off, int len) throws IOException {
65+
if (!refill()) {
66+
return -1;
67+
}
68+
len = Math.min(len, bb.remaining());
69+
bb.get(b, off, len);
70+
return len;
71+
}
72+
73+
@Override
74+
public void close() throws IOException {
75+
ch.close();
76+
}
77+
}

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.common.io.Closeables;
2424

2525
import org.apache.spark.SparkEnv;
26+
import org.apache.spark.io.NioBasedBufferedFileInputStream;
2627
import org.apache.spark.serializer.SerializerManager;
2728
import org.apache.spark.storage.BlockId;
2829
import org.apache.spark.unsafe.Platform;
@@ -69,8 +70,8 @@ public UnsafeSorterSpillReader(
6970
bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
7071
}
7172

72-
final BufferedInputStream bs =
73-
new BufferedInputStream(new FileInputStream(file), (int) bufferSizeBytes);
73+
final NioBasedBufferedFileInputStream bs =
74+
new NioBasedBufferedFileInputStream(file, (int) bufferSizeBytes);
7475
try {
7576
this.in = serializerManager.wrapStream(blockId, bs);
7677
this.din = new DataInputStream(this.in);

0 commit comments

Comments
 (0)