Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,10 @@ private static void injectCompressionDirective(Hop hop, CompressConfig compress,
}

private static boolean satisfiesSizeConstraintsForCompression(Hop hop) {
return hop.getDim2() >= 1 &&
((hop.getDim1() >= 1000 && hop.getDim2() < 100) || hop.getDim1() / hop.getDim2() >= 1000);
if(hop.getDim2() >= 1) {
return (hop.getDim1() >= 1000 && hop.getDim2() < 100) || hop.getDim1() / hop.getDim2() >= 75;
}
return false;
}

private static boolean satisfiesCompressionCondition(Hop hop) {
Expand Down Expand Up @@ -191,8 +193,11 @@ private static boolean satisfiesAutoCompressionCondition(Hop hop, DMLProgram pro
}

private static boolean satisfiesCostCompressionCondition(Hop hop, DMLProgram prog) {
return satisfiesAggressiveCompressionCondition(hop) && hop.dimsKnown(false) &&
analyseProgram(hop, prog).isValidAggressiveCompression();
boolean satisfies = true;
satisfies &= satisfiesAggressiveCompressionCondition(hop);
satisfies &= hop.dimsKnown(false);
satisfies &= analyseProgram(hop, prog).isValidAggressiveCompression();
return satisfies;

}

Expand All @@ -210,7 +215,7 @@ private static class ProbeStatus {
private int numberCompressedOpsExecuted = 0;
private int numberDecompressedOpsExecuted = 0;
private int inefficientSupportedOpsExecuted = 0;
private int superEfficientSuportedOpsExecuted = 0;
// private int superEfficientSupportedOpsExecuted = 0;

private boolean foundStart = false;
private boolean usedInLoop = false;
Expand Down Expand Up @@ -363,7 +368,6 @@ private void handleFunctionOps(Hop current) {
private void handleApplicableOps(Hop current) {
// Valid with uncompressed outputs
boolean compUCOut = false;
LOG.error(current);
// // tsmm
// compUCOut |= (current instanceof AggBinaryOp && current.getDim2() <= current.getBlocksize() &&
// ((AggBinaryOp) current).checkTransposeSelf() == MMTSJType.LEFT);
Expand Down Expand Up @@ -403,7 +407,7 @@ private void handleApplicableOps(Hop current) {
boolean metaOp = HopRewriteUtils.isUnary(current, OpOp1.NROW, OpOp1.NCOL);
boolean ctableOp = HopRewriteUtils.isTernary(current, OpOp3.CTABLE);

if(ctableOp){
if(ctableOp) {
numberCompressedOpsExecuted += 4;
compCOut = true;
}
Expand Down Expand Up @@ -431,7 +435,7 @@ private boolean isValidAggressiveCompression() {
if(LOG.isDebugEnabled())
LOG.debug(this.toString());
return (inefficientSupportedOpsExecuted < numberCompressedOpsExecuted) &&
(usedInLoop || numberCompressedOpsExecuted > 3) && numberDecompressedOpsExecuted < 1;
(usedInLoop || numberCompressedOpsExecuted > 3) && numberDecompressedOpsExecuted < 1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,35 +205,21 @@ public MatrixBlock decompress() {

// preallocation sparse rows to avoid repeated reallocations
MatrixBlock ret = new MatrixBlock(rlen, clen, false, -1);

ret.allocateDenseBlock();
// (nonZeros == -1) ?
// .allocateBlock() : new MatrixBlock(rlen, clen, sparse,
// nonZeros).allocateBlock();

// if(ret.isInSparseFormat()) {
// int[] rnnz = new int[rlen];
// // for(ColGroup grp : _colGroups)
// // grp.countNonZerosPerRow(rnnz, 0, rlen);
// ret.allocateSparseRowsBlock();
// SparseBlock rows = ret.getSparseBlock();
// for(int i = 0; i < rlen; i++)
// rows.allocate(i, rnnz[i]);
// }
// todo Add sparse decompress.

// core decompression (append if sparse)
for(AColGroup grp : _colGroups)
grp.decompressToBlockUnSafe(ret, 0, rlen, 0, grp.getValues());

// post-processing (for append in decompress)
if(ret.getNonZeros() == -1 || nonZeros == -1) {
ret.recomputeNonZeros();
}
else {
ret.setNonZeros(nonZeros);
}
if(ret.isInSparseFormat())
ret.sortSparseRows();

if(nonZeros == -1)
ret.setNonZeros(this.recomputeNonZeros());
else
ret.setNonZeros(nonZeros);

if(DMLScript.STATISTICS || LOG.isDebugEnabled()) {
double t = time.stop();
LOG.debug("decompressed block w/ k=" + 1 + " in " + t + "ms.");
Expand All @@ -256,9 +242,12 @@ public MatrixBlock decompress(int k) {
Timing time = new Timing(true);

MatrixBlock ret = new MatrixBlock(rlen, clen, false, -1).allocateBlock();
ret.allocateDenseBlock();
if(nonZeros == -1)
ret.setNonZeros(this.recomputeNonZeros());
else
ret.setNonZeros(nonZeros);

nonZeros = 0;
boolean overlapping = isOverlapping();
try {
ExecutorService pool = CommonThreadPool.get(k);
int rlen = getNumRows();
Expand All @@ -268,37 +257,47 @@ public MatrixBlock decompress(int k) {
ArrayList<DecompressTask> tasks = new ArrayList<>();
for(int i = 0; i < k & i * blklen < getNumRows(); i++)
tasks.add(
new DecompressTask(_colGroups, ret, i * blklen, Math.min((i + 1) * blklen, rlen), overlapping));
new DecompressTask(_colGroups, ret, i * blklen, Math.min((i + 1) * blklen, rlen), overlappingColGroups));
List<Future<Long>> rtasks = pool.invokeAll(tasks);
pool.shutdown();
for(Future<Long> rt : rtasks)
nonZeros += rt.get(); // error handling
rt.get(); // error handling
}
catch(InterruptedException | ExecutionException ex) {
LOG.error("Parallel decompression failed defaulting to non parallel implementation " + ex.getMessage());
nonZeros = -1;
ex.printStackTrace();
return decompress();
}
if(overlapping) {
ret.recomputeNonZeros();
}
else {
ret.setNonZeros(nonZeros);
}

if(DMLScript.STATISTICS || LOG.isDebugEnabled()) {
double t = time.stop();
LOG.debug("decompressed block w/ k=" + k + " in " + time.stop() + "ms.");
DMLCompressionStatistics.addDecompressTime(t, k);
}

return ret;
}

public CompressedMatrixBlock squash(int k) {
return CLALibSquash.squash(this, k);
}

@Override
public long recomputeNonZeros() {
if(overlappingColGroups) {
nonZeros = clen * rlen;
}
else {
long nnz = 0;
for(AColGroup g : _colGroups) {
nnz += g.getNumberNonZeros();
}
nonZeros = nnz;
}
return nonZeros;

}

/**
* Obtain an upper bound on the memory used to store the compressed block.
*
Expand Down Expand Up @@ -497,6 +496,7 @@ else if(ctype == ChainType.XtwXv)
CLALibLeftMultBy.leftMultByMatrixTransposed(this, tmp, out, k);
out = LibMatrixReorg.transposeInPlace(out, k);

out.recomputeNonZeros();
return out;
}

Expand Down Expand Up @@ -811,7 +811,7 @@ else if(cl == 0 && cu == getNumColumns() - 1) {
tmp = new MatrixBlock(ru + 1 - rl, getNumColumns(), false).allocateDenseBlock();
for(AColGroup g : getColGroups())
g.decompressToBlock(tmp, rl, ru + 1, 0);

tmp.recomputeNonZeros();
return tmp;
}
else {
Expand All @@ -825,6 +825,7 @@ else if(cl == 0 && cu == getNumColumns() - 1) {
// this is fine.
tmp = tmp.slice(rl, ru, 0, tmp.getNumColumns() - 1, ret);
}
tmp.recomputeNonZeros();
ret = tmp;
return tmp;
}
Expand Down Expand Up @@ -1234,7 +1235,7 @@ public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, Well1024a b
public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, Well1024a bigrand, long bSeed, int k) {
throw new DMLRuntimeException("CompressedMatrixBlock: randOperationsInPlace not supported.");
}

@Override
public MatrixBlock seqOperationsInPlace(double from, double to, double incr) {
// output should always be uncompressed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ private Pair<MatrixBlock, CompressionStatistics> compressMatrix() {
if(res == null)
return abortCompression();

res.recomputeNonZeros();
return new ImmutablePair<>(res, _stats);
}

Expand Down Expand Up @@ -190,12 +191,15 @@ private void cleanupPhase() {
mb.cleanupBlock(true, true);

_stats.size = res.estimateCompressedSizeInMemory();
_stats.originalSize = mb.estimateSizeInMemory();
_stats.originalSize = original.estimateSizeInMemory();
_stats.denseSize = MatrixBlock.estimateSizeInMemory(original.getNumRows(), original.getNumColumns(), 1.0);
_stats.ratio = _stats.originalSize / (double) _stats.size;

if(_stats.ratio < 1) {
LOG.info("--compressed size: " + _stats.size);
LOG.info("--compression ratio: " + _stats.ratio);
LOG.info("--dense size: " + _stats.denseSize);
LOG.info("--original size: " + _stats.originalSize);
LOG.info("--compressed size: " + _stats.size);
LOG.info("--compression ratio: " + _stats.ratio );
LOG.info("Abort block compression because compression ratio is less than 1.");
res = null;
return;
Expand Down Expand Up @@ -239,8 +243,10 @@ private void logPhase() {
LOG.debug("--compression phase " + phase + " Cleanup : " + _stats.getLastTimePhase());
LOG.debug("--col groups types " + _stats.getGroupsTypesString());
LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
LOG.debug("--compressed size: " + _stats.size);
LOG.debug("--compression ratio: " + _stats.ratio);
LOG.debug("--dense size: " + _stats.denseSize);
LOG.debug("--original size: " + _stats.originalSize);
LOG.debug("--compressed size: " + _stats.size);
LOG.debug("--compression ratio: " + _stats.ratio );
int[] lengths = new int[res.getColGroups().size()];
int i = 0;
for(AColGroup colGroup : res.getColGroups()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class CompressionStatistics {
private double lastPhase;
public double ratio;
public long originalSize;
public long denseSize;
public long estimatedSizeColGroups;
public long estimatedSizeCols;
public long size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,8 @@ public boolean isDense() {

public abstract boolean containsValue(double pattern);

public abstract long getNumberNonZeros();

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,6 @@ public static ADictionary read(DataInput in, boolean lossy) throws IOException {
public abstract ADictionary reExpandColumns(int max);

public abstract boolean containsValue(double pattern);

public abstract long getNumberNonZeros(int[] counts, int nCol);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ protected ColGroupEmpty() {
super();
}


/**
* Constructs an Constant Colum Group, that contains only one tuple, with the given value.
*
Expand All @@ -49,6 +50,14 @@ public ColGroupEmpty(int[] colIndices, int numRows) {
_zeros = true;
}

public static ColGroupEmpty generate(int nCol, int nRow){
int[] cols = new int[nCol];
for(int i =0; i < nCol; i++){
cols[i] =i;
}
return new ColGroupEmpty(cols,nRow);
}

@Override
public int[] getCounts(int[] out) {
// nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,4 +543,9 @@ public AColGroup copy() {
public boolean containsValue(double pattern){
return _data.containsValue(pattern);
}

@Override
public long getNumberNonZeros(){
return _data.getNonZeros();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1030,4 +1030,10 @@ public void leftMultBySelfDiagonalColGroup(double[] result, int numColumns) {
public boolean containsValue(double pattern){
return _dict.containsValue(pattern);
}

@Override
public long getNumberNonZeros(){
int[] counts = getCounts();
return _dict.getNumberNonZeros(counts, _colIndexes.length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -387,4 +387,20 @@ public boolean containsValue(double pattern) {

return false;
}

@Override
public long getNumberNonZeros(int[] counts, int nCol){
long nnz = 0;
final int nRow = _values.length / nCol;
for(int i = 0; i < nRow; i++){
long rowCount = 0;
final int off = i * nCol;
for(int j = off; j < off + nCol; j++){
if(_values[j] != 0)
rowCount ++;
}
nnz += rowCount * counts[i];
}
return nnz;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -452,4 +452,20 @@ public boolean containsValue(double pattern){
return false;
throw new NotImplementedException("Not contains value on Q Dictionary");
}

@Override
public long getNumberNonZeros(int[] counts, int nCol){
long nnz = 0;
final int nRow = _values.length / nCol;
for(int i = 0; i < nRow; i++){
long rowCount = 0;
final int off = i * nCol;
for(int j = off; j < off + nCol; j++){
if(_values[j] != 0)
rowCount ++;
}
nnz += rowCount * counts[i];
}
return nnz;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private static Long getCompressionSize(CompressionType ct, EstimationFactors fac
case UNCOMPRESSED:
return ColGroupSizes.estimateInMemorySizeUncompressed(fact.numRows,
fact.numCols,
((double) fact.numVals / (fact.numRows * fact.numCols)));
((double) fact.numOffs / (fact.numRows * fact.numCols)));
case SDC:
if(fact.numOffs == 1)
return ColGroupSizes.estimateInMemorySizeSDCSingle(fact.numCols,
Expand Down
Loading