From f86f0d21a6e478c6228002d6df09fa31d89951f4 Mon Sep 17 00:00:00 2001 From: Lukas Erlbacher Date: Wed, 8 Sep 2021 16:34:45 +0200 Subject: [PATCH] [SYSTEMDS-2972] Transformencode sparse improvements This PR introduces the sparse implementations for the transform encoders. Furthermore, this adds support for row partitioning, statistics, and debug logging. Now we can enable multithreaded transform via a flag in the config file, "parallel.encode". --- .../sysds/conf/ConfigurationManager.java | 4 + .../java/org/apache/sysds/conf/DMLConfig.java | 4 +- .../org/apache/sysds/hops/OptimizerUtils.java | 18 ++ .../compress/CompressedMatrixBlock.java | 5 - ...turnParameterizedBuiltinCPInstruction.java | 4 +- .../runtime/matrix/data/MatrixBlock.java | 35 +--- .../transform/encode/ColumnEncoder.java | 141 ++++++++++++---- .../transform/encode/ColumnEncoderBin.java | 90 ++++++++-- .../encode/ColumnEncoderComposite.java | 26 +-- .../encode/ColumnEncoderDummycode.java | 115 ++++++------- .../encode/ColumnEncoderFeatureHash.java | 64 ++++++- .../encode/ColumnEncoderPassThrough.java | 90 +++++++++- .../transform/encode/ColumnEncoderRecode.java | 81 ++++++++- .../transform/encode/EncoderFactory.java | 26 ++- .../transform/encode/EncoderMVImpute.java | 36 ++-- .../runtime/transform/encode/EncoderOmit.java | 22 ++- .../transform/encode/MultiColumnEncoder.java | 91 +++++++--- .../sysds/runtime/util/DependencyTask.java | 24 ++- .../runtime/util/DependencyThreadPool.java | 12 +- .../sysds/runtime/util/UtilFunctions.java | 9 + .../org/apache/sysds/utils/Statistics.java | 159 ++++++++++++++++-- .../TransformFrameBuildMultithreadedTest.java | 2 + ...TransformFrameEncodeMultithreadedTest.java | 4 +- .../homes3/homes.tfspec_dummy_all.json | 1 - .../homes3/homes.tfspec_dummy_sparse.json | 1 + 25 files changed, 805 insertions(+), 259 deletions(-) delete mode 100644 src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json create mode 100644 
src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java index 5a3667c8511..93654b713b8 100644 --- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java +++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java @@ -171,6 +171,10 @@ public static boolean isParallelMatrixOperations() { return getCompilerConfigFlag(ConfigType.PARALLEL_CP_MATRIX_OPERATIONS); } + public static boolean isParallelTransform() { + return getDMLConfig().getBooleanValue(DMLConfig.PARALLEL_ENCODE); + } + public static boolean isParallelParFor() { return getCompilerConfigFlag(ConfigType.PARALLEL_LOCAL_OR_REMOTE_PARFOR); } diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java index 0b7692b1bcc..db595054be7 100644 --- a/src/main/java/org/apache/sysds/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java @@ -67,6 +67,7 @@ public class DMLConfig public static final String DEFAULT_BLOCK_SIZE = "sysds.defaultblocksize"; public static final String CP_PARALLEL_OPS = "sysds.cp.parallel.ops"; public static final String CP_PARALLEL_IO = "sysds.cp.parallel.io"; + public static final String PARALLEL_ENCODE = "sysds.parallel.encode"; // boolean: enable multi-threaded transformencode and apply public static final String COMPRESSED_LINALG = "sysds.compressed.linalg"; public static final String COMPRESSED_LOSSY = "sysds.compressed.lossy"; public static final String COMPRESSED_VALID_COMPRESSIONS = "sysds.compressed.valid.compressions"; @@ -125,6 +126,7 @@ public class DMLConfig _defaultVals.put(DEFAULT_BLOCK_SIZE, String.valueOf(OptimizerUtils.DEFAULT_BLOCKSIZE) ); _defaultVals.put(CP_PARALLEL_OPS, "true" ); _defaultVals.put(CP_PARALLEL_IO, "true" ); + _defaultVals.put(PARALLEL_ENCODE, "false" ); _defaultVals.put(COMPRESSED_LINALG, 
Compression.CompressConfig.FALSE.name() ); _defaultVals.put(COMPRESSED_LOSSY, "false" ); _defaultVals.put(COMPRESSED_VALID_COMPRESSIONS, "SDC,DDC"); @@ -398,7 +400,7 @@ public static DMLConfig readConfigurationFile(String configPath) public String getConfigInfo() { String[] tmpConfig = new String[] { LOCAL_TMP_DIR,SCRATCH_SPACE,OPTIMIZATION_LEVEL, DEFAULT_BLOCK_SIZE, - CP_PARALLEL_OPS, CP_PARALLEL_IO, NATIVE_BLAS, NATIVE_BLAS_DIR, + CP_PARALLEL_OPS, CP_PARALLEL_IO, PARALLEL_ENCODE, NATIVE_BLAS, NATIVE_BLAS_DIR, COMPRESSED_LINALG, COMPRESSED_LOSSY, COMPRESSED_VALID_COMPRESSIONS, COMPRESSED_OVERLAPPING, COMPRESSED_SAMPLING_RATIO, COMPRESSED_COCODE, COMPRESSED_TRANSPOSE, CODEGEN, CODEGEN_API, CODEGEN_COMPILER, CODEGEN_OPTIMIZER, CODEGEN_PLANCACHE, CODEGEN_LITERALS, diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java index 01769c700c4..d63b2cbe789 100644 --- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java @@ -1008,6 +1008,24 @@ public static int getConstrainedNumThreads(int maxNumThreads) return ret; } + + public static int getTransformNumThreads(int maxNumThreads) + { + //by default max local parallelism (vcores) + int ret = InfrastructureAnalyzer.getLocalParallelism(); + + //apply external max constraint (e.g., set by parfor or other rewrites) + if( maxNumThreads > 0 ) { + ret = Math.min(ret, maxNumThreads); + } + + //check if enabled in config.xml + if( !ConfigurationManager.isParallelTransform() ) { + ret = 1; + } + + return ret; + } public static Level getDefaultLogLevel() { Level log = Logger.getRootLogger().getLevel(); diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java index 2c205f36bba..d374d3c4370 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java +++ 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java @@ -1344,11 +1344,6 @@ public void quickSetValue(int r, int c, double v) { throw new DMLCompressionException("Should not set a value on a compressed Matrix"); } - @Override - public void quickSetValueThreadSafe(int r, int c, double v) { - throw new DMLCompressionException("Thread safe execution does not work on Compressed Matrix"); - } - @Override public double quickGetValueThreadSafe(int r, int c) { throw new DMLCompressionException("Thread safe execution does not work on Compressed Matrix"); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java index b6e0d97170c..800aa519f86 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.instructions.InstructionUtils; @@ -85,7 +86,8 @@ public void processInstruction(ExecutionContext ec) { // execute block transform encode MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec, colnames, fin.getNumColumns(), null); - MatrixBlock data = encoder.encode(fin); // build and apply + // TODO: Assign #threads in compiler and pass via the instruction string + MatrixBlock data = encoder.encode(fin, OptimizerUtils.getTransformNumThreads(-1)); // build and apply FrameBlock meta = encoder.getMetaData(new FrameBlock(fin.getNumColumns(), ValueType.STRING)); 
meta.setColumnNames(colnames); diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java index 160f8e95b7d..a86a878ca36 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java @@ -117,7 +117,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizable { // private static final Log LOG = LogFactory.getLog(MatrixBlock.class.getName()); - + private static final long serialVersionUID = 7319972089143154056L; //sparsity nnz threshold, based on practical experiments on space consumption and performance @@ -654,27 +654,6 @@ public void quickSetValue(int r, int c, double v) } } - /** - * Thread save set. - * Blocks need to be allocated, and in case of MCSR sparse, all rows - * that are going to be accessed need to be allocated as well. - * - * @param r row - * @param c column - * @param v value - */ - public void quickSetValueThreadSafe(int r, int c, double v) { - if(sparse) { - if(!(sparseBlock instanceof SparseBlockMCSR)) - throw new RuntimeException("Only MCSR Blocks are supported for Multithreaded sparse set."); - synchronized (sparseBlock.get(r)) { - sparseBlock.set(r,c,v); - } - } - else - denseBlock.set(r,c,v); - } - public double quickGetValueThreadSafe(int r, int c) { if(sparse) { if(!(sparseBlock instanceof SparseBlockMCSR)) @@ -976,7 +955,7 @@ public double sum() { /** * Wrapper method for single threaded reduceall-colSum of a matrix. - * + * * @return A new MatrixBlock containing the column sums of this matrix. */ public MatrixBlock colSum() { @@ -986,7 +965,7 @@ public MatrixBlock colSum() { /** * Wrapper method for single threaded reduceall-rowSum of a matrix. - * + * * @return A new MatrixBlock containing the row sums of this matrix. 
*/ public MatrixBlock rowSum(){ @@ -1422,7 +1401,7 @@ public void copy(MatrixValue thatValue, boolean sp) throw new RuntimeException( "Copy must not overwrite itself!" ); if(that instanceof CompressedMatrixBlock) that = CompressedMatrixBlock.getUncompressed(that, "Copy not effecient into a MatrixBlock"); - + rlen=that.rlen; clen=that.clen; sparse=sp; @@ -2935,7 +2914,7 @@ public MatrixBlock binaryOperations(BinaryOperator op, MatrixValue thatValue, Ma LibMatrixBincell.isValidDimensionsBinary(this, that); if(thatValue instanceof CompressedMatrixBlock) return ((CompressedMatrixBlock) thatValue).binaryOperationsLeft(op, this, result); - + //compute output dimensions boolean outer = (LibMatrixBincell.getBinaryAccessType(this, that) == BinaryAccessType.OUTER_VECTOR_VECTOR); @@ -2980,7 +2959,7 @@ public MatrixBlock ternaryOperations(TernaryOperator op, MatrixBlock m2, MatrixB m2 = ((CompressedMatrixBlock) m2).getUncompressed("Ternay Operator arg2 " + op.fn.getClass().getSimpleName()); if(m3 instanceof CompressedMatrixBlock) m3 = ((CompressedMatrixBlock) m3).getUncompressed("Ternay Operator arg3 " + op.fn.getClass().getSimpleName()); - + //prepare inputs final boolean s1 = (rlen==1 && clen==1); final boolean s2 = (m2.rlen==1 && m2.clen==1); @@ -3674,7 +3653,7 @@ public void checkDimensionsForAppend(MatrixBlock[] in, boolean cbind) { "Invalid nCol dimension for append rbind: was " + in[i].clen + " should be: " + clen); } } - + public static MatrixBlock naryOperations(Operator op, MatrixBlock[] matrices, ScalarObject[] scalars, MatrixBlock ret) { //note: currently only min max, plus supported and hence specialized implementation //prepare operator diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java index 33bf4529b2f..e04eeff5e5f 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java +++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java @@ -20,6 +20,7 @@ package org.apache.sysds.runtime.transform.encode; import static org.apache.sysds.runtime.transform.encode.EncoderFactory.getEncoderType; +import static org.apache.sysds.runtime.util.UtilFunctions.getBlockSizes; import java.io.Externalizable; import java.io.IOException; @@ -45,6 +46,8 @@ */ public abstract class ColumnEncoder implements Externalizable, Encoder, Comparable { protected static final Log LOG = LogFactory.getLog(ColumnEncoder.class.getName()); + protected static final int APPLY_ROW_BLOCKS_PER_COLUMN = 1; + public static int BUILD_ROW_BLOCKS_PER_COLUMN = 1; private static final long serialVersionUID = 2299156350718979064L; protected int _colID; @@ -52,7 +55,23 @@ protected ColumnEncoder(int colID) { _colID = colID; } - public abstract MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol); + /** + * Apply Functions are only used in Single Threaded or Multi-Threaded Dense context. + * That's why there is no regard for MT sparse! + * + * @param in Input Block + * @param out Output Matrix + * @param outputCol The output column for the given column + * @return same as out + * + */ + public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol){ + return apply(in, out, outputCol, 0, -1); + } + + public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol){ + return apply(in, out, outputCol, 0, -1); + } public abstract MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk); @@ -172,18 +191,18 @@ public int compareTo(ColumnEncoder o) { * complete if all previous tasks are done. This is so that we can use the last task as a dependency for the whole * build, reducing unnecessary dependencies. 
*/ - public List> getBuildTasks(FrameBlock in, int blockSize) { + public List> getBuildTasks(FrameBlock in) { List> tasks = new ArrayList<>(); List>> dep = null; - if(blockSize <= 0 || blockSize >= in.getNumRows()) { + int nRows = in.getNumRows(); + int[] blockSizes = getBlockSizes(nRows, getNumBuildRowPartitions()); + if(blockSizes.length == 1) { tasks.add(getBuildTask(in)); } else { HashMap ret = new HashMap<>(); - for(int i = 0; i < in.getNumRows(); i = i + blockSize) - tasks.add(getPartialBuildTask(in, i, blockSize, ret)); - if(in.getNumRows() % blockSize != 0) - tasks.add(getPartialBuildTask(in, in.getNumRows() - in.getNumRows() % blockSize, -1, ret)); + for(int startRow = 0, i = 0; i < blockSizes.length; startRow+=blockSizes[i], i++) + tasks.add(getPartialBuildTask(in, startRow, blockSizes[i], ret)); tasks.add(getPartialMergeBuildTask(ret)); dep = new ArrayList<>(Collections.nCopies(tasks.size() - 1, null)); dep.add(tasks.subList(0, tasks.size() - 1)); @@ -198,24 +217,63 @@ public Callable getBuildTask(FrameBlock in) { public Callable getPartialBuildTask(FrameBlock in, int startRow, int blockSize, HashMap ret) { throw new DMLRuntimeException( - "Trying to get the PartialBuild task of an Encoder which does not support " + "partial building"); + "Trying to get the PartialBuild task of an Encoder which does not support partial building"); } public Callable getPartialMergeBuildTask(HashMap ret) { throw new DMLRuntimeException( - "Trying to get the BuildMergeTask task of an Encoder which does not support " + "partial building"); + "Trying to get the BuildMergeTask task of an Encoder which does not support partial building"); } public List> getApplyTasks(FrameBlock in, MatrixBlock out, int outputCol) { - List> tasks = new ArrayList<>(); - tasks.add(new ColumnApplyTask(this, in, out, outputCol)); - return DependencyThreadPool.createDependencyTasks(tasks, null); + return getApplyTasks(in, null, out, outputCol); } public List> getApplyTasks(MatrixBlock in, MatrixBlock 
out, int outputCol) { + return getApplyTasks(null, in, out, outputCol); + } + + private List> getApplyTasks(FrameBlock inF, MatrixBlock inM, MatrixBlock out, int outputCol){ List> tasks = new ArrayList<>(); - tasks.add(new ColumnApplyTask(this, in, out, outputCol)); - return DependencyThreadPool.createDependencyTasks(tasks, null); + List>> dep = null; + if ((inF != null && inM != null) || (inF == null && inM == null)) + throw new DMLRuntimeException("getApplyTasks needs to be called with either FrameBlock input " + + "or MatrixBlock input"); + int nRows = inF == null ? inM.getNumRows() : inF.getNumRows(); + int[] blockSizes = getBlockSizes(nRows, getNumApplyRowPartitions()); + for(int startRow = 0, i = 0; i < blockSizes.length; startRow+=blockSizes[i], i++){ + if(inF != null) + if(out.isInSparseFormat()) + tasks.add(getSparseTask(inF, out, outputCol, startRow, blockSizes[i])); + else + tasks.add(new ColumnApplyTask<>(this, inF, out, outputCol, startRow, blockSizes[i])); + else + if(out.isInSparseFormat()) + tasks.add(getSparseTask(inM, out, outputCol, startRow, blockSizes[i])); + else + tasks.add(new ColumnApplyTask<>(this, inM, out, outputCol, startRow, blockSizes[i])); + } + if(tasks.size() > 1){ + dep = new ArrayList<>(Collections.nCopies(tasks.size(), null)); + tasks.add(() -> null); // Empty task as barrier + dep.add(tasks.subList(0, tasks.size()-1)); + } + + return DependencyThreadPool.createDependencyTasks(tasks, dep); + } + + protected abstract ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk); + + protected abstract ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk); + + protected int getNumApplyRowPartitions(){ + return APPLY_ROW_BLOCKS_PER_COLUMN; + } + + protected int getNumBuildRowPartitions(){ + return BUILD_ROW_BLOCKS_PER_COLUMN; } public enum EncoderType { @@ -226,39 +284,54 @@ public enum EncoderType { * This is the base Task for each column 
apply. If no custom "getApplyTasks" is implemented in an Encoder this task * will be used. */ - private static class ColumnApplyTask implements Callable { + protected static class ColumnApplyTask implements Callable { + + protected final T _encoder; + protected final FrameBlock _inputF; + protected final MatrixBlock _inputM; + protected final MatrixBlock _out; + protected final int _outputCol; + protected final int _startRow; + protected final int _blk; + + protected ColumnApplyTask(T encoder, FrameBlock input, MatrixBlock out, int outputCol){ + this(encoder, input, out, outputCol, 0, -1); + } - private final ColumnEncoder _encoder; - private final FrameBlock _inputF; - private final MatrixBlock _inputM; - private final MatrixBlock _out; - private final int _outputCol; + protected ColumnApplyTask(T encoder, MatrixBlock input, MatrixBlock out, int outputCol){ + this(encoder, input, out, outputCol, 0, -1); + } - protected ColumnApplyTask(ColumnEncoder encoder, FrameBlock input, MatrixBlock out, int outputCol) { - _encoder = encoder; - _inputF = input; - _inputM = null; - _out = out; - _outputCol = outputCol; + protected ColumnApplyTask(T encoder, FrameBlock input, MatrixBlock out, int outputCol, int startRow, int blk) { + this(encoder, input, null, out, outputCol, startRow, blk); } - protected ColumnApplyTask(ColumnEncoder encoder, MatrixBlock input, MatrixBlock out, int outputCol) { + protected ColumnApplyTask(T encoder, MatrixBlock input, MatrixBlock out, int outputCol, int startRow, int blk) { + this(encoder, null, input, out, outputCol, startRow, blk); + } + private ColumnApplyTask(T encoder, FrameBlock inputF, MatrixBlock inputM, MatrixBlock out, int outputCol, + int startRow, int blk){ _encoder = encoder; - _inputM = input; - _inputF = null; + _inputM = inputM; + _inputF = inputF; _out = out; _outputCol = outputCol; + _startRow = startRow; + _blk = blk; } @Override - public Void call() throws Exception { + public Object call() throws Exception { assert 
_outputCol >= 0; - int _rowStart = 0; - int _blk = -1; + if(_out.isInSparseFormat()){ + // this is an issue since most sparse Tasks modify the sparse structure so normal get and set calls are + // not possible. + throw new DMLRuntimeException("ColumnApplyTask called although output is in sparse format."); + } if(_inputF == null) - _encoder.apply(_inputM, _out, _outputCol, _rowStart, _blk); + _encoder.apply(_inputM, _out, _outputCol, _startRow, _blk); else - _encoder.apply(_inputF, _out, _outputCol, _rowStart, _blk); + _encoder.apply(_inputF, _out, _outputCol, _startRow, _blk); return null; } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java index b4d48000040..5736c1e4997 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java @@ -28,11 +28,15 @@ import java.util.HashMap; import java.util.concurrent.Callable; +import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.MutableTriple; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.lops.Lop; +import org.apache.sysds.runtime.data.SparseRowVector; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.Statistics; public class ColumnEncoderBin extends ColumnEncoder { public static final String MIN_PREFIX = "min"; @@ -84,10 +88,13 @@ public double[] getBinMaxs() { @Override public void build(FrameBlock in) { + long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; if(!isApplicable()) return; double[] pairMinMax = getMinMaxOfCol(in, _colID, 0, -1); computeBins(pairMinMax[0], pairMinMax[1]); + if(DMLScript.STATISTICS) + Statistics.incTransformBinningBuildTime(System.nanoTime()-t0); } private static double[] getMinMaxOfCol(FrameBlock in, int colID, int startRow, int blockSize) { @@ -118,6 +125,8 @@ public Callable getPartialMergeBuildTask(HashMap ret) { return new BinMergePartialBuildTask(this, ret); } + + public void computeBins(double min, double max) { // ensure allocated internal transformation metadata if(_binMins == null || _binMaxs == null) { @@ -145,39 +154,47 @@ public void buildPartial(FrameBlock in) { _colMaxs = pairMinMax[1]; } - @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - - @Override - public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) { double inVal = UtilFunctions.objectToDouble(in.getSchema()[_colID - 1], in.get(i, _colID - 1)); int ix = Arrays.binarySearch(_binMaxs, inVal); int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1; - out.quickSetValueThreadSafe(i, outputCol, binID); + out.quickSetValue(i, outputCol, binID); } + if (DMLScript.STATISTICS) + Statistics.incTransformBinningApplyTime(System.nanoTime()-t0); return out; } @Override public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; int end = getEndIndex(in.getNumRows(), rowStart, blk); for(int i = rowStart; i < end; i++) { double inVal = in.quickGetValueThreadSafe(i, _colID - 1); int ix = Arrays.binarySearch(_binMaxs, inVal); int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1; - out.quickSetValueThreadSafe(i, outputCol, binID); + out.quickSetValue(i, outputCol, binID); } + if (DMLScript.STATISTICS) + Statistics.incTransformBinningApplyTime(System.nanoTime()-t0); return out; } + @Override + protected ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + return new BinSparseApplyTask(this, in, out, outputCol); + } + + @Override + protected ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new NotImplementedException("Sparse Binning for MatrixBlocks not jet implemented"); + } + @Override public void mergeAt(ColumnEncoder other) { if(other instanceof ColumnEncoderBin) { @@ -264,6 +281,43 @@ public void readExternal(ObjectInput in) throws IOException { } } + private static class BinSparseApplyTask extends ColumnApplyTask { + + public BinSparseApplyTask(ColumnEncoderBin encoder, FrameBlock input, + MatrixBlock out, int outputCol, int startRow, int blk) { + super(encoder, input, out, outputCol, startRow, blk); + } + + private BinSparseApplyTask(ColumnEncoderBin encoder, FrameBlock input, MatrixBlock out, int outputCol) { + super(encoder, input, out, outputCol); + } + + public Object call() throws Exception { + long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; + int index = _encoder._colID - 1; + if(_out.getSparseBlock() == null) + return null; + assert _inputF != null; + for(int r = _startRow; r < getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) { + SparseRowVector row = (SparseRowVector) _out.getSparseBlock().get(r); + double inVal = UtilFunctions.objectToDouble(_inputF.getSchema()[index], _inputF.get(r, index)); + int ix = Arrays.binarySearch(_encoder._binMaxs, inVal); + int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1; + row.values()[index] = binID; + row.indexes()[index] = _outputCol; + } + if(DMLScript.STATISTICS) + Statistics.incTransformBinningApplyTime(System.nanoTime()-t0); + return null; + } + + @Override + public String toString() { + return getClass().getSimpleName() + ""; + } + + } + private static class BinPartialBuildTask implements Callable { private final FrameBlock _input; @@ -284,7 +338,13 @@ protected BinPartialBuildTask(FrameBlock input, int colID, int startRow, int blo @Override public double[] call() throws Exception { - _partialMinMax.put(_startRow, getMinMaxOfCol(_input, _colID, _startRow, _blockSize)); + long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; + double[] minMax = getMinMaxOfCol(_input, _colID, _startRow, _blockSize); + synchronized (_partialMinMax){ + _partialMinMax.put(_startRow, minMax); + } + if (DMLScript.STATISTICS) + Statistics.incTransformBinningBuildTime(System.nanoTime()-t0); return null; } @@ -306,6 +366,7 @@ private BinMergePartialBuildTask(ColumnEncoderBin encoderBin, HashMap> getApplyTasks(MatrixBlock in, MatrixBlock out, in } @Override - public List> getBuildTasks(FrameBlock in, int blockSize) { + protected ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new NotImplementedException(); + } + + @Override + protected ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new NotImplementedException(); + } + + @Override + public List> getBuildTasks(FrameBlock in) { List> tasks = new ArrayList<>(); Map depMap = null; for(ColumnEncoder columnEncoder : _columnEncoders) { - List> t = columnEncoder.getBuildTasks(in, blockSize); + List> t = columnEncoder.getBuildTasks(in); if(t == null) continue; // Linear execution between encoders so they can't be built in parallel @@ -178,16 +190,6 @@ public void buildPartial(FrameBlock in) { columnEncoder.buildPartial(in); } - @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - - @Override - public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { try { diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java index 25b2eb9b848..1047f542051 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java +++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java @@ -24,17 +24,15 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.Callable; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.runtime.DMLRuntimeException; -import org.apache.sysds.runtime.data.SparseRowVector; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.DependencyTask; -import org.apache.sysds.runtime.util.DependencyThreadPool; +import org.apache.sysds.utils.Statistics; public class ColumnEncoderDummycode extends ColumnEncoder { private static final long serialVersionUID = 5832130477659116489L; @@ -60,19 +58,10 @@ public void build(FrameBlock in) { } @Override - public List> getBuildTasks(FrameBlock in, int blockSize) { + public List> getBuildTasks(FrameBlock in) { return null; } - @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - - public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { throw new DMLRuntimeException("Called DummyCoder with FrameBlock"); @@ -80,6 +69,7 @@ public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowS @Override public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; // Out Matrix should already be correct size! 
// append dummy coded or unchanged values to output for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) { @@ -89,20 +79,25 @@ public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int row int nCol = outputCol + (int) val - 1; // Setting value to 0 first in case of sparse so the row vector does not need to be resized if(nCol != outputCol) - out.quickSetValueThreadSafe(i, outputCol, 0); - out.quickSetValueThreadSafe(i, nCol, 1); + out.quickSetValue(i, outputCol, 0); + out.quickSetValue(i, nCol, 1); } + if (DMLScript.STATISTICS) + Statistics.incTransformDummyCodeApplyTime(System.nanoTime()-t0); return out; } + + @Override + protected ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + return new DummycodeSparseApplyTask(this, in, out, outputCol, startRow, blk); + } + @Override - public List> getApplyTasks(MatrixBlock in, MatrixBlock out, int outputCol) { - List> tasks = new ArrayList<>(); - if(out.isInSparseFormat()) - tasks.add(new DummycodeSparseApplyTask(this, in, out, outputCol)); - else - return super.getApplyTasks(in, out, outputCol); - return DependencyThreadPool.createDependencyTasks(tasks, null); + protected ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new DMLRuntimeException("Called DummyCoder with FrameBlock"); } @Override @@ -139,6 +134,7 @@ else if(columnEncoder instanceof ColumnEncoderBin) { if(distinct != -1) { _domainSize = distinct; + LOG.debug("DummyCoder for column: " + _colID + " has domain size: " + _domainSize); } } } @@ -188,48 +184,47 @@ public int getDomainSize() { return _domainSize; } - private static class DummycodeSparseApplyTask implements Callable { - private final ColumnEncoderDummycode _encoder; - private final MatrixBlock _input; - private final MatrixBlock _out; - private final int _outputCol; + private static class DummycodeSparseApplyTask extends ColumnApplyTask { + + 
protected DummycodeSparseApplyTask(ColumnEncoderDummycode encoder, MatrixBlock input, + MatrixBlock out, int outputCol) { + super(encoder, input, out, outputCol); + } - private DummycodeSparseApplyTask(ColumnEncoderDummycode encoder, MatrixBlock input, MatrixBlock out, - int outputCol) { - _encoder = encoder; - _input = input; - _out = out; - _outputCol = outputCol; + protected DummycodeSparseApplyTask(ColumnEncoderDummycode encoder, MatrixBlock input, + MatrixBlock out, int outputCol, int startRow, int blk) { + super(encoder, input, out, outputCol, startRow, blk); } public Object call() throws Exception { - for(int r = 0; r < _input.getNumRows(); r++) { - if(_out.getSparseBlock() == null) - return null; - synchronized(_out.getSparseBlock().get(r)) { - // Since the recoded values are already offset in the output matrix (same as input at this point) - // the dummycoding only needs to offset them within their column domain. Which means that the - // indexes in the SparseRowVector do not need to be sorted anymore and can be updated directly. - // - // Input: Output: - // - // 1 | 0 | 2 | 0 1 | 0 | 0 | 1 - // 2 | 0 | 1 | 0 ===> 0 | 1 | 1 | 0 - // 1 | 0 | 2 | 0 1 | 0 | 0 | 1 - // 1 | 0 | 1 | 0 1 | 0 | 1 | 0 - // - // Example SparseRowVector Internals (1. row): - // - // indexes = [0,2] ===> indexes = [0,3] - // values = [1,2] values = [1,1] - int index = ((SparseRowVector) _out.getSparseBlock().get(r)).getIndex(_outputCol); - double val = _out.getSparseBlock().get(r).values()[index]; - int nCol = _outputCol + (int) val - 1; - - _out.getSparseBlock().get(r).indexes()[index] = nCol; - _out.getSparseBlock().get(r).values()[index] = 1; - } + long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; + assert _inputM != null; + if(_out.getSparseBlock() == null) + return null; + for(int r = _startRow; r < getEndIndex(_inputM.getNumRows(), _startRow, _blk); r++) { + // Since the recoded values are already offset in the output matrix (same as input at this point) + // the dummycoding only needs to offset them within their column domain. Which means that the + // indexes in the SparseRowVector do not need to be sorted anymore and can be updated directly. + // + // Input: Output: + // + // 1 | 0 | 2 | 0 1 | 0 | 0 | 1 + // 2 | 0 | 1 | 0 ===> 0 | 1 | 1 | 0 + // 1 | 0 | 2 | 0 1 | 0 | 0 | 1 + // 1 | 0 | 1 | 0 1 | 0 | 1 | 0 + // + // Example SparseRowVector Internals (1. row): + // + // indexes = [0,2] ===> indexes = [0,3] + // values = [1,2] values = [1,1] + int index = _encoder._colID - 1; + double val = _out.getSparseBlock().get(r).values()[index]; + int nCol = _outputCol + (int) val - 1; + _out.getSparseBlock().get(r).indexes()[index] = nCol; + _out.getSparseBlock().get(r).values()[index] = 1; } + if (DMLScript.STATISTICS) + Statistics.incTransformDummyCodeApplyTime(System.nanoTime()-t0); return null; } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java index 1c74ae52687..d30d8dc404c 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java @@ -26,11 +26,15 @@ import java.io.ObjectOutput; import java.util.List; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.data.SparseRowVector; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.DependencyTask; 
import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.Statistics; /** * Class used for feature hashing transformation of frames. @@ -56,7 +60,7 @@ public ColumnEncoderFeatureHash() { } private long getCode(String key) { - return key.hashCode() % _K; + return (key.hashCode() % _K) + 1; } @Override @@ -65,22 +69,25 @@ public void build(FrameBlock in) { } @Override - public List> getBuildTasks(FrameBlock in, int blockSize) { + public List> getBuildTasks(FrameBlock in) { return null; } @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); + protected ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + return new FeatureHashSparseApplyTask(this, in, out, outputCol, startRow, blk); } @Override - public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); + protected ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new NotImplementedException("Sparse FeatureHashing for MatrixBlocks not yet implemented"); } @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; // apply feature hashing column wise for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) { Object okey = in.get(i, _colID - 1); @@ -88,21 +95,26 @@ public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowS if(key == null) throw new DMLRuntimeException("Missing Value encountered in input Frame for FeatureHash"); long code = getCode(key); - out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ? code : Double.NaN); + out.quickSetValue(i, outputCol, (code >= 0) ?
code : Double.NaN); } + if(DMLScript.STATISTICS) + Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0); return out; } @Override public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; int end = getEndIndex(in.getNumRows(), rowStart, blk); // apply feature hashing column wise for(int i = rowStart; i < end; i++) { Object okey = in.quickGetValueThreadSafe(i, _colID - 1); String key = okey.toString(); long code = getCode(key); - out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ? code : Double.NaN); + out.quickSetValue(i, outputCol, (code >= 0) ? code : Double.NaN); } + if(DMLScript.STATISTICS) + Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0); return out; } @@ -145,4 +157,40 @@ public void readExternal(ObjectInput in) throws IOException { super.readExternal(in); _K = in.readLong(); } + + public static class FeatureHashSparseApplyTask extends ColumnApplyTask{ + + public FeatureHashSparseApplyTask(ColumnEncoderFeatureHash encoder, FrameBlock input, + MatrixBlock out, int outputCol, int startRow, int blk) { + super(encoder, input, out, outputCol, startRow, blk); + } + + public FeatureHashSparseApplyTask(ColumnEncoderFeatureHash encoder, FrameBlock input, + MatrixBlock out, int outputCol) { + super(encoder, input, out, outputCol); + } + + @Override + public Object call() throws Exception { + if(_out.getSparseBlock() == null) + return null; + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + int index = _encoder._colID - 1; + assert _inputF != null; + for(int r = _startRow; r < getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++){ + SparseRowVector row = (SparseRowVector) _out.getSparseBlock().get(r); + Object okey = _inputF.get(r, index); + String key = (okey != null) ? 
okey.toString() : null; + if(key == null) + throw new DMLRuntimeException("Missing Value encountered in input Frame for FeatureHash"); + long code = _encoder.getCode(key); + row.values()[index] = code; + row.indexes()[index] = _outputCol; + } + if(DMLScript.STATISTICS) + Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0); + return null; + } + } + } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java index 7a8df245795..5c7392c6a3f 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java @@ -21,16 +21,22 @@ import static org.apache.sysds.runtime.util.UtilFunctions.getEndIndex; +import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.data.SparseRowVector; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.DependencyTask; import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.Statistics; public class ColumnEncoderPassThrough extends ColumnEncoder { private static final long serialVersionUID = -8473768154646831882L; + private List sparseRowsWZeros = null; protected ColumnEncoderPassThrough(int ptCols) { super(ptCols); // 1-based @@ -40,37 +46,46 @@ public ColumnEncoderPassThrough() { this(-1); } + public List getSparseRowsWZeros(){ + return sparseRowsWZeros; + } + @Override public void build(FrameBlock in) { // do nothing } @Override - public List> getBuildTasks(FrameBlock in, int blockSize) { + public List> getBuildTasks(FrameBlock in) { return null; } @Override - public MatrixBlock 
apply(FrameBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); + protected ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + return new PassThroughSparseApplyTask(this, in, out, outputCol, startRow, blk); } @Override - public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); + protected ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new NotImplementedException("Sparse PassThrough for MatrixBlocks not yet implemented"); } @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; int col = _colID - 1; // 1-based ValueType vt = in.getSchema()[col]; for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) { Object val = in.get(i, col); double v = (val == null || - (vt == ValueType.STRING && val.toString().isEmpty())) ? Double.NaN : UtilFunctions.objectToDouble(vt, - val); - out.quickSetValueThreadSafe(i, outputCol, v); + (vt == ValueType.STRING && val.toString().isEmpty())) + ? Double.NaN : UtilFunctions.objectToDouble(vt, val); + out.quickSetValue(i, outputCol, v); } + if(DMLScript.STATISTICS) + Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0); return out; } @@ -79,12 +94,15 @@ public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int row // only transfer from in to out if(in == out) return out; + long t0 = DMLScript.STATISTICS ?
System.nanoTime() : 0; int col = _colID - 1; // 1-based int end = getEndIndex(in.getNumRows(), rowStart, blk); for(int i = rowStart; i < end; i++) { double val = in.quickGetValueThreadSafe(i, col); - out.quickSetValueThreadSafe(i, outputCol, val); + out.quickSetValue(i, outputCol, val); } + if(DMLScript.STATISTICS) + Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0); return out; } @@ -106,4 +124,58 @@ public FrameBlock getMetaData(FrameBlock meta) { public void initMetaData(FrameBlock meta) { // do nothing } + + public static class PassThroughSparseApplyTask extends ColumnApplyTask{ + + + protected PassThroughSparseApplyTask(ColumnEncoderPassThrough encoder, FrameBlock input, + MatrixBlock out, int outputCol) { + super(encoder, input, out, outputCol); + } + + protected PassThroughSparseApplyTask(ColumnEncoderPassThrough encoder, FrameBlock input, MatrixBlock out, + int outputCol, int startRow, int blk) { + super(encoder, input, out, outputCol, startRow, blk); + } + + @Override + public Object call() throws Exception { + if(_out.getSparseBlock() == null) + return null; + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + int index = _encoder._colID - 1; + assert _inputF != null; + List sparseRowsWZeros = null; + ValueType vt = _inputF.getSchema()[index]; + for(int r = _startRow; r < getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) { + Object val = _inputF.get(r, index); + double v = (val == null || (vt == ValueType.STRING && val.toString().isEmpty())) ? 
+ Double.NaN : UtilFunctions.objectToDouble(vt, val); + SparseRowVector row = (SparseRowVector) _out.getSparseBlock().get(r); + if(v == 0) { + if(sparseRowsWZeros == null) + sparseRowsWZeros = new ArrayList<>(); + sparseRowsWZeros.add(r); + } + row.values()[index] = v; + row.indexes()[index] = _outputCol; + } + if(sparseRowsWZeros != null){ + synchronized (_encoder){ + if(_encoder.sparseRowsWZeros == null) + _encoder.sparseRowsWZeros = new ArrayList<>(); + _encoder.sparseRowsWZeros.addAll(sparseRowsWZeros); + } + } + if(DMLScript.STATISTICS) + Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0); + return null; + } + + public String toString() { + return getClass().getSimpleName() + ""; + } + + } + } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java index fd18d868ea2..e190d74438f 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java @@ -33,10 +33,13 @@ import java.util.Objects; import java.util.concurrent.Callable; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.lops.Lop; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.data.SparseRowVector; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.utils.Statistics; public class ColumnEncoderRecode extends ColumnEncoder { private static final long serialVersionUID = 8213163881283341874L; @@ -135,7 +138,11 @@ private long lookupRCDMap(String key) { public void build(FrameBlock in) { if(!isApplicable()) return; + long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; makeRcdMap(in, _rcdMap, _colID, 0, in.getNumRows()); + if(DMLScript.STATISTICS){ + Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0); + } } @Override @@ -185,19 +192,18 @@ public void buildPartial(FrameBlock in) { _rcdMapPart.remove(""); } - @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) { - return apply(in, out, outputCol, 0, -1); - } - @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; // FrameBlock is column Major and MatrixBlock row Major this results in cache inefficiencies :( for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) { Object okey = in.get(i, _colID - 1); String key = (okey != null) ? okey.toString() : null; long code = lookupRCDMap(key); - out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ? code : Double.NaN); + out.quickSetValue(i, outputCol, (code >= 0) ? code : Double.NaN); + } + if(DMLScript.STATISTICS){ + Statistics.incTransformRecodeApplyTime(System.nanoTime() - t0); } return out; } @@ -209,9 +215,16 @@ public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int row } @Override - public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) { - throw new DMLRuntimeException( - "Recode called with MatrixBlock. Should not happen since Recode is the first " + "encoder in the Stack"); + protected ColumnApplyTask + getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk){ + return new RecodeSparseApplyTask(this, in ,out, outputCol, startRow, blk); + } + + @Override + protected ColumnApplyTask + getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) { + throw new DMLRuntimeException("Recode called with MatrixBlock. 
Should not happen since Recode is the first " + + "encoder in the Stack"); } @Override @@ -313,6 +326,48 @@ public HashMap getRcdMap() { return _rcdMap; } + private static class RecodeSparseApplyTask extends ColumnApplyTask{ + + public RecodeSparseApplyTask(ColumnEncoderRecode encoder, FrameBlock input, MatrixBlock out, int outputCol) { + super(encoder, input, out, outputCol); + } + + protected RecodeSparseApplyTask(ColumnEncoderRecode encoder, FrameBlock input, MatrixBlock out, + int outputCol, int startRow, int blk) { + super(encoder, input, out, outputCol, startRow, blk); + } + + public Object call() throws Exception { + int index = _encoder._colID - 1; + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + if(_out.getSparseBlock() == null) + return null; + assert _inputF != null; + for(int r = _startRow; r < getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) { + SparseRowVector row = (SparseRowVector) _out.getSparseBlock().get(r); + Object okey = _inputF.get(r, index); + String key = (okey != null) ? okey.toString() : null; + long code = _encoder.lookupRCDMap(key); + double val = (code < 0) ? Double.NaN : code; + row.values()[index] = val; + row.indexes()[index] = _outputCol; + } + if(DMLScript.STATISTICS){ + Statistics.incTransformRecodeApplyTime(System.nanoTime() - t0); + } + return null; + } + + @Override + public String toString() { + String str = getClass().getSimpleName() + ""; + if(_blk != -1) + str+= ""; + return str; + } + + } + private static class RecodePartialBuildTask implements Callable { private final FrameBlock _input; @@ -332,11 +387,15 @@ protected RecodePartialBuildTask(FrameBlock input, int colID, int startRow, int @Override public HashMap call() throws Exception { + long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; HashMap partialMap = new HashMap<>(); makeRcdMap(_input, partialMap, _colID, _startRow, _blockSize); synchronized(_partialMaps) { _partialMaps.put(_startRow, partialMap); } + if(DMLScript.STATISTICS){ + Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0); + } return null; } @@ -358,6 +417,7 @@ private RecodeMergePartialBuildTask(ColumnEncoderRecode encoderRecode, HashMap rcdMap = _encoder.getRcdMap(); _partialMaps.forEach((start_row, map) -> { ((HashMap) map).forEach((k, v) -> { @@ -366,6 +426,9 @@ public Object call() throws Exception { }); }); _encoder._rcdMap = rcdMap; + if(DMLScript.STATISTICS){ + Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0); + } return null; } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java index 4b48d2a09fd..012379a4225 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java @@ -19,16 +19,8 @@ package org.apache.sysds.runtime.transform.encode; -import static org.apache.sysds.runtime.util.CollectionUtils.except; -import static org.apache.sysds.runtime.util.CollectionUtils.unionDistinct; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map.Entry; - import org.apache.commons.lang.ArrayUtils; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.ValueType; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.matrix.data.FrameBlock; @@ -36,9 +28,19 @@ import org.apache.sysds.runtime.transform.encode.ColumnEncoder.EncoderType; import org.apache.sysds.runtime.transform.meta.TfMetaUtils; import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.Statistics; import org.apache.wink.json4j.JSONArray; import 
org.apache.wink.json4j.JSONObject; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; + +import static org.apache.sysds.runtime.util.CollectionUtils.except; +import static org.apache.sysds.runtime.util.CollectionUtils.unionDistinct; + public class EncoderFactory { public static MultiColumnEncoder createEncoder(String spec, String[] colnames, int clen, FrameBlock meta) { @@ -125,16 +127,22 @@ public static MultiColumnEncoder createEncoder(String spec, String[] colnames, V } // create composite decoder of all created encoders for(Entry> listEntry : colEncoders.entrySet()) { + if(DMLScript.STATISTICS) + Statistics.incTransformEncoderCount(listEntry.getValue().size()); lencoders.add(new ColumnEncoderComposite(listEntry.getValue())); } encoder = new MultiColumnEncoder(lencoders); if(!oIDs.isEmpty()) { encoder.addReplaceLegacyEncoder(new EncoderOmit(jSpec, colnames, schema.length, minCol, maxCol)); + if(DMLScript.STATISTICS) + Statistics.incTransformEncoderCount(1); } if(!mvIDs.isEmpty()) { EncoderMVImpute ma = new EncoderMVImpute(jSpec, colnames, schema.length, minCol, maxCol); ma.initRecodeIDList(rcIDs); encoder.addReplaceLegacyEncoder(ma); + if(DMLScript.STATISTICS) + Statistics.incTransformEncoderCount(1); } // initialize meta data w/ robustness for superset of cols diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java index cda6b2af0ee..f77e690f98d 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java @@ -19,21 +19,8 @@ package org.apache.sysds.runtime.transform.encode; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import 
java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.stream.Collectors; - import org.apache.commons.lang.ArrayUtils; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.functionobjects.KahanPlus; import org.apache.sysds.runtime.functionobjects.Mean; @@ -44,10 +31,25 @@ import org.apache.sysds.runtime.transform.meta.TfMetaUtils; import org.apache.sysds.runtime.util.IndexRange; import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.Statistics; import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.stream.Collectors; + public class EncoderMVImpute extends LegacyEncoder { private static final long serialVersionUID = 9057868620144662194L; // objects required to compute mean and variance of all non-missing entries @@ -173,6 +175,7 @@ public MatrixBlock encode(FrameBlock in, MatrixBlock out) { @Override public void build(FrameBlock in) { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; try { for(int j = 0; j < _colList.length; j++) { int colID = _colList[j]; @@ -215,10 +218,13 @@ else if(_mvMethodList[j] == MVMethod.GLOBAL_MODE) { catch(Exception ex) { throw new RuntimeException(ex); } + if(DMLScript.STATISTICS) + Statistics.incTransformImputeBuildTime(System.nanoTime()-t0); } @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out) { + long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; for(int i = 0; i < in.getNumRows(); i++) { for(int j = 0; j < _colList.length; j++) { int colID = _colList[j]; @@ -226,6 +232,8 @@ public MatrixBlock apply(FrameBlock in, MatrixBlock out) { out.quickSetValue(i, colID - 1, Double.parseDouble(_replacementList[j])); } } + if(DMLScript.STATISTICS) + Statistics.incTransformImputeApplyTime(System.nanoTime()-t0); return out; } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java index db61fc158f8..c2f3b68e47e 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java @@ -19,16 +19,9 @@ package org.apache.sysds.runtime.transform.encode; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.ValueType; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; @@ -37,9 +30,18 @@ import org.apache.sysds.runtime.transform.meta.TfMetaUtils; import org.apache.sysds.runtime.util.IndexRange; import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.Statistics; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + public class EncoderOmit extends LegacyEncoder { /* * THIS CLASS IS ONLY FOR LEGACY SUPPORT!!! and will be fazed out slowly. 
@@ -126,6 +128,7 @@ public void build(FrameBlock in) { public MatrixBlock apply(FrameBlock in, MatrixBlock out) { // local rmRows for broadcasting encoder in spark + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; boolean[] rmRows; if(_federated) rmRows = _rmRows; @@ -148,7 +151,8 @@ public MatrixBlock apply(FrameBlock in, MatrixBlock out) { } _rmRows = rmRows; - + if(DMLScript.STATISTICS) + Statistics.incTransformOmitApplyTime(System.nanoTime()-t0); return ret; } diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java index 6db63f514d0..73d33a98aab 100644 --- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java +++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java @@ -28,6 +28,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -36,22 +38,31 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.data.SparseBlock; import org.apache.sysds.runtime.data.SparseBlockMCSR; +import org.apache.sysds.runtime.data.SparseRowVector; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.DependencyTask; import org.apache.sysds.runtime.util.DependencyThreadPool; import org.apache.sysds.runtime.util.DependencyWrapperTask; import org.apache.sysds.runtime.util.IndexRange; +import org.apache.sysds.utils.Statistics; public class MultiColumnEncoder implements Encoder { protected static final Log LOG = 
LogFactory.getLog(MultiColumnEncoder.class.getName()); private static final boolean MULTI_THREADED = true; - public static boolean MULTI_THREADED_STAGES = true; + // If true build and apply separately by placing a synchronization barrier + public static boolean MULTI_THREADED_STAGES = false; + + // Only affects if MULTI_THREADED_STAGES is true + // if true apply tasks for each column will complete + // before the next will start. + public static boolean APPLY_ENCODER_SEPARATE_STAGES = false; private List _columnEncoders; // These encoders are deprecated and will be phased out soon. @@ -60,18 +71,6 @@ public class MultiColumnEncoder implements Encoder { private int _colOffset = 0; // offset for federated Workers who are using subrange encoders private FrameBlock _meta = null; - // TEMP CONSTANTS for testing only - //private int APPLY_BLOCKSIZE = 0; // temp only for testing until automatic calculation of block size - public static int BUILD_BLOCKSIZE = 0; - - /*public void setApplyBlockSize(int blk) { - APPLY_BLOCKSIZE = blk; - }*/ - - public void setBuildBlockSize(int blk) { - BUILD_BLOCKSIZE = blk; - } - public MultiColumnEncoder(List columnEncoders) { _columnEncoders = columnEncoders; } @@ -90,6 +89,7 @@ public MatrixBlock encode(FrameBlock in, int k) { if(MULTI_THREADED && k > 1 && !MULTI_THREADED_STAGES && !hasLegacyEncoder()) { out = new MatrixBlock(); DependencyThreadPool pool = new DependencyThreadPool(k); + LOG.debug("Encoding with full DAG on " + k + " Threads"); try { pool.submitAllAndWait(getEncodeTasks(in, out, pool)); } @@ -98,10 +98,10 @@ public MatrixBlock encode(FrameBlock in, int k) { e.printStackTrace(); } pool.shutdown(); - out.recomputeNonZeros(); - return out; + outputMatrixPostProcessing(out); } else { + LOG.debug("Encoding with staged approach on: " + k + " Threads"); build(in, k); if(_legacyMVImpute != null) { // These operations are redundant for every encoder excluding the legacyMVImpute, the workaround to @@ -127,9 +127,10 @@ private List> 
getEncodeTasks(FrameBlock in, MatrixBlock out, D List> applyTAgg = null; Map depMap = new HashMap<>(); boolean hasDC = getColumnEncoders(ColumnEncoderDummycode.class).size() > 0; + boolean applyOffsetDep = false; tasks.add(DependencyThreadPool.createDependencyTask(new InitOutputMatrixTask(this, in, out))); for(ColumnEncoderComposite e : _columnEncoders) { - List> buildTasks = e.getBuildTasks(in, BUILD_BLOCKSIZE); + List> buildTasks = e.getBuildTasks(in); tasks.addAll(buildTasks); if(buildTasks.size() > 0) { @@ -152,11 +153,14 @@ private List> getEncodeTasks(FrameBlock in, MatrixBlock out, D // colUpdateTask can start when all domain sizes, because it can now calculate the offsets for // each column depMap.put(new Integer[] {-2, -1}, new Integer[] {tasks.size() - 1, tasks.size()}); + buildTasks.forEach(t -> t.setPriority(5)); + applyOffsetDep = true; } - if(hasDC) { + if(hasDC && applyOffsetDep) { // Apply Task dependency to output col update task (is last in list) - // All ApplyTasks need to wait for this task so they all have the correct offsets. + // All ApplyTasks need to wait for this task, so they all have the correct offsets. + // But only for the columns that come after the first DC coder since they don't have an offset depMap.put(new Integer[] {tasks.size(), tasks.size() + 1}, new Integer[] {-2, -1}); applyTAgg = applyTAgg == null ? new ArrayList<>() : applyTAgg; @@ -195,7 +199,7 @@ public void build(FrameBlock in, int k) { private List> getBuildTasks(FrameBlock in) { List> tasks = new ArrayList<>(); for(ColumnEncoderComposite columnEncoder : _columnEncoders) { - tasks.addAll(columnEncoder.getBuildTasks(in, BUILD_BLOCKSIZE)); + tasks.addAll(columnEncoder.getBuildTasks(in)); } return tasks; } @@ -241,23 +245,23 @@ public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int k) { if(in.getNumColumns() != numEncoders) throw new DMLRuntimeException("Not every column in has a CompositeEncoder. 
Please make sure every column " + "has a encoder or slice the input accordingly"); - // Block allocation for MT access - outputMatrixPreProcessing(out, in); // TODO smart checks if(MULTI_THREADED && k > 1) { + // Block allocation for MT access + outputMatrixPreProcessing(out, in); applyMT(in, out, outputCol, k); } else { int offset = outputCol; for(ColumnEncoderComposite columnEncoder : _columnEncoders) { columnEncoder.apply(in, out, columnEncoder._colID - 1 + offset); - if(columnEncoder.hasEncoder(ColumnEncoderDummycode.class)) + if (columnEncoder.hasEncoder(ColumnEncoderDummycode.class)) offset += columnEncoder.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1; } } // Recomputing NNZ since we access the block directly // TODO set NNZ explicit count them in the encoders - out.recomputeNonZeros(); + outputMatrixPostProcessing(out); if(_legacyOmit != null) out = _legacyOmit.apply(in, out); if(_legacyMVImpute != null) @@ -280,16 +284,26 @@ private List> getApplyTasks(FrameBlock in, MatrixBlock out, in private void applyMT(FrameBlock in, MatrixBlock out, int outputCol, int k) { DependencyThreadPool pool = new DependencyThreadPool(k); try { - pool.submitAllAndWait(getApplyTasks(in, out, outputCol)); + if(APPLY_ENCODER_SEPARATE_STAGES){ + int offset = outputCol; + for (ColumnEncoderComposite e : _columnEncoders) { + pool.submitAllAndWait(e.getApplyTasks(in, out, e._colID - 1 + offset)); + if (e.hasEncoder(ColumnEncoderDummycode.class)) + offset += e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1; + } + }else{ + pool.submitAllAndWait(getApplyTasks(in, out, outputCol)); + } } catch(ExecutionException | InterruptedException e) { - LOG.error("MT Column encode failed"); + LOG.error("MT Column apply failed"); e.printStackTrace(); } pool.shutdown(); } private static void outputMatrixPreProcessing(MatrixBlock output, FrameBlock input) { + long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; output.allocateBlock(); if(output.isInSparseFormat()) { SparseBlock block = output.getSparseBlock(); @@ -300,8 +314,33 @@ private static void outputMatrixPreProcessing(MatrixBlock output, FrameBlock inp // allocate all sparse rows so MT sync can be done. // should be rare that rows have only 0 block.allocate(r, input.getNumColumns()); + // Setting the size here makes it possible to run all sparse apply tasks without any sync + // could become problematic if the input is very sparse since we allocate the same size as the input + // should be fine in theory ;) + ((SparseRowVector)block.get(r)).setSize(input.getNumColumns()); + } + } + if(DMLScript.STATISTICS) + Statistics.incTransformOutMatrixPreProcessingTime(System.nanoTime()-t0); + } + + private void outputMatrixPostProcessing(MatrixBlock output){ + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + Set indexSet = getColumnEncoders(ColumnEncoderPassThrough.class).stream() + .map(ColumnEncoderPassThrough::getSparseRowsWZeros).flatMap(l -> { + if(l == null) + return null; + return l.stream(); + }).collect(Collectors.toSet()); + if(!indexSet.stream().allMatch(Objects::isNull)){ + for(Integer row : indexSet){ + // TODO: Maybe MT in special cases when the number of rows is large + output.getSparseBlock().get(row).compact(); } } + output.recomputeNonZeros(); + if(DMLScript.STATISTICS) + Statistics.incTransformOutMatrixPostProcessingTime(System.nanoTime()-t0); } @Override @@ -660,7 +699,7 @@ public void applyColumnOffset() { } /* - * Currently not in use will be integrated in the future + * Currently, not in use will be integrated in the future */ @SuppressWarnings("unused") private static class MultiColumnLegacyBuildTask implements Callable { diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java index 5aff39b9844..17351a622be 100644 --- a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java 
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java @@ -25,16 +25,20 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.sysds.runtime.DMLRuntimeException; -public class DependencyTask implements Callable { +public class DependencyTask implements Comparable>, Callable { public static final boolean ENABLE_DEBUG_DATA = false; + protected static final Log LOG = LogFactory.getLog(DependencyTask.class.getName()); private final Callable _task; protected final List> _dependantTasks; public List> _dependencyTasks = null; // only for debugging private CompletableFuture> _future; private int _rdy = 0; + private Integer _priority = 0; private ExecutorService _pool; public DependencyTask(Callable task, List> dependantTasks) { @@ -54,6 +58,10 @@ public boolean isReady() { return _rdy == 0; } + public void setPriority(int priority) { + _priority = priority; + } + private boolean decrease() { synchronized(this) { _rdy -= 1; @@ -68,7 +76,11 @@ public void addDependent(DependencyTask dependencyTask) { @Override public E call() throws Exception { + LOG.debug("Executing Task: " + this); + long t0 = System.nanoTime(); E ret = _task.call(); + LOG.debug("Finished Task: " + this + " in: " + + (String.format("%.3f", (System.nanoTime()-t0)*1e-9)) + "sec."); _dependantTasks.forEach(t -> { if(t.decrease()) { if(_pool == null) @@ -79,4 +91,14 @@ public E call() throws Exception { return ret; } + + @Override + public String toString(){ + return _task.toString() + "" + ""; + } + + @Override + public int compareTo(DependencyTask task) { + return -1 * this._priority.compareTo(task._priority); + } } diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java index 4fdd63a36f5..50675d69c53 100644 --- 
a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java +++ b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java @@ -19,7 +19,12 @@ package org.apache.sysds.runtime.util; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.DMLRuntimeException; + import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,11 +36,10 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.sysds.runtime.DMLRuntimeException; - public class DependencyThreadPool { + protected static final Log LOG = LogFactory.getLog(DependencyThreadPool.class.getName()); private final ExecutorService _pool; public DependencyThreadPool(int k) { @@ -50,6 +54,8 @@ public List>> submitAll(List> dtasks) { List>> futures = new ArrayList<>(); List rdyTasks = new ArrayList<>(); int i = 0; + // sort by priority + Collections.sort(dtasks); for(DependencyTask t : dtasks) { CompletableFuture> f = new CompletableFuture<>(); t.addPool(_pool); @@ -63,6 +69,8 @@ public List>> submitAll(List> dtasks) { futures.add(f); i++; } + LOG.debug("Initial Starting tasks: \n\t" + + rdyTasks.stream().map(index -> dtasks.get(index).toString()).collect(Collectors.joining("\n\t"))); // Two stages to avoid race condition! for(Integer index : rdyTasks) { synchronized(_pool) { diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java index de6d7d6ac2d..7431a820c7d 100644 --- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java @@ -989,6 +989,15 @@ public static int getEndIndex(int arrayLength, int startIndex, int blockSize){ return (blockSize <= 0)? 
arrayLength: Math.min(arrayLength, startIndex + blockSize); } + public static int[] getBlockSizes(int num, int numBlocks){ + int[] blockSizes = new int[numBlocks]; + Arrays.fill(blockSizes, num/numBlocks); + for (int i = 0; i < num%numBlocks; i++){ + blockSizes[i]++; + } + return blockSizes; + } + public static String[] splitRecodeEntry(String s) { //forward to column encoder, as UtilFunctions available in map context return ColumnEncoderRecode.splitRecodeMapEntry(s); diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java b/src/main/java/org/apache/sysds/utils/Statistics.java index cacd2f7d10d..d91d9c511a5 100644 --- a/src/main/java/org/apache/sysds/utils/Statistics.java +++ b/src/main/java/org/apache/sysds/utils/Statistics.java @@ -19,20 +19,6 @@ package org.apache.sysds.utils; -import java.lang.management.CompilationMXBean; -import java.lang.management.GarbageCollectorMXBean; -import java.lang.management.ManagementFactory; -import java.text.DecimalFormat; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.DoubleAdder; -import java.util.concurrent.atomic.LongAdder; - import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.sysds.api.DMLScript; @@ -51,6 +37,20 @@ import org.apache.sysds.runtime.lineage.LineageCacheStatistics; import org.apache.sysds.runtime.privacy.CheckedConstraintsLog; +import java.lang.management.CompilationMXBean; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.text.DecimalFormat; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import 
java.util.concurrent.atomic.DoubleAdder; +import java.util.concurrent.atomic.LongAdder; + /** * This class captures all statistics. */ @@ -149,7 +149,28 @@ private static class InstStats { private static final LongAdder lTotalUIPVar = new LongAdder(); private static final LongAdder lTotalLix = new LongAdder(); private static final LongAdder lTotalLixUIP = new LongAdder(); - + + // Transformencode stats + private static final LongAdder transformEncoderCount = new LongAdder(); + + //private static final LongAdder transformBuildTime = new LongAdder(); + private static final LongAdder transformRecodeBuildTime = new LongAdder(); + private static final LongAdder transformBinningBuildTime = new LongAdder(); + private static final LongAdder transformImputeBuildTime = new LongAdder(); + + //private static final LongAdder transformApplyTime = new LongAdder(); + private static final LongAdder transformRecodeApplyTime = new LongAdder(); + private static final LongAdder transformDummyCodeApplyTime = new LongAdder(); + private static final LongAdder transformPassThroughApplyTime = new LongAdder(); + private static final LongAdder transformFeatureHashingApplyTime = new LongAdder(); + private static final LongAdder transformBinningApplyTime = new LongAdder(); + private static final LongAdder transformOmitApplyTime = new LongAdder(); + private static final LongAdder transformImputeApplyTime = new LongAdder(); + + + private static final LongAdder transformOutMatrixPreProcessingTime = new LongAdder(); + private static final LongAdder transformOutMatrixPostProcessingTime = new LongAdder(); + // Federated stats private static final LongAdder federatedReadCount = new LongAdder(); private static final LongAdder federatedPutCount = new LongAdder(); @@ -649,6 +670,70 @@ public static void accFedPSGradientWeightingTime(long t) { public static void accFedPSCommunicationTime(long t) { fedPSCommunicationTime.add(t);} + public static void incTransformEncoderCount(long encoders){ + 
transformEncoderCount.add(encoders); + } + + public static void incTransformRecodeApplyTime(long t){ + transformRecodeApplyTime.add(t); + } + + public static void incTransformDummyCodeApplyTime(long t){ + transformDummyCodeApplyTime.add(t); + } + + public static void incTransformBinningApplyTime(long t){ + transformBinningApplyTime.add(t); + } + + public static void incTransformPassThroughApplyTime(long t){ + transformPassThroughApplyTime.add(t); + } + + public static void incTransformFeatureHashingApplyTime(long t){ + transformFeatureHashingApplyTime.add(t); + } + + public static void incTransformOmitApplyTime(long t) { + transformOmitApplyTime.add(t); + } + + public static void incTransformImputeApplyTime(long t) { + transformImputeApplyTime.add(t); + } + + public static void incTransformRecodeBuildTime(long t){ + transformRecodeBuildTime.add(t); + } + + public static void incTransformBinningBuildTime(long t){ + transformBinningBuildTime.add(t); + } + + public static void incTransformImputeBuildTime(long t) { + transformImputeBuildTime.add(t); + } + + public static void incTransformOutMatrixPreProcessingTime(long t){ + transformOutMatrixPreProcessingTime.add(t); + } + + public static void incTransformOutMatrixPostProcessingTime(long t){ + transformOutMatrixPostProcessingTime.add(t); + } + + public static long getTransformEncodeBuildTime(){ + return transformBinningBuildTime.longValue() + transformImputeBuildTime.longValue() + + transformRecodeBuildTime.longValue(); + } + + public static long getTransformEncodeApplyTime(){ + return transformDummyCodeApplyTime.longValue() + transformBinningApplyTime.longValue() + + transformFeatureHashingApplyTime.longValue() + transformPassThroughApplyTime.longValue() + + transformRecodeApplyTime.longValue() + transformOmitApplyTime.longValue() + + transformImputeApplyTime.longValue(); + } + public static String getCPHeavyHitterCode( Instruction inst ) { String opcode = null; @@ -1129,6 +1214,50 @@ public static String display(int 
maxHeavyHitters) federatedExecuteInstructionCount.longValue() + "/" + federatedExecuteUDFCount.longValue() + ".\n"); } + if( transformEncoderCount.longValue() > 0) { + //TODO: Cleanup and condense + sb.append("TransformEncode num. encoders:\t").append(transformEncoderCount.longValue()).append("\n"); + sb.append("TransformEncode build time:\t").append(String.format("%.3f", + getTransformEncodeBuildTime()*1e-9)).append(" sec.\n"); + if(transformRecodeBuildTime.longValue() > 0) + sb.append("\tRecode build time:\t").append(String.format("%.3f", + transformRecodeBuildTime.longValue()*1e-9)).append(" sec.\n"); + if(transformBinningBuildTime.longValue() > 0) + sb.append("\tBinning build time:\t").append(String.format("%.3f", + transformBinningBuildTime.longValue()*1e-9)).append(" sec.\n"); + if(transformImputeBuildTime.longValue() > 0) + sb.append("\tImpute build time:\t").append(String.format("%.3f", + transformImputeBuildTime.longValue()*1e-9)).append(" sec.\n"); + + sb.append("TransformEncode apply time:\t").append(String.format("%.3f", + getTransformEncodeApplyTime()*1e-9)).append(" sec.\n"); + if(transformRecodeApplyTime.longValue() > 0) + sb.append("\tRecode apply time:\t").append(String.format("%.3f", + transformRecodeApplyTime.longValue()*1e-9)).append(" sec.\n"); + if(transformBinningApplyTime.longValue() > 0) + sb.append("\tBinning apply time:\t").append(String.format("%.3f", + transformBinningApplyTime.longValue()*1e-9)).append(" sec.\n"); + if(transformDummyCodeApplyTime.longValue() > 0) + sb.append("\tDummyCode apply time:\t").append(String.format("%.3f", + transformDummyCodeApplyTime.longValue()*1e-9)).append(" sec.\n"); + if(transformFeatureHashingApplyTime.longValue() > 0) + sb.append("\tHashing apply time:\t").append(String.format("%.3f", + transformFeatureHashingApplyTime.longValue()*1e-9)).append(" sec.\n"); + if(transformPassThroughApplyTime.longValue() > 0) + sb.append("\tPassThrough apply time:\t").append(String.format("%.3f", + 
transformPassThroughApplyTime.longValue()*1e-9)).append(" sec.\n"); + if(transformOmitApplyTime.longValue() > 0) + sb.append("\tOmit apply time:\t").append(String.format("%.3f", + transformOmitApplyTime.longValue()*1e-9)).append(" sec.\n"); + if(transformImputeApplyTime.longValue() > 0) + sb.append("\tImpute apply time:\t").append(String.format("%.3f", + transformImputeApplyTime.longValue()*1e-9)).append(" sec.\n"); + + sb.append("TransformEncode PreProc. time:\t").append(String.format("%.3f", + transformOutMatrixPreProcessingTime.longValue()*1e-9)).append(" sec.\n"); + sb.append("TransformEncode PostProc. time:\t").append(String.format("%.3f", + transformOutMatrixPostProcessingTime.longValue()*1e-9)).append(" sec.\n"); + } if(ConfigurationManager.isCompressionEnabled()){ DMLCompressionStatistics.display(sb); diff --git a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java index 8824b9d8d10..b70571b3b51 100644 --- a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java +++ b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java @@ -30,6 +30,7 @@ import org.apache.sysds.runtime.io.FileFormatPropertiesCSV; import org.apache.sysds.runtime.io.FrameReaderFactory; import org.apache.sysds.runtime.matrix.data.FrameBlock; +import org.apache.sysds.runtime.transform.encode.ColumnEncoder; import org.apache.sysds.runtime.transform.encode.ColumnEncoderBin; import org.apache.sysds.runtime.transform.encode.ColumnEncoderRecode; import org.apache.sysds.runtime.transform.encode.EncoderFactory; @@ -173,6 +174,7 @@ private void runTransformTest(Types.ExecMode rt, String ofmt, TransformType type .readFrameFromHDFS(DATASET, -1L, -1L); StringBuilder specSb = new StringBuilder(); Files.readAllLines(Paths.get(SPEC)).forEach(s -> 
specSb.append(s).append("\n")); + ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN = Math.max(blockSize, 1); MultiColumnEncoder encoderS = EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(), input.getNumColumns(), null); MultiColumnEncoder encoderM = EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(), diff --git a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java index 21562505a79..6679f36ccd6 100644 --- a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java +++ b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java @@ -48,7 +48,7 @@ public class TransformFrameEncodeMultithreadedTest extends AutomatedTestBase { private final static String DATASET1 = "homes3/homes.csv"; private final static String SPEC1 = "homes3/homes.tfspec_recode.json"; private final static String SPEC2 = "homes3/homes.tfspec_dummy.json"; - private final static String SPEC2all = "homes3/homes.tfspec_dummy_all.json"; + private final static String SPEC2sparse = "homes3/homes.tfspec_dummy_sparse.json"; private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; // recode private final static String SPEC6 = "homes3/homes.tfspec_recode_dummy.json"; private final static String SPEC7 = "homes3/homes.tfspec_binDummy.json"; // recode+dummy @@ -164,7 +164,7 @@ private void runTransformTest(ExecMode rt, String ofmt, TransformType type, bool DATASET = DATASET1; break; case DUMMY_ALL: - SPEC = SPEC2all; + SPEC = SPEC2sparse; DATASET = DATASET1; break; case BIN: diff --git a/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json b/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json deleted file mode 100644 index 65b8fee7d4d..00000000000 --- 
a/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json +++ /dev/null @@ -1 +0,0 @@ -{"ids": true, "dummycode": [ 2, 7, 1, 3, 4, 5, 6, 8, 9 ] } \ No newline at end of file diff --git a/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json b/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json new file mode 100644 index 00000000000..ed48308408a --- /dev/null +++ b/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json @@ -0,0 +1 @@ +{"ids": true, "dummycode": [ 2, 7, 1, 3, 4, 6, 8, 9 ] } \ No newline at end of file