# Recommender system final project

Mohamed Amine El Maghraoui

SCIA

### 1) Chargement des données

In [1]:
# Import libraries
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

import matplotlib.pyplot as plt
import logging

logging.getLogger().setLevel(logging.INFO)

# Set plot parameters
plt.rcParams["figure.figsize"] = (20, 13)
%matplotlib inline
%config InlineBackend.figure_format = "retina"
# Load datasets
test_customers = pd.read_csv('project_data/test_customers2.csv')
test_locations = pd.read_csv('project_data/test_locations2.csv')
train_customers = pd.read_csv('project_data/train_customers2.csv')
train_locations = pd.read_csv('project_data/train_locations2.csv')
orders = pd.read_csv('project_data/orders2.csv')
vendors = pd.read_csv('project_data/vendors2.csv')



  orders = pd.read_csv('project_data/orders2.csv')


### 2) Prétraitement des données

- orders 

In [2]:
# Drop unnecessary columns
orders.drop([ 'promo_code', 'delivery_time', 'order_accepted_time', 'driver_accepted_time', 'ready_for_pickup_time', 'picked_up_time', 'delivered_time', 'delivery_date'], axis=1, inplace=True)

# Convert 'is_favorite' and 'is_rated' to binary
orders['is_favorite'] = orders['is_favorite'].map({'Yes': 1, 'No': 0})
orders['is_rated'] = orders['is_rated'].map({'Yes': 1, 'No': 0})

# Fill NaN values in 'is_favorite' and 'is_rated' with 0
orders[['is_favorite', 'is_rated']] = orders[['is_favorite', 'is_rated']].fillna(0)

# Fill NaN values in 'vendor_rating', 'driver_rating', 'vendor_discount_amount', and 'promo_code_discount_percentage' with 0
for col in ['vendor_rating', 'driver_rating', 'deliverydistance', 'preparationtime']:
    orders[col].fillna(orders[col].mean(), inplace=True)
# Fill NaN values in 'LOCATION_TYPE' with 'Other'
orders['LOCATION_TYPE'].fillna('Other', inplace=True)


(vendors.head())


Unnamed: 0,id,authentication_id,latitude,longitude,vendor_category_en,vendor_category_id,delivery_charge,serving_distance,is_open,OpeningTime,...,open_close_flags,vendor_tag,vendor_tag_name,one_click_vendor,country_id,city_id,created_at,updated_at,device_type,display_orders
0,4,118597.0,-0.588596,0.754434,Restaurants,2.0,0.0,6.0,1.0,11:00AM-11:30PM,...,1.0,2458912212241623,"Arabic,Breakfast,Burgers,Desserts,Free Deliver...",Y,1.0,1.0,2018-01-30 14:42:04,2020-04-07 15:12:43,3,1
1,13,118608.0,-0.471654,0.74447,Restaurants,2.0,0.7,5.0,1.0,08:30AM-10:30PM,...,1.0,44151342715241628,"Breakfast,Cakes,Crepes,Italian,Pasta,Pizzas,Sa...",Y,1.0,1.0,2018-05-03 12:32:06,2020-04-05 20:46:03,3,1
2,20,118616.0,-0.407527,0.643681,Restaurants,2.0,0.0,8.0,1.0,08:00AM-10:45PM,...,1.0,489110,"Breakfast,Desserts,Free Delivery,Indian",Y,1.0,1.0,2018-05-04 22:28:22,2020-04-07 16:35:55,3,1
3,23,118619.0,-0.585385,0.753811,Restaurants,2.0,0.0,5.0,1.0,10:59AM-10:30PM,...,1.0,583024,"Burgers,Desserts,Fries,Salads",Y,1.0,1.0,2018-05-06 19:20:48,2020-04-02 00:56:17,3,1
4,28,118624.0,0.480602,0.55285,Restaurants,2.0,0.7,15.0,1.0,11:00AM-11:45PM,...,1.0,5,Burgers,Y,1.0,1.0,2018-05-17 22:12:38,2020-04-05 15:57:41,3,1


- train_locations

In [3]:
train_locations.location_type.fillna('Other', inplace=True)
train_locations.latitude.fillna(0, inplace=True)
train_locations.longitude.fillna(0, inplace=True)
train_locations.head()

Unnamed: 0,customer_id,location_number,location_type,latitude,longitude
0,02SFNJH,0,Other,1.682392,-78.789737
1,02SFNJH,1,Other,1.679137,0.766823
2,02SFNJH,2,Other,-0.498648,0.661241
3,RU43CXC,0,Home,0.100853,0.438165
4,BDFBPRD,0,Other,2.523125,0.733464


- test_locations

In [4]:
test_locations.location_type.fillna('Other', inplace=True)
test_locations.latitude.fillna(0, inplace=True)
test_locations.longitude.fillna(0, inplace=True)
test_locations.head()

Unnamed: 0,customer_id,location_number,location_type,latitude,longitude
0,Z59FTQD,0,Other,126.032278,-9.106019
1,0JP29SK,0,Home,0.278709,-78.623847
2,0JP29SK,1,Home,0.124485,-78.605621
3,0JP29SK,2,Other,-0.113891,-78.577449
4,0JP29SK,3,Other,-0.848796,0.136726


- vendors

In [5]:
vendors.rename(columns={'id': 'vendor_id'}, inplace=True)


 Nous construisons ici le dataframe des utilisateurs pour l'algorithme ALS, ici les utilisateurs sont les clients

In [6]:
users = train_locations
users.head()

Unnamed: 0,customer_id,location_number,location_type,latitude,longitude
0,02SFNJH,0,Other,1.682392,-78.789737
1,02SFNJH,1,Other,1.679137,0.766823
2,02SFNJH,2,Other,-0.498648,0.661241
3,RU43CXC,0,Home,0.100853,0.438165
4,BDFBPRD,0,Other,2.523125,0.733464


et ici nous construisons le même mais pour nos données de test

In [7]:
users_test = test_locations
users_test.head()

Unnamed: 0,customer_id,location_number,location_type,latitude,longitude
0,Z59FTQD,0,Other,126.032278,-9.106019
1,0JP29SK,0,Home,0.278709,-78.623847
2,0JP29SK,1,Home,0.124485,-78.605621
3,0JP29SK,2,Other,-0.113891,-78.577449
4,0JP29SK,3,Other,-0.848796,0.136726


Le code suivant utilise l'algorithme des plus proches voisins pour trouver, parmi un groupe d'utilisateurs d'entraînement, l'utilisateur le plus proche de chaque utilisateur de test en fonction de leurs coordonnées géographiques (latitude et longitude). Tout d'abord, les données sont préparées en supprimant les utilisateurs dont les coordonnées sont manquantes. Ensuite, les coordonnées sont converties en radians pour faciliter les calculs. L'algorithme des plus proches voisins est ensuite appliqué en utilisant les données d'entraînement pour trouver le voisin le plus proche de chaque utilisateur de test. Les informations de l'utilisateur le plus proche, telles que son identifiant et le numéro de son emplacement, sont ensuite associées à chaque utilisateur de test. Cela permet de déterminer les utilisateurs les plus similaires en termes de localisation géographique.

In [8]:
from sklearn.neighbors import NearestNeighbors
import numpy as np
users = users.dropna(subset=['latitude', 'longitude'])
users_test = users_test.dropna(subset=['latitude', 'longitude'])

# Convert latitude and longitude to radians
users['latitude'] = np.radians(users['latitude'])
users['longitude'] = np.radians(users['longitude'])
users_test['latitude'] = np.radians(users_test['latitude'])
users_test['longitude'] = np.radians(users_test['longitude'])

# Prepare data for NearestNeighbors
X_train = users[['latitude', 'longitude']]
X_test = users_test[['latitude', 'longitude']]

# Fit NearestNeighbors
nn = NearestNeighbors(n_neighbors=1, algorithm='ball_tree', metric='haversine')
nn.fit(X_train)

# Find closest users
distances, indices = nn.kneighbors(X_test)



# Replace each test user with the closest user from the training set
users_test['closest_user'] = [users.iloc[index[0]]['customer_id'] for index in indices]
users_test['location_number_2'] = [users.iloc[index[0]]['location_number'] for index in indices]


In [9]:
users_test.head()

Unnamed: 0,customer_id,location_number,location_type,latitude,longitude,closest_user,location_number_2
0,Z59FTQD,0,Other,2.199678,-0.15893,OV01MGG,0
1,0JP29SK,0,Home,0.004864,-1.372245,S1SJNJX,1
2,0JP29SK,1,Home,0.002173,-1.371927,EKBKVS9,0
3,0JP29SK,2,Other,-0.001988,-1.371435,C5JX00T,0
4,0JP29SK,3,Other,-0.014814,0.002386,IK3W1ZZ,2



Le code suivant réalise une opération appelée "produit cartésien" entre les données des utilisateurs de test et les informations sur les vendeurs. Cela signifie qu'il crée toutes les combinaisons possibles entre chaque utilisateur de test et chaque vendeur. Ensuite, il crée deux nouvelles colonnes dans le DataFrame résultant. La première colonne combine l'identifiant de l'utilisateur le plus proche, le numéro de son emplacement et l'identifiant du vendeur pour représenter une paire utilisateur-vendeur. La deuxième colonne combine l'identifiant du client réel, le numéro de son emplacement et l'identifiant du vendeur. Enfin, seules les colonnes nécessaires sont conservées dans le DataFrame final, qui contient les informations pertinentes pour l'analyse ultérieure.

C'est sur ce DataFrame que l'on va tester l'algorithme ALS à la fin et obtenir les recommendations pour les clients du dataset de test.

En résumé on va donner à chaque client du dataset test, les recommandations pour les clients du dataset train étant les plus proches géographiquement que l'on obtiendra grâce à ALS.

In [10]:
users_test['key'] = 1
vendors['key'] = 1

# Perform the Cartesian product (cross join)
test_df = pd.merge(users_test, vendors, on='key').drop('key', axis=1)

# Create the new column
test_df['CID X LOC_NUM X VENDOR'] = test_df['closest_user'].astype(str) + ' X ' + test_df['location_number_2'].astype(str) + ' X ' + test_df['vendor_id'].astype(str)
test_df['real_CID X LOC_NUM X VENDOR'] = test_df['customer_id'].astype(str) + ' X ' + test_df['location_number'].astype(str) + ' X ' + test_df['vendor_id'].astype(str)

# Keep only the necessary columns
test_df = test_df[['CID X LOC_NUM X VENDOR', 'vendor_id','real_CID X LOC_NUM X VENDOR']]

La fonction haversine dans le code permet de calculer la distance en kilomètres entre deux points géographiques spécifiés par leurs coordonnées de latitude et de longitude. Elle utilise la formule de la distance haversine, qui prend en compte la courbure de la Terre pour obtenir une mesure plus précise. En convertissant les coordonnées de degrés décimaux en radians, la fonction calcule la différence de latitude et de longitude entre les deux points. En utilisant ces différences dans la formule haversine, elle détermine la distance angulaire centrale entre les deux points. En multipliant cette distance par le rayon de la Terre, la fonction renvoie la distance en kilomètres. Cela permet de calculer facilement la distance géographique entre deux points donnés.

In [11]:
from math import radians, cos, sin, asin, sqrt

def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 6371 # Radius of earth in kilometers. Use 3956 for miles
    return c * r


Le code combine les données des DataFrames orders, users, et vendors en fusionnant les colonnes d'identifiant correspondantes. Cela crée un nouveau DataFrame appelé df qui contient les informations combinées des commandes, des clients, et des vendeurs. Ensuite, le code utilise la fonction haversine pour calculer la distance géographique entre chaque paire client-vendeur en utilisant les coordonnées de longitude et de latitude. Cette distance est ajoutée en tant que nouvelle colonne 'distance' dans le DataFrame df. Ainsi, le DataFrame df contient maintenant les données de commande, les informations des clients, les informations des vendeurs, et la distance géographique pour chaque commande, ce qui sera utile pour les analyses futures.






In [12]:
# Merge the orders and customers dataframes
df = pd.merge(orders, users, on='customer_id')

# Merge the resultant dataframe with the vendors dataframe
df = pd.merge(df, vendors, on='vendor_id')
df['distance'] = df.apply(lambda row: haversine(row['longitude_x'], row['latitude_x'], row['longitude_y'], row['latitude_y']), axis=1)



Le code regroupe les données du DataFrame df par client et vendeur, puis calcule plusieurs métriques pour chaque groupe, telles que le nombre de commandes répétées, la note moyenne du vendeur, la note moyenne du livreur, le temps moyen de préparation et la distance moyenne. Ensuite, les valeurs de ces métriques sont normalisées pour les ramener à une échelle de 0 à 1. Des poids sont ensuite assignés à chaque métrique pour refléter leur importance relative. La note finale est calculée en combinant les métriques pondérées. Le DataFrame est ensuite réorganisé pour aplatir la structure multi-index et fusionné avec le DataFrame initial pour obtenir un DataFrame final qui contient les notes des clients pour chaque vendeur. Cela permet de classer les vendeurs en fonction de leur performance globale, basée sur les métriques spécifiées.



In [13]:

# Group by customer and vendor to calculate the metrics
grouped = df.groupby(['customer_id', 'vendor_id']).agg({
    'akeed_order_id': 'count',  # Reorder count
    'vendor_rating_x': 'mean',  # Average vendor rating
    'driver_rating': 'mean',  # Average driver rating
    'preparationtime': 'mean',  # Average preparation time
    'distance': 'mean',  # Average distance
}).rename(columns={'akeed_order_id': 'reorders'})

# Normalize columns to be between 0 and 1
grouped = (grouped - grouped.min()) / (grouped.max() - grouped.min())

# Define the weights for the features
weights = {
    'reorders': 0.1,
    'vendor_rating_x': 0.8,
    'driver_rating': 0.1,
    'preparationtime': -0.2,  # Negative because shorter is better
    'distance': -0.8,  # Negative because shorter is better
}

# Calculate rating as a weighted sum of features
grouped['rating'] = np.dot(grouped[weights.keys()], list(weights.values()))

# Flatten the multi-index DataFrame
grouped.reset_index(inplace=True)

# Merge back with original DataFrame
df = pd.merge(df, grouped[['customer_id', 'vendor_id', 'rating']], on=['customer_id', 'vendor_id'], how='left')
final_df = df[['CID X LOC_NUM X VENDOR', 'vendor_id', 'rating']]


In [14]:
final_df

Unnamed: 0,CID X LOC_NUM X VENDOR,vendor_id,rating
0,92PEE24 X 0 X 105,105,0.249295
1,92PEE24 X 0 X 105,105,0.249295
2,92PEE24 X 0 X 105,105,0.249295
3,92PEE24 X 0 X 105,105,0.249295
4,92PEE24 X 0 X 105,105,0.249295
...,...,...,...
395594,6PY2OK5 X 0 X 907,907,0.158494
395595,UPNI9BV X 0 X 907,907,0.158710
395596,U6PTUT5 X 0 X 907,907,0.745027
395597,MSEGQHZ X 0 X 907,907,0.158588


### Utilisation de Spark pour le traitement de données massives et la conversion de DataFrames pandas en DataFrames Spark






In [15]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('ALS') \
    .config('spark.driver.memory', '4g') \
    .config('spark.executor.memory', '4g') \
    .getOrCreate()


# Convert the pandas DataFrame to a Spark DataFrame
spark_df = spark.createDataFrame(final_df)
spark_test_df = spark.createDataFrame(test_df)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/17 18:20:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Dataframe de training

In [16]:


indexer = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in list(set(spark_df.columns) - set(["rating"]))
]

pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(spark_df).transform(spark_df)
transformed.show()


23/05/17 18:21:08 WARN TaskSetManager: Stage 0 contains a task of very large size (1038 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:09 WARN TaskSetManager: Stage 3 contains a task of very large size (1038 KiB). The maximum recommended task size is 1000 KiB.


+----------------------+---------+------------------+---------------+----------------------------+
|CID X LOC_NUM X VENDOR|vendor_id|            rating|vendor_id_index|CID X LOC_NUM X VENDOR_index|
+----------------------+---------+------------------+---------------+----------------------------+
|     92PEE24 X 0 X 105|      105|0.2492953247882893|            2.0|                     14664.0|
|     92PEE24 X 0 X 105|      105|0.2492953247882893|            2.0|                     14664.0|
|     92PEE24 X 0 X 105|      105|0.2492953247882893|            2.0|                     14664.0|
|     92PEE24 X 0 X 105|      105|0.2492953247882893|            2.0|                     14664.0|
|     92PEE24 X 0 X 105|      105|0.2492953247882893|            2.0|                     14664.0|
|     92PEE24 X 0 X 105|      105|0.2492953247882893|            2.0|                     14664.0|
|     92PEE24 X 1 X 105|      105|0.2492953247882893|            2.0|                     48950.0|
|     92PE

23/05/17 18:21:10 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:10 WARN TaskSetManager: Stage 6 contains a task of very large size (1038 KiB). The maximum recommended task size is 1000 KiB.


#### Dataframe de test

In [17]:
indexer = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in list(set(spark_test_df.columns) - set(["rating", "real_CID X LOC_NUM X VENDOR"]))
]

pipeline = Pipeline(stages=indexer)
transformed_test = pipeline.fit(spark_test_df).transform(spark_test_df)
transformed_test.show()

23/05/17 18:21:10 WARN TaskSetManager: Stage 7 contains a task of very large size (8974 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:11 WARN TaskSetManager: Stage 10 contains a task of very large size (8974 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:18 WARN DAGScheduler: Broadcasting large task binary with size 49.5 MiB
23/05/17 18:21:18 WARN TaskSetManager: Stage 13 contains a task of very large size (8974 KiB). The maximum recommended task size is 1000 KiB.
[Stage 13:>                                                         (0 + 1) / 1]

+----------------------+---------+---------------------------+---------------+----------------------------+
|CID X LOC_NUM X VENDOR|vendor_id|real_CID X LOC_NUM X VENDOR|vendor_id_index|CID X LOC_NUM X VENDOR_index|
+----------------------+---------+---------------------------+---------------+----------------------------+
|       OV01MGG X 0 X 4|        4|            Z59FTQD X 0 X 4|           59.0|                   1027259.0|
|      OV01MGG X 0 X 13|       13|           Z59FTQD X 0 X 13|            6.0|                   1027206.0|
|      OV01MGG X 0 X 20|       20|           Z59FTQD X 0 X 20|           27.0|                   1027227.0|
|      OV01MGG X 0 X 23|       23|           Z59FTQD X 0 X 23|           34.0|                   1027234.0|
|      OV01MGG X 0 X 28|       28|           Z59FTQD X 0 X 28|           43.0|                   1027243.0|
|      OV01MGG X 0 X 33|       33|           Z59FTQD X 0 X 33|           54.0|                   1027254.0|
|      OV01MGG X 0 X 43|    

23/05/17 18:21:22 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 13 (TID 37): Attempting to kill Python Worker
                                                                                

Dans ce code, nous utilisons la bibliothèque pyspark pour entraîner un modèle ALS (Alternating Least Squares) à des fins de recommandation personnalisée. Tout d'abord, les données sont divisées en ensembles d'entraînement et de test. L'ensemble d'entraînement représente 80% des données, tandis que l'ensemble de test représente 20%. Ensuite,un modèle ALS est créé avec des paramètres spécifiés, puis ajusté sur l'ensemble d'entraînement pour générer un modèle prêt à être utilisé pour les recommandations personnalisées.

 Une fois l'entraînement terminé, nous obtenons un modèle prêt à être utilisé pour générer des recommandations personnalisées basées sur les notes fournies dans le DataFrame df_final.

In [18]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

# Split the data into training and test sets
(training, test) = transformed.randomSplit([0.8, 0.2], seed=42)
als = ALS(
    maxIter=15,
    regParam=0.01,
    rank=50,
    userCol="CID X LOC_NUM X VENDOR_index",
    itemCol="vendor_id_index",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
)


model = als.fit(training)


23/05/17 18:21:23 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:23 WARN TaskSetManager: Stage 14 contains a task of very large size (1038 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:23 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:23 WARN TaskSetManager: Stage 15 contains a task of very large size (1038 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:24 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:24 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:24 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:25 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:25 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:25 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/05/1

Le code utilise le modèle entraîné pour faire des prédictions sur l'ensemble de test. Ensuite, il évalue les performances du modèle en calculant le RMSE (erreur quadratique moyenne) entre les prédictions et les valeurs réelles. Enfin, il affiche le RMSE et les prédictions pour évaluer la précision du modèle.

In [19]:
predictions = model.transform(test)
# Instantiate the evaluator
evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="rating",
    predictionCol="prediction"
)

# Calculate and print the RMSE
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

# You may also want to take a look at the predictions DataFrame
predictions.show()


23/05/17 18:21:33 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:33 WARN TaskSetManager: Stage 88 contains a task of very large size (1038 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:33 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:33 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:34 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:34 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB


Root-mean-square error = 0.04936848481971961


23/05/17 18:21:35 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:35 WARN TaskSetManager: Stage 199 contains a task of very large size (1038 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:35 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:35 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:36 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB


+----------------------+---------+-------------------+---------------+----------------------------+----------+
|CID X LOC_NUM X VENDOR|vendor_id|             rating|vendor_id_index|CID X LOC_NUM X VENDOR_index|prediction|
+----------------------+---------+-------------------+---------------+----------------------------+----------+
|     17Q7881 X 1 X 221|      221| 0.2002660426264149|           31.0|                       587.0|0.19181734|
|     17Q7881 X 1 X 221|      221| 0.2002660426264149|           31.0|                       587.0|0.19181734|
|     17Q7881 X 1 X 221|      221| 0.2002660426264149|           31.0|                       587.0|0.19181734|
|     17Q7881 X 1 X 221|      221| 0.2002660426264149|           31.0|                       587.0|0.19181734|
|     17Q7881 X 1 X 221|      221| 0.2002660426264149|           31.0|                       587.0|0.19181734|
|     17Q7881 X 1 X 221|      221| 0.2002660426264149|           31.0|                       587.0|0.19181734|
|

                                                                                

Le code utilise le 90e percentile des prédictions et des valeurs réelles pour définir un seuil. Ensuite, il transforme les prédictions et les valeurs réelles en valeurs binaires en les comparant avec ce seuil. Le DataFrame predictions contient désormais des prédictions et des valeurs binaires dans les colonnes correspondantes.

In [20]:
# Set the threshold to be the 90% of predicted ratings
threshold = predictions.approxQuantile("prediction", [0.9], 0)[0]
threshold = predictions.approxQuantile("rating", [0.9], 0)[0]

# If the predicted rating is greater than the threshold, predict 1, else 0
predictions = predictions.withColumn("prediction", (col("prediction") > threshold).cast("double"))
predictions = predictions.withColumn("rating", (col("rating") > threshold).cast("double"))

# Now "prediction" column contains binary values
predictions.show()

23/05/17 18:21:36 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:36 WARN TaskSetManager: Stage 310 contains a task of very large size (1038 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:36 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:36 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:37 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:38 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:38 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:38 WARN TaskSetManager: Stage 421 contains a task of very large size (1038 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:38 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:38 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:39 WARN DAG

+----------------------+---------+------+---------------+----------------------------+----------+
|CID X LOC_NUM X VENDOR|vendor_id|rating|vendor_id_index|CID X LOC_NUM X VENDOR_index|prediction|
+----------------------+---------+------+---------------+----------------------------+----------+
|     17Q7881 X 1 X 221|      221|   0.0|           31.0|                       587.0|       0.0|
|     17Q7881 X 1 X 221|      221|   0.0|           31.0|                       587.0|       0.0|
|     17Q7881 X 1 X 221|      221|   0.0|           31.0|                       587.0|       0.0|
|     17Q7881 X 1 X 221|      221|   0.0|           31.0|                       587.0|       0.0|
|     17Q7881 X 1 X 221|      221|   0.0|           31.0|                       587.0|       0.0|
|     17Q7881 X 1 X 221|      221|   0.0|           31.0|                       587.0|       0.0|
|     17Q7881 X 1 X 221|      221|   0.0|           31.0|                       587.0|       0.0|
|     17Q7881 X 1 X 

23/05/17 18:21:40 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
                                                                                

Le code évalue les performances du modèle de classification binaire en calculant le score F1. Ce score mesure la précision globale du modèle en prenant en compte à la fois la précision et le rappel.

In [21]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col

predictions = predictions.withColumn("prediction", col("prediction").cast("double"))

# Create an evaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="rating", metricName="areaUnderPR")

# Compute F1 score
f1 = evaluator.evaluate(predictions)

print("F1 score: ", f1)

23/05/17 18:21:41 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:41 WARN TaskSetManager: Stage 643 contains a task of very large size (1038 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:41 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:41 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:41 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:42 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB


F1 score:  0.974568365668147


### Maintenant on refait ca en utilisant les données de test (contenues dans transform_test)

#### On commence par l'entrainer sur toutes les données

In [22]:
als = ALS(
    maxIter=10,
    regParam=0.01,
    rank=50,
    userCol="CID X LOC_NUM X VENDOR_index",
    itemCol="vendor_id_index",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
)


model = als.fit(transformed)

23/05/17 18:21:42 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:42 WARN TaskSetManager: Stage 911 contains a task of very large size (1038 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:42 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:42 WARN TaskSetManager: Stage 912 contains a task of very large size (1038 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:43 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:43 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:44 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:44 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:44 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:44 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:45 WARN DAG

In [23]:
model.setColdStartStrategy("nan")
predictions = model.transform(transformed_test)
predictions = predictions.na.fill({'prediction': 0})

# Set the threshold to be the 90% of predicted ratings
threshold = predictions.approxQuantile("prediction", [0.9], 0)[0]
# If the predicted rating is greater than the threshold, predict 1, else 0
predictions = predictions.withColumn("prediction", (col("prediction") > threshold).cast("int"))
predictions.show()

23/05/17 18:21:50 WARN DAGScheduler: Broadcasting large task binary with size 49.5 MiB
23/05/17 18:21:50 WARN TaskSetManager: Stage 965 contains a task of very large size (8974 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:21:50 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:21:50 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:21:57 WARN DAGScheduler: Broadcasting large task binary with size 52.5 MiB
23/05/17 18:22:02 WARN DAGScheduler: Broadcasting large task binary with size 52.5 MiB
23/05/17 18:22:07 WARN DAGScheduler: Broadcasting large task binary with size 52.5 MiB
23/05/17 18:22:10 WARN DAGScheduler: Broadcasting large task binary with size 49.5 MiB
23/05/17 18:22:10 WARN TaskSetManager: Stage 1047 contains a task of very large size (8974 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:22:10 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:22:11 WA

+----------------------+---------+---------------------------+---------------+----------------------------+----------+
|CID X LOC_NUM X VENDOR|vendor_id|real_CID X LOC_NUM X VENDOR|vendor_id_index|CID X LOC_NUM X VENDOR_index|prediction|
+----------------------+---------+---------------------------+---------------+----------------------------+----------+
|     JO0ZUCM X 0 X 681|      681|          ZL9OED7 X 0 X 681|           78.0|                    150878.0|         0|
|     NEC5RKF X 0 X 681|      681|          QRW34UN X 0 X 681|           78.0|                    173678.0|         0|
|      5JY8STQ X 0 X 23|       23|           ANRC0AL X 1 X 23|           34.0|                    421334.0|         0|
|      OV01MGG X 0 X 78|       78|           Z59FTQD X 0 X 78|           81.0|                   1027281.0|         0|
|      OV01MGG X 0 X 20|       20|           Z59FTQD X 0 X 20|           27.0|                   1027227.0|         0|
|     NEC5RKF X 0 X 849|      849|          QRW3

                                                                                

In [24]:
num_ones = predictions.filter(predictions.prediction == 1).count()
num_zeros = predictions.filter(predictions.prediction == 0).count()

# calculate the ratio
if num_zeros != 0:
    ratio = num_ones / num_zeros
else:
    ratio = 'Undefined' # to avoid division by zero

print('The ratio of 1s to 0s is:', ratio)

23/05/17 18:22:22 WARN DAGScheduler: Broadcasting large task binary with size 49.5 MiB
23/05/17 18:22:22 WARN TaskSetManager: Stage 1128 contains a task of very large size (8974 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:22:22 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:22:22 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:22:27 WARN DAGScheduler: Broadcasting large task binary with size 52.5 MiB
23/05/17 18:22:32 WARN DAGScheduler: Broadcasting large task binary with size 49.5 MiB
23/05/17 18:22:32 WARN TaskSetManager: Stage 1209 contains a task of very large size (8974 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:22:32 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:22:32 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:22:40 WARN DAGScheduler: Broadcasting large task binary with size 52.5 MiB
                    

The ratio of 1s to 0s is: 0.11110963435721112


                                                                                

Le code prépare les résultats des prédictions pour la soumission en sélectionnant les colonnes pertinentes, en les renommant et en enregistrant le résultat au format CSV dans un fichier nommé "submission.csv".

In [25]:
submission = predictions.select("real_CID X LOC_NUM X VENDOR", "prediction")
submission = submission.withColumnRenamed("real_CID X LOC_NUM X VENDOR", "CID X LOC_NUM X VENDOR")
submission = submission.withColumnRenamed("prediction", "target")

submission.coalesce(1).write.format("csv").option("header","true").save("submission.csv")


23/05/17 18:22:46 WARN DAGScheduler: Broadcasting large task binary with size 49.5 MiB
23/05/17 18:22:46 WARN TaskSetManager: Stage 1318 contains a task of very large size (8974 KiB). The maximum recommended task size is 1000 KiB.
23/05/17 18:22:46 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/17 18:22:46 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
23/05/17 18:22:52 WARN DAGScheduler: Broadcasting large task binary with size 52.7 MiB
                                                                                