# 1. Data Loading

In [1]:
import os
import sys
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SparkSession
import pyspark.ml as ml

# Point both the PySpark driver and the Python worker processes at the
# interpreter that runs this kernel.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [2]:
# One SparkSession for the whole notebook; getOrCreate() returns the active
# session if one already exists.
spark = (
    SparkSession.builder
    .appName("YourAppName")
    .getOrCreate()
)

In [3]:
def load_dfs(data_dir="./Data/movieLens"):
    """Load the raw MovieLens ``.dat`` files into cached Spark DataFrames.

    Reads ``movies.dat``, ``users.dat`` and ``ratings.dat`` from *data_dir*
    (``::``-separated, latin-1 encoded). Each frame gets its column names, is
    cached, and is registered as a temporary SQL view (``movies_info``,
    ``users_info``, ``ratings_info``).

    The frames are bound to the module-level globals ``movies``, ``users`` and
    ``ratings`` so later notebook cells can use them directly.

    Parameters
    ----------
    data_dir : str
        Directory holding the three ``.dat`` files. Defaults to the path the
        notebook has always used.
    """
    global movies, users, ratings

    def _load(file_name, column_names, view_name):
        # Read one '::'-separated file, name its columns, cache it and expose it to SQL.
        # A '/' join keeps the exact path form used previously, e.g. ./Data/movieLens/movies.dat.
        path = f"{data_dir.rstrip('/')}/{file_name}"
        frame = spark.read.csv(path, sep="::", encoding="latin1")
        frame = frame.toDF(*column_names).cache()
        frame.createOrReplaceTempView(view_name)
        return frame

    movies = _load("movies.dat", ["movie_id", "movie_name", "genre"], "movies_info")
    users = _load("users.dat", ["user_id", "gender", "age", "occupation", "zipcode"], "users_info")
    ratings = _load("ratings.dat", ["user_id", "movie_id", "rating", "time_stamp"], "ratings_info")

In [4]:
# Read the raw MovieLens files into the global movies/users/ratings frames and SQL views.
load_dfs()

In [5]:
# Peek at the first raw movie rows.
movies.show(n=5)

+--------+--------------------+--------------------+
|movie_id|          movie_name|               genre|
+--------+--------------------+--------------------+
|       1|    Toy Story (1995)|Animation|Childre...|
|       2|      Jumanji (1995)|Adventure|Childre...|
|       3|Grumpier Old Men ...|      Comedy|Romance|
|       4|Waiting to Exhale...|        Comedy|Drama|
|       5|Father of the Bri...|              Comedy|
+--------+--------------------+--------------------+
only showing top 5 rows



In [6]:
# Peek at the first raw user rows.
users.show(n=5)

+-------+------+---+----------+-------+
|user_id|gender|age|occupation|zipcode|
+-------+------+---+----------+-------+
|      1|     F|  1|        10|  48067|
|      2|     M| 56|        16|  70072|
|      3|     M| 25|        15|  55117|
|      4|     M| 45|         7|  02460|
|      5|     M| 25|        20|  55455|
+-------+------+---+----------+-------+
only showing top 5 rows



In [7]:
# Peek at the first raw rating rows.
ratings.show(n=5)

+-------+--------+------+----------+
|user_id|movie_id|rating|time_stamp|
+-------+--------+------+----------+
|      1|    1193|     5| 978300760|
|      1|     661|     3| 978302109|
|      1|     914|     3| 978301968|
|      1|    3408|     4| 978300275|
|      1|    2355|     5| 978824291|
+-------+--------+------+----------+
only showing top 5 rows



# 2. Data Cleaning

## 2.1. Checking Null Values

In [None]:
def inspect_null(df):
    """Print how many null values each column of a Spark DataFrame contains.

    All per-column counts come from a single aggregation job, instead of one
    filter-and-count job per column.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame to inspect.

    Returns
    -------
    None
        Results are printed, one line per column, in column order.
    """
    from pyspark.sql import functions as F

    if not df.columns:
        return
    # when() without otherwise() is null unless the cell is null, and count()
    # skips nulls, so each expression counts exactly the null cells of a column.
    null_counts = df.select(
        [F.count(F.when(df[col].isNull(), 1)) for col in df.columns]
    ).first()
    for col, empty in zip(df.columns, null_counts):
        print(f"For columns {col}:\t{empty} null records")

In [None]:
# Null report for the raw movies frame.
inspect_null(df=movies)

In [None]:
# Null report for the raw users frame.
inspect_null(df=users)

In [None]:
# Null report for the raw ratings frame.
inspect_null(df=ratings)

## 2.2. Data Types

### 2.2.1. User dataset

In [None]:
# Show the schema of the raw users frame before any type conversion.
users.printSchema()

**Gender**

In [None]:
# Binary-encode gender: 'M' -> 1, every other value -> 0.
users = users.withColumn(
    "gender",
    pyspark.sql.functions.when(users["gender"] == 'M', 1).otherwise(0),
)

**Mapping Age to Age Category**

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

# Raw MovieLens age code -> ordinal age category 1..7.
label_mapping = {
    1: 1,
    18: 2,
    25: 3,
    35: 4,
    45: 5,
    50: 6,
    56: 7
}

# Look the code up in a native Spark map expression instead of a Python UDF.
# This avoids the Python-worker round-trip. An age code missing from the mapping
# (or a null age) becomes null, which a later null check can report, rather than
# a KeyError/TypeError that aborts the whole job.
age_to_category = pyspark.sql.functions.create_map(
    *[pyspark.sql.functions.lit(value) for pair in label_mapping.items() for value in pair]
)
users = users.withColumn("age", age_to_category[users["age"].cast(IntegerType())])

**Mapping Zipcode to Region**

In [None]:
# Try to parse each zipcode as an integer. Rows where the cast fails get a null
# in this diagnostic column, which the null report below surfaces.
users = users.withColumn(
    "casted_zipcode",
    users["zipcode"].cast(IntegerType()),
)
inspect_null(df=users)

In [None]:
# Show sample users whose zipcode could not be cast to an integer.
users.where(users["casted_zipcode"].isNull()).show(n=10)

In [None]:
def to_region(record):
    """Return the region code of a zipcode, i.e. its first digit as an int.

    Parameters
    ----------
    record : str, int or None
        Zipcode value; non-string values are converted with ``str()`` and
        surrounding whitespace is ignored.

    Returns
    -------
    int or None
        The leading digit, or None when the value is None/empty or does not
        start with a decimal digit (e.g. a postcode beginning with a letter).
        Returning None lets the Spark UDF produce null instead of failing the
        whole job with a ValueError.
    """
    if record is None:
        return None
    text = str(record).strip()
    if not text[:1].isdecimal():
        return None
    return int(text[0])

# Derive the region code from the zipcode with to_region, run as a Spark UDF.
zipcode_udf = udf(to_region, IntegerType())
users = users.withColumn("region", zipcode_udf(users["zipcode"]))
# Drop the raw zipcode and the diagnostic integer cast created while inspecting it,
# so it does not leak into the saved users table. drop() ignores absent names.
users = users.drop("zipcode", "casted_zipcode")

In [None]:
# Null report after deriving the region column.
inspect_null(df=users)

**Casting all columns to integer type and checking that no nulls remain**

In [None]:
# Cast every user column to integer in one projection (names and order kept),
# then check whether the cast introduced nulls.
users = users.select([users[name].cast(IntegerType()).alias(name) for name in users.columns])

inspect_null(users)

### 2.2.2. For Movies Dataset

In [None]:
# Show the schema of the raw movies frame before any type conversion.
movies.printSchema()

**Casting movie_id to integer**

In [None]:
# movie_id is read as text; make it an integer key.
movies = movies.withColumn(
    "movie_id",
    movies.movie_id.cast(IntegerType()),
)

**Parsing movie_name to year and name**

In [None]:
import re

# A title that ends in a parenthesised four-digit year, e.g. "Toy Story (1995)".
_YEAR_SUFFIX = re.compile(r'\((\d{4})\)$')


def extract_date(record):
    """Return the release year at the end of a movie title.

    Parameters
    ----------
    record : str or None
        Raw title such as ``"Toy Story (1995)"``; surrounding whitespace is ignored.

    Returns
    -------
    int or None
        The year, or None when the title is None or does not end in ``(YYYY)``.
    """
    if record is None:
        return None
    match = _YEAR_SUFFIX.search(record.strip())
    return int(match.group(1)) if match else None


def extract_name(record):
    """Return a movie title without its trailing ``(YYYY)`` year.

    Parameters
    ----------
    record : str or None
        Raw title such as ``"Toy Story (1995)"``.

    Returns
    -------
    str or None
        The stripped title without the year suffix. The input is returned
        unchanged when it has no year suffix, and None is returned for None.
    """
    if record is None:
        return None
    stripped = record.strip()
    if _YEAR_SUFFIX.search(stripped):
        # The suffix "(YYYY)" is exactly six characters long.
        return stripped[:-6].strip()
    return record

# Wrap the plain-Python title parsers as Spark UDFs with explicit return types.
extract_date_udf = udf(extract_date, IntegerType())
extract_name_udf = udf(extract_name, StringType())

# Split "Title (YYYY)" into a numeric year column and a clean name column.
movies = (
    movies
    .withColumn("year", extract_date_udf(movies["movie_name"]))
    .withColumn("name", extract_name_udf(movies["movie_name"]))
)

**Splitting the genre string into individual genres (one row per genre)**

In [None]:
# One row per (movie, genre): split the "A|B|C" genre string and explode it.
genre_list = pyspark.sql.functions.split(movies["genre"], "\\s*\\|\\s*")
movies = movies.withColumn("parsed_genre", pyspark.sql.functions.explode(genre_list))
# Constant indicator column. lit(1) states this directly instead of comparing
# the column with itself; explode never emits null genres, so the values match.
movies = movies.withColumn("value", pyspark.sql.functions.lit(1))

### 2.2.3. For Ratings Dataset

In [None]:
# Show the schema of the raw ratings frame before any type conversion.
ratings.printSchema()

**Casting All the attributes to int type**

In [None]:
# Every ratings field is numeric, so cast all of them to int in a single select.
ratings = ratings.select(*[ratings[name].cast(IntegerType()).alias(name) for name in ratings.columns])

ratings.printSchema()

# 3. Saving the Cleaned Datasets

In [None]:
# Persist the cleaned tables; toPandas() collects each table to the driver first.
CLEANED_DATA_DIR = "./Data/cleaned_data"
# pandas' to_csv does not create missing directories, so make sure it exists.
os.makedirs(CLEANED_DATA_DIR, exist_ok=True)
for table_name, frame in (("ratings", ratings), ("movies", movies), ("users", users)):
    frame.toPandas().to_csv(f"{CLEANED_DATA_DIR}/{table_name}.csv", header=True, columns=frame.columns, index=False)

# 4. Feature Engineering

In [None]:
# Re-register the cleaned frames under the SQL view names used by the queries below.
for view_name, frame in (("movies_info", movies), ("users_info", users), ("ratings_info", ratings)):
    frame.createOrReplaceTempView(view_name)

## 4.1. Movies Dataset

**Features used for the movie:**
1. year
2. genres
3. watch_count
4. popularity among its genre
5. average rating
6. rating ratio per genre

**watch count**

In [None]:
# watches: number of distinct users who rated each movie.
popularity = spark.sql("SELECT movie_id, COUNT(DISTINCT(user_id)) AS watches FROM ratings_info GROUP BY movie_id")
popularity.createOrReplaceTempView("popularity_info")

In [None]:
# Sample of per-movie watch counts.
popularity.show(n=5)

**Popularity among its genre**

In [None]:
# genre_count: total number of ratings per genre. movies_info has one row per
# (movie, genre) pair here, so a rating counts toward every genre of its movie.
# With the LEFT JOIN, ratings whose movie_id is absent from movies_info land in a
# NULL genre group.
query = """
    SELECT parsed_genre AS genre, COUNT(user_id) AS genre_count
    FROM   ratings_info LEFT JOIN movies_info ON movies_info.movie_id = ratings_info.movie_id  
    GROUP BY parsed_genre
"""

watches_per_genre = spark.sql(query)
watches_per_genre.createOrReplaceTempView("watches_per_genre_info")

In [None]:
# Sample of per-genre rating counts.
watches_per_genre.show(n=5)

In [None]:
# popularity_per_genre: a movie's watch count divided by the total number of
# ratings in each of its genres (one output row per movie/genre pair).
query = """
    SELECT *, watches/genre_count AS popularity_per_genre
    FROM    (SELECT movies_info.movie_id AS movie_id, year, parsed_genre, watches
             FROM   movies_info INNER JOIN popularity_info ON movies_info.movie_id = popularity_info.movie_id
            ) A INNER JOIN 
            watches_per_genre_info ON A.parsed_genre = watches_per_genre_info.genre
"""

df = spark.sql(query)
# The genre name is already carried by watches_per_genre_info.genre.
df = df.drop("parsed_genre")
df.createOrReplaceTempView("df_info")
df.show(5)

**Average Rating**

In [None]:
# avg_rating: mean of all ratings each movie received.
query = """
    SELECT movie_id, AVG(rating) AS avg_rating 
    FROM ratings_info 
    GROUP BY movie_id
"""

avg = spark.sql(query)
avg.createOrReplaceTempView("avg_info")
avg.show(5)

In [None]:
# Attach each movie's avg_rating to its per-genre rows. The LEFT JOIN keeps every
# df_info row; the result replaces the df_info view.
query = """
    SELECT A.movie_id, genre, year, watches, genre_count, popularity_per_genre, avg_rating
    FROM   df_info AS A LEFT JOIN avg_info ON A.movie_id = avg_info.movie_id
"""

df = spark.sql(query)
df.createOrReplaceTempView("df_info")
df.show(5)

**Ratio of a movie's average rating to its genre's mean rating**

In [None]:
# mean_genre_rating: mean of every rating given to movies of each genre.
# This reuses the avg variable and the avg_info view name, replacing the
# per-movie averages registered above.
query = """
    SELECT A.parsed_genre AS genre, MEAN(B.rating) AS mean_genre_rating
    FROM   movies_info AS A JOIN ratings_info B ON A.movie_id = B.movie_id
    GROUP BY A.parsed_genre
"""

avg = spark.sql(query)
avg.createOrReplaceTempView("avg_info")
avg.show(5)

In [None]:
# rating_per_genre: the movie's average rating divided by its genre's mean rating
# (above 1 means the movie is rated higher than its genre overall). genre_count is
# not carried into the new df_info.
query = """
    SELECT A.movie_id, A.genre, A.year, watches, popularity_per_genre, avg_rating, avg_rating/mean_genre_rating AS rating_per_genre
    FROM   df_info AS A LEFT JOIN avg_info B ON A.genre = B.genre
"""

df = spark.sql(query)
df.createOrReplaceTempView("df_info")
df.show(5)

In [None]:
# Save the long-format (one row per movie/genre) movie features.
df.toPandas().to_csv(
    "./Data/cleaned_data/unpivoted_movies_features.csv",
    index=False,
    header=True,
    columns=df.columns,
)

### 4.1.2. Pivoting the Movies Table

In [None]:
excluded = ["movie_id", "year", "watches", "avg_rating"]

# Wide format: one row per movie and one popularity column per genre.
sub1 = (
    df.select("movie_id", "genre", "year", "watches", "avg_rating", "popularity_per_genre")
    .groupBy("movie_id", "year", "watches", "avg_rating")
    .pivot("genre")
    .sum("popularity_per_genre")
)

# Genres a movie does not belong to come out of the pivot as null; make them 0.
columns = {name: 0 for name in sub1.columns if name not in excluded}
sub1 = sub1.fillna(columns)

In [None]:
# Check the pivoted popularity columns (one per genre) before renaming them.
sub1.printSchema()

In [None]:
excluded = ["movie_id", "year", "watches", "avg_rating"]
# Prefix each genre column so the popularity features get self-describing names.
sub1 = sub1.toDF(*[name if name in excluded else "popularity_per_" + name for name in sub1.columns])

sub1.printSchema()

In [None]:
excluded = ["movie_id", "year"]

# Wide format: one row per movie and one rating-ratio column per genre.
sub2 = (
    df.select("movie_id", "genre", "year", "rating_per_genre")
    .groupBy("movie_id", "year")
    .pivot("genre")
    .sum("rating_per_genre")
)

# Genres a movie does not belong to come out of the pivot as null; make them 0.
columns = {name: 0 for name in sub2.columns if name not in excluded}
sub2 = sub2.fillna(columns)

In [None]:
# Check the pivoted rating-ratio columns (one per genre) before renaming them.
sub2.printSchema()

In [None]:
excluded = ["movie_id", "year"]
# Prefix each genre column so the rating-ratio features get self-describing names.
sub2 = sub2.toDF(*[name if name in excluded else "rating_per_" + name for name in sub2.columns])

sub2.printSchema()

In [None]:
sub1.createOrReplaceTempView("sub1_info")
sub2.createOrReplaceTempView("sub2_info")

# Join the popularity and rating-ratio features on movie_id alone. year is derived
# from the movie's title, so sub2's copy is redundant and is dropped. Joining on
# year as well would silently discard every movie whose year is null, because null
# never equals null in an equi-join. The column order is unchanged:
# movie_id, year, sub1 features, sub2 features.
sub1 = sub1.join(sub2.drop("year"), on="movie_id", how="inner")
sub1.createOrReplaceTempView("sub1_info")
sub1.printSchema()

In [None]:
# Null report for the combined movie feature table.
inspect_null(df=sub1)

In [None]:
# Save the wide (one row per movie) movie features.
sub1.toPandas().to_csv(
    "./Data/cleaned_data/pivoted_movies_features.csv",
    index=False,
    header=True,
    columns=sub1.columns,
)

## 4.2. Users Dataset

**Features used for the user:**

1. gender
2. age class
3. Occupation class
4. Region
5. Average rating
6. number of watched movies
7. average rating per genre
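8. The mode year of the movies watched (not computed below; the notebook instead uses the mean release year of the movies the user rated above 3)
9. The median year of the movies watched (not computed below)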

**Missing per-category average ratings and popularity average ratings are imputed with the average rating over all users.**

In [None]:
# Review the cleaned users schema before building user features.
users.printSchema()

**Average rating & number of watched movies**

In [None]:
# Per user: avg_rating (mean of the user's ratings) and watched_movies (number of
# ratings). The INNER JOIN keeps only users present in ratings_info, and the
# explicit SELECT list drops every other users_info column.
query = """
    SELECT  A.user_id, A.gender, A.age, A.occupation, A.region, B.avg_rating, B.watched_movies
    FROM    users_info A
            INNER JOIN 
            (SELECT   user_id, MEAN(rating) AS avg_rating, COUNT(movie_id) AS watched_movies
            FROM     ratings_info
            GROUP BY user_id) B
            ON B.user_id = A.user_id
"""

users = spark.sql(query)
users.createOrReplaceTempView("users_info")
users.show(5)

In [None]:
# Review the movies schema (one row per movie/genre after the earlier explode).
movies.printSchema()

**Average rating per genre**

In [None]:
# Peek at the movies_info view used by the per-genre user queries.
spark.table("movies_info").show(n=5)

In [None]:
# Mean rating each user gave within each genre (long format: one row per user/genre).
query = """SELECT  parsed_genre, user_id,  MEAN(rating) avg_rating_per_genre
            FROM    ratings_info A INNER JOIN movies_info B USING (movie_id)
            GROUP BY parsed_genre, user_id
"""

avg_per_genre = spark.sql(query)
avg_per_genre.show(5)

In [None]:
from pyspark.sql import functions as F

excluded = ["user_id"]
# Pivot to one row per user and one column per genre holding that user's mean rating.
avg_per_genre = avg_per_genre.groupBy("user_id").pivot("parsed_genre").sum("avg_rating_per_genre")

# Genres a user never rated come out of the pivot as null. Following the imputation
# note at the start of the Users section, each gap is filled with that genre's mean
# over all users who did rate it. Filling with 0 would instead look like an
# extremely low rating.
genre_columns = [col for col in avg_per_genre.columns if not(col in excluded)]
columns = {}
if genre_columns:
    genre_means = avg_per_genre.agg(*[F.avg(avg_per_genre[col]) for col in genre_columns]).first()
    # fillna rejects None; a genre mean is None only if no user rated that genre.
    columns = {col: mean for col, mean in zip(genre_columns, genre_means) if mean is not None}
if columns:
    avg_per_genre = avg_per_genre.fillna(columns)

for col in avg_per_genre.columns:
    if not(col in excluded):
        avg_per_genre = avg_per_genre.withColumnRenamed(col, "avg_rating_for_"+col)

avg_per_genre.createOrReplaceTempView("avg_info")
avg_per_genre.show(5)

In [None]:
# Add the per-genre average columns (the avg_info view) to every user; the INNER
# JOIN keeps only users present in both views.
query = """
        SELECT  *
        FROM    users_info INNER JOIN avg_info USING (user_id)
"""

users = spark.sql(query)
users.createOrReplaceTempView("users_info")
users.printSchema()

In [None]:
# Constant 1 per row, used as the summed value when pivoting on occupation.
# lit(1) states this directly instead of comparing occupation with itself.
users = users.withColumn("value", pyspark.sql.functions.lit(1))

# Grouping columns for the occupation pivot: everything except the pivot inputs.
columns = [col for col in users.columns if not(col in ["occupation", "value"])]

In [None]:
# One-hot encode occupation: one column per occupation code, 1 where it applies.
users = users.groupBy(*columns).pivot("occupation").sum("value")
# Occupations a user does not have come out of the pivot as null; make them 0.
cols = {name: 0 for name in users.columns if name not in columns}
users = users.fillna(cols)

In [None]:
# Check the one-hot occupation columns produced by the pivot.
users.printSchema()

In [None]:
# Occupation code -> readable occupation name (list position == code, 0..20).
label_mapping = dict(enumerate([
    "other",
    "academic/educator",
    "artist",
    "clerical/admin",
    "college/grad student",
    "customer service",
    "doctor/health care",
    "executive/managerial",
    "farmer",
    "homemaker",
    "K-12 student",
    "lawyer",
    "programmer",
    "retired",
    "sales/marketing",
    "scientist",
    "self-employed",
    "technician/engineer",
    "tradesman/craftsman",
    "unemployed",
    "writer",
]))

# Replace each pivoted occupation-code column name with its label.
for code in cols:
    users = users.withColumnRenamed(code, label_mapping[int(code)])

In [None]:
# Confirm the occupation columns now carry readable names.
users.printSchema()

In [None]:
# year: for each user, the mean release year of the movies they rated above 3.
# Movie years are first averaged per movie_id to collapse the per-genre duplicate
# rows of movies_info. Users with no rating above 3 get no row in this result.
query = """
        SELECT  user_id, MEAN(year) AS year
        FROM    (SELECT * FROM ratings_info WHERE rating>3) A 
                INNER JOIN 
                (SELECT movie_id, MEAN(year) year FROM movies_info GROUP BY movie_id) B
                USING (movie_id)
        GROUP BY user_id
"""

year = spark.sql(query)
year.show(5)

In [None]:
# Users whose liked movies have no parsed release year at all.
year.filter(year["year"].isNull()).show()

In [None]:
# Expose the liked-year feature and the current user table to SQL.
year.createOrReplaceTempView("year_info")
users.createOrReplaceTempView("users_info")

In [None]:
from pyspark.sql import functions as F

# LEFT JOIN keeps every user. The year feature exists only for users with at
# least one rating above 3, so an INNER JOIN would silently drop the others from
# the feature table. Column order matches the original: user_id, user columns, year.
users_base = spark.table("users_info")
liked_year = spark.table("year_info")
users = users_base.join(liked_year, on="user_id", how="left")

# Impute the missing year with the mean over the users that have one.
mean_year = liked_year.agg(F.avg("year")).first()[0]
if mean_year is not None:
    users = users.fillna({"year": mean_year})
users.printSchema()

In [None]:
# List the distinct region codes present in the user table.
users.select(users["region"]).dropDuplicates().show(n=25)

In [None]:
# Save the final user features.
users.toPandas().to_csv(
    "./Data/cleaned_data/pivoted_users_features.csv",
    index=False,
    header=True,
    columns=users.columns,
)
# Expose the final feature tables to SQL. The movie features (sub1) are also
# registered under the name movies_info from here on.
users.createOrReplaceTempView("users_info")
sub1.createOrReplaceTempView("movies_info")
# Column counts of the movie and user feature tables.
print(len(sub1.columns))
print(len(users.columns))

# 5. Joining Features and creating unified dataset

In [None]:
# One row per rating, enriched with the rating user's and the rated movie's features.
# The user table and the movie feature table both carry `avg_rating` and `year`.
# A plain SELECT * yields two identically named columns of each, which cannot be
# referenced unambiguously (pandas, for one, renames the second copy to e.g.
# "avg_rating.1" when the CSV is read back). The user-side copies are renamed so
# every column of the unified table is unique; the column order is otherwise
# unchanged (movie_id, user_id, rating, time_stamp, user features, movie features).
user_features = (
    spark.table("users_info")
    .withColumnRenamed("avg_rating", "user_avg_rating")
    .withColumnRenamed("year", "user_mean_year")
)
movie_features = spark.table("sub1_info")

result = (
    spark.table("ratings_info")
    .join(user_features, on="user_id", how="inner")
    .join(movie_features, on="movie_id", how="inner")
)

In [None]:
# Number of columns in the unified table.
len(result.columns)

In [None]:
# Inspect the unified table's schema.
result.printSchema()

In [None]:
# Save the unified rating + user + movie feature table.
result.toPandas().to_csv(
    "./Data/cleaned_data/unified_rating_features.csv",
    index=False,
    header=True,
    columns=result.columns,
)