[SYSTEMDS-3118] Extended parfor parser/runtime (frame result variables)
This patch extends parfor with support for frame result variables during
dependency analysis and the merge of worker result variables. So far, this
covers only in-memory frame result merge.
mboehm7 committed Sep 2, 2021
1 parent 4497199 commit 60d16c4
Showing 16 changed files with 408 additions and 134 deletions.
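For orientation, a minimal DML sketch (hypothetical, not part of this commit) of the pattern the patch enables: a parfor loop whose iterations write disjoint rows of a frame, so the frame now qualifies as a parfor result variable and worker results are merged after the loop.

# hypothetical script for illustration
F = as.frame(matrix(0, rows=10, cols=3));
parfor(i in 1:10) {
  # disjoint row-wise writes: no loop-carried dependency,
  # so the (now frame-aware) dependency analysis admits F
  F[i,] = as.frame(matrix(i, rows=1, cols=3));
}
print(toString(F));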
@@ -677,7 +677,7 @@ else if( s instanceof FunctionStatement ) {
 for(DataIdentifier write : datsUpdated) {
     if( !c._var.equals( write.getName() ) ) continue;
 
-    if( cdt != DataType.MATRIX && cdt != DataType.LIST ) {
+    if( cdt != DataType.MATRIX && cdt != DataType.FRAME && cdt != DataType.LIST ) {
         //cannot infer type, need to exit (conservative approach)
         throw new LanguageException("PARFOR loop dependency analysis: cannot check "
             + "for dependencies due to unknown datatype of var '"+c._var+"': "+cdt.name()+".");
@@ -716,6 +716,7 @@ else if(runBanerjeeGCDTest( c._dat, dat2 )) {
     return;
 }
 else if( (cdt == DataType.MATRIX && dat2dt == DataType.MATRIX)
+    || (cdt == DataType.FRAME && dat2dt == DataType.FRAME )
     || (cdt == DataType.LIST && dat2dt == DataType.LIST ) )
 {
     boolean invalid = false;
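To illustrate the extended check, a hypothetical frame script with a true loop-carried dependency, which the analysis now rejects for FRAME just as for MATRIX (via the conservative type check and the Banerjee/GCD candidate tests above):

F = as.frame(matrix(0, rows=10, cols=1));
parfor(i in 1:9) {
  # row i+1 is written in iteration i and read in iteration i+1,
  # so dependency analysis fails with a LanguageException
  F[i+1,] = F[i,];
}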
@@ -38,6 +38,7 @@
 import org.apache.sysds.parser.VariableSet;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
@@ -51,6 +52,7 @@
 import org.apache.sysds.runtime.controlprogram.parfor.RemoteParForJobReturn;
 import org.apache.sysds.runtime.controlprogram.parfor.RemoteParForSpark;
 import org.apache.sysds.runtime.controlprogram.parfor.ResultMerge;
+import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeFrameLocalMemory;
 import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalAutomatic;
 import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalFile;
 import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalMemory;
@@ -1056,9 +1058,9 @@ private void handleSparkEagerCaching( ExecutionContext ec ) {
  * @param out output matrix
  * @param in array of input matrix objects
  */
-private static void cleanWorkerResultVariables(ExecutionContext ec, MatrixObject out, MatrixObject[] in, boolean parallel) {
+private static void cleanWorkerResultVariables(ExecutionContext ec, CacheableData<?> out, CacheableData<?>[] in, boolean parallel) {
     //check for empty inputs (no iterations executed)
-    Stream<MatrixObject> results = Arrays.stream(in).filter(m -> m!=null && m!=out);
+    Stream<CacheableData<?>> results = Arrays.stream(in).filter(m -> m!=null && m!=out);
     //perform cleanup (parallel to mitigate file deletion bottlenecks)
     (parallel ? results.parallel() : results)
         .forEach(m -> ec.cleanupCacheableData(m));
@@ -1307,33 +1309,41 @@ private DataPartitioner createDataPartitioner(PartitionFormat dpf, PDataPartitio
     return dp;
 }
 
-private ResultMerge createResultMerge( PResultMerge prm, MatrixObject out, MatrixObject[] in, String fname, boolean accum, ExecutionContext ec )
+private ResultMerge<?> createResultMerge( PResultMerge prm,
+    CacheableData<?> out, CacheableData<?>[] in, String fname, boolean accum, ExecutionContext ec )
 {
-    ResultMerge rm = null;
+    ResultMerge<?> rm = null;
 
-    //create result merge implementation (determine degree of parallelism
-    //only for spark to avoid unnecessary spark context creation)
-    switch( prm )
-    {
-        case LOCAL_MEM:
-            rm = new ResultMergeLocalMemory( out, in, fname, accum );
-            break;
-        case LOCAL_FILE:
-            rm = new ResultMergeLocalFile( out, in, fname, accum );
-            break;
-        case LOCAL_AUTOMATIC:
-            rm = new ResultMergeLocalAutomatic( out, in, fname, accum );
-            break;
-        case REMOTE_SPARK:
-            int numMap = Math.max(_numThreads,
-                SparkExecutionContext.getDefaultParallelism(true));
-            int numRed = numMap; //equal map/reduce
-            rm = new ResultMergeRemoteSpark( out, in,
-                fname, accum, ec, numMap, numRed );
-            break;
-
-        default:
-            throw new DMLRuntimeException("Undefined result merge: '" +prm.toString()+"'.");
+    if( out instanceof FrameObject ) {
+        rm = new ResultMergeFrameLocalMemory((FrameObject)out, (FrameObject[])in, fname, accum);
+    }
+    else if(out instanceof MatrixObject) {
+        //create result merge implementation (determine degree of parallelism
+        //only for spark to avoid unnecessary spark context creation)
+        switch( prm )
+        {
+            case LOCAL_MEM:
+                rm = new ResultMergeLocalMemory( (MatrixObject)out, (MatrixObject[])in, fname, accum );
+                break;
+            case LOCAL_FILE:
+                rm = new ResultMergeLocalFile( (MatrixObject)out, (MatrixObject[])in, fname, accum );
+                break;
+            case LOCAL_AUTOMATIC:
+                rm = new ResultMergeLocalAutomatic( (MatrixObject)out, (MatrixObject[])in, fname, accum );
+                break;
+            case REMOTE_SPARK:
+                int numMap = Math.max(_numThreads,
+                    SparkExecutionContext.getDefaultParallelism(true));
+                int numRed = numMap; //equal map/reduce
+                rm = new ResultMergeRemoteSpark( (MatrixObject)out,
+                    (MatrixObject[])in, fname, accum, ec, numMap, numRed );
+                break;
+            default:
+                throw new DMLRuntimeException("Undefined result merge: '" +prm.toString()+"'.");
+        }
+    }
+    else {
+        throw new DMLRuntimeException("Unsupported result merge data: "+out.getClass().getSimpleName());
+    }
 
     return rm;
@@ -1437,14 +1447,15 @@ private void consolidateAndCheckResults(ExecutionContext ec, long expIters, long
 {
     Data dat = ec.getVariable(var._name);
 
-    if( dat instanceof MatrixObject ) //robustness scalars
+    if( dat instanceof MatrixObject | dat instanceof FrameObject )
     {
-        MatrixObject out = (MatrixObject) dat;
-        MatrixObject[] in = Arrays.stream(results).map(vars ->
-            vars.get(var._name)).toArray(MatrixObject[]::new);
+        CacheableData<?> out = (CacheableData<?>) dat;
+        Stream<Object> tmp = Arrays.stream(results).map(vars -> vars.get(var._name));
+        CacheableData<?>[] in = (dat instanceof MatrixObject) ?
+            tmp.toArray(MatrixObject[]::new) : tmp.toArray(FrameObject[]::new);
         String fname = constructResultMergeFileName();
-        ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec);
-        MatrixObject outNew = USE_PARALLEL_RESULT_MERGE ?
+        ResultMerge<?> rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec);
+        CacheableData<?> outNew = USE_PARALLEL_RESULT_MERGE ?
             rm.executeParallelMerge(_numThreads) :
             rm.executeSerialMerge();
 
@@ -1653,18 +1664,19 @@ public void run()
     if( var == LocalTaskQueue.NO_MORE_TASKS ) // task queue closed (no more tasks)
         break;
 
-    MatrixObject out = null;
+    CacheableData<?> out = null;
     synchronized( _ec.getVariables() ){
-        out = _ec.getMatrixObject(var._name);
+        out = _ec.getCacheableData(var._name);
     }
 
-    MatrixObject[] in = new MatrixObject[ _refVars.length ];
-    for( int i=0; i< _refVars.length; i++ )
-        in[i] = (MatrixObject) _refVars[i].get( var._name );
+    Stream<Object> tmp = Arrays.stream(_refVars).map(vars -> vars.get(var._name));
+    CacheableData<?>[] in = (out instanceof MatrixObject) ?
+        tmp.toArray(MatrixObject[]::new) : tmp.toArray(FrameObject[]::new);
 
     String fname = constructResultMergeFileName();
 
-    ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, _ec);
-    MatrixObject outNew = null;
+    ResultMerge<?> rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, _ec);
+    CacheableData<?> outNew = null;
     if( USE_PARALLEL_RESULT_MERGE )
         outNew = rm.executeParallelMerge( _numThreads );
     else
@@ -41,6 +41,7 @@
 import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.meta.MetaData;
 import org.apache.sysds.runtime.meta.MetaDataFormat;
 import org.apache.sysds.runtime.util.UtilFunctions;
@@ -86,6 +87,12 @@ public FrameObject(String fname, MetaData meta, ValueType[] schema) {
  */
 public FrameObject(FrameObject fo) {
     super(fo);
+
+    MetaDataFormat metaOld = (MetaDataFormat) fo.getMetaData();
+    _metaData = new MetaDataFormat(
+        new MatrixCharacteristics(metaOld.getDataCharacteristics()),
+        metaOld.getFileFormat());
+    _schema = fo._schema.clone();
 }
 
 @Override
@@ -21,42 +21,33 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 
 import java.io.Serializable;
-import java.util.List;
 
-/**
- * Due to independence of all iterations, any result has the following properties:
- * (1) non local var, (2) matrix object, and (3) completely independent.
- * These properties allow us to realize result merging in parallel without any synchronization.
- *
- */
-public abstract class ResultMerge implements Serializable
+public abstract class ResultMerge<T extends CacheableData<?>> implements Serializable
 {
     //note: this class needs to be serializable to ensure that all attributes of
     //ResultMergeRemoteSparkWCompare are included in the task closure
-    private static final long serialVersionUID = 2620430969346516677L;
+    private static final long serialVersionUID = -6756689640511059030L;
 
     protected static final Log LOG = LogFactory.getLog(ResultMerge.class.getName());
     protected static final String NAME_SUFFIX = "_rm";
     protected static final BinaryOperator PLUS = InstructionUtils.parseBinaryOperator("+");
 
     //inputs to result merge
-    protected MatrixObject _output = null;
-    protected MatrixObject[] _inputs = null;
-    protected String _outputFName = null;
-    protected boolean _isAccum = false;
+    protected T _output = null;
+    protected T[] _inputs = null;
+    protected String _outputFName = null;
+    protected boolean _isAccum = false;
 
     protected ResultMerge( ) {
         //do nothing
     }
 
-    public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) {
+    public ResultMerge( T out, T[] in, String outputFilename, boolean accum ) {
         _output = out;
         _inputs = in;
         _outputFName = outputFilename;
@@ -70,7 +61,7 @@ public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename,
  *
  * @return output (merged) matrix
  */
-public abstract MatrixObject executeSerialMerge();
+public abstract T executeSerialMerge();
 
 /**
  * Merge all given input matrices in parallel into the given output matrix.
@@ -80,67 +71,6 @@ public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename,
  * @param par degree of parallelism
  * @return output (merged) matrix
  */
-public abstract MatrixObject executeParallelMerge( int par );
+public abstract T executeParallelMerge(int par);
 
-protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) {
-    mergeWithoutComp(out, in, appendOnly, false);
-}
-
-protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly, boolean par ) {
-    //pass through to matrix block operations
-    if( _isAccum )
-        out.binaryOperationsInPlace(PLUS, in);
-    else
-        out.merge(in, appendOnly, par);
-}
-
-/**
- * NOTE: append-only is not applicable with compare because the output must be populated
- * with the initial state of the matrix - with append, this would result in duplicates.
- *
- * @param out output matrix block
- * @param in input matrix block
- * @param compare dense block of original (pre-execution) values to compare against
- */
-protected void mergeWithComp( MatrixBlock out, MatrixBlock in, DenseBlock compare )
-{
-    //Notes for result correctness:
-    // * Always iterate over the entire block in order to compare all values
-    //   (using a sparse iterator would miss values set to 0)
-    // * Explicit NaN awareness for cases where the original matrix contains
-    //   NaNs: since NaN != NaN, we would otherwise potentially overwrite results
-    // * For accumulation, we add out += (new-old) to ensure correct results
-    //   because all inputs have the old values replicated
-
-    if( in.isEmptyBlock(false) ) {
-        if( _isAccum ) return; //nothing to do
-        for( int i=0; i<in.getNumRows(); i++ )
-            for( int j=0; j<in.getNumColumns(); j++ )
-                if( compare.get(i, j) != 0 )
-                    out.quickSetValue(i, j, 0);
-    }
-    else { //SPARSE/DENSE
-        int rows = in.getNumRows();
-        int cols = in.getNumColumns();
-        for( int i=0; i<rows; i++ )
-            for( int j=0; j<cols; j++ ) {
-                double valOld = compare.get(i,j);
-                double valNew = in.quickGetValue(i,j); //input value
-                if( (valNew != valOld && !Double.isNaN(valNew) ) //for changed values
-                    || Double.isNaN(valNew) != Double.isNaN(valOld) ) //NaN awareness
-                {
-                    double value = !_isAccum ? valNew :
-                        (out.quickGetValue(i, j) + (valNew - valOld));
-                    out.quickSetValue(i, j, value);
-                }
-            }
-    }
-}
-
-protected long computeNonZeros( MatrixObject out, List<MatrixObject> in ) {
-    //sum of nnz of inputs (worker results) minus the output var's existing nnz
-    long outNNZ = out.getDataCharacteristics().getNonZeros();
-    return outNNZ - in.size() * outNNZ + in.stream()
-        .mapToLong(m -> m.getDataCharacteristics().getNonZeros()).sum();
-}
 }
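As background for the matrix-specific merge logic removed from this base class above (presumably retained in the matrix mergers among the 16 changed files), a hypothetical matrix script where the result variable is initialized before the loop: the merge must then compare worker results against the original values, which is what mergeWithComp implements for changed (including NaN) cells.

R = matrix(7, rows=4, cols=4);
parfor(i in 1:4) {
  # each iteration overwrites row i; because R was initialized to 7,
  # the merge compares against the original values to pick up changes
  R[i,] = matrix(i, rows=1, cols=4);
}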
