In [0]:
import os.path
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

databricks = True

if databricks == False:
    import findspark
    findspark.init()

if databricks == True:
    root = "/FileStore/tables"
else:
    root = "../Data"

# Create SparkSession from builder

spark1 = SparkSession.builder.master("local[2]") \
    .appName('mise en pratique') \
    .getOrCreate()

sc = spark1.sparkContext

In [0]:
# Création dataframe avec le premier fichier air b n b

path1 = os.path.join(root, "paris_listings100000.csv")

listingDF = spark.read.option("quote", "\"").option("escape", "\"").option("multiline", True).csv(path1, header=True, inferSchema=True)

listingDF.show(n=3,truncate=25,vertical=True)

listingDF[columns].limit(5).show()
listingDF.count()

-RECORD 0-----------------------------------------------------------------
 id                                           | 3109                      
 listing_url                                  | https://www.airbnb.com... 
 scrape_id                                    | 20231212042736            
 last_scraped                                 | 2023-12-12                
 source                                       | city scrape               
 name                                         | Rental unit in Paris ·... 
 description                                  | NULL                      
 neighborhood_overview                        | Good restaurants<br />... 
 picture_url                                  | https://a0.muscache.co... 
 host_id                                      | 3631                      
 host_url                                     | https://www.airbnb.com... 
 host_name                                    | Anne                      
 host_since              

40145

In [0]:
# Création dataframe avec le deuxième fichier air b n b

path2 = os.path.join(root, "paris_reviews100000.csv")

reviewsDF = spark.read.option("quote", "\"").option("escape", "\"").option("multiline", True).csv(path2, header=True, inferSchema=True, quote="\"")

reviewsDF.show(n=10,truncate=False,vertical=True)
reviewsDF.count()

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 listing_id    | 3109                                                                                                                                                                                                                                                                                        

100000

In [0]:
# création des views pour les requètes sql
listingDF.createOrReplaceTempView("listingDF")
reviewsDF.createOrReplaceTempView("reviewsDF")

spark.sql("select * from reviewsDF").show()

+----------+---------+----------+-----------+-------------+--------------------+
|listing_id|       id|      date|reviewer_id|reviewer_name|            comments|
+----------+---------+----------+-----------+-------------+--------------------+
|      3109|207127433|2017-10-28|   51636494|     Patricia|Tout s'est bien d...|
|      3109|208779822|2017-11-03|    4142888|     Patricia|Un petit nid foui...|
|      3109|295840159|2018-07-24|    7415343|      Laurent|Appartement spaci...|
|      3109|553502638|2019-10-24|   21159216|    Anastasia|Appartement total...|
|      5396|     4824|2009-06-30|      19995|        Sarah|Perfect location!...|
|      5396|     4968|2009-07-03|      20117|        Chris|This is a nice pl...|
|      5396|     5240|2009-07-08|      22190|    Annelaure|Nice studio, very...|
|      5396|     9619|2009-09-10|      11947|         Jean|Superb location, ...|
|      5396|    18970|2009-12-02|      40625|        Bette|Perfect place to ...|
|      5396|    25574|2010-0

In [0]:
# première requète pour tester
spark.sql("select host_location, min(price), max(price) from listingDF group by host_location").show()

spark.sql("select distinct host_location from listingDF where host_location == 'France' or host_location is null").show()

+--------------------+----------+----------+
|       host_location|min(price)|max(price)|
+--------------------+----------+----------+
|                NULL| $1,000.00|   $999.00|
|Aberdeen, United ...|      NULL|      NULL|
|Abidjan, Côte d’I...|   $110.00|   $293.00|
|    Abondant, France|    $81.00|    $81.00|
|Abu Dhabi, United...|      NULL|      NULL|
|    Accettura, Italy|   $200.00|   $200.00|
|     Achères, France|    $45.00|    $45.00|
|      Acigné, France|   $153.00|   $153.00|
| Adelaide, Australia|   $180.00|   $180.00|
|     Agadir, Morocco|    $80.00|    $80.00|
|        Agde, France|    $79.00|    $79.00|
|        Agen, France|    $95.00|    $95.00|
|     Agneaux, France|      NULL|      NULL|
|    Agrigento, Italy|   $120.00|   $120.00|
|   Aigremont, France|   $180.00|    $55.00|
|Aigueblanche, France|    $93.00|    $93.00|
|Aix-en-Provence, ...|   $100.00|    $99.00|
|Aix-les-Bains, Fr...|   $121.00|   $121.00|
|     Ajaccio, France|   $105.00|    $75.00|
|     Albu

In [0]:
#séparation de la colonne host_location en une colonne town et une colonne country
splitedListingDF = listingDF.select("*",
    split(col("host_location"), ",").getItem(0).alias("town"),
    when(split(col("host_location"), ",").getItem(1).isNull(), 
         split(col("host_location"), ",").getItem(0)).otherwise(split(col("host_location"), ",").getItem(1)).alias("country")
)
splitedListingDF.show(n=10,truncate=False,vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
#formater les colonnes de pourcentage en enlevant le % et en castant en entier
rateListingDF = splitedListingDF.withColumn("host_response_rate_cleaned", regexp_replace(splitedListingDF["host_response_rate"], "%", "").cast("integer")) \
    .withColumn("host_acceptance_rate_cleaned", regexp_replace(splitedListingDF["host_acceptance_rate"], "%", "").cast("integer"))

rateListingDF.show(n=10,truncate=False,vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
#formater la colonne "price" en enlevant le $ puis en castant en décimal avec 2 chiffres après la virgule
priceListingDF = rateListingDF.withColumn("price_cleaned", regexp_replace(rateListingDF["price"], "\\$", ""))
priceListingDF = priceListingDF.withColumn("virg_cleaned", regexp_replace(priceListingDF["price_cleaned"], "\,", ""))

priceListingDF = priceListingDF.withColumn("price_decimal", priceListingDF["virg_cleaned"].cast("decimal(10,2)"))

priceListingDF.show(n=10,truncate=False,vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# création de la view du fichier avec les colonnes splitées et le prix nettoyé pour les requètes sql
priceListingDF.createOrReplaceTempView("priceListingDF")

spark.sql("select town, country from priceListingDF where town = null").show()

+----+-------+
|town|country|
+----+-------+
+----+-------+



In [0]:
# requète précédente avec un group by country et renommage des colonnes en retirant les valeurs nulles dans country et price_decimal
spark.sql("select country, min(price_decimal) as min_price, round(avg(price_decimal),2) as avg_price, max(price_decimal) as max_price from priceListingDF where country is not null and price_decimal is not null group by country").show()

# prix globaux en retirant les valeurs nulles dans country et price_decimal
spark.sql("select min(price_decimal) as min_price, round(avg(price_decimal),2) as avg_price, max(price_decimal) as max_price from priceListingDF where country is not null and price_decimal is not null").show()

+-------------------+---------+---------+---------+
|            country|min_price|avg_price|max_price|
+-------------------+---------+---------+---------+
|            Uruguay|   160.00|   160.00|   160.00|
|         Luxembourg|    50.00|   143.00|   439.00|
|            Tunisia|    38.00|    96.00|   150.00|
| Dominican Republic|    38.00|    38.00|    38.00|
|           Colombia|    49.00|    74.00|    99.00|
|            Ireland|    95.00|   135.60|   180.00|
|                 WA|    49.00|   129.25|   190.00|
|          Singapore|    39.00|   171.83|   280.00|
|                 AL|    58.00|    58.00|    58.00|
|                 NM|   100.00|   153.00|   206.00|
|             Panama|    59.00|   129.50|   200.00|
|             Taiwan|    25.00|    25.00|    25.00|
|            Germany|   914.00|   914.00|   914.00|
|                 MI|   220.00|   360.50|   501.00|
|            Lebanon|   100.00|   237.20|   485.00|
|             France|    20.00|   160.83|  1000.00|
|          A

In [0]:
#affichage des prix min, moyens et max par pièce, trié par pièce en changeant le nom des colonnes en returant les nulls
spark.sql("select accommodates, min(price_decimal) as min_price, round(avg(price_decimal),2) as average_price, max(price_decimal) as max_price from priceListingDF where accommodates is not null group by accommodates order by accommodates").show()

+------------+---------+-------------+---------+
|accommodates|min_price|average_price|max_price|
+------------+---------+-------------+---------+
|           1|     8.00|        85.67|  3000.00|
|           2|     8.00|       130.28| 30000.00|
|           3|    15.00|       191.49| 55714.00|
|           4|    10.00|       217.80| 10000.00|
|           5|    25.00|       276.36|  5500.00|
|           6|    10.00|       379.93| 11600.00|
|           7|    63.00|       478.52|  3358.00|
|           8|    52.00|       546.00| 12000.00|
|           9|   160.00|       508.14|  1340.00|
|          10|    50.00|       781.04|  9000.00|
|          11|   292.00|       837.00|  1615.00|
|          12|   258.00|       707.02|  2014.00|
|          13|   400.00|      2769.75|  7080.00|
|          14|    99.00|       874.33|  3000.00|
|          15|   530.00|       673.00|   829.00|
|          16|    76.00|       918.12|  6977.00|
+------------+---------+-------------+---------+



In [0]:
#affichage des prix min, moyens et max par pays et nb de commodités, trié par pays et nb de commodités en changeant le nom des colonnes et en excluant les valeurs nulles pour les pays et les prix
spark.sql("select country, accommodates, min(price_decimal) as min_price, round(avg(price_decimal),2) as average_price, max(price_decimal) as max_price from priceListingDF where country is not null and price is not null group by accommodates, country order by country, accommodates").show()

+----------+------------+---------+-------------+---------+
|   country|accommodates|min_price|average_price|max_price|
+----------+------------+---------+-------------+---------+
|        AL|           2|    58.00|        58.00|    58.00|
|        AR|           2|    44.00|        89.33|   139.00|
| Argentina|           2|    45.00|        45.00|    45.00|
| Argentina|           3|   150.00|       150.00|   150.00|
| Argentina|           4|    70.00|       153.75|   340.00|
| Australia|           2|    55.00|       106.35|   230.00|
| Australia|           3|    98.00|       164.00|   230.00|
| Australia|           4|   105.00|       268.75|   480.00|
| Australia|           5|   180.00|       219.67|   283.00|
| Australia|           6|    90.00|        90.00|    90.00|
|   Austria|           2|    50.00|        81.00|   109.00|
|   Austria|           4|   120.00|       220.00|   320.00|
|   Belgium|           1|    45.00|        67.50|    90.00|
|   Belgium|           2|     9.00|     

In [0]:
#nombre de locations disponibles par ville en excluant les null en incluant les superhosts et en ne conservant que les taux d'acceptation supérieurs à 90%
spark.sql("select town, count(*) as nb_loc, SUM(case when host_is_superhost = true then 1 else 0 end) as nb_superhost from priceListingDF where town is not null and host_acceptance_rate_cleaned > 90 group by town order by town").show()

+-------------------+------+------------+
|               town|nb_loc|nb_superhost|
+-------------------+------+------------+
|            Abidjan|     1|           1|
|           Abondant|     2|           2|
|             Agadir|     1|           0|
|    Aix-en-Provence|     8|           1|
|            Ajaccio|     2|           1|
|        Alfortville|     2|           0|
|         Almenêches|     1|           0|
|            Ambazac|     2|           0|
|             Amiens|     4|           3|
|          Andalusia|     1|           1|
|             Angers|     1|           1|
|           Angresse|     1|           0|
|             Annecy|     1|           0|
|    Annecy-le-Vieux|     1|           0|
|            Antibes|     5|           1|
|             Antony|     3|           1|
|            Arcueil|     1|           0|
|          Argentina|     1|           0|
| Argentine Township|     2|           2|
|Armentières-en-Brie|     1|           0|
+-------------------+------+------

In [0]:
#liste des hosts triés par score global descendant
spark.sql("select host_name, review_scores_value as global_score from priceListingDF order by review_scores_value desc").show() 

+---------------+------------+
|      host_name|global_score|
+---------------+------------+
|          Jordi|         5.0|
|         Ariane|         5.0|
|      Domitille|         5.0|
|          Anaïs|         5.0|
|        Camille|         5.0|
|       Philippe|         5.0|
|         Gorkem|         5.0|
|         Pascal|         5.0|
|        Pauline|         5.0|
|Pierre-François|         5.0|
|       Juliette|         5.0|
|         Pierre|         5.0|
|        Olivier|         5.0|
|       Federico|         5.0|
|          Cyril|         5.0|
|          Heidi|         5.0|
|          Helmi|         5.0|
|         Sandie|         5.0|
|         Sophie|         5.0|
|           Anne|         5.0|
+---------------+------------+
only showing top 20 rows



In [0]:
#liste des hosts dont le score global est 5 groupés et triés par pays puis nom de l'host
spark.sql("select country, host_name from priceListingDF where review_scores_value = 5 and country is not null group by country, host_name order by country, host_name").show() 

+----------+--------------+
|   country|     host_name|
+----------+--------------+
|        AR|        Yingzi|
| Argentina|         Julie|
| Argentina|          Maxi|
| Australia|        Audrey|
| Australia|       Francis|
| Australia|         Sarah|
|   Belgium|         Emile|
|   Belgium|       Florent|
|   Belgium|       Hadrien|
|   Belgium|       Justine|
|   Belgium|        Maxime|
|   Belgium|        Sophie|
|    Brazil|     Ana Luiza|
|    Brazil|         Anita|
|    Brazil|       Charles|
|    Brazil|      Delphine|
|    Brazil|Félix Et Diana|
|    Brazil|       Gabriel|
|    Brazil|       Juliene|
|    Brazil|       Sabrina|
+----------+--------------+
only showing top 20 rows



In [0]:
#jointure sur les 2 DF
globalDF = priceListingDF.join(
    reviewsDF,
    priceListingDF.id ==  reviewsDF.listing_id,
    "inner"
)
type(globalDF)
globalDF.count()


100000

In [0]:
#nombre de commentaire des locataires par logement tiré par l'id du logement puis par le nombre décroissant de commentaires
globalDF.groupBy("listing_id").count().orderBy("listing_id").show()
globalDF.groupBy("listing_id").count().orderBy(desc("count")).show()

+----------+-----+
|listing_id|count|
+----------+-----+
|      3109|    4|
|      5396|  374|
|      7964|    5|
|     11265|   30|
|     11798|  121|
|     12268|    1|
|     14264|   10|
|     17287|   33|
|     17994|  202|
|     23441|   86|
|     26827|   80|
|     27288|   72|
|     33814|   59|
|     35065|  824|
|     36586|   35|
|     38303|   44|
|     38650|   19|
|     39948|  116|
|     40143|   11|
|     40899|  100|
+----------+-----+
only showing top 20 rows

+----------+-----+
|listing_id|count|
+----------+-----+
|   2488829|  871|
|     35065|  824|
|    753143|  671|
|    193632|  656|
|   2247629|  640|
|   2343894|  606|
|    192162|  596|
|   2173772|  590|
|    371299|  561|
|   1587440|  553|
|    314288|  543|
|    252525|  542|
|   2302860|  519|
|   2444138|  518|
|    878992|  517|
|   2005815|  517|
|    857573|  501|
|   2642991|  484|
|    534650|  484|
|   1458447|  477|
+----------+-----+
only showing top 20 rows



In [0]:
#sauvegarde des dataframes
priceListingDF.write.mode('overwrite').option("header",True).csv(os.path.join("/FileStore/tables/", "listingRbNb.csv"))
reviewsDF.write.mode('overwrite').option("header",True).csv(os.path.join("/FileStore/tables/", "reviewsRbNb.csv"))
globalDF.write.mode('overwrite').option("header",True).csv(os.path.join("/FileStore/tables/", "globalRbNb.csv"))