# Importation des bibliothèques nécessaires :

In [1]:
from pyspark.sql import SparkSession,  functions as F
from pyspark import SparkConf
from pyspark.sql.functions import col, udf, concat_ws, regexp_replace
from pyspark.sql.functions import concat, col, lit, lower, split
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer,HashingTF
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.linalg import DenseVector
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer                                                               
from pyspark.ml.linalg import Vectors
import numpy as np
from pyspark.sql import Row
from pyspark.mllib.linalg.distributed import RowMatrix

# Initialiser la session Spark avec des paramètres ajustés :

In [2]:
conf = SparkConf().setAppName("MovieRecommendations") \
                  .set("spark.executor.memory", "6g") \
                  .set("spark.driver.memory", "6g") \
                  .set("spark.network.timeout", "600s") \
                  .set("spark.executor.heartbeatInterval", "60s")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# l'importation du fichier CSV :

In [3]:
#read file into dataFrame using automatically inferred schema
data = spark.read.csv('C:\\Users\\HP\\Downloads\\dataFinale', inferSchema=True, header=True)

# Finalisation du nettoyage des données avec PySpark :

In [4]:
# Renommer la colonne '\N' en 'duration' avec une chaîne brute
data_renamed = data.withColumnRenamed(r'\N', 'duration')

In [5]:
# Remplacer les valeurs manquantes par des espaces vides dans la colonne "year". Comme l'année d'apparition du film n'est pas très importante, nous choisissons de remplacer les valeurs manquantes par des espaces vides
data_filled = data_renamed.fillna(' ')

In [7]:
from pyspark.sql.functions import col, isnan, when, count

# Compter les valeurs manquantes dans chaque colonne
missing_values = data_filled.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in data_filled.columns])

# Afficher les résultats
missing_values.show()

+-------------+-----+--------------+----+-----+--------+--------+------+------+-----------+
|imdb_title_id|title|original_title|year|genre|duration|director|writer|actors|description|
+-------------+-----+--------------+----+-----+--------+--------+------+------+-----------+
|            0|    1|             1|   0|    0|       0|       0|     0|     0|          0|
+-------------+-----+--------------+----+-----+--------+--------+------+------+-----------+



In [8]:
colonnes_a_convertir=['duration','description','year','genre','director','writer','actors']
# Convertir les colonnes en StringType
for colonne in colonnes_a_convertir:
    data_filled=data_filled.withColumn(colonne, col(colonne).cast(StringType()))

In [9]:
# fonction pour supprimer les espaces dans les chaines de caracteres
def remove_space(text):
    if text is not None:
        return text.replace(" ","")
    return text

In [11]:
# on a trasformer la fonction remove_space vers une fonction udf de Spark:
remove_space_udf=udf(remove_space,StringType())
# on a utiliser la fonction  remove_space_udf pour supprimer les espaces dans les chaines de caracteres
data=data_filled.withColumn('genre',remove_space_udf(col('genre')))
data=data.withColumn('actors',remove_space_udf(col('actors')))
data=data.withColumn('writer',remove_space_udf(col('writer')))
data=data.withColumn('director',remove_space_udf(col('director')))

In [12]:
# Nous avons créé une nouvelle colonne 'tags' qui contient la concaténation des colonnes 'actors', 'description', 'director', 'writer', 'genre', 'duration' et 'year'.
data = data.withColumn('tags', concat(col('description'), lit(' '), col('actors'),lit(' '),col('director'), lit(' '), col('writer'),lit(' '),col('genre'), lit(' '), col('duration'), lit(' '), col('year')))

In [13]:
# transformer la colonne tags en miniscule
data= data.withColumn('tags', lower(col('tags')))

# Préparation des données avant la création de la fonction de recommandation:

In [14]:
# Tokenizer pour transformer 'tags' en 'words'
tokenizer = Tokenizer(inputCol="tags", outputCol="words")
words_data = tokenizer.transform(data)

# StopWordsRemover pour enlever les mots vides
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_data = remover.transform(words_data)
# affichage de la colonne words
filtered_data.select('words').show()

+--------------------+
|               words|
+--------------------+
|[the, adventures,...|
|[true, story, of,...|
|[two, men, of, hi...|
|[the, fabled, que...|
|[loosely, adapted...|
|[an, account, of,...|
|[the, story, of, ...|
|["an, epic, itali...|
|[the, movie, depi...|
|[richard, of, glo...|
|[after, dr., frie...|
|[inspector, juve,...|
|[single, mother, ...|
|[in, part, two, o...|
|[leslie, swayne, ...|
|[an, army, pilot,...|
|[after, a, body, ...|
|[john, howard, pa...|
|[balduin, a, stud...|
|[a, woman, with, ...|
+--------------------+
only showing top 20 rows



In [15]:
# affichage de la colonne 'filtered_words'
filtered_data.select('filtered_words').show()

+--------------------+
|      filtered_words|
+--------------------+
|[adventures, fema...|
|[true, story, not...|
|[two, men, high, ...|
|[fabled, queen, e...|
|[loosely, adapted...|
|[account, life, j...|
|[story, madame, d...|
|["an, epic, itali...|
|[movie, depicts, ...|
|[richard, glouces...|
|[dr., friedrich's...|
|[inspector, juve,...|
|[single, mother, ...|
|[part, two, louis...|
|[leslie, swayne, ...|
|[army, pilot, vis...|
|[body, disappears...|
|[john, howard, pa...|
|[balduin, student...|
|[woman, aid, poli...|
+--------------------+
only showing top 20 rows



In [16]:
# Calcul des TF pour convertir des données textuelles en vecteurs de caractéristiques
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="rawFeatures", numFeatures=10000)
featurized_data = hashing_tf.transform(filtered_data)
# Calcul des IDF (Inverse Document Frequency) pour réduire le poids des termes qui apparaissent fréquemment dans le corpus, car ils sont moins discriminants pour la classification ou la recherche d'informations.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
vectorized_data= idf_model.transform(featurized_data)

In [17]:
# Récupérer la première ligne du DataFrame transformé
first_row = vectorized_data.first()

# Extraire les valeurs de la colonne "features" de la première ligne
features_values = first_row['features']

# Afficher les valeurs
print(features_values)

(10000,[303,1321,2345,2456,4307,5245,6526,8415],[4.490668502017955,13.584378397912428,6.211520001822959,5.172169177191576,6.6169851099311225,5.151335090288734,4.4561281767657785,5.483281501451743])


# Création de la fonction de calcul de similarité :

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType


# Définir la UDF pour le calcul de similarité de Jaccard
def calculate_jaccard_similarity(v1, v2):
    if v1 is None or v2 is None:
        return 0.0
    set1 = set(v1)
    set2 = set(v2)
    intersection = len(set1.intersection(set2))
    union = len(set1.union(set2))
    similarity = intersection / union
    return float(similarity)

# Enregistrer la UDF
jaccard_udf = udf(calculate_jaccard_similarity, FloatType())

# Jointure croisée pour créer des paires des films
similarity_data = vectorized_data.alias('df1').crossJoin(vectorized_data.alias('df2'))

# Apply tAppliquer la fonction Jaccard
similarity_data = similarity_data.withColumn('similarity', jaccard_udf(col("df1.features"), col('df2.features')))

# afficher le resultat:
similarity_data.select(col("df1.imdb_title_id").alias("imdb_title_id_1"), 
                        col("df2.imdb_title_id").alias("imdb_title_id_2"), 
                        "similarity").show(truncate=False)

+---------------+---------------+-----------+
|imdb_title_id_1|imdb_title_id_2|similarity |
+---------------+---------------+-----------+
|tt0000009      |tt0000009      |1.0        |
|tt0000009      |tt0000574      |0.04761905 |
|tt0000009      |tt0001892      |0.027777778|
|tt0000009      |tt0002101      |0.08695652 |
|tt0000009      |tt0002130      |0.032258064|
|tt0000009      |tt0002199      |0.033333335|
|tt0000009      |tt0002423      |0.04       |
|tt0000009      |tt0002445      |0.041666668|
|tt0000009      |tt0002452      |0.05       |
|tt0000009      |tt0002461      |0.045454547|
|tt0000009      |tt0002646      |0.03846154 |
|tt0000009      |tt0002844      |0.045454547|
|tt0000009      |tt0003014      |0.1        |
|tt0000009      |tt0003037      |0.030303031|
|tt0000009      |tt0003102      |0.03125    |
|tt0000009      |tt0003131      |0.035714287|
|tt0000009      |tt0003165      |0.028571429|
|tt0000009      |tt0003167      |0.03846154 |
|tt0000009      |tt0003419      |0

## la fonction de recommendation:

In [19]:
def get_recommendations_from_keywords(keywords, top_n=4):
    # Créer un DataFrame avec les mots-clés comme description
    keywords_df = spark.createDataFrame([(0, keywords)], ["imdb_title_id", "tags"])

    # Transformer les mots-clés en vecteur TF-IDF
    words_data = tokenizer.transform(keywords_df)
    filtered_data = remover.transform(words_data)
    featurized_data = hashing_tf.transform(filtered_data)
    tfidf_keywords = idf_model.transform(featurized_data)
    keywords_vector = tfidf_keywords.select("features").collect()[0][0]
    keywords_vector_list = keywords_vector.toArray().tolist()

    # Ajouter les colonnes de similarité
    similarity_data = vectorized_data.withColumn("similarity", jaccard_udf(F.col("features"), F.lit(keywords_vector_list)))

    # Trier et sélectionner les films les plus similaires
    top_similar_movies = similarity_data.orderBy(F.col("similarity").desc()).select("title", "original_title", "tags").limit(top_n)

    return top_similar_movies

In [20]:
# Exemple d'utilisation de la fonction
keywords = "action, adventure, hero"
top_similar_movies_df = get_recommendations_from_keywords(keywords, top_n=4)

In [21]:
print(top_similar_movies_df)

DataFrame[title: string, original_title: string, tags: string]


In [22]:
top_similar_movies_df.show()

Py4JJavaError: An error occurred while calling o10503.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 12.0 failed 1 times, most recent failure: Lost task 3.0 in stage 12.0 (TID 29) (DESKTOP-LK8RKG4.mshome.net executor driver): java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at java.io.DataInputStream.readFully(DataInputStream.java:195)
	at java.io.DataInputStream.readFully(DataInputStream.java:169)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2493)
	at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1139)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1121)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$1(RDD.scala:1568)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1555)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.$anonfun$executeCollect$1(limit.scala:291)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:285)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at java.io.DataInputStream.readFully(DataInputStream.java:195)
	at java.io.DataInputStream.readFully(DataInputStream.java:169)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)


### On a trouvé des problèmes de configuration et de mémoire pour afficher les résultats, mais la fonction n'affiche aucune erreur. Le problème est juste dans l'affichage du DataFrame des films les plus similaires.Le problème sera resolu si vous avez un pc qui a une grande capacité en ressource mémoire.