In [1]:
# Import findspark module to locate Spark installation
import findspark

# Initialize Spark installation path
findspark.init("/home/alumno/Escritorio/spark/spark-3.2.2-bin-hadoop2.7")

# Import SparkContext and SparkConf modules from PySpark
from pyspark import SparkContext, SparkConf

# Create a SparkConf object with the app name and master URL
conf = SparkConf().setAppName("intro").setMaster("local")

# Create a SparkContext object with the SparkConf object
sc = SparkContext(conf=conf)

# Import SparkSession and functions modules from PySpark SQL
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create a SparkSession object with the SparkContext object
spark = SparkSession(sc)

## 1. Data analysis with Spark DataFrames: loading datasets

In [2]:
# Read the movies.csv file into a Spark DataFrame
movies_df = spark.read.option("inferSchema","true").option("header", "true").csv("ml-latest-small/movies.csv")

# Read the ratings.csv file into a Spark DataFrame
ratings_df = spark.read.option("inferSchema","true").option("header", "true").csv("ml-latest-small/ratings.csv")


In [3]:
movies_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [4]:
movies_df.show(5, truncate = False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows



In [5]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



## 2. Training datasets

In [3]:
# Join ratings_df and movies_df on the "movieId" column using a right join.
# A right join keeps every movie in movies_df: movies without any rating appear
# once, with null userId, rating and timestamp.
movie_ratings = ratings_df.join(movies_df, ["movieId"], "right")

# Show the first 5 rows of the new dataframe
movie_ratings.show(5)

+-------+------+------+----------+----------------+--------------------+
|movieId|userId|rating| timestamp|           title|              genres|
+-------+------+------+----------+----------------+--------------------+
|      1|   610|   5.0|1479542900|Toy Story (1995)|Adventure|Animati...|
|      1|   609|   3.0| 847221025|Toy Story (1995)|Adventure|Animati...|
|      1|   608|   2.5|1117408267|Toy Story (1995)|Adventure|Animati...|
|      1|   607|   4.0| 964744033|Toy Story (1995)|Adventure|Animati...|
|      1|   606|   2.5|1349082950|Toy Story (1995)|Adventure|Animati...|
+-------+------+------+----------+----------------+--------------------+
only showing top 5 rows



In [7]:
movie_ratings.count()

100854
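
A right join keeps every row of the right-hand DataFrame, so every movie in movies_df appears in movie_ratings even if nobody rated it. Such movies get a single row with null userId, rating and timestamp, and these are the null ratings that show up in Question 4. The plain-Python sketch below models that behaviour on hypothetical toy rows (not the MovieLens files), with None standing in for Spark's null:

```python
from collections import defaultdict

# Hypothetical toy data, not the real CSV files
ratings = [  # (userId, movieId, rating)
    (1, 1, 4.0),
    (2, 1, 3.5),
    (1, 3, 4.0),
]
movies = {1: "Toy Story (1995)", 2: "Jumanji (1995)", 3: "Grumpier Old Men (1995)"}


def right_join(ratings, movies):
    """Return (movieId, userId, rating, title) rows, keeping every movie."""
    by_movie = defaultdict(list)
    for user_id, movie_id, rating in ratings:
        by_movie[movie_id].append((user_id, rating))
    rows = []
    for movie_id, title in movies.items():
        matches = by_movie.get(movie_id)
        if matches:
            # One output row per rating of this movie
            rows.extend((movie_id, user_id, rating, title) for user_id, rating in matches)
        else:
            # Unrated movie: keep it and fill the rating-side columns with None
            rows.append((movie_id, None, None, title))
    return rows


joined = right_join(ratings, movies)
print(len(joined))  # 4: three ratings plus one unrated movie
print([r for r in joined if r[2] is None])  # [(2, None, None, 'Jumanji (1995)')]
```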

**Question 1:**

- Can you obtain a basic summary of statistics for our new movie ratings DataFrame? Useful figures include the count, mean, max, and some selected percentiles. For each question of the tutorial, you must provide the following information:


- What command are you going to use? Why?



- Which Spark operation solves the question?



- What output does your Spark command produce? (3 lines max.)

In [4]:
# Group the movie_ratings dataframe by the "movieId" column
movie_grouped = movie_ratings.groupBy("movieId")

# For each movie, compute the number of ratings, the mean rating and the max rating.
# count/mean/max ignore nulls, so an unrated movie gets 0, null and null.
movie_stats = movie_grouped.agg(
    F.count(F.col("rating")).alias("total_ratings"),
    F.mean(F.col("rating")).alias("mean_rating"),
    F.max(F.col("rating")).alias("max_rating"),
)

# Bring the per-movie statistics back to the driver as a list of Rows
# (not used below; on larger data prefer show() or take())
stats_collected = movie_stats.collect()

# 25th, 50th and 75th percentiles of "rating" over all individual ratings.
# A relativeError of 0.0 makes approxQuantile compute exact quantiles, at a higher cost.
percentiles = movie_ratings.stat.approxQuantile("rating", [0.25, 0.5, 0.75], 0.0)

In [7]:
movie_stats.show(3)

print("25th percentile:", percentiles[0])
print("50th percentile (median):", percentiles[1])
print("75th percentile:", percentiles[2])

+-------+-------------+-----------+----------+
|movieId|total_ratings|mean_rating|max_rating|
+-------+-------------+-----------+----------+
|    148|            1|        5.0|       5.0|
|    471|           40|       3.55|       5.0|
|    496|            1|        5.0|       5.0|
+-------+-------------+-----------+----------+
only showing top 3 rows

25th percentile: 3.0
50th percentile (median): 3.5
75th percentile: 4.0


*First off, to understand how viewers feel about different movies, we organize the data by film and summarize it. We group all the ratings for the same movie together and then calculate the total number of ratings it has received, its average rating, and its highest rating. This is done with 'group by' followed by 'aggregate' (groupBy and agg in Spark), which is just a way of saying that we pool all the rows for each movie and then do some quick math to get the numbers we want.*
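
To make the 'group by' and 'aggregate' step concrete, here is a plain-Python sketch of what groupBy("movieId").agg(count, mean, max) computes for each movie. The toy rows are hypothetical. None plays the role of the null rating that an unrated movie gets from the right join, and the sketch skips it, as Spark's count, mean and max do:

```python
from collections import defaultdict

# Hypothetical (movieId, rating) rows; None models an unrated movie
rows = [(1, 4.0), (1, 3.0), (1, 5.0), (2, None), (3, 2.5)]


def movie_stats(rows):
    """Mimic groupBy("movieId").agg(count, mean, max) on the rating column."""
    groups = defaultdict(list)
    for movie_id, rating in rows:
        groups[movie_id].append(rating)
    stats = {}
    for movie_id, ratings in groups.items():
        present = [r for r in ratings if r is not None]  # nulls are skipped
        stats[movie_id] = {
            "total_ratings": len(present),
            "mean_rating": sum(present) / len(present) if present else None,
            "max_rating": max(present) if present else None,
        }
    return stats


stats = movie_stats(rows)
print(stats[1])  # {'total_ratings': 3, 'mean_rating': 4.0, 'max_rating': 5.0}
print(stats[2])  # {'total_ratings': 0, 'mean_rating': None, 'max_rating': None}
```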

*We use these methods because they are designed to handle large amounts of data efficiently. To see how ratings spread out (what counts as a low, typical, or high rating) we use approxQuantile. As its name suggests, it normally estimates quantiles within a chosen error to save time on large data. Here, however, we pass a relative error of 0.0, which asks Spark for exact quantiles at a higher computational cost.*
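
Spark documents the guarantee of approxQuantile as a rank bound: for N values, probability p and relative error err, the returned value x satisfies floor((p - err) * N) <= rank(x) <= ceil((p + err) * N), and nulls are ignored. With err = 0.0 the bound leaves almost no room, so the result is an exact quantile taken from the data. The sketch below assumes a nearest-rank rule (1-based rank ceil(p * N)), which is one rule compatible with that bound. Spark's choice of element may differ in edge cases. For a one-call overview, DataFrame.summary() is a built-in alternative, for example movie_ratings.select("rating").summary("count", "mean", "max", "25%", "50%", "75%").show(), although its percentiles are approximate.

```python
import math


def nearest_rank_quantile(values, p):
    """Return the data value at 1-based rank ceil(p * N), ignoring None values."""
    data = sorted(v for v in values if v is not None)
    if not data:
        return None
    rank = max(1, math.ceil(p * len(data)))  # clamp so p = 0 returns the minimum
    return data[rank - 1]


# Hypothetical ratings; None models an unrated movie
ratings = [4.0, 3.0, 5.0, None, 2.0]
print([nearest_rank_quantile(ratings, p) for p in (0.25, 0.5, 0.75)])  # [2.0, 3.0, 4.0]
```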

*As for what we end up with, imagine a table where each row shows a movie's ID, how many times it has been rated, its average rating, and the highest rating it has received. That is what the first step produces (movie 471, for example, has 40 ratings with a mean of 3.55). The second step, approxQuantile, returns a list of three numbers: the 25th, 50th and 75th percentiles of all individual ratings (3.0, 3.5 and 4.0), which mark the low end, the middle, and the high end of the ratings.*


In [8]:
# Call the help function on the randomSplit method of the movie_ratings dataframe
# This will display the documentation for the randomSplit method
help(movie_ratings.randomSplit)

# Split the movie_ratings dataframe into two dataframes for training and validation
# The split is done randomly with 80% of the data going to the training dataframe and 20% going to the validation dataframe
# The second argument of the randomSplit method is a seed value for the random number generator
(train_df, v_df) = movie_ratings.randomSplit([0.8, 0.2], 0)

# Count the number of rows in the movie_ratings dataframe
movie_ratings.count()

Help on method randomSplit in module pyspark.sql.dataframe:

randomSplit(weights, seed=None) method of pyspark.sql.dataframe.DataFrame instance
    Randomly splits this :class:`DataFrame` with the provided weights.
    
    .. versionadded:: 1.4.0
    
    Parameters
    ----------
    weights : list
        list of doubles as weights with which to split the :class:`DataFrame`.
        Weights will be normalized if they don't sum up to 1.0.
    seed : int, optional
        The seed for sampling.
    
    Examples
    --------
    >>> splits = df4.randomSplit([1.0, 2.0], 24)
    >>> splits[0].count()
    2
    
    >>> splits[1].count()
    2



100854

In [9]:
train_df.count()

80500

In [10]:
v_df.count()

20354
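
randomSplit normalizes the weights (the help() example's [1.0, 2.0] becomes one third and two thirds) and then decides separately for each row which split it falls into. The split sizes are therefore only approximately proportional: here 80500 + 20354 = 100854 rows, with about 79.8% in training. The plain-Python sketch below is a simplified model of that idea, using one seeded uniform draw per row compared against cumulative weight bounds. Spark's real implementation samples per partition and will not produce the same assignment:

```python
import bisect
import itertools
import random


def normalize(weights):
    """Scale the weights so they sum to 1, as randomSplit does."""
    total = sum(weights)
    return [w / total for w in weights]


def random_split(rows, weights, seed):
    """Assign every row to exactly one split using one seeded uniform draw per row."""
    bounds = list(itertools.accumulate(normalize(weights)))
    bounds[-1] = 1.0  # guard against floating-point round-off in the last bound
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0.0, 1.0)
        splits[bisect.bisect_right(bounds, u)].append(row)
    return splits


print(normalize([1.0, 2.0]))  # the weights from the help() example, normalized

train, val = random_split(range(1000), [0.8, 0.2], seed=0)
print(len(train), len(val))  # roughly 800 and 200; exact sizes depend on the draws
print(80500 + 20354, round(80500 / 100854, 3))  # 100854 0.798 for the notebook run
```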

In [11]:
v_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- userId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [12]:
# Left semi join v_df with train_df on "userId": keep only the v_df rows (and columns)
# whose userId also appears somewhere in train_df.
# Then left semi join the result with train_df on "movieId": keep only the rows whose
# movieId also appears in train_df. Users and movies are checked independently.
validation_df = ( v_df
                 .join(train_df, ["userId"], "left_semi")
                 .join(train_df, ["movieId"], "left_semi")
)

# Left anti join v_df with validation_df on ("movieId", "userId"): keep the v_df rows that
# the semi joins dropped, i.e. ratings whose user or movie never appears in train_df
non_matching_recs = v_df.join(validation_df, ["movieId", "userId"], "left_anti")

# Append the non-matching records to train_df (union matches columns by position;
# both DataFrames have the same column order here)
train_df = train_df.union(non_matching_recs)

**Question 2:**

- What kind of join operations are left semi and left anti joins? Can you explain these operations with our validation example?

1. *A left semi join acts as a filter. It keeps only the rows of the left DataFrame that have at least one match in the right DataFrame, and it returns only the left DataFrame's columns (each left row appears at most once, however many matches it has). In our example, v_df is first semi-joined with train_df on userId and then on movieId. As a result, validation_df contains only the validation ratings whose user and whose movie both appear somewhere in train_df. The two keys are checked separately, not as a pair, so the result is a validation set made only of users and movies that the model has already seen during training.*

2. *A left anti join is the opposite. It keeps the rows of the left DataFrame that have no match in the right DataFrame, which amounts to a set difference. When v_df is anti-joined with validation_df on movieId and userId, the result, non_matching_recs, holds the validation ratings that the semi joins removed. These are ratings from a user or of a movie that never appears in train_df, including the unrated movies from the right join, whose userId is null. Adding these rows back to train_df means that no rating is lost, while every user and movie left in the validation set has been seen during training.*
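
The two joins can be traced on a handful of hypothetical (movieId, userId) pairs in plain Python. Note the pair (2, 20): user 20 and movie 2 each appear in the training rows, but never together. The semi joins still keep that pair, because users and movies are checked separately. The anti join then collects everything else, including the row with a None user, which models an unrated movie:

```python
# Hypothetical toy rows as (movieId, userId); userId None models an unrated movie
train = [(1, 10), (2, 10), (1, 20)]
v = [(2, 20), (3, 10), (1, 30), (4, None)]

train_users = {u for _, u in train}
train_movies = {m for m, _ in train}

# Left semi joins: keep the v rows whose userId, and then whose movieId, appears in train.
# A None key never matches, as with null join keys in Spark.
validation = [
    (m, u) for m, u in v
    if u is not None and u in train_users and m is not None and m in train_movies
]

# Left anti join on (movieId, userId): keep the v rows that are absent from validation
validation_keys = set(validation)
non_matching = [row for row in v if row not in validation_keys]

train_after = train + non_matching  # the union step

print(validation)    # [(2, 20)]
print(non_matching)  # [(3, 10), (1, 30), (4, None)]
print(len(train), len(train_after))  # 3 6
```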


**Question 3:**

- Does train_df now have more or fewer records than initially? Why?

*train_df now has more records than it initially had, because non_matching_recs was appended to it with union. non_matching_recs contains the rows of v_df that the left anti join found missing from validation_df, that is, validation ratings whose user or movie does not appear in the original train_df. union simply stacks these rows under train_df, so the row count grows by exactly the size of non_matching_recs and nothing is removed. The rating counts printed in Question 4 add up to 81,337 rows, compared with 80,500 right after randomSplit. In this way no rating is thrown away: ratings that could not be validated, because their user or movie is unknown to the training data, are used for training instead, and validation_df keeps only users and movies that the model has seen.*
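
The size of the change can be checked from numbers already printed in this notebook. train_df had 80,500 rows right after randomSplit, and the per-rating counts shown in Question 4 (computed on train_df after the union) add up to the new total. A short arithmetic check:

```python
# Rating counts printed by ratings_distribution.show() in Question 4 (train_df after the union)
counts_after_union = {
    5.0: 10632, 4.5: 6856, 4.0: 21725, 3.5: 10610, 3.0: 16100,
    2.5: 4486, 2.0: 6055, 1.5: 1465, 1.0: 2308, 0.5: 1082, None: 18,
}
train_before = 80500  # train_df.count() right after randomSplit
train_after = sum(counts_after_union.values())
print(train_after, train_after - train_before)  # 81337 837
```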

## 3. Managing columns of validation datasets

In [14]:
from pyspark.sql.functions import col, countDistinct, mean, split, explode, count

In [15]:
validation_df.sort(col("rating").desc()).show(5)

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|    193|   243|   5.0| 837155377|    Showgirls (1995)|               Drama|
|  44191|   540|   5.0|1179108778|V for Vendetta (2...|Action|Sci-Fi|Thr...|
|    296|   540|   5.0|1179108599| Pulp Fiction (1994)|Comedy|Crime|Dram...|
|     62|   243|   5.0| 837155394|Mr. Holland's Opu...|               Drama|
|   1221|   392|   5.0|1027524082|Godfather: Part I...|         Crime|Drama|
+-------+------+------+----------+--------------------+--------------------+
only showing top 5 rows



**Question 4:**

- Create a new DataFrame derived from train_df that groups all records with the same rating, counts them, and sorts by the rating column in descending order. The output of this transformation is shown below.

In [17]:
# Group the train_df dataframe by "rating" column and count the number of records in each group
ratings_distribution = train_df.groupBy("rating").count()

# Sort the ratings_distribution dataframe by "rating" column in descending order
ratings_distribution = ratings_distribution.orderBy(F.desc("rating"))

# Display the ratings_distribution dataframe
ratings_distribution.show()

+------+-----+
|rating|count|
+------+-----+
|   5.0|10632|
|   4.5| 6856|
|   4.0|21725|
|   3.5|10610|
|   3.0|16100|
|   2.5| 4486|
|   2.0| 6055|
|   1.5| 1465|
|   1.0| 2308|
|   0.5| 1082|
|  null|   18|
+------+-----+



*To summarize the records in the DataFrame by their ratings and then arrange them in descending order, we'll employ a sequence of specific Spark DataFrame operations. First, we'll use the groupBy function on the rating column. This function clusters the DataFrame into groups based on the unique rating values.*

*After grouping the data by rating, we apply count, which returns the number of rows in each group, that is, how many times each rating value occurs. Rows with a null rating form a group of their own. They come from the movies without ratings that the right join kept.*

*Finally, to list the ratings from highest to lowest, we use orderBy with the desc function, so that higher ratings come first. In a descending sort Spark places nulls last, which is why the null group is at the bottom. The sequence groupBy, then count, then orderBy with desc produces the final DataFrame, which lists the count of each rating from highest to lowest.*
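
The same three steps can be mimicked in plain Python on a few hypothetical ratings, with None standing in for the null rating of an unrated movie. Counter plays the role of groupBy().count(), and the sort key puts the None group last, matching Spark's default of placing nulls last in a descending sort:

```python
from collections import Counter

# Hypothetical ratings; None stands for a movie without ratings
ratings = [4.0, 5.0, 4.0, None, 3.5, 5.0, 4.0]

# groupBy("rating").count(): one group per distinct value, None included
counts = Counter(ratings)

# orderBy(F.desc("rating")): highest rating first, null group last
ordered = sorted(
    counts.items(),
    key=lambda kv: (kv[0] is None, -kv[0] if kv[0] is not None else 0.0),
)
print(ordered)  # [(5.0, 2), (4.0, 3), (3.5, 1), (None, 1)]
```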

In [20]:
# Split the "genres" column on the "|" character into a new array column "genres_array".
# split() takes a regular expression, so the pipe must be escaped; a raw string avoids
# Python's invalid-escape warning for "\|".
train_with_genres_array = train_df.withColumn("genres_array", split("genres", r"\|"))

# Display the first 5 rows of the resulting dataframe
train_with_genres_array.show(5, truncate=False)

+-------+------+------+----------+----------------+-------------------------------------------+-------------------------------------------------+
|movieId|userId|rating|timestamp |title           |genres                                     |genres_array                                     |
+-------+------+------+----------+----------------+-------------------------------------------+-------------------------------------------------+
|1      |1     |4.0   |964982703 |Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|
|1      |5     |4.0   |847434962 |Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|
|1      |7     |4.5   |1106635946|Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|
|1      |15    |2.5   |1510577970|Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Childr
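
The pipe has to be escaped because Spark's split() interprets its pattern as a Java regular expression, where a bare | is an empty alternation rather than a literal character. Python's re module treats | the same way, so it serves here as a stand-in to show the escaped pattern on one genres value from the table above. This is an analogy, not Spark's own code path:

```python
import re

genres = "Adventure|Animation|Children|Comedy|Fantasy"

# An escaped pipe matches the literal "|" character
print(re.split(r"\|", genres))  # ['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy']

# re.escape builds the same escaped pattern from the literal separator
print(re.escape("|"))  # \|
print(re.split(re.escape("|"), genres) == re.split(r"\|", genres))  # True
```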

In [21]:
# Select the columns "movieId", "userId", "rating", "genres", and "genres_array" from the train_with_genres_array dataframe
# Then, explode the "genres_array" column into multiple rows, one for each genre in the array, and create a new column "genre" with the resulting genre value
train_with_genres_exploded = (
    train_with_genres_array
    .select("movieId", "userId", "rating", "genres", "genres_array")
    .withColumn("genre", explode("genres_array"))
)

# Display the first 5 rows of the resulting dataframe
train_with_genres_exploded.show(5, truncate = False)

+-------+------+------+-------------------------------------------+-------------------------------------------------+---------+
|movieId|userId|rating|genres                                     |genres_array                                     |genre    |
+-------+------+------+-------------------------------------------+-------------------------------------------------+---------+
|1      |1     |4.0   |Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|Adventure|
|1      |1     |4.0   |Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|Animation|
|1      |1     |4.0   |Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|Children |
|1      |1     |4.0   |Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|Comedy   |
|1      |1     |4.0   |Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Come
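
explode() turns each element of the array into its own row and copies the other columns. As a result, a rating of a movie with three genres is counted once in each of those genres, and the per-genre averages in the next cells are built on that. (explode also drops rows whose array is null or empty; explode_outer keeps them.) Below is a plain-Python sketch on hypothetical rows, including the per-genre mean that the next cell computes with groupBy("genre") and mean:

```python
from collections import defaultdict

# Hypothetical (movieId, rating, genres) rows
rows = [
    (1, 4.0, "Adventure|Animation|Children"),
    (2, 3.0, "Comedy"),
]


def explode_genres(rows):
    """Return one (movieId, rating, genre) row per genre of each input row."""
    exploded = []
    for movie_id, rating, genres in rows:
        # str.split takes a literal separator, unlike Spark's regex-based split
        for genre in genres.split("|"):
            exploded.append((movie_id, rating, genre))
    return exploded


exploded = explode_genres(rows)

# groupBy("genre").agg(mean("rating")) over the exploded rows
by_genre = defaultdict(list)
for _, rating, genre in exploded:
    by_genre[genre].append(rating)
genre_mean = {genre: sum(r) / len(r) for genre, r in by_genre.items()}

print(len(exploded))  # 4
print(genre_mean)  # {'Adventure': 4.0, 'Animation': 4.0, 'Children': 4.0, 'Comedy': 3.0}
```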

In [23]:
# Group the train_with_genres_exploded dataframe by "genre" column and calculate the mean of the "rating" column for each group
# Rename the resulting column to "genre_rating"
mean_genre_rating = (
    train_with_genres_exploded
        .groupBy("genre")
        .agg(mean(col("rating")).alias("genre_rating"))
)

# Display the first 10 rows of the resulting dataframe
mean_genre_rating.show(10, truncate = False)

+-----------+------------------+
|genre      |genre_rating      |
+-----------+------------------+
|Crime      |3.6657157871030703|
|Romance    |3.5048340647284695|
|Thriller   |3.4936604449472095|
|Adventure  |3.507378740970072 |
|Drama      |3.6545317265271073|
|War        |3.7982522796352582|
|Documentary|3.780632411067194 |
|Fantasy    |3.4920134510298446|
|Mystery    |3.630674448767834 |
|Musical    |3.570934776074542 |
+-----------+------------------+
only showing top 10 rows



In [25]:
# Group the train_with_genres_exploded dataframe by "genre" column and calculate the mean of the "rating" column for each group
# Also, count the number of unique movies in each group
# Rename the resulting columns to "genre_rating" and "num_movies"
mean_genre_rating_movies = (
    train_with_genres_exploded
        .groupBy("genre")
        .agg(
            mean(col("rating")).alias("genre_rating"),
            countDistinct("movieId").alias("num_movies")
        )
)

# Display the first 10 rows of the resulting dataframe
mean_genre_rating_movies.show(10, truncate = False)

+-----------+------------------+----------+
|genre      |genre_rating      |num_movies|
+-----------+------------------+----------+
|Crime      |3.6657157871030703|1199      |
|Romance    |3.5048340647284695|1596      |
|Thriller   |3.4936604449472095|1894      |
|Adventure  |3.507378740970072 |1263      |
|Drama      |3.6545317265271073|4361      |
|War        |3.7982522796352582|382       |
|Documentary|3.780632411067194 |440       |
|Fantasy    |3.4920134510298446|779       |
|Mystery    |3.630674448767834 |573       |
|Musical    |3.570934776074542 |334       |
+-----------+------------------+----------+
only showing top 10 rows



**Question 5:**

Extend the previous DataFrame with a new column holding the number of unique ratings. You need to use countDistinct with both "movieId" and "userId" so that each user is counted only once for each movie.

In [26]:
# Group the train_with_genres_exploded dataframe by "genre" column and calculate the mean of the "rating" column for each group
# Also, count the number of unique movies in each group and the number of unique ratings for each movie
# Rename the resulting columns to "genre_rating", "num_movies", and "num_ratings"
mean_ratings = (
    train_with_genres_exploded
    .groupBy("genre")
    .agg(
        mean(col("rating")).alias("genre_rating"),
        countDistinct("movieId").alias("num_movies"),
        countDistinct("movieId", "userId").alias("num_ratings")
    )
)

# Display the first 3 rows of the resulting dataframe
mean_ratings.show(3)

+--------+------------------+----------+-----------+
|   genre|      genre_rating|num_movies|num_ratings|
+--------+------------------+----------+-----------+
|   Crime|3.6657157871030703|      1199|      13321|
| Romance|3.5048340647284695|      1596|      14584|
|Thriller|3.4936604449472095|      1894|      21216|
+--------+------------------+----------+-----------+
only showing top 3 rows



*This command groups the data by the genre column and calculates three aggregated metrics:*

- *The average rating for the genre (genre_rating).*
- *The number of unique movies in the genre (num_movies).*
- *The number of unique ratings in the genre, counted as distinct (movieId, userId) pairs (num_ratings).*

*Here countDistinct takes two columns, movieId and userId, so the count is based on unique user-movie pairs: if a user had rated the same movie more than once, that pair would be counted only once. Rows in which either column is null are not counted, so the unrated movies added by the right join count towards num_movies but not towards num_ratings.*

*The groupBy followed by agg is the Spark operation used to solve this question. It computes several aggregate functions in one aggregation step after grouping, which is efficient and concise for this type of computation.*
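
The counting rules can be checked on a few hypothetical exploded rows in plain Python. The sketch mirrors the documented behaviour of countDistinct with several columns, which counts distinct combinations and skips any row where one of the columns is null. The duplicated pair is included only to show the deduplication:

```python
# Hypothetical exploded rows as (genre, movieId, userId); userId None = unrated movie
rows = [
    ("Crime", 10, 1),
    ("Crime", 10, 1),  # duplicate pair, counted once by countDistinct
    ("Crime", 10, 2),
    ("Crime", 11, 1),
    ("Crime", 11, 2),
    ("Crime", 12, None),
]


def genre_counts(rows, genre):
    """Mimic countDistinct("movieId") and countDistinct("movieId", "userId")."""
    pairs = [(m, u) for g, m, u in rows if g == genre]
    num_movies = len({m for m, _ in pairs if m is not None})
    # Multi-column countDistinct skips any row in which one of the columns is null
    num_ratings = len({(m, u) for m, u in pairs if m is not None and u is not None})
    return num_movies, num_ratings


print(genre_counts(rows, "Crime"))  # (3, 4): movie 12 counts as a movie but not as a rating
```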


**Question 6:** 

Can you program a top 10 list of the genres with the best average rating, and a top 10 list of the genres with the most ratings?

In [28]:
# This code creates a new dataframe called top10_avg_rating_genres that contains the top 10 genres with the highest average rating.
# It does this by sorting the mean_ratings dataframe by the "genre_rating" column in descending order and then limiting the output to the top 10 rows.
# Finally, it displays the resulting dataframe using the show() method.
top10_avg_rating_genres = (
    mean_ratings
    .orderBy(col("genre_rating").desc())
    .limit(10)
)

top10_avg_rating_genres.show()

+-----------+------------------+----------+-----------+
|      genre|      genre_rating|num_movies|num_ratings|
+-----------+------------------+----------+-----------+
|  Film-Noir|3.9236804564907275|        87|        701|
|        War|3.7982522796352582|       382|       3948|
|Documentary| 3.780632411067194|       440|       1012|
|      Crime|3.6657157871030703|      1199|      13321|
|      Drama|3.6545317265271073|      4361|      33773|
|    Mystery| 3.630674448767834|       573|       6168|
|  Animation| 3.627240461401952|       611|       5635|
|       IMAX|  3.62106682649086|       158|       3337|
|    Western|3.6017983301220293|       167|       1557|
|    Musical| 3.570934776074542|       334|       3327|
+-----------+------------------+----------+-----------+



In [29]:
# This code creates a new dataframe called top10_most_rating_genres that contains the top 10 genres with the most ratings.
# It does this by sorting the mean_ratings dataframe by the "num_ratings" column in descending order and then limiting the output to the top 10 rows.
# Finally, it displays the resulting dataframe using the show() method.
top10_most_rating_genres = (
    mean_ratings
    .orderBy(col("num_ratings").desc())
    .limit(10)
)

top10_most_rating_genres.show()

+---------+------------------+----------+-----------+
|    genre|      genre_rating|num_movies|num_ratings|
+---------+------------------+----------+-----------+
|    Drama|3.6545317265271073|      4361|      33773|
|   Comedy| 3.386513679933981|      3756|      31506|
|   Action|3.4520333468889794|      1828|      24590|
| Thriller|3.4936604449472095|      1894|      21216|
|Adventure| 3.507378740970072|      1263|      19380|
|  Romance|3.5048340647284695|      1596|      14584|
|   Sci-Fi|3.4512548962715797|       980|      13786|
|    Crime|3.6657157871030703|      1199|      13321|
|  Fantasy|3.4920134510298446|       779|       9516|
| Children|3.4114681365958592|       664|       7438|
+---------+------------------+----------+-----------+



*To build the top 10 list of genres by average rating, we reuse mean_ratings, which already holds one row per genre with its average rating (genre_rating). We sort it in descending order of genre_rating with orderBy and col("genre_rating").desc(), and keep the first 10 rows with limit(10).*

*The top 10 genres by number of ratings follow the same pattern on the same DataFrame: we sort mean_ratings in descending order of num_ratings, the count of distinct user-movie pairs per genre, and again keep the first 10 rows with limit(10). No extra grouping is needed, because both metrics were already computed in the previous aggregation.*
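
Sorting and then keeping the first rows is a top-k query. Spark can plan orderBy followed by a small limit as a single top-k operation (TakeOrderedAndProject in the physical plan) instead of a full sort. For an in-memory list, heapq.nlargest is the plain-Python analogue. The sketch uses a few rows copied from the outputs above. As those outputs show, Film-Noir tops the average list with only 701 ratings, so filtering on a minimum num_ratings before ordering is a common refinement; the threshold would be a judgment call.

```python
import heapq

# (genre, genre_rating, num_ratings) for a few genres, copied from the outputs above
genre_stats = [
    ("Film-Noir", 3.9236804564907275, 701),
    ("War", 3.7982522796352582, 3948),
    ("Drama", 3.6545317265271073, 33773),
    ("Comedy", 3.386513679933981, 31506),
    ("Action", 3.4520333468889794, 24590),
]

# Plain-Python counterpart of orderBy(col(...).desc()).limit(3)
top_by_rating = heapq.nlargest(3, genre_stats, key=lambda g: g[1])
top_by_count = heapq.nlargest(3, genre_stats, key=lambda g: g[2])

print([g[0] for g in top_by_rating])  # ['Film-Noir', 'War', 'Drama']
print([g[0] for g in top_by_count])   # ['Drama', 'Comedy', 'Action']
```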