# Project3_100K_MovieLens Data Analysis
### Author : Farhana Alam

[k-Most popular movies of all time](#section_1)\
[k-Most popular movies for a particular year](#section_2)\
[k-Most popular movies for a particular season](#section_3)\
[Top k movies with the most ratings (presumably most popular) that have the lowest ratings](#section_4)\
[k most popular movies for a particular age](#section_5)\
[For a particular genre finding k-most rated movies](#section_6)\
[Finding the average rating by age group](#section_7)

In [1]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as func
import sys
# Row limit shared by the analyses below: passed to show(k) and to limit(k) before each CSV write.
k = 10

In [2]:
# Create (or reuse) the Spark session that every later cell reads data through.
spark = SparkSession.builder.appName("Spark Project3 100K Movie Data Analysis").getOrCreate()

In [None]:
# Loading DataFiles with argv 
#movies_file = sys.argv[1]
#ratings_file = sys.argv[2]
#tags_file = sys.argv[3]

# Output CSV file path
#output_csv_path = sys.argv[4]

In [3]:
# Input paths for the MovieLens 100K data files (u.item, u.data, u.user)
movies_file = "Documents/CS535/movie_data/ml-100k/u.item"
ratings_file = "Documents/CS535/movie_data/ml-100k/u.data"
users_file = "Documents/CS535/movie_data/ml-100k/u.user"

# Output path handed to every write.csv(...) call below (all results are appended to it)
output_csv_path = "Documents/CS535/movie_data/ml-100k-output"

In [4]:
# Load the movie catalogue (u.item) into a DataFrame.
# The file is read as pipe-separated CSV with no header option, so the column
# names are assigned explicitly once the read has completed.
movie_columns = [
    "MovieID", "Titles", "release_date", "video_release_date", "IMDb_URL",
    "unknown", "Action", "Adventure", "Animation", "Children's", "Comedy",
    "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror",
    "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western",
]
movies_reader = (
    spark.read.format("csv")
    .option("sep", "|")
    .option("inferschema", "true")
    .option("samplingRatio", 0.1)  # column types are inferred from a sample of the rows
)
movies_df = movies_reader.load(movies_file).toDF(*movie_columns)

In [5]:
# Narrow the catalogue to the identifying columns plus the three genre flags
# that the joins and analyses below carry along.
kept_movie_columns = ["MovieID", "Titles", "release_date", "Action", "Drama", "Fantasy"]
movies_df = movies_df.select(*kept_movie_columns)
movies_df.show(8)

+-------+--------------------+------------+------+-----+-------+
|MovieID|              Titles|release_date|Action|Drama|Fantasy|
+-------+--------------------+------------+------+-----+-------+
|      1|    Toy Story (1995)| 01-Jan-1995|     0|    0|      0|
|      2|    GoldenEye (1995)| 01-Jan-1995|     1|    0|      0|
|      3|   Four Rooms (1995)| 01-Jan-1995|     0|    0|      0|
|      4|   Get Shorty (1995)| 01-Jan-1995|     1|    1|      0|
|      5|      Copycat (1995)| 01-Jan-1995|     0|    1|      0|
|      6|Shanghai Triad (Y...| 01-Jan-1995|     0|    1|      0|
|      7|Twelve Monkeys (1...| 01-Jan-1995|     0|    1|      0|
|      8|         Babe (1995)| 01-Jan-1995|     0|    1|      0|
+-------+--------------------+------------+------+-----+-------+
only showing top 8 rows



In [None]:
# define schema for our data (u.item)
#schema_item = "`MovieID` STRING, `Titles` STRING, `release_date` STRING, `video_release_date` STRING, \
#`IMDb_URL` STRING, `unknown` STRING, `Action` STRING, `Adventure` STRING, `Animation` STRING, `Children's` STRING, `Comedy` STRING, `Crime` STRING, `Documentary` STRING, `Drama` STRING, `Fantasy` STRING, `Film-Noir` STRING, `Horror` STRING, `Musical` STRING, `Mystery` STRING, `Romance` STRING, `Sci-Fi` STRING, `Thriller` STRING, `War` STRING, `Western` STRING"

In [6]:
# define schema for our data (u.user)
# Explicit schema for u.user, whose fields are:
#   user id | age | gender | occupation | zip code
# zip_code is declared as a string on purpose: ZIP/postal codes are identifiers,
# not numbers. An integer column would drop leading zeros and cannot represent
# any code that contains non-digit characters.
schema_user = StructType([
   StructField("UserID", IntegerType(), False),
   StructField("age", IntegerType(), False),
   StructField("gender", StringType(), False),
   StructField("occupation", StringType(), False),
   StructField("zip_code", StringType(), False)])

In [7]:
# Read the pipe-separated user file with the explicit schema defined above.
users_reader = (
    spark.read
    .option("sep", "|")
    .option("multiLine", "true")
    .option("ignoreTrailingWhiteSpace", "true")
)
users_df = users_reader.csv(users_file, schema=schema_user)
users_df.show(5)

+------+---+------+----------+--------+
|UserID|age|gender|occupation|zip_code|
+------+---+------+----------+--------+
|     1| 24|     M|technician|   85711|
|     2| 53|     F|     other|   94043|
|     3| 23|     M|    writer|   32067|
|     4| 24|     M|technician|   43537|
|     5| 33|     F|     other|   15213|
+------+---+------+----------+--------+
only showing top 5 rows



In [8]:
#ratings datafiles to dataframes
# Load the ratings file (tab-separated, read without a header option) and name its four columns.
rating_columns = ["UserID", "MovieID", "Rating", "Rating_Timestamp"]
ratings_reader = (
    spark.read.format("csv")
    .option("sep", "\t")
    .option("inferschema", "true")
    .option("samplingRatio", 0.1)  # column types are inferred from a sample of the rows
)
ratings_df = ratings_reader.load(ratings_file).toDF(*rating_columns)

ratings_df.show(k)

+------+-------+------+----------------+
|UserID|MovieID|Rating|Rating_Timestamp|
+------+-------+------+----------------+
|   196|    242|     3|       881250949|
|   186|    302|     3|       891717742|
|    22|    377|     1|       878887116|
|   244|     51|     2|       880606923|
|   166|    346|     1|       886397596|
|   298|    474|     4|       884182806|
|   115|    265|     2|       881171488|
|   253|    465|     5|       891628467|
|   305|    451|     3|       886324817|
|     6|     86|     3|       883603013|
+------+-------+------+----------------+
only showing top 10 rows



In [None]:

# Disabled alternative loader, kept as a bare string so it is never executed.
# It relies on `schema_item`, whose definition is commented out earlier in this notebook.
"""movies_df = spark.read.option("sep","|").option("multiLine", "true").option("ignoreTrailingWhiteSpace", "true")\
             .csv(movies_file, header=False, schema=schema_item)
movies_df=movies_df.select("MovieID","Titles","release_date")
movies_df.show(8)"""

# Movies and Ratings

In [9]:
# Inner join of movies and ratings on MovieID: each matched rating row carries
# the movie title and the Action/Drama/Fantasy flags alongside it.
movie_rating_match = movies_df["MovieID"] == ratings_df["MovieID"]
moviesNratings = (
    movies_df
    .join(ratings_df, movie_rating_match, "inner")
    .select(
        movies_df["MovieID"],
        movies_df["Titles"],
        ratings_df["UserID"],
        ratings_df["Rating"],
        ratings_df["Rating_Timestamp"],
        movies_df["Action"],
        movies_df["Drama"],
        movies_df["Fantasy"],
    )
)

moviesNratings.orderBy(col("UserID")).show(3)

+-------+--------------------+------+------+----------------+------+-----+-------+
|MovieID|              Titles|UserID|Rating|Rating_Timestamp|Action|Drama|Fantasy|
+-------+--------------------+------+------+----------------+------+-----+-------+
|    160|Glengarry Glen Ro...|     1|     4|       875072547|     0|    1|      0|
|    189|Grand Day Out, A ...|     1|     3|       888732928|     0|    0|      0|
|     61|Three Colors: Whi...|     1|     4|       878542420|     0|    1|      0|
+-------+--------------------+------+------+----------------+------+-----+-------+
only showing top 3 rows



<a id="section_1"></a>
## k-Most popular movies of all time
### Finding the k most popular movies of all time (ranked by average rating), assuming k = 10

In [10]:
# Rank every movie by its mean rating, highest first.
most_pop_movies = (
    moviesNratings
    .groupBy("MovieID", "Titles")
    .agg(func.avg("Rating"))
    .orderBy("avg(Rating)", ascending=False)
)

# Append the top k rows to the output path, then preview the same rows.
most_pop_movies.limit(k).write.csv(output_csv_path, header=True, mode="append")
most_pop_movies.show(k)

+-------+--------------------+-----------+
|MovieID|              Titles|avg(Rating)|
+-------+--------------------+-----------+
|   1293|     Star Kid (1997)|        5.0|
|   1599|Someone Else's Am...|        5.0|
|   1536|Aiqing wansui (1994)|        5.0|
|   1653|Entertaining Ange...|        5.0|
|   1500|Santa with Muscle...|        5.0|
|   1467|Saint of Fort Was...|        5.0|
|   1189|  Prefontaine (1997)|        5.0|
|    814|Great Day in Harl...|        5.0|
|   1201|Marlene Dietrich:...|        5.0|
|   1122|They Made Me a Cr...|        5.0|
+-------+--------------------+-----------+
only showing top 10 rows



<a id="section_2"></a>
## k-Most popular movies for a particular year
### Finding the k most popular movies for a particular year, assuming k = 10 and year = 1998
#### Considering the rating timestamp, not the movie release date

In [11]:
#INNER JOIN movies_df & ratings_df
#moviesNratings = movies_df.join(ratings_df,movies_df.MovieID == ratings_df.MovieID, 'inner').select(
#        movies_df.MovieID,movies_df.Titles,ratings_df.UserID,ratings_df.Rating, ratings_df.Rating_Timestamp)
#
#moviesNratings.sort(col("UserID")).show(3)

In [12]:
# Year to analyse. It is matched against the year of the rating timestamp,
# not the movie's release date. Change this value to study another year.
target_year = 1998

# Derive the calendar year of each rating and keep only ratings from target_year.
# NOTE: from_unixtime renders the timestamp in the Spark session time zone, so
# ratings made close to a year boundary can land in a different year depending
# on that setting.
moviesNratings_withYear = (
    moviesNratings
    .withColumn("Year", year(from_unixtime("Rating_Timestamp")))
    .select("MovieID", "Titles", "UserID", "Rating", "Year")
    .where(col("Year") == target_year)
)
moviesNratings_withYear.show(3)

+-------+--------------------+------+------+----+
|MovieID|              Titles|UserID|Rating|Year|
+-------+--------------------+------+------+----+
|    302|L.A. Confidential...|   186|     3|1998|
|    346| Jackie Brown (1997)|   166|     1|1998|
|    474|Dr. Strangelove o...|   298|     4|1998|
+-------+--------------------+------+------+----+
only showing top 3 rows



In [13]:
# Mean rating per movie within the selected year, best first.
top_10_pop_movies_of_a_year = (
    moviesNratings_withYear
    .select("MovieID", "Titles", "Rating", "Year")
    .groupBy("MovieID", "Titles")
    .agg(func.avg("Rating"))
    .orderBy("avg(Rating)", ascending=False)
)

# Append the top k rows to the output path, then preview the same rows.
top_10_pop_movies_of_a_year.limit(k).write.csv(output_csv_path, header=True, mode="append")
top_10_pop_movies_of_a_year.show(k)

+-------+--------------------+-----------+
|MovieID|              Titles|avg(Rating)|
+-------+--------------------+-----------+
|   1293|     Star Kid (1997)|        5.0|
|   1368|Mina Tannenbaum (...|        5.0|
|   1472|Visitors, The (Vi...|        5.0|
|   1653|Entertaining Ange...|        5.0|
|   1367|        Faust (1994)|        5.0|
|   1189|  Prefontaine (1997)|        5.0|
|    814|Great Day in Harl...|        5.0|
|   1358|The Deadly Cure (...|        5.0|
|   1201|Marlene Dietrich:...|        5.0|
|    884|Year of the Horse...|       4.75|
+-------+--------------------+-----------+
only showing top 10 rows



<a id="section_3"></a>
## k-Most popular movies for a particular season
### Defining the season as (1: Winter, 2: Spring, 3: Summer, 4: Fall) 
### Assuming k = 10, target_season = 3 (summer: months 7, 8, 9)
#### Considering the rating timestamp, not the movie release date

In [14]:
target_season = 3

# Season n spans months 3n-2 through 3n inclusive (season 3 -> months 7, 8, 9).
season_first_month = target_season * 3 - 2
season_last_month = target_season * 3

# Tag each rating with the month of its timestamp and keep the target season only.
moviesNratings_withMonth = (
    moviesNratings
    .withColumn("Month", month(from_unixtime("Rating_Timestamp")))
    .select("MovieID", "Titles", "UserID", "Rating", "Month")
    .where(col("Month").between(season_first_month, season_last_month))
)

moviesNratings_withMonth.show(3)

+-------+-----------------+------+------+-----+
|MovieID|           Titles|UserID|Rating|Month|
+-------+-----------------+------+------+-----+
|   1042|Just Cause (1995)|   291|     4|    9|
|    118|   Twister (1996)|   291|     2|    9|
|    796|Speechless (1994)|   276|     1|    9|
+-------+-----------------+------+------+-----+
only showing top 3 rows



In [15]:
# Mean rating per movie across the ratings made in the target season, best first.
top_10_pop_movies_of_summer = (
    moviesNratings_withMonth
    .select("MovieID", "Titles", "Rating", "Month")
    .groupBy("MovieID", "Titles")
    .agg(func.avg("Rating"))
    .orderBy("avg(Rating)", ascending=False)
)

# Append the top k rows to the output path, then preview the same rows.
top_10_pop_movies_of_summer.limit(k).write.csv(output_csv_path, header=True, mode="append")
top_10_pop_movies_of_summer.show(k)

+-------+--------------------+-----------+
|MovieID|              Titles|avg(Rating)|
+-------+--------------------+-----------+
|    487|Roman Holiday (1953)|        5.0|
|   1153|     Backbeat (1993)|        5.0|
|    139|Love Bug, The (1969)|        5.0|
|    459|Cry, the Beloved ...|        5.0|
|   1597|Romper Stomper (1...|        5.0|
|    853|    Braindead (1992)|        5.0|
|    149|         Jude (1996)|        5.0|
|   1368|Mina Tannenbaum (...|        5.0|
|    534|    Traveller (1997)|        5.0|
|   1302|Late Bloomers (1996)|        5.0|
+-------+--------------------+-----------+
only showing top 10 rows



<a id="section_4"></a>
## Top k movies with the most ratings (presumably most popular) that have the lowest ratings
#### Most rating counts, but less popular/lowest rating avg

In [16]:
# Number of ratings each movie received.
moviesNratings_with_rating_counts = (
    moviesNratings
    .groupBy("MovieID", "Titles")
    .agg(func.count(func.lit(1)).alias("count"))
)
moviesNratings_with_rating_counts.show(10)

+-------+--------------------+-----+
|MovieID|              Titles|count|
+-------+--------------------+-----+
|    171| Delicatessen (1991)|   65|
|    279|Once Upon a Time....|   28|
|    873|Picture Perfect (...|   81|
|    660|Fried Green Tomat...|  153|
|    708|Sex, Lies, and Vi...|  101|
|    170|Cinema Paradiso (...|  121|
|    394|Radioland Murders...|   12|
|   1512|World of Apu, The...|    6|
|    648|Quiet Man, The (1...|   67|
|    586|Terminal Velocity...|   34|
+-------+--------------------+-----+
only showing top 10 rows



In [17]:
# Mean rating of each movie, rounded to two decimals and exposed as `avg_rating`.
moviesNratings_with_avg_rating = (
    moviesNratings
    .groupBy("MovieID", "Titles")
    .agg(func.round(func.avg("Rating"), 2).alias("avg_rating"))
)
moviesNratings_with_avg_rating.show(k)

+-------+--------------------+----------+
|MovieID|              Titles|avg_rating|
+-------+--------------------+----------+
|    171| Delicatessen (1991)|      3.88|
|    279|Once Upon a Time....|      3.29|
|    873|Picture Perfect (...|      2.96|
|    660|Fried Green Tomat...|      3.76|
|    708|Sex, Lies, and Vi...|      3.48|
|    170|Cinema Paradiso (...|      4.17|
|    394|Radioland Murders...|      3.33|
|   1512|World of Apu, The...|       4.0|
|    648|Quiet Man, The (1...|      4.03|
|    586|Terminal Velocity...|      2.68|
+-------+--------------------+----------+
only showing top 10 rows



In [18]:
# Combine the per-movie rating count with the per-movie average rating,
# listing the most-rated movies first.
rating_counts = moviesNratings_with_rating_counts
rating_averages = moviesNratings_with_avg_rating

moviesNratings_rating_counts_with_avgRatings = (
    rating_counts
    .join(rating_averages, rating_counts.MovieID == rating_averages.MovieID)
    .orderBy("count", ascending=False)
    .select(rating_counts.MovieID, rating_counts.Titles, "count", "avg_rating")
)
moviesNratings_rating_counts_with_avgRatings.show(k)

+-------+--------------------+-----+----------+
|MovieID|              Titles|count|avg_rating|
+-------+--------------------+-----+----------+
|     50|    Star Wars (1977)|  583|      4.36|
|    258|      Contact (1997)|  509|       3.8|
|    100|        Fargo (1996)|  508|      4.16|
|    181|Return of the Jed...|  507|      4.01|
|    294|    Liar Liar (1997)|  485|      3.16|
|    286|English Patient, ...|  481|      3.66|
|    288|       Scream (1996)|  478|      3.44|
|      1|    Toy Story (1995)|  452|      3.88|
|    300|Air Force One (1997)|  431|      3.63|
|    121|Independence Day ...|  429|      3.44|
+-------+--------------------+-----+----------+
only showing top 10 rows



In [19]:
#Calculating the result
movies_hcount_lrating = moviesNratings_rating_counts_with_avgRatings.orderBy(['count', 'avg_rating'], ascending=[False, True])

#writing to file
movies_hcount_lrating.limit(k).write.csv(output_csv_path, header=True, mode="append")
movies_hcount_lrating.show(k)

+-------+--------------------+-----+----------+
|MovieID|              Titles|count|avg_rating|
+-------+--------------------+-----+----------+
|     50|    Star Wars (1977)|  583|      4.36|
|    258|      Contact (1997)|  509|       3.8|
|    100|        Fargo (1996)|  508|      4.16|
|    181|Return of the Jed...|  507|      4.01|
|    294|    Liar Liar (1997)|  485|      3.16|
|    286|English Patient, ...|  481|      3.66|
|    288|       Scream (1996)|  478|      3.44|
|      1|    Toy Story (1995)|  452|      3.88|
|    300|Air Force One (1997)|  431|      3.63|
|    121|Independence Day ...|  429|      3.44|
+-------+--------------------+-----+----------+
only showing top 10 rows



# Movies, Ratings, Users

In [20]:
#INNER JOIN movies_df & ratings_df & users_df

# Attach each rating's user attributes (age, gender, occupation) via an inner join on UserID.
rating_user_match = moviesNratings["UserID"] == users_df["UserID"]
moviesNratingsNusers = (
    moviesNratings
    .join(users_df, rating_user_match, "inner")
    .select(
        moviesNratings["MovieID"],
        moviesNratings["Titles"],
        moviesNratings["Rating"],
        moviesNratings["Rating_Timestamp"],
        moviesNratings["UserID"],
        users_df["age"],
        users_df["gender"],
        users_df["occupation"],
    )
)
moviesNratingsNusers.orderBy(col("UserID")).show(3)

+-------+--------------------+------+----------------+------+---+------+----------+
|MovieID|              Titles|Rating|Rating_Timestamp|UserID|age|gender|occupation|
+-------+--------------------+------+----------------+------+---+------+----------+
|    160|Glengarry Glen Ro...|     4|       875072547|     1| 24|     M|technician|
|    189|Grand Day Out, A ...|     3|       888732928|     1| 24|     M|technician|
|     61|Three Colors: Whi...|     4|       878542420|     1| 24|     M|technician|
+-------+--------------------+------+----------------+------+---+------+----------+
only showing top 3 rows



<a id="section_5"></a>
## k most popular movies for a particular age
### Finding the 10 highest-rated movies (by average rating) among users of a particular age (25 years)

In [21]:
target_age = 25

# Ratings submitted by users whose age equals target_age.
top_10_pop_movies_by_age = (
    moviesNratingsNusers
    .select("MovieID", "Titles", "Rating", "UserID", "age")
    .filter(col("age") == target_age)
)
top_10_pop_movies_by_age.show(3)

+-------+--------------------+------+------+---+
|MovieID|              Titles|Rating|UserID|age|
+-------+--------------------+------+------+---+
|    377| Heavyweights (1994)|     1|    22| 25|
|    241|Last of the Mohic...|     5|   249| 25|
|     25|Birdcage, The (1996)|     4|   162| 25|
+-------+--------------------+------+------+---+
only showing top 3 rows



In [22]:
# Mean rating per movie among the ratings from users of the target age, best first.
# NOTE: this rebinds top_10_pop_movies_by_age from the filtered ratings to the
# aggregated result, so re-running this cell requires re-running the previous one first.
top_10_pop_movies_by_age = (
    top_10_pop_movies_by_age
    .select("MovieID", "Titles", "Rating", "age")
    .groupBy("MovieID", "Titles")
    .avg("Rating")
    .orderBy("avg(Rating)", ascending=False)
)

# Append this section's result to the output path. The frame written here must
# be top_10_pop_movies_by_age (the one previewed below), not the result of an
# earlier section.
top_10_pop_movies_by_age.limit(k).write.csv(output_csv_path, header=True, mode="append")
top_10_pop_movies_by_age.show(k)

+-------+--------------------+-----------+
|MovieID|              Titles|avg(Rating)|
+-------+--------------------+-----------+
|   1368|Mina Tannenbaum (...|        5.0|
|    285|Secrets & Lies (1...|        5.0|
|    517|    Manhattan (1979)|        5.0|
|    657|Manchurian Candid...|        5.0|
|    251|Shall We Dance? (...|        5.0|
|    207|Cyrano de Bergera...|        5.0|
|    365|       Powder (1995)|        5.0|
|    521|Deer Hunter, The ...|        5.0|
|   1154|   Alphaville (1965)|        5.0|
|    497|Bringing Up Baby ...|        5.0|
+-------+--------------------+-----------+
only showing top 10 rows



<a id="section_6"></a>
## For a particular genre finding k-most rated movies
### selecting genre = Drama, k = 10

In [23]:
# Genre flag column to analyse. It must be one of the genre columns kept on
# moviesNratings (Action, Drama or Fantasy).
target_genre = 'Drama'

# Ratings of movies flagged with the target genre. The projected column follows
# target_genre so that changing the parameter keeps the select and the filter in step.
top_10_pop_movies_by_genre = (
    moviesNratings
    .select("MovieID", "Titles", "Rating", target_genre)
    .where(col(target_genre) == 1)
)
top_10_pop_movies_by_genre.show(3)

+-------+--------------------+------+-----+
|MovieID|              Titles|Rating|Drama|
+-------+--------------------+------+-----+
|     51|Legends of the Fa...|     2|    1|
|    346| Jackie Brown (1997)|     1|    1|
|     86|Remains of the Da...|     3|    1|
+-------+--------------------+------+-----+
only showing top 3 rows



In [24]:
# Mean rating per movie within the selected genre, best first.
top_10_pop_movies_by_genre = (
    top_10_pop_movies_by_genre
    .select("MovieID", "Titles", "Rating")
    .groupBy("MovieID", "Titles")
    .agg(func.avg("Rating"))
    .orderBy("avg(Rating)", ascending=False)
)

# Append the top k rows to the output path, then preview the same rows.
top_10_pop_movies_by_genre.limit(k).write.csv(output_csv_path, header=True, mode="append")
top_10_pop_movies_by_genre.show(k)

+-------+--------------------+-----------------+
|MovieID|              Titles|      avg(Rating)|
+-------+--------------------+-----------------+
|   1536|Aiqing wansui (1994)|              5.0|
|   1599|Someone Else's Am...|              5.0|
|   1653|Entertaining Ange...|              5.0|
|   1467|Saint of Fort Was...|              5.0|
|   1189|  Prefontaine (1997)|              5.0|
|   1122|They Made Me a Cr...|              5.0|
|   1449|Pather Panchali (...|            4.625|
|   1642|Some Mother's Son...|              4.5|
|   1398|         Anna (1996)|              4.5|
|    318|Schindler's List ...|4.466442953020135|
+-------+--------------------+-----------------+
only showing top 10 rows



<a id="section_7"></a>
## Finding the average rating by age group

In [25]:
#moviesNratingsNusers
# Mean rating given by users of each age, ordered by the unrounded mean (highest first).
ratings_by_age = (
    moviesNratingsNusers
    .select("MovieID", "Titles", "Rating", "UserID", "age")
    .groupBy("age")
    .agg(func.avg("Rating"))
    .orderBy("avg(Rating)", ascending=False)
)
# Round for presentation only after the ordering has been fixed.
avg_rating_by_age_group = ratings_by_age.select(
    "age", func.round("avg(Rating)", 2).alias("avg_rating")
)

# Append the top k rows to the output path, then preview the same rows.
avg_rating_by_age_group.limit(k).write.csv(output_csv_path, header=True, mode="append")
avg_rating_by_age_group.show(k)

+---+----------+
|age|avg_rating|
+---+----------+
| 58|      3.99|
| 73|      3.98|
| 59|      3.84|
| 53|      3.83|
| 61|      3.79|
|  7|      3.77|
| 46|      3.77|
| 51|      3.73|
| 45|      3.73|
| 63|      3.73|
+---+----------+
only showing top 10 rows



In [26]:
# Stop the Spark session created at the top of the notebook.
spark.stop()