In [194]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [77]:
spark = SparkSession.builder.appName("MovieLense_ML").getOrCreate()
spark

In [124]:
df_ratings = spark.read.csv("./ml-100k/u.data", sep="\t", inferSchema=True)
df_movies= spark.read.csv("./ml-100k/u.item", sep="|", inferSchema=True)
df_users = spark.read.csv("./ml-100k/u.user", sep="|", inferSchema=True)

base_cols = [f"_c{k}" for k in range(24)]
# The dataset has no header, so we create them bases on README 
for old, new in zip(base_cols[:4], ["user_id", "item_id", "rating", "timestamp"]):
    df_ratings = df_ratings.withColumnRenamed(old, new)
    
# Now for movie names 
movie_cols = ["item_id", "movie_title", "release_date", "video_release_date", "IMDb_URL", "unknown", "Action", "Adventure", "Animation",
              "Childrens", "Comedy", "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical", "Mystery", "Romance", 
              "Sci-Fi", "Thriller", "War", "Western" ]

for old, new in zip(base_cols, movie_cols):
    df_movies = df_movies.withColumnRenamed(old, new)

df_movie_title_mapping = df_movies.select(["item_id", "movie_title"])

# Now for users 
user_cols = ["user_id", "age", "gender", "job", "zip_code"]
for old, new in zip(base_cols, user_cols):
    df_users = df_users.withColumnRenamed(old, new)
    

# join all
df_ratings = df_ratings.join(df_movies, on=["item_id"], how="left").join(df_users, on=["user_id"], how="left")

# drop unneccessary (for ML) columns 
df_ratings = df_ratings.drop("IMDb_URL")
df_ratings = df_ratings.drop("movie_title")
df_ratings = df_ratings.drop("zip_code")

df_ratings.show(5)


+-------+-------+------+---------+------------+------------------+-------+------+---------+---------+---------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+
|user_id|item_id|rating|timestamp|release_date|video_release_date|unknown|Action|Adventure|Animation|Childrens|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|age|gender|       job|
+-------+-------+------+---------+------------+------------------+-------+------+---------+---------+---------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+
|    196|    242|     3|881250949| 24-Jan-1997|              NULL|      0|     0|        0|        0|        0|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0| 49|     M|    writer|
|    186|    302|     3|89171774

In [125]:
# Number of missing values 
df_ratings.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_ratings.columns]).show()

+-------+-------+------+---------+------------+------------------+-------+------+---------+---------+---------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+---+
|user_id|item_id|rating|timestamp|release_date|video_release_date|unknown|Action|Adventure|Animation|Childrens|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|age|gender|job|
+-------+-------+------+---------+------------+------------------+-------+------+---------+---------+---------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+---+
|      0|      0|     0|        0|           9|            100000|      0|     0|        0|        0|        0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|  0|     0|  0|
+-------+-------+------+---------+------------+-------------

We see that video_release_date is irrelevant, since there are no values. Also release_date has 9 missing values

In [126]:
df_ratings = df_ratings.drop("video_release_date")
df_ratings = df_ratings.na.drop()

We now have to transform release_date

In [154]:
# Step 1: Parse date into YYYY-MM-DD
df = df_ratings.withColumn("parsed_date", F.to_date(F.col("release_date"), "d-MMM-yyyy")).drop("release_date")

# Step 2: reformat it into DDMMYYYY
df = df.withColumn("formatted_date", F.date_format(F.col("parsed_date"), "ddMMyyyy")).drop("parsed_date")
df = df.withColumn("formatted_date_cast", df.formatted_date.cast("int"))     # Convert to int 
df = df.drop("formatted_date")
df = df.withColumnRenamed("formatted_date_cast", "formatted_date")

Encode Job and Gender

In [155]:
indexer = StringIndexer(inputCols=["gender", "job"], outputCols=["gender_encoded", "job_encoded"])
indexed_df = indexer.fit(df).transform(df)
# drop old columns 
indexed_df = indexed_df.drop("job").withColumnRenamed("job_encoded", "job")
indexed_df = indexed_df.drop("gender").withColumnRenamed("gender_encoded", "gender")
indexed_df.show(5)

+-------+-------+------+---------+-------+------+---------+---------+---------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+--------------+------+---+
|user_id|item_id|rating|timestamp|unknown|Action|Adventure|Animation|Childrens|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|age|formatted_date|gender|job|
+-------+-------+------+---------+-------+------+---------+---------+---------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+--------------+------+---+
|    196|    242|     3|881250949|      0|     0|        0|        0|        0|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0| 49|      24011997|   0.0|6.0|
|    186|    302|     3|891717742|      0|     0|        0|        0|        0|     0|    1|          0|    0|      0|        1|

### (Small) Dataset analysis

Best Movie

In [156]:
df_avg_rating = indexed_df.select(["item_id", "rating"]).groupBy("item_id").mean().drop("avg(item_id)")

# Join movie names 
df_avg_rating = df_avg_rating.join(df_movie_title_mapping, on="item_id", how="left")

# Display top 5 movie Ids
df_avg_rating = df_avg_rating.orderBy("avg(rating)", ascending=False)

df_avg_rating.show(11, truncate=False)

+-------+-----------+-------------------------------------------------+
|item_id|avg(rating)|movie_title                                      |
+-------+-----------+-------------------------------------------------+
|1599   |5.0        |Someone Else's America (1995)                    |
|1500   |5.0        |Santa with Muscles (1996)                        |
|1201   |5.0        |Marlene Dietrich: Shadow and Light (1996)        |
|1653   |5.0        |Entertaining Angels: The Dorothy Day Story (1996)|
|1122   |5.0        |They Made Me a Criminal (1939)                   |
|1467   |5.0        |Saint of Fort Washington, The (1993)             |
|1189   |5.0        |Prefontaine (1997)                               |
|1293   |5.0        |Star Kid (1997)                                  |
|1536   |5.0        |Aiqing wansui (1994)                             |
|814    |5.0        |Great Day in Harlem, A (1994)                    |
|1449   |4.625      |Pather Panchali (1955)                     

Most frequent user: 

In [157]:
df_most_freq_user = indexed_df.select(["user_id", "rating"]).groupBy("user_id").count().orderBy("count", ascending=False).show(10)

+-------+-----+
|user_id|count|
+-------+-----+
|    405|  737|
|    655|  685|
|     13|  636|
|    450|  540|
|    276|  518|
|    416|  493|
|    537|  490|
|    303|  484|
|    234|  480|
|    393|  448|
+-------+-----+
only showing top 10 rows


Male vs Female ditribution

In [158]:
male_female = indexed_df.select(["gender", "rating"]).groupBy("gender").count()
male_female.show()

# Mapping: 
# Males    0
# Females  1

# We have a sever class imbalance

+------+-----+
|gender|count|
+------+-----+
|   0.0|74253|
|   1.0|25738|
+------+-----+



### Recommendation System 

We build a recomendation system using linear regression. This is *not* a good approach, since there are conceptually better models 
for recommandation (Like collaborative filtering). 

PySpark gives us the ALS model (collab filtering with two vectors instead of matrices), but this only used the user_id and movie_id as features. 

We will first build the linear regression model on all features and compare it to the build-in ALS model

In [191]:
## Regression 
# Vectorize and Standardize 
featurecols = [col for col in indexed_df.columns if col != "rating"]

featureassembler = VectorAssembler(inputCols=featurecols, outputCol="Features")

output = featureassembler.transform(indexed_df)

model_unscaled = output.select(["features", "rating"])
 
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
model_scaled = scaler.fit(model_unscaled).transform(model_unscaled)

final_df = model_scaled.select("scaled_features", "rating")

# Make regression
train_data_linreg, test_data_linreg = final_df.randomSplit([0.7, 0.3])

regressor = LinearRegression(featuresCol="scaled_features", labelCol="rating")
regressor = regressor.fit(train_data_linreg)

In [None]:
# Recommendation

train_data_recom, test_data_recom = indexed_df.randomSplit([0.7, 0.3])

als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating", nonnegative=True, coldStartStrategy="drop")

# create hyperparameter grid 
param_grid = ParamGridBuilder().addGrid(
    als.rank, [5, 10, 30]
).addGrid(
    als.maxIter, [10, 20]
).addGrid(
    als.regParam, [0.01, 0.05]
).build()

als_eval = RegressionEvaluator(metricName="mse", labelCol="rating", predictionCol="prediction")

cv = CrossValidator(
    estimator=als, 
    estimatorParamMaps=param_grid, 
    evaluator=als_eval, 
    numFolds=5
)

# Find best model via cross validation
als_model = cv.fit(train_data_recom)
best_model = als_model.bestModel

## Evaluation

In [193]:
# regression 
linreg_prediction = regressor.evaluate(test_data_linreg)
print("MSE of linear regression:\t", linreg_prediction.meanSquaredError)

# recommendation 
predictions = best_model.transform(test_data_recom)

als_mse = als_eval.evaluate(predictions)
print("MSE of recommendation system (ALS):\t", als_mse)

MSE of linear regression:	 1.170957160299126
MSE of recommendation system (ALS):	 0.8888869307248878


We see, that ALS is still superior to the linear regression, even tho it uses less features. 

Reason: The data is not well suitable for linear regression. Other (non-linear) regressor will likely beat the ALS, since it uses way less features. 

Careful: We did not do any feature importance. So, there can be features, that drag down the performance. 