In [0]:
#Finding the ratings of movie with movie-lens datasets
#Files downloaded from: https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
# 1. Import the pyspark and pyspark SQL modules and also specify the app name 
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
appName= "project2-movieratings"
master= "local"

# 2. Create a spark session and enable the Hive support to interact with the hive database
spark = SparkSession.builder.master(master).appName(appName).enableHiveSupport().getOrCreate() 
spark.sql("CREATE DATABASE IF NOT EXISTS classproject2").show()
spark.sql("DESCRIBE DATABASE classproject2").show(truncate=False)
spark.sql("USE classproject2").show() 

++
||
++
++

+-------------------------+------------------------------------------+
|database_description_item|database_description_value                |
+-------------------------+------------------------------------------+
|Catalog Name             |spark_catalog                             |
|Namespace Name           |classproject2                             |
|Comment                  |                                          |
|Location                 |dbfs:/user/hive/warehouse/classproject2.db|
|Owner                    |root                                      |
+-------------------------+------------------------------------------+

++
||
++
++



In [0]:
#3. Verify the databases in hive using pyspark
df=spark.sql("show databases")
df.show()

+-------------+
| databaseName|
+-------------+
|classproject2|
|      default|
+-------------+



In [0]:
# 4. Read the CSV file (movies)from the local write to the table in hive using pyspark 

#Import spark sql types 
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Define the schema for the movie DataFrame
movie_schema = StructType([
    StructField('movieId', IntegerType(), True),
    StructField('title', StringType(), True),
    StructField('genres', StringType(), True)
])
#Load data from CSV to Spark Dataframe
movies_df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/pawansharma2045@gmail.com/movies.csv")

In [0]:
# Define the schema for the ratings DataFrame
ratings_schema = StructType([
    StructField('userId', IntegerType(), True),
    StructField('movieId', IntegerType(), True),
    StructField('rating', DoubleType(), True),
    StructField('timestamp', TimestampType(), True)
])

#Load data from CSV to Spark Dataframe
ratings_df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/pawansharma2045@gmail.com/ratings.csv")

In [0]:
#Check and Show dataframe data by .show() method
movies_df.show(10, truncate=False)

#Print The schema of dataframe by .printSchema()
movies_df.printSchema()

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
|6      |Heat (1995)                       |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                    |Comedy|Romance                             |
|8      |Tom and Huck (1995)               |Adventure|Children                         |
|9      |Sudden Death

In [0]:

#Check and Show dataframe data by .show() method
ratings_df.show(10, truncate=False)

#Print The schema of dataframe by .printSchema()
ratings_df.printSchema()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|1     |1      |4.0   |964982703|
|1     |3      |4.0   |964981247|
|1     |6      |4.0   |964982224|
|1     |47     |5.0   |964983815|
|1     |50     |5.0   |964982931|
|1     |70     |3.0   |964982400|
|1     |101    |5.0   |964980868|
|1     |110    |4.0   |964982176|
|1     |151    |5.0   |964984041|
|1     |157    |5.0   |964984100|
+------+-------+------+---------+
only showing top 10 rows

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [0]:
#List all Movies with Number of Ratings
#Write a SQL query in pyspark to list all movies with the number of ratings each has received.




# Join movies and ratings DataFrames and compute number of ratings for each movie
movies_with_ratings = movies_df.join(ratings_df, "movieId", "left")
movies_num_ratings = movies_with_ratings.groupBy("movieId", "title").agg(count("userId").alias("num_ratings"))

# Order by number of ratings in descending order and show top 5
top_movies_by_ratings = movies_num_ratings.orderBy(desc("num_ratings")).limit(5)
top_movies_by_ratings.show()

+-------+--------------------+-----------+
|movieId|               title|num_ratings|
+-------+--------------------+-----------+
|    356| Forrest Gump (1994)|        329|
|    318|Shawshank Redempt...|        317|
|    296| Pulp Fiction (1994)|        307|
|    593|Silence of the La...|        279|
|   2571|  Matrix, The (1999)|        278|
+-------+--------------------+-----------+



In [0]:
#List Users and Number of Ratings:
# Compute number of ratings given by each user 
users_num_ratings = ratings_df.groupBy("userId").agg(count("*").alias("num_ratings_given"))
top_users_by_ratings_given = users_num_ratings.orderBy(desc("num_ratings_given"))
top_users_by_ratings_given.show(10)

+------+-----------------+
|userId|num_ratings_given|
+------+-----------------+
|   414|             2698|
|   599|             2478|
|   474|             2108|
|   448|             1864|
|   274|             1346|
|   610|             1302|
|    68|             1260|
|   380|             1218|
|   606|             1115|
|   288|             1055|
+------+-----------------+
only showing top 10 rows



In [0]:
#List Movie IDs with Ratings:
# List all movie IDs that have received at least one rating
distinct_movie_ids = ratings_df.select("movieId").distinct()
distinct_movie_ids.show(10)


+-------+
|movieId|
+-------+
|    101|
|      3|
|     47|
|    110|
|    163|
|     70|
|      6|
|      1|
|    157|
|    151|
+-------+
only showing top 10 rows



In [0]:
#List Users with Ratings:
# List all users who have rated at least one movie
distinct_user_ids = ratings_df.select("userId").distinct().limit(5)
distinct_user_ids.show(10)


+------+
|userId|
+------+
|     3|
|     5|
|     1|
|     4|
|     2|
+------+



In [0]:
#User Rating Statistics:
from pyspark.sql.functions import avg, max, min

# Compute statistics for each user (max, min, average ratings given)
user_rating_stats = ratings_df.groupBy("userId").agg(
    max("rating").alias("max_rating_given"),
    min("rating").alias("min_rating_given"),
    avg("rating").alias("avg_rating_given")
)
user_rating_stats.show(10)


+------+----------------+----------------+------------------+
|userId|max_rating_given|min_rating_given|  avg_rating_given|
+------+----------------+----------------+------------------+
|     1|             5.0|             1.0| 4.366379310344827|
|    10|             5.0|             0.5|3.2785714285714285|
|   100|             5.0|             1.0| 3.945945945945946|
|   101|             5.0|             1.0| 3.557377049180328|
|   102|             5.0|             1.0| 3.357142857142857|
|   103|             5.0|             0.5| 3.907161803713528|
|   104|             5.0|             0.5|3.5073260073260073|
|   105|             5.0|             0.5| 4.116343490304709|
|   106|             5.0|             2.5|4.4393939393939394|
|   107|             5.0|             3.0| 3.911764705882353|
+------+----------------+----------------+------------------+
only showing top 10 rows



In [0]:
#Movie Rating Statistics:
# Compute statistics for each movie (max, min, average ratings received)
movie_rating_stats = ratings_df.groupBy("movieId").agg(
    max("rating").alias("max_rating_received"),
    min("rating").alias("min_rating_received"),
    avg("rating").alias("avg_rating_received")
)
movie_rating_stats.show(10)

+-------+-------------------+-------------------+-------------------+
|movieId|max_rating_received|min_rating_received|avg_rating_received|
+-------+-------------------+-------------------+-------------------+
|      1|                5.0|                0.5| 3.9209302325581397|
|     10|                5.0|                0.5|  3.496212121212121|
|    100|                4.0|                1.0| 2.7857142857142856|
| 100044|                4.0|                4.0|                4.0|
| 100068|                3.5|                3.5|                3.5|
| 100083|                5.0|                2.0|                3.5|
| 100106|                3.5|                3.5|                3.5|
| 100159|                4.5|                4.5|                4.5|
| 100163|                4.5|                0.5|                2.9|
| 100194|                4.5|                4.5|                4.5|
+-------+-------------------+-------------------+-------------------+
only showing top 10 

In [0]:
from pyspark.sql.functions import col, count, desc, avg, max, min, variance


# Movies with the Highest and Lowest Ratings:
# Compute average ratings for each movie
# Join movies and ratings DataFrames
movies_with_ratings = movies_df.join(ratings_df, "movieId", "inner")

# Calculate variance of ratings for each movie
movie_variance = movies_with_ratings.groupBy("movieId", "title").agg(variance("rating").alias("rating_variance"))

# Order by rating variance in descending order and show top  movies with high variance
high_variance_movies = movie_variance.orderBy(desc("rating_variance"))
high_variance_movies.show(10)


+-------+--------------------+---------------+
|movieId|               title|rating_variance|
+-------+--------------------+---------------+
|  32892|Ivan's Childhood ...|         10.125|
|   2068|Fanny and Alexand...|         10.125|
|   3223|Zed & Two Noughts...|            8.0|
|    484|       Lassie (1994)|            8.0|
|  84847|         Emma (2009)|            8.0|
|   7564|Kwaidan (Kaidan) ...|            8.0|
|  70946|      Troll 2 (1990)|           6.75|
| 108689|I, Frankenstein (...|          6.125|
|  70687|  Paper Heart (2009)|          6.125|
|  26409|Clonus Horror, Th...|          6.125|
+-------+--------------------+---------------+
only showing top 10 rows



In [0]:
# Calculate average ratings for each movie
movie_avg_ratings = movies_with_ratings.groupBy("movieId", "title").agg(avg("rating").alias("avg_rating"))

# Order by average rating in descending order and show top  highest rated movies
highest_rated_movies = movie_avg_ratings.orderBy(desc("avg_rating"))
highest_rated_movies.show(5)

# Order by average rating in ascending order and show top  lowest rated movies
lowest_rated_movies = movie_avg_ratings.orderBy("avg_rating")
lowest_rated_movies.show(5)

+-------+--------------------+----------+
|movieId|               title|avg_rating|
+-------+--------------------+----------+
| 173619|    Fugitives (1986)|       5.0|
| 145994|Formula of Love (...|       5.0|
|   3851|I'm the One That ...|       5.0|
|   5607|Son of the Bride ...|       5.0|
| 141816|    12 Chairs (1976)|       5.0|
+-------+--------------------+----------+
only showing top 5 rows

+-------+--------------------+----------+
|movieId|               title|avg_rating|
+-------+--------------------+----------+
| 102735|Captain America (...|       0.5|
|   5356|Giant Spider Inva...|       0.5|
|   4580|       Cyborg (1989)|       0.5|
| 104017|3 dev adam (Three...|       0.5|
|  89386|Pearl Jam Twenty ...|       0.5|
+-------+--------------------+----------+
only showing top 5 rows

