From 45642b65f3f1620a4e2187af4b2b54e26ce1c42e Mon Sep 17 00:00:00 2001 From: Dmitriy Lyubimov Date: Mon, 21 Jul 2014 18:19:37 -0700 Subject: [PATCH 01/10] WIP --- .../mahout/sparkbindings/blas/package.scala | 19 ++++++++++++++++ .../drm/CheckpointedDrmSpark.scala | 22 ++++++++++++++++--- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala index 32d6fb5623..5af0e8be1c 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala @@ -18,6 +18,9 @@ package org.apache.mahout.sparkbindings import scala.reflect.ClassTag +import org.apache.mahout.math.drm.DrmLike +import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput} +import org.apache.spark.SparkContext._ /** * This validation contains distributed algorithms that distributed matrix expression optimizer picks @@ -27,4 +30,20 @@ package object blas { implicit def drmRdd2ops[K:ClassTag](rdd:DrmRdd[K]):DrmRddOps[K] = new DrmRddOps[K](rdd) + private[mahout] def fixIntConsistency(op:DrmLike[Int], src:DrmRddInput[Int]):DrmRddInput[Int] = { + + if ( op.isInstanceOf[CheckpointedDrmSpark[Int]]) { + val cp = op.asInstanceOf[CheckpointedDrmSpark[Int]] + if ( cp.intFixRequired) { + + val rdd = src.toDrmRdd() + val sc = rdd.sparkContext + + // TODO TO BE CONTD. + sc.parallelize() + + } else src + } else src + } + } diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index 674ff0ac93..51ac277aea 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -43,6 +43,9 @@ class CheckpointedDrmSpark[K: ClassTag]( lazy val nrow = if (_nrow >= 0) _nrow else computeNRow lazy val ncol = if (_ncol >= 0) _ncol else computeNCol + private[mahout] var intFixRequired = false + private[mahout] var intFixExtra: Long = 0L + private var cached: Boolean = false override val context: DistributedContext = rdd.context @@ -151,12 +154,25 @@ class CheckpointedDrmSpark[K: ClassTag]( val intRowIndex = classTag[K] == classTag[Int] - if (intRowIndex) - cache().rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L - else + if (intRowIndex) { + val rdd = cache().rdd + + // I guess it is a suitable place to compute int keys consistency test here because we know + // that nrow can be computed lazily, which always happens when rdd is already available, cached, + // and it's ok to compute small summaries without triggering huge pipelines. Which usually + // happens right after things like drmFromHDFS or drmWrap(). 
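// A sketch (not part of the patch): the consistency test that the lines below compute boils down
// to two cheap summaries over the row keys. Assuming the keys are expected to be exactly
// 0 until n, a single-node version of the same arithmetic, with a hypothetical helper name:

def keysMayBeIncomplete(keys: Seq[Int]): Boolean = {
  val n = keys.size.toLong
  val maxPlus1 = if (keys.isEmpty) 0L else keys.max.toLong + 1L
  val expectedSum = n * (n - 1) / 2 // sum of 0..(n-1)
  // If the keys are exactly 0..n-1, both summaries match; a mismatch in either one means some
  // row index is missing (or duplicated). Both matching is a cheap indication, not a proof,
  // that the rows are contiguous.
  maxPlus1 != n || keys.map(_.toLong).sum != expectedSum
}

// keysMayBeIncomplete(Seq(0, 1, 2, 3))  == false  (keys are contiguous)
// keysMayBeIncomplete(Seq(0, 3))        == true   (rows 1 and 2 are absent)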
+ val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L + val rowCount = rdd.count() + intFixRequired = maxPlus1 != rowCount || + rdd.map(_._1).sum().toLong != ((rowCount -1.0 ) * (rowCount -2.0) /2.0).toLong + intFixExtra = (maxPlus1 - rowCount) max 0L + maxPlus1 + } else cache().rdd.count() } + + protected def computeNCol = cache().rdd.map(_._2.length).fold(-1)(max(_, _)) From 746b3ddc6c0e7e8bb89ce591c32ba1b70ec688e6 Mon Sep 17 00:00:00 2001 From: Dmitriy Lyubimov Date: Tue, 22 Jul 2014 11:25:57 -0700 Subject: [PATCH 02/10] WIP --- .../mahout/math/drm/CheckpointedDrm.scala | 2 +- .../mahout/sparkbindings/blas/package.scala | 40 +++++++++++++++---- .../drm/CheckpointedDrmSpark.scala | 6 +-- 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala index 0266944a80..28fb7fd786 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala @@ -31,6 +31,6 @@ trait CheckpointedDrm[K] extends DrmLike[K] { def writeDRM(path: String) /** If this checkpoint is already declared cached, uncache. */ - def uncache() + def uncache(): this.type } diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala index 5af0e8be1c..e16a878d0b 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala @@ -18,9 +18,12 @@ package org.apache.mahout.sparkbindings import scala.reflect.ClassTag -import org.apache.mahout.math.drm.DrmLike import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput} import org.apache.spark.SparkContext._ +import org.apache.mahout.math._ +import org.apache.mahout.math.drm._ +import scalabindings._ +import RLikeOps._ /** * This validation contains distributed algorithms that distributed matrix expression optimizer picks @@ -32,18 +35,39 @@ package object blas { private[mahout] def fixIntConsistency(op:DrmLike[Int], src:DrmRddInput[Int]):DrmRddInput[Int] = { - if ( op.isInstanceOf[CheckpointedDrmSpark[Int]]) { + if (op.isInstanceOf[CheckpointedDrmSpark[Int]]) { val cp = op.asInstanceOf[CheckpointedDrmSpark[Int]] - if ( cp.intFixRequired) { + if (cp.intFixRequired) { - val rdd = src.toDrmRdd() - val sc = rdd.sparkContext + val rdd = src.toDrmRdd() + val sc = rdd.sparkContext + val dueRows = safeToNonNegInt(cp.nrow) + val dueCols = cp.ncol + val fixedRdd = sc - // TODO TO BE CONTD. 
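// A sketch (not part of the patch): the fixIntConsistency body added below pads missing int keys
// by cogrouping the data with the full key range 0 until nrow and substituting an empty row where
// no vector exists. A single-node analogue of that idea, with hypothetical names and a generic
// row type so it stays runnable without Mahout or Spark on the classpath:

def padMissingRows[V](rows: Map[Int, V], nrow: Int, emptyRow: => V): Seq[(Int, V)] =
  (0 until nrow).map(key => key -> rows.getOrElse(key, emptyRow))

// padMissingRows(Map(0 -> "a", 3 -> "d"), nrow = 4, emptyRow = "<empty>")
//   == Seq(0 -> "a", 1 -> "<empty>", 2 -> "<empty>", 3 -> "d")
// The distributed version below achieves the same effect by parallelizing 0 until nrow and
// cogrouping it against the row RDD, which also drops any keys outside 0 until nrow.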
- sc.parallelize() + // Bootstrap full key set + .parallelize(0 until dueRows, numSlices = cp.rdd.partitions.size max 1) + + // Enable PairedFunctions + .map(_ -> Unit) + + // Cogroup with all rows + .cogroup(other = rdd) + + // Filter out out-of-bounds + .filter { case (key, _) => key >= 0 && key < dueRows} + + // Coalesce and output RHS + .map { case (key, (seqUnit, seqVec)) => + val acc = seqVec.headOption.getOrElse(new SequentialAccessSparseVector(dueCols)) + key -> ((acc /: seqVec.tail)(_ + _)) + } + + new DrmRddInput[Int](rowWiseSrc = Some(dueCols -> fixedRdd)) } else src - } else src + } else src + } } diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index 51ac277aea..137e883ec1 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -72,7 +72,7 @@ class CheckpointedDrmSpark[K: ClassTag]( * if matrix was previously persisted into cache, * delete cached representation */ - def uncache() = { + def uncache(): this.type = { if (cached) { rdd.unpersist(blocking = false) cached = false @@ -80,7 +80,7 @@ class CheckpointedDrmSpark[K: ClassTag]( this } -// def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmSpark[K] = + // def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmSpark[K] = // new CheckpointedDrmSpark[K](rdd.map(t => (t._1, mapfun(t._1, t._2)))) @@ -155,7 +155,7 @@ class CheckpointedDrmSpark[K: ClassTag]( val intRowIndex = classTag[K] == classTag[Int] if (intRowIndex) { - val rdd = cache().rdd + val rdd = cache().rdd.asInstanceOf[DrmRdd[Int]] // I guess it is a suitable place to compute int keys consistency test here because we know // that nrow can be computed lazily, which always happens when rdd is already available, cached, From 1ff376b2ddd1bcbe61f896d14e27d7a413e7313c Mon Sep 17 00:00:00 2001 From: Dmitriy Lyubimov Date: Tue, 22 Jul 2014 13:23:14 -0700 Subject: [PATCH 03/10] Code up for lazy int-keyed missing rows fix --- .../org/apache/mahout/math/drm/DrmLike.scala | 2 ++ .../math/drm/logical/AbstractBinaryOp.scala | 2 ++ .../math/drm/logical/AbstractUnaryOp.scala | 1 + .../mahout/math/drm/logical/OpAewScalar.scala | 3 +++ .../mahout/sparkbindings/SparkEngine.scala | 8 +++---- .../mahout/sparkbindings/blas/AewB.scala | 16 +++++++++++-- .../mahout/sparkbindings/blas/package.scala | 12 +++++----- .../drm/CheckpointedDrmSpark.scala | 24 +++++++++++++++---- .../apache/mahout/sparkbindings/package.scala | 3 ++- 9 files changed, 54 insertions(+), 17 deletions(-) diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala index 995c873ae6..97fe98920e 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala @@ -30,6 +30,8 @@ trait DrmLike[K] { protected[mahout] def partitioningTag: Long + protected[mahout] def canHaveMissingRows: Boolean + /** * Distributed context, can be implicitly converted to operations on [[org.apache.mahout.math.drm. * DistributedEngine]]. 
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala index 78635269ca..3b6b8bfe58 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala @@ -42,6 +42,8 @@ abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag] protected[drm] var B: DrmLike[B] lazy val context: DistributedContext = A.context + protected[mahout] def canHaveMissingRows: Boolean = false + // These are explicit evidence export. Sometimes scala falls over to figure that on its own. def classTagA: ClassTag[A] = implicitly[ClassTag[A]] diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala index 92abdb4bb6..a445f212ef 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala @@ -32,5 +32,6 @@ abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag] def classTagK: ClassTag[K] = implicitly[ClassTag[K]] + override protected[mahout] lazy val canHaveMissingRows: Boolean = A.canHaveMissingRows } diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala index 91e0dd48d1..f3e10ba317 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala @@ -29,6 +29,9 @@ case class OpAewScalar[K: ClassTag]( override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag + /** Stuff like `A +1` is always supposed to fix this */ + override protected[mahout] val canHaveMissingRows: Boolean = false + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.nrow diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index b68a98ecf9..c37354f4d0 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -161,10 +161,10 @@ object SparkEngine extends DistributedEngine { { implicit def getWritable(x: Any): Writable = val2keyFunc() - new CheckpointedDrmSpark( - rdd = rdd.map(t => (key2valFunc(t._1), t._2)), - _cacheStorageLevel = StorageLevel.MEMORY_ONLY - )(unwrappedKeyTag.asInstanceOf[ClassTag[Any]]) + + val drmRdd = rdd.map { t => (key2valFunc(t._1), t._2)} + + drmWrap(rdd = drmRdd, cacheHint = CacheHint.MEMORY_ONLY)(unwrappedKeyTag.asInstanceOf[ClassTag[Any]]) } } diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala index 384b9865ae..a3980814ad 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala @@ -26,6 +26,8 @@ import org.apache.mahout.math.{Matrix, Vector} import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB} import org.apache.log4j.Logger import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc} +import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm} +import org.apache.mahout.math.drm._ /** Elementwise drm-drm operators */ object AewB { @@ -109,11 +111,21 @@ object AewB { case "/:" => ewOps.scalarDiv case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId)) } - val a = srcA.toBlockifiedDrmRdd() - val rdd = a + + // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing + // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar. + val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.canHaveMissingRows) { + val fixedRdd = fixIntConsistency(op.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]]) + drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]] + } else { + srcA.toBlockifiedDrmRdd() + } + + val rdd = aBlockRdd .map({ case (keys, block) => keys -> reduceFunc(block, scalar) }) + new DrmRddInput[K](blockifiedSrc = Some(rdd)) } } diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala index e16a878d0b..e5a52194b3 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala @@ -33,17 +33,19 @@ package object blas { implicit def drmRdd2ops[K:ClassTag](rdd:DrmRdd[K]):DrmRddOps[K] = new DrmRddOps[K](rdd) - private[mahout] def fixIntConsistency(op:DrmLike[Int], src:DrmRddInput[Int]):DrmRddInput[Int] = { + private[mahout] def fixIntConsistency(op:DrmLike[Int], src:DrmRdd[Int]):DrmRdd[Int] = { if (op.isInstanceOf[CheckpointedDrmSpark[Int]]) { val cp = op.asInstanceOf[CheckpointedDrmSpark[Int]] - if (cp.intFixRequired) { + if (cp.canHaveMissingRows) { - val rdd = src.toDrmRdd() + val rdd = src val sc = rdd.sparkContext val dueRows = safeToNonNegInt(cp.nrow) val dueCols = cp.ncol - val fixedRdd = sc + + // Compute the fix. 
+ sc // Bootstrap full key set .parallelize(0 until dueRows, numSlices = cp.rdd.partitions.size max 1) @@ -63,8 +65,6 @@ package object blas { key -> ((acc /: seqVec.tail)(_ + _)) } - new DrmRddInput[Int](rowWiseSrc = Some(dueCols -> fixedRdd)) - } else src } else src diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index 137e883ec1..03050bb9cd 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -31,19 +31,35 @@ import org.apache.mahout.math.drm._ import org.apache.mahout.sparkbindings._ import org.apache.spark.SparkContext._ -/** Spark-specific optimizer-checkpointed DRM. */ +/** ==Spark-specific optimizer-checkpointed DRM.== + * + * @param rdd underlying rdd to wrap over. + * @param _nrow number of rows; if unspecified, we will compute with an inexpensive traversal. + * @param _ncol number of columns; if unspecified, we will try to guess with an inexpensive traversal. + * @param _cacheStorageLevel storage level + * @param partitioningTag unique partitioning tag. Used to detect identically partitioned operands. + * @param _canHaveMissingRows true if the matrix is int-keyed, and if it also may have missing rows + * (will require a lazy fix for some physical operations. + * @param evidence$1 class tag context bound for K. + * @tparam K matrix key type (e.g. the keys of sequence files once persisted) + */ class CheckpointedDrmSpark[K: ClassTag]( val rdd: DrmRdd[K], private var _nrow: Long = -1L, private var _ncol: Int = -1, private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, - override protected[mahout] val partitioningTag: Long = Random.nextLong() + override protected[mahout] val partitioningTag: Long = Random.nextLong(), + private var _canHaveMissingRows: Boolean = false ) extends CheckpointedDrm[K] { lazy val nrow = if (_nrow >= 0) _nrow else computeNRow lazy val ncol = if (_ncol >= 0) _ncol else computeNCol + lazy val canHaveMissingRows: Boolean = { + nrow + _canHaveMissingRows + } - private[mahout] var intFixRequired = false + // private[mahout] var canHaveMissingRows = false private[mahout] var intFixExtra: Long = 0L private var cached: Boolean = false @@ -163,7 +179,7 @@ class CheckpointedDrmSpark[K: ClassTag]( // happens right after things like drmFromHDFS or drmWrap(). 
val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L val rowCount = rdd.count() - intFixRequired = maxPlus1 != rowCount || + _canHaveMissingRows = maxPlus1 != rowCount || rdd.map(_._1).sum().toLong != ((rowCount -1.0 ) * (rowCount -2.0) /2.0).toLong intFixExtra = (maxPlus1 - rowCount) max 0L maxPlus1 diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala index 872676641d..68bf906fea 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala @@ -24,7 +24,7 @@ import org.apache.mahout.common.IOUtils import org.apache.log4j.Logger import org.apache.mahout.math.drm._ import scala.reflect.ClassTag -import org.apache.mahout.sparkbindings.drm.{SparkBCast, CheckpointedDrmSparkOps, CheckpointedDrmSpark} +import org.apache.mahout.sparkbindings.drm.{DrmRddInput, SparkBCast, CheckpointedDrmSparkOps, CheckpointedDrmSpark} import org.apache.spark.rdd.RDD import org.apache.spark.broadcast.Broadcast import org.apache.mahout.math.{VectorWritable, Vector, MatrixWritable, Matrix} @@ -182,6 +182,7 @@ package object sparkbindings { ncol: Int = -1, cacheHint:CacheHint.CacheHint = CacheHint.NONE ): CheckpointedDrm[K] = + new CheckpointedDrmSpark[K]( rdd = rdd, _nrow = nrow, From c9ac3be81ed464ccc4d440b8187e15efa9a21193 Mon Sep 17 00:00:00 2001 From: Dmitriy Lyubimov Date: Tue, 22 Jul 2014 14:03:25 -0700 Subject: [PATCH 04/10] Tests, passing . --- .../mahout/math/drm/logical/OpAewScalar.scala | 2 +- .../math/drm/RLikeDrmOpsSuiteBase.scala | 9 ++++ .../mahout/sparkbindings/blas/AewB.scala | 4 +- .../mahout/sparkbindings/blas/package.scala | 46 +++++++++---------- .../sparkbindings/drm/DrmLikeSuite.scala | 1 + .../sparkbindings/drm/RLikeDrmOpsSuite.scala | 42 ++++++++++++++++- 6 files changed, 76 insertions(+), 28 deletions(-) diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala index f3e10ba317..3b651f6a06 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala @@ -30,7 +30,7 @@ case class OpAewScalar[K: ClassTag]( override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag /** Stuff like `A +1` is always supposed to fix this */ - override protected[mahout] val canHaveMissingRows: Boolean = false + override protected[mahout] lazy val canHaveMissingRows: Boolean = false /** R-like syntax for number of rows. 
*/ def nrow: Long = A.nrow diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala index 71dc6403c3..50beccf46e 100644 --- a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala +++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala @@ -480,4 +480,13 @@ trait RLikeDrmOpsSuiteBase extends DistributedMahoutSuite with Matchers { } + test("B = A + 1.0") { + val inCoreA = dense((1, 2), (2, 3), (3, 4)) + val controlB = inCoreA + 1.0 + + val drmB = drmParallelize(m = inCoreA, numPartitions = 2) + 1.0 + + (drmB -: controlB).norm should be < 1e-10 + } + } diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala index a3980814ad..db487ee8ee 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala @@ -114,8 +114,8 @@ object AewB { // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar. - val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.canHaveMissingRows) { - val fixedRdd = fixIntConsistency(op.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]]) + val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows) { + val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]]) drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]] } else { srcA.toBlockifiedDrmRdd() diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala index e5a52194b3..96823ac64b 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala @@ -33,39 +33,37 @@ package object blas { implicit def drmRdd2ops[K:ClassTag](rdd:DrmRdd[K]):DrmRddOps[K] = new DrmRddOps[K](rdd) - private[mahout] def fixIntConsistency(op:DrmLike[Int], src:DrmRdd[Int]):DrmRdd[Int] = { + private[mahout] def fixIntConsistency(op: DrmLike[Int], src: DrmRdd[Int]): DrmRdd[Int] = { - if (op.isInstanceOf[CheckpointedDrmSpark[Int]]) { - val cp = op.asInstanceOf[CheckpointedDrmSpark[Int]] - if (cp.canHaveMissingRows) { + if (op.canHaveMissingRows) { - val rdd = src - val sc = rdd.sparkContext - val dueRows = safeToNonNegInt(cp.nrow) - val dueCols = cp.ncol + val rdd = src + val sc = rdd.sparkContext + val dueRows = safeToNonNegInt(op.nrow) + val dueCols = op.ncol - // Compute the fix. - sc + // Compute the fix. 
+ sc - // Bootstrap full key set - .parallelize(0 until dueRows, numSlices = cp.rdd.partitions.size max 1) + // Bootstrap full key set + .parallelize(0 until dueRows, numSlices = rdd.partitions.size max 1) - // Enable PairedFunctions - .map(_ -> Unit) + // Enable PairedFunctions + .map(_ -> Unit) - // Cogroup with all rows - .cogroup(other = rdd) + // Cogroup with all rows + .cogroup(other = rdd) - // Filter out out-of-bounds - .filter { case (key, _) => key >= 0 && key < dueRows} + // Filter out out-of-bounds + .filter { case (key, _) => key >= 0 && key < dueRows} - // Coalesce and output RHS - .map { case (key, (seqUnit, seqVec)) => - val acc = seqVec.headOption.getOrElse(new SequentialAccessSparseVector(dueCols)) - key -> ((acc /: seqVec.tail)(_ + _)) - } + // Coalesce and output RHS + .map { case (key, (seqUnit, seqVec)) => + val acc = seqVec.headOption.getOrElse(new SequentialAccessSparseVector(dueCols)) + val vec = if ( seqVec.size>0) (acc /: seqVec.tail)(_ + _) else acc + key -> vec + } - } else src } else src } diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala index bf635dca36..c47f7f1384 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala @@ -56,4 +56,5 @@ class DrmLikeSuite extends FunSuite with DistributedSparkSuite with DrmLikeSuite keys -> block }).norm should be < 1e-4 } + } diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala index b15c72cd2a..f35eb08aef 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala @@ -19,9 +19,49 @@ package org.apache.mahout.sparkbindings.drm import org.scalatest.FunSuite import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ import drm._ import org.apache.mahout.sparkbindings._ +import RLikeDrmOps._ import test.DistributedSparkSuite /** ==R-like DRM DSL operation tests -- Spark== */ -class RLikeDrmOpsSuite extends FunSuite with DistributedSparkSuite with RLikeDrmOpsSuiteBase +class RLikeDrmOpsSuite extends FunSuite with DistributedSparkSuite with RLikeDrmOpsSuiteBase { + + test("B = A + 1.0 missing rows") { + + val sc = mahoutCtx.asInstanceOf[SparkDistributedContext].sc + + // Concoct an rdd with missing rows + val aRdd: DrmRdd[Int] = sc.parallelize( + 0 -> dvec(1, 2, 3) :: + 3 -> dvec(3, 4, 5) :: Nil + ).map { case (key, vec) => key -> (vec: Vector)} + + val drmA = drmWrap(rdd = aRdd) + + drmA.canHaveMissingRows should equal(true) + + val inCoreA = drmA.collect + + printf("collected A = \n%s\n", inCoreA) + + val controlB = inCoreA + 1.0 + + val drmB = drmA + 1.0 + + printf ("collected B = \n%s\n", drmB.collect) + + (drmB -: controlB).norm should be < 1e-10 + + // Test that unary operators don't obscure the fact that source had missing rows + val drmC = drmA.mapBlock() { case (keys, block) => + keys -> block + } + 1.0 + + (drmC -: controlB).norm should be < 1e-10 + + } + +} From 5d8e1407a7ea2535ae6d00701ed4f60390c1b30e Mon Sep 17 00:00:00 2001 From: Dmitriy Lyubimov Date: Tue, 22 Jul 2014 14:09:11 -0700 Subject: [PATCH 05/10] Orientation changing unary ops cannot produce missing rows --- .../main/scala/org/apache/mahout/math/drm/logical/OpAt.scala | 2 ++ 
.../main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala index 3239ad215a..4791301d00 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala @@ -30,4 +30,6 @@ case class OpAt( /** R-like syntax for number of columns */ def ncol: Int = safeToNonNegInt(A.nrow) + /** A' after simplifications cannot produce missing rows, ever. */ + override protected[mahout] lazy val canHaveMissingRows: Boolean = false } diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala index c7c604608f..409ec2c668 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala @@ -31,6 +31,6 @@ case class OpAtA[K: ClassTag]( /** R-like syntax for number of columns */ def ncol: Int = A.ncol - /** Non-zero element count */ - def nNonZero: Long = throw new UnsupportedOperationException + override protected[mahout] lazy val canHaveMissingRows: Boolean = false + } From 2063c5ccaf88bf31d7f5c1dd3d1650925c4dfd58 Mon Sep 17 00:00:00 2001 From: Dmitriy Lyubimov Date: Tue, 22 Jul 2014 14:12:56 -0700 Subject: [PATCH 06/10] style --- .../main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala | 2 +- .../scala/org/apache/mahout/sparkbindings/blas/package.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala index 409ec2c668..ad2a5d8356 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala @@ -32,5 +32,5 @@ case class OpAtA[K: ClassTag]( def ncol: Int = A.ncol override protected[mahout] lazy val canHaveMissingRows: Boolean = false - + } diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala index 96823ac64b..9a50afaf8c 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala @@ -60,7 +60,7 @@ package object blas { // Coalesce and output RHS .map { case (key, (seqUnit, seqVec)) => val acc = seqVec.headOption.getOrElse(new SequentialAccessSparseVector(dueCols)) - val vec = if ( seqVec.size>0) (acc /: seqVec.tail)(_ + _) else acc + val vec = if (seqVec.size > 0) (acc /: seqVec.tail)(_ + _) else acc key -> vec } From 57f669a7d50097a5a816ce13ca8230f6c6742b65 Mon Sep 17 00:00:00 2001 From: Dmitriy Lyubimov Date: Tue, 22 Jul 2014 14:57:57 -0700 Subject: [PATCH 07/10] Also fixing `A ew B` with missing rows operands --- .../mahout/sparkbindings/blas/AewB.scala | 17 ++++++++++--- .../apache/mahout/sparkbindings/package.scala | 24 ++++++++++++++++--- .../sparkbindings/drm/RLikeDrmOpsSuite.scala | 23 ++++++++++++++++++ 3 files changed, 58 insertions(+), 6 deletions(-) diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala index 
db487ee8ee..40b6dfeefb 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala @@ -22,7 +22,7 @@ import scala.reflect.ClassTag import org.apache.spark.SparkContext._ import org.apache.mahout.math.scalabindings._ import RLikeOps._ -import org.apache.mahout.math.{Matrix, Vector} +import org.apache.mahout.math.{SequentialAccessSparseVector, Matrix, Vector} import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB} import org.apache.log4j.Logger import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc} @@ -54,6 +54,7 @@ object AewB { val ewOps = getEWOps() val opId = op.op + val ncol = op.ncol val reduceFunc = opId match { case "+" => ewOps.plus @@ -85,14 +86,24 @@ object AewB { log.debug("applying elementwise as join") a + // Full outer-join operands row-wise .cogroup(b, numPartitions = a.partitions.size max b.partitions.size) + + // Reduce both sides. In case there are duplicate rows in RHS or LHS, they are summed up + // prior to reduction. .map({ case (key, (vectorSeqA, vectorSeqB)) => - key -> reduceFunc(vectorSeqA.reduce(reduceFunc), vectorSeqB.reduce(reduceFunc)) + val lhsVec: Vector = if (vectorSeqA.isEmpty) new SequentialAccessSparseVector(ncol) + else + (vectorSeqA.head /: vectorSeqA.tail)(_ + _) + val rhsVec: Vector = if (vectorSeqB.isEmpty) new SequentialAccessSparseVector(ncol) + else + (vectorSeqB.head /: vectorSeqB.tail)(_ + _) + key -> reduceFunc(lhsVec, rhsVec) }) } - new DrmRddInput(rowWiseSrc = Some(op.ncol -> rdd)) + new DrmRddInput(rowWiseSrc = Some(ncol -> rdd)) } /** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */ diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala index 68bf906fea..6639a348ef 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala @@ -176,18 +176,36 @@ package object sparkbindings { private[sparkbindings] implicit def w2v(w:VectorWritable):Vector = w.get() - def drmWrap[K : ClassTag]( + /** + * ==Wrap existing RDD into a matrix== + * + * @param rdd source rdd conforming to [[org.apache.mahout.sparkbindings.DrmRdd]] + * @param nrow optional, number of rows. If not specified, we'll try to figure out on our own. + * @param ncol optional, number of columns. If not specififed, we'll try to figure out on our own. + * @param cacheHint optional, desired cache policy for that rdd. + * @param canHaveMissingRows optional. For int-keyed rows, there might be implied but missing rows. + * If underlying rdd may have that condition, we need to know since some + * operators consider that a deficiency and we'll need to fix it lazily + * before proceeding with such operators. It only meaningful if `nrow` is + * also specified (otherwise, we'll run quick test to figure if rows may + * be missing, at the time we count the rows). 
+ * @tparam K row key type + * @return wrapped DRM + */ + def drmWrap[K: ClassTag]( rdd: DrmRdd[K], nrow: Int = -1, ncol: Int = -1, - cacheHint:CacheHint.CacheHint = CacheHint.NONE + cacheHint: CacheHint.CacheHint = CacheHint.NONE, + canHaveMissingRows: Boolean = false ): CheckpointedDrm[K] = new CheckpointedDrmSpark[K]( rdd = rdd, _nrow = nrow, _ncol = ncol, - _cacheStorageLevel = SparkEngine.cacheHint2Spark(cacheHint) + _cacheStorageLevel = SparkEngine.cacheHint2Spark(cacheHint), + _canHaveMissingRows = canHaveMissingRows ) diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala index f35eb08aef..afcf06b67f 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala @@ -29,6 +29,29 @@ import test.DistributedSparkSuite /** ==R-like DRM DSL operation tests -- Spark== */ class RLikeDrmOpsSuite extends FunSuite with DistributedSparkSuite with RLikeDrmOpsSuiteBase { + test("C = A + B missing rows") { + val sc = mahoutCtx.asInstanceOf[SparkDistributedContext].sc + + // Concoct an rdd with missing rows + val aRdd: DrmRdd[Int] = sc.parallelize( + 0 -> dvec(1, 2, 3) :: + 3 -> dvec(4, 5, 6) :: Nil + ).map { case (key, vec) => key -> (vec: Vector)} + + val bRdd: DrmRdd[Int] = sc.parallelize( + 1 -> dvec(2, 3, 4) :: + 2 -> dvec(3, 4, 5) :: Nil + ).map { case (key, vec) => key -> (vec: Vector)} + + val drmA = drmWrap(rdd=aRdd) + val drmB = drmWrap(rdd = bRdd, nrow = 4, canHaveMissingRows = true) + val drmC = drmA + drmB + val controlC = drmA.collect + drmB.collect + + (drmC -: controlC).norm should be < 1e-10 + + } + test("B = A + 1.0 missing rows") { val sc = mahoutCtx.asInstanceOf[SparkDistributedContext].sc From 005aeccd353faeb59f2ddb2003f393a603978edb Mon Sep 17 00:00:00 2001 From: Dmitriy Lyubimov Date: Tue, 22 Jul 2014 15:33:18 -0700 Subject: [PATCH 08/10] Cbind test --- .../sparkbindings/drm/RLikeDrmOpsSuite.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala index afcf06b67f..a537001090 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala @@ -52,6 +52,31 @@ class RLikeDrmOpsSuite extends FunSuite with DistributedSparkSuite with RLikeDrm } + test("C = cbind(A, B) with missing rows") { + val sc = mahoutCtx.asInstanceOf[SparkDistributedContext].sc + + // Concoct an rdd with missing rows + val aRdd: DrmRdd[Int] = sc.parallelize( + 0 -> dvec(1, 2, 3) :: + 3 -> dvec(4, 5, 6) :: Nil + ).map { case (key, vec) => key -> (vec: Vector)} + + val bRdd: DrmRdd[Int] = sc.parallelize( + 1 -> dvec(2, 3, 4) :: + 2 -> dvec(3, 4, 5) :: Nil + ).map { case (key, vec) => key -> (vec: Vector)} + + val drmA = drmWrap(rdd=aRdd) + val drmB = drmWrap(rdd = bRdd, nrow = 4, canHaveMissingRows = true) + val drmC = drmA.cbind(drmB) + val controlC = new DenseMatrix(safeToNonNegInt(drmA.nrow), drmA.ncol + drmB.ncol) + controlC(::, 0 until drmA.ncol) := drmA + controlC(::, drmA.ncol until drmA.ncol + drmB.ncol) := drmB + + (drmC -: controlC).norm should be < 1e-10 + + } + test("B = A + 1.0 missing rows") { val sc = mahoutCtx.asInstanceOf[SparkDistributedContext].sc From 
35c873cceeda87b7b212677bffc0a21877932deb Mon Sep 17 00:00:00 2001 From: Dmitriy Lyubimov Date: Tue, 22 Jul 2014 15:34:00 -0700 Subject: [PATCH 09/10] CBind test tweak --- .../org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala index a537001090..c30570a111 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala @@ -57,7 +57,7 @@ class RLikeDrmOpsSuite extends FunSuite with DistributedSparkSuite with RLikeDrm // Concoct an rdd with missing rows val aRdd: DrmRdd[Int] = sc.parallelize( - 0 -> dvec(1, 2, 3) :: + 1 -> dvec(2, 2, 3) :: 3 -> dvec(4, 5, 6) :: Nil ).map { case (key, vec) => key -> (vec: Vector)} From 9038b2c248ec163e0ff85dd37cb109345b3bbb9d Mon Sep 17 00:00:00 2001 From: Dmitriy Lyubimov Date: Mon, 28 Jul 2014 10:22:30 -0700 Subject: [PATCH 10/10] in-place performance enhancement at the risk of side-effects, for now --- .../scala/org/apache/mahout/sparkbindings/blas/AewB.scala | 4 ++-- .../apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala index 40b6dfeefb..3cdb797a7f 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala @@ -95,10 +95,10 @@ object AewB { case (key, (vectorSeqA, vectorSeqB)) => val lhsVec: Vector = if (vectorSeqA.isEmpty) new SequentialAccessSparseVector(ncol) else - (vectorSeqA.head /: vectorSeqA.tail)(_ + _) + (vectorSeqA.head /: vectorSeqA.tail)(_ += _) val rhsVec: Vector = if (vectorSeqB.isEmpty) new SequentialAccessSparseVector(ncol) else - (vectorSeqB.head /: vectorSeqB.tail)(_ + _) + (vectorSeqB.head /: vectorSeqB.tail)(_ += _) key -> reduceFunc(lhsVec, rhsVec) }) } diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala index c30570a111..2a4f21352a 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala @@ -69,6 +69,7 @@ class RLikeDrmOpsSuite extends FunSuite with DistributedSparkSuite with RLikeDrm val drmA = drmWrap(rdd=aRdd) val drmB = drmWrap(rdd = bRdd, nrow = 4, canHaveMissingRows = true) val drmC = drmA.cbind(drmB) + val controlC = new DenseMatrix(safeToNonNegInt(drmA.nrow), drmA.ncol + drmB.ncol) controlC(::, 0 until drmA.ncol) := drmA controlC(::, drmA.ncol until drmA.ncol + drmB.ncol) := drmB
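A sketch of what the last two changes amount to (illustration only, with hypothetical names and plain Scala doubles standing in for Mahout vectors): when either operand of an elementwise A op B may be missing rows, each side is materialized per key as "the rows that exist, summed, or a zero row if none", and patch 10 only switches that per-key summation from an allocating fold (`_ + _`) to an in-place one (`_ += _`), which is faster but mutates the first vector of the group.

// Elementwise combine of two int-keyed row sets where either side may lack some keys.
def ewCombine(a: Map[Int, Double], b: Map[Int, Double],
              op: (Double, Double) => Double, zeroRow: Double = 0.0): Map[Int, Double] =
  (a.keySet ++ b.keySet).map { key =>
    key -> op(a.getOrElse(key, zeroRow), b.getOrElse(key, zeroRow))
  }.toMap

// ewCombine(Map(0 -> 1.0, 3 -> 4.0), Map(1 -> 2.0, 2 -> 3.0), _ + _)
//   == Map(0 -> 1.0, 1 -> 2.0, 2 -> 3.0, 3 -> 4.0)

// The fold shape applied per key when a side has duplicate rows. With immutable accumulation
// every step allocates a new value; the in-place variant of patch 10 instead reuses (and
// mutates) the head vector, trading purity for speed.
def sumGroup(vecs: Seq[Double]): Double =
  if (vecs.isEmpty) 0.0 else (vecs.head /: vecs.tail)(_ + _)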