Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/org/apache/sysds/conf/ConfigurationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ public static boolean isParallelMatrixOperations() {
return getCompilerConfigFlag(ConfigType.PARALLEL_CP_MATRIX_OPERATIONS);
}

public static boolean isParallelTransform() {
return getDMLConfig().getBooleanValue(DMLConfig.PARALLEL_ENCODE);
}

public static boolean isParallelParFor() {
return getCompilerConfigFlag(ConfigType.PARALLEL_LOCAL_OR_REMOTE_PARFOR);
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/apache/sysds/conf/DMLConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class DMLConfig
public static final String DEFAULT_BLOCK_SIZE = "sysds.defaultblocksize";
public static final String CP_PARALLEL_OPS = "sysds.cp.parallel.ops";
public static final String CP_PARALLEL_IO = "sysds.cp.parallel.io";
public static final String PARALLEL_ENCODE = "sysds.parallel.encode"; // boolean: enable multi-threaded transformencode and apply
public static final String COMPRESSED_LINALG = "sysds.compressed.linalg";
public static final String COMPRESSED_LOSSY = "sysds.compressed.lossy";
public static final String COMPRESSED_VALID_COMPRESSIONS = "sysds.compressed.valid.compressions";
Expand Down Expand Up @@ -125,6 +126,7 @@ public class DMLConfig
_defaultVals.put(DEFAULT_BLOCK_SIZE, String.valueOf(OptimizerUtils.DEFAULT_BLOCKSIZE) );
_defaultVals.put(CP_PARALLEL_OPS, "true" );
_defaultVals.put(CP_PARALLEL_IO, "true" );
_defaultVals.put(PARALLEL_ENCODE, "false" );
_defaultVals.put(COMPRESSED_LINALG, Compression.CompressConfig.FALSE.name() );
_defaultVals.put(COMPRESSED_LOSSY, "false" );
_defaultVals.put(COMPRESSED_VALID_COMPRESSIONS, "SDC,DDC");
Expand Down Expand Up @@ -398,7 +400,7 @@ public static DMLConfig readConfigurationFile(String configPath)
public String getConfigInfo() {
String[] tmpConfig = new String[] {
LOCAL_TMP_DIR,SCRATCH_SPACE,OPTIMIZATION_LEVEL, DEFAULT_BLOCK_SIZE,
CP_PARALLEL_OPS, CP_PARALLEL_IO, NATIVE_BLAS, NATIVE_BLAS_DIR,
CP_PARALLEL_OPS, CP_PARALLEL_IO, PARALLEL_ENCODE, NATIVE_BLAS, NATIVE_BLAS_DIR,
COMPRESSED_LINALG, COMPRESSED_LOSSY, COMPRESSED_VALID_COMPRESSIONS, COMPRESSED_OVERLAPPING,
COMPRESSED_SAMPLING_RATIO, COMPRESSED_COCODE, COMPRESSED_TRANSPOSE,
CODEGEN, CODEGEN_API, CODEGEN_COMPILER, CODEGEN_OPTIMIZER, CODEGEN_PLANCACHE, CODEGEN_LITERALS,
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/org/apache/sysds/hops/OptimizerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,24 @@ public static int getConstrainedNumThreads(int maxNumThreads)

return ret;
}

public static int getTransformNumThreads(int maxNumThreads)
{
//by default max local parallelism (vcores)
int ret = InfrastructureAnalyzer.getLocalParallelism();

//apply external max constraint (e.g., set by parfor or other rewrites)
if( maxNumThreads > 0 ) {
ret = Math.min(ret, maxNumThreads);
}

//check if enabled in config.xml
if( !ConfigurationManager.isParallelTransform() ) {
ret = 1;
}

return ret;
}

public static Level getDefaultLogLevel() {
Level log = Logger.getRootLogger().getLevel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1344,11 +1344,6 @@ public void quickSetValue(int r, int c, double v) {
throw new DMLCompressionException("Should not set a value on a compressed Matrix");
}

@Override
public void quickSetValueThreadSafe(int r, int c, double v) {
throw new DMLCompressionException("Thread safe execution does not work on Compressed Matrix");
}

@Override
public double quickGetValueThreadSafe(int r, int c) {
throw new DMLCompressionException("Thread safe execution does not work on Compressed Matrix");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.instructions.InstructionUtils;
Expand Down Expand Up @@ -85,7 +86,8 @@ public void processInstruction(ExecutionContext ec) {

// execute block transform encode
MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec, colnames, fin.getNumColumns(), null);
MatrixBlock data = encoder.encode(fin); // build and apply
// TODO: Assign #threads in compiler and pass via the instruction string
MatrixBlock data = encoder.encode(fin, OptimizerUtils.getTransformNumThreads(-1)); // build and apply
FrameBlock meta = encoder.getMetaData(new FrameBlock(fin.getNumColumns(), ValueType.STRING));
meta.setColumnNames(colnames);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@

public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizable {
// private static final Log LOG = LogFactory.getLog(MatrixBlock.class.getName());

private static final long serialVersionUID = 7319972089143154056L;

//sparsity nnz threshold, based on practical experiments on space consumption and performance
Expand Down Expand Up @@ -654,27 +654,6 @@ public void quickSetValue(int r, int c, double v)
}
}

/**
* Thread save set.
* Blocks need to be allocated, and in case of MCSR sparse, all rows
* that are going to be accessed need to be allocated as well.
*
* @param r row
* @param c column
* @param v value
*/
public void quickSetValueThreadSafe(int r, int c, double v) {
if(sparse) {
if(!(sparseBlock instanceof SparseBlockMCSR))
throw new RuntimeException("Only MCSR Blocks are supported for Multithreaded sparse set.");
synchronized (sparseBlock.get(r)) {
sparseBlock.set(r,c,v);
}
}
else
denseBlock.set(r,c,v);
}

public double quickGetValueThreadSafe(int r, int c) {
if(sparse) {
if(!(sparseBlock instanceof SparseBlockMCSR))
Expand Down Expand Up @@ -976,7 +955,7 @@ public double sum() {

/**
* Wrapper method for single threaded reduceall-colSum of a matrix.
*
*
* @return A new MatrixBlock containing the column sums of this matrix.
*/
public MatrixBlock colSum() {
Expand All @@ -986,7 +965,7 @@ public MatrixBlock colSum() {

/**
* Wrapper method for single threaded reduceall-rowSum of a matrix.
*
*
* @return A new MatrixBlock containing the row sums of this matrix.
*/
public MatrixBlock rowSum(){
Expand Down Expand Up @@ -1422,7 +1401,7 @@ public void copy(MatrixValue thatValue, boolean sp)
throw new RuntimeException( "Copy must not overwrite itself!" );
if(that instanceof CompressedMatrixBlock)
that = CompressedMatrixBlock.getUncompressed(that, "Copy not effecient into a MatrixBlock");

rlen=that.rlen;
clen=that.clen;
sparse=sp;
Expand Down Expand Up @@ -2935,7 +2914,7 @@ public MatrixBlock binaryOperations(BinaryOperator op, MatrixValue thatValue, Ma
LibMatrixBincell.isValidDimensionsBinary(this, that);
if(thatValue instanceof CompressedMatrixBlock)
return ((CompressedMatrixBlock) thatValue).binaryOperationsLeft(op, this, result);

//compute output dimensions
boolean outer = (LibMatrixBincell.getBinaryAccessType(this, that)
== BinaryAccessType.OUTER_VECTOR_VECTOR);
Expand Down Expand Up @@ -2980,7 +2959,7 @@ public MatrixBlock ternaryOperations(TernaryOperator op, MatrixBlock m2, MatrixB
m2 = ((CompressedMatrixBlock) m2).getUncompressed("Ternay Operator arg2 " + op.fn.getClass().getSimpleName());
if(m3 instanceof CompressedMatrixBlock)
m3 = ((CompressedMatrixBlock) m3).getUncompressed("Ternay Operator arg3 " + op.fn.getClass().getSimpleName());

//prepare inputs
final boolean s1 = (rlen==1 && clen==1);
final boolean s2 = (m2.rlen==1 && m2.clen==1);
Expand Down Expand Up @@ -3674,7 +3653,7 @@ public void checkDimensionsForAppend(MatrixBlock[] in, boolean cbind) {
"Invalid nCol dimension for append rbind: was " + in[i].clen + " should be: " + clen);
}
}

public static MatrixBlock naryOperations(Operator op, MatrixBlock[] matrices, ScalarObject[] scalars, MatrixBlock ret) {
//note: currently only min max, plus supported and hence specialized implementation
//prepare operator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.sysds.runtime.transform.encode;

import static org.apache.sysds.runtime.transform.encode.EncoderFactory.getEncoderType;
import static org.apache.sysds.runtime.util.UtilFunctions.getBlockSizes;

import java.io.Externalizable;
import java.io.IOException;
Expand All @@ -45,14 +46,32 @@
*/
public abstract class ColumnEncoder implements Externalizable, Encoder, Comparable<ColumnEncoder> {
protected static final Log LOG = LogFactory.getLog(ColumnEncoder.class.getName());
protected static final int APPLY_ROW_BLOCKS_PER_COLUMN = 1;
public static int BUILD_ROW_BLOCKS_PER_COLUMN = 1;
private static final long serialVersionUID = 2299156350718979064L;
protected int _colID;

protected ColumnEncoder(int colID) {
_colID = colID;
}

public abstract MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol);
/**
* Apply Functions are only used in Single Threaded or Multi-Threaded Dense context.
* That's why there is no regard for MT sparse!
*
* @param in Input Block
* @param out Output Matrix
* @param outputCol The output column for the given column
* @return same as out
*
*/
public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol){
return apply(in, out, outputCol, 0, -1);
}

public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol){
return apply(in, out, outputCol, 0, -1);
}

public abstract MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk);

Expand Down Expand Up @@ -172,18 +191,18 @@ public int compareTo(ColumnEncoder o) {
* complete if all previous tasks are done. This is so that we can use the last task as a dependency for the whole
* build, reducing unnecessary dependencies.
*/
public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int blockSize) {
public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
List<Callable<Object>> tasks = new ArrayList<>();
List<List<? extends Callable<?>>> dep = null;
if(blockSize <= 0 || blockSize >= in.getNumRows()) {
int nRows = in.getNumRows();
int[] blockSizes = getBlockSizes(nRows, getNumBuildRowPartitions());
if(blockSizes.length == 1) {
tasks.add(getBuildTask(in));
}
else {
HashMap<Integer, Object> ret = new HashMap<>();
for(int i = 0; i < in.getNumRows(); i = i + blockSize)
tasks.add(getPartialBuildTask(in, i, blockSize, ret));
if(in.getNumRows() % blockSize != 0)
tasks.add(getPartialBuildTask(in, in.getNumRows() - in.getNumRows() % blockSize, -1, ret));
for(int startRow = 0, i = 0; i < blockSizes.length; startRow+=blockSizes[i], i++)
tasks.add(getPartialBuildTask(in, startRow, blockSizes[i], ret));
tasks.add(getPartialMergeBuildTask(ret));
dep = new ArrayList<>(Collections.nCopies(tasks.size() - 1, null));
dep.add(tasks.subList(0, tasks.size() - 1));
Expand All @@ -198,24 +217,63 @@ public Callable<Object> getBuildTask(FrameBlock in) {
public Callable<Object> getPartialBuildTask(FrameBlock in, int startRow, int blockSize,
HashMap<Integer, Object> ret) {
throw new DMLRuntimeException(
"Trying to get the PartialBuild task of an Encoder which does not support " + "partial building");
"Trying to get the PartialBuild task of an Encoder which does not support partial building");
}

public Callable<Object> getPartialMergeBuildTask(HashMap<Integer, ?> ret) {
throw new DMLRuntimeException(
"Trying to get the BuildMergeTask task of an Encoder which does not support " + "partial building");
"Trying to get the BuildMergeTask task of an Encoder which does not support partial building");
}

public List<DependencyTask<?>> getApplyTasks(FrameBlock in, MatrixBlock out, int outputCol) {
List<Callable<Object>> tasks = new ArrayList<>();
tasks.add(new ColumnApplyTask(this, in, out, outputCol));
return DependencyThreadPool.createDependencyTasks(tasks, null);
return getApplyTasks(in, null, out, outputCol);
}

public List<DependencyTask<?>> getApplyTasks(MatrixBlock in, MatrixBlock out, int outputCol) {
return getApplyTasks(null, in, out, outputCol);
}

private List<DependencyTask<?>> getApplyTasks(FrameBlock inF, MatrixBlock inM, MatrixBlock out, int outputCol){
List<Callable<Object>> tasks = new ArrayList<>();
tasks.add(new ColumnApplyTask(this, in, out, outputCol));
return DependencyThreadPool.createDependencyTasks(tasks, null);
List<List<? extends Callable<?>>> dep = null;
if ((inF != null && inM != null) || (inF == null && inM == null))
throw new DMLRuntimeException("getApplyTasks needs to be called with either FrameBlock input " +
"or MatrixBlock input");
int nRows = inF == null ? inM.getNumRows() : inF.getNumRows();
int[] blockSizes = getBlockSizes(nRows, getNumApplyRowPartitions());
for(int startRow = 0, i = 0; i < blockSizes.length; startRow+=blockSizes[i], i++){
if(inF != null)
if(out.isInSparseFormat())
tasks.add(getSparseTask(inF, out, outputCol, startRow, blockSizes[i]));
else
tasks.add(new ColumnApplyTask<>(this, inF, out, outputCol, startRow, blockSizes[i]));
else
if(out.isInSparseFormat())
tasks.add(getSparseTask(inM, out, outputCol, startRow, blockSizes[i]));
else
tasks.add(new ColumnApplyTask<>(this, inM, out, outputCol, startRow, blockSizes[i]));
}
if(tasks.size() > 1){
dep = new ArrayList<>(Collections.nCopies(tasks.size(), null));
tasks.add(() -> null); // Empty task as barrier
dep.add(tasks.subList(0, tasks.size()-1));
}

return DependencyThreadPool.createDependencyTasks(tasks, dep);
}

protected abstract ColumnApplyTask<? extends ColumnEncoder>
getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk);

protected abstract ColumnApplyTask<? extends ColumnEncoder>
getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk);

protected int getNumApplyRowPartitions(){
return APPLY_ROW_BLOCKS_PER_COLUMN;
}

protected int getNumBuildRowPartitions(){
return BUILD_ROW_BLOCKS_PER_COLUMN;
}

public enum EncoderType {
Expand All @@ -226,39 +284,54 @@ public enum EncoderType {
* This is the base Task for each column apply. If no custom "getApplyTasks" is implemented in an Encoder this task
* will be used.
*/
private static class ColumnApplyTask implements Callable<Object> {
protected static class ColumnApplyTask<T extends ColumnEncoder> implements Callable<Object> {

protected final T _encoder;
protected final FrameBlock _inputF;
protected final MatrixBlock _inputM;
protected final MatrixBlock _out;
protected final int _outputCol;
protected final int _startRow;
protected final int _blk;

protected ColumnApplyTask(T encoder, FrameBlock input, MatrixBlock out, int outputCol){
this(encoder, input, out, outputCol, 0, -1);
}

private final ColumnEncoder _encoder;
private final FrameBlock _inputF;
private final MatrixBlock _inputM;
private final MatrixBlock _out;
private final int _outputCol;
protected ColumnApplyTask(T encoder, MatrixBlock input, MatrixBlock out, int outputCol){
this(encoder, input, out, outputCol, 0, -1);
}

protected ColumnApplyTask(ColumnEncoder encoder, FrameBlock input, MatrixBlock out, int outputCol) {
_encoder = encoder;
_inputF = input;
_inputM = null;
_out = out;
_outputCol = outputCol;
protected ColumnApplyTask(T encoder, FrameBlock input, MatrixBlock out, int outputCol, int startRow, int blk) {
this(encoder, input, null, out, outputCol, startRow, blk);
}

protected ColumnApplyTask(ColumnEncoder encoder, MatrixBlock input, MatrixBlock out, int outputCol) {
protected ColumnApplyTask(T encoder, MatrixBlock input, MatrixBlock out, int outputCol, int startRow, int blk) {
this(encoder, null, input, out, outputCol, startRow, blk);
}
private ColumnApplyTask(T encoder, FrameBlock inputF, MatrixBlock inputM, MatrixBlock out, int outputCol,
int startRow, int blk){
_encoder = encoder;
_inputM = input;
_inputF = null;
_inputM = inputM;
_inputF = inputF;
_out = out;
_outputCol = outputCol;
_startRow = startRow;
_blk = blk;
}

@Override
public Void call() throws Exception {
public Object call() throws Exception {
assert _outputCol >= 0;
int _rowStart = 0;
int _blk = -1;
if(_out.isInSparseFormat()){
// this is an issue since most sparse Tasks modify the sparse structure so normal get and set calls are
// not possible.
throw new DMLRuntimeException("ColumnApplyTask called although output is in sparse format.");
}
if(_inputF == null)
_encoder.apply(_inputM, _out, _outputCol, _rowStart, _blk);
_encoder.apply(_inputM, _out, _outputCol, _startRow, _blk);
else
_encoder.apply(_inputF, _out, _outputCol, _rowStart, _blk);
_encoder.apply(_inputF, _out, _outputCol, _startRow, _blk);
return null;
}

Expand Down
Loading