<a href="https://colab.research.google.com/github/SrijaG29/Movie-Data-Analysis-and-Recommendations-Using-PySpark/blob/main/Movies_ratings_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

To build a project on the movie and ratings datasets with PySpark, follow the step-by-step guide below, which also lists questions the analysis can answer. The finished project can demonstrate PySpark and data analysis skills on a resume.

### Step 1: Data Exploration
1. **Load the datasets into PySpark**:
   - Load the movie dataset (`movies.csv`) and ratings dataset (`ratings.csv`) into PySpark DataFrames.

2. **Inspect the Data**:
   - Display the schema and first few rows of both DataFrames to understand the data structure.

### Step 2: Data Cleaning and Preparation
1. **Handle Missing Values**:
   - Check for any missing values and decide how to handle them (e.g., drop rows, fill with default values).

2. **Data Transformation**:
   - Extract the year from the movie title and create a new column `year`.
   - Split the `genres` column into an array of genres.

3. **Join the Datasets**:
   - Perform an inner join on the `movieId` column to combine the movie and ratings datasets.

### Step 3: Data Analysis
1. **Top-Rated Movies**:
   - Find the top 10 movies with the highest average rating.

2. **Popular Genres**:
   - Identify the most popular genres based on the number of ratings.

3. **User Behavior**:
   - Analyze the distribution of ratings by users. For example, find how many movies each user has rated.

4. **Yearly Trends**:
   - Determine how the average movie rating has changed over the years.

5. **Genre-Based Recommendations**:
   - For a given genre, list the top 5 movies based on average ratings.

6. **Movies with the Most Reviews**:
   - Identify the movies with the highest number of ratings.

### Step 4: Advanced Analysis
1. **User-Specific Recommendations**:
   - Build a basic recommendation system by suggesting top-rated movies that a user has not rated yet.

2. **Correlate Ratings and Release Year**:
   - Analyze if there’s any correlation between the release year and the average rating of movies.

3. **Genre Diversity in Top-Rated Movies**:
   - Examine the genre diversity among the top 100 highest-rated movies.

### Step 5: Performance Optimization
1. **Cache and Persist**:
   - Use caching and persistence in PySpark to optimize the performance of your queries.

2. **Partitioning**:
   - Apply partitioning to the data to improve the efficiency of operations, especially for large datasets.

### Step 6: Visualization
1. **Visualize Data**:
   - Use PySpark with an external library like Matplotlib, Seaborn, or even Power BI to create visualizations such as:
     - Distribution of ratings.
     - Average rating per genre.
     - Trends in movie ratings over the years.

### Conclusion
This project structure not only demonstrates your technical ability to handle and analyze data using PySpark but also your ability to derive meaningful insights from complex datasets. Once complete, you can showcase this project in your portfolio or resume, highlighting key aspects such as data cleaning, transformation, analysis, and visualization.

In [1]:
# Step 1: Download the ZIP file
!wget https://files.grouplens.org/datasets/movielens/ml-32m.zip -O movielens.zip

# Step 2: Extract the ZIP file
!unzip movielens.zip -d movielens_data


--2024-08-26 15:42:39--  https://files.grouplens.org/datasets/movielens/ml-32m.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 238950008 (228M) [application/zip]
Saving to: ‘movielens.zip’


2024-08-26 15:42:44 (47.9 MB/s) - ‘movielens.zip’ saved [238950008/238950008]

Archive:  movielens.zip
   creating: movielens_data/ml-32m/
  inflating: movielens_data/ml-32m/tags.csv  
  inflating: movielens_data/ml-32m/links.csv  
  inflating: movielens_data/ml-32m/README.txt  
  inflating: movielens_data/ml-32m/checksums.txt  
  inflating: movielens_data/ml-32m/ratings.csv  
  inflating: movielens_data/ml-32m/movies.csv  


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=9970999b427d537be408d8ae0062e63440d7b311b72ac731e8dc8657d5d11db5
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = (
    SparkSession
    .builder
    .appName("Movie ratings")
    .master("local[*]")
    .getOrCreate()
)

In [5]:
spark

**Step 1: Data Exploration**

**Load the datasets into the PySpark session `spark`:**

Load the movie dataset (`movies.csv`) and ratings dataset (`ratings.csv`) into PySpark DataFrames.

**Inspect the Data:**

Display the schema and first few rows of both DataFrames to understand the data structure.

In [6]:
movies_df = spark.read.format("csv").option("header",True).load("/content/movielens_data/ml-32m/movies.csv")
movies_df.show(truncate = False)

+-------+-------------------------------------+-------------------------------------------+
|movieId|title                                |genres                                     |
+-------+-------------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                     |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                       |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)              |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)             |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)   |Comedy                                     |
|6      |Heat (1995)                          |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                       |Comedy|Romance                             |
|8      |Tom and Huck (1995)                  |Adventure|Children               

In [7]:
ratings_df = spark.read.format("csv").option("header",True).load("/content/movielens_data/ml-32m/ratings.csv")
ratings_df.show(truncate = False)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|1     |17     |4.0   |944249077|
|1     |25     |1.0   |944250228|
|1     |29     |2.0   |943230976|
|1     |30     |5.0   |944249077|
|1     |32     |5.0   |943228858|
|1     |34     |2.0   |943228491|
|1     |36     |1.0   |944249008|
|1     |80     |5.0   |944248943|
|1     |110    |3.0   |943231119|
|1     |111    |5.0   |944249008|
|1     |161    |1.0   |943231162|
|1     |166    |5.0   |943228442|
|1     |176    |4.0   |944079496|
|1     |223    |3.0   |944082810|
|1     |232    |5.0   |943228442|
|1     |260    |5.0   |943228696|
|1     |302    |4.0   |944253272|
|1     |306    |5.0   |944248888|
|1     |307    |5.0   |944253207|
|1     |322    |4.0   |944053801|
+------+-------+------+---------+
only showing top 20 rows



**Step 2: Data Cleaning and Preparation**

**Handle Missing Values:**

Check for any missing values and decide how to handle them (e.g., drop rows, fill with default values).

In [8]:
from pyspark.sql.functions import sum,col,when,split

In [9]:
print(movies_df.dtypes)

[('movieId', 'string'), ('title', 'string'), ('genres', 'string')]


In [10]:
print(ratings_df.dtypes)

[('userId', 'string'), ('movieId', 'string'), ('rating', 'string'), ('timestamp', 'string')]


In [11]:
from pyspark.sql import functions as F

# Create a list of expressions to count nulls in each column
null_counts_expr = [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in movies_df.columns]

# Use the list in select()
null_counts = movies_df.select(*null_counts_expr)

# Show the result
null_counts.show()


+-------+-----+------+
|movieId|title|genres|
+-------+-----+------+
|      0|    0|     0|
+-------+-----+------+



In [12]:
# Create a list of expressions to count nulls in each column
null_counts_expr = [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in ratings_df.columns]

# Use the list in select()
null_counts = ratings_df.select(*null_counts_expr)

# Show the result
null_counts.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+



In [13]:
ratings_df = ratings_df.dropna()

In [14]:
null_count = ratings_df.filter(col("rating").isNull()).count()
print(null_count)

0


In [15]:
null_count = ratings_df.filter(col("timestamp").isNull()).count()
print(null_count)

0


**There are no null values in either dataset.**

**Data Transformation:**

Extract the year from the movie title and create a new column year. Split the genres column into an array of genres.

In [16]:
movies_df.show(truncate = False)

+-------+-------------------------------------+-------------------------------------------+
|movieId|title                                |genres                                     |
+-------+-------------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                     |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                       |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)              |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)             |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)   |Comedy                                     |
|6      |Heat (1995)                          |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                       |Comedy|Romance                             |
|8      |Tom and Huck (1995)                  |Adventure|Children               

In [17]:
total_count = movies_df.count()
print(total_count)

87585


Extracting year from title.

In [18]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

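def year_extract(x):
    """Return the trailing parenthesized year of a title as an int, or None."""
    try:
        # Take the text after the last '(' and drop the closing ')'.
        f = x.split('(')[-1].strip(')')
        return int(f)
    except (AttributeError, ValueError):
        # AttributeError: the title is None; ValueError: no parsable year.
        return None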

x_udf = udf(year_extract,IntegerType())

movies_df = movies_df.withColumn('year',x_udf(col('title')))
movies_df.show(truncate = False)

+-------+-------------------------------------+-------------------------------------------+----+
|movieId|title                                |genres                                     |year|
+-------+-------------------------------------+-------------------------------------------+----+
|1      |Toy Story (1995)                     |Adventure|Animation|Children|Comedy|Fantasy|1995|
|2      |Jumanji (1995)                       |Adventure|Children|Fantasy                 |1995|
|3      |Grumpier Old Men (1995)              |Comedy|Romance                             |1995|
|4      |Waiting to Exhale (1995)             |Comedy|Drama|Romance                       |1995|
|5      |Father of the Bride Part II (1995)   |Comedy                                     |1995|
|6      |Heat (1995)                          |Action|Crime|Thriller                      |1995|
|7      |Sabrina (1995)                       |Comedy|Romance                             |1995|
|8      |Tom and Huck (1995)  

In [19]:
total_count = movies_df.count()
print(total_count)

87585


In [20]:
from pyspark.sql.functions import asc
# f = movies_df.orderBy(movies_df.year.desc())
f = movies_df.orderBy(movies_df.year.asc())
f.show(truncate = False)

+-------+--------------------------------------------------------------------------------+----------------------------+----+
|movieId|title                                                                           |genres                      |year|
+-------+--------------------------------------------------------------------------------+----------------------------+----+
|290775 |Being Romanian: A Family Journal                                                |(no genres listed)          |NULL|
|79607  |Millions Game, The (Das Millionenspiel)                                         |Action|Drama|Sci-Fi|Thriller|NULL|
|290799 |"Ordinary Men: The ""Forgotten Holocaust"" (2022)"                              |Documentary                 |NULL|
|87697  |Shoppen (2006)                                                                  |Comedy|Romance              |NULL|
|291897 |The Green Oak Guardian                                                          |Comedy|Romance              |NULL|


A few rows have a null `year`, so we will remove those rows.

In [21]:
null_count = movies_df.filter(col("year").isNull()).count()
print(null_count)

796


In [22]:
movies_df = movies_df.dropna(subset=['year'])
movies_df.show(truncate = False)

+-------+-------------------------------------+-------------------------------------------+----+
|movieId|title                                |genres                                     |year|
+-------+-------------------------------------+-------------------------------------------+----+
|1      |Toy Story (1995)                     |Adventure|Animation|Children|Comedy|Fantasy|1995|
|2      |Jumanji (1995)                       |Adventure|Children|Fantasy                 |1995|
|3      |Grumpier Old Men (1995)              |Comedy|Romance                             |1995|
|4      |Waiting to Exhale (1995)             |Comedy|Drama|Romance                       |1995|
|5      |Father of the Bride Part II (1995)   |Comedy                                     |1995|
|6      |Heat (1995)                          |Action|Crime|Thriller                      |1995|
|7      |Sabrina (1995)                       |Comedy|Romance                             |1995|
|8      |Tom and Huck (1995)  

In [23]:
null_count = movies_df.filter(col("year").isNull()).count()
print(null_count)

0


In [24]:
from pyspark.sql.functions import asc
# f = movies_df.orderBy(movies_df.year.desc())
f = movies_df.orderBy(movies_df.year.asc())
f.show(truncate = False)

+-------+------------------------------------------------------------------------------------+------------------+----+
|movieId|title                                                                               |genres            |year|
+-------+------------------------------------------------------------------------------------+------------------+----+
|148054 |Passage de Venus (1874)                                                             |Documentary       |1874|
|148048 |Sallie Gardner at a Gallop (1878)                                                   |(no genres listed)|1878|
|275697 |The Magic Rosette (1878)                                                            |Animation         |1878|
|202045 |Athlete Swinging a Pick (1880)                                                      |Documentary       |1880|
|275699 |The Kiss (1882)                                                                     |Documentary       |1882|
|166800 |Buffalo Running (1883)                 

In [25]:
genres_df = movies_df.select(movies_df.movieId,movies_df.genres)
genres_df.show(truncate = False)

+-------+-------------------------------------------+
|movieId|genres                                     |
+-------+-------------------------------------------+
|1      |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Adventure|Children|Fantasy                 |
|3      |Comedy|Romance                             |
|4      |Comedy|Drama|Romance                       |
|5      |Comedy                                     |
|6      |Action|Crime|Thriller                      |
|7      |Comedy|Romance                             |
|8      |Adventure|Children                         |
|9      |Action                                     |
|10     |Action|Adventure|Thriller                  |
|11     |Comedy|Drama|Romance                       |
|12     |Comedy|Horror                              |
|13     |Adventure|Animation|Children               |
|14     |Drama                                      |
|15     |Action|Adventure|Romance                   |
|16     |Crime|Drama        

In [26]:
# A raw string keeps the regex escape "\|" intact and avoids Python's invalid-escape warning.
genres_df = genres_df.withColumn("gener", split(genres_df.genres, r"\|"))
genres_df.show(truncate = False)

+-------+-------------------------------------------+-------------------------------------------------+
|movieId|genres                                     |gener                                            |
+-------+-------------------------------------------+-------------------------------------------------+
|1      |Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|
|2      |Adventure|Children|Fantasy                 |[Adventure, Children, Fantasy]                   |
|3      |Comedy|Romance                             |[Comedy, Romance]                                |
|4      |Comedy|Drama|Romance                       |[Comedy, Drama, Romance]                         |
|5      |Comedy                                     |[Comedy]                                         |
|6      |Action|Crime|Thriller                      |[Action, Crime, Thriller]                        |
|7      |Comedy|Romance                             |[Comedy, Ro

In [27]:
genres_df = genres_df.drop(genres_df.genres)
genres_df.show(truncate = False)

+-------+-------------------------------------------------+
|movieId|gener                                            |
+-------+-------------------------------------------------+
|1      |[Adventure, Animation, Children, Comedy, Fantasy]|
|2      |[Adventure, Children, Fantasy]                   |
|3      |[Comedy, Romance]                                |
|4      |[Comedy, Drama, Romance]                         |
|5      |[Comedy]                                         |
|6      |[Action, Crime, Thriller]                        |
|7      |[Comedy, Romance]                                |
|8      |[Adventure, Children]                            |
|9      |[Action]                                         |
|10     |[Action, Adventure, Thriller]                    |
|11     |[Comedy, Drama, Romance]                         |
|12     |[Comedy, Horror]                                 |
|13     |[Adventure, Animation, Children]                 |
|14     |[Drama]                        

Some rows in `movies_df` have the placeholder value `(no genres listed)` in the `genres` column, so we will remove them.

In [28]:
from pyspark.sql.functions import col, when

# Replace the placeholder '(no genres listed)' with null using built-in
# column expressions, which avoids the overhead of a Python UDF.
# Rows that fail the condition get null because no otherwise() is given.
movies_df = movies_df.withColumn(
    'genres',
    when(col('genres') != '(no genres listed)', col('genres'))
)
movies_df.show(truncate=False)

+-------+-------------------------------------+-------------------------------------------+----+
|movieId|title                                |genres                                     |year|
+-------+-------------------------------------+-------------------------------------------+----+
|1      |Toy Story (1995)                     |Adventure|Animation|Children|Comedy|Fantasy|1995|
|2      |Jumanji (1995)                       |Adventure|Children|Fantasy                 |1995|
|3      |Grumpier Old Men (1995)              |Comedy|Romance                             |1995|
|4      |Waiting to Exhale (1995)             |Comedy|Drama|Romance                       |1995|
|5      |Father of the Bride Part II (1995)   |Comedy                                     |1995|
|6      |Heat (1995)                          |Action|Crime|Thriller                      |1995|
|7      |Sabrina (1995)                       |Comedy|Romance                             |1995|
|8      |Tom and Huck (1995)  

Checking for null values.

In [29]:
null_count = movies_df.filter(col("genres").isNull()).count()
print(null_count)

6702


In [30]:
total_rows = movies_df.count()
print(total_rows)

86789


In [31]:
movies_df = movies_df.filter(movies_df.genres.isNotNull())
movies_df.show(truncate=False)

+-------+-------------------------------------+-------------------------------------------+----+
|movieId|title                                |genres                                     |year|
+-------+-------------------------------------+-------------------------------------------+----+
|1      |Toy Story (1995)                     |Adventure|Animation|Children|Comedy|Fantasy|1995|
|2      |Jumanji (1995)                       |Adventure|Children|Fantasy                 |1995|
|3      |Grumpier Old Men (1995)              |Comedy|Romance                             |1995|
|4      |Waiting to Exhale (1995)             |Comedy|Drama|Romance                       |1995|
|5      |Father of the Bride Part II (1995)   |Comedy                                     |1995|
|6      |Heat (1995)                          |Action|Crime|Thriller                      |1995|
|7      |Sabrina (1995)                       |Comedy|Romance                             |1995|
|8      |Tom and Huck (1995)  

In [32]:
null_count = movies_df.filter(col("genres").isNull()).count()
print(null_count)

0


In [33]:
total_rows = movies_df.count()
print(total_rows)

80087


**Join the Datasets:**

Perform an inner join on the movieId column to combine the movie and ratings datasets.

In [34]:
movies_ratings_df = movies_df.join(ratings_df,on='movieId',how='inner')
movies_ratings_df.show(truncate = False)

+-------+---------------------------------------------------------------+--------------------------------------+----+------+------+---------+
|movieId|title                                                          |genres                                |year|userId|rating|timestamp|
+-------+---------------------------------------------------------------+--------------------------------------+----+------+------+---------+
|17     |Sense and Sensibility (1995)                                   |Drama|Romance                         |1995|1     |4.0   |944249077|
|25     |Leaving Las Vegas (1995)                                       |Drama|Romance                         |1995|1     |1.0   |944250228|
|29     |City of Lost Children, The (Cité des enfants perdus, La) (1995)|Adventure|Drama|Fantasy|Mystery|Sci-Fi|1995|1     |2.0   |943230976|
|30     |Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)           |Crime|Drama                           |1995|1     |5.0   |944249077|
|32   

Next, we will cast the columns to appropriate data types:

- `movieId` -> int
- `userId` -> int
- `rating` -> float

In [35]:
print(movies_ratings_df.dtypes)

[('movieId', 'string'), ('title', 'string'), ('genres', 'string'), ('year', 'int'), ('userId', 'string'), ('rating', 'string'), ('timestamp', 'string')]


In [36]:
# Change the data types of multiple columns
movies_ratings_df = movies_ratings_df.withColumn("movieId", F.col("movieId").cast("integer")) \
                     .withColumn("rating", F.col("rating").cast("float"))\
                     .withColumn("userId", F.col("userId").cast("integer"))

print(movies_ratings_df.dtypes)



[('movieId', 'int'), ('title', 'string'), ('genres', 'string'), ('year', 'int'), ('userId', 'int'), ('rating', 'float'), ('timestamp', 'string')]


**Step 3: Data Analysis**



**Top-Rated Movies:**

Find the top 10 movies with the highest average rating.

In [None]:
from pyspark.sql.functions import avg,desc,asc
top_ten_movies = movies_ratings_df.groupby('movieId','title').agg(avg('rating').alias('Avg_rating'))
top_ten_movies = top_ten_movies.orderBy(top_ten_movies.Avg_rating.desc())
top_ten_movies.show(10,truncate = False)

+-------+-------------------------------------------------------+----------+
|movieId|title                                                  |Avg_rating|
+-------+-------------------------------------------------------+----------+
|267928 |Unreal News Reel (1923)                                |5.0       |
|189587 |The Idiot Cycle (2009)                                 |5.0       |
|240070 |SpongeBob SquarePants: Heroes of Bikini Bottom (2011)  |5.0       |
|266250 |Brian Wilson: I Just Wasn't Made for These Times (1995)|5.0       |
|221334 |Bavaria - A magical journey (2012)                     |5.0       |
|228813 |The Hero's Journey: The World of Joseph Campbell (1987)|5.0       |
|287247 |The Beach Boys: Making Pet Sounds (2017)               |5.0       |
|226832 |The Beast (2014)                                       |5.0       |
|199756 |Zum Teufel mit der Penne (1968)                        |5.0       |
|223908 |Lung (2016)                                            |5.0       |

**Popular Genres:**

Identify the most popular genres based on the number of ratings.


In [None]:
from pyspark.sql.functions import count

popular_genres = movies_ratings_df.groupBy('genres').agg(count('rating').alias('No_of_ratings'))
popular_genres = popular_genres.sort(popular_genres.No_of_ratings.desc())
popular_genres.show(truncate = False)

+--------------------------------+-------------+
|genres                          |No_of_ratings|
+--------------------------------+-------------+
|Drama                           |2247114      |
|Comedy                          |1876631      |
|Comedy|Romance                  |1090830      |
|Drama|Romance                   |978575       |
|Comedy|Drama                    |919399       |
|Comedy|Drama|Romance            |871837       |
|Action|Adventure|Sci-Fi         |799753       |
|Crime|Drama                     |762923       |
|Action|Crime|Thriller           |482833       |
|Drama|Thriller                  |464062       |
|Action|Adventure|Sci-Fi|Thriller|438591       |
|Action|Adventure|Thriller       |415166       |
|Action|Sci-Fi|Thriller          |395940       |
|Crime|Drama|Thriller            |387572       |
|Drama|War                       |359478       |
|Action|Crime|Drama|Thriller     |327357       |
|Action|Drama|War                |325357       |
|Comedy|Crime       

**User Behavior:**

Analyze the distribution of ratings by users. For example, find how many movies each user has rated.


In [None]:
user_rating = movies_ratings_df.groupBy('userId').agg(count('userId').alias('No_of_ratings')).orderBy(movies_ratings_df.userId.asc())
user_rating.show(truncate = False)

+------+-------------+
|userId|No_of_ratings|
+------+-------------+
|1     |141          |
|2     |52           |
|3     |147          |
|4     |27           |
|5     |33           |
|6     |26           |
|7     |44           |
|8     |30           |
|9     |58           |
|10    |660          |
|11    |20           |
|12    |23           |
|13    |65           |
|14    |29           |
|15    |82           |
|16    |294          |
|17    |86           |
|18    |138          |
|19    |47           |
|20    |139          |
+------+-------------+
only showing top 20 rows



**Yearly Trends:**

Determine how the average movie rating has changed over the years.


In [None]:
movies_ratings_df.select('year').show()

+----+
|year|
+----+
|1995|
|1995|
|1995|
|1995|
|1995|
|1995|
|1995|
|1995|
|1995|
|1976|
|1995|
|1995|
|1995|
|1994|
|1994|
|1977|
|1994|
|1994|
|1993|
|1995|
+----+
only showing top 20 rows



In [None]:
avg_rating_year = movies_ratings_df.groupBy('year').agg(avg('rating').alias('Avg_rating'))
avg_rating_year.show(truncate = False)

+----+------------------+
|year|Avg_rating        |
+----+------------------+
|1959|3.8107774040575713|
|1990|3.4513714113949603|
|1896|2.9234972677595628|
|1903|3.066532258064516 |
|1975|3.8815535107687804|
|1977|3.8233296303051407|
|1888|2.577272727272727 |
|1924|3.763372956909361 |
|2003|3.47684039842985  |
|2007|3.507449919061784 |
|1892|2.7777777777777777|
|2018|3.4762552780695954|
|1974|3.8950891613262746|
|2015|3.5664814492136325|
|2023|3.345118898623279 |
|1927|3.8604250713372035|
|1955|3.719717275877964 |
|1890|2.238095238095238 |
|2006|3.555690373687808 |
|1978|3.4619190067903967|
+----+------------------+
only showing top 20 rows



**Genre-Based Recommendations:**

For a given genre, list the top 5 movies based on average ratings.


In [None]:
genre_input = input("Enter the genre to filter movies: ")

# Note: the equality test matches only movies whose genres string is exactly the input.
top_five_movies = (
    movies_ratings_df.filter(col('genres') == genre_input)
    .groupBy('title')
    .agg(avg('rating').alias('Avg_rating'))
    .orderBy(col('Avg_rating').desc())
    .limit(5)
)
top_five_movies.show(truncate = False)


Enter the genre to filter movies: Action
+--------------------------+----------+
|title                     |Avg_rating|
+--------------------------+----------+
|Drunken Master 3 (1994)   |5.0       |
|FB: Fighting Beat (2007)  |5.0       |
|Legend of a Fighter (1982)|5.0       |
|Shanghai 13 (1984)        |5.0       |
|Street Level (2016)       |5.0       |
+--------------------------+----------+



**Movies with the Most Reviews:**

Identify the movies with the highest number of ratings.

In [None]:
from pyspark.sql.functions import desc

In [None]:
most_reviews = movies_ratings_df.groupBy('movieId','title')\
              .agg(count(col('rating')).alias('No_of_reviews'))\
              .orderBy(col('No_of_reviews').desc())\
              .limit(1)
most_reviews.show(truncate = False)

+-------+--------------------------------+-------------+
|movieId|title                           |No_of_reviews|
+-------+--------------------------------+-------------+
|318    |Shawshank Redemption, The (1994)|102929       |
+-------+--------------------------------+-------------+



In [None]:
x = movies_df.filter(col("title").like("Shawshank Redemption%"))
x.show(truncate = False)

+-------+--------------------------------+-----------+----+
|movieId|title                           |genres     |year|
+-------+--------------------------------+-----------+----+
|318    |Shawshank Redemption, The (1994)|Crime|Drama|1994|
+-------+--------------------------------+-----------+----+



**Step 4: Advanced Analysis**


**User-Specific Recommendations:**

Build a basic recommendation system by suggesting top-rated movies that a user has not rated yet.


In [None]:
top_rated_movies = movies_ratings_df.groupBy('movieId', 'title')\
    .agg(avg(col('rating')).alias('Avg_rating'))\
    .orderBy(col('Avg_rating').desc())\
    .limit(100)\
    .select('movieId','title')

top_rated_movies.show(truncate = False)

+-------+-------------------------------------------------------+
|movieId|title                                                  |
+-------+-------------------------------------------------------+
|136996 |The Wrong Girl (1999)                                  |
|117656 |Drunken Master 3 (1994)                                |
|277798 |Frank & Zed (2020)                                     |
|289775 |Toma (2021)                                            |
|226640 |Zabardast (2018)                                       |
|248740 |Legs - Atavism (2012)                                  |
|226832 |The Beast (2014)                                       |
|282565 |The Book of the Sea (2018)                             |
|266250 |Brian Wilson: I Just Wasn't Made for These Times (1995)|
|292047 |Bamboo Doll of Echizen (1963)                          |
|289249 |Cirque du Soleil: Quidam (1999)                        |
|267928 |Unreal News Reel (1923)                                |
|137018 |A

In [None]:
total_users = movies_ratings_df.select('userId').distinct().count()
print(total_users)

200948


In [None]:
distinct_user_ids = movies_ratings_df.select(movies_ratings_df.userId).distinct()
distinct_user_ids.show(truncate = False)

+------+
|userId|
+------+
|148   |
|463   |
|471   |
|496   |
|833   |
|1088  |
|1238  |
|1342  |
|1580  |
|1591  |
|1645  |
|1829  |
|1959  |
|2122  |
|2142  |
|2366  |
|2659  |
|2866  |
|3175  |
|3749  |
+------+
only showing top 20 rows



In [None]:
user_id_input = input('Enter user id')

user_ratings = movies_ratings_df.filter(col('userId') == user_id_input).select('movieId','title')
user_ratings.show(truncate = False)

Enter user id1
+-------+---------------------------------------------------------------+
|movieId|title                                                          |
+-------+---------------------------------------------------------------+
|17     |Sense and Sensibility (1995)                                   |
|25     |Leaving Las Vegas (1995)                                       |
|29     |City of Lost Children, The (Cité des enfants perdus, La) (1995)|
|30     |Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)           |
|32     |Twelve Monkeys (a.k.a. 12 Monkeys) (1995)                      |
|34     |Babe (1995)                                                    |
|36     |Dead Man Walking (1995)                                        |
|80     |White Balloon, The (Badkonake sefid) (1995)                    |
|110    |Braveheart (1995)                                              |
|111    |Taxi Driver (1976)                                             |
|161    |Crimson Tide (


A **left anti join** in PySpark returns only the rows of the left DataFrame that have no match in the right DataFrame, and it keeps only the left DataFrame's columns.

In [None]:
recommended_movies_df = top_rated_movies.join(user_ratings, on='movieId', how='left_anti')
recommended_movies_df.show(truncate = False)

+-------+-------------------------------------------------------+
|movieId|title                                                  |
+-------+-------------------------------------------------------+
|267948 |Christmas Cracker (1963)                               |
|243480 |Jealousy Is My Middle Name (2002)                      |
|247138 |The Science of Sleep (2016)                            |
|172071 |Atelier Fontana (2011)                                 |
|140369 |War Arrow (1954)                                       |
|285265 |Love in Bloom (2022)                                   |
|214182 |The Wedding Do Over (2018)                             |
|289249 |Cirque du Soleil: Quidam (1999)                        |
|257905 |The Sauce of Love (2021)                               |
|228813 |The Hero's Journey: The World of Joseph Campbell (1987)|
|143422 |2 (2007)                                               |
|282531 |A Night at the Kindergarten (2022)                     |
|267038 |C

The `left_anti` join keeps only the top-rated movies that are not in the user's rated movies list.

**Correlate Ratings and Release Year:**

Analyze if there’s any correlation between the release year and the average rating of movies.


In [None]:
avg_rating_year = movies_ratings_df.groupBy('year').agg(avg(col('rating')).alias('Avg_rating'))
avg_rating_year.show(truncate = False)

+----+------------------+
|year|Avg_rating        |
+----+------------------+
|1959|3.8107774040575713|
|1990|3.4513714113949603|
|1896|2.9234972677595628|
|1903|3.066532258064516 |
|1975|3.8815535107687804|
|1977|3.8233296303051407|
|1888|2.577272727272727 |
|1924|3.763372956909361 |
|2003|3.47684039842985  |
|2007|3.507449919061784 |
|1892|2.7777777777777777|
|2018|3.4762552780695954|
|1974|3.8950891613262746|
|2015|3.5664814492136325|
|2023|3.345118898623279 |
|1927|3.8604250713372035|
|1955|3.719717275877964 |
|1890|2.238095238095238 |
|2006|3.555690373687808 |
|1978|3.4619190067903967|
+----+------------------+
only showing top 20 rows



In [None]:
avg_rating_year_pd = avg_rating_year.toPandas()
correlation = avg_rating_year_pd['year'].astype(float).corr(avg_rating_year_pd['Avg_rating'])
print("Correlation between release year and average rating: ",correlation)


Correlation between release year and average rating:  0.3809941166120315


Here we calculate the Pearson correlation coefficient between the `year` and `Avg_rating` columns.
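
For reference, the Pearson coefficient is the covariance of the two columns divided by the product of their standard deviations. The sketch below computes it by hand in plain Python. The helper name `pearson_r` and the sample values are illustrative assumptions, not taken from the dataset.

```python
from math import sqrt


def pearson_r(xs, ys):
    """Pearson correlation of two equally long numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations (the 1/n factors cancel in the ratio).
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


# Made-up, perfectly linear data: the coefficient should be 1 up to rounding.
years = [1990.0, 2000.0, 2010.0, 2020.0]
ratings = [3.0, 3.2, 3.4, 3.6]
r = pearson_r(years, ratings)
print(r)
```

A value near +1 or -1 means a strong linear relationship, and a value near 0 means little or none.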

**Conclusion:** A Pearson correlation coefficient of about 0.38 indicates a moderate positive relationship: years closer to the present tend to have slightly higher average ratings than earlier years.

**Genre Diversity in Top-Rated Movies:**

Examine the genre diversity among the top 100 highest-rated movies.

In [None]:
top_rated_movies = movies_ratings_df.groupBy('movieId', 'title')\
    .agg(avg(col('rating')).alias('Avg_rating'))\
    .orderBy(col('Avg_rating').desc())\
    .limit(100)\
    .select('movieId','title')

top_rated_movies.show(truncate = False)

+-------+-------------------------------------------------------+
|movieId|title                                                  |
+-------+-------------------------------------------------------+
|136996 |The Wrong Girl (1999)                                  |
|117656 |Drunken Master 3 (1994)                                |
|277798 |Frank & Zed (2020)                                     |
|289775 |Toma (2021)                                            |
|226640 |Zabardast (2018)                                       |
|248740 |Legs - Atavism (2012)                                  |
|226832 |The Beast (2014)                                       |
|282565 |The Book of the Sea (2018)                             |
|266250 |Brian Wilson: I Just Wasn't Made for These Times (1995)|
|292047 |Bamboo Doll of Echizen (1963)                          |
|289249 |Cirque du Soleil: Quidam (1999)                        |
|267928 |Unreal News Reel (1923)                                |
|137018 |A

In [None]:
from pyspark.sql.functions import explode,countDistinct

In [None]:
movies_df_aliased = movies_df.alias('m')
top_rated_movies_aliased = top_rated_movies.alias('t')


genres_split_df = movies_df_aliased.withColumn("genre", explode(split(col("genres"), "\\|"))) \
                                   .join(top_rated_movies_aliased, on="movieId") \
                                   .select('m.title', 'genre')

genres_split_df.show(truncate=False)


+------------------------------+-----------+
|title                         |genre      |
+------------------------------+-----------+
|Fitzgerald (2002)             |Drama      |
|Until They Sail (1957)        |Drama      |
|Until They Sail (1957)        |Romance    |
|Until They Sail (1957)        |War        |
|A Sister's Revenge (2013)     |Drama      |
|A Sister's Revenge (2013)     |Mystery    |
|A Sister's Revenge (2013)     |Thriller   |
|The Perfect Neighbor (2005)   |Drama      |
|The Perfect Neighbor (2005)   |Thriller   |
|War Arrow (1954)              |Adventure  |
|War Arrow (1954)              |Drama      |
|War Arrow (1954)              |Romance    |
|War Arrow (1954)              |War        |
|War Arrow (1954)              |Western    |
|Uomo e galantuomo (1975)      |Comedy     |
|2 (2007)                      |Drama      |
|Tammy and the Doctor (1963)   |Comedy     |
|Tammy and the Doctor (1963)   |Romance    |
|Who Killed Chea Vichea? (2010)|Documentary|
|The Far P

In [None]:
# Count distinct genres
genre_diversity = genres_split_df.groupBy("title") \
                                .agg(countDistinct("genre").alias("distinct_genres"))
genre_diversity = genre_diversity.orderBy(col('distinct_genres').desc())
genre_diversity.show(truncate = False)

+--------------------------------------------------+---------------+
|title                                             |distinct_genres|
+--------------------------------------------------+---------------+
|War Arrow (1954)                                  |5              |
|Christmas Cracker (1963)                          |4              |
|The Light in the Forest (1958)                    |4              |
|Me Sinto Bem com Você (2021)                      |3              |
|Silvery Moon (1933)                               |3              |
|Paper Marriage (1988)                             |3              |
|Until They Sail (1957)                            |3              |
|A Sister's Revenge (2013)                         |3              |
|See You Next Christmas (2021)                     |3              |
|Nico the Unicorn (1998)                           |3              |
|The Final Exit of the Disciples of Ascensia (2019)|3              |
|B&B Merry (2022)                 

**Total Genre Counts:**

You might want to count how many times each genre appears across all the top 100 movies.


In [None]:
total_count = genres_split_df.groupBy('genre').agg(count(col('genre')).alias('No_of_count'))
total_count = total_count.orderBy(col('No_of_count').desc())
total_count.show(truncate = False)

+-----------+-----------+
|genre      |No_of_count|
+-----------+-----------+
|Romance    |52         |
|Comedy     |33         |
|Drama      |31         |
|Documentary|15         |
|Animation  |6          |
|Children   |5          |
|Action     |5          |
|Thriller   |4          |
|War        |4          |
|Horror     |4          |
|Adventure  |3          |
|Fantasy    |3          |
|Mystery    |2          |
|Western    |2          |
|Crime      |1          |
|Sci-Fi     |1          |
+-----------+-----------+



**Percentage Representation:**

Calculate the percentage representation of each genre among the top-rated movies to understand which genres dominate.

In [None]:
from pyspark.sql.functions import round, col

# Divide each genre count by the 100 top-rated movies to get the percentage of
# those movies that carry the genre. A movie can have several genres, so the
# percentages can add up to more than 100.
top_n = 100
percentage_count = total_count.withColumn('percentage_of_top_100', round((col('No_of_count') / top_n) * 100, 1))
percentage_count.show(truncate=False)

+-----------+-----------+---------------------+
|genre      |No_of_count|percentage_of_top_100|
+-----------+-----------+---------------------+
|Romance    |52         |52.0                 |
|Comedy     |33         |33.0                 |
|Drama      |31         |31.0                 |
|Documentary|15         |15.0                 |
|Animation  |6          |6.0                  |
|Children   |5          |5.0                  |
|Action     |5          |5.0                  |
|Thriller   |4          |4.0                  |
|War        |4          |4.0                  |
|Horror     |4          |4.0                  |
|Adventure  |3          |3.0                  |
|Fantasy    |3          |3.0                  |
|Mystery    |2          |2.0                  |
|Western    |2          |2.0                  |
|Crime      |1          |1.0                  |
|Sci-Fi     |1          |1.0                  |
+-----------+-----------+---------------------+



**Step 5: Performance Optimization**


**Cache and Persist:**

Use caching and persistence in PySpark to optimize the performance of your queries.


**Partitioning:**

Apply partitioning to the data to improve the efficiency of operations, especially for large datasets.


**Step 6: Visualization**

**Visualize Data:**

Use PySpark with an external library like Matplotlib, Seaborn, or even Power BI to create visualizations such as:
- Distribution of ratings.
- Average rating per genre.
- Trends in movie ratings over the years.
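
As one possible sketch of the yearly trend plot, the example below uses Matplotlib on a handful of yearly averages, rounded from the `avg_rating_year` output shown earlier. In practice the full aggregated result would first be brought to the driver, for example with `toPandas()`. The output file name is an arbitrary assumption.

```python
import matplotlib
matplotlib.use("Agg")  # headless backend; omit this line inside a notebook
import matplotlib.pyplot as plt

# (year, average rating) pairs rounded from the avg_rating_year output above.
yearly_avg = [(1959, 3.81), (1975, 3.88), (1990, 3.45), (2003, 3.48), (2015, 3.57), (2023, 3.35)]
years = [year for year, _ in yearly_avg]
avg_ratings = [rating for _, rating in yearly_avg]

fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(years, avg_ratings, marker="o")
ax.set_xlabel("Release year")
ax.set_ylabel("Average rating")
ax.set_title("Average movie rating by release year")
fig.tight_layout()
fig.savefig("avg_rating_by_year.png")  # hypothetical output path
```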

**PySpark MLlib:**

Q1) Can we predict whether a user will give a movie a high rating (e.g., 4 or 5 stars) based on the movie's genre, year, and user's past ratings?

This requires some data preprocessing first.

In [None]:
# Raw string so the regex escape for the pipe character is passed through unchanged
movies_ratings_df = movies_ratings_df.withColumn('genres',split('genres',r'\|'))
movies_ratings_df.show(truncate = False)

In [None]:
from pyspark.sql.functions import explode
x = movies_ratings_df.select('genres')
x.show(truncate = False)

In [None]:
genres_exploded_df =  x.withColumn('genres',explode('genres'))
genres_exploded_df.show(truncate = False)

In [None]:
genres_exploded_df = genres_exploded_df.distinct()
genres_exploded_df.show()

In [None]:
genres_list = genres_exploded_df.collect()
print(genres_list[0][0])

In [None]:
l = len(genres_list)
print(l)

In [None]:
f = movies_ratings_df.select('*')
f.show(truncate = False)

In [None]:

from pyspark.sql.functions import lit

for i in range(l):
    f = f.withColumn(str(genres_list[i][0]),lit(0))
f.show(truncate = False)


In [None]:
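from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import IntegerType


def genres_update(x, y):
    # Return 1 if genre y is in the movie's genre list x, else 0
    # (a missing genre list counts as 0).
    return 1 if x is not None and y in x else 0


x_udf = udf(genres_update, IntegerType())

# Overwrite each placeholder genre column with its 0/1 indicator.
for i in range(l):
    genre = genres_list[i][0]
    f = f.withColumn(genre, x_udf(col('genres'), lit(genre)))


f.show(truncate=False)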

In [None]:
f.show()

In [None]:
print(f.dtypes)

In [None]:
f = f.drop('movieId','title','genres','year','timestamp')
f.show()

In [None]:
# print(f.dtypes)
print(genres_list)

In [None]:
f = f.withColumn('userId',col('userId').cast('int'))\
    .withColumn('rating',col('rating').cast('int'))

print(f.dtypes)

In [None]:
from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler(inputCols = ['userId','Crime','Romance','Thriller','Adventure','Drama','War','Documentary','Fantasy','Mystery','Musical','Animation','Film-Noir','IMAX','Horror','Western','Comedy','Children','Action','Sci-Fi'],outputCol = 'user_details')

f = assembler.transform(f)

f.show(truncate = False)

In [None]:
f.show(truncate = False)

In [None]:
f.select('user_details').show(truncate = False)

In [None]:
f.filter(f.userId == 148).show(truncate= False)

In [None]:
x = f.select('rating','user_details')
x.show(truncate = False)

In [None]:
x.cache()

In [None]:
x.count()

In [None]:
train_data, test_data = x.randomSplit([0.8, 0.2], seed=42)

In [None]:
train_data.show(truncate = False)

In [None]:
test_data.show(truncate = False)

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="user_details", labelCol="rating")

model = lr.fit(train_data)

In [None]:
predictions = model.transform(test_data)

In [None]:
predictions.show(truncate=False)

In [None]:
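user_id = 148
genres = ['Action','Adventure','Thriller']

In [None]:
# The feature order must match the VectorAssembler inputCols used to train the model.
feature_cols = ['userId','Crime','Romance','Thriller','Adventure','Drama','War','Documentary','Fantasy','Mystery','Musical','Animation','Film-Noir','IMAX','Horror','Western','Comedy','Children','Action','Sci-Fi']

# Map each feature name to its position in the feature vector.
d = {name: idx for idx, name in enumerate(feature_cols)}

print(d)

In [None]:
details = [0] * len(feature_cols)
print(details)

In [None]:
details[0] = user_id

# Set the indicator for each genre of interest.
for g in genres:
    details[d[g]] = 1

print(details)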

In [None]:
schema = StructType(
    [
        StructField('userId',IntegerType()),
        StructField('Crime',IntegerType()),
        StructField('Romance',IntegerType()),
        StructField('Thriller',IntegerType()),
        StructField('Adventure',IntegerType()),
        StructField('Drama',IntegerType()),
        StructField('War',IntegerType()),
        StructField('Documentary',IntegerType()),
        StructField('Fantasy',IntegerType()),
        StructField('Mystery',IntegerType()),
        StructField('Musical',IntegerType()),
        StructField('Animation',IntegerType()),
        StructField('Film-Noir',IntegerType()),
        StructField('IMAX',IntegerType()),
        StructField('Horror',IntegerType()),
        StructField('Western',IntegerType()),
        StructField('Comedy',IntegerType()),
        StructField('Children',IntegerType()),
        StructField('Action',IntegerType()),
        StructField('Sci-Fi',IntegerType())
    ]
)

In [None]:
data = tuple(details)
print(data)

In [None]:
df = spark.createDataFrame([data],schema=schema)
df.show()

In [None]:
from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler(inputCols = ['userId','Crime','Romance','Thriller','Adventure','Drama','War','Documentary','Fantasy','Mystery','Musical','Animation','Film-Noir','IMAX','Horror','Western','Comedy','Children','Action','Sci-Fi'],outputCol = 'user_details')

df = assembler.transform(df)

df.show(truncate = False)

In [None]:
x = df.select('user_details')
x.show()

In [None]:
prediction_1 = model.transform(x)

In [None]:
prediction_1.show(truncate = False)