# Analisi di 28 milioni di recensioni di film
# Procuriamoci il Dataset

In [1]:
# Download command for the MovieLens "ml-latest" archive, kept commented out
# (presumably so that re-running the notebook does not fetch it again).
# NOTE(review): the cells below read ml-latest/ratings.csv, so the zip has to be
# extracted first -- no unzip step is shown in this notebook.
# !wget http://files.grouplens.org/datasets/movielens/ml-latest.zip

# Inizializziamo Spark

In [2]:
from pyspark.sql import SparkSession

# Create the Spark session used by every cell below, or reuse the one that is
# already running under this application name.
spark = (
    SparkSession.builder
    .appName("movie_reviews")
    .getOrCreate()
)

# Importiamo il Dataset in un Dataframe

In [3]:
# First attempt: read the ratings CSV with no header or schema options.
# The file's header line is then read as an ordinary data row and the columns
# get generic names.
df = spark.read.format("csv").load("ml-latest/ratings.csv")
df.show()

+------+-------+------+----------+
|   _c0|    _c1|   _c2|       _c3|
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
|     1|   1590|   2.5|1256677236|
|     1|   1591|   1.5|1256677475|
|     1|   2134|   4.5|1256677464|
|     1|   2478|   4.0|1256677239|
|     1|   2840|   3.0|1256677500|
|     1|   2986|   2.5|1256677496|
|     1|   3020|   4.0|1256677260|
|     1|   3424|   4.5|1256677444|
|     1|   3698|   3.5|1256677243|
|     1|   3826|   2.0|1256677210|
|     1|   3893|   3.5|1256677486|
|     2|    170|   3.5|1192913581|
|     2|    849|   3.5|1192913537|
|     2|   1186|   3.5|1192913611|
+------+-------+------+----------+
only showing top 20 rows



In [4]:
# Print the column names and types of the DataFrame loaded without options.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)



In [5]:
# Second attempt: treat the first line as the header and let Spark infer the
# column types from the data.
df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("ml-latest/ratings.csv")
)
df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
|     1|   1590|   2.5|1256677236|
|     1|   1591|   1.5|1256677475|
|     1|   2134|   4.5|1256677464|
|     1|   2478|   4.0|1256677239|
|     1|   2840|   3.0|1256677500|
|     1|   2986|   2.5|1256677496|
|     1|   3020|   4.0|1256677260|
|     1|   3424|   4.5|1256677444|
|     1|   3698|   3.5|1256677243|
|     1|   3826|   2.0|1256677210|
|     1|   3893|   3.5|1256677486|
|     2|    170|   3.5|1192913581|
|     2|    849|   3.5|1192913537|
|     2|   1186|   3.5|1192913611|
|     2|   1235|   3.0|1192913585|
+------+-------+------+----------+
only showing top 20 rows



In [6]:
# Print the schema again to see which types Spark inferred.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



# Correggiamo lo schema

In [7]:
from pyspark.sql.types import *

# Hand-written schema for ratings.csv: both IDs are read as strings, the rating
# as a float and the timestamp as an integer. Every field is nullable.
# NOTE(review): the wildcard import is kept as-is because other cells may rely
# on it; this cell only needs StructType, StructField, StringType, FloatType
# and IntegerType.
data_schema = [
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('userID', StringType()),
        ('movieID', StringType()),
        ('rating', FloatType()),
        ('timestamp', IntegerType()),
    )
]

schema = StructType(fields=data_schema)

In [8]:
# Third attempt: load the ratings with the explicit schema, skipping the header
# line and with type inference switched off.
df = spark.read.csv(
    "ml-latest/ratings.csv",
    schema=schema,
    header="true",
    inferSchema="false",
)

df.show()

+------+-------+------+----------+
|userID|movieID|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
|     1|   1590|   2.5|1256677236|
|     1|   1591|   1.5|1256677475|
|     1|   2134|   4.5|1256677464|
|     1|   2478|   4.0|1256677239|
|     1|   2840|   3.0|1256677500|
|     1|   2986|   2.5|1256677496|
|     1|   3020|   4.0|1256677260|
|     1|   3424|   4.5|1256677444|
|     1|   3698|   3.5|1256677243|
|     1|   3826|   2.0|1256677210|
|     1|   3893|   3.5|1256677486|
|     2|    170|   3.5|1192913581|
|     2|    849|   3.5|1192913537|
|     2|   1186|   3.5|1192913611|
|     2|   1235|   3.0|1192913585|
+------+-------+------+----------+
only showing top 20 rows



In [9]:
# Confirm that the DataFrame now carries the types declared in the schema.
df.printSchema()

root
 |-- userID: string (nullable = true)
 |-- movieID: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: integer (nullable = true)



In [10]:
from pyspark.sql.functions import from_unixtime, to_date

# Preview only: show the epoch-seconds timestamp as a calendar date.
# The result is displayed, not assigned, so df itself is left untouched.
(
    df
    .withColumn('timestamp', to_date(from_unixtime(df["timestamp"])))
    .show()
)

+------+-------+------+----------+
|userID|movieID|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|2009-10-27|
|     1|    481|   3.5|2009-10-27|
|     1|   1091|   1.5|2009-10-27|
|     1|   1257|   4.5|2009-10-27|
|     1|   1449|   4.5|2009-10-27|
|     1|   1590|   2.5|2009-10-27|
|     1|   1591|   1.5|2009-10-27|
|     1|   2134|   4.5|2009-10-27|
|     1|   2478|   4.0|2009-10-27|
|     1|   2840|   3.0|2009-10-27|
|     1|   2986|   2.5|2009-10-27|
|     1|   3020|   4.0|2009-10-27|
|     1|   3424|   4.5|2009-10-27|
|     1|   3698|   3.5|2009-10-27|
|     1|   3826|   2.0|2009-10-27|
|     1|   3893|   3.5|2009-10-27|
|     2|    170|   3.5|2007-10-20|
|     2|    849|   3.5|2007-10-20|
|     2|   1186|   3.5|2007-10-20|
|     2|   1235|   3.0|2007-10-20|
+------+-------+------+----------+
only showing top 20 rows



In [11]:
from pyspark.sql.functions import from_unixtime

# Replace the epoch-seconds column with a real timestamp column.
# from_unixtime renders the epoch as a "yyyy-MM-dd HH:mm:ss" string in the
# session time zone; casting that string yields the timestamp type.
# Do not use to_utc_timestamp for this: its second argument is a time-zone ID,
# not a format pattern, and its job is to shift a timestamp between zones.
df = df.withColumn(
    "timestamp",
    from_unixtime(df["timestamp"]).cast("timestamp"),
)
df.show(5)

+------+-------+------+-------------------+
|userID|movieID|rating|          timestamp|
+------+-------+------+-------------------+
|     1|    307|   3.5|2009-10-27 22:00:21|
|     1|    481|   3.5|2009-10-27 22:04:16|
|     1|   1091|   1.5|2009-10-27 22:04:31|
|     1|   1257|   4.5|2009-10-27 22:04:20|
|     1|   1449|   4.5|2009-10-27 22:01:04|
+------+-------+------+-------------------+
only showing top 5 rows



In [12]:
# Check that the timestamp column is now of timestamp type.
df.printSchema()

root
 |-- userID: string (nullable = true)
 |-- movieID: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: timestamp (nullable = true)



# Quante recensioni ci sono esattamente nel dataset?

In [13]:
# Total number of rows (one row per rating) in the dataset.
total_reviews = df.count()
print(total_reviews)

27753444


# Qual è il numero medio di recensioni per utente?

In [14]:
from pyspark.sql.functions import countDistinct

# Count how many distinct users appear in the ratings.
# The column is spelled "userID", exactly as in the schema defined above, so the
# lookup does not depend on Spark's case-insensitive column resolution.
total_unique_reviewers = df.agg(countDistinct("userID").alias("reviewers_count"))
total_unique_reviewers.show()

+---------------+
|reviewers_count|
+---------------+
|         283228|
+---------------+



In [15]:
from pyspark.sql.functions import countDistinct

# Extract the number of distinct reviewers as a plain Python int.
# The value is computed directly from df instead of calling .head() on the
# current value of total_unique_reviewers: that name is overwritten with an int
# here, so reading from it would make the cell fail when run a second time.
total_unique_reviewers = (
    df.agg(countDistinct("userID").alias("reviewers_count"))
    .head()["reviewers_count"]
)
print(total_unique_reviewers)

283228


In [16]:
# Average number of ratings per user: total ratings divided by distinct users.
mean_reviews = total_reviews/total_unique_reviewers
print(mean_reviews)

97.98976089934611


# Quale utente ha scritto più recensioni? Quante recensioni ha scritto? Qual è il suo voto medio?

In [17]:
# Rank users by how many ratings they submitted and show the five most active.
(
    df.groupBy("userID")
    .count()
    .sort('count', ascending=False)
    .show(5)
)

+------+-----+
|userID|count|
+------+-----+
|123100|23715|
|117490| 9279|
|134596| 8381|
|212343| 7884|
|242683| 7515|
+------+-----+
only showing top 5 rows



In [18]:
# Mean rating given by user 123100, the ID at the top of the ranking above.
(
    df.where("userID=='123100'")
    .agg({"rating":"mean"})
    .show()
)

+------------------+
|       avg(rating)|
+------------------+
|3.1306346194391734|
+------------------+



# Quali sono i film che hanno ricevuto più recensioni?

In [25]:
# Group the ratings by film once; the grouped object is reused by the following
# cells for count() and agg().
dfMovies = df.groupBy("movieID")

In [20]:
# Number of ratings per film, largest first; show the ten most rated films.
(
    dfMovies.count()
    .sort("count", ascending=False)
    .show(10)
)

+-------+-----+
|movieID|count|
+-------+-----+
|    318|97999|
|    356|97040|
|    296|92406|
|    593|87899|
|   2571|84545|
|    260|81815|
|    480|76451|
|    527|71516|
|    110|68803|
|      1|68469|
+-------+-----+
only showing top 10 rows



# Quali sono i 10 film con le recensioni più positive?
# OPZIONE 1

In [23]:
# Option 1: dictionary-style aggregation, then rename the generated columns.
# The generated names follow the "func(column)" pattern, so the count column is
# "count(movieID)" with the same casing as the grouped column. Spelling it
# exactly matters: withColumnRenamed does nothing when the name is not found,
# and a differently-cased name only matches under case-insensitive resolution.
dfMoviesAvg = (
    dfMovies.agg({"rating": "mean", "movieID": "count"})
    .withColumnRenamed("avg(rating)", "avg_rating")
    .withColumnRenamed("count(movieID)", "count_movieID")
)
dfMoviesAvg.show(5)

+-------+------------------+-------------+
|movieID|        avg_rating|count_movieID|
+-------+------------------+-------------+
|    296| 4.173971387139363|        92406|
|   1090|3.9017529880478086|        18825|
|   2294|3.2357021735779252|        12974|
|   3210| 3.636775639067115|         9819|
|  48738| 3.849010703859877|         6166|
+-------+------------------+-------------+
only showing top 5 rows



# OPZIONE 2

In [27]:
from pyspark.sql.functions import avg, count

# Option 2: build the same per-film aggregates with column functions and name
# the result columns directly through alias().
dfMoviesAvg = dfMovies.agg(
    avg("rating").alias("avg_rating"),
    count("movieID").alias("count_rating"),
)
dfMoviesAvg.show(5)

+-------+------------------+------------+
|movieID|        avg_rating|count_rating|
+-------+------------------+------------+
|    296| 4.173971387139363|       92406|
|   1090|3.9017529880478086|       18825|
|   2294|3.2357021735779252|       12974|
|   3210| 3.636775639067115|        9819|
|  48738| 3.849010703859877|        6166|
+-------+------------------+------------+
only showing top 5 rows



In [28]:
# Keep only films with at least 100 ratings, then report how many films remain.
# Relies on the count_rating column produced by the "option 2" aggregation.
dfMoviesMostRated = dfMoviesAvg.where('count_rating >= 100')
dfMoviesMostRated.count()

10500

In [29]:
# Sort the sufficiently-rated films by average rating, best first.
# NOTE(review): show() with no argument prints 20 rows, while the heading asks
# for the top 10.
dfMoviesTopRated = dfMoviesMostRated.sort("avg_rating", ascending=False)
dfMoviesTopRated.show()

+-------+------------------+------------+
|movieID|        avg_rating|count_rating|
+-------+------------------+------------+
| 171011|4.4865181711606095|         853|
| 159817| 4.458092485549133|        1384|
|    318| 4.424188001918387|       97999|
| 170705| 4.399898373983739|         984|
| 174053| 4.350558659217877|        1074|
| 171495| 4.343949044585988|         157|
| 172591| 4.339667458432304|         421|
|    858| 4.332892749244713|       60904|
|     50| 4.291958829205532|       62180|
| 176601| 4.263888888888889|         180|
|   1221|4.2630353697749195|       38875|
| 172577| 4.261904761904762|         126|
|    527| 4.257501817775044|       71516|
|   2019|4.2541157909178215|       14578|
| 163809| 4.244031830238727|         377|
| 185135|  4.23943661971831|         213|
|   1203| 4.237075455914338|       17931|
| 179135| 4.236389684813753|         349|
|    904| 4.230798598634567|       22264|
|   2959| 4.230663235786717|       65678|
+-------+------------------+------------+

# Quali sono i 10 film con le recensioni più negative?

In [32]:
# Sort the sufficiently-rated films by average rating in ascending order, so the
# lowest-rated films come first.
# The result is stored under its own name: assigning it to dfMoviesTopRated would
# overwrite the best-first ranking with the opposite ordering.
dfMoviesWorstRated = dfMoviesMostRated.orderBy("avg_rating")
dfMoviesWorstRated.show()

+-------+------------------+------------+
|movieID|        avg_rating|count_rating|
+-------+------------------+------------+
|   8859|0.8739495798319328|         238|
|   6483|1.0138592750533049|         469|
|   4775| 1.141025641025641|         741|
|   1826|1.2038288288288288|         444|
|   6587|1.2055555555555555|         810|
|  31698|1.2441176470588236|         680|
|   5739|1.2612359550561798|         178|
|  61348|1.2672849915682969|         593|
|   5738|1.3549382716049383|         162|
|   3574|1.3580645161290323|         155|
|   6872|1.3608445297504799|         521|
|   5740| 1.371212121212121|         132|
|   6371| 1.378238341968912|         386|
|   5737|1.3897849462365592|         186|
|  54290|1.4051724137931034|         232|
|   1495|1.4207792207792207|         770|
|   1990|1.4588235294117646|         170|
|   5647|1.4662447257383966|         237|
|   5736|1.4705882352941178|         204|
|  50798|1.4722872755659642|        1281|
+-------+------------------+------