diff --git a/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java b/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java index 74c55c5fbd9..607641c7ff1 100644 --- a/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java +++ b/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java @@ -677,7 +677,7 @@ else if( s instanceof FunctionStatement ) { for(DataIdentifier write : datsUpdated) { if( !c._var.equals( write.getName() ) ) continue; - if( cdt != DataType.MATRIX && cdt != DataType.LIST ) { + if( cdt != DataType.MATRIX && cdt != DataType.FRAME && cdt != DataType.LIST ) { //cannot infer type, need to exit (conservative approach) throw new LanguageException("PARFOR loop dependency analysis: cannot check " + "for dependencies due to unknown datatype of var '"+c._var+"': "+cdt.name()+"."); @@ -716,6 +716,7 @@ else if(runBanerjeeGCDTest( c._dat, dat2 )) { return; } else if( (cdt == DataType.MATRIX && dat2dt == DataType.MATRIX) + || (cdt == DataType.FRAME && dat2dt == DataType.FRAME ) || (cdt == DataType.LIST && dat2dt == DataType.LIST ) ) { boolean invalid = false; diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java index 25d49bb7a04..42ab8bcaf1c 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java @@ -38,6 +38,7 @@ import org.apache.sysds.parser.VariableSet; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.caching.CacheableData; +import org.apache.sysds.runtime.controlprogram.caching.FrameObject; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext; @@ -51,6 +52,7 @@ import org.apache.sysds.runtime.controlprogram.parfor.RemoteParForJobReturn; import org.apache.sysds.runtime.controlprogram.parfor.RemoteParForSpark; import org.apache.sysds.runtime.controlprogram.parfor.ResultMerge; +import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeFrameLocalMemory; import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalAutomatic; import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalFile; import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalMemory; @@ -1056,9 +1058,9 @@ private void handleSparkEagerCaching( ExecutionContext ec ) { * @param out output matrix * @param in array of input matrix objects */ - private static void cleanWorkerResultVariables(ExecutionContext ec, MatrixObject out, MatrixObject[] in, boolean parallel) { + private static void cleanWorkerResultVariables(ExecutionContext ec, CacheableData out, CacheableData[] in, boolean parallel) { //check for empty inputs (no iterations executed) - Stream results = Arrays.stream(in).filter(m -> m!=null && m!=out); + Stream> results = Arrays.stream(in).filter(m -> m!=null && m!=out); //perform cleanup (parallel to mitigate file deletion bottlenecks) (parallel ? results.parallel() : results) .forEach(m -> ec.cleanupCacheableData(m)); @@ -1307,33 +1309,41 @@ private DataPartitioner createDataPartitioner(PartitionFormat dpf, PDataPartitio return dp; } - private ResultMerge createResultMerge( PResultMerge prm, MatrixObject out, MatrixObject[] in, String fname, boolean accum, ExecutionContext ec ) + private ResultMerge createResultMerge( PResultMerge prm, + CacheableData out, CacheableData[] in, String fname, boolean accum, ExecutionContext ec ) { - ResultMerge rm = null; + ResultMerge rm = null; - //create result merge implementation (determine degree of parallelism - //only for spark to avoid unnecessary spark context creation) - switch( prm ) - { - case LOCAL_MEM: - rm = new ResultMergeLocalMemory( out, in, fname, accum ); - break; - case LOCAL_FILE: - rm = new ResultMergeLocalFile( out, in, fname, accum ); - break; - case LOCAL_AUTOMATIC: - rm = new ResultMergeLocalAutomatic( out, in, fname, accum ); - break; - case REMOTE_SPARK: - int numMap = Math.max(_numThreads, - SparkExecutionContext.getDefaultParallelism(true)); - int numRed = numMap; //equal map/reduce - rm = new ResultMergeRemoteSpark( out, in, - fname, accum, ec, numMap, numRed ); - break; - - default: - throw new DMLRuntimeException("Undefined result merge: '" +prm.toString()+"'."); + if( out instanceof FrameObject ) { + rm = new ResultMergeFrameLocalMemory((FrameObject)out, (FrameObject[])in, fname, accum); + } + else if(out instanceof MatrixObject) { + //create result merge implementation (determine degree of parallelism + //only for spark to avoid unnecessary spark context creation) + switch( prm ) + { + case LOCAL_MEM: + rm = new ResultMergeLocalMemory( (MatrixObject)out, (MatrixObject[])in, fname, accum ); + break; + case LOCAL_FILE: + rm = new ResultMergeLocalFile( (MatrixObject)out, (MatrixObject[])in, fname, accum ); + break; + case LOCAL_AUTOMATIC: + rm = new ResultMergeLocalAutomatic( (MatrixObject)out, (MatrixObject[])in, fname, accum ); + break; + case REMOTE_SPARK: + int numMap = Math.max(_numThreads, + SparkExecutionContext.getDefaultParallelism(true)); + int numRed = numMap; //equal map/reduce + rm = new ResultMergeRemoteSpark( (MatrixObject)out, + (MatrixObject[])in, fname, accum, ec, numMap, numRed ); + break; + default: + throw new DMLRuntimeException("Undefined result merge: '" +prm.toString()+"'."); + } + } + else { + throw new DMLRuntimeException("Unsupported result merge data: "+out.getClass().getSimpleName()); } return rm; @@ -1437,14 +1447,15 @@ private void consolidateAndCheckResults(ExecutionContext ec, long expIters, long { Data dat = ec.getVariable(var._name); - if( dat instanceof MatrixObject ) //robustness scalars + if( dat instanceof MatrixObject | dat instanceof FrameObject ) { - MatrixObject out = (MatrixObject) dat; - MatrixObject[] in = Arrays.stream(results).map(vars -> - vars.get(var._name)).toArray(MatrixObject[]::new); + CacheableData out = (CacheableData) dat; + Stream tmp = Arrays.stream(results).map(vars -> vars.get(var._name)); + CacheableData[] in = (dat instanceof MatrixObject) ? + tmp.toArray(MatrixObject[]::new) : tmp.toArray(FrameObject[]::new); String fname = constructResultMergeFileName(); - ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec); - MatrixObject outNew = USE_PARALLEL_RESULT_MERGE ? + ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec); + CacheableData outNew = USE_PARALLEL_RESULT_MERGE ? rm.executeParallelMerge(_numThreads) : rm.executeSerialMerge(); @@ -1653,18 +1664,19 @@ public void run() if( var == LocalTaskQueue.NO_MORE_TASKS ) // task queue closed (no more tasks) break; - MatrixObject out = null; + CacheableData out = null; synchronized( _ec.getVariables() ){ - out = _ec.getMatrixObject(var._name); + out = _ec.getCacheableData(var._name); } - MatrixObject[] in = new MatrixObject[ _refVars.length ]; - for( int i=0; i< _refVars.length; i++ ) - in[i] = (MatrixObject) _refVars[i].get( var._name ); + Stream tmp = Arrays.stream(_refVars).map(vars -> vars.get(var._name)); + CacheableData[] in = (out instanceof MatrixObject) ? + tmp.toArray(MatrixObject[]::new) : tmp.toArray(FrameObject[]::new); + String fname = constructResultMergeFileName(); - ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, _ec); - MatrixObject outNew = null; + ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, _ec); + CacheableData outNew = null; if( USE_PARALLEL_RESULT_MERGE ) outNew = rm.executeParallelMerge( _numThreads ); else diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java index 5eae98637d4..448538833fd 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java @@ -41,6 +41,7 @@ import org.apache.sysds.runtime.lineage.LineageRecomputeUtils; import org.apache.sysds.runtime.matrix.data.FrameBlock; import org.apache.sysds.runtime.meta.DataCharacteristics; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; import org.apache.sysds.runtime.meta.MetaData; import org.apache.sysds.runtime.meta.MetaDataFormat; import org.apache.sysds.runtime.util.UtilFunctions; @@ -86,6 +87,12 @@ public FrameObject(String fname, MetaData meta, ValueType[] schema) { */ public FrameObject(FrameObject fo) { super(fo); + + MetaDataFormat metaOld = (MetaDataFormat) fo.getMetaData(); + _metaData = new MetaDataFormat( + new MatrixCharacteristics(metaOld.getDataCharacteristics()), + metaOld.getFileFormat()); + _schema = fo._schema.clone(); } @Override diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMerge.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMerge.java index 18b09a1328e..b69ba965147 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMerge.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMerge.java @@ -21,42 +21,33 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; -import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.controlprogram.caching.CacheableData; import org.apache.sysds.runtime.instructions.InstructionUtils; -import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.BinaryOperator; import java.io.Serializable; -import java.util.List; -/** - * Due to independence of all iterations, any result has the following properties: - * (1) non local var, (2) matrix object, and (3) completely independent. - * These properties allow us to realize result merging in parallel without any synchronization. - * - */ -public abstract class ResultMerge implements Serializable +public abstract class ResultMerge> implements Serializable { //note: this class needs to be serializable to ensure that all attributes of //ResultMergeRemoteSparkWCompare are included in the task closure - private static final long serialVersionUID = 2620430969346516677L; + private static final long serialVersionUID = -6756689640511059030L; protected static final Log LOG = LogFactory.getLog(ResultMerge.class.getName()); protected static final String NAME_SUFFIX = "_rm"; protected static final BinaryOperator PLUS = InstructionUtils.parseBinaryOperator("+"); //inputs to result merge - protected MatrixObject _output = null; - protected MatrixObject[] _inputs = null; - protected String _outputFName = null; - protected boolean _isAccum = false; + protected T _output = null; + protected T[] _inputs = null; + protected String _outputFName = null; + protected boolean _isAccum = false; protected ResultMerge( ) { //do nothing } - public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) { + public ResultMerge( T out, T[] in, String outputFilename, boolean accum ) { _output = out; _inputs = in; _outputFName = outputFilename; @@ -70,7 +61,7 @@ public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename, * * @return output (merged) matrix */ - public abstract MatrixObject executeSerialMerge(); + public abstract T executeSerialMerge(); /** * Merge all given input matrices in parallel into the given output matrix. @@ -80,67 +71,6 @@ public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename, * @param par degree of parallelism * @return output (merged) matrix */ - public abstract MatrixObject executeParallelMerge( int par ); - - protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) { - mergeWithoutComp(out, in, appendOnly, false); - } + public abstract T executeParallelMerge(int par); - protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly, boolean par ) { - //pass through to matrix block operations - if( _isAccum ) - out.binaryOperationsInPlace(PLUS, in); - else - out.merge(in, appendOnly, par); - } - - /** - * NOTE: append only not applicable for wiht compare because output must be populated with - * initial state of matrix - with append, this would result in duplicates. - * - * @param out output matrix block - * @param in input matrix block - * @param compare ? - */ - protected void mergeWithComp( MatrixBlock out, MatrixBlock in, DenseBlock compare ) - { - //Notes for result correctness: - // * Always iterate over entire block in order to compare all values - // (using sparse iterator would miss values set to 0) - // * Explicit NaN awareness because for cases were original matrix contains - // NaNs, since NaN != NaN, otherwise we would potentially overwrite results - // * For the case of accumulation, we add out += (new-old) to ensure correct results - // because all inputs have the old values replicated - - if( in.isEmptyBlock(false) ) { - if( _isAccum ) return; //nothing to do - for( int i=0; i in ) { - //sum of nnz of input (worker result) - output var existing nnz - long outNNZ = out.getDataCharacteristics().getNonZeros(); - return outNNZ - in.size() * outNNZ + in.stream() - .mapToLong(m -> m.getDataCharacteristics().getNonZeros()).sum(); - } } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeFrameLocalMemory.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeFrameLocalMemory.java new file mode 100644 index 00000000000..cd2d99f1e66 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeFrameLocalMemory.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.controlprogram.parfor; + +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.caching.FrameObject; +import org.apache.sysds.runtime.matrix.data.FrameBlock; +import org.apache.sysds.runtime.util.UtilFunctions; + +public class ResultMergeFrameLocalMemory extends ResultMerge +{ + private static final long serialVersionUID = 549739254879310540L; + + public ResultMergeFrameLocalMemory(FrameObject out, FrameObject[] in, String outputFilename, boolean accum) { + super( out, in, outputFilename, accum ); + } + + @Override + public FrameObject executeSerialMerge() + { + FrameObject foNew = null; //always create new matrix object (required for nested parallelism) + + if( LOG.isTraceEnabled() ) + LOG.trace("ResultMerge (local, in-memory): Execute serial merge for output " + +_output.hashCode()+" (fname="+_output.getFileName()+")"); + + try + { + //get old and new output frame blocks + FrameBlock outFB = _output.acquireRead(); + FrameBlock outFBNew = new FrameBlock(outFB); + + //create compare matrix if required (existing data in result) + FrameBlock compare = outFB; + int rlen = compare.getNumRows(); + int clen = compare.getNumColumns(); + + //serial merge all inputs + boolean flagMerged = false; + for( FrameObject in : _inputs ) + { + //check for empty inputs (no iterations executed) + if( in != null && in != _output ) + { + if( LOG.isTraceEnabled() ) + LOG.trace("ResultMergeFrame (local, in-memory): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")"); + + //read/pin input_i + FrameBlock inMB = in.acquireRead(); + + //core merge + for(int j=0; j implements Serializable +{ + private static final long serialVersionUID = 5319002218804570071L; + + public ResultMergeMatrix() { + super(); + } + + public ResultMergeMatrix(MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum) { + super(out, in, outputFilename, accum); + } + + protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) { + mergeWithoutComp(out, in, appendOnly, false); + } + + protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly, boolean par ) { + //pass through to matrix block operations + if( _isAccum ) + out.binaryOperationsInPlace(PLUS, in); + else + out.merge(in, appendOnly, par); + } + + /** + * NOTE: append only not applicable for wiht compare because output must be populated with + * initial state of matrix - with append, this would result in duplicates. + * + * @param out output matrix block + * @param in input matrix block + * @param compare ? + */ + protected void mergeWithComp( MatrixBlock out, MatrixBlock in, DenseBlock compare ) + { + //Notes for result correctness: + // * Always iterate over entire block in order to compare all values + // (using sparse iterator would miss values set to 0) + // * Explicit NaN awareness because for cases were original matrix contains + // NaNs, since NaN != NaN, otherwise we would potentially overwrite results + // * For the case of accumulation, we add out += (new-old) to ensure correct results + // because all inputs have the old values replicated + + if( in.isEmptyBlock(false) ) { + if( _isAccum ) return; //nothing to do + for( int i=0; i in ) { + //sum of nnz of input (worker result) - output var existing nnz + long outNNZ = out.getDataCharacteristics().getNonZeros(); + return outNNZ - in.size() * outNNZ + in.stream() + .mapToLong(m -> m.getDataCharacteristics().getNonZeros()).sum(); + } +} diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java index 8a70ecf229b..6f332256232 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java @@ -44,7 +44,7 @@ import java.util.Arrays; -public class ResultMergeRemoteSpark extends ResultMerge +public class ResultMergeRemoteSpark extends ResultMergeMatrix { private static final long serialVersionUID = -6924566953903424820L; diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java index a152c525490..6b8d424b05f 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java @@ -31,7 +31,7 @@ import scala.Tuple2; -public class ResultMergeRemoteSparkWCompare extends ResultMerge implements PairFunction,MatrixBlock>>, MatrixIndexes, MatrixBlock> +public class ResultMergeRemoteSparkWCompare extends ResultMergeMatrix implements PairFunction,MatrixBlock>>, MatrixIndexes, MatrixBlock> { private static final long serialVersionUID = -5970805069405942836L; diff --git a/src/test/java/org/apache/sysds/test/component/parfor/ParForDependencyAnalysisTest.java b/src/test/java/org/apache/sysds/test/component/parfor/ParForDependencyAnalysisTest.java index 04f575a5890..cf7c71ac662 100644 --- a/src/test/java/org/apache/sysds/test/component/parfor/ParForDependencyAnalysisTest.java +++ b/src/test/java/org/apache/sysds/test/component/parfor/ParForDependencyAnalysisTest.java @@ -66,8 +66,8 @@ * 49a: dep, 49b: dep * * accumulators * 53a: no, 53b dep, 53c dep, 53d dep, 53e dep - * * lists - * 54a: no, 54b: no, 54c: dep, 54d: dep + * * lists/frames + * 54a: no, 54b: no, 54c: dep, 54d: dep, 54e: no-dep, 54f: dep * * negative loop increment * 55a: no, 55b: yes */ @@ -327,6 +327,12 @@ public void setUp() {} @Test public void testDependencyAnalysis54d() { runTest("parfor54d.dml", true); } + @Test + public void testDependencyAnalysis54e() { runTest("parfor54e.dml", false); } + + @Test + public void testDependencyAnalysis54f() { runTest("parfor54f.dml", true); } + @Test public void testDependencyAnalysis55a() { runTest("parfor55a.dml", false); } diff --git a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForListResultVarsTest.java b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForListFrameResultVarsTest.java similarity index 75% rename from src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForListResultVarsTest.java rename to src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForListFrameResultVarsTest.java index fc952e18753..a2067815d7a 100644 --- a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForListResultVarsTest.java +++ b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForListFrameResultVarsTest.java @@ -25,16 +25,18 @@ import org.apache.sysds.test.AutomatedTestBase; import org.apache.sysds.test.TestConfiguration; -public class ParForListResultVarsTest extends AutomatedTestBase +public class ParForListFrameResultVarsTest extends AutomatedTestBase { private final static String TEST_DIR = "functions/parfor/"; private final static String TEST_NAME1 = "parfor_listResults"; - private final static String TEST_CLASS_DIR = TEST_DIR + ParForListResultVarsTest.class.getSimpleName() + "/"; + private final static String TEST_NAME2 = "parfor_frameResults"; + + private final static String TEST_CLASS_DIR = TEST_DIR + ParForListFrameResultVarsTest.class.getSimpleName() + "/"; @Override public void setUp() { - addTestConfiguration(TEST_NAME1, - new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) ); + addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[]{"R"})); + addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[]{"R"})); } @Test @@ -47,11 +49,21 @@ public void testParForListResult1b() { runListResultVarTest(TEST_NAME1, 35, 10); } + @Test + public void testParForFrameResult1a() { + runListResultVarTest(TEST_NAME2, 2, 1); + } + + @Test + public void testParForFrameResult1b() { + runListResultVarTest(TEST_NAME2, 35, 10); + } + private void runListResultVarTest(String testName, int rows, int cols) { loadTestConfiguration(getTestConfiguration(testName)); String HOME = SCRIPT_DIR + TEST_DIR; - fullDMLScriptName = HOME + TEST_NAME1 + ".dml"; + fullDMLScriptName = HOME + testName + ".dml"; programArgs = new String[]{"-explain","-args", String.valueOf(rows), String.valueOf(cols), output("R") }; diff --git a/src/test/scripts/component/parfor/parfor54e.dml b/src/test/scripts/component/parfor/parfor54e.dml new file mode 100644 index 00000000000..70837e9f4e1 --- /dev/null +++ b/src/test/scripts/component/parfor/parfor54e.dml @@ -0,0 +1,26 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +A = rbind(as.frame("a"), as.frame("b"), as.frame("c")); +parfor( i in 1:nrow(A) ) + A[i,1] = as.frame(as.scalar(A[i,1])+"-"+i); +print(toString(A)); diff --git a/src/test/scripts/component/parfor/parfor54f.dml b/src/test/scripts/component/parfor/parfor54f.dml new file mode 100644 index 00000000000..23bcf44d482 --- /dev/null +++ b/src/test/scripts/component/parfor/parfor54f.dml @@ -0,0 +1,26 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +A = rbind(as.frame("a"), as.frame("b"), as.frame("c")); +parfor( i in 1:nrow(A) ) + A[i,1] = as.frame(as.scalar(A[1,1])+"-"+i); +print(toString(A)); diff --git a/src/test/scripts/functions/parfor/parfor_frameResults.dml b/src/test/scripts/functions/parfor/parfor_frameResults.dml new file mode 100644 index 00000000000..b1a54becb7f --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor_frameResults.dml @@ -0,0 +1,32 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +F = as.frame(matrix(0,7,1)); + +parfor(i in 1:nrow(F)) + F[i,1] = as.frame(rowMeans(as.matrix(F[i]))+i); + +R1 = matrix(0,0,1) +for(i in 1:length(F)) + R1 = rbind(R1, as.matrix(F[i,1])); + +R = as.matrix(sum(R1==seq(1,7))); +write(R, $3);