
fix(netty): io.netty.util.internal.OutOfDirectMemoryError. #1155

Closed
3 tasks done
leixm opened this issue Aug 17, 2023 · 0 comments · Fixed by #1151
Comments

@leixm
Contributor

leixm commented Aug 17, 2023

Code of Conduct

Search before asking

  • I have searched in the issues and found no similar issues.

Describe the bug

Testing a large data set with the code on the current master branch triggers an io.netty.util.internal.OutOfDirectMemoryError.

Is there a memory leak?

I ran the end-to-end test and found that with 5 concurrent senders and 500 GB of data the OOM did not occur, but with 40 concurrent senders and 500 GB it did. This suggests the OOM is caused by the actual off-heap memory footprint being much larger than used_buffer, rather than by a memory leak.

When the line ByteBuf data = ByteBufUtils.readSlice(byteBuf); is executed in org.apache.uniffle.common.netty.protocol.Decoders#decodeShuffleBlockInfo, the lifetime of the ByteBuf allocated inside the Netty framework is extended, so when TransportFrameDecoder#channelRead later calls frame.release(), the underlying ByteBuf cannot actually be freed. I am not sure whether the ByteBuf produced by Netty holds other content as well, but from the current test results this appears to make off-heap memory usage much larger than used_buffer.
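For illustration, here is a minimal, self-contained sketch of the retention behavior described above. It is not the Uniffle decoder itself; it only assumes that ByteBufUtils.readSlice behaves like Netty's retainedSlice, i.e. the returned slice shares and pins the frame's memory:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class SliceRetentionDemo {
    public static void main(String[] args) {
        // Allocate a pooled direct buffer, similar to the frames produced by Netty's read path.
        ByteBuf frame = PooledByteBufAllocator.DEFAULT.directBuffer(16 * 1024 * 1024);

        // A retained slice increments the reference count of the shared memory, so the
        // following frame.release() cannot actually return the 16 MiB chunk to the pool.
        ByteBuf block = frame.retainedSlice(0, 1024);
        frame.release();
        System.out.println("refCnt after frame.release(): " + frame.refCnt()); // prints 1

        // Only when the slice is released as well can the underlying memory be reclaimed.
        block.release();
        System.out.println("refCnt after block.release(): " + frame.refCnt()); // prints 0
    }
}

If this is what happens for every decoded block, copying the block bytes into a fresh buffer (instead of keeping a retained slice) would detach them from the frame at the cost of an extra copy; I have not verified whether that is the approach taken in #1151.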

Affects Version(s)

master

Uniffle Server Log Output

io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 85899345920, max: 85899345920)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:802)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:731)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:632)
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:607)
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:202)
        at io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:186)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:395)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
        at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:140)
        at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:150)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)
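(85899345920 bytes is exactly 80 GiB, so the configured direct memory limit was already fully consumed when Netty tried to allocate another 16 MiB chunk.)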

Uniffle Engine Log Output

No response

Uniffle Server Configurations

rss.rpc.server.port 19997
rss.jetty.http.port 19996
rss.server.netty.port 29999
rss.storage.basePath xxx,xxx
rss.storage.type MEMORY_LOCALFILE
rss.coordinator.quorum xxx:19999
rss.server.buffer.capacity 40gb
rss.server.read.buffer.capacity 20gb
rss.server.flush.thread.alive 50
rss.server.flush.threadPool.size 4
rss.server.disk.capacity 1t
rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
rss.server.commit.timeout 600000
rss.server.app.expired.withoutHeartbeat 300000
rss.server.flush.cold.storage.threshold.size 64m

Uniffle Engine Configurations

No response

Additional context

Test script:

package org.apache.uniffle.client.tools;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class EndToEndStressTest {
    private static Logger LOG;
    static class StressTest {
        private String shuffleServerAddress;
        private String appId;
        private int shuffleId;
        private int partitionNum;
        private int threadNum;
        private long sendBytes;

        private long bytesPerThread;
        private long roundsPerThread;

        private int bytesPerPartition;

        private AtomicInteger atomicInteger = new AtomicInteger();
        ShuffleServerClient shuffleServerClient;

        Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks;

        ExecutorService threadPoolExecutor;

        CountDownLatch countDownLatch;
        private volatile boolean stopped = false;

        private int blockSize;
        public StressTest(String shuffleServerAddress, String appId, int shuffleId, int partitionNum, int threadNum, long sendBytes, int blockSize) {
            this.shuffleServerAddress = shuffleServerAddress;
            this.appId = appId;
            this.shuffleId = shuffleId;
            this.partitionNum = partitionNum;
            this.threadNum = threadNum;
            this.sendBytes = sendBytes;
            this.blockSize = blockSize;
            this.bytesPerThread = sendBytes / threadNum;
            this.roundsPerThread = bytesPerThread / blockSize;
            this.bytesPerPartition = (int) (bytesPerThread / (roundsPerThread * partitionNum));
            LOG = LoggerFactory.getLogger(EndToEndStressTest.class + "-" + appId);
            shuffleIdToBlocks = Maps.newHashMap();
            LOG.info("bytesPerThread={}, roundsPerThread={}, bytesPerPartition={}", bytesPerThread, roundsPerThread, bytesPerPartition);
            Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
            for (int partitionId = 0; partitionId < partitionNum; partitionId++) {
                byte[] data = new byte[bytesPerPartition];
                for (int j = 0; j < bytesPerPartition; j++) {
                    data[j] = (byte) (new Random()).nextInt(127);
                }
                int mockTaskAttemptId =  (new Random()).nextInt(100);
                partitionToBlocks.put(partitionId, Lists.newArrayList(new ShuffleBlockInfo(0, partitionId,
                        ClientUtils.getBlockId(partitionId, mockTaskAttemptId, atomicInteger.getAndIncrement()),
                        data.length, ChecksumUtils.getCrc32(data), data, Lists.newArrayList(), data.length, 0, mockTaskAttemptId)));
            }
            shuffleIdToBlocks.put(0, partitionToBlocks);
            ShuffleServerClientFactory clientFactory = ShuffleServerClientFactory.getInstance();
            RssConf rssConf = new RssConf();
            rssConf.set(RssClientConf.NETTY_CLIENT_NUM_CONNECTIONS_PER_PEER, 20);
            shuffleServerClient = clientFactory.getShuffleServerClient(
                    ClientType.GRPC_NETTY.name(), new ShuffleServerInfo("1", shuffleServerAddress.split(":")[0],
                            Integer.parseInt(shuffleServerAddress.split(":")[1]), Integer.parseInt(shuffleServerAddress.split(":")[2])), rssConf
            );
            countDownLatch = new CountDownLatch(threadNum);

            List<PartitionRange> partitionRangeList = Lists.newArrayList();
            for (int i = 0; i < partitionNum; i++) {
                partitionRangeList.add(new PartitionRange(i, i));
            }

            LOG.info("Start register shuffle!");
            RssRegisterShuffleRequest rssRegisterShuffleRequest = new RssRegisterShuffleRequest(appId, 0, partitionRangeList, "");
            shuffleServerClient.registerShuffle(rssRegisterShuffleRequest);
            LOG.info("Finished register shuffle!");
        }

        public void run() throws InterruptedException {
            LOG.info("Start end-to-end stress test!");
            threadPoolExecutor = Executors.newFixedThreadPool(threadNum);
            for (int i = 0; i < threadNum; i++) {
                threadPoolExecutor.submit(this::send);
            }

            RssAppHeartBeatRequest heartBeatRequest = new RssAppHeartBeatRequest("test_app", 60 * 1000);
            while (countDownLatch.getCount() > 0) {
                shuffleServerClient.sendHeartBeat(heartBeatRequest);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    LOG.error("heartBeat error!", e);
                    break;
                }
            }

            countDownLatch.await();
        }
        public void send() {
            LOG.info("Start end-to-end stress send!");

            for (int i = 0; i < roundsPerThread && !stopped; i++) {
                LOG.info("round-{}", i);
                RssSendShuffleDataRequest request = new RssSendShuffleDataRequest(appId, 100, 10000L, shuffleIdToBlocks);
                RssSendShuffleDataResponse response = shuffleServerClient.sendShuffleData(request);
                if (response.getStatusCode() != StatusCode.SUCCESS) {
                    LOG.error("sendShuffleData error! {}", response.getMessage());
                }
            }
            countDownLatch.countDown();
        }

        public void stop() {
            stopped = true;
            threadPoolExecutor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String appId = args[0];
        String shuffleServerAddress = args[1];
        int partitionNum = Integer.parseInt(args[2]);
        int thread = Integer.parseInt(args[3]);
        long sendBytes = Long.parseLong(args[4]);
        int blockSize = Integer.parseInt(args[5]);
        StressTest stressTest = new StressTest(shuffleServerAddress, appId, 0, partitionNum, thread, sendBytes, blockSize);
        stressTest.run();
        stressTest.stop();
    }
}
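For reference, the script takes six positional arguments: appId, shuffleServerAddress (host:grpcPort:nettyPort), partitionNum, threadNum, sendBytes and blockSize. A sample invocation matching the 40-thread / 500 GB scenario above might look like this (classpath and argument values are illustrative):

java -cp <uniffle-client-with-dependencies>.jar org.apache.uniffle.client.tools.EndToEndStressTest \
    test_app xxx:19997:29999 100 40 536870912000 1048576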

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
jerqi pushed a commit that referenced this issue Aug 17, 2023
…1151)

### What changes were proposed in this pull request?

Fix io.netty.util.internal.OutOfDirectMemoryError.

### Why are the changes needed?

Fix io.netty.util.internal.OutOfDirectMemoryError.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.

Co-authored-by: leixianming <leixianming@didiglobal.com>