In [6]:
# set the environment path to find Recommenders
import sys
import os
from datetime import datetime
import pandas as pd
import numpy as np
import seaborn as sns
import sys
import pandas as pd
import warnings
import logging
warnings.simplefilter(action='ignore', category=FutureWarning)

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql.functions import col 
from pyspark.ml.tuning import CrossValidator
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType, IntegerType, LongType

from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics

from elasticsearch import Elasticsearch

print("System version: {}".format(sys.version))
print("Pandas version: {}".format(pd.__version__))
print("PySpark version: {}".format(pyspark.__version__))

System version: 3.12.0 | packaged by conda-forge | (main, Oct  3 2023, 08:43:22) [GCC 12.3.0]
Pandas version: 2.1.1
PySpark version: 3.4.1


In [2]:
def setup_logging(log_directory, logger_name):
    os.makedirs(log_directory, exist_ok=True)

    log_filename = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.log")
    log_filepath = os.path.join(log_directory, log_filename)

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    handler = logging.FileHandler(log_filepath)
    handler.setLevel(logging.INFO)
    handler.setFormatter(formatter)

    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

    return logger

def setup_SparkML_logging():
    return setup_logging("Log/SparkML", "ALS_Model")

In [3]:
sys.path.append("/home/hadoop/Syst-me-de-Recommandation-de-Films-en-Temps-R-el-avec-Apache-Spark-Elasticsearch-Kibana-et-Flask/data/MovieLens")

from MovieLensData import get_all_data

try:
    # Create a Spark session
    spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()

    # Get data from MovieLens
    df = get_all_data()

    # Select only the desired columns
    selected_columns = ['userId', 'movieId', 'rating', 'timestamp']
    df = df[selected_columns]

    # Convert the final_data DataFrame to a Spark DataFrame
    spark_final_data = spark.createDataFrame(df)


except Exception as e:
    logging.error(f"Error: {e}")
    # Handle the exception as needed


23/12/10 20:25:14 WARN Utils: Your hostname, sabri-Parallels-Virtual-Platform resolves to a loopback address: 127.0.1.1; using 10.211.55.7 instead (on interface enp0s5)
23/12/10 20:25:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/10 20:25:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/10 20:25:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
spark_final_data.show(5)

[Stage 0:>                                                          (0 + 1) / 1]

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|   196|    242|     3|881250949|
|   196|    257|     2|881251577|
|   196|    111|     4|881251793|
|   196|     25|     4|881251955|
|   196|    382|     4|881251843|
+------+-------+------+---------+
only showing top 5 rows



                                                                                

In [5]:
(training, test) = spark_final_data.randomSplit([0.8, 0.2])

In [6]:
# Define the ALS model
import shutil


als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")

# Define the parameter grid to search for the best hyperparameters
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.maxIter, [5, 10, 15]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .build()

# Define the evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Create a CrossValidator
crossval = CrossValidator(estimator=als,
                            estimatorParamMaps=param_grid,
                            evaluator=evaluator,
                            numFolds=3)  # You can adjust the number of folds
# Fit the CrossValidator to the data
cv_model = crossval.fit(training)  

# Get the best model from cross-validation
best_model = cv_model.bestModel

model_path = "/home/hadoop/Syst-me-de-Recommandation-de-Films-en-Temps-R-el-avec-Apache-Spark-Elasticsearch-Kibana-et-Flask/src/model/best_model"

if os.path.exists(model_path):
    print(f"Le répertoire {model_path} existe déjà. Suppression du répertoire existant.")
    shutil.rmtree(model_path)

best_model.save(model_path)

# Use the best model to make predictions on the test set
predictions = best_model.transform(test)




23/12/10 20:25:45 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/12/10 20:25:46 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [7]:
# Calculate RMSE
evaluator_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Calculate RMSE
rmse = evaluator_rmse.evaluate(predictions)
print(f"RMSE: {rmse}")

RMSE: 0.9124169376944185


In [None]:
evaluator_mae = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")

# Calculate MAE
mae = evaluator_mae.evaluate(predictions)
print(f"MAE: {mae}")

In [None]:
# Calculate R2 using RegressionMetrics
predictionAndLabels = predictions.select("prediction", "rating").rdd.map(lambda x: (float(x[0]), float(x[1])))

metrics = RegressionMetrics(predictionAndLabels)
r2 = metrics.r2
print(f"R2: {r2}")

In [13]:
from elasticsearch.helpers import scan

def get_movie_id_by_title(es, index_name, movie_title):
    query = {
        "query": {
            "match": {
                "title": movie_title
            }
        }
    }

    # Use scan to get all documents matching the query
    results = scan(es, index=index_name, query=query)

    for result in results:
        return result["_source"]["movieId"]

    return None

In [18]:
# Example usage:
es = Elasticsearch("http://localhost:9200")  # Replace with your Elasticsearch URL

movie_inndex = "movie"
# Get recommendations for a movie title
movie_title_to_search = "Raising Arizona"


print(get_movie_id_by_title(es, movie_inndex, movie_title_to_search))

None


  for result in results:


In [20]:
def get_users_ids_for_movie_review(es, index_name, movie_id):
    query = {
        "query": {
            "term": {
                "movieId": movie_id
            }
        }
    }

    # Use scan to get all documents matching the query
    results = scan(es, index=index_name, query=query)

    user_ids = set()
    for result in results:
        user_ids.add(result["_source"]["userId"])

    return list(user_ids)


In [21]:
# Example usage:
es = Elasticsearch("http://localhost:9200")  # Replace with your Elasticsearch URL

review_index = "review"

# Get recommendations for a movie title
movie_id = "762"


print(get_users_ids_for_movie_review(es, review_index, movie_id))

['63', '196']


  for result in results:


In [None]:
from pyspark.sql.functions import expr

def get_recommendations_for_movie(es, als_model, movie_title, num_recommendations=5):
    # Step 1: Get movieId for the given movie title
    movie_index = "movie"
    movie_id = get_movie_id_by_title(es, movie_index, movie_title)

    if movie_id is None:
        print(f"Movie with title '{movie_title}' not found.")
        return None

    review_index = "review"
    # Step 2: Get user ids that have reviewed the movie
    user_ids = get_users_ids_for_movie_review(es, review_index, movie_id)

    if not user_ids:
        print(f"No user reviews found for the movie with title '{movie_title}'.")
        return None

    # Step 3: Create a DataFrame with user and movie data
    user_movie_data = [(int(user_id),) for user_id in user_ids]
    schema = StructType([StructField("userId", IntegerType(), True)])
    user_movie_df = spark.createDataFrame(user_movie_data, schema)

    # Step 4: Use ALS model to get recommendations for the users
    recommendations = best_model.recommendForUserSubset(user_movie_df, num_recommendations)

    # Extract movie IDs from recommendations DataFrame
    movie_ids = [movie.movieId for row in recommendations.rdd.collect() for movie in row.recommendations]


    return movie_ids

# Example usage:
es = Elasticsearch("http://localhost:9200")  # Replace with your Elasticsearch URL

# Get recommendations for a movie title
movie_title_to_search = "Toy Story (1995)"
print(get_recommendations_for_movie(es, best_model, movie_title_to_search))


es.transport.close()
