In [1]:
# Chargement des Données dans Elasticsearch depuis un Notebook

# Dans ce notebook, nous allons explorer comment charger un jeu de données dans Elasticsearch à partir d'un fichier dat.
# Nous avons une installation cloud Elasticsearch, ce qui signifie que nous devrons utiliser les informations d'identification fournies 
# par l'utilisateur elastic pour nous connecter, et le mot de passe sera celui de l'API Elasticsearch.


In [2]:
# Étape 1 : Configuration de l'Environnement

# Tout d'abord, nous devons configurer notre environnement en important les bibliothèques nécessaires 
# et en définissant nos informations d'authentification Elasticsearch.

In [51]:
# import os
from elasticsearch import Elasticsearch, helpers

# Récupérer le mot de passe à partir d'une variable d'environnement
# password = os.environ.get("ENV_PASSWORD")
#le mot de passe de l' API elastic
password = "Qn5HOUiNO7V3GWpgxXok1aeE"

# Définir l'URL Elasticsearch (IL S' AGIT DE L' URL  elasticsearch endpoint de notre deploiement movielens)
elastic_url = "https://2830fc520a954b858492459c95e36087.us-central1.gcp.cloud.es.io:443"

# Créer une connexion Elasticsearch
client = Elasticsearch(hosts=[elastic_url], basic_auth=('elastic', password))


In [4]:
# Étape 2 : Chargement des Données

# Maintenant que notre environnement est configuré, nous pouvons procéder au chargement des données. 
# Nous allons charger un jeu de données à partir des fichiers dat dans Elasticsearch.

In [8]:
# Fonction pour charger les données ratings du fichier ratings.dat dans Elasticsearch
def load_ratings_to_elasticsearch(filename, index_name):
    # Lecture du fichier ratings.dat
    with open(filename, 'r') as file:
        # Parcourir chaque ligne du fichier
        for line in file:
            # Séparer les champs en utilisant '::' comme séparateur
            fields = line.strip().split("::")
            # Créer un document Elasticsearch à partir des champs
            doc = {
                "user_id": int(fields[0]),
                "movie_id": int(fields[1]),
                "rating": float(fields[2]),
                "timestamp": int(fields[3])
            }
            # Indexer le document dans Elasticsearch
            yield {
                "_index": index_name,
                "_source": doc
            }

# Nom de l'index Elasticsearch
index_name = "ratings"

# Chemin vers le fichier ratings.dat
ratings_file = "/home/chelsie/ml-1m/ratings.dat"

# Charger les données dans Elasticsearch en utilisant la fonction d'aide bulk
helpers.bulk(client, load_ratings_to_elasticsearch(ratings_file, index_name))

print("Les données ratings ont été chargées dans Elasticsearch avec succès.")


Les données ratings ont été chargées dans Elasticsearch avec succès.


In [9]:
# Fonction pour charger les données users du fichier users.dat dans Elasticsearch
def load_users_to_elasticsearch(filename, index_name):
    # Lecture du fichier users.dat
    with open(filename, 'r') as file:
        # Parcourir chaque ligne du fichier
        for line in file:
            # Séparer les champs en utilisant '::' comme séparateur
            fields = line.strip().split("::")
            # Créer un document Elasticsearch à partir des champs
            doc = {
                "user_id": int(fields[0]),
                "gender": fields[1],
                "age": int(fields[2]),
                "occupation": fields[3],
                "zipcode": fields[4]
            }
            # Indexer le document dans Elasticsearch
            yield {
                "_index": index_name,
                "_source": doc
            }

# Nom de l'index Elasticsearch
index_name = "users"

# Chemin vers le fichier users.dat
users_file = "/home/chelsie/ml-1m/users.dat"

# Charger les données dans Elasticsearch en utilisant la fonction d'aide bulk
helpers.bulk(client, load_users_to_elasticsearch(users_file, index_name))

print("Les données users ont été chargées dans Elasticsearch avec succès.")


Les données users ont été chargées dans Elasticsearch avec succès.


In [6]:
# Fonction pour charger les données movies du fichier movies.dat dans Elasticsearch
def load_movies_to_elasticsearch(filename, index_name):
    # Lecture du fichier movies.dat
    with open(filename, 'r', encoding="latin1") as file:
        # Parcourir chaque ligne du fichier
        for line in file:
            # Séparer les champs en utilisant '::' comme séparateur
            fields = line.strip().split("::")
            # Créer un document Elasticsearch à partir des champs
            doc = {
                "movie_id": int(fields[0]),
                "title": fields[1],
                "genres": fields[2].split("|")
            }
            # Indexer le document dans Elasticsearch
            yield {
                "_index": index_name,
                "_source": doc
            }

# Nom de l'index Elasticsearch
index_name = "movies"

# Chemin vers le fichier movies.dat
movies_file = "/home/chelsie/ml-1m/movies.dat"

# Charger les données dans Elasticsearch en utilisant la fonction d'aide bulk
helpers.bulk(client, load_movies_to_elasticsearch(movies_file, index_name))

print("Les données movies ont été chargées dans Elasticsearch avec succès.")


Les données movies ont été chargées dans Elasticsearch avec succès.


In [18]:
# Étape 3 : Récupération des Données depuis Elasticsearch

# Maintenant que nous avons établi la connexion avec Elasticsearch et construit notre requête, 
# nous pouvons procéder à la récupération des données depuis Elasticsearch.

# Nous utilisons la méthode search de notre client Elasticsearch pour exécuter la requête et récupérer les résultats.
# Si le nombre de document à importer est supérieure à la limite par défaut de 10 000, 
# nous pouvons utiliser la méthode scann pour récupérer nos documents par lots de 10 000.

In [5]:
# Création d'une session Spark
spark = SparkSession.builder \
    .appName("Chargement des données de Elasticsearch vers PySpark DataFrame") \
    .getOrCreate()

In [6]:
# Requête Elasticsearch pour récupérer tous les documents de l'index "users"
query = {
    "query": {
        "match_all": {}
    },
    "size": 6040
}

# Récupération des données Elasticsearch
es_response = client.search(index="users", body=query)  

# Convertir les résultats en une liste de dictionnaires
documents = [hit["_source"] for hit in es_response["hits"]["hits"]]

# Convertir la liste de dictionnaires en DataFrame Spark
es_users_df = spark.createDataFrame(documents)

# Afficher les premières lignes du DataFrame
es_users_df.show()

print(es_users_df.count()) 



                                                                                

+---+------+----------+-------+-------+
|age|gender|occupation|user_id|zipcode|
+---+------+----------+-------+-------+
|  1|     F|        10|      1|  48067|
| 56|     M|        16|      2|  70072|
| 25|     M|        15|      3|  55117|
| 45|     M|         7|      4|  02460|
| 25|     M|        20|      5|  55455|
| 50|     F|         9|      6|  55117|
| 35|     M|         1|      7|  06810|
| 25|     M|        12|      8|  11413|
| 25|     M|        17|      9|  61614|
| 35|     F|         1|     10|  95370|
| 25|     F|         1|     11|  04093|
| 25|     M|        12|     12|  32793|
| 45|     M|         1|     13|  93304|
| 35|     M|         0|     14|  60126|
| 25|     M|         7|     15|  22903|
| 35|     F|         0|     16|  20670|
| 50|     M|         1|     17|  95350|
| 18|     F|         3|     18|  95825|
|  1|     M|        10|     19|  48073|
| 25|     M|        14|     20|  55113|
+---+------+----------+-------+-------+
only showing top 20 rows



[Stage 1:>                                                          (0 + 2) / 2]

6040


                                                                                

In [7]:
from elasticsearch.helpers import scan

# Requête Elasticsearch pour récupérer tous les documents de l'index "ratings"
query = {
    "query": {
        "match_all": {}
    }
}

# Utilisation de la fonction scan pour obtenir les résultats de la recherche par lots
results = scan(client, query=query, index="ratings", size=10000)

# Créer une liste pour stocker les données des documents
data = []

# Parcourir les résultats
for res in results:
    data.append(res['_source'])

# Créer un DataFrame Spark à partir de la liste de dictionnaires
es_ratings_df = spark.createDataFrame(data)

# Afficher les premières lignes du DataFrame
es_ratings_df.show()
print(es_ratings_df.count()) 


24/04/12 05:11:34 WARN TaskSetManager: Stage 4 contains a task of very large size (11175 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------+------+----------+-------+
|movie_id|rating| timestamp|user_id|
+--------+------+----------+-------+
|    2066|   3.0|1042049967|    146|
|     804|   1.0| 977348138|    146|
|    1193|   4.0| 979940868|    146|
|    1267|   4.0| 977349660|    146|
|    1269|   5.0| 977434285|    146|
|    1196|   4.0| 977336700|    146|
|    1197|   4.0| 977341318|    146|
|    1198|   5.0| 979939805|    146|
|    1199|   4.0| 979939730|    146|
|     592|   3.0| 979940007|    146|
|     594|   5.0| 977348818|    146|
|     595|   5.0| 977348788|    146|
|     596|   3.0| 977348869|    146|
|     597|   4.0|1022004006|    146|
|    2212|   4.0|1010690287|    146|
|    2140|   2.0| 977348578|    146|
|    1340|   4.0|1010690174|    146|
|    2143|   2.0| 977348665|    146|
|    2070|   2.0| 979939838|    146|
|    2144|   3.0| 979940007|    146|
+--------+------+----------+-------+
only showing top 20 rows



24/04/12 05:11:35 WARN TaskSetManager: Stage 5 contains a task of very large size (11175 KiB). The maximum recommended task size is 1000 KiB.
[Stage 5:>                                                          (0 + 2) / 2]

1000209


                                                                                

In [None]:
#preparation des donn'ees ratings
# Nous verrez que le champ timestamp est un timestamp UNIX en secondes. Elasticsearch prend les timestamps en millisecondes, 
# vous utiliserez donc quelques opérations DataFrame pour convertir les timestamps en millisecondes.

In [8]:
from pyspark.sql.functions import col
# Sélectionner les colonnes nécessaires et effectuer le casting sur la colonne "timestamp"
ratings = es_ratings_df.select(
    col("user_id"),
    col("movie_id"),
    col("rating"),
    (col("timestamp").cast("long") * 1000).alias("timestamp")
)

# Afficher les premières lignes du DataFrame modifié
ratings.show(5)

24/04/12 05:20:34 WARN TaskSetManager: Stage 8 contains a task of very large size (11175 KiB). The maximum recommended task size is 1000 KiB.
[Stage 8:>                                                          (0 + 1) / 1]

+-------+--------+------+-------------+
|user_id|movie_id|rating|    timestamp|
+-------+--------+------+-------------+
|    146|    2066|   3.0|1042049967000|
|    146|     804|   1.0| 977348138000|
|    146|    1193|   4.0| 979940868000|
|    146|    1267|   4.0| 977349660000|
|    146|    1269|   5.0| 977434285000|
+-------+--------+------+-------------+
only showing top 5 rows



                                                                                

In [9]:
# Requête Elasticsearch pour récupérer tous les documents de l'index "movies"
query = {
    "query": {
        "match_all": {}
    },
    "size": 3883
}
# Récupération des données Elasticsearch
es_response = client.search(index="movies", body=query)  # Msize fait reference au nombre de documents de l' index movies

# Convertir les résultats en une liste de dictionnaires
documents = [hit["_source"] for hit in es_response["hits"]["hits"]]

# Création d'une session Spark
spark = SparkSession.builder \
    .appName("Chargement des données Elasticsearch vers PySpark DataFrame") \
    .getOrCreate()

# Convertir la liste de dictionnaires en DataFrame Spark
es_movies_df = spark.createDataFrame(documents)

# Afficher les premières lignes du DataFrame
es_movies_df.show()

# Calculer la taille du DataFrame
print("Taille du DataFrame :", es_movies_df.count())

# N'oubliez pas de fermer la session Spark lorsque vous avez terminé
# spark.stop()


24/04/12 05:20:47 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+--------------------+--------+--------------------+
|              genres|movie_id|               title|
+--------------------+--------+--------------------+
|[Animation, Child...|       1|    Toy Story (1995)|
|[Adventure, Child...|       2|      Jumanji (1995)|
|   [Comedy, Romance]|       3|Grumpier Old Men ...|
|     [Comedy, Drama]|       4|Waiting to Exhale...|
|            [Comedy]|       5|Father of the Bri...|
|[Action, Crime, T...|       6|         Heat (1995)|
|   [Comedy, Romance]|       7|      Sabrina (1995)|
|[Adventure, Child...|       8| Tom and Huck (1995)|
|            [Action]|       9| Sudden Death (1995)|
|[Action, Adventur...|      10|    GoldenEye (1995)|
|[Comedy, Drama, R...|      11|American Presiden...|
|    [Comedy, Horror]|      12|Dracula: Dead and...|
|[Animation, Child...|      13|        Balto (1995)|
|             [Drama]|      14|        Nixon (1995)|
|[Action, Adventur...|      15|Cutthroat Island ...|
|   [Drama, Thriller]|      16|       Casino (



Taille du DataFrame : 3883


                                                                                

In [None]:
# Vous remarquerez peut-être aussi que les titres des films contiennent l'année de sortie. 
# Il serait utile de disposer de ce champ dans votre index de recherche pour filtrer les résultats 
# (par exemple, si vous souhaitez filtrer nos recommandations pour n'inclure que les films les plus récents).

In [11]:
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField

# Définir une UDF pour extraire l'année de sortie du titre des films
def extract_year_fn(title):
    result = re.search("\(\d{4}\)", title)
    try:
        if result:
            group = result.group()
            year = group[1:-1]
            start_pos = result.start()
            title = title[:start_pos-1]
            return (title, year)
        else:
            return (title, "1970")
    except:
        print(title)

# Enregistrer la fonction UDF
extract_year = udf(extract_year_fn, StructType([
    StructField("title", StringType(), True),
    StructField("release_date", StringType(), True)
]))

# Appliquer la fonction UDF à votre DataFrame des films
es_movies_with_year = es_movies_df.withColumn("title", extract_year("title").title)\
    .withColumn("release_date", extract_year("title").release_date)

# Afficher les données des films nettoyées
print("Données des films nettoyées :")
es_movies_with_year.show(5, truncate=False)


Données des films nettoyées :


[Stage 14:>                                                         (0 + 1) / 1]

+--------------------------------+--------+---------------------------+------------+
|genres                          |movie_id|title                      |release_date|
+--------------------------------+--------+---------------------------+------------+
|[Animation, Children's, Comedy] |1       |Toy Story                  |1970        |
|[Adventure, Children's, Fantasy]|2       |Jumanji                    |1970        |
|[Comedy, Romance]               |3       |Grumpier Old Men           |1970        |
|[Comedy, Drama]                 |4       |Waiting to Exhale          |1970        |
|[Comedy]                        |5       |Father of the Bride Part II|1970        |
+--------------------------------+--------+---------------------------+------------+
only showing top 5 rows



                                                                                

In [None]:

# Créer des index Elasticsearch, avec des mappings pour les utilisateurs, les films et les événements de notation

# Dans Elasticsearch, un "index" est à peu près similaire à une "base de données" ou à une "table de base de données". 
# Le schéma d'un index s'appelle un mappage d'index.

# Bien qu'Elasticsearch prenne en charge le mappage dynamique, il est conseillé de spécifier explicitement le mappage 
# lors de la création d'un index si vous savez à quoi ressemblent vos données.

# Pour les besoins de votre moteur de recommandation, cela est également nécessaire pour que vous puissiez spécifier 
# le champ vectoriel qui contiendra le "modèle" de recommandation (c'est-à-dire les vecteurs de facteurs). 
# Lors de la création d'un champ vectoriel, vous devez fournir explicitement la dimension du vecteur, 
# de sorte qu'il ne peut s'agir d'un mappage dynamique.


In [None]:

# Chargement des DataFrames Ratings et Movies dans Elasticsearch

# Tout d'abord, vous allez écrire les données d'évaluation dans Elasticsearch. 
# Notez que vous pouvez simplement utiliser le connecteur Spark Elasticsearch pour écrire un DataFrame avec 
# l'API native Spark datasource en spécifiant format("es")


In [12]:
# set the factor vector dimension for the recommendation model
VECTOR_DIM = 20

create_ratings = {
    "mappings": {
        "properties": {
            "timestamp": {
                "type": "date"
            },
            "userId": {
                "type": "integer"
            },
            "movieId": {
                "type": "integer"
            },
            "rating": {
                "type": "double"
            }
        }  
    }
}

create_users = {
    "mappings": {
        "properties": {
            "userId": {
                "type": "integer"
            },
            "model_factor": {
                "type": "dense_vector",
                "dims" : VECTOR_DIM
            },
            "model_version": {
                "type": "keyword"
            },
            "model_timestamp": {
                "type": "date"
            }
        }
    }
}

create_movies = {
    "mappings": {
        "properties": {
            "movieId": {
                "type": "integer"
            },
            "tmdbId": {
                "type": "keyword"
            },
            "genres": {
                "type": "keyword"
            },
            "release_date": {
                "type": "date",
                "format": "year"
            },
            "model_factor": {
                "type": "dense_vector",
                "dims" : VECTOR_DIM
            },
            "model_version": {
                "type": "keyword"
            },
            "model_timestamp": {
                "type": "date"
            }          
        }
    }
}

# create indices with the settings and mappings above
res_ratings = client.indices.create(index="ratingsdf", body=create_ratings)
res_users = client.indices.create(index="usersdf", body=create_users)
res_movies = client.indices.create(index="moviesdf", body=create_movies)

print("Created indices:")
print(res_ratings)
print(res_users)
print(res_movies)


Created indices:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'ratingsdf'}
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'usersdf'}
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'moviesdf'}


In [None]:
# Maintenant que nous avons indexé les données de notation dans Elasticsearch, 
# nous pouvons exploiter toutes les fonctionnalités d'un moteur de recherche pour interroger les données. 
# Par exemple, nous pouvons facilement compter le nombre d'événements de notation dans une plage de dates donnée 
# en utilisant les fonctionnalités de manipulation des dates d'Elasticsearch dans une simple requête de recherche.

In [21]:
client.count(index="ratingsdf", q="timestamp:[2018-01-01 TO 2018-02-01]")

ObjectApiResponse({'count': 0, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}})

In [18]:
from elasticsearch import helpers

# Convertir les données de notation en un format compatible avec la méthode bulk
rating_documents =[
    {
        "_index": "ratingsdf",  # Index Elasticsearch
        "_source": {           # Source des données
            "userId": row.user_id,
            "movieId": row.movie_id,
            "rating": row.rating,
            "timestamp": row.timestamp
        }
    }
    for row in ratings.collect()  # Parcourir les lignes du DataFrame
]

# Utiliser la méthode bulk pour insérer les documents dans Elasticsearch
success, _ = helpers.bulk(client, rating_documents, index="ratingsdf")

# Vérifier si l'opération s'est bien déroulée
if success:
    print("Les données de notation ont été insérées avec succès dans Elasticsearch.")
else:
    print("Une erreur s'est produite lors de l'insertion des données de notation dans Elasticsearch.")


24/04/12 06:08:33 WARN TaskSetManager: Stage 18 contains a task of very large size (11175 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Les données de notation ont été insérées avec succès dans Elasticsearch.


In [20]:
from elasticsearch import helpers

# Convertir les données des films avec années en un format compatible avec la méthode bulk
movie_documents = [
    {
        "_index": "moviesdf",    # Index Elasticsearch
        "_source": {           # Source des données
            "movieId": row.movie_id,
            "title": row.title,
            "genres": row.genres,
            "release_date": row.release_date
        }
    }
    for row in es_movies_with_year.collect()  # Parcourir les lignes du DataFrame
]

# Utiliser la méthode bulk pour insérer les documents dans Elasticsearch
success, _ = helpers.bulk(client, movie_documents, index="moviesdf")

# Vérifier si l'opération s'est bien déroulée
if success:
    print("Les données des films avec années ont été insérées avec succès dans Elasticsearch.")
else:
    print("Une erreur s'est produite lors de l'insertion des données des films avec années dans Elasticsearch.")


                                                                                

Les données des films avec années ont été insérées avec succès dans Elasticsearch.


In [22]:
# test things out by searching for movies containing "matrix" in the title
client.search(index="moviesdf", q="title:matrix", size=3)

ObjectApiResponse({'took': 19, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 1, 'relation': 'eq'}, 'max_score': 9.14208, 'hits': [{'_index': 'moviesdf', '_id': 'Cg-Z0I4BZ7VmenlAeLeS', '_score': 9.14208, '_source': {'movieId': 2571, 'title': 'Matrix, The', 'genres': ['Action', 'Sci-Fi', 'Thriller'], 'release_date': '1970'}}]}})

In [None]:
# ETAPE 4: Choix du type de modèle 

#     En fonction de notre cas d'utilisation, nous devons adopte le type de modèle de recommandation le plus approprié. 
# Pour commencer, Nous pouvez opter pour :
#         Filtrage collaboratif basé sur les utilisateurs (User-Based Collaborative Filtering) : 
# recommande des éléments à un utilisateur basé sur les préférences des utilisateurs similaires.
#         Filtrage collaboratif basé sur les articles (Item-Based Collaborative Filtering) : 
# recommande des éléments similaires à ceux que l'utilisateur a aimés dans le passé.
#         Décomposition en valeurs singulières (Singular Value Decomposition - SVD) : 
# décompose la matrice des évaluations utilisateur-item en matrices de caractéristiques pour capturer les relations latentes.
#         Factorisation de matrices non négatives (Non-Negative Matrix Factorization - NMF) : 
# similaire à SVD mais contraint les matrices de caractéristiques à être non négatives.

In [None]:
# Comme piste intéressante nous pouvons trouver des groupes d'utilisateurs ayant le même ressenti (films, genres, notes, tags).
# Un utilisateur d'un de ces groupes sera plus disposé à apprécier les films plébiscités  par les autres membres du groupe. 
# il s' agit donc du Filtrage collaboratif basé sur les utilisateurs, nous implementerons donc ce type de filtrage.

In [28]:
from elasticsearch.helpers import scan

# Requête Elasticsearch pour récupérer tous les documents de l'index "ratings"
query = {
    "query": {
        "match_all": {}
    }
}

# Utilisation de la fonction scan pour obtenir les résultats de la recherche par lots
results = scan(client, query=query, index="ratingsdf", size=10000)

# Créer une liste pour stocker les données des documents
data = []

# Parcourir les résultats
for res in results:
    data.append(res['_source'])

# Créer un DataFrame Spark à partir de la liste de dictionnaires
ratings_from_es = spark.createDataFrame(data)

# Afficher les premières lignes du DataFrame
ratings_from_es.show(5)


24/04/12 07:11:22 WARN TaskSetManager: Stage 121 contains a task of very large size (12675 KiB). The maximum recommended task size is 1000 KiB.
[Stage 121:>                                                        (0 + 1) / 1]

+-------+------+------------+------+
|movieId|rating|   timestamp|userId|
+-------+------+------------+------+
|   1625|   5.0|975421312000|   793|
|    953|   5.0|975422430000|   793|
|   2431|   4.0|975421125000|   793|
|   3095|   4.0|975421987000|   793|
|    969|   4.0|975422943000|   793|
+-------+------+------------+------+
only showing top 5 rows



24/04/12 07:11:26 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 121 (TID 274): Attempting to kill Python Worker
                                                                                

In [29]:
from pyspark.sql.functions import from_unixtime

# Convertir le timestamp en format de date lisible
ratings_from_es = ratings_from_es.withColumn("timestamp", from_unixtime(ratings_from_es.timestamp / 1000).alias("timestamp"))

# Afficher les premières lignes du DataFrame avec le timestamp converti
ratings_from_es.show(5)


24/04/12 07:14:37 WARN TaskSetManager: Stage 122 contains a task of very large size (12675 KiB). The maximum recommended task size is 1000 KiB.
[Stage 122:>                                                        (0 + 1) / 1]

+-------+------+-------------------+------+
|movieId|rating|          timestamp|userId|
+-------+------+-------------------+------+
|   1625|   5.0|2000-11-28 15:21:52|   793|
|    953|   5.0|2000-11-28 15:40:30|   793|
|   2431|   4.0|2000-11-28 15:18:45|   793|
|   3095|   4.0|2000-11-28 15:33:07|   793|
|    969|   4.0|2000-11-28 15:49:03|   793|
+-------+------+-------------------+------+
only showing top 5 rows



24/04/12 07:14:41 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 122 (TID 275): Attempting to kill Python Worker
                                                                                

In [31]:
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

# Définir le modèle ALS
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", regParam=0.02, rank=VECTOR_DIM, seed=54,coldStartStrategy="drop", nonnegative=True)

# Entraîner le modèle ALS sur les données chargées depuis Elasticsearch
model = als.fit(ratings_from_es)

# Afficher les premières lignes des facteurs d'utilisateurs
print("Facteurs d'utilisateurs:")
model.userFactors.show(5)

# Afficher les premières lignes des facteurs d'éléments (films)
print("Facteurs d'éléments (films):")
model.itemFactors.show(5)


24/04/12 07:15:48 WARN TaskSetManager: Stage 123 contains a task of very large size (12675 KiB). The maximum recommended task size is 1000 KiB.
24/04/12 07:15:53 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 123 (TID 276): Attempting to kill Python Worker
24/04/12 07:15:53 WARN TaskSetManager: Stage 124 contains a task of very large size (12675 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Facteurs d'utilisateurs:
+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.70144784, 0.41...|
| 20|[0.91114306, 0.0,...|
| 30|[0.3812649, 0.0, ...|
| 40|[0.5583849, 0.100...|
| 50|[0.61492586, 0.0,...|
+---+--------------------+
only showing top 5 rows

Facteurs d'éléments (films):
+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.8227385, 0.0, ...|
| 20|[0.6605314, 0.227...|
| 30|[0.0, 0.0, 0.7150...|
| 40|[0.04367039, 0.0,...|
| 50|[0.42771438, 0.52...|
+---+--------------------+
only showing top 5 rows



In [32]:
from pyspark.sql.functions import lit, current_timestamp, unix_timestamp

# Récupération de l'identifiant unique du modèle ALS
ver = model.uid

# Sélection des colonnes nécessaires à partir du DataFrame des facteurs d'éléments (films)
# - Sélection de la colonne "id" pour l'identifiant de l'élément
# - Sélection de la colonne "features" et renommage en "model_factor" pour le vecteur de facteurs du modèle
# - Ajout d'une colonne "model_version" avec la valeur de l'identifiant unique du modèle
# - Ajout d'une colonne "model_timestamp" avec la valeur du timestamp actuel en format Unix
ts = unix_timestamp(current_timestamp())
movie_vectors = model.itemFactors.select("id",\
                                         col("features").alias("model_factor"),\
                                         lit(ver).alias("model_version"),\
                                         ts.alias("model_timestamp"))
movie_vectors.show(5)

# Sélection des colonnes nécessaires à partir du DataFrame des facteurs d'utilisateurs
# - Sélection de la colonne "id" pour l'identifiant de l'utilisateur
# - Sélection de la colonne "features" et renommage en "model_factor" pour le vecteur de facteurs du modèle
# - Ajout d'une colonne "model_version" avec la valeur de l'identifiant unique du modèle
# - Ajout d'une colonne "model_timestamp" avec la valeur du timestamp actuel en format Unix
user_vectors = model.userFactors.select("id",\
                                        col("features").alias("model_factor"),\
                                        lit(ver).alias("model_version"),\
                                        ts.alias("model_timestamp"))
user_vectors.show(5)


+---+--------------------+----------------+---------------+
| id|        model_factor|   model_version|model_timestamp|
+---+--------------------+----------------+---------------+
| 10|[0.8227385, 0.0, ...|ALS_6867d8ccd66c|     1712899519|
| 20|[0.6605314, 0.227...|ALS_6867d8ccd66c|     1712899519|
| 30|[0.0, 0.0, 0.7150...|ALS_6867d8ccd66c|     1712899519|
| 40|[0.04367039, 0.0,...|ALS_6867d8ccd66c|     1712899519|
| 50|[0.42771438, 0.52...|ALS_6867d8ccd66c|     1712899519|
+---+--------------------+----------------+---------------+
only showing top 5 rows

+---+--------------------+----------------+---------------+
| id|        model_factor|   model_version|model_timestamp|
+---+--------------------+----------------+---------------+
| 10|[0.70144784, 0.41...|ALS_6867d8ccd66c|     1712899519|
| 20|[0.91114306, 0.0,...|ALS_6867d8ccd66c|     1712899519|
| 30|[0.3812649, 0.0, ...|ALS_6867d8ccd66c|     1712899519|
| 40|[0.5583849, 0.100...|ALS_6867d8ccd66c|     1712899519|
| 50|[0.6149258

In [43]:
from pyspark.sql import SparkSession
# Créer une session Spark en spécifiant le package Elasticsearch
spark = SparkSession.builder \
    .appName("movielensrecommendationsystem") \
    .config("spark.jars.packages", "/home/chelsie/ml-1m/elasticsearch-spark-20_2.11-8.13.2.jar") \
    .getOrCreate()

In [1]:
from elasticsearch import spark as es_spark

## Écriture des données dans Elasticsearch pour les vecteurs de films
movie_vectors.write.format("es") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "update") \
    .save("moviesdf", mode="append")

# Écriture des données dans Elasticsearch pour les vecteurs d'utilisateurs
user_vectors.write.format("es") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "index") \
    .save("users", mode="append")

ImportError: cannot import name 'spark' from 'elasticsearch' (/home/chelsie/.local/lib/python3.10/site-packages/elasticsearch/__init__.py)

In [53]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


# Convertir les données de facteurs de films en un format compatible avec la méthode bulk
factor_documents = [
    {
        "_index": "movies_factor",  # Index Elasticsearch
        "_id": row.id,              # ID du document
        "_source": {                # Source des données
            "model_factor": row.model_factor,
            "model_version": row.model_version,
            "model_timestamp": row.model_timestamp
        }
    }
    for row in movie_vectors.collect()  # Parcourir les lignes du DataFrame movie_vectors
]

# Utiliser la méthode bulk pour envoyer les données à Elasticsearch
success, _ = bulk(client, factor_documents)

# Vérifier si l'opération s'est bien déroulée
if success:
    print("Les données des facteurs de films ont été insérées avec succès dans Elasticsearch.")
else:
    print("Une erreur s'est produite lors de l'insertion des données des facteurs de films dans Elasticsearch.")


Les données des facteurs de films ont été insérées avec succès dans Elasticsearch.


In [None]:
# Convertir les données de facteurs des utilisateurs en un format compatible avec la méthode bulk
user_factor_documents = [
    {
        "_index": "users_factor",  # Index Elasticsearch
        "_id": row.id,             # ID du document
        "_source": {               # Source des données
            "model_factor": row.model_factor,
            "model_version": row.model_version,
            "model_timestamp": row.model_timestamp
        }
    }
    for row in user_vectors.collect()  # Parcourir les lignes du DataFrame user_vectors
]

# Utiliser la méthode bulk pour envoyer les données à Elasticsearch
success, _ = bulk(client, user_factor_documents)

# Vérifier si l'opération s'est bien déroulée
if success:
    print("Les données des facteurs des utilisateurs ont été insérées avec succès dans Elasticsearch.")
else:
    print("Une erreur s'est produite lors de l'insertion des données des facteurs des utilisateurs dans Elasticsearch.")
