Added unit tests
tomerk committed Jun 22, 2015
1 parent bdd58f2 commit f4ba888
Showing 8 changed files with 229 additions and 42 deletions.
4 changes: 2 additions & 2 deletions src/main/scala/pipelines/images/mnist/MnistRandomFFT.scala
@@ -10,7 +10,7 @@ import org.apache.commons.math3.random.MersenneTwister
import org.apache.spark.{SparkConf, SparkContext}
import pipelines._
import scopt.OptionParser
-import workflow.Gather
+import workflow.Pipeline


object MnistRandomFFT extends Serializable with Logging {
@@ -35,7 +35,7 @@ object MnistRandomFFT extends Serializable with Logging {
.cache())
val labels = ClassLabelIndicatorsFromIntLabels(numClasses).apply(train.labels)

-    val featurizer = Gather {
+    val featurizer = Pipeline.gather {
Seq.fill(conf.numFFTs) {
RandomSignNode(mnistImageSize, randomSignSource) andThen PaddedFFT() andThen LinearRectifier(0.0)
}
40 changes: 0 additions & 40 deletions src/main/scala/workflow/Gather.scala

This file was deleted.

15 changes: 15 additions & 0 deletions src/main/scala/workflow/GatherTransformer.scala
@@ -0,0 +1,15 @@
package workflow

import org.apache.spark.rdd.RDD

import scala.reflect.ClassTag

private[workflow] class GatherTransformer[T] extends TransformerNode[Seq[T]] {
def transform(dataDependencies: Seq[_], fitDependencies: Seq[TransformerNode[_]]): Seq[T] = dataDependencies.map(_.asInstanceOf[T])

def transformRDD(dataDependencies: Seq[RDD[_]], fitDependencies: Seq[TransformerNode[_]]): RDD[Seq[T]] = {
dataDependencies.map(_.asInstanceOf[RDD[T]].map(t => Seq(t))).reduceLeft((x, y) => {
x.zip(y).map(z => z._1 ++ z._2)
})
}
}
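
The combining step in transformRDD above can be seen in isolation. The sketch below is not part of this commit; it assumes an already-created SparkContext named sc and uses made-up branch outputs. Each branch's RDD is first wrapped into singleton Seqs, then the RDDs are zipped pairwise and concatenated, so the i-th row of the result holds the i-th output of every branch, in branch order.

import org.apache.spark.rdd.RDD

// Hypothetical per-branch outputs with identical partitioning (required by zip).
val branchOutputs: Seq[RDD[Int]] = Seq(
  sc.parallelize(Seq(1, 2, 3)),
  sc.parallelize(Seq(10, 20, 30)),
  sc.parallelize(Seq(100, 200, 300)))

val gathered: RDD[Seq[Int]] = branchOutputs
  .map(_.map(t => Seq(t)))
  .reduceLeft((x, y) => x.zip(y).map(z => z._1 ++ z._2))

// gathered.collect() yields Array(Seq(1, 10, 100), Seq(2, 20, 200), Seq(3, 30, 300))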
31 changes: 31 additions & 0 deletions src/main/scala/workflow/Pipeline.scala
@@ -3,6 +3,8 @@ package workflow
import org.apache.spark.rdd.RDD
import pipelines.Logging

import scala.reflect.ClassTag

trait Pipeline[A, B] {
private[workflow] val nodes: Seq[Node]
private[workflow] val dataDeps: Seq[Seq[Int]]
@@ -128,6 +130,35 @@ object Pipeline {
*/
def apply[T](): Pipeline[T, T] = new ConcretePipeline(Seq(), Seq(), Seq(), SOURCE)
private[workflow] def apply[A, B](nodes: Seq[Node], dataDeps: Seq[Seq[Int]], fitDeps: Seq[Seq[Int]], sink: Int): Pipeline[A, B] = new ConcretePipeline(nodes, dataDeps, fitDeps, sink)

/**
* Produces a pipeline that, when given an input, applies every branch to that input
* and combines the branch outputs into a single Seq, in branch order.
* @param branches The pipelines whose outputs should be combined into a Seq
*/
def gather[A, B : ClassTag](branches: Seq[Pipeline[A, B]]): Pipeline[A, Seq[B]] = {
// Pair each branch with the offset that its existing node ids must be shifted by.
val branchesWithNodeOffsets = branches.scanLeft(0)(_ + _.nodes.size).zip(branches)

val newNodes = branches.map(_.nodes).reduceLeft(_ ++ _) :+ new GatherTransformer[B]

val newDataDeps = branchesWithNodeOffsets.map { case (offset, branch) =>
val dataDeps = branch.dataDeps
dataDeps.map(_.map(x => if (x == Pipeline.SOURCE) Pipeline.SOURCE else x + offset))
}.reduceLeft(_ ++ _) :+ branchesWithNodeOffsets.map { case (offset, branch) =>
val sink = branch.sink
if (sink == Pipeline.SOURCE) Pipeline.SOURCE else sink + offset
}

val newFitDeps = branchesWithNodeOffsets.map { case (offset, branch) =>
val fitDeps = branch.fitDeps
fitDeps.map(_.map(x => if (x == Pipeline.SOURCE) Pipeline.SOURCE else x + offset))
}.reduceLeft(_ ++ _) :+ Seq()

val newSink = newNodes.size - 1
Pipeline(newNodes, newDataDeps, newFitDeps, newSink)
}

}

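The offset bookkeeping in gather is the subtle part of this change, so here is a small worked sketch with illustrative values only (not part of the commit). scanLeft(0)(_ + _.nodes.size) accumulates the node counts of the preceding branches, and zipping the result with the branches pairs each branch with the amount its node ids must be shifted by once the branch graphs are concatenated; Pipeline.SOURCE references are left untouched.

// Suppose three branches contribute 2, 3, and 1 nodes respectively.
val nodeCounts = Seq(2, 3, 1)

// scanLeft(0)(_ + _) == Seq(0, 2, 5, 6); zip drops the trailing total.
val offsets = nodeCounts.scanLeft(0)(_ + _).zip(nodeCounts)
// offsets == Seq((0, 2), (2, 3), (5, 1))

// A data dependency on node 1 inside the third branch therefore points at
// node 1 + 5 = 6 in the merged graph, and the GatherTransformer appended at
// the end depends on each remapped branch sink.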
27 changes: 27 additions & 0 deletions src/test/scala/workflow/DelegatingTransformerSuite.scala
@@ -0,0 +1,27 @@
package workflow

import org.apache.spark.SparkContext
import org.scalatest.FunSuite
import pipelines.{Logging, LocalSparkContext}

class DelegatingTransformerSuite extends FunSuite with LocalSparkContext with Logging {
test("single apply") {
val hashTransformer = Transformer[String, Int](_.hashCode)
val delegatingTransformer = new DelegatingTransformer[Int]("label")

val string = "A31DFSsafds*be31"
assert(delegatingTransformer.transform(Seq(string), Seq(hashTransformer)) === string.hashCode)
}

test("rdd apply") {
sc = new SparkContext("local", "test")

val hashTransformer = Transformer[String, Int](_.hashCode)
val delegatingTransformer = new DelegatingTransformer[Int]("label")

val strings = Seq("A31DFSsafds*be31", "lj32fsd", "woadsf8923")
val transformedStrings = delegatingTransformer.transformRDD(Seq(sc.parallelize(strings)), Seq(hashTransformer)).collect()
assert(transformedStrings.toSeq === strings.map(_.hashCode))
}

}
25 changes: 25 additions & 0 deletions src/test/scala/workflow/EstimatorSuite.scala
@@ -0,0 +1,25 @@
package workflow

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite
import pipelines.{LocalSparkContext, Logging}

class EstimatorSuite extends FunSuite with LocalSparkContext with Logging {
test("estimator withData") {
sc = new SparkContext("local", "test")

val intEstimator = new Estimator[Int, Int] {
protected def fit(data: RDD[Int]): Transformer[Int, Int] = {
val first = data.first()
Transformer(_ => first)
}
}

val trainData = sc.parallelize(Seq(32, 94, 12))
val testData = sc.parallelize(Seq(42, 58, 61))

val pipeline = intEstimator.withData(trainData)
assert(pipeline.apply(testData).collect().toSeq === Seq(32, 32, 32))
}
}
28 changes: 28 additions & 0 deletions src/test/scala/workflow/LabelEstimatorSuite.scala
@@ -0,0 +1,28 @@
package workflow

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite
import pipelines.{LocalSparkContext, Logging}

class LabelEstimatorSuite extends FunSuite with LocalSparkContext with Logging {
test("estimator withData") {
sc = new SparkContext("local", "test")

val intEstimator = new LabelEstimator[Int, Int, String] {
protected def fit(data: RDD[Int], labels: RDD[String]): Transformer[Int, Int] = {
val first = data.first()
val label = labels.first().hashCode
Transformer(_ => first + label)

}
}

val trainData = sc.parallelize(Seq(32, 94, 12))
val trainLabels = sc.parallelize(Seq("sjkfdl", "iw", "432"))
val testData = sc.parallelize(Seq(42, 58, 61))

val pipeline = intEstimator.withData(trainData, trainLabels)
assert(pipeline.apply(testData).collect().toSeq === Seq.fill(3)(32 + "sjkfdl".hashCode))
}
}
101 changes: 101 additions & 0 deletions src/test/scala/workflow/PipelineSuite.scala
@@ -0,0 +1,101 @@
package workflow

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite
import pipelines.{LocalSparkContext, Logging}

class PipelineSuite extends FunSuite with LocalSparkContext with Logging {
test("pipeline chaining") {
sc = new SparkContext("local", "test")

val first = Transformer[Int, Int](_ * 2)
val second = Transformer[Int, Int](_ - 3)

val data = sc.parallelize(Seq(32, 94, 12))
val pipeline = first andThen second

val pipelineOut = pipeline(data).collect().toSeq

assert(pipeline(7) === (7 * 2) - 3)
assert(pipelineOut === Seq((32*2) - 3, (94*2) - 3, (12*2) - 3))
}

test("estimator chaining") {
sc = new SparkContext("local", "test")

val doubleTransformer = Transformer[Int, Int](_ * 2)

val intEstimator = new Estimator[Int, Int] {
protected def fit(data: RDD[Int]): Transformer[Int, Int] = {
val first = data.first()
Transformer(x => x + first)
}
}


val data = sc.parallelize(Seq(32, 94, 12))
val pipeline = doubleTransformer andThen (intEstimator, data)

val pipelineOut = pipeline(data).collect().toSeq
val pipelineLastTransformerOut = pipeline.fittedTransformer(data).collect().toSeq

assert(pipelineOut === Seq(32*2 + 32*2, 94*2 + 32*2, 12*2 + 32*2))
assert(pipelineLastTransformerOut === Seq(32 + 32*2, 94 + 32*2, 12 + 32*2))
}

test("label estimator chaining") {
sc = new SparkContext("local", "test")

val doubleTransformer = Transformer[Int, Int](_ * 2)

val intEstimator = new LabelEstimator[Int, Int, String] {
protected def fit(data: RDD[Int], labels: RDD[String]): Transformer[Int, Int] = {
val first = data.first() + labels.first().toInt
Transformer(x => x + first)
}
}


val data = sc.parallelize(Seq(32, 94, 12))
val labels = sc.parallelize(Seq("10", "7", "14"))
val pipeline = doubleTransformer andThen (intEstimator, data, labels)

val pipelineOut = pipeline(data).collect().toSeq
val pipelineLastTransformerOut = pipeline.fittedTransformer(data).collect().toSeq

assert(pipelineOut === Seq(32*2 + 32*2 + 10, 94*2 + 32*2 + 10, 12*2 + 32*2 + 10))
assert(pipelineLastTransformerOut === Seq(32 + 32*2 + 10, 94 + 32*2 + 10, 12 + 32*2 + 10))
}

test("Pipeline gather") {
sc = new SparkContext("local", "test")

val firstPipeline = Transformer[Int, Int](_ * 2) andThen Transformer[Int, Int](_ - 3)

val secondPipeline = Transformer[Int, Int](_ * 2) andThen (new Estimator[Int, Int] {
protected def fit(data: RDD[Int]): Transformer[Int, Int] = {
val first = data.first()
Transformer(x => x + first)
}
}, sc.parallelize(Seq(32, 94, 12)))

val thirdPipeline = Transformer[Int, Int](_ * 4) andThen (new LabelEstimator[Int, Int, String] {
protected def fit(data: RDD[Int], labels: RDD[String]): Transformer[Int, Int] = {
val first = data.first() + labels.first().toInt
Transformer(x => x + first)
}
}, sc.parallelize(Seq(32, 94, 12)), sc.parallelize(Seq("10", "7", "14")))

val pipeline = Pipeline.gather {
firstPipeline :: secondPipeline :: thirdPipeline :: Nil
}

val single = 7
assert(pipeline(single) === Seq(firstPipeline.apply(single), secondPipeline.apply(single), thirdPipeline.apply(single)))

val data = Seq(13, 2, 83)
val correctOut = data.map(x => Seq(firstPipeline.apply(x), secondPipeline.apply(x), thirdPipeline.apply(x)))
assert(pipeline(sc.parallelize(data)).collect().toSeq === correctOut)
}
}
