[SPARK-26616][MLLIB] Expose document frequency in IDFModel
## What changes were proposed in this pull request?

This change exposes the `df` (document frequency) as a public val along with the number of documents (`m`) as part of the IDF model.

* The document frequency is returned as an `Array[Long]`
* If the minimum document frequency (`minDocFreq`) is set, it is taken into account in the df calculation: terms whose document count is less than `minDocFreq` get a df of 0.
* `numDocs` is not strictly required, but it can be useful if we later provide a way for users to supply their own idf function instead of the default `log((1 + m) / (1 + df))`. In that case, the user would provide a function that takes `m` and `df` and returns the idf value (see the sketch after this list).
* Pyspark changes
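
As a sketch of the custom-weighting idea mentioned above (not part of this patch), here is how the exposed `docFreq` and `numDocs` could be used to recompute the default weighting or apply a user-defined one. `tfDataset` (a DataFrame with a "tf" vector column) and the alternative smoothing formula are hypothetical:

```scala
import org.apache.spark.ml.feature.IDF

// Fit the model as usual; docFreq and numDocs are then available on the model.
val model = new IDF().setInputCol("tf").setOutputCol("tfidf").fit(tfDataset)

val m  = model.numDocs   // number of documents seen during fit
val df = model.docFreq   // per-term document frequencies (0 below minDocFreq)

// Spark's default weighting, reproduced from m and df:
val defaultIdf = df.map(d => math.log((1.0 + m) / (1.0 + d)))

// A hypothetical user-supplied alternative computed from the same inputs:
val customIdf = df.map(d => math.log(1.0 + (m - d + 0.5) / (d + 0.5)))
```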

## How was this patch tested?

The existing test case was edited to also check for the document frequency values.

I am not very familiar with Python or PySpark. I have committed and run the tests based on my understanding; kindly let me know if I have missed anything.

Reviewer request: mengxr zjffdu yinxusen

Closes #23549 from purijatin/master.

Authored-by: Jatin Puri <purijatin@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
purijatin authored and srowen committed Jan 22, 2019
1 parent f92d276 commit d2e86cb
Showing 8 changed files with 91 additions and 20 deletions.
31 changes: 25 additions & 6 deletions mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.VersionUtils.majorVersion

/**
* Params for [[IDF]] and [[IDFModel]].
@@ -151,6 +152,15 @@ class IDFModel private[ml] (
@Since("2.0.0")
def idf: Vector = idfModel.idf.asML

/** Returns the document frequency */
@Since("3.0.0")
def docFreq: Array[Long] = idfModel.docFreq

/** Returns number of documents evaluated to compute idf */
@Since("3.0.0")
def numDocs: Long = idfModel.numDocs


@Since("1.6.0")
override def write: MLWriter = new IDFModelWriter(this)
}
@@ -160,11 +170,11 @@ object IDFModel extends MLReadable[IDFModel] {

private[IDFModel] class IDFModelWriter(instance: IDFModel) extends MLWriter {

private case class Data(idf: Vector)
private case class Data(idf: Vector, docFreq: Array[Long], numDocs: Long)

override protected def saveImpl(path: String): Unit = {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.idf)
val data = Data(instance.idf, instance.docFreq, instance.numDocs)
val dataPath = new Path(path, "data").toString
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
}
@@ -178,10 +188,19 @@ object IDFModel extends MLReadable[IDFModel] {
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
val dataPath = new Path(path, "data").toString
val data = sparkSession.read.parquet(dataPath)
val Row(idf: Vector) = MLUtils.convertVectorColumnsToML(data, "idf")
.select("idf")
.head()
val model = new IDFModel(metadata.uid, new feature.IDFModel(OldVectors.fromML(idf)))

val model = if (majorVersion(metadata.sparkVersion) >= 3) {
val Row(idf: Vector, df: Seq[_], numDocs: Long) = data.select("idf", "docFreq", "numDocs")
.head()
new IDFModel(metadata.uid, new feature.IDFModel(OldVectors.fromML(idf),
df.asInstanceOf[Seq[Long]].toArray, numDocs))
} else {
val Row(idf: Vector) = MLUtils.convertVectorColumnsToML(data, "idf")
.select("idf")
.head()
new IDFModel(metadata.uid,
new feature.IDFModel(OldVectors.fromML(idf), new Array[Long](idf.size), 0L))
}
metadata.getAndSetParams(model)
model
}
19 changes: 12 additions & 7 deletions mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
* This implementation supports filtering out terms which do not appear in a minimum number
* of documents (controlled by the variable `minDocFreq`). For terms that are not in
* at least `minDocFreq` documents, the IDF is found as 0, resulting in TF-IDFs of 0.
* The document frequency is 0 as well for such terms
*
* @param minDocFreq minimum of documents in which a term
* should appear for filtering
@@ -50,12 +51,12 @@ class IDF @Since("1.2.0") (@Since("1.2.0") val minDocFreq: Int) {
*/
@Since("1.1.0")
def fit(dataset: RDD[Vector]): IDFModel = {
val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(
minDocFreq = minDocFreq))(
val (idf: Vector, docFreq: Array[Long], numDocs: Long) = dataset.treeAggregate(
new IDF.DocumentFrequencyAggregator(minDocFreq = minDocFreq))(
seqOp = (df, v) => df.add(v),
combOp = (df1, df2) => df1.merge(df2)
).idf()
new IDFModel(idf)
new IDFModel(idf, docFreq, numDocs)
}

/**
@@ -128,13 +129,14 @@ private object IDF {

private def isEmpty: Boolean = m == 0L

/** Returns the current IDF vector. */
def idf(): Vector = {
/** Returns the current IDF vector, docFreq, number of documents */
def idf(): (Vector, Array[Long], Long) = {
if (isEmpty) {
throw new IllegalStateException("Haven't seen any document yet.")
}
val n = df.length
val inv = new Array[Double](n)
val dfv = new Array[Long](n)
var j = 0
while (j < n) {
/*
@@ -148,10 +150,11 @@
*/
if (df(j) >= minDocFreq) {
inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
dfv(j) = df(j)
}
j += 1
}
Vectors.dense(inv)
(Vectors.dense(inv), dfv, m)
}
}
}
@@ -160,7 +163,9 @@
* Represents an IDF model that can transform term frequency vectors.
*/
@Since("1.1.0")
class IDFModel private[spark] (@Since("1.1.0") val idf: Vector) extends Serializable {
class IDFModel private[spark](@Since("1.1.0") val idf: Vector,
@Since("3.0.0") val docFreq: Array[Long],
@Since("3.0.0") val numDocs: Long) extends Serializable {

/**
* Transforms term frequency (TF) vectors to TF-IDF vectors.
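
A minimal sketch (not part of the diff) of the `minDocFreq` semantics documented above, using the RDD-based API; the data values are made up for illustration and `sc` is assumed to be an existing SparkContext:

```scala
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.linalg.Vectors

val tf = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 0.0),   // term 2 appears in only one document
  Vectors.dense(0.0, 1.0, 3.0),
  Vectors.dense(4.0, 5.0, 0.0)))

val model = new IDF(minDocFreq = 2).fit(tf)
// model.numDocs => 3
// model.docFreq => Array(2, 3, 0)  -- term 2 is below the threshold, so its df is 0
// model.idf(2)  => 0.0             -- and its idf is forced to 0 as well
```
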
7 changes: 5 additions & 2 deletions mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -44,7 +44,7 @@ class IDFSuite extends MLTest with DefaultReadWriteTest {

test("params") {
ParamsSuite.checkParams(new IDF)
val model = new IDFModel("idf", new OldIDFModel(Vectors.dense(1.0)))
val model = new IDFModel("idf", new OldIDFModel(Vectors.dense(1.0), Array(1L), 1))
ParamsSuite.checkParams(model)
}

@@ -112,10 +112,13 @@ class IDFSuite extends MLTest with DefaultReadWriteTest {
}

test("IDFModel read/write") {
val instance = new IDFModel("myIDFModel", new OldIDFModel(Vectors.dense(1.0, 2.0)))
val instance = new IDFModel("myIDFModel",
new OldIDFModel(Vectors.dense(1.0, 2.0), Array(1, 2), 2))
.setInputCol("myInputCol")
.setOutputCol("myOutputCol")
val newInstance = testDefaultReadWrite(instance)
assert(newInstance.idf === instance.idf)
assert(newInstance.docFreq === instance.docFreq)
assert(newInstance.numDocs === instance.numDocs)
}
}
12 changes: 8 additions & 4 deletions mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
@@ -39,9 +39,11 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext {
math.log((m + 1.0) / (x + 1.0))
})
assert(model.idf ~== expected absTol 1e-12)
assert(model.numDocs === 3)
assert(model.docFreq === Array(0, 3, 1, 2))

val assertHelper = (tfidf: Array[Vector]) => {
assert(tfidf.size === 3)
assert(tfidf.length === 3)
val tfidf0 = tfidf(0).asInstanceOf[SparseVector]
assert(tfidf0.indices === Array(1, 3))
assert(Vectors.dense(tfidf0.values) ~==
@@ -70,19 +72,21 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext {
)
val m = localTermFrequencies.size
val termFrequencies = sc.parallelize(localTermFrequencies, 2)
val idf = new IDF(minDocFreq = 1)
val idf = new IDF(minDocFreq = 2)
val model = idf.fit(termFrequencies)
val expected = Vectors.dense(Array(0, 3, 1, 2).map { x =>
if (x > 0) {
if (x >= 2) {
math.log((m + 1.0) / (x + 1.0))
} else {
0
}
})
assert(model.idf ~== expected absTol 1e-12)
assert(model.numDocs === 3)
assert(model.docFreq === Array(0, 3, 0, 2))

val assertHelper = (tfidf: Array[Vector]) => {
assert(tfidf.size === 3)
assert(tfidf.length === 3)
val tfidf0 = tfidf(0).asInstanceOf[SparseVector]
assert(tfidf0.indices === Array(1, 3))
assert(Vectors.dense(tfidf0.values) ~==
6 changes: 5 additions & 1 deletion project/MimaExcludes.scala
@@ -273,7 +273,11 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.copy$default$1"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.productIterator"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.productPrefix"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.copy$default$3")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.copy$default$3"),

// [SPARK-26616][MLlib] Expose document frequency in IDFModel
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.feature.IDFModel.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf")
)

// Exclude rules for 2.4.x
20 changes: 20 additions & 0 deletions python/pyspark/ml/feature.py
@@ -966,6 +966,10 @@ class IDF(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritab
>>> model = idf.fit(df)
>>> model.idf
DenseVector([0.0, 0.0])
>>> model.docFreq
[0, 3]
>>> model.numDocs == df.count()
True
>>> model.transform(df).head().idf
DenseVector([0.0, 0.0])
>>> idf.setParams(outputCol="freqs").fit(df).transform(df).collect()[1].freqs
@@ -1045,6 +1049,22 @@ def idf(self):
"""
return self._call_java("idf")

@property
@since("3.0.0")
def docFreq(self):
"""
Returns the document frequency.
"""
return self._call_java("docFreq")

@property
@since("3.0.0")
def numDocs(self):
"""
Returns number of documents evaluated to compute idf
"""
return self._call_java("numDocs")


@inherit_doc
class Imputer(JavaEstimator, HasInputCols, JavaMLReadable, JavaMLWritable):
2 changes: 2 additions & 0 deletions python/pyspark/ml/tests/test_feature.py
@@ -67,6 +67,8 @@ def test_idf(self):
"Model should inherit the UID from its parent estimator.")
output = idf0m.transform(dataset)
self.assertIsNotNone(output.head().idf)
self.assertIsNotNone(idf0m.docFreq)
self.assertEqual(idf0m.numDocs, 3)
# Test that parameters transferred to Python Model
check_params(self, idf0m)

14 changes: 14 additions & 0 deletions python/pyspark/mllib/feature.py
@@ -518,6 +518,20 @@ def idf(self):
"""
return self.call('idf')

@since('3.0.0')
def docFreq(self):
"""
Returns the document frequency.
"""
return self.call('docFreq')

@since('3.0.0')
def numDocs(self):
"""
Returns number of documents evaluated to compute idf
"""
return self.call('numDocs')


class IDF(object):
"""
