In [41]:
import warnings
warnings.filterwarnings('ignore')

# PySpark - Import specific classes to avoid conflicts
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, when, expr
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.regression import (
    RandomForestRegressor as SparkRandomForestRegressor, 
    GBTRegressor as SparkGBTRegressor
)
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from xgboost.spark import SparkXGBRegressor
from pyspark.sql.functions import corr, avg, stddev
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Data processing & ML
import pandas as pd
import numpy as np
from builtins import min as py_min, max as py_max, sum as py_sum


# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.figure_factory as ff

# Model interpretation
import shap

# Time series
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller

# Utilities
import kagglehub
from datetime import datetime
import json
import joblib


from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml import Pipeline
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np


In [42]:
# Initialize Spark Session with optimized configuration
spark = SparkSession.builder \
    .appName("Enhanced Airline Market Share Prediction 2") \
    .config("spark.driver.memory", "6g") \
    .getOrCreate()

# Set log level to reduce verbosity
spark.sparkContext.setLogLevel("WARN")


In [43]:
# Download dataset
path = kagglehub.dataset_download("bhavikjikadara/us-airline-flight-routes-and-fares-1993-2024")

# Load data with optimized settings
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("quote", '"')
    .option("escape", '"')
    .option("multiLine", "true")
    .option("mode", "PERMISSIVE")
    .option("nullValue", "")
    .option("nanValue", "NaN")
    .option("emptyValue", "")
    .csv(f"{path}/*.csv")
)

print(f" Shape: {df.count():,} rows × {len(df.columns)} columns")


 Shape: 245,955 rows × 23 columns


In [44]:
#Remove rows with null target variable
df_clean = df.filter(col('large_ms').isNotNull())
print(df_clean.count())

244415


In [45]:
#Filter valid market share values (0-1)
df_clean = df_clean.filter((col('large_ms') >= 0) & (col('large_ms') <= 1))
print(df_clean.count())

244415


In [46]:
#Remove invalid business metrics
df_clean = df_clean.filter(
    (col('passengers') > 0) & 
    (col('fare') > 0) & 
    (col('nsmiles') > 0)
)
print(df_clean.count())

238057


In [47]:
#Remove rows with missing carrier information
df_clean = df_clean.dropna(subset=['carrier_low'])
print(df_clean.count())

237985


In [48]:
#Remove extreme outliers (using IQR method)
numeric_cols = ['passengers', 'fare', 'nsmiles', 'large_ms', 'lf_ms']
for col_name in numeric_cols:
    if col_name in df_clean.columns:
        # Calculate IQR
        quantiles = df_clean.select(
            expr(f"percentile_approx({col_name}, 0.25)").alias("q1"),
            expr(f"percentile_approx({col_name}, 0.75)").alias("q3")
        ).collect()[0]
        
        q1, q3 = quantiles['q1'], quantiles['q3']
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        
        df_clean = df_clean.filter(
            (col(col_name) >= lower_bound) & (col(col_name) <= upper_bound)
        )
print(df_clean.count())

211133


In [49]:
# Create enhanced features
df_features = df_clean.withColumn(
    'revenue', F.col('passengers') * F.col('fare')
).withColumn(
    'fare_per_mile', F.col('fare') / F.col('nsmiles')
).withColumn(
    'fare_difference', F.col('fare') - F.col('fare_low')
).withColumn(
    'is_largest_cheapest', 
    F.when(F.col('carrier_lg') == F.col('carrier_low'), 1).otherwise(0)
).withColumn(
    'price_competitiveness', 
    F.when(F.col('fare') < F.col('fare_low') * 1.1, 1).otherwise(0)
).withColumn(
    'route_density', F.col('passengers') / F.col('nsmiles')
).withColumn(
    'is_peak_season', 
    F.when(F.col('quarter').isin([2, 3]), 1).otherwise(0)
).withColumn(
    'years_since_2000', F.col('Year') - 2000
).withColumn(
    'fare_elasticity', F.col('fare_difference') / F.col('fare_low')
).withColumn(
    'market_power', 
    F.when(F.col('large_ms') > 0.8, 'Dominant')
    .when(F.col('large_ms') > 0.5, 'Strong')
    .when(F.col('large_ms') > 0.2, 'Moderate')
    .otherwise('Weak')
).withColumn(
    'is_short_haul', F.when(F.col('nsmiles') < 500, 1).otherwise(0)
).withColumn(
    'is_medium_haul', F.when((F.col('nsmiles') >= 500) & (F.col('nsmiles') < 1500), 1).otherwise(0)
).withColumn(
    'is_long_haul', F.when(F.col('nsmiles') >= 1500, 1).otherwise(0)
).withColumn(
    'competition_level',
    F.when(F.col('lf_ms') > 0.8, 'High')
    .when(F.col('lf_ms') > 0.5, 'Medium')
    .otherwise('Low')
)

# Cache để tăng tốc
df_features = df_features.cache()

new_features = [
    'revenue', 'fare_per_mile', 'fare_difference', 'is_largest_cheapest',
    'price_competitiveness', 'route_density', 'is_peak_season', 'years_since_2000',
    'fare_elasticity', 'market_power', 'is_short_haul', 'is_medium_haul', 
    'is_long_haul', 'competition_level'
]

for feature in new_features:
    print(f"  • {feature}")

print(len(df_features.columns))
print(f"{df_features.count():,} rows × {len(df_features.columns)} columns")


  • revenue
  • fare_per_mile
  • fare_difference
  • is_largest_cheapest
  • price_competitiveness
  • route_density
  • is_peak_season
  • years_since_2000
  • fare_elasticity
  • market_power
  • is_short_haul
  • is_medium_haul
  • is_long_haul
  • competition_level
37
211,133 rows × 37 columns


In [50]:
# Define feature columns
numeric_features = [
    'passengers', 'fare', 'nsmiles', 'lf_ms', 'fare_low', 'revenue', 
    'fare_per_mile', 'fare_difference', 'is_largest_cheapest', 'price_competitiveness',
    'route_density', 'is_peak_season', 'years_since_2000', 'fare_elasticity',
    'is_short_haul', 'is_medium_haul', 'is_long_haul', 'Year', 'quarter'
]

categorical_features = ['carrier_lg', 'carrier_low']

# Create string indexers for categorical features
indexers = [
    StringIndexer(inputCol=col, outputCol=col+"_indexed", handleInvalid="keep")
    for col in categorical_features
]

# Feature vector assembly
feature_cols = numeric_features + [col+"_indexed" for col in categorical_features]
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw"
)

# Feature scaling
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

# Train-test split
train_data, test_data = df_features.randomSplit([0.8, 0.2], seed=42)

print(f"Training set: {train_data.count():,} rows")
print(f"Test set: {test_data.count():,} rows")
print(f"Features: {len(feature_cols)}")


Training set: 168,758 rows
Test set: 42,375 rows
Features: 21


In [51]:
prep_pipeline = Pipeline(stages=indexers + [assembler, scaler])


In [52]:
prep_model = prep_pipeline.fit(train_data)
train_transformed = prep_model.transform(train_data)
test_transformed = prep_model.transform(test_data)

In [53]:
def prepare_datasets(train_spark_df, test_spark_df, val_fraction=0.15, batch_size=256):
    """
    Convert Spark DataFrames to TensorFlow datasets
    Returns: train_ds, val_ds, test_ds, and metadata
    """
    # Collect train data
    train_data = train_spark_df.select('features', 'large_ms').collect()
    X_train_full = np.array([row['features'].toArray() for row in train_data])
    y_train_full = np.array([row['large_ms'] for row in train_data])
    
    # Collect test data
    test_data = test_spark_df.select('features', 'large_ms').collect()
    X_test = np.array([row['features'].toArray() for row in test_data])
    y_test = np.array([row['large_ms'] for row in test_data])
    
    # Split train into train and validation
    n_samples = len(X_train_full)
    n_val = int(n_samples * val_fraction)
    
    # Shuffle
    indices = np.random.RandomState(42).permutation(n_samples)
    
    # Split
    val_indices = indices[:n_val]
    train_indices = indices[n_val:]
    
    X_train = X_train_full[train_indices]
    y_train = y_train_full[train_indices]
    X_val = X_train_full[val_indices]
    y_val = y_train_full[val_indices]
    
    # Create TensorFlow datasets
    train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    train_ds = train_ds.shuffle(buffer_size=10000, seed=42).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    
    val_ds = tf.data.Dataset.from_tensor_slices((X_val, y_val))
    val_ds = val_ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    
    test_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test))
    test_ds = test_ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    
    metadata = {
        'n_train': len(X_train),
        'n_val': len(X_val),
        'n_test': len(X_test),
        'n_features': X_train.shape[1],
        'X_test': X_test,
        'y_test': y_test
    }
    
    return train_ds, val_ds, test_ds, metadata

In [54]:
train_dataset, val_dataset, test_dataset, metadata = prepare_datasets(
    train_transformed, 
    test_transformed,
    val_fraction=0.15,
    batch_size=512
)

In [55]:
n_train = metadata['n_train']
n_val = metadata['n_val']
n_test = metadata['n_test']
n_features = metadata['n_features']


In [56]:
def create_mlp_model(input_dim):
    """Create MLP regression model"""
    model = keras.Sequential([
        # Input layer
        layers.Input(shape=(input_dim,)),
        
        # Hidden layer 1
        layers.Dense(256, activation='relu', kernel_regularizer=keras.regularizers.l2(0.001)),
        layers.BatchNormalization(),
        layers.Dropout(0.3),
        
        # Hidden layer 2
        layers.Dense(128, activation='relu', kernel_regularizer=keras.regularizers.l2(0.001)),
        layers.BatchNormalization(),
        layers.Dropout(0.3),
        
        # Hidden layer 3
        layers.Dense(64, activation='relu', kernel_regularizer=keras.regularizers.l2(0.001)),
        layers.BatchNormalization(),
        layers.Dropout(0.2),
        
        # Hidden layer 4
        layers.Dense(32, activation='relu', kernel_regularizer=keras.regularizers.l2(0.001)),
        layers.Dropout(0.2),
        
        # Output layer (regression)
        layers.Dense(1, activation='sigmoid')  # sigmoid vì output trong [0,1]
    ])
    
    return model

In [57]:
mlp_model = create_mlp_model(n_features)


In [58]:
mlp_model.summary()


Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_10 (Dense)            (None, 256)               5632      
                                                                 
 batch_normalization_6 (Bat  (None, 256)               1024      
 chNormalization)                                                
                                                                 
 dropout_8 (Dropout)         (None, 256)               0         
                                                                 
 dense_11 (Dense)            (None, 128)               32896     
                                                                 
 batch_normalization_7 (Bat  (None, 128)               512       
 chNormalization)                                                
                                                                 
 dropout_9 (Dropout)         (None, 128)              

In [59]:
mlp_model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss='mse',
    metrics=['mae', 'mse', tf.keras.metrics.RootMeanSquaredError(name='rmse')]
)

In [60]:
# Callbacks
callbacks = [
    keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=15,
        restore_best_weights=True,
        verbose=1
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        verbose=1
    ),
    keras.callbacks.ModelCheckpoint(
        'best_mlp_model.h5',
        monitor='val_loss',
        save_best_only=True,
        verbose=1
    )
]

In [61]:
history = mlp_model.fit(
    train_dataset,
    validation_data=val_dataset,  # Sử dụng validation dataset đã chuẩn bị
    epochs=100,
    callbacks=callbacks,
    verbose=1
)

Epoch 1/100
Epoch 1: val_loss improved from inf to 0.09153, saving model to best_mlp_model.h5
Epoch 2/100
Epoch 2: val_loss improved from 0.09153 to 0.04484, saving model to best_mlp_model.h5
Epoch 3/100
Epoch 3: val_loss improved from 0.04484 to 0.01113, saving model to best_mlp_model.h5
Epoch 4/100
Epoch 4: val_loss improved from 0.01113 to 0.00997, saving model to best_mlp_model.h5
Epoch 5/100
Epoch 5: val_loss did not improve from 0.00997
Epoch 6/100
Epoch 6: val_loss improved from 0.00997 to 0.00938, saving model to best_mlp_model.h5
Epoch 7/100
Epoch 7: val_loss improved from 0.00938 to 0.00923, saving model to best_mlp_model.h5
Epoch 8/100
Epoch 8: val_loss did not improve from 0.00923
Epoch 9/100
Epoch 9: val_loss did not improve from 0.00923
Epoch 10/100
Epoch 10: val_loss did not improve from 0.00923
Epoch 11/100
Epoch 11: val_loss did not improve from 0.00923
Epoch 12/100
Epoch 12: ReduceLROnPlateau reducing learning rate to 0.0005000000237487257.

Epoch 12: val_loss did not

In [62]:
test_results = mlp_model.evaluate(test_dataset, verbose=0)

print(f" TEST SET PERFORMANCE:")
print(f"   Loss (MSE): {test_results[0]:.6f}")
print(f"   MAE:        {test_results[1]:.4f}")
print(f"   MSE:        {test_results[2]:.6f}")
print(f"   RMSE:       {test_results[3]:.4f}")

 TEST SET PERFORMANCE:
   Loss (MSE): 0.006522
   MAE:        0.0449
   MSE:        0.005964
   RMSE:       0.0772


In [63]:
test_data_collected = test_transformed.select('features', 'large_ms').collect()
X_test = np.array([row['features'].toArray() for row in test_data_collected])
y_test = np.array([row['large_ms'] for row in test_data_collected])


In [64]:
y_pred = mlp_model.predict(X_test, batch_size=1024, verbose=0).flatten()


In [65]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

test_mae = mean_absolute_error(y_test, y_pred)
test_rmse = np.sqrt(mean_squared_error(y_test, y_pred))
test_r2 = r2_score(y_test, y_pred)
test_mape = np.mean(np.abs((y_test - y_pred) / y_test)) * 100

In [66]:
print(f"   MAE:  {test_mae:.4f}")
print(f"   RMSE: {test_rmse:.4f}")
print(f"   R²:   {test_r2:.4f}")
print(f"   MAPE: {test_mape:.2f}%")

   MAE:  0.0449
   RMSE: 0.0772
   R²:   0.8832
   MAPE: 9.34%


In [67]:
residuals = y_test - y_pred

In [68]:
sample_df = pd.DataFrame({
    'Actual': y_test[:20],
    'Predicted': y_pred[:20],
    'Error': residuals[:20],
    'Error %': (residuals[:20] / y_test[:20] * 100)
})
print(sample_df.to_string(index=True))

    Actual  Predicted     Error    Error %
0     0.85   0.863933 -0.013933  -1.639166
1     0.37   0.493771 -0.123771 -33.451699
2     0.26   0.391486 -0.131486 -50.571465
3     0.83   0.832111 -0.002111  -0.254395
4     0.66   0.656139  0.003861   0.584989
5     0.80   0.781189  0.018811   2.351350
6     0.98   0.984175 -0.004175  -0.426023
7     0.29   0.297447 -0.007447  -2.567991
8     0.44   0.464680 -0.024680  -5.609020
9     0.35   0.468565 -0.118565 -33.875608
10    0.44   0.443207 -0.003207  -0.728883
11    0.94   0.955282 -0.015282  -1.625704
12    0.35   0.546277 -0.196277 -56.079156
13    0.61   0.609341  0.000659   0.108048
14    0.67   0.574838  0.095162  14.203251
15    1.00   0.990198  0.009802   0.980240
16    0.55   0.553760 -0.003760  -0.683711
17    1.00   0.990901  0.009099   0.909871
18    0.41   0.674080 -0.264080 -64.409672
19    1.00   0.990857  0.009143   0.914311
