# Computing Weather for Each Flight Optimal Route with Parallelization

In [1]:
import sys
# This variable should indicate the path from this Jupyter Notebook to the root directory of the repo.
root_path = '../'
# Adds the repo's root to the list of paths
sys.path.append(root_path)

# Package to read yml files
import yaml
# Package to handle file paths
import os
# Package to deal with DataFrames
import pandas as pd
# Package to plot stuff
import matplotlib.pyplot as plt
# Package for numerical and array handling
import numpy as np
# Package to read and write to .sqlite files
import sqlite3
# Package to keep track of time
import datetime

# Function to clear output from jupyter notebook
from IPython.display import clear_output
# Package for compressing dataframes into file
from src.data import compressors
# Package for defining and fitting weather models
from src.models import weather
# Utilities package
from src.common import utils
# Package for interpolating and estimating weather
from src.analysis import weather_interpolator

# Path from this notebook to the root directory
root_path = os.path.normpath(root_path)
# Path from root to the desired config file
config_path_from_root = os.path.normpath('config/config.yml')
# Defining path from this notebook to config file
config_path = os.path.join(root_path, config_path_from_root)

# Loading config file
with open(config_path, 'r',  encoding='utf8') as file:
    config = yaml.safe_load(file)

# Defining "clear-output" function to feed into logger
def clear():
    clear_output(wait=True)

# Creates an instance of a logger class to log all that happens, optional (but encouraged).
logger = utils.Logger(config, clear_function=None)

config['statistics']['interpolation']['flights']['step'] = 600
# Creates an instance of the weather interpolator
interpolator = weather_interpolator.WeatherInterpolator(config, logger=logger)

# Defining location of data
flights_database = '../data/flight/KDEN_KSEA_2023-01-01_2023-01-31.sqlite'
weather_database = '../data/weather/Weather-US_2022-12-31_2023-02-01.sqlite'

# Path to file keeping track of already-loaded flights
tracking_file = flights_database.replace('sqlite','txt')

In [None]:
import concurrent.futures

# Path to file keeping track of already-loaded flights
tracking_file = flights_database.replace('sqlite', 'txt')

flights_connection = sqlite3.connect(flights_database)
weather_connection = sqlite3.connect(weather_database)
# Declares a cursor to write to the database
cursor = flights_connection.cursor()

# Checking and loading the tracking_file
if os.path.isfile(tracking_file):
    with open(tracking_file, 'r') as f:
        loaded_ids = f.read().split('\n')
        loaded_ids = [i for i in loaded_ids if i != '']
else:
    with open(tracking_file, 'w') as f:
        loaded_ids = []

# Runs a query to identify all the flight_ids available
flight_ids = pd.read_sql_query("SELECT flight_id FROM flights;", flights_connection).values[:,0]
# Only care about the ids that have not been loaded yet.
flight_ids = [f for f in flight_ids if f not in loaded_ids]

# Loading the time threshold variable, which is the time interval that weather data will be loaded for each calculation.
time_thresh = config['statistics']['interpolation']['weather']['time-thresh']
lat_lon_thresh = config['statistics']['interpolation']['weather']['lat-lon-thresh']

# The list of columns to be added to the new table to be created.
new_columns = ['tmpf', 'air_pressure', 'air_density', 'clouds', 'sknt', 'severity']

# If there is no record of loaded ids, we start from scratch
if len(loaded_ids) == 0:
    # Drop the table if it exists
    cursor.execute("DROP TABLE IF EXISTS optimal_state_vector_weather;")

    # Create the new table
    create_table_query = f'''
        CREATE TABLE optimal_state_vector_weather (
            flight_id TEXT,
            {", ".join([f"{col} REAL" for col in new_columns])}
        );
    '''
    # Create the new table if it doesn't exist
    cursor.execute(create_table_query)

# Commits change to file
flights_connection.commit()
# Closing connections
flights_connection.close()
weather_connection.close()

t_start_full = datetime.datetime.now()

count = 0

num_cores = os.cpu_count()

def main_computation(i, flight_id):
    flights_connection = sqlite3.connect(flights_database)
    weather_connection = sqlite3.connect(weather_database)

    # Finds minimum and maximum flight for current flight
    min_time, max_time = pd.read_sql_query(f"""
                                SELECT MIN(time) as min_time, MAX(time) as max_time
                                FROM state_vectors
                                JOIN flights ON flights.flight_id = state_vectors.flight_id
                                WHERE state_vectors.flight_id = "{flight_id}";
                               """,
                              flights_connection
                              ).values[0]

    # Finds the minimum and maximum latitudes for the current flight
    min_latitude, max_latitude = pd.read_sql_query(f"""
                                    SELECT MIN(lat) as min_lat, MAX(lat) as max_lat
                                    FROM state_vectors
                                    JOIN flights ON flights.flight_id = state_vectors.flight_id
                                    WHERE state_vectors.flight_id = "{flight_id}"
                                   """,
                                  flights_connection
                                  ).values[0]

    # Finds the minimum and maximum longitudes for the current flight
    min_longitude, max_longitude = pd.read_sql_query(f"""
                                    SELECT MIN(lon) as min_lon, MAX(lon) as max_lon
                                    FROM state_vectors
                                    JOIN flights ON flights.flight_id = state_vectors.flight_id
                                    WHERE state_vectors.flight_id = "{flight_id}"
                                   """,
                                  flights_connection
                                  ).values[0]

    # Adjusting time, lat and lon thresholds.
    # Adds time threshold to time limits
    min_time -= time_thresh
    max_time += time_thresh

    # Adds latitude and longitude threshold
    range_latitude = max_latitude - min_latitude
    range_longitude = max_longitude - min_longitude
    min_latitude -= range_latitude*lat_lon_thresh
    max_latitude += range_latitude*lat_lon_thresh
    min_longitude -= range_longitude*lat_lon_thresh
    max_longitude += range_longitude*lat_lon_thresh

    # Loads the weather data corresponding to the flight
    flight_weather_data = pd.read_sql_query(f"""
                                    SELECT ws.lat, ws.lon, ws.elevation, ws.sigma, wd.*
                                    FROM weather_data as wd
                                    JOIN weather_stations as ws ON ws.station_id = wd.station_id
                                    WHERE wd.time BETWEEN {min_time} AND {max_time};
                                   """,
                                   weather_connection
                                    )

    # Loads state vectors for the given flight

    optimal_state_vectors = pd.read_sql_query(f"""
        SELECT sv.*
        FROM state_vectors AS sv
        INNER JOIN (
            SELECT of.flight_id
            FROM flights AS fs
            JOIN flights_aircraft AS fa ON fs.icao24 = fa.icao24
            JOIN optimal_flights AS of ON fa.typecode = of.typecode
            WHERE fs.flight_id = "{flight_id}"
        ) AS optimal_flight_id ON sv.flight_id = optimal_flight_id.flight_id;
        """,
        flights_connection)

    optimal_state_vectors['time'] = [min_time + time_thresh - i for i in range(len(optimal_state_vectors))]

    # Computes the weather values for the current flight
    optimal_state_vectors = interpolator.compute_flight_weather_quantities(new_columns, optimal_state_vectors, stations_data=flight_weather_data)

    # Extract the directory and filename
    directory, filename = os.path.split(flights_database)

    # Modify the filename
    # Insert 'optimal_' at the beginning and replace '.sqlite' with '_{flight_id}.csv'
    filename = 'optimal_' + filename.replace('.sqlite', f'_{flight_id}.csv')

    # Combine the directory and the modified filename
    csv_file = os.path.join(directory, filename)
    
    optimal_state_vectors[['flight_id'] + new_columns].to_csv(csv_file)
    
    flights_connection.close()
    weather_connection.close()
    
    time_iteration = datetime.datetime.now()
    time_elapsed = (time_iteration - t_start_full).total_seconds()
    if i == 0:
        ETA = np.nan
    else:
        ETA = time_elapsed*len(flight_ids)/i - time_elapsed
    clear_output(wait=True)
    print(f'{i}/{len(flight_ids)}.')
    print(f'Time Elapsed: {utils.format_time(time_elapsed)}.')
    print(f'Estimate time to finish: {utils.format_time(ETA)}.')

with concurrent.futures.ThreadPoolExecutor(max_workers=num_cores) as executor:
    futures = [executor.submit(main_computation, i, flight_id) for i, flight_id in enumerate(flight_ids)]
    for future in concurrent.futures.as_completed(futures):
        try:
            result = future.result()
        except Exception as e:
            print(f"An error occurred: {e}")

# for i, flight_id in enumerate(flight_ids):
#     main_computation(i, flight_id)
    
import os
csv_files = ['../data/flight/' + f for f in os.listdir('../data/flight') if f.startswith('optimal_' + flights_database.split('/')[-1].replace('.sqlite','')) and f.endswith('.csv')]

flights_connection = sqlite3.connect(flights_database)
cursor = flights_connection.cursor()

for file in csv_files:
    optimal_state_vectors_weather = pd.read_csv(file, index_col=0)
    clear_output(wait=True)
    print(file)
    for index, row in optimal_state_vectors_weather.iterrows():
        # Preparing the data to be inserted
        insert_data = tuple(row[col] for col in ['flight_id'] + new_columns)

        # Creating query to insert new values
        insert_query = f'''
            INSERT INTO optimal_state_vector_weather (flight_id, {', '.join(new_columns)})
            VALUES ({', '.join('?' * len(insert_data))})
        '''
        cursor.execute(insert_query, insert_data)
    # Commiting changes to the database
    flights_connection.commit()

flights_connection.close()

87/649.
Time Elapsed: 1h 4m 29s.
Estimate time to finish: 6h 56m 34s.


In [13]:
csv_files = ['../data/flight/' + f for f in os.listdir('../data/flight') if f.startswith('optimal_' + flights_database.split('/')[-1].replace('.sqlite','')) and f.endswith('.csv')]

flights_connection = sqlite3.connect(flights_database)
cursor = flights_connection.cursor()

for file in csv_files:
    optimal_state_vectors_weather = pd.read_csv(file, index_col=0)
    clear_output(wait=True)
    print(file)
    for index, row in optimal_state_vectors_weather.iterrows():
        # Preparing the data to be inserted
        insert_data = tuple(row[col] for col in ['flight_id'] + new_columns)

        # Creating query to insert new values
        insert_query = f'''
            INSERT INTO optimal_state_vector_weather (flight_id, {', '.join(new_columns)})
            VALUES ({', '.join('?' * len(insert_data))})
        '''
        cursor.execute(insert_query, insert_data)
    # Commiting changes to the database
    flights_connection.commit()

flights_connection.close()

../data/flight/optimal_KDEN_KSEA_2023-01-01_2023-01-31_a335ad_1672966747_1672975259_KDEN_KSEA.csv


648/649.
Time Elapsed: 12h 44m 59s.
Estimate time to finish: 1m 10s.


In [24]:
optimal_state_vectors

Unnamed: 0,vector_id,flight_id,time,time_normalized,lat,lon,geoaltitude,baroaltitude,heading,velocity,tmpf,air_pressure,air_density,clouds,sknt,severity
0,2326505,a44565_1673808290_1673816496_KDEN_KSEA,1675198137,0,39.891403,-104.696244,1684.020000,1836.420000,0.000000,106.489908,17.637999,821.549646,1.079320,0.000000,3.560400,0.0
1,2326506,a44565_1673808290_1673816496_KDEN_KSEA,1675198136,1,39.892367,-104.696237,1701.028727,1853.428727,0.103302,106.364528,17.535467,820.750715,1.078381,0.000000,3.759182,0.0
2,2326507,a44565_1673808290_1673816496_KDEN_KSEA,1675198135,2,39.893331,-104.696229,1718.037455,1870.437455,0.206605,106.239149,17.432935,819.951783,1.077442,0.000000,3.957964,0.0
3,2326508,a44565_1673808290_1673816496_KDEN_KSEA,1675198134,3,39.894295,-104.696222,1735.046182,1887.446182,0.309907,106.113769,17.330403,819.152852,1.076503,0.000000,4.156746,0.0
4,2326509,a44565_1673808290_1673816496_KDEN_KSEA,1675198133,4,39.895259,-104.696215,1752.054909,1904.454909,0.413210,105.988389,17.227871,818.353920,1.075565,0.000000,4.355528,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8104,2334609,a44565_1673808290_1673816496_KDEN_KSEA,1675190033,8104,47.524300,-122.317834,475.977455,674.097455,179.979882,79.730740,-98.667206,179.199002,0.311247,0.799954,178.418546,0.0
8105,2334610,a44565_1673808290_1673816496_KDEN_KSEA,1675190032,8105,47.523601,-122.317834,471.283091,669.403091,179.797681,79.477219,-98.667206,179.199002,0.311247,0.799954,178.418546,0.0
8106,2334611,a44565_1673808290_1673816496_KDEN_KSEA,1675190031,8106,47.522901,-122.317834,466.588727,664.708727,179.615480,79.223698,-98.667206,179.199002,0.311247,0.799954,178.418546,0.0
8107,2334612,a44565_1673808290_1673816496_KDEN_KSEA,1675190030,8107,47.522202,-122.317834,461.894364,660.014364,179.433279,78.970177,-98.667206,179.199002,0.311247,0.799954,178.418546,0.0


In [14]:
flights_database = '../data/flight/KDEN_KSEA_2023-01-01_2023-01-31.sqlite'

conn = sqlite3.connect(flights_database)

optimal_sv = pd.read_sql_query(f"""
        SELECT *
        FROM optimal_state_vector_weather
        LIMIT 10;
        """,
        conn)

conn.close()
optimal_sv

Unnamed: 0,flight_id,tmpf,air_pressure,air_density,clouds,sknt,severity
0,a44565_1673808290_1673816496_KDEN_KSEA,,,,,,
1,a44565_1673808290_1673816496_KDEN_KSEA,,,,,,
2,a44565_1673808290_1673816496_KDEN_KSEA,,,,,,
3,a44565_1673808290_1673816496_KDEN_KSEA,,,,,,
4,a44565_1673808290_1673816496_KDEN_KSEA,,,,,,
5,a44565_1673808290_1673816496_KDEN_KSEA,,,,,,
6,a44565_1673808290_1673816496_KDEN_KSEA,,,,,,
7,a44565_1673808290_1673816496_KDEN_KSEA,,,,,,
8,a44565_1673808290_1673816496_KDEN_KSEA,,,,,,
9,a44565_1673808290_1673816496_KDEN_KSEA,,,,,,


In [12]:
csv_files = ['../data/flight/' + f for f in os.listdir('../data/flight') if f.startswith('optimal_' + flights_database.split('/')[-1].replace('.sqlite','')) and f.endswith('.csv')]
csv_files

['../data/flight/optimal_KDEN_KSEA_2023-01-01_2023-01-31_a3649a_1673032170_1673040502_KDEN_KSEA.csv',
 '../data/flight/optimal_KDEN_KSEA_2023-01-01_2023-01-31_a27f07_1674604688_1674614127_KDEN_KSEA.csv',
 '../data/flight/optimal_KDEN_KSEA_2023-01-01_2023-01-31_a28de3_1674171821_1674180496_KDEN_KSEA.csv',
 '../data/flight/optimal_KDEN_KSEA_2023-01-01_2023-01-31_a32ad5_1674524954_1674534011_KDEN_KSEA.csv',
 '../data/flight/optimal_KDEN_KSEA_2023-01-01_2023-01-31_a5c2dd_1674171334_1674180026_KDEN_KSEA.csv',
 '../data/flight/optimal_KDEN_KSEA_2023-01-01_2023-01-31_a441d6_1673400921_1673409014_KDEN_KSEA.csv',
 '../data/flight/optimal_KDEN_KSEA_2023-01-01_2023-01-31_a2919a_1672870456_1672879758_KDEN_KSEA.csv',
 '../data/flight/optimal_KDEN_KSEA_2023-01-01_2023-01-31_a44938_1674100898_1674109421_KDEN_KSEA.csv',
 '../data/flight/optimal_KDEN_KSEA_2023-01-01_2023-01-31_a33d68_1675130645_1675139747_KDEN_KSEA.csv',
 '../data/flight/optimal_KDEN_KSEA_2023-01-01_2023-01-31_ac1afa_1674966717_1674975

In [11]:
'optimal_' + flights_database.replace('.sqlite','')

'optimal_../data/flight/KDEN_KSEA_2023-01-01_2023-01-31'