From 5d41d2b97f897565ce89939851dbd7a7075fedbd Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 27 Jan 2017 00:12:54 +0900 Subject: [PATCH 1/2] Put the use of HiveUDFs in one place --- .../apache/spark/sql/hive/HivemallOps.scala | 1109 +++++++++-------- .../sql/hive/internal/HivemallOpsImpl.scala | 78 ++ .../spark/sql/hive/HivemallOpsSuite.scala | 38 +- .../spark/sql/hive/ModelMixingSuite.scala | 10 +- .../apache/spark/sql/hive/XGBoostSuite.scala | 4 +- .../HivemallOpsWithFeatureSuite.scala | 2 +- 6 files changed, 673 insertions(+), 568 deletions(-) create mode 100644 spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala index e3e20ee44..6e588a91c 100644 --- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala +++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{EachTopK, Expression, Literal, NamedExpression, UserDefinedGenerator} import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project} import org.apache.spark.sql.functions._ -import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -54,607 +53,563 @@ import org.apache.spark.unsafe.types.UTF8String * @groupname misc */ final class HivemallOps(df: DataFrame) extends Logging { + import internal.HivemallOpsImpl._ private[this] val _sparkSession = df.sparkSession private[this] val _analyzer = _sparkSession.sessionState.analyzer /** - * @see hivemall.regression.AdaDeltaUDTF + * @see [[hivemall.regression.AdaDeltaUDTF]] * @group regression */ @scala.annotation.varargs def train_adadelta(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_adadelta", - new HiveFunctionWrapper("hivemall.regression.AdaDeltaUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.regression.AdaDeltaUDTF", + "train_adadelta", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) } /** - * @see hivemall.regression.AdaGradUDTF + * @see [[hivemall.regression.AdaGradUDTF]] * @group regression */ @scala.annotation.varargs def train_adagrad(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_adagrad", - new HiveFunctionWrapper("hivemall.regression.AdaGradUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.regression.AdaGradUDTF", + "train_adagrad", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) } /** - * @see hivemall.regression.AROWRegressionUDTF + * @see [[hivemall.regression.AROWRegressionUDTF]] * @group regression */ @scala.annotation.varargs def train_arow_regr(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_arow_regr", - new HiveFunctionWrapper("hivemall.regression.AROWRegressionUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", 
"weight", "conv").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.regression.AROWRegressionUDTF", + "train_arow_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) } /** - * @see hivemall.regression.AROWRegressionUDTF$AROWe + * @see [[hivemall.regression.AROWRegressionUDTF.AROWe]] * @group regression */ @scala.annotation.varargs def train_arowe_regr(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_arowe_regr", - new HiveFunctionWrapper("hivemall.regression.AROWRegressionUDTF$AROWe"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.regression.AROWRegressionUDTF$AROWe", + "train_arowe_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) } /** - * @see hivemall.regression.AROWRegressionUDTF$AROWe2 + * @see [[hivemall.regression.AROWRegressionUDTF.AROWe2]] * @group regression */ @scala.annotation.varargs def train_arowe2_regr(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_arowe2_regr", - new HiveFunctionWrapper("hivemall.regression.AROWRegressionUDTF$AROWe2"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.regression.AROWRegressionUDTF$AROWe2", + "train_arowe2_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) } /** - * @see hivemall.regression.LogressUDTF + * @see [[hivemall.regression.LogressUDTF]] * @group regression */ @scala.annotation.varargs def train_logregr(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_logregr", - new HiveFunctionWrapper("hivemall.regression.LogressUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.regression.LogressUDTF", + "train_logregr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) } /** - * @see hivemall.regression.PassiveAggressiveRegressionUDTF + * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF]] * @group regression */ @scala.annotation.varargs def train_pa1_regr(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_pa1_regr", - new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.regression.PassiveAggressiveRegressionUDTF", + "train_pa1_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) } /** - * @see hivemall.regression.PassiveAggressiveRegressionUDTF.PA1a + * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA1a]] * @group regression */ @scala.annotation.varargs def train_pa1a_regr(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_pa1a_regr", - new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF$PA1a"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, 
None, - Seq("feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.regression.PassiveAggressiveRegressionUDTF$PA1a", + "train_pa1a_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) } /** - * @see hivemall.regression.PassiveAggressiveRegressionUDTF.PA2 + * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA2]] * @group regression */ @scala.annotation.varargs def train_pa2_regr(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_pa2_regr", - new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF$PA2"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.regression.PassiveAggressiveRegressionUDTF$PA2", + "train_pa2_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) } /** - * @see hivemall.regression.PassiveAggressiveRegressionUDTF.PA2a + * @see [[hivemall.regression.PassiveAggressiveRegressionUDTF.PA2a]] * @group regression */ @scala.annotation.varargs def train_pa2a_regr(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_pa2a_regr", - new HiveFunctionWrapper("hivemall.regression.PassiveAggressiveRegressionUDTF$PA2a"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.regression.PassiveAggressiveRegressionUDTF$PA2a", + "train_pa2a_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) } /** - * @see hivemall.smile.regression.RandomForestRegressionUDTF + * @see [[hivemall.smile.regression.RandomForestRegressionUDTF]] * @group regression */ @scala.annotation.varargs def train_randomforest_regr(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_randomforest_regr", - new HiveFunctionWrapper("hivemall.smile.regression.RandomForestRegressionUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, + planHiveGenericUDTF( + df, + "hivemall.smile.regression.RandomForestRegressionUDTF", + "train_randomforest_regr", + setMixServs(toHivemallFeatures(exprs)), Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests") - .map(UnresolvedAttribute(_)), - df.logicalPlan) + ) } /** - * @see hivemall.classifier.PerceptronUDTF + * @see [[hivemall.classifier.PerceptronUDTF]] * @group classifier */ @scala.annotation.varargs def train_perceptron(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_perceptron", - new HiveFunctionWrapper("hivemall.classifier.PerceptronUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.PerceptronUDTF", + "train_perceptron", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) } /** - * @see hivemall.classifier.PassiveAggressiveUDTF + * @see [[hivemall.classifier.PassiveAggressiveUDTF]] * @group classifier */ @scala.annotation.varargs def train_pa(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_pa", - new 
HiveFunctionWrapper("hivemall.classifier.PassiveAggressiveUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.PassiveAggressiveUDTF", + "train_pa", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) } /** - * @see hivemall.classifier.PassiveAggressiveUDTF$PA1 + * @see [[hivemall.classifier.PassiveAggressiveUDTF.PA1]] * @group classifier */ @scala.annotation.varargs def train_pa1(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_pa1", - new HiveFunctionWrapper("hivemall.classifier.PassiveAggressiveUDTF$PA1"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.PassiveAggressiveUDTF$PA1", + "train_pa1", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) } /** - * @see hivemall.classifier.PassiveAggressiveUDTF$PA2 + * @see [[hivemall.classifier.PassiveAggressiveUDTF.PA2]] * @group classifier */ @scala.annotation.varargs def train_pa2(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_pa2", - new HiveFunctionWrapper("hivemall.classifier.PassiveAggressiveUDTF$PA2"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.PassiveAggressiveUDTF$PA2", + "train_pa2", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) } /** - * @see hivemall.classifier.ConfidenceWeightedUDTF + * @see [[hivemall.classifier.ConfidenceWeightedUDTF]] * @group classifier */ @scala.annotation.varargs def train_cw(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_cw", - new HiveFunctionWrapper("hivemall.classifier.ConfidenceWeightedUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.ConfidenceWeightedUDTF", + "train_cw", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) } /** - * @see hivemall.classifier.AROWClassifierUDTF + * @see [[hivemall.classifier.AROWClassifierUDTF]] * @group classifier */ @scala.annotation.varargs def train_arow(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_arow", - new HiveFunctionWrapper("hivemall.classifier.AROWClassifierUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.AROWClassifierUDTF", + "train_arow", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) } /** - * @see hivemall.classifier.AROWClassifierUDTF$AROWh + * @see [[hivemall.classifier.AROWClassifierUDTF.AROWh]] * @group classifier */ @scala.annotation.varargs def train_arowh(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_arowh", - new HiveFunctionWrapper("hivemall.classifier.AROWClassifierUDTF$AROWh"), - setMixServs(toHivemallFeatureDf(exprs: 
_*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.AROWClassifierUDTF$AROWh", + "train_arowh", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) } /** - * @see hivemall.classifier.SoftConfideceWeightedUDTF$SCW1 + * @see [[hivemall.classifier.SoftConfideceWeightedUDTF.SCW1]] * @group classifier */ @scala.annotation.varargs def train_scw(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_scw", - new HiveFunctionWrapper("hivemall.classifier.SoftConfideceWeightedUDTF$SCW1"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.SoftConfideceWeightedUDTF$SCW1", + "train_scw", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) } /** - * @see hivemall.classifier.SoftConfideceWeightedUDTF$SCW1 + * @see [[hivemall.classifier.SoftConfideceWeightedUDTF.SCW2]] * @group classifier */ @scala.annotation.varargs def train_scw2(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_scw2", - new HiveFunctionWrapper("hivemall.classifier.SoftConfideceWeightedUDTF$SCW2"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight", "conv").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.SoftConfideceWeightedUDTF$SCW2", + "train_scw2", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight", "conv") + ) } /** - * @see hivemall.classifier.classifier.AdaGradRDAUDTF + * @see [[hivemall.classifier.AdaGradRDAUDTF]] * @group classifier */ @scala.annotation.varargs def train_adagrad_rda(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_adagrad_rda", - new HiveFunctionWrapper("hivemall.classifier.AdaGradRDAUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.AdaGradRDAUDTF", + "train_adagrad_rda", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) } /** - * @see hivemall.smile.classification.RandomForestClassifierUDTF + * @see [[hivemall.smile.classification.RandomForestClassifierUDTF]] * @group classifier */ @scala.annotation.varargs def train_randomforest_classifier(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_randomforest_classifier", - new HiveFunctionWrapper("hivemall.smile.classification.RandomForestClassifierUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, + planHiveGenericUDTF( + df, + "hivemall.smile.classification.RandomForestClassifierUDTF", + "train_randomforest_classifier", + setMixServs(toHivemallFeatures(exprs)), Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests") - .map(UnresolvedAttribute(_)), - df.logicalPlan) + ) } /** - * @see hivemall.classifier.classifier.MulticlassPerceptronUDTF + * @see [[hivemall.classifier.multiclass.MulticlassPerceptronUDTF]] * @group classifier.multiclass */ @scala.annotation.varargs def train_multiclass_perceptron(exprs: Column*): DataFrame =
withTypedPlan { - Generate(HiveGenericUDTF( - "train_multiclass_perceptron", - new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassPerceptronUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("label", "feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassPerceptronUDTF", + "train_multiclass_perceptron", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight") + ) } /** - * @see hivemall.classifier.classifier.PassiveAggressiveUDTF + * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF]] * @group classifier.multiclass */ @scala.annotation.varargs def train_multiclass_pa(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_multiclass_pa", - new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("label", "feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF", + "train_multiclass_pa", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight") + ) } /** - * @see hivemall.classifier.classifier.PassiveAggressiveUDTF$PA1 + * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF.PA1]] * @group classifier.multiclass */ @scala.annotation.varargs def train_multiclass_pa1(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_multiclass_pa1", - new HiveFunctionWrapper( - "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("label", "feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA1", + "train_multiclass_pa1", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight") + ) } /** - * @see hivemall.classifier.classifier.PassiveAggressiveUDTF$PA2 + * @see [[hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF.PA2]] * @group classifier.multiclass */ @scala.annotation.varargs def train_multiclass_pa2(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_multiclass_pa2", - new HiveFunctionWrapper( - "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("label", "feature", "weight").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassPassiveAggressiveUDTF$PA2", + "train_multiclass_pa2", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight") + ) } /** - * @see hivemall.classifier.classifier.MulticlassConfidenceWeightedUDTF + * @see [[hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF]] * @group classifier.multiclass */ @scala.annotation.varargs def train_multiclass_cw(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_multiclass_cw", - new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - 
Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassConfidenceWeightedUDTF", + "train_multiclass_cw", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight", "conv") + ) } /** - * @see hivemall.classifier.classifier.MulticlassAROWClassifierUDTF + * @see [[hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF]] * @group classifier.multiclass */ @scala.annotation.varargs def train_multiclass_arow(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_multiclass_arow", - new HiveFunctionWrapper("hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF", + "train_multiclass_arow", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight", "conv") + ) } /** - * @see hivemall.classifier.classifier.MulticlassSoftConfidenceWeightedUDTF$SCW1 + * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW1]] * @group classifier.multiclass */ @scala.annotation.varargs def train_multiclass_scw(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_multiclass_scw", - new HiveFunctionWrapper( - "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1"), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW1", + "train_multiclass_scw", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight", "conv") + ) } /** - * @see hivemall.classifier.classifier.MulticlassSoftConfidenceWeightedUDTF$SCW2 + * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW2]] * @group classifier.multiclass */ @scala.annotation.varargs def train_multiclass_scw2(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_multiclass_scw2", - new HiveFunctionWrapper( - "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2" - ), - setMixServs(toHivemallFeatureDf(exprs: _*)).map(_.expr) - ), - join = false, outer = false, None, - Seq("label", "feature", "weight", "conv").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF$SCW2", + "train_multiclass_scw2", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight", "conv") + ) } /** * :: Experimental :: - * @see hivemall.xgboost.regression.XGBoostRegressionUDTF + * @see [[hivemall.xgboost.regression.XGBoostRegressionUDTF]] * @group xgboost */ @Experimental @scala.annotation.varargs def train_xgboost_regr(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_xgboost_regr", - new HiveFunctionWrapper("hivemall.xgboost.regression.XGBoostRegressionUDTFWrapper"), - toHivemallFeatureDf(exprs : _*).map(_.expr) - ), - join = false, outer = false, None, - Seq("model_id", "pred_model").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + 
"hivemall.xgboost.regression.XGBoostRegressionUDTFWrapper", + "train_xgboost_regr", + setMixServs(toHivemallFeatures(exprs)), + Seq("model_id", "pred_model") + ) } /** * :: Experimental :: - * @see hivemall.xgboost.classification.XGBoostBinaryClassifierUDTF + * @see [[hivemall.xgboost.classification.XGBoostBinaryClassifierUDTF]] * @group xgboost */ @Experimental @scala.annotation.varargs def train_xgboost_classifier(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_xgboost_classifier", - new HiveFunctionWrapper( - "hivemall.xgboost.classification.XGBoostBinaryClassifierUDTFWrapper"), - toHivemallFeatureDf(exprs : _*).map(_.expr) - ), - join = false, outer = false, None, - Seq("model_id", "pred_model").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.xgboost.classification.XGBoostBinaryClassifierUDTFWrapper", + "train_xgboost_classifier", + setMixServs(toHivemallFeatures(exprs)), + Seq("model_id", "pred_model") + ) } /** * :: Experimental :: - * @see hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTF + * @see [[hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTF]] * @group xgboost */ @Experimental @scala.annotation.varargs def train_xgboost_multiclass_classifier(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "train_xgboost_multiclass_classifier", - new HiveFunctionWrapper( - "hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTFWrapper" - ), - toHivemallFeatureDf(exprs: _*).map(_.expr) - ), - join = false, outer = false, None, - Seq("model_id", "pred_model").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.xgboost.classification.XGBoostMulticlassClassifierUDTFWrapper", + "train_xgboost_multiclass_classifier", + setMixServs(toHivemallFeatures(exprs)), + Seq("model_id", "pred_model") + ) } /** * :: Experimental :: - * @see hivemall.xgboost.tools.XGBoostPredictUDTF + * @see [[hivemall.xgboost.tools.XGBoostPredictUDTF]] * @group xgboost */ @Experimental @scala.annotation.varargs def xgboost_predict(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "xgboost_predict", - new HiveFunctionWrapper("hivemall.xgboost.tools.XGBoostPredictUDTF"), - toHivemallFeatureDf(exprs: _*).map(_.expr) - ), - join = false, outer = false, None, - Seq("rowid", "predicted").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.xgboost.tools.XGBoostPredictUDTF", + "xgboost_predict", + setMixServs(toHivemallFeatures(exprs)), + Seq("rowid", "predicted") + ) } /** * :: Experimental :: - * @see hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF + * @see [[hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF]] * @group xgboost */ @Experimental @scala.annotation.varargs def xgboost_multiclass_predict(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "xgboost_multiclass_predict", - new HiveFunctionWrapper("hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF"), - toHivemallFeatureDf(exprs: _*).map(_.expr) - ), - join = false, outer = false, None, - Seq("rowid", "label", "probability").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.xgboost.tools.XGBoostMulticlassPredictUDTF", + "xgboost_multiclass_predict", + setMixServs(toHivemallFeatures(exprs)), + Seq("rowid", "label", "probability") + ) } /** - * @see hivemall.knn.lsh.MinHashUDTF + * @see [[hivemall.knn.lsh.MinHashUDTF]] * @group knn.lsh */ @scala.annotation.varargs def 
minhash(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "minhash", - new HiveFunctionWrapper("hivemall.knn.lsh.MinHashUDTF"), - exprs.map(_.expr)), - join = false, outer = false, None, - Seq("clusterid", "item").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.knn.lsh.MinHashUDTF", + "minhash", + setMixServs(toHivemallFeatures(exprs)), + Seq("clusterid", "item") + ) } /** - * @see hivemall.ftvec.amplify.AmplifierUDTF + * @see [[hivemall.ftvec.amplify.AmplifierUDTF]] * @group ftvec.amplify */ @scala.annotation.varargs @@ -663,17 +618,17 @@ final class HivemallOps(df: DataFrame) extends Logging { case Column(expr: NamedExpression) => UnresolvedAttribute(expr.name) case Column(expr: Expression) => UnresolvedAttribute(expr.simpleString) } - Generate(HiveGenericUDTF( - "amplify", - new HiveFunctionWrapper("hivemall.ftvec.amplify.AmplifierUDTF"), - exprs.map(_.expr)), - join = false, outer = false, None, - outputAttr, - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.ftvec.amplify.AmplifierUDTF", + "amplify", + setMixServs(toHivemallFeatures(exprs)), + outputAttr.map(_.name) + ) } /** - * @see hivemall.ftvec.amplify.RandomAmplifierUDTF + * @see [[hivemall.ftvec.amplify.RandomAmplifierUDTF]] * @group ftvec.amplify */ @scala.annotation.varargs @@ -702,48 +657,48 @@ final class HivemallOps(df: DataFrame) extends Logging { /** * Quantifies input columns. - * @see hivemall.ftvec.conv.QuantifyColumnsUDTF + * @see [[hivemall.ftvec.conv.QuantifyColumnsUDTF]] * @group ftvec.conv */ @scala.annotation.varargs def quantify(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "quantify", - new HiveFunctionWrapper("hivemall.ftvec.conv.QuantifyColumnsUDTF"), - exprs.map(_.expr)), - join = false, outer = false, None, - (0 until exprs.size - 1).map(i => s"c$i").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.ftvec.conv.QuantifyColumnsUDTF", + "quantify", + setMixServs(toHivemallFeatures(exprs)), + (0 until exprs.size - 1).map(i => s"c$i") + ) } /** - * @see hivemall.ftvec.trans.BinarizeLabelUDTF + * @see [[hivemall.ftvec.trans.BinarizeLabelUDTF]] * @group ftvec.trans */ @scala.annotation.varargs def binarize_label(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "binarize_label", - new HiveFunctionWrapper("hivemall.ftvec.trans.BinarizeLabelUDTF"), - exprs.map(_.expr)), - join = false, outer = false, None, - (0 until exprs.size - 1).map(i => s"c$i").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.ftvec.trans.BinarizeLabelUDTF", + "binarize_label", + setMixServs(toHivemallFeatures(exprs)), + (0 until exprs.size - 1).map(i => s"c$i") + ) } /** - * @see hivemall.ftvec.trans.QuantifiedFeaturesUDTF + * @see [[hivemall.ftvec.trans.QuantifiedFeaturesUDTF]] * @group ftvec.trans */ @scala.annotation.varargs def quantified_features(exprs: Column*): DataFrame = withTypedPlan { - Generate(HiveGenericUDTF( - "quantified_features", - new HiveFunctionWrapper("hivemall.ftvec.trans.QuantifiedFeaturesUDTF"), - exprs.map(_.expr)), - join = false, outer = false, None, - Seq("features").map(UnresolvedAttribute(_)), - df.logicalPlan) + planHiveGenericUDTF( + df, + "hivemall.ftvec.trans.QuantifiedFeaturesUDTF", + "quantified_features", + setMixServs(toHivemallFeatures(exprs)), + Seq("features") + ) } /** @@ -758,7 +713,7 @@ final class HivemallOps(df: DataFrame) extends Logging { } /** - * Splits `org.apache.spark.ml.linalg.Vector`
into pieces. + * Splits [[Vector]] into pieces. * @group ftvec */ def explode_vector(expr: Column): DataFrame = { @@ -811,36 +766,27 @@ final class HivemallOps(df: DataFrame) extends Logging { } /** - * Returns a new [[DataFrame]] with columns renamed. - * This is a wrapper for DataFrame#toDF. + * @see [[hivemall.dataset.LogisticRegressionDataGeneratorUDTF]] * @group misc */ @scala.annotation.varargs - def as(colNames: String*): DataFrame = df.toDF(colNames: _*) + def lr_datagen(exprs: Column*): Dataset[Row] = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper", + "lr_datagen", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "features") + ) + } /** * Returns all the columns as Seq[Column] in this [[DataFrame]]. - * @group misc */ - def cols: Seq[Column] = { + private[sql] def cols: Seq[Column] = { df.schema.fields.map(col => df.col(col.name)).toSeq } - /** - * @see hivemall.dataset.LogisticRegressionDataGeneratorUDTF - * @group misc - */ - @scala.annotation.varargs - def lr_datagen(exprs: Column*): Dataset[Row] = withTypedPlan { - Generate(HiveGenericUDTF( - "lr_datagen", - new HiveFunctionWrapper("hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper"), - exprs.map(_.expr)), - join = false, outer = false, None, - Seq("label", "features").map(UnresolvedAttribute(_)), - df.logicalPlan) - } - /** * :: Experimental :: * If a parameter '-mix' does not exist in a 3rd argument, @@ -868,7 +814,10 @@ final class HivemallOps(df: DataFrame) extends Logging { } } - @inline private[this] def toHivemallFeatureDf(exprs: Column*): Seq[Column] = { + /** + * If the input is a [[Vector]], transform it into Hivemall features. + */ + @inline private[this] def toHivemallFeatures(exprs: Seq[Column]): Seq[Column] = { df.select(exprs: _*).queryExecution.analyzed.schema.zip(exprs).map { case (StructField(_, _: VectorUDT, _, _), c) => HivemallUtils.to_hivemall_features(c) case (_, c) => c @@ -886,6 +835,7 @@ final class HivemallOps(df: DataFrame) extends Logging { } object HivemallOps { + import internal.HivemallOpsImpl._ /** * Implicitly inject the [[HivemallOps]] into [[DataFrame]]. 
@@ -894,417 +844,490 @@ object HivemallOps { new HivemallOps(df) /** - * @see hivemall.HivemallVersionUDF + * @see [[hivemall.HivemallVersionUDF]] * @group misc */ def hivemall_version(): Column = withExpr { - HiveSimpleUDF("hivemall_version", new HiveFunctionWrapper("hivemall.HivemallVersionUDF"), Nil) + planHiveUDF( + "hivemall.HivemallVersionUDF", + "hivemall_version", + Nil + ) } /** - * @see hivemall.knn.similarity.CosineSimilarityUDF + * @see [[hivemall.knn.similarity.CosineSimilarityUDF]] * @group knn.similarity */ @scala.annotation.varargs def cosine_sim(exprs: Column*): Column = withExpr { - HiveGenericUDF("cosine_sim", - new HiveFunctionWrapper("hivemall.knn.similarity.CosineSimilarityUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.knn.similarity.CosineSimilarityUDF", + "cosine_sim", + exprs + ) } /** - * @see hivemall.knn.similarity.JaccardIndexUDF + * @see [[hivemall.knn.similarity.JaccardIndexUDF]] * @group knn.similarity */ @scala.annotation.varargs def jaccard(exprs: Column*): Column = withExpr { - HiveSimpleUDF("jaccard", - new HiveFunctionWrapper("hivemall.knn.similarity.JaccardIndexUDF"), - exprs.map(_.expr)) + planHiveUDF( + "hivemall.knn.similarity.JaccardIndexUDF", + "jaccard", + exprs + ) } /** - * @see hivemall.knn.similarity.AngularSimilarityUDF + * @see [[hivemall.knn.similarity.AngularSimilarityUDF]] * @group knn.similarity */ @scala.annotation.varargs def angular_similarity(exprs: Column*): Column = withExpr { - HiveGenericUDF("angular_similarity", - new HiveFunctionWrapper("hivemall.knn.similarity.AngularSimilarityUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.knn.similarity.AngularSimilarityUDF", + "angular_similarity", + exprs + ) } /** - * @see hivemall.knn.similarity.EuclidSimilarity + * @see [[hivemall.knn.similarity.EuclidSimilarity]] * @group knn.similarity */ @scala.annotation.varargs def euclid_similarity(exprs: Column*): Column = withExpr { - HiveGenericUDF("euclid_similarity", - new HiveFunctionWrapper("hivemall.knn.similarity.EuclidSimilarity"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.knn.similarity.EuclidSimilarity", + "euclid_similarity", + exprs + ) } /** - * @see hivemall.knn.similarity.Distance2SimilarityUDF + * @see [[hivemall.knn.similarity.Distance2SimilarityUDF]] * @group knn.similarity */ @scala.annotation.varargs def distance2similarity(exprs: Column*): Column = withExpr { // TODO: Need a wrapper class because of using unsupported types - HiveGenericUDF("distance2similarity", - new HiveFunctionWrapper("hivemall.knn.similarity.Distance2SimilarityUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.knn.similarity.Distance2SimilarityUDF", + "distance2similarity", + exprs + ) } /** - * @see hivemall.knn.distance.HammingDistanceUDF + * @see [[hivemall.knn.distance.HammingDistanceUDF]] * @group knn.distance */ @scala.annotation.varargs def hamming_distance(exprs: Column*): Column = withExpr { - HiveSimpleUDF("hamming_distance", - new HiveFunctionWrapper("hivemall.knn.distance.HammingDistanceUDF"), - exprs.map(_.expr)) + planHiveUDF( + "hivemall.knn.distance.HammingDistanceUDF", + "hamming_distance", + exprs + ) } /** - * @see hivemall.knn.distance.PopcountUDF + * @see [[hivemall.knn.distance.PopcountUDF]] * @group knn.distance */ @scala.annotation.varargs def popcnt(exprs: Column*): Column = withExpr { - HiveSimpleUDF("popcnt", - new HiveFunctionWrapper("hivemall.knn.distance.PopcountUDF"), - exprs.map(_.expr)) + planHiveUDF( + "hivemall.knn.distance.PopcountUDF", + "popcnt", + exprs + ) } /** 
- * @see hivemall.knn.distance.KLDivergenceUDF + * @see [[hivemall.knn.distance.KLDivergenceUDF]] * @group knn.distance */ @scala.annotation.varargs def kld(exprs: Column*): Column = withExpr { - HiveSimpleUDF("kld", - new HiveFunctionWrapper("hivemall.knn.distance.KLDivergenceUDF"), - exprs.map(_.expr)) + planHiveUDF( + "hivemall.knn.distance.KLDivergenceUDF", + "kld", + exprs + ) } /** - * @see hivemall.knn.distance.EuclidDistanceUDF + * @see [[hivemall.knn.distance.EuclidDistanceUDF]] * @group knn.distance */ @scala.annotation.varargs def euclid_distance(exprs: Column*): Column = withExpr { - HiveGenericUDF("euclid_distance", - new HiveFunctionWrapper("hivemall.knn.distance.EuclidDistanceUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.knn.distance.EuclidDistanceUDF", + "euclid_distance", + exprs + ) } /** - * @see hivemall.knn.distance.CosineDistanceUDF + * @see [[hivemall.knn.distance.CosineDistanceUDF]] * @group knn.distance */ @scala.annotation.varargs def cosine_distance(exprs: Column*): Column = withExpr { - HiveGenericUDF("cosine_distance", - new HiveFunctionWrapper("hivemall.knn.distance.CosineDistanceUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.knn.distance.CosineDistanceUDF", + "cosine_distance", + exprs + ) } /** - * @see hivemall.knn.distance.AngularDistanceUDF + * @see [[hivemall.knn.distance.AngularDistanceUDF]] * @group knn.distance */ @scala.annotation.varargs def angular_distance(exprs: Column*): Column = withExpr { - HiveGenericUDF("angular_distance", - new HiveFunctionWrapper("hivemall.knn.distance.AngularDistanceUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.knn.distance.AngularDistanceUDF", + "angular_distance", + exprs + ) } /** - * @see hivemall.knn.distance.ManhattanDistanceUDF + * @see [[hivemall.knn.distance.ManhattanDistanceUDF]] * @group knn.distance */ @scala.annotation.varargs def manhattan_distance(exprs: Column*): Column = withExpr { - HiveGenericUDF("manhattan_distance", - new HiveFunctionWrapper("hivemall.knn.distance.ManhattanDistanceUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.knn.distance.ManhattanDistanceUDF", + "manhattan_distance", + exprs + ) } /** - * @see hivemall.knn.distance.MinkowskiDistanceUDF + * @see [[hivemall.knn.distance.MinkowskiDistanceUDF]] * @group knn.distance */ @scala.annotation.varargs def minkowski_distance (exprs: Column*): Column = withExpr { - HiveGenericUDF("minkowski_distance", - new HiveFunctionWrapper("hivemall.knn.distance.MinkowskiDistanceUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.knn.distance.MinkowskiDistanceUDF", + "minkowski_distance", + exprs + ) } /** - * @see hivemall.knn.lsh.bBitMinHashUDF + * @see [[hivemall.knn.lsh.bBitMinHashUDF]] * @group knn.lsh */ @scala.annotation.varargs def bbit_minhash(exprs: Column*): Column = withExpr { - HiveSimpleUDF("bbit_minhash", - new HiveFunctionWrapper("hivemall.knn.lsh.bBitMinHashUDF"), - exprs.map(_.expr)) + planHiveUDF( + "hivemall.knn.lsh.bBitMinHashUDF", + "bbit_minhash", + exprs + ) } /** - * @see hivemall.knn.lsh.MinHashesUDF + * @see [[hivemall.knn.lsh.MinHashesUDFWrapper]] * @group knn.lsh */ @scala.annotation.varargs def minhashes(exprs: Column*): Column = withExpr { - HiveGenericUDF("minhashes", - new HiveFunctionWrapper("hivemall.knn.lsh.MinHashesUDFWrapper"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.knn.lsh.MinHashesUDFWrapper", + "minhashes", + exprs + ) } /** * Returns new features with `1.0` (bias) appended to the input features. 
+ * @see [[hivemall.ftvec.AddBiasUDFWrapper]] * @group ftvec */ def add_bias(expr: Column): Column = withExpr { - HiveGenericUDF("add_bias", - new HiveFunctionWrapper("hivemall.ftvec.AddBiasUDFWrapper"), - expr.expr :: Nil) + planHiveGenericUDF( + "hivemall.ftvec.AddBiasUDFWrapper", + "add_bias", + expr :: Nil + ) } /** - * @see hivemall.ftvec.ExtractFeatureUdf + * @see [[hivemall.ftvec.ExtractFeatureUDFWrapper]] * @group ftvec * * TODO: This throws java.lang.ClassCastException because * HiveInspectors.toInspector has a bug in spark. * Need to fix it later. */ - def extract_feature(expr: Column): Column = { - val hiveUdf = HiveGenericUDF( + def extract_feature(expr: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.ExtractFeatureUDFWrapper", "extract_feature", - new HiveFunctionWrapper("hivemall.ftvec.ExtractFeatureUDFWrapper"), - expr.expr :: Nil) - Column(hiveUdf).as("feature") - } + expr :: Nil + ) + }.as("feature") /** - * @see hivemall.ftvec.ExtractWeightUdf + * @see [[hivemall.ftvec.ExtractWeightUDFWrapper]] * @group ftvec * * TODO: This throws java.lang.ClassCastException because * HiveInspectors.toInspector has a bug in spark. * Need to fix it later. */ - def extract_weight(expr: Column): Column = { - val hiveUdf = HiveGenericUDF( + def extract_weight(expr: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.ExtractWeightUDFWrapper", "extract_weight", - new HiveFunctionWrapper("hivemall.ftvec.ExtractWeightUDFWrapper"), - expr.expr :: Nil) - Column(hiveUdf).as("value") - } + expr :: Nil + ) + }.as("value") /** - * @see hivemall.ftvec.AddFeatureIndexUDFWrapper + * @see [[hivemall.ftvec.AddFeatureIndexUDFWrapper]] * @group ftvec */ def add_feature_index(expr: Column): Column = withExpr { - HiveGenericUDF("add_feature_index", - new HiveFunctionWrapper("hivemall.ftvec.AddFeatureIndexUDFWrapper"), - expr.expr :: Nil) + planHiveGenericUDF( + "hivemall.ftvec.AddFeatureIndexUDFWrapper", + "add_feature_index", + expr :: Nil + ) } /** - * @see hivemall.ftvec.SortByFeatureUDF + * @see [[hivemall.ftvec.SortByFeatureUDFWrapper]] * @group ftvec */ def sort_by_feature(expr: Column): Column = withExpr { - HiveGenericUDF("sort_by_feature", - new HiveFunctionWrapper("hivemall.ftvec.SortByFeatureUDFWrapper"), - expr.expr :: Nil) + planHiveGenericUDF( + "hivemall.ftvec.SortByFeatureUDFWrapper", + "sort_by_feature", + expr :: Nil + ) } /** - * @see hivemall.ftvec.hashing.MurmurHash3UDF + * @see [[hivemall.ftvec.hashing.MurmurHash3UDF]] * @group ftvec.hashing */ def mhash(expr: Column): Column = withExpr { - HiveSimpleUDF("mhash", - new HiveFunctionWrapper("hivemall.ftvec.hashing.MurmurHash3UDF"), - expr.expr :: Nil) + planHiveUDF( + "hivemall.ftvec.hashing.MurmurHash3UDF", + "mhash", + expr :: Nil + ) } /** - * @see hivemall.ftvec.hashing.Sha1UDF + * @see [[hivemall.ftvec.hashing.Sha1UDF]] * @group ftvec.hashing */ def sha1(expr: Column): Column = withExpr { - HiveSimpleUDF("sha1", - new HiveFunctionWrapper("hivemall.ftvec.hashing.Sha1UDF"), - expr.expr :: Nil) + planHiveUDF( + "hivemall.ftvec.hashing.Sha1UDF", + "sha1", + expr :: Nil + ) } /** - * @see hivemall.ftvec.hashing.ArrayHashValuesUDF + * @see [[hivemall.ftvec.hashing.ArrayHashValuesUDF]] * @group ftvec.hashing */ @scala.annotation.varargs def array_hash_values(exprs: Column*): Column = withExpr { // TODO: Need a wrapper class because of using unsupported types - HiveSimpleUDF("array_hash_values", - new HiveFunctionWrapper("hivemall.ftvec.hashing.ArrayHashValuesUDF"), - exprs.map(_.expr)) + planHiveUDF( + 
"hivemall.ftvec.hashing.ArrayHashValuesUDF", + "array_hash_values", + exprs + ) } /** - * @see hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF + * @see [[hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF]] * @group ftvec.hashing */ @scala.annotation.varargs def prefixed_hash_values(exprs: Column*): Column = withExpr { // TODO: Need a wrapper class because of using unsupported types - HiveSimpleUDF("prefixed_hash_values", - new HiveFunctionWrapper("hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF"), - exprs.map(_.expr)) + planHiveUDF( + "hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF", + "prefixed_hash_values", + exprs + ) } /** - * @see hivemall.ftvec.scaling.RescaleUDF + * @see [[hivemall.ftvec.scaling.RescaleUDF]] * @group ftvec.scaling */ def rescale(value: Column, max: Column, min: Column): Column = withExpr { - HiveSimpleUDF("rescale", - new HiveFunctionWrapper("hivemall.ftvec.scaling.RescaleUDF"), - (value.cast(FloatType) :: max :: min :: Nil).map(_.expr)) + planHiveUDF( + "hivemall.ftvec.scaling.RescaleUDF", + "rescale", + value.cast(FloatType) :: max :: min :: Nil + ) } /** - * @see hivemall.ftvec.scaling.ZScoreUDF + * @see [[hivemall.ftvec.scaling.ZScoreUDF]] * @group ftvec.scaling */ @scala.annotation.varargs def zscore(exprs: Column*): Column = withExpr { - HiveSimpleUDF("zscore", - new HiveFunctionWrapper("hivemall.ftvec.scaling.ZScoreUDF"), - exprs.map(_.expr)) + planHiveUDF( + "hivemall.ftvec.scaling.ZScoreUDF", + "zscore", + exprs + ) } /** - * @see hivemall.ftvec.scaling.L2NormalizationUDF + * @see [[hivemall.ftvec.scaling.L2NormalizationUDFWrapper]] * @group ftvec.scaling */ def normalize(expr: Column): Column = withExpr { - HiveGenericUDF("normalize", - new HiveFunctionWrapper("hivemall.ftvec.scaling.L2NormalizationUDFWrapper"), - expr.expr :: Nil) + planHiveGenericUDF( + "hivemall.ftvec.scaling.L2NormalizationUDFWrapper", + "normalize", + expr :: Nil + ) } /** - * @see hivemall.ftvec.selection.ChiSquareUDF + * @see [[hivemall.ftvec.selection.ChiSquareUDF]] * @group ftvec.selection */ def chi2(observed: Column, expected: Column): Column = withExpr { - HiveGenericUDF("chi2", - new HiveFunctionWrapper("hivemall.ftvec.selection.ChiSquareUDF"), - Seq(observed.expr, expected.expr)) + planHiveGenericUDF( + "hivemall.ftvec.selection.ChiSquareUDF", + "chi2", + Seq(observed, expected) + ) } /** - * @see hivemall.ftvec.conv.ToDenseFeaturesUDF + * @see [[hivemall.ftvec.conv.ToDenseFeaturesUDF]] * @group ftvec.conv */ @scala.annotation.varargs def to_dense_features(exprs: Column*): Column = withExpr { // TODO: Need a wrapper class because of using unsupported types - HiveGenericUDF("to_dense_features", - new HiveFunctionWrapper("hivemall.ftvec.conv.ToDenseFeaturesUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.ftvec.conv.ToDenseFeaturesUDF", + "to_dense_features", + exprs + ) } /** - * @see hivemall.ftvec.conv.ToSparseFeaturesUDF + * @see [[hivemall.ftvec.conv.ToSparseFeaturesUDF]] * @group ftvec.conv */ @scala.annotation.varargs def to_sparse_features(exprs: Column*): Column = withExpr { // TODO: Need a wrapper class because of using unsupported types - HiveGenericUDF("to_sparse_features", - new HiveFunctionWrapper("hivemall.ftvec.conv.ToSparseFeaturesUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.ftvec.conv.ToSparseFeaturesUDF", + "to_sparse_features", + exprs + ) } /** - * @see hivemall.ftvec.trans.VectorizeFeaturesUDF + * @see [[hivemall.ftvec.trans.VectorizeFeaturesUDF]] * @group ftvec.trans */ @scala.annotation.varargs def 
vectorize_features(exprs: Column*): Column = withExpr { - HiveGenericUDF("vectorize_features", - new HiveFunctionWrapper("hivemall.ftvec.trans.VectorizeFeaturesUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.ftvec.trans.VectorizeFeaturesUDF", + "vectorize_features", + exprs + ) } /** - * @see hivemall.ftvec.trans.CategoricalFeaturesUDF + * @see [[hivemall.ftvec.trans.CategoricalFeaturesUDF]] * @group ftvec.trans */ @scala.annotation.varargs def categorical_features(exprs: Column*): Column = withExpr { - HiveGenericUDF("categorical_features", - new HiveFunctionWrapper("hivemall.ftvec.trans.CategoricalFeaturesUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.ftvec.trans.CategoricalFeaturesUDF", + "categorical_features", + exprs + ) } /** - * @see hivemall.ftvec.trans.IndexedFeatures + * @see [[hivemall.ftvec.trans.IndexedFeatures]] * @group ftvec.trans */ @scala.annotation.varargs def indexed_features(exprs: Column*): Column = withExpr { - HiveGenericUDF("indexed_features", - new HiveFunctionWrapper("hivemall.ftvec.trans.IndexedFeatures"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.ftvec.trans.IndexedFeatures", + "indexed_features", + exprs + ) } /** - * @see hivemall.ftvec.trans.QuantitativeFeaturesUDF + * @see [[hivemall.ftvec.trans.QuantitativeFeaturesUDF]] * @group ftvec.trans */ @scala.annotation.varargs def quantitative_features(exprs: Column*): Column = withExpr { - HiveGenericUDF("quantitative_features", - new HiveFunctionWrapper("hivemall.ftvec.trans.QuantitativeFeaturesUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.ftvec.trans.QuantitativeFeaturesUDF", + "quantitative_features", + exprs + ) } /** - * @see hivemall.smile.tools.TreePredictUDF + * @see [[hivemall.smile.tools.TreePredictUDF]] * @group misc */ @scala.annotation.varargs def tree_predict(exprs: Column*): Column = withExpr { - HiveGenericUDF("tree_predict", - new HiveFunctionWrapper("hivemall.smile.tools.TreePredictUDF"), - exprs.map(_.expr)) + planHiveGenericUDF( + "hivemall.smile.tools.TreePredictUDF", + "tree_predict", + exprs + ) } /** - * @see hivemall.tools.array.SelectKBestUDF + * @see [[hivemall.tools.array.SelectKBestUDF]] * @group tools.array */ def select_k_best(X: Column, importanceList: Column, k: Column): Column = withExpr { - HiveGenericUDF("select_k_best", - new HiveFunctionWrapper("hivemall.tools.array.SelectKBestUDF"), - Seq(X.expr, importanceList.expr, k.expr)) + planHiveGenericUDF( + "hivemall.tools.array.SelectKBestUDF", + "select_k_best", + Seq(X, importanceList, k) + ) } /** - * @see hivemall.tools.math.SigmoidUDF + * @see [[hivemall.tools.math.SigmoidGenericUDF]] * @group misc */ def sigmoid(expr: Column): Column = { @@ -1313,11 +1336,15 @@ object HivemallOps { } /** - * @see hivemall.tools.mapred.RowIdUDF + * @see [[hivemall.tools.mapred.RowIdUDFWrapper]] * @group misc */ def rowid(): Column = withExpr { - HiveGenericUDF("rowid", new HiveFunctionWrapper("hivemall.tools.mapred.RowIdUDFWrapper"), Nil) + planHiveGenericUDF( + "hivemall.tools.mapred.RowIdUDFWrapper", + "rowid", + Nil + ) }.as("rowid") /** diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala new file mode 100644 index 000000000..6ef0e72ff --- /dev/null +++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.hive.internal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan} +import org.apache.spark.sql.hive._ +import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper + +/** + * An implementation class for [[org.apache.spark.sql.hive.HivemallOps]]. + * This class mainly uses internal Spark classes (e.g., `Generate` and `HiveGenericUDTF`) + * whose interfaces are unstable and may change in upcoming releases. + * Therefore, this class isolates these unstable parts + * from [[org.apache.spark.sql.hive.HivemallOps]]. + */ +private[hive] object HivemallOpsImpl extends Logging { + + def planHiveUDF( + className: String, + funcName: String, + argumentExprs: Seq[Column]): Expression = { + HiveSimpleUDF( + name = funcName, + funcWrapper = new HiveFunctionWrapper(className), + children = argumentExprs.map(_.expr) + ) + } + + def planHiveGenericUDF( + className: String, + funcName: String, + argumentExprs: Seq[Column]): Expression = { + HiveGenericUDF( + name = funcName, + funcWrapper = new HiveFunctionWrapper(className), + children = argumentExprs.map(_.expr) + ) + } + + def planHiveGenericUDTF( + df: DataFrame, + className: String, + funcName: String, + argumentExprs: Seq[Column], + outputAttrNames: Seq[String]): LogicalPlan = { + Generate( + generator = HiveGenericUDTF( + name = funcName, + funcWrapper = new HiveFunctionWrapper(className), + children = argumentExprs.map(_.expr) + ), + join = false, + outer = false, + qualifier = None, + generatorOutput = outputAttrNames.map(UnresolvedAttribute(_)), + child = df.logicalPlan) + } +} diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala index 49773cccd..3e16ac306 100644 --- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala +++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala @@ -357,7 +357,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { testData("features"), lit(true)).as("predicted") ) .groupBy($"rowid") - .rf_ensemble("predicted").as("rowid", "predicted") + .rf_ensemble("predicted").toDF("rowid", "predicted") .select($"predicted.label") checkAnswer(predicted, Seq(Row(0), Row(1))) @@ -447,7 +447,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { val testDf = Seq( (Array(0.3, 0.1, 0.2), 1), (Array(0.3, 0.1, 0.2), 0),
(Array(0.3, 0.1, 0.2), 0)).toDF.toDF("features", "label") Seq( "train_randomforest_regr", "train_randomforest_classifier" @@ -469,7 +469,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { } else { res.groupBy("feature").argmin_kld("weight", "conv") } - }.as("feature", "weight") + }.toDF("feature", "weight") // Data preparation val testDf = LargeRegrTrainData @@ -485,14 +485,14 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { .join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER") .select($"rowid", ($"weight" * $"value").as("value")) .groupBy("rowid").sum("value") - .as("rowid", "predicted") + .toDF("rowid", "predicted") // Evaluation val eval = predict .join(testDf, predict("rowid") === testDf("rowid")) .groupBy() .agg(Map("target" -> "avg", "predicted" -> "avg")) - .as("target", "predicted") + .toDF("target", "predicted") val diff = eval.map { case Row(target: Double, predicted: Double) => @@ -514,7 +514,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { } else { res.groupBy("feature").argmin_kld("weight", "conv") } - }.as("feature", "weight") + }.toDF("feature", "weight") // Data preparation val testDf = LargeClassifierTestData @@ -537,7 +537,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { * Perhaps you need to use aliases. */ .select($"rowid", when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0)) - .as("rowid", "predicted") + .toDF("rowid", "predicted") // Evaluation val eval = predict @@ -586,31 +586,31 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { test("user-defined aggregators for ensembles") { import hiveContext.implicits._ - val df1 = Seq((1, 0.1f), (1, 0.2f), (2, 0.1f)).toDF.as("c0", "c1") + val df1 = Seq((1, 0.1f), (1, 0.2f), (2, 0.1f)).toDF.toDF("c0", "c1") val row1 = df1.groupBy($"c0").voted_avg("c1").collect assert(row1(0).getDouble(1) ~== 0.15) assert(row1(1).getDouble(1) ~== 0.10) - val df3 = Seq((1, 0.2f), (1, 0.8f), (2, 0.3f)).toDF.as("c0", "c1") + val df3 = Seq((1, 0.2f), (1, 0.8f), (2, 0.3f)).toDF.toDF("c0", "c1") val row3 = df3.groupBy($"c0").weight_voted_avg("c1").collect assert(row3(0).getDouble(1) ~== 0.50) assert(row3(1).getDouble(1) ~== 0.30) - val df5 = Seq((1, 0.2f, 0.1f), (1, 0.4f, 0.2f), (2, 0.8f, 0.9f)).toDF.as("c0", "c1", "c2") + val df5 = Seq((1, 0.2f, 0.1f), (1, 0.4f, 0.2f), (2, 0.8f, 0.9f)).toDF.toDF("c0", "c1", "c2") val row5 = df5.groupBy($"c0").argmin_kld("c1", "c2").collect assert(row5(0).getFloat(1) ~== 0.266666666) assert(row5(1).getFloat(1) ~== 0.80) - val df6 = Seq((1, "id-0", 0.2f), (1, "id-1", 0.4f), (1, "id-2", 0.1f)).toDF.as("c0", "c1", "c2") + val df6 = Seq((1, "id-0", 0.2f), (1, "id-1", 0.4f), (1, "id-2", 0.1f)).toDF.toDF("c0", "c1", "c2") val row6 = df6.groupBy($"c0").max_label("c2", "c1").collect assert(row6(0).getString(1) == "id-1") - val df7 = Seq((1, "id-0", 0.5f), (1, "id-1", 0.1f), (1, "id-2", 0.2f)).toDF.as("c0", "c1", "c2") - val row7 = df7.groupBy($"c0").maxrow("c2", "c1").as("c0", "c1").select($"c1.col1").collect + val df7 = Seq((1, "id-0", 0.5f), (1, "id-1", 0.1f), (1, "id-2", 0.2f)).toDF.toDF("c0", "c1", "c2") + val row7 = df7.groupBy($"c0").maxrow("c2", "c1").toDF("c0", "c1").select($"c1.col1").collect assert(row7(0).getString(0) == "id-0") - val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 5)).toDF.as("c0", "c1") - val row8 = df8.groupBy($"c0").rf_ensemble("c1").as("c0", "c1").select("c1.probability").collect + val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
index 3b87a967b..06a4dc0f3 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
@@ -170,7 +170,7 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
       } else {
         res.groupBy("feature").argmin_kld("weight", "conv")
       }
-    }.as("feature", "weight")
+    }.toDF("feature", "weight")

     // Data preparation
     val testDf = testA9aData
@@ -186,14 +186,14 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
       .join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
       .select($"rowid", ($"weight" * $"value").as("value"))
       .groupBy("rowid").sum("value")
-      .as("rowid", "predicted")
+      .toDF("rowid", "predicted")

     // Evaluation
     val eval = predict
       .join(testDf, predict("rowid") === testDf("rowid"))
       .groupBy()
       .agg(Map("target" -> "avg", "predicted" -> "avg"))
-      .as("target", "predicted")
+      .toDF("target", "predicted")

     val (target, predicted) = eval.map {
       case Row(target: Double, predicted: Double) => (target, predicted)
@@ -238,7 +238,7 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
       } else {
         res.groupBy("feature").argmin_kld("weight", "conv")
       }
-    }.as("feature", "weight")
+    }.toDF("feature", "weight")

     // Data preparation
     val testDf = testKdd2010aData
@@ -255,7 +255,7 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
       .select($"rowid", ($"weight" * $"value").as("value"))
       .groupBy("rowid").sum("value")
       .select($"rowid", when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0))
-      .as("rowid", "predicted")
+      .toDF("rowid", "predicted")

     // Evaluation
     val eval = predict
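The renamed pipelines in this suite all share one linear-model scoring shape: join the
exploded test features against the learned (feature, weight) model, sum the weighted
values per row, and threshold a sigmoid of the sum. A condensed sketch of that shape in
plain Spark (column names mirror the suite; the sigmoid is spelled out with exp() so the
snippet does not depend on Hivemall):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._

    // testDfExploded: (rowid, feature, value); model: (feature, weight)
    def predictBinary(testDfExploded: DataFrame, model: DataFrame): DataFrame = {
      testDfExploded
        .join(model, testDfExploded("feature") === model("feature"), "LEFT_OUTER")
        .select(testDfExploded("rowid"),
          (model("weight") * testDfExploded("value")).as("value"))
        .groupBy("rowid").sum("value")
        // sigmoid(x) > 0.5 iff x > 0; the explicit form is kept for clarity.
        .select(col("rowid"),
          when(lit(1.0) / (lit(1.0) + exp(-col("sum(value)"))) > 0.50, 1.0)
            .otherwise(0.0).as("predicted"))
    }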
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
index 8c9c0c3d1..37e0989d1 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
@@ -75,7 +75,7 @@ final class XGBoostSuite extends VectorQueryTest {
     val predict = model.join(mllibTestDf)
       .xgboost_predict($"rowid", $"features", $"model_id", $"pred_model")
       .groupBy("rowid").avg()
-      .as("rowid", "predicted")
+      .toDF("rowid", "predicted")

     val result = predict.join(mllibTestDf, predict("rowid") === mllibTestDf("rowid"), "INNER")
       .select(predict("rowid"), $"predicted", $"label")
@@ -100,7 +100,7 @@ final class XGBoostSuite extends VectorQueryTest {
     val predict = model.join(mllibTestDf)
       .xgboost_predict($"rowid", $"features", $"model_id", $"pred_model")
       .groupBy("rowid").avg()
-      .as("rowid", "predicted")
+      .toDF("rowid", "predicted")

     val result = predict.join(mllibTestDf, predict("rowid") === mllibTestDf("rowid"), "INNER")
       .select(
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
index 598479da7..b15c77c4c 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
@@ -143,7 +143,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
     testDf.join(testModel, testDf("feature") === testModel("feature"), "LEFT_OUTER")
       .select($"rowid", ($"weight" * $"value").as("value"))
       .groupBy("rowid").sum("value")
-      .as("rowid", "value")
+      .toDF("rowid", "value")
       .select($"rowid", sigmoid($"value"))
   }


From c506a9b422a34bf6190760a724975f86315328d6 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Fri, 27 Jan 2017 01:11:04 +0900
Subject: [PATCH 2/2] Fix syntax errors

---
 .../sql/hive/internal/HivemallOpsImpl.scala   |  2 +-
 .../spark/sql/hive/HivemallOpsSuite.scala     | 23 ++++++++++---------
 2 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala
index 6ef0e72ff..ab5c5fbf9 100644
--- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala
+++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/internal/HivemallOpsImpl.scala
@@ -18,10 +18,10 @@
  */
 package org.apache.spark.sql.hive.internal

-import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
index 3e16ac306..493feda0b 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
"label") + (Array(0.3, 0.1, 0.2), 0)).toDF("features", "label") Seq( "train_randomforest_regr", "train_randomforest_classifier" @@ -586,31 +586,32 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { test("user-defined aggregators for ensembles") { import hiveContext.implicits._ - val df1 = Seq((1, 0.1f), (1, 0.2f), (2, 0.1f)).toDF.toDF("c0", "c1") + val df1 = Seq((1, 0.1f), (1, 0.2f), (2, 0.1f)).toDF("c0", "c1") val row1 = df1.groupBy($"c0").voted_avg("c1").collect assert(row1(0).getDouble(1) ~== 0.15) assert(row1(1).getDouble(1) ~== 0.10) - val df3 = Seq((1, 0.2f), (1, 0.8f), (2, 0.3f)).toDF.toDF("c0", "c1") + val df3 = Seq((1, 0.2f), (1, 0.8f), (2, 0.3f)).toDF("c0", "c1") val row3 = df3.groupBy($"c0").weight_voted_avg("c1").collect assert(row3(0).getDouble(1) ~== 0.50) assert(row3(1).getDouble(1) ~== 0.30) - val df5 = Seq((1, 0.2f, 0.1f), (1, 0.4f, 0.2f), (2, 0.8f, 0.9f)).toDF.toDF("c0", "c1", "c2") + val df5 = Seq((1, 0.2f, 0.1f), (1, 0.4f, 0.2f), (2, 0.8f, 0.9f)).toDF("c0", "c1", "c2") val row5 = df5.groupBy($"c0").argmin_kld("c1", "c2").collect assert(row5(0).getFloat(1) ~== 0.266666666) assert(row5(1).getFloat(1) ~== 0.80) - val df6 = Seq((1, "id-0", 0.2f), (1, "id-1", 0.4f), (1, "id-2", 0.1f)).toDF.toDF("c0", "c1", "c2") + val df6 = Seq((1, "id-0", 0.2f), (1, "id-1", 0.4f), (1, "id-2", 0.1f)).toDF("c0", "c1", "c2") val row6 = df6.groupBy($"c0").max_label("c2", "c1").collect assert(row6(0).getString(1) == "id-1") - val df7 = Seq((1, "id-0", 0.5f), (1, "id-1", 0.1f), (1, "id-2", 0.2f)).toDF.toDF("c0", "c1", "c2") + val df7 = Seq((1, "id-0", 0.5f), (1, "id-1", 0.1f), (1, "id-2", 0.2f)).toDF("c0", "c1", "c2") val row7 = df7.groupBy($"c0").maxrow("c2", "c1").toDF("c0", "c1").select($"c1.col1").collect assert(row7(0).getString(0) == "id-0") - val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 5)).toDF.toDF("c0", "c1") - val row8 = df8.groupBy($"c0").rf_ensemble("c1").toDF("c0", "c1").select("c1.probability").collect + val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 5)).toDF("c0", "c1") + val row8 = df8.groupBy($"c0").rf_ensemble("c1").toDF("c0", "c1") + .select("c1.probability").collect assert(row8(0).getDouble(0) ~== 0.3333333333) assert(row8(1).getDouble(0) ~== 1.0) } @@ -618,15 +619,15 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { test("user-defined aggregators for evaluation") { import hiveContext.implicits._ - val df1 = Seq((1, 1.0f, 0.5f), (1, 0.3f, 0.5f), (1, 0.1f, 0.2f)).toDF.toDF("c0", "c1", "c2") + val df1 = Seq((1, 1.0f, 0.5f), (1, 0.3f, 0.5f), (1, 0.1f, 0.2f)).toDF("c0", "c1", "c2") val row1 = df1.groupBy($"c0").mae("c1", "c2").collect assert(row1(0).getDouble(1) ~== 0.26666666) - val df2 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF.toDF("c0", "c1", "c2") + val df2 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF("c0", "c1", "c2") val row2 = df2.groupBy($"c0").mse("c1", "c2").collect assert(row2(0).getDouble(1) ~== 0.29999999) - val df3 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF.toDF("c0", "c1", "c2") + val df3 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF("c0", "c1", "c2") val row3 = df3.groupBy($"c0").rmse("c1", "c2").collect assert(row3(0).getDouble(1) ~== 0.54772253)