This repository has been archived by the owner on Sep 20, 2022. It is now read-only.

[HIVEMALL-35] Remove unnecessary implicit conversions in HivemallUtils #26

Closed · wants to merge 1 commit
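In practice, removing the implicits means call sites now pass explicit Column arguments (typically built with functions.lit) instead of bare Scala values. A minimal before/after sketch, assuming `df` is an existing DataFrame with the Hivemall ops in scope; `part_amplify` and `lr_datagen` are taken from the diff below:

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.hive.HivemallOps._

// Before this patch: implicit Int => Column / String => Column conversions applied silently
//   df.part_amplify(3)
//   df.lr_datagen("-n_examples 1000 -n_features 10")

// After this patch: literals are wrapped explicitly
val amplified = df.part_amplify(lit(3))
val datagenDf = df.lr_datagen(lit("-n_examples 1000 -n_features 10"))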
@@ -50,7 +50,7 @@ public ObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentEx

@Override
public FloatWritable evaluate(DeferredObject[] arguments) throws HiveException {
float d = PrimitiveObjectInspectorUtils.getFloat(arguments[0], distanceOI);
float d = PrimitiveObjectInspectorUtils.getFloat(arguments[0].get(), distanceOI);
float sim = 1.f / (1.f + d);
return new FloatWritable(sim);
}
@@ -19,8 +19,8 @@
package hivemall.tools

import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HivemallOps._
import org.apache.spark.sql.hive.HivemallUtils._
import org.apache.spark.sql.types._

object RegressionDatagen {
@@ -57,10 +57,10 @@ object RegressionDatagen {
)
import sc.implicits._
df.lr_datagen(
s"-n_examples $n_examples -n_features $n_features -n_dims $n_dims -prob_one $prob_one"
+ (if (dense) " -dense" else "")
+ (if (sort) " -sort" else "")
+ (if (cl) " -cl" else ""))
.select($"label".cast(DoubleType).as("label"), $"features")
lit(s"-n_examples $n_examples -n_features $n_features -n_dims $n_dims -prob_one $prob_one"
+ (if (dense) " -dense" else "")
+ (if (sort) " -sort" else "")
+ (if (cl) " -cl" else ""))
).select($"label".cast(DoubleType).as("label"), $"features")
}
}
@@ -20,8 +20,6 @@ package org.apache.spark.sql.hive

import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.HivemallFeature
@@ -57,8 +55,6 @@ import org.apache.spark.unsafe.types.UTF8String
* @groupname misc
*/
final class HivemallOps(df: DataFrame) extends Logging {
import HivemallOps._
import HivemallUtils._

/**
* @see hivemall.regression.AdaDeltaUDTF
@@ -687,10 +683,14 @@ final class HivemallOps(df: DataFrame) extends Logging {
* Amplifies and shuffles data inside partitions.
* @group ftvec.amplify
*/
def part_amplify(xtimes: Int): DataFrame = {
def part_amplify(xtimes: Column): DataFrame = {
val xtimesInt = xtimes.expr match {
case Literal(v: Any, IntegerType) => v.asInstanceOf[Int]
case e => throw new AnalysisException("`xtimes` must be integer, however " + e)
}
val rdd = df.rdd.mapPartitions({ iter =>
val elems = iter.flatMap{ row =>
Seq.fill[Row](xtimes)(row)
Seq.fill[Row](xtimesInt)(row)
}
// Need to check how this shuffling affects results
scala.util.Random.shuffle(elems)
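// Illustration (not part of this patch): with the new signature the amplification factor is a
// Column that must wrap an integer literal, e.g.
//   val amplified = df.part_amplify(lit(3))      // previously: df.part_amplify(3)
// A non-literal expression such as df.part_amplify($"n" + 1) does not match the Literal
// pattern above and raises an AnalysisException.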
@@ -792,7 +792,7 @@ final class HivemallOps(df: DataFrame) extends Logging {
*/
def each_top_k(k: Int, group: String, score: String, args: String*)
: DataFrame = withTypedPlan {
val clusterDf = df.repartition(group).sortWithinPartitions(group)
val clusterDf = df.repartition(df(group)).sortWithinPartitions(group)
val childrenAttributes = clusterDf.logicalPlan.output
val generator = Generate(
EachTopK(
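// Illustration (not part of this patch): `repartition(group)` only compiled before because the
// implicit String => Column conversion turned the group name into a constant literal, so rows
// were repartitioned by a constant rather than by the column's values. The explicit form
// resolves the actual column, e.g.
//   df.repartition(df("label")).sortWithinPartitions("label")   // `label` is a hypothetical group column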
@@ -881,7 +881,7 @@ final class HivemallOps(df: DataFrame) extends Logging {

@inline private[this] def toHivemallFeatureDf(exprs: Column*): Seq[Column] = {
df.select(exprs: _*).queryExecution.analyzed.schema.zip(exprs).map {
case (StructField(_, _: VectorUDT, _, _), c) => to_hivemall_features(c)
case (StructField(_, _: VectorUDT, _, _), c) => HivemallUtils.to_hivemall_features(c)
case (_, c) => c
}
}
@@ -19,8 +19,7 @@
package org.apache.spark.sql.hive

import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
@@ -31,19 +30,40 @@ object HivemallUtils {
private[this] val maxDims = 100000000

/**
* An implicit conversion to avoid doing annoying transformation.
* This class must be in o.a.spark.sql._ because
* a Column class is private.
* Check whether the given schema contains a column of the required data type.
* @param colName column name
* @param dataType required column data type
*/
@inline implicit def toBooleanLiteral(i: Boolean): Column = Column(Literal.create(i, BooleanType))
@inline implicit def toIntLiteral(i: Int): Column = Column(Literal.create(i, IntegerType))
@inline implicit def toFloatLiteral(i: Float): Column = Column(Literal.create(i, FloatType))
@inline implicit def toDoubleLiteral(i: Double): Column = Column(Literal.create(i, DoubleType))
@inline implicit def toStringLiteral(i: String): Column = Column(Literal.create(i, StringType))
@inline implicit def toIntArrayLiteral(i: Seq[Int]): Column =
Column(Literal.create(i, ArrayType(IntegerType)))
@inline implicit def toStringArrayLiteral(i: Seq[String]): Column =
Column(Literal.create(i, ArrayType(StringType)))
private[this] def checkColumnType(schema: StructType, colName: String, dataType: DataType)
: Unit = {
val actualDataType = schema(colName).dataType
require(actualDataType.equals(dataType),
s"Column $colName must be of type $dataType but was actually $actualDataType.")
}

def to_vector_func(dense: Boolean, dims: Int): Seq[String] => Vector = {
if (dense) {
// Dense features
i: Seq[String] => {
val features = new Array[Double](dims)
i.map { ft =>
val s = ft.split(":").ensuring(_.size == 2)
features(s(0).toInt) = s(1).toDouble
}
Vectors.dense(features)
}
} else {
// Sparse features
i: Seq[String] => {
val features = i.map { ft =>
// val s = ft.split(":").ensuring(_.size == 2)
val s = ft.split(":")
(s(0).toInt, s(1).toDouble)
}
Vectors.sparse(dims, features)
}
}
}
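// Illustration (not part of this patch): expected behaviour of to_vector_func on
// "index:value" feature strings:
//   to_vector_func(dense = false, dims = 4)(Seq("0:1.0", "3:0.5"))
//     == Vectors.sparse(4, Seq((0, 1.0), (3, 0.5)))
//   to_vector_func(dense = true, dims = 4)(Seq("0:1.0", "3:0.5"))
//     == Vectors.dense(Array(1.0, 0.0, 0.0, 0.5))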

def to_hivemall_features_func(): Vector => Array[String] = {
case dv: DenseVector =>
@@ -83,20 +103,28 @@
}

/**
* Transforms `org.apache.spark.ml.linalg.Vector` into Hivemall features.
* Transforms Hivemall features into a [[Vector]].
*/
def to_vector(dense: Boolean = false, dims: Int = maxDims): UserDefinedFunction = {
udf(to_vector_func(dense, dims))
}

/**
* Transforms a [[Vector]] into Hivemall features.
*/
def to_hivemall_features: UserDefinedFunction = udf(to_hivemall_features_func)

/**
* Returns a new vector with `1.0` (bias) appended to the input vector.
* Returns a new [[Vector]] with `1.0` (bias) appended to the input [[Vector]].
* @group ftvec
*/
def append_bias: UserDefinedFunction = udf(append_bias_func)
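// Illustration (not part of this patch): assumed DataFrame usage of the UDF-returning helpers;
// `df` and its `features` column (an array of "index:value" strings) are hypothetical.
//   import org.apache.spark.sql.hive.HivemallUtils._
//   val vecDf  = df.select(to_vector(dense = false, dims = 1024)($"features").as("vec"))
//   val ftDf   = vecDf.select(to_hivemall_features($"vec").as("features"))
//   val biased = vecDf.select(append_bias($"vec").as("vec_with_bias"))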

/**
* Make up a function object from a Hivemall model.
* Builds a [[Vector]]-based model from a table of Hivemall models.
*/
def funcModel(df: DataFrame, dense: Boolean = false, dims: Int = maxDims): UserDefinedFunction = {
def vectorized_model(df: DataFrame, dense: Boolean = false, dims: Int = maxDims)
: UserDefinedFunction = {
checkColumnType(df.schema, "feature", StringType)
checkColumnType(df.schema, "weight", DoubleType)

@@ -106,55 +134,12 @@ object HivemallUtils {
.select($"weight")
.map { case Row(weight: Double) => weight}
.reduce(_ + _)
val weights = funcVectorizerImpl(dense, dims)(
val weights = to_vector_func(dense, dims)(
df.select($"feature", $"weight")
.where($"feature" !== "0")
.map { case Row(label: String, feature: Double) => s"${label}:$feature"}
.collect.toSeq)

udf((input: Vector) => BLAS.dot(input, weights) + intercept)
}
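// Illustration (not part of this patch): assumed usage of vectorized_model. `modelDf` is a
// hypothetical model table with `feature: String` and `weight: Double` columns; the returned
// UDF scores an input feature Vector as BLAS.dot(input, weights) + intercept.
//   val predict = vectorized_model(modelDf, dense = true, dims = 1024)
//   val scored  = testDf.select($"id", predict($"features").as("prediction"))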

/**
* Make up a function object to transform Hivemall features into Vector.
*/
def funcVectorizer(dense: Boolean = false, dims: Int = maxDims): UserDefinedFunction = {
udf(funcVectorizerImpl(dense, dims))
}

private[this] def funcVectorizerImpl(dense: Boolean, dims: Int): Seq[String] => Vector = {
if (dense) {
// Dense features
i: Seq[String] => {
val features = new Array[Double](dims)
i.map { ft =>
val s = ft.split(":").ensuring(_.size == 2)
features(s(0).toInt) = s(1).toDouble
}
Vectors.dense(features)
}
} else {
// Sparse features
i: Seq[String] => {
val features = i.map { ft =>
// val s = ft.split(":").ensuring(_.size == 2)
val s = ft.split(":")
(s(0).toInt, s(1).toDouble)
}
Vectors.sparse(dims, features)
}
}
}

/**
* Check whether the given schema contains a column of the required data type.
* @param colName column name
* @param dataType required column data type
*/
private[this] def checkColumnType(schema: StructType, colName: String, dataType: DataType)
: Unit = {
val actualDataType = schema(colName).dataType
require(actualDataType.equals(dataType),
s"Column $colName must be of type $dataType but was actually $actualDataType.")
}
}
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive

import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HivemallUtils._
import org.apache.spark.test.HivemallFeatureQueryTest
import org.apache.spark.sql.hive.test.HivemallFeatureQueryTest
import org.apache.spark.test.VectorQueryTest

final class HiveUdfWithFeatureSuite extends HivemallFeatureQueryTest {