In [1]:
from pyspark.ml.fpm import FPGrowth
from numpy import array
from math import sqrt
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import functions as F
import pandas as pd

In [2]:
# Create (or reuse) the Spark session for this notebook.
# NOTE(review): spark.driver.memory only takes effect if the driver JVM is not already
# running when this cell executes (e.g. a fresh kernel with no pre-started SparkContext);
# otherwise set it via spark-submit / spark-defaults.conf instead.
DRIVER_MEMORY = "15g"

# getOrCreate() already returns a ready-to-use SparkSession; no further wrapping is needed.
spark = (
    SparkSession.builder
    .appName('dat500')
    .config("spark.driver.memory", DRIVER_MEMORY)
    .getOrCreate()
)

In [3]:
%%time
# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files

path1="/mydataset/proc_netflix/pre_part-01.json"
path2="/mydataset/proc_netflix/pre_part-02.json"
path3="/mydataset/proc_netflix/pre_part-03.json"
path4="/mydataset/proc_netflix/pre_part-04.json"
path5="/mydataset/proc_netflix/pre_part-05.json"
path6="/mydataset/proc_netflix/pre_part-06.json"
# .option("multiline", "true")
df = spark.read.json(path1)
df2 = spark.read.json(path2)
df3 = spark.read.json(path3)
df4 = spark.read.json(path4)
df5 = spark.read.json(path5)
df6 = spark.read.json(path6)

2022-05-01 14:58:17,456 WARN spark.HeartbeatReceiver: Removing executor driver with no recent heartbeats: 130113 ms exceeds timeout 120000 ms
2022-05-01 14:58:17,668 WARN spark.SparkContext: Killing executors is not supported by current scheduler.
                                                                                

CPU times: user 123 ms, sys: 69.9 ms, total: 192 ms
Wall time: 3min 47s


In [4]:
%%time
# root
#  |-- movie: string (nullable = true)
#  |-- rating: string (nullable = true)
#  |-- review_id: string (nullable = true)
#  |-- reviewer: string (nullable = true)


# join dfs 
cols=["movie","rating","reviewer","review_id","spoiler_tag","review_summary","helpful","review_date"]
df.join(df2,cols).join(df3,cols).join(df4,cols).join(df5,cols).join(df6,cols)

# clean the df
df = df.na.drop().drop("review_summary","spoiler_tag","helpful","review_date")
df = df.groupBy("reviewer").agg(F.collect_set("movie").alias("movies"))

df = df.sort(F.desc("reviewer"))
row = df.count()
print(f'The number of reviewers are: {row}')
df.printSchema()
df.show(10,truncate=False)

                                                                                

The number of reviewers are: 418187
root
 |-- reviewer: long (nullable = true)
 |-- movies: array (nullable = false)
 |    |-- element: string (containsNull = false)





+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|reviewer|movies                                                                                                                                                                    |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|99999979|[Infinity Train (I) (2019– ), Out (II) (2020), Primal (2019– )]                                                                                                           |
|99999733|[The Rewrite (2014)]                                                                                                                                                      |
|99999554|[Night Gallery: The House/Certain Shadows on the Wall (1970) Season 1, Episode 3



In [5]:
%%time
# fp-growth

fpGrowth = FPGrowth(itemsCol="movies",minSupport=0.001, minConfidence=0.006)
model = fpGrowth.fit(df)
# Display frequent itemsets.
model.freqItemsets.sort(F.desc("freq")).show(20,truncate=False)



+-----------------------------------------------------+----+
|items                                                |freq|
+-----------------------------------------------------+----+
|[Dil Bechara (2020)]                                 |7573|
|[小丑 (2019)]                                        |6289|
|[Wonder Woman 1984 (2020)]                           |5264|
|[STAR WARS：天行者的崛起 (2019)]                     |5074|
|[Laxmii (2020)]                                      |4698|
|[Gunjan Saxena: The Kargil Girl (2020)]              |4011|
|[Supernatural: Carry On (2020) Season 15, Episode 20]|3586|
|[Tenet (2020)]                                       |3008|
|[Scam 1992: The Harshad Mehta Story (2020)]          |2984|
|[Mrs. Serial Killer (2020)]                          |2913|
|[Batman v Superman: Dawn of Justice (2016)]          |2860|
|[Coolie No. 1 (2020)]                                |2798|
|[獵魔士 (2019– )]                                    |2706|
|[Mulan (2020)]                     



In [6]:
%%time
items = model.freqItemsets
# Display generated association rules.
model.associationRules.show(20,truncate=False)
rules = model.associationRules

                                                                                

+----------------------------------------------------------------------------------------+---------------------------+------------------+-----------------+---------------------+
|antecedent                                                                              |consequent                 |confidence        |lift             |support              |
+----------------------------------------------------------------------------------------+---------------------------+------------------+-----------------+---------------------+
|[Poison (2019– ), Tiki Taka (2020)]                                                     |[London Confidental (2020)]|0.9754464285714286|758.2137836895912|0.0010449870512474085|
|[Poison (2019– ), Tiki Taka (2020)]                                                     |[Comedy Couple (2020)]     |0.9709821428571429|671.1605113636364|0.0010402045018137818|
|[Poison (2019– ), Tiki Taka (2020)]                                                     |[Atkan Chatkan (2020

In [7]:
%%time
# transform examines the input items against all the association rules and summarize the
# consequents as prediction
model.transform(df).show(20,truncate=False)
transformed = model.transform(df)

                                                                                

+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|reviewer|movies                                                                                                                                                                    |prediction|
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|99999979|[Infinity Train (I) (2019– ), Out (II) (2020), Primal (2019– )]                                                                                                           |[]        |
|99999733|[The Rewrite (2014)]                                                                                                                                                      |[]        |
|99999554|[Night Gallery: The House



CPU times: user 28.1 ms, sys: 3.46 ms, total: 31.6 ms
Wall time: 15.9 s


