# [Improvement] ShuffleBlock should be released when finished reading (#74)
### **What changes were proposed in this pull request?**
Release the ShuffleBlock once it has finished being read.

### **Why are the changes needed?**
We found that Spark executors are easily killed by YARN, and the cause is that the executor uses too much off-heap memory when reading shuffle data.
Most of this off-heap memory is used to store uncompressed shuffle data, and it is released only when a GC is triggered.
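
For illustration, a minimal standalone sketch (not part of this commit; the class name and sizes are made up) of how discarded DirectByteBuffers keep off-heap memory reserved until a GC actually runs:

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

// Hypothetical demo, not part of this PR.
public class DirectBufferRetentionDemo {
  public static void main(String[] args) {
    // Simulate a reader that discards each uncompressed block after use.
    for (int i = 0; i < 64; i++) {
      ByteBuffer block = ByteBuffer.allocateDirect(1024 * 1024); // 1 MiB per "block"
      block.put(0, (byte) 1); // touch the buffer
      // `block` becomes unreachable here, but its native memory is freed
      // only after the JVM garbage-collects the wrapper object.
    }
    for (BufferPoolMXBean pool :
        ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
      if ("direct".equals(pool.getName())) {
        // Typically still reports a large value until a GC runs.
        System.out.println("direct pool used: " + pool.getMemoryUsed() + " bytes");
      }
    }
  }
}
```

Cluster managers account this native memory against the container's limit, which is why the executor can be killed even though the Java heap looks healthy.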

### **Does this PR introduce any user-facing change?**
No

### **How was this patch tested?**
Added a new unit test.
xianjingfeng committed Jul 29, 2022
1 parent 5b144a6 commit ccb39ed
Showing 4 changed files with 69 additions and 1 deletion.
**RssShuffleDataIterator.java**
@@ -38,6 +38,7 @@
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.RssShuffleUtils;
import org.apache.uniffle.common.exception.RssException;

public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C>> {

@@ -54,6 +55,7 @@ public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C
  private DeserializationStream deserializationStream = null;
  private ByteBufInputStream byteBufInputStream = null;
  private long unCompressionLength = 0;
  private ByteBuffer uncompressedData;

  public RssShuffleDataIterator(
      Serializer serializer,
@@ -106,8 +108,17 @@ public boolean hasNext() {
      shuffleReadMetrics.incFetchWaitTime(fetchDuration);
      if (compressedData != null) {
        shuffleReadMetrics.incRemoteBytesRead(compressedData.limit() - compressedData.position());
        // DirectByteBuffers that are not collected in time can cause the executor to be
        // killed by cluster managers (such as YARN) for using too much off-heap memory
        if (uncompressedData != null && uncompressedData.isDirect()) {
          try {
            RssShuffleUtils.destroyDirectByteBuffer(uncompressedData);
          } catch (Exception e) {
            throw new RssException("Destroy DirectByteBuffer failed!", e);
          }
        }
        long startDecompress = System.currentTimeMillis();
-       ByteBuffer uncompressedData = RssShuffleUtils.decompressData(
+       uncompressedData = RssShuffleUtils.decompressData(
            compressedData, compressedBlock.getUncompressLength());
        unCompressionLength += compressedBlock.getUncompressLength();
        long decompressDuration = System.currentTimeMillis() - startDecompress;
**RssShuffleUtils.java**
@@ -17,8 +17,11 @@

package org.apache.uniffle.common;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

import com.google.common.base.Preconditions;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
@@ -56,4 +59,29 @@ public static ByteBuffer decompressData(ByteBuffer data, int uncompressLength, b
    fastDecompressor.decompress(data, data.position(), uncompressData, 0, uncompressLength);
    return uncompressData;
  }

  /**
   * DirectByteBuffers are garbage collected by using a phantom reference and a
   * reference queue. Every once in a while, the JVM checks the reference queue and
   * cleans the DirectByteBuffers. However, as this does not happen
   * immediately after discarding all references to a DirectByteBuffer, it is
   * easy to run into an OutOfMemoryError when using DirectByteBuffers. This function
   * explicitly calls the Cleaner method of a DirectByteBuffer.
   *
   * @param toBeDestroyed
   *          The DirectByteBuffer that will be "cleaned". Utilizes reflection.
   */
  public static void destroyDirectByteBuffer(ByteBuffer toBeDestroyed)
      throws IllegalArgumentException, IllegalAccessException,
      InvocationTargetException, SecurityException, NoSuchMethodException {
    Preconditions.checkArgument(toBeDestroyed.isDirect(),
        "toBeDestroyed isn't direct!");
    Method cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner");
    cleanerMethod.setAccessible(true);
    Object cleaner = cleanerMethod.invoke(toBeDestroyed);
    Method cleanMethod = cleaner.getClass().getMethod("clean");
    cleanMethod.setAccessible(true);
    cleanMethod.invoke(cleaner);
  }
}
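
For reference, a minimal hypothetical caller of the new utility (not part of this commit); the `--add-opens` note is an assumption about running on JDK 9+ with module encapsulation enabled:

```java
import java.nio.ByteBuffer;

import org.apache.uniffle.common.RssShuffleUtils;

// Hypothetical usage sketch, not part of this commit.
public class DestroyBufferExample {
  public static void main(String[] args) throws Exception {
    ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
    // ... fill and consume the buffer ...
    // Eagerly return the native memory instead of waiting for a GC.
    // On JDK 9+ the reflective setAccessible call typically requires
    // --add-opens java.base/java.nio=ALL-UNNAMED.
    RssShuffleUtils.destroyDirectByteBuffer(buffer);
    // The buffer must never be accessed after this point: its backing
    // memory has been freed and further reads or writes are undefined.
  }
}
```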
**RssException.java**
@@ -23,4 +23,8 @@ public class RssException extends RuntimeException {
  public RssException(String message) {
    super(message);
  }

  public RssException(String message, Throwable e) {
    super(message, e);
  }
}
**RssShuffleUtilsTest.java**
@@ -20,10 +20,12 @@
import java.nio.ByteBuffer;
import org.apache.commons.lang3.RandomUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

public class RssShuffleUtilsTest {

@@ -46,4 +48,27 @@ public void testCompression(int size) {
    assertArrayEquals(data, buffer2);
  }

  @Test
  public void testDestroyDirectByteBuffer() throws Exception {
    int size = 10;
    byte b = 1;
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size);
    for (int i = 0; i < size; i++) {
      byteBuffer.put(b);
    }
    byteBuffer.flip();
    RssShuffleUtils.destroyDirectByteBuffer(byteBuffer);
    // The memory may not be released fast enough.
    Thread.sleep(200);
    boolean same = true;
    byte[] read = new byte[size];
    byteBuffer.get(read);
    for (byte br : read) {
      if (b != br) {
        same = false;
        break;
      }
    }
    assertFalse(same);
  }
}
