From 934f97b7c876fe704dbe8a55d2b8bcff798b6b59 Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Tue, 30 Dec 2014 14:44:37 -0800
Subject: [PATCH] Fixed bugs from previous commit.

---
 .../examples/ml/JavaSimpleParamsExample.java  |  2 +-
 .../examples/ml/DeveloperApiExample.scala     |  2 -
 .../examples/ml/SimpleParamsExample.scala     |  2 +-
 .../spark/ml/classification/Classifier.scala  |  6 --
 .../classification/LogisticRegression.scala   |  4 +-
 .../ProbabilisticClassifier.scala             | 10 +--
 .../BinaryClassificationEvaluator.scala       |  1 +
 .../spark/ml/impl/estimator/Predictor.scala   | 19 ++++-
 .../org/apache/spark/ml/param/params.scala    |  4 +-
 .../ml/regression/LinearRegression.scala      |  4 +-
 .../spark/ml/regression/Regressor.scala       |  2 +
 .../spark/ml/JavaLabeledPointSuite.java       | 78 -------------------
 .../apache/spark/ml/LabeledPointSuite.scala   | 59 --------------
 .../LogisticRegressionSuite.scala             | 20 ++---
 .../ml/regression/LinearRegressionSuite.scala |  2 +-
 15 files changed, 43 insertions(+), 172 deletions(-)
 delete mode 100644 mllib/src/test/java/org/apache/spark/ml/JavaLabeledPointSuite.java
 delete mode 100644 mllib/src/test/scala/org/apache/spark/ml/LabeledPointSuite.scala

diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java
index 3f4f42594059a..98677d0a4a67b 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java
@@ -102,7 +102,7 @@ public static void main(String[] args) {
     // 'probability' column since we renamed the lr.probabilityCol parameter previously.
     model2.transform(test).registerTempTable("results");
     DataFrame results =
-      jsql.sql("SELECT features, label, probability, prediction FROM results");
+      jsql.sql("SELECT features, label, myProbability, prediction FROM results");
     for (Row r: results.collect()) {
       System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
           + ", prediction=" + r.get(3));
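
NOTE: the SQL fix above is needed because transform() materializes the probability
column under whatever name is stored in probabilityCol ("myProbability" here), so
selecting the old default name fails at analysis time. A minimal Scala sketch of the
same flow, assuming a fitted `model2` with probabilityCol set to "myProbability", a
`test` dataset, and an in-scope `sqlContext` (all hypothetical names):

    model2.transform(test).registerTempTable("results")
    val results = sqlContext.sql(
      "SELECT features, label, myProbability, prediction FROM results")
    results.collect().foreach { r =>
      println(s"(${r(0)}, ${r(1)}) -> prob=${r(2)}, prediction=${r(3)}")
    }
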
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
index 2f1de5c58ed1e..deffce192b2b4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
@@ -43,8 +43,6 @@ object DeveloperApiExample {
     import sqlContext._
 
     // Prepare training data.
-    // We use LabeledPoint, which is a case class.  Spark SQL can convert RDDs of Java Beans
-    // into SchemaRDDs, where it uses the bean metadata to infer the schema.
     val training = sparkContext.parallelize(Seq(
       LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
       LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
index 325492c5a577e..80d130728c85f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
@@ -93,7 +93,7 @@ object SimpleParamsExample {
     model2.transform(test)
       .select('features, 'label, 'myProbability, 'prediction)
       .collect()
-      .foreach { case Row(features: Vector, label: Double, prob: Double, prediction: Double) =>
+      .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
         println("(" + features + ", " + label + ") -> prob=" + prob + ", prediction=" + prediction)
       }
 
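
NOTE: the pattern-match change reflects the new column type: with probabilityCol the
output holds a Vector of class-conditional probabilities (one entry per class), not a
Double score, so code that still binds the column as Double fails with a MatchError at
runtime. A hedged sketch of reading it, assuming `output` is the transformed dataset
with these four columns:

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.Row

    output.collect().foreach {
      case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
        // In the binary case, prob(1) is the probability of class 1.
        println(s"($features, $label) -> p(1)=${prob(1)}, prediction=$prediction")
    }
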
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
index 243de234dffdf..34e4b6fe298d1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.ml.classification
 
-import scala.reflect.runtime.universe._
-
 import org.apache.spark.annotation.{DeveloperApi, AlphaComponent}
 import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, PredictorParams}
 import org.apache.spark.ml.param.{Params, ParamMap, HasRawPredictionCol}
@@ -62,8 +60,6 @@ abstract class Classifier[
   extends Predictor[FeaturesType, Learner, M]
   with ClassifierParams {
 
-  setRawPredictionCol("") // Do not output by default
-
   def setRawPredictionCol(value: String): Learner =
     set(rawPredictionCol, value).asInstanceOf[Learner]
 
@@ -82,8 +78,6 @@
 abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[FeaturesType, M]]
   extends PredictionModel[FeaturesType, M] with ClassifierParams {
 
-  setRawPredictionCol("") // Do not output by default
-
   def setRawPredictionCol(value: String): M = set(rawPredictionCol, value).asInstanceOf[M]
 
   /** Number of classes (values which the label can take). */
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 8f07ff6d58edb..2653af994d7c6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ml.classification
 import org.apache.spark.annotation.AlphaComponent
 import org.apache.spark.ml.param._
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
-import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, BLAS, Vector}
+import org.apache.spark.mllib.linalg.{BLAS, Vector, VectorUDT, Vectors}
 import org.apache.spark.sql._
 import org.apache.spark.sql.Dsl._
 import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
@@ -35,6 +35,7 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClas
 
 /**
  * :: AlphaComponent ::
+ *
  * Logistic regression.
  * Currently, this class only supports binary classification.
  */
@@ -86,6 +87,7 @@ class LogisticRegression
 
 /**
  * :: AlphaComponent ::
+ *
  * Model produced by [[LogisticRegression]].
  */
 @AlphaComponent
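
NOTE: dropping the setRawPredictionCol("") calls from the constructors changes the
default: classifiers now emit their extra output columns unless the caller disables
them. Per the deleted comment, a column is suppressed by setting its name to the empty
string. A sketch against this branch's API:

    val lr = new LogisticRegression()
    lr.setRawPredictionCol("")            // opt out of the rawPrediction column
    lr.setProbabilityCol("myProbability") // redirect the probability column
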
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
index 41f9b9601a00b..b4da63111f830 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.ml.classification
 
-import scala.reflect.runtime.universe._
-
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi}
 import org.apache.spark.ml.param.{HasProbabilityCol, ParamMap, Params}
 import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
@@ -42,8 +40,10 @@ private[classification] trait ProbabilisticClassifierParams
   }
 }
 
+
 /**
  * :: AlphaComponent ::
+ *
  * Single-label binary or multiclass classifier which can output class conditional probabilities.
 *
 * @tparam FeaturesType Type of input features. E.g., [[Vector]]
@@ -57,13 +57,13 @@ abstract class ProbabilisticClassifier[
     M <: ProbabilisticClassificationModel[FeaturesType, M]]
   extends Classifier[FeaturesType, Learner, M]
 
-  setProbabilityCol("") // Do not output by default
-
   def setProbabilityCol(value: String): Learner = set(probabilityCol, value).asInstanceOf[Learner]
 }
 
+
 /**
  * :: AlphaComponent ::
+ *
  * Model produced by a [[ProbabilisticClassifier]].
 * Classes are indexed {0, 1, ..., numClasses - 1}.
 *
@@ -76,8 +76,6 @@ abstract class ProbabilisticClassificationModel[
     M <: ProbabilisticClassificationModel[FeaturesType, M]]
   extends ClassificationModel[FeaturesType, M] with ProbabilisticClassifierParams {
 
-  setProbabilityCol("") // Do not output by default
-
   def setProbabilityCol(value: String): M = set(probabilityCol, value).asInstanceOf[M]
 
   /**
diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
index 9dcd6c0682256..af141e95ee9e0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types.DoubleType
 
 /**
  * :: AlphaComponent ::
+ *
  * Evaluator for binary classification, which expects two input columns: score and label.
 */
 @AlphaComponent
diff --git a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala
index 35ca5a0bcbe00..b39875197f38d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.ml.impl.estimator
 
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{AlphaComponent, DeveloperApi}
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.param._
 import org.apache.spark.mllib.linalg.Vector
@@ -62,6 +62,8 @@ trait PredictorParams extends Params
 }
 
 /**
+ * :: AlphaComponent ::
+ *
  * Abstraction for prediction problems (regression and classification).
 *
 * @tparam FeaturesType Type of features.
@@ -71,7 +73,7 @@ trait PredictorParams extends Params
 * @tparam M Specialization of [[PredictionModel]]. If you subclass this type, use this type
 *           parameter to specify the concrete type for the corresponding model.
 */
-@DeveloperApi
+@AlphaComponent
 abstract class Predictor[
     FeaturesType,
     Learner <: Predictor[FeaturesType, Learner, M],
@@ -124,7 +126,18 @@ abstract class Predictor[
   }
 }
 
-private[ml] abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType, M]]
+/**
+ * :: AlphaComponent ::
+ *
+ * Abstraction for a model for prediction tasks (regression and classification).
+ *
+ * @tparam FeaturesType Type of features.
+ *                      E.g., [[org.apache.spark.mllib.linalg.VectorUDT]] for vector features.
+ * @tparam M Specialization of [[PredictionModel]]. If you subclass this type, use this type
+ *           parameter to specify the concrete type for the corresponding model.
+ */
+@AlphaComponent
+abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType, M]]
   extends Model[M] with PredictorParams {
 
   def setFeaturesCol(value: String): M = set(featuresCol, value).asInstanceOf[M]
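
NOTE: making PredictionModel public (as @AlphaComponent, matching Predictor) exposes
the f-bounded generics used throughout this hierarchy: the M parameter names the
concrete model subclass, so setters can return that subtype without casts at the call
site. A minimal standalone illustration of the pattern (toy names, not Spark code):

    // `M` is the concrete subclass; setters return it via one internal cast.
    abstract class MyModel[M <: MyModel[M]] {
      private var featuresCol: String = "features"
      def setFeaturesCol(value: String): M = {
        featuresCol = value
        this.asInstanceOf[M]
      }
    }
    class MyConcreteModel extends MyModel[MyConcreteModel]
    // new MyConcreteModel().setFeaturesCol("f") has static type MyConcreteModel.
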
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
index b3e3e58665fef..0358309991655 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
@@ -19,14 +19,12 @@ package org.apache.spark.ml.param
 
 import scala.annotation.varargs
 import scala.collection.mutable
-import scala.reflect.runtime.universe._
 
 import java.lang.reflect.Modifier
 
-import org.apache.spark.annotation.{DeveloperApi, AlphaComponent}
+import org.apache.spark.annotation.{AlphaComponent, DeveloperApi}
 import org.apache.spark.ml.Identifiable
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.ScalaReflection
 
 /**
  * :: AlphaComponent ::
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 3ff7107221763..8ac2738bfe5b5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -32,7 +32,8 @@ private[regression] trait LinearRegressionParams extends RegressorParams
 
 /**
  * :: AlphaComponent ::
- * Logistic regression.
+ *
+ * Linear regression.
  */
 @AlphaComponent
 class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegressionModel]
@@ -78,6 +79,7 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
 
 /**
  * :: AlphaComponent ::
+ *
  * Model produced by [[LinearRegression]].
 */
 @AlphaComponent
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala
index 5f10344456a10..dca849f44270f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala
@@ -30,6 +30,7 @@ trait RegressorParams extends PredictorParams
 
 /**
  * :: AlphaComponent ::
+ *
  * Single-label regression
 *
 * @tparam FeaturesType Type of input features. E.g., [[org.apache.spark.mllib.linalg.Vector]]
@@ -49,6 +50,7 @@ abstract class Regressor[
 /**
  * :: AlphaComponent ::
+ *
  * Model produced by a [[Regressor]].
 *
 * @tparam FeaturesType Type of input features.
 *                      E.g., [[org.apache.spark.mllib.linalg.Vector]]
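
NOTE: the scaladoc fix above corrects a copy-paste error: the class implements linear
regression, not logistic. A hedged usage sketch against this branch, assuming
`training` is a dataset of (label, features) rows in the form fit() accepts and that
default column names are used:

    import org.apache.spark.ml.regression.LinearRegression

    val linReg = new LinearRegression()
      .setMaxIter(10)
      .setRegParam(1.0)
    val linRegModel = linReg.fit(training)
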
diff --git a/mllib/src/test/java/org/apache/spark/ml/JavaLabeledPointSuite.java b/mllib/src/test/java/org/apache/spark/ml/JavaLabeledPointSuite.java
deleted file mode 100644
index 878ad5a7baed5..0000000000000
--- a/mllib/src/test/java/org/apache/spark/ml/JavaLabeledPointSuite.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ml;
-
-import java.util.List;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.mllib.linalg.Vector;
-import org.apache.spark.mllib.linalg.Vectors;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import org.apache.spark.sql.api.java.Row;
-
-/**
- * Test {@link LabeledPoint} in Java
- */
-public class JavaLabeledPointSuite {
-
-  private transient JavaSparkContext jsc;
-  private transient JavaSQLContext jsql;
-
-  @Before
-  public void setUp() {
-    jsc = new JavaSparkContext("local", "JavaLabeledPointSuite");
-    jsql = new JavaSQLContext(jsc);
-  }
-
-  @After
-  public void tearDown() {
-    jsc.stop();
-    jsc = null;
-  }
-
-  @Test
-  public void labeledPointDefaultWeight() {
-    double label = 1.0;
-    Vector features = Vectors.dense(1.0, 2.0, 3.0);
-    LabeledPoint lp1 = new LabeledPoint(label, features);
-    LabeledPoint lp2 = new LabeledPoint(label, features, 1.0);
-    assert(lp1.equals(lp2));
-  }
-
-  @Test
-  public void labeledPointSchemaRDD() {
-    List<LabeledPoint> arr = Lists.newArrayList(
-      new LabeledPoint(0.0, Vectors.dense(1.0, 2.0, 3.0)),
-      new LabeledPoint(1.0, Vectors.dense(1.1, 2.1, 3.1)),
-      new LabeledPoint(0.0, Vectors.dense(1.2, 2.2, 3.2)),
-      new LabeledPoint(1.0, Vectors.dense(1.3, 2.3, 3.3)));
-    JavaRDD<LabeledPoint> rdd = jsc.parallelize(arr);
-    JavaSchemaRDD schemaRDD = jsql.applySchema(rdd, LabeledPoint.class);
-    schemaRDD.registerTempTable("points");
-    List<Row> points = jsql.sql("SELECT label, features FROM points").collect();
-    assert (points.size() == arr.size());
-  }
-}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/LabeledPointSuite.scala
deleted file mode 100644
index 94659ba95b1be..0000000000000
--- a/mllib/src/test/scala/org/apache/spark/ml/LabeledPointSuite.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ml
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.SQLContext
-
-/**
- * Test [[LabeledPoint]]
- */
-class LabeledPointSuite extends FunSuite with MLlibTestSparkContext {
-
-  @transient var sqlContext: SQLContext = _
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    sqlContext = new SQLContext(sc)
-  }
-
-  test("LabeledPoint default weight 1.0") {
-    val label = 1.0
-    val features = Vectors.dense(1.0, 2.0, 3.0)
-    val lp1 = LabeledPoint(label, features)
-    val lp2 = LabeledPoint(label, features, weight = 1.0)
-    assert(lp1 === lp2)
-  }
-
-  test("Create SchemaRDD from RDD[LabeledPoint]") {
-    val sqlContext = this.sqlContext
-    import sqlContext._
-    val arr = Seq(
-      LabeledPoint(0.0, Vectors.dense(1.0, 2.0, 3.0)),
-      LabeledPoint(1.0, Vectors.dense(1.1, 2.1, 3.1)),
-      LabeledPoint(0.0, Vectors.dense(1.2, 2.2, 3.2)),
-      LabeledPoint(1.0, Vectors.dense(1.3, 2.3, 3.3)))
-    val rdd = sc.parallelize(arr)
-    val schemaRDD = rdd.select('label, 'features)
-    val points = schemaRDD.collect()
-    assert(points.size === arr.size)
-  }
-}
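
NOTE: both deleted suites tested an experimental org.apache.spark.ml.LabeledPoint that
carried a weight field; they go away because the code returns to the mllib type, which
has no weight (the test suites below switch their imports accordingly). Constructing
the surviving type, assuming spark-mllib on the classpath:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    val lp = LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1))  // label, features
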
.select("label", "score", "prediction") @@ -80,26 +80,26 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext { // Modify model params, and check that the params worked. model.setThreshold(1.0) val predAllZero = model.transform(dataset) - .select('prediction, 'probability) + .select('prediction, 'myProbability) .collect() - .map { case Row(pred: Double, prob: Double) => pred } + .map { case Row(pred: Double, prob: Vector) => pred } assert(predAllZero.forall(_ === 0.0)) // Call transform with params, and check that the params worked. val predNotAllZero = - model.transform(dataset, model.threshold -> 0.0, model.scoreCol -> "myProb") + model.transform(dataset, model.threshold -> 0.0, model.probabilityCol -> "myProb") .select('prediction, 'myProb) .collect() - .map { case Row(pred: Double, prob: Double) => pred } + .map { case Row(pred: Double, prob: Vector) => pred } assert(predNotAllZero.exists(_ !== 0.0)) // Call fit() with new params, and check as many params as we can. val model2 = lr.fit(dataset, lr.maxIter -> 5, lr.regParam -> 0.1, lr.threshold -> 0.4, - lr.scoreCol -> "theProb") + lr.probabilityCol -> "theProb") assert(model2.fittingParamMap.get(lr.maxIter).get === 5) assert(model2.fittingParamMap.get(lr.regParam).get === 0.1) assert(model2.fittingParamMap.get(lr.threshold).get === 0.4) assert(model2.getThreshold === 0.4) - assert(model2.getScoreCol == "theProb") + assert(model2.getProbabilityCol == "theProb") } test("logistic regression: Predictor, Classifier methods") { diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala index b126ccabfb37e..bfeae23268875 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala @@ -19,9 +19,9 @@ package org.apache.spark.ml.regression import org.scalatest.FunSuite -import org.apache.spark.ml.LabeledPoint import org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInput import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.sql.{Row, SQLContext, SchemaRDD}