In [1]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession

# Local Spark session using all available cores. The UI is moved to port 4050,
# presumably to avoid clashing with another session on the default port.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Exo2")
    .config("spark.ui.port", "4050")
    # NOTE(review): with a local[*] master the work runs inside the driver JVM, so
    # spark.executor.memory is likely ignored and spark.driver.memory is the
    # setting that matters — confirm against the Spark configuration docs.
    .config("spark.executor.memory", "10g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [2]:
# Load the genome scores (a relevance value per movieId/tagId), then show a
# sample, the inferred schema and summary statistics of the relevance column.
dfgenomescores = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("exo2_data/genome_scores.csv")
)
dfgenomescores.show(5)
dfgenomescores.printSchema()
dfgenomescores.describe('relevance').show()

+-------+-----+---------+
|movieId|tagId|relevance|
+-------+-----+---------+
|      1|    1|    0.025|
|      1|    2|    0.025|
|      1|    3|  0.05775|
|      1|    4|  0.09675|
|      1|    5|  0.14675|
+-------+-----+---------+
only showing top 5 rows

root
 |-- movieId: integer (nullable = true)
 |-- tagId: integer (nullable = true)
 |-- relevance: double (nullable = true)

+-------+-------------------+
|summary|          relevance|
+-------+-------------------+
|  count|           11709768|
|   mean|0.11648331730398041|
| stddev| 0.1542462566337565|
|    min|2.49999999999972E-4|
|    max|                1.0|
+-------+-------------------+



In [3]:
# Load the genome tag dictionary (tagId -> tag text) and inspect it.
dfgenometags = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("exo2_data/genome_tags.csv")
)
dfgenometags.show(5)
dfgenometags.printSchema()

+-----+------------+
|tagId|         tag|
+-----+------------+
|    1|         007|
|    2|007 (series)|
|    3|18th century|
|    4|       1920s|
|    5|       1930s|
+-----+------------+
only showing top 5 rows

root
 |-- tagId: integer (nullable = true)
 |-- tag: string (nullable = true)



In [4]:
# Load the external-id links (movieId -> imdbId / tmdbId) and inspect them.
dflink = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("exo2_data/link.csv")
)
dflink.show(5)
dflink.printSchema()

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
+-------+------+------+
only showing top 5 rows

root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)



In [5]:
# Load the movie catalogue (movieId, title, pipe-delimited genres) and inspect it.
dfmovie = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("exo2_data/movie.csv")
)
dfmovie.show(5)
dfmovie.printSchema()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [6]:
# Load the user ratings, then show a sample, the inferred schema and summary
# statistics of the rating column.
dfrating = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("exo2_data/rating.csv")
)
dfrating.show(5)
dfrating.printSchema()
dfrating.describe('rating').show()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
+------+-------+------+-------------------+
only showing top 5 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|          20000263|
|   mean|3.5255285642993797|
| stddev| 1.051988919294243|
|    min|               0.5|
|    max|               5.0|
+-------+------------------+



In [7]:
# Load the free-text user tags applied to movies and inspect them.
dftag = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("exo2_data/tag.csv")
)
dftag.show(5)
dftag.printSchema()

+------+-------+-------------+-------------------+
|userId|movieId|          tag|          timestamp|
+------+-------+-------------+-------------------+
|    18|   4141|  Mark Waters|2009-04-24 18:19:40|
|    65|    208|    dark hero|2013-05-10 01:41:18|
|    65|    353|    dark hero|2013-05-10 01:41:19|
|    65|    521|noir thriller|2013-05-10 01:39:43|
|    65|    592|    dark hero|2013-05-10 01:41:18|
+------+-------+-------------+-------------------+
only showing top 5 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [8]:
from pyspark.sql.functions import col

# Is (movieId, tagId) a unique key of the genome scores?
# First table: pairs that occur more than once (an empty table means unique).
(dfgenomescores
    .groupBy('movieId', 'tagId')
    .count()
    .filter(col('count') > 1)
    .show())
# Second table: the highest per-pair counts, as a cross-check.
(dfgenomescores
    .groupBy('movieId', 'tagId')
    .count()
    .sort('count', ascending=False)
    .show())

+-------+-----+-----+
|movieId|tagId|count|
+-------+-----+-----+
+-------+-----+-----+

+-------+-----+-----+
|movieId|tagId|count|
+-------+-----+-----+
|      1|   60|    1|
|      1|   80|    1|
|      1|   94|    1|
|      1|  248|    1|
|      1|  443|    1|
|      1|  350|    1|
|      1|  544|    1|
|      1|  431|    1|
|      1|   65|    1|
|      1|  530|    1|
|      1|  771|    1|
|      1|  638|    1|
|      1|  479|    1|
|      1|  662|    1|
|      1|  888|    1|
|      9| 1121|    1|
|      1|  224|    1|
|     10| 1101|    1|
|      1|  989|    1|
|     13| 1078|    1|
+-------+-----+-----+
only showing top 20 rows



In [9]:
# Referential check: does every movieId in the genome scores exist in dfmovie?
# Prints the number of genome-score rows (NOTE: a row count, not a distinct-movie
# count), then the number of distinct movieIds missing from dfmovie.
# subtract() is a set difference (EXCEPT DISTINCT), so 0 means no orphans.
print(
    dfgenomescores.select('movieId').count(),
    dfgenomescores.select('movieId').subtract(dfmovie.select('movieId')).count(),
    sep='\n',
)

11709768
0


In [10]:
# Referential check: does every tagId in the genome scores exist in dfgenometags?
# Prints the number of genome-score rows, then the number of distinct tagIds
# missing from the tag dictionary (0 means no orphans).
print(
    dfgenomescores.select('tagId').count(),
    dfgenomescores.select('tagId').subtract(dfgenometags.select('tagId')).count(),
    sep='\n',
)

11709768
0


In [11]:
# tagId is a unique key of dfgenometags iff the total and distinct counts match.
print(
    dfgenometags.count(),
    dfgenometags.select('tagId').distinct().count(),
    sep='\n',
)

1128
1128


In [12]:
# Tag text is unique in dfgenometags iff the total and distinct counts match.
print(
    dfgenometags.count(),
    dfgenometags.select('tag').distinct().count(),
    sep='\n',
)

1128
1128


In [13]:
# movieId is a unique key of dfmovie iff the total and distinct counts match.
print(
    dfmovie.count(),
    dfmovie.select('movieId').distinct().count(),
    sep='\n',
)

27278
27278


In [14]:
# Are titles unique in dfmovie? A distinct count lower than the row count means
# some titles appear on more than one movieId (investigated in the next cells).
print(
    dfmovie.count(),
    dfmovie.select('title').distinct().count(),
    sep='\n',
)

27278
27262


In [15]:
# Titles that appear on more than one movieId row, with their occurrence count.
dfmovie_duplicateswithcount = (
    dfmovie
    .groupBy('title')
    .count()
    .filter(col('count') > 1)
)
dfmovie_duplicateswithcount.show()

+--------------------+-----+
|               title|count|
+--------------------+-----+
|     Casanova (2005)|    2|
|     Paradise (2013)|    2|
|       Hamlet (2000)|    2|
|     Blackout (2007)|    2|
|        Chaos (2005)|    2|
|         Emma (1996)|    2|
|      Aladdin (1992)|    2|
|Johnny Express (2...|    2|
|War of the Worlds...|    2|
|      Beneath (2013)|    2|
|Clear History (2013)|    2|
|Men with Guns (1997)|    2|
|20,000 Leagues Un...|    2|
|      Darling (2007)|    2|
|    Girl, The (2012)|    2|
|      Offside (2006)|    2|
+--------------------+-----+



In [16]:
# Just the duplicated titles: drop the helper count column.
dfmovie_duplicates = dfmovie_duplicateswithcount.drop('count')
dfmovie_duplicates.show()

+--------------------+
|               title|
+--------------------+
|     Casanova (2005)|
|     Paradise (2013)|
|       Hamlet (2000)|
|     Blackout (2007)|
|        Chaos (2005)|
|         Emma (1996)|
|      Aladdin (1992)|
|Johnny Express (2...|
|War of the Worlds...|
|      Beneath (2013)|
|Clear History (2013)|
|Men with Guns (1997)|
|20,000 Leagues Un...|
|      Darling (2007)|
|    Girl, The (2012)|
|      Offside (2006)|
+--------------------+



In [17]:
# Full movie rows for every duplicated title, grouped together by title so the
# competing movieIds and genres can be compared side by side.
(dfmovie
    .join(dfmovie_duplicates, on='title')
    .orderBy('title')
    .show(1000))

+--------------------+-------+--------------------+
|               title|movieId|              genres|
+--------------------+-------+--------------------+
|20,000 Leagues Un...| 102190|Adventure|Romance...|
|20,000 Leagues Un...| 114130|      Romance|Sci-Fi|
|      Aladdin (1992)|    588|Adventure|Animati...|
|      Aladdin (1992)| 114240|Adventure|Animati...|
|      Beneath (2013)| 104035|              Horror|
|      Beneath (2013)| 115777|              Horror|
|     Blackout (2007)|  66140|     Horror|Thriller|
|     Blackout (2007)|  85070|               Drama|
|     Casanova (2005)|  42015|Action|Adventure|...|
|     Casanova (2005)| 128862|Comedy|Drama|Romance|
|        Chaos (2005)|  47254|Action|Crime|Dram...|
|        Chaos (2005)|  67459|  Crime|Drama|Horror|
|Clear History (2013)| 104155|              Comedy|
|Clear History (2013)| 122940|              Comedy|
|      Darling (2007)|  93279|               Drama|
|      Darling (2007)| 130062|               Drama|
|         Em

In [18]:
# movieIds of every row whose title is duplicated. This contains BOTH ids of each
# duplicated pair, not just the redundant one.
# NOTE(review): removing all of these would drop each affected title entirely
# rather than keeping one copy. Some pairs also look like genuinely different
# films (e.g. the two "Blackout (2007)" rows have different genres), so confirm
# the intended de-duplication rule before using this set.
dfmovie_toberemoved = (
    dfmovie
    .join(dfmovie_duplicates, on='title')
    .select('movieId')
)
dfmovie_toberemoved.show(1000)

+-------+
|movieId|
+-------+
| 128862|
|  42015|
| 121586|
| 113459|
|  65665|
|   3598|
|  85070|
|  66140|
|  67459|
|  47254|
|  26958|
|    838|
| 114240|
|    588|
| 128991|
| 111519|
|  64997|
|  34048|
| 115777|
| 104035|
| 122940|
| 104155|
|  26982|
|   1788|
| 114130|
| 102190|
| 130062|
|  93279|
| 101212|
|  97773|
|  80330|
|  48682|
+-------+



In [19]:
from pyspark.sql.functions import split, explode

# Normalise the pipe-delimited `genres` string into one (movieId, genre) row per genre.
# split() interprets its pattern as a Java regex, where `|` means alternation, so the
# pipe must be escaped. A raw string keeps the backslash literal for Python; the
# plain '\|' used previously is an invalid escape sequence and triggers a warning.
dfmoviegenre = dfmovie.select(
    'movieId',
    explode(split(dfmovie['genres'], r'\|')).alias('genre'),
)
dfmoviegenre.show()

+-------+---------+
|movieId|    genre|
+-------+---------+
|      1|Adventure|
|      1|Animation|
|      1| Children|
|      1|   Comedy|
|      1|  Fantasy|
|      2|Adventure|
|      2| Children|
|      2|  Fantasy|
|      3|   Comedy|
|      3|  Romance|
|      4|   Comedy|
|      4|    Drama|
|      4|  Romance|
|      5|   Comedy|
|      6|   Action|
|      6|    Crime|
|      6| Thriller|
|      7|   Comedy|
|      7|  Romance|
|      8|Adventure|
+-------+---------+
only showing top 20 rows



  dfmoviegenre = dfmoviegenre.withColumn('genre', explode(split(dfmoviegenre['genres'], '\|')))


In [20]:
# Sanity check: after exploding, no movie should list the same genre twice.
# An empty table means (movieId, genre) is unique.
(dfmoviegenre
    .groupBy('movieId', 'genre')
    .count()
    .filter(col('count') > 1)
    .show())

+-------+-----+-----+
|movieId|genre|count|
+-------+-----+-----+
+-------+-----+-----+



In [21]:
# Referential check: does every rating point at a known movie? The inner join keeps
# only ratings whose movieId exists in dfmovie. Since movieId is unique in dfmovie,
# equal counts mean no orphaned ratings.
print(
    dfrating.select('movieId').count(),
    dfrating.select('movieId').join(dfmovie, on='movieId').count(),
    sep='\n',
)

20000263
20000263


In [22]:
# Has any user rated the same movie more than once? Largest per-pair count first;
# a top count of 1 means (userId, movieId) is unique.
(dfrating
    .groupBy('userId', 'movieId')
    .count()
    .sort('count', ascending=False)
    .show())

+------+-------+-----+
|userId|movieId|count|
+------+-------+-----+
|     1|   1333|    1|
|     1|   2644|    1|
|     2|    908|    1|
|     7|   3717|    1|
|     1|  31696|    1|
|     7|   3988|    1|
|     5|   1073|    1|
|    11|   8977|    1|
|     2|    924|    1|
|    14|    780|    1|
|     7|   2942|    1|
|    19|    802|    1|
|     2|    110|    1|
|    22|    303|    1|
|    11|     32|    1|
|    24|   1375|    1|
|     3|   1247|    1|
|    24|   2268|    1|
|    11|   6731|    1|
|    24|   2501|    1|
+------+-------+-----+
only showing top 20 rows



In [23]:
from pyspark.sql import functions as F

# Per-user mean rating and number of ratings, harshest raters (lowest mean) first.
(dfrating
    .groupBy('userId')
    .agg(F.avg("rating"), F.count("rating"))
    .sort('avg(rating)')
    .show())

+------+------------------+-------------+
|userId|       avg(rating)|count(rating)|
+------+------------------+-------------+
| 20501|               0.5|           20|
| 16983|               0.5|           29|
| 20511|               0.5|           20|
| 44174|               0.5|           20|
| 67322|               0.5|           20|
| 24530|               0.5|           21|
| 65365|               0.5|           20|
| 72823|               0.5|           20|
|106618|               0.5|           20|
| 74276|               0.5|           20|
|117730|               0.5|           20|
| 74084|               0.5|           20|
|103630|               0.5|           20|
| 85669|0.5227272727272727|           22|
| 90247|             0.525|           20|
| 84564|              0.55|           20|
|  7098|               0.6|           20|
| 39471|0.6666666666666666|           21|
| 99445|0.6739130434782609|           23|
|122284|             0.675|           20|
+------+------------------+-------

In [24]:
# Per-user mean rating and number of ratings, most generous raters (highest mean) first.
(dfrating
    .groupBy('userId')
    .agg(F.avg("rating"), F.count("rating"))
    .sort('avg(rating)', ascending=False)
    .show())

+------+-----------+-------------+
|userId|avg(rating)|count(rating)|
+------+-----------+-------------+
| 40559|        5.0|           20|
| 54126|        5.0|           49|
|  7793|        5.0|           21|
| 66525|        5.0|           22|
| 19801|        5.0|           20|
|119100|        5.0|           57|
| 81755|        5.0|           28|
| 61498|        5.0|           20|
| 42101|        5.0|          100|
|  3354|        5.0|           20|
| 53212|        5.0|           20|
| 59065|        5.0|           26|
| 51651|        5.0|           65|
| 20173|        5.0|           20|
| 48423|        5.0|           20|
| 81489|        5.0|           21|
| 48140|        5.0|           20|
| 46067|        5.0|           24|
| 62285|        5.0|           21|
|124248|        5.0|           21|
+------+-----------+-------------+
only showing top 20 rows



In [25]:
from pyspark.ml.recommendation import ALS

# Fixed seed so the train/test split, and therefore the RMSE reported below, is
# reproducible on Restart & Run All. Without an explicit seed, randomSplit picks a
# new random seed on every run.
SEED = 42

# 80/20 random split of the ratings into training and held-out test sets.
(ratings_training, ratings_test) = dfrating.randomSplit([0.8, 0.2], seed=SEED)

# Collaborative-filtering model (alternating least squares) on the explicit ratings.
# coldStartStrategy="drop" discards prediction rows for users/items unseen in
# training, so they produce no NaN predictions that would make the RMSE NaN.
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=SEED,
)

model = als.fit(ratings_training)

In [26]:
# Score the held-out ratings. The fitted model appends a "prediction" column;
# rows for unseen users/items are dropped per coldStartStrategy="drop".
predictions = model.transform(dataset=ratings_test)

In [27]:
# Root-mean-square error between the observed rating and the model's prediction
# on the test set (lower is better; ratings span 0.5 to 5.0).
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 0.806644630069214
