Skip to content

Commit

Permalink
[SYSTEMDS-2831] CLA DeltaDDC Column Group
Browse files Browse the repository at this point in the history
This commit adds a new column group type named DeltaDDC, with support for
a few operations.

While merging I added an interface for future readers for delta encoding;
a future task is to enable analysis of delta encoding.

DIA project WS2021/22.

Closes #1518
  • Loading branch information
Muh-Osa authored and Baunsgaard committed Jan 25, 2022
1 parent 7f67226 commit 31f2653
Show file tree
Hide file tree
Showing 17 changed files with 557 additions and 19 deletions.
Expand Up @@ -48,7 +48,7 @@ public abstract class AColGroup implements Serializable {

/** Public super types of compression ColGroups supported */
public enum CompressionType {
UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, PFOR,
UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, PFOR, DeltaDDC
}

/**
Expand All @@ -57,7 +57,7 @@ public enum CompressionType {
* Protected such that outside the ColGroup package it should be unknown which specific subtype is used.
*/
protected enum ColGroupType {
UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, PFOR;
UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, PFOR, DeltaDDC;
}

/** The ColGroup Indexes contained in the ColGroup */
Expand Down
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.runtime.compress.colgroup;

import org.apache.commons.lang.NotImplementedException;
import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;

/**
 * Class to encapsulate information about a column group that is first delta encoded (each row stores the difference to
 * the previous row) and then encoded with dense dictionary encoding (DeltaDDC).
 */
public class ColGroupDeltaDDC extends ColGroupDDC {

	/**
	 * Constructor for serialization.
	 *
	 * @param numRows number of rows
	 */
	protected ColGroupDeltaDDC(int numRows) {
		super(numRows);
	}

	/**
	 * Main constructor.
	 *
	 * @param colIndices   the column indexes contained in this column group
	 * @param numRows      number of rows
	 * @param dict         dictionary of delta-encoded value tuples
	 * @param data         mapping from rows to dictionary entries
	 * @param cachedCounts cached counts of dictionary entry usage (may be null)
	 */
	protected ColGroupDeltaDDC(int[] colIndices, int numRows, ADictionary dict, AMapToData data, int[] cachedCounts) {
		super(colIndices, numRows, dict, data, cachedCounts);
		_zeros = false;
		_data = data;
	}

	@Override
	public CompressionType getCompType() {
		return CompressionType.DeltaDDC;
	}

	@Override
	protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
		double[] values) {
		final int nCol = _colIndexes.length;
		for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
			final double[] c = db.values(offT);
			final int off = db.pos(offT) + offC;
			final int rowIndex = _data.getIndex(i) * nCol;
			// NOTE(review): assumes the previous row's reconstructed values sit exactly nCol
			// positions earlier in the same dense-block buffer -- verify this holds when
			// _colIndexes is not a contiguous [0..nCol) range.
			final int prevOff = (off == 0) ? off : off - nCol;
			for(int j = 0; j < nCol; j++) {
				// Here we use the values in the previous row to compute current values along with the delta
				double newValue = c[prevOff + j] + values[rowIndex + j];
				c[off + _colIndexes[j]] += newValue;
			}
		}
	}

	@Override
	protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
		double[] values) {
		throw new NotImplementedException();
	}

	@Override
	public AColGroup scalarOperation(ScalarOperator op) {
		// The dictionary (a DeltaDictionary for delta-encoded groups) adjusts the scalar
		// operation semantics for delta encoding.
		return new ColGroupDeltaDDC(_colIndexes, _numRows, _dict.applyScalarOp(op), _data, getCachedCounts());
	}
}
Expand Up @@ -58,8 +58,10 @@
import org.apache.sysds.runtime.compress.utils.Util;
import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.DataConverter;

/**
* Factory class for constructing ColGroups.
Expand Down Expand Up @@ -335,6 +337,12 @@ else if(estimatedBestCompressionType == CompressionType.SDC && colIndexes.length
tmp.getDblCountMap(nrUniqueEstimate), cs);
else if(colIndexes.length > 1 && estimatedBestCompressionType == CompressionType.DDC)
return directCompressDDC(colIndexes, in, cs, cg, k);
else if(estimatedBestCompressionType == CompressionType.DeltaDDC) {
if(colIndexes.length > 1)
return directCompressDeltaDDC(colIndexes, in, cs, cg, k);
else
return compressDeltaDDC(colIndexes, in, cs, cg);
}
else {
final int numRows = cs.transposed ? in.getNumColumns() : in.getNumRows();
final ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, in, cs.transposed, nrUniqueEstimate,
Expand Down Expand Up @@ -376,11 +384,26 @@ private static AColGroup directCompressDDC(int[] colIndexes, MatrixBlock raw, Co
final int rlen = cs.transposed ? raw.getNumColumns() : raw.getNumRows();
// use a Map that is at least char size.
final int nVal = cg.getNumVals() < 16 ? 16 : Math.max(cg.getNumVals(), 257);
return directCompressDDC(colIndexes, raw, cs, cg, MapToFactory.create(rlen, nVal), rlen, k);
return directCompressDDCColGroup(colIndexes, raw, cs, cg, MapToFactory.create(rlen, nVal), rlen, k, false);
}

private static AColGroup directCompressDDC(int[] colIndexes, MatrixBlock raw, CompressionSettings cs,
CompressedSizeInfoColGroup cg, AMapToData data, int rlen, int k) {
/**
 * Compress the given columns with delta encoding followed by direct DDC compression.
 *
 * @param colIndexes The columns to compress together
 * @param raw        The raw input matrix (may be replaced by a transposed / delta-encoded copy)
 * @param cs         The compression settings
 * @param cg         The compressed size information for this group
 * @param k          The parallelization degree
 * @return The compressed DeltaDDC column group
 */
private static AColGroup directCompressDeltaDDC(int[] colIndexes, MatrixBlock raw, CompressionSettings cs,
	CompressedSizeInfoColGroup cg, int k) {
	final int rlen = cs.transposed ? raw.getNumColumns() : raw.getNumRows();
	// use a Map that is at least char size.
	final int nVal = cg.getNumVals() < 16 ? 16 : Math.max(cg.getNumVals(), 257);
	if(cs.transposed) {
		// Delta encoding is defined over rows, so an untransposed representation is required.
		LOG.warn("Inefficient transpose back of the input matrix to do delta encoding");
		raw = LibMatrixReorg.transposeInPlace(raw, k);
		cs.transposed = false;
	}
	// Delta encode the raw data
	raw = deltaEncodeMatrixBlock(raw);
	return directCompressDDCColGroup(colIndexes, raw, cs, cg, MapToFactory.create(rlen, nVal), rlen, k, true);
}

private static AColGroup directCompressDDCColGroup(int[] colIndexes, MatrixBlock raw, CompressionSettings cs,
CompressedSizeInfoColGroup cg, AMapToData data, int rlen, int k, boolean deltaEncoded) {
final int fill = data.getUpperBoundValue();
data.fill(fill);

Expand All @@ -396,7 +419,7 @@ private static AColGroup directCompressDDC(int[] colIndexes, MatrixBlock raw, Co
// This is highly unlikely but could happen if forced compression of
// not transposed column and the estimator says use DDC.
return new ColGroupEmpty(colIndexes);
ADictionary dict = DictionaryFactory.create(map, colIndexes.length, extra);
ADictionary dict = DictionaryFactory.create(map, colIndexes.length, extra, deltaEncoded);
if(extra) {
data.replace(fill, map.size());
data.setUnique(map.size() + 1);
Expand All @@ -405,8 +428,10 @@ private static AColGroup directCompressDDC(int[] colIndexes, MatrixBlock raw, Co
data.setUnique(map.size());

AMapToData resData = MapToFactory.resize(data, map.size() + (extra ? 1 : 0));
ColGroupDDC res = new ColGroupDDC(colIndexes, rlen, dict, resData, null);
return res;
if(deltaEncoded)
return new ColGroupDeltaDDC(colIndexes, rlen, dict, resData, null);
else
return new ColGroupDDC(colIndexes, rlen, dict, resData, null);
}

private static boolean readToMapDDC(final int[] colIndexes, final MatrixBlock raw, final DblArrayCountHashMap map,
Expand Down Expand Up @@ -460,6 +485,22 @@ private static boolean parallelReadToMapDDC(final int[] colIndexes, final Matrix
}
}

/**
 * Delta encode a full matrix block: the first row is kept as-is, every subsequent row stores the element-wise
 * difference to the row above it.
 *
 * NOTE(review): reads the dense-block values directly -- presumably the input is always dense here; confirm
 * against callers.
 *
 * @param mb the input matrix block
 * @return a new matrix block holding the delta-encoded values
 */
private static MatrixBlock deltaEncodeMatrixBlock(MatrixBlock mb) {
	LOG.warn("Delta encoding entire matrix input!!");
	final int nRow = mb.getNumRows();
	final int nCol = mb.getNumColumns();
	final double[] in = mb.getDenseBlockValues();
	final double[][] out = new double[nRow][nCol];
	for(int r = 0; r < nRow; r++) {
		final int rowOff = r * nCol;
		for(int c = 0; c < nCol; c++) {
			final double v = in[rowOff + c];
			// Row 0 is stored unchanged; later rows store the difference to the previous row.
			out[r][c] = (r == 0) ? v : v - in[rowOff - nCol + c];
		}
	}
	return DataConverter.convertToMatrixBlock(out);
}

static class readToMapDDCTask implements Callable<Boolean> {
private final int[] _colIndexes;
private final MatrixBlock _raw;
Expand Down Expand Up @@ -590,6 +631,27 @@ private static AColGroup compressDDC(int[] colIndexes, int rlen, ABitmap ubm, Co
return new ColGroupDDC(colIndexes, rlen, dict, data, null);
}

/**
 * Compress the given columns into a DeltaDDC group via bitmap extraction over a delta-encoded copy of the input.
 *
 * @param colIndexes The columns to compress together
 * @param in         The input matrix (may be replaced by a transposed / delta-encoded copy)
 * @param cs         The compression settings
 * @param cg         The compressed size information for this group
 * @return The compressed DeltaDDC column group
 */
private static AColGroup compressDeltaDDC(int[] colIndexes, MatrixBlock in, CompressionSettings cs,
	CompressedSizeInfoColGroup cg) {

	LOG.warn("Multi column Delta encoding only supported if delta encoding is only compression");
	if(cs.transposed) {
		// Keep the MatrixBlock returned by the transpose, consistent with
		// directCompressDeltaDDC; the previous code discarded the return value.
		in = LibMatrixReorg.transposeInPlace(in, 1);
		cs.transposed = false;
	}
	// Delta encode the raw data
	in = deltaEncodeMatrixBlock(in);

	final int rlen = in.getNumRows();
	// TODO Add extractBitMap that is delta to not require delta encoding entire input matrix.
	final ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, in, cs.transposed, cg.getNumVals(),
		cs.sortTuplesByFrequency);
	boolean zeros = ubm.getNumOffsets() < rlen;
	ADictionary dict = DictionaryFactory.create(ubm, cg.getTupleSparsity(), zeros);
	AMapToData data = MapToFactory.create(rlen, zeros, ubm.getOffsetList());
	return new ColGroupDeltaDDC(colIndexes, rlen, dict, data, null);
}

private static AColGroup compressOLE(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs,
double tupleSparsity) {

Expand Down
Expand Up @@ -106,6 +106,8 @@ private static AColGroup constructColGroup(ColGroupType ctype, int nRows){
return new ColGroupRLE(nRows);
case DDC:
return new ColGroupDDC(nRows);
case DeltaDDC:
return new ColGroupDeltaDDC(nRows);
case CONST:
return new ColGroupConst();
case EMPTY:
Expand Down
@@ -0,0 +1,43 @@
package org.apache.sysds.runtime.compress.colgroup.dictionary;

import org.apache.commons.lang.NotImplementedException;
import org.apache.sysds.runtime.functionobjects.Divide;
import org.apache.sysds.runtime.functionobjects.Multiply;
import org.apache.sysds.runtime.functionobjects.Plus;
import org.apache.sysds.runtime.functionobjects.Minus;
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;

/**
 * This dictionary class is a specialization for the DeltaDDC column group. Here the adjustments for operations on the
 * delta encoded values are implemented.
 */
public class DeltaDictionary extends Dictionary {

	// Parent Dictionary is serializable; give this subclass its own stream version id.
	private static final long serialVersionUID = -5700139221491143705L;

	/** Number of columns per tuple; identifies the first row's values inside _values. */
	private final int _numCols;

	public DeltaDictionary(double[] values, int numCols) {
		super(values);
		_numCols = numCols;
	}

	@Override
	public DeltaDictionary applyScalarOp(ScalarOperator op) {
		final double[] retV = new double[_values.length];
		if(op.fn instanceof Multiply || op.fn instanceof Divide) {
			// Multiplicative ops distribute over differences, so apply them to every value.
			for(int i = 0; i < _values.length; i++)
				retV[i] = op.executeScalar(_values[i]);
		}
		else if(op.fn instanceof Plus || op.fn instanceof Minus) {
			// With Plus and Minus only the first row needs to be updated when delta encoded;
			// a constant shift leaves the row-to-row deltas unchanged.
			for(int i = 0; i < _values.length; i++) {
				if(i < _numCols)
					retV[i] = op.executeScalar(_values[i]);
				else
					retV[i] = _values[i];
			}
		}
		else
			throw new NotImplementedException();

		return new DeltaDictionary(retV, _numCols);
	}
}
Expand Up @@ -41,7 +41,7 @@ public class Dictionary extends ADictionary {

private static final long serialVersionUID = -6517136537249507753L;

private final double[] _values;
protected final double[] _values;

public Dictionary(double[] values) {
if(values == null || values.length == 0)
Expand Down
Expand Up @@ -66,15 +66,15 @@ else if(nrColumns > 1 && tupleSparsity < 0.4)
return Dictionary.getInMemorySize(nrValues * nrColumns);
}

public static ADictionary create(DblArrayCountHashMap map, int nCols, boolean addZeroTuple) {
public static ADictionary create(DblArrayCountHashMap map, int nCols, boolean addZeroTuple, boolean deltaEncoded) {
final ArrayList<DArrCounts> vals = map.extractValues();
final int nVals = vals.size();
final double[] resValues = new double[(nVals + (addZeroTuple ? 1 : 0)) * nCols];
for(int i = 0; i < nVals; i++) {
final DArrCounts dac = vals.get(i);
System.arraycopy(dac.key.getData(), 0, resValues, dac.id * nCols, nCols);
}
return new Dictionary(resValues);
return deltaEncoded ? new DeltaDictionary(resValues, nCols) : new Dictionary(resValues);
}

public static ADictionary create(ABitmap ubm) {
Expand Down
Expand Up @@ -157,6 +157,17 @@ public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexe
return estimateCompressedColGroupSize(colIndexes, 8, worstCaseUpperBound(colIndexes));
}

/**
 * Method for extracting Compressed Size Info of specified columns as delta encodings (delta from previous rows
 * values), together in a single ColGroup
 *
 * @param colIndexes The columns to group together inside a ColGroup
 * @return The CompressedSizeInformation associated with the selected ColGroups as delta encoding.
 */
public CompressedSizeInfoColGroup estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes) {
	// Delegate to the delta-encoded estimator overload; the previous code called the
	// plain estimateCompressedColGroupSize, which ignored the delta encoding entirely.
	return estimateCompressedColGroupSizeDeltaEncoded(colIndexes, 8, worstCaseUpperBound(colIndexes));
}

/**
* A method to extract the Compressed Size Info for a given list of columns, This method further limits the estimated
* number of unique values, since in some cases the estimated number of uniques is estimated higher than the number
Expand All @@ -169,11 +180,30 @@ public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexe
* in the sense that if the sample is small then this unique can be manually edited like in
* CoCodeCostMatrixMult.
*
* @return The CompressedSizeInfoColGroup fro the given column indexes.
* @return The CompressedSizeInfoColGroup for the given column indexes.
*/
public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int estimate,
int nrUniqueUpperBound);

/**
* A method to extract the Compressed Size Info for a given list of columns, This method further limits the estimated
* number of unique values, since in some cases the estimated number of uniques is estimated higher than the number
* estimated in sub groups of the given colIndexes.
*
* The Difference for this method is that it extract the values as delta values from the matrix block input.
*
* @param colIndexes The columns to extract compression information from
* @param estimate An estimate of number of unique delta elements in these columns
* @param nrUniqueUpperBound The upper bound of unique elements allowed in the estimate, can be calculated from the
* number of unique elements estimated in sub columns multiplied together. This is flexible
* in the sense that if the sample is small then this unique can be manually edited like in
* CoCodeCostMatrixMult.
*
* @return The CompressedSizeInfoColGroup for the given column indexes.
*/
public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes, int estimate,
int nrUniqueUpperBound);

/**
* Join two analyzed column groups together. without materializing the dictionaries of either side.
*
Expand Down
Expand Up @@ -42,6 +42,16 @@ public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexe
return new CompressedSizeInfoColGroup(colIndexes, em, _cs.validCompressions, map);
}

@Override
public CompressedSizeInfoColGroup estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes, int estimate,
	int nrUniqueUpperBound) {
	// Encode the selected columns as deltas from the previous row, then estimate sizes
	// on that encoding.
	final int numRows = getNumRows();
	final IEncode map = IEncode.createFromMatrixBlockDelta(_data, _cs.transposed, colIndexes);
	final EstimationFactors em = map.computeSizeEstimation(colIndexes, numRows, _data.getSparsity(),
		_data.getSparsity());
	return new CompressedSizeInfoColGroup(colIndexes, em, _cs.validCompressions, map);
}

@Override
protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1,
CompressedSizeInfoColGroup g2, int joinedMaxDistinct) {
Expand All @@ -65,4 +75,5 @@ protected int worstCaseUpperBound(int[] columns) {
public final int getSampleSize() {
return getNumRows();
}

}

0 comments on commit 31f2653

Please sign in to comment.