In [1]:
import os
import sys
import logging
# Make the parent of the current working directory importable; it is
# appended to sys.path only if it is not already listed there.
nb_dir = os.path.dirname(os.getcwd())
if nb_dir not in sys.path:
    sys.path.append(nb_dir)

# Lower the root logger's threshold to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [2]:
from utils.files.file_helper import load_binary_file, save_binary_file
import utils.configuration
import pandas as pd
import concurrent.futures
import numpy as np

In [3]:
# Instantiate the project's Configuration helper; its `config` attribute is
# indexed by section name and key in the following cell.
config = utils.configuration.Configuration()

In [4]:
# Settings read from the 'DEFAULT' section of the loaded configuration.
# They are passed to process_data() as the data location, the merged input
# file name and the preprocessed output file name respectively.
MERGED_DATA_LOCATION = config.config['DEFAULT']['MERGED_DATA_LOCATION']
MERGED_DATA_FILE_BIN = config.config['DEFAULT']['MERGED_DATA_FILE_BIN']
PREPROCESSED_DATA_FILE_BIN = config.config['DEFAULT']['PREPROCESSED_DATA_FILE_BIN']

In [5]:
def get_dependent_variable_value(cancelled, delay_duration):
    """Classify a single flight into one of four status labels.

    Parameters
    ----------
    cancelled : number
        Cancellation flag; a value equal to 1 means the flight was cancelled
        and takes precedence over any delay value.
    delay_duration : number
        Delay to classify (presumably minutes -- TODO confirm). It is compared
        with strict ``>`` against the thresholds 60 and 15.

    Returns
    -------
    str
        'cancelled_flight', 'long_delay' (> 60), 'delay' (> 15) or 'no_delay'.
        A NaN delay compares False against both thresholds and therefore
        yields 'no_delay' unless the flight is cancelled.
    """
    if cancelled == 1:
        return 'cancelled_flight'

    # Thresholds are tried from the largest down, so the first match wins.
    for threshold, label in ((60, 'long_delay'), (15, 'delay')):
        if delay_duration > threshold:
            return label

    return 'no_delay'

In [6]:
def get_airline_delay_index(delay_df, carrier):
    """Return the ``delay_index`` of the first row whose ``carrier`` matches.

    Parameters
    ----------
    delay_df : pandas.DataFrame
        Frame with a 'carrier' column and a 'delay_index' column.
    carrier : object
        Value compared for equality against the 'carrier' column.

    Returns
    -------
    The 'delay_index' value of the first matching row.

    Raises
    ------
    IndexError
        If no row matches ``carrier``.

    NOTE(review): the frame built by process_airline_rating names its carrier
    column 'op_unique_carrier', not 'carrier' -- confirm which frame this
    helper is intended to receive.
    """
    carrier_rows = delay_df['carrier'] == carrier
    matching_indices = delay_df.loc[carrier_rows, 'delay_index']
    return matching_indices.values[0]

In [7]:
def process_weather(data_df, prefix):
    """Clean and condense the weather columns whose names start with ``prefix``.

    Steps, in order:

    1. Missing ``<prefix>awnd`` values are replaced by that column's mean.
    2. Every remaining missing value in columns starting with ``prefix`` is
       replaced by 0.
    3. 0/1 indicator columns are derived from pairs of source columns:
       ``fog`` (wt01, wt02), ``hail`` (wt04, wt05), ``damaging_wind``
       (wt10, wt11) and ``snow`` (snow, wt09 -- this overwrites the original
       ``<prefix>snow`` column). An indicator is 1 when the pair sums to > 0.
    4. ``snwd``, ``awnd``, ``wt03``, ``wt07`` and ``wt08`` are renamed to
       ``snow_depth``, ``average_wind_speed``, ``thunder``, ``dust``, ``haze``.
    5. All columns still starting with ``<prefix>wt`` are dropped.

    Parameters
    ----------
    data_df : pandas.DataFrame
        Frame containing the prefixed weather columns listed above.
        Steps 1-3 write into this frame; use the return value for the
        final result.
    prefix : str
        Column-name prefix selecting one set of weather columns
        (e.g. 'origin_' or 'dest_').

    Returns
    -------
    pandas.DataFrame
        A new frame with the renamed columns and without ``<prefix>wt*``.
    """
    awnd_col = prefix + 'awnd'
    # Assign the filled column back instead of calling fillna(inplace=True) on
    # the result of data_df[...]: the chained in-place form does not write
    # through under pandas Copy-on-Write, which would leave the NaNs to be
    # zero-filled by the next step instead of mean-filled.
    data_df[awnd_col] = data_df[awnd_col].fillna(data_df[awnd_col].mean())

    prefixed = data_df.columns.str.startswith(prefix)
    data_df.loc[:, prefixed] = data_df.loc[:, prefixed].fillna(value=0)

    # indicator name -> the two source columns whose sum must be positive.
    indicator_sources = {
        'fog': ('wt01', 'wt02'),
        'hail': ('wt04', 'wt05'),
        'damaging_wind': ('wt10', 'wt11'),
        'snow': ('snow', 'wt09'),
    }
    for indicator, (first, second) in indicator_sources.items():
        combined = data_df[prefix + first] + data_df[prefix + second]
        # Positional ndarray assignment, so the frame's index plays no role.
        data_df[prefix + indicator] = (combined.to_numpy() > 0).astype('int64')

    data_df = data_df.rename(columns={prefix + 'snwd': prefix + 'snow_depth',
                                      prefix + 'awnd': prefix + 'average_wind_speed',
                                      prefix + 'wt03': prefix + 'thunder',
                                      prefix + 'wt07': prefix + 'dust',
                                      prefix + 'wt08': prefix + 'haze'})

    # Whatever wt* columns were not renamed above are no longer needed.
    leftover_wt = data_df.columns[data_df.columns.str.startswith(prefix + 'wt')]
    return data_df.drop(columns=leftover_wt)
    

In [8]:
def process_airline_rating(data_df):
    """Compute, per carrier, the share of flights that were not 'no_delay'.

    Parameters
    ----------
    data_df : pandas.DataFrame
        Frame with an 'op_unique_carrier' column and a 'status' column.

    Returns
    -------
    pandas.DataFrame
        One row per carrier (sorted by carrier) with the columns
        'op_unique_carrier', 'number_of_flights', 'number_of_delays' and
        'delay_index' (= number_of_delays / number_of_flights).
        Every status other than 'no_delay' counts as a delay.
    """
    flights = data_df[['status', 'op_unique_carrier']]
    flights_per_carrier = flights.groupby('op_unique_carrier').size()

    delayed = flights[flights['status'] != 'no_delay']
    # Align the delay counts on the full carrier list. A carrier without any
    # delayed flight is absent from the delayed subset; it must count as 0
    # instead of making the columns different lengths or shifting the counts
    # onto the wrong carrier.
    delays_per_carrier = (
        delayed.groupby('op_unique_carrier').size()
        .reindex(flights_per_carrier.index, fill_value=0)
    )

    delay_info = pd.DataFrame({
        'op_unique_carrier': flights_per_carrier.index.to_numpy(),
        'number_of_flights': flights_per_carrier.to_numpy(),
        'number_of_delays': delays_per_carrier.to_numpy(),
    })
    delay_info['delay_index'] = delay_info['number_of_delays'] / delay_info['number_of_flights']

    return delay_info

In [9]:
def process_data_variables(file_data):
    """Turn the merged flight/weather frame into the preprocessed feature frame.

    The input is copied, so ``file_data`` itself is left untouched. On the copy:

    * column names are lower-cased;
    * the 'origin_' and 'dest_' weather columns are cleaned via
      process_weather;
    * a 'status' label is derived from 'cancelled' and 'arr_delay';
    * a per-carrier 'delay_index' from process_airline_rating is merged in on
      'op_unique_carrier';
    * 'crs_dep_time' is divided by 100 and truncated to an integer;
    * 'day_of_year' and 'weekend' are added;
    * the raw delay/cancellation columns are dropped.

    Parameters
    ----------
    file_data : pandas.DataFrame
        Merged data. After lower-casing it must provide at least
        'cancelled', 'arr_delay', 'op_unique_carrier', 'crs_dep_time',
        'fl_date', 'day_of_week', the columns dropped at the end, and the
        weather columns process_weather reads for both prefixes.

    Returns
    -------
    pandas.DataFrame
        The preprocessed frame.
    """
    data_df = file_data.copy()

    data_df.columns = [str.lower(name) for name in data_df.columns]

    logging.debug('Processing origin weather')

    data_df = process_weather(data_df, 'origin_')

    logging.debug('Processing dest weather')

    data_df = process_weather(data_df, 'dest_')

    logging.debug('Creating dependent variable')

    # Vectorised equivalent of get_dependent_variable_value applied row by
    # row: conditions are evaluated in priority order and the first match
    # wins; NaN delays compare False and fall through to 'no_delay'.
    # This replaces a hard-coded 16-worker process pool, which pickled every
    # row across processes for a trivial comparison and required the helper
    # to be importable by worker processes. Keep the labels and thresholds
    # in sync with get_dependent_variable_value.
    arr_delay = data_df['arr_delay']
    data_df['status'] = np.select(
        [
            (data_df['cancelled'] == 1).to_numpy(),
            (arr_delay > 60).to_numpy(),
            (arr_delay > 15).to_numpy(),
        ],
        ['cancelled_flight', 'long_delay', 'delay'],
        default='no_delay',
    )

    logging.debug('Processing airline ratings')

    ratings = process_airline_rating(data_df)

    data_df = pd.merge(data_df, ratings[['op_unique_carrier', 'delay_index']], on=['op_unique_carrier'])

    logging.debug('Peforming final data preparation')

    # Divide by 100 and truncate toward zero (presumably HHMM -> hour of
    # day -- TODO confirm the encoding of crs_dep_time).
    data_df['crs_dep_time'] = (data_df['crs_dep_time'] / 100).astype('int64')

    # NOTE(review): despite its name this is the number of days elapsed since
    # the earliest fl_date in the frame (0-based, float), not the calendar
    # day of the year; for data spanning several years it keeps growing.
    data_df['day_of_year'] = (data_df['fl_date'] - data_df['fl_date'].min()) / np.timedelta64(1, 'D')

    # 1 when day_of_week >= 6 (assumes 6 and 7 encode Saturday/Sunday --
    # TODO confirm against the data source).
    data_df['weekend'] = np.where(data_df['day_of_week'] >= 6, 1, 0)

    data_df = data_df.drop(['dep_delay', 'arr_delay', 'cancelled', 'crs_elapsed_time',
                            'carrier_delay', 'weather_delay', 'nas_delay', 'security_delay',
                            'late_aircraft_delay'], axis=1)
    return data_df

In [10]:
def process_data(merged_data_location, merged_data_file_name, preprocessed_file_name):
    """Load the merged data file, preprocess it and save the result.

    Parameters
    ----------
    merged_data_location
        Passed to load_binary_file as its first argument and to
        save_binary_file as its last argument, i.e. the same location is
        used for reading the input and writing the output.
    merged_data_file_name
        Name of the merged input file, passed to load_binary_file.
    preprocessed_file_name
        Name under which the preprocessed data is saved.

    Returns
    -------
    The frame returned by process_data_variables (also written to disk via
    save_binary_file).
    """
    logging.info('Loading historical data')
    merged_frame = load_binary_file(merged_data_location, merged_data_file_name)

    logging.info('Starting to process data file')
    preprocessed_frame = process_data_variables(merged_frame)

    logging.info('Saving preprocessed data file')
    save_binary_file(preprocessed_frame, preprocessed_file_name, merged_data_location)

    return preprocessed_frame

In [11]:
# Run the whole pipeline: load the merged file, preprocess it and save the
# result. Note that `parsed_data` holds the *preprocessed* frame returned by
# process_data, not the raw loaded data.
parsed_data = process_data(MERGED_DATA_LOCATION, MERGED_DATA_FILE_BIN, PREPROCESSED_DATA_FILE_BIN)

INFO:root:Loading historical data
INFO:root:Starting to process data file
INFO:root:Saving preprocessed data file
