## Setup and data loading

In [1]:
import os
# Print the current working directory; the dataset is later read through a relative path
print(os.getcwd())


/app/projects


In [2]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("Flight Price Prediction")
    .getOrCreate()
)

# Load the flights CSV (relative path): first line is the header, column types are inferred
data = "./data/flights_data_.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data)
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/01 15:00:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Preview the first 20 rows of the freshly loaded data
df.show(n=20, truncate=True)
# Total number of rows; as the last expression of the cell it is rendered as its output
df.count()

+--------------------+-----------+-----+--------+-----------+----------+
|            Airlines|Total_Stops|Price|Duration|Destination|      Date|
+--------------------+-----------+-----+--------+-----------+----------+
|            Tunisair|          0| 81 €|2h 15min|        MAD|2025-06-29|
|            Tunisair|          0| 81 €|2h 15min|        MAD|2025-06-29|
|          Air Europa|          0| 83 €|2h 15min|        MAD|2025-06-29|
|     Vueling, Iberia|          1|102 €|5h 20min|        MAD|2025-06-29|
|    Tunisair, Iberia|          1|105 €|5h 55min|        MAD|2025-06-29|
|     Aegean Airlines|          1|117 €|9h 35min|        MAD|2025-06-29|
|    Tunisair, Iberia|          1|137 €|4h 45min|        MAD|2025-06-29|
|         ITA Airways|          1|177 €|4h 49min|        MAD|2025-06-29|
|   Tunisair, Ryanair|          1|110 €|6h 30min|        MAD|2025-06-29|
|    Transavia France|          1|137 €|7h 05min|        MAD|2025-06-29|
|   Nouvelair, Iberia|          1|119 €|6h 15min|  

7394

## Clean the data 

In [4]:
df.printSchema()
# Schema of the data as loaded:
#  |-- Airlines: string (nullable = true)
#  |-- Total_Stops: integer (nullable = true)
#  |-- Price: string (nullable = true)
#  |-- Duration: string (nullable = true)
#  |-- Destination: string (nullable = true)
#  |-- Date: date (nullable = true)
# Some of these column formats need to be changed.

root
 |-- Airlines: string (nullable = true)
 |-- Total_Stops: integer (nullable = true)
 |-- Price: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Date: date (nullable = true)



In [5]:
from pyspark.sql.functions import (
    col,  
    to_timestamp, 
    when, 
    split,
   regexp_replace,lit,
date_format
)


# 1. Normalise Date to a "yyyy-MM-dd" string.
#    NOTE: date_format() returns a string, so the column type becomes string here
#    (it is not kept as a date/timestamp).
df = df.withColumn("Date", date_format(to_timestamp(col("Date"), "yyyy-MM-dd"), "yyyy-MM-dd"))

# 2. Convert Price (e.g. "81 €") to an integer.
#    Strip every space variant (regular whitespace, non-breaking U+00A0, narrow
#    non-breaking U+202F) and the euro sign before casting. Keeping only the text
#    before the first space would truncate a price written with a thousands
#    separator ("1 234 €" -> 1) and would yield null when no space precedes "€".
df = df.withColumn(
    "Price",
    regexp_replace(col("Price"), r"[\s\u00a0\u202f€]", "").cast("integer"),
)

# Display the results
df.printSchema()
df.show()

root
 |-- Airlines: string (nullable = true)
 |-- Total_Stops: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Date: string (nullable = true)

+--------------------+-----------+-----+--------+-----------+----------+
|            Airlines|Total_Stops|Price|Duration|Destination|      Date|
+--------------------+-----------+-----+--------+-----------+----------+
|            Tunisair|          0|   81|2h 15min|        MAD|2025-06-29|
|            Tunisair|          0|   81|2h 15min|        MAD|2025-06-29|
|          Air Europa|          0|   83|2h 15min|        MAD|2025-06-29|
|     Vueling, Iberia|          1|  102|5h 20min|        MAD|2025-06-29|
|    Tunisair, Iberia|          1|  105|5h 55min|        MAD|2025-06-29|
|     Aegean Airlines|          1|  117|9h 35min|        MAD|2025-06-29|
|    Tunisair, Iberia|          1|  137|4h 45min|        MAD|2025-06-29|
|         ITA Airways| 

In [6]:
# Replace non-breaking spaces in the airline names, then derive an array column
# with the individual airline names (they are separated by ", " in the source).
df = (
    df
    .withColumn("Airlines", regexp_replace(col("Airlines"), "\xa0", " "))
    .withColumn("Airlines_list", split(col("Airlines"), ", "))
)
df.printSchema()
df.show()

root
 |-- Airlines: string (nullable = true)
 |-- Total_Stops: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Airlines_list: array (nullable = true)
 |    |-- element: string (containsNull = false)

+--------------------+-----------+-----+--------+-----------+----------+--------------------+
|            Airlines|Total_Stops|Price|Duration|Destination|      Date|       Airlines_list|
+--------------------+-----------+-----+--------+-----------+----------+--------------------+
|            Tunisair|          0|   81|2h 15min|        MAD|2025-06-29|          [Tunisair]|
|            Tunisair|          0|   81|2h 15min|        MAD|2025-06-29|          [Tunisair]|
|          Air Europa|          0|   83|2h 15min|        MAD|2025-06-29|        [Air Europa]|
|     Vueling, Iberia|          1|  102|5h 20min|        MAD|2025-06-29|   [Vueling, Iberia]|
| 

In [7]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType

# Convert duration from 'Xh Ym' format to hours
def duration_to_hours(duration):
    """
    Convert a flight duration string to a number of hours.

    Args:
        duration (str): Duration such as '2h 15min', '2h 15m', '2h', '45min'
            or '45m'. The minutes unit is recognised by its leading 'm', so
            both the 'm' and 'min' suffixes are accepted.
    Returns:
        float: Total duration in hours, or None when `duration` is None or
            empty. A non-empty string containing neither 'h' nor 'm' yields 0.0.
    Raises:
        ValueError: If the text in front of 'h' or 'm' is not a valid number.
    """
    if not duration:
        return None

    total_hours = 0.0

    # Hours are whatever precedes the first 'h'; any minutes follow it.
    if "h" in duration:
        hours_text, _, remainder = duration.partition("h")
        total_hours += float(hours_text.strip())
    else:
        remainder = duration

    # Keep only the number in front of the unit. Deleting the single letter
    # 'm' instead would leave 'in' behind for a minutes-only value such as
    # '45min' and make int() raise.
    if "m" in remainder:
        minutes_text = remainder.partition("m")[0]
        total_hours += int(minutes_text.strip()) / 60.0  # minutes -> hours

    return total_hours

# Wrap the Python converter as a Spark UDF that produces a float column
duration_to_hours_udf = udf(f=duration_to_hours, returnType=FloatType())

# Overwrite the textual Duration column with its value expressed in hours
df = df.withColumn(
    "Duration",
    duration_to_hours_udf(col("Duration")),
)

# Print the resulting schema and the rows without truncating cell contents
df.printSchema()
df.show(truncate=False)


root
 |-- Airlines: string (nullable = true)
 |-- Total_Stops: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Duration: float (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Airlines_list: array (nullable = true)
 |    |-- element: string (containsNull = false)



[Stage 8:>                                                          (0 + 1) / 1]

+--------------------------+-----------+-----+---------+-----------+----------+----------------------------+
|Airlines                  |Total_Stops|Price|Duration |Destination|Date      |Airlines_list               |
+--------------------------+-----------+-----+---------+-----------+----------+----------------------------+
|Tunisair                  |0          |81   |2.25     |MAD        |2025-06-29|[Tunisair]                  |
|Tunisair                  |0          |81   |2.25     |MAD        |2025-06-29|[Tunisair]                  |
|Air Europa                |0          |83   |2.25     |MAD        |2025-06-29|[Air Europa]                |
|Vueling, Iberia           |1          |102  |5.3333335|MAD        |2025-06-29|[Vueling, Iberia]           |
|Tunisair, Iberia          |1          |105  |5.9166665|MAD        |2025-06-29|[Tunisair, Iberia]          |
|Aegean Airlines           |1          |117  |9.583333 |MAD        |2025-06-29|[Aegean Airlines]           |
|Tunisair, Iberia  

                                                                                

In [8]:
from pyspark.sql.types import StringType

# Lookup tables keyed by airport/city code. They are built once here rather than
# inside the functions, which are called once per row when used as UDFs.
# "OST" is the IATA code of Ostend-Bruges (Belgium); Ostrava's code is "OSR".
_COUNTRY_BY_CODE = {
    "TUN": "Tunisie",
    "CRL": "Belgique",
    "ANR": "Belgique",
    "OST": "Belgique",
    "LUX": "Luxembourg",
    "NDJ": "Tchad",
    "ABJ": "Cote d'Ivoire",
    "PAR": "France",
    "ALG": "Algerie",
    "COO": "Benin",
    "MAD": "Espagne",
}
_CITY_BY_CODE = {
    "TUN": "Tunis",
    "CRL": "Brussels",
    "ANR": "Antwerp",
    "OST": "Ostend",
    "LUX": "Luxembourg",
    "NDJ": "N'Djamena",
    "ABJ": "Abidjan",
    "PAR": "Paris",
    "ALG": "Alger",
    "COO": "Cotonou",
    "MAD": "Madrid",
}


def pays_converter(destination):
    """Return the country name (in French) for a destination code, or None if the code is unknown."""
    return _COUNTRY_BY_CODE.get(destination)


def destination_converter(destination):
    """Return the city name for a destination code, or None if the code is unknown."""
    return _CITY_BY_CODE.get(destination)

# Expose both lookups to Spark as string-returning UDFs
pays_converter_udf = udf(f=pays_converter, returnType=StringType())
destination_converter_udf = udf(f=destination_converter, returnType=StringType())

# Pays_Destination has to be derived first: it reads the original code held in
# Destination, which the second step then overwrites with the city name.
df = (
    df
    .withColumn("Pays_Destination", pays_converter_udf(col("Destination")))
    .withColumn("Destination", destination_converter_udf(col("Destination")))
    .withColumn("Depart", lit("Tunis"))
)
df.printSchema()
df.show(truncate=False)
    

root
 |-- Airlines: string (nullable = true)
 |-- Total_Stops: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Duration: float (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Airlines_list: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Pays_Destination: string (nullable = true)
 |-- Depart: string (nullable = false)

+--------------------------+-----------+-----+---------+-----------+----------+----------------------------+----------------+------+
|Airlines                  |Total_Stops|Price|Duration |Destination|Date      |Airlines_list               |Pays_Destination|Depart|
+--------------------------+-----------+-----+---------+-----------+----------+----------------------------+----------------+------+
|Tunisair                  |0          |81   |2.25     |Madrid     |2025-06-29|[Tunisair]                  |Espagne         |Tunis |
|Tunisair                  |0          |81

In [9]:

# Keep only the rows in which every critical column is populated
df = (
    df
    .where(col("Duration").isNotNull())
    .where(col("Price").isNotNull())
    .where(col("Airlines").isNotNull())
    .where(col("Date").isNotNull())
)

df.printSchema()


root
 |-- Airlines: string (nullable = true)
 |-- Total_Stops: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Duration: float (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Airlines_list: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Pays_Destination: string (nullable = true)
 |-- Depart: string (nullable = false)



In [10]:
# Preview the first 20 rows after the null filter
df.show(n=20, truncate=True)

+--------------------+-----------+-----+---------+-----------+----------+--------------------+----------------+------+
|            Airlines|Total_Stops|Price| Duration|Destination|      Date|       Airlines_list|Pays_Destination|Depart|
+--------------------+-----------+-----+---------+-----------+----------+--------------------+----------------+------+
|            Tunisair|          0|   81|     2.25|     Madrid|2025-06-29|          [Tunisair]|         Espagne| Tunis|
|            Tunisair|          0|   81|     2.25|     Madrid|2025-06-29|          [Tunisair]|         Espagne| Tunis|
|          Air Europa|          0|   83|     2.25|     Madrid|2025-06-29|        [Air Europa]|         Espagne| Tunis|
|     Vueling, Iberia|          1|  102|5.3333335|     Madrid|2025-06-29|   [Vueling, Iberia]|         Espagne| Tunis|
|    Tunisair, Iberia|          1|  105|5.9166665|     Madrid|2025-06-29|  [Tunisair, Iberia]|         Espagne| Tunis|
|     Aegean Airlines|          1|  117| 9.58333

In [11]:
# Drop rows that are identical across all columns, then report how many remain
df = df.dropDuplicates()
df.count()

                                                                                

5707