Skip to content

Commit

Permalink
[SYSTEMDS-3088] No prefetch for List type consumers
Browse files Browse the repository at this point in the history
This patch fixes a bug in prefetch placement and prevent
prefetch if the consumer is of List type. List is not an
operation. A prefetch can wrongly pull a Spark intermediate
if the output goes into a List.
e.g. rightindex -> List

Closes #1840
  • Loading branch information
phaniarnab committed Jun 11, 2023
1 parent 384a707 commit 8ddbf55
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,17 @@ private boolean isPrefetchNeeded(Lop lop) {
&& !(lop instanceof MMTSJ) && !(lop instanceof UAggOuterChain)
&& !(lop instanceof ParameterizedBuiltin) && !(lop instanceof SpoofFused);

// Exclude List consumers. List is just a metadata handle.
boolean anyOutputList = lop.getOutputs().stream()
.anyMatch(out -> out.getDataType() == Types.DataType.LIST);

//FIXME: Rewire _inputParams when needed (e.g. GroupedAggregate)
boolean hasParameterizedOut = lop.getOutputs().stream()
.anyMatch(out -> ((out instanceof ParameterizedBuiltin)
|| (out instanceof GroupedAggregate)
|| (out instanceof GroupedAggregateM)));
//TODO: support non-matrix outputs
return transformOP && !hasParameterizedOut
return transformOP && !hasParameterizedOut && !anyOutputList
&& (lop.isAllOutputsCP() || OperatorOrderingUtils.isCollectForBroadcast(lop))
&& lop.getDataType() == Types.DataType.MATRIX;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class PrefetchRDDTest extends AutomatedTestBase {

protected static final String TEST_DIR = "functions/async/";
protected static final String TEST_NAME = "PrefetchRDD";
protected static final int TEST_VARIANTS = 4;
protected static final int TEST_VARIANTS = 5;
protected static String TEST_CLASS_DIR = TEST_DIR + PrefetchRDDTest.class.getSimpleName() + "/";

@Override
Expand Down Expand Up @@ -73,6 +73,12 @@ public void testAsyncSparkOPs4() {
runTest(TEST_NAME+"4");
}

@Test
public void testAsyncSparkOPs5() {
//List type consumer. No Prefetch.
runTest(TEST_NAME+"5");
}

public void runTest(String testname) {
boolean old_trans_exec_type = OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE;
ExecMode oldPlatform = setExecMode(ExecMode.HYBRID);
Expand Down Expand Up @@ -108,7 +114,10 @@ public void runTest(String testname) {
if (!matchVal)
System.out.println("Value w/o Prefetch "+R+" w/ Prefetch "+R_pf);
//assert Prefetch instructions and number of success.
long expected_numPF = !testname.equalsIgnoreCase(TEST_NAME+"3") ? 1 : 0;
long expected_numPF = 1;
if (testname.equalsIgnoreCase(TEST_NAME+"3")
|| testname.equalsIgnoreCase(TEST_NAME+"5"))
expected_numPF = 0;
//long expected_successPF = !testname.equalsIgnoreCase(TEST_NAME+"3") ? 1 : 0;
long numPF = Statistics.getCPHeavyHitterCount("prefetch");
Assert.assertTrue("Violated Prefetch instruction count: "+numPF, numPF == expected_numPF);
Expand Down
35 changes: 35 additions & 0 deletions src/test/scripts/functions/async/PrefetchRDD5.dml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
X = rand(rows=10000, cols=200, seed=42); #sp_rand
k = 2;
#create empty lists
dataset_X = list(); #empty list
fs = ceil(nrow(X)/k);
off = fs - 1;
#devide X into lists of k matrices
for (i in seq(1, k)) {
#List type consumer. No prefetch after rightindex.
dataset_X = append(dataset_X, X[i*fs-off : min(i*fs, nrow(X)),]);
}
[tmpX, testX] = remove(dataset_X, 1);
R = sum(rbind(testX));
write(R, $1, format="text");

0 comments on commit 8ddbf55

Please sign in to comment.