From 477709f5ae1a8cd84d3f6215f133854706c50d4f Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Tue, 23 Jun 2015 23:05:03 +0200
Subject: [PATCH 01/15] removal of the unit test for the now deprecated callUdf

---
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 47443a917b765..654a345308e3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -301,15 +301,6 @@ class DataFrameSuite extends QueryTest {
     )
   }
 
-  test("deprecated callUdf in SQLContext") {
-    val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
-    val sqlctx = df.sqlContext
-    sqlctx.udf.register("simpleUdf", (v: Int) => v * v)
-    checkAnswer(
-      df.select($"id", callUdf("simpleUdf", $"value")),
-      Row("id1", 1) :: Row("id2", 16) :: Row("id3", 25) :: Nil)
-  }
-
   test("callUDF in SQLContext") {
     val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
     val sqlctx = df.sqlContext

From 84d6780775fcd7b79ccde21f8da4cc7badd0f3b5 Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Tue, 23 Jun 2015 23:22:57 +0200
Subject: [PATCH 02/15] modified unit test in SQLQuerySuite to use udf instead
 of callUDF

---
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 73bc6c999164e..22c54e43c1d16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -137,13 +137,12 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
 
   test("SPARK-7158 collect and take return different results") {
     import java.util.UUID
-    import org.apache.spark.sql.types._
 
     val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index")
     // we expect the id to be materialized once
-    def id: () => String = () => { UUID.randomUUID().toString() }
+    val idUdf = udf(() => UUID.randomUUID().toString)
 
-    val dfWithId = df.withColumn("id", callUDF(id, StringType))
+    val dfWithId = df.withColumn("id", idUdf())
     // Make a new DataFrame (actually the same reference to the old one)
     val cached = dfWithId.cache()
     // Trigger the cache
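[Note: PATCH 02 shows the core rewrite this series applies everywhere: the
deprecated callUDF(fn, returnType, cols...) spells the result type out by
hand, while udf(fn) derives it from the function's Scala signature. A minimal
before/after sketch — `df` is a stand-in DataFrame, and the usual
org.apache.spark.sql.functions._ import is assumed:

    import java.util.UUID
    import org.apache.spark.sql.functions.{callUDF, udf}
    import org.apache.spark.sql.types.StringType

    // Deprecated form: the return type must be declared manually.
    val withIdOld = df.withColumn("id",
      callUDF(() => UUID.randomUUID().toString, StringType))

    // Replacement: udf infers StringType from () => String, and the resulting
    // UserDefinedFunction is applied like an ordinary function.
    val idUdf = udf(() => UUID.randomUUID().toString)
    val withIdNew = df.withColumn("id", idUdf())

Both produce a string column, but the udf form cannot drift out of sync with
the wrapped function, since no hand-written DataType is involved.]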
From 197ec827e34c20bdc87101ebe47c7d9de4821d7f Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Wed, 24 Jun 2015 19:34:04 +0200
Subject: [PATCH 03/15] callUDF => udf in OneVsRest

---
 .../spark/ml/classification/OneVsRest.scala | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index b657882f8ad3f..afaa448a9218d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -90,7 +90,7 @@ final class OneVsRestModel private[ml] (
     val accColName = "mbc$acc" + UUID.randomUUID().toString
     val init: () => Map[Int, Double] = () => {Map()}
     val mapType = MapType(IntegerType, DoubleType, valueContainsNull = false)
-    val newDataset = dataset.withColumn(accColName, callUDF(init, mapType))
+    val newDataset = dataset.withColumn(accColName, udf(init).apply())
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
@@ -110,9 +110,9 @@ final class OneVsRestModel private[ml] (
         (predictions: Map[Int, Double], prediction: Vector) => {
           predictions + ((index, prediction(1)))
         }
-      val updateUdf = callUDF(update, mapType, col(accColName), col(rawPredictionCol))
+      val updateUDF = callUDF(update, mapType, col(accColName), col(rawPredictionCol))
       val transformedDataset = model.transform(df).select(columns : _*)
-      val updatedDataset = transformedDataset.withColumn(tmpColName, updateUdf)
+      val updatedDataset = transformedDataset.withColumn(tmpColName, updateUDF)
       val newColumns = origCols ++ List(col(tmpColName))
 
       // switch out the intermediate column with the accumulator column
@@ -129,8 +129,8 @@ final class OneVsRestModel private[ml] (
     }
 
     // output label and label metadata as prediction
-    val labelUdf = callUDF(label, DoubleType, col(accColName))
-    aggregatedDataset.withColumn($(predictionCol), labelUdf.as($(predictionCol), labelMetadata))
+    val labelUDF = udf(label).apply(col(accColName))
+    aggregatedDataset.withColumn($(predictionCol), labelUDF.as($(predictionCol), labelMetadata))
       .drop(accColName)
   }
 
@@ -175,12 +175,12 @@ final class OneVsRest(override val uid: String)
     }
     val numClasses = MetadataUtils.getNumClasses(labelSchema).fold(computeNumClasses())(identity)
 
-    val multiclassLabeled = dataset.select($(labelCol), $(featuresCol))
+    val multiClassLabeled = dataset.select($(labelCol), $(featuresCol))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
     if (handlePersistence) {
-      multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
+      multiClassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
     }
 
     // create k columns, one for each binary classifier.
@@ -192,17 +192,17 @@ final class OneVsRest(override val uid: String)
 
       // generate new label metadata for the binary problem.
      // TODO: use when ... otherwise after SPARK-7321 is merged
-      val labelUDF = callUDF(label, DoubleType, col($(labelCol)))
+      val labelUDF = udf(label).apply(col($(labelCol)))
       val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
       val labelColName = "mc2b$" + index
       val labelUDFWithNewMeta = labelUDF.as(labelColName, newLabelMeta)
-      val trainingDataset = multiclassLabeled.withColumn(labelColName, labelUDFWithNewMeta)
+      val trainingDataset = multiClassLabeled.withColumn(labelColName, labelUDFWithNewMeta)
       val classifier = getClassifier
       classifier.fit(trainingDataset, classifier.labelCol -> labelColName)
     }.toArray[ClassificationModel[_, _]]
 
     if (handlePersistence) {
-      multiclassLabeled.unpersist()
+      multiClassLabeled.unpersist()
     }
 
     // extract label metadata from label column if present, or create a nominal attribute
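[Note: after PATCH 03, the explicit mapType handed to callUDF is no longer
needed for the new accumulator column: udf infers
MapType(IntegerType, DoubleType, valueContainsNull = false) from the Scala
result type Map[Int, Double]. A sketch of the zero-argument case, assuming a
DataFrame `dataset` in scope — PATCH 06 below settles on exactly this shape:

    import org.apache.spark.sql.functions.udf

    // The result type Map[Int, Double] carries the schema; no MapType needed.
    val initUDF = udf { () => Map[Int, Double]() }
    // A zero-argument UserDefinedFunction is applied with an empty argument list.
    val withAcc = dataset.withColumn("accumulator", initUDF())
]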
From 0ea30b36702d48a3f2027d6b3e97455d1095f3aa Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Wed, 24 Jun 2015 20:07:13 +0200
Subject: [PATCH 04/15] callUDF => udf in Classifier where possible

---
 .../scala/org/apache/spark/ml/classification/Classifier.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
index 14c285dbfc54a..70c7052296be9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
@@ -108,7 +108,7 @@ abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[Featur
     }
     if (getPredictionCol != "") {
       val predUDF = if (getRawPredictionCol != "") {
-        callUDF(raw2prediction _, DoubleType, col(getRawPredictionCol))
+        udf[Double, Vector](raw2prediction).apply(col(getRawPredictionCol))
       } else {
         callUDF(predict _, DoubleType, col(getFeaturesCol))
       }

From fe2a10be75196559595ef0755e0cb07e8a635b1c Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Wed, 24 Jun 2015 20:31:49 +0200
Subject: [PATCH 05/15] callUDF => udf in ProbabilisticClassifier

---
 .../spark/ml/classification/ProbabilisticClassifier.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
index 330ae2938f4e0..15e23466250b3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
@@ -113,9 +113,9 @@ private[spark] abstract class ProbabilisticClassificationModel[
     }
     if ($(predictionCol).nonEmpty) {
       val predUDF = if ($(rawPredictionCol).nonEmpty) {
-        callUDF(raw2prediction _, DoubleType, col($(rawPredictionCol)))
+        udf[Double, Vector](raw2prediction).apply(col($(rawPredictionCol)))
       } else if ($(probabilityCol).nonEmpty) {
-        callUDF(probability2prediction _, DoubleType, col($(probabilityCol)))
+        udf[Double, Vector](probability2prediction).apply(col($(probabilityCol)))
       } else {
         callUDF(predict _, DoubleType, col($(featuresCol)))
       }
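[Note: PATCH 04 and PATCH 05 pass a method such as raw2prediction
(Vector => Double) to udf. A method is not yet a function value, so it must be
eta-expanded, and udf needs resolvable TypeTags for the argument and result
types. Two equivalent spellings, in a standalone sketch — the body of
raw2prediction here is a stand-in, not the real implementation:

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.functions.udf

    def raw2prediction(raw: Vector): Double =
      raw.toArray.zipWithIndex.maxBy(_._1)._2.toDouble  // stand-in body

    // Pin the type parameters explicitly (the form used in these two patches)...
    val predUDF1 = udf[Double, Vector](raw2prediction)
    // ...or eta-expand with a trailing underscore and let inference fill them in
    // (the form the later "uniformized" commits settle on).
    val predUDF2 = udf(raw2prediction _)
]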
From bbdeaf33e2c9bf95482fc77902ea9665f08a1131 Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Thu, 25 Jun 2015 20:18:42 +0200
Subject: [PATCH 06/15] fixed syntax for init udf in OneVsRest

---
 .../scala/org/apache/spark/ml/classification/OneVsRest.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index afaa448a9218d..30e6963839579 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -88,9 +88,9 @@ final class OneVsRestModel private[ml] (
 
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val init: () => Map[Int, Double] = () => {Map()}
+    val init = udf { () => Map[Int, Double]() }
     val mapType = MapType(IntegerType, DoubleType, valueContainsNull = false)
-    val newDataset = dataset.withColumn(accColName, udf(init).apply())
+    val newDataset = dataset.withColumn(accColName, init())
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE

From 49e4904a3900dd82356bf303102b73efbb6f23c8 Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Thu, 25 Jun 2015 20:42:25 +0200
Subject: [PATCH 07/15] Revert "removal of the unit test for the now
 deprecated callUdf"

---
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 654a345308e3c..47443a917b765 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -301,6 +301,15 @@ class DataFrameSuite extends QueryTest {
     )
   }
 
+  test("deprecated callUdf in SQLContext") {
+    val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
+    val sqlctx = df.sqlContext
+    sqlctx.udf.register("simpleUdf", (v: Int) => v * v)
+    checkAnswer(
+      df.select($"id", callUdf("simpleUdf", $"value")),
+      Row("id1", 1) :: Row("id2", 16) :: Row("id3", 25) :: Nil)
+  }
+
   test("callUDF in SQLContext") {
     val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
     val sqlctx = df.sqlContext
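[Note: PATCH 07 reinstates the test deleted in PATCH 01 — callUdf is
deprecated, not removed, so it keeps test coverage until it is actually
dropped. The two entry points side by side, in a sketch reusing the `df` from
that test and assuming sqlContext.implicits._ for the $ syntax:

    import org.apache.spark.sql.functions.{callUdf, udf}

    // Name-based: register once on the SQLContext, then refer to it by string.
    df.sqlContext.udf.register("simpleUdf", (v: Int) => v * v)
    df.select($"id", callUdf("simpleUdf", $"value"))

    // Typed: no registration, no string lookup; the schema comes from Int => Int.
    val simpleUdf = udf((v: Int) => v * v)
    df.select($"id", simpleUdf($"value"))
]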
From a6722283cc10b6355567e6165ba739779f9479d0 Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Thu, 25 Jun 2015 20:43:48 +0200
Subject: [PATCH 08/15] uniformized udf calls in OneVsRest

---
 .../spark/ml/classification/OneVsRest.scala | 31 +++++++++----------
 1 file changed, 14 insertions(+), 17 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index 30e6963839579..09e2330627d24 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -106,13 +106,12 @@ final class OneVsRestModel private[ml] (
 
       // add temporary column to store intermediate scores and update
       val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-      val update: (Map[Int, Double], Vector) => Map[Int, Double] =
-        (predictions: Map[Int, Double], prediction: Vector) => {
-          predictions + ((index, prediction(1)))
-        }
-      val updateUDF = callUDF(update, mapType, col(accColName), col(rawPredictionCol))
+      val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
+        predictions + ((index, prediction(1)))
+      }
       val transformedDataset = model.transform(df).select(columns : _*)
-      val updatedDataset = transformedDataset.withColumn(tmpColName, updateUDF)
+      val updatedDataset = transformedDataset
+        .withColumn(tmpColName, updateUDF(col(accColName), col(rawPredictionCol)))
       val newColumns = origCols ++ List(col(tmpColName))
 
       // switch out the intermediate column with the accumulator column
@@ -124,13 +123,13 @@ final class OneVsRestModel private[ml] (
     }
 
     // output the index of the classifier with highest confidence as prediction
-    val label: Map[Int, Double] => Double = (predictions: Map[Int, Double]) => {
+    val labelUDF = udf { (predictions: Map[Int, Double]) =>
       predictions.maxBy(_._2)._1.toDouble
     }
 
     // output label and label metadata as prediction
-    val labelUDF = udf(label).apply(col(accColName))
-    aggregatedDataset.withColumn($(predictionCol), labelUDF.as($(predictionCol), labelMetadata))
+    aggregatedDataset
+      .withColumn($(predictionCol), labelUDF(col(accColName)).as($(predictionCol), labelMetadata))
       .drop(accColName)
   }
 
@@ -175,34 +174,32 @@ final class OneVsRest(override val uid: String)
     }
     val numClasses = MetadataUtils.getNumClasses(labelSchema).fold(computeNumClasses())(identity)
 
-    val multiClassLabeled = dataset.select($(labelCol), $(featuresCol))
+    val multiclassLabeled = dataset.select($(labelCol), $(featuresCol))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
     if (handlePersistence) {
-      multiClassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
+      multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
    }
 
     // create k columns, one for each binary classifier.
     val models = Range(0, numClasses).par.map { index =>
-
-      val label: Double => Double = (label: Double) => {
+      val labelUDF = udf { (label: Double) =>
        if (label.toInt == index) 1.0 else 0.0
      }
 
       // generate new label metadata for the binary problem.
       // TODO: use when ... otherwise after SPARK-7321 is merged
-      val labelUDF = udf(label).apply(col($(labelCol)))
       val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
       val labelColName = "mc2b$" + index
-      val labelUDFWithNewMeta = labelUDF.as(labelColName, newLabelMeta)
-      val trainingDataset = multiClassLabeled.withColumn(labelColName, labelUDFWithNewMeta)
+      val labelUDFWithNewMeta = labelUDF(col($(labelCol))).as(labelColName, newLabelMeta)
+      val trainingDataset = multiclassLabeled.withColumn(labelColName, labelUDFWithNewMeta)
       val classifier = getClassifier
       classifier.fit(trainingDataset, classifier.labelCol -> labelColName)
     }.toArray[ClassificationModel[_, _]]
 
     if (handlePersistence) {
-      multiClassLabeled.unpersist()
+      multiclassLabeled.unpersist()
     }
 
     // extract label metadata from label column if present, or create a nominal attribute
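[Note: PATCH 08 settles on one shape for every UDF in OneVsRest: build a named
UserDefinedFunction with a udf { ... } block, then apply it to columns at the
use site. The contrast, sketched with placeholder column names:

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.functions.{col, udf}

    // One-shot: build and apply in a single expression (harder to scan).
    val c1 = udf((m: Map[Int, Double], v: Vector) => m + ((0, v(1))))
      .apply(col("acc"), col("raw"))

    // Uniform style adopted here: name the UDF, then call it like a function.
    val updateUDF = udf { (m: Map[Int, Double], v: Vector) => m + ((0, v(1))) }
    val c2 = updateUDF(col("acc"), col("raw"))
]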
From 13054922a361ba3875fe09865157b613566ed1ab Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Thu, 25 Jun 2015 21:01:05 +0200
Subject: [PATCH 09/15] uniformized udf calls in Classifier

---
 .../apache/spark/ml/classification/Classifier.scala | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
index 70c7052296be9..85c097bc64a4f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
@@ -102,15 +102,20 @@ abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[Featur
     var outputData = dataset
     var numColsOutput = 0
     if (getRawPredictionCol != "") {
-      outputData = outputData.withColumn(getRawPredictionCol,
-        callUDF(predictRaw _, new VectorUDT, col(getFeaturesCol)))
+      val predictRawUDF = udf { (features: Any) =>
+        predictRaw(features.asInstanceOf[FeaturesType])
+      }
+      outputData = outputData.withColumn(getRawPredictionCol, predictRawUDF(col(getFeaturesCol)))
       numColsOutput += 1
     }
     if (getPredictionCol != "") {
       val predUDF = if (getRawPredictionCol != "") {
-        udf[Double, Vector](raw2prediction).apply(col(getRawPredictionCol))
+        udf(raw2prediction _).apply(col(getRawPredictionCol))
       } else {
-        callUDF(predict _, DoubleType, col(getFeaturesCol))
+        val predictUDF = udf { (features: Any) =>
+          predict(features.asInstanceOf[FeaturesType])
+        }
+        predictUDF(col(getFeaturesCol))
       }
       outputData = outputData.withColumn(getPredictionCol, predUDF)
       numColsOutput += 1

From 94345b57577e9cd68adc7f0d0a62381815a50806 Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Thu, 25 Jun 2015 21:18:20 +0200
Subject: [PATCH 10/15] uniformized udf calls in ProbabilisticClassifier

---
 .../ProbabilisticClassifier.scala | 22 +++++++++++++------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
index 15e23466250b3..38e832372698c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
@@ -98,26 +98,34 @@ private[spark] abstract class ProbabilisticClassificationModel[
     var outputData = dataset
     var numColsOutput = 0
     if ($(rawPredictionCol).nonEmpty) {
-      outputData = outputData.withColumn(getRawPredictionCol,
-        callUDF(predictRaw _, new VectorUDT, col(getFeaturesCol)))
+      val predictRawUDF = udf { (features: Any) =>
+        predictRaw(features.asInstanceOf[FeaturesType])
+      }
+      outputData = outputData.withColumn(getRawPredictionCol, predictRawUDF(col(getFeaturesCol)))
       numColsOutput += 1
     }
     if ($(probabilityCol).nonEmpty) {
       val probUDF = if ($(rawPredictionCol).nonEmpty) {
-        callUDF(raw2probability _, new VectorUDT, col($(rawPredictionCol)))
+        udf(raw2probability _).apply(col($(rawPredictionCol)))
       } else {
-        callUDF(predictProbability _, new VectorUDT, col($(featuresCol)))
+        val probabilityUDF = udf { (features: Any) =>
+          predictProbability(features.asInstanceOf[FeaturesType])
+        }
+        probabilityUDF(col($(featuresCol)))
       }
       outputData = outputData.withColumn($(probabilityCol), probUDF)
       numColsOutput += 1
     }
     if ($(predictionCol).nonEmpty) {
       val predUDF = if ($(rawPredictionCol).nonEmpty) {
-        udf[Double, Vector](raw2prediction).apply(col($(rawPredictionCol)))
+        udf(raw2prediction _).apply(col($(rawPredictionCol)))
       } else if ($(probabilityCol).nonEmpty) {
-        udf[Double, Vector](probability2prediction).apply(col($(probabilityCol)))
+        udf(probability2prediction _).apply(col($(probabilityCol)))
       } else {
-        callUDF(predict _, DoubleType, col($(featuresCol)))
+        val predictUDF = udf { (features: Any) =>
+          predict(features.asInstanceOf[FeaturesType])
+        }
+        predictUDF(col($(featuresCol)))
       }
       outputData = outputData.withColumn($(predictionCol), predUDF)
       numColsOutput += 1
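[Note: in PATCH 09 and PATCH 10 the UDFs over the features column take Any and
cast, instead of taking FeaturesType directly. udf requires a TypeTag for each
argument type, and the abstract FeaturesType parameter of these models has
none at this call site, so udf { (features: FeaturesType) => ... } would not
compile. A reduced sketch of the workaround — the class is hypothetical,
modelled on ClassificationModel:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, udf}

    abstract class ExampleModel[FeaturesType] {
      def predict(features: FeaturesType): Double

      def transform(dataset: DataFrame): DataFrame = {
        // No TypeTag exists for the abstract FeaturesType, so the UDF accepts
        // Any and casts inside the closure; the cast is safe because the
        // features column was built for this model's feature type.
        val predictUDF = udf { (features: Any) =>
          predict(features.asInstanceOf[FeaturesType])
        }
        dataset.withColumn("prediction", predictUDF(col("features")))
      }
    }
]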
From 801340982d5de34a1af4155522c409e12459aa19 Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Thu, 25 Jun 2015 21:32:32 +0200
Subject: [PATCH 11/15] replaced the now deprecated callUDF by udf in Predictor

---
 mllib/src/main/scala/org/apache/spark/ml/Predictor.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
index edaa2afb790e6..333b42711ec52 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -122,9 +122,7 @@ abstract class Predictor[
    */
   protected def extractLabeledPoints(dataset: DataFrame): RDD[LabeledPoint] = {
     dataset.select($(labelCol), $(featuresCol))
-      .map { case Row(label: Double, features: Vector) =>
-        LabeledPoint(label, features)
-      }
+      .map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) }
   }
 }
 
@@ -171,7 +169,10 @@ abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType,
   override def transform(dataset: DataFrame): DataFrame = {
     transformSchema(dataset.schema, logging = true)
     if ($(predictionCol).nonEmpty) {
-      dataset.withColumn($(predictionCol), callUDF(predict _, DoubleType, col($(featuresCol))))
+      val predictUDF = udf { (features: Any) =>
+        predict(features.asInstanceOf[FeaturesType])
+      }
+      dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
     } else {
       this.logWarning(s"$uid: Predictor.transform() was called as NOOP" +
         " since no output columns were set.")

From 0ebd0da50cac5c951871a0d2350b76f218069146 Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Thu, 25 Jun 2015 21:51:53 +0200
Subject: [PATCH 12/15] replace the now deprecated callUDF by udf in
 VectorIndexer

---
 .../scala/org/apache/spark/ml/feature/VectorIndexer.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index f4854a5e4b7b7..6c45ea28da273 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
 import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, VectorUDT}
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.functions.callUDF
+import org.apache.spark.sql.functions.udf
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.collection.OpenHashSet
 
@@ -339,7 +339,10 @@ class VectorIndexerModel private[ml] (
   override def transform(dataset: DataFrame): DataFrame = {
     transformSchema(dataset.schema, logging = true)
     val newField = prepOutputField(dataset.schema)
-    val newCol = callUDF(transformFunc, new VectorUDT, dataset($(inputCol)))
+    val transformUDF = udf { (vector: Any) =>
+      transformFunc(vector.asInstanceOf[Vector])
+    }
+    val newCol = transformUDF(dataset($(inputCol)))
     dataset.withColumn($(outputCol), newCol.as($(outputCol), newField.metadata))
   }
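[Note: PATCH 12 still routes through Any even though VectorIndexer's input is
always a Vector. Unlike the abstract FeaturesType above, Vector is concrete,
has a TypeTag, and carries a registered UDT (VectorUDT), so a typed parameter
works directly — the cleanup PATCH 13 applies next. A sketch with a
hypothetical doubling UDF:

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.functions.udf

    // Vector is annotated with @SQLUserDefinedType(udt = classOf[VectorUDT]),
    // so udf can map the argument type without a manual cast.
    val sumTwiceUDF = udf { (v: Vector) => v.toArray.map(_ * 2.0).sum }
]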
From 48ca15e18a4ac0a504714fc8ef9457ff4f402bb4 Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Fri, 26 Jun 2015 18:32:31 +0200
Subject: [PATCH 13/15] used vector type tag for udf call in VectorIndexer

---
 .../scala/org/apache/spark/ml/feature/VectorIndexer.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index 6c45ea28da273..e3264e2988e48 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -339,9 +339,7 @@ class VectorIndexerModel private[ml] (
   override def transform(dataset: DataFrame): DataFrame = {
     transformSchema(dataset.schema, logging = true)
     val newField = prepOutputField(dataset.schema)
-    val transformUDF = udf { (vector: Any) =>
-      transformFunc(vector.asInstanceOf[Vector])
-    }
+   val transformUDF = udf { (vector: Vector) => transformFunc(vector) }
     val newCol = transformUDF(dataset($(inputCol)))
     dataset.withColumn($(outputCol), newCol.as($(outputCol), newField.metadata))
   }

From 1ddb452e1b5f71b4f9c6b1f869dc1c958b014ff7 Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Fri, 26 Jun 2015 18:33:41 +0200
Subject: [PATCH 14/15] renamed init to initUDF in order to be consistent in
 OneVsRest

---
 .../scala/org/apache/spark/ml/classification/OneVsRest.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index 09e2330627d24..ea757c5e40c76 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -88,9 +88,9 @@ final class OneVsRestModel private[ml] (
 
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val init = udf { () => Map[Int, Double]() }
+    val initUDF = udf { () => Map[Int, Double]() }
     val mapType = MapType(IntegerType, DoubleType, valueContainsNull = false)
-    val newDataset = dataset.withColumn(accColName, init())
+    val newDataset = dataset.withColumn(accColName, initUDF())
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE

From 26f5a7a28cb5bf9f6d9e91d1b85255a282b92396 Mon Sep 17 00:00:00 2001
From: BenFradet
Date: Fri, 26 Jun 2015 18:53:35 +0200
Subject: [PATCH 15/15] 2 spaces instead of 1

---
 .../main/scala/org/apache/spark/ml/feature/VectorIndexer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index e3264e2988e48..c73bdccdef5fa 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -339,7 +339,7 @@ class VectorIndexerModel private[ml] (
   override def transform(dataset: DataFrame): DataFrame = {
     transformSchema(dataset.schema, logging = true)
     val newField = prepOutputField(dataset.schema)
-   val transformUDF = udf { (vector: Vector) => transformFunc(vector) }
+    val transformUDF = udf { (vector: Vector) => transformFunc(vector) }
     val newCol = transformUDF(dataset($(inputCol)))
     dataset.withColumn($(outputCol), newCol.as($(outputCol), newField.metadata))
   }