Skip to content

Commit

Permalink
[MINOR] Lazy write buffer optimization
Browse files Browse the repository at this point in the history
This commit optimizes the lazy write buffer to pass through byte
arrays if provided, instead of lazily evaluating them.
If the provided byte arrays are large enough, this is faster than the
previous lazy evaluation, especially because we previously copied
over the byte array, allocating the elements twice.
This commit also fixes a bug where providing a byte array larger
than the buffer caused a crash.

Closes #1972
  • Loading branch information
Baunsgaard committed Jan 5, 2024
1 parent 91291b6 commit 23bcd6d
Showing 1 changed file with 85 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.data.SparseBlock.Type;
import org.apache.sysds.runtime.data.SparseBlockFactory;
import org.apache.sysds.runtime.data.SparseBlockMCSR;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.LocalFileUtils;

public class LazyWriteBuffer
{
public class LazyWriteBuffer {
protected static final Log LOG = LogFactory.getLog(LazyWriteBuffer.class.getName());

public enum RPolicy {
FIFO, //first-in, first-out eviction
LRU //least recently used eviction
Expand All @@ -52,38 +59,28 @@ public static int writeBlock(String fname, CacheBlock<?> cb)
{
//obtain basic meta data of cache block
long lSize = getCacheBlockSize(cb);

if(lSize > _limit){ // if this block goes above limit
cb = compact(cb); // try to compact it
lSize = getCacheBlockSize(cb); // and update to new size of block
if(lSize > _limit){// if we are still above limit
reAllocate(lSize); // try to compact all blocks in memory.
}
}

boolean requiresWrite = (lSize > _limit //global buffer limit
|| !ByteBuffer.isValidCapacity(lSize, cb)); //local buffer limit
int numEvicted = 0;

//handle caching/eviction if it fits in writebuffer
if( !requiresWrite )
{
//handle caching/eviction if it fits in the write buffer
if(!requiresWrite) {
//create byte buffer handle (no block allocation yet)
ByteBuffer bbuff = new ByteBuffer( lSize );

//modify buffer pool
synchronized( _mQueue )
{
//evict matrices to make room (by default FIFO)
while( _size+lSize > _limit && !_mQueue.isEmpty() )
{
//remove first entry from eviction queue
Entry<String, ByteBuffer> entry = _mQueue.removeFirst();
String ftmp = entry.getKey();
ByteBuffer tmp = entry.getValue();

if( tmp != null ) {
//wait for pending serialization
tmp.checkSerialized();

//evict matrix
tmp.evictBuffer(ftmp);
tmp.freeMemory();
_size -= tmp.getSize();
numEvicted++;
}
}
// modify buffer pool
synchronized(_mQueue) {
// evict matrices to make room (by default FIFO)
numEvicted += evict(lSize);

//put placeholder into buffer pool (reserve mem)
_mQueue.addLast(fname, bbuff);
Expand All @@ -98,19 +95,75 @@ public static int writeBlock(String fname, CacheBlock<?> cb)
CacheStatistics.incrementFSWrites(numEvicted);
}
}
else
{
else {
//write directly to local FS (bypass buffer if too large)
LocalFileUtils.writeCacheBlockToLocal(fname, cb);
if( DMLScript.STATISTICS ) {
if( DMLScript.STATISTICS )
CacheStatistics.incrementFSWrites();
}

numEvicted++;
}

return numEvicted;
}

/**
 * Compact the given cache block into a more memory-efficient representation,
 * if one is available. Currently only converts sparse MatrixBlocks from the
 * MCSR format to the denser CSR format; all other blocks are returned as-is.
 *
 * @param cb cache block to compact
 * @return the compacted cache block (same object for non-MatrixBlock inputs)
 */
private static CacheBlock<?> compact(CacheBlock<?> cb){
	// compact this block
	if(cb instanceof MatrixBlock){
		MatrixBlock mb = (MatrixBlock) cb;

		// convert MCSR to CSR (CSR is the more compact representation)
		// BUGFIX: previously copied to Type.MCSR, which kept the MCSR format
		// and therefore performed no compaction at all
		if(mb.isInSparseFormat() && mb.getSparseBlock() instanceof SparseBlockMCSR)
			mb.setSparseBlock(SparseBlockFactory.copySparseBlock(Type.CSR, mb.getSparseBlock(), false));

		return mb;
	}
	else {
		return cb;
	}
}

/**
 * Attempt to shrink the buffer pool by compacting all not-yet-serialized
 * cache blocks currently held in the queue. Only triggered when the pool
 * cannot accommodate an additional block of size lSize.
 *
 * @param lSize size in bytes of the block we are trying to make room for
 * @return number of blocks whose in-memory size was actually reduced
 */
private static int reAllocate(long lSize) {
	int numReAllocated = 0;
	synchronized(_mQueue) {
		if(_size + lSize > _limit) {
			// compact all elements in the buffer
			for(Entry<String, ByteBuffer> elm : _mQueue.entrySet()) {
				ByteBuffer bf = elm.getValue();
				if(bf._cdata != null){ // not serialized to bytes.
					long before = getCacheBlockSize(bf._cdata);
					bf._cdata = compact(bf._cdata);
					long after = getCacheBlockSize(bf._cdata);
					// BUGFIX: only account for (and count) actual reductions;
					// previously the counter was never incremented, so this
					// method unconditionally returned 0
					if(after < before) {
						_size -= before - after;
						numReAllocated++;
					}
				}
			}
		}
	}
	return numReAllocated;
}

/**
 * Evict buffered blocks (in FIFO order) until an additional lSize bytes
 * fit under the buffer limit, or the queue is exhausted. Each evicted
 * block is spilled to the local file system and its memory released.
 * NOTE(review): callers appear to invoke this while holding the _mQueue
 * monitor (see writeBlock); this method does not synchronize itself.
 *
 * @param lSize size in bytes of the incoming block to make room for
 * @return number of blocks evicted to the local file system
 */
private static int evict(long lSize) throws IOException {
	int count = 0;
	while(!_mQueue.isEmpty() && _size + lSize > _limit) {
		// pop the oldest entry from the eviction queue
		Entry<String, ByteBuffer> head = _mQueue.removeFirst();
		ByteBuffer bbuf = head.getValue();
		if(bbuf == null)
			continue;

		// block until any in-flight serialization of this buffer completes
		bbuf.checkSerialized();

		// spill to local FS, then release the in-memory representation
		bbuf.evictBuffer(head.getKey());
		bbuf.freeMemory();
		_size -= bbuf.getSize();
		count++;
	}
	return count;
}

public static void deleteBlock(String fname)
{
boolean requiresDelete = true;
Expand Down Expand Up @@ -143,7 +196,7 @@ public static CacheBlock<?> readBlock(String fname, boolean matrix)
ldata = _mQueue.get(fname);

//modify eviction order (accordingly to access)
if( CacheableData.CACHING_BUFFER_POLICY == RPolicy.LRU
if(CacheableData.CACHING_BUFFER_POLICY == RPolicy.LRU
&& ldata != null )
{
//reinsert entry at end of eviction queue
Expand Down

0 comments on commit 23bcd6d

Please sign in to comment.