In [1]:
import os
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, count, desc, row_number, col, length
from pyspark.sql.window import Window
from tabulate import tabulate

In [2]:
pyspark.__version__

'3.3.2'

In [3]:
spark = (
    SparkSession
    .builder
    .appName("MovieLens Analysis")
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", "true") 
    .config("spark.sql.shuffle.partitions", "6") 
    .getOrCreate()
)

In [4]:
conf = spark.sparkContext.getConf()
num_cores = conf.get("spark.executor.instances")
print("Number of Spark cores:", num_cores)

Number of Spark cores: None


In [5]:
# Read ratings.csv, tags.csv, and movies.csv
ratings_df = spark.read.csv("../input_data/ratings.csv", header=True)
tags_df = spark.read.csv("../input_data/tags.csv", header=True)
movies_df = spark.read.csv("../input_data/movies.csv", header=True)

In [11]:
movies_df.count()

27278

In [10]:
tags_df.count()

465564

In [16]:
ratings_df.select('userID').distinct().count()

138493

In [7]:
# Function to print null value counts for a DataFrame
def print_null_info(df, name):
    null_info = []
    for col_name in df.columns:
        null_count = df.where(col(col_name).isNull()).count()
        null_info.append((col_name, null_count))
    
    print(f"Null values in {name} DataFrame:")
    print(tabulate(null_info, headers=["Column", "Null Count"], tablefmt="pretty"))

In [8]:
print_null_info(ratings_df, "ratings")

Null values in ratings DataFrame:
+-----------+------------+
|  Column   | Null Count |
+-----------+------------+
|  userId   |     0      |
|  movieId  |     0      |
|  rating   |     0      |
| timestamp |     0      |
+-----------+------------+


In [61]:
print_null_info(tags_df, "tags")

Null values in tags DataFrame:
+-----------+------------+
|  Column   | Null Count |
+-----------+------------+
|  userId   |     0      |
|  movieId  |     0      |
|    tag    |     0      |
| timestamp |     0      |
+-----------+------------+


In [62]:
print_null_info(movies_df, "movies")

Null values in movies DataFrame:
+---------+------------+
| Column  | Null Count |
+---------+------------+
| movieId |     0      |
|  title  |     0      |
| genres  |     0      |
+---------+------------+


In [9]:
ratings_df.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
+------+-------+------+----------+
only showing top 5 rows



In [7]:
tags_df.show(5)

+------+-------+-------------+----------+
|userId|movieId|          tag| timestamp|
+------+-------+-------------+----------+
|    18|   4141|  Mark Waters|1240597180|
|    65|    208|    dark hero|1368150078|
|    65|    353|    dark hero|1368150079|
|    65|    521|noir thriller|1368149983|
|    65|    592|    dark hero|1368150078|
+------+-------+-------------+----------+
only showing top 5 rows



In [8]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [10]:
def most_common_tag_for_movie_title(tags_df, movies_df):
    """
    Function finds the most common tag for a movie title.

    Args:
    - tags_df: DataFrame containing tags data
    - movies_df: DataFrame containing movies data

    Returns:
    - DataFrame containing the most common tag for the movie title
    """
    movie_tag_df = movies_df.join(tags_df, "movieId", "left")
    most_common_tag_df = (
        movie_tag_df
        .groupBy("title", "tag")
        .agg(count("*").alias("tag_count"))
        .orderBy(desc("tag_count"))
    )

    # Get the row with the highest tag count for each movie
    # Title Memento (2000) had multiple tags (nonlinear, twist ending). Handle it
    window_spec = Window.partitionBy("title").orderBy(desc("tag_count"))
    most_common_tag_df = (
        most_common_tag_df
        .withColumn("rank", row_number().over(window_spec))
        .filter(col("rank") == 1).drop("rank")
        .orderBy(desc('tag_count'))
    )
    
    return most_common_tag_df

In [11]:
most_common_tag_df = most_common_tag_for_movie_title(tags_df, movies_df)

In [12]:
most_common_tag_df.show(10, truncate=False)

+--------------------------------------------+-----------------+---------+
|title                                       |tag              |tag_count|
+--------------------------------------------+-----------------+---------+
|Pulp Fiction (1994)                         |Quentin Tarantino|185      |
|Fight Club (1999)                           |twist ending     |150      |
|Memento (2000)                              |nonlinear        |145      |
|Usual Suspects, The (1995)                  |twist ending     |139      |
|Inception (2010)                            |alternate reality|128      |
|Eternal Sunshine of the Spotless Mind (2004)|surreal          |127      |
|Matrix, The (1999)                          |sci-fi           |120      |
|Silence of the Lambs, The (1991)            |serial killer    |113      |
|Twelve Monkeys (a.k.a. 12 Monkeys) (1995)   |time travel      |109      |
|Inglourious Basterds (2009)                 |Quentin Tarantino|108      |
+------------------------

In [6]:
def most_common_genre_rated_by_user(ratings_df, movies_df):
    """
    Function finds the most common genre rated by a user.

    Args:
    - ratings_df: DataFrame containing ratings data
    - movies_df: DataFrame containing movies data

    Returns:
    - DataFrame containing the most common genre rated by a user
    """
    user_rating_df = ratings_df.join(movies_df, "movieId", "left")
    most_common_genre_df = (
        user_rating_df
        .withColumn("genre", explode(split("genres", "\\|")))
        .groupBy("userId", "genre")
        .agg(count("*").alias("genre_count"))
        .orderBy(desc("genre_count"))
    )
    # Get the row with the most common genre for each user
    # userId 104 has multiple (19). Handle it
    window_spec = Window.partitionBy("userId").orderBy(desc("genre_count"))
    most_common_genre_df = (
        most_common_genre_df
        .withColumn("rank", row_number().over(window_spec))
        .filter(col("rank") == 1).drop("rank")
        .orderBy(desc('genre_count'))
    )
    return most_common_genre_df

In [7]:
most_common_genre_df = most_common_genre_rated_by_user(ratings_df, movies_df)

In [8]:
most_common_genre_df.write.csv('../output_data/most_common_genre_rated_by_user.csv', header=True, mode="overwrite")

Py4JJavaError: An error occurred while calling o124.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:288)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:851)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$26(FileFormatWriter.scala:277)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:277)
	... 42 more


In [72]:
most_common_genre_df.show(20, truncate=False)



+------+------+-----------+
|userId|genre |genre_count|
+------+------+-----------+
|118205|Drama |4857       |
|8405  |Drama |3684       |
|8963  |Drama |3240       |
|121535|Drama |3228       |
|82418 |Drama |3042       |
|130767|Drama |3003       |
|125794|Drama |2938       |
|131904|Drama |2907       |
|15617 |Drama |2693       |
|63147 |Drama |2661       |
|79159 |Drama |2501       |
|83090 |Drama |2421       |
|20132 |Drama |2351       |
|74142 |Comedy|2319       |
|125978|Drama |2308       |
|111549|Drama |2238       |
|105580|Drama |2193       |
|88820 |Drama |2190       |
|92011 |Drama |2117       |
|68026 |Drama |2073       |
+------+------+-----------+
only showing top 20 rows



                                                                                

### Done