# Data Pipeline

### Our pipeline process:
#### Extract Features:
- `Airport_Pair` which will be used for `Delay_Trend_Past_Week` to see if there was a Delay trend in the previous week
- `Same_Day_Tail_Reuse`which will be using **ONLY** the date from `dep_datetime`, as well as using `Tail_Number`
- `Previous_Flight_Delay` which will be using information from `Tail_Number`, `dep_date` which will derive from `dep_datetime`, and `CRSDepTime`
- `Turnaround_Time` will also be extracted from `CRSDepTime` subtracted by `Previous_Arrival_Time`(Which is derived from `Tail_Number`, `dep_date`, and `CRSArrTime`)
- `Slack_Time` that derives from `CRSArrTime` subtracted by `CRSElaspedTime`

#### Outlier
- `Aircraft_Age` outlier must be removed 

#### Transformations: 
- `log_distance` using the log transformation 

#### Drop 
- is_Holidays

In [14]:
import numpy as np
import scipy as sp
import pandas as pd
import matplotlib.pylab as plt
import seaborn as sns

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, FunctionTransformer, StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectFromModel
import time


from sklearn import set_config
set_config(transform_output = "pandas")

In [15]:
def stratified_random_split(df: pd.DataFrame, target_column: str, test_size: float = 0.1, random_state: int = 42):
    """
    Performs a stratified random train-test split to ensure all classes in 
    'Flight_Status' are proportionally represented in both sets.

    Parameters:
    df (pd.DataFrame): The dataset containing the target variable.
    target_column (str): The column representing the classification target.
    test_size (float): The proportion of data to be used as test data.
    random_state (int): Random seed for reproducibility.

    Returns:
    tuple: (train_df, test_df) DataFrames.
    """
    train_df, test_df = train_test_split(
        df, test_size=test_size, stratify=df[target_column], random_state=random_state
    )

    print(f"Train size: {len(train_df)} samples")
    print(f"Test size: {len(test_df)} samples")

    return train_df, test_df

In [16]:
# Setting up the DataFrame for the flight data
flight_data = pd.read_parquet("data/WEATHER61.parquet")

# Stratify the data
train_data, test_data= stratified_random_split(flight_data, target_column="Flight_Status")

Train size: 13184010 samples
Test size: 1464890 samples


In [17]:
# downsampling the data 
train_data = train_data.sample(n=100000, random_state=42)

# Splitting the data into X and y
X = train_data.drop(columns=["Flight_Status"])
y = train_data["Flight_Status"]

### Pipeline Functions

In [18]:
# Custom progress logger
class ProgressLogger(BaseEstimator, TransformerMixin):
    """
    A transformer that logs progress through a pipeline.
    """
    
    def __init__(self, total_rows, log_interval=0.01, name='Pipeline'):
        self.total_rows = total_rows
        self.log_interval = log_interval
        self.name = name
        self.start_time = None
        self.last_log_percent = -1
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        # Initialize timer if not started
        if self.start_time is None:
            self.start_time = time.time()
            print(f"{self.name} processing started on {self.total_rows:,} rows")
        
        # Calculate current progress
        current_rows = X.shape[0]
        percent_complete = current_rows / self.total_rows
        
        # Check if we need to log progress
        int_percent = int(percent_complete / self.log_interval)
        if int_percent > self.last_log_percent:
            self.last_log_percent = int_percent
            elapsed = time.time() - self.start_time
            
            # Estimate time remaining
            percent_done = percent_complete * 100
            if percent_complete > 0:
                total_est = elapsed / percent_complete
                remaining = total_est - elapsed
                time_str = f" - Est. remaining: {remaining:.1f}s"
            else:
                time_str = ""
                
            print(f"{self.name}: {percent_done:.1f}% complete ({current_rows:,}/{self.total_rows:,} rows){time_str}")
        
        return X


class DelayTrendEncoder(BaseEstimator, TransformerMixin):
    """
    Transformer that precomputes a route-based delay trend table using the training set,
    then merges it into any dataset based on (day_of_year, origin, dest).
    This allows for valid use in future (unlabeled) prediction data.

    Parameters:
        date_col (str): Name of the datetime column
        origin_col (str): Column name for origin airport ID
        dest_col (str): Column name for destination airport ID
        status_col (str): Target column used to derive delay signal (only during fit)
        output_col (str): Name of the output trend feature
    """

    def __init__(self,
                 date_col='dep_datetime',
                 origin_col='OriginAirportID',
                 dest_col='DestAirportID',
                 status_col='Flight_Status',
                 output_col='Delay_Trend'):
        self.date_col = date_col
        self.origin_col = origin_col
        self.dest_col = dest_col
        self.status_col = status_col
        self.output_col = output_col

    def fit(self, X, y=None):
        X = X.copy()
        X[self.date_col] = pd.to_datetime(X[self.date_col])
        X['day_of_year'] = X[self.date_col].dt.dayofyear
        X['Flight_Status'] = y.reset_index(drop=True)

        delay_reasons = [
            'CarrierDelay', 'WeatherDelay', 'NASDelay',
            'SecurityDelay', 'LateAircraftDelay'
        ]

        X['delay_signal'] = 0
        for idx, status in enumerate(X['Flight_Status']):
            if isinstance(status, str) and 'Delay' in status:
                delay_type = status.split(' - ')[-1]
                if delay_type in delay_reasons:
                    X.at[idx, 'delay_signal'] = 1

        trend_table = (
            X.groupby(['day_of_year', self.origin_col, self.dest_col])['delay_signal']
             .mean()
             .reset_index()
             .rename(columns={'delay_signal': self.output_col})
        )

        self.trend_lookup_ = trend_table
        return self

    def transform(self, X):
        X = X.copy()
        X[self.date_col] = pd.to_datetime(X[self.date_col])
        X['day_of_year'] = X[self.date_col].dt.dayofyear

        X = X.merge(
            self.trend_lookup_,
            on=['day_of_year', self.origin_col, self.dest_col],
            how='left'
        )

        X[self.output_col] = X[self.output_col].fillna(0)
        return X.drop(columns=['day_of_year'])

class SameDayTailReuseEncoder(BaseEstimator, TransformerMixin):
    """
    Transformer that creates a feature counting how many times
    the same tail number (aircraft) is used on the same day.

    Parameters:
        datetime_col (str): Column containing full departure datetime
        tail_col (str): Column name for tail number
        output_col (str): Name of the output column
    """

    def __init__(self,
                 datetime_col='dep_datetime',
                 tail_col='Tail_Number',
                 output_col='Same_Day_Tail_Reuse'):
        self.datetime_col = datetime_col
        self.tail_col = tail_col
        self.output_col = output_col

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()

        # Ensure datetime is parsed correctly
        X[self.datetime_col] = pd.to_datetime(X[self.datetime_col])

        # Extract just the date portion
        X['dep_date'] = X[self.datetime_col].dt.date

        # Count reuse of same tail number per day
        X[self.output_col] = (
            X.groupby([self.tail_col, 'dep_date'])[self.tail_col]
            .transform('count')
            .astype(float)  
        )

        return X.drop(columns=['dep_date'])

    
class TurnaroundDelayEncoder(BaseEstimator, TransformerMixin):
    """
    Transformer that computes:
    - Previous_Flight_Delay: scheduled departure time of previous flight with same tail number on same day
    - Turnaround_Time: time between previous arrival and current departure

    Parameters:
        datetime_col (str): Column with full departure datetime
        dep_time_col (str): Column with scheduled departure time (e.g. CRSDepTime)
        arr_time_col (str): Column with scheduled arrival time (e.g. CRSArrTime)
        tail_col (str): Tail number column
        output_prefix (str): Prefix to use for new feature columns
    """

    def __init__(self,
                 datetime_col='dep_datetime',
                 dep_time_col='CRSDepTime',
                 arr_time_col='CRSArrTime',
                 tail_col='Tail_Number',
                 output_prefix=''):
        self.datetime_col = datetime_col
        self.dep_time_col = dep_time_col
        self.arr_time_col = arr_time_col
        self.tail_col = tail_col
        self.output_prefix = output_prefix

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        X[self.datetime_col] = pd.to_datetime(X[self.datetime_col])
        X['dep_date'] = X[self.datetime_col].dt.date

        # Sort the values
        X = X.sort_values(by=[self.tail_col, 'dep_date', self.dep_time_col])

        prev_delay_col = self.output_prefix + 'Previous_Flight_Delay'
        turnaround_col = self.output_prefix + 'Turnaround_Time'

        # Previous scheduled departure
        X[prev_delay_col] = (
            X.groupby([self.tail_col, 'dep_date'])[self.dep_time_col]
            .shift(1)
        )

        # Previous scheduled arrival time
        X['Previous_Arrival_Time'] = (
            X.groupby([self.tail_col, 'dep_date'])[self.arr_time_col]
            .shift(1)
        )

        # Compute turnaround time using a defined method
        X[turnaround_col] = self._calculate_turnaround(X[self.dep_time_col], X['Previous_Arrival_Time'])

        # Replace missing with 0
        X[prev_delay_col] = X[prev_delay_col].fillna(0)
        X[turnaround_col] = X[turnaround_col].fillna(0)

        return X.drop(columns=['dep_date', 'Previous_Arrival_Time'])

    def _calculate_turnaround(self, current_dep, previous_arr):
        """
        Applies time difference logic with wrap-around at midnight (2400).
        """
        diff = current_dep - previous_arr
        adjusted = diff.mask((~diff.isna()) & (diff < 0), diff + 2400)
        return adjusted

class SlackTimeEncoder(BaseEstimator, TransformerMixin):
    """
    Transformer that calculates slack time as the difference between scheduled
    arrival time (CRSArrTime) and scheduled elapsed time (CRSElapsedTime).

    Parameters:
        arr_col (str): Column name for scheduled arrival time.
        elapsed_col (str): Column name for scheduled elapsed time.
        output_col (str): Name of the output column to store slack time.
    """

    def __init__(self,
                 arr_col='CRSArrTime',
                 elapsed_col='CRSElapsedTime',
                 output_col='Slack_Time'):
        self.arr_col = arr_col
        self.elapsed_col = elapsed_col
        self.output_col = output_col

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        X[self.output_col] = X[self.arr_col] - X[self.elapsed_col]
        return X

In [6]:
# Apply the fitted DelayTrendEncoder to the data
encoder = DelayTrendEncoder(
    date_col='dep_datetime',
    origin_col='OriginAirportID',
    dest_col='DestAirportID',
    status_col='Flight_Status',
    output_col='Delay_Trend'
)

# Fit on data using target label
encoder.fit(flight_data, flight_data['Flight_Status'])

# Transform the data
data_transformed = encoder.transform(flight_data)

# Create dummy variables for Flight_Status categories
status_dummies = pd.get_dummies(flight_data['Flight_Status'], prefix='Status')

# Add trend feature to the dummy DataFrame
correlation_df = pd.concat([status_dummies, data_transformed[['Delay_Trend']]], axis=1)

# Calculate correlation matrix
correlation_matrix = correlation_df.corr()

# Plot heatmap for Delay_Trend against all status categories
plt.figure(figsize=(12, 8))
sns.heatmap(correlation_matrix[['Delay_Trend']], annot=True, cmap='coolwarm', vmin=-1, vmax=1)
plt.title('Correlation Heatmap: Delay_Trend vs Flight_Status Categories')
plt.show()


KeyboardInterrupt: 

### Pipeline Model

In [22]:
# import random forest
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectFromModel


numerical_features = [
    'Distance', 'relative_humidity_2m', 'temperature_2m',
    'Delay_Trend', 'Same_Day_Tail_Reuse', 'Previous_Flight_Delay',
    'Turnaround_Time', 'Slack_Time',
    'Aircraft_Seats', 'Aircraft_Engines', 'Aircraft_Age', 'Is_Freighter',
    'Is_Holiday_Week',
    'temperature_2m_dest_dep_time', 'relative_humidity_2m_dest_dep_time',
    'precipitation', 'showers', 'snowfall', 'snow_depth',
    'soil_moisture_0_to_1cm', 'et0_fao_evapotranspiration', 'evapotranspiration',
    'cloud_cover_high', 'cloud_cover_low', 'surface_pressure', 'weather_code',
    'pressure_msl', 'wind_direction_10m', 'wind_gusts_10m',
    'precipitation_dest_dep_time', 'showers_dest_dep_time',
    'snowfall_dest_dep_time', 'snow_depth_dest_dep_time',
    'soil_moisture_0_to_1cm_dest_dep_time',
    'et0_fao_evapotranspiration_dest_dep_time',
    'evapotranspiration_dest_dep_time', 'cloud_cover_high_dest_dep_time',
    'cloud_cover_low_dest_dep_time', 'surface_pressure_dest_dep_time',
    'weather_code_dest_dep_time', 'pressure_msl_dest_dep_time',
    'wind_direction_10m_dest_dep_time', 'wind_gusts_10m_dest_dep_time'
]



# Define your numeric transformer
numeric_transformer = Pipeline(steps=[
    ('log1p_distance', ColumnTransformer([
        ('log', FunctionTransformer(np.log1p), ['Distance'])
    ], remainder='passthrough', verbose_feature_names_out=False)),
    
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler())
])

# Full pipeline with progress loggers
preprocessor = Pipeline([
    ('initial_logger', ProgressLogger(total_rows=13000000, name='Starting Pipeline')),
    
    ('delay_trend', DelayTrendEncoder(
        date_col='dep_datetime',
        origin_col='OriginAirportID',
        dest_col='DestAirportID',
        status_col='Flight_Status',
        output_col='Delay_Trend'
    )),
    
    ('trend_logger', ProgressLogger(total_rows=13000000, name='Delay Trend Complete')),
    
    ('tail_reuse', SameDayTailReuseEncoder(
        datetime_col='dep_datetime',
        tail_col='Tail_Number',
        output_col='Same_Day_Tail_Reuse'
    )),
    
    ('reuse_logger', ProgressLogger(total_rows=13000000, name='Tail Reuse Complete')),
    
    ('turnaround_delay', TurnaroundDelayEncoder(
        datetime_col='dep_datetime',
        dep_time_col='CRSDepTime',
        arr_time_col='CRSArrTime',
        tail_col='Tail_Number',
        output_prefix=''
    )),
    
    ('turnaround_logger', ProgressLogger(total_rows=13000000, name='Turnaround Delay Complete')),
    
    ('slack_time', SlackTimeEncoder(
        arr_col='CRSArrTime',
        elapsed_col='CRSElapsedTime',
        output_col='Slack_Time'
    )),
    
    ('slack_logger', ProgressLogger(total_rows=13000000, name='Slack Time Complete')),
    
    ('feature_prep', ColumnTransformer([
        ('numeric', numeric_transformer, numerical_features),
        ('drop', 'drop', [
            'Aircraft_Model', 'Aircraft_EngineType', 'Holiday',
            'dest_dep_datetime', 'dep_datetime', 'Tail_Number'
        ])
    ], remainder='passthrough', verbose_feature_names_out=False)),
    
    ('prep_logger', ProgressLogger(total_rows=13000000, name='Feature Prep Complete')),
    
    ('select', SelectFromModel(RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1))),
    
    ('final_logger', ProgressLogger(total_rows=13000000, name='Pipeline Complete'))
])


preprocessor

In [23]:
# Train the pipeline in this cell and run it on X, this cell must end with the transformed data
X_prepped = preprocessor.fit_transform(X, y)
# X_prepped = preprocessor.fit_transform(X, y)
pd.set_option('display.max_columns', None)
X_prepped

Starting Pipeline processing started on 13,000,000 rows
Starting Pipeline: 0.8% complete (100,000/13,000,000 rows) - Est. remaining: 0.0s
Delay Trend Complete processing started on 13,000,000 rows
Delay Trend Complete: 0.8% complete (100,000/13,000,000 rows) - Est. remaining: 0.0s
Tail Reuse Complete processing started on 13,000,000 rows
Tail Reuse Complete: 0.8% complete (100,000/13,000,000 rows) - Est. remaining: 0.0s
Turnaround Delay Complete processing started on 13,000,000 rows
Turnaround Delay Complete: 0.8% complete (100,000/13,000,000 rows) - Est. remaining: 0.0s
Slack Time Complete processing started on 13,000,000 rows
Slack Time Complete: 0.8% complete (100,000/13,000,000 rows) - Est. remaining: 0.0s
Feature Prep Complete processing started on 13,000,000 rows
Feature Prep Complete: 0.8% complete (100,000/13,000,000 rows) - Est. remaining: 0.0s
Pipeline Complete processing started on 13,000,000 rows
Pipeline Complete: 0.8% complete (100,000/13,000,000 rows) - Est. remaining: 0

Unnamed: 0,Distance,relative_humidity_2m,temperature_2m,Slack_Time,Aircraft_Seats,Aircraft_Age,temperature_2m_dest_dep_time,relative_humidity_2m_dest_dep_time,soil_moisture_0_to_1cm,et0_fao_evapotranspiration,surface_pressure,pressure_msl,wind_direction_10m,wind_gusts_10m,soil_moisture_0_to_1cm_dest_dep_time,et0_fao_evapotranspiration_dest_dep_time,surface_pressure_dest_dep_time,pressure_msl_dest_dep_time,wind_direction_10m_dest_dep_time,wind_gusts_10m_dest_dep_time,DayofMonth,Flight_Number_Operating_Airline,OriginAirportID,OriginCityMarketID,OriginStateFips,DestAirportID,DestCityMarketID,DestStateFips,CRSDepTime,CRSArrTime,CRSElapsedTime
36067,1.139773,0.625039,-0.849266,-1.017470,6.108376e-16,-6.380832e-17,0.329902,-1.864214,0.113596,-0.381813,-0.086398,-0.225644,-1.582770,0.270780,-1.695619,-0.171856,-0.555024,-1.273849,-0.838354,-0.047194,15,3182.0,10408,30408,55,10466,30466,4,900,1050,230.0
56882,0.857169,0.358440,1.177946,1.186279,6.108376e-16,-6.380832e-17,0.925662,-0.974085,-1.627468,-0.489983,0.591932,-0.303059,0.422006,-0.734831,0.626771,0.390808,-0.206624,-1.412820,0.004841,1.408368,18,1572.0,14112,33195,12,10408,30408,55,1951,2152,181.0
78333,0.702888,-0.130324,0.947201,0.556363,6.108376e-16,-6.380832e-17,1.322343,0.183083,-0.390917,1.338729,-0.073993,-0.937861,0.080644,0.727876,-0.727133,1.680846,0.509910,-0.887817,1.052914,-0.653678,20,572.0,11986,31986,26,14112,33195,12,1522,1810,168.0
30177,0.652475,-1.107853,0.203756,-0.194175,6.108376e-16,-6.380832e-17,-1.333072,1.206732,-1.667038,1.057611,0.717110,0.579471,1.285684,0.545037,0.804656,-0.788492,0.105480,0.687193,0.835777,-0.471733,23,499.0,11697,32467,12,12339,32337,18,1130,1418,168.0
60047,0.097812,0.802771,0.390514,0.108337,6.108376e-16,-6.380832e-17,-1.567295,-0.840565,-1.696715,-0.704911,0.682654,0.331744,1.506104,-0.155843,-0.331833,-0.657997,-0.899489,1.243079,1.534978,0.953505,13,222.0,11697,32467,12,10431,30431,37,1328,1528,120.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
78139,0.523904,-0.574655,-0.375294,0.399362,6.108376e-16,-6.380832e-17,1.275617,-0.662540,-1.301019,0.326022,-0.114853,0.951063,1.046903,0.331726,-1.665972,1.695665,0.676720,0.378368,-1.263023,-0.229139,1,2652.0,14122,30198,42,14986,34986,12,1440,1700,140.0
15152,0.279049,-0.619088,-1.796721,-0.519664,6.108376e-16,-6.380832e-17,-0.116463,0.094070,0.182842,-0.857252,0.511147,0.796233,1.498491,-0.643412,-1.102669,-0.601522,0.657310,0.764400,-1.792844,0.316697,2,2854.0,12264,30852,51,13204,31454,12,1000,1229,149.0
8925,1.477733,-1.063420,-1.593941,0.887595,6.108376e-16,-6.380832e-17,-2.109724,-0.261981,1.013805,-0.705935,0.668965,0.254329,0.716043,0.880241,-0.282420,-0.738961,-3.926256,1.443816,0.408700,0.559290,19,1778.0,11618,31703,34,12441,32441,56,1815,2113,298.0
5996,-0.300962,-0.219190,-0.128084,0.313204,6.108376e-16,-6.380832e-17,-0.695957,-1.552669,-0.766829,-0.582630,0.720605,1.090409,-0.666705,-0.643412,-1.201494,-0.696910,0.765839,1.443816,1.398078,-0.744651,28,4378.0,12266,31453,48,14193,33728,12,1435,1615,100.0


In [None]:
# # pickle the preprocessor
# import pickle
# with open("preprocessor.pkl", "wb") as f:
#     pickle.dump(preprocessor, f, protocol=pickle.HIGHEST_PROTOCOL)

In [20]:
# use MLP classifier on the pipeline

from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report, precision_score, f1_score

X_test = test_data.drop('Flight_Status', axis=1)
y_test = test_data['Flight_Status']

# Your X_prepped is already available from previous preprocessing
# Now we'll create and train the MLP classifier optimized for precision

from sklearn.neural_network import MLPClassifier
from sklearn.metrics import classification_report, precision_score

# Create the MLP classifier with precision-focused parameters
mlp = MLPClassifier(
    hidden_layer_sizes=(100, 50),  # Two hidden layers
    activation='relu',             
    solver='adam',                 
    alpha=0.001,                   # Higher regularization for better precision
    batch_size='auto',             
    learning_rate='adaptive',      
    max_iter=2000,                 # More iterations to ensure convergence
    early_stopping=True,           # Stop if validation performance doesn't improve
    validation_fraction=0.2,       # Portion of training data used for validation
    n_iter_no_change=20,           # Patience parameter
    random_state=42,
)

# Train the model using your preprocessed X data
mlp.fit(X_prepped, y)

# For testing, you'll need to apply the same preprocessing to X_test
# If you haven't already done this, you would typically do:
# X_test_prepped = preprocessor.transform(X_test)
# But since you didn't specify this step, I'll assume you need to do it:

# Make predictions (using whatever preprocessed test data you have)
# If X_test is already preprocessed:
y_pred = mlp.predict(X_test)
# If you need to preprocess X_test:
# y_pred = mlp.predict(X_test_prepped)

# Evaluate with focus on precision
print("Precision score:", precision_score(y_test, y_pred, average='macro'))
print("F1 score:", f1_score(y_test, y_pred, average='macro'))
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

ValueError: Input X contains NaN.
MLPClassifier does not accept missing values encoded as NaN natively. For supervised learning, you might want to consider sklearn.ensemble.HistGradientBoostingClassifier and Regressor which accept missing values encoded as NaNs natively. Alternatively, it is possible to preprocess the data, for instance by using an imputer transformer in a pipeline or drop samples with missing values. See https://scikit-learn.org/stable/modules/impute.html You can find a list of all estimators that handle NaN values at the following page: https://scikit-learn.org/stable/modules/impute.html#estimators-that-handle-nan-values