Skip to content

Commit

Permalink
[SYSTEMDS-3226] PFOR column group
Browse files Browse the repository at this point in the history
start of ColGroupPFOR

progress on PFOR now with many of the aggregates

fix docs

1960 tests to go.

1790 tests left

1284 left ... but added 644 failures

compression updates

7 errors, 8 failures left

7 errors left

Fixed all component tests

move morphing part out

SDC 10x faster left

getting close

40 errors left
  • Loading branch information
Baunsgaard committed Dec 1, 2021
1 parent bb155bc commit 419cec9
Show file tree
Hide file tree
Showing 57 changed files with 5,194 additions and 1,674 deletions.
Expand Up @@ -55,6 +55,7 @@
import org.apache.sysds.runtime.compress.lib.CLALibReExpand;
import org.apache.sysds.runtime.compress.lib.CLALibRightMultBy;
import org.apache.sysds.runtime.compress.lib.CLALibScalar;
import org.apache.sysds.runtime.compress.lib.CLALibSlice;
import org.apache.sysds.runtime.compress.lib.CLALibSquash;
import org.apache.sysds.runtime.compress.lib.CLALibUnary;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
Expand Down Expand Up @@ -691,61 +692,14 @@ public void setOverlapping(boolean overlapping) {
@Override
public MatrixBlock slice(int rl, int ru, int cl, int cu, boolean deep, CacheBlock ret) {
validateSliceArgument(rl, ru, cl, cu);
MatrixBlock tmp;
if(rl == ru && cl == cu) {
// get a single index, and return in a matrixBlock
tmp = new MatrixBlock(1, 1, 0);
tmp.appendValue(0, 0, getValue(rl, cl));
return tmp;
}
else if(rl == 0 && ru == getNumRows() - 1) {
tmp = sliceColumns(cl, cu);
tmp.recomputeNonZeros();
return tmp;
}
else if(cl == 0 && cu == getNumColumns() - 1) {
// Row Slice. Potential optimization if the slice contains enough rows.
// +1 since the implementation arguments for slice is inclusive values for ru
// and cu. It is not inclusive in decompression, and construction of MatrixBlock.
tmp = new MatrixBlock(ru + 1 - rl, getNumColumns(), false).allocateDenseBlock();
for(AColGroup g : getColGroups())
g.decompressToBlock(tmp, rl, ru + 1, -rl, 0);
tmp.recomputeNonZeros();
tmp.examSparsity();
return tmp;
}
else {
// In the case where an internal matrix is sliced out, then first slice out the
// columns to an compressed intermediate.
tmp = sliceColumns(cl, cu);
// Then call slice recursively, to do the row slice.
// Since we do not copy the index structure but simply maintain a pointer to the
// original this is fine.
tmp = tmp.slice(rl, ru, 0, tmp.getNumColumns() - 1, ret);
return tmp;
}
}

private CompressedMatrixBlock sliceColumns(int cl, int cu) {
CompressedMatrixBlock ret = new CompressedMatrixBlock(this.getNumRows(), cu + 1 - cl);
List<AColGroup> newColGroups = new ArrayList<>();
for(AColGroup grp : getColGroups()) {
AColGroup slice = grp.sliceColumns(cl, cu + 1);
if(slice != null)
newColGroups.add(slice);
}
ret.allocateColGroupList(newColGroups);
ret.recomputeNonZeros();
ret.overlappingColGroups = this.isOverlapping();
return ret;
return CLALibSlice.slice(this, rl, ru, cl, cu, deep);
}

@Override
public void slice(ArrayList<IndexedMatrixValue> outlist, IndexRange range, int rowCut, int colCut, int blen,
int boundaryRlen, int boundaryClen) {
printDecompressWarning(
MatrixBlock tmp = getUncompressed(
"slice for distribution to spark. (Could be implemented such that it does not decompress)");
MatrixBlock tmp = getUncompressed();
tmp.slice(outlist, range, rowCut, colCut, blen, boundaryRlen, boundaryClen);
}

Expand Down
Expand Up @@ -250,10 +250,6 @@ else if(mb.isEmpty()) {
if(res == null)
return abortCompression();

if(compSettings.isInSparkInstruction) {
// clear soft reference to uncompressed block in case of spark.
res.clearSoftReferenceToDecompressed();
}
return new ImmutablePair<>(res, _stats);
}

Expand Down
Expand Up @@ -48,7 +48,7 @@ public abstract class AColGroup implements Serializable {

/** Public super types of compression ColGroups supported */
public enum CompressionType {
UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC
UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, PFOR,
}

/**
Expand All @@ -57,7 +57,7 @@ public enum CompressionType {
* Protected such that outside the ColGroup package it should be unknown which specific subtype is used.
*/
protected enum ColGroupType {
UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros;
UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, PFOR;
}

/** The ColGroup Indexes contained in the ColGroup */
Expand Down Expand Up @@ -132,14 +132,27 @@ public long estimateInMemorySize() {
}

/**
* Decompress the contents of the column group into the target matrix,.
* Decompress a range of rows into a sparse block
*
* @param target A matrix block where the columns covered by this column group have not yet been filled in.
* @param rl Row to start decompression from
* @param ru Row to end decompression at (not inclusive)
* Note that this is using append, so the sparse column indexes need to be sorted afterwards.
*
* @param sb Sparse Target block
* @param rl Row to start at
* @param ru Row to end at
*/
public final void decompressToSparseBlock(SparseBlock sb, int rl, int ru) {
	// Convenience overload: decompress rows [rl, ru) into sb with zero
	// row/column offsets (delegates to the 5-argument variant).
	decompressToSparseBlock(sb, rl, ru, 0, 0);
}

/**
* Decompress a range of rows into a dense block
*
* @param db Sparse Target block
* @param rl Row to start at
* @param ru Row to end at
*/
public final void decompressToBlock(MatrixBlock target, int rl, int ru) {
decompressToBlock(target, rl, ru, 0, 0);
public final void decompressToDenseBlock(DenseBlock db, int rl, int ru) {
decompressToDenseBlock(db, rl, ru, 0, 0);
}

/**
Expand Down Expand Up @@ -326,33 +339,29 @@ public double get(int r, int c) {
protected abstract ColGroupType getColGroupType();

/**
* Decompress the contents of the column group without counting non zeros
* Decompress into the DenseBlock. (no NNZ handling)
*
* The offsets helps us decompress into specific target areas of the output matrix.
*
* If OffR and OffC is 0, then decompression output starts at row offset equal to rl,
* @param db Target DenseBlock
* @param rl Row to start decompression from
* @param ru Row to end decompression at
* @param offR Row offset into the target to decompress
* @param offC Column offset into the target to decompress
*/
public abstract void decompressToDenseBlock(DenseBlock db, int rl, int ru, int offR, int offC);

/**
* Decompress into the SparseBlock. (no NNZ handling)
*
* If for instance a MiniBatch of rows 10 to 15, then target would be 5 rows high and arguments would look like:
*
* cg.decompressToBlock(target, 10, 15, -10, 0)
* Note this method is allowing to calls to append since it is assumed that the sparse column indexes are sorted
* afterwards
*
* @param target a matrix block where the columns covered by this column group have not yet been filled in.
* @param rl Row to start decompression at.
* @param ru Row to end decompression at (not inclusive).
* @param offR RowOffset into target to assign from.
* @param offC ColumnOffset into the target matrix to assign from.
* @param sb Target SparseBlock
* @param rl Row to start decompression from
* @param ru Row to end decompression at
* @param offR Row offset into the target to decompress
* @param offC Column offset into the target to decompress
*/
public final void decompressToBlock(MatrixBlock target, int rl, int ru, int offR, int offC){
	// Dispatch on the target's storage layout: dense targets take the
	// DenseBlock path, sparse targets the append-based SparseBlock path.
	if(!target.isInSparseFormat())
		decompressToDenseBlock(target.getDenseBlock(), rl, ru, offR, offC);
	else
		decompressToSparseBlock(target.getSparseBlock(), rl, ru, offR, offC);
}


protected abstract void decompressToDenseBlock(DenseBlock db, int rl, int ru,int offR, int offC);

protected abstract void decompressToSparseBlock(SparseBlock sb, int rl, int ru, int offR, int offC);
public abstract void decompressToSparseBlock(SparseBlock sb, int rl, int ru, int offR, int offC);

/**
* Right matrix multiplication with this column group.
Expand Down
Expand Up @@ -55,11 +55,15 @@ protected AColGroupCompressed(int[] colIndices) {

protected abstract void computeColMxx(double[] c, Builtin builtin);

protected abstract void computeSum(double[] c, int nRows, boolean square);
protected abstract void computeSum(double[] c, int nRows);

protected abstract void computeRowSums(double[] c, boolean square, int rl, int ru);
protected abstract void computeRowSums(double[] c, int rl, int ru);

protected abstract void computeColSums(double[] c, int nRows, boolean square);
protected abstract void computeSumSq(double[] c, int nRows);

protected abstract void computeRowSumsSq(double[] c, int rl, int ru);

protected abstract void computeColSumsSq(double[] c, int nRows);

protected abstract void computeRowMxx(double[] c, Builtin builtin, int rl, int ru);

Expand All @@ -79,22 +83,27 @@ public double getMax() {
return computeMxx(Double.NEGATIVE_INFINITY, Builtin.getBuiltinFnObject(BuiltinCode.MAX));
}

@Override
public void computeColSums(double[] c, int nRows) {
	// Plain (non-squared) column sums: square=false selects the
	// regular-sum variant of the protected overload.
	computeColSums(c, nRows, false);
}

@Override
public final void unaryAggregateOperations(AggregateUnaryOperator op, double[] c, int nRows, int rl, int ru) {
final ValueFunction fn = op.aggOp.increOp.fn;
if(fn instanceof Plus || fn instanceof KahanPlus || fn instanceof KahanPlusSq) {
boolean square = fn instanceof KahanPlusSq;
if(op.indexFn instanceof ReduceAll)
computeSum(c, nRows, square);
else if(op.indexFn instanceof ReduceCol)
computeRowSums(c, square, rl, ru);
else if(op.indexFn instanceof ReduceRow)
computeColSums(c, nRows, square);
if(square){
if(op.indexFn instanceof ReduceAll)
computeSumSq(c, nRows);
else if(op.indexFn instanceof ReduceCol)
computeRowSumsSq(c, rl, ru);
else if(op.indexFn instanceof ReduceRow)
computeColSumsSq(c, nRows);
}
else{
if(op.indexFn instanceof ReduceAll)
computeSum(c, nRows);
else if(op.indexFn instanceof ReduceCol)
computeRowSums(c, rl, ru);
else if(op.indexFn instanceof ReduceRow)
computeColSums(c, nRows);
}
}
else if(fn instanceof Multiply) {
if(op.indexFn instanceof ReduceAll)
Expand Down
Expand Up @@ -38,7 +38,6 @@
import org.apache.sysds.runtime.functionobjects.Builtin;
import org.apache.sysds.runtime.matrix.data.LibMatrixMult;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;

/**
* Base class for column groups encoded with value dictionary. This include column groups such as DDC OLE and RLE.
Expand Down Expand Up @@ -171,7 +170,7 @@ protected abstract void decompressToSparseBlockDenseDictionary(SparseBlock ret,
double[] values);

@Override
public final int getNumValues() {
public int getNumValues() {
return _dict.getNumberOfValues(_colIndexes.length);
}

Expand Down Expand Up @@ -286,56 +285,21 @@ private double[] rightMMPreAggSparse(int numVals, SparseBlock b, int[] aggregate
}

@Override
protected final double computeMxx(double c, Builtin builtin) {
protected double computeMxx(double c, Builtin builtin) {
if(_zeros)
c = builtin.execute(c, 0);
return _dict.aggregate(c, builtin);

}

@Override
protected final void computeColMxx(double[] c, Builtin builtin) {
protected void computeColMxx(double[] c, Builtin builtin) {
if(_zeros)
for(int x = 0; x < _colIndexes.length; x++)
c[_colIndexes[x]] = builtin.execute(c[_colIndexes[x]], 0);

_dict.aggregateCols(c, builtin, _colIndexes);
}

/**
 * Method for use by subclasses. Applies a scalar operation to the value metadata stored in the dictionary.
 *
 * The dictionary is cloned before the in-place operation, so the shared
 * dictionary held by this column group is never modified.
 *
 * @param op scalar operation to perform
 * @return transformed copy of value metadata for this column group
 */
protected final ADictionary applyScalarOp(ScalarOperator op) {
	return _dict.clone().inplaceScalarOp(op);
}

/**
 * Method for use by subclasses. Applies a scalar operation to the value metadata stored in the dictionary. This
 * specific method is used in cases where a new entry is to be added in the dictionary.
 *
 * Method should only be called if the newVal is not 0! Also the newVal should already have the operator applied.
 *
 * @param op      The Operator to apply to the underlying data.
 * @param newVal  The new Value to append to the underlying data.
 * @param numCols The number of columns in the ColGroup, to specify how many copies of the newVal should be appended.
 * @return The new Dictionary containing the values.
 */
protected final ADictionary applyScalarOp(ScalarOperator op, double newVal, int numCols) {
	return _dict.applyScalarOp(op, newVal, numCols);
}

/**
 * Allocate a double vector of the given length.
 *
 * The reset flag is ignored: Java zero-initializes new arrays, so the
 * returned vector is always all zeros regardless of reset.
 *
 * @param len   Length of the vector to allocate
 * @param reset Unused; new arrays are already zeroed
 * @return A freshly allocated, zero-filled double array of size len
 */
protected static double[] allocDVector(int len, boolean reset) {
	return new double[len];
}

/**
 * Allocate an int vector of length len + 1.
 *
 * Deprecated helper (logs an error on every call). The extra slot in the
 * returned array is a historical calling convention — NOTE(review): confirm
 * which callers rely on the len + 1 size before removing.
 *
 * @param len   Requested length; the returned array has len + 1 entries
 * @param reset Unused; new arrays are already zeroed
 * @return A freshly allocated, zero-filled int array of size len + 1
 */
protected static int[] allocIVector(int len, boolean reset) {
	LOG.error("deprecated allocIVector");
	return new int[len + 1];
}

@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
Expand All @@ -362,16 +326,23 @@ public long getExactSizeOnDisk() {
public abstract int[] getCounts(int[] out);

@Override
protected final void computeSum(double[] c, int nRows, boolean square) {
if(square)
c[0] += _dict.sumsq(getCounts(), _colIndexes.length);
else
c[0] += _dict.sum(getCounts(), _colIndexes.length);
protected void computeSum(double[] c, int nRows) {
c[0] += _dict.sum(getCounts(), _colIndexes.length);
}

@Override
public void computeColSums(double[] c, int nRows) {
	// Delegates to the dictionary: presumably accumulates per-column sums
	// into c at _colIndexes, weighting each distinct tuple by its occurrence
	// count (getCounts()) — exact semantics live in ADictionary.colSum.
	_dict.colSum(c, getCounts(), _colIndexes);
}

@Override
protected void computeSumSq(double[] c, int nRows) {
	// Accumulates the squared-value sum into c[0]; the per-tuple weighting by
	// getCounts() is delegated to ADictionary.sumSq.
	c[0] += _dict.sumSq(getCounts(), _colIndexes.length);
}

@Override
protected final void computeColSums(double[] c, int nRows, boolean square) {
_dict.colSum(c, getCounts(), _colIndexes, square);
protected void computeColSumsSq(double[] c, int nRows) {
_dict.colSumSq(c, getCounts(), _colIndexes);
}

@Override
Expand Down Expand Up @@ -425,7 +396,7 @@ public AColGroupValue copy() {
}

@Override
protected final AColGroup sliceSingleColumn(int idx) {
protected AColGroup sliceSingleColumn(int idx) {
final AColGroupValue ret = (AColGroupValue) copy();
ret._colIndexes = new int[] {0};
if(_colIndexes.length == 1)
Expand All @@ -437,28 +408,28 @@ protected final AColGroup sliceSingleColumn(int idx) {
}

@Override
protected final AColGroup sliceMultiColumns(int idStart, int idEnd, int[] outputCols) {
protected AColGroup sliceMultiColumns(int idStart, int idEnd, int[] outputCols) {
final AColGroupValue ret = (AColGroupValue) copy();
ret._dict = ret._dict.sliceOutColumnRange(idStart, idEnd, _colIndexes.length);
ret._colIndexes = outputCols;
return ret;
}

@Override
protected final void tsmm(double[] result, int numColumns, int nRows) {
protected void tsmm(double[] result, int numColumns, int nRows) {
final int[] counts = getCounts();
tsmm(result, numColumns, counts, _dict, _colIndexes);
}

@Override
public final boolean containsValue(double pattern) {
public boolean containsValue(double pattern) {
if(pattern == 0 && _zeros)
return true;
return _dict.containsValue(pattern);
}

@Override
public final long getNumberNonZeros(int nRows) {
public long getNumberNonZeros(int nRows) {
int[] counts = getCounts();
return _dict.getNumberNonZeros(counts, _colIndexes.length);
}
Expand Down

0 comments on commit 419cec9

Please sign in to comment.