In [2]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Ign:3 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:4 https://r2u.stat.illinois.edu/ubuntu jammy Release
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [921 kB]
Hit:7 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:8 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:9 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:10 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:12 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Hit:13 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Get:14 http://security.ubuntu.com/ubuntu jammy-security/main amd

## Import Libriries

In [3]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, explode, split, rank
from pyspark.sql.window import Window

## Import the csv files to DataFrame and drop duplicates

In [5]:
directory = "/content/drive/MyDrive/Movable_Ink_ml20"

spark_dataframes = {}

for filename in os.listdir(directory):
  if filename.endswith(".csv"):
    file_path = os.path.join(directory, filename)
    df_name = filename[:-4].replace('-', '_')
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    # In case there is duplication, drop duplicates
    df = df.dropDuplicates()
    spark_dataframes[df_name] = df

print(spark_dataframes)

{'tags': DataFrame[userId: int, movieId: int, tag: string, timestamp: int], 'ratings': DataFrame[userId: int, movieId: int, rating: double, timestamp: int], 'movies': DataFrame[movieId: int, title: string, genres: string], 'genome_scores': DataFrame[movieId: int, tagId: int, relevance: double], 'genome_tags': DataFrame[tagId: int, tag: string], 'links': DataFrame[movieId: int, imdbId: int, tmdbId: int]}


## Drop null, Crate Temp View, and data checking

In [6]:
for df_name, dataframe in spark_dataframes.items():
  # drop null values
  dataframe = dataframe.dropna()
  # create temp views for later sql query
  dataframe.createOrReplaceTempView(df_name)
  print(f"Schema of {df_name}:")
  dataframe.printSchema()

  print(f"Sample data from the {df_name} DataFrame:")
  spark.sql(f"SELECT * FROM {df_name} LIMIT 5").show()

Schema of tags:
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: integer (nullable = true)

Sample data from the tags DataFrame:
+------+-------+-------------------+----------+
|userId|movieId|                tag| timestamp|
+------+-------+-------------------+----------+
|   342|  55908|  entirely dialogue|1328035276|
|   342| 112556|Neal Patrick Harris|1422733874|
|   348|  59549|            netflix|1244560720|
|   359|  67197|     shame on them!|1238921785|
|   370|   5303|             change|1255283711|
+------+-------+-------------------+----------+

Schema of ratings:
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

Sample data from the ratings DataFrame:
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     2|   2951|   4.0|

## Question A: The most common tag for a movie title

In [7]:
movies_tags_join = spark_dataframes['movies'].join(spark_dataframes['tags'], on="movieId", how='inner') \
                           .groupBy("title", "tag") \
                           .agg(count("tag") \
                           .alias("tag_count")) \
                           .orderBy(col("tag_count").desc()) \
                           .limit(1)
movies_tags_join.show()

+-------------------+-----------------+---------+
|              title|              tag|tag_count|
+-------------------+-----------------+---------+
|Pulp Fiction (1994)|Quentin Tarantino|      185|
+-------------------+-----------------+---------+



In [8]:
df_try = spark.sql("select movies.title, tags.tag, count(tags.tag) as tag_count \
           from movies \
           join tags \
           on movies.movieId = tags.movieId \
           group by movies.title, tags.tag \
           order by tag_count desc \
           limit 1")
df_try.show()

+-------------------+-----------------+---------+
|              title|              tag|tag_count|
+-------------------+-----------------+---------+
|Pulp Fiction (1994)|Quentin Tarantino|      185|
+-------------------+-----------------+---------+



## Question B: The most common genre rated by a user

In [9]:
spark_dataframes['movies'].show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|    109|Headless Body in ...|Comedy|Drama|Thri...|
|    381|When a Man Loves ...|       Drama|Romance|
|    681|Coup de torchon (...|               Crime|
|    745|Wallace & Gromit:...|Animation|Childre...|
|   1161|Tin Drum, The (Bl...|           Drama|War|
|   1381|     Grease 2 (1982)|Comedy|Musical|Ro...|
|   1410|Evening Star, The...|        Comedy|Drama|
|   1678|Joy Luck Club, Th...|       Drama|Romance|
|   2230|Always Tell Your ...|              Comedy|
|   2344|Runaway Train (1985)|Action|Adventure|...|
|   2519|House on Haunted ...|Drama|Horror|Thri...|
|   2904|         Rain (1932)|               Drama|
|   3342|        Birdy (1984)|           Drama|War|
|   3417|Crimson Pirate, T...|    Adventure|Comedy|
|   3461|Lord of the Flies...|Adventure|Drama|T...|
|   3537|Where the Money I...|        Comedy|Drama|
|   3983|You

In [10]:
spark_dataframes['ratings'].show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     2|   2951|   4.0| 974820598|
|     3|    905|   3.0| 944917494|
|     7|   2694|   3.0|1011209036|
|     7|   3528|   2.0|1011208366|
|     9|   1997|   5.0| 994020231|
|    11|   7458|   4.5|1230920139|
|    11|  48516|   4.5|1294795982|
|    14|    468|   3.5|1225320550|
|    18|   7034|   4.0|1196423419|
|    19|     74|   4.0| 855176849|
|    20|   5952|   3.5|1126539610|
|    21|   2159|   3.0| 992190071|
|    21|   3424|   3.0| 992189504|
|    22|     16|   5.0| 994638228|
|    22|   2403|   4.0| 994638387|
|    23|   1722|   4.0| 914458237|
|    24|    555|   4.0| 994232682|
|    24|   1073|   2.0|1000098839|
|    25|    253|   3.5|1277963607|
|    29|    173|   4.0| 835562042|
+------+-------+------+----------+
only showing top 20 rows



## movies join ratings on movieId, select "userId", "movieId", "genres"

In [11]:
movies_ratings_join = spark_dataframes['movies'].join(spark_dataframes['ratings'], on="movieId", how="inner")
movies_ratings_before_split = movies_ratings_join.select("userId", "movieId", "genres")
print(type(movies_ratings_before_split))
movies_ratings_before_split.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+------+-------+--------------------+
|userId|movieId|              genres|
+------+-------+--------------------+
|     2|   2951|      Action|Western|
|     3|    905|      Comedy|Romance|
|     7|   2694|              Comedy|
|     7|   3528|       Drama|Romance|
|     9|   1997|      Horror|Mystery|
|    11|   7458|Action|Adventure|...|
|    11|  48516|Crime|Drama|Thriller|
|    14|    468|      Comedy|Romance|
|    18|   7034|       Drama|Romance|
|    19|     74|       Drama|Romance|
|    20|   5952|   Adventure|Fantasy|
|    21|   2159|Crime|Horror|Thri...|
|    21|   3424|               Drama|
|    22|     16|         Crime|Drama|
|    22|   2403|Action|Adventure|...|
|    23|   1722|Action|Adventure|...|
|    24|    555|      Crime|Thriller|
|    24|   1073|Children|Comedy|F...|
|    25|    253|        Drama|Horror|
|    29|    173| Action|Crime|Sci-Fi|
+------+-------+--------------------+
only showing top 20 rows



## Split the genres column, Explode the genres Column to create individual rows for each genre associated with a movie.

In [12]:
movies_ratings_after_split = movies_ratings_before_split.withColumn("genre", explode(split(col("genres"), "\|")))
movies_ratings_after_split.show()

+------+-------+--------------------+---------+
|userId|movieId|              genres|    genre|
+------+-------+--------------------+---------+
|     2|   2951|      Action|Western|   Action|
|     2|   2951|      Action|Western|  Western|
|     3|    905|      Comedy|Romance|   Comedy|
|     3|    905|      Comedy|Romance|  Romance|
|     7|   2694|              Comedy|   Comedy|
|     7|   3528|       Drama|Romance|    Drama|
|     7|   3528|       Drama|Romance|  Romance|
|     9|   1997|      Horror|Mystery|   Horror|
|     9|   1997|      Horror|Mystery|  Mystery|
|    11|   7458|Action|Adventure|...|   Action|
|    11|   7458|Action|Adventure|...|Adventure|
|    11|   7458|Action|Adventure|...|    Drama|
|    11|   7458|Action|Adventure|...|      War|
|    11|  48516|Crime|Drama|Thriller|    Crime|
|    11|  48516|Crime|Drama|Thriller|    Drama|
|    11|  48516|Crime|Drama|Thriller| Thriller|
|    14|    468|      Comedy|Romance|   Comedy|
|    14|    468|      Comedy|Romance|  R

In [13]:
genre_counts = movies_ratings_after_split.groupBy("userId", "genre").count()
genre_counts.createOrReplaceTempView("genre_counts")
genre_counts.show()

+------+---------+-----+
|userId|    genre|count|
+------+---------+-----+
|    23|Adventure|    8|
|    56|   Comedy|   56|
|   381|   Sci-Fi|    4|
|   628|   Action|   71|
|   717|    Drama|  350|
|   793|   Comedy|   43|
|  1011|  Romance|  189|
|  1023|   Sci-Fi|   17|
|  1073| Thriller|   78|
|  1207| Thriller|  159|
|  1246|    Drama|   36|
|  1280|   Action|   48|
|  1388| Thriller|    6|
|  1513|   Sci-Fi|    6|
|  1722|    Drama|   48|
|  1775|  Romance|   44|
|  2068|    Crime|   47|
|  2254| Children|    8|
|  2330|   Action|  140|
|  2640|    Crime|   21|
+------+---------+-----+
only showing top 20 rows



## Applying the window function, filter for the top genre per user, and show the result

In [14]:
windowSpec = Window.partitionBy("userId").orderBy(col("count").desc())
ranked_genres = genre_counts.withColumn("rank", rank().over(windowSpec))
most_common_genre = ranked_genres.filter(col("rank") == 1).select("userId", "genre", "count")
most_common_genre.show()

+------+--------+-----+
|userId|   genre|count|
+------+--------+-----+
|   148| Romance|  105|
|   463|   Drama|   45|
|   471|   Drama|  300|
|   496|  Comedy|  110|
|   833|  Action|   20|
|  1088|Thriller|   30|
|  1238|   Drama|   46|
|  1238|Thriller|   46|
|  1342|  Action|   15|
|  1580|  Comedy|   19|
|  1591|  Comedy|   20|
|  1645|   Drama|   47|
|  1829|   Drama|  120|
|  1959|  Action|  119|
|  2122|   Drama|   59|
|  2142|   Drama|   21|
|  2366|Thriller|   23|
|  2659|   Drama|   57|
|  2866|   Drama|  469|
|  3175|  Comedy|   12|
+------+--------+-----+
only showing top 20 rows



In [15]:
table = spark.sql("""
    WITH CTE AS (
        SELECT userId, genre, count,
            RANK() OVER (PARTITION BY userId ORDER BY count DESC) AS count_rank
        FROM genre_counts
    )
    SELECT userId, genre, count
    FROM CTE
    WHERE count_rank = 1
""")
table.show()

+------+--------+-----+
|userId|   genre|count|
+------+--------+-----+
|   148| Romance|  105|
|   463|   Drama|   45|
|   471|   Drama|  300|
|   496|  Comedy|  110|
|   833|  Action|   20|
|  1088|Thriller|   30|
|  1238|   Drama|   46|
|  1238|Thriller|   46|
|  1342|  Action|   15|
|  1580|  Comedy|   19|
|  1591|  Comedy|   20|
|  1645|   Drama|   47|
|  1829|   Drama|  120|
|  1959|  Action|  119|
|  2122|   Drama|   59|
|  2142|   Drama|   21|
|  2366|Thriller|   23|
|  2659|   Drama|   57|
|  2866|   Drama|  469|
|  3175|  Comedy|   12|
+------+--------+-----+
only showing top 20 rows



## Question 3:

Most Similar Genre

How do we calculate similarity
For exmpale if we are trying to figure out the most similar genre for genre "Children", then we find out there are 10 movies have both
genres "Children" and "Animation" labeled, and there are 5 movies have both gernes "Children" and "Adventure" labeled.
So, "Animation" will be considered more similar to "Children" than "Adventure".

In [16]:
from pyspark.sql.functions import count

movies_split = spark_dataframes['movies'].withColumn('genre', explode(split("genres", "\|")))
movies_split.createOrReplaceTempView("movies_split")
spark.sql("""
          WITH CTE as (
             select m1.genre as m1_genre, m2.genre as m2_genre, COUNT(*) as count_genre
             from movies_split m1
             join movies_split m2
             on m1.movieId = m2.movieId
             where m1.genre < m2.genre
             group by m1.genre, m2.genre
             order by count_genre desc
         )

         select *
         from (
            select m1_genre, m2_genre, count_genre,
            RANK() OVER (PARTITION BY m1_genre ORDER BY count_genre DESC) AS rank_genre
            FROM CTE
         ) ranked_genres
         WHERE rank_genre = 1


""").show()

+-----------+--------+-----------+----------+
|   m1_genre|m2_genre|count_genre|rank_genre|
+-----------+--------+-----------+----------+
|      Crime|   Drama|       1712|         1|
|    Romance|Thriller|        262|         1|
|   Thriller|     War|        100|         1|
|  Adventure|   Drama|        690|         1|
|      Drama| Romance|       2573|         1|
|        War| Western|         27|         1|
|Documentary|   Drama|        121|         1|
|    Fantasy| Romance|        247|         1|
|    Mystery|Thriller|        795|         1|
|    Musical| Romance|        324|         1|
|  Animation|Children|        470|         1|
|  Film-Noir|Thriller|        124|         1|
|       IMAX|  Sci-Fi|         63|         1|
|     Horror|Thriller|       1073|         1|
|     Comedy|   Drama|       2546|         1|
|   Children|  Comedy|        526|         1|
|     Action|   Drama|       1203|         1|
|     Sci-Fi|Thriller|        459|         1|
+-----------+--------+-----------+

In [18]:
 spark_dataframes['movies'].explain("extended")

== Parsed Logical Plan ==
Deduplicate [movieId#64, title#65, genres#66]
+- Relation[movieId#64,title#65,genres#66] csv

== Analyzed Logical Plan ==
movieId: int, title: string, genres: string
Deduplicate [movieId#64, title#65, genres#66]
+- Relation[movieId#64,title#65,genres#66] csv

== Optimized Logical Plan ==
Aggregate [movieId#64, title#65, genres#66], [movieId#64, title#65, genres#66]
+- Relation[movieId#64,title#65,genres#66] csv

== Physical Plan ==
*(2) HashAggregate(keys=[movieId#64, title#65, genres#66], functions=[], output=[movieId#64, title#65, genres#66])
+- Exchange hashpartitioning(movieId#64, title#65, genres#66, 200), ENSURE_REQUIREMENTS, [id=#1666]
   +- *(1) HashAggregate(keys=[movieId#64, title#65, genres#66], functions=[], output=[movieId#64, title#65, genres#66])
      +- FileScan csv [movieId#64,title#65,genres#66] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/MyDrive/Movable_Ink_ml20/movies.csv], PartitionFilters