Skip to content

Commit

Permalink
[SYSTEMDS-3143] Frame rm empty instruction
Browse files Browse the repository at this point in the history
This commit adds the remove empty instruction to frame, this instruction
was previously only supported on matrices.

Closes #1397
  • Loading branch information
OlgaOvcharenko authored and Baunsgaard committed Nov 5, 2021
1 parent acf723e commit 69d3358
Show file tree
Hide file tree
Showing 7 changed files with 623 additions and 20 deletions.
Expand Up @@ -584,7 +584,8 @@ private void validateRemoveEmpty(DataIdentifier output, boolean conditional) {
+ Arrays.toString(invalid.toArray(new String[0])), false);

//check existence and correctness of arguments
checkTargetParam(getVarParam("target"), conditional);
Expression target = getVarParam("target");
checkEmptyTargetParam(target, conditional);

Expression margin = getVarParam("margin");
if( margin==null ){
Expand All @@ -608,8 +609,11 @@ else if( !(margin instanceof DataIdentifier) && !margin.toString().equals("rows"
_varParams.put("empty.return", new BooleanIdentifier(true));

// Output is a matrix with unknown dims
output.setDataType(DataType.MATRIX);
output.setValueType(ValueType.FP64);
output.setDataType(target.getOutput().getDataType());
if(target.getOutput().getDataType() == DataType.FRAME)
output.setValueType(ValueType.STRING);
else
output.setValueType(ValueType.FP64);
output.setDimensions(-1, -1);
}

Expand Down Expand Up @@ -726,6 +730,12 @@ else if( target.getOutput().getDataType() != DataType.MATRIX )
raiseValidateError("Input matrix 'target' is of type '"+target.getOutput().getDataType()
+"'. Please specify the input matrix.", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
}

private void checkEmptyTargetParam(Expression target, boolean conditional) {
if( target==null )
raiseValidateError("Named parameter 'target' missing. Please specify the input matrix.",
conditional, LanguageErrorCodes.INVALID_PARAMETERS);
}

private void checkOptionalBooleanParam(Expression param, String name, boolean conditional) {
if( param!=null && (!param.getOutput().getDataType().isScalar() || param.getOutput().getValueType() != ValueType.BOOLEAN) ){
Expand Down
Expand Up @@ -208,21 +208,31 @@ else if(opcode.equalsIgnoreCase("rmempty")) {
String margin = params.get("margin");
if(!(margin.equals("rows") || margin.equals("cols")))
throw new DMLRuntimeException("Unspupported margin identifier '" + margin + "'.");
if(ec.isFrameObject(params.get("target"))) {
FrameBlock target = ec.getFrameInput(params.get("target"));
MatrixBlock select = params.containsKey("select") ? ec.getMatrixInput(params.get("select")) : null;

// acquire locks
MatrixBlock target = ec.getMatrixInput(params.get("target"));
MatrixBlock select = params.containsKey("select") ? ec.getMatrixInput(params.get("select")) : null;
boolean emptyReturn = Boolean.parseBoolean(params.get("empty.return").toLowerCase());
FrameBlock soresBlock = target.removeEmptyOperations(margin.equals("rows"), emptyReturn, select);
ec.setFrameOutput(output.getName(), soresBlock);
ec.releaseFrameInput(params.get("target"));
if(params.containsKey("select"))
ec.releaseMatrixInput(params.get("select"));
} else {
// acquire locks
MatrixBlock target = ec.getMatrixInput(params.get("target"));
MatrixBlock select = params.containsKey("select") ? ec.getMatrixInput(params.get("select")) : null;

// compute the result
boolean emptyReturn = Boolean.parseBoolean(params.get("empty.return").toLowerCase());
MatrixBlock soresBlock = target
.removeEmptyOperations(new MatrixBlock(), margin.equals("rows"), emptyReturn, select);
// compute the result
boolean emptyReturn = Boolean.parseBoolean(params.get("empty.return").toLowerCase());
MatrixBlock soresBlock = target.removeEmptyOperations(new MatrixBlock(), margin.equals("rows"), emptyReturn, select);

// release locks
ec.setMatrixOutput(output.getName(), soresBlock);
ec.releaseMatrixInput(params.get("target"));
if(params.containsKey("select"))
ec.releaseMatrixInput(params.get("select"));
// release locks
ec.setMatrixOutput(output.getName(), soresBlock);
ec.releaseMatrixInput(params.get("target"));
if(params.containsKey("select"))
ec.releaseMatrixInput(params.get("select"));
}
}
else if(opcode.equalsIgnoreCase("replace")) {
if(ec.isFrameObject(params.get("target"))){
Expand Down
Expand Up @@ -28,10 +28,12 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.common.Types;
Expand Down Expand Up @@ -73,6 +75,7 @@
import org.apache.sysds.runtime.transform.encode.EncoderFactory;
import org.apache.sysds.runtime.transform.encode.EncoderOmit;
import org.apache.sysds.runtime.transform.encode.MultiColumnEncoder;
import org.apache.sysds.runtime.util.UtilFunctions;

public class ParameterizedBuiltinFEDInstruction extends ComputationFEDInstruction {
protected final LinkedHashMap<String, String> params;
Expand Down Expand Up @@ -151,7 +154,10 @@ public void processInstruction(ExecutionContext ec) {
out.setFedMapping(mo.getFedMapping().copyWithNewID(fr1.getID()));
}
else if(opcode.equals("rmempty"))
rmempty(ec);
if (getTarget(ec) instanceof FrameObject)
rmemptyFrame(ec);
else
rmemptyMatrix(ec);
else if(opcode.equals("lowertri") || opcode.equals("uppertri"))
triangle(ec, opcode);
else if(opcode.equalsIgnoreCase("transformdecode"))
Expand Down Expand Up @@ -329,7 +335,170 @@ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
}
}

private void rmempty(ExecutionContext ec) {
private void rmemptyFrame(ExecutionContext ec) {
String margin = params.get("margin");
if(!(margin.equals("rows") || margin.equals("cols")))
throw new DMLRuntimeException("Unsupported margin identifier '" + margin + "'.");

FrameObject mo = (FrameObject) getTarget(ec);
MatrixObject select = params.containsKey("select") ? ec.getMatrixObject(params.get("select")) : null;
FrameObject out = ec.getFrameObject(output);

boolean marginRow = params.get("margin").equals("rows");
boolean isNotAligned = ((marginRow && mo.getFedMapping().getType().isColPartitioned()) ||
(!marginRow && mo.getFedMapping().getType().isRowPartitioned()));

MatrixBlock s = new MatrixBlock();
if(select == null && isNotAligned) {
List<MatrixBlock> colSums = new ArrayList<>();
mo.getFedMapping().forEachParallel((range, data) -> {
try {
FederatedResponse response = data
.executeFederatedOperation(new FederatedRequest(FederatedRequest.RequestType.EXEC_UDF, -1,
new GetFrameVector(data.getVarID(), margin.equals("rows"))))
.get();

if(!response.isSuccessful())
response.throwExceptionFromResponse();
MatrixBlock vector = (MatrixBlock) response.getData()[0];
synchronized(colSums) {
colSums.add(vector);
}
}
catch(Exception e) {
throw new DMLRuntimeException(e);
}
return null;
});
// find empty in matrix
BinaryOperator plus = InstructionUtils.parseBinaryOperator("+");
BinaryOperator greater = InstructionUtils.parseBinaryOperator(">");
s = colSums.get(0);
for(int i = 1; i < colSums.size(); i++)
s = s.binaryOperationsInPlace(plus, colSums.get(i));
s = s.binaryOperationsInPlace(greater, new MatrixBlock(s.getNumRows(), s.getNumColumns(), 0.0));
select = ExecutionContext.createMatrixObject(s);

long varID = FederationUtils.getNextFedDataID();
ec.setVariable(String.valueOf(varID), select);
params.put("select", String.valueOf(varID));
// construct new string
String[] oldString = InstructionUtils.getInstructionParts(instString);
String[] newString = new String[oldString.length + 1];
newString[2] = "select=" + varID;
System.arraycopy(oldString, 0, newString, 0, 2);
System.arraycopy(oldString, 2, newString, 3, newString.length - 3);
instString = instString.replace(InstructionUtils.concatOperands(oldString),
InstructionUtils.concatOperands(newString));
}

if(select == null) {
FederatedRequest fr1 = FederationUtils.callInstruction(instString,
output,
new CPOperand[] {getTargetOperand()},
new long[] {mo.getFedMapping().getID()});
mo.getFedMapping().execute(getTID(), true, fr1);
out.setFedMapping(mo.getFedMapping().copyWithNewID(fr1.getID()));
}
else if(!isNotAligned) {
// construct commands: broadcast , fed rmempty, clean broadcast
FederatedRequest[] fr1 = mo.getFedMapping().broadcastSliced(select, !marginRow);
FederatedRequest fr2 = FederationUtils.callInstruction(instString,
output,
new CPOperand[] {getTargetOperand(),
new CPOperand(params.get("select"), ValueType.FP64, DataType.MATRIX)},
new long[] {mo.getFedMapping().getID(), fr1[0].getID()});

// execute federated operations and set output
mo.getFedMapping().execute(getTID(), true, fr1, fr2);
out.setFedMapping(mo.getFedMapping().copyWithNewID(fr2.getID()));
}
else {
// construct commands: broadcast , fed rmempty, clean broadcast
FederatedRequest fr1 = mo.getFedMapping().broadcast(select);
FederatedRequest fr2 = FederationUtils.callInstruction(instString,
output,
new CPOperand[] {getTargetOperand(),
new CPOperand(params.get("select"), ValueType.FP64, DataType.MATRIX)},
new long[] {mo.getFedMapping().getID(), fr1.getID()});

// execute federated operations and set output
mo.getFedMapping().execute(getTID(), true, fr1, fr2);
out.setFedMapping(mo.getFedMapping().copyWithNewID(fr2.getID()));
}

// new ranges
Map<FederatedRange, int[]> dcs = new HashMap<>();
Map<FederatedRange, int[]> finalDcs1 = dcs;
Map<FederatedRange, ValueType[]> finalSchema = new HashMap<>();
out.getFedMapping().forEachParallel((range, data) -> {
try {
FederatedResponse response = data
.executeFederatedOperation(new FederatedRequest(FederatedRequest.RequestType.EXEC_UDF, -1,
new GetFrameCharacteristics(data.getVarID())))
.get();

if(!response.isSuccessful())
response.throwExceptionFromResponse();
Object[] ret = response.getData();
int[] subRangeCharacteristics = new int[]{(int) ret[0], (int) ret[1]};
ValueType[] schema = (ValueType[]) ret[2];
synchronized(finalDcs1) {
finalDcs1.put(range, subRangeCharacteristics);
}
synchronized(finalSchema) {
finalSchema.put(range, schema);
}
}
catch(Exception e) {
throw new DMLRuntimeException(e);
}
return null;
});

dcs = finalDcs1;
out.getDataCharacteristics().set(mo.getDataCharacteristics());
int len = marginRow ? mo.getSchema().length : (int) (mo.isFederated(FederationMap.FType.ROW) ? s
.getNonZeros() : finalSchema.values().stream().mapToInt(e -> e.length).sum());
ValueType[] schema = new ValueType[len];
int pos = 0;
for(int i = 0; i < mo.getFedMapping().getFederatedRanges().length; i++) {
FederatedRange federatedRange = new FederatedRange(out.getFedMapping().getFederatedRanges()[i]);

if(marginRow) {
schema = mo.getSchema();
} else if(mo.isFederated(FederationMap.FType.ROW)) {
schema = finalSchema.get(federatedRange);
} else {
ValueType[] tmp = finalSchema.get(federatedRange);
System.arraycopy(tmp, 0, schema, pos, tmp.length);
pos += tmp.length;
}

int[] newRange = dcs.get(federatedRange);
out.getFedMapping().getFederatedRanges()[i].setBeginDim(0,
(out.getFedMapping().getFederatedRanges()[i].getBeginDims()[0] == 0 ||
i == 0) ? 0 : out.getFedMapping().getFederatedRanges()[i - 1].getEndDims()[0]);

out.getFedMapping().getFederatedRanges()[i].setEndDim(0,
out.getFedMapping().getFederatedRanges()[i].getBeginDims()[0] + newRange[0]);

out.getFedMapping().getFederatedRanges()[i].setBeginDim(1,
(out.getFedMapping().getFederatedRanges()[i].getBeginDims()[1] == 0 ||
i == 0) ? 0 : out.getFedMapping().getFederatedRanges()[i - 1].getEndDims()[1]);

out.getFedMapping().getFederatedRanges()[i].setEndDim(1,
out.getFedMapping().getFederatedRanges()[i].getBeginDims()[1] + newRange[1]);
}

out.setSchema(schema);
out.getDataCharacteristics().set(out.getFedMapping().getMaxIndexInRange(0),
out.getFedMapping().getMaxIndexInRange(1),
(int) mo.getBlocksize());
}


private void rmemptyMatrix(ExecutionContext ec) {
String margin = params.get("margin");
if(!(margin.equals("rows") || margin.equals("cols")))
throw new DMLRuntimeException("Unsupported margin identifier '" + margin + "'.");
Expand Down Expand Up @@ -428,7 +597,7 @@ else if(!isNotAligned) {
try {
FederatedResponse response = data
.executeFederatedOperation(new FederatedRequest(FederatedRequest.RequestType.EXEC_UDF, -1,
new GetDataCharacteristics(data.getVarID())))
new GetMatrixCharacteristics(data.getVarID())))
.get();

if(!response.isSuccessful())
Expand Down Expand Up @@ -724,11 +893,11 @@ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
}
}

private static class GetDataCharacteristics extends FederatedUDF {
private static class GetMatrixCharacteristics extends FederatedUDF {

private static final long serialVersionUID = 578461386177730925L;

public GetDataCharacteristics(long varID) {
public GetMatrixCharacteristics(long varID) {
super(new long[] {varID});
}

Expand All @@ -746,6 +915,28 @@ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
}
}

private static class GetFrameCharacteristics extends FederatedUDF {

private static final long serialVersionUID = 578461386177730925L;

public GetFrameCharacteristics(long varID) {
super(new long[] {varID});
}

@Override
public FederatedResponse execute(ExecutionContext ec, Data... data) {
FrameBlock fb = ((FrameObject) data[0]).acquireReadAndRelease();
int r = fb.getNumRows() != 0 || fb.getNumRows() != -1 ? fb.getNumRows() : 0;
int c = fb.getNumColumns() != 0 || fb.getNumColumns() != -1 ? fb.getNumColumns() : 0;
return new FederatedResponse(ResponseType.SUCCESS, new Object[] {r, c, fb.getSchema()});
}

@Override
public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
return null;
}
}

private static class GetVector extends FederatedUDF {

private static final long serialVersionUID = -1003061862215703768L;
Expand Down Expand Up @@ -779,4 +970,54 @@ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
return null;
}
}

private static class GetFrameVector extends FederatedUDF {

private static final long serialVersionUID = -1003061862215703768L;
private final boolean _marginRow;

public GetFrameVector(long varID, boolean marginRow) {
super(new long[] {varID});
_marginRow = marginRow;
}

@Override
public FederatedResponse execute(ExecutionContext ec, Data... data) {
FrameBlock fb = ((FrameObject) data[0]).acquireReadAndRelease();

MatrixBlock ret = _marginRow ? new MatrixBlock(fb.getNumRows(), 1, 0.0) : new MatrixBlock(1,fb.getNumColumns(), 0.0);

if(_marginRow) {
for(int i = 0; i < fb.getNumRows(); i++) {
boolean isEmpty = true;

for(int j = 0; j < fb.getNumColumns(); j++) {
ValueType type = fb.getSchema()[j];
isEmpty = isEmpty && (ArrayUtils.contains(new double[]{0.0, Double.NaN}, UtilFunctions.objectToDoubleSafe(type, fb.get(i, j))));

}

if(!isEmpty)
ret.setValue(i, 0, 1.0);
}
} else {
for(int i = 0; i < fb.getNumColumns(); i++) {
int finalI = i;
ValueType type = fb.getSchema()[i];
boolean isEmpty = IntStream.range(0, fb.getNumRows()).mapToObj(j -> fb.get(j, finalI))
.allMatch(e -> ArrayUtils.contains(new double[]{0.0, Double.NaN}, UtilFunctions.objectToDoubleSafe(type, e)));

if(!isEmpty)
ret.setValue(0, i,1.0);
}
}

return new FederatedResponse(ResponseType.SUCCESS, ret);
}

@Override
public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
return null;
}
}
}

0 comments on commit 69d3358

Please sign in to comment.