Skip to content

Commit

Permalink
[SYSTEMDS-3527] CLA DeltaOffset Skip List
Browse files Browse the repository at this point in the history
This commit implements a skip list over the delta offsets.
The skip list is materialized lazily, on the first request for an offset
iterator at an arbitrary row, and is held behind a SoftReference so it
can be reclaimed under memory pressure.

Closes #1812
  • Loading branch information
Baunsgaard committed Apr 24, 2023
1 parent 06886ec commit 9ef2e18
Show file tree
Hide file tree
Showing 26 changed files with 601 additions and 192 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ docs/_site
src/test/scripts/**/*.dmlt
src/test/scripts/functions/mlcontextin/
src/test/java/org/apache/sysds/test/component/compress/io/files
src/test/java/org/apache/sysds/test/component/compress/io/filesIOSpark/*
.factorypath

# Excluded sources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public synchronized MatrixBlock decompress(int k) {

@Override
public void putInto(MatrixBlock target, int rowOffset, int colOffset, boolean sparseCopyShallow) {
	// Additively decompress this compressed block into target at (rowOffset, colOffset).
	// NOTE(review): the scraped diff showed both the old 5-arg and new 6-arg call; only the
	// current 6-arg form is kept. The trailing 'false' flag's semantics are defined by
	// CLALibDecompress.decompressTo -- confirm against that API (presumably disables an
	// overlapping/counting post-pass).
	CLALibDecompress.decompressTo(this, target, rowOffset, colOffset, 1, false);
}

/**
Expand Down Expand Up @@ -617,7 +617,8 @@ public MatrixBlock unaryOperations(UnaryOperator op, MatrixValue result) {

@Override
public boolean containsValue(double pattern) {
if(isOverlapping())
// Only if pattern is a finite value and overlapping then decompress.
if(isOverlapping() && Double.isFinite(pattern))
return getUncompressed("ContainsValue").containsValue(pattern);
else {
for(AColGroup g : _colGroups)
Expand Down Expand Up @@ -1071,6 +1072,11 @@ public void clearSoftReferenceToDecompressed() {
decompressedVersion = null;
}

/**
 * Release cached, recomputable state in every contained column group
 * (delegates to {@code AColGroup.clear()}).
 */
public void clearCounts() {
	for(final AColGroup grp : _colGroups) {
		grp.clear();
	}
}

@Override
public DenseBlock getDenseBlock() {
throw new DMLCompressionException("Should not get DenseBlock on a compressed Matrix");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,13 @@ public static AColGroup appendN(AColGroup[] groups) {
*/
public abstract ICLAScheme getCompressionScheme();

/**
 * Clear cached variables that can be recomputed from the allocation of this column group.
 * <p>
 * The base implementation is a no-op; subclasses that cache derived state
 * (e.g. value counts) override this to release that state.
 */
public void clear(){
	// do nothing -- the base class holds no recomputable cached state
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ else if(fn instanceof Builtin)
private final void sumSq(IndexFunction idx, double[] c, int nRows, int rl, int ru, double[] preAgg) {
if(idx instanceof ReduceAll)
computeSumSq(c, nRows);
else if(idx instanceof ReduceCol)
else if(idx instanceof ReduceCol) // This call works becasuse the preAgg is correctly the sumsq.
computeRowSums(c, rl, ru, preAgg);
else if(idx instanceof ReduceRow)
computeColSumsSq(c, nRows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ public AColGroup rexpandCols(int max, boolean ignore, boolean cast, int nRows) {
}
}

@Override
public void clear(){
	// Drop the cached counts; per the AColGroup.clear() contract they can be
	// recomputed from this group's allocation when needed again.
	counts = null;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ protected void decompressToDenseBlockSparseDictionary(DenseBlock db, int rl, int
protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
double[] values) {
if(db.isContiguous() && _colIndexes.size() == db.getDim(1) && offC == 0)
decompressToDenseBlockAllColumnsContiguous(db, rl, ru, offR, offC);
decompressToDenseBlockAllColumnsContiguous(db, rl + offR, ru + offR);
else
decompressToDenseBlockGeneric(db, rl, ru, offR, offC);
}
Expand All @@ -254,15 +254,14 @@ protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, i
ret.append(offT, _colIndexes.get(j) + offC, _dict.getValue(j));
}

private void decompressToDenseBlockAllColumnsContiguous(DenseBlock db, int rl, int ru, int offR, int offC) {
private final void decompressToDenseBlockAllColumnsContiguous(final DenseBlock db, final int rl, final int ru) {
final double[] c = db.values(0);
final int nCol = _colIndexes.size();
final double[] values = _dict.getValues();
for(int r = rl; r < ru; r++) {
final int offStart = (offR + r) * nCol;
for(int vOff = 0, off = offStart; vOff < nCol; vOff++, off++)
c[off] += values[vOff];
}
final int start = rl * nCol;
final int end = ru * nCol;
for(int i = start; i < end; i++)
c[i] += values[i % nCol];
}

private void decompressToDenseBlockGeneric(DenseBlock db, int rl, int ru, int offR, int offC) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ protected void decompressToDenseBlockSparseDictionary(DenseBlock db, int rl, int
protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
double[] values) {
if(db.isContiguous()) {
if(_colIndexes.size() == 1 && db.getDim(1) == 1)
final int nCol = db.getDim(1);
if(_colIndexes.size() == 1 && nCol == 1)
decompressToDenseBlockDenseDictSingleColOutContiguous(db, rl, ru, offR, offC, values);
else if(_colIndexes.size() == 1)
decompressToDenseBlockDenseDictSingleColContiguous(db, rl, ru, offR, offC, values);
else if(_colIndexes.size() == db.getDim(1)) // offC == 0 implied
else if(_colIndexes.size() == nCol) // offC == 0 implied
decompressToDenseBlockDenseDictAllColumnsContiguous(db, rl, ru, offR, values);
else if(offC == 0 && offR == 0)
decompressToDenseBlockDenseDictNoOff(db, rl, ru, values);
Expand All @@ -116,7 +117,7 @@ else if(offC == 0)
decompressToDenseBlockDenseDictGeneric(db, rl, ru, offR, offC, values);
}

private void decompressToDenseBlockDenseDictSingleColContiguous(DenseBlock db, int rl, int ru, int offR, int offC,
private final void decompressToDenseBlockDenseDictSingleColContiguous(DenseBlock db, int rl, int ru, int offR, int offC,
double[] values) {
final double[] c = db.values(0);
final int nCols = db.getDim(1);
Expand All @@ -131,14 +132,14 @@ public AMapToData getMapToData(){
return _data;
}

private void decompressToDenseBlockDenseDictSingleColOutContiguous(DenseBlock db, int rl, int ru, int offR, int offC,
private final void decompressToDenseBlockDenseDictSingleColOutContiguous(DenseBlock db, int rl, int ru, int offR, int offC,
double[] values) {
final double[] c = db.values(0);
for(int i = rl, offT = rl + offR + _colIndexes.get(0) + offC; i < ru; i++, offT++)
c[offT] += values[_data.getIndex(i)];
}

private void decompressToDenseBlockDenseDictAllColumnsContiguous(DenseBlock db, int rl, int ru, int offR,
private final void decompressToDenseBlockDenseDictAllColumnsContiguous(DenseBlock db, int rl, int ru, int offR,
double[] values) {
final double[] c = db.values(0);
final int nCol = _colIndexes.size();
Expand All @@ -151,7 +152,7 @@ private void decompressToDenseBlockDenseDictAllColumnsContiguous(DenseBlock db,
}
}

private void decompressToDenseBlockDenseDictNoColOffset(DenseBlock db, int rl, int ru, int offR, double[] values) {
private final void decompressToDenseBlockDenseDictNoColOffset(DenseBlock db, int rl, int ru, int offR, double[] values) {
final int nCol = _colIndexes.size();
final int colOut = db.getDim(1);
int off = (rl + offR) * colOut;
Expand All @@ -163,7 +164,7 @@ private void decompressToDenseBlockDenseDictNoColOffset(DenseBlock db, int rl, i
}
}

private void decompressToDenseBlockDenseDictNoOff(DenseBlock db, int rl, int ru, double[] values) {
private final void decompressToDenseBlockDenseDictNoOff(DenseBlock db, int rl, int ru, double[] values) {
final int nCol = _colIndexes.size();
final int nColU = db.getDim(1);
final double[] c = db.values(0);
Expand All @@ -175,7 +176,7 @@ private void decompressToDenseBlockDenseDictNoOff(DenseBlock db, int rl, int ru,
}
}

private void decompressToDenseBlockDenseDictGeneric(DenseBlock db, int rl, int ru, int offR, int offC,
private final void decompressToDenseBlockDenseDictGeneric(DenseBlock db, int rl, int ru, int offR, int offC,
double[] values) {
final int nCol = _colIndexes.size();
for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ private AColGroup directCompressDDCMultiCol(IColIndex colIndexes, CompressedSize
final int fill = d.getUpperBoundValue();
d.fill(fill);

final DblArrayCountHashMap map = new DblArrayCountHashMap(cg.getNumVals(), colIndexes.size());
final DblArrayCountHashMap map = new DblArrayCountHashMap(Math.max(cg.getNumVals(), 64), colIndexes.size());
boolean extra;
if(nRow < CompressionSettings.PAR_DDC_THRESHOLD || k == 1)
extra = readToMapDDC(colIndexes, map, d, 0, nRow, fill);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public final void decompressToDenseBlockDenseDictionaryWithProvidedIterator(Dens
if(post) {
if(contiguous && _colIndexes.size() == 1)
decompressToDenseBlockDenseDictionaryPostSingleColContiguous(db, rl, ru, offR, offC, values, it);
else if(contiguous && _colIndexes.size() == db.getDim(1)) // OffC == 0 implied
decompressToDenseBlockDenseDictioanryPostAllCols(db, rl, ru, offR, values, it);
else
decompressToDenseBlockDenseDictionaryPostGeneric(db, rl, ru, offR, offC, values, it);
}
Expand All @@ -145,8 +147,8 @@ else if(contiguous && _colIndexes.size() == 1) {
}
}

private void decompressToDenseBlockDenseDictionaryPostSingleColContiguous(DenseBlock db, int rl, int ru, int offR,
int offC, double[] values, AIterator it) {
private final void decompressToDenseBlockDenseDictionaryPostSingleColContiguous(DenseBlock db, int rl, int ru,
int offR, int offC, double[] values, AIterator it) {
final int lastOff = _indexes.getOffsetToLast() + offR;
final int nCol = db.getDim(1);
final double[] c = db.values(0);
Expand All @@ -162,10 +164,27 @@ private void decompressToDenseBlockDenseDictionaryPostSingleColContiguous(DenseB
it.setOff(it.value() - offR);
}

private void decompressToDenseBlockDenseDictionaryPostGeneric(DenseBlock db, int rl, int ru, int offR, int offC,
// Additively decompress, via the provided offset iterator, into a dense block whose row width
// equals this group's column count (offC == 0 implied by the caller's dispatch).
// Iterates until the group's last materialized offset is consumed.
// NOTE(review): method name contains a typo ("Dictioanry"); the call site uses the same
// spelling, so it is left unchanged here. rl/ru are unused -- iteration is bounded by the
// iterator position and lastOff; presumably callers pre-position 'it' at rl -- confirm.
private final void decompressToDenseBlockDenseDictioanryPostAllCols(DenseBlock db, int rl, int ru, int offR,
double[] values, AIterator it) {
final int lastOff = _indexes.getOffsetToLast();
final int nCol = _colIndexes.size();
while(true) {
// Output row index for the current offset, shifted by the caller's row offset.
final int idx = offR + it.value();
final double[] c = db.values(idx);
final int off = db.pos(idx);
// Start of the dictionary tuple for this row's dictionary entry.
final int offDict = _data.getIndex(it.getDataIndex()) * nCol;
for(int j = 0; j < nCol; j++)
c[off + j] += values[offDict + j];
if(it.value() == lastOff)
return;
it.next();
}
}

private final void decompressToDenseBlockDenseDictionaryPostGeneric(DenseBlock db, int rl, int ru, int offR,
int offC, double[] values, AIterator it) {
final int lastOff = _indexes.getOffsetToLast();
final int nCol = _colIndexes.size();
while(true) {
final int idx = offR + it.value();
final double[] c = db.values(idx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ public void decompressToDenseBlock(DenseBlock db, int rl, int ru, int offR, int
// _data is never empty
if(_data.isInSparseFormat())
decompressToDenseBlockSparseData(db, rl, ru, offR, offC);
else if(_colIndexes.size() == db.getDim(1))
decompressToDenseBlockDenseDataAllColumns(db, rl, ru, offR);
else
decompressToDenseBlockDenseData(db, rl, ru, offR, offC);
}
Expand All @@ -186,6 +188,19 @@ private void decompressToDenseBlockDenseData(DenseBlock db, int rl, int ru, int
}
}

// Additively decompress rows [rl, ru) from this (uncompressed) group's dense data into db,
// for the case where the group spans all output columns (column offset 0 implied).
// offS walks the source values row by row; offT is the destination row index.
private void decompressToDenseBlockDenseDataAllColumns(DenseBlock db, int rl, int ru, int offR) {
int offT = rl + offR;
final int nCol = _colIndexes.size();
final double[] values = _data.getDenseBlockValues();
int offS = rl * nCol;
for(int row = rl; row < ru; row++, offT++, offS += nCol) {
final double[] c = db.values(offT);
final int off = db.pos(offT);
for(int j = 0; j < nCol; j++)
c[off + j] += values[offS + j];
}
}

private void decompressToDenseBlockSparseData(DenseBlock db, int rl, int ru, int offR, int offC) {

final SparseBlock sb = _data.getSparseBlock();
Expand Down Expand Up @@ -385,7 +400,7 @@ else if((fn instanceof Builtin && ((Builtin) fn).getBuiltinCode() == BuiltinCode
throw new DMLRuntimeException("Not supported type of Unary Aggregate on colGroup");

// inefficient since usually uncompressed column groups are used in case of extreme sparsity, it is fine
// using a slice, since we dont allocate extra just extract the pointers to the sparse rows.
// using a slice, since we don't allocate extra just extract the pointers to the sparse rows.

final MatrixBlock tmpData = (rl == 0 && ru == nRows) ? _data : _data.slice(rl, ru - 1, false);
MatrixBlock tmp = tmpData.aggregateUnaryOperations(op, new MatrixBlock(), tmpData.getNumRows(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public boolean equals(AIterator o) {
}

@Override
public String toString(){
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClass().getSimpleName());
sb.append(" v:" + value() + " d:" + getDataIndex() + " o:" + getOffsetsIndex());
Expand Down

2 comments on commit 9ef2e18

@Baunsgaard
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phaniarnab
Here the lineage tests fail again; if you give me some guidelines, I can fix it myself.

@phaniarnab
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this reproduce locally?

Please sign in to comment.