Skip to content

Commit

Permalink
[SPARK-31127][ML] Implement abstract Selector
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Implement an abstract `Selector`. Move the code common to `ANOVASelector`, `ChiSqSelector`, `FValueSelector` and `VarianceThresholdSelector` into `Selector`.

### Why are the changes needed?
Code reuse: the common selector code lives in one place instead of being repeated in each selector.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing tests

Closes apache#27978 from huaxingao/spark-31127.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
  • Loading branch information
huaxingao authored and zhengruifeng committed May 6, 2020
1 parent 4952f1a commit f05560b
Show file tree
Hide file tree
Showing 11 changed files with 562 additions and 886 deletions.
195 changes: 35 additions & 160 deletions mllib/src/main/scala/org/apache/spark/ml/feature/ANOVASelector.scala
Expand Up @@ -17,20 +17,13 @@

package org.apache.spark.ml.feature

import scala.collection.mutable.ArrayBuilder

import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.Since
import org.apache.spark.ml._
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param._
import org.apache.spark.ml.stat.ANOVATest
import org.apache.spark.ml.util._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset}


/**
Expand All @@ -53,111 +46,72 @@ import org.apache.spark.sql.types.{StructField, StructType}
*/
@Since("3.1.0")
final class ANOVASelector @Since("3.1.0")(@Since("3.1.0") override val uid: String)
extends Estimator[ANOVASelectorModel] with FValueSelectorParams
with DefaultParamsWritable {
extends Selector[ANOVASelectorModel] {

@Since("3.1.0")
def this() = this(Identifiable.randomUID("ANOVASelector"))

/** @group setParam */
@Since("3.1.0")
def setNumTopFeatures(value: Int): this.type = set(numTopFeatures, value)
override def setNumTopFeatures(value: Int): this.type = super.setNumTopFeatures(value)

/** @group setParam */
@Since("3.1.0")
def setPercentile(value: Double): this.type = set(percentile, value)
override def setPercentile(value: Double): this.type = super.setPercentile(value)

/** @group setParam */
@Since("3.1.0")
def setFpr(value: Double): this.type = set(fpr, value)
override def setFpr(value: Double): this.type = super.setFpr(value)

/** @group setParam */
@Since("3.1.0")
def setFdr(value: Double): this.type = set(fdr, value)
override def setFdr(value: Double): this.type = super.setFdr(value)

/** @group setParam */
@Since("3.1.0")
def setFwe(value: Double): this.type = set(fwe, value)
override def setFwe(value: Double): this.type = super.setFwe(value)

/** @group setParam */
@Since("3.1.0")
def setSelectorType(value: String): this.type = set(selectorType, value)
override def setSelectorType(value: String): this.type = super.setSelectorType(value)

/** @group setParam */
@Since("3.1.0")
def setFeaturesCol(value: String): this.type = set(featuresCol, value)
override def setFeaturesCol(value: String): this.type = super.setFeaturesCol(value)

/** @group setParam */
@Since("3.1.0")
def setOutputCol(value: String): this.type = set(outputCol, value)
override def setOutputCol(value: String): this.type = super.setOutputCol(value)

/** @group setParam */
@Since("3.1.0")
def setLabelCol(value: String): this.type = set(labelCol, value)

@Since("3.1.0")
override def fit(dataset: Dataset[_]): ANOVASelectorModel = {
transformSchema(dataset.schema, logging = true)

val spark = dataset.sparkSession
import spark.implicits._
override def setLabelCol(value: String): this.type = super.setLabelCol(value)

val numFeatures = MetadataUtils.getNumFeatures(dataset, $(featuresCol))
val resultDF = ANOVATest.test(dataset.toDF, $(featuresCol), $(labelCol), true)

def getTopIndices(k: Int): Array[Int] = {
resultDF.sort("pValue", "featureIndex")
.select("featureIndex")
.limit(k)
.as[Int]
.collect()
}

val indices = $(selectorType) match {
case "numTopFeatures" =>
getTopIndices($(numTopFeatures))
case "percentile" =>
getTopIndices((numFeatures * getPercentile).toInt)
case "fpr" =>
resultDF.select("featureIndex")
.where(col("pValue") < $(fpr))
.as[Int].collect()
case "fdr" =>
// This uses the Benjamini-Hochberg procedure.
// https://en.wikipedia.org/wiki/False_discovery_rate#Benjamini.E2.80.93Hochberg_procedure
val f = $(fdr) / numFeatures
val maxIndex = resultDF.sort("pValue", "featureIndex")
.select("pValue")
.as[Double].rdd
.zipWithIndex
.flatMap { case (pValue, index) =>
if (pValue <= f * (index + 1)) {
Iterator.single(index.toInt)
} else Iterator.empty
}.fold(-1)(math.max)
if (maxIndex >= 0) {
getTopIndices(maxIndex + 1)
} else Array.emptyIntArray
case "fwe" =>
resultDF.select("featureIndex")
.where(col("pValue") < $(fwe) / numFeatures)
.as[Int].collect()
case errorType =>
throw new IllegalStateException(s"Unknown Selector Type: $errorType")
}
/**
 * Get the selection test result for every feature against the label by running
 * `ANOVATest.test` on the configured features and label columns.
 *
 * @param df the input data
 * @return the DataFrame returned by `ANOVATest.test`
 */
protected[this] override def getSelectionTestResult(df: DataFrame): DataFrame = {
// NOTE(review): the trailing `true` is presumably the flatten flag (one row per feature) — confirm.
ANOVATest.test(df, getFeaturesCol, getLabelCol, true)
}

copyValues(new ANOVASelectorModel(uid, indices.sorted).setParent(this))
/**
 * Create a new instance of concrete SelectorModel.
 * @param uid The uid passed to the new model's constructor
 * @param indices The indices of the selected features
 * @return A new ANOVASelectorModel built from `uid` and `indices`
 */
protected[this] def createSelectorModel(
uid: String,
indices: Array[Int]): ANOVASelectorModel = {
new ANOVASelectorModel(uid, indices)
}

@Since("3.1.0")
override def transformSchema(schema: StructType): StructType = {
SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
SchemaUtils.checkNumericType(schema, $(labelCol))
SchemaUtils.appendColumn(schema, $(outputCol), new VectorUDT)
override def fit(dataset: Dataset[_]): ANOVASelectorModel = {
super.fit(dataset)
}

@Since("3.1.0")
override def copy(extra: ParamMap): ANOVASelector = defaultCopy(extra)
override def copy(extra: ParamMap): this.type = defaultCopy(extra)
}

@Since("3.1.0")
Expand All @@ -173,66 +127,16 @@ object ANOVASelector extends DefaultParamsReadable[ANOVASelector] {
@Since("3.1.0")
class ANOVASelectorModel private[ml](
@Since("3.1.0") override val uid: String,
@Since("3.1.0") val selectedFeatures: Array[Int])
extends Model[ANOVASelectorModel] with FValueSelectorParams with MLWritable {

if (selectedFeatures.length >= 2) {
require(selectedFeatures.sliding(2).forall(l => l(0) < l(1)),
"Index should be strictly increasing.")
}
@Since("3.1.0") override val selectedFeatures: Array[Int])
extends SelectorModel[ANOVASelectorModel] (uid, selectedFeatures) {

/** @group setParam */
@Since("3.1.0")
def setFeaturesCol(value: String): this.type = set(featuresCol, value)
override def setFeaturesCol(value: String): this.type = super.setFeaturesCol(value)

/** @group setParam */
@Since("3.1.0")
def setOutputCol(value: String): this.type = set(outputCol, value)

@Since("3.1.0")
override def transform(dataset: Dataset[_]): DataFrame = {
val outputSchema = transformSchema(dataset.schema, logging = true)

val newSize = selectedFeatures.length
val func = { vector: Vector =>
vector match {
case SparseVector(_, indices, values) =>
val (newIndices, newValues) = compressSparse(indices, values)
Vectors.sparse(newSize, newIndices, newValues)
case DenseVector(values) =>
Vectors.dense(selectedFeatures.map(values))
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")
}
}

val transformer = udf(func)
dataset.withColumn($(outputCol), transformer(col($(featuresCol))),
outputSchema($(outputCol)).metadata)
}

@Since("3.1.0")
override def transformSchema(schema: StructType): StructType = {
SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
val newField = prepOutputField(schema)
SchemaUtils.appendColumn(schema, newField)
}

/**
* Prepare the output column field, including per-feature metadata.
*/
private def prepOutputField(schema: StructType): StructField = {
val selector = selectedFeatures.toSet
val origAttrGroup = AttributeGroup.fromStructField(schema($(featuresCol)))
val featureAttributes: Array[Attribute] = if (origAttrGroup.attributes.nonEmpty) {
origAttrGroup.attributes.get.zipWithIndex.filter(x => selector.contains(x._2)).map(_._1)
} else {
Array.fill[Attribute](selector.size)(NominalAttribute.defaultAttr)
}
val newAttributeGroup = new AttributeGroup($(outputCol), featureAttributes)
newAttributeGroup.toStructField()
}
override def setOutputCol(value: String): this.type = super.setOutputCol(value)

@Since("3.1.0")
override def copy(extra: ParamMap): ANOVASelectorModel = {
Expand All @@ -248,41 +152,13 @@ class ANOVASelectorModel private[ml](
override def toString: String = {
s"ANOVASelectorModel: uid=$uid, numSelectedFeatures=${selectedFeatures.length}"
}

/**
 * Restricts a sparse vector's (indices, values) pair to the entries listed in
 * `selectedFeatures`, renumbering each kept entry by its position in `selectedFeatures`.
 *
 * @param indices indices of the sparse vector's stored entries
 * @param values values paired position-by-position with `indices`
 * @return the new indices and the corresponding values, as a pair of arrays
 */
private[spark] def compressSparse(
indices: Array[Int],
values: Array[Double]): (Array[Int], Array[Double]) = {
val newValues = new ArrayBuilder.ofDouble
val newIndices = new ArrayBuilder.ofInt
// i walks `indices`, j walks `selectedFeatures`.
// NOTE(review): the merge assumes both arrays are in ascending order — confirm for `indices`.
var i = 0
var j = 0
while (i < indices.length && j < selectedFeatures.length) {
val indicesIdx = indices(i)
val filterIndicesIdx = selectedFeatures(j)
if (indicesIdx == filterIndicesIdx) {
// Match: the entry is kept, and its new index is its position j in selectedFeatures.
newIndices += j
newValues += values(i)
j += 1
i += 1
} else {
// No match: advance whichever cursor points at the smaller index.
if (indicesIdx > filterIndicesIdx) {
j += 1
} else {
i += 1
}
}
}
// TODO: Sparse representation might be ineffective if (newSize ~= newValues.size)
(newIndices.result(), newValues.result())
}
}

@Since("3.1.0")
object ANOVASelectorModel extends MLReadable[ANOVASelectorModel] {

@Since("3.1.0")
override def read: MLReader[ANOVASelectorModel] =
new ANOVASelectorModelReader
override def read: MLReader[ANOVASelectorModel] = new ANOVASelectorModelReader

@Since("3.1.0")
override def load(path: String): ANOVASelectorModel = super.load(path)
Expand All @@ -300,8 +176,7 @@ object ANOVASelectorModel extends MLReadable[ANOVASelectorModel] {
}
}

private class ANOVASelectorModelReader extends
MLReader[ANOVASelectorModel] {
private class ANOVASelectorModelReader extends MLReader[ANOVASelectorModel] {

/** Checked against metadata when loading model */
private val className = classOf[ANOVASelectorModel].getName
Expand Down

0 comments on commit f05560b

Please sign in to comment.