# Data Preprocessing Notebook

## Introduction
This Jupyter notebook preprocesses data collected from three sources: flights, weather, and reviews, along with supporting aircraft and airport reference tables. The goal is to clean and prepare the data for further analysis or machine learning. Preprocessing includes handling missing values, identifying and treating outliers, and merging the datasets into a single cohesive structure.

---

## Objective
The goal of this preprocessing step is to ensure that the datasets are:
- **Clean**: Free from inaccuracies and inconsistencies.
- **Complete**: Missing values are addressed appropriately.
- **Conformant**: Data is standardized to expected formats.
- **Consolidated**: Relevant data from all three sources are combined logically.

---

## Datasets
The datasets being processed are:
1. **Flights**: Contains information about flight schedules, delays, and other related attributes.
2. **Weather**: Includes weather conditions at different airport locations.
3. **Reviews**: Comprises customer reviews and ratings for the flights.

---

## Preprocessing Steps
The preprocessing will be conducted in the following order:
1. **Initial Exploration**: Quick overview of the datasets to understand the structure and content.
2. **Data Cleaning**:
    - Removing duplicates.
    - Fixing structural errors (e.g., mislabeled classes, wrong data types).
3. **Handling Missing Values**:
    - Identifying missing values.
    - Deciding on a strategy to handle missing values (e.g., imputation, removal).
4. **Outlier Detection**:
    - Statistical methods to detect outliers.
    - Deciding on a strategy to handle outliers (e.g., trimming, capping, or correcting).
5. **Data Integration**:
    - Aligning datasets by common attributes.
    - Merging datasets into a unified table.
6. **Data Transformation**:
    - Normalization or scaling.
    - Encoding categorical variables.
7. **Final Inspection**:
    - Ensuring the processed data meets the initial objectives.
    - Storing the preprocessed data in a suitable format.
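
Steps 3 and 4 can be illustrated on a small in-memory sample. The sketch below is not part of the Spark pipeline. It assumes median imputation and the common 1.5 times IQR rule, neither of which this notebook prescribes, and the helper names `impute_median` and `cap_outliers_iqr` as well as the sample delays are hypothetical.

```python
from statistics import median, quantiles


def impute_median(values):
    """Replace None entries with the median of the observed values."""
    observed = [v for v in values if v is not None]
    fill = median(observed)
    return [fill if v is None else v for v in values]


def cap_outliers_iqr(values, k=1.5):
    """Cap values outside [Q1 - k*IQR, Q3 + k*IQR] to the nearest fence."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    low, high = q1 - k * iqr, q3 + k * iqr
    return [min(max(v, low), high) for v in values]


# Hypothetical delay values in minutes, with one gap and one extreme value
delays = [5, 7, None, 6, 8, 120]
filled = impute_median(delays)
capped = cap_outliers_iqr(filled)
```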

---

## Tools and Libraries
- `Spark`: For distributed data processing.
- `PySpark`: Python API for Spark.
- `Pandas`: For data manipulation within Spark jobs.
- `Matplotlib`/`Seaborn`: For visualizations (if needed, considering the size of data).
- `MLlib`: Spark’s machine learning library (if preprocessing involves feature selection or dimensionality reduction).



In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, concat, concat_ws, date_format, expr, hour, lit, lower, lpad, mean, minute,
    regexp_extract, regexp_replace, split, to_date, to_timestamp, trim, when,
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

---
## Flights

In [49]:
def flights_processing():
    """
    Transforms flight data by cleaning and structuring. Removes unnecessary columns, normalizes dates and times, 
    extracts key information from strings, and filters based on flight status. Assumes data is loaded from a CSV 
    with a predefined schema.

    Returns:
        flights_df (DataFrame): A Spark DataFrame with processed flights information.
    """
    # Initialize Spark Session
    spark = SparkSession.builder.appName("FlightsDataProcessing")\
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY")\
        .getOrCreate()

    # Define the schema for reading the CSV file
    schema = StructType([
        StructField("aircraft", StringType(), True),
        StructField("temp1", StringType(), True),
        StructField("temp2", StringType(), True),
        StructField("date", StringType(), True),
        StructField("from", StringType(), True),
        StructField("to", StringType(), True),
        StructField("flight", StringType(), True),
        StructField("flight_time", StringType(), True),
        StructField("scheduled_time_departure", StringType(), True),
        StructField("actual_time_departure", StringType(), True),
        StructField("scheduled_time_arrival", StringType(), True),
        StructField("temp3", StringType(), True),
        StructField("status", StringType(), True),
        StructField("temp4", StringType(), True),
    ])

    # Load the data
    flights_df = spark.read.csv('./data/history/flights.csv', schema=schema, header=False)
    
    # Data Preprocessing Steps

    # 1. Remove unnecessary columns
    flights_df = flights_df.drop("temp1", "temp2", "temp3", "temp4")
    
    # 2. Convert date to DateType
    flights_df = flights_df.withColumn("date", to_date("date", "dd MMM yyyy"))

    # 7. Split 'status' into new 'status' and 'actual_time_arrival'
    split_col = split(col("status"), " ")
    flights_df = flights_df.withColumn("actual_time_arrival", expr("substring(status, length(status) - 4, 5)"))
    flights_df = flights_df.withColumn("status", split_col.getItem(0))

    
    # 8. Keep only rows whose status contains 'Landed'
    flights_df = flights_df.filter(col("status").rlike("Landed"))

    
    # 3. Convert the time columns to TimestampType (24-hour 'HH:mm' strings)
    # Concatenate 'date' with each time string before converting, so the timestamp
    # carries the flight date instead of defaulting to '1970-01-01'.
    # Note: all five columns reuse the same 'date', so times falling on the next day are not rolled over.
    flights_df = flights_df.withColumn(
        "flight_time", 
        to_timestamp(concat_ws(" ", date_format(col("date"), "yyyy-MM-dd"), col("flight_time")), "yyyy-MM-dd HH:mm")
    ).withColumn(
        "scheduled_time_departure", 
        to_timestamp(concat_ws(" ", date_format(col("date"), "yyyy-MM-dd"), col("scheduled_time_departure")), "yyyy-MM-dd HH:mm")
    ).withColumn(
        "actual_time_departure", 
        to_timestamp(concat_ws(" ", date_format(col("date"), "yyyy-MM-dd"), col("actual_time_departure")), "yyyy-MM-dd HH:mm")
    ).withColumn(
        "scheduled_time_arrival", 
        to_timestamp(concat_ws(" ", date_format(col("date"), "yyyy-MM-dd"), col("scheduled_time_arrival")), "yyyy-MM-dd HH:mm")
    ).withColumn(
        "actual_time_arrival", 
        to_timestamp(concat_ws(" ", date_format(col("date"), "yyyy-MM-dd"), col("actual_time_arrival")), "yyyy-MM-dd HH:mm")
    )
    
    
    # 4. Extract city from 'from' and 'to' and convert it to lowercase
    flights_df = flights_df.withColumn("from_city", lower(split(col("from"), " \\(")[0])) \
                           .withColumn("to_city", lower(split(col("to"), " \\(")[0]))

    # 5. Extract the three-letter airport code from 'from' and 'to' (substr positions are 1-based)
    flights_df = flights_df.withColumn("from", lower(split(col("from"), " \\(")[1].substr(1, 3))) \
                           .withColumn("to", lower(split(col("to"), " \\(")[1].substr(1, 3)))

    # 10. Remove duplicates
    flights_df = flights_df.dropDuplicates()
    
    '''
    # 9. Correct 'actual_time' to match the 'date' column and convert to TimestampType
    flights_df = flights_df.withColumn("actual_time", to_timestamp(concat_ws(" ", col("date"), col("actual_time")), "yyyy-MM-dd hh:mm a")).drop("time", "date")

    # Add a new column 'rounded_hour' that represents the closest hour
    flights_df = flights_df.withColumn("date", to_date("expected_time")) \
        .withColumn("hour", hour("expected_time")) \
        .withColumn("minute", minute("expected_time")) \
        .withColumn("rounded_hour",
                        when(col("minute") >= 30, expr("hour + 1"))
                        .otherwise(col("hour"))
                    ) \
        .drop("hour", "minute")
    
    # Adjust for the case where adding 1 to the hour results in 24
    flights_df = flights_df.withColumn("rounded_hour",
                    when(col("rounded_hour") == 24, 0)
                    .otherwise(col("rounded_hour"))
                    )

    # Convert 'hour_column' to a string with two digits
    rounded_hour = lpad(col("rounded_hour"), 2, '0')
    
    # Concatenate 'date_column' and 'hour_str' to form a datetime string
    datetime_str = concat_ws(" ", col("date"), rounded_hour)

    # Append ":00:00" to represent minutes and seconds, forming a full datetime string
    datetime_str = concat_ws(":", datetime_str, lit("00"), lit("00"))

    # Convert the datetime string to a timestamp
    flights_df = flights_df.withColumn("rounded_hour", to_timestamp(datetime_str, "yyyy-MM-dd HH:mm:ss")).drop('date')

    
    '''
    for i in flights_df.dtypes:
        print(i)
    # Display the processed DataFrame
    flights_df.show(truncate=False)
    
    # Return the processed DataFrame
    return flights_df
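
Because every time column in `flights_processing` is combined with the same `date`, an arrival after midnight would be stamped earlier than its departure. One possible correction, sketched here in plain Python rather than Spark, is to roll the arrival forward by a day whenever it precedes the departure. This assumes no flight lasts 24 hours or longer. `roll_over_arrival` is a hypothetical helper that the notebook does not define.

```python
from datetime import datetime, timedelta


def roll_over_arrival(departure, arrival):
    """Return the arrival moved to the next day if it precedes the departure."""
    if arrival < departure:
        return arrival + timedelta(days=1)
    return arrival


# Hypothetical overnight flight: both times were parsed with the departure date
dep = datetime(2024, 3, 27, 23, 30)
arr = datetime(2024, 3, 27, 0, 45)
fixed = roll_over_arrival(dep, arr)
```

In Spark the same rule could be expressed with `when(...)` and an interval of one day, but that variant is not shown or tested here.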

In [50]:
# Run the flights processing function
flights_df = flights_processing()

('aircraft', 'string')
('date', 'date')
('from', 'string')
('to', 'string')
('flight', 'string')
('flight_time', 'timestamp')
('scheduled_time_departure', 'timestamp')
('actual_time_departure', 'timestamp')
('scheduled_time_arrival', 'timestamp')
('status', 'string')
('actual_time_arrival', 'timestamp')
('from_city', 'string')
('to_city', 'string')
+--------+----------+----+---+------+-------------------+------------------------+---------------------+----------------------+------+-------------------+---------+--------+
|aircraft|date      |from|to |flight|flight_time        |scheduled_time_departure|actual_time_departure|scheduled_time_arrival|status|actual_time_arrival|from_city|to_city |
+--------+----------+----+---+------+-------------------+------------------------+---------------------+----------------------+------+-------------------+---------+--------+
|c-fyns  |2024-03-27|yyz |yul|AC404 |2024-03-27 00:50:00|2024-03-27 09:00:00     |2024-03-27 09:28:00  |2024-03-27 10:20:00   |
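
The string handling in `flights_processing` can be checked on single values in plain Python. The raw formats below are assumptions inferred from the split patterns and the sample output (`yyz`, `yul`): a status such as `"Landed 10:05"` and an airport field such as `"Toronto (YYZ)"`. The helper names are hypothetical.

```python
import re


def parse_status(status):
    """Split a raw status into (keyword, last five characters as the time)."""
    return status.split(" ")[0], status[-5:]


def parse_airport(field):
    """Split 'City (CODE)' into lowercase (city, code); (None, None) if malformed."""
    match = re.match(r"^(.*?) \((\w{3})", field)
    if match is None:
        return None, None
    return match.group(1).lower(), match.group(2).lower()


status, arrival = parse_status("Landed 10:05")
city, code = parse_airport("Toronto (YYZ)")
```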

In [15]:
l = flights_df.select('aircraft').distinct().collect()

In [6]:
len(l)

77

---
## Aircraft info

In [67]:
def aircrafts_info_processing():
    """
    Processes aircraft information data: drops the photo column and extracts the aircraft age
    (in years) from the 'first_flight' string, cast to an integer.
    
    Returns:
        aircraft_info_df (DataFrame): A Spark DataFrame with processed aircraft information.
    """
    # Initialize Spark Session with legacy time parser policy for compatibility
    spark = SparkSession.builder.appName("AircraftsInfoDataProcessing") \
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
        .getOrCreate()

    # Define the schema for the aircraft information data
    schema = StructType([
        StructField("msn", StringType(), True),
        StructField("type", StringType(), True),
        StructField("aircraft", StringType(), True),
        StructField("airline", StringType(), True),
        StructField("first_flight", StringType(), True),
        StructField("photo", StringType(), True),
    ])
    
    # Load the data from a CSV file, ensuring correct schema application
    aircraft_info_df = spark.read.csv('./data/history/aircrafts_info.csv', schema=schema, header=False)

    aircraft_info_df = aircraft_info_df.drop("photo")

    age_pattern = r"\((\d+) years\)"

    # Add a new column "age" that extracts the age part and converts it to an integer
    aircraft_info_df = aircraft_info_df.withColumn("age", regexp_extract(col("first_flight"), age_pattern, 1).cast("integer")).drop('first_flight')

    for i in aircraft_info_df.dtypes:
        print(i)
    aircraft_info_df.show(truncate=False)

    # Return the processed DataFrame
    return aircraft_info_df

In [68]:
aircraft_info_df = aircrafts_info_processing()

('msn', 'string')
('type', 'string')
('aircraft', 'string')
('airline', 'string')
('age', 'int')
+----+----+--------+------------------------+---+
|msn |type|aircraft|airline                 |age|
+----+----+--------+------------------------+---+
|0010|AT43|LY-RUM  |DOT LT                  |38 |
|0012|AT43|LY-ARI  |DOT LT                  |37 |
|0016|AT43|N212AZ  |Blue Ridge Aero Services|37 |
|0023|AT43|N908FX  |FedEx                   |37 |
|0032|AT43|ZS-LUC  |Solenta Aviation        |37 |
|0033|AT43|F-HBCS  |Chalair Aviation        |31 |
|0036|AT43|OY-JRJ  |DAT                     |37 |
|0045|AT43|N911FX  |FedEx                   |36 |
|0047|AT43|N912FX  |FedEx Feeder            |36 |
|0051|AT43|PNC0245 |Colombia - Police       |36 |
|0058|AT43|XA-TIC  |Aeromar                 |36 |
|0062|AT43|XA-SYH  |Aeromar                 |36 |
|0063|AT43|OY-JRY  |DAT                     |36 |
|0066|AT43|HA-KAM  |Fleet Air International |36 |
|0078|AT43|F-HMTO  |Meteo France            |36 |
|00
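
The age extraction relies on the pattern `\((\d+) years\)`. A plain-Python sketch of the same pattern follows. The sample `first_flight` strings are hypothetical, since the raw column is not shown in this notebook, and `extract_age` is an illustrative helper.

```python
import re

AGE_PATTERN = re.compile(r"\((\d+) years\)")


def extract_age(first_flight):
    """Return the age in years as an int, or None when the pattern is absent."""
    match = AGE_PATTERN.search(first_flight or "")
    return int(match.group(1)) if match else None


age = extract_age("Mar 1986 (38 years)")   # hypothetical raw value
missing = extract_age("N/A")
```

A singular form such as `(1 year)` would not match this pattern, so such rows would end up with a null age.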

---
## Airports info

In [4]:
def airports_info_processing():
    """
    Processes airport information data, cleaning and converting specific columns to proper data types.
    N/A values are treated as null, and numeric fields are cast to their respective types.
    
    Returns:
        info_df (DataFrame): A Spark DataFrame with processed airport information.
    """
    # Initialize Spark Session with legacy time parser policy for compatibility
    spark = SparkSession.builder.appName("AirportsInfoDataProcessing") \
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
        .getOrCreate()

    # Define the schema for the airport information data
    schema = StructType([
        StructField("my_flightradar24_rating", StringType(), True),
        StructField("temp", StringType(), True),  # Placeholder for column due to scraping error
        StructField("arrival_delay_index", StringType(), True),
        StructField("departure_delay_index", StringType(), True),
        StructField("utc", StringType(), True),
        StructField("local", StringType(), True),
        StructField("airport", StringType(), True),
    ])

    # Load the data from a CSV file, ensuring correct schema application
    info_df = spark.read.csv('./data/history/airports_info.csv', schema=schema, header=False)

    # Drop the 'temp' column as it contains null values due to scraping errors
    info_df = info_df.drop("temp")

    # Replace "N/A" string values with null across the DataFrame
    info_df = info_df.na.replace("N/A", None)

    # Clean numeric fields and cast to correct types
    info_df = info_df.withColumn("my_flightradar24_rating", 
                                 regexp_replace(col("my_flightradar24_rating"), "[^0-9]", "").cast(IntegerType())) \
                     .withColumn("arrival_delay_index", col("arrival_delay_index").cast(FloatType())) \
                     .withColumn("departure_delay_index", col("departure_delay_index").cast(FloatType()))
    
    # Extract the utc time part and convert it to a Spark timestamp format
    info_df = info_df.withColumn("utc", to_timestamp(regexp_extract(col("utc"), "(\\d{2}:\\d{2})", 0), "HH:mm"))

    # Convert local time to a Spark timestamp format
    info_df = info_df.withColumn("local", to_timestamp(concat(lit("1970-01-01 "), col("local")), "yyyy-MM-dd hh:mm a"))

    # Calculate time difference utc-local
    info_df = info_df.withColumn("time_diff", col('utc')-col('local')).drop('utc', 'local')

    # Remove duplicates
    info_df = info_df.dropDuplicates()

    # Display the schema to verify data types
    for column_dtype in info_df.dtypes:
        print(column_dtype)

    # Display the processed DataFrame
    info_df.show(truncate=False)

    # Return the processed DataFrame
    return info_df

In [5]:
# Run the airports info processing function
info_df = airports_info_processing()

('my_flightradar24_rating', 'int')
('arrival_delay_index', 'float')
('departure_delay_index', 'float')
('airport', 'string')
('time_diff', 'interval day to second')
+-----------------------+-------------------+---------------------+-------+------------------------------------+
|my_flightradar24_rating|arrival_delay_index|departure_delay_index|airport|time_diff                           |
+-----------------------+-------------------+---------------------+-------+------------------------------------+
|63                     |0.4                |1.0                  |crl    |INTERVAL '-0 01:00:00' DAY TO SECOND|
|72                     |0.4                |0.8                  |tia    |INTERVAL '-0 01:00:00' DAY TO SECOND|
|81                     |0.4                |2.1                  |bru    |INTERVAL '-0 01:00:00' DAY TO SECOND|
|77                     |0.4                |0.0                  |bah    |INTERVAL '-0 03:00:00' DAY TO SECOND|
|85                     |0.4                
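
`time_diff` is computed from two clock times placed on the same calendar day, so an airport whose local date differs from the UTC date at scrape time would get an offset that is off by 24 hours. A hedged sketch of one normalization, in plain Python: wrap the difference into the interval (-12 h, +12 h]. This assumes no airport in the data has an offset beyond 12 hours; zones at UTC+13 or UTC+14 would be misread. `normalize_utc_minus_local` is a hypothetical helper.

```python
from datetime import timedelta

DAY = timedelta(hours=24)
HALF_DAY = timedelta(hours=12)


def normalize_utc_minus_local(diff):
    """Wrap a same-day clock difference into the interval (-12 h, +12 h]."""
    while diff > HALF_DAY:
        diff -= DAY
    while diff <= -HALF_DAY:
        diff += DAY
    return diff


# UTC 23:30 versus local 01:30 (already the next day), compared on one calendar day
raw = timedelta(hours=23, minutes=30) - timedelta(hours=1, minutes=30)
fixed = normalize_utc_minus_local(raw)
```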

---
## Weather

In [6]:
def weather_processing():
    """
    Processes weather data by cleaning and transforming specific columns.
    This includes removing non-numeric characters, handling special cases in visibility,
    and converting date_time strings to timestamp format.

    Returns:
        weather_df (DataFrame): A Spark DataFrame with processed weather information.
    """
    # Initialize Spark Session with a specified app name and configuration
    spark = SparkSession.builder.appName("WeatherDataProcessing") \
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
        .getOrCreate()

    # Read the raw file as plain text, one record per line; the fields are extracted with regular expressions below
    weather_df = spark.read.format("text").load("./data/history/weather.csv")

    weather_df = weather_df.withColumn("day", expr("regexp_extract(value, 'Day: ([^,]+)', 1)"))
    weather_df = weather_df.withColumn("time", expr("regexp_extract(value, 'Time: ([^,]+)', 1)"))
    weather_df = weather_df.withColumn("wind_direction", expr("regexp_extract(value, 'Wind direction: ([^,]+)', 1)"))
    weather_df = weather_df.withColumn("wind_speed", expr("regexp_extract(value, 'Wind speed: ([^,]+)', 1)"))
    weather_df = weather_df.withColumn("temperature", expr("regexp_extract(value, 'Temperature: ([^,]+)', 1)"))
    weather_df = weather_df.withColumn("dew_point", expr("regexp_extract(value, 'Dew point: ([^,]+)', 1)"))
    weather_df = weather_df.withColumn("pressure", expr("regexp_extract(value, 'Pressure: ([^,]+)', 1)"))
    weather_df = weather_df.withColumn("visibility", expr("regexp_extract(value, 'Visibility: ([^,]+)', 1)"))
    weather_df = weather_df.withColumn("date_time", regexp_extract("value", r"(\d{2}:\d{2}:\d{2} \d{4}-\d{2}-\d{2})", 0))
    weather_df = weather_df.withColumn("airport", regexp_extract(col("value"), ",([a-z]{3})$", 1))

    # Drop unnecessary columns
    weather_df = weather_df.drop("day", "time")

    # Clean numeric columns: keep digits and the ASCII minus sign (temperature and dew point
    # can be negative), then cast to IntegerType. Empty strings become null.
    numeric_columns = ["wind_direction", "wind_speed", "temperature", "dew_point", "pressure"]

    
    for column in numeric_columns:
        weather_df = weather_df.withColumn(column, when(col(column) != "", col(column)))
        weather_df = weather_df.withColumn(column, regexp_replace(col(column), "[^0-9-]", "").cast(IntegerType()))
    
    # Handle special visibility cases by replacing specific text with a numeric value and cleaning
    weather_df = weather_df.withColumn("visibility",
                                        when(col("visibility").rlike("Sky and visibility OK"), 10000)
                                        .otherwise(regexp_replace(col("visibility"), "[^0-9]", ""))
                                        .cast(IntegerType()))

    # Convert 'date_time' column to timestamp format
    weather_df = weather_df.withColumn("date_time", to_timestamp(col("date_time"), "HH:mm:ss yyyy-MM-dd"))

    # Remove duplicates and value column
    weather_df = weather_df.dropDuplicates().drop('value')

    # Join the weather data with the airports info.
    # Note: this relies on the global info_df created by airports_info_processing()
    weather_df = weather_df.join(info_df, "airport", "left")

    # Convert weather date_time to local time by subtracting time_diff (utc - local)
    weather_df = weather_df.withColumn("date_time", expr("date_time - time_diff")).drop("time_diff")
    
    # Add a new column 'rounded_hour' holding the closest hour: add 30 minutes, then truncate to the hour.
    # This also moves the date forward when a time rounds up past midnight (e.g. 23:45 -> 00:00 next day).
    weather_df = weather_df.withColumn(
        "rounded_hour", expr("date_trunc('hour', date_time + INTERVAL 30 MINUTES)")
    )
    
    # Aggregating wind direction, wind speed, temperature, dew point, pressure and visibility
    weather_df = weather_df.groupBy("airport", "rounded_hour").agg(
        mean("wind_direction").alias("wind_direction"),
        mean("wind_speed").alias("wind_speed"),
        mean("temperature").alias("temperature"),
        mean("dew_point").alias("dew_point"),
        mean("pressure").alias("pressure"),
        mean("visibility").alias("visibility"),
    )
    
    for i in weather_df.dtypes:
        print(i)
        
    # Display the processed DataFrame
    weather_df.show(100, truncate=False)

    # Return the processed DataFrame
    return weather_df
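
Rounding to the closest hour has one edge case worth checking: a time such as 23:45 rounds up into the next calendar day. A minimal plain-Python sketch of the add-30-minutes-then-truncate approach, which handles that rollover naturally, is given below. `round_to_nearest_hour` is a hypothetical helper used only for illustration.

```python
from datetime import datetime, timedelta


def round_to_nearest_hour(ts):
    """Round a datetime to the closest hour; minutes >= 30 round up."""
    shifted = ts + timedelta(minutes=30)
    return shifted.replace(minute=0, second=0, microsecond=0)


late = round_to_nearest_hour(datetime(2024, 3, 16, 23, 45))
early = round_to_nearest_hour(datetime(2024, 3, 16, 6, 29, 59))
```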

In [7]:
# Run the weather processing function
weather_df = weather_processing()

('airport', 'string')
('rounded_hour', 'timestamp')
('wind_direction', 'double')
('wind_speed', 'double')
('temperature', 'double')
('dew_point', 'double')
('pressure', 'double')
('visibility', 'double')
+-------+-------------------+--------------+----------+-----------+---------+--------+----------+
|airport|rounded_hour       |wind_direction|wind_speed|temperature|dew_point|pressure|visibility|
+-------+-------------------+--------------+----------+-----------+---------+--------+----------+
|aae    |2024-03-16 06:00:00|195.0         |4.5       |8.0        |6.5      |1022.5  |10000.0   |
|aae    |2024-03-17 02:00:00|240.0         |4.0       |12.0       |11.0     |1025.0  |8000.0    |
|aae    |2024-03-18 07:00:00|195.0         |6.0       |9.0        |8.0      |1018.0  |8000.0    |
|aae    |2024-03-16 19:00:00|25.0          |5.0       |15.0       |12.5     |1024.0  |9999.0    |
|aae    |2024-03-15 14:00:00|30.0          |NULL      |NULL       |NULL     |NULL    |NULL      |
|aae    |202
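
The hourly aggregation takes an arithmetic mean of `wind_direction`. For angles this can mislead: readings of 350° and 10° average to 180° rather than a direction near north. If that matters downstream, a circular mean is one alternative. The sketch below is plain Python, not Spark, and `circular_mean_deg` is a hypothetical helper.

```python
import math


def circular_mean_deg(angles):
    """Mean direction in degrees within [0, 360), computed from averaged unit vectors."""
    sin_sum = sum(math.sin(math.radians(a)) for a in angles)
    cos_sum = sum(math.cos(math.radians(a)) for a in angles)
    return math.degrees(math.atan2(sin_sum, cos_sum)) % 360


# Two readings on either side of north
mean_dir = circular_mean_deg([350, 10])
```

Because of floating-point rounding, `mean_dir` may come out as a value very close to 0 or very close to 360; both describe the same direction.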

---
## Reviews

In [8]:
def reviews_processing():
    """
    Cleans review data from a CSV file. This function lowercases comments, removes special characters,
    filters out empty comments, and removes duplicate rows. It initializes a Spark session, reads the data using
    a predefined schema, and applies text preprocessing to the 'comment' field. The cleaned DataFrame is then returned.

    Returns:
        DataFrame: The processed reviews DataFrame.
    """
    # Initialization and data loading
    spark = SparkSession.builder.appName("ReviewsDataProcessing")\
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY")\
        .getOrCreate()
    schema = StructType([
        StructField("comment", StringType(), True),
        StructField("airport", StringType(), True),
    ])
    reviews_df = spark.read.csv('./data/history/reviews.csv', schema=schema, header=False)

    # Data cleaning and preprocessing
    reviews_df = reviews_df.withColumn("comment", lower(col("comment")))
    reviews_df = reviews_df.withColumn("comment", regexp_replace(col("comment"), "[^a-zA-Z0-9 ]", ""))
    reviews_df = reviews_df.filter(trim(col("comment")) != "")
    reviews_df = reviews_df.dropDuplicates()

    # Show and return the processed DataFrame
    reviews_df.show(truncate=False)
    return reviews_df
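
The text cleaning in `reviews_processing` can be tried on a few strings in plain Python. The sample comments and the helper name `clean_comment` are hypothetical.

```python
import re


def clean_comment(text):
    """Lowercase the text and strip everything except letters, digits and spaces."""
    return re.sub(r"[^a-z0-9 ]", "", text.lower())


comments = ["Great lounge, friendly staff!", "   ", "GREAT lounge friendly staff"]
# Deduplicate after cleaning, then drop comments that are empty once trimmed
cleaned = sorted(c for c in {clean_comment(c) for c in comments} if c.strip() != "")
```

Removing punctuation without inserting a space merges hyphenated words, so `"Well-run!"` becomes `"wellrun"`.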

In [9]:
print(flights_df.count())

101


In [10]:
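# Join the flights data with the aggregated weather data.
# This expects flights_df to carry 'airport' and 'rounded_hour' columns; flights_processing() above
# does not currently create them (its rounding step is commented out).
flights_df = flights_df.join(weather_df, ["rounded_hour", "airport"], "left")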

# Join flights data with airports info
flights_df = flights_df.join(info_df, "airport", "left").drop("time_diff")
print(flights_df.count())
# Display the result to verify the join
flights_df.show(1000, truncate=False)

101
+-------+-------------------+-----------+-----------+--------------------------------+--------+--------+----------+-------------------+---------+-------------------+--------------+----------+-----------+---------+--------+----------+-----------------------+-------------------+---------------------+
|airport|rounded_hour       |flight_code|destination|airline                         |aircraft|status  |type      |expected_time      |dest_city|actual_time        |wind_direction|wind_speed|temperature|dew_point|pressure|visibility|my_flightradar24_rating|arrival_delay_index|departure_delay_index|
+-------+-------------------+-----------+-----------+--------------------------------+--------+--------+----------+-------------------+---------+-------------------+--------------+----------+-----------+---------+--------+----------+-----------------------+-------------------+---------------------+
|aae    |2024-03-18 07:00:00|W43843     |fmm        |Wizz Air                        |A320    |D
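
The two `count()` calls check that the left joins neither drop nor duplicate flights. The count stays stable only when the right-hand table has at most one row per join key. A sketch of that uniqueness check in plain Python follows. The rows are hypothetical (the third one is a deliberate duplicate) and `duplicated_keys` is an illustrative helper.

```python
from collections import Counter


def duplicated_keys(rows, key_fields):
    """Return the join keys that occur more than once in rows (a list of dicts)."""
    counts = Counter(tuple(row[f] for f in key_fields) for row in rows)
    return [key for key, n in counts.items() if n > 1]


weather_rows = [
    {"airport": "aae", "rounded_hour": "2024-03-16 06:00:00"},
    {"airport": "aae", "rounded_hour": "2024-03-17 02:00:00"},
    {"airport": "aae", "rounded_hour": "2024-03-16 06:00:00"},
]
dupes = duplicated_keys(weather_rows, ["airport", "rounded_hour"])
```

An empty result means a left join on those keys cannot increase the number of flight rows.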

## Conclusion
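This notebook cleans the flights, aircraft, airport, weather, and review data with PySpark, aggregates the weather observations to hourly means per airport, and joins the flights table with the hourly weather and the airport information.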

---

## Change Log
- **Version 1.0** [Date]: Initial version of the notebook.
