In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F
from elasticsearch import Elasticsearch
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Initialize Spark session
spark = SparkSession.builder \
    .appName(os.getenv("SPARK_APP_NAME")) \
    .getOrCreate()

In [12]:
# pip install dotenv

In [13]:
ratings_data_path = os.getenv("RATINGS_DATA_PATH")
movies_data_path = os.getenv("MOVIES_DATA_PATH")
users_data_path = os.getenv("USERS_DATA_PATH")

# Load ratings data
ratings_df = spark.read.csv(ratings_data_path, sep="\t", inferSchema=True, header=False) \
    .toDF("user_id", "movie_id", "rating", "timestamp")
ratings_df.show()

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|    196|     242|     3|881250949|
|    186|     302|     3|891717742|
|     22|     377|     1|878887116|
|    244|      51|     2|880606923|
|    166|     346|     1|886397596|
|    298|     474|     4|884182806|
|    115|     265|     2|881171488|
|    253|     465|     5|891628467|
|    305|     451|     3|886324817|
|      6|      86|     3|883603013|
|     62|     257|     2|879372434|
|    286|    1014|     5|879781125|
|    200|     222|     5|876042340|
|    210|      40|     3|891035994|
|    224|      29|     3|888104457|
|    303|     785|     3|879485318|
|    122|     387|     5|879270459|
|    194|     274|     2|879539794|
|    291|    1042|     4|874834944|
|    234|    1184|     2|892079237|
+-------+--------+------+---------+
only showing top 20 rows



In [14]:
films_df = spark.read.csv(movies_data_path, sep="|", inferSchema=True, header=False) \
    .toDF("movie_id", "title", "release_date", "video_release_date", "IMDb_url", "unknown", "Action", "Adventure", 
          "Animation", "Children's", "Comedy", "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", 
          "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western")


In [15]:
users_df = spark.read.csv(users_data_path, sep="|", inferSchema=True, header=False) \
    .toDF("user_id", "age", "gender", "occupation", "zip_code")
users_df.toPandas()

Unnamed: 0,user_id,age,gender,occupation,zip_code
0,1,24,M,technician,85711
1,2,53,F,other,94043
2,3,23,M,writer,32067
3,4,24,M,technician,43537
4,5,33,F,other,15213
...,...,...,...,...,...
938,939,26,F,student,33319
939,940,32,M,administrator,02215
940,941,20,M,student,97229
941,942,48,F,librarian,78209


In [16]:
# Drop the timestamp column since it's not needed for the collaborative filtering model
ratings_df = ratings_df.drop("timestamp")

In [17]:
ratings_with_movie_titles = ratings_df.join(films_df.select("movie_id", "title"), on="movie_id", how="left")
ratings_with_movie_titles.toPandas()

Unnamed: 0,movie_id,user_id,rating,title
0,242,196,3,Kolya (1996)
1,302,186,3,L.A. Confidential (1997)
2,377,22,1,Heavyweights (1994)
3,51,244,2,Legends of the Fall (1994)
4,346,166,1,Jackie Brown (1997)
...,...,...,...,...
99995,476,880,3,"First Wives Club, The (1996)"
99996,204,716,5,Back to the Future (1985)
99997,1090,276,1,Sliver (1993)
99998,225,13,2,101 Dalmatians (1996)


In [18]:
films_df.toPandas()

Unnamed: 0,movie_id,title,release_date,video_release_date,IMDb_url,unknown,Action,Adventure,Animation,Children's,...,Fantasy,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western
0,1,Toy Story (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Toy%20Story%2...,0,0,0,1,1,...,0,0,0,0,0,0,0,0,0,0
1,2,GoldenEye (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?GoldenEye%20(...,0,1,1,0,0,...,0,0,0,0,0,0,0,1,0,0
2,3,Four Rooms (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Four%20Rooms%...,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0
3,4,Get Shorty (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Get%20Shorty%...,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,5,Copycat (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Copycat%20(1995),0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1677,1678,Mat' i syn (1997),06-Feb-1998,,http://us.imdb.com/M/title-exact?Mat%27+i+syn+...,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1678,1679,B. Monkey (1998),06-Feb-1998,,http://us.imdb.com/M/title-exact?B%2E+Monkey+(...,0,0,0,0,0,...,0,0,0,0,0,1,0,1,0,0
1679,1680,Sliding Doors (1998),01-Jan-1998,,http://us.imdb.com/Title?Sliding+Doors+(1998),0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
1680,1681,You So Crazy (1994),01-Jan-1994,,http://us.imdb.com/M/title-exact?You%20So%20Cr...,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [19]:
genre_columns = ["Action", "Adventure", "Animation", "Children's", "Comedy", "Crime", "Documentary", "Drama", 
                 "Fantasy", "Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"]


In [20]:
print(genre_columns)

['Action', 'Adventure', 'Animation', "Children's", 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western']


In [21]:
for genre in genre_columns:
    ratings_with_movie_titles = ratings_with_movie_titles.join(films_df.select("movie_id", genre), 
                                                               on="movie_id", how="left")

In [22]:
# (training_data, test_data) = ratings_df.randomSplit([0.8, 0.2], seed=1234)
training_data, test_data = ratings_df.randomSplit(eval(os.getenv("ALS_TRAIN_TEST_SPLIT")), seed=int(os.getenv("ALS_SEED")))


In [23]:
als = ALS(
    userCol=os.getenv("ALS_USER_COL"),
    itemCol=os.getenv("ALS_ITEM_COL"),
    ratingCol=os.getenv("ALS_RATING_COL"),
    coldStartStrategy=os.getenv("ALS_COLD_START_STRATEGY")
)

In [24]:
als_model = als.fit(training_data)

25/03/17 19:42:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/03/17 19:42:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/03/17 19:42:32 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [25]:
predictions = als_model.transform(test_data)

In [26]:
evaluator = RegressionEvaluator(
    metricName=os.getenv("EVALUATOR_METRIC_NAME"),
    labelCol=os.getenv("EVALUATOR_LABEL_COL"),
    predictionCol=os.getenv("EVALUATOR_PREDICTION_COL")
)
rmse = evaluator.evaluate(predictions)

                                                                                

In [27]:
print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 0.9210085988690141


In [28]:
user_recommendations = als_model.recommendForAllUsers(int(os.getenv("RECOMMENDATIONS_PER_USER")))

# Example: Show recommendations for the first user
user_recommendations.show(truncate=False)




+-------+------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                   |
+-------+------------------------------------------------------------------------------------------------------------------+
|1      |[{1589, 5.2932286}, {1643, 5.0609336}, {1463, 5.0107517}, {1405, 5.0078783}, {1368, 5.0037465}, {1449, 4.9983063}]|
|2      |[{1643, 5.236021}, {1463, 5.214125}, {1449, 5.0153537}, {1064, 4.7973843}, {1642, 4.767312}, {113, 4.7462816}]    |
|3      |[{1368, 4.8977747}, {1643, 4.6288605}, {1093, 4.2605057}, {1128, 4.14693}, {1022, 4.14237}, {1251, 4.0198016}]    |
|4      |[{1251, 6.114808}, {793, 5.708388}, {1642, 5.6966963}, {1463, 5.591889}, {1589, 5.5779037}, {613, 5.500054}]      |
|5      |[{1368, 5.19239}, {793, 4.818605}, {1154, 4.739668}, {1500, 4.6563835}, {1367, 4.6388745}, {1643, 4.628043}]      |


                                                                                

In [29]:
user_recommendations.show(truncate=False)



+-------+------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                   |
+-------+------------------------------------------------------------------------------------------------------------------+
|1      |[{1589, 5.2932286}, {1643, 5.0609336}, {1463, 5.0107517}, {1405, 5.0078783}, {1368, 5.0037465}, {1449, 4.9983063}]|
|2      |[{1643, 5.236021}, {1463, 5.214125}, {1449, 5.0153537}, {1064, 4.7973843}, {1642, 4.767312}, {113, 4.7462816}]    |
|3      |[{1368, 4.8977747}, {1643, 4.6288605}, {1093, 4.2605057}, {1128, 4.14693}, {1022, 4.14237}, {1251, 4.0198016}]    |
|4      |[{1251, 6.114808}, {793, 5.708388}, {1642, 5.6966963}, {1463, 5.591889}, {1589, 5.5779037}, {613, 5.500054}]      |
|5      |[{1368, 5.19239}, {793, 4.818605}, {1154, 4.739668}, {1500, 4.6563835}, {1367, 4.6388745}, {1643, 4.628043}]      |


                                                                                

In [31]:
model_save_path = os.getenv("MODEL_SAVE_PATH")
als_model.save(model_save_path)

In [32]:

recommendations_list = []

for row in user_recommendations.collect():
    user_id = row['user_id']
    movies = row['recommendations']
    for movie in movies:
        movie_id = movie['movie_id']
        rating = movie['rating']
        recommendations_list.append({
            "user_id": user_id,
            "movie_id": movie_id,
            "rating": rating
        })

In [33]:
es_host = os.getenv("ELASTICSEARCH_HOST")
es = Elasticsearch(hosts=[es_host])


In [34]:
# Connect to Elasticsearch
es_index_recommendations = os.getenv("ELASTICSEARCH_INDEX_RECOMMENDATIONS")

# Index recommendations into Elasticsearcha
for recommendation in recommendations_list:
    es.index(index=es_index_recommendations, document=recommendation)

In [35]:
users_list = users_df.toJSON().collect()
es_index_users = os.getenv("ELASTICSEARCH_INDEX_USERS")
for user_json in users_list:
    es.index(index=es_index_users, document=user_json)

In [36]:
ratings_list = []
for row in ratings_with_movie_titles.collect():
    ratings_list.append({
        "user_id": row['user_id'],
        "movie_id": row['movie_id'],
        "title": row['title'],
        "rating": row['rating']
    })

es_index_ratings = os.getenv("ELASTICSEARCH_INDEX_RATINGS")
for rating in ratings_list:
    es.index(index=es_index_ratings, document=rating)


                                                                                

In [37]:
# Stop Spark session
spark.stop()