diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index 58897cca4e5c6..98a9674343b2a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.VersionUtils.majorVersion
 
 /**
  * Params for [[IDF]] and [[IDFModel]].
@@ -151,6 +152,15 @@ class IDFModel private[ml] (
   @Since("2.0.0")
   def idf: Vector = idfModel.idf.asML
 
+  /** Returns the document frequency */
+  @Since("3.0.0")
+  def docFreq: Array[Long] = idfModel.docFreq
+
+  /** Returns the number of documents evaluated to compute idf */
+  @Since("3.0.0")
+  def numDocs: Long = idfModel.numDocs
+
   @Since("1.6.0")
   override def write: MLWriter = new IDFModelWriter(this)
 }
@@ -160,11 +170,11 @@ object IDFModel extends MLReadable[IDFModel] {
 
   private[IDFModel] class IDFModelWriter(instance: IDFModel) extends MLWriter {
 
-    private case class Data(idf: Vector)
+    private case class Data(idf: Vector, docFreq: Array[Long], numDocs: Long)
 
     override protected def saveImpl(path: String): Unit = {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
-      val data = Data(instance.idf)
+      val data = Data(instance.idf, instance.docFreq, instance.numDocs)
       val dataPath = new Path(path, "data").toString
       sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
     }
@@ -178,10 +188,19 @@ object IDFModel extends MLReadable[IDFModel] {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
       val data = sparkSession.read.parquet(dataPath)
-      val Row(idf: Vector) = MLUtils.convertVectorColumnsToML(data, "idf")
-        .select("idf")
-        .head()
-      val model = new IDFModel(metadata.uid, new feature.IDFModel(OldVectors.fromML(idf)))
+
+      val model = if (majorVersion(metadata.sparkVersion) >= 3) {
+        val Row(idf: Vector, df: Seq[_], numDocs: Long) = data.select("idf", "docFreq", "numDocs")
+          .head()
+        new IDFModel(metadata.uid, new feature.IDFModel(OldVectors.fromML(idf),
+          df.asInstanceOf[Seq[Long]].toArray, numDocs))
+      } else {
+        val Row(idf: Vector) = MLUtils.convertVectorColumnsToML(data, "idf")
+          .select("idf")
+          .head()
+        new IDFModel(metadata.uid,
+          new feature.IDFModel(OldVectors.fromML(idf), new Array[Long](idf.size), 0L))
+      }
       metadata.getAndSetParams(model)
       model
     }
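To make the new spark.ml accessors concrete, here is a minimal usage sketch (not part of the patch), assuming a running SparkSession named `spark`. The fitted model now exposes the per-term document counts and the total number of documents gathered while fitting:

```scala
import org.apache.spark.ml.feature.IDF
import org.apache.spark.ml.linalg.Vectors

// Three documents over a three-term vocabulary, as term-frequency vectors.
val df = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.0, 1.0)),
  (1, Vectors.dense(1.0, 1.0, 0.0)),
  (2, Vectors.dense(1.0, 1.0, 0.0))
)).toDF("id", "features")

val model = new IDF().setInputCol("features").setOutputCol("tfidf").fit(df)
model.docFreq   // Array(3, 2, 1): how many documents contain each term
model.numDocs   // 3: total documents seen while fitting
```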
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
index bb4b37ef21a84..6407be68c9da5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
  * This implementation supports filtering out terms which do not appear in a minimum number
  * of documents (controlled by the variable `minDocFreq`). For terms that are not in
  * at least `minDocFreq` documents, the IDF is found as 0, resulting in TF-IDFs of 0.
+ * The document frequency is 0 as well for such terms.
  *
  * @param minDocFreq minimum of documents in which a term
  *                   should appear for filtering
@@ -50,12 +51,12 @@ class IDF @Since("1.2.0") (@Since("1.2.0") val minDocFreq: Int) {
    */
  @Since("1.1.0")
  def fit(dataset: RDD[Vector]): IDFModel = {
-    val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(
-      minDocFreq = minDocFreq))(
+    val (idf: Vector, docFreq: Array[Long], numDocs: Long) = dataset.treeAggregate(
+      new IDF.DocumentFrequencyAggregator(minDocFreq = minDocFreq))(
      seqOp = (df, v) => df.add(v),
      combOp = (df1, df2) => df1.merge(df2)
    ).idf()
-    new IDFModel(idf)
+    new IDFModel(idf, docFreq, numDocs)
  }
 
  /**
@@ -128,13 +129,14 @@ private object IDF {
 
    private def isEmpty: Boolean = m == 0L
 
-    /** Returns the current IDF vector. */
-    def idf(): Vector = {
+    /** Returns the current IDF vector, the document frequency counts, and the number of documents. */
+    def idf(): (Vector, Array[Long], Long) = {
      if (isEmpty) {
        throw new IllegalStateException("Haven't seen any document yet.")
      }
      val n = df.length
      val inv = new Array[Double](n)
+      val dfv = new Array[Long](n)
      var j = 0
      while (j < n) {
        /*
@@ -148,10 +150,11 @@ private object IDF {
         */
        if (df(j) >= minDocFreq) {
          inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
+          dfv(j) = df(j)
        }
        j += 1
      }
-      Vectors.dense(inv)
+      (Vectors.dense(inv), dfv, m)
    }
  }
 }
@@ -160,7 +163,9 @@ private object IDF {
  * Represents an IDF model that can transform term frequency vectors.
  */
 @Since("1.1.0")
-class IDFModel private[spark] (@Since("1.1.0") val idf: Vector) extends Serializable {
+class IDFModel private[spark](@Since("1.1.0") val idf: Vector,
+    @Since("3.0.0") val docFreq: Array[Long],
+    @Since("3.0.0") val numDocs: Long) extends Serializable {
 
   /**
    * Transforms term frequency (TF) vectors to TF-IDF vectors.
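The behavioral contract of the new `idf()` return value is easiest to see in isolation. Below is a standalone sketch of the same loop under the patch's semantics (illustrative only, not the `DocumentFrequencyAggregator` class itself): terms below `minDocFreq` get both an IDF of 0.0 and a reported document frequency of 0, keeping the two vectors consistent.

```scala
// Given per-term document counts `df` over `m` documents, compute the IDF
// values plus the exposed document frequencies, zeroing terms below minDocFreq.
def idfWithDocFreq(df: Array[Long], m: Long, minDocFreq: Int): (Array[Double], Array[Long], Long) = {
  val inv = new Array[Double](df.length)
  val dfv = new Array[Long](df.length)
  var j = 0
  while (j < df.length) {
    if (df(j) >= minDocFreq) {
      inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
      dfv(j) = df(j) // only retained terms report a nonzero document frequency
    }
    j += 1
  }
  (inv, dfv, m)
}

idfWithDocFreq(Array(0L, 3L, 1L, 2L), m = 3L, minDocFreq = 2)
// -> (Array(0.0, 0.0, 0.0, log(4.0 / 3.0)), Array(0, 3, 0, 2), 3)
```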
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index cdd62be43b54c..73b2b82daaf43 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -44,7 +44,7 @@ class IDFSuite extends MLTest with DefaultReadWriteTest {
 
   test("params") {
     ParamsSuite.checkParams(new IDF)
-    val model = new IDFModel("idf", new OldIDFModel(Vectors.dense(1.0)))
+    val model = new IDFModel("idf", new OldIDFModel(Vectors.dense(1.0), Array(1L), 1))
     ParamsSuite.checkParams(model)
   }
 
@@ -112,10 +112,13 @@ class IDFSuite extends MLTest with DefaultReadWriteTest {
   }
 
   test("IDFModel read/write") {
-    val instance = new IDFModel("myIDFModel", new OldIDFModel(Vectors.dense(1.0, 2.0)))
+    val instance = new IDFModel("myIDFModel",
+      new OldIDFModel(Vectors.dense(1.0, 2.0), Array(1, 2), 2))
       .setInputCol("myInputCol")
       .setOutputCol("myOutputCol")
     val newInstance = testDefaultReadWrite(instance)
     assert(newInstance.idf === instance.idf)
+    assert(newInstance.docFreq === instance.docFreq)
+    assert(newInstance.numDocs === instance.numDocs)
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
index 5c938a61ed990..1049730ffd01d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
@@ -39,9 +39,11 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext {
       math.log((m + 1.0) / (x + 1.0))
     })
     assert(model.idf ~== expected absTol 1e-12)
+    assert(model.numDocs === 3)
+    assert(model.docFreq === Array(0, 3, 1, 2))
 
     val assertHelper = (tfidf: Array[Vector]) => {
-      assert(tfidf.size === 3)
+      assert(tfidf.length === 3)
       val tfidf0 = tfidf(0).asInstanceOf[SparseVector]
       assert(tfidf0.indices === Array(1, 3))
       assert(Vectors.dense(tfidf0.values) ~==
@@ -70,19 +72,21 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext {
     )
     val m = localTermFrequencies.size
     val termFrequencies = sc.parallelize(localTermFrequencies, 2)
-    val idf = new IDF(minDocFreq = 1)
+    val idf = new IDF(minDocFreq = 2)
     val model = idf.fit(termFrequencies)
     val expected = Vectors.dense(Array(0, 3, 1, 2).map { x =>
-      if (x > 0) {
+      if (x >= 2) {
         math.log((m + 1.0) / (x + 1.0))
       } else {
         0
       }
     })
     assert(model.idf ~== expected absTol 1e-12)
+    assert(model.numDocs === 3)
+    assert(model.docFreq === Array(0, 3, 0, 2))
 
     val assertHelper = (tfidf: Array[Vector]) => {
-      assert(tfidf.size === 3)
+      assert(tfidf.length === 3)
       val tfidf0 = tfidf(0).asInstanceOf[SparseVector]
       assert(tfidf0.indices === Array(1, 3))
       assert(Vectors.dense(tfidf0.values) ~==
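The updated mllib expectations can be reproduced directly. Here is a sketch assuming a SparkContext `sc` and a fixture consistent with the asserted values above (the suite's exact data may differ): term 2 appears in only one document, so with `minDocFreq = 2` both its IDF and its reported document frequency come back as 0.

```scala
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.linalg.Vectors

// Three documents over a four-term vocabulary.
val termFrequencies = sc.parallelize(Seq(
  Vectors.sparse(4, Array(1, 3), Array(1.0, 2.0)),
  Vectors.dense(0.0, 1.0, 2.0, 3.0),
  Vectors.sparse(4, Array(1), Array(1.0))
), 2)

val model = new IDF(minDocFreq = 2).fit(termFrequencies)
model.numDocs   // 3
model.docFreq   // Array(0, 3, 0, 2): term 2 is filtered out, so its count is 0
```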
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3e232baaec498..4cf312d259eae 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -273,7 +273,11 @@ object MimaExcludes {
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.copy$default$1"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.productIterator"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.productPrefix"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.copy$default$3")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.copy$default$3"),
+
+    // [SPARK-26616][MLlib] Expose document frequency in IDFModel
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.feature.IDFModel.this"),
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf")
   )
 
   // Exclude rules for 2.4.x
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 08ae58246adb6..23d56c8ad3570 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -966,6 +966,10 @@ class IDF(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritab
     >>> model = idf.fit(df)
     >>> model.idf
     DenseVector([0.0, 0.0])
+    >>> model.docFreq
+    [0, 3]
+    >>> model.numDocs == df.count()
+    True
     >>> model.transform(df).head().idf
     DenseVector([0.0, 0.0])
     >>> idf.setParams(outputCol="freqs").fit(df).transform(df).collect()[1].freqs
@@ -1045,6 +1049,22 @@ def idf(self):
         """
         return self._call_java("idf")
 
+    @property
+    @since("3.0.0")
+    def docFreq(self):
+        """
+        Returns the document frequency.
+        """
+        return self._call_java("docFreq")
+
+    @property
+    @since("3.0.0")
+    def numDocs(self):
+        """
+        Returns the number of documents evaluated to compute idf.
+        """
+        return self._call_java("numDocs")
+
 
 @inherit_doc
 class Imputer(JavaEstimator, HasInputCols, JavaMLReadable, JavaMLWritable):
diff --git a/python/pyspark/ml/tests/test_feature.py b/python/pyspark/ml/tests/test_feature.py
index 325feaba66957..53d3ff9ee331d 100644
--- a/python/pyspark/ml/tests/test_feature.py
+++ b/python/pyspark/ml/tests/test_feature.py
@@ -67,6 +67,8 @@ def test_idf(self):
                          "Model should inherit the UID from its parent estimator.")
         output = idf0m.transform(dataset)
         self.assertIsNotNone(output.head().idf)
+        self.assertIsNotNone(idf0m.docFreq)
+        self.assertEqual(idf0m.numDocs, 3)
         # Test that parameters transferred to Python Model
         check_params(self, idf0m)
 
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index b1bcdb9078e3e..905c4da2d880d 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -518,6 +518,20 @@ def idf(self):
         """
         return self.call('idf')
 
+    @since('3.0.0')
+    def docFreq(self):
+        """
+        Returns the document frequency.
+        """
+        return self.call('docFreq')
+
+    @since('3.0.0')
+    def numDocs(self):
+        """
+        Returns the number of documents evaluated to compute idf.
+        """
+        return self.call('numDocs')
+
 
 class IDF(object):
     """
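Finally, the persistence changes round-trip the new fields while keeping pre-3.0 saved models loadable. A sketch of the expected behavior, using a hypothetical save path and assuming a fitted spark.ml `model`:

```scala
// Saved with Spark 3.0+: the parquet data now carries docFreq and numDocs.
model.write.overwrite().save("/tmp/idf-model")  // hypothetical path
val reloaded = org.apache.spark.ml.feature.IDFModel.load("/tmp/idf-model")
assert(reloaded.docFreq.sameElements(model.docFreq))
assert(reloaded.numDocs == model.numDocs)
// Models written by Spark < 3.0 lack the new columns; per the reader above,
// they load with docFreq = new Array[Long](idf.size) (all zeros) and numDocs = 0L.
```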