In [1]:
# Installation des packages necéssaires à l'exécution du projet
!pip install pyspark
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
# Importation des librairies standards 
import findspark
import re
from pyspark.sql import SparkSession
from pyspark import  HiveContext , SparkContext
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import IntegerType ,StringType, FloatType
from pyspark.sql.functions import col , udf ,unix_timestamp , explode, count, when
from pyspark.sql.functions import count, isnan, when
from pyspark.sql.functions import substring
from pyspark.sql.functions import udf

In [4]:
# Définition de la session spark 
spark = SparkSession.builder.appName('json_example').getOrCreate()

In [5]:
# Lecture de la base de donnnées issue du webscrapping
df_booking = spark.read.option("header","true" )\
               .option("inferSchema","true" ).json(r'base_booking.json')

In [6]:
# Affichage de la base de données
df_booking.show()

+---------------+----------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+----+-------+--------------------+
|_corrupt_record|   accessibilite|             adresse|annulation_gratuite|    breakfast_inclus|         description|         emplacement|                lien|            long_lat|nb_etoiles|                 nom|note|   prix|    type_hebergement|
+---------------+----------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+----+-------+--------------------+
|              [|            null|                null|               null|                null|                null|                null|                null|                null|      null|                null|null|   null|                null|
|           

In [7]:
# On supprime la première ligne qui est totalement nulle
df_booking = spark.createDataFrame(df_booking.tail(df_booking.count()-1), df_booking.schema)
df_booking=df_booking.drop("_corrupt_record")

In [8]:
#On affiche de nouveau le dataframe
df_booking.show()

+----------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+----+-------+--------------------+
|   accessibilite|             adresse|annulation_gratuite|    breakfast_inclus|         description|         emplacement|                lien|            long_lat|nb_etoiles|                 nom|note|   prix|    type_hebergement|
+----------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+----+-------+--------------------+
|4,1 km du centre|\nVia Arbe 63, Bi...|Annulation gratuite|                null|<div id="property...|Bicocca - Zara, M...|https://www.booki...|<a id="hotel_addr...|   3 sur 5|BB Hotels Apartho...| 8,7|€ 1 980|              Studio|
|  50 m du centre|\nPassaggio degli...|               null|                n

In [9]:
# Dimensions de la base de données
print("La base de données contient : ", df_booking.count(), "lignes et", len(df_booking.columns), "colonnes. ")

La base de données contient :  814 lignes et 13 colonnes. 


In [10]:
# Afficher les colonnes et leurs types dans la base de données 
df_booking.printSchema()

root
 |-- accessibilite: string (nullable = true)
 |-- adresse: string (nullable = true)
 |-- annulation_gratuite: string (nullable = true)
 |-- breakfast_inclus: string (nullable = true)
 |-- description: string (nullable = true)
 |-- emplacement: string (nullable = true)
 |-- lien: string (nullable = true)
 |-- long_lat: string (nullable = true)
 |-- nb_etoiles: string (nullable = true)
 |-- nom: string (nullable = true)
 |-- note: string (nullable = true)
 |-- prix: string (nullable = true)
 |-- type_hebergement: string (nullable = true)



### Affichage des valeurs manquantes

In [11]:
# Création d'une liste qui contient les noms des colonnes de la base 
cols = df_booking.columns

# Création d'un dictionnaire qui stocke pour chaque colonne ses valeurs manquantes
nan_counts = df_booking.agg(*[count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in cols]).toPandas().to_dict()

# Affichage
for col in cols:
    print(f"{col}: {nan_counts[col][0]}")

accessibilite: 1
adresse: 1
annulation_gratuite: 397
breakfast_inclus: 757
description: 1
emplacement: 1
lien: 1
long_lat: 1
nb_etoiles: 96
nom: 1
note: 76
prix: 13
type_hebergement: 1


### Traitement des valeurs manquantes

In [12]:
# On va supprimer les observations pour lesquelles le prix est manquant
df_booking = df_booking.na.drop(subset=["prix"])

In [13]:
# Les observations ne contenant pas de petit déjeuner inclus et/ou annulation gratuite ont pour ces colonnes des valeurs manquantes 

# Pour le petit déjeuner inclus, on imputera ces observations de la mention "Non compris "
df_booking = df_booking.fillna("Non compris", ["breakfast_inclus"])

# Pour l'annulation gratuire, on imputera ces observations de la mention "Indisponible "
df_booking = df_booking.fillna("Indisponible", ["annulation_gratuite"])


In [14]:
# Pour les observations ne contenant pas de nombre d'étoile cela signifie qu'ils ne sont pas encore étoilées et on les imputera par 0 car la notation commence à partir de une étoile
df_booking = df_booking.fillna('0', ["nb_etoiles"])

In [15]:
# Certaines observations sont nouvellements placées sur booking.com et pour cette raison ils ne se sont pas encore vus attribuer de note ainsi la colonne "note" est pour ces observations manquantes
# Nous allons donc les imputer par '000' afin de signifier qu'ils ne sont pas notés 

df_booking = df_booking.fillna('0', ["note"])

In [16]:
#Affichage du dataframe après modification
df_booking.show()

+----------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+----+-------+--------------------+
|   accessibilite|             adresse|annulation_gratuite|    breakfast_inclus|         description|         emplacement|                lien|            long_lat|nb_etoiles|                 nom|note|   prix|    type_hebergement|
+----------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+----+-------+--------------------+
|4,1 km du centre|\nVia Arbe 63, Bi...|Annulation gratuite|         Non compris|<div id="property...|Bicocca - Zara, M...|https://www.booki...|<a id="hotel_addr...|   3 sur 5|BB Hotels Apartho...| 8,7|€ 1 980|              Studio|
|  50 m du centre|\nPassaggio degli...|       Indisponible|         Non comp

In [17]:
#Traitement texte : on choisit de mettre en minuscule la colonne de l'annulation gratuite et breakfast inclus
lower_annulation=udf(lambda x: x.lower())
df_booking = df_booking.withColumn("annulation_gratuite", lower_annulation("annulation_gratuite"))
df_booking = df_booking.withColumn("breakfast_inclus", lower_annulation("breakfast_inclus"))

### Traitement des variables string à travers les REGEX : nettoyage des données pour améliorer leur compréhension 

In [18]:
# Traitement de la colonne nombre d'étoiles : récupération uniquement du nombre d'étoile de l'observation (type d'hébergement)
traitement_note = udf(lambda x: int(x[0]), IntegerType())
df_booking = df_booking.withColumn("nb_etoiles", traitement_note("nb_etoiles"))

#On remplace la virgule par le point et on transforme la note en float
note_to_int= udf(lambda x: float(x.replace(",", ".")), FloatType())
df_booking = df_booking.withColumn("note", note_to_int("note"))

In [19]:
#Traitement de la colonne prix : on veut garder que les caractères qui sont numériques pour eliminer le symbole de l'euro. Puis on transforme la valeur en integer.
traitement_prix=udf(lambda x: ''.join(re.findall(r"\d+", re.sub("\s", "", x))))
price_toint=udf(lambda x: int(x), IntegerType())
df_booking = df_booking.withColumn("prix", traitement_prix("prix"))
df_booking = df_booking.withColumn("prix", price_toint("prix"))

In [20]:
#Traitement de la colonne logn_lat : nous avons d'abord traité la colonne pour garder que les valeurs numériques qui répresentaient la latitude et la longitude des hotels.
#Ensuite nous avons gardé que les deux premiers argument car c'était ceux qui nous interessaient. Avec cela, nous avons créer deux nouvelles colonnes avec la latitude et la longitude séparement.
traitement_long_lat = udf(lambda x: re.findall(r'\d+\.\d+', x))
lat_split = udf(lambda x: float(x[0]), FloatType())
long_split = udf(lambda x: float(x[1]), FloatType())
df_booking = df_booking.withColumn("long_lat", traitement_long_lat("long_lat"))
df_booking = df_booking.withColumn("lat", lat_split("long_lat"))
df_booking = df_booking.withColumn("long", long_split("long_lat"))
#on supprime la colonne qui nous a servi pour créer les deux nouvelles variables 
df_booking=df_booking.drop("long_lat")

In [21]:
#Traitement des adresses : au début et à la fin de chaque adresse, il y avait \n donc nous l'avons remplacé par un espace vide
traitement_adresse = udf(lambda x: x.replace("\n",""))
df_booking = df_booking.withColumn("adresse", traitement_adresse("adresse"))

In [22]:
#Traitement de la description: on garder le texte qui nous interesse et dans certaines descriptions il y avait une phrase qui proposé une réduction que nous avons enlevé (re.sub)
traitement_desc = udf(lambda x: ''.join(re.findall(r">\n<p>(.*?)</p>\n", re.sub(r"(Vous.*économiser\.)", '', x))))
df_booking = df_booking.withColumn("description", traitement_desc("description"))

In [23]:
#Affichage du dataframe
df_booking.show(truncate=False)

+----------------+------------------------------------------------------------------------------+-------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [24]:
#On a crée un dataframe qui nous permet de voir les occurrences de chaque type d'hebergement 
type_hebergement_count = df_booking.groupBy("type_hebergement").count()
type_hebergement_count.show(100, truncate=False)

+----------------------------------------------------------+-----+
|type_hebergement                                          |count|
+----------------------------------------------------------+-----+
|Appartement Standard                                      |6    |
|Studio A Ajraghi                                          |1    |
|Maison 1 Chambre                                          |2    |
|Chambre Double                                            |45   |
|Chambre Double Deluxe avec Balcon                         |2    |
|Appartement                                               |49   |
|Chambre Double Supérieure                                 |9    |
|Appartement Supérieur 1 Chambre                           |1    |
|Appartement 1 Chambre - Via Ceresio 3                     |1    |
| Chambre Double                                           |1    |
|Chambre Double Deluxe                                     |6    |
|Studio                                                    |53

Nous pouvons constater que les type d'hébergement sont similaire mais avec une déscription différente. C'est pour cela que nous allons regrouper les type d'hébergement pour avoir une distribution homogène.

In [25]:
#Traitement de l'emplacement : il y a un emplacement qui contenait que Milan et on choisit le donner plus des détails en changeant en Milan banlieu
traitement_emplacement = udf(lambda x: re.sub("^Milan", "Milan banlieu", x))
df_booking = df_booking.withColumn("emplacement", traitement_emplacement("emplacement"))

#Création d'un dataframe qui compte les occurrences de la variable emplacement 
emplacement_count = df_booking.groupBy("emplacement").count()
emplacement_count.show(100, truncate=False)

+-------------------------+-----+
|emplacement              |count|
+-------------------------+-----+
|San Siro, Milan          |35   |
|Certosa, Milan           |14   |
|Famagosta, Milan         |13   |
|Bovisa, Milan            |7    |
|Porta Vittoria, Milan    |35   |
|Navigli, Milan           |72   |
|Centre de Milan, Milan   |214  |
|Porta Romana, Milan      |21   |
|Gare centrale, Milan     |89   |
|Lorenteggio, Milan       |9    |
|Città Studi, Milan       |53   |
|Ripamonti Corvetto, Milan|22   |
|Gare Garibaldi, Milan    |25   |
|Milan banlieu            |42   |
|Niguarda, Milan          |13   |
|Viale Monza, Milan       |28   |
|Fiera Milano City, Milan |49   |
|Sempione, Milan          |29   |
|Bicocca - Zara, Milan    |31   |
+-------------------------+-----+



In [26]:
#Traitement type d'hebergement : on veut créer des categories qui sont assez homogenes donc on garde que les elements en commun.
traitement_type = udf(lambda x: ''.join(re.findall(r'^\s?(Chambre Double|Appartement|Studio|Loft|Suite|Penthouse|Maison|Maisonnette|Petite Chambre Double)\b', re.sub(r'Chambre(.*?)', 'Chambre Double', x))))
df_booking = df_booking.withColumn("type_hebergement", traitement_type("type_hebergement"))

In [27]:
#Comptage du nombre d'occurrences de la variable breakfast_inclus
annulation_gratuite_count = df_booking.groupBy("annulation_gratuite").count()
annulation_gratuite_count.show(truncate=False)

+-------------------+-----+
|annulation_gratuite|count|
+-------------------+-----+
|indisponible       |386  |
|annulation gratuite|415  |
+-------------------+-----+



In [28]:
#Comptage du nombre d'occurrences de la variable breakfast_inclus
breakfast_inclus_count = df_booking.groupBy("breakfast_inclus").count()
breakfast_inclus_count.show(truncate=False)

+----------------------+-----+
|breakfast_inclus      |count|
+----------------------+-----+
|petit-déjeuner compris|55   |
|non compris           |746  |
+----------------------+-----+



In [29]:
#Affichage du dataframe
df_booking.show()

+----------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+----+----+--------------------+---------+--------+
|   accessibilite|             adresse|annulation_gratuite|    breakfast_inclus|         description|         emplacement|                lien|nb_etoiles|                 nom|note|prix|    type_hebergement|      lat|    long|
+----------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+----+----+--------------------+---------+--------+
|4,1 km du centre|Via Arbe 63, Bico...|annulation gratuite|         non compris|Implanté à quelqu...|Bicocca - Zara, M...|https://www.booki...|         3|BB Hotels Apartho...| 8.7|1980|              Studio|45.500034|9.198904|
|  50 m du centre|Passaggio degli O...|       indisponible|         non compris|Les appartements

In [30]:
#Creation d'une variable qui se base sur l'emplacement pour créer une variable booléenne qui nous permet de savoir si l'hotel est loin ou près du centre. 
df_booking = df_booking.withColumn("pres_du_centre", when(df_booking["emplacement"] == "Centre de Milan, Milan", 1).otherwise(0))

In [31]:
##Comptage du nombre d'occurrences de la nouvelle variable pres_du_centre
compatage_pres_du_centre_count = df_booking.groupBy("pres_du_centre").count()
compatage_pres_du_centre_count.show(100, truncate=False)

+--------------+-----+
|pres_du_centre|count|
+--------------+-----+
|1             |214  |
|0             |587  |
+--------------+-----+



In [32]:
# On réordonne la base de données 
df_booking = df_booking.select("nom","type_hebergement", "emplacement", "accessibilite", "nb_etoiles", "note", "prix", "breakfast_inclus", "annulation_gratuite", "adresse", "lat", "long", "pres_du_centre", "description", "lien")

In [33]:
#Affichage du dataframe final
df_booking.show(truncate=False)

+-------------------------------------------------+---------------------+------------------------+----------------+----------+----+----+----------------------+-------------------+------------------------------------------------------------------------------+---------+--------+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------

In [34]:
# Export de la base de données nettoyée
df_booking.write.format("csv").save(r"test_booking.json")