diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java index 5a3667c8511..93654b713b8 100644 --- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java +++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java @@ -171,6 +171,10 @@ public static boolean isParallelMatrixOperations() { return getCompilerConfigFlag(ConfigType.PARALLEL_CP_MATRIX_OPERATIONS); } + public static boolean isParallelTransform() { + return getDMLConfig().getBooleanValue(DMLConfig.PARALLEL_ENCODE); + } + public static boolean isParallelParFor() { return getCompilerConfigFlag(ConfigType.PARALLEL_LOCAL_OR_REMOTE_PARFOR); } diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java index 0b7692b1bcc..db595054be7 100644 --- a/src/main/java/org/apache/sysds/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java @@ -67,6 +67,7 @@ public class DMLConfig public static final String DEFAULT_BLOCK_SIZE = "sysds.defaultblocksize"; public static final String CP_PARALLEL_OPS = "sysds.cp.parallel.ops"; public static final String CP_PARALLEL_IO = "sysds.cp.parallel.io"; + public static final String PARALLEL_ENCODE = "sysds.parallel.encode"; // boolean: enable multi-threaded transformencode and apply public static final String COMPRESSED_LINALG = "sysds.compressed.linalg"; public static final String COMPRESSED_LOSSY = "sysds.compressed.lossy"; public static final String COMPRESSED_VALID_COMPRESSIONS = "sysds.compressed.valid.compressions"; @@ -125,6 +126,7 @@ public class DMLConfig _defaultVals.put(DEFAULT_BLOCK_SIZE, String.valueOf(OptimizerUtils.DEFAULT_BLOCKSIZE) ); _defaultVals.put(CP_PARALLEL_OPS, "true" ); _defaultVals.put(CP_PARALLEL_IO, "true" ); + _defaultVals.put(PARALLEL_ENCODE, "false" ); _defaultVals.put(COMPRESSED_LINALG, Compression.CompressConfig.FALSE.name() ); _defaultVals.put(COMPRESSED_LOSSY, "false" ); _defaultVals.put(COMPRESSED_VALID_COMPRESSIONS, "SDC,DDC"); @@ -398,7 +400,7 @@ public static DMLConfig readConfigurationFile(String configPath) public String getConfigInfo() { String[] tmpConfig = new String[] { LOCAL_TMP_DIR,SCRATCH_SPACE,OPTIMIZATION_LEVEL, DEFAULT_BLOCK_SIZE, - CP_PARALLEL_OPS, CP_PARALLEL_IO, NATIVE_BLAS, NATIVE_BLAS_DIR, + CP_PARALLEL_OPS, CP_PARALLEL_IO, PARALLEL_ENCODE, NATIVE_BLAS, NATIVE_BLAS_DIR, COMPRESSED_LINALG, COMPRESSED_LOSSY, COMPRESSED_VALID_COMPRESSIONS, COMPRESSED_OVERLAPPING, COMPRESSED_SAMPLING_RATIO, COMPRESSED_COCODE, COMPRESSED_TRANSPOSE, CODEGEN, CODEGEN_API, CODEGEN_COMPILER, CODEGEN_OPTIMIZER, CODEGEN_PLANCACHE, CODEGEN_LITERALS, diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java index 01769c700c4..d63b2cbe789 100644 --- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java @@ -1008,6 +1008,24 @@ public static int getConstrainedNumThreads(int maxNumThreads) return ret; } + + public static int getTransformNumThreads(int maxNumThreads) + { + //by default max local parallelism (vcores) + int ret = InfrastructureAnalyzer.getLocalParallelism(); + + //apply external max constraint (e.g., set by parfor or other rewrites) + if( maxNumThreads > 0 ) { + ret = Math.min(ret, maxNumThreads); + } + + //check if enabled in config.xml + if( !ConfigurationManager.isParallelTransform() ) { + ret = 1; + } + + return ret; + } public static Level getDefaultLogLevel() { Level log = Logger.getRootLogger().getLevel(); diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java index 2c205f36bba..d374d3c4370 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java @@ -1344,11 +1344,6 @@ public void quickSetValue(int r, int c, double v) { throw new DMLCompressionException("Should not set a value on a compressed Matrix"); } - @Override - public void quickSetValueThreadSafe(int r, int c, double v) { - throw new DMLCompressionException("Thread safe execution does not work on Compressed Matrix"); - } - @Override public double quickGetValueThreadSafe(int r, int c) { throw new DMLCompressionException("Thread safe execution does not work on Compressed Matrix"); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java index b6e0d97170c..800aa519f86 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.instructions.InstructionUtils; @@ -85,7 +86,8 @@ public void processInstruction(ExecutionContext ec) { // execute block transform encode MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec, colnames, fin.getNumColumns(), null); - MatrixBlock data = encoder.encode(fin); // build and apply + // TODO: Assign #threads in compiler and pass via the instruction string + MatrixBlock data = encoder.encode(fin, OptimizerUtils.getTransformNumThreads(-1)); // build and apply FrameBlock meta = encoder.getMetaData(new FrameBlock(fin.getNumColumns(), ValueType.STRING)); meta.setColumnNames(colnames); diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java index 160f8e95b7d..a86a878ca36 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java @@ -117,7 +117,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizable { // private static final Log LOG = LogFactory.getLog(MatrixBlock.class.getName()); - + private static final long serialVersionUID = 7319972089143154056L; //sparsity nnz threshold, based on practical experiments on space consumption and performance @@ -654,27 +654,6 @@ public void quickSetValue(int r, int c, double v) } } - /** - * Thread save set. - * Blocks need to be allocated, and in case of MCSR sparse, all rows - * that are going to be accessed need to be allocated as well. - * - * @param r row - * @param c column - * @param v value - */ - public void quickSetValueThreadSafe(int r, int c, double v) { - if(sparse) { - if(!(sparseBlock instanceof SparseBlockMCSR)) - throw new RuntimeException("Only MCSR Blocks are supported for Multithreaded sparse set."); - synchronized (sparseBlock.get(r)) { - sparseBlock.set(r,c,v); - } - } - else - denseBlock.set(r,c,v); - } - public double quickGetValueThreadSafe(int r, int c) { if(sparse) { if(!(sparseBlock instanceof SparseBlockMCSR)) @@ -976,7 +955,7 @@ public double sum() { /** * Wrapper method for single threaded reduceall-colSum of a matrix. - * + * * @return A new MatrixBlock containing the column sums of this matrix. */ public MatrixBlock colSum() { @@ -986,7 +965,7 @@ public MatrixBlock colSum() { /** * Wrapper method for single threaded reduceall-rowSum of a matrix. - * + * * @return A new MatrixBlock containing the row sums of this matrix. */ public MatrixBlock rowSum(){ @@ -1422,7 +1401,7 @@ public void copy(MatrixValue thatValue, boolean sp) throw new RuntimeException( "Copy must not overwrite itself!" ); if(that instanceof CompressedMatrixBlock) that = CompressedMatrixBlock.getUncompressed(that, "Copy not effecient into a MatrixBlock"); - + rlen=that.rlen; clen=that.clen; sparse=sp; @@ -2935,7 +2914,7 @@ public MatrixBlock binaryOperations(BinaryOperator op, MatrixValue thatValue, Ma LibMatrixBincell.isValidDimensionsBinary(this, that); if(thatValue instanceof CompressedMatrixBlock) return ((CompressedMatrixBlock) thatValue).binaryOperationsLeft(op, this, result); - + //compute output dimensions boolean outer = (LibMatrixBincell.getBinaryAccessType(this, that) == BinaryAccessType.OUTER_VECTOR_VECTOR); @@ -2980,7 +2959,7 @@ public MatrixBlock ternaryOperations(TernaryOperator op, MatrixBlock m2, MatrixB m2 = ((CompressedMatrixBlock) m2).getUncompressed("Ternay Operator arg2 " + op.fn.getClass().getSimpleName()); if(m3 instanceof CompressedMatrixBlock) m3 = ((CompressedMatrixBlock) m3).getUncompressed("Ternay Operator arg3 " + op.fn.getClass().getSimpleName()); - + //prepare inputs final boolean s1 = (rlen==1 && clen==1); final boolean s2 = (m2.rlen==1 && m2.clen==1); @@ -3674,7 +3653,7 @@ public void checkDimensionsForAppend(MatrixBlock[] in, boolean cbind) { "Invalid nCol dimension for append rbind: was " + in[i].clen + " should be: " + clen); } } - + public static MatrixBlock naryOperations(Operator op, MatrixBlock[] matrices, ScalarObject[] scalars, MatrixBlock ret) { //note: currently only min max, plus supported and hence specialized implementation //prepare operator diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java index 33bf4529b2f..e04eeff5e5f 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java @@ -20,6 +20,7 @@ package org.apache.sysds.runtime.transform.encode; import static org.apache.sysds.runtime.transform.encode.EncoderFactory.getEncoderType; +import static org.apache.sysds.runtime.util.UtilFunctions.getBlockSizes; import java.io.Externalizable; import java.io.IOException; @@ -45,6 +46,8 @@ */ public abstract class ColumnEncoder implements Externalizable, Encoder, Comparable { protected static final Log LOG = LogFactory.getLog(ColumnEncoder.class.getName()); + protected static final int APPLY_ROW_BLOCKS_PER_COLUMN = 1; + public static int BUILD_ROW_BLOCKS_PER_COLUMN = 1; private static final long serialVersionUID = 2299156350718979064L; protected int _colID; @@ -52,7 +55,23 @@ protected ColumnEncoder(int colID) { _colID = colID; } - public abstract MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol); + /** + * Apply Functions are only used in Single Threaded or Multi-Threaded Dense context. + * That's why there is no regard for MT sparse! + * + * @param in Input Block + * @param out Output Matrix + * @param outputCol The output column for the given column + * @return same as out + * + */ + public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol){ + return apply(in, out, outputCol, 0, -1); + } + + public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol){ + return apply(in, out, outputCol, 0, -1); + } public abstract MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk); @@ -172,18 +191,18 @@ public int compareTo(ColumnEncoder o) { * complete if all previous tasks are done. This is so that we can use the last task as a dependency for the whole * build, reducing unnecessary dependencies. */ - public List> getBuildTasks(FrameBlock in, int blockSize) { + public List> getBuildTasks(FrameBlock in) { List> tasks = new ArrayList<>(); List>> dep = null; - if(blockSize <= 0 || blockSize >= in.getNumRows()) { + int nRows = in.getNumRows(); + int[] blockSizes = getBlockSizes(nRows, getNumBuildRowPartitions()); + if(blockSizes.length == 1) { tasks.add(getBuildTask(in)); } else { HashMap ret = new HashMap<>(); - for(int i = 0; i < in.getNumRows(); i = i + blockSize) - tasks.add(getPartialBuildTask(in, i, blockSize, ret)); - if(in.getNumRows() % blockSize != 0) - tasks.add(getPartialBuildTask(in, in.getNumRows() - in.getNumRows() % blockSize, -1, ret)); + for(int startRow = 0, i = 0; i < blockSizes.length; startRow+=blockSizes[i], i++) + tasks.add(getPartialBuildTask(in, startRow, blockSizes[i], ret)); tasks.add(getPartialMergeBuildTask(ret)); dep = new ArrayList<>(Collections.nCopies(tasks.size() - 1, null)); dep.add(tasks.subList(0, tasks.size() - 1)); @@ -198,24 +217,63 @@ public Callable getBuildTask(FrameBlock in) { public Callable getPartialBuildTask(FrameBlock in, int startRow, int blockSize, HashMap ret) { throw new DMLRuntimeException( - "Trying to get the PartialBuild task of an Encoder which does not support " + "partial building"); + "Trying to get the PartialBuild task of an Encoder which does not support partial building"); } public Callable getPartialMergeBuildTask(HashMap ret) { throw new DMLRuntimeException( - "Trying to get the BuildMergeTask task of an Encoder which does not support " + "partial building"); + "Trying to get the BuildMergeTask task of an Encoder which does not support partial building"); } public List> getApplyTasks(FrameBlock in, MatrixBlock out, int outputCol) { - List> tasks = new ArrayList<>(); - tasks.add(new ColumnApplyTask(this, in, out, outputCol)); - return DependencyThreadPool.createDependencyTasks(tasks, null); + return getApplyTasks(in, null, out, outputCol); } public List> getApplyTasks(MatrixBlock in, MatrixBlock out, int outputCol) { + return getApplyTasks(null, in, out, outputCol); + } + + private List> getApplyTasks(FrameBlock inF, MatrixBlock inM, MatrixBlock out, int outputCol){ List> tasks = new ArrayList<>(); - tasks.add(new ColumnApplyTask(this, in, out, outputCol)); - return DependencyThreadPool.createDependencyTasks(tasks, null); + List>> dep = null; + if ((inF != null && inM != null) || (inF == null && inM == null)) + throw new DMLRuntimeException("getApplyTasks needs to be called with either FrameBlock input " + + "or MatrixBlock input"); + int nRows = inF == null ? inM.getNumRows() : inF.getNumRows(); + int[] blockSizes = getBlockSizes(nRows, getNumApplyRowPartitions()); + for(int startRow = 0, i = 0; i < blockSizes.length; startRow+=blockSizes[i], i++){ + if(inF != null) + if(out.isInSparseFormat()) + tasks.add(getSparseTask(inF, out, outputCol, startRow, blockSizes[i])); + else + tasks.add(new ColumnApplyTask<>(this, inF, out, outputCol, startRow, blockSizes[i])); + else + if(out.isInSparseFormat()) + tasks.add(getSparseTask(inM, out, outputCol, startRow, blockSizes[i])); + else + tasks.add(new ColumnApplyTask<>(this, inM, out, outputCol, startRow, blockSizes[i])); + } + if(tasks.size() > 1){ + dep = new ArrayList<>(Collections.nCopies(tasks.size(), null)); + tasks.add(() -> null); // Empty task as barrier + dep.add(tasks.subList(0, tasks.size()-1)); + } + + return DependencyThreadPool.createDependencyTasks(tasks, dep); + } + + protected abstract ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk); + + protected abstract ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk); + + protected int getNumApplyRowPartitions(){ + return APPLY_ROW_BLOCKS_PER_COLUMN; + } + + protected int getNumBuildRowPartitions(){ + return BUILD_ROW_BLOCKS_PER_COLUMN; } public enum EncoderType { @@ -226,39 +284,54 @@ public enum EncoderType { * This is the base Task for each column apply. If no custom "getApplyTasks" is implemented in an Encoder this task * will be used. */ - private static class ColumnApplyTask implements Callable { + protected static class ColumnApplyTask implements Callable { + + protected final T _encoder; + protected final FrameBlock _inputF; + protected final MatrixBlock _inputM; + protected final MatrixBlock _out; + protected final int _outputCol; + protected final int _startRow; + protected final int _blk; + + protected ColumnApplyTask(T encoder, FrameBlock input, MatrixBlock out, int outputCol){ + this(encoder, input, out, outputCol, 0, -1); + } - private final ColumnEncoder _encoder; - private final FrameBlock _inputF; - private final MatrixBlock _inputM; - private final MatrixBlock _out; - private final int _outputCol; + protected ColumnApplyTask(T encoder, MatrixBlock input, MatrixBlock out, int outputCol){ + this(encoder, input, out, outputCol, 0, -1); + } - protected ColumnApplyTask(ColumnEncoder encoder, FrameBlock input, MatrixBlock out, int outputCol) { - _encoder = encoder; - _inputF = input; - _inputM = null; - _out = out; - _outputCol = outputCol; + protected ColumnApplyTask(T encoder, FrameBlock input, MatrixBlock out, int outputCol, int startRow, int blk) { + this(encoder, input, null, out, outputCol, startRow, blk); } - protected ColumnApplyTask(ColumnEncoder encoder, MatrixBlock input, MatrixBlock out, int outputCol) { + protected ColumnApplyTask(T encoder, MatrixBlock input, MatrixBlock out, int outputCol, int startRow, int blk) { + this(encoder, null, input, out, outputCol, startRow, blk); + } + private ColumnApplyTask(T encoder, FrameBlock inputF, MatrixBlock inputM, MatrixBlock out, int outputCol, + int startRow, int blk){ _encoder = encoder; - _inputM = input; - _inputF = null; + _inputM = inputM; + _inputF = inputF; _out = out; _outputCol = outputCol; + _startRow = startRow; + _blk = blk; } @Override - public Void call() throws Exception { + public Object call() throws Exception { assert _outputCol >= 0; - int _rowStart = 0; - int _blk = -1; + if(_out.isInSparseFormat()){ + // this is an issue since most sparse Tasks modify the sparse structure so normal get and set calls are + // not possible. + throw new DMLRuntimeException("ColumnApplyTask called although output is in sparse format."); + } if(_inputF == null) - _encoder.apply(_inputM, _out, _outputCol, _rowStart, _blk); + _encoder.apply(_inputM, _out, _outputCol, _startRow, _blk); else - _encoder.apply(_inputF, _out, _outputCol, _rowStart, _blk); + _encoder.apply(_inputF, _out, _outputCol, _startRow, _blk); return null; } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java index b4d48000040..5736c1e4997 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java @@ -28,11 +28,15 @@ import java.util.HashMap; import java.util.concurrent.Callable; +import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.MutableTriple; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.lops.Lop; +import org.apache.sysds.runtime.data.SparseRowVector; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.Statistics; public class ColumnEncoderBin extends ColumnEncoder { public static final String MIN_PREFIX = "min"; @@ -84,10 +88,13 @@ public double[] getBinMaxs() { @Override public void build(FrameBlock in) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; if(!isApplicable()) return; double[] pairMinMax = getMinMaxOfCol(in, _colID, 0, -1); computeBins(pairMinMax[0], pairMinMax[1]); + if(DMLScript.STATISTICS) + Statistics.incTransformBinningBuildTime(System.nanoTime()-t0); } private static double[] getMinMaxOfCol(FrameBlock in, int colID, int startRow, int blockSize) { @@ -118,6 +125,8 @@ public Callable getPartialMergeBuildTask(HashMap ret) { return new BinMergePartialBuildTask(this, ret); } + + public void computeBins(double min, double max) { // ensure allocated internal transformation metadata if(_binMins == null || _binMaxs == null) { @@ -145,39 +154,47 @@ public void buildPartial(FrameBlock in) { _colMaxs = pairMinMax[1]; } - @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - - @Override - public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) { double inVal = UtilFunctions.objectToDouble(in.getSchema()[_colID - 1], in.get(i, _colID - 1)); int ix = Arrays.binarySearch(_binMaxs, inVal); int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1; - out.quickSetValueThreadSafe(i, outputCol, binID); + out.quickSetValue(i, outputCol, binID); } + if (DMLScript.STATISTICS) + Statistics.incTransformBinningApplyTime(System.nanoTime()-t0); return out; } @Override public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; int end = getEndIndex(in.getNumRows(), rowStart, blk); for(int i = rowStart; i < end; i++) { double inVal = in.quickGetValueThreadSafe(i, _colID - 1); int ix = Arrays.binarySearch(_binMaxs, inVal); int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1; - out.quickSetValueThreadSafe(i, outputCol, binID); + out.quickSetValue(i, outputCol, binID); } + if (DMLScript.STATISTICS) + Statistics.incTransformBinningApplyTime(System.nanoTime()-t0); return out; } + @Override + protected ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + return new BinSparseApplyTask(this, in, out, outputCol); + } + + @Override + protected ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new NotImplementedException("Sparse Binning for MatrixBlocks not jet implemented"); + } + @Override public void mergeAt(ColumnEncoder other) { if(other instanceof ColumnEncoderBin) { @@ -264,6 +281,43 @@ public void readExternal(ObjectInput in) throws IOException { } } + private static class BinSparseApplyTask extends ColumnApplyTask { + + public BinSparseApplyTask(ColumnEncoderBin encoder, FrameBlock input, + MatrixBlock out, int outputCol, int startRow, int blk) { + super(encoder, input, out, outputCol, startRow, blk); + } + + private BinSparseApplyTask(ColumnEncoderBin encoder, FrameBlock input, MatrixBlock out, int outputCol) { + super(encoder, input, out, outputCol); + } + + public Object call() throws Exception { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + int index = _encoder._colID - 1; + if(_out.getSparseBlock() == null) + return null; + assert _inputF != null; + for(int r = _startRow; r < getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) { + SparseRowVector row = (SparseRowVector) _out.getSparseBlock().get(r); + double inVal = UtilFunctions.objectToDouble(_inputF.getSchema()[index], _inputF.get(r, index)); + int ix = Arrays.binarySearch(_encoder._binMaxs, inVal); + int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1; + row.values()[index] = binID; + row.indexes()[index] = _outputCol; + } + if(DMLScript.STATISTICS) + Statistics.incTransformBinningApplyTime(System.nanoTime()-t0); + return null; + } + + @Override + public String toString() { + return getClass().getSimpleName() + ""; + } + + } + private static class BinPartialBuildTask implements Callable { private final FrameBlock _input; @@ -284,7 +338,13 @@ protected BinPartialBuildTask(FrameBlock input, int colID, int startRow, int blo @Override public double[] call() throws Exception { - _partialMinMax.put(_startRow, getMinMaxOfCol(_input, _colID, _startRow, _blockSize)); + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + double[] minMax = getMinMaxOfCol(_input, _colID, _startRow, _blockSize); + synchronized (_partialMinMax){ + _partialMinMax.put(_startRow, minMax); + } + if (DMLScript.STATISTICS) + Statistics.incTransformBinningBuildTime(System.nanoTime()-t0); return null; } @@ -306,6 +366,7 @@ private BinMergePartialBuildTask(ColumnEncoderBin encoderBin, HashMap> getApplyTasks(MatrixBlock in, MatrixBlock out, in } @Override - public List> getBuildTasks(FrameBlock in, int blockSize) { + protected ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new NotImplementedException(); + } + + @Override + protected ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new NotImplementedException(); + } + + @Override + public List> getBuildTasks(FrameBlock in) { List> tasks = new ArrayList<>(); Map depMap = null; for(ColumnEncoder columnEncoder : _columnEncoders) { - List> t = columnEncoder.getBuildTasks(in, blockSize); + List> t = columnEncoder.getBuildTasks(in); if(t == null) continue; // Linear execution between encoders so they can't be built in parallel @@ -178,16 +190,6 @@ public void buildPartial(FrameBlock in) { columnEncoder.buildPartial(in); } - @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - - @Override - public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { try { diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java index 25b2eb9b848..1047f542051 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java @@ -24,17 +24,15 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.Callable; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.runtime.DMLRuntimeException; -import org.apache.sysds.runtime.data.SparseRowVector; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.DependencyTask; -import org.apache.sysds.runtime.util.DependencyThreadPool; +import org.apache.sysds.utils.Statistics; public class ColumnEncoderDummycode extends ColumnEncoder { private static final long serialVersionUID = 5832130477659116489L; @@ -60,19 +58,10 @@ public void build(FrameBlock in) { } @Override - public List> getBuildTasks(FrameBlock in, int blockSize) { + public List> getBuildTasks(FrameBlock in) { return null; } - @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - - public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { throw new DMLRuntimeException("Called DummyCoder with FrameBlock"); @@ -80,6 +69,7 @@ public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowS @Override public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; // Out Matrix should already be correct size! // append dummy coded or unchanged values to output for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) { @@ -89,20 +79,25 @@ public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int row int nCol = outputCol + (int) val - 1; // Setting value to 0 first in case of sparse so the row vector does not need to be resized if(nCol != outputCol) - out.quickSetValueThreadSafe(i, outputCol, 0); - out.quickSetValueThreadSafe(i, nCol, 1); + out.quickSetValue(i, outputCol, 0); + out.quickSetValue(i, nCol, 1); } + if (DMLScript.STATISTICS) + Statistics.incTransformDummyCodeApplyTime(System.nanoTime()-t0); return out; } + + @Override + protected ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + return new DummycodeSparseApplyTask(this, in, out, outputCol, startRow, blk); + } + @Override - public List> getApplyTasks(MatrixBlock in, MatrixBlock out, int outputCol) { - List> tasks = new ArrayList<>(); - if(out.isInSparseFormat()) - tasks.add(new DummycodeSparseApplyTask(this, in, out, outputCol)); - else - return super.getApplyTasks(in, out, outputCol); - return DependencyThreadPool.createDependencyTasks(tasks, null); + protected ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new DMLRuntimeException("Called DummyCoder with FrameBlock"); } @Override @@ -139,6 +134,7 @@ else if(columnEncoder instanceof ColumnEncoderBin) { if(distinct != -1) { _domainSize = distinct; + LOG.debug("DummyCoder for column: " + _colID + " has domain size: " + _domainSize); } } } @@ -188,48 +184,47 @@ public int getDomainSize() { return _domainSize; } - private static class DummycodeSparseApplyTask implements Callable { - private final ColumnEncoderDummycode _encoder; - private final MatrixBlock _input; - private final MatrixBlock _out; - private final int _outputCol; + private static class DummycodeSparseApplyTask extends ColumnApplyTask { + + protected DummycodeSparseApplyTask(ColumnEncoderDummycode encoder, MatrixBlock input, + MatrixBlock out, int outputCol) { + super(encoder, input, out, outputCol); + } - private DummycodeSparseApplyTask(ColumnEncoderDummycode encoder, MatrixBlock input, MatrixBlock out, - int outputCol) { - _encoder = encoder; - _input = input; - _out = out; - _outputCol = outputCol; + protected DummycodeSparseApplyTask(ColumnEncoderDummycode encoder, MatrixBlock input, + MatrixBlock out, int outputCol, int startRow, int blk) { + super(encoder, input, out, outputCol, startRow, blk); } public Object call() throws Exception { - for(int r = 0; r < _input.getNumRows(); r++) { - if(_out.getSparseBlock() == null) - return null; - synchronized(_out.getSparseBlock().get(r)) { - // Since the recoded values are already offset in the output matrix (same as input at this point) - // the dummycoding only needs to offset them within their column domain. Which means that the - // indexes in the SparseRowVector do not need to be sorted anymore and can be updated directly. - // - // Input: Output: - // - // 1 | 0 | 2 | 0 1 | 0 | 0 | 1 - // 2 | 0 | 1 | 0 ===> 0 | 1 | 1 | 0 - // 1 | 0 | 2 | 0 1 | 0 | 0 | 1 - // 1 | 0 | 1 | 0 1 | 0 | 1 | 0 - // - // Example SparseRowVector Internals (1. row): - // - // indexes = [0,2] ===> indexes = [0,3] - // values = [1,2] values = [1,1] - int index = ((SparseRowVector) _out.getSparseBlock().get(r)).getIndex(_outputCol); - double val = _out.getSparseBlock().get(r).values()[index]; - int nCol = _outputCol + (int) val - 1; - - _out.getSparseBlock().get(r).indexes()[index] = nCol; - _out.getSparseBlock().get(r).values()[index] = 1; - } + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + assert _inputM != null; + if(_out.getSparseBlock() == null) + return null; + for(int r = _startRow; r < getEndIndex(_inputM.getNumRows(), _startRow, _blk); r++) { + // Since the recoded values are already offset in the output matrix (same as input at this point) + // the dummycoding only needs to offset them within their column domain. Which means that the + // indexes in the SparseRowVector do not need to be sorted anymore and can be updated directly. + // + // Input: Output: + // + // 1 | 0 | 2 | 0 1 | 0 | 0 | 1 + // 2 | 0 | 1 | 0 ===> 0 | 1 | 1 | 0 + // 1 | 0 | 2 | 0 1 | 0 | 0 | 1 + // 1 | 0 | 1 | 0 1 | 0 | 1 | 0 + // + // Example SparseRowVector Internals (1. row): + // + // indexes = [0,2] ===> indexes = [0,3] + // values = [1,2] values = [1,1] + int index = _encoder._colID - 1; + double val = _out.getSparseBlock().get(r).values()[index]; + int nCol = _outputCol + (int) val - 1; + _out.getSparseBlock().get(r).indexes()[index] = nCol; + _out.getSparseBlock().get(r).values()[index] = 1; } + if (DMLScript.STATISTICS) + Statistics.incTransformDummyCodeApplyTime(System.nanoTime()-t0); return null; } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java index 1c74ae52687..d30d8dc404c 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java @@ -26,11 +26,15 @@ import java.io.ObjectOutput; import java.util.List; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.data.SparseRowVector; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.DependencyTask; import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.Statistics; /** * Class used for feature hashing transformation of frames. @@ -56,7 +60,7 @@ public ColumnEncoderFeatureHash() { } private long getCode(String key) { - return key.hashCode() % _K; + return (key.hashCode() % _K) + 1; } @Override @@ -65,22 +69,25 @@ public void build(FrameBlock in) { } @Override - public List> getBuildTasks(FrameBlock in, int blockSize) { + public List> getBuildTasks(FrameBlock in) { return null; } @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); + protected ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + return new FeatureHashSparseApplyTask(this, in, out, outputCol, startRow, blk); } @Override - public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); + protected ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new NotImplementedException("Sparse FeatureHashing for MatrixBlocks not jet implemented"); } @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; // apply feature hashing column wise for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) { Object okey = in.get(i, _colID - 1); @@ -88,21 +95,26 @@ public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowS if(key == null) throw new DMLRuntimeException("Missing Value encountered in input Frame for FeatureHash"); long code = getCode(key); - out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ? code : Double.NaN); + out.quickSetValue(i, outputCol, (code >= 0) ? code : Double.NaN); } + if(DMLScript.STATISTICS) + Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0); return out; } @Override public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; int end = getEndIndex(in.getNumRows(), rowStart, blk); // apply feature hashing column wise for(int i = rowStart; i < end; i++) { Object okey = in.quickGetValueThreadSafe(i, _colID - 1); String key = okey.toString(); long code = getCode(key); - out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ? code : Double.NaN); + out.quickSetValue(i, outputCol, (code >= 0) ? code : Double.NaN); } + if(DMLScript.STATISTICS) + Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0); return out; } @@ -145,4 +157,40 @@ public void readExternal(ObjectInput in) throws IOException { super.readExternal(in); _K = in.readLong(); } + + public static class FeatureHashSparseApplyTask extends ColumnApplyTask{ + + public FeatureHashSparseApplyTask(ColumnEncoderFeatureHash encoder, FrameBlock input, + MatrixBlock out, int outputCol, int startRow, int blk) { + super(encoder, input, out, outputCol, startRow, blk); + } + + public FeatureHashSparseApplyTask(ColumnEncoderFeatureHash encoder, FrameBlock input, + MatrixBlock out, int outputCol) { + super(encoder, input, out, outputCol); + } + + @Override + public Object call() throws Exception { + if(_out.getSparseBlock() == null) + return null; + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + int index = _encoder._colID - 1; + assert _inputF != null; + for(int r = _startRow; r < getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++){ + SparseRowVector row = (SparseRowVector) _out.getSparseBlock().get(r); + Object okey = _inputF.get(r, index); + String key = (okey != null) ? okey.toString() : null; + if(key == null) + throw new DMLRuntimeException("Missing Value encountered in input Frame for FeatureHash"); + long code = _encoder.getCode(key); + row.values()[index] = code; + row.indexes()[index] = _outputCol; + } + if(DMLScript.STATISTICS) + Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0); + return null; + } + } + } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java index 7a8df245795..5c7392c6a3f 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java @@ -21,16 +21,22 @@ import static org.apache.sysds.runtime.util.UtilFunctions.getEndIndex; +import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.data.SparseRowVector; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.DependencyTask; import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.Statistics; public class ColumnEncoderPassThrough extends ColumnEncoder { private static final long serialVersionUID = -8473768154646831882L; + private List sparseRowsWZeros = null; protected ColumnEncoderPassThrough(int ptCols) { super(ptCols); // 1-based @@ -40,37 +46,46 @@ public ColumnEncoderPassThrough() { this(-1); } + public List getSparseRowsWZeros(){ + return sparseRowsWZeros; + } + @Override public void build(FrameBlock in) { // do nothing } @Override - public List> getBuildTasks(FrameBlock in, int blockSize) { + public List> getBuildTasks(FrameBlock in) { return null; } @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); + protected ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + return new PassThroughSparseApplyTask(this, in, out, outputCol, startRow, blk); } @Override - public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); + protected ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new NotImplementedException("Sparse PassThrough for MatrixBlocks not jet implemented"); } @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; int col = _colID - 1; // 1-based ValueType vt = in.getSchema()[col]; for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) { Object val = in.get(i, col); double v = (val == null || - (vt == ValueType.STRING && val.toString().isEmpty())) ? Double.NaN : UtilFunctions.objectToDouble(vt, - val); - out.quickSetValueThreadSafe(i, outputCol, v); + (vt == ValueType.STRING && val.toString().isEmpty())) + ? Double.NaN : UtilFunctions.objectToDouble(vt, val); + out.quickSetValue(i, outputCol, v); } + if(DMLScript.STATISTICS) + Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0); return out; } @@ -79,12 +94,15 @@ public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int row // only transfer from in to out if(in == out) return out; + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; int col = _colID - 1; // 1-based int end = getEndIndex(in.getNumRows(), rowStart, blk); for(int i = rowStart; i < end; i++) { double val = in.quickGetValueThreadSafe(i, col); - out.quickSetValueThreadSafe(i, outputCol, val); + out.quickSetValue(i, outputCol, val); } + if(DMLScript.STATISTICS) + Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0); return out; } @@ -106,4 +124,58 @@ public FrameBlock getMetaData(FrameBlock meta) { public void initMetaData(FrameBlock meta) { // do nothing } + + public static class PassThroughSparseApplyTask extends ColumnApplyTask{ + + + protected PassThroughSparseApplyTask(ColumnEncoderPassThrough encoder, FrameBlock input, + MatrixBlock out, int outputCol) { + super(encoder, input, out, outputCol); + } + + protected PassThroughSparseApplyTask(ColumnEncoderPassThrough encoder, FrameBlock input, MatrixBlock out, + int outputCol, int startRow, int blk) { + super(encoder, input, out, outputCol, startRow, blk); + } + + @Override + public Object call() throws Exception { + if(_out.getSparseBlock() == null) + return null; + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + int index = _encoder._colID - 1; + assert _inputF != null; + List sparseRowsWZeros = null; + ValueType vt = _inputF.getSchema()[index]; + for(int r = _startRow; r < getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) { + Object val = _inputF.get(r, index); + double v = (val == null || (vt == ValueType.STRING && val.toString().isEmpty())) ? + Double.NaN : UtilFunctions.objectToDouble(vt, val); + SparseRowVector row = (SparseRowVector) _out.getSparseBlock().get(r); + if(v == 0) { + if(sparseRowsWZeros == null) + sparseRowsWZeros = new ArrayList<>(); + sparseRowsWZeros.add(r); + } + row.values()[index] = v; + row.indexes()[index] = _outputCol; + } + if(sparseRowsWZeros != null){ + synchronized (_encoder){ + if(_encoder.sparseRowsWZeros == null) + _encoder.sparseRowsWZeros = new ArrayList<>(); + _encoder.sparseRowsWZeros.addAll(sparseRowsWZeros); + } + } + if(DMLScript.STATISTICS) + Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0); + return null; + } + + public String toString() { + return getClass().getSimpleName() + ""; + } + + } + } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java index fd18d868ea2..e190d74438f 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java @@ -33,10 +33,13 @@ import java.util.Objects; import java.util.concurrent.Callable; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.lops.Lop; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.data.SparseRowVector; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.utils.Statistics; public class ColumnEncoderRecode extends ColumnEncoder { private static final long serialVersionUID = 8213163881283341874L; @@ -135,7 +138,11 @@ private long lookupRCDMap(String key) { public void build(FrameBlock in) { if(!isApplicable()) return; + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; makeRcdMap(in, _rcdMap, _colID, 0, in.getNumRows()); + if(DMLScript.STATISTICS){ + Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0); + } } @Override @@ -185,19 +192,18 @@ public void buildPartial(FrameBlock in) { _rcdMapPart.remove(""); } - @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; // FrameBlock is column Major and MatrixBlock row Major this results in cache inefficiencies :( for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) { Object okey = in.get(i, _colID - 1); String key = (okey != null) ? okey.toString() : null; long code = lookupRCDMap(key); - out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ? code : Double.NaN); + out.quickSetValue(i, outputCol, (code >= 0) ? code : Double.NaN); + } + if(DMLScript.STATISTICS){ + Statistics.incTransformRecodeApplyTime(System.nanoTime() - t0); } return out; } @@ -209,9 +215,16 @@ public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int row } @Override - public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) { - throw new DMLRuntimeException( - "Recode called with MatrixBlock. Should not happen since Recode is the first " + "encoder in the Stack"); + protected ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk){ + return new RecodeSparseApplyTask(this, in ,out, outputCol, startRow, blk); + } + + @Override + protected ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new DMLRuntimeException("Recode called with MatrixBlock. Should not happen since Recode is the first " + + "encoder in the Stack"); } @Override @@ -313,6 +326,48 @@ public HashMap getRcdMap() { return _rcdMap; } + private static class RecodeSparseApplyTask extends ColumnApplyTask{ + + public RecodeSparseApplyTask(ColumnEncoderRecode encoder, FrameBlock input, MatrixBlock out, int outputCol) { + super(encoder, input, out, outputCol); + } + + protected RecodeSparseApplyTask(ColumnEncoderRecode encoder, FrameBlock input, MatrixBlock out, + int outputCol, int startRow, int blk) { + super(encoder, input, out, outputCol, startRow, blk); + } + + public Object call() throws Exception { + int index = _encoder._colID - 1; + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + if(_out.getSparseBlock() == null) + return null; + assert _inputF != null; + for(int r = _startRow; r < getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) { + SparseRowVector row = (SparseRowVector) _out.getSparseBlock().get(r); + Object okey = _inputF.get(r, index); + String key = (okey != null) ? okey.toString() : null; + long code = _encoder.lookupRCDMap(key); + double val = (code < 0) ? Double.NaN : code; + row.values()[index] = val; + row.indexes()[index] = _outputCol; + } + if(DMLScript.STATISTICS){ + Statistics.incTransformRecodeApplyTime(System.nanoTime() - t0); + } + return null; + } + + @Override + public String toString() { + String str = getClass().getSimpleName() + ""; + if(_blk != -1) + str+= ""; + return str; + } + + } + private static class RecodePartialBuildTask implements Callable { private final FrameBlock _input; @@ -332,11 +387,15 @@ protected RecodePartialBuildTask(FrameBlock input, int colID, int startRow, int @Override public HashMap call() throws Exception { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; HashMap partialMap = new HashMap<>(); makeRcdMap(_input, partialMap, _colID, _startRow, _blockSize); synchronized(_partialMaps) { _partialMaps.put(_startRow, partialMap); } + if(DMLScript.STATISTICS){ + Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0); + } return null; } @@ -358,6 +417,7 @@ private RecodeMergePartialBuildTask(ColumnEncoderRecode encoderRecode, HashMap rcdMap = _encoder.getRcdMap(); _partialMaps.forEach((start_row, map) -> { ((HashMap) map).forEach((k, v) -> { @@ -366,6 +426,9 @@ public Object call() throws Exception { }); }); _encoder._rcdMap = rcdMap; + if(DMLScript.STATISTICS){ + Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0); + } return null; } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java index 4b48d2a09fd..012379a4225 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java @@ -19,16 +19,8 @@ package org.apache.sysds.runtime.transform.encode; -import static org.apache.sysds.runtime.util.CollectionUtils.except; -import static org.apache.sysds.runtime.util.CollectionUtils.unionDistinct; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map.Entry; - import org.apache.commons.lang.ArrayUtils; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.ValueType; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.matrix.data.FrameBlock; @@ -36,9 +28,19 @@ import org.apache.sysds.runtime.transform.encode.ColumnEncoder.EncoderType; import org.apache.sysds.runtime.transform.meta.TfMetaUtils; import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.Statistics; import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONObject; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; + +import static org.apache.sysds.runtime.util.CollectionUtils.except; +import static org.apache.sysds.runtime.util.CollectionUtils.unionDistinct; + public class EncoderFactory { public static MultiColumnEncoder createEncoder(String spec, String[] colnames, int clen, FrameBlock meta) { @@ -125,16 +127,22 @@ public static MultiColumnEncoder createEncoder(String spec, String[] colnames, V } // create composite decoder of all created encoders for(Entry> listEntry : colEncoders.entrySet()) { + if(DMLScript.STATISTICS) + Statistics.incTransformEncoderCount(listEntry.getValue().size()); lencoders.add(new ColumnEncoderComposite(listEntry.getValue())); } encoder = new MultiColumnEncoder(lencoders); if(!oIDs.isEmpty()) { encoder.addReplaceLegacyEncoder(new EncoderOmit(jSpec, colnames, schema.length, minCol, maxCol)); + if(DMLScript.STATISTICS) + Statistics.incTransformEncoderCount(1); } if(!mvIDs.isEmpty()) { EncoderMVImpute ma = new EncoderMVImpute(jSpec, colnames, schema.length, minCol, maxCol); ma.initRecodeIDList(rcIDs); encoder.addReplaceLegacyEncoder(ma); + if(DMLScript.STATISTICS) + Statistics.incTransformEncoderCount(1); } // initialize meta data w/ robustness for superset of cols diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java index cda6b2af0ee..f77e690f98d 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java @@ -19,21 +19,8 @@ package org.apache.sysds.runtime.transform.encode; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.stream.Collectors; - import org.apache.commons.lang.ArrayUtils; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.functionobjects.KahanPlus; import org.apache.sysds.runtime.functionobjects.Mean; @@ -44,10 +31,25 @@ import org.apache.sysds.runtime.transform.meta.TfMetaUtils; import org.apache.sysds.runtime.util.IndexRange; import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.Statistics; import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.stream.Collectors; + public class EncoderMVImpute extends LegacyEncoder { private static final long serialVersionUID = 9057868620144662194L; // objects required to compute mean and variance of all non-missing entries @@ -173,6 +175,7 @@ public MatrixBlock encode(FrameBlock in, MatrixBlock out) { @Override public void build(FrameBlock in) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; try { for(int j = 0; j < _colList.length; j++) { int colID = _colList[j]; @@ -215,10 +218,13 @@ else if(_mvMethodList[j] == MVMethod.GLOBAL_MODE) { catch(Exception ex) { throw new RuntimeException(ex); } + if(DMLScript.STATISTICS) + Statistics.incTransformImputeBuildTime(System.nanoTime()-t0); } @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; for(int i = 0; i < in.getNumRows(); i++) { for(int j = 0; j < _colList.length; j++) { int colID = _colList[j]; @@ -226,6 +232,8 @@ public MatrixBlock apply(FrameBlock in, MatrixBlock out) { out.quickSetValue(i, colID - 1, Double.parseDouble(_replacementList[j])); } } + if(DMLScript.STATISTICS) + Statistics.incTransformImputeApplyTime(System.nanoTime()-t0); return out; } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java index db61fc158f8..c2f3b68e47e 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java @@ -19,16 +19,9 @@ package org.apache.sysds.runtime.transform.encode; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.ValueType; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; @@ -37,9 +30,18 @@ import org.apache.sysds.runtime.transform.meta.TfMetaUtils; import org.apache.sysds.runtime.util.IndexRange; import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.Statistics; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + public class EncoderOmit extends LegacyEncoder { /* * THIS CLASS IS ONLY FOR LEGACY SUPPORT!!! and will be fazed out slowly. @@ -126,6 +128,7 @@ public void build(FrameBlock in) { public MatrixBlock apply(FrameBlock in, MatrixBlock out) { // local rmRows for broadcasting encoder in spark + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; boolean[] rmRows; if(_federated) rmRows = _rmRows; @@ -148,7 +151,8 @@ public MatrixBlock apply(FrameBlock in, MatrixBlock out) { } _rmRows = rmRows; - + if(DMLScript.STATISTICS) + Statistics.incTransformOmitApplyTime(System.nanoTime()-t0); return ret; } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java index 6db63f514d0..73d33a98aab 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java @@ -28,6 +28,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -36,22 +38,31 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.data.SparseBlock; import org.apache.sysds.runtime.data.SparseBlockMCSR; +import org.apache.sysds.runtime.data.SparseRowVector; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.DependencyTask; import org.apache.sysds.runtime.util.DependencyThreadPool; import org.apache.sysds.runtime.util.DependencyWrapperTask; import org.apache.sysds.runtime.util.IndexRange; +import org.apache.sysds.utils.Statistics; public class MultiColumnEncoder implements Encoder { protected static final Log LOG = LogFactory.getLog(MultiColumnEncoder.class.getName()); private static final boolean MULTI_THREADED = true; - public static boolean MULTI_THREADED_STAGES = true; + // If true build and apply separately by placing a synchronization barrier + public static boolean MULTI_THREADED_STAGES = false; + + // Only affects if MULTI_THREADED_STAGES is true + // if true apply tasks for each column will complete + // before the next will start. + public static boolean APPLY_ENCODER_SEPARATE_STAGES = false; private List _columnEncoders; // These encoders are deprecated and will be phased out soon. @@ -60,18 +71,6 @@ public class MultiColumnEncoder implements Encoder { private int _colOffset = 0; // offset for federated Workers who are using subrange encoders private FrameBlock _meta = null; - // TEMP CONSTANTS for testing only - //private int APPLY_BLOCKSIZE = 0; // temp only for testing until automatic calculation of block size - public static int BUILD_BLOCKSIZE = 0; - - /*public void setApplyBlockSize(int blk) { - APPLY_BLOCKSIZE = blk; - }*/ - - public void setBuildBlockSize(int blk) { - BUILD_BLOCKSIZE = blk; - } - public MultiColumnEncoder(List columnEncoders) { _columnEncoders = columnEncoders; } @@ -90,6 +89,7 @@ public MatrixBlock encode(FrameBlock in, int k) { if(MULTI_THREADED && k > 1 && !MULTI_THREADED_STAGES && !hasLegacyEncoder()) { out = new MatrixBlock(); DependencyThreadPool pool = new DependencyThreadPool(k); + LOG.debug("Encoding with full DAG on " + k + " Threads"); try { pool.submitAllAndWait(getEncodeTasks(in, out, pool)); } @@ -98,10 +98,10 @@ public MatrixBlock encode(FrameBlock in, int k) { e.printStackTrace(); } pool.shutdown(); - out.recomputeNonZeros(); - return out; + outputMatrixPostProcessing(out); } else { + LOG.debug("Encoding with staged approach on: " + k + " Threads"); build(in, k); if(_legacyMVImpute != null) { // These operations are redundant for every encoder excluding the legacyMVImpute, the workaround to @@ -127,9 +127,10 @@ private List> getEncodeTasks(FrameBlock in, MatrixBlock out, D List> applyTAgg = null; Map depMap = new HashMap<>(); boolean hasDC = getColumnEncoders(ColumnEncoderDummycode.class).size() > 0; + boolean applyOffsetDep = false; tasks.add(DependencyThreadPool.createDependencyTask(new InitOutputMatrixTask(this, in, out))); for(ColumnEncoderComposite e : _columnEncoders) { - List> buildTasks = e.getBuildTasks(in, BUILD_BLOCKSIZE); + List> buildTasks = e.getBuildTasks(in); tasks.addAll(buildTasks); if(buildTasks.size() > 0) { @@ -152,11 +153,14 @@ private List> getEncodeTasks(FrameBlock in, MatrixBlock out, D // colUpdateTask can start when all domain sizes, because it can now calculate the offsets for // each column depMap.put(new Integer[] {-2, -1}, new Integer[] {tasks.size() - 1, tasks.size()}); + buildTasks.forEach(t -> t.setPriority(5)); + applyOffsetDep = true; } - if(hasDC) { + if(hasDC && applyOffsetDep) { // Apply Task dependency to output col update task (is last in list) - // All ApplyTasks need to wait for this task so they all have the correct offsets. + // All ApplyTasks need to wait for this task, so they all have the correct offsets. + // But only for the columns that come after the first DC coder since they don't have an offset depMap.put(new Integer[] {tasks.size(), tasks.size() + 1}, new Integer[] {-2, -1}); applyTAgg = applyTAgg == null ? new ArrayList<>() : applyTAgg; @@ -195,7 +199,7 @@ public void build(FrameBlock in, int k) { private List> getBuildTasks(FrameBlock in) { List> tasks = new ArrayList<>(); for(ColumnEncoderComposite columnEncoder : _columnEncoders) { - tasks.addAll(columnEncoder.getBuildTasks(in, BUILD_BLOCKSIZE)); + tasks.addAll(columnEncoder.getBuildTasks(in)); } return tasks; } @@ -241,23 +245,23 @@ public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int k) { if(in.getNumColumns() != numEncoders) throw new DMLRuntimeException("Not every column in has a CompositeEncoder. Please make sure every column " + "has a encoder or slice the input accordingly"); - // Block allocation for MT access - outputMatrixPreProcessing(out, in); // TODO smart checks if(MULTI_THREADED && k > 1) { + // Block allocation for MT access + outputMatrixPreProcessing(out, in); applyMT(in, out, outputCol, k); } else { int offset = outputCol; for(ColumnEncoderComposite columnEncoder : _columnEncoders) { columnEncoder.apply(in, out, columnEncoder._colID - 1 + offset); - if(columnEncoder.hasEncoder(ColumnEncoderDummycode.class)) + if (columnEncoder.hasEncoder(ColumnEncoderDummycode.class)) offset += columnEncoder.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1; } } // Recomputing NNZ since we access the block directly // TODO set NNZ explicit count them in the encoders - out.recomputeNonZeros(); + outputMatrixPostProcessing(out); if(_legacyOmit != null) out = _legacyOmit.apply(in, out); if(_legacyMVImpute != null) @@ -280,16 +284,26 @@ private List> getApplyTasks(FrameBlock in, MatrixBlock out, in private void applyMT(FrameBlock in, MatrixBlock out, int outputCol, int k) { DependencyThreadPool pool = new DependencyThreadPool(k); try { - pool.submitAllAndWait(getApplyTasks(in, out, outputCol)); + if(APPLY_ENCODER_SEPARATE_STAGES){ + int offset = outputCol; + for (ColumnEncoderComposite e : _columnEncoders) { + pool.submitAllAndWait(e.getApplyTasks(in, out, e._colID - 1 + offset)); + if (e.hasEncoder(ColumnEncoderDummycode.class)) + offset += e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1; + } + }else{ + pool.submitAllAndWait(getApplyTasks(in, out, outputCol)); + } } catch(ExecutionException | InterruptedException e) { - LOG.error("MT Column encode failed"); + LOG.error("MT Column apply failed"); e.printStackTrace(); } pool.shutdown(); } private static void outputMatrixPreProcessing(MatrixBlock output, FrameBlock input) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; output.allocateBlock(); if(output.isInSparseFormat()) { SparseBlock block = output.getSparseBlock(); @@ -300,8 +314,33 @@ private static void outputMatrixPreProcessing(MatrixBlock output, FrameBlock inp // allocate all sparse rows so MT sync can be done. // should be rare that rows have only 0 block.allocate(r, input.getNumColumns()); + // Setting the size here makes it possible to run all sparse apply tasks without any sync + // could become problematic if the input is very sparse since we allocate the same size as the input + // should be fine in theory ;) + ((SparseRowVector)block.get(r)).setSize(input.getNumColumns()); + } + } + if(DMLScript.STATISTICS) + Statistics.incTransformOutMatrixPreProcessingTime(System.nanoTime()-t0); + } + + private void outputMatrixPostProcessing(MatrixBlock output){ + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + Set indexSet = getColumnEncoders(ColumnEncoderPassThrough.class).stream() + .map(ColumnEncoderPassThrough::getSparseRowsWZeros).flatMap(l -> { + if(l == null) + return null; + return l.stream(); + }).collect(Collectors.toSet()); + if(!indexSet.stream().allMatch(Objects::isNull)){ + for(Integer row : indexSet){ + // TODO: Maybe MT in special cases when the number of rows is large + output.getSparseBlock().get(row).compact(); } } + output.recomputeNonZeros(); + if(DMLScript.STATISTICS) + Statistics.incTransformOutMatrixPostProcessingTime(System.nanoTime()-t0); } @Override @@ -660,7 +699,7 @@ public void applyColumnOffset() { } /* - * Currently not in use will be integrated in the future + * Currently, not in use will be integrated in the future */ @SuppressWarnings("unused") private static class MultiColumnLegacyBuildTask implements Callable { diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java index 5aff39b9844..17351a622be 100644 --- a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java +++ b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java @@ -25,16 +25,20 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.sysds.runtime.DMLRuntimeException; -public class DependencyTask implements Callable { +public class DependencyTask implements Comparable>, Callable { public static final boolean ENABLE_DEBUG_DATA = false; + protected static final Log LOG = LogFactory.getLog(DependencyTask.class.getName()); private final Callable _task; protected final List> _dependantTasks; public List> _dependencyTasks = null; // only for debugging private CompletableFuture> _future; private int _rdy = 0; + private Integer _priority = 0; private ExecutorService _pool; public DependencyTask(Callable task, List> dependantTasks) { @@ -54,6 +58,10 @@ public boolean isReady() { return _rdy == 0; } + public void setPriority(int priority) { + _priority = priority; + } + private boolean decrease() { synchronized(this) { _rdy -= 1; @@ -68,7 +76,11 @@ public void addDependent(DependencyTask dependencyTask) { @Override public E call() throws Exception { + LOG.debug("Executing Task: " + this); + long t0 = System.nanoTime(); E ret = _task.call(); + LOG.debug("Finished Task: " + this + " in: " + + (String.format("%.3f", (System.nanoTime()-t0)*1e-9)) + "sec."); _dependantTasks.forEach(t -> { if(t.decrease()) { if(_pool == null) @@ -79,4 +91,14 @@ public E call() throws Exception { return ret; } + + @Override + public String toString(){ + return _task.toString() + "" + ""; + } + + @Override + public int compareTo(DependencyTask task) { + return -1 * this._priority.compareTo(task._priority); + } } diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java index 4fdd63a36f5..50675d69c53 100644 --- a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java +++ b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java @@ -19,7 +19,12 @@ package org.apache.sysds.runtime.util; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.DMLRuntimeException; + import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,11 +36,10 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.sysds.runtime.DMLRuntimeException; - public class DependencyThreadPool { + protected static final Log LOG = LogFactory.getLog(DependencyThreadPool.class.getName()); private final ExecutorService _pool; public DependencyThreadPool(int k) { @@ -50,6 +54,8 @@ public List>> submitAll(List> dtasks) { List>> futures = new ArrayList<>(); List rdyTasks = new ArrayList<>(); int i = 0; + // sort by priority + Collections.sort(dtasks); for(DependencyTask t : dtasks) { CompletableFuture> f = new CompletableFuture<>(); t.addPool(_pool); @@ -63,6 +69,8 @@ public List>> submitAll(List> dtasks) { futures.add(f); i++; } + LOG.debug("Initial Starting tasks: \n\t" + + rdyTasks.stream().map(index -> dtasks.get(index).toString()).collect(Collectors.joining("\n\t"))); // Two stages to avoid race condition! for(Integer index : rdyTasks) { synchronized(_pool) { diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java index de6d7d6ac2d..7431a820c7d 100644 --- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java @@ -989,6 +989,15 @@ public static int getEndIndex(int arrayLength, int startIndex, int blockSize){ return (blockSize <= 0)? arrayLength: Math.min(arrayLength, startIndex + blockSize); } + public static int[] getBlockSizes(int num, int numBlocks){ + int[] blockSizes = new int[numBlocks]; + Arrays.fill(blockSizes, num/numBlocks); + for (int i = 0; i < num%numBlocks; i++){ + blockSizes[i]++; + } + return blockSizes; + } + public static String[] splitRecodeEntry(String s) { //forward to column encoder, as UtilFunctions available in map context return ColumnEncoderRecode.splitRecodeMapEntry(s); diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java b/src/main/java/org/apache/sysds/utils/Statistics.java index cacd2f7d10d..d91d9c511a5 100644 --- a/src/main/java/org/apache/sysds/utils/Statistics.java +++ b/src/main/java/org/apache/sysds/utils/Statistics.java @@ -19,20 +19,6 @@ package org.apache.sysds.utils; -import java.lang.management.CompilationMXBean; -import java.lang.management.GarbageCollectorMXBean; -import java.lang.management.ManagementFactory; -import java.text.DecimalFormat; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.DoubleAdder; -import java.util.concurrent.atomic.LongAdder; - import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.sysds.api.DMLScript; @@ -51,6 +37,20 @@ import org.apache.sysds.runtime.lineage.LineageCacheStatistics; import org.apache.sysds.runtime.privacy.CheckedConstraintsLog; +import java.lang.management.CompilationMXBean; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.text.DecimalFormat; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.DoubleAdder; +import java.util.concurrent.atomic.LongAdder; + /** * This class captures all statistics. */ @@ -149,7 +149,28 @@ private static class InstStats { private static final LongAdder lTotalUIPVar = new LongAdder(); private static final LongAdder lTotalLix = new LongAdder(); private static final LongAdder lTotalLixUIP = new LongAdder(); - + + // Transformencode stats + private static final LongAdder transformEncoderCount = new LongAdder(); + + //private static final LongAdder transformBuildTime = new LongAdder(); + private static final LongAdder transformRecodeBuildTime = new LongAdder(); + private static final LongAdder transformBinningBuildTime = new LongAdder(); + private static final LongAdder transformImputeBuildTime = new LongAdder(); + + //private static final LongAdder transformApplyTime = new LongAdder(); + private static final LongAdder transformRecodeApplyTime = new LongAdder(); + private static final LongAdder transformDummyCodeApplyTime = new LongAdder(); + private static final LongAdder transformPassThroughApplyTime = new LongAdder(); + private static final LongAdder transformFeatureHashingApplyTime = new LongAdder(); + private static final LongAdder transformBinningApplyTime = new LongAdder(); + private static final LongAdder transformOmitApplyTime = new LongAdder(); + private static final LongAdder transformImputeApplyTime = new LongAdder(); + + + private static final LongAdder transformOutMatrixPreProcessingTime = new LongAdder(); + private static final LongAdder transformOutMatrixPostProcessingTime = new LongAdder(); + // Federated stats private static final LongAdder federatedReadCount = new LongAdder(); private static final LongAdder federatedPutCount = new LongAdder(); @@ -649,6 +670,70 @@ public static void accFedPSGradientWeightingTime(long t) { public static void accFedPSCommunicationTime(long t) { fedPSCommunicationTime.add(t);} + public static void incTransformEncoderCount(long encoders){ + transformEncoderCount.add(encoders); + } + + public static void incTransformRecodeApplyTime(long t){ + transformRecodeApplyTime.add(t); + } + + public static void incTransformDummyCodeApplyTime(long t){ + transformDummyCodeApplyTime.add(t); + } + + public static void incTransformBinningApplyTime(long t){ + transformBinningApplyTime.add(t); + } + + public static void incTransformPassThroughApplyTime(long t){ + transformPassThroughApplyTime.add(t); + } + + public static void incTransformFeatureHashingApplyTime(long t){ + transformFeatureHashingApplyTime.add(t); + } + + public static void incTransformOmitApplyTime(long t) { + transformOmitApplyTime.add(t); + } + + public static void incTransformImputeApplyTime(long t) { + transformImputeApplyTime.add(t); + } + + public static void incTransformRecodeBuildTime(long t){ + transformRecodeBuildTime.add(t); + } + + public static void incTransformBinningBuildTime(long t){ + transformBinningBuildTime.add(t); + } + + public static void incTransformImputeBuildTime(long t) { + transformImputeBuildTime.add(t); + } + + public static void incTransformOutMatrixPreProcessingTime(long t){ + transformOutMatrixPreProcessingTime.add(t); + } + + public static void incTransformOutMatrixPostProcessingTime(long t){ + transformOutMatrixPostProcessingTime.add(t); + } + + public static long getTransformEncodeBuildTime(){ + return transformBinningBuildTime.longValue() + transformImputeBuildTime.longValue() + + transformRecodeBuildTime.longValue(); + } + + public static long getTransformEncodeApplyTime(){ + return transformDummyCodeApplyTime.longValue() + transformBinningApplyTime.longValue() + + transformFeatureHashingApplyTime.longValue() + transformPassThroughApplyTime.longValue() + + transformRecodeApplyTime.longValue() + transformOmitApplyTime.longValue() + + transformImputeApplyTime.longValue(); + } + public static String getCPHeavyHitterCode( Instruction inst ) { String opcode = null; @@ -1129,6 +1214,50 @@ public static String display(int maxHeavyHitters) federatedExecuteInstructionCount.longValue() + "/" + federatedExecuteUDFCount.longValue() + ".\n"); } + if( transformEncoderCount.longValue() > 0) { + //TODO: Cleanup and condense + sb.append("TransformEncode num. encoders:\t").append(transformEncoderCount.longValue()).append("\n"); + sb.append("TransformEncode build time:\t").append(String.format("%.3f", + getTransformEncodeBuildTime()*1e-9)).append(" sec.\n"); + if(transformRecodeBuildTime.longValue() > 0) + sb.append("\tRecode build time:\t").append(String.format("%.3f", + transformRecodeBuildTime.longValue()*1e-9)).append(" sec.\n"); + if(transformBinningBuildTime.longValue() > 0) + sb.append("\tBinning build time:\t").append(String.format("%.3f", + transformBinningBuildTime.longValue()*1e-9)).append(" sec.\n"); + if(transformImputeBuildTime.longValue() > 0) + sb.append("\tImpute build time:\t").append(String.format("%.3f", + transformImputeBuildTime.longValue()*1e-9)).append(" sec.\n"); + + sb.append("TransformEncode apply time:\t").append(String.format("%.3f", + getTransformEncodeApplyTime()*1e-9)).append(" sec.\n"); + if(transformRecodeApplyTime.longValue() > 0) + sb.append("\tRecode apply time:\t").append(String.format("%.3f", + transformRecodeApplyTime.longValue()*1e-9)).append(" sec.\n"); + if(transformBinningApplyTime.longValue() > 0) + sb.append("\tBinning apply time:\t").append(String.format("%.3f", + transformBinningApplyTime.longValue()*1e-9)).append(" sec.\n"); + if(transformDummyCodeApplyTime.longValue() > 0) + sb.append("\tDummyCode apply time:\t").append(String.format("%.3f", + transformDummyCodeApplyTime.longValue()*1e-9)).append(" sec.\n"); + if(transformFeatureHashingApplyTime.longValue() > 0) + sb.append("\tHashing apply time:\t").append(String.format("%.3f", + transformFeatureHashingApplyTime.longValue()*1e-9)).append(" sec.\n"); + if(transformPassThroughApplyTime.longValue() > 0) + sb.append("\tPassThrough apply time:\t").append(String.format("%.3f", + transformPassThroughApplyTime.longValue()*1e-9)).append(" sec.\n"); + if(transformOmitApplyTime.longValue() > 0) + sb.append("\tOmit apply time:\t").append(String.format("%.3f", + transformOmitApplyTime.longValue()*1e-9)).append(" sec.\n"); + if(transformImputeApplyTime.longValue() > 0) + sb.append("\tImpute apply time:\t").append(String.format("%.3f", + transformImputeApplyTime.longValue()*1e-9)).append(" sec.\n"); + + sb.append("TransformEncode PreProc. time:\t").append(String.format("%.3f", + transformOutMatrixPreProcessingTime.longValue()*1e-9)).append(" sec.\n"); + sb.append("TransformEncode PostProc. time:\t").append(String.format("%.3f", + transformOutMatrixPostProcessingTime.longValue()*1e-9)).append(" sec.\n"); + } if(ConfigurationManager.isCompressionEnabled()){ DMLCompressionStatistics.display(sb); diff --git a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java index 8824b9d8d10..b70571b3b51 100644 --- a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java +++ b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java @@ -30,6 +30,7 @@ import org.apache.sysds.runtime.io.FileFormatPropertiesCSV; import org.apache.sysds.runtime.io.FrameReaderFactory; import org.apache.sysds.runtime.matrix.data.FrameBlock; +import org.apache.sysds.runtime.transform.encode.ColumnEncoder; import org.apache.sysds.runtime.transform.encode.ColumnEncoderBin; import org.apache.sysds.runtime.transform.encode.ColumnEncoderRecode; import org.apache.sysds.runtime.transform.encode.EncoderFactory; @@ -173,6 +174,7 @@ private void runTransformTest(Types.ExecMode rt, String ofmt, TransformType type .readFrameFromHDFS(DATASET, -1L, -1L); StringBuilder specSb = new StringBuilder(); Files.readAllLines(Paths.get(SPEC)).forEach(s -> specSb.append(s).append("\n")); + ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN = Math.max(blockSize, 1); MultiColumnEncoder encoderS = EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(), input.getNumColumns(), null); MultiColumnEncoder encoderM = EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(), diff --git a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java index 21562505a79..6679f36ccd6 100644 --- a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java +++ b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java @@ -48,7 +48,7 @@ public class TransformFrameEncodeMultithreadedTest extends AutomatedTestBase { private final static String DATASET1 = "homes3/homes.csv"; private final static String SPEC1 = "homes3/homes.tfspec_recode.json"; private final static String SPEC2 = "homes3/homes.tfspec_dummy.json"; - private final static String SPEC2all = "homes3/homes.tfspec_dummy_all.json"; + private final static String SPEC2sparse = "homes3/homes.tfspec_dummy_sparse.json"; private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; // recode private final static String SPEC6 = "homes3/homes.tfspec_recode_dummy.json"; private final static String SPEC7 = "homes3/homes.tfspec_binDummy.json"; // recode+dummy @@ -164,7 +164,7 @@ private void runTransformTest(ExecMode rt, String ofmt, TransformType type, bool DATASET = DATASET1; break; case DUMMY_ALL: - SPEC = SPEC2all; + SPEC = SPEC2sparse; DATASET = DATASET1; break; case BIN: diff --git a/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json b/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json deleted file mode 100644 index 65b8fee7d4d..00000000000 --- a/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json +++ /dev/null @@ -1 +0,0 @@ -{"ids": true, "dummycode": [ 2, 7, 1, 3, 4, 5, 6, 8, 9 ] } \ No newline at end of file diff --git a/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json b/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json new file mode 100644 index 00000000000..ed48308408a --- /dev/null +++ b/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json @@ -0,0 +1 @@ +{"ids": true, "dummycode": [ 2, 7, 1, 3, 4, 6, 8, 9 ] } \ No newline at end of file