# Importing Required Libraries

In [None]:
from datetime import datetime
import re
from sklearn.preprocessing import LabelEncoder
from pyspark.sql.functions import mean,col,size,split,when,concat_ws,collect_list, array_distinct,udf,to_timestamp,round, hour, minute,expr
from pyspark.sql.types import StringType,IntegerType,TimestampType
from pyspark.ml.feature import StringIndexer
from datetime import datetime, timedelta

# Loading the Itineraries CSV from a Google Cloud Storage Bucket into Spark

In [None]:
# Start (or reuse) the Spark session and load the itineraries sample from GCS,
# taking column names from the header row and letting Spark infer column types.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("PredicitiveFlightDataAnalysis")
    .getOrCreate()
)
df = spark.read.csv(
    "gs://flight-data-bucket-551/itineraries_random_2M.csv",
    header=True,
    inferSchema=True,
)

                                                                                

# Data Cleaning

In [37]:
# Fill missing values in selected numeric columns with each column's mean.
def fillna_mean(df, include=()):
    """Replace nulls in the ``include`` columns of ``df`` with that column's mean.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame to impute.
    include : iterable of str
        Names of the columns to impute. Names not present in ``df`` are ignored.

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` with nulls in the selected columns replaced by their mean. ``df``
        is returned unchanged when none of the selected columns exist, and a
        column whose values are all null (so its mean is null) is left as is.
    """
    wanted = set(include)
    targets = [name for name in df.columns if name in wanted]
    if not targets:
        # df.agg() with no aggregate expressions is rejected; there is nothing to fill.
        return df
    means = df.agg(*(mean(name).alias(name) for name in targets)).first().asDict()
    # fillna does not accept None as a replacement value, so skip all-null columns.
    fill_values = {name: value for name, value in means.items() if value is not None}
    return df.fillna(fill_values) if fill_values else df


df = fillna_mean(df, ['totalTravelDistance'])

                                                                                

In [38]:
# Keep only the rows that actually have a travelDuration value.
df = df.where(col('travelDuration').isNotNull())

In [39]:
# Columns removed before modelling: per-segment detail fields plus a few
# identifier/flag columns.
columns_to_drop = [
    'segmentsEquipmentDescription', 'segmentsAirlineCode', 'segmentsCabinCode',
    'segmentsDistance', 'isRefundable', 'legId', 'fareBasisCode',
    'segmentsDepartureTimeEpochSeconds', 'segmentsDepartureTimeRaw',
    'segmentsArrivalAirportCode', 'segmentsDepartureAirportCode',
    'segmentsDurationInSeconds', 'segmentsArrivalTimeEpochSeconds',
    'segmentsArrivalTimeRaw',
]

# Project onto every remaining column, preserving the original column order.
df = df.select(*[name for name in df.columns if name not in columns_to_drop])


In [40]:
# Inspect the schema that remains after dropping the segment-level columns.
df.printSchema()


root
 |-- searchDate: timestamp (nullable = true)
 |-- flightDate: timestamp (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- baseFare: double (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- segmentsAirlineName: string (nullable = true)



In [41]:
# Derive numberOfStops from the '||'-separated per-segment airline names:
# an itinerary with k segments makes k - 1 stops (a non-stop flight has one segment).
column_to_split = 'segmentsAirlineName'

# split() interprets its pattern as a regex, so the literal '||' separator is escaped.
# A raw string keeps Python from treating '\|' as an (invalid) string escape.
delimiter = r'\|\|'

df = df.withColumn(
    'numberOfStops',
    # Leave the value null when there is no segment information, instead of the
    # sentinel that size(null) would produce.
    when(col(column_to_split).isNotNull(),
         size(split(col(column_to_split), delimiter)) - 1),
)

In [42]:
def process_airline_names(airline_names):
    """Collapse the list of per-segment airline names into a single label.

    Parameters
    ----------
    airline_names : list of str or None
        Airline name of each flight segment, in segment order (the result of
        splitting ``segmentsAirlineName`` on ``'||'``).

    Returns
    -------
    str or None
        The single airline name when every segment is flown by the same airline;
        otherwise all names re-joined with ``'||'`` (order and repeats kept).
        ``None`` when the input is ``None``: Spark hands the UDF ``None`` for a
        null ``segmentsAirlineName``, and ``set(None)`` would raise TypeError.
    """
    if airline_names is None:
        return None
    if len(set(airline_names)) == 1:
        return airline_names[0]
    return "||".join(airline_names)

# Register the UDF with PySpark; it produces one string per row.
process_airline_names_udf = udf(process_airline_names, StringType())

# Split the '||'-joined segment airline names into a list and collapse it with the UDF.
# split() takes a regex, so '|' is escaped; the raw string keeps Python from
# treating '\|' as an (invalid) string escape sequence.
df = df.withColumn(
    "segmentsAirlineName",
    process_airline_names_udf(split(col("segmentsAirlineName"), r'\|\|')),
)

In [43]:
# Convert an ISO-8601 travel duration such as "PT4H19M" into the text
# "<hours> hours, <minutes> minutes", from which the hour and minute parts are extracted.
def convert_duration(duration):
    """Turn an ISO-8601 duration string into ``"<H> hours, <M> minutes"``.

    The value is returned as an explicit string because the UDF wrapping this
    function is declared ``StringType``; returning a ``datetime.time`` instead
    would rely on an implicit object-to-text conversion for the downstream
    ``([0-9]+) hours`` / ``([0-9]+) minutes`` regex extraction.

    Parameters
    ----------
    duration : str or None
        Duration such as ``"PT4H19M"``, ``"PT5H"``, ``"PT45M"`` or ``"P1DT2H30M"``.
        A trailing seconds component (``...S``) is accepted and ignored.

    Returns
    -------
    str or None
        e.g. ``"4 hours, 19 minutes"``. Days are folded into the hour count and
        a missing hour or minute component counts as zero. Durations of 24 hours
        or more are supported (``datetime.strptime('%H:%M')`` would raise on them).
        ``None`` for ``None`` input or text that is not a day/hour/minute duration.
    """
    if duration is None:
        return None
    match = re.fullmatch(
        r'P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:\d+(?:\.\d+)?S)?)?',
        duration.strip(),
    )
    if match is None or not any(match.groups()):
        return None  # Not a recognisable day/hour/minute duration.
    days, hours, minutes = (int(part) if part else 0 for part in match.groups())
    return f"{days * 24 + hours} hours, {minutes} minutes"
# Wrap the converter as a string-returning Spark UDF (decorator-factory form),
# then overwrite the raw travelDuration column with the converted text.
convert_duration_udf = udf(returnType=StringType())(convert_duration)
df = df.withColumn("travelDuration", convert_duration_udf(col("travelDuration")))

In [45]:
# Pull the numeric "<n> hours" and "<n> minutes" parts out of the converted
# travelDuration text into integer columns (no match -> null after the cast).
hour_text = expr("regexp_extract(travelDuration, '([0-9]+) hours', 1)")
minute_text = expr("regexp_extract(travelDuration, '([0-9]+) minutes', 1)")
df = (
    df.withColumn("travelDurationHour", hour_text.cast(IntegerType()))
      .withColumn("travelDurationMinute", minute_text.cast(IntegerType()))
)
df.select('travelDurationMinute', 'travelDurationHour').show(truncate=False)


[Stage 24:>                                                         (0 + 1) / 1]

+--------------------+------------------+
|travelDurationMinute|travelDurationHour|
+--------------------+------------------+
|19                  |4                 |
|10                  |8                 |
|54                  |11                |
|20                  |1                 |
|41                  |6                 |
|6                   |2                 |
|28                  |7                 |
|30                  |3                 |
|20                  |3                 |
|51                  |1                 |
|26                  |11                |
|8                   |5                 |
|20                  |6                 |
|55                  |5                 |
|47                  |9                 |
|29                  |8                 |
|49                  |8                 |
|43                  |8                 |
|16                  |8                 |
|55                  |7                 |
+--------------------+------------------+


                                                                                

In [46]:
columns_to_drop = [
    'searchDate', 'flightDate',
    'startingAirport', 'destinationAirport',
    'travelDuration', 'baseFare',
]

# Keep only the surviving columns, in their existing order.
kept_columns = [name for name in df.columns if name not in columns_to_drop]
df = df.select(*kept_columns)

In [47]:
# Confirm the remaining columns (and their types) before label-encoding the airline name.
df.printSchema()

root
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- segmentsAirlineName: string (nullable = true)
 |-- numberOfStops: integer (nullable = false)
 |-- travelDurationHour: integer (nullable = true)
 |-- travelDurationMinute: integer (nullable = true)



In [48]:
# Label-encode segmentsAirlineName: StringIndexer maps each distinct string to a
# numeric index, written to a new "<column>_encoded" column.
columns_to_encode = ['segmentsAirlineName']

# Fit every indexer on the current frame first, then apply them one after another.
indexers = []
for column_name in columns_to_encode:
    indexer_model = StringIndexer(
        inputCol=column_name, outputCol=column_name + "_encoded"
    ).fit(df)
    indexers.append(indexer_model)

for indexer in indexers:
    df = indexer.transform(df)

                                                                                

In [49]:
# Preview the encoded airline indices (StringIndexer emits them as doubles, e.g. 1.0).
df.select('segmentsAirlineName_encoded').show()


[Stage 28:>                                                         (0 + 1) / 1]

+---------------------------+
|segmentsAirlineName_encoded|
+---------------------------+
|                        1.0|
|                        1.0|
|                        1.0|
|                        5.0|
|                        1.0|
|                        2.0|
|                        2.0|
|                        2.0|
|                        0.0|
|                        0.0|
|                        7.0|
|                        0.0|
|                        7.0|
|                        1.0|
|                        6.0|
|                        0.0|
|                        1.0|
|                        0.0|
|                        4.0|
|                        5.0|
+---------------------------+
only showing top 20 rows




                                                                                

In [50]:
# Store the double-valued index as an integer (rounded first, then cast).
# Note: `round` here is pyspark.sql.functions.round, which shadows the builtin.
df = df.withColumn(
    "segmentsAirlineName_encoded",
    round(col("segmentsAirlineName_encoded")).cast("int"),
)

In [51]:
# The raw airline-name strings are superseded by their encoded form; drop them.
columns_to_drop = ['segmentsAirlineName']
df = df.select(*(name for name in df.columns if name not in columns_to_drop))

In [52]:
# Give the encoded column the original feature name, so later cells refer to it as segmentsAirlineName.
df = df.withColumnRenamed('segmentsAirlineName_encoded', 'segmentsAirlineName')

In [61]:
# Build the modelling frame from the cleaned `df`, dropping any row that still has
# a null (e.g. a travelDuration the converter could not parse). Presumably this is
# so the VectorAssembler step never sees nulls.
# (Previously this read `data=data.dropna()`, but `data` is not defined before this cell.)
data = df.dropna()

# Predictive Analysis

In [75]:
# Continue the modelling section with the null-free frame under the name `df`.
df=data

In [76]:
# Confirm the modelling frame's schema: only numeric and boolean columns remain.
data.printSchema()


root
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- numberOfStops: integer (nullable = false)
 |-- travelDurationHour: integer (nullable = true)
 |-- travelDurationMinute: integer (nullable = true)
 |-- segmentsAirlineName: integer (nullable = true)



In [67]:
# Importing ML Libraries and Preprocessing Tools
from pyspark.ml.feature import VectorAssembler,MinMaxScaler
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [68]:
# Model inputs (assembled into a single vector later) and the regression target.
feature_columns = [
    # Booking / fare conditions
    'elapsedDays', 'isBasicEconomy', 'isNonStop', 'seatsRemaining',
    # Route shape
    'totalTravelDistance', 'numberOfStops',
    # Duration split into hour and minute parts
    'travelDurationHour', 'travelDurationMinute',
    # Label-encoded carrier
    'segmentsAirlineName',
]

target_column = 'totalFare'

In [69]:
# Pack all feature columns into one vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)
df = assembler.transform(df)



In [70]:
# Min-max scale the assembled "features" vector into a new "scaled_features" column
# (fitted later as the first pipeline stage).
scaler = MinMaxScaler(outputCol="scaled_features", inputCol="features")

In [71]:
# Seeded 80/20 split into training and held-out test rows.
training_data, testing_data = df.randomSplit(weights=[0.8, 0.2], seed=42)

In [72]:
# Random forest with 10 trees on the scaled features, predicting totalFare.
# An explicit seed is set: PySpark's default seed is derived from hash() of the
# class name, which differs between Python processes unless PYTHONHASHSEED is
# fixed, so the forest (and its RMSE) would otherwise change from run to run.
rf = RandomForestRegressor(
    featuresCol="scaled_features",
    labelCol=target_column,
    numTrees=10,
    seed=42,
)

In [73]:
# Chain scaling and the forest so both stages are fitted on the training split only.
pipeline = Pipeline(stages=[scaler, rf])
model = pipeline.fit(training_data)

                                                                                

In [None]:
# Score the held-out rows with the fitted pipeline and report the RMSE of totalFare.
predictions = model.transform(testing_data)
evaluator = RegressionEvaluator(
    labelCol=target_column,
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE): {}".format(rmse))

In [None]:
# Baseline decision tree on the unscaled "features" vector. Threshold splits are
# unaffected by monotonic min-max scaling, so no scaler stage is used here.
# An explicit seed keeps the fit reproducible across runs.
dt = DecisionTreeRegressor(featuresCol="features", labelCol=target_column, seed=42)
pipeline_dt = Pipeline(stages=[dt])
model_dt = pipeline_dt.fit(training_data)


In [None]:
# Evaluate the decision tree on the same held-out split with the same RMSE evaluator.
predictions_dt = model_dt.transform(testing_data)
rmse_dt = evaluator.evaluate(predictions_dt)
# The model is a DecisionTreeRegressor, so label it as such (it was mislabelled "Extra Trees").
print(f"Decision Tree Regressor - Root Mean Squared Error (RMSE): {rmse_dt}")