Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/org/apache/sysds/hops/ReorgOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ else if( getDim1()==1 && getDim2()==1 )
}


@Override
public void computeMemEstimate(MemoTable memo){
if(_op == ReOrgOp.TRANS && getInput().get(0).isCompressedOutput() )
_outputMemEstimate = getInput().get(0).getCompressedSize();
else
super.computeMemEstimate(memo);
}

@Override
protected double computeOutputMemEstimate( long dim1, long dim2, long nnz ) {
//no dedicated mem estimation per op type, because always propagated via refreshSizeInformation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.hops.OptimizerUtils;
Expand All @@ -36,6 +38,7 @@
* workload-aware compression planning.
*/
public class IPAPassCompressionWorkloadAnalysis extends IPAPass {
private static final Log LOG = LogFactory.getLog(IPAPassCompressionWorkloadAnalysis.class.getName());

@Override
public boolean isApplicable(FunctionCallGraph fgraph) {
Expand All @@ -57,13 +60,13 @@ public boolean rewriteProgram(DMLProgram prog, FunctionCallGraph fgraph, Functio
final WTreeRoot tree = e.getValue();
final CostEstimatorBuilder b = new CostEstimatorBuilder(tree);
final boolean shouldCompress = b.shouldTryToCompress();
if(LOG.isTraceEnabled())
LOG.trace("IPAPass Should Compress:\n" + tree + "\n" + b + "\n Should Compress: " + shouldCompress);

// Filter out compression plans that is known to be bad
if(shouldCompress)
tree.getRoot().setRequiresCompression(tree);

else if(LOG.isTraceEnabled())
LOG.trace("IPAPass Says no Compress:\n" + tree + "\n" + b);
else if(LOG.isDebugEnabled())
LOG.debug("IPApass Says no Compress:\n" + tree.getRoot() + "\n" + b);
}

return map != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public MatrixBlock decompress() {
* @param k degree of parallelism
* @return a new uncompressed matrix block containing the contents of this block
*/
public MatrixBlock decompress(int k) {
public synchronized MatrixBlock decompress(int k) {
// Early out if empty.
if(isEmpty())
return new MatrixBlock(rlen, clen, true, 0);
Expand Down Expand Up @@ -508,11 +508,16 @@ public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType

@Override
public MatrixBlock replaceOperations(MatrixValue result, double pattern, double replacement) {
if(isOverlapping()) {
if(Double.isInfinite(pattern)) {
LOG.info("Ignoring replace infinite in compression since it does not contain this value");
return this;
}
else if(isOverlapping()) {
final String message = "replaceOperations " + pattern + " -> " + replacement;
return getUncompressed(message).replaceOperations(result, pattern, replacement);
}
else {

CompressedMatrixBlock ret = new CompressedMatrixBlock(getNumRows(), getNumColumns());
final List<AColGroup> prev = getColGroups();
final int colGroupsLength = prev.size();
Expand Down Expand Up @@ -726,14 +731,14 @@ public CM_COV_Object cmOperations(CMOperator op, MatrixBlock weights) {
@Override
public CM_COV_Object covOperations(COVOperator op, MatrixBlock that) {
MatrixBlock right = getUncompressed(that);
return getUncompressed("covOperations").covOperations(op, right);
return getUncompressed("covOperations", op.getNumThreads()).covOperations(op, right);
}

@Override
public CM_COV_Object covOperations(COVOperator op, MatrixBlock that, MatrixBlock weights) {
MatrixBlock right1 = getUncompressed(that);
MatrixBlock right2 = getUncompressed(weights);
return getUncompressed("covOperations").covOperations(op, right1, right2);
return getUncompressed("covOperations", op.getNumThreads()).covOperations(op, right1, right2);
}

@Override
Expand Down Expand Up @@ -866,9 +871,11 @@ public MatrixBlock ternaryOperations(TernaryOperator op, MatrixBlock m2, MatrixB
}

if(m2 instanceof CompressedMatrixBlock)
m2 = ((CompressedMatrixBlock) m2).getUncompressed("Ternary Operator arg2 " + op.fn.getClass().getSimpleName());
m2 = ((CompressedMatrixBlock) m2).getUncompressed("Ternary Operator arg2 " + op.fn.getClass().getSimpleName(),
op.getNumThreads());
if(m3 instanceof CompressedMatrixBlock)
m3 = ((CompressedMatrixBlock) m3).getUncompressed("Ternary Operator arg3 " + op.fn.getClass().getSimpleName());
m3 = ((CompressedMatrixBlock) m3).getUncompressed("Ternary Operator arg3 " + op.fn.getClass().getSimpleName(),
op.getNumThreads());

if(s2 != s3 && (op.fn instanceof PlusMultiply || op.fn instanceof MinusMultiply)) {
// SPECIAL CASE for sparse-dense combinations of common +* and -*
Expand Down Expand Up @@ -933,20 +940,26 @@ public static MatrixBlock getUncompressed(MatrixValue mVal, String message) {
}

public MatrixBlock getUncompressed() {
MatrixBlock d_compressed = getCachedDecompressed();
return getUncompressed((String) null);
}

public MatrixBlock getUncompressed(String operation) {
return getUncompressed(operation,
ConfigurationManager.isParallelMatrixOperations() ? InfrastructureAnalyzer.getLocalParallelism() : 1);
}

public MatrixBlock getUncompressed(String operation, int k) {
final MatrixBlock d_compressed = getCachedDecompressed();
if(d_compressed != null)
return d_compressed;
else if(isEmpty())
// Print warning if we do not have a cached decompressed version.
if(operation != null)
printDecompressWarning(operation);

if(isEmpty())
return new MatrixBlock(getNumRows(), getNumColumns(), true);
else if(ConfigurationManager.isParallelMatrixOperations())
return this.decompress(InfrastructureAnalyzer.getLocalParallelism());
else
return this.decompress(1);
}

public MatrixBlock getUncompressed(String operation) {
printDecompressWarning(operation);
return getUncompressed();
return this.decompress(k);
}

private static void printDecompressWarning(String operation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorFactory;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysds.runtime.compress.lib.CLALibUtils;
import org.apache.sysds.runtime.compress.utils.DblArrayIntListHashMap;
import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
import org.apache.sysds.runtime.compress.workload.WTreeRoot;
import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
Expand Down Expand Up @@ -108,6 +106,10 @@ public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb,
return compress(mb, 1, new CompressionSettingsBuilder(), root);
}

public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, CostEstimatorBuilder csb) {
return compress(mb, 1, new CompressionSettingsBuilder(), csb);
}

public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb,
CompressionSettingsBuilder customSettings) {
return compress(mb, 1, customSettings, (WTreeRoot) null);
Expand All @@ -121,6 +123,10 @@ public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb,
return compress(mb, k, new CompressionSettingsBuilder(), root);
}

public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k, CostEstimatorBuilder csb) {
return compress(mb, k, new CompressionSettingsBuilder(), csb);
}

public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, ACostEstimate costEstimator) {
return compress(mb, 1, new CompressionSettingsBuilder(), costEstimator);
}
Expand Down Expand Up @@ -236,6 +242,10 @@ else if(mb.isEmpty()) // empty input return empty compression
if(compressionGroups == null)
return abortCompression();

// clear extra data from analysis
compressionGroups.clearMaps();
informationExtractor.clearNNZ();

transposePhase();
compressPhase();
finalizePhase();
Expand All @@ -252,6 +262,13 @@ private void classifyPhase() {
// Compute the individual columns cost information
compressionGroups = informationExtractor.computeCompressedSizeInfos(k);

if(LOG.isTraceEnabled()) {
LOG.trace("Logging all individual columns estimated cost:");
for(CompressedSizeInfoColGroup g : compressionGroups.getInfo())
LOG.trace(String.format("Cost: %8.0f Size: %16d %15s", costEstimator.getCost(g), g.getMinSize(),
Arrays.toString(g.getColumns())));
}

_stats.estimatedSizeCols = compressionGroups.memoryEstimate();
_stats.estimatedCostCols = costEstimator.getCost(compressionGroups);

Expand Down Expand Up @@ -309,7 +326,8 @@ private void coCodePhase() {
}

private void transposePhase() {
if(!compSettings.transposed) {
final boolean haveMemory = Runtime.getRuntime().freeMemory() - (mb.estimateSizeInMemory() * 2) > 0;
if(!compSettings.transposed && haveMemory) {
transposeHeuristics();
if(compSettings.transposed) {
boolean sparse = mb.isInSparseFormat();
Expand All @@ -333,8 +351,8 @@ private void transposeHeuristics() {
if(mb.isInSparseFormat()) {
boolean haveManyColumns = mb.getNumColumns() > 10000;
boolean isNnzLowAndVerySparse = mb.getNonZeros() < 1000 && mb.getSparsity() < 0.4;
boolean isAboveRowNumbers = mb.getNumRows() > 500000;
boolean isAboveThreadToColumnRatio = compressionGroups.getNumberColGroups() > mb.getNumColumns() / 4;
boolean isAboveRowNumbers = mb.getNumRows() > 500000 && mb.getSparsity() < 0.4;
boolean isAboveThreadToColumnRatio = compressionGroups.getNumberColGroups() > mb.getNumColumns() / 30;
compSettings.transposed = haveManyColumns || isNnzLowAndVerySparse ||
(isAboveRowNumbers && isAboveThreadToColumnRatio);
}
Expand All @@ -351,15 +369,14 @@ private void compressPhase() {
}

private void finalizePhase() {

CLALibUtils.combineConstColumns(res);
res.cleanupBlock(true, true);

_stats.compressedSize = res.getInMemorySize();
_stats.compressedCost = costEstimator.getCost(res.getColGroups(), res.getNumRows());

final double ratio = _stats.getRatio();
final double denseRatio = _stats.getDenseRatio();

if(ratio < 1 && denseRatio < 100.0) {
LOG.info("--dense size: " + _stats.denseSize);
LOG.info("--original size: " + _stats.originalSize);
Expand All @@ -374,14 +391,16 @@ private void finalizePhase() {

_stats.setColGroupsCounts(res.getColGroups());

if(compSettings.isInSparkInstruction)
res.clearSoftReferenceToDecompressed();

final long oldNNZ = mb.getNonZeros();
if(oldNNZ <= 0)
res.setNonZeros(oldNNZ);
else
if(oldNNZ <= 0L)
res.recomputeNonZeros();
else
res.setNonZeros(oldNNZ);

logPhase();

}

private Pair<MatrixBlock, CompressionStatistics> abortCompression() {
Expand Down Expand Up @@ -428,28 +447,25 @@ private void logPhase() {
break;
case 3:
LOG.debug("--compression phase " + phase + " Compress : " + getLastTimePhase());
LOG.debug("--compression Hash collisions:" + "(" + DblArrayIntListHashMap.hashMissCount + ","
+ DoubleCountHashMap.hashMissCount + ")");
DblArrayIntListHashMap.hashMissCount = 0;
DoubleCountHashMap.hashMissCount = 0;
LOG.debug("--compressed initial actual size:" + _stats.compressedInitialSize);
break;
case 4:
LOG.debug("--num col groups: " + res.getColGroups().size());
LOG.debug("--compression phase " + phase + " Cleanup : " + getLastTimePhase());
LOG.debug("--col groups types " + _stats.getGroupsTypesString());
LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
LOG.debug(String.format("--dense size: %16d" , _stats.denseSize));
LOG.debug(String.format("--original size: %16d" , _stats.originalSize));
LOG.debug(String.format("--compressed size: %16d" , _stats.compressedSize));
LOG.debug(String.format("--dense size: %16d", _stats.denseSize));
LOG.debug(String.format("--original size: %16d", _stats.originalSize));
LOG.debug(String.format("--compressed size: %16d", _stats.compressedSize));
LOG.debug(String.format("--compression ratio: %4.3f", _stats.getRatio()));
LOG.debug(String.format("--Dense ratio: %4.3f", _stats.getDenseRatio()));
if(!(costEstimator instanceof MemoryCostEstimator)) {
LOG.debug(String.format("--original cost: %5.2E" , _stats.originalCost));
LOG.debug(String.format("--single col cost: %5.2E" , _stats.estimatedCostCols));
LOG.debug(String.format("--cocode cost: %5.2E" , _stats.estimatedCostCoCoded));
LOG.debug(String.format("--actual cost: %5.2E" , _stats.compressedCost));
LOG.debug(String.format("--relative cost: %1.4f" , (_stats.compressedCost / _stats.originalCost)));
LOG.debug(String.format("--original cost: %5.2E", _stats.originalCost));
LOG.debug(String.format("--single col cost: %5.2E", _stats.estimatedCostCols));
LOG.debug(String.format("--cocode cost: %5.2E", _stats.estimatedCostCoCoded));
LOG.debug(String.format("--actual cost: %5.2E", _stats.compressedCost));
LOG.debug(
String.format("--relative cost: %1.4f", (_stats.compressedCost / _stats.originalCost)));
}
if(compressionGroups.getInfo().size() < 1000) {
int[] lengths = new int[res.getColGroups().size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public CompressionSettingsBuilder addValidCompression(CompressionType cp) {
* @return The CompressionSettingsBuilder
*/
public CompressionSettingsBuilder clearValidCompression() {
this.validCompressions = EnumSet.of(CompressionType.UNCOMPRESSED);
this.validCompressions = EnumSet.of(CompressionType.UNCOMPRESSED, CompressionType.EMPTY, CompressionType.CONST);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.sysds.runtime.compress.bitmap;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -72,7 +73,7 @@ public static ABitmap extractBitmap(int[] colIndices, MatrixBlock rawBlock, bool

private static ABitmap extractBitmapSingleColumn(int colIndex, MatrixBlock rawBlock, int numRows, boolean transposed,
int est, boolean sort) {
if(transposed){
if(transposed) {
if(rawBlock.isInSparseFormat() && rawBlock.getSparseBlock().isEmpty(colIndex))
return null;
return makeSingleColBitmap(extractSingleColT(colIndex, rawBlock, est), rawBlock.getNumColumns(), sort);
Expand All @@ -84,10 +85,23 @@ private static ABitmap extractBitmapSingleColumn(int colIndex, MatrixBlock rawBl
private static DoubleIntListHashMap extractSingleCol(int colIndex, MatrixBlock rawBlock, int estimatedUnique) {
final DoubleIntListHashMap distinctVals = new DoubleIntListHashMap(estimatedUnique);
final int nRows = rawBlock.getNumRows();
final boolean notSparse = !rawBlock.isInSparseFormat();
final int nCols = rawBlock.getNumColumns();

if(notSparse && rawBlock.getDenseBlock().isContiguous()) {
final boolean sparse = rawBlock.isInSparseFormat();

if(sparse) {
final SparseBlock sb = rawBlock.getSparseBlock();
for(int r = 0; r < nRows; r++) {
if(sb.isEmpty(r))
continue;
final int apos = sb.pos(r);
final int alen = sb.size(r) + apos;
final int[] aix = sb.indexes(r);
final int idx = Arrays.binarySearch(aix, apos, alen, colIndex);
if(idx >= 0)
distinctVals.appendValue(sb.values(r)[idx], r);
}
}
else if(rawBlock.getDenseBlock().isContiguous()) {
final double[] values = rawBlock.getDenseBlockValues();
if(nCols == 1)
// Since the only values contained is in this column index. simply extract it continuously.
Expand Down
Loading