In [3]:
import pandas as pd
import pyarrow.parquet as pq
from spark_hdfs import SparkHDFSConnector, SparkHDFSReader
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col, lit, sum, avg, when, count, date_format, isnan, log, expr, unix_timestamp
from pyspark.sql.types import TimestampType

In [4]:
if __name__ == "__main__":
    # --- Configuration --------------------------------------------------------
    hdfs_url = "http://localhost:9870"
    hdfs_user = "anna"
    parquet_file_path = "hdfs://localhost:9000/user/anna/flight_data/features_added.parquet"

    # Columns whose share of null values (in percent) exceeds this are dropped.
    threshold_percentage = 30

    # Target columns: rows missing either value are dropped, and the columns
    # themselves are exempt from the threshold rule because the outlier
    # transform and the TotalDelay feature below require them.
    required_columns = ["DepDelay", "ArrDelay"]

    # Numeric columns analysed for outliers (filtered below to those that
    # survive cleaning).
    numeric_columns = ["DepTime", "DepDelay", "TaxiOut", "TaxiIn", "ArrTime", "ArrDelay", "CRSElapsedTime", "ActualElapsedTime", "AirTime",
                       "Distance", "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay", "CRSDepTimeMinute",
                       "CRSDepTimeHour", "WheelsOffMinute", "WheelsOffHour", "CRSArrTimeMinute", "CRSArrTimeHour", "WheelsOnMinute",
                       "WheelsOnHour"]

    # --- Load -----------------------------------------------------------------
    # A single reader/session is used for all work. The former
    # SparkHDFSConnector read was discarded work: its session was stopped right
    # after reading and its `df` was overwritten by the read below.
    hdfs_reader = SparkHDFSReader(hdfs_url, hdfs_user)
    hdfs_reader.create_hdfs_spark_session(app_name="ParquetPreprocessingApp")

    # Read the file once; previously it was read again into `parquet_data`.
    raw_data = hdfs_reader.read_parquet_file(parquet_file_path)
    hdfs_reader.print_schema(raw_data)

    # --- Missing values -------------------------------------------------------
    # Null count for every column in ONE aggregation pass, instead of one
    # filter+count Spark job per column.
    null_counts = raw_data.select(
        [count(when(raw_data[c].isNull(), c)).alias(c) for c in raw_data.columns]
    ).first().asDict()
    total_rows = raw_data.count()

    # Guard against an empty DataFrame (the old code divided by zero).
    missing_percentage = {
        c: (n / total_rows * 100 if total_rows else 0.0)
        for c, n in null_counts.items()
    }

    columns_to_drop = [
        c for c, pct in missing_percentage.items()
        if pct > threshold_percentage and c not in required_columns
    ]
    if columns_to_drop:
        print(f"Dropping columns with > {threshold_percentage}% missing values: {columns_to_drop}")
    parquet_data = raw_data.drop(*columns_to_drop)

    # Drop rows without a target value. dropna also removes NaN for float
    # columns, so the former fillna(0) on these same columns was a no-op and has
    # been removed. To impute instead of dropping, replace this line with
    # `parquet_data.fillna(0, subset=required_columns)`.
    parquet_data = parquet_data.dropna(subset=required_columns)

    # --- Outlier analysis input -----------------------------------------------
    # Only analyse numeric columns that still exist after cleaning.
    numeric_columns = [c for c in numeric_columns if c in parquet_data.columns]
    # Bounds must come from the SAME cleaned data that is filtered later (the
    # old code used a separate raw read). `df` is consumed by the bounds step.
    df = parquet_data.select(*numeric_columns)

    def calculate_iqr_bounds(df, columns, relative_error=0.05, iqr_multiplier=1.5):
        """Compute Tukey IQR fences per column.

        For each column the fences are (Q1 - k*IQR, Q3 + k*IQR) with
        IQR = Q3 - Q1 and k = ``iqr_multiplier``.

        Args:
            df: Object exposing ``approxQuantile(col, probabilities, relativeError)``
                (a Spark DataFrame in this notebook).
            columns: Names of the numeric columns to analyse.
            relative_error: Forwarded to ``approxQuantile``. 0 gives exact
                quartiles (expensive on large data); larger values are faster
                but coarser.
            iqr_multiplier: Fence width ``k``; 1.5 is the conventional value.

        Returns:
            dict mapping column name -> (lower_bound, upper_bound). Columns for
            which ``approxQuantile`` returns no quartiles are omitted instead of
            raising IndexError. Spark returns an empty list for an empty frame
            or an all-null/NaN column.
        """
        bounds = {}
        for col_name in columns:
            quantiles = df.approxQuantile(col_name, [0.25, 0.75], relative_error)
            if len(quantiles) < 2:
                print(f"Skipping {col_name}: no non-null values to compute quartiles")
                continue
            q1, q3 = quantiles[0], quantiles[1]
            iqr = q3 - q1
            bounds[col_name] = (q1 - iqr_multiplier * iqr, q3 + iqr_multiplier * iqr)
        return bounds

    # Close the Spark session even if any step below fails.
    try:
        # --- Outlier bounds ---------------------------------------------------
        # `df` holds the numeric columns to analyse; `parquet_data` is the frame
        # that is actually filtered.
        bounds = calculate_iqr_bounds(df, numeric_columns)

        for col_name, (lower_bound, upper_bound) in bounds.items():
            print(f"Column: {col_name}, Lower Bound: {lower_bound}, Upper Bound: {upper_bound}")

        # --- Remove outliers --------------------------------------------------
        # Skip zero-IQR columns (lower == upper). For zero-inflated columns such
        # as CarrierDelay/WeatherDelay/NASDelay both quartiles are 0, so
        # filtering on them would keep only rows where that value is exactly 0,
        # silently discarding every delayed flight.
        # Also skip columns absent from parquet_data, to avoid an AnalysisException.
        # NOTE: Spark comparisons with null evaluate to null, so rows with a null
        # in any filtered column are dropped too (unchanged behaviour).
        available_columns = set(parquet_data.columns)
        filtered_data = parquet_data
        for col_name, (lower_bound, upper_bound) in bounds.items():
            if lower_bound == upper_bound:
                print(f"Not filtering {col_name}: zero IQR (bounds {lower_bound}..{upper_bound})")
                continue
            if col_name not in available_columns:
                continue
            filtered_data = filtered_data.filter(col(col_name).between(lower_bound, upper_bound))

        # --- Derived features -------------------------------------------------
        # TotalDelay is computed from raw minutes BEFORE DepDelay is transformed.
        # The old order added log(DepDelay) to raw ArrDelay, mixing units.
        filtered_data = filtered_data.withColumn("TotalDelay", col("DepDelay") + col("ArrDelay"))

        # Signed log1p: sign(x) * ln(1 + |x|). Plain log(x + 1) is null for
        # DepDelay <= -1, yet negative (early) departures pass the IQR filter.
        filtered_data = filtered_data.withColumn(
            "DepDelay",
            when(col("DepDelay") >= 0, log(col("DepDelay") + 1))
            .otherwise(-log(1 - col("DepDelay"))),
        )

        # Ensure FlightDate is a timestamp. A plain cast is a no-op on timestamp
        # columns and also parses 'yyyy-MM-dd[ HH:mm:ss]' strings. The previous
        # unix_timestamp(...) round-trip nulled date-only strings and dropped
        # sub-second precision.
        filtered_data = filtered_data.withColumn("FlightDate", col("FlightDate").cast(TimestampType()))

        filtered_data.show(5)
    finally:
        hdfs_reader.close_spark_session()

root
 |-- Year: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- DayofMonth: long (nullable = true)
 |-- FlightDate: timestamp (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- OriginCityName: string (nullable = true)
 |-- DestCityName: string (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- DistanceGroup: long (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: 

                                                                                

Column: DepTime, Lower Bound: -143.5, Upper Bound: 2732.5
Column: DepDelay, Lower Bound: -19.5, Upper Bound: 16.5
Column: TaxiOut, Lower Bound: -1.0, Upper Bound: 31.0
Column: TaxiIn, Lower Bound: -3.5, Upper Bound: 16.5
Column: ArrTime, Lower Bound: -22.5, Upper Bound: 2981.5
Column: ArrDelay, Lower Bound: -39.5, Upper Bound: 20.5
Column: CRSElapsedTime, Lower Bound: -25.0, Upper Bound: 279.0
Column: ActualElapsedTime, Lower Bound: -37.0, Upper Bound: 275.0
Column: AirTime, Lower Bound: -51.0, Upper Bound: 245.0
Column: Distance, Lower Bound: -515.5, Upper Bound: 1840.5
Column: CarrierDelay, Lower Bound: 0.0, Upper Bound: 0.0
Column: WeatherDelay, Lower Bound: 0.0, Upper Bound: 0.0
Column: NASDelay, Lower Bound: 0.0, Upper Bound: 0.0
Column: SecurityDelay, Lower Bound: 0.0, Upper Bound: 0.0
Column: LateAircraftDelay, Lower Bound: 0.0, Upper Bound: 0.0
Column: CRSDepTimeMinute, Lower Bound: -30.0, Upper Bound: 90.0
Column: CRSDepTimeHour, Lower Bound: -3.0, Upper Bound: 29.0
Column: Wh

                                                                                

+----+-----+----------+-------------------+-------------------------+--------------+--------------+-------+------------------+---------------+-------+------+-------+--------+---------------+--------------+-----------------+-------+--------+-------------+------------+------------+--------+-------------+-----------------+---------+--------+----------------+--------------+---------------+-------------+----------------+--------------+--------------+------------+-----------------+----------------+-----------------+---------------+-------------------+-----------------+-------------------+
|Year|Month|DayofMonth|         FlightDate|Marketing_Airline_Network|OriginCityName|  DestCityName|DepTime|          DepDelay|DepDelayMinutes|TaxiOut|TaxiIn|ArrTime|ArrDelay|ArrDelayMinutes|CRSElapsedTime|ActualElapsedTime|AirTime|Distance|DistanceGroup|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|DayofWeek|Holidays|CRSDepTimeMinute|CRSDepTimeHour|WheelsOffMinute|WheelsOffHour|CRSArrTi