Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -963,18 +963,18 @@ public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, Matri
}

@Override
public MatrixValue aggregateBinaryOperations(MatrixValue mv1, MatrixValue mv2, MatrixValue result, AggregateBinaryOperator op)
public MatrixValue aggregateBinaryOperations(MatrixValue mv1, MatrixValue mv2, MatrixValue result, AggregateBinaryOperator op, boolean useNativeBLAS)
throws DMLRuntimeException
{
//call uncompressed matrix mult if necessary
if( !isCompressed() ) {
return super.aggregateBinaryOperations(mv1, mv2, result, op);
return super.aggregateBinaryOperations(mv1, mv2, result, op, useNativeBLAS);
}

//multi-threaded mm of single uncompressed colgroup
if( isSingleUncompressedGroup() ){
MatrixBlock tmp = ((ColGroupUncompressed)_colGroups.get(0)).getData();
return tmp.aggregateBinaryOperations(this==mv1?tmp:mv1, this==mv2?tmp:mv2, result, op);
return tmp.aggregateBinaryOperations(this==mv1?tmp:mv1, this==mv2?tmp:mv2, result, op, useNativeBLAS);
}

Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
Expand Down Expand Up @@ -2003,12 +2003,12 @@ public MatrixValue sortOperations(MatrixValue weights, MatrixValue result)
@Override
public MatrixValue aggregateBinaryOperations(MatrixIndexes m1Index,
MatrixValue m1Value, MatrixIndexes m2Index, MatrixValue m2Value,
MatrixValue result, AggregateBinaryOperator op)
MatrixValue result, AggregateBinaryOperator op, boolean useNativeBLAS)
throws DMLRuntimeException {
printDecompressWarning("aggregateBinaryOperations");
MatrixBlock left = isCompressed() ? decompress() : this;
MatrixBlock right = getUncompressed(m2Value);
return left.aggregateBinaryOperations(m1Index, left, m2Index, right, result, op);
return left.aggregateBinaryOperations(m1Index, left, m2Index, right, result, op, useNativeBLAS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ public void processInstruction(ExecutionContext ec)
//get inputs
MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName());
MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName());
//compute matrix multiplication
AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr;
MatrixBlock soresBlock = null;
if( matBlock2 instanceof CompressedMatrixBlock )
soresBlock = (MatrixBlock) (matBlock2.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op));
soresBlock = (MatrixBlock) (matBlock2.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op, NativeHelper.isNativeLibraryLoaded()));
else {
soresBlock = (MatrixBlock) (matBlock1.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op, NativeHelper.isNativeLibraryLoaded()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class AggregateBinaryInstruction extends BinaryMRInstructionBase implemen
{
private String _opcode = null;

boolean useNativeBLAS = false; // Since MR is in maintenance mode

//optional argument for cpmm
private MMCJType _aggType = MMCJType.AGG;

Expand Down Expand Up @@ -166,7 +168,7 @@ public void processInstruction(Class<? extends MatrixValue> valueClass,
in1.getIndexes(), in1.getValue(),
in2.getIndexes(), in2.getValue(),
out.getIndexes(), out.getValue(),
((AggregateBinaryOperator)optr));
((AggregateBinaryOperator)optr), useNativeBLAS);

//put the output value in the cache
if(out==tempValue)
Expand Down Expand Up @@ -213,7 +215,7 @@ private void processMapMultInstruction(Class<? extends MatrixValue> valueClass,
//process instruction
OperationsOnMatrixValues.performAggregateBinary(in1.getIndexes(), in1.getValue(),
in2BlockIndex, in2BlockValue, out.getIndexes(), out.getValue(),
((AggregateBinaryOperator)optr));
((AggregateBinaryOperator)optr), useNativeBLAS);

removeOutput &= ( !_outputEmptyBlocks && out.getValue().isEmpty() );
}
Expand Down Expand Up @@ -242,7 +244,7 @@ private void processMapMultInstruction(Class<? extends MatrixValue> valueClass,
OperationsOnMatrixValues.performAggregateBinary(in1BlockIndex, in1BlockValue,
in2.getIndexes(), in2.getValue(),
out.getIndexes(), out.getValue(),
((AggregateBinaryOperator)optr));
((AggregateBinaryOperator)optr), useNativeBLAS);

removeOutput &= ( !_outputEmptyBlocks && out.getValue().isEmpty() );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.utils.NativeHelper;

/**
* Cpmm: cross-product matrix multiplication operation (distributed matrix multiply
Expand Down Expand Up @@ -98,7 +99,7 @@ public void processInstruction(ExecutionContext ec)
JavaPairRDD<Long, IndexedMatrixValue> tmp2 = in2.mapToPair(new CpmmIndexFunction(false));
JavaPairRDD<MatrixIndexes,MatrixBlock> out = tmp1
.join(tmp2) // join over common dimension
.mapToPair(new CpmmMultiplyFunction()); // compute block multiplications
.mapToPair(new CpmmMultiplyFunction(NativeHelper.isNativeLibraryLoaded())); // compute block multiplications

//process cpmm aggregation and handle outputs
if( _aggtype == SparkAggType.SINGLE_BLOCK )
Expand Down Expand Up @@ -150,11 +151,13 @@ private static class CpmmMultiplyFunction implements PairFunction<Tuple2<Long, T
private static final long serialVersionUID = -2009255629093036642L;

private AggregateBinaryOperator _op = null;
private boolean _useNativeBLAS;

public CpmmMultiplyFunction()
public CpmmMultiplyFunction(boolean useNativeBLAS)
{
AggregateOperator agg = new AggregateOperator(0, Plus.getPlusFnObject());
_op = new AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
_useNativeBLAS = useNativeBLAS;
}

@Override
Expand All @@ -167,7 +170,7 @@ public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<Long, Tuple2<IndexedMatrix
MatrixBlock blkOut = new MatrixBlock();

//core block matrix multiplication
blkIn1.aggregateBinaryOperations(blkIn1, blkIn2, blkOut, _op);
blkIn1.aggregateBinaryOperations(blkIn1, blkIn2, blkOut, _op, _useNativeBLAS);

//return target block
ixOut.setIndexes(arg0._2()._1().getIndexes().getRowIndex(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.utils.NativeHelper;

public class MapmmSPInstruction extends BinarySPInstruction
{
Expand Down Expand Up @@ -138,7 +139,7 @@ public void processInstruction(ExecutionContext ec)
//execute mapmm and aggregation if necessary and put output into symbol table
if( _aggtype == SparkAggType.SINGLE_BLOCK )
{
JavaRDD<MatrixBlock> out = in1.map(new RDDMapMMFunction2(type, in2));
JavaRDD<MatrixBlock> out = in1.map(new RDDMapMMFunction2(type, in2, NativeHelper.isNativeLibraryLoaded()));
MatrixBlock out2 = RDDAggregateUtils.sumStable(out);

//put output block into symbol table (no lineage because single block)
Expand All @@ -155,12 +156,12 @@ public void processInstruction(ExecutionContext ec)
+numParts+" partitions to satisfy size restrictions of output partitions.");
in1 = in1.repartition(numParts);
}
out = in1.flatMapToPair( new RDDFlatMapMMFunction(type, in2) );
out = in1.flatMapToPair( new RDDFlatMapMMFunction(type, in2, NativeHelper.isNativeLibraryLoaded()) );
}
else if( preservesPartitioning(mcRdd, type) )
out = in1.mapPartitionsToPair(new RDDMapMMPartitionFunction(type, in2), true);
out = in1.mapPartitionsToPair(new RDDMapMMPartitionFunction(type, in2, NativeHelper.isNativeLibraryLoaded()), true);
else
out = in1.mapToPair( new RDDMapMMFunction(type, in2) );
out = in1.mapToPair( new RDDMapMMFunction(type, in2, NativeHelper.isNativeLibraryLoaded()) );

//empty output block filter
if( !_outputEmpty )
Expand Down Expand Up @@ -252,11 +253,13 @@ private static class RDDMapMMFunction implements PairFunction<Tuple2<MatrixIndex
private final CacheType _type;
private final AggregateBinaryOperator _op;
private final PartitionedBroadcast<MatrixBlock> _pbc;
private boolean _useNativeBLAS;

public RDDMapMMFunction( CacheType type, PartitionedBroadcast<MatrixBlock> binput )
public RDDMapMMFunction( CacheType type, PartitionedBroadcast<MatrixBlock> binput, boolean useNativeBLAS )
{
_type = type;
_pbc = binput;
_useNativeBLAS = useNativeBLAS;

//created operator for reuse
AggregateOperator agg = new AggregateOperator(0, Plus.getPlusFnObject());
Expand All @@ -280,7 +283,7 @@ public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBloc

//execute matrix-vector mult
OperationsOnMatrixValues.performAggregateBinary(
new MatrixIndexes(1,ixIn.getRowIndex()), left, ixIn, blkIn, ixOut, blkOut, _op);
new MatrixIndexes(1,ixIn.getRowIndex()), left, ixIn, blkIn, ixOut, blkOut, _op, _useNativeBLAS);
}
else //if( _type == CacheType.RIGHT )
{
Expand All @@ -289,7 +292,7 @@ public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBloc

//execute matrix-vector mult
OperationsOnMatrixValues.performAggregateBinary(
ixIn, blkIn, new MatrixIndexes(ixIn.getColumnIndex(),1), right, ixOut, blkOut, _op);
ixIn, blkIn, new MatrixIndexes(ixIn.getColumnIndex(),1), right, ixOut, blkOut, _op, _useNativeBLAS);
}

//output new tuple
Expand All @@ -307,11 +310,13 @@ private static class RDDMapMMFunction2 implements Function<Tuple2<MatrixIndexes,
private final CacheType _type;
private final AggregateBinaryOperator _op;
private final PartitionedBroadcast<MatrixBlock> _pbc;
private boolean _useNativeBLAS;

public RDDMapMMFunction2( CacheType type, PartitionedBroadcast<MatrixBlock> binput )
public RDDMapMMFunction2( CacheType type, PartitionedBroadcast<MatrixBlock> binput, boolean useNativeBLAS )
{
_type = type;
_pbc = binput;
_useNativeBLAS = useNativeBLAS;

//created operator for reuse
AggregateOperator agg = new AggregateOperator(0, Plus.getPlusFnObject());
Expand All @@ -332,7 +337,7 @@ public MatrixBlock call( Tuple2<MatrixIndexes, MatrixBlock> arg0 )

//execute matrix-vector mult
return (MatrixBlock) OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(
left, blkIn, new MatrixBlock(), _op);
left, blkIn, new MatrixBlock(), _op, _useNativeBLAS);
}
else //if( _type == CacheType.RIGHT )
{
Expand All @@ -341,7 +346,7 @@ public MatrixBlock call( Tuple2<MatrixIndexes, MatrixBlock> arg0 )

//execute matrix-vector mult
return (MatrixBlock) OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(
blkIn, right, new MatrixBlock(), _op);
blkIn, right, new MatrixBlock(), _op, _useNativeBLAS);
}
}
}
Expand All @@ -353,11 +358,13 @@ private static class RDDMapMMPartitionFunction implements PairFlatMapFunction<It
private final CacheType _type;
private final AggregateBinaryOperator _op;
private final PartitionedBroadcast<MatrixBlock> _pbc;
private boolean _useNativeBLAS;

public RDDMapMMPartitionFunction( CacheType type, PartitionedBroadcast<MatrixBlock> binput )
public RDDMapMMPartitionFunction( CacheType type, PartitionedBroadcast<MatrixBlock> binput, boolean useNativeBLAS )
{
_type = type;
_pbc = binput;
_useNativeBLAS = useNativeBLAS;

//created operator for reuse
AggregateOperator agg = new AggregateOperator(0, Plus.getPlusFnObject());
Expand All @@ -368,7 +375,7 @@ public RDDMapMMPartitionFunction( CacheType type, PartitionedBroadcast<MatrixBlo
public LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg0)
throws Exception
{
return new MapMMPartitionIterator(arg0);
return new MapMMPartitionIterator(arg0, _useNativeBLAS);
}

/**
Expand All @@ -378,8 +385,10 @@ public LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tu
*/
private class MapMMPartitionIterator extends LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>>
{
public MapMMPartitionIterator(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> in) {
boolean _useNativeBLAS;
public MapMMPartitionIterator(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> in, boolean useNativeBLAS) {
super(in);
_useNativeBLAS = useNativeBLAS;
}

@Override
Expand All @@ -396,15 +405,15 @@ protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, M
MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex());

//execute index preserving matrix multiplication
left.aggregateBinaryOperations(left, blkIn, blkOut, _op);
left.aggregateBinaryOperations(left, blkIn, blkOut, _op, _useNativeBLAS);
}
else //if( _type == CacheType.RIGHT )
{
//get the right hand side matrix
MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1);

//execute index preserving matrix multiplication
blkIn.aggregateBinaryOperations(blkIn, right, blkOut, _op);
blkIn.aggregateBinaryOperations(blkIn, right, blkOut, _op, _useNativeBLAS);
}

return new Tuple2<MatrixIndexes,MatrixBlock>(ixIn, blkOut);
Expand All @@ -419,11 +428,13 @@ private static class RDDFlatMapMMFunction implements PairFlatMapFunction<Tuple2<
private final CacheType _type;
private final AggregateBinaryOperator _op;
private final PartitionedBroadcast<MatrixBlock> _pbc;
private boolean _useNativeBLAS;

public RDDFlatMapMMFunction( CacheType type, PartitionedBroadcast<MatrixBlock> binput )
public RDDFlatMapMMFunction( CacheType type, PartitionedBroadcast<MatrixBlock> binput, boolean useNativeBLAS )
{
_type = type;
_pbc = binput;
_useNativeBLAS = useNativeBLAS;

//created operator for reuse
AggregateOperator agg = new AggregateOperator(0, Plus.getPlusFnObject());
Expand Down Expand Up @@ -451,7 +462,7 @@ public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes,

//execute matrix-vector mult
OperationsOnMatrixValues.performAggregateBinary(
new MatrixIndexes(i,ixIn.getRowIndex()), left, ixIn, blkIn, ixOut, blkOut, _op);
new MatrixIndexes(i,ixIn.getRowIndex()), left, ixIn, blkIn, ixOut, blkOut, _op, _useNativeBLAS);

ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, blkOut));
}
Expand All @@ -469,7 +480,7 @@ public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes,

//execute matrix-vector mult
OperationsOnMatrixValues.performAggregateBinary(
ixIn, blkIn, new MatrixIndexes(ixIn.getColumnIndex(),j), right, ixOut, blkOut, _op);
ixIn, blkIn, new MatrixIndexes(ixIn.getColumnIndex(),j), right, ixOut, blkOut, _op, _useNativeBLAS);

ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, blkOut));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.utils.NativeHelper;

/**
* This pmapmm matrix multiplication instruction is still experimental
Expand Down Expand Up @@ -119,7 +120,7 @@ public void processInstruction(ExecutionContext ec)

//matrix multiplication
JavaPairRDD<MatrixIndexes,MatrixBlock> rdd2 = in2
.flatMapToPair(new PMapMMFunction(bpmb, i/mc1.getRowsPerBlock()));
.flatMapToPair(new PMapMMFunction(bpmb, i/mc1.getRowsPerBlock(), NativeHelper.isNativeLibraryLoaded()));
rdd2 = RDDAggregateUtils.sumByKeyStable(rdd2, false);
rdd2.persist(pmapmmStorageLevel)
.count();
Expand Down Expand Up @@ -171,11 +172,13 @@ private static class PMapMMFunction implements PairFlatMapFunction<Tuple2<Matrix
private AggregateBinaryOperator _op = null;
private Broadcast<PartitionedBlock<MatrixBlock>> _pbc = null;
private long _offset = -1;
private boolean _useNativeBLAS;

public PMapMMFunction( Broadcast<PartitionedBlock<MatrixBlock>> binput, long offset )
public PMapMMFunction( Broadcast<PartitionedBlock<MatrixBlock>> binput, long offset, boolean useNativeBLAS )
{
_pbc = binput;
_offset = offset;
_useNativeBLAS = useNativeBLAS;

//created operator for reuse
AggregateOperator agg = new AggregateOperator(0, Plus.getPlusFnObject());
Expand All @@ -202,7 +205,7 @@ public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, M

//execute matrix-vector mult
OperationsOnMatrixValues.performAggregateBinary(
new MatrixIndexes(i,ixIn.getRowIndex()), left, ixIn, blkIn, ixOut, blkOut, _op);
new MatrixIndexes(i,ixIn.getRowIndex()), left, ixIn, blkIn, ixOut, blkOut, _op, _useNativeBLAS);

//output new tuple
ixOut.setIndexes(_offset+i, ixOut.getColumnIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.utils.NativeHelper;

public class RmmSPInstruction extends BinarySPInstruction
{
Expand Down Expand Up @@ -94,7 +95,7 @@ public void processInstruction(ExecutionContext ec)
//step 2: join prepared datasets, multiply, and aggregate
JavaPairRDD<MatrixIndexes,MatrixBlock> out =
tmp1.join( tmp2 ) //join by result block
.mapToPair( new RmmMultiplyFunction() ); //do matrix multiplication
.mapToPair( new RmmMultiplyFunction(NativeHelper.isNativeLibraryLoaded()) ); //do matrix multiplication
out = RDDAggregateUtils.sumByKeyStable(out, false); //aggregation per result block

//put output block into symbol table (no lineage because single block)
Expand Down Expand Up @@ -162,11 +163,13 @@ private static class RmmMultiplyFunction implements PairFunction<Tuple2<TripleIn
private static final long serialVersionUID = -5772410117511730911L;

private AggregateBinaryOperator _op = null;
private boolean _useNativeBLAS;

public RmmMultiplyFunction()
public RmmMultiplyFunction(boolean useNativeBLAS)
{
AggregateOperator agg = new AggregateOperator(0, Plus.getPlusFnObject());
_op = new AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
_useNativeBLAS = useNativeBLAS;
}

@Override
Expand All @@ -181,7 +184,7 @@ public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<TripleIndexes, Tuple2<Mat
MatrixBlock blkOut = new MatrixBlock();

//core block matrix multiplication
blkIn1.aggregateBinaryOperations(blkIn1, blkIn2, blkOut, _op);
blkIn1.aggregateBinaryOperations(blkIn1, blkIn2, blkOut, _op, _useNativeBLAS);

//output new tuple
return new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, blkOut);
Expand Down
Loading