From 5625558b3b86e96e6f9126b75d781b452a6f66cf Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Wed, 26 Aug 2015 15:32:51 -0700 Subject: [PATCH 1/4] SPOI-5823 - Downstream container falling behind when buffer spooling is enabled --- .../bufferserver/internal/DataList.java | 367 +++++++++++------- .../bufferserver/internal/FastDataList.java | 10 +- .../bufferserver/server/Server.java | 79 +++- 3 files changed, 281 insertions(+), 175 deletions(-) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index baa052a086..1efe455fb8 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -18,7 +18,12 @@ import java.io.IOException; import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +37,7 @@ import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.bufferserver.util.SerializedData; import com.datatorrent.bufferserver.util.VarInt; +import com.datatorrent.netlet.AbstractClient; import com.datatorrent.netlet.util.VarInt.MutableInt; /** @@ -44,23 +50,25 @@ public class DataList { private final int MAX_COUNT_OF_INMEM_BLOCKS; protected final String identifier; - private final Integer blocksize; + private final int blockSize; private HashMap> listeners = new HashMap>(); protected HashSet all_listeners = new HashSet(); protected Block first; protected Block last; protected Storage storage; - protected ExecutorService autoflushExecutor; + protected ExecutorService autoFlushExecutor; protected ExecutorService storageExecutor; + protected final BlockingQueue freeBuffers; + protected final List suspendedClients; public int getBlockSize() { - return blocksize; + return blockSize; } - public void rewind(int baseSeconds, int windowId) throws IOException + public synchronized void rewind(int baseSeconds, int windowId) throws IOException { - long longWindowId = (long)baseSeconds << 32 | windowId; + long longWindowId = (long) baseSeconds << 32 | windowId; for (Block temp = first; temp != null; temp = temp.next) { @@ -81,25 +89,24 @@ public void rewind(int baseSeconds, int windowId) throws IOException } } - public void reset() + public synchronized void reset() { listeners.clear(); all_listeners.clear(); if (storage != null) { while (first != null) { - if (first.uniqueIdentifier > 0) { - logger.debug("discarding {} {} in reset", identifier, first.uniqueIdentifier); - storage.discard(identifier, first.uniqueIdentifier); - } + first.discard(false); first = first.next; } + } else { + first = null; } } - public void purge(int baseSeconds, int windowId) + public synchronized void purge(int baseSeconds, int windowId) { - long longWindowId = (long)baseSeconds << 32 | windowId; + long longWindowId = (long) baseSeconds << 32 | windowId; logger.debug("purge request for windowId {}", Codec.getStringWindowId(longWindowId)); Block prev = null; @@ -113,14 +120,11 @@ public void purge(int baseSeconds, int windowId) break; } - if (storage != null && temp.uniqueIdentifier > 0) { - logger.debug("discarding {} {} in purge", identifier, temp.uniqueIdentifier); - - storage.discard(identifier, temp.uniqueIdentifier); - } + temp.discard(false); prev = temp; } + resumeSuspendedClients(); } /** @@ -131,19 +135,14 @@ public String getIdentifier() return identifier; } - public DataList(String identifier, int blocksize, int numberOfCacheBlocks, int refCount) - { - this(identifier, blocksize, numberOfCacheBlocks); - first.refCount = refCount; - } - - public DataList(String identifier, int blocksize, int numberOfCacheBlocks) + public DataList(String identifier, int blockSize, int numberOfCacheBlocks) { this.MAX_COUNT_OF_INMEM_BLOCKS = numberOfCacheBlocks; this.identifier = identifier; - this.blocksize = blocksize; - first = new Block(identifier, blocksize); - last = first; + this.blockSize = blockSize; + freeBuffers = Queues.newArrayBlockingQueue(numberOfCacheBlocks); + suspendedClients = Lists.newArrayList(); + first = last = new Block(identifier, blockSize); } public DataList(String identifier) @@ -195,8 +194,7 @@ public void flush(final int writeOffset) last.starting_window = baseSeconds | bwt.getWindowId(); last.ending_window = last.starting_window; //logger.debug("assigned both window id {}", last); - } - else { + } else { last.ending_window = baseSeconds | bwt.getWindowId(); //logger.debug("assigned last window id {}", last); } @@ -204,13 +202,12 @@ public void flush(final int writeOffset) case MessageType.RESET_WINDOW_VALUE: Tuple rwt = Tuple.getTuple(last.data, processingOffset, size); - baseSeconds = (long)rwt.getBaseSeconds() << 32; + baseSeconds = (long) rwt.getBaseSeconds() << 32; break; } processingOffset += size; size = 0; - } - else { + } else { if (writeOffset == last.data.length) { nextOffset.integer = 0; processingOffset = 0; @@ -223,7 +220,7 @@ public void flush(final int writeOffset) last.writingOffset = writeOffset; - autoflushExecutor.submit(new Runnable() + autoFlushExecutor.submit(new Runnable() { @Override public void run() @@ -236,9 +233,9 @@ public void run() }); } - public void setAutoflushExecutor(final ExecutorService es) + public void setAutoFlushExecutor(final ExecutorService es) { - autoflushExecutor = es; + autoFlushExecutor = es; } public void setSecondaryStorage(Storage storage, ExecutorService es) @@ -252,7 +249,7 @@ public void setSecondaryStorage(Storage storage, ExecutorService es) */ protected final HashMap iterators = new HashMap(); - public DataListIterator getIterator(Block block) + protected DataListIterator getIterator(Block block) { return new DataListIterator(block); } @@ -288,9 +285,7 @@ public boolean delIterator(Iterator iterator) DataListIterator dli = (DataListIterator)iterator; for (Entry e : iterators.entrySet()) { if (e.getValue() == dli) { - if (dli.da != null) { - dli.da.release(false); - } + dli.close(); iterators.remove(e.getKey()); released = true; break; @@ -310,20 +305,17 @@ public void addDataListener(DataListener dl) HashSet set; if (listeners.containsKey(partition)) { set = listeners.get(partition); - } - else { + } else { set = new HashSet(); listeners.put(partition, set); } set.add(dl); } - } - else { + } else { HashSet set; if (listeners.containsKey(DataListener.NULL_PARTITION)) { set = listeners.get(DataListener.NULL_PARTITION); - } - else { + } else { set = new HashSet(); listeners.put(DataListener.NULL_PARTITION, set); } @@ -341,8 +333,7 @@ public void removeDataListener(DataListener dl) listeners.get(partition).remove(dl); } } - } - else { + } else { if (listeners.containsKey(DataListener.NULL_PARTITION)) { listeners.get(DataListener.NULL_PARTITION).remove(dl); } @@ -351,26 +342,40 @@ public void removeDataListener(DataListener dl) all_listeners.remove(dl); } - public void addBuffer(byte[] array) + public boolean addSuspendedClient(final AbstractClient client) { - last.next = new Block(identifier, array); - last.next.starting_window = last.ending_window; - last.next.ending_window = last.ending_window; - last = last.next; + return suspendedClients.add(client); + } - //logger.debug("addbuffer last = {}", last); - int inmemBlockCount; + public synchronized boolean resumeSuspendedClients() + { + boolean resumedSuspendedClients = false; + int memoryBlockCount = 0; + for (Block temp = first; temp != null; temp = temp.next) { + if (temp.data != null && ++memoryBlockCount == MAX_COUNT_OF_INMEM_BLOCKS) { + return resumedSuspendedClients; + } + } + for (AbstractClient client : suspendedClients) { + resumedSuspendedClients |= client.resumeReadIfSuspended(); + } + suspendedClients.clear(); + return resumedSuspendedClients; + } + + public boolean isMemoryBlockAvailable() + { + int memoryBlockCount = 0; - inmemBlockCount = 0; for (Block temp = first; temp != null; temp = temp.next) { - if (temp.data != null) { - inmemBlockCount++; + if (temp.data != null && ++memoryBlockCount == MAX_COUNT_OF_INMEM_BLOCKS) { + break; } } - if (inmemBlockCount >= MAX_COUNT_OF_INMEM_BLOCKS) { - //logger.debug("InmemBlockCount before releaes {}", inmemBlockCount); - for (Block temp = first; temp != null; temp = temp.next) { + //logger.debug("Data List Memory Block count {}", memoryBlockCount); + if (memoryBlockCount > MAX_COUNT_OF_INMEM_BLOCKS / 2 && storage != null) { + for (Block temp = first; temp != last; temp = temp.next) { boolean found = false; for (DataListIterator iterator : iterators.values()) { if (iterator.da == temp) { @@ -379,15 +384,30 @@ public void addBuffer(byte[] array) } } - if (!found && temp.data != null) { - temp.release(true); - if (--inmemBlockCount < MAX_COUNT_OF_INMEM_BLOCKS) { - break; - } + if (!found && temp.data != null && temp.release(false)) { + break; } } - //logger.debug("InmemBlockCount after release {}", inmemBlockCount); } + + return memoryBlockCount < MAX_COUNT_OF_INMEM_BLOCKS; + + } + + public byte[] getOrAllocateBuffer() + { + byte[] buffer = freeBuffers.poll(); + if (buffer == null) { + buffer = new byte[blockSize]; + } + return buffer; + } + + public void addBuffer(byte[] array) + { + last.next = new Block(identifier, array, last.ending_window, last.ending_window); + last = last.next; + //logger.debug("Data List last Memory Block {}", last); } public byte[] getBuffer(long windowId) @@ -484,7 +504,7 @@ public class Block /** * The starting window which is available in this data array. */ - long starting_window = -1; + long starting_window; /** * the ending window which is available in this data array */ @@ -500,7 +520,7 @@ public class Block /** * how count of references to this block. */ - int refCount; + AtomicInteger refCount; public Block(String id, int size) { @@ -508,10 +528,17 @@ public Block(String id, int size) } public Block(String id, byte[] array) + { + this(id, array, -1, 0); + } + + public Block(final String id, final byte[] array, final long starting_window, final long ending_window) { identifier = id; data = array; - refCount = 1; + refCount = new AtomicInteger(1); + this.starting_window = starting_window; + this.ending_window = ending_window; } void getNextData(SerializedData current) @@ -530,27 +557,28 @@ void getNextData(SerializedData current) public long rewind(long windowId) { long bs = starting_window & 0x7fffffff00000000L; - DataListIterator dli = getIterator(this); - done: - while (dli.hasNext()) { - SerializedData sd = dli.next(); - switch (sd.buffer[sd.dataOffset]) { - case MessageType.RESET_WINDOW_VALUE: - ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); - bs = (long)rwt.getBaseSeconds() << 32; - if (bs > windowId) { - writingOffset = sd.offset; - break done; - } - break; + try (DataListIterator dli = getIterator(this)) { + done: + while (dli.hasNext()) { + SerializedData sd = dli.next(); + switch (sd.buffer[sd.dataOffset]) { + case MessageType.RESET_WINDOW_VALUE: + ResetWindowTuple rwt = (ResetWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + bs = (long) rwt.getBaseSeconds() << 32; + if (bs > windowId) { + writingOffset = sd.offset; + break done; + } + break; - case MessageType.BEGIN_WINDOW_VALUE: - BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); - if ((bs | bwt.getWindowId()) >= windowId) { - writingOffset = sd.offset; - break done; - } - break; + case MessageType.BEGIN_WINDOW_VALUE: + BeginWindowTuple bwt = (BeginWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + if ((bs | bwt.getWindowId()) >= windowId) { + writingOffset = sd.offset; + break done; + } + break; + } } } @@ -558,16 +586,12 @@ public long rewind(long windowId) starting_window = windowId; ending_window = windowId; //logger.debug("assigned both window id {}", this); - } - else if (windowId < ending_window) { + } else if (windowId < ending_window) { ending_window = windowId; //logger.debug("assigned end window id {}", this); } - if (uniqueIdentifier != 0) { - storage.discard(identifier, uniqueIdentifier); - uniqueIdentifier = 0; - } + discard(false); return bs; } @@ -580,39 +604,40 @@ public void purge(long longWindowId) long bs = starting_window & 0xffffffff00000000L; SerializedData lastReset = null; - DataListIterator dli = getIterator(this); - done: - while (dli.hasNext()) { - SerializedData sd = dli.next(); - switch (sd.buffer[sd.dataOffset]) { - case MessageType.RESET_WINDOW_VALUE: - ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); - bs = (long)rwt.getBaseSeconds() << 32; - lastReset = sd; - break; - - case MessageType.BEGIN_WINDOW_VALUE: - BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); - if ((bs | bwt.getWindowId()) > longWindowId) { - found = true; - if (lastReset != null) { + try (DataListIterator dli = getIterator(this)) { + done: + while (dli.hasNext()) { + SerializedData sd = dli.next(); + switch (sd.buffer[sd.dataOffset]) { + case MessageType.RESET_WINDOW_VALUE: + ResetWindowTuple rwt = (ResetWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + bs = (long) rwt.getBaseSeconds() << 32; + lastReset = sd; + break; + + case MessageType.BEGIN_WINDOW_VALUE: + BeginWindowTuple bwt = (BeginWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + if ((bs | bwt.getWindowId()) > longWindowId) { + found = true; + if (lastReset != null) { /* * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of the reset tuple. */ - if (sd.offset >= lastReset.length) { - sd.offset -= lastReset.length; - if (!(sd.buffer == lastReset.buffer && sd.offset == lastReset.offset)) { - System.arraycopy(lastReset.buffer, lastReset.offset, sd.buffer, sd.offset, lastReset.length); + if (sd.offset >= lastReset.length) { + sd.offset -= lastReset.length; + if (!(sd.buffer == lastReset.buffer && sd.offset == lastReset.offset)) { + System.arraycopy(lastReset.buffer, lastReset.offset, sd.buffer, sd.offset, lastReset.length); + } } + + this.starting_window = bs | bwt.getWindowId(); + this.readingOffset = sd.offset; + //logger.debug("assigned starting window id {}", this); } - this.starting_window = bs | bwt.getWindowId(); - this.readingOffset = sd.offset; - //logger.debug("assigned starting window id {}", this); + break done; } - - break done; - } + } } } @@ -654,14 +679,11 @@ public void purge(long longWindowId) logger.warn("Unhandled condition while purging the data purge to offset {}", sd.offset); } - if (uniqueIdentifier != 0) { - storage.discard(identifier, uniqueIdentifier); - uniqueIdentifier = 0; - } + discard(false); } } - private Runnable getRetriever(final int uniqueIdentifier, final Storage storage) + private Runnable getRetriever() { return new Runnable() { @@ -673,24 +695,24 @@ public void run() data = lData; readingOffset = 0; writingOffset = data.length; - if (refCount > 1) { + if (refCount.get() > 1) { Block.this.notifyAll(); } } } - }; } - synchronized void acquire(boolean wait) + protected void acquire(boolean wait) { - if (refCount++ == 0 && uniqueIdentifier > 0 && storage != null) { + if (storage != null && refCount.getAndIncrement() == 0) { assert (data == null); + final Runnable retriever = getRetriever(); if (wait) { - getRetriever(uniqueIdentifier, storage).run(); + retriever.run(); } else { - storageExecutor.submit(getRetriever(uniqueIdentifier, storage)); + storageExecutor.submit(retriever); } } else if (wait && data == null) { @@ -710,43 +732,81 @@ private Runnable getStorer(final byte[] data, final int readingOffset, final int @Override public void run() { - int i = storage.store(identifier, data, readingOffset, writingOffset); - if (i == 0) { + if (uniqueIdentifier == 0) { + uniqueIdentifier = storage.store(identifier, data, readingOffset, writingOffset); + } + if (uniqueIdentifier == 0) { logger.warn("Storage returned unexpectedly, please check the status of the spool directory!"); } else { synchronized (Block.this) { - Block.this.uniqueIdentifier = i; - if (refCount == 0) { + if (refCount.get() == 0) { + logger.debug("release block {} to disk", Block.this); + freeBuffers.offer(Block.this.data); Block.this.data = null; + } else { + logger.debug("Keeping Data List Memory Block {} due to {} references.", Block.this, refCount); } } + resumeSuspendedClients(); } } - }; } - synchronized void release(boolean wait) + protected boolean release(boolean wait) { - if (--refCount == 0 && storage != null) { - if (uniqueIdentifier != 0) { - data = null; - return; - } + if (storage != null && refCount.decrementAndGet() == 0) { + final Runnable storer = getStorer(data, readingOffset, writingOffset, storage); if (wait) { - getStorer(data, readingOffset, writingOffset, storage).run(); + logger.info("Releasing Data List Memory Block {}", this); + storer.run(); } else { - storageExecutor.submit(getStorer(data, readingOffset, writingOffset, storage)); + logger.info("Scheduling release of Data List Memory Block {}", this); + storageExecutor.submit(storer); } + return true; + } else { + logger.debug("Keeping Data List Memory Block {} due to {} references.", this, refCount); + return false; } } + private Runnable getDiscarder() + { + return new Runnable() + { + @Override + public void run() + { + if (uniqueIdentifier > 0) { + logger.debug("discarding {} {}", identifier, uniqueIdentifier); + storage.discard(identifier, uniqueIdentifier); + uniqueIdentifier = 0; + } + } + }; + } + + protected boolean discard(final boolean wait) + { + if (storage != null) { + final Runnable discarder = getDiscarder(); + if (wait) { + discarder.run(); + } else { + storageExecutor.submit(discarder); + } + return true; + } + return false; + } + @Override public String toString() { - return "Block{" + "identifier=" + identifier + ", data=" + (data == null ? "null" : data.length) + return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier + ", data=" + (data == null ? "null" : data.length) + ", readingOffset=" + readingOffset + ", writingOffset=" + writingOffset + ", starting_window=" + Codec.getStringWindowId(starting_window) + ", ending_window=" + Codec.getStringWindowId(ending_window) + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) @@ -760,7 +820,7 @@ public String toString() * * @since 0.3.2 */ - public class DataListIterator implements Iterator + public class DataListIterator implements Iterator, AutoCloseable { Block da; SerializedData current; @@ -812,9 +872,9 @@ public synchronized boolean hasNext() case 0: if (da.writingOffset == buffer.length) { if (da.next == null) { + assert (da == last); return false; } - da.release(false); da.next.acquire(true); da = da.next; @@ -841,6 +901,7 @@ public synchronized boolean hasNext() else { if (da.writingOffset == buffer.length) { if (da.next == null) { + assert (da == last); return false; } else { @@ -881,6 +942,16 @@ public void remove() { current.buffer[current.dataOffset] = MessageType.NO_MESSAGE_VALUE; } + + @Override + public void close() + { + if (da != null) { + da.release(false); + } + da = null; + buffer = null; + } void rewind(int processingOffset) { diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java index d260b37e39..2e1ce757e5 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java @@ -39,12 +39,6 @@ public FastDataList(String identifier, int blocksize, int numberOfCacheBlocks) super(identifier, blocksize, numberOfCacheBlocks); } - public FastDataList(String identifier, int blocksize, int numberOfCacheBlocks, int refCount) - { - super(identifier, blocksize, numberOfCacheBlocks, refCount); - } - - long item; @Override @@ -102,7 +96,7 @@ public void flush(final int writeOffset) last.writingOffset = writeOffset; - autoflushExecutor.submit(new Runnable() + autoFlushExecutor.submit(new Runnable() { @Override public void run() @@ -116,7 +110,7 @@ public void run() } @Override - public FastDataListIterator getIterator(Block block) + protected FastDataListIterator getIterator(Block block) { return new FastDataListIterator(block); } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 7fb4823876..27c961c4dd 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -262,7 +262,7 @@ public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, Abstra //logger.debug("old list = {}", dl); } else { - dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks, 0) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks, 0); + dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks); publisherBuffers.put(upstream_identifier, dl); //logger.debug("new list = {}", dl); } @@ -401,7 +401,7 @@ public void onMessage(byte[] buffer, int offset, int size) PublishRequestTuple publisherRequest = (PublishRequestTuple)request; DataList dl = handlePublisherRequest(publisherRequest, this); - dl.setAutoflushExecutor(serverHelperExecutor); + dl.setAutoFlushExecutor(serverHelperExecutor); Publisher publisher; if (publisherRequest.getVersion().equals(Tuple.FAST_VERSION)) { @@ -616,6 +616,25 @@ public void onMessage(byte[] buffer, int offset, int size) dirty = true; } + @Override + public boolean resumeReadIfSuspended() + { + eventloop.submit(new Runnable() + { + @Override + public void run() + { + final int interestOps = key.interestOps(); + if ((interestOps & SelectionKey.OP_READ) == 0) { + logger.debug("Resuming read on key {} with attachment {}", key, key.attachment()); + read(0); + key.interestOps(interestOps | SelectionKey.OP_READ); + } + } + }); + return true; + } + @Override public void read(int len) { @@ -634,7 +653,9 @@ public void read(int len) * so we allocate a new byteBuffer and copy over the partially written data to the * new byteBuffer and start as if we always had full room but not enough data. */ - switchToNewBuffer(buffer, readOffset); + if (!switchToNewBufferOrSuspendRead(buffer, readOffset)) { + return; + } } } else if (dirty) { @@ -660,10 +681,13 @@ else if (dirty) { /* * hit wall while writing serialized data, so have to allocate a new byteBuffer. */ - switchToNewBuffer(buffer, readOffset - VarInt.getSize(size)); + if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size))) { + readOffset -= VarInt.getSize(size); + size = 0; + return; + } size = 0; - } - else if (dirty) { + } else if (dirty) { dirty = false; datalist.flush(writeOffset); } @@ -673,21 +697,38 @@ else if (dirty) { while (true); } - public void switchToNewBuffer(byte[] array, int offset) + private boolean switchToNewBufferOrSuspendRead(final byte[] array, final int offset) { - byte[] newBuffer = new byte[datalist.getBlockSize()]; - byteBuffer = ByteBuffer.wrap(newBuffer); - if (array == null || array.length - offset == 0) { - writeOffset = 0; + synchronized (datalist) + { + if (switchToNewBuffer(buffer, readOffset)) { + return true; + } + suspendReadIfResumed(); + datalist.addSuspendedClient(this); + return false; } - else { - writeOffset = array.length - offset; - System.arraycopy(buffer, offset, newBuffer, 0, writeOffset); - byteBuffer.position(writeOffset); + } + + private boolean switchToNewBuffer(final byte[] array, final int offset) + { + if (datalist.isMemoryBlockAvailable()) { + final byte[] newBuffer = datalist.getOrAllocateBuffer(); + byteBuffer = ByteBuffer.wrap(newBuffer); + if (array == null || array.length - offset == 0) { + writeOffset = 0; + } else { + writeOffset = array.length - offset; + System.arraycopy(buffer, offset, newBuffer, 0, writeOffset); + byteBuffer.position(writeOffset); + } + buffer = newBuffer; + readOffset = 0; + datalist.addBuffer(buffer); + return true; + } else { + return false; } - buffer = newBuffer; - readOffset = 0; - datalist.addBuffer(buffer); } @Override @@ -714,7 +755,7 @@ public void handleException(Exception cce, EventLoop el) @Override public String toString() { - return "Server.Publisher{" + "datalist=" + datalist + '}'; + return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {datalist=" + datalist + '}'; } private volatile boolean torndown; From 85bcd754869c260aed732022d310e5f9a6936fc5 Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Fri, 28 Aug 2015 09:09:45 -0700 Subject: [PATCH 2/4] SPOI-5823 - Downstream container falling behind when buffer spooling is enabled --- .../bufferserver/internal/DataList.java | 121 +++++++++--------- .../bufferserver/internal/FastDataList.java | 3 +- .../bufferserver/server/Server.java | 4 + 3 files changed, 65 insertions(+), 63 deletions(-) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 1efe455fb8..27b0873a6f 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -60,6 +60,7 @@ public class DataList protected ExecutorService storageExecutor; protected final BlockingQueue freeBuffers; protected final List suspendedClients; + private volatile int numberOfInMemBlockPermits; public int getBlockSize() { @@ -70,12 +71,17 @@ public synchronized void rewind(int baseSeconds, int windowId) throws IOExceptio { long longWindowId = (long) baseSeconds << 32 | windowId; + numberOfInMemBlockPermits = MAX_COUNT_OF_INMEM_BLOCKS; for (Block temp = first; temp != null; temp = temp.next) { + if (temp.data != null) { + numberOfInMemBlockPermits--; + } if (temp.starting_window >= longWindowId || temp.ending_window > longWindowId) { if (temp != last) { temp.next = null; last = temp; + last.acquire(true); } this.baseSeconds = temp.rewind(longWindowId); @@ -87,6 +93,10 @@ public synchronized void rewind(int baseSeconds, int windowId) throws IOExceptio for (DataListIterator dli : iterators.values()) { dli.rewind(processingOffset); } + + if (numberOfInMemBlockPermits < 0) { + throw new RuntimeException("Exceeded allowed memory block allocation by " + (-numberOfInMemBlockPermits)); + } } public synchronized void reset() @@ -102,6 +112,7 @@ public synchronized void reset() } else { first = null; } + numberOfInMemBlockPermits = MAX_COUNT_OF_INMEM_BLOCKS; } public synchronized void purge(int baseSeconds, int windowId) @@ -119,6 +130,9 @@ public synchronized void purge(int baseSeconds, int windowId) first.purge(longWindowId); break; } + if (temp.data != null) { + numberOfInMemBlockPermits++; + } temp.discard(false); @@ -137,12 +151,16 @@ public String getIdentifier() public DataList(String identifier, int blockSize, int numberOfCacheBlocks) { + if (numberOfCacheBlocks < 1) { + throw new IllegalArgumentException("Invalid number of Data List Memory blocks " + numberOfCacheBlocks); + } this.MAX_COUNT_OF_INMEM_BLOCKS = numberOfCacheBlocks; this.identifier = identifier; this.blockSize = blockSize; freeBuffers = Queues.newArrayBlockingQueue(numberOfCacheBlocks); suspendedClients = Lists.newArrayList(); first = last = new Block(identifier, blockSize); + this.numberOfInMemBlockPermits = MAX_COUNT_OF_INMEM_BLOCKS - 1; } public DataList(String identifier) @@ -350,48 +368,18 @@ public boolean addSuspendedClient(final AbstractClient client) public synchronized boolean resumeSuspendedClients() { boolean resumedSuspendedClients = false; - int memoryBlockCount = 0; - for (Block temp = first; temp != null; temp = temp.next) { - if (temp.data != null && ++memoryBlockCount == MAX_COUNT_OF_INMEM_BLOCKS) { - return resumedSuspendedClients; + if (numberOfInMemBlockPermits > 0) { + for (AbstractClient client : suspendedClients) { + resumedSuspendedClients |= client.resumeReadIfSuspended(); } + suspendedClients.clear(); } - for (AbstractClient client : suspendedClients) { - resumedSuspendedClients |= client.resumeReadIfSuspended(); - } - suspendedClients.clear(); return resumedSuspendedClients; } public boolean isMemoryBlockAvailable() { - int memoryBlockCount = 0; - - for (Block temp = first; temp != null; temp = temp.next) { - if (temp.data != null && ++memoryBlockCount == MAX_COUNT_OF_INMEM_BLOCKS) { - break; - } - } - - //logger.debug("Data List Memory Block count {}", memoryBlockCount); - if (memoryBlockCount > MAX_COUNT_OF_INMEM_BLOCKS / 2 && storage != null) { - for (Block temp = first; temp != last; temp = temp.next) { - boolean found = false; - for (DataListIterator iterator : iterators.values()) { - if (iterator.da == temp) { - found = true; - break; - } - } - - if (!found && temp.data != null && temp.release(false)) { - break; - } - } - } - - return memoryBlockCount < MAX_COUNT_OF_INMEM_BLOCKS; - + return numberOfInMemBlockPermits > 0; } public byte[] getOrAllocateBuffer() @@ -406,7 +394,11 @@ public byte[] getOrAllocateBuffer() public void addBuffer(byte[] array) { last.next = new Block(identifier, array, last.ending_window, last.ending_window); + last.release(false); last = last.next; + if (--numberOfInMemBlockPermits < 0) { + throw new RuntimeException("Exceeded allowed memory block allocation by " + (-numberOfInMemBlockPermits)); + } //logger.debug("Data List last Memory Block {}", last); } @@ -539,6 +531,7 @@ public Block(final String id, final byte[] array, final long starting_window, fi refCount = new AtomicInteger(1); this.starting_window = starting_window; this.ending_window = ending_window; + logger.debug("Allocated new Data List memory Block {}", this); } void getNextData(SerializedData current) @@ -695,6 +688,9 @@ public void run() data = lData; readingOffset = 0; writingOffset = data.length; + if (--numberOfInMemBlockPermits < 0) { + throw new RuntimeException("Exceeded allowed memory block allocation by " + (-numberOfInMemBlockPermits)); + } if (refCount.get() > 1) { Block.this.notifyAll(); } @@ -702,10 +698,10 @@ public void run() } }; } - + protected void acquire(boolean wait) { - if (storage != null && refCount.getAndIncrement() == 0) { + if (refCount.getAndIncrement() == 0 && storage != null) { assert (data == null); final Runnable retriever = getRetriever(); if (wait) { @@ -744,6 +740,7 @@ public void run() logger.debug("release block {} to disk", Block.this); freeBuffers.offer(Block.this.data); Block.this.data = null; + numberOfInMemBlockPermits++; } else { logger.debug("Keeping Data List Memory Block {} due to {} references.", Block.this, refCount); } @@ -754,22 +751,20 @@ public void run() }; } - protected boolean release(boolean wait) + protected void release(boolean wait) { - if (storage != null && refCount.decrementAndGet() == 0) { + if (refCount.decrementAndGet() == 0 && storage != null) { + assert (Block.this.next != null); final Runnable storer = getStorer(data, readingOffset, writingOffset, storage); - if (wait) { + if (wait && numberOfInMemBlockPermits == 0) { logger.info("Releasing Data List Memory Block {}", this); storer.run(); - } - else { + } else if (numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS/2) { logger.info("Scheduling release of Data List Memory Block {}", this); storageExecutor.submit(storer); } - return true; } else { logger.debug("Keeping Data List Memory Block {} due to {} references.", this, refCount); - return false; } } @@ -789,7 +784,7 @@ public void run() }; } - protected boolean discard(final boolean wait) + protected void discard(final boolean wait) { if (storage != null) { final Runnable discarder = getDiscarder(); @@ -798,9 +793,7 @@ protected boolean discard(final boolean wait) } else { storageExecutor.submit(discarder); } - return true; } - return false; } @Override @@ -809,7 +802,7 @@ public String toString() return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier + ", data=" + (data == null ? "null" : data.length) + ", readingOffset=" + readingOffset + ", writingOffset=" + writingOffset + ", starting_window=" + Codec.getStringWindowId(starting_window) + ", ending_window=" + Codec.getStringWindowId(ending_window) - + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) + + ", refCount=" + refCount.get() + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) + '}'; } @@ -871,13 +864,16 @@ public synchronized boolean hasNext() case -1: case 0: if (da.writingOffset == buffer.length) { - if (da.next == null) { - assert (da == last); - return false; + synchronized (DataList.this) { + if (da.next == null) { + assert (da == last); + return false; + } + logger.debug("{}: switching to the next block {}->{}", this, da, da.next); + da.release(false); + da.next.acquire(true); + da = da.next; } - da.release(false); - da.next.acquire(true); - da = da.next; size = 0; buffer = da.data; readOffset = da.readingOffset; @@ -900,18 +896,19 @@ public synchronized boolean hasNext() } else { if (da.writingOffset == buffer.length) { - if (da.next == null) { - assert (da == last); - return false; - } - else { + synchronized (DataList.this) { + if (da.next == null) { + assert (da == last); + return false; + } + logger.debug("{}: switching to the next block {}->{}", this, da, da.next); da.release(false); da.next.acquire(true); da = da.next; - size = 0; - readOffset = nextOffset.integer = da.readingOffset; - buffer = da.data; } + size = 0; + readOffset = nextOffset.integer = da.readingOffset; + buffer = da.data; } else { return false; diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java index 2e1ce757e5..0d659078be 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java @@ -194,7 +194,7 @@ public synchronized boolean hasNext() if (da.next == null) { return false; } - + logger.debug("{}: switching to the next block {}->{}", this, da, da.next); da.release(false); da.next.acquire(true); da = da.next; @@ -220,6 +220,7 @@ public synchronized boolean hasNext() return false; } else { + logger.debug("{}: switching to the next block {}->{}", this, da, da.next); da.release(false); da.next.acquire(true); da = da.next; diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 27c961c4dd..c9a6c8df3f 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -97,6 +97,10 @@ public synchronized void registered(SelectionKey key) @Override public void unregistered(SelectionKey key) { + for (Entry entry : publisherChannels.entrySet()) { + eventloop.disconnect(entry.getValue()); + } + serverHelperExecutor.shutdown(); storageHelperExecutor.shutdown(); try { From 022ab77d95b691bcb7c4c4be66e9ea498df6ce00 Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Fri, 28 Aug 2015 16:28:58 -0700 Subject: [PATCH 3/4] SPOI-5823 - Downstream container falling behind when buffer spooling is enabled --- .../java/com/datatorrent/bufferserver/internal/DataList.java | 3 +-- .../main/java/com/datatorrent/bufferserver/server/Server.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 27b0873a6f..0fdea876c1 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -701,8 +701,7 @@ public void run() protected void acquire(boolean wait) { - if (refCount.getAndIncrement() == 0 && storage != null) { - assert (data == null); + if (refCount.getAndIncrement() == 0 && storage != null && data == null) { final Runnable retriever = getRetriever(); if (wait) { retriever.run(); diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index c9a6c8df3f..463890fe7b 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -705,7 +705,7 @@ private boolean switchToNewBufferOrSuspendRead(final byte[] array, final int off { synchronized (datalist) { - if (switchToNewBuffer(buffer, readOffset)) { + if (switchToNewBuffer(buffer, offset)) { return true; } suspendReadIfResumed(); From 755f2e25bc9d56c1e70a58ed1f91c03654b348c0 Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Mon, 31 Aug 2015 08:34:04 -0700 Subject: [PATCH 4/4] SPOI-5823 - Downstream container falling behind when buffer spooling is enabled --- .../bufferserver/internal/DataList.java | 68 +++++++++---------- 1 file changed, 31 insertions(+), 37 deletions(-) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 0fdea876c1..633d36e8de 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -531,7 +531,7 @@ public Block(final String id, final byte[] array, final long starting_window, fi refCount = new AtomicInteger(1); this.starting_window = starting_window; this.ending_window = ending_window; - logger.debug("Allocated new Data List memory Block {}", this); + //logger.debug("Allocated new Data List memory Block {}", this); } void getNextData(SerializedData current) @@ -736,12 +736,12 @@ public void run() else { synchronized (Block.this) { if (refCount.get() == 0) { - logger.debug("release block {} to disk", Block.this); + //logger.debug("release block {} to disk", Block.this); freeBuffers.offer(Block.this.data); Block.this.data = null; numberOfInMemBlockPermits++; } else { - logger.debug("Keeping Data List Memory Block {} due to {} references.", Block.this, refCount); + //logger.debug("Keeping Data List Memory Block {} due to {} references.", Block.this, refCount); } } resumeSuspendedClients(); @@ -756,14 +756,14 @@ protected void release(boolean wait) assert (Block.this.next != null); final Runnable storer = getStorer(data, readingOffset, writingOffset, storage); if (wait && numberOfInMemBlockPermits == 0) { - logger.info("Releasing Data List Memory Block {}", this); + //logger.info("Releasing Data List Memory Block {}", this); storer.run(); } else if (numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS/2) { - logger.info("Scheduling release of Data List Memory Block {}", this); + //logger.info("Scheduling release of Data List Memory Block {}", this); storageExecutor.submit(storer); } } else { - logger.debug("Keeping Data List Memory Block {} due to {} references.", this, refCount); + //logger.debug("Keeping Data List Memory Block {} due to {} references.", this, refCount); } } @@ -849,7 +849,7 @@ public int getReadOffset() * @return boolean */ @Override - public synchronized boolean hasNext() + public boolean hasNext() { while (size == 0) { size = VarInt.read(buffer, readOffset, da.writingOffset, nextOffset); @@ -868,7 +868,7 @@ public synchronized boolean hasNext() assert (da == last); return false; } - logger.debug("{}: switching to the next block {}->{}", this, da, da.next); + //logger.debug("{}: switching to the next block {}->{}", this, da, da.next); da.release(false); da.next.acquire(true); da = da.next; @@ -876,43 +876,37 @@ public synchronized boolean hasNext() size = 0; buffer = da.data; readOffset = da.readingOffset; - } - else { + } else { return false; } } } - while (true) { - if (nextOffset.integer + size <= da.writingOffset) { - current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset); - current.dataOffset = nextOffset.integer; - //if (buffer[current.dataOffset] == MessageType.BEGIN_WINDOW_VALUE || buffer[current.dataOffset] == MessageType.END_WINDOW_VALUE) { - // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, current.length - current.dataOffset + current.offset); - // logger.debug("next t = {}", t); - //} - return true; - } - else { - if (da.writingOffset == buffer.length) { - synchronized (DataList.this) { - if (da.next == null) { - assert (da == last); - return false; - } - logger.debug("{}: switching to the next block {}->{}", this, da, da.next); - da.release(false); - da.next.acquire(true); - da = da.next; - } - size = 0; - readOffset = nextOffset.integer = da.readingOffset; - buffer = da.data; - } - else { + if (nextOffset.integer + size <= da.writingOffset) { + current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset); + current.dataOffset = nextOffset.integer; + //if (buffer[current.dataOffset] == MessageType.BEGIN_WINDOW_VALUE || buffer[current.dataOffset] == MessageType.END_WINDOW_VALUE) { + // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, current.length - current.dataOffset + current.offset); + // logger.debug("next t = {}", t); + //} + return true; + } else if (da.writingOffset == buffer.length) { + synchronized (DataList.this) { + if (da.next == null) { + assert (da == last); return false; } + //logger.debug("{}: switching to the next block {}->{}", this, da, da.next); + da.release(false); + da.next.acquire(true); + da = da.next; } + size = 0; + readOffset = nextOffset.integer = da.readingOffset; + buffer = da.data; + return hasNext(); + } else { + return false; } }