Commit 8c7cbcc
DB Tsai (Alpine Data Labs) committed Jul 28, 2014
1 parent 81fcdd2 commit 8c7cbcc
Showing 10 changed files with 438 additions and 130 deletions.
mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.Matchers
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}
import org.apache.spark.mllib.util.TestingUtils._

object LogisticRegressionSuite {

@@ -81,9 +82,8 @@ class LogisticRegressionSuite extends FunSuite with LocalSparkContext with Matchers {
val model = lr.run(testRDD)

// Test the weights
val weight0 = model.weights(0)
assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
assert(model.weights(0) ~== -1.52 relTol 0.01)
assert(model.intercept ~== 2.00 relTol 0.01)

val validationData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 17)
val validationRDD = sc.parallelize(validationData, 2)
@@ -113,9 +113,9 @@ class LogisticRegressionSuite extends FunSuite with LocalSparkContext with Matchers {

val model = lr.run(testRDD, initialWeights)

val weight0 = model.weights(0)
assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
// Test the weights
assert(model.weights(0) ~== -1.50 relTol 0.01)
assert(model.intercept ~== 1.97 relTol 0.01)

val validationData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 17)
val validationRDD = sc.parallelize(validationData, 2)
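The new assertions use TestingUtils' `~==` operator, which fails with a descriptive message when two values are not approximately equal under a relative (`relTol`) or absolute (`absTol`) tolerance, replacing the hand-rolled range checks above. A minimal sketch of the two comparisons (illustrative names only, not the actual TestingUtils API):

object ApproxEqualitySketch {
  // Relative closeness: error scaled by the magnitudes of the inputs.
  def relClose(x: Double, y: Double, tol: Double): Boolean =
    math.abs(x - y) / math.min(math.abs(x), math.abs(y)) < tol

  // Absolute closeness: error measured on its own scale.
  def absClose(x: Double, y: Double, tol: Double): Boolean =
    math.abs(x - y) < tol
}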
mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -21,8 +21,9 @@ import scala.util.Random

import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}
import org.apache.spark.mllib.util.TestingUtils._

class KMeansSuite extends FunSuite with LocalSparkContext {

@@ -41,26 +42,26 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
// centered at the mean of the points

var model = KMeans.train(data, k = 1, maxIterations = 1)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 2)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 5)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(
data, k = 1, maxIterations = 1, runs = 1, initializationMode = K_MEANS_PARALLEL)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)
}

test("no distinct points") {
@@ -104,26 +105,26 @@ class KMeansSuite extends FunSuite with LocalSparkContext {

var model = KMeans.train(data, k = 1, maxIterations = 1)
assert(model.clusterCenters.size === 1)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 2)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 5)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1,
initializationMode = K_MEANS_PARALLEL)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)
}

test("single cluster with sparse data") {
@@ -149,31 +150,39 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
val center = Vectors.sparse(n, Seq((0, 1.0), (1, 3.0), (2, 4.0)))

var model = KMeans.train(data, k = 1, maxIterations = 1)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 2)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 5)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1,
initializationMode = K_MEANS_PARALLEL)
assert(model.clusterCenters.head === center)
assert(model.clusterCenters.head ~== center absTol 1E-5)

data.unpersist()
}

test("k-means|| initialization") {

case class VectorWithCompare(x: Vector) extends Ordered[VectorWithCompare] {
@Override def compare(that: VectorWithCompare): Int = {
if(this.x.toArray.foldLeft[Double](0.0)((acc, x) => acc + x * x) >
that.x.toArray.foldLeft[Double](0.0)((acc, x) => acc + x * x)) -1 else 1
}
}

val points = Seq(
Vectors.dense(1.0, 2.0, 6.0),
Vectors.dense(1.0, 3.0, 0.0),
@@ -188,15 +197,19 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
// unselected point as long as it hasn't yet selected all of them

var model = KMeans.train(rdd, k = 5, maxIterations = 1)
assert(Set(model.clusterCenters: _*) === Set(points: _*))

assert(model.clusterCenters.sortBy(VectorWithCompare(_))
.zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))

// Iterations of Lloyd's should not change the answer either
model = KMeans.train(rdd, k = 5, maxIterations = 10)
assert(Set(model.clusterCenters: _*) === Set(points: _*))
assert(model.clusterCenters.sortBy(VectorWithCompare(_))
.zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))

// Neither should more runs
model = KMeans.train(rdd, k = 5, maxIterations = 10, runs = 5)
assert(Set(model.clusterCenters: _*) === Set(points: _*))
assert(model.clusterCenters.sortBy(VectorWithCompare(_))
.zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))
}

test("two clusters") {
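Since exact `Set` equality no longer holds once centers are compared with a tolerance, the k-means|| test above sorts both the learned centers and the input points by squared norm (via `VectorWithCompare`) and checks the aligned pairs element-wise. A self-contained sketch of the same idea, assuming a hypothetical `sqNorm` helper:

import org.apache.spark.mllib.linalg.Vector

// Sort two collections of centers by squared norm, then compare the
// aligned pairs coordinate-wise within an absolute tolerance.
def sqNorm(v: Vector): Double = v.toArray.map(x => x * x).sum

def centersMatch(a: Seq[Vector], b: Seq[Vector], tol: Double): Boolean =
  a.sortBy(sqNorm).zip(b.sortBy(sqNorm)).forall { case (u, w) =>
    u.toArray.zip(w.toArray).forall { case (x, y) => math.abs(x - y) < tol }
  }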
mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala
@@ -20,27 +20,28 @@ package org.apache.spark.mllib.evaluation
import org.scalatest.FunSuite

import org.apache.spark.mllib.util.LocalSparkContext
import org.apache.spark.mllib.util.TestingUtils._

class AreaUnderCurveSuite extends FunSuite with LocalSparkContext {
test("auc computation") {
val curve = Seq((0.0, 0.0), (1.0, 1.0), (2.0, 3.0), (3.0, 0.0))
val auc = 4.0
assert(AreaUnderCurve.of(curve) === auc)
assert(AreaUnderCurve.of(curve) ~== auc absTol 1E-5)
val rddCurve = sc.parallelize(curve, 2)
assert(AreaUnderCurve.of(rddCurve) == auc)
assert(AreaUnderCurve.of(rddCurve) ~== auc absTol 1E-5)
}

test("auc of an empty curve") {
val curve = Seq.empty[(Double, Double)]
assert(AreaUnderCurve.of(curve) === 0.0)
assert(AreaUnderCurve.of(curve) ~== 0.0 absTol 1E-5)
val rddCurve = sc.parallelize(curve, 2)
assert(AreaUnderCurve.of(rddCurve) === 0.0)
assert(AreaUnderCurve.of(rddCurve) ~== 0.0 absTol 1E-5)
}

test("auc of a curve with a single point") {
val curve = Seq((1.0, 1.0))
assert(AreaUnderCurve.of(curve) === 0.0)
assert(AreaUnderCurve.of(curve) ~== 0.0 absTol 1E-5)
val rddCurve = sc.parallelize(curve, 2)
assert(AreaUnderCurve.of(rddCurve) === 0.0)
assert(AreaUnderCurve.of(rddCurve) ~== 0.0 absTol 1E-5)
}
}
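The expected value 4.0 in the first test is just the trapezoid rule applied over the curve's three segments; a quick worked sketch:

// Trapezoid rule: (0+1)/2*1 + (1+3)/2*1 + (3+0)/2*1 = 0.5 + 2.0 + 1.5 = 4.0
val curve = Seq((0.0, 0.0), (1.0, 1.0), (2.0, 3.0), (3.0, 0.0))
val auc = curve.sliding(2).map { case Seq((x1, y1), (x2, y2)) =>
  (y1 + y2) / 2.0 * (x2 - x1)
}.sum
assert(math.abs(auc - 4.0) < 1e-5)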
mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala
@@ -20,25 +20,14 @@ package org.apache.spark.mllib.evaluation
import org.scalatest.FunSuite

import org.apache.spark.mllib.util.LocalSparkContext
import org.apache.spark.mllib.util.TestingUtils.DoubleWithAlmostEquals
import org.apache.spark.mllib.util.TestingUtils._

class BinaryClassificationMetricsSuite extends FunSuite with LocalSparkContext {

// TODO: move utility functions to TestingUtils.
def cond1(x: (Double, Double)): Boolean = x._1 ~= (x._2) absTol 1E-5

def elementsAlmostEqual(actual: Seq[Double], expected: Seq[Double]): Boolean = {
actual.zip(expected).forall { case (x1, x2) =>
x1.almostEquals(x2)
}
}

def elementsAlmostEqual(
actual: Seq[(Double, Double)],
expected: Seq[(Double, Double)])(implicit dummy: DummyImplicit): Boolean = {
actual.zip(expected).forall { case ((x1, y1), (x2, y2)) =>
x1.almostEquals(x2) && y1.almostEquals(y2)
}
}
def cond2(x: ((Double, Double), (Double, Double))): Boolean =
(x._1._1 ~= x._2._1 absTol 1E-5) && (x._1._2 ~= x._2._2 absTol 1E-5)

test("binary evaluation metrics") {
val scoreAndLabels = sc.parallelize(
@@ -57,16 +46,17 @@ class BinaryClassificationMetricsSuite extends FunSuite with LocalSparkContext {
val rocCurve = Seq((0.0, 0.0)) ++ fpr.zip(recall) ++ Seq((1.0, 1.0))
val pr = recall.zip(precision)
val prCurve = Seq((0.0, 1.0)) ++ pr
val f1 = pr.map { case (r, p) => 2.0 * (p * r) / (p + r) }
val f1 = pr.map { case (r, p) => 2.0 * (p * r) / (p + r)}
val f2 = pr.map { case (r, p) => 5.0 * (p * r) / (4.0 * p + r)}
assert(elementsAlmostEqual(metrics.thresholds().collect(), threshold))
assert(elementsAlmostEqual(metrics.roc().collect(), rocCurve))
assert(metrics.areaUnderROC().almostEquals(AreaUnderCurve.of(rocCurve)))
assert(elementsAlmostEqual(metrics.pr().collect(), prCurve))
assert(metrics.areaUnderPR().almostEquals(AreaUnderCurve.of(prCurve)))
assert(elementsAlmostEqual(metrics.fMeasureByThreshold().collect(), threshold.zip(f1)))
assert(elementsAlmostEqual(metrics.fMeasureByThreshold(2.0).collect(), threshold.zip(f2)))
assert(elementsAlmostEqual(metrics.precisionByThreshold().collect(), threshold.zip(precision)))
assert(elementsAlmostEqual(metrics.recallByThreshold().collect(), threshold.zip(recall)))

assert(metrics.thresholds().collect().zip(threshold).forall(cond1))
assert(metrics.roc().collect().zip(rocCurve).forall(cond2))
assert(metrics.areaUnderROC() ~== AreaUnderCurve.of(rocCurve) absTol 1E-5)
assert(metrics.pr().collect().zip(prCurve).forall(cond2))
assert(metrics.areaUnderPR() ~== AreaUnderCurve.of(prCurve) absTol 1E-5)
assert(metrics.fMeasureByThreshold().collect().zip(threshold.zip(f1)).forall(cond2))
assert(metrics.fMeasureByThreshold(2.0).collect().zip(threshold.zip(f2)).forall(cond2))
assert(metrics.precisionByThreshold().collect().zip(threshold.zip(precision)).forall(cond2))
assert(metrics.recallByThreshold().collect().zip(threshold.zip(recall)).forall(cond2))
}
}
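The `f1` and `f2` expressions above are the β = 1 and β = 2 cases of the general F-measure F_β = (1 + β²)·p·r / (β²·p + r). A sketch with a hypothetical helper (not part of the test suite):

// General F-beta: beta = 1 yields 2pr/(p + r), beta = 2 yields 5pr/(4p + r),
// matching the f1 and f2 maps in the test above.
def fMeasure(precision: Double, recall: Double, beta: Double): Double = {
  val b2 = beta * beta
  (1.0 + b2) * (precision * recall) / (b2 * precision + recall)
}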
mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.{FunSuite, Matchers}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}
import org.apache.spark.mllib.util.TestingUtils._

object GradientDescentSuite {

@@ -126,19 +127,14 @@ class GradientDescentSuite extends FunSuite with LocalSparkContext with Matchers {
val (newWeights1, loss1) = GradientDescent.runMiniBatchSGD(
dataRDD, gradient, updater, 1, 1, regParam1, 1.0, initialWeightsWithIntercept)

def compareDouble(x: Double, y: Double, tol: Double = 1E-3): Boolean = {
math.abs(x - y) / (math.abs(y) + 1e-15) < tol
}

assert(compareDouble(
loss1(0),
loss0(0) + (math.pow(initialWeightsWithIntercept(0), 2) +
math.pow(initialWeightsWithIntercept(1), 2)) / 2),
assert(
loss1(0) ~= (loss0(0) + (math.pow(initialWeightsWithIntercept(0), 2) +
math.pow(initialWeightsWithIntercept(1), 2)) / 2) absTol 1E-5,
"""For non-zero weights, the regVal should be \frac{1}{2}\sum_i w_i^2.""")

assert(
compareDouble(newWeights1(0) , newWeights0(0) - initialWeightsWithIntercept(0)) &&
compareDouble(newWeights1(1) , newWeights0(1) - initialWeightsWithIntercept(1)),
(newWeights1(0) ~= (newWeights0(0) - initialWeightsWithIntercept(0)) absTol 1E-5) &&
(newWeights1(1) ~= (newWeights0(1) - initialWeightsWithIntercept(1)) absTol 1E-5),
"The different between newWeights with/without regularization " +
"should be initialWeightsWithIntercept.")
}
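The first assertion relies on the regularization value that the squared-L2 updater adds to the reported loss being ½·Σᵢ wᵢ² of the initial weights (the assertion's form implies the test's regParam1 is 1.0). A hypothetical helper showing that quantity:

// regVal for squared-L2 regularization: 1/2 * sum of squared weights.
def l2RegVal(weights: Array[Double]): Double =
  weights.map(w => w * w).sum / 2.0

// e.g. l2RegVal(Array(1.0, 0.5)) == 0.625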
mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.{FunSuite, Matchers}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.{LocalClusterSparkContext, LocalSparkContext}
import org.apache.spark.mllib.util.TestingUtils._

class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {

@@ -49,10 +50,6 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {

lazy val dataRDD = sc.parallelize(data, 2).cache()

def compareDouble(x: Double, y: Double, tol: Double = 1E-3): Boolean = {
math.abs(x - y) / (math.abs(y) + 1e-15) < tol
}

test("LBFGS loss should be decreasing and match the result of Gradient Descent.") {
val regParam = 0

@@ -126,15 +123,15 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {
miniBatchFrac,
initialWeightsWithIntercept)

assert(compareDouble(lossGD(0), lossLBFGS(0)),
assert(lossGD(0) ~= lossLBFGS(0) absTol 1E-5,
"The first losses of LBFGS and GD should be the same.")

// The 2% difference here is based on observation, but is not theoretically guaranteed.
assert(compareDouble(lossGD.last, lossLBFGS.last, 0.02),
assert(lossGD.last ~= lossLBFGS.last relTol 0.02,
"The last losses of LBFGS and GD should be within 2% difference.")

assert(compareDouble(weightLBFGS(0), weightGD(0), 0.02) &&
compareDouble(weightLBFGS(1), weightGD(1), 0.02),
assert(
(weightLBFGS(0) ~= weightGD(0) relTol 0.02) && (weightLBFGS(1) ~= weightGD(1) relTol 0.02),
"The weight differences between LBFGS and GD should be within 2%.")
}

@@ -226,8 +223,8 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with Matchers {
initialWeightsWithIntercept)

// for class LBFGS and the optimize method, we only look at the weights
assert(compareDouble(weightLBFGS(0), weightGD(0), 0.02) &&
compareDouble(weightLBFGS(1), weightGD(1), 0.02),
assert(
(weightLBFGS(0) ~= weightGD(0) relTol 0.02) && (weightLBFGS(1) ~= weightGD(1) relTol 0.02),
"The weight differences between LBFGS and GD should be within 2%.")
}
}
mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
@@ -21,7 +21,9 @@ import scala.util.Random

import org.scalatest.FunSuite

import org.jblas.{DoubleMatrix, SimpleBlas, NativeBlas}
import org.jblas.{DoubleMatrix, SimpleBlas}

import org.apache.spark.mllib.util.TestingUtils._

class NNLSSuite extends FunSuite {
/** Generate an NNLS problem whose optimal solution is the all-ones vector. */
@@ -73,7 +75,7 @@ class NNLSSuite extends FunSuite {
val ws = NNLS.createWorkspace(n)
val x = NNLS.solve(ata, atb, ws)
for (i <- 0 until n) {
assert(Math.abs(x(i) - goodx(i)) < 1e-3)
assert(x(i) ~== goodx(i) absTol 1E-3)
assert(x(i) >= 0)
}
}
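Note the use of `absTol` rather than `relTol` here: NNLS solutions can in general contain entries at or near zero, where relative error is ill-defined. A small illustrative sketch of the distinction:

// Relative error blows up near zero, so absolute tolerance is the safer
// check for quantities expected to be approximately 0.
def relErr(x: Double, y: Double): Double =
  math.abs(x - y) / math.max(math.abs(x), math.abs(y))

// relErr(1e-12, 0.0) == 1.0      -> fails any relative-tolerance check
// math.abs(1e-12 - 0.0) < 1e-3   -> passes an absolute-tolerance check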
[3 more changed files not shown]
