Commit 0107dc4

init

zhengruifeng committed Apr 9, 2020
1 parent d89fcc6 commit 0107dc4
Showing 4 changed files with 89 additions and 87 deletions.
mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala

@@ -27,11 +27,11 @@ import org.apache.spark.ml.attribute._
 import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
-import org.apache.spark.ml.stat.{ChiSquareTest, SelectionTestResult}
+import org.apache.spark.ml.stat.ChiSquareTest
 import org.apache.spark.ml.util._
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
+import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
  * Params for [[ChiSqSelector]] and [[ChiSqSelectorModel]].
@@ -197,50 +197,48 @@ final class ChiSqSelector @Since("1.6.0") (@Since("1.6.0") override val uid: Str
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): ChiSqSelectorModel = {
     transformSchema(dataset.schema, logging = true)
-    dataset.select(col($(labelCol)).cast(DoubleType), col($(featuresCol))).rdd.map {
-      case Row(label: Double, features: Vector) =>
-        LabeledPoint(label, features)
-    }
-
-    val testResult = ChiSquareTest.testChiSquare(dataset, getFeaturesCol, getLabelCol)
-      .zipWithIndex
-    val features = $(selectorType) match {
+    val spark = dataset.sparkSession
+    import spark.implicits._
+
+    val resultDF = ChiSquareTest.test(dataset.toDF, $(featuresCol), $(labelCol), true)
+
+    val indices = $(selectorType) match {
       case "numTopFeatures" =>
-        testResult
-          .sortBy { case (res, _) => res.pValue }
-          .take(getNumTopFeatures)
+        resultDF.sort("pValue").select("featureIndex")
+          .as[Int].take($(numTopFeatures))
       case "percentile" =>
-        testResult
-          .sortBy { case (res, _) => res.pValue }
-          .take((testResult.length * getPercentile).toInt)
+        val numFeatures = resultDF.count
+        resultDF.sort("pValue").select("featureIndex")
+          .as[Int].take((numFeatures * getPercentile).toInt)
       case "fpr" =>
-        testResult
-          .filter { case (res, _) => res.pValue < getFpr }
+        resultDF.select("featureIndex").where(col("pValue").lt($(fpr)))
+          .as[Int].collect()
       case "fdr" =>
         // This uses the Benjamini-Hochberg procedure.
         // https://en.wikipedia.org/wiki/False_discovery_rate#Benjamini.E2.80.93Hochberg_procedure
-        val tempRes = testResult
-          .sortBy { case (res, _) => res.pValue }
-        val selected = tempRes
+        val numFeatures = resultDF.count
+        val maxIndex = resultDF.sort("pValue")
+          .select("featureIndex", "pValue")
+          .as[(Int, Double)].rdd
           .zipWithIndex
-          .filter { case ((res, _), index) =>
-            res.pValue <= getFdr * (index + 1) / testResult.length
-          }
-        if (selected.isEmpty) {
-          Array.empty[(SelectionTestResult, Int)]
-        } else {
-          val maxIndex = selected.map(_._2).max
-          tempRes.take(maxIndex + 1)
-        }
+          .flatMap { case ((featureIndex, pValue), index) =>
+            if (pValue <= $(fdr) * (index + 1) / numFeatures) {
+              Iterator.single(featureIndex)
+            } else Iterator.empty
+          }.fold(-1)(math.max)
+        if (maxIndex >= 0) {
+          resultDF.sort("pValue").select("featureIndex")
+            .as[Int].take(maxIndex + 1)
+        } else Array.emptyIntArray
       case "fwe" =>
-        testResult
-          .filter { case (res, _) => res.pValue < getFwe / testResult.length }
+        val numFeatures = resultDF.count
+        resultDF.select("featureIndex").where(col("pValue").lt($(fwe) / numFeatures))
+          .as[Int].collect()
       case errorType =>
         throw new IllegalStateException(s"Unknown Selector Type: $errorType")
     }
-    val indices = features.map { case (_, index) => index }
-    copyValues(new ChiSqSelectorModel(uid, indices.sorted)
-      .setParent(this))
+
+    copyValues(new ChiSqSelectorModel(uid, indices.sorted).setParent(this))
   }
 
   @Since("1.6.0")
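The "fdr" branch above is the Benjamini-Hochberg procedure applied to the flattened test output: sort features by p-value, find the largest rank k with p(k) <= fdr * k / numFeatures, and keep the first k features in sorted order. A minimal local sketch of the same rule on a plain Scala array — the p-values are made up for illustration and nothing here is from the patch:

object BenjaminiHochbergSketch {
  def main(args: Array[String]): Unit = {
    val fdr = 0.05
    val pValues = Array(0.001, 0.009, 0.08, 0.04, 0.6) // hypothetical, one per feature
    val numFeatures = pValues.length

    // Sort (pValue, featureIndex) ascending by p-value, mirroring sort("pValue").
    val sorted = pValues.zipWithIndex.sortBy(_._1)

    // Largest 0-based rank passing p <= fdr * (rank + 1) / numFeatures,
    // or -1 if none passes, as in the fold(-1)(math.max) above.
    val maxIndex = sorted.zipWithIndex.flatMap { case ((p, _), rank) =>
      if (p <= fdr * (rank + 1) / numFeatures) Iterator.single(rank) else Iterator.empty
    }.foldLeft(-1)(math.max)

    // All features at or below maxIndex in sorted order are selected.
    val selected =
      if (maxIndex >= 0) sorted.take(maxIndex + 1).map(_._2) else Array.emptyIntArray
    println(selected.sorted.mkString(", ")) // prints "0, 1" for these p-values
  }
}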
mllib/src/main/scala/org/apache/spark/ml/stat/ChiSquareTest.scala
65 changes: 32 additions & 33 deletions
@@ -21,11 +21,9 @@ import org.apache.spark.annotation.Since
 import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.util.SchemaUtils
 import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
-import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
-import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.mllib.stat.test.{ChiSqTest => OldChiSqTest}
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.types.DoubleType
 
 
 /**
@@ -37,12 +35,6 @@ import org.apache.spark.sql.types.DoubleType
 @Since("2.2.0")
 object ChiSquareTest {
 
-  /** Used to construct output schema of tests */
-  private case class ChiSquareResult(
-      pValues: Vector,
-      degreesOfFreedom: Array[Int],
-      statistics: Vector)
-
   /**
    * Conduct Pearson's independence test for every feature against the label. For each feature, the
    * (feature, label) pairs are converted into a contingency matrix for which the Chi-squared
@@ -63,40 +55,47 @@
    */
   @Since("2.2.0")
   def test(dataset: DataFrame, featuresCol: String, labelCol: String): DataFrame = {
-    val spark = dataset.sparkSession
-    import spark.implicits._
-
-    SchemaUtils.checkColumnType(dataset.schema, featuresCol, new VectorUDT)
-    SchemaUtils.checkNumericType(dataset.schema, labelCol)
-    val rdd = dataset.select(col(labelCol).cast("double"), col(featuresCol)).as[(Double, Vector)]
-      .rdd.map { case (label, features) => OldLabeledPoint(label, OldVectors.fromML(features)) }
-    val testResults = OldStatistics.chiSqTest(rdd)
-    val pValues = Vectors.dense(testResults.map(_.pValue))
-    val degreesOfFreedom = testResults.map(_.degreesOfFreedom)
-    val statistics = Vectors.dense(testResults.map(_.statistic))
-    spark.createDataFrame(Seq(ChiSquareResult(pValues, degreesOfFreedom, statistics)))
+    test(dataset, featuresCol, labelCol, false)
   }
 
   /**
    * @param dataset DataFrame of categorical labels and categorical features.
    *                Real-valued features will be treated as categorical for each distinct value.
    * @param featuresCol Name of features column in dataset, of type `Vector` (`VectorUDT`)
    * @param labelCol Name of label column in dataset, of any numerical type
-   * @return Array containing the SelectionTestResult for every feature against the label.
+   * @param flatten If false, the returned DataFrame contains only a single Row, otherwise, one
+   *                row per feature.
    */
   @Since("3.1.0")
-  def testChiSquare(
-      dataset: Dataset[_],
+  def test(
+      dataset: DataFrame,
       featuresCol: String,
-      labelCol: String): Array[SelectionTestResult] = {
-
+      labelCol: String,
+      flatten: Boolean): DataFrame = {
     SchemaUtils.checkColumnType(dataset.schema, featuresCol, new VectorUDT)
     SchemaUtils.checkNumericType(dataset.schema, labelCol)
-    val input = dataset.select(col(labelCol).cast(DoubleType), col(featuresCol)).rdd
-      .map { case Row(label: Double, features: Vector) =>
-        OldLabeledPoint(label, OldVectors.fromML(features))
-      }
-    val chiTestResult = OldStatistics.chiSqTest(input)
-    chiTestResult.map(r => new ChiSqTestResult(r.pValue, r.degreesOfFreedom, r.statistic))
+
+    val spark = dataset.sparkSession
+    import spark.implicits._
+
+    val data = dataset.select(col(labelCol).cast("double"), col(featuresCol)).rdd
+      .map { case Row(label: Double, vec: Vector) => (label, OldVectors.fromML(vec)) }
+    val resultRDD = OldChiSqTest.computeChiSquared(data)
+
+    if (flatten) {
+      resultRDD.map { case (col, (pValue, degreesOfFreedom, statistic, _)) =>
+        (col, pValue, degreesOfFreedom, statistic)
+      }.toDF("featureIndex", "pValue", "degreesOfFreedom", "statistic")
+    } else {
+      resultRDD.map { case (col, (pValue, degreesOfFreedom, statistic, _)) =>
+        (0, (col, pValue, degreesOfFreedom, statistic))
+      }.groupByKey().map { case (_, seq) =>
+        val results = seq.toArray.sortBy(_._1)
+        val pValues = Vectors.dense(results.map(_._2))
+        val degreesOfFreedom = results.map(_._3)
+        val statistics = Vectors.dense(results.map(_._4))
+        (pValues, degreesOfFreedom, statistics)
+      }.toDF("pValues", "degreesOfFreedom", "statistics")
+    }
   }
 }
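For context, a hypothetical spark-shell session showing the two output shapes of the reworked test; the data is illustrative only:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.stat.ChiSquareTest

val df = spark.createDataFrame(Seq(
  (0.0, Vectors.dense(0.0, 1.0)),
  (1.0, Vectors.dense(1.0, 1.0)),
  (1.0, Vectors.dense(1.0, 0.0))
)).toDF("label", "features")

// flatten = true: one row per feature, easy to sort and filter by p-value;
// this is the shape the new ChiSqSelector.fit consumes.
ChiSquareTest.test(df, "features", "label", true).show()
// columns: featureIndex, pValue, degreesOfFreedom, statistic

// flatten = false (and the 3-arg overload): a single row of vector-valued
// results, preserving the pre-existing output schema.
ChiSquareTest.test(df, "features", "label").show()
// columns: pValues, degreesOfFreedom, statistics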
mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
@@ -79,20 +79,32 @@ private[spark] object ChiSqTest extends Logging {
    * the independence test.
    * Returns an array containing the ChiSquaredTestResult for every feature against the label.
    */
-  def chiSquaredFeatures(data: RDD[LabeledPoint],
+  def chiSquaredFeatures(
+      data: RDD[LabeledPoint],
       methodName: String = PEARSON.name): Array[ChiSqTestResult] = {
-    data.first().features match {
+    computeChiSquared(data.map(l => (l.label, l.features)), methodName)
+      .collect().sortBy(_._1).map {
+        case (_, (pValue, degreesOfFreedom, statistic, nullHypothesis)) =>
+          new ChiSqTestResult(pValue, degreesOfFreedom, statistic, methodName, nullHypothesis)
+      }
+  }
+
+  private[spark] def computeChiSquared(
+      data: RDD[(Double, Vector)],
+      methodName: String = PEARSON.name): RDD[(Int, (Double, Int, Double, String))] = {
+    data.first()._2 match {
       case dv: DenseVector =>
         chiSquaredDenseFeatures(data, dv.size, methodName)
       case sv: SparseVector =>
         chiSquaredSparseFeatures(data, sv.size, methodName)
     }
   }
 
-  private def chiSquaredDenseFeatures(data: RDD[LabeledPoint],
+  private def chiSquaredDenseFeatures(
+      data: RDD[(Double, Vector)],
       numFeatures: Int,
-      methodName: String = PEARSON.name): Array[ChiSqTestResult] = {
-    data.flatMap { case LabeledPoint(label, features) =>
+      methodName: String = PEARSON.name): RDD[(Int, (Double, Int, Double, String))] = {
+    data.flatMap { case (label, features) =>
       require(features.size == numFeatures,
         s"Number of features must be $numFeatures but got ${features.size}")
       features.iterator.map { case (col, value) => (col, (label, value)) }
@@ -107,16 +119,14 @@
       }
     ).map { case (col, counts) =>
       (col, computeChiSq(counts.toMap, methodName, col))
-    }.collect().sortBy(_._1).map {
-      case (_, (pValue, degreesOfFreedom, statistic, nullHypothesis)) =>
-        new ChiSqTestResult(pValue, degreesOfFreedom, statistic, methodName, nullHypothesis)
     }
   }
 
-  private def chiSquaredSparseFeatures(data: RDD[LabeledPoint],
+  private def chiSquaredSparseFeatures(
+      data: RDD[(Double, Vector)],
       numFeatures: Int,
-      methodName: String = PEARSON.name): Array[ChiSqTestResult] = {
-    val labelCounts = data.map(_.label).countByValue().toMap
+      methodName: String = PEARSON.name): RDD[(Int, (Double, Int, Double, String))] = {
+    val labelCounts = data.map(_._1).countByValue().toMap
     val numInstances = labelCounts.valuesIterator.sum
     val numLabels = labelCounts.size
     if (numLabels > maxCategories) {
@@ -126,15 +136,13 @@
 
     val numParts = data.getNumPartitions
     data.mapPartitionsWithIndex { case (pid, iter) =>
-      iter.flatMap { case LabeledPoint(label, features) =>
+      iter.flatMap { case (label, features) =>
         require(features.size == numFeatures,
           s"Number of features must be $numFeatures but got ${features.size}")
         features.nonZeroIterator.map { case (col, value) => (col, (label, value)) }
       } ++ {
         // append this to make sure that all columns are taken into account
-        Iterator.range(0, numFeatures)
-          .filter(_ % numParts == pid)
-          .map(col => (col, null))
+        Iterator.range(pid, numFeatures, numParts).map(col => (col, null))
       }
     }.aggregateByKey(new OpenHashMap[(Double, Double), Long])(
       seqOp = { case (counts, labelAndValue) =>
@@ -164,9 +172,6 @@ private[spark] object ChiSqTest extends Logging {
       }
 
       (col, computeChiSq(counts.toMap, methodName, col))
-    }.collect().sortBy(_._1).map {
-      case (_, (pValue, degreesOfFreedom, statistic, nullHypothesis)) =>
-        new ChiSqTestResult(pValue, degreesOfFreedom, statistic, methodName, nullHypothesis)
     }
   }
 
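A side note on the sparse path: the per-partition padding columns are now produced with a strided range instead of enumerating all columns and filtering. A quick standalone check (not part of the patch) that both forms yield the same column ids:

object StrideCheck {
  def main(args: Array[String]): Unit = {
    val numFeatures = 10
    val numParts = 3
    for (pid <- 0 until numParts) {
      // Old form: walk every column, keep those owned by this partition id.
      val oldIds = Iterator.range(0, numFeatures).filter(_ % numParts == pid).toSeq
      // New form: start at pid and step by numParts; same set, no filter pass.
      val newIds = Iterator.range(pid, numFeatures, numParts).toSeq
      assert(oldIds == newIds, s"mismatch for partition $pid")
      println(s"partition $pid -> ${newIds.mkString(", ")}")
    }
  }
}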
mllib/src/test/scala/org/apache/spark/ml/stat/ChiSquareTestSuite.scala

@@ -117,15 +117,15 @@ class ChiSquareTestSuite
withClue("ChiSquare should throw an exception when given a continuous-valued label") {
intercept[SparkException] {
val df = spark.createDataFrame(continuousLabel)
ChiSquareTest.test(df, "features", "label")
ChiSquareTest.test(df, "features", "label").count()
}
}
val continuousFeature = Seq.fill(tooManyCategories)(
LabeledPoint(random.nextInt(2), Vectors.dense(random.nextDouble())))
withClue("ChiSquare should throw an exception when given continuous-valued features") {
intercept[SparkException] {
val df = spark.createDataFrame(continuousFeature)
ChiSquareTest.test(df, "features", "label")
ChiSquareTest.test(df, "features", "label").count()
}
}
}
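The added .count() calls reflect that the reworked test now assembles its result lazily from RDD transformations: the SparkException for a continuous-valued label or feature can only surface once an action forces evaluation. A hypothetical minimal illustration:

// Nothing executes here: test only builds a lazy DataFrame.
val result = ChiSquareTest.test(df, "features", "label")
// An action triggers the job, and with it the expected failure.
result.count()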
