Importing the required libraries

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType, TimestampType
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator


ModuleNotFoundError: No module named 'pyspark'

This cell sets up the Spark session (or reuses one that is already running).
`SparkSession.builder` creates a builder object used to initialize the SparkSession.
`appName` sets the name of the Spark application, as shown in the Spark UI and logs.
`master` defines where Spark runs (`local[*]` runs locally on this computer, using all available CPU cores).
`config` sets Spark configuration options; `spark.sql.shuffle.partitions` sets the number of partitions Spark uses when shuffling data for joins and aggregations.
`getOrCreate` starts a new Spark session if none exists; otherwise it returns the existing one.





In [None]:
def create_spark(app_name="MovieLensAnalytics", master="local[*]", shuffle_partitions=4):
    """Start a Spark session, or return the one already running in this kernel.

    Args:
        app_name: Application name shown in the Spark UI and logs.
        master: Spark master URL. The default ``"local[*]"`` runs Spark locally
            using all available CPU cores.
        shuffle_partitions: Value for ``spark.sql.shuffle.partitions``, i.e. the
            number of partitions used when shuffling data for joins and
            aggregations. A small value suits a small local dataset.

    Returns:
        The ``SparkSession`` returned by ``getOrCreate()``.
    """
    spark = (
        SparkSession.builder
        .appName(app_name)
        .master(master)
        .config("spark.sql.shuffle.partitions", str(shuffle_partitions))
        .getOrCreate()
    )
    return spark


spark = create_spark()

This section uses the Spark session created above to read the ratings and movies CSV files from the `data` folder, then shows the first five rows of each file as a quick sanity check.
`inferSchema=True` makes Spark automatically detect each column's type (at the cost of an extra pass over the data).


In [None]:
def load_data(spark, ratings_path, movies_path):
    """Read the ratings and movies CSV files into Spark DataFrames.

    Both files are read with a header row and with column types inferred
    by Spark.

    Args:
        spark: Active SparkSession used for reading.
        ratings_path: Path to the ratings CSV file.
        movies_path: Path to the movies CSV file.

    Returns:
        Tuple ``(ratings, movies)`` of DataFrames.
    """
    csv_options = {"header": True, "inferSchema": True}
    ratings_df = spark.read.csv(ratings_path, **csv_options)
    movies_df = spark.read.csv(movies_path, **csv_options)
    return ratings_df, movies_df


ratings, movies = load_data(spark, "data/ratings.csv", "data/movies.csv")
ratings.show(5)
movies.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



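Preprocess the data before using it for analysis or machine learning.
Cast the columns to explicit data types, convert the Unix `timestamp` (seconds since the epoch) into a Spark timestamp that displays in human-readable form, and drop ratings with a missing `userId`, `movieId`, or `rating`.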

In [None]:
def preprocess(ratings, movies):
    """Cast columns to explicit types and drop incomplete ratings.

    Args:
        ratings: DataFrame with ``userId``, ``movieId``, ``rating`` and
            ``timestamp`` columns; ``timestamp`` holds Unix epoch seconds.
        movies: DataFrame with a ``movieId`` column.

    Returns:
        Tuple ``(ratings_clean, movies_clean)``. Ratings rows whose ``userId``,
        ``movieId`` or ``rating`` is null (including values that failed the
        cast) are dropped.
    """
    ratings_clean = (
        ratings
        .withColumn("userId", F.col("userId").cast(IntegerType()))
        .withColumn("movieId", F.col("movieId").cast(IntegerType()))
        .withColumn("rating", F.col("rating").cast(FloatType()))
        # Cast epoch seconds straight to a timestamp rather than formatting them
        # as a local-time string (from_unixtime) and parsing it back: that round
        # trip is ambiguous for wall-clock times that repeat at a DST fall-back.
        # long -> timestamp also leaves an already-converted column unchanged,
        # so re-running this cell on its own output is safe.
        .withColumn("timestamp", F.col("timestamp").cast("long").cast(TimestampType()))
        .na.drop(subset=["userId", "movieId", "rating"])
    )
    movies_clean = movies.withColumn("movieId", F.col("movieId").cast(IntegerType()))
    return ratings_clean, movies_clean


# Later cells read `ratings` / `movies`, so the cleaned frames keep those names.
ratings, movies = preprocess(ratings, movies)
ratings.printSchema()
ratings.show(5)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 18:45:03|
|     1|      3|   4.0|2000-07-30 18:20:47|
|     1|      6|   4.0|2000-07-30 18:37:04|
|     1|     47|   5.0|2000-07-30 19:03:35|
|     1|     50|   5.0|2000-07-30 18:48:51|
+------+-------+------+-------------------+
only showing top 5 rows



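Top Rated Movies:

*   Calculate the average rating per movie.
*   Count the number of ratings per movie.
*   Keep only movies with at least `min_ratings` ratings (default 50), so that a handful of enthusiastic ratings cannot push an obscure movie to the top, then return the `top_values` highest-rated movies.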






In [None]:
from os import truncate
def top_rated(movies, ratings, min_ratings=50, top_values=10):
    """Return the highest-rated movies among those with enough ratings.

    Args:
        movies: DataFrame with ``movieId`` and ``title`` columns.
        ratings: DataFrame with ``movieId`` and ``rating`` columns.
        min_ratings: Minimum number of ratings a movie needs to be considered.
        top_values: Number of movies to return.

    Returns:
        DataFrame with columns ``title``, ``avg_rating`` and ``num_ratings``,
        ordered by ``avg_rating`` then ``num_ratings`` (both descending).
        The join is a left join, so a movie missing from ``movies`` appears
        with a null ``title``.
    """
    per_movie = (
        ratings.groupBy("movieId")
        .agg(
            F.avg("rating").alias("avg_rating"),
            F.count("rating").alias("num_ratings"),
        )
        .filter(F.col("num_ratings") >= min_ratings)
    )
    return (
        per_movie.join(movies, on="movieId", how="left")
        # movieId is a final, unique tie-breaker so limit() keeps the same rows
        # on every run when average rating and count tie. Sort before the
        # projection that drops movieId.
        .orderBy(F.desc("avg_rating"), F.desc("num_ratings"), F.asc("movieId"))
        .select("title", "avg_rating", "num_ratings")
        .limit(top_values)
    )


top_movies_df = top_rated(movies, ratings)
top_movies_df.show(truncate=False)


+---------------------------------------------------------------------------+-----------------+-----------+
|title                                                                      |avg_rating       |num_ratings|
+---------------------------------------------------------------------------+-----------------+-----------+
|Shawshank Redemption, The (1994)                                           |4.429022082018927|317        |
|Godfather, The (1972)                                                      |4.2890625        |192        |
|Fight Club (1999)                                                          |4.272935779816514|218        |
|Cool Hand Luke (1967)                                                      |4.271929824561403|57         |
|Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)|4.268041237113402|97         |
|Rear Window (1954)                                                         |4.261904761904762|84         |
|Godfather: Part II, The (19

A DataFrame query that finds the most active users on MovieLens, i.e. the users who have submitted the most ratings.

In [None]:
def MostActiveUsers(ratings, top_n=10):
    """Return the ``top_n`` users who submitted the most ratings.

    Args:
        ratings: DataFrame with a ``userId`` column, one row per rating.
        top_n: Number of users to return.

    Returns:
        DataFrame with columns ``userId`` and ``count``, ordered by ``count``
        descending.
    """
    return (
        ratings.groupBy("userId")
        .count()
        # userId breaks ties so limit() keeps the same users on every run.
        .orderBy(F.desc("count"), F.asc("userId"))
        .limit(top_n)
    )


MostActiveUsers(ratings).show()


+------+-----+
|userId|count|
+------+-----+
|   414| 2698|
|   599| 2478|
|   474| 2108|
|   448| 1864|
|   274| 1346|
|   610| 1302|
|    68| 1260|
|   380| 1218|
|   606| 1115|
|   288| 1055|
+------+-----+



Find the most popular movies by counting how many ratings each movie received, sorting by that count in descending order, and keeping the top 10.

In [None]:
def popular_movies(ratings, movies, top_value=10):
    """Return the ``top_value`` movies with the most ratings.

    Args:
        ratings: DataFrame with a ``movieId`` column, one row per rating.
        movies: DataFrame with ``movieId`` and ``title`` columns.
        top_value: Number of movies to return.

    Returns:
        DataFrame with columns ``title`` and ``count``, ordered by ``count``
        descending. Uses an inner join, so rated movies missing from
        ``movies`` are excluded.
    """
    rating_counts = ratings.groupBy("movieId").count()
    return (
        rating_counts.join(movies, "movieId")
        # movieId breaks ties so limit() keeps the same movies on every run;
        # sort before the projection that drops it.
        .orderBy(F.desc("count"), F.asc("movieId"))
        .select("title", "count")
        .limit(top_value)
    )


popular_movies(ratings, movies).show(truncate=False)

+-----------------------------------------+-----+
|title                                    |count|
+-----------------------------------------+-----+
|Forrest Gump (1994)                      |329  |
|Shawshank Redemption, The (1994)         |317  |
|Pulp Fiction (1994)                      |307  |
|Silence of the Lambs, The (1991)         |279  |
|Matrix, The (1999)                       |278  |
|Star Wars: Episode IV - A New Hope (1977)|251  |
|Jurassic Park (1993)                     |238  |
|Braveheart (1995)                        |237  |
|Terminator 2: Judgment Day (1991)        |224  |
|Schindler's List (1993)                  |220  |
+-----------------------------------------+-----+



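In this section we split each movie's pipe-separated `genres` string into individual genres, count the ratings each genre receives, and display the genres with the most ratings. Note that this measures popularity by number of ratings, not by average rating, and a movie listed under several genres counts toward each of them.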

In [None]:
def popular_genres(ratings, movies, top_n=10):
    """Return the ``top_n`` genres that received the most ratings.

    The ``genres`` column is split on ``|`` and exploded to one row per
    (movie, genre). A rating of a multi-genre movie is therefore counted once
    for each of its genres.

    Args:
        ratings: DataFrame with a ``movieId`` column, one row per rating.
        movies: DataFrame with ``movieId`` and pipe-separated ``genres`` columns.
        top_n: Number of genres to return.

    Returns:
        DataFrame with columns ``genre`` and ``count``, ordered by ``count``
        descending.
    """
    movie_genres = movies.select(
        "movieId",
        # split() takes a regex, so the pipe must be escaped.
        F.explode(F.split(F.col("genres"), "\\|")).alias("genre"),
    )
    return (
        ratings.join(movie_genres, "movieId")
        .groupBy("genre")
        .count()
        # genre breaks ties so limit() keeps the same genres on every run.
        .orderBy(F.desc("count"), F.asc("genre"))
        .limit(top_n)
    )


popular_genres(ratings, movies).show(truncate=False)


+---------+-----+
|genre    |count|
+---------+-----+
|Drama    |41928|
|Comedy   |39053|
|Action   |30635|
|Thriller |26452|
|Adventure|24161|
|Romance  |18124|
|Sci-Fi   |17243|
|Crime    |16681|
|Fantasy  |11834|
|Children |9208 |
+---------+-----+



In [None]:
def time_series_analysis(ratings):
    """Count ratings per calendar month.

    Args:
        ratings: DataFrame with a ``timestamp`` column.

    Returns:
        DataFrame with columns ``year_month`` (a ``"yyyy-MM"`` string) and
        ``count``, sorted by ``year_month``. The zero-padded format makes the
        string order match chronological order.
    """
    year_month = F.date_format("timestamp", "yyyy-MM").alias("year_month")
    monthly_counts = ratings.groupBy(year_month).count()
    return monthly_counts.orderBy("year_month")


time_series_analysis(ratings).show(10, truncate=False)


+----------+-----+
|year_month|count|
+----------+-----+
|1996-03   |58   |
|1996-04   |165  |
|1996-05   |832  |
|1996-06   |883  |
|1996-07   |489  |
|1996-08   |1010 |
|1996-09   |384  |
|1996-10   |935  |
|1996-11   |978  |
|1996-12   |306  |
+----------+-----+
only showing top 10 rows

