From 018aea438eb585c0cd0e08433bb5be310e38f582 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Tue, 8 May 2018 13:29:24 +0800 Subject: [PATCH 1/4] init pr --- python/pyspark/ml/fpm.py | 75 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index b8dafd49d354d..f0f8f62468ea0 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -16,8 +16,9 @@ # from pyspark import keyword_only, since +from pyspark.ml.common import _java2py, _py2java from pyspark.ml.util import * -from pyspark.ml.wrapper import JavaEstimator, JavaModel +from pyspark.ml.wrapper import JavaEstimator, JavaModel, _jvm from pyspark.ml.param.shared import * __all__ = ["FPGrowth", "FPGrowthModel"] @@ -243,3 +244,75 @@ def setParams(self, minSupport=0.3, minConfidence=0.8, itemsCol="items", def _create_model(self, java_model): return FPGrowthModel(java_model) + + +class PrefixSpan(object): + """ + .. note:: Experimental + + A parallel PrefixSpan algorithm to mine frequent sequential patterns. + The PrefixSpan algorithm is described in J. Pei, et al., PrefixSpan: Mining Sequential Patterns + Efficiently by Prefix-Projected Pattern Growth + (see here). + + .. versionadded:: 2.4.0 + + """ + @staticmethod + @since("2.4.0") + def findFrequentSequentialPatterns(dataset, + sequenceCol, + minSupport, + maxPatternLength, + maxLocalProjDBSize): + """ + .. note:: Experimental + Finds the complete set of frequent sequential patterns in the input sequences of itemsets. + + :param dataset: A dataset or a dataframe containing a sequence column which is + `Seq[Seq[_]]` type. + :param sequenceCol: The name of the sequence column in dataset, rows with nulls in this + column are ignored. + :param minSupport: The minimal support level of the sequential pattern, any pattern that + appears more than (minSupport * size-of-the-dataset) times will be + output (recommended value: `0.1`). + :param maxPatternLength: The maximal length of the sequential pattern + (recommended value: `10`). + :param maxLocalProjDBSize: The maximum number of items (including delimiters used in the + internal storage format) allowed in a projected database before + local processing. If a projected database exceeds this size, + another iteration of distributed prefix growth is run + (recommended value: `32000000`). + :return: A `DataFrame` that contains columns of sequence and corresponding frequency. + The schema of it will be: + - `sequence: Seq[Seq[T]]` (T is the item type) + - `freq: Long` + + >>> from pyspark.ml.fpm import PrefixSpan + >>> from pyspark.sql import Row + >>> df = sc.parallelize([Row(sequence=[[1, 2], [3]]), + ... Row(sequence=[[1], [3, 2], [1, 2]]), + ... Row(sequence=[[1, 2], [5]]), + ... Row(sequence=[[6]])]).toDF() + >>> PrefixSpan.findFrequentSequentialPatterns(df, "sequence", minSupport = 0.5, + ... maxPatternLength = 5, + ... maxLocalProjDBSize = 32000000) + ... .sort("sequence").show(truncate=False) + +----------+----+ + |sequence |freq| + +----------+----+ + |[[1]] |3 | + |[[1], [3]]|2 | + |[[1, 2]] |3 | + |[[2]] |3 | + |[[3]] |2 | + +----------+----+ + + """ + javaPrefixSpanObj = _jvm().org.apache.spark.ml.fpm.PrefixSpan + sc = SparkContext._active_spark_context + dataset = _py2java(sc, dataset) + res = javaPrefixSpanObj\ + .findFrequentSequentialPatterns(dataset, sequenceCol, minSupport, maxPatternLength, + maxLocalProjDBSize) + return _java2py(sc, res) \ No newline at end of file From 4dbc4fc6e269658a94020904b1961b01100d6ef2 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Tue, 8 May 2018 15:19:24 +0800 Subject: [PATCH 2/4] fix python style --- python/pyspark/ml/fpm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index f0f8f62468ea0..8148bf3a37e64 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -315,4 +315,4 @@ def findFrequentSequentialPatterns(dataset, res = javaPrefixSpanObj\ .findFrequentSequentialPatterns(dataset, sequenceCol, minSupport, maxPatternLength, maxLocalProjDBSize) - return _java2py(sc, res) \ No newline at end of file + return _java2py(sc, res) From 0be3a94d27f4203608ef82d2ef197b37606c53b3 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Thu, 31 May 2018 13:52:05 +0800 Subject: [PATCH 3/4] update --- .../org/apache/spark/ml/fpm/PrefixSpan.scala | 2 +- python/pyspark/ml/fpm.py | 94 ++++++++++++------- 2 files changed, 63 insertions(+), 33 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala index 41716c621ca98..20278805caa28 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala @@ -53,7 +53,7 @@ final class PrefixSpan(@Since("2.4.0") override val uid: String) extends Params @Since("2.4.0") val minSupport = new DoubleParam(this, "minSupport", "The minimal support level of the " + "sequential pattern. Sequential pattern that appears more than " + - "(minSupport * size-of-the-dataset)." + + "(minSupport * size-of-the-dataset)" + "times will be output.", ParamValidators.gtEq(0.0)) /** @group getParam */ diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 8148bf3a37e64..f8330cd78c489 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -16,9 +16,9 @@ # from pyspark import keyword_only, since -from pyspark.ml.common import _java2py, _py2java +from pyspark.sql import DataFrame from pyspark.ml.util import * -from pyspark.ml.wrapper import JavaEstimator, JavaModel, _jvm +from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, _jvm from pyspark.ml.param.shared import * __all__ = ["FPGrowth", "FPGrowthModel"] @@ -246,7 +246,7 @@ def _create_model(self, java_model): return FPGrowthModel(java_model) -class PrefixSpan(object): +class PrefixSpan(JavaParams): """ .. note:: Experimental @@ -254,35 +254,69 @@ class PrefixSpan(object): The PrefixSpan algorithm is described in J. Pei, et al., PrefixSpan: Mining Sequential Patterns Efficiently by Prefix-Projected Pattern Growth (see here). + This class is not yet an Estimator/Transformer, use :py:func:`findFrequentSequentialPatterns` + method to run the PrefixSpan algorithm. + @see Sequential Pattern Mining + (Wikipedia) .. versionadded:: 2.4.0 """ - @staticmethod + + minSupport = Param(Params._dummy(), "minSupport", "The minimal support level of the " + + "sequential pattern. Sequential pattern that appears more than " + + "(minSupport * size-of-the-dataset) times will be output. Must be >= 0.", + typeConverter=TypeConverters.toFloat) + + maxPatternLength = Param(Params._dummy(), "maxPatternLength", + "The maximal length of the sequential pattern. Must be > 0.", + typeConverter=TypeConverters.toInt) + + maxLocalProjDBSize = Param(Params._dummy(), "maxLocalProjDBSize", + "The maximum number of items (including delimiters used in the " + + "internal storage format) allowed in a projected database before " + + "local processing. If a projected database exceeds this size, " + + "another iteration of distributed prefix growth is run. " + + "Must be > 0.", + typeConverter=TypeConverters.toInt) + + sequenceCol = Param(Params._dummy(), "sequenceCol", "The name of the sequence column in " + + "dataset, rows with nulls in this column are ignored.", + typeConverter=TypeConverters.toString) + + @keyword_only + def __init__(self, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000, + sequenceCol="sequence"): + """ + __init__(self, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000, \ + sequenceCol="sequence") + """ + super(PrefixSpan, self).__init__() + self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.PrefixSpan", self.uid) + self._setDefault(minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000, + sequenceCol="sequence") + kwargs = self._input_kwargs + self.setParams(**kwargs) + + @keyword_only + @since("2.4.0") + def setParams(self, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000, + sequenceCol="sequence"): + """ + setParams(self, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000, \ + sequenceCol="sequence") + """ + kwargs = self._input_kwargs + return self._set(**kwargs) + @since("2.4.0") - def findFrequentSequentialPatterns(dataset, - sequenceCol, - minSupport, - maxPatternLength, - maxLocalProjDBSize): + def findFrequentSequentialPatterns(self, dataset): """ .. note:: Experimental Finds the complete set of frequent sequential patterns in the input sequences of itemsets. :param dataset: A dataset or a dataframe containing a sequence column which is `Seq[Seq[_]]` type. - :param sequenceCol: The name of the sequence column in dataset, rows with nulls in this - column are ignored. - :param minSupport: The minimal support level of the sequential pattern, any pattern that - appears more than (minSupport * size-of-the-dataset) times will be - output (recommended value: `0.1`). - :param maxPatternLength: The maximal length of the sequential pattern - (recommended value: `10`). - :param maxLocalProjDBSize: The maximum number of items (including delimiters used in the - internal storage format) allowed in a projected database before - local processing. If a projected database exceeds this size, - another iteration of distributed prefix growth is run - (recommended value: `32000000`). :return: A `DataFrame` that contains columns of sequence and corresponding frequency. The schema of it will be: - `sequence: Seq[Seq[T]]` (T is the item type) @@ -294,10 +328,9 @@ def findFrequentSequentialPatterns(dataset, ... Row(sequence=[[1], [3, 2], [1, 2]]), ... Row(sequence=[[1, 2], [5]]), ... Row(sequence=[[6]])]).toDF() - >>> PrefixSpan.findFrequentSequentialPatterns(df, "sequence", minSupport = 0.5, - ... maxPatternLength = 5, - ... maxLocalProjDBSize = 32000000) - ... .sort("sequence").show(truncate=False) + >>> prefixSpan = PrefixSpan(minSupport=0.5, maxPatternLength=5, + ... maxLocalProjDBSize=32000000) + >>> prefixSpan.findFrequentSequentialPatterns(df).sort("sequence").show(truncate=False) +----------+----+ |sequence |freq| +----------+----+ @@ -308,11 +341,8 @@ def findFrequentSequentialPatterns(dataset, |[[3]] |2 | +----------+----+ + .. versionadded:: 2.4.0 """ - javaPrefixSpanObj = _jvm().org.apache.spark.ml.fpm.PrefixSpan - sc = SparkContext._active_spark_context - dataset = _py2java(sc, dataset) - res = javaPrefixSpanObj\ - .findFrequentSequentialPatterns(dataset, sequenceCol, minSupport, maxPatternLength, - maxLocalProjDBSize) - return _java2py(sc, res) + self._transfer_params_to_java() + jdf = self._java_obj.findFrequentSequentialPatterns(dataset._jdf) + return DataFrame(jdf, dataset.sql_ctx) From 6f404747e13c500289920d15ee79b0f0509984f8 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Thu, 31 May 2018 14:59:35 +0800 Subject: [PATCH 4/4] address minor issues --- .../main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala | 6 +++--- python/pyspark/ml/fpm.py | 9 ++++----- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala index 20278805caa28..bd1c1a8885201 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala @@ -53,7 +53,7 @@ final class PrefixSpan(@Since("2.4.0") override val uid: String) extends Params @Since("2.4.0") val minSupport = new DoubleParam(this, "minSupport", "The minimal support level of the " + "sequential pattern. Sequential pattern that appears more than " + - "(minSupport * size-of-the-dataset)" + + "(minSupport * size-of-the-dataset) " + "times will be output.", ParamValidators.gtEq(0.0)) /** @group getParam */ @@ -128,10 +128,10 @@ final class PrefixSpan(@Since("2.4.0") override val uid: String) extends Params * Finds the complete set of frequent sequential patterns in the input sequences of itemsets. * * @param dataset A dataset or a dataframe containing a sequence column which is - * {{{Seq[Seq[_]]}}} type + * {{{ArrayType(ArrayType(T))}}} type, T is the item type for the input dataset. * @return A `DataFrame` that contains columns of sequence and corresponding frequency. * The schema of it will be: - * - `sequence: Seq[Seq[T]]` (T is the item type) + * - `sequence: ArrayType(ArrayType(T))` (T is the item type) * - `freq: Long` */ @Since("2.4.0") diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index f8330cd78c489..fd19fd96c4df6 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -315,11 +315,11 @@ def findFrequentSequentialPatterns(self, dataset): .. note:: Experimental Finds the complete set of frequent sequential patterns in the input sequences of itemsets. - :param dataset: A dataset or a dataframe containing a sequence column which is - `Seq[Seq[_]]` type. + :param dataset: A dataframe containing a sequence column which is + `ArrayType(ArrayType(T))` type, T is the item type for the input dataset. :return: A `DataFrame` that contains columns of sequence and corresponding frequency. The schema of it will be: - - `sequence: Seq[Seq[T]]` (T is the item type) + - `sequence: ArrayType(ArrayType(T))` (T is the item type) - `freq: Long` >>> from pyspark.ml.fpm import PrefixSpan @@ -328,8 +328,7 @@ def findFrequentSequentialPatterns(self, dataset): ... Row(sequence=[[1], [3, 2], [1, 2]]), ... Row(sequence=[[1, 2], [5]]), ... Row(sequence=[[6]])]).toDF() - >>> prefixSpan = PrefixSpan(minSupport=0.5, maxPatternLength=5, - ... maxLocalProjDBSize=32000000) + >>> prefixSpan = PrefixSpan(minSupport=0.5, maxPatternLength=5) >>> prefixSpan.findFrequentSequentialPatterns(df).sort("sequence").show(truncate=False) +----------+----+ |sequence |freq|