[SYSTEMDS-3466] Asynchronous (Future-based) execution of Spark instructions #1733

Closed · wants to merge 1 commit
ExecutionContext.java
@@ -67,6 +67,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ExecutionContext {
@@ -601,6 +602,16 @@ public void setMatrixOutputAndLineage(String varName, MatrixBlock outputData, Li
mo.release();
}

public void setMatrixOutput(String varName, Future<MatrixBlock> fmb) {
if( isAutoCreateVars() && !containsVariable(varName) ) {
//auto-create the variable as a future-backed matrix object and return
MatrixObjectFuture fmo = new MatrixObjectFuture(Types.ValueType.FP64,
OptimizerUtils.getUniqueTempFileName(), fmb);
setVariable(varName, fmo);
return;
}
//otherwise wrap the existing matrix object (reusing its metadata) around the future
MatrixObject mo = getMatrixObject(varName);
MatrixObjectFuture fmo = new MatrixObjectFuture(mo, fmb);
setVariable(varName, fmo);
}
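For reference, a minimal sketch of the producer-side call pattern this overload enables (a hypothetical helper, not part of the diff; the concrete use appears in AggregateUnarySPInstruction further down): submit the blocking Spark action to a thread pool and register only the Future, so the instruction itself returns immediately.

//Sketch only (assumed helper): hand off a deferred single-block result.
private static void setAsyncMatrixOutput(ExecutionContext ec, String varName,
Callable<MatrixBlock> sparkAction, ExecutorService pool) {
Future<MatrixBlock> fmb = pool.submit(sparkAction);
ec.setMatrixOutput(varName, fmb); //consumers block lazily on acquireRead()
}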

public void setMatrixOutput(String varName, MatrixBlock outputData, UpdateType flag) {
if( isAutoCreateVars() && !containsVariable(varName) )
setVariable(varName, createMatrixObject(outputData));
MatrixObjectFuture.java (new file)
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.runtime.controlprogram.context;

import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;

import java.util.concurrent.Future;

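/**
 * A {@link MatrixObject} whose in-memory matrix block is produced asynchronously.
 * The wrapped {@link Future} is resolved on the first acquireRead(), blocking if
 * the triggered Spark job has not finished yet, so consumers wait transparently.
 */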
public class MatrixObjectFuture extends MatrixObject
{
protected Future<MatrixBlock> _futureData;

public MatrixObjectFuture(ValueType vt, String file, Future<MatrixBlock> fmb) {
super(vt, file, null);
_futureData = fmb;
}

public MatrixObjectFuture(MatrixObject mo, Future<MatrixBlock> fmb) {
super(mo.getValueType(), mo.getFileName(), mo.getMetaData());
_futureData = fmb;
}

MatrixBlock getMatrixBlock() {
try {
return _futureData.get();
}
catch(Exception e) {
throw new DMLRuntimeException(e);
}
}

public MatrixBlock acquireRead() {
return acquireReadIntern();
}

private synchronized MatrixBlock acquireReadIntern() {
try {
if(!isAvailableToRead())
throw new DMLRuntimeException("MatrixObject not available to read.");
if(_data != null)
throw new DMLRuntimeException("_data must be null for future matrix object/block.");
acquire(false, false);
return _futureData.get();
}
catch(Exception e) {
throw new DMLRuntimeException(e);
}
}

public void release() {
releaseIntern();
}

private synchronized void releaseIntern() {
_futureData = null;
}

public synchronized void clearData(long tid) {
_data = null;
_futureData = null;
clearCache();
setCacheLineage(null);
setDirty(false);
setEmpty();
}
}
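Stripped of the caching machinery, the read path above is the standard pattern of resolving a Future at the consumer and translating its checked exceptions into a runtime exception. A self-contained, JDK-only sketch of that pattern (names are illustrative, not SystemDS API):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class DeferredBlock<T> {
private Future<T> future;
DeferredBlock(Future<T> f) { future = f; }

//resolve the future on first read; translate checked exceptions as acquireRead() does
synchronized T read() {
try {
return future.get();
}
catch(InterruptedException | ExecutionException e) {
throw new RuntimeException(e); //DMLRuntimeException plays this role in the diff
}
}

synchronized void release() { future = null; }
}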
AggregateUnarySPInstruction.java
@@ -25,6 +25,7 @@
import org.apache.spark.api.java.function.PairFunction;
import org.apache.sysds.common.Types;
import org.apache.sysds.common.Types.CorrectionLocationType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.AggBinaryOp.SparkAggType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -43,9 +44,15 @@
import org.apache.sysds.runtime.matrix.data.OperationsOnMatrixValues;
import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.util.CommonThreadPool;
import scala.Tuple2;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AggregateUnarySPInstruction extends UnarySPInstruction {
private SparkAggType _aggtype = null;
private AggregateOperator _aop = null;
@@ -102,19 +109,36 @@ private void processMatrixAggregate(ExecutionContext ec) {
//perform aggregation if necessary and put output into symbol table
if( _aggtype == SparkAggType.SINGLE_BLOCK )
{
if (ConfigurationManager.isPrefetchEnabled()) {
//Trigger the chain of Spark operations and maintain a future to the result
//TODO: Make memory for the future matrix block
try {
if(CommonThreadPool.triggerRemoteOPsPool == null)
CommonThreadPool.triggerRemoteOPsPool = Executors.newCachedThreadPool();
RDDAggregateTask task = new RDDAggregateTask(_optr, _aop, in, mc);
Future<MatrixBlock> future_out = CommonThreadPool.triggerRemoteOPsPool.submit(task);
sec.setMatrixOutput(output.getName(), future_out);
}
catch(Exception ex) {
throw new DMLRuntimeException(ex);
}
}
else {
if( auop.sparseSafe )
out = out.filter(new FilterNonEmptyBlocksFunction());

JavaRDD<MatrixBlock> out2 = out.map(
new RDDUAggFunction2(auop, mc.getBlocksize()));
MatrixBlock out3 = RDDAggregateUtils.aggStable(out2, aggop);

//drop correction after aggregation
out3.dropLastRowsOrColumns(aggop.correction);

//put output block into symbol table (no lineage because single block)
//this also includes implicit maintenance of matrix characteristics
sec.setMatrixOutput(output.getName(), out3);
}
}
else //MULTI_BLOCK or NONE
{
@@ -337,4 +361,36 @@ public TensorBlock call(TensorBlock arg0 )
return out;
}
}

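//Callable that runs the full RDD pipeline on a worker thread: filter non-empty
//blocks (if sparse-safe), apply the per-block unary aggregate, and aggStable the
//result into a single MatrixBlock, so the instruction can expose it as a Future.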
private static class RDDAggregateTask implements Callable<MatrixBlock>
{
Operator _optr;
AggregateOperator _aop;
JavaPairRDD<MatrixIndexes, MatrixBlock> _in;
DataCharacteristics _mc;

RDDAggregateTask(Operator optr, AggregateOperator aop,
JavaPairRDD<MatrixIndexes, MatrixBlock> input, DataCharacteristics dc) {
_optr = optr;
_aop = aop;
_in = input;
_mc = dc;
}

@Override
public MatrixBlock call() {
AggregateUnaryOperator auop = (AggregateUnaryOperator)_optr;
JavaPairRDD<MatrixIndexes,MatrixBlock> out = _in;
if( auop.sparseSafe )
out = out.filter(new FilterNonEmptyBlocksFunction());

JavaRDD<MatrixBlock> out2 = out.map(
new RDDUAggFunction2(auop, _mc.getBlocksize()));
MatrixBlock out3 = RDDAggregateUtils.aggStable(out2, _aop);

//drop correction after aggregation
out3.dropLastRowsOrColumns(_aop.correction);
return out3;
}
}
}