[#596] feat(netty): Use off heap memory to read HDFS data #806

Merged 38 commits on Apr 13, 2023. Changes shown are from 14 commits.

Commits
e302695  [#596] Use off heap memory to read HDFS data (jerqi, Apr 10, 2023)
8ae5ded  fix (jerqi, Apr 10, 2023)
5a55ab1  fix (jerqi, Apr 10, 2023)
cf2c023  fix (jerqi, Apr 10, 2023)
7a5e968  fix (jerqi, Apr 10, 2023)
40fbc88  fix (jerqi, Apr 10, 2023)
d6f4498  fix (jerqi, Apr 10, 2023)
0bd5ca7  fix (jerqi, Apr 10, 2023)
578bb49  fix (jerqi, Apr 10, 2023)
da5ceea  fix (jerqi, Apr 10, 2023)
f451194  fix (jerqi, Apr 10, 2023)
78ef7d6  fix (jerqi, Apr 11, 2023)
9ce762e  fix (jerqi, Apr 11, 2023)
590546a  fix (jerqi, Apr 11, 2023)
868606a  fix (jerqi, Apr 11, 2023)
4f70825  fix (jerqi, Apr 11, 2023)
d154283  fix (jerqi, Apr 11, 2023)
a325fbd  fix (jerqi, Apr 11, 2023)
ab6bdda  Update client-spark/common/src/main/java/org/apache/spark/shuffle/rea… (jerqi, Apr 11, 2023)
8e84ce7  Update storage/src/main/java/org/apache/uniffle/storage/request/Creat… (jerqi, Apr 12, 2023)
7914648  Update storage/src/main/java/org/apache/uniffle/storage/handler/impl/… (jerqi, Apr 12, 2023)
4d37ea6  fix (jerqi, Apr 12, 2023)
9fc2897  fix (jerqi, Apr 12, 2023)
f20a874  fix (jerqi, Apr 12, 2023)
74e13c7  fix (jerqi, Apr 12, 2023)
4f9a5be  fix (jerqi, Apr 12, 2023)
d690521  fix (jerqi, Apr 12, 2023)
6fd9692  fix (jerqi, Apr 12, 2023)
32a7471  fix (jerqi, Apr 12, 2023)
1046c53  fix ut (jerqi, Apr 12, 2023)
742fff9  fix (jerqi, Apr 12, 2023)
e5c15d2  fix (jerqi, Apr 12, 2023)
37faa8e  fix (jerqi, Apr 12, 2023)
8feb03a  add doc (jerqi, Apr 12, 2023)
016907c  fix error (jerqi, Apr 12, 2023)
77d8afb  Merge remote-tracking branch 'upstream/master' into reader-off (jerqi, Apr 12, 2023)
5fab041  address comments (jerqi, Apr 12, 2023)
d198fb5  fix comment (jerqi, Apr 12, 2023)
Files changed

@@ -73,9 +73,11 @@ public RssShuffleDataIterator(
this.codec = compress ? Codec.newInstance(rssConf) : null;
}

-  public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data, int size) {
+  public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data) {
     clearDeserializationStream();
-    byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data.array(), data.position(), size), true);
+    // Uncompressed data is released in this class; compressed data is released in ShuffleReadClientImpl.
+    // So if codec is null, we don't release the data when the stream is closed.
+    byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data), codec != null);
Contributor: Is it possible to unify where the buffer is released?

Contributor Author (jerqi): It seems difficult. I don't have a good idea.
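
For context, the ownership split discussed here hinges on ByteBufInputStream's releaseOnClose flag. A minimal, self-contained sketch of that Netty behavior (illustrative only, not code from this PR):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;

public class ReleaseOnCloseDemo {
    public static void main(String[] args) throws Exception {
        ByteBuf buf = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});
        // releaseOnClose = true: the stream owns the buffer and releases it on
        // close, as in the codec != null path of createKVIterator.
        try (ByteBufInputStream in = new ByteBufInputStream(buf, true)) {
            in.read();
        }
        System.out.println(buf.refCnt()); // 0: released by close()
    }
}
```

With releaseOnClose = false (the codec == null path), refCnt would still be 1 after close, leaving the release to whoever handed the buffer out.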

deserializationStream = serializerInstance.deserializeStream(byteBufInputStream);
return deserializationStream.asKeyValueIterator();
}
@@ -108,10 +110,10 @@ public boolean hasNext() {
long fetchDuration = System.currentTimeMillis() - startFetch;
shuffleReadMetrics.incFetchWaitTime(fetchDuration);
if (rawData != null) {
-        int uncompressedLen = uncompress(rawBlock, rawData);
+        uncompress(rawBlock, rawData);
// create new iterator for shuffle data
long startSerialization = System.currentTimeMillis();
-        recordsIterator = createKVIterator(uncompressedData, uncompressedLen);
+        recordsIterator = createKVIterator(uncompressedData);
long serializationDuration = System.currentTimeMillis() - startSerialization;
readTime += fetchDuration;
serializeTime += serializationDuration;
@@ -137,11 +139,9 @@ private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) {

int uncompressedLen = rawBlock.getUncompressLength();
if (codec != null) {
-      if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
-        // todo: support off-heap bytebuffer
-        uncompressedData = ByteBuffer.allocate(uncompressedLen);
-      }
-      uncompressedData.clear();
+      // todo: when we have netty data transportation, we will only use off-heap memory.
+      uncompressedData = rawData.isDirect()
+          ? ByteBuffer.allocateDirect(uncompressedLen) : ByteBuffer.allocate(uncompressedLen);
long startDecompress = System.currentTimeMillis();
codec.decompress(rawData, uncompressedLen, uncompressedData, 0);
unCompressedBytesLength += uncompressedLen;
@@ -50,7 +50,7 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
private final List<ShuffleServerInfo> shuffleServerInfoList;
private int shuffleId;
private int partitionId;
-  private byte[] readBuffer;
+  private ByteBuffer readBuffer;
private Roaring64NavigableMap blockIdBitmap;
private Roaring64NavigableMap taskIdBitmap;
private Roaring64NavigableMap pendingBlockIds;
@@ -219,8 +219,10 @@ public CompressedShuffleBlock readShuffleBlockData() {
}

if (bs != null) {
-      return new CompressedShuffleBlock(ByteBuffer.wrap(readBuffer,
-          bs.getOffset(), bs.getLength()), bs.getUncompressLength());
+      ByteBuffer compressedBuffer = readBuffer.duplicate();
+      compressedBuffer.position(bs.getOffset());
+      compressedBuffer.limit(bs.getOffset() + bs.getLength());
+      return new CompressedShuffleBlock(compressedBuffer, bs.getUncompressLength());
}
// current segment has no data, try the next segment
return readShuffleBlockData();
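
A minimal sketch of the zero-copy windowing used in readShuffleBlockData above (illustrative, not part of the diff): duplicate() shares the underlying storage but has an independent position/limit, so each block can be carved out of readBuffer without copying bytes or disturbing the reader's own cursor.

```java
import java.nio.ByteBuffer;

public class DuplicateWindowDemo {
    public static void main(String[] args) {
        ByteBuffer readBuffer = ByteBuffer.allocateDirect(64); // stands in for fetched shuffle data
        int offset = 16;   // hypothetical bs.getOffset()
        int length = 32;   // hypothetical bs.getLength()

        ByteBuffer block = readBuffer.duplicate(); // shares memory, no copy
        block.position(offset);
        block.limit(offset + length);

        System.out.println(block.remaining());      // 32: the [16, 48) window
        System.out.println(readBuffer.position());  // 0: original cursor untouched
    }
}
```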
@@ -238,8 +240,14 @@ private int read() {
if (sdr == null) {
return 0;
}
-    readBuffer = sdr.getData();
-    if (readBuffer == null || readBuffer.length == 0) {
+    if (readBuffer != null) {
+      boolean isReleased = RssUtils.releaseByteBuffer(readBuffer);
+      if (!isReleased) {
+        LOG.warn("Failed to release the read byte buffer; this shouldn't happen frequently");
+      }
+    }
+    readBuffer = sdr.getDataBuffer();
+    if (readBuffer == null || readBuffer.capacity() == 0) {
return 0;
}
bufferSegmentQueue.addAll(sdr.getBufferSegments());
@@ -253,6 +261,12 @@ public void checkProcessedBlockIds() {

@Override
public void close() {
+    if (readBuffer != null) {
+      boolean isReleased = RssUtils.releaseByteBuffer(readBuffer);
+      if (!isReleased) {
+        LOG.warn("Failed to release the read byte buffer when the read client is closed");
+      }
+    }
if (clientReadHandler != null) {
clientReadHandler.close();
}
@@ -17,13 +17,14 @@

package org.apache.uniffle.common;

+import java.nio.ByteBuffer;
import java.util.List;

import com.google.common.collect.Lists;

public class ShuffleDataResult {

-  private final byte[] data;
+  private final ByteBuffer data;
private final List<BufferSegment> bufferSegments;

public ShuffleDataResult() {
@@ -34,12 +35,29 @@ public ShuffleDataResult(byte[] data) {
this(data, Lists.newArrayList());
}

-  public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
+  public ShuffleDataResult(List<BufferSegment> bufferSegments, ByteBuffer data) {
this.data = data;
this.bufferSegments = bufferSegments;
}

+  public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
+    this(bufferSegments, data != null ? ByteBuffer.wrap(data) : null);
+  }

public byte[] getData() {
+    if (data == null) {
+      return null;
+    }
+    if (data.hasArray()) {
+      return data.array();
+    }
+    ByteBuffer dataBuffer = data.duplicate();
+    byte[] byteArray = new byte[dataBuffer.remaining()];
+    dataBuffer.get(byteArray);
+    return byteArray;
}

+  public ByteBuffer getDataBuffer() {
+    return data;
+  }

@@ -48,7 +66,7 @@ public List<BufferSegment> getBufferSegments() {
}

public boolean isEmpty() {
-    return bufferSegments == null || bufferSegments.isEmpty() || data == null || data.length == 0;
+    return bufferSegments == null || bufferSegments.isEmpty() || data == null || data.capacity() == 0;
}

}
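
As a quick illustration of the two getData() paths above (a sketch, not PR code): a heap buffer can expose its backing array without copying, while a direct buffer has no array and must be copied out; duplicating first keeps the source buffer's cursor intact for other callers.

```java
import java.nio.ByteBuffer;

public class GetDataPathsDemo {
    public static void main(String[] args) {
        // Heap path: hasArray() is true, so the backing array can be returned as-is.
        ByteBuffer heap = ByteBuffer.wrap(new byte[]{1, 2, 3});
        System.out.println(heap.hasArray()); // true

        // Direct path: no backing array, so the bytes are copied via a duplicate.
        ByteBuffer direct = ByteBuffer.allocateDirect(3);
        direct.put(new byte[]{1, 2, 3}).flip();
        ByteBuffer dup = direct.duplicate();
        byte[] copy = new byte[dup.remaining()];
        dup.get(copy);
        System.out.println(copy.length); // 3, and 'direct' is left untouched
    }
}
```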
@@ -31,6 +31,7 @@
import java.net.InetAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Enumeration;
@@ -46,6 +47,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.net.InetAddresses;
+import io.netty.buffer.Unpooled;
import io.netty.channel.unix.Errors;
import org.eclipse.jetty.util.MultiException;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -364,4 +366,8 @@ public static List<String> getConfiguredLocalDirs(RssConf conf) {
return conf.get(RssBaseConf.RSS_STORAGE_BASE_PATH);
}
}

+  public static boolean releaseByteBuffer(ByteBuffer byteBuffer) {
+    return Unpooled.wrappedBuffer(byteBuffer).release();
+  }
}
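
A small sketch of what the new helper does under the hood: wrapping a NIO buffer yields a Netty ByteBuf with a reference count of one, and release() reports whether that count reached zero. Whether the underlying direct memory is then freed immediately is a Netty implementation detail, hence the boolean return and the warn logs at the call sites.

```java
import java.nio.ByteBuffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ReleaseByteBufferDemo {
    public static void main(String[] args) {
        ByteBuffer nio = ByteBuffer.allocateDirect(16);
        ByteBuf wrapped = Unpooled.wrappedBuffer(nio);
        System.out.println(wrapped.refCnt());  // 1: a fresh wrapper holds one reference
        System.out.println(wrapped.release()); // true: the count dropped to 0
    }
}
```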
@@ -17,9 +17,15 @@

package org.apache.uniffle.storage.api;

+import java.nio.ByteBuffer;

public interface FileReader {

byte[] read(long offset, int length);

byte[] read();

+  ByteBuffer readByteBuffer(long offset, int length);

+  ByteBuffer readByteBuffer();
}
@@ -19,6 +19,7 @@

import java.io.Closeable;
import java.io.IOException;
+import java.nio.ByteBuffer;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
@@ -28,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.storage.api.FileReader;

@@ -79,6 +81,34 @@ public byte[] read() {
}
}

+  @Override
+  public ByteBuffer readByteBuffer(long offset, int length) {
+    try {
+      fsDataInputStream.seek(offset);
+      ByteBuffer buffer = ByteBuffer.allocateDirect(length);
+      readFully(buffer);
+      buffer.flip();
+      return buffer;
+    } catch (Exception e) {
+      LOG.warn("Can't read buffer data for path:" + path + " with offset[" + offset + "], length[" + length + "]", e);
+      return ByteBuffer.allocateDirect(0);
+    }
+  }

+  @Override
+  public ByteBuffer readByteBuffer() {
+    try {
+      long length = fileSystem.getFileStatus(path).getLen();
+      if (length > Integer.MAX_VALUE) {
+        throw new RssException("File length is too long");
+      }
+      return readByteBuffer(0, (int) length);
+    } catch (Exception e) {
+      LOG.warn("Can't read buffer data for path:" + path, e);
+      return ByteBuffer.allocateDirect(0);
+    }
+  }

public long getOffset() throws IOException {
return fsDataInputStream.getPos();
}
@@ -90,6 +120,15 @@ public synchronized void close() throws IOException {
}
}

+  private void readFully(ByteBuffer buffer) throws IOException {
+    while (buffer.hasRemaining()) {
+      int result = fsDataInputStream.read(buffer);
+      if (result < 0) {
+        return;
+      }
+    }
+  }

public Path getPath() {
return path;
}
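
One hedged caveat for readers of readFully: FSDataInputStream.read(ByteBuffer) is only supported when the wrapped stream implements ByteBufferReadable (HDFS's input stream does); other Hadoop filesystems can throw UnsupportedOperationException at runtime. A defensive sketch with a hypothetical readOnce helper:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;

public class ByteBufferReadCheck {
    // Hypothetical helper: fail fast when the underlying stream
    // cannot read into a ByteBuffer directly.
    static int readOnce(FSDataInputStream in, ByteBuffer buffer) throws IOException {
        if (!(in.getWrappedStream() instanceof ByteBufferReadable)) {
            throw new IOException("underlying stream does not support ByteBuffer reads");
        }
        return in.read(buffer); // may read fewer bytes than buffer.remaining()
    }
}
```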
@@ -18,6 +18,7 @@
package org.apache.uniffle.storage.handler.impl;

import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
@@ -106,17 +107,18 @@ protected ShuffleDataResult readShuffleData(ShuffleDataSegment shuffleDataSegment
return null;
}

-    byte[] data = readShuffleData(shuffleDataSegment.getOffset(), expectedLength);
-    if (data.length == 0) {
+    ByteBuffer data = readShuffleDataByteBuffer(shuffleDataSegment.getOffset(), expectedLength);
+    int length = data.limit() - data.position();
+    if (length == 0) {
LOG.warn("Fail to read expected[{}] data, actual[{}] and segment is {} from file {}.data",
-          expectedLength, data.length, shuffleDataSegment, filePrefix);
+          expectedLength, length, shuffleDataSegment, filePrefix);
return null;
}

-    ShuffleDataResult shuffleDataResult = new ShuffleDataResult(data, shuffleDataSegment.getBufferSegments());
+    ShuffleDataResult shuffleDataResult = new ShuffleDataResult(shuffleDataSegment.getBufferSegments(), data);
if (shuffleDataResult.isEmpty()) {
LOG.warn("Shuffle data is empty, expected length {}, data length {}, segment {} in file {}.data",
-          expectedLength, data.length, shuffleDataSegment, filePrefix);
+          expectedLength, length, shuffleDataSegment, filePrefix);
return null;
}

@@ -133,6 +135,17 @@ protected byte[] readShuffleData(long offset, int expectedLength) {
return data;
}

+  private ByteBuffer readShuffleDataByteBuffer(long offset, int expectedLength) {
+    ByteBuffer data = dataReader.readByteBuffer(offset, expectedLength);
+    int length = data.limit() - data.position();
+    if (length != expectedLength) {
+      LOG.warn("Fail to read byte buffer expected[{}] data, actual[{}] from file {}.data",
+          expectedLength, length, filePrefix);
+      return ByteBuffer.allocateDirect(0);
+    }
+    return data;
+  }

private long getDataFileLen() {
try {
return dataReader.getFileLen();
@@ -168,7 +181,4 @@ public List<ShuffleDataSegment> getShuffleDataSegments() {
return shuffleDataSegments;
}

-  public String getFilePrefix() {
-    return filePrefix;
-  }
}
@@ -21,6 +21,7 @@
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
@@ -76,6 +77,16 @@ public byte[] read() {
}
}

+  @Override
+  public ByteBuffer readByteBuffer(long offset, int length) {
+    throw new UnsupportedOperationException("Local file reader doesn't support off-heap read now");
+  }
+
+  @Override
+  public ByteBuffer readByteBuffer() {
+    throw new UnsupportedOperationException("Local file reader doesn't support off-heap read now");
+  }

@Override
public synchronized void close() {
if (dataInputStream != null) {
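
For comparison, a hypothetical sketch (not part of this PR) of what an off-heap local read could look like using java.nio's FileChannel, which reads into direct buffers natively:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LocalDirectReadSketch {
    public static ByteBuffer read(Path path, long offset, int length) throws IOException {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocateDirect(length);
            channel.position(offset);
            // Loop until the buffer is full or EOF; a single read may be partial.
            while (buffer.hasRemaining() && channel.read(buffer) >= 0) {
                // keep reading
            }
            buffer.flip();
            return buffer;
        }
    }
}
```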
@@ -120,14 +120,6 @@ public static String getShuffleDataPath(String appId, int shuffleId, int start,
String.join(HDFS_DIRNAME_SEPARATOR, String.valueOf(start), String.valueOf(end)));
}

-  public static String getUploadShuffleDataPath(String appId, int shuffleId, int partitionId) {
-    return String.join(
-        HDFS_PATH_SEPARATOR,
-        appId,
-        String.valueOf(shuffleId),
-        String.valueOf(partitionId));
-  }

public static String getCombineDataPath(String appId, int shuffleId) {
return String.join(
HDFS_PATH_SEPARATOR,
@@ -193,10 +185,6 @@ public static void createDirIfNotExist(FileSystem fileSystem, String pathString)
}
}

-  // index file header is $PartitionNum | [($PartitionId | $PartitionFileLength | $PartitionDataFileLength), ] | $CRC
-  public static long getIndexFileHeaderLen(int partitionNum) {
-    return 4 + (4 + 8 + 8) * (long) partitionNum + 8;
-  }

public static long uploadFile(File file, HdfsFileWriter writer, int bufferSize) throws IOException {
try (FileInputStream inputStream = new FileInputStream(file)) {