Skip to content

Commit

Permalink
fixed test suites after last commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jkbradley committed Feb 5, 2015
1 parent bcb9549 commit fc62406
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ object CrossValidatorExample {

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
.select('id, 'text, 'probability, 'prediction)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.ml.classification.{Classifier, ClassifierParams, Classif
import org.apache.spark.ml.param.{Params, IntParam, ParamMap}
import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.{SchemaRDD, Row, SQLContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}


/**
Expand Down Expand Up @@ -68,13 +68,15 @@ object DeveloperApiExample {

// Make predictions on test data.
val sumPredictions: Double = model.transform(test)
.select('features, 'label, 'prediction)
.select("features", "label", "prediction")
.collect()
.map { case Row(features: Vector, label: Double, prediction: Double) =>
prediction
}.sum
assert(sumPredictions == 0.0,
"MyLogisticRegression predicted something other than 0, even though all weights are 0!")

sc.stop()
}
}

Expand Down Expand Up @@ -113,7 +115,7 @@ private class MyLogisticRegression

// This method is used by fit()
override protected def train(
dataset: SchemaRDD,
dataset: DataFrame,
paramMap: ParamMap): MyLogisticRegressionModel = {
// Extract columns from data using helper method.
val oldDataset = extractLabeledPoints(dataset, paramMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ object SimpleParamsExample {
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
.select('features, 'label, 'myProbability, 'prediction)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println("($features, $label) -> prob=$prob, prediction=$prediction")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ object SimpleTextClassificationPipeline {

// Make predictions on test documents.
model.transform(test)
.select('id, 'text, 'probability, 'prediction)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println("($id, $text) --> prob=$prob, prediction=$prediction")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,23 +175,22 @@ private[ml] object ClassificationModel {
val features2raw: FeaturesType => Vector = model.predictRaw
tmpData = tmpData.select($"*",
callUDF(features2raw, new VectorUDT,
tmpData(map(model.featuresCol))).as(map(model.rawPredictionCol)))
col(map(model.featuresCol))).as(map(model.rawPredictionCol)))
numColsOutput += 1
if (map(model.predictionCol) != "") {
val raw2pred: Vector => Double = (rawPred) => {
rawPred.toArray.zipWithIndex.maxBy(_._1)._2
}
tmpData = tmpData.select($"*",
callUDF(raw2pred, DoubleType,
tmpData(map(model.rawPredictionCol))).as(map(model.predictionCol)))
callUDF(raw2pred, col(map(model.rawPredictionCol))).as(map(model.predictionCol)))
numColsOutput += 1
}
} else if (map(model.predictionCol) != "") {
// output prediction
val features2pred: FeaturesType => Double = model.predict
tmpData = tmpData.select($"*",
callUDF(features2pred, DoubleType,
tmpData(map(model.featuresCol))).as(map(model.predictionCol)))
col(map(model.featuresCol))).as(map(model.predictionCol)))
numColsOutput += 1
}
(numColsOutput, tmpData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ package org.apache.spark.ml.classification
import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param._
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.{VectorUDT, BLAS, Vector, Vectors}
import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.storage.StorageLevel


Expand Down Expand Up @@ -120,7 +119,7 @@ class LogisticRegressionModel private[ml] (
if (map(rawPredictionCol) != "") {
val features2raw: Vector => Vector = predictRaw
tmpData = tmpData.select($"*",
callUDF(features2raw, new VectorUDT, tmpData(map(featuresCol))).as(map(rawPredictionCol)))
callUDF(features2raw, col(map(featuresCol))).as(map(rawPredictionCol)))
numColsOutput += 1
}
if (map(probabilityCol) != "") {
Expand All @@ -130,11 +129,11 @@ class LogisticRegressionModel private[ml] (
Vectors.dense(1.0 - prob1, prob1)
}
tmpData = tmpData.select($"*",
callUDF(raw2prob, new VectorUDT, tmpData(map(rawPredictionCol))).as(map(probabilityCol)))
callUDF(raw2prob, col(map(rawPredictionCol))).as(map(probabilityCol)))
} else {
val features2prob: Vector => Vector = predictProbabilities
tmpData = tmpData.select($"*",
callUDF(features2prob, new VectorUDT, tmpData(map(featuresCol))).as(map(probabilityCol)))
callUDF(features2prob, col(map(featuresCol))).as(map(probabilityCol)))
}
numColsOutput += 1
}
Expand All @@ -145,18 +144,18 @@ class LogisticRegressionModel private[ml] (
if (probs(1) > t) 1.0 else 0.0
}
tmpData = tmpData.select($"*",
callUDF(predict, DoubleType, tmpData(map(probabilityCol))).as(map(predictionCol)))
callUDF(predict, col(map(probabilityCol))).as(map(predictionCol)))
} else if (map(rawPredictionCol) != "") {
val predict: Vector => Double = (rawPreds) => {
val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
if (prob1 > t) 1.0 else 0.0
}
tmpData = tmpData.select($"*",
callUDF(predict, DoubleType, tmpData(map(rawPredictionCol))).as(map(predictionCol)))
callUDF(predict, col(map(rawPredictionCol))).as(map(predictionCol)))
} else {
val predict: Vector => Double = this.predict
tmpData = tmpData.select($"*",
callUDF(predict, DoubleType, tmpData(map(featuresCol))).as(map(predictionCol)))
callUDF(predict, col(map(featuresCol))).as(map(predictionCol)))
}
numColsOutput += 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ abstract class ProbabilisticClassificationModel[
tmpModel.predictProbabilities(features)
}
outputData.select($"*",
callUDF(features2probs, new VectorUDT,
outputData(map(featuresCol))).as(map(probabilityCol)))
callUDF(features2probs, new VectorUDT, col(map(featuresCol))).as(map(probabilityCol)))
} else {
if (numColsOutput == 0) {
this.logWarning(s"$uid: ProbabilisticClassificationModel.transform() was called as NOOP" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,7 @@ abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType,
val pred: FeaturesType => Double = (features) => {
tmpModel.predict(features)
}
dataset.select($"*",
callUDF(pred, DoubleType, dataset(map(featuresCol))).as(map(predictionCol)))
dataset.select($"*", callUDF(pred, DoubleType, col(map(featuresCol))).as(map(predictionCol)))
} else {
this.logWarning(s"$uid: Predictor.transform() was called as NOOP" +
" since no output columns were set.")
Expand Down

0 comments on commit fc62406

Please sign in to comment.