[SPARK-7212][MLlib]Add sequence learning flag #6997

Closed · wants to merge 7 commits · changes from 5 commits
38 changes: 30 additions & 8 deletions mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -36,7 +36,7 @@ import org.apache.spark.storage.StorageLevel
* :: Experimental ::
*
* Model trained by [[FPGrowth]], which holds frequent itemsets.
- * @param freqItemsets frequent itemset, which is an RDD of [[FreqItemset]]
+ * @param freqItemsets frequent itemsets, which is an RDD of [[FreqItemset]]
* @tparam Item item type
*/
@Experimental
@@ -62,13 +62,14 @@ class FPGrowthModel[Item: ClassTag](val freqItemsets: RDD[FreqItemset[Item]]) ex
@Experimental
class FPGrowth private (
private var minSupport: Double,
-    private var numPartitions: Int) extends Logging with Serializable {
+    private var numPartitions: Int,
+    private var ordered: Boolean) extends Logging with Serializable {

/**
* Constructs a default instance with default parameters {minSupport: `0.3`, numPartitions: same
-   * as the input data}.
+   * as the input data, ordered: `false`}.
*/
-  def this() = this(0.3, -1)
+  def this() = this(0.3, -1, false)

/**
* Sets the minimal support level (default: `0.3`).
@@ -86,6 +87,15 @@ class FPGrowth private (
this
}

+  /**
+   * Indicates whether to mine itemsets (unordered) or sequences (ordered)
+   * (default: `false`, mine itemsets).
+   */
+  def setOrdered(ordered: Boolean): this.type = {
+    this.ordered = ordered
+    this
+  }
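As a rough, Spark-free sketch of what this flag changes downstream (the `itemToRank` map here is a hypothetical stand-in for the frequent-item rank table built during mining): itemset mode canonicalizes each filtered basket by sorting ranks, while sequence mode preserves the transaction's original order.

```scala
// Plain-Scala sketch of the ordered flag's effect (itemToRank is a hypothetical
// stand-in for the frequent-item rank map built during mining).
val itemToRank = Map("z" -> 0, "x" -> 1, "y" -> 2)

def filterBasket(tx: Array[String], ordered: Boolean): Array[Int] = {
  // Drop infrequent items, keeping the transaction's original order
  val filtered = tx.flatMap(itemToRank.get)
  // Itemset mode canonicalizes by sorting ranks; sequence mode must not
  if (!ordered) java.util.Arrays.sort(filtered)
  filtered
}

val tx = Array("y", "q", "x", "z") // "q" is infrequent and is dropped
filterBasket(tx, ordered = false).toSeq // itemset mode:  Seq(0, 1, 2)
filterBasket(tx, ordered = true).toSeq  // sequence mode: Seq(2, 1, 0)
```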

/**
* Computes an FP-Growth model that contains frequent itemsets.
* @param data input data set, each element contains a transaction
@@ -155,7 +165,7 @@ class FPGrowth private (
.flatMap { case (part, tree) =>
tree.extract(minCount, x => partitioner.getPartition(x) == part)
}.map { case (ranks, count) =>
-        new FreqItemset(ranks.map(i => freqItems(i)).toArray, count)
+        new FreqItemset(ranks.map(i => freqItems(i)).reverse.toArray, count, ordered)
Contributor:
Since you updated the ordering, we need to update Python doctest. See the Jenkins build log: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35828/consoleFull

Contributor (author):
OK.

}
}
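One way to read the `.reverse` added above (an assumption about `extract`'s output order, not confirmed by this diff): patterns come back with ranks in suffix-first order, so sequence mode reverses them before mapping ranks back to items, restoring the original transaction order.

```scala
// Hypothetical illustration: a pattern extracted as suffix-first ranks is
// reversed so the reported sequence matches the original transaction order.
val freqItems = Array("z", "y", "t")        // hypothetical rank -> item table
val ranks = List(2, 1, 0)                   // pattern as extracted: t, y, z
val sequence = ranks.map(i => freqItems(i)).reverse
// sequence == List("z", "y", "t")
```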

@@ -171,9 +181,12 @@ class FPGrowth private (
itemToRank: Map[Item, Int],
partitioner: Partitioner): mutable.Map[Int, Array[Int]] = {
val output = mutable.Map.empty[Int, Array[Int]]
-    // Filter the basket by frequent items pattern and sort their ranks.
+    // Filter the basket by frequent items pattern
val filtered = transaction.flatMap(itemToRank.get)
-    ju.Arrays.sort(filtered)
+    if (!this.ordered) {
+      ju.Arrays.sort(filtered)
+    }
+    // Generate conditional transactions
val n = filtered.length
var i = n - 1
while (i >= 0) {
@@ -198,9 +211,18 @@ object FPGrowth {
* Frequent itemset.
* @param items items in this itemset. Java users should call [[FreqItemset#javaItems]] instead.
* @param freq frequency
+   * @param ordered indicates whether `items` represents an itemset (false) or a sequence (true)
* @tparam Item item type
*/
-  class FreqItemset[Item](val items: Array[Item], val freq: Long) extends Serializable {
+  class FreqItemset[Item](val items: Array[Item], val freq: Long, val ordered: Boolean)
Contributor:
This is a breaking change. Please create an auxiliary constructor with the original signature.


Contributor (author):
OK.

+    extends Serializable {

+    /**
+     * Auxiliary constructor; assumes unordered by default.
+     */
+    def this(items: Array[Item], freq: Long) {
+      this(items, freq, false)
+    }
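A minimal stand-alone mirror of the two constructors (not the MLlib class itself), showing why the auxiliary constructor keeps existing two-argument call sites working with itemset semantics by default:

```scala
// Simplified stand-in for FreqItemset (not the MLlib class), demonstrating
// that the auxiliary constructor preserves the original two-argument signature.
class FreqItemsetSketch[Item](val items: Array[Item], val freq: Long, val ordered: Boolean) {
  def this(items: Array[Item], freq: Long) = this(items, freq, false)
}

val legacy = new FreqItemsetSketch(Array("a", "b"), 2L)          // old call sites still compile
val sequence = new FreqItemsetSketch(Array("a", "b"), 2L, true)  // new sequence-aware form
// legacy.ordered == false, sequence.ordered == true
```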

/**
* Returns items in a Java List.
@@ -22,7 +22,7 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext {


-  test("FP-Growth using String type") {
+  test("FP-Growth frequent itemsets using String type") {
val transactions = Seq(
"r z h k p",
"z y x w v u t s",
@@ -38,12 +38,14 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext {
val model6 = fpg
.setMinSupport(0.9)
.setNumPartitions(1)
.setOrdered(false)
.run(rdd)
assert(model6.freqItemsets.count() === 0)

val model3 = fpg
.setMinSupport(0.5)
.setNumPartitions(2)
.setOrdered(false)
.run(rdd)
val freqItemsets3 = model3.freqItemsets.collect().map { itemset =>
(itemset.items.toSet, itemset.freq)
@@ -61,17 +63,49 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext {
val model2 = fpg
.setMinSupport(0.3)
.setNumPartitions(4)
.setOrdered(false)
.run(rdd)
assert(model2.freqItemsets.count() === 54)

val model1 = fpg
.setMinSupport(0.1)
.setNumPartitions(8)
.setOrdered(false)
.run(rdd)
assert(model1.freqItemsets.count() === 625)
}

-  test("FP-Growth using Int type") {
+  test("FP-Growth frequent sequences using String type") {
val transactions = Seq(
"r z h k p",
"z y x w v u t s",
"s x o n r",
"x z y m t s q e",
"z",
"x z y r q t p")
.map(_.split(" "))
val rdd = sc.parallelize(transactions, 2).cache()

val fpg = new FPGrowth()

val model1 = fpg
.setMinSupport(0.5)
.setNumPartitions(2)
.setOrdered(true)
.run(rdd)

val expected = Set(
Contributor:
If you compared the output with R, could you put the R statements that generate the same result in comment? For example,

https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala#L504

But do not use JavaDoc comment, use multiline comments without the leading * on each line. So people can easily copy & paste the commands into R:

/*
  library("glmnet")
  data <- read.csv("path", header=FALSE)
  label = factor(data$V1)
  features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5))
  weights = coef(glmnet(features,label, family="binomial", alpha = 1.0, lambda = 6.0))
  weights
 */

Contributor (author):
OK.

The file at the given path has a specific format for encoding transactions; should I include that in the comment as well, or can I assume the user will be able to figure it out? (It is the same format as the zaki.txt example file provided with R's arulesSequences package.)

(List("r"), 3L), (List("s"), 3L), (List("t"), 3L), (List("x"), 4L), (List("y"), 3L),
Contributor:
Using Seq instead of List. See my comments below.

Contributor (author):
OK.

(List("z"), 5L), (List("z", "y"), 3L), (List("x", "t"), 3L), (List("y", "t"), 3L),
(List("z", "t"), 3L), (List("z", "y", "t"), 3L)
)
val freqItemseqs1 = model1.freqItemsets.collect().map { itemset =>
(itemset.items.toList, itemset.freq)
Contributor:
toList -> toSeq. Array.toSeq returns a WrappedArray, which doesn't copy data.

scala> val a = Array(1, 2)

scala> a.toSeq.toArray.eq(a)
res4: Boolean = true

scala> a.toList.toArray.eq(a)
res5: Boolean = false

Contributor (author):
OK.

}.toSet
assert(freqItemseqs1 === expected)
}
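The expected frequencies in this test can be sanity-checked outside Spark by counting, for each pattern, the transactions that contain it as an in-order subsequence (a hypothetical check, assuming sequence support here means in-order, not necessarily contiguous, containment):

```scala
// Hypothetical Spark-free sanity check: support of a sequence pattern, read as
// in-order (not necessarily contiguous) subsequence containment.
val transactions = Seq(
  "r z h k p", "z y x w v u t s", "s x o n r",
  "x z y m t s q e", "z", "x z y r q t p").map(_.split(" ").toSeq)

def isSubsequence(pattern: Seq[String], tx: Seq[String]): Boolean = {
  var i = 0
  for (item <- tx if i < pattern.length && item == pattern(i)) i += 1
  i == pattern.length
}

def support(pattern: Seq[String]): Long =
  transactions.count(tx => isSubsequence(pattern, tx)).toLong

support(Seq("z", "y", "t")) // 3, matching the expected set above
support(Seq("z"))           // 5
```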

test("FP-Growth frequent itemsets using Int type") {
val transactions = Seq(
"1 2 3",
"1 2 3 4",
@@ -88,12 +122,14 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext {
val model6 = fpg
.setMinSupport(0.9)
.setNumPartitions(1)
.setOrdered(false)
.run(rdd)
assert(model6.freqItemsets.count() === 0)

val model3 = fpg
.setMinSupport(0.5)
.setNumPartitions(2)
.setOrdered(false)
.run(rdd)
assert(model3.freqItemsets.first().items.getClass === Array(1).getClass,
"frequent itemsets should use primitive arrays")
@@ -109,12 +145,14 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext {
val model2 = fpg
.setMinSupport(0.3)
.setNumPartitions(4)
.setOrdered(false)
.run(rdd)
assert(model2.freqItemsets.count() === 15)

val model1 = fpg
.setMinSupport(0.1)
.setNumPartitions(8)
.setOrdered(false)
.run(rdd)
assert(model1.freqItemsets.count() === 65)
}