Commit
[SYSTEMDS-3002] CLA CoCoding without Dictionaries
This commit makes the co-coding phase cheaper when many column
combinations are constructed.

For example, with BinaryMNIST and bin-packing co-coding, the execution
time of the co-coding phase is halved from 6 to 3 sec. This gives a
total compression time of 6 sec plus ~10 sec of IO. Also notable is
normal MNIST of 60 k values, whose compression time is now ~200 ms.

This change impacts the matrix-cost-based column co-coding the most,
reducing the overall compression time on BinaryMNIST from ~120 sec
to ~8 sec as well.

Closes #1296
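The core trick, visible in the AColumnCoCoder hunk further down, is to size a candidate column-group join from the two groups' existing statistics instead of extracting a joined dictionary: the true worst case is the product of the distinct-tuple counts, and when that product is already small, the sum of the counts serves as a cheap size estimate. A minimal sketch of that idea, using a hypothetical Group class in place of CompressedSizeInfoColGroup:

import java.util.Arrays;

// Hypothetical stand-in for CompressedSizeInfoColGroup: only the statistics
// needed to size a join without materializing a dictionary.
final class Group {
	final int[] columns; // column indexes covered by this group
	final int numVals;   // distinct value tuples observed

	Group(int[] columns, int numVals) {
		this.columns = columns;
		this.numVals = numVals;
	}

	/** True worst case for a join: every tuple pairing is distinct. */
	static long worstCaseJoinedSize(Group l, Group r) {
		return (long) l.numVals * r.numVals;
	}

	/**
	 * Cheap, dictionary-free size estimate, mirroring the guard this commit
	 * adds to joinWithoutAnalysis: null signals that the estimate overflowed
	 * or exceeded the row count, so the join should not be taken blindly.
	 */
	static Group joinWithoutAnalysis(Group l, Group r, int numRows) {
		int numVals = l.numVals + r.numVals;
		if(numVals < 0 || numVals > numRows)
			return null;
		int[] joined = Arrays.copyOf(l.columns, l.columns.length + r.columns.length);
		System.arraycopy(r.columns, 0, joined, l.columns.length, r.columns.length);
		Arrays.sort(joined);
		return new Group(joined, numVals);
	}
}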
Baunsgaard committed Jun 7, 2021
1 parent 1464d11 commit c59ad18
Showing 35 changed files with 861 additions and 310 deletions.
@@ -78,6 +78,7 @@
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.matrix.data.CTableMap;
import org.apache.sysds.runtime.matrix.data.LibMatrixBincell;
import org.apache.sysds.runtime.matrix.data.LibMatrixDatagen;
import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -492,14 +493,14 @@ public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, Matri
return out;

BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject());

MatrixBlock tmp = CLALibRightMultBy.rightMultByMatrix(this, v, null, k, true);
boolean allowOverlap = ConfigurationManager.getDMLConfig().getBooleanValue(DMLConfig.COMPRESSED_OVERLAPPING);
MatrixBlock tmp = CLALibRightMultBy.rightMultByMatrix(this, v, null, k, allowOverlap);

if(ctype == ChainType.XtwXv) {
// if(tmp instanceof CompressedMatrixBlock)
tmp = CLALibBinaryCellOp.binaryOperations(bop, (CompressedMatrixBlock) tmp, w, null);
// else
// LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
if(tmp instanceof CompressedMatrixBlock)
tmp = CLALibBinaryCellOp.binaryOperations(bop, (CompressedMatrixBlock) tmp, w, null);
else
LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
}

if(tmp instanceof CompressedMatrixBlock)
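The hunk above fixes chainMatrixMultOperations for the XtwXv chain, i.e. t(X) %*% (w * (X %*% v)): since overlapping compressed output can now be disabled via DMLConfig.COMPRESSED_OVERLAPPING, the intermediate X %*% v may come back uncompressed, so the weighting by w must branch on the actual type instead of casting unconditionally. For orientation, a dense plain-array sketch of the chain's semantics (illustrative only, not the SystemDS kernel):

// Dense reference semantics of the XtwXv chain: t(X) %*% (w * (X %*% v)).
// double[][] stands in for MatrixBlock; no compression involved.
static double[] chainXtwXv(double[][] X, double[] w, double[] v) {
	final int m = X.length, n = X[0].length;
	final double[] tmp = new double[m];
	for(int i = 0; i < m; i++) { // tmp = w * (X %*% v), element-wise
		double s = 0;
		for(int j = 0; j < n; j++)
			s += X[i][j] * v[j];
		tmp[i] = s * w[i];
	}
	final double[] out = new double[n];
	for(int i = 0; i < m; i++) // out = t(X) %*% tmp
		for(int j = 0; j < n; j++)
			out[j] += X[i][j] * tmp[i];
	return out;
}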
@@ -170,7 +170,9 @@ private void classifyPhase() {
}

private void coCodePhase(CompressedSizeEstimator sizeEstimator, CompressedSizeInfo sizeInfos, int numRows) {
coCodeColGroups = PlanningCoCoder.findCoCodesByPartitioning(sizeEstimator, sizeInfos, numRows, k, compSettings);
// for(int i = 0; i < 100000; i ++)
coCodeColGroups = PlanningCoCoder.findCoCodesByPartitioning(sizeEstimator, sizeInfos, numRows, k, compSettings);

_stats.estimatedSizeCoCoded = coCodeColGroups.memoryEstimate();
logPhase();
}
@@ -25,6 +25,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.PartitionerType;
import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
import org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory.EstimationType;

/**
* Compression Settings class, used as a bundle of parameters inside the Compression framework. See
@@ -99,10 +100,13 @@ public class CompressionSettings {
*/
public final int minimumSampleSize;

/** The sample type used for sampling */
public final EstimationType estimationType;

protected CompressionSettings(double samplingRatio, boolean allowSharedDictionary, String transposeInput,
boolean skipList, int seed, boolean lossy,
EnumSet<CompressionType> validCompressions, boolean sortValuesByLength, PartitionerType columnPartitioner,
int maxColGroupCoCode, double coCodePercentage, int minimumSampleSize) {
boolean skipList, int seed, boolean lossy, EnumSet<CompressionType> validCompressions,
boolean sortValuesByLength, PartitionerType columnPartitioner, int maxColGroupCoCode, double coCodePercentage,
int minimumSampleSize, EstimationType estimationType) {
this.samplingRatio = samplingRatio;
this.allowSharedDictionary = allowSharedDictionary;
this.transposeInput = transposeInput;
@@ -115,7 +119,9 @@ protected CompressionSettings(double samplingRatio, boolean allowSharedDictionar
this.maxColGroupCoCode = maxColGroupCoCode;
this.coCodePercentage = coCodePercentage;
this.minimumSampleSize = minimumSampleSize;
LOG.debug(this);
this.estimationType = estimationType;
if(LOG.isDebugEnabled())
LOG.debug(this);
}

@Override
@@ -131,6 +137,8 @@ public String toString() {
sb.append("\n Max Static ColGroup CoCode: " + maxColGroupCoCode);
sb.append("\n Max cocodePercentage: " + coCodePercentage);
sb.append("\n Sample Percentage: " + samplingRatio);
if(samplingRatio < 1.0)
sb.append("\n Estimation Type: " + estimationType);
// If needed for debugging add more fields to the printing.
return sb.toString();
}
@@ -26,6 +26,7 @@
import org.apache.sysds.runtime.DMLCompressionException;
import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.PartitionerType;
import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
import org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory.EstimationType;

/**
* Builder pattern for Compression Settings. See CompressionSettings for details on values.
@@ -43,6 +44,7 @@ public class CompressionSettingsBuilder {
private int maxColGroupCoCode = 10000;
private double coCodePercentage = 0.01;
private int minimumSampleSize = 2000;
private EstimationType estimationType = EstimationType.HassAndStokes;

private final static double defaultSampleRate = 0.01;

@@ -261,6 +263,11 @@ public CompressionSettingsBuilder setMinimumSampleSize(int minimumSampleSize) {
return this;
}

public CompressionSettingsBuilder setEstimationType(EstimationType estimationType){
this.estimationType = estimationType;
return this;
}

/**
* Create the CompressionSettings object to use in the compression.
*
@@ -269,6 +276,6 @@ public CompressionSettingsBuilder setMinimumSampleSize(int minimumSampleSize) {
public CompressionSettings create() {
return new CompressionSettings(samplingRatio, allowSharedDictionary, transposeInput, skipList, seed, lossy,
validCompressions, sortValuesByLength, columnPartitioner, maxColGroupCoCode, coCodePercentage,
minimumSampleSize);
minimumSampleSize, estimationType);
}
}
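A plausible use of the extended builder, assuming the import paths match the package layout implied by the surrounding hunks (the setEstimationType setter and create() are shown in this file; the no-arg constructor and default flow are assumptions):

import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
import org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory.EstimationType;

public class SettingsExample {
	public static void main(String[] args) {
		// Pick the distinct-count estimator used on sampled input; per the
		// new field above, HassAndStokes is already the default.
		CompressionSettings cs = new CompressionSettingsBuilder()
			.setEstimationType(EstimationType.HassAndStokes)
			.create();
		System.out.println(cs); // toString() reports the Estimation Type when samplingRatio < 1.0
	}
}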
@@ -25,6 +25,7 @@
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
import org.apache.sysds.runtime.compress.utils.Util;

public abstract class AColumnCoCoder {

@@ -54,15 +55,17 @@ protected CompressedSizeInfoColGroup join(CompressedSizeInfoColGroup lhs, Compre

protected CompressedSizeInfoColGroup joinWithAnalysis(CompressedSizeInfoColGroup lhs,
CompressedSizeInfoColGroup rhs) {
int[] joined = Util.join(lhs.getColumns(), rhs.getColumns());
return _est.estimateCompressedColGroupSize(joined,( lhs.getNumVals() + 1) * (rhs.getNumVals() + 1));
return _est.estimateJoinCompressedSize(lhs, rhs);
}

protected CompressedSizeInfoColGroup joinWithoutAnalysis(CompressedSizeInfoColGroup lhs,
CompressedSizeInfoColGroup rhs) {
int[] joined = Util.join(lhs.getColumns(), rhs.getColumns());
int numVals = lhs.getNumVals() + rhs.getNumVals();
return new CompressedSizeInfoColGroup(joined, numVals, _est.getNumRows());
if(numVals< 0 || numVals > _est.getNumRows())
return null;
else
return new CompressedSizeInfoColGroup(joined, numVals, _est.getNumRows());
}

protected CompressedSizeInfoColGroup analyze(CompressedSizeInfoColGroup g) {
@@ -61,47 +61,67 @@ protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k) {
}

private List<CompressedSizeInfoColGroup> join(List<CompressedSizeInfoColGroup> currentGroups) {

// return joinToSmallForAnalysis(currentGroups);
List<CompressedSizeInfoColGroup> filteredGroups = joinToSmallForAnalysis(currentGroups);
// return currentGroups;
Comparator<CompressedSizeInfoColGroup> comp = Comparator.comparing(CompressedSizeInfoColGroup::getNumVals);
Queue<CompressedSizeInfoColGroup> que = new PriorityQueue<>(currentGroups.size(), comp);
Queue<CompressedSizeInfoColGroup> que = new PriorityQueue<>(filteredGroups.size(), comp);
List<CompressedSizeInfoColGroup> ret = new ArrayList<>();

for(CompressedSizeInfoColGroup g : currentGroups)
que.add(g);

boolean finished = false;
while(!finished) {
if(que.peek() != null) {
CompressedSizeInfoColGroup l = que.poll();
if(que.peek() != null) {
CompressedSizeInfoColGroup r = que.poll();
int worstCaseJoinedSize = l.getNumVals() * r.getNumVals();
if(worstCaseJoinedSize < toSmallForAnalysis)
que.add(joinWithoutAnalysis(l, r));
else if(worstCaseJoinedSize < largestDistinct) {
CompressedSizeInfoColGroup g = joinWithAnalysis(l, r);
if(g.getNumVals() < largestDistinct)
que.add(joinWithAnalysis(l, r));
else {
ret.add(l);
que.add(r);
}
}
else {
ret.add(l);
que.add(r);
}
for(CompressedSizeInfoColGroup g : filteredGroups) {
if(g != null)
que.add(g);
}

CompressedSizeInfoColGroup l = que.poll();

while(que.peek() != null) {
CompressedSizeInfoColGroup r = que.peek();
int worstCaseJoinedSize = l.getNumVals() * r.getNumVals();
if(worstCaseJoinedSize < toSmallForAnalysis) {
que.poll();
que.add(joinWithoutAnalysis(l, r));
}
else if(worstCaseJoinedSize < largestDistinct) {
CompressedSizeInfoColGroup g = joinWithAnalysis(l, r);
if(g != null && g.getNumVals() < largestDistinct) {
que.poll();
que.add(g);
}
else
else
ret.add(l);
}
else
finished = true;
else
ret.add(l);

l = que.poll();
}

ret.add(l);

for(CompressedSizeInfoColGroup g : que)
ret.add(g);

return ret;
}

private List<CompressedSizeInfoColGroup> joinToSmallForAnalysis(List<CompressedSizeInfoColGroup> currentGroups) {
return currentGroups;
// List<CompressedSizeInfoColGroup> tmp = new ArrayList<>();
// int id = 0;
// while(id < currentGroups.size() - 1) {
// CompressedSizeInfoColGroup g1 = currentGroups.get(id);
// CompressedSizeInfoColGroup g2 = currentGroups.get(id + 1);
// if(g1.getNumVals() * g2.getNumVals() < toSmallForAnalysis) {
// tmp.add(joinWithoutAnalysis(g1, g2));
// }
// else {
// tmp.add(g1);
// tmp.add(g2);
// }
// id += 2;

// }
// return tmp;
}
}
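The rewritten loop is a greedy pass over a min-heap ordered by distinct-tuple count: the two currently cheapest groups are considered for a join, joined without analysis when the worst-case product is tiny, analyzed when it is moderate, and emitted unjoined otherwise. A self-contained sketch of that control flow over plain integers (each Integer standing in for a group's getNumVals(); the threshold values are hypothetical, the real ones are fields of the co-coder):

import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class GreedyJoinSketch {
	static final int TO_SMALL_FOR_ANALYSIS = 64;
	static final int LARGEST_DISTINCT = 4096;

	static List<Integer> join(List<Integer> groups) {
		PriorityQueue<Integer> que = new PriorityQueue<>(groups);
		List<Integer> ret = new ArrayList<>();
		Integer l = que.poll();
		while(que.peek() != null) {
			int r = que.peek();
			long worstCase = (long) l * r; // joined group holds at most l*r tuples
			if(worstCase < TO_SMALL_FOR_ANALYSIS) {
				que.poll();     // consume r
				que.add(l + r); // cheap dictionary-free estimate, no analysis
			}
			else if(worstCase < LARGEST_DISTINCT) {
				que.poll();     // consume r; stands in for joinWithAnalysis(l, r)
				que.add((int) worstCase);
			}
			else
				ret.add(l);     // pairing even the two cheapest groups is too big
			l = que.poll();
		}
		if(l != null)
			ret.add(l);         // last group left over
		return ret;
	}

	public static void main(String[] args) {
		// [2, 3, 5, 7, 100] joins the small groups and emits 100 alone
		System.out.println(join(List.of(2, 3, 5, 7, 100)));
	}
}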
@@ -103,17 +103,22 @@ private class CostOfJoin implements Comparable<CostOfJoin> {

protected CostOfJoin(CompressedSizeInfoColGroup elm) {
this.elm = elm;
if(elm == null) {
this.cost = Double.POSITIVE_INFINITY;
}
else {

final int nCols = elm.getColumns().length;
final double nRows = _est.getNumRows();
final double preAggregateCost = nRows;
final int nCols = elm.getColumns().length;
final double nRows = _est.getNumRows();
final double preAggregateCost = nRows;

final int numberTuples = elm.getNumVals();
final double tupleSparsity = elm.getTupleSparsity();
final double postScalingCost = (nCols > 1 && tupleSparsity > 0.4) ? numberTuples * nCols : numberTuples *
nCols * tupleSparsity;
final int numberTuples = elm.getNumVals();
final double tupleSparsity = elm.getTupleSparsity();
final double postScalingCost = (nCols > 1 && tupleSparsity > 0.4) ? numberTuples *
nCols : numberTuples * nCols * tupleSparsity;

this.cost = preAggregateCost + postScalingCost;
this.cost = preAggregateCost + postScalingCost;
}
}

@Override
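The cost model above prices a multiply against a group as one pre-aggregation pass over the rows plus a post-scaling over the dictionary, and — new in this commit — prices a failed (null) join candidate at positive infinity so it sorts last. A standalone restatement of the formula from the hunk:

// Restatement of the CostOfJoin pricing from the hunk above.
static double costOfJoin(int nCols, double nRows, int numberTuples, double tupleSparsity) {
	final double preAggregateCost = nRows; // one pre-aggregation pass over all rows
	// Multi-column, dense-ish dictionaries pay the full tuples-times-columns
	// cost; otherwise post-scaling is discounted by tuple sparsity.
	final double postScalingCost = (nCols > 1 && tupleSparsity > 0.4)
		? numberTuples * nCols
		: numberTuples * nCols * tupleSparsity;
	return preAggregateCost + postScalingCost;
}

For instance, 1,000,000 rows, 3 columns, and 500 tuples at tuple sparsity 0.9 price at 1,000,000 + 500 * 3 = 1,001,500.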
@@ -32,6 +32,7 @@
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
import org.apache.sysds.runtime.compress.utils.Util;

public class PlanningCoCoder {

@@ -132,7 +133,10 @@ private static CompressedSizeInfo getCoCodingGroupsBruteForce(CompressedSizeInfo
List<CompressedSizeInfoColGroup> finalGroups = new ArrayList<>();
// For each bin of columns that is allowed to potentially cocode.
for(CompressedSizeInfoColGroup bin : bins.getInfo()) {
if(bin.getColumns().length == 1)
final int len = bin.getColumns().length;
if(len == 0)
continue;
else if(len == 1)
// early termination
finalGroups.add(bin);
else
@@ -248,7 +252,7 @@ else if(rightConst)
g = CompressedSizeInfoColGroup.addConstGroup(c, left, cs.validCompressions);
else {
st3++;
g = est.estimateCompressedColGroupSize(c);
g = est.estimateJoinCompressedSize(left, right);
}

if(leftConst || rightConst)
@@ -200,7 +200,7 @@ private static AColGroup compressColGroupForced(MatrixBlock in, int[] colIndexes
CompressedSizeEstimator estimator = new CompressedSizeEstimatorExact(in, compSettings);

CompressedSizeInfoColGroup sizeInfo = new CompressedSizeInfoColGroup(
estimator.estimateCompressedColGroupSize(ubm, colIndexes), compSettings.validCompressions);
estimator.estimateCompressedColGroupSize(ubm, colIndexes), compSettings.validCompressions, ubm);

int numRows = compSettings.transposed ? in.getNumColumns() : in.getNumRows();
return compress(colIndexes, numRows, ubm, sizeInfo.getBestCompressionType(compSettings), compSettings, in,
@@ -285,8 +285,8 @@ private static AColGroup compressColGroupForced(MatrixBlock in, int[] colIndexes
public static AColGroup compress(int[] colIndexes, int rlen, ABitmap ubm, CompressionType compType,
CompressionSettings cs, MatrixBlock rawMatrixBlock, double tupleSparsity) {

if(compType == CompressionType.UNCOMPRESSED && PartitionerType.isCostBased(cs.columnPartitioner))
compType = CompressionType.DDC;
// if(compType == CompressionType.UNCOMPRESSED && PartitionerType.isCostBased(cs.columnPartitioner))
// compType = CompressionType.DDC;

final IntArrayList[] of = ubm.getOffsetList();

@@ -411,7 +411,7 @@ private static AColGroup setupSingleValueSDCColGroup(int[] colIndexes, int numRo

private static AColGroup compressDDC(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs,
double tupleSparsity) {
boolean zeros = ubm.getNumOffsets() < (long) rlen;
boolean zeros = ubm.getNumOffsets() < rlen;
ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity, zeros);
AMapToData data = MapToFactory.create(rlen, zeros, ubm.getOffsetList());
return new ColGroupDDC(colIndexes, rlen, dict, data, null);
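For orientation, a DDC (dense dictionary coded) group stores one dictionary tuple per distinct row-pattern plus a per-row map into the dictionary; the zeros flag above signals that some rows never appear in the bitmap's offset lists and therefore need an all-zero tuple. A toy decompression sketch under that assumed layout (not the SystemDS storage format):

// Toy DDC decompression: dict holds nVals tuples of nCols values each,
// map[i] selects the dictionary tuple for row i. For intuition only.
static double[][] decompressDDC(double[] dict, int[] map, int nCols) {
	double[][] out = new double[map.length][nCols];
	for(int i = 0; i < map.length; i++)
		for(int j = 0; j < nCols; j++)
			out[i][j] = dict[map[i] * nCols + j];
	return out;
}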
@@ -29,6 +29,20 @@ public abstract class AMapToData {

protected static final Log LOG = LogFactory.getLog(AMapToData.class.getName());

private int nUnique;

protected AMapToData(int nUnique) {
this.nUnique = nUnique;
}

public int getUnique() {
return nUnique;
}

protected void setUnique(int nUnique){
this.nUnique = nUnique;
}

public abstract int getIndex(int n);

public abstract void set(int n, int v);
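The new nUnique bookkeeping lets a mapping report how many dictionary entries it indexes without consulting the dictionary itself. A minimal hypothetical subclass backed by a plain int[] illustrates the contract, assuming only the members shown above are abstract (the import of AMapToData and its package are assumed; the real MapToFactory presumably picks more compact per-row encodings):

// Hypothetical dense int-backed implementation of the AMapToData contract.
final class MapToIntSketch extends AMapToData {
	private final int[] data; // per-row index into the group's dictionary

	MapToIntSketch(int nUnique, int nRows) {
		super(nUnique); // record how many dictionary entries are indexed
		this.data = new int[nRows];
	}

	@Override
	public int getIndex(int n) {
		return data[n];
	}

	@Override
	public void set(int n, int v) {
		data[n] = v;
	}
}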
