-
Notifications
You must be signed in to change notification settings - Fork 457
/
BinaryMatrixScalarFEDInstruction.java
83 lines (73 loc) · 3.77 KB
/
BinaryMatrixScalarFEDInstruction.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.instructions.fed;
import java.util.concurrent.Future;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.BinaryMatrixScalarCPInstruction;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.spark.BinaryMatrixScalarSPInstruction;
import org.apache.sysds.runtime.matrix.operators.Operator;
public class BinaryMatrixScalarFEDInstruction extends BinaryFEDInstruction
{
protected BinaryMatrixScalarFEDInstruction(Operator op,
CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr, FederatedOutput fedOut) {
super(FEDType.Binary, op, in1, in2, out, opcode, istr, fedOut);
}
public static BinaryMatrixScalarFEDInstruction parseInstruction(BinaryMatrixScalarCPInstruction instr) {
return new BinaryMatrixScalarFEDInstruction(instr.getOperator(), instr.input1, instr.input2, instr.output,
instr.getOpcode(), instr.getInstructionString(), FederatedOutput.NONE);
}
public static BinaryMatrixScalarFEDInstruction parseInstruction(BinaryMatrixScalarSPInstruction instr) {
String instrStr = rewriteSparkInstructionToCP(instr.getInstructionString());
String opcode = InstructionUtils.getInstructionPartsWithValueType(instrStr)[0];
return new BinaryMatrixScalarFEDInstruction(instr.getOperator(), instr.input1, instr.input2, instr.output,
opcode, instrStr, FederatedOutput.NONE);
}
@Override
public void processInstruction(ExecutionContext ec) {
CPOperand matrix = input1.isMatrix() ? input1 : input2;
CPOperand scalar = input2.isScalar() ? input2 : input1;
MatrixObject mo = ec.getMatrixObject(matrix);
//prepare federated request matrix-scalar
FederatedRequest fr1 = !scalar.isLiteral() ?
mo.getFedMapping().broadcast(ec.getScalarInput(scalar)) : null;
FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
new CPOperand[]{matrix, (fr1 != null)?scalar:null},
new long[]{mo.getFedMapping().getID(), (fr1 != null)?fr1.getID():-1}, true);
//execute federated matrix-scalar operation and cleanups
Future<FederatedResponse>[] ffr = null;
if( fr1 != null ) {
FederatedRequest fr3 = mo.getFedMapping().cleanup(getTID(), fr1.getID());
ffr = mo.getFedMapping().execute(getTID(), true, fr1, fr2, fr3);
}
else {
ffr = mo.getFedMapping().execute(getTID(), true, fr2);
}
//derive new fed mapping for output
MatrixObject out = ec.getMatrixObject(output);
out.getDataCharacteristics().set(mo.getDataCharacteristics())
.setNonZeros(FederationUtils.sumNonZeros(ffr));
out.setFedMapping(mo.getFedMapping().copyWithNewID(fr2.getID()));
}
}