Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
Original file line number Diff line number Diff line change
Expand Up @@ -1610,8 +1610,8 @@ public static void executeInMemoryReblock(ExecutionContext ec, String varin, Str

@SuppressWarnings("unchecked")
public static void executeInMemoryReblock(ExecutionContext ec, String varin, String varout, LineageItem litem) {
CacheableData<CacheBlock> in = (CacheableData<CacheBlock>) ec.getCacheableData(varin);
CacheableData<CacheBlock> out = (CacheableData<CacheBlock>) ec.getCacheableData(varout);
CacheableData<CacheBlock<?>> in = (CacheableData<CacheBlock<?>>) ec.getCacheableData(varin);
CacheableData<CacheBlock<?>> out = (CacheableData<CacheBlock<?>>) ec.getCacheableData(varout);

if( in.isFederated() ) {
out.setMetaData(in.getMetaData());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.sysds.runtime.compress.lib.CLALibTSMM;
import org.apache.sysds.runtime.compress.lib.CLALibUnary;
import org.apache.sysds.runtime.compress.lib.CLALibUtils;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.data.DenseBlock;
Expand Down Expand Up @@ -591,7 +590,7 @@ public void setOverlapping(boolean overlapping) {
}

@Override
public MatrixBlock slice(int rl, int ru, int cl, int cu, boolean deep, CacheBlock ret) {
public MatrixBlock slice(int rl, int ru, int cl, int cu, boolean deep, MatrixBlock ret) {
validateSliceArgument(rl, ru, cl, cu);
return CLALibSlice.slice(this, rl, ru, cl, cu, deep);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public long getInMemorySize() {

public static long getInMemorySize(int valuesCount) {
// object + values array + double
return 16 + MemoryEstimates.byteArrayCost(valuesCount) + 8;
return 16 + (long)MemoryEstimates.byteArrayCost(valuesCount) + 8;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void fill(int v) {

@Override
public long getInMemorySize() {
return getInMemorySize(_data.size());
return getInMemorySize(_data.size()-1);
}

public static long getInMemorySize(int dataLength) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ public class ByteBuffer
private final long _size;

protected byte[] _bdata = null; //sparse matrix
protected CacheBlock _cdata = null; //dense matrix/frame
protected CacheBlock<?> _cdata = null; //dense matrix/frame

public ByteBuffer( long size ) {
_size = size;
_serialized = false;
}

public void serializeBlock( CacheBlock cb )
public void serializeBlock( CacheBlock<?> cb )
throws IOException
{
_shallow = cb.isShallowSerialize(true);
Expand Down Expand Up @@ -85,10 +85,10 @@ public void serializeBlock( CacheBlock cb )
_serialized = true;
}

public CacheBlock deserializeBlock()
public CacheBlock<?> deserializeBlock()
throws IOException
{
CacheBlock ret = null;
CacheBlock<?> ret = null;

if( !_shallow ) { //sparse matrix / string frame
DataInput din = _matrix ? new CacheDataInput(_bdata) :
Expand Down Expand Up @@ -163,7 +163,7 @@ public void checkSerialized()
* @param cb cache block
* @return true if valid capacity
*/
public static boolean isValidCapacity( long size, CacheBlock cb )
public static boolean isValidCapacity( long size, CacheBlock<?> cb )
{
if( !cb.isShallowSerialize(true) ) { //SPARSE matrix blocks
// since cache blocks are serialized into a byte representation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@

import org.apache.hadoop.io.Writable;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.util.IndexRange;


/**
* Interface for all blocks handled by lazy write buffer. This abstraction
* allows us to keep the buffer pool independent of matrix and frame blocks.
*
*/
public interface CacheBlock extends Writable
public interface CacheBlock <T> extends Writable
{

public int getNumRows();
Expand Down Expand Up @@ -84,33 +85,89 @@ public interface CacheBlock extends Writable
* Free unnecessarily allocated empty block.
*/
public void compactEmptyBlock();

/**
* Slice a sub block out of the current block and write into the given output block. This method returns the passed
* instance if not null.
*
* @param ixrange index range inclusive
* @param ret outputBlock
* @return sub-block of cache block
*/
public CacheBlock<T> slice(IndexRange ixrange, T ret);

/**
* Slice a sub block out of the current block and write into the given output block.
* This method returns the passed instance if not null.
* Slice a sub block out of the current block and write into the given output block. This method returns the passed
* instance if not null.
*
* @param rl row lower
* @param ru row upper
* @param ru row upper inclusive
* @return sub-block of cache block
*/
public CacheBlock<T> slice(int rl, int ru);

/**
* Slice a sub block out of the current block and write into the given output block. This method returns the passed
* instance if not null.
*
* @param rl row lower
* @param ru row upper inclusive
* @param deep enforce deep-copy
* @return sub-block of cache block
*/
public CacheBlock<T> slice(int rl, int ru, boolean deep);

/**
* Slice a sub block out of the current block and write into the given output block. This method returns the passed
* instance if not null.
*
* @param rl row lower
* @param ru row upper inclusive
* @param cl column lower
* @param cu column upper
* @param cu column upper inclusive
* @return sub-block of cache block
*/
public CacheBlock<T> slice(int rl, int ru, int cl, int cu);

/**
* Slice a sub block out of the current block and write into the given output block. This method returns the passed
* instance if not null.
*
* @param rl row lower
* @param ru row upper inclusive
* @param cl column lower
* @param cu column upper inclusive
* @param block cache block
* @return sub-block of cache block
*/
public CacheBlock slice(int rl, int ru, int cl, int cu, CacheBlock block);
public CacheBlock<T> slice(int rl, int ru, int cl, int cu, T block);

/**
* Slice a sub block out of the current block and write into the given output block.
* This method returns the passed instance if not null.
*
* @param rl row lower
* @param ru row upper
* @param rl row lower
* @param ru row upper inclusive
* @param cl column lower
* @param cu column upper inclusive
* @param deep enforce deep-copy
* @return sub-block of cache block
*/
public CacheBlock<T> slice(int rl, int ru, int cl, int cu, boolean deep);

/**
* Slice a sub block out of the current block and write into the given output block.
* This method returns the passed instance if not null.
*
* @param rl row lower
* @param ru row upper inclusive
* @param cl column lower
* @param cu column upper
* @param cu column upper inclusive
* @param deep enforce deep-copy
* @param block cache block
* @return sub-block of cache block
*/
public CacheBlock slice(int rl, int ru, int cl, int cu, boolean deep, CacheBlock block);
public CacheBlock<T> slice(int rl, int ru, int cl, int cu, boolean deep, T block);


/**
Expand All @@ -120,7 +177,7 @@ public interface CacheBlock extends Writable
* @param that cache block
* @param appendOnly ?
*/
public void merge(CacheBlock that, boolean appendOnly);
public void merge(T that, boolean appendOnly);

/**
* Returns the double value at the passed row and column.
Expand All @@ -129,7 +186,7 @@ public interface CacheBlock extends Writable
* @param c column of the value
* @return double value at the passed row and column
*/
double getDouble(int r, int c);
public double getDouble(int r, int c);

/**
* Returns the double value at the passed row and column.
Expand All @@ -138,7 +195,7 @@ public interface CacheBlock extends Writable
* @param c column of the value
* @return double value at the passed row and column
*/
double getDoubleNaN(int r, int c);
public double getDoubleNaN(int r, int c);

/**
* Returns the string of the value at the passed row and column.
Expand All @@ -147,5 +204,5 @@ public interface CacheBlock extends Writable
* @param c column of the value
* @return string of the value at the passed row and column
*/
String getString(int r, int c);
public String getString(int r, int c);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*/
public class CacheBlockFactory
{
public static CacheBlock newInstance(int code) {
public static CacheBlock<?> newInstance(int code) {
switch( code ) {
case 0: return new MatrixBlock();
case 1: return new FrameBlock();
Expand All @@ -44,7 +44,17 @@ public static CacheBlock newInstance(int code) {
throw new RuntimeException("Unsupported cache block type: "+code);
}

public static int getCode(CacheBlock block) {
public static CacheBlock<?> newInstance(CacheBlock<?> block) {
if(block instanceof MatrixBlock)
return new MatrixBlock();
else if(block instanceof FrameBlock)
return new FrameBlock();
else if(block instanceof TensorBlock)
return new TensorBlock();
throw new RuntimeException("Unsupported cache block type: " + block.getClass().getName());
}

public static int getCode(CacheBlock<?> block) {
if (block instanceof MatrixBlock)
return 0;
else if (block instanceof FrameBlock)
Expand All @@ -54,7 +64,7 @@ else if (block instanceof TensorBlock)
throw new RuntimeException("Unsupported cache block type: " + block.getClass().getName());
}

public static ArrayList<?> getPairList(CacheBlock block) {
public static ArrayList<?> getPairList(CacheBlock<?> block) {
int code = getCode(block);
switch (code) {
case 0: return new ArrayList<Pair<MatrixIndexes, MatrixBlock>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void deleteFile(String fname) {
LocalFileUtils.deleteFileIfExists(fname, true);
}

public void serializeData(ByteBuffer bbuff, CacheBlock cb) {
public void serializeData(ByteBuffer bbuff, CacheBlock<?> cb) {
//sync or async file delete
if( CacheableData.CACHING_ASYNC_SERIALIZE )
_pool.submit(new CacheMaintenanceService.DataSerializerTask(bbuff, cb));
Expand Down Expand Up @@ -85,9 +85,9 @@ public void run() {

private static class DataSerializerTask implements Runnable {
private ByteBuffer _bbuff = null;
private CacheBlock _cb = null;
private CacheBlock<?> _cb = null;

public DataSerializerTask(ByteBuffer bbuff, CacheBlock cb) {
public DataSerializerTask(ByteBuffer bbuff, CacheBlock<?> cb) {
_bbuff = bbuff;
_cb = cb;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
* to allow Java garbage collection. If other parts of the system continue
* keep references to the cache block, its eviction will not release any memory.
*/
public abstract class CacheableData<T extends CacheBlock> extends Data
public abstract class CacheableData<T extends CacheBlock<?>> extends Data
{
private static final long serialVersionUID = -413810592207212835L;

Expand Down Expand Up @@ -1047,7 +1047,7 @@ protected boolean isBelowCachingThreshold() {
return (_data.getInMemorySize() <= CACHING_THRESHOLD);
}

public static boolean isBelowCachingThreshold(CacheBlock data) {
public static boolean isBelowCachingThreshold(CacheBlock<?> data) {
boolean ret;
if (OptimizerUtils.isUMMEnabled())
ret = UnifiedMemoryManager.getCacheBlockSize(data) <= CACHING_THRESHOLD;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public enum RPolicy {
//maintenance service for synchronous or asynchronous delete of evicted files
private static CacheMaintenanceService _fClean;

public static int writeBlock(String fname, CacheBlock cb)
public static int writeBlock(String fname, CacheBlock<?> cb)
throws IOException
{
//obtain basic meta data of cache block
Expand Down Expand Up @@ -131,10 +131,10 @@ public static void deleteBlock(String fname)
_fClean.deleteFile(fname);
}

public static CacheBlock readBlock(String fname, boolean matrix)
public static CacheBlock<?> readBlock(String fname, boolean matrix)
throws IOException
{
CacheBlock cb = null;
CacheBlock<?> cb = null;
ByteBuffer ldata = null;

//probe write buffer
Expand Down Expand Up @@ -211,7 +211,7 @@ public static int getQueueSize() {
return _mQueue.size();
}

public static long getCacheBlockSize(CacheBlock cb) {
public static long getCacheBlockSize(CacheBlock<?> cb) {
return cb.isShallowSerialize() ?
cb.getInMemorySize() : cb.getExactSerializedSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,10 @@ public static long getUMMFree() {
}

// Reads a cached object. This is called from cacheabledata implementations
public static CacheBlock readBlock(String fname, boolean matrix)
public static CacheBlock<?> readBlock(String fname, boolean matrix)
throws IOException
{
CacheBlock cb = null;
CacheBlock<?> cb = null;
ByteBuffer ldata = null;

//probe write buffer
Expand Down Expand Up @@ -336,7 +336,7 @@ public static int makeSpace(long reqSpace) {
}

// Write an object to the cache
public static int writeBlock(String fname, CacheBlock cb)
public static int writeBlock(String fname, CacheBlock<?> cb)
throws IOException
{
//obtain basic metadata of the cache block
Expand Down Expand Up @@ -380,7 +380,7 @@ public static int writeBlock(String fname, CacheBlock cb)
return numEvicted;
}

public static long getCacheBlockSize(CacheBlock cb) {
public static long getCacheBlockSize(CacheBlock<?> cb) {
return cb.isShallowSerialize() ?
cb.getInMemorySize() : cb.getExactSerializedSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ public void setFrameOutput(String varName, FrameBlock outputData) {
setVariable(varName, fo);
}

public static CacheableData<?> createCacheableData(CacheBlock cb) {
public static CacheableData<?> createCacheableData(CacheBlock<?> cb) {
if( cb instanceof MatrixBlock )
return createMatrixObject((MatrixBlock) cb);
else if( cb instanceof FrameBlock )
Expand Down
Loading