Skip to content

Conversation

@suyanNone
Copy link
Contributor

  1. I found if we use getChannel to put or get data, it will create DirectBuffer anyway, which is not controllable.

according openJDK source code: because it will create a ThreadLocal directBuffer pool, and is not provider a 100% percent way to sure the direct buffer to be released.

 sun.nio.ch.FileChannelImpl.java
public int write(ByteBuffer src) throws IOException {
210         ensureOpen();
211         if (!writable)
212             throw new NonWritableChannelException();
213         synchronized (positionLock) {
214             int n = 0;
215             int ti = -1;
216             try {
217                 begin();
218                 if (!isOpen())
219                     return 0;
220                 ti = threads.add();
221                 if (appending)
222                     position(size());
223                 do {
224                     n = IOUtil.write(fd, src, -1, nd, positionLock);
225                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
226                 return IOStatus.normalize(n);
227             } finally {
228                 threads.remove(ti);
229                 end(n > 0);
230                 assert IOStatus.check(n);
231             }
232         }
233     }
IOUtil.java

static int write(FileDescriptor fd, ByteBuffer src, long position,
74                      NativeDispatcher nd, Object lock)
75         throws IOException
76     {
77         if (src instanceof DirectBuffer)
78             return writeFromNativeBuffer(fd, src, position, nd, lock);
79 
80         // Substitute a native buffer
81         int pos = src.position();
82         int lim = src.limit();
83         assert (pos <= lim);
84         int rem = (pos <= lim ? lim - pos : 0);
85         ByteBuffer bb = null;
86         try {
87             bb = Util.getTemporaryDirectBuffer(rem);
88             bb.put(src);
89             bb.flip();
90             // Do not update src until we see how many bytes were written
91             src.position(pos);
92 
93             int n = writeFromNativeBuffer(fd, bb, position, nd, lock);
94             if (n > 0) {
95                 // now update src
96                 src.position(pos + n);
97             }
98             return n;
99         } finally {
100            Util.releaseTemporaryDirectBuffer(bb);
101        }
102    }
Util.java
     static ByteBuffer getTemporaryDirectBuffer(int size) {
61         ByteBuffer buf = null;
62         // Grab a buffer if available
63         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
64             SoftReference ref = (SoftReference)(bufferPool[i].get());
65             if ((ref != null) && ((buf = (ByteBuffer)ref.get()) != null) &&
66                 (buf.capacity() >= size)) {
67                 buf.rewind();
68                 buf.limit(size);
69                 bufferPool[i].set(null);
70                 return buf;
71             }
72         }
73 
74         // Make a new one
75         return ByteBuffer.allocateDirect(size);
76     }
 private static final int TEMP_BUF_POOL_SIZE = 3;
50 
51     // Per-thread soft cache of the last temporary direct buffer
52     private static ThreadLocal[] bufferPool;
53 
54     static {
55         bufferPool = new ThreadLocal[TEMP_BUF_POOL_SIZE];
56         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++)
57             bufferPool[i] = new ThreadLocal();
58     }
59 
60     static ByteBuffer getTemporaryDirectBuffer(int size) {
61         ByteBuffer buf = null;
62         // Grab a buffer if available
63         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
64             SoftReference ref = (SoftReference)(bufferPool[i].get());
65             if ((ref != null) && ((buf = (ByteBuffer)ref.get()) != null) &&
66                 (buf.capacity() >= size)) {
67                 buf.rewind();
68                 buf.limit(size);
69                 bufferPool[i].set(null);
70                 return buf;
71             }
72         }
73 
74         // Make a new one
75         return ByteBuffer.allocateDirect(size);
76     }
77 
78     static void releaseTemporaryDirectBuffer(ByteBuffer buf) {
79         if (buf == null)
80             return;
81         // Put it in an empty slot if such exists
82         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
83             SoftReference ref = (SoftReference)(bufferPool[i].get());
84             if ((ref == null) || (ref.get() == null)) {
85                 bufferPool[i].set(new SoftReference(buf));
86                 return;
87             }
88         }
89         // Otherwise replace a smaller one in the cache if such exists
90         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
91             SoftReference ref = (SoftReference)(bufferPool[i].get());
92             ByteBuffer inCacheBuf = (ByteBuffer)ref.get();
93             if ((inCacheBuf == null) || (buf.capacity() > inCacheBuf.capacity())) {
94                 bufferPool[i].set(new SoftReference(buf));
95                 return;
96             }
97         }
98     }

@srowen
Copy link
Member

srowen commented Jun 2, 2015

Not all JDKs are OpenJDK, but, I'm also not sure why this would be better? we don't want to avoid a buffer necessarily?

@SparkQA
Copy link

SparkQA commented Jun 2, 2015

Test build #33984 has finished for PR 6586 at commit ed43257.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@suyanNone
Copy link
Contributor Author

@srowen
I think in all system, we want make physical memory to be more controllable.
We use spark on yarn, we always encounter direct buffer is out of control, and be killed by yarn. it cause a lot of task failed and begin to retry.

for the reason:
*memory_mapping block send to remote, the direct buffer will not be released because the thread is alive.it is a netty 4.0.23-final bug, I already report to netty community, and it give a solution

*according to this patch, this patch is for the block contains disk level
** read block not use memory mapping ,because it small than memory mapping threshold.
** droping memory_disk_ser level block into disk.
because channel contains a thread local direct buffer pool, it is soft reference, and can't be released as I wishes.

another thing to point, we use jdk not open jdk, I refer to openjdk because I can't found jdk source code.

@suyanNone suyanNone changed the title [SPARK-8044][CORE] Avoid to use directbuffer while reading or writing disk level block [SPARK-8044][CORE]Avoid to directbuffer out of control while reading or droping disk level block Jun 3, 2015
@suyanNone suyanNone changed the title [SPARK-8044][CORE]Avoid to directbuffer out of control while reading or droping disk level block [SPARK-8044][CORE]Avoid to make directbuffer out of control while reading or droping disk level block Jun 3, 2015
@suyanNone
Copy link
Contributor Author

@srowen
if want to make sense of physical memory, I think there have 2 ways:

  1. use fileOutputStream or fileinputStream to write byte[] directly instead of use channel to read or write
  2. Accoding each user's max rdd block size, just add more "3* max rdd block size " in memoryOverHead

@suyanNone
Copy link
Contributor Author

@srowen may can split the buffer into slices to read or write through channel.

@srowen
Copy link
Member

srowen commented Jun 3, 2015

Yes, but there are also advantages to letting it use byte buffers. They are held in a thread-local soft reference so are cached and reused, and are released under memory pressure. I don't think we want or need to manage this. What's the problem, that this increases off-heap memory usage? yes, but you can increase the amount of overhead YARN allows for this. Here you're just trading for potentially less-efficient manual on-heap management. I am not clear this is a good idea.

@suyanNone
Copy link
Contributor Author

@srowen if it just to adjust memoryoverhead arg, it will customized for every different application or even if the data increases day by day, may it will change again.

How about slice on the Bytebuffer, to ensure the direct buffer pool be more controllable.
something like:
while(condition){
channel.write(64MB)
}

it will ensure the direct buffer pool not larger than 64MB * 3

@suyanNone
Copy link
Contributor Author

@srowen
Today I spent some time to have a performance test.

If I just test 1 cycle, TestOutPutStream have a minor strength, may due to directbuffer creation and destroy is a time cost thing.

cycle: 1, data: 10Mb
TestOutputStream: 12
TestChannel: 14

cycle: 1, data: 50MB
TestOutputStream: 46
TestChannel: 54

cycle: 1, data: 100MB
TestOutputStream: 110
TestChannel: 112

cycle: 1, data: 500MB
TestOutputStream: 620
TestChannel: 600

While cycle is increased to 10.
FileOutputStream is in direct proportion. and channel thanks the directBuffer pool, it just increase a little time on the "cycle 1" time.

cycle: 10, data 10MB
TestOutputStream: 100
TestChannel: 16

cycle: 10, data 50MB
TestOutputStream: 474
TestChannel: 63

cycle: 10, data 100MB
TestOutputStream: 1118
TestChannel: 138

cycle:10, data:500MB
TestOutputStream: 6332
TestChannel: 690

And also according to test, the time to create a direct buffer is in direct proportion of data size.
so I think slice large data into small size will be good for performance and can reduce direct buffer pool size.

@SparkQA
Copy link

SparkQA commented Jun 4, 2015

Test build #34173 has finished for PR 6586 at commit 539c3e8.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 4, 2015

Test build #34179 has finished for PR 6586 at commit 379098b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Copy link
Member

srowen commented Jun 4, 2015

So, the problem here isn't heap memory right? because although this allocates and caches buffers quite freely, they are in soft refs, which can be removed if memory gets low. You are worried about off-heap memory right? Because nothing understands when "off heap memory is low".

I think your change slows things down a bit and adds a little extra complexity, but I don't know that we've shown there's a real problem here. Yes Spark jobs already use a lot of off heap memory but this is not a problem per se.

@JoshRosen
Copy link
Contributor

I haven't dug into this patch in detail yet, but to quickly address one comment: has the Netty bug been fixed in Spark by upgrading yet? If that's the case, is this patch still as urgent?

@suyanNone
Copy link
Contributor Author

@JoshRosen
The netty direct buffer leak problem fix on Netty-4.0.29-Final, but it is not release official yet.

Although I also not sure that patch is needed in common or not.
Because our PageRank app is always out of controller of DirectBuffer, I just want it be more Controllable. when lost of control, it will kill by yarn, and then cause retry. Yarn is strictly limited physical memory space. Yean, it is can enlarge memoryOverHead, and then enlarge the physical memory resource for that app, but that overhead memory is used just for block transfer phase, which be idle for other time. So in yarn mode, I‘m tend to make that direct buffer space be more controller and sacrifice some performance to write or read block through fileChannel.

@suyanNone
Copy link
Contributor Author

and I also think it may be more better to de-allocate direct Buffer while we send chunkFetchSuccess to requester.
I not sure I right or not, if I missing some thing in the spark code.
NioManagedBuffer ------(convertToNetty)----> Unpool.Wrapper(ByteBuffer)
Use Unpool.wrapper(), will set doNotFree Flag as true, so it will not free the byteBuffer in Netty, It need we to release in user context.

TransportRequestHandler.java

private void respond(final Encodable result) {
    final String remoteAddress = channel.remoteAddress().toString();
    channel.writeAndFlush(result).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            logger.trace(String.format("Sent result %s to client %s", result, remoteAddress));
            if (result instanceof ChunkFetchSuccess) {
              ChunkFetchSuccess success = (ChunkFetchSuccess)result;
              if (success.buffer instanceof NioManagedBuffer) {
                JavaUtils.deallocateDirectBuffer(((ChunkFetchSuccess)result).buffer.nioByteBuffer());
              }
            }
JavaUtil.java
  public static void deallocateDirectBuffer(ByteBuffer buffer) {
    if (buffer != null && buffer instanceof MappedByteBuffer) {
      logger.trace("Unmapping " + buffer);
      if (((DirectBuffer)buffer).cleaner() != null ) {
        ((DirectBuffer)buffer).cleaner().clean();
      }
    }
  }

@srowen
Copy link
Member

srowen commented Jun 26, 2015

I believe this should be closed. The memory being allocated here is "on purpose" and not doing so adds complexity and slows things down. this is not a tradeoff that everyone wants to make when you can simply adjust your off-heap memory overhead.

@suyanNone
Copy link
Contributor Author

@srowen I am in vacation in the past 10 days, sorry too late to see that.
eh... ok, I will close this patch.

@suyanNone suyanNone closed this Jul 1, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants