In [1]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook, with Hive support
# enabled so that tables can be saved through the Hive metastore later on.
spark = (
    SparkSession.builder
    .appName("Merge CSV")
    .enableHiveSupport()
    .getOrCreate()
)


In [2]:
# HDFS directory that holds the flight CSV files. The trailing slash matters:
# the file-name pattern is appended to this string with plain `+` when reading.
csv_path = "hdfs:///user/talentum/flight_2018/"

In [3]:
# Columns that are removed right after the CSV files are loaded.
# The Div1..Div5 columns all share the same eight suffixes, so they are
# generated instead of being spelled out one by one; the resulting list has
# the same names in the same order as a hand-written enumeration.
per_leg_suffixes = (
    "Airport", "AirportID", "AirportSeqID", "WheelsOn",
    "TotalGTime", "LongestGTime", "WheelsOff", "TailNum",
)

cols_to_drop = (
    ["DivReachedDest", "DivActualElapsedTime", "DivArrDelay", "DivDistance"]
    + [f"Div{leg}{suffix}" for leg in range(1, 6) for suffix in per_leg_suffixes]
    + [
        "Duplicate",
        "Originally_Scheduled_Code_Share_Airline",
        "DOT_ID_Originally_Scheduled_Code_Share_Airline",
        "IATA_Code_Originally_Scheduled_Code_Share_Airline",
        "Flight_Num_Originally_Scheduled_Code_Share_Airline",
        "_c119",
        "Tail_Number",
        "CancellationCode",
    ]
)

# Sanity check: number of columns scheduled for removal.
print(len(cols_to_drop))

52


In [4]:
# Load every file under csv_path whose name matches Flights_2018_*, treating
# the first line of each file as the header, then discard the unwanted columns.
flights_glob = csv_path + "Flights_2018_*"
df = (
    spark.read
    .option("header", "true")
    .csv(flights_glob)
    .drop(*cols_to_drop)
)

In [5]:
# Report how many columns are left after the drop.
remaining_columns = df.columns
print(len(remaining_columns))

68


In [6]:
from pyspark.sql import functions as F

# Count nulls for every column of `df` in ONE Spark job.
# count() skips null inputs, so counting a marker that is emitted only when
# the value is null yields the number of nulls in that column. A separate
# filter(...).count() per column would re-scan the CSV files once per column.
# Positional aliases (c0, c1, ...) are used so that unusual column names
# (e.g. with trailing spaces) never have to be valid alias names.
# The loop variable is deliberately NOT called `col`: that name is later bound
# to pyspark.sql.functions.col, and rebinding it to a string by re-running
# this cell would break every subsequent `col(...)` call.
null_counts = df.select(
    [
        F.count(F.when(F.col(column_name).isNull(), F.lit(1))).alias(f"c{position}")
        for position, column_name in enumerate(df.columns)
    ]
).collect()[0]

for column_name, null_count in zip(df.columns, null_counts):
    print(f"Column: {column_name}, Null count: {null_count}")

Column: Year, Null count: 0
Column: Quarter, Null count: 0
Column: Month, Null count: 0
Column: DayofMonth, Null count: 0
Column: DayOfWeek, Null count: 0
Column: FlightDate, Null count: 0
Column: Marketing_Airline_Network, Null count: 0
Column: Operated_or_Branded_Code_Share_Partners, Null count: 0
Column: DOT_ID_Marketing_Airline, Null count: 0
Column: IATA_Code_Marketing_Airline, Null count: 0
Column: Flight_Number_Marketing_Airline, Null count: 0
Column: Operating_Airline , Null count: 0
Column: DOT_ID_Operating_Airline, Null count: 0
Column: IATA_Code_Operating_Airline, Null count: 0
Column: Flight_Number_Operating_Airline, Null count: 0
Column: OriginAirportID, Null count: 0
Column: OriginAirportSeqID, Null count: 0
Column: OriginCityMarketID, Null count: 0
Column: Origin, Null count: 0
Column: OriginCityName, Null count: 0
Column: OriginState, Null count: 0
Column: OriginStateFips, Null count: 0
Column: OriginStateName, Null count: 0
Column: OriginWac, Null count: 0
Column: Dest

In [7]:
# 1-2. Compute the replacement values used for imputation.
#      percentile_approx(column, 0.5) is the approximate MEDIAN (50th
#      percentile), not the mode. The `mode_` prefix on the variable names
#      is kept only so that any code referring to those names keeps working.
#      TaxiOut / TaxiIn use the mean rounded to 0 decimals.
#      All six statistics are evaluated in a single aggregation so the source
#      data is scanned once instead of once per statistic.
fill_stats = df.agg(
    F.expr("percentile_approx(WheelsOff, 0.5)").alias("WheelsOff"),
    F.expr("percentile_approx(WheelsOn, 0.5)").alias("WheelsOn"),
    F.expr("percentile_approx(AirTime, 0.5)").alias("AirTime"),
    F.expr("percentile_approx(ActualElapsedTime, 0.5)").alias("ActualElapsedTime"),
    F.round(F.avg("TaxiOut"), 0).alias("TaxiOut"),
    F.round(F.avg("TaxiIn"), 0).alias("TaxiIn"),
).collect()[0]

mode_wheels_off = fill_stats["WheelsOff"]                    # approximate median
mode_wheels_on = fill_stats["WheelsOn"]                      # approximate median
mode_air_time = fill_stats["AirTime"]                        # approximate median
mode_actual_elapsed_time = fill_stats["ActualElapsedTime"]   # approximate median
mean_taxi_out = fill_stats["TaxiOut"]                        # rounded mean
mean_taxi_in = fill_stats["TaxiIn"]                          # rounded mean

# 3. Fill missing values with the median / mean for the selected columns.
#    An aggregate is None when its column holds no usable values; fillna only
#    accepts int, float, bool or str replacements, so such entries are skipped
#    rather than passed through.
fill_values = {
    "WheelsOff": mode_wheels_off,
    "WheelsOn": mode_wheels_on,
    "AirTime": mode_air_time,
    "ActualElapsedTime": mode_actual_elapsed_time,
    "TaxiOut": mean_taxi_out,
    "TaxiIn": mean_taxi_in,
}
df_filled = df.fillna(
    {name: value for name, value in fill_values.items() if value is not None}
)

# 4. Columns whose missing values are replaced by 0.
columns_to_impute = ["LongestAddGTime", "TotalAddGTime", "FirstDepTime", "LateAircraftDelay",
                     "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "ArrDelay", "ArrDelayMinutes",
                     "ArrDel15", "DepDelay", "DepDelayMinutes", "DepDel15","DivAirportLandings"]

# Cast these columns to double first: the CSV was read without a schema, and
# a numeric fill value passed with `subset` is ignored for non-numeric columns.
from pyspark.sql.functions import col

for col_name in columns_to_impute:
    df_filled = df_filled.withColumn(col_name, col(col_name).cast("double"))

# 5. Fill missing values in these columns with 0
df_filled = df_filled.fillna(0, subset=columns_to_impute)

# 6. Cast ArrivalDelayGroups and DepartureDelayGroups to double (same reason
#    as above) before filling them with -1
df_filled = df_filled.withColumn("ArrivalDelayGroups", col("ArrivalDelayGroups").cast("double")) \
                     .withColumn("DepartureDelayGroups", col("DepartureDelayGroups").cast("double"))

# 7. Fill missing values in ArrivalDelayGroups and DepartureDelayGroups with -1
df_filled = df_filled.fillna(-1, subset=["ArrivalDelayGroups", "DepartureDelayGroups"])

In [8]:
# Keep only the rows whose 'Cancelled' value equals 1.
is_cancelled = df_filled["Cancelled"] == 1
df_cancelled = df_filled.where(is_cancelled)

In [9]:
# Keep only the rows whose 'Cancelled' value is not 1.
is_not_cancelled = df_filled["Cancelled"] != 1
df_nc = df_filled.where(is_not_cancelled)

In [10]:
from pyspark.sql.functions import when
# Where DepTime / ArrTime is null, fall back to the matching CRS* column
# (CRSDepTime / CRSArrTime). coalesce returns its first non-null argument.
for target_name, fallback_name in (("DepTime", "CRSDepTime"), ("ArrTime", "CRSArrTime")):
    df_nc = df_nc.withColumn(target_name, F.coalesce(col(target_name), col(fallback_name)))


In [11]:
# Count nulls for every column of `df_nc` in ONE Spark job.
# count() skips null inputs, so counting a marker emitted only for null values
# gives the per-column null total. Running filter(...).count() per column
# would re-evaluate the whole read/cast/fill pipeline once for each column.
# Positional aliases keep the aggregation independent of the column names.
null_counts_nc = df_nc.select(
    [
        F.count(F.when(F.col(c).isNull(), F.lit(1))).alias(f"c{position}")
        for position, c in enumerate(df_nc.columns)
    ]
).collect()[0]

for c, null_count in zip(df_nc.columns, null_counts_nc):
    print(f"Column: {c}, Null count: {null_count}")

Column: Year, Null count: 0
Column: Quarter, Null count: 0
Column: Month, Null count: 0
Column: DayofMonth, Null count: 0
Column: DayOfWeek, Null count: 0
Column: FlightDate, Null count: 0
Column: Marketing_Airline_Network, Null count: 0
Column: Operated_or_Branded_Code_Share_Partners, Null count: 0
Column: DOT_ID_Marketing_Airline, Null count: 0
Column: IATA_Code_Marketing_Airline, Null count: 0
Column: Flight_Number_Marketing_Airline, Null count: 0
Column: Operating_Airline , Null count: 0
Column: DOT_ID_Operating_Airline, Null count: 0
Column: IATA_Code_Operating_Airline, Null count: 0
Column: Flight_Number_Operating_Airline, Null count: 0
Column: OriginAirportID, Null count: 0
Column: OriginAirportSeqID, Null count: 0
Column: OriginCityMarketID, Null count: 0
Column: Origin, Null count: 0
Column: OriginCityName, Null count: 0
Column: OriginState, Null count: 0
Column: OriginStateFips, Null count: 0
Column: OriginStateName, Null count: 0
Column: OriginWac, Null count: 0
Column: Dest

In [12]:
# Where CRSElapsedTime is null, use ActualElapsedTime instead
# (coalesce picks the first non-null value).
crs_elapsed_filled = F.coalesce(col("CRSElapsedTime"), col("ActualElapsedTime"))
df_nc = df_nc.withColumn("CRSElapsedTime", crs_elapsed_filled)

In [13]:
# Verify how many rows still have a null CRSElapsedTime.
rows_missing_crs_elapsed = df_nc.where(F.col("CRSElapsedTime").isNull())
null_count = rows_missing_crs_elapsed.count()
print(f"Null count: {null_count}")

Null count: 0


In [14]:
# Add TotalDelay = ArrDelay + DepDelay.
total_delay = df_nc["ArrDelay"] + df_nc["DepDelay"]
df_nc = df_nc.withColumn("TotalDelay", total_delay)

In [15]:
# Trigger a count over df_nc and report the result.
row_count = df_nc.count()
print(f"Number of rows in the DataFrame: {row_count}")

Number of rows in the DataFrame: 5601139


In [16]:
# Make sure the target Hive database exists before writing into it.
spark.sql("CREATE DATABASE IF NOT EXISTS flight_db")

# Fully qualified name of the destination table.
hive_table = "flight_db.flight_data_partitioned_month"

# Sanitise the column names: trim surrounding whitespace, then replace
# spaces and slashes with underscores.
renamed_columns = []
for original_name in df_nc.columns:
    clean_name = original_name.strip().replace(" ", "_").replace("/", "_")
    renamed_columns.append(col(original_name).alias(clean_name))
df_nc = df_nc.select(renamed_columns)

# Save as a Hive table partitioned by Month, replacing any existing table data.
table_writer = df_nc.write.mode("overwrite").partitionBy("Month")
table_writer.saveAsTable(hive_table)


In [17]:
# Show the column names and data types of df_nc.
df_nc.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Quarter: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- FlightDate: string (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |-- DOT_ID_Marketing_Airline: string (nullable = true)
 |-- IATA_Code_Marketing_Airline: string (nullable = true)
 |-- Flight_Number_Marketing_Airline: string (nullable = true)
 |-- Operating_Airline: string (nullable = true)
 |-- DOT_ID_Operating_Airline: string (nullable = true)
 |-- IATA_Code_Operating_Airline: string (nullable = true)
 |-- Flight_Number_Operating_Airline: string (nullable = true)
 |-- OriginAirportID: string (nullable = true)
 |-- OriginAirportSeqID: string (nullable = true)
 |-- OriginCityMarketID: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCityName: string (nullable = 

In [26]:
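# Optional CSV export (currently disabled): writes df_nc as CSV files with a
# header row, partitioned by Month, to the HDFS path below.
#output_path = "hdfs:///user/talentum/flight_data_partitioned_loki"

#df_nc.write.mode("overwrite") \
#    .option("header", "true") \
#    .partitionBy("Month") \
#    .csv(output_path)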


In [18]:
# Stop the Spark session created at the top of the notebook.
spark.stop()