# Flight data preprocession

## Decompression

Run decompression.bat, csv will be stored in ../code/raw_data/

In [1]:
import os
import io
import pytz
import requests
import numpy as np
import pandas as pd
from datetime import datetime

In [2]:
data = pd.read_parquet('../data/cleaned_data/flights_data.parquet', engine='pyarrow')

In [29]:
data.to_parquet('../data/cleaned_data/flights_data.parquet', engine='pyarrow', compression='gzip')

## Read and concatenate

In [2]:
data = pd.DataFrame()
for csv in os.listdir('../data/raw_data/'):
    data_temp = pd.read_csv('../data/raw_data/' + csv)
    data = pd.concat([data, data_temp], ignore_index=True)

## Drop useless  columns

In [3]:
# FLIGHTS all equal to 1
data = data.drop('FLIGHTS', axis=1)
# DUP all equal to N
data = data.drop('DUP', axis=1)
# same num of unique values in ORIGIN_AIRPORT_ID and ORIGIN, keep ORIGIN
assert data['ORIGIN_AIRPORT_ID'].unique().__len__() == data['ORIGIN'].unique().__len__()
data = data.drop('ORIGIN_AIRPORT_ID', axis=1)
# same reason for dropping DEST_AIRPORT_ID
assert data['DEST_AIRPORT_ID'].unique().__len__() == data['DEST'].unique().__len__()
data = data.drop('DEST_AIRPORT_ID', axis=1)

## Drop derived columns

In [4]:
# State names are included in city names
data = data.drop(['ORIGIN_STATE_NM', 'DEST_STATE_NM'], axis=1)
# Elapsed time can be calculated from dep and arr time
data = data.drop(['CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME'], axis=1)
# Distance group are derived by distance
data = data.drop('DISTANCE_GROUP', axis=1)

# Get airport timezone

In [11]:
# same airports in column ORIGIN and DEST
assert np.array_equal(np.unique(data['ORIGIN'].unique()), np.unique(data['DEST'].unique()))
# get airports timezone
airport_list = data['ORIGIN'].unique()
missing_airport_info = pd.read_csv('../data/downloaded_data/airports.dat', header=None,
                           usecols=[4, 11], names=['IATA', 'Timezone'])
# filter airports in ORIGIN
airport_list = missing_airport_info[missing_airport_info['IATA'].isin(airport_list)]
# find airports missing timezone, search and concat manually
missing_airports = set(data['ORIGIN'].unique()) - set(airport_list['IATA'])
print(airport_list[airport_list['Timezone']=='\\N'])
airport_list = airport_list[airport_list['IATA'] != 'BIH']
print("Missing airports:", missing_airports)
missing_airports = {
    'IATA': ['BIH', 'XWA', 'IFP', 'EAR'],
    'Timezone': ['America/Los_Angeles', 'America/Chicago', 'America/Phoenix', 'America/Chicago']
}
missing_airports = pd.DataFrame(missing_airports)
airport_list = pd.concat([airport_list, missing_airports], ignore_index=True)
airport_list = airport_list.reindex()

     IATA Timezone
7055  BIH       \N
Missing airports: {'XWA', 'EAR', 'IFP'}


## Convert to UTC

This will take 30 mins!

In [8]:
data['FL_DATE'] = pd.to_datetime(data['FL_DATE'], format="%m/%d/%Y %I:%M:%S %p").dt.date

In [21]:
def process_time_column(date_col, time_column):
    date_adjusted = date_col.where(~time_column.isna(), pd.NaT)
    time_adjusted = time_column.fillna(0).astype(int).astype(str).str.zfill(4)
    
    date_adjusted = date_adjusted.where(time_adjusted != "2400", date_adjusted + pd.Timedelta(days=1))
    time_adjusted = time_adjusted.where(time_adjusted != "2400", "0000")
    
    return date_adjusted, time_adjusted

In [24]:
crs_dep_date, crs_dep_time_str = process_time_column(data['FL_DATE'], data['CRS_DEP_TIME'])
crs_arr_date, crs_arr_time_str = process_time_column(data['FL_DATE'], data['CRS_ARR_TIME'])
dep_date, dep_time_str = process_time_column(data['FL_DATE'], data['DEP_TIME'])
arr_date, arr_time_str = process_time_column(data['FL_DATE'], data['ARR_TIME'])

In [None]:
data['CRS_DEP_TIMESTAMP'] = pd.to_datetime(crs_dep_date.astype(str) + " " + crs_dep_time_str.str[:2] + ":" + crs_dep_time_str.str[2:], errors='coerce')
print('>>>')
data['CRS_ARR_TIMESTAMP'] = pd.to_datetime(crs_arr_date.astype(str) + " " + crs_arr_time_str.str[:2] + ":" + crs_arr_time_str.str[2:], errors='coerce')
print('>>>')
data['DEP_TIMESTAMP'] = pd.to_datetime(dep_date.astype(str) + " " + dep_time_str.str[:2] + ":" + dep_time_str.str[2:], errors='coerce')
print('>>>')
data['ARR_TIMESTAMP'] = pd.to_datetime(arr_date.astype(str) + " " + arr_time_str.str[:2] + ":" + arr_time_str.str[2:], errors='coerce')

50 MINS

In [26]:
origin_tz = data.merge(airport_list[['IATA', 'Timezone']], left_on='ORIGIN', right_on='IATA', how='left')['Timezone']
dest_tz = data.merge(airport_list[['IATA', 'Timezone']], left_on='DEST', right_on='IATA', how='left')['Timezone']

def convert_to_utc(timestamp, timezone_str):
    local_tz = pytz.timezone(timezone_str)
    return local_tz.localize(timestamp).astimezone(pytz.UTC)

data['CRS_DEP_UTC'] = [convert_to_utc(ts, tz) for ts, tz in zip(data['CRS_DEP_TIMESTAMP'], origin_tz)]
data['CRS_ARR_UTC'] = [convert_to_utc(ts, tz) for ts, tz in zip(data['CRS_ARR_TIMESTAMP'], dest_tz)]

In [27]:
data.columns

Index(['FL_DATE', 'OP_UNIQUE_CARRIER', 'OP_CARRIER_FL_NUM', 'ORIGIN',
       'ORIGIN_CITY_NAME', 'DEST', 'DEST_CITY_NAME', 'CRS_DEP_TIME',
       'DEP_TIME', 'CRS_ARR_TIME', 'ARR_TIME', 'CANCELLED',
       'CANCELLATION_CODE', 'DIVERTED', 'AIR_TIME', 'DISTANCE',
       'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY',
       'LATE_AIRCRAFT_DELAY', 'CRS_DEP_TIMESTAMP', 'CRS_ARR_TIMESTAMP',
       'DEP_TIMESTAMP', 'ARR_TIMESTAMP', 'CRS_DEP_UTC', 'CRS_ARR_UTC'],
      dtype='object')

## Drop processed time columns

In [28]:
data = data.drop(columns=['FL_DATE', 'CRS_DEP_TIME', 'DEP_TIME', 'CRS_ARR_TIME', 'ARR_TIME'])

# Get weather data

Reference webpage:

Iowa Environment Mesonet, ASOS data download.
https://mesonet.agron.iastate.edu/request/download.phtml?network=IA_ASOS

Github repo
https://github.com/akrherz/iem/blob/main/pylib/iemweb/request/asos.py

In [3]:
weather = pd.read_parquet('../data/cleaned_data/weather.parquet')

In [None]:
weather.to_parquet('../data/cleaned_data/weather.parquet')

## By IATA code.

2 hours!

In [None]:
essential_fields = ['tmpf', 'dwpf', 'relh', 'drct', 'sknt', 'p01i', 'vsby', 'gust', 'alti', 'mslp', 'wxcodes']
essential_fields = ''.join([f"&data={field}" for field in essential_fields])

station_names = airport_list['IATA'].values

all_data = []
df = pd.DataFrame()
no_data_station = []
iteration = 1
years = range(2018, 2025)
months = [1, 11, 12]
current_year = datetime.now().year

for station_name in station_names:
    for year in years:
        for month in months:
            if year == current_year and month in [11, 12]:
                continue
            if month == 1:
                year2 = year
                month2 = 2
                day1, day2 = 1, 1
            elif month == 11:
                year2 = year
                month2 = 12
                day1, day2 = 1, 1
            else:
                year2 = year + 1
                month2 = 1
                day1, day2 = 1, 1
            uri = (
                "http://mesonet.agron.iastate.edu/cgi-bin/request/asos.py?"
                f"station={station_name}" + essential_fields +
                f"&year1={year}&month1={month}&day1={day1}"
                f"&year2={year2}&month2={month2}&day2={day2}"
                "&tz=Etc/UTC&format=onlycomma&latlon=no&elev=no&missing=M&trace=T"
                "&direct=yes&report_type=3"
            )
            try:
                res = requests.get(uri)
                res.raise_for_status()
                df = pd.read_csv(io.StringIO(res.text), na_values='M')
                df['valid'] = pd.to_datetime(df['valid'], format='%Y-%m-%d %H:%M')
                if df.empty:
                    no_data_station.append(station_name)
                    continue
                else:
                    all_data.append(df)
            except requests.RequestException as e:
                print(f"Failed to retrieve data for {station_name} in {year}-{month}: {e}")
    print(iteration, station_name, len(df))
    iteration += 1

weather = pd.concat(all_data, ignore_index=True)
# Substitute trace precipitation as 0.0001
weather['p01i'] = weather['p01i'].replace('T', 0.0001).astype(float)

## By ICAO code

Cannot get weather data for some airports, try to use ICAO code instead.

In [None]:
# Find airports with no data
missing_airports = set(airport_list['IATA'].values) - set(weather['station'].unique())
print(missing_airports)

In [None]:
airport_info = pd.read_csv('../data/downloaded_data/airports.dat', header=None, usecols=[4, 5, 6, 7, 8],
                           names=["IATA", "ICAO", "Latitude", "Longitude", "Altitude"])
missing_airport_info = airport_info[airport_info['IATA'].isin(missing_airports)]
missing_airports = missing_airport_info['ICAO'].values

In [None]:
all_data = []
no_data_station = []
iteration = 1
years = range(2018, 2025)
months = [1, 11, 12]
current_year = datetime.now().year

for station_name in missing_airports:
    for year in years:
        for month in months:
            if year == current_year and month in [11, 12]:
                continue
            if month == 1:
                year2 = year
                month2 = 2
                day1, day2 = 1, 1
            elif month == 11:
                year2 = year
                month2 = 12
                day1, day2 = 1, 1
            else:
                year2 = year + 1
                month2 = 1
                day1, day2 = 1, 1
            uri = (
                "http://mesonet.agron.iastate.edu/cgi-bin/request/asos.py?"
                f"station={station_name}" + essential_fields +
                f"&year1={year}&month1={month}&day1={day1}"
                f"&year2={year2}&month2={month2}&day2={day2}"
                "&tz=Etc/UTC&format=onlycomma&latlon=no&elev=no&missing=M&trace=T"
                "&direct=yes&report_type=3"
            )
            try:
                res = requests.get(uri)
                res.raise_for_status()
                df = pd.read_csv(io.StringIO(res.text), na_values='M')
                df['valid'] = pd.to_datetime(df['valid'], format='%Y-%m-%d %H:%M')
                if df.empty:
                    no_data_station.append(station_name)
                    continue
                else:
                    all_data.append(df)
            except requests.RequestException as e:
                print(f"Failed to retrieve data for {station_name} in {year}-{month}: {e}")
    print(iteration, station_name, len(df))
    iteration += 1

weather_missing = pd.concat(all_data, ignore_index=True)
weather_missing['p01i'] = weather_missing['p01i'].replace('T', 0.0001).astype(float)
weather = pd.concat([weather, weather_missing], ignore_index=True)

In [None]:
# repalce ICAO code to IATA code
icao_to_iata = airport_info[airport_info['IATA'].isin(airport_list['IATA'])].set_index('ICAO')['IATA'].to_dict()
weather['station'] = weather['station'].replace(icao_to_iata)

We still have no data of Kapalua–West Maui Airport, which is a regional airport in the district of Mahinahina on the west side of Maui island in the state of Hawaii. There is no data of this airport on METAR. Based on the difficulty of getting this data, we decide to remove observations with airport JHM, for ORIGIN and DEST.

In [None]:
data = data[(data['ORIGIN'] != 'JHM') & (data['DEST'] != 'JHM')]

# Merge flight and weather data

Some weather data are missing after UTC time conversion

In [None]:
all_data = []
date_range = [pd.Timestamp("2017-12-31")]
for year in range(2018, 2024):
    date_range.append(pd.Timestamp(f"{year}-10-31"))
essential_fields = ['tmpf', 'dwpf', 'relh', 'drct', 'sknt', 'p01i', 'vsby', 'gust', 'alti', 'mslp', 'wxcodes']
essential_fields = ''.join([f"&data={field}" for field in essential_fields])
for station_name in airport_list['IATA']:
    for date in date_range:
        if date.year == 2017:
            year2 = date.year + 1
        else:
            year2 = date.year
        uri = (
            "http://mesonet.agron.iastate.edu/cgi-bin/request/asos.py?"
            f"station={station_name}" + essential_fields +
            f"&year1={date.year}&month1={date.month}&day1=31"
            f"&year2={year2}&month2={(date.month + 1) % 12}&day2=1"
            "&tz=Etc/UTC&format=onlycomma&latlon=no&elev=no&missing=M&trace=T"
            "&direct=yes&report_type=3"
        )
        res = requests.get(uri)
        df = pd.read_csv(io.StringIO(res.text), na_values='M')
        df['valid'] = pd.to_datetime(df['valid'], format='%Y-%m-%d %H:%M')
        all_data.append(df)

weather_missing = pd.concat(all_data, ignore_index=True)
weather_missing['p01i'] = weather_missing['p01i'].replace('T', 0.0001).astype(float)
weather_missing['valid'] = weather_missing['valid'].dt.tz_localize('UTC')
weather = pd.concat([weather, weather_missing], ignore_index=True)

In [13]:
all_data = []
date_range = []
for year in range(2018, 2025):
    date_range.append(pd.Timestamp(f"{year}-02-01"))
essential_fields = ['tmpf', 'dwpf', 'relh', 'drct', 'sknt', 'p01i', 'vsby', 'gust', 'alti', 'mslp', 'wxcodes']
essential_fields = ''.join([f"&data={field}" for field in essential_fields])
for station_name in airport_list['IATA']:
    for date in date_range:
        uri = (
            "http://mesonet.agron.iastate.edu/cgi-bin/request/asos.py?"
            f"station={station_name}" + essential_fields +
            f"&year1={date.year}&month1=2&day1=1"
            f"&year2={date.year}&month2=2&day2=2"
            "&tz=Etc/UTC&format=onlycomma&latlon=no&elev=no&missing=M&trace=T"
            "&direct=yes&report_type=3"
        )
        res = requests.get(uri)
        df = pd.read_csv(io.StringIO(res.text), na_values='M')
        df['valid'] = pd.to_datetime(df['valid'], format='%Y-%m-%d %H:%M')
        all_data.append(df)

weather_missing = pd.concat(all_data, ignore_index=True)
weather_missing['p01i'] = weather_missing['p01i'].replace('T', 0.0001).astype(float)
weather_missing['valid'] = weather_missing['valid'].dt.tz_localize('UTC')
weather = pd.concat([weather, weather_missing], ignore_index=True)

  weather_missing = pd.concat(all_data, ignore_index=True)


In [14]:
data = data.sort_values('CRS_DEP_UTC')
weather = weather.sort_values('valid')

data_with_dep_weather = pd.merge_asof(
    data,
    weather,
    left_on='CRS_DEP_UTC',
    right_on='valid',
    left_by='ORIGIN',
    right_by='station',
    suffixes=('', '_dep')
)

data_with_dep_weather = data_with_dep_weather.sort_values('CRS_ARR_UTC')

data_with_all_weather = pd.merge_asof(
    data_with_dep_weather,
    weather,
    left_on='CRS_ARR_UTC',
    right_on='valid',
    left_by='DEST',
    right_by='station',
    suffixes=('', '_arr')
)

data_with_all_weather = data_with_all_weather.sort_values('CRS_DEP_UTC')
print(data_with_all_weather)

         OP_UNIQUE_CARRIER  OP_CARRIER_FL_NUM ORIGIN ORIGIN_CITY_NAME DEST  \
0                       9K               6393    GUM         Guam, TT  SPN   
1                       9K               6394    SPN       Saipan, TT  GUM   
2                       9K               6395    GUM         Guam, TT  SPN   
18030                   UA                200    GUM         Guam, TT  HNL   
3                       9K               6392    SPN       Saipan, TT  GUM   
...                    ...                ...    ...              ...  ...   
10924307                DL                485    KOA         Kona, HI  SEA   
10924295                DL                975    LIH        Lihue, HI  SEA   
10923676                UA               1639    OGG      Kahului, HI  SFO   
10923933                UA               1724    KOA         Kona, HI  SFO   
10924706                AS                848    HNL     Honolulu, HI  SEA   

             DEST_CITY_NAME  CANCELLED CANCELLATION_CODE  DIVER

In [15]:
data_with_all_weather = data_with_all_weather.drop(['station', 'valid', 'station_arr', 'valid_arr'], axis=1)
# drop column gust because so many missing values(87%)
data_with_all_weather = data_with_all_weather.drop(['gust', 'gust_arr'], axis=1)
data_with_all_weather = data_with_all_weather.sort_values('CRS_DEP_TIMESTAMP')

In [16]:
data_with_all_weather.to_parquet('../data/cleaned_data/data_with_all_weather.parquet')

In [None]:
data_with_all_weather = pd.read_parquet('../data/cleaned_data/data_with_all_weather.parquet')