# Movie recomender system using PySpark
under construction...

In [1]:
import pandas as pd

# spark libs.
from pyspark.context import SparkContext
from pyspark.sql import Row
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import lower, col, when, regexp_replace

# spark datatypes
from pyspark.sql.types import *

# sparm MLlib
from pyspark.ml.recommendation import ALS

# check Spark UI from 
# http://localhost:4040

### Creating spark session and setting up number of partitions.

In [2]:
# creating spark local machine session.
sc = SparkContext('local')
spark = SparkSession(sc)

# setup 5 partitions since data is not "big-data", and is running locally.
spark.conf.set("spark.sql.shuffle.partitions", "5")

### Reading movie data from CSV files into Spark DataFrames.
We could let Spark '<b>infer</b>' the schema but in this case we do the <b>explicit</b> datyping as good practice.

In [3]:
# define schemas (DataFrame's Metadata) to load CSV into Spark DataFrame

# USER SCHEMA
# user id | age | gender | occupation | zip code
user_schema =  StructType([
                    StructField("user_id", ShortType(), False), # False = not null
                    StructField("age", ByteType(), True),
                    StructField("gender", StringType(), True),
                    StructField("occupation", StringType(), True),
                    StructField("zip", StringType(), True),
                ])

# GENRE SCHEMA
# genre | genre_id
genre_schema = StructType([
                    StructField("genre", StringType(), True),
                    StructField("genre_id", ShortType(), False), # False = not null
                ])

# MOVIE SCHEMA 
# movie id | movie title | release date | video release date | IMDb URL | unknown | Action | 
# Adventure | Animation | Children's | Comedy | Crime | Documentary | Drama | Fantasy |
# Film-Noir | Horror | Musical | Mystery | Romance | Sci-Fi | Thriller | War | Western |
movie_schema = StructType([
                    StructField("movie_id", ShortType(), False), # False = not null
                    StructField("title", StringType(), True),
                    StructField("release_date", StringType(), True), # format: 01-Jan-1995
                    StructField("video_date", StringType(), True), # format: 01-Jan-1995
                    StructField("url", StringType(), True),
                    StructField("unknown", ByteType(), True),
                    StructField("action", ByteType(), True),
                    StructField("adventure", ByteType(), True),
                    StructField("animation", ByteType(), True),
                    StructField("children", ByteType(), True),
                    StructField("comedy", ByteType(), True),
                    StructField("crime", ByteType(), True),
                    StructField("documentary", ByteType(), True),
                    StructField("drama", ByteType(), True),
                    StructField("fantasy", ByteType(), True),
                    StructField("film_noir", ByteType(), True),
                    StructField("horror", ByteType(), True),
                    StructField("musical", ByteType(), True),
                    StructField("mystery", ByteType(), True),
                    StructField("romance", ByteType(), True),
                    StructField("sci_fi", ByteType(), True),
                    StructField("thriller", ByteType(), True),
                    StructField("war", ByteType(), True),
                    StructField("western", ByteType(), True),
            ])

# USER-MOVIE RATINGS SCHEMA
# user id | item id | rating | timestamp
rating_schema = StructType([
                    StructField("user_id", ShortType(), False), # False = not null
                    StructField("movie_id", ShortType(), False),
                    StructField("rating", ShortType(), False),
                    StructField("timestamp", StringType(), True),
                    #StructField("timestamp", TimestampType(), True), 
                ])


In [4]:
def read_csv(path, name, schema, delimiter=','):
    """
    Takes in the path, name and schema of the CSV
    Returns a Spark DataFrame.
    """
    
    fullpath = path + name

    # read CSV using the provided schema.
    dataframe = spark.read.format("csv")\
        .schema(schema)\
        .option("header", "false")\
        .option("delimiter", delimiter)\
        .option("mode", "FAILFAST")\
        .load(fullpath)

    return dataframe

# load CSVs into Spark.
user_df = read_csv( path='ml-100k/', name='u.user', schema=user_schema, delimiter='|' )
genre_df = read_csv( path='ml-100k/', name='u.genre', schema=genre_schema, delimiter='|' )
movie_df = read_csv( path='ml-100k/', name='u.item', schema=movie_schema, delimiter='|' )
rating_df = read_csv( path='ml-100k/', name='u.data', schema=rating_schema, delimiter='\t' )

# create Spark tables to enable use of SQL.
user_df.createOrReplaceTempView('user_df')
genre_df.createOrReplaceTempView('genre_df')
movie_df.createOrReplaceTempView('movie_df')
rating_df.createOrReplaceTempView('rating_df')

In [5]:
# checking movie DataFrame datatypes.
movie_df.printSchema()
print(f'Total Movies:{movie_df.count()}')

root
 |-- movie_id: short (nullable = true)
 |-- title: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- video_date: string (nullable = true)
 |-- url: string (nullable = true)
 |-- unknown: byte (nullable = true)
 |-- action: byte (nullable = true)
 |-- adventure: byte (nullable = true)
 |-- animation: byte (nullable = true)
 |-- children: byte (nullable = true)
 |-- comedy: byte (nullable = true)
 |-- crime: byte (nullable = true)
 |-- documentary: byte (nullable = true)
 |-- drama: byte (nullable = true)
 |-- fantasy: byte (nullable = true)
 |-- film_noir: byte (nullable = true)
 |-- horror: byte (nullable = true)
 |-- musical: byte (nullable = true)
 |-- mystery: byte (nullable = true)
 |-- romance: byte (nullable = true)
 |-- sci_fi: byte (nullable = true)
 |-- thriller: byte (nullable = true)
 |-- war: byte (nullable = true)
 |-- western: byte (nullable = true)

Total Movies:1682


In [6]:
# print top 5 rows nicely with show()
user_df.show(5)
print(f'Total Users:{user_df.count()}')

+-------+---+------+----------+-----+
|user_id|age|gender|occupation|  zip|
+-------+---+------+----------+-----+
|      1| 24|     M|technician|85711|
|      2| 53|     F|     other|94043|
|      3| 23|     M|    writer|32067|
|      4| 24|     M|technician|43537|
|      5| 33|     F|     other|15213|
+-------+---+------+----------+-----+
only showing top 5 rows

Total Users:943


In [7]:
# get top 5 genre rows with take()
print(genre_df.take(5))
print(f'\nTotal genres:{genre_df.count()}')

[Row(genre='unknown', genre_id=0), Row(genre='Action', genre_id=1), Row(genre='Adventure', genre_id=2), Row(genre='Animation', genre_id=3), Row(genre="Children's", genre_id=4)]

Total genres:19


In [8]:
# get top movies rows.
print(movie_df.take(2))
print(f'\nTotal movies:{movie_df.count()}')

[Row(movie_id=1, title='Toy Story (1995)', release_date='01-Jan-1995', video_date=None, url='http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)', unknown=0, action=0, adventure=0, animation=1, children=1, comedy=1, crime=0, documentary=0, drama=0, fantasy=0, film_noir=0, horror=0, musical=0, mystery=0, romance=0, sci_fi=0, thriller=0, war=0, western=0), Row(movie_id=2, title='GoldenEye (1995)', release_date='01-Jan-1995', video_date=None, url='http://us.imdb.com/M/title-exact?GoldenEye%20(1995)', unknown=0, action=1, adventure=1, animation=0, children=0, comedy=0, crime=0, documentary=0, drama=0, fantasy=0, film_noir=0, horror=0, musical=0, mystery=0, romance=0, sci_fi=0, thriller=1, war=0, western=0)]

Total movies:1682


In [9]:
# print top 5 ratings.
rating_df.show(5)
print(f'\nTotal ratings:{rating_df.count()}')

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|    196|     242|     3|881250949|
|    186|     302|     3|891717742|
|     22|     377|     1|878887116|
|    244|      51|     2|880606923|
|    166|     346|     1|886397596|
+-------+--------+------+---------+
only showing top 5 rows


Total ratings:100000


### Making some adjustmens to genre names, lowecase and replacing dash for underscore.

In [10]:
# different ways to update a Spark DataFrame
genre_df = genre_df.withColumn('genre', lower(col('genre')))
genre_df = genre_df.withColumn('genre', when(col('genre') == "children's", 'children').otherwise(col('genre')))
genre_df = genre_df.withColumn('genre', regexp_replace('genre', '-', '_'))
genre_df.createOrReplaceTempView('genre_df')

genre_df.show()

+-----------+--------+
|      genre|genre_id|
+-----------+--------+
|    unknown|       0|
|     action|       1|
|  adventure|       2|
|  animation|       3|
|   children|       4|
|     comedy|       5|
|      crime|       6|
|documentary|       7|
|      drama|       8|
|    fantasy|       9|
|  film_noir|      10|
|     horror|      11|
|    musical|      12|
|    mystery|      13|
|    romance|      14|
|     sci_fi|      15|
|   thriller|      16|
|        war|      17|
|    western|      18|
+-----------+--------+



#### Movie table is not normalized to second normal form, we need can fix that to make it easier to query with SQL.
First we create a movie-genre table, which is a many to many relationship between movie and genre tables.

In [11]:
# MOVIE-GENRE SCHEMA
movie_genre_schema = StructType([
                        StructField("movie_id", ShortType(), True),
                        StructField("genre_id", ShortType(), True),
                    ])

# create empty DataFrame
movie_genre_df = spark.createDataFrame(sc.emptyRDD(), movie_genre_schema)
movie_genre_df.createOrReplaceTempView('movie_genre_df')
movie_genre_df.show()

+--------+--------+
|movie_id|genre_id|
+--------+--------+
+--------+--------+



In [12]:
# collect all movie records from every genre and insert into new table.
for genre in genre_df.collect():
    # get all records from given genre
    records = spark.sql(
                f"""
                SELECT movie_id, {genre.genre_id} genre_id
                FROM movie_df
                WHERE {genre.genre} = 1
                """
            ).collect()
    
    print(f'Inserting: {genre.genre_id} {genre.genre}')
    
    # insert records into new table
    parallelized_rows = spark.sparkContext.parallelize(records)
    insert_rows = spark.createDataFrame(parallelized_rows, movie_genre_schema)
    movie_genre_df = movie_genre_df.union(insert_rows)

movie_genre_df.createOrReplaceTempView('movie_genre_df')

Inserting: 0 unknown
Inserting: 1 action
Inserting: 2 adventure
Inserting: 3 animation
Inserting: 4 children
Inserting: 5 comedy
Inserting: 6 crime
Inserting: 7 documentary
Inserting: 8 drama
Inserting: 9 fantasy
Inserting: 10 film_noir
Inserting: 11 horror
Inserting: 12 musical
Inserting: 13 mystery
Inserting: 14 romance
Inserting: 15 sci_fi
Inserting: 16 thriller
Inserting: 17 war
Inserting: 18 western


In [13]:
movie_genre_df.show(10)

+--------+--------+
|movie_id|genre_id|
+--------+--------+
|     267|       0|
|    1373|       0|
|       2|       1|
|       4|       1|
|      17|       1|
|      21|       1|
|      22|       1|
|      24|       1|
|      27|       1|
|      28|       1|
+--------+--------+
only showing top 10 rows



## Spark SQL
Testing Spark SQL: get movies that belong to more than one genre.

In [14]:
spark.sql(
    """
    SELECT 
        m.movie_id,
        m.title,
        genres.genres
    
    FROM (
        SELECT movie_id, COUNT(genre_id) genres
        FROM movie_genre_df
        GROUP BY movie_id
        HAVING COUNT(genre_id) > 1
        ) genres

    JOIN movie_df m ON genres.movie_id = m.movie_id
    ORDER BY genres.genres DESC
    """
).show(15)

+--------+--------------------+------+
|movie_id|               title|genres|
+--------+--------------------+------+
|     560|Kid in King Arthu...|     6|
|     172|Empire Strikes Ba...|     6|
|     426|Transformers: The...|     6|
|     855|         Diva (1981)|     5|
|    1076|Pagemaster, The (...|     5|
|     755|      Jumanji (1995)|     5|
|      21|Muppet Treasure I...|     5|
|     181|Return of the Jed...|     5|
|     820|    Space Jam (1996)|     5|
|     993|     Hercules (1997)|     5|
|     101|  Heavy Metal (1981)|     5|
|     184|Army of Darkness ...|     5|
|      17|From Dusk Till Da...|     5|
|      50|    Star Wars (1977)|     5|
|     411|Nutty Professor, ...|     4|
+--------+--------------------+------+
only showing top 15 rows



### Create some views to get the ratings.

In [15]:
# create a temporary SQL view that gets all movie ratings by genre.
spark.sql(
    """
    CREATE OR REPLACE TEMP VIEW movie_rating_vw AS
        SELECT mg.genre_id, mg.movie_id, SUM(r.rating)/COUNT(r.rating) movie_rating
        FROM movie_genre_df mg
            JOIN rating_df r ON mg.movie_id = r.movie_id
        GROUP BY mg.genre_id, mg.movie_id
    """
    )

# create a temporary SQL view that gets all movie ratings by Date.
spark.sql(
    """
    CREATE OR REPLACE TEMP VIEW movie_rating_date_vw AS
        SELECT YEAR(TO_DATE(release_date, 'dd-MMM-yyyy')) release_date,
            mg.movie_id,
            SUM(r.rating)/COUNT(r.rating) movie_rating
        FROM movie_genre_df mg
            JOIN movie_df m ON mg.movie_id = m.movie_id
            JOIN rating_df r ON mg.movie_id = r.movie_id
        WHERE release_date IS NOT NULL
        GROUP BY TO_DATE(release_date, 'dd-MMM-yyyy'), mg.movie_id
    """
    )

DataFrame[]

### Get top movies from each genre.

In [16]:
spark.sql(
    """
    SELECT
        g.genre,
        m.title,
        ROUND(maxr.movie_rating, 2) top_rating
    
    FROM movie_rating_vw mr
        JOIN (
            -- gets MAX movie rating from each genre --
            SELECT genre_id, MAX(movie_rating) movie_rating
            FROM movie_rating_vw
            GROUP BY genre_id
            ) maxr ON mr.genre_id = maxr.genre_id AND mr.movie_rating = maxr.movie_rating
        -- full movie details --
        JOIN movie_df m ON mr.movie_id = m.movie_id
        JOIN genre_df g ON mr.genre_id = g.genre_id
    
    ORDER BY g.genre, top_rating DESC
    """
).show(30)

+-----------+--------------------+----------+
|      genre|               title|top_rating|
+-----------+--------------------+----------+
|     action|    Star Wars (1977)|      4.36|
|  adventure|     Star Kid (1997)|       5.0|
|  animation|Close Shave, A (1...|      4.49|
|   children|     Star Kid (1997)|       5.0|
|     comedy|Santa with Muscle...|       5.0|
|      crime|They Made Me a Cr...|       5.0|
|documentary|Great Day in Harl...|       5.0|
|documentary|Marlene Dietrich:...|       5.0|
|      drama|They Made Me a Cr...|       5.0|
|      drama|Entertaining Ange...|       5.0|
|      drama|Saint of Fort Was...|       5.0|
|      drama|Aiqing wansui (1994)|       5.0|
|      drama|Someone Else's Am...|       5.0|
|      drama|  Prefontaine (1997)|       5.0|
|    fantasy|     Star Kid (1997)|       5.0|
|  film_noir|Manchurian Candid...|      4.26|
|     horror|       Psycho (1960)|       4.1|
|    musical|Wizard of Oz, The...|      4.08|
|    mystery|  Rear Window (1954)|

### Top movies by year.

In [17]:
spark.sql(
    """
    SELECT
        YEAR(TO_DATE(m.release_date, 'dd-MMM-yyyy')) year,
        m.title,
        ROUND(maxr.movie_rating, 2) top_rating
    
    FROM movie_rating_date_vw mr
        JOIN (
            -- gets MAX movie rating from each YEAR --
            SELECT release_date, MAX(movie_rating) movie_rating
            FROM movie_rating_date_vw
            GROUP BY release_date
            ) maxr ON mr.release_date = maxr.release_date AND mr.movie_rating = maxr.movie_rating
        
        -- full movie details --
        JOIN movie_df m ON mr.movie_id = m.movie_id
    
    ORDER BY mr.release_date DESC, top_rating DESC
    """
).show(30)

+----+--------------------+----------+
|year|               title|top_rating|
+----+--------------------+----------+
|1998|     Star Kid (1997)|       5.0|
|1997|  Prefontaine (1997)|       5.0|
|1996|Santa with Muscle...|       5.0|
|1996|Entertaining Ange...|       5.0|
|1996|Marlene Dietrich:...|       5.0|
|1996|Aiqing wansui (1994)|       5.0|
|1996|Someone Else's Am...|       5.0|
|1995|Usual Suspects, T...|      4.39|
|1994|Great Day in Harl...|       5.0|
|1993|Saint of Fort Was...|       5.0|
|1992|Grand Day Out, A ...|      4.11|
|1991|Silence of the La...|      4.29|
|1990|Nikita (La Femme ...|      4.01|
|1989|      Henry V (1989)|      4.14|
|1988|Cinema Paradiso (...|      4.17|
|1987|Princess Bride, T...|      4.17|
|1986|Manon of the Spri...|      4.12|
|1985|          Ran (1985)|       4.1|
|1984|      Amadeus (1984)|      4.16|
|1983|   Local Hero (1983)|      3.97|
|1982| Blade Runner (1982)|      4.14|
|1981|Raiders of the Lo...|      4.25|
|1980|Empire Strikes Ba..

## Spark MLlib
Machine learning with Spark.

Collaborative Filtering with built-in Alternating Least Squares model.

In [18]:
# Train Test split 80-20%
traing_set, test_set = rating_df.randomSplit([0.8, 0.2])

print(f'Training samples:{traing_set.count()}')
print(f'Test samples:{test_set.count()}')

Training samples:80147
Test samples:19853


In [19]:
# Spark has already a ALS (Alternating Least Squares) recommender model built-in
als = ALS().setMaxIter(5)\
            .setRegParam(0.01)\
            .setUserCol("user_id")\
            .setItemCol("movie_id")\
            .setRatingCol("rating")

print(als.explainParams())
alsModel = als.fit(traing_set)
predictions = alsModel.transform(test_set)

alpha: alpha for implicit preference (default: 1.0)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
coldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan)
finalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)
implicitPrefs: whether to use implicit preference (default: False)
intermediateStorageLevel: StorageLevel for intermediate datasets. Cannot be 'NONE'. (default: MEMORY_AND_DISK)
itemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: movie_id)
maxIter: max number of iterations (>= 

In [20]:
# getting N top movie_id recomendations from each user
n = 5
alsModel.recommendForAllUsers(n).selectExpr("user_id", "explode(recommendations)").show(10)

+-------+-----------------+
|user_id|              col|
+-------+-----------------+
|     22| [592, 7.3787413]|
|     22| [919, 7.1898932]|
|     22|  [57, 6.6123157]|
|     22| [543, 6.4810753]|
|     22|[1463, 6.4714212]|
|     23| [613, 5.7697434]|
|     23| [1071, 5.740668]|
|     23| [390, 5.7294116]|
|     23| [1252, 5.659588]|
|     23|[1591, 5.6058683]|
+-------+-----------------+
only showing top 10 rows

