# Frequent patterns on MovieLens 25M dataset using FP-Growth
This notebook mines frequent patterns in the MovieLens 25M dataset.  
I keep only good ratings (> 3.0) and apply the FP-Growth algorithm (as implemented in [PySpark](https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html#fp-growth)) to subsets of different sizes (1k, 10k, 100k, 1M, 2M, 5M, 10M, and all ~15M good ratings), taken without sorting.  
The dataset was downloaded from https://grouplens.org/datasets/movielens/, extracted, and copied to the directory `/opt/spark/data`.

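The association rules and the elapsed times are shown below. Note that the `minSupport` and `minConfidence` thresholds differ between some of the runs, so the timings are not strictly comparable.  
Please ignore the execution counts of the cells; I had to restart the notebook several times.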

In [1]:
import os
import socket
import time

import pyspark

In [2]:
# Run PySpark workers with python3, and look up this host's IP address so it
# can be advertised as spark.driver.host in the configuration below.
os.environ.update(PYSPARK_PYTHON='python3')
this_hostname = socket.gethostname()
driver_host = socket.gethostbyname(this_hostname)

In [3]:
# Spark on Kubernetes: this process is the driver (advertised at
# driver_host:29413), and two executors with 1g memory each are started from the
# spark-py image. Authentication uses the service-account files mounted under
# /var/run/secrets/kubernetes.io/serviceaccount.
conf = pyspark.SparkConf().setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")

k8s_settings = [
    ("spark.kubernetes.container.image", "gcr.io/spark-operator/spark-py:v2.4.5"),
    ("spark.kubernetes.authenticate.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"),
    ("spark.kubernetes.authenticate.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token"),
    ("spark.kubernetes.authenticate.driver.serviceAccountName", "spark"),
    ("spark.executor.instances", "2"),
    ("spark.executor.memory", "1g"),
    ("spark.kubernetes.pyspark.pythonVersion", "3"),
    ("spark.driver.host", driver_host),
    ("spark.driver.port", "29413"),
]
conf.setAll(k8s_settings)

<pyspark.conf.SparkConf at 0x7f9278b2e890>

In [4]:
# Start (or reuse) a Spark session configured for Kubernetes.
session_builder = pyspark.sql.SparkSession.builder.config(conf=conf)
spark = session_builder.getOrCreate()

In [5]:
# Load the raw ratings. An explicit schema replaces inferSchema=True, which
# makes Spark read the whole ~25M-row CSV an extra time just to guess types.
# NOTE(review): types chosen to match the df.show() output below (integer ids
# and timestamps, fractional ratings) — confirm with df.printSchema().
df = spark.read.load(
    '/opt/spark/data/ratings.csv',
    format='csv',
    schema='userId INT, movieId INT, rating DOUBLE, `timestamp` INT',
    sep=',',
    header=True,
)

In [6]:
# Peek at the raw ratings.
df.show(n=10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
+------+-------+------+----------+
only showing top 10 rows



In [7]:
# Total number of ratings before filtering.
n_ratings = df.count()
n_ratings

25000095

In [8]:
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import functions as F

In [9]:
# Keep only "good" ratings (strictly above 3.0) and drop exact duplicate rows.
# NOTE(review): dropDuplicates shuffles the rows, which presumably explains why
# the limit(n) subsets below are not the first rows of ratings.csv.
df = df.where(F.col('rating') > 3.0).dropDuplicates()

In [8]:
# Optional: sort by (userId, movieId) so that limit(n) takes a reproducible subset.
# Skipped (left commented out) for the runs on more than 10M ratings.
# df = df.sort(F.col('userId'), F.col('movieId'))

In [10]:
# Number of good (> 3.0) ratings left after filtering.
n_good_ratings = df.count()
n_good_ratings

15630129

## 1k ratings

In [11]:
# 1,000 good ratings in whatever order Spark returns them (no sort), so the
# subset may differ between runs.
df_1k = df.limit(num=1000)

In [12]:
# Peek at the 1k sample.
df_1k.show(n=10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     2|   5952|   5.0|1141415528|
|     3|   5782|   4.0|1484754375|
|     3|   8783|   4.0|1566090061|
|     3|  90866|   3.5|1566090425|
|    12|   1343|   4.5|1167582491|
|    12|   6539|   3.5|1167574710|
|    12|  40870|   4.0|1225098996|
|    18|    733|   4.0|1108273563|
|    23|    372|   4.0| 943135548|
|    23|   1147|   5.0| 942967008|
+------+-------+------+----------+
only showing top 10 rows



In [13]:
# One transaction per user: the set of movies that user rated above 3.0.
movies_rating = (
    df_1k
    .groupBy(F.col('userId'))
    .agg(F.collect_set(F.col('movieId')).alias('movieIds'))
)

In [14]:
# Peek at the per-user transactions.
movies_rating.show(n=10)

+------+-------------------+
|userId|           movieIds|
+------+-------------------+
| 32911|          [474, 21]|
| 32914|             [1663]|
| 32916|            [33794]|
| 32917|             [2396]|
| 32922| [2404, 5572, 5810]|
| 32923|            [91529]|
| 32928|             [1428]|
| 32929|             [1721]|
| 32933|[71033, 2160, 1093]|
| 32934|              [107]|
+------+-------------------+
only showing top 10 rows



In [15]:
# Itemsets must occur in >= 50% of transactions; rules need >= 50% confidence.
fpGrowth = FPGrowth(
    minConfidence=0.5,
    minSupport=0.5,
    itemsCol="movieIds",
)

In [16]:
%%timeit
model = fpGrowth.fit(movies_rating)

22.1 s ± 431 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [18]:
# Time one fit of the model used below; perf_counter is monotonic, unlike time.time().
# NOTE(review): fit() may defer part of the mining until the rules are first
# used — confirm before comparing timings.
start = time.perf_counter()
model = fpGrowth.fit(movies_rating)
end = time.perf_counter()
print('{:.2f} s'.format(end - start))

22.68 s


In [19]:
# Display the generated association rules and time how long that takes.
start = time.perf_counter()
model.associationRules.show()
end = time.perf_counter()
print('{:.4f} s'.format(end - start))

+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
+----------+----------+----------+----+

0.3412 s


## 10k ratings

In [20]:
# 10k good ratings, unsorted (the subset may differ between runs).
training_df = df.limit(num=10000)

In [21]:
# Peek at the 10k sample.
training_df.show(n=10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
| 32906|   2470|   4.0| 965801332|
| 32906|   2989|   4.0| 965801729|
| 32906|   3062|   5.0| 965798832|
| 32912|    376|   4.0| 862430318|
| 32922|   1753|   3.5|1070161136|
| 32922|   2474|   4.0|1068252703|
| 32922|   8016|   4.5|1110294181|
| 32922|  62235|   4.0|1234216474|
| 32922|  68791|   3.5|1298106255|
| 32923|   2959|   5.0|1547047970|
+------+-------+------+----------+
only showing top 10 rows



In [22]:
# One transaction per user: the set of movies that user rated above 3.0.
movies_rating = (
    training_df
    .groupBy(F.col('userId'))
    .agg(F.collect_set(F.col('movieId')).alias('movieIds'))
)

In [23]:
# Peek at the per-user transactions.
movies_rating.show(n=10)

+------+--------------------+
|userId|            movieIds|
+------+--------------------+
| 32906|  [2989, 2470, 3062]|
| 32912|               [376]|
| 32922|[62235, 2474, 801...|
| 32923|              [2959]|
| 32927|             [87232]|
| 32928|    [1084, 50, 1178]|
| 32932|       [1206, 30707]|
| 32933|    [106916, 147750]|
| 32936|      [4995, 164179]|
| 32937|              [3175]|
+------+--------------------+
only showing top 10 rows



In [24]:
# Number of transactions (distinct users) in this sample.
n_users = movies_rating.count()
n_users

6153

In [25]:
# Lower support threshold (25% of transactions); rules need >= 50% confidence.
fpGrowth = FPGrowth(
    minConfidence=0.5,
    minSupport=0.25,
    itemsCol="movieIds",
)

In [26]:
%%timeit
fpGrowth.fit(movies_rating)

22.7 s ± 382 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [27]:
# Time one fit of the model used below; perf_counter is monotonic, unlike time.time().
# NOTE(review): fit() may defer part of the mining until the rules are first
# used — confirm before comparing timings.
start = time.perf_counter()
model = fpGrowth.fit(movies_rating)
end = time.perf_counter()
print('{:.2f} s'.format(end - start))

22.45 s


In [28]:
# Count the generated association rules.
model.associationRules.count()

0

In [29]:
# Display the generated association rules and time how long that takes.
start = time.perf_counter()
model.associationRules.show()
end = time.perf_counter()
print('{:.4f} s'.format(end - start))

+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
+----------+----------+----------+----+

0.0643 s


## 100k ratings

In [30]:
# 100k good ratings, unsorted (the subset may differ between runs).
training_df = df.limit(num=100000)

In [31]:
# One transaction per user: the set of movies that user rated above 3.0.
movies_rating = (
    training_df
    .groupBy(F.col('userId'))
    .agg(F.collect_set(F.col('movieId')).alias('movieIds'))
)

In [32]:
# Peek at the per-user transactions.
movies_rating.show(n=10)

+------+--------------------+
|userId|            movieIds|
+------+--------------------+
|     2|              [5952]|
|     3|[5013, 1222, 9086...|
|     5|               [170]|
|    12|[40870, 1257, 121...|
|    13|              [1097]|
|    16|              [6350]|
|    18|[1682, 733, 1608,...|
|    19|[148468, 1721, 59...|
|    23|[5060, 2734, 2797...|
|    27|               [969]|
+------+--------------------+
only showing top 10 rows



In [33]:
# Number of transactions (distinct users) in this sample.
n_users = movies_rating.count()
n_users

54726

In [34]:
# Itemsets must occur in >= 10% of transactions; rules need >= 80% confidence.
fpGrowth = FPGrowth(
    minConfidence=0.8,
    minSupport=0.1,
    itemsCol="movieIds",
)

In [35]:
%%timeit
fpGrowth.fit(movies_rating)

24.5 s ± 487 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [36]:
# Time one fit of the model used below; perf_counter is monotonic, unlike time.time().
# NOTE(review): fit() may defer part of the mining until the rules are first
# used — confirm before comparing timings.
start = time.perf_counter()
model = fpGrowth.fit(movies_rating)
end = time.perf_counter()
print('{:.2f} s'.format(end - start))

24.97 s


In [37]:
# Count the generated association rules.
model.associationRules.count()

0

In [38]:
# Display the generated association rules and time how long that takes.
start = time.perf_counter()
model.associationRules.show()
end = time.perf_counter()
print('{:.4f} s'.format(end - start))

+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
+----------+----------+----------+----+

0.0798 s


## 1M ratings

In [39]:
# 1M good ratings, unsorted (the subset may differ between runs).
training_df = df.limit(num=1000000)

In [40]:
# One transaction per user: the set of movies that user rated above 3.0.
movies_rating = (
    training_df
    .groupBy(F.col('userId'))
    .agg(F.collect_set(F.col('movieId')).alias('movieIds'))
)

In [41]:
# Number of transactions (distinct users) in this sample.
n_users = movies_rating.count()
n_users

141625

In [42]:
# Itemsets must occur in >= 10% of transactions; rules need >= 80% confidence.
fpGrowth = FPGrowth(
    minConfidence=0.8,
    minSupport=0.1,
    itemsCol="movieIds",
)

In [43]:
%%timeit
fpGrowth.fit(movies_rating)

25.8 s ± 682 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [44]:
# Fit once (untimed) to get a model for the rule cells below.
model = fpGrowth.fit(dataset=movies_rating)

In [45]:
# Time counting the association rules; keep and display the count instead of
# discarding it.
start = time.perf_counter()
n_rules = model.associationRules.count()
end = time.perf_counter()
print('{:.4f} s'.format(end - start))
n_rules

1.0202 s


In [46]:
# Display (up to the default 20) generated association rules.
model.associationRules.show(n=20)

+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
+----------+----------+----------+----+



## 2M ratings

In [47]:
# 2M good ratings, unsorted (the subset may differ between runs).
training_df = df.limit(num=2000000)

In [48]:
# One transaction per user: the set of movies that user rated above 3.0.
movies_rating = (
    training_df
    .groupBy(F.col('userId'))
    .agg(F.collect_set(F.col('movieId')).alias('movieIds'))
)

In [49]:
# Number of transactions (distinct users) in this sample.
n_users = movies_rating.count()
n_users

155608

In [50]:
# Itemsets must occur in >= 10% of transactions; rules need >= 80% confidence.
fpGrowth = FPGrowth(
    minConfidence=0.8,
    minSupport=0.1,
    itemsCol="movieIds",
)

In [51]:
%%timeit
fpGrowth.fit(movies_rating)

26.1 s ± 756 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [52]:
# Fit once (untimed) to get a model for the rule cells below.
model = fpGrowth.fit(dataset=movies_rating)

In [53]:
# Time counting the association rules; keep and display the count instead of
# discarding it.
start = time.perf_counter()
n_rules = model.associationRules.count()
end = time.perf_counter()
print('{:.4f} s'.format(end - start))
n_rules

1.6278 s


In [54]:
# Display the generated association rules and time how long that takes.
start = time.perf_counter()
model.associationRules.show()
end = time.perf_counter()
print('{:.4f} s'.format(end - start))

+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
+----------+----------+----------+----+

0.0594 s


## 5M ratings

In [55]:
# 5M good ratings, unsorted (the subset may differ between runs).
training_df = df.limit(num=5000000)

In [56]:
# One transaction per user: the set of movies that user rated above 3.0.
movies_rating = (
    training_df
    .groupBy(F.col('userId'))
    .agg(F.collect_set(F.col('movieId')).alias('movieIds'))
)

In [57]:
# Number of transactions (distinct users) in this sample.
n_users = movies_rating.count()
n_users

161705

In [58]:
# Itemsets must occur in >= 10% of transactions; rules need >= 80% confidence.
fpGrowth = FPGrowth(
    minConfidence=0.8,
    minSupport=0.1,
    itemsCol="movieIds",
)

In [59]:
%%timeit
fpGrowth.fit(movies_rating)

30.7 s ± 2.04 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [60]:
# Time one fit of the model used below; perf_counter is monotonic, unlike time.time().
# NOTE(review): fit() may defer part of the mining until the rules are first
# used — confirm before comparing timings.
start = time.perf_counter()
model = fpGrowth.fit(movies_rating)
end = time.perf_counter()
print('{:.4f} s'.format(end - start))

31.6478 s


In [61]:
# Time counting the association rules; keep and display the count instead of
# discarding it.
start = time.perf_counter()
n_rules = model.associationRules.count()
end = time.perf_counter()
print('{:.4f} s'.format(end - start))
n_rules

4.5718 s


In [62]:
# Display the generated association rules and time how long that takes.
start = time.perf_counter()
model.associationRules.show()
end = time.perf_counter()
print('{:.4f} s'.format(end - start))

+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
+----------+----------+----------+----+

0.0932 s


## 10M ratings

In [63]:
# 10M good ratings, unsorted (the subset may differ between runs).
training_df = df.limit(num=10000000)

In [64]:
# One transaction per user: the set of movies that user rated above 3.0.
movies_rating = (
    training_df
    .groupBy(F.col('userId'))
    .agg(F.collect_set(F.col('movieId')).alias('movieIds'))
)

In [65]:
# Peek at the per-user transactions.
movies_rating.show(n=10)

+------+--------------------+
|userId|            movieIds|
+------+--------------------+
|     1|[2843, 1217, 2632...|
|     2|[4995, 4720, 1293...|
|     3|[356, 104841, 436...|
|     4|[1220, 4974, 7028...|
|     5|[1271, 1, 1120, 3...|
|     6|[858, 902, 924, 9...|
|     7|[150, 527, 593, 3...|
|     8|[161, 1242, 1, 78...|
|     9|[539, 1374, 256, ...|
|    10|[110, 1, 32, 5395...|
+------+--------------------+
only showing top 10 rows



In [66]:
# Itemsets must occur in >= 10% of transactions; rules need >= 80% confidence.
fpGrowth = FPGrowth(
    minConfidence=0.8,
    minSupport=0.1,
    itemsCol="movieIds",
)

In [67]:
%%timeit
fpGrowth.fit(movies_rating)

36.3 s ± 1.91 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [68]:
# Time one fit of the model used below; perf_counter is monotonic, unlike time.time().
# NOTE(review): fit() may defer part of the mining until the rules are first
# used — confirm before comparing timings.
start = time.perf_counter()
model = fpGrowth.fit(movies_rating)
end = time.perf_counter()
print('{:.2f} s'.format(end - start))

32.37 s


In [69]:
# Display the generated association rules and time how long that takes.
start = time.perf_counter()
model.associationRules.show()
end = time.perf_counter()
print('{:.2f} s'.format(end - start))

+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
+----------+----------+----------+----+

35.90 s


## 15M ratings

In [70]:
# Use every good (> 3.0) rating as the training set, no limit.
training_df = df

In [71]:
# One transaction per user: the set of movies that user rated above 3.0.
movies_rating = (
    training_df
    .groupBy(F.col('userId'))
    .agg(F.collect_set(F.col('movieId')).alias('movieIds'))
)

In [72]:
# Peek at the per-user transactions.
movies_rating.show(n=10)

+------+--------------------+
|userId|            movieIds|
+------+--------------------+
|   148|[110, 356, 2186, ...|
|   463|[799, 785, 648, 3...|
|   471|[356, 103141, 922...|
|   496|[1220, 1947, 4995...|
|   833|[44195, 356, 1446...|
|  1088|[356, 589, 8604, ...|
|  1238|[110, 356, 4262, ...|
|  1342|[356, 4008, 306, ...|
|  1580|[1271, 2273, 277,...|
|  1591|[3753, 2701, 2890...|
+------+--------------------+
only showing top 10 rows



In [73]:
# Lower thresholds for the full data: >= 5% support, >= 50% confidence.
fpGrowth = FPGrowth(
    minConfidence=0.5,
    minSupport=0.05,
    itemsCol="movieIds",
)

In [74]:
# Time one fit; perf_counter is monotonic, unlike time.time().
# NOTE(review): fit() may defer part of the mining until the rules are first
# used — confirm before comparing timings.
start = time.perf_counter()
model = fpGrowth.fit(movies_rating)
end = time.perf_counter()
print('{:.2f} s'.format(end - start))

59.05 s


In [76]:
# Repeat the same fit to see run-to-run variation; perf_counter is monotonic.
start = time.perf_counter()
model = fpGrowth.fit(movies_rating)
end = time.perf_counter()
print('{:.2f} s'.format(end - start))

70.34 s


In [80]:
# Back to the thresholds used for 100k-10M: >= 10% support, >= 80% confidence.
fpGrowth = FPGrowth(
    minConfidence=0.8,
    minSupport=0.1,
    itemsCol="movieIds",
)

In [81]:
# Time one fit of the model used below; perf_counter is monotonic, unlike time.time().
# NOTE(review): fit() may defer part of the mining until the rules are first
# used — confirm before comparing timings.
start = time.perf_counter()
model = fpGrowth.fit(movies_rating)
end = time.perf_counter()
print('{:.2f} s'.format(end - start))

66.92 s


In [82]:
# Count the association rules, caching them so the count/show cells below can
# reuse the result instead of recomputing it, and display the count instead of
# discarding it.
# NOTE(review): relies on Spark matching later model.associationRules queries
# against this cached plan — confirm in the Spark UI (Storage tab).
start = time.perf_counter()
n_rules = model.associationRules.cache().count()
end = time.perf_counter()
print('{:.2f} s'.format(end - start))
n_rules

1721.46 s


In [83]:
# Number of rules mined with minSupport=0.1, minConfidence=0.8 on all good ratings.
rule_count = model.associationRules.count()
rule_count

1020

In [84]:
# First 10 association rules.
model.associationRules.show(n=10)

+--------------------+----------+------------------+------------------+
|          antecedent|consequent|        confidence|              lift|
+--------------------+----------+------------------+------------------+
|         [858, 2959]|     [296]|0.8097227341606258|1.9737698021276604|
|        [1136, 1196]|     [260]|0.9076998050682261|2.6046494017729835|
|[1291, 1210, 1198...|    [1196]|0.9666776823088786|3.2663155406934945|
|          [608, 593]|     [296]|0.8216192698651268| 2.002768645722954|
|         [480, 1210]|     [260]|0.9049160117121282| 2.596661291982572|
|         [480, 1210]|    [1196]|0.8930497765449221| 3.017533576211683|
|           [1, 1210]|     [260]|0.9071204345009717| 2.602986894859378|
|           [1, 1210]|    [1196]|0.8460311923862673|2.8586620775214433|
|  [7153, 1198, 2571]|    [4993]|0.9319274475524476|3.2543122869658836|
|  [7153, 1198, 2571]|    [5952]|0.9122049825174825|  3.47194553877471|
+--------------------+----------+------------------+------------

In [85]:
# First 20 association rules.
model.associationRules.show(n=20)

+--------------------+----------+------------------+------------------+
|          antecedent|consequent|        confidence|              lift|
+--------------------+----------+------------------+------------------+
|         [858, 2959]|     [296]|0.8097227341606258|1.9737698021276604|
|        [1136, 1196]|     [260]|0.9076998050682261|2.6046494017729835|
|[1291, 1210, 1198...|    [1196]|0.9666776823088786|3.2663155406934945|
|          [608, 593]|     [296]|0.8216192698651268| 2.002768645722954|
|         [480, 1210]|     [260]|0.9049160117121282| 2.596661291982572|
|         [480, 1210]|    [1196]|0.8930497765449221| 3.017533576211683|
|           [1, 1210]|     [260]|0.9071204345009717| 2.602986894859378|
|           [1, 1210]|    [1196]|0.8460311923862673|2.8586620775214433|
|  [7153, 1198, 2571]|    [4993]|0.9319274475524476|3.2543122869658836|
|  [7153, 1198, 2571]|    [5952]|0.9122049825174825|  3.47194553877471|
|        [7153, 2959]|    [2571]|0.8447453155914187| 2.221513717

In [86]:
# First 50 association rules.
model.associationRules.show(n=50)

+--------------------+----------+------------------+------------------+
|          antecedent|consequent|        confidence|              lift|
+--------------------+----------+------------------+------------------+
|         [858, 2959]|     [296]|0.8097227341606258|1.9737698021276604|
|        [1136, 1196]|     [260]|0.9076998050682261|2.6046494017729835|
|[1291, 1210, 1198...|    [1196]|0.9666776823088786|3.2663155406934945|
|          [608, 593]|     [296]|0.8216192698651268| 2.002768645722954|
|         [480, 1210]|     [260]|0.9049160117121282| 2.596661291982572|
|         [480, 1210]|    [1196]|0.8930497765449221| 3.017533576211683|
|           [1, 1210]|     [260]|0.9071204345009717| 2.602986894859378|
|           [1, 1210]|    [1196]|0.8460311923862673|2.8586620775214433|
|  [7153, 1198, 2571]|    [4993]|0.9319274475524476|3.2543122869658836|
|  [7153, 1198, 2571]|    [5952]|0.9122049825174825|  3.47194553877471|
|        [7153, 2959]|    [2571]|0.8447453155914187| 2.221513717

In [87]:
# Shut down the Spark session (and, presumably, its Kubernetes executors).
spark.stop()