[#596] feat(netty): Use off heap memory to read HDFS data #806

Merged 38 commits on Apr 13, 2023. Changes shown from 37 commits.

Commits:
e302695  [#596] Use off heap memory to read HDFS data (jerqi, Apr 10, 2023)
8ae5ded  fix (jerqi, Apr 10, 2023)
5a55ab1  fix (jerqi, Apr 10, 2023)
cf2c023  fix (jerqi, Apr 10, 2023)
7a5e968  fix (jerqi, Apr 10, 2023)
40fbc88  fix (jerqi, Apr 10, 2023)
d6f4498  fix (jerqi, Apr 10, 2023)
0bd5ca7  fix (jerqi, Apr 10, 2023)
578bb49  fix (jerqi, Apr 10, 2023)
da5ceea  fix (jerqi, Apr 10, 2023)
f451194  fix (jerqi, Apr 10, 2023)
78ef7d6  fix (jerqi, Apr 11, 2023)
9ce762e  fix (jerqi, Apr 11, 2023)
590546a  fix (jerqi, Apr 11, 2023)
868606a  fix (jerqi, Apr 11, 2023)
4f70825  fix (jerqi, Apr 11, 2023)
d154283  fix (jerqi, Apr 11, 2023)
a325fbd  fix (jerqi, Apr 11, 2023)
ab6bdda  Update client-spark/common/src/main/java/org/apache/spark/shuffle/rea… (jerqi, Apr 11, 2023)
8e84ce7  Update storage/src/main/java/org/apache/uniffle/storage/request/Creat… (jerqi, Apr 12, 2023)
7914648  Update storage/src/main/java/org/apache/uniffle/storage/handler/impl/… (jerqi, Apr 12, 2023)
4d37ea6  fix (jerqi, Apr 12, 2023)
9fc2897  fix (jerqi, Apr 12, 2023)
f20a874  fix (jerqi, Apr 12, 2023)
74e13c7  fix (jerqi, Apr 12, 2023)
4f9a5be  fix (jerqi, Apr 12, 2023)
d690521  fix (jerqi, Apr 12, 2023)
6fd9692  fix (jerqi, Apr 12, 2023)
32a7471  fix (jerqi, Apr 12, 2023)
1046c53  fix ut (jerqi, Apr 12, 2023)
742fff9  fix (jerqi, Apr 12, 2023)
e5c15d2  fix (jerqi, Apr 12, 2023)
37faa8e  fix (jerqi, Apr 12, 2023)
8feb03a  add doc (jerqi, Apr 12, 2023)
016907c  fix error (jerqi, Apr 12, 2023)
77d8afb  Merge remote-tracking branch 'upstream/master' into reader-off (jerqi, Apr 12, 2023)
5fab041  address comments (jerqi, Apr 12, 2023)
d198fb5  fix comment (jerqi, Apr 12, 2023)
----------------------------------------
@@ -195,7 +195,7 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
appId, 0, reduceId.getTaskID().getId(), storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, serverInfoList,
readerJobConf, new MRIdHelper(), expectedTaskIdsBitmapFilterEnable);
readerJobConf, new MRIdHelper(), expectedTaskIdsBitmapFilterEnable, false);
ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssFetcher fetcher = new RssFetcher(mrJobConf, reduceId, taskStatus, merger, copyPhase, reporter, metrics,
shuffleReadClient, blockIdBitmap.getLongCardinality(), RssMRConfig.toRssConf(rssJobConf));
----------------------------------------
@@ -231,6 +231,11 @@ public class RssSparkConfig {
+ "whether this conf is set or not"))
.createWithDefault("");

public static final ConfigEntry<Boolean> RSS_CLIENT_OFF_HEAP_MEMORY_ENABLE = createBooleanBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConf.OFF_HEAP_MEMORY_ENABLE.key())
.doc(RssClientConf.OFF_HEAP_MEMORY_ENABLE.description()))
.createWithDefault(RssClientConf.OFF_HEAP_MEMORY_ENABLE.defaultValue());

public static final ConfigEntry<Integer> RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER = createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER))
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE);
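With this entry in place, a Spark job can opt in to off-heap reads through the prefixed key. A minimal sketch, assuming SPARK_RSS_CONFIG_PREFIX resolves to "spark." (the unprefixed key itself comes from RssClientConf.OFF_HEAP_MEMORY_ENABLE in this patch):

    SparkConf sparkConf = new SparkConf();
    // Prefixed form of rss.client.off.heap.memory.enable; defaults to false.
    sparkConf.set("spark.rss.client.off.heap.memory.enable", "true");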
----------------------------------------
@@ -41,6 +41,7 @@
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.RssUtils;

public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C>> {

@@ -74,9 +75,14 @@ public RssShuffleDataIterator(
this.codec = compress ? Codec.newInstance(rssConf) : null;
}

public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data, int size) {
public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data) {
clearDeserializationStream();
byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data.array(), data.position(), size), true);
// Unpooled.wrappedBuffer returns a ByteBuf, but releasing that ByteBuf will not free the
// underlying direct/heap memory: UnpooledDirectByteBuf's doFree flag is false when the
// ByteBuf is constructed from a user-provided ByteBuffer.
// The `releaseOnClose` parameter therefore has no effect here, so we release the data
// ByteBuffer manually.
byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data), true);
deserializationStream = serializerInstance.deserializeStream(byteBufInputStream);
return deserializationStream.asKeyValueIterator();
}
@@ -89,6 +95,7 @@ private void clearDeserializationStream() {
LOG.warn("Can't close ByteBufInputStream, memory may be leaked.");
}
}

if (deserializationStream != null) {
deserializationStream.close();
}
@@ -109,10 +116,10 @@ public boolean hasNext() {
long fetchDuration = System.currentTimeMillis() - startFetch;
shuffleReadMetrics.incFetchWaitTime(fetchDuration);
if (rawData != null) {
int uncompressedLen = uncompress(rawBlock, rawData);
uncompress(rawBlock, rawData);
// create new iterator for shuffle data
long startSerialization = System.currentTimeMillis();
recordsIterator = createKVIterator(uncompressedData, uncompressedLen);
recordsIterator = createKVIterator(uncompressedData);
long serializationDuration = System.currentTimeMillis() - startSerialization;
readTime += fetchDuration;
serializeTime += serializationDuration;
@@ -139,15 +146,21 @@ private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) {
int uncompressedLen = rawBlock.getUncompressLength();
if (codec != null) {
if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
// todo: support off-heap bytebuffer
uncompressedData = ByteBuffer.allocate(uncompressedLen);
if (uncompressedData != null) {
RssUtils.releaseByteBuffer(uncompressedData);
}
uncompressedData = rawData.isDirect()
? ByteBuffer.allocateDirect(uncompressedLen) : ByteBuffer.allocate(uncompressedLen);
}
uncompressedData.clear();
long startDecompress = System.currentTimeMillis();
codec.decompress(rawData, uncompressedLen, uncompressedData, 0);
unCompressedBytesLength += uncompressedLen;
long decompressDuration = System.currentTimeMillis() - startDecompress;
decompressTime += decompressDuration;
// After decompression the ByteBuffer's limit may not equal the uncompressed length,
// so we set the limit explicitly here.
uncompressedData.limit(uncompressedData.position() + uncompressedLen);
} else {
uncompressedData = rawData;
}
@@ -162,6 +175,11 @@ public Product2<K, C> next() {

public BoxedUnit cleanup() {
clearDeserializationStream();
// Uncompressed data is released in this class, while compressed data is released in
// ShuffleReadClientImpl. So if codec is null, we don't release the data when the stream is closed.
if (codec != null) {
RssUtils.releaseByteBuffer(uncompressedData);
}
if (shuffleReadClient != null) {
shuffleReadClient.close();
}
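For reviewers unfamiliar with the Netty behavior the comment in createKVIterator describes, here is a self-contained sketch (class name hypothetical) of why releasing the wrapper is not enough and the ByteBuffer must be freed by its owner:

    import java.nio.ByteBuffer;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.util.internal.PlatformDependent;

    public class WrappedBufferReleaseSketch {
      public static void main(String[] args) {
        ByteBuffer data = ByteBuffer.allocateDirect(1024);
        ByteBuf wrapped = Unpooled.wrappedBuffer(data);
        // Drops the wrapper's reference count to 0, but the underlying direct
        // memory is not freed: the ByteBuf was built from a user-provided
        // ByteBuffer, so its doFree flag is false.
        wrapped.release();
        // The buffer's owner must free it explicitly, which is what
        // RssUtils.releaseByteBuffer(...) does in this patch.
        PlatformDependent.freeDirectBuffer(data);
      }
    }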
----------------------------------------
@@ -121,7 +121,8 @@ public Iterator<Product2<K, C>> read() {
CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
appId, shuffleId, startPartition, storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
shuffleServerInfoList, hadoopConf, expectedTaskIdsBitmapFilterEnable);
shuffleServerInfoList, hadoopConf, expectedTaskIdsBitmapFilterEnable,
rssConf.getBoolean(RssClientConf.OFF_HEAP_MEMORY_ENABLE));
ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssShuffleDataIterator rssShuffleDataIterator = new RssShuffleDataIterator<K, C>(
shuffleDependency.serializer(), shuffleReadClient,
----------------------------------------
@@ -227,7 +227,8 @@ class MultiPartitionIterator<K, C> extends AbstractIterator<Product2<K, C>> {
CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
appId, shuffleId, partition, storageType, basePath, indexReadLimit, readBufferSize,
1, partitionNum, partitionToExpectBlocks.get(partition), taskIdBitmap, shuffleServerInfoList,
hadoopConf, dataDistributionType, expectedTaskIdsBitmapFilterEnable);
hadoopConf, dataDistributionType, expectedTaskIdsBitmapFilterEnable,
rssConf.getBoolean(RssClientConf.OFF_HEAP_MEMORY_ENABLE));
ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssShuffleDataIterator<K, C> iterator = new RssShuffleDataIterator<>(
shuffleDependency.serializer(), shuffleReadClient,
----------------------------------------
@@ -87,7 +87,8 @@ public ShuffleReadClient createShuffleReadClient(CreateShuffleReadClientRequest
request.getHadoopConf(),
request.getIdHelper(),
request.getShuffleDataDistributionType(),
request.isExpectedTaskIdsBitmapFilterEnable()
request.isExpectedTaskIdsBitmapFilterEnable(),
request.isOffHeapEnabled()
);
}
}
----------------------------------------
@@ -50,7 +50,7 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
private final List<ShuffleServerInfo> shuffleServerInfoList;
private int shuffleId;
private int partitionId;
private byte[] readBuffer;
private ByteBuffer readBuffer;
private Roaring64NavigableMap blockIdBitmap;
private Roaring64NavigableMap taskIdBitmap;
private Roaring64NavigableMap pendingBlockIds;
@@ -78,7 +78,8 @@ public ShuffleReadClientImpl(
Configuration hadoopConf,
IdHelper idHelper,
ShuffleDataDistributionType dataDistributionType,
boolean expectedTaskIdsBitmapFilterEnable) {
boolean expectedTaskIdsBitmapFilterEnable,
boolean offHeapEnabled) {
this.shuffleId = shuffleId;
this.partitionId = partitionId;
this.blockIdBitmap = blockIdBitmap;
@@ -106,6 +107,9 @@ public ShuffleReadClientImpl(
if (expectedTaskIdsBitmapFilterEnable) {
request.useExpectedTaskIdsBitmapFilter();
}
if (offHeapEnabled) {
request.enableOffHeap();
}

List<Long> removeBlockIds = Lists.newArrayList();
blockIdBitmap.forEach(bid -> {
@@ -142,7 +146,7 @@ public ShuffleReadClientImpl(
this(storageType, appId, shuffleId, partitionId, indexReadLimit,
partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf,
idHelper, ShuffleDataDistributionType.NORMAL, false);
idHelper, ShuffleDataDistributionType.NORMAL, false, false);
}

@Override
@@ -219,8 +223,10 @@ public CompressedShuffleBlock readShuffleBlockData() {
}

if (bs != null) {
return new CompressedShuffleBlock(ByteBuffer.wrap(readBuffer,
bs.getOffset(), bs.getLength()), bs.getUncompressLength());
ByteBuffer compressedBuffer = readBuffer.duplicate();
compressedBuffer.position(bs.getOffset());
compressedBuffer.limit(bs.getOffset() + bs.getLength());
return new CompressedShuffleBlock(compressedBuffer, bs.getUncompressLength());
}
// current segment has no data, try the next segment
return readShuffleBlockData();
@@ -238,8 +244,11 @@ private int read() {
if (sdr == null) {
return 0;
}
readBuffer = sdr.getData();
if (readBuffer == null || readBuffer.length == 0) {
if (readBuffer != null) {
RssUtils.releaseByteBuffer(readBuffer);
}
readBuffer = sdr.getDataBuffer();
if (readBuffer == null || readBuffer.capacity() == 0) {
return 0;
}
bufferSegmentQueue.addAll(sdr.getBufferSegments());
@@ -253,6 +262,9 @@ public void checkProcessedBlockIds() {

@Override
public void close() {
if (readBuffer != null) {
RssUtils.releaseByteBuffer(readBuffer);
}
if (clientReadHandler != null) {
clientReadHandler.close();
}
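The duplicate()/position()/limit() sequence in readShuffleBlockData() replaces the old ByteBuffer.wrap(readBuffer, offset, length), which only works for byte arrays. A minimal sketch of the idea, with hypothetical segment bounds:

    ByteBuffer readBuffer = ByteBuffer.allocateDirect(64);
    int offset = 16;   // hypothetical bs.getOffset()
    int length = 8;    // hypothetical bs.getLength()
    // duplicate() shares the backing memory (heap or direct) but carries its own
    // position and limit, so the block becomes a zero-copy view of readBuffer.
    ByteBuffer block = readBuffer.duplicate();
    block.position(offset);
    block.limit(offset + length);
    // block now covers bytes [16, 24) of readBuffer without any copying.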
----------------------------------------
@@ -45,6 +45,7 @@ public class CreateShuffleReadClientRequest {
private IdHelper idHelper;
private ShuffleDataDistributionType shuffleDataDistributionType = ShuffleDataDistributionType.NORMAL;
private boolean expectedTaskIdsBitmapFilterEnable = false;
private boolean offHeapEnabled = false;

public CreateShuffleReadClientRequest(
String appId,
@@ -61,33 +62,14 @@ public CreateShuffleReadClientRequest(
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
ShuffleDataDistributionType dataDistributionType,
boolean expectedTaskIdsBitmapFilterEnable) {
boolean expectedTaskIdsBitmapFilterEnable,
boolean offHeapEnabled) {
this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable);
hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable, offHeapEnabled);
this.shuffleDataDistributionType = dataDistributionType;
}

public CreateShuffleReadClientRequest(
String appId,
int shuffleId,
int partitionId,
String storageType,
String basePath,
int indexReadLimit,
int readBufferSize,
int partitionNumPerRange,
int partitionNum,
Roaring64NavigableMap blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
boolean expectedTaskIdsBitmapFilterEnable) {
this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable);
}

public CreateShuffleReadClientRequest(
String appId,
int shuffleId,
@@ -103,7 +85,8 @@ public CreateShuffleReadClientRequest(
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
IdHelper idHelper,
boolean expectedTaskIdsBitmapFilterEnable) {
boolean expectedTaskIdsBitmapFilterEnable,
boolean offHeapEnabled) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
@@ -119,6 +102,28 @@ public CreateShuffleReadClientRequest(
this.hadoopConf = hadoopConf;
this.idHelper = idHelper;
this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
this.offHeapEnabled = offHeapEnabled;
}

public CreateShuffleReadClientRequest(
String appId,
int shuffleId,
int partitionId,
String storageType,
String basePath,
int indexReadLimit,
int readBufferSize,
int partitionNumPerRange,
int partitionNum,
Roaring64NavigableMap blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
boolean expectedTaskIdsBitmapFilterEnable,
boolean offHeapEnabled) {
this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable, offHeapEnabled);
}

public String getAppId() {
Expand Down Expand Up @@ -184,4 +189,8 @@ public ShuffleDataDistributionType getShuffleDataDistributionType() {
public boolean isExpectedTaskIdsBitmapFilterEnable() {
return expectedTaskIdsBitmapFilterEnable;
}

public boolean isOffHeapEnabled() {
return offHeapEnabled;
}
}
----------------------------------------
@@ -17,13 +17,14 @@

package org.apache.uniffle.common;

import java.nio.ByteBuffer;
import java.util.List;

import com.google.common.collect.Lists;

public class ShuffleDataResult {

private final byte[] data;
private final ByteBuffer data;
private final List<BufferSegment> bufferSegments;

public ShuffleDataResult() {
@@ -34,12 +35,29 @@ public ShuffleDataResult(byte[] data) {
this(data, Lists.newArrayList());
}

public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
public ShuffleDataResult(ByteBuffer data, List<BufferSegment> bufferSegments) {
this.data = data;
this.bufferSegments = bufferSegments;
}

public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
this(data != null ? ByteBuffer.wrap(data) : null, bufferSegments);
}

public byte[] getData() {
if (data == null) {
return null;
}
if (data.hasArray()) {
return data.array();
}
ByteBuffer dataBuffer = data.duplicate();
byte[] byteArray = new byte[dataBuffer.remaining()];
dataBuffer.get(byteArray);
return byteArray;
}

public ByteBuffer getDataBuffer() {
return data;
}

@@ -48,7 +66,7 @@ public List<BufferSegment> getBufferSegments() {
}

public boolean isEmpty() {
return bufferSegments == null || bufferSegments.isEmpty() || data == null || data.length == 0;
return bufferSegments == null || bufferSegments.isEmpty() || data == null || data.capacity() == 0;
}

}
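With the field now a ByteBuffer, the two accessors trade off copying: getDataBuffer() is always zero-copy, while getData() falls back to copying the bytes when the buffer is direct and has no backing array. A short usage sketch (assumed caller code, using the file's existing Guava import):

    ShuffleDataResult result = new ShuffleDataResult(
        ByteBuffer.allocateDirect(128), Lists.newArrayList());
    ByteBuffer view = result.getDataBuffer(); // zero-copy, works for heap and direct
    byte[] copy = result.getData();           // copies the bytes for direct buffers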
----------------------------------------
@@ -102,4 +102,10 @@ public class RssClientConf {
.intType()
.noDefaultValue()
.withDescription("internal configuration to indicate which port is actually bind for shuffle manager service.");

public static final ConfigOption<Boolean> OFF_HEAP_MEMORY_ENABLE = ConfigOptions
.key("rss.client.off.heap.memory.enable")
.booleanType()
.defaultValue(false)
.withDescription("Client can use off heap memory");
}
----------------------------------------
@@ -31,6 +31,7 @@
import java.net.InetAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Enumeration;
@@ -47,6 +48,7 @@
import com.google.common.collect.Sets;
import com.google.common.net.InetAddresses;
import io.netty.channel.unix.Errors;
import io.netty.util.internal.PlatformDependent;
import org.eclipse.jetty.util.MultiException;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
@@ -364,4 +366,11 @@ public static List<String> getConfiguredLocalDirs(RssConf conf) {
return conf.get(RssBaseConf.RSS_STORAGE_BASE_PATH);
}
}

public static void releaseByteBuffer(ByteBuffer byteBuffer) {
if (byteBuffer == null || !byteBuffer.isDirect()) {
return;
}
PlatformDependent.freeDirectBuffer(byteBuffer);
}
}
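The guard means callers can pass any buffer unconditionally: heap buffers are left to the garbage collector, and only direct buffers are freed eagerly. For example (illustrative only):

    RssUtils.releaseByteBuffer(null);                          // no-op
    RssUtils.releaseByteBuffer(ByteBuffer.allocate(64));       // no-op, heap buffer is GC-managed
    RssUtils.releaseByteBuffer(ByteBuffer.allocateDirect(64)); // off-heap memory freed immediately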