In [None]:
import pandas as pd, dill as pickle, os
from utils.calculateConstants import *
from utils.util import getTopLevelPath
from utils.elevation import getStationElevations, getStationElevationCircles
from datetime import date
from tqdm.notebook import tqdm, trange
from pyarrow import feather

In [None]:
# Root of the data tree; every other path in this cell is built from it.
folder_path = getTopLevelPath() + 'data/'


def _newest_file(directory, suffix = ''):
    """Return `directory` + the name of its most recently modified entry ending in `suffix`.

    With the default empty suffix every directory entry is a candidate.
    """
    candidates = [name for name in os.listdir(directory) if name.endswith(suffix)]
    return directory + max(candidates, key = lambda name: os.path.getmtime(directory + name))


# Inputs: station lookup, latest processed measurements, latest reanalysis feather, DEM raster.
stationsLonLatXY_path = folder_path + 'Measured/stationsLonLatXY.pkl'
measured_path = _newest_file(folder_path + 'Measured/Processed/')
reanalysis_path = _newest_file(folder_path + 'Reanalysis/', suffix = '.feather')
elevation_path = folder_path + "Elevation/IslandsDEMv1.0_20x20m_isn93_zmasl.tif"

# Elevation lookups, subscripted by station id in the helper functions below.
se = getStationElevations()
ec = getStationElevationCircles()

# Outputs are stamped with the date of the run.
today = date.today().strftime("%Y-%m-%d")
measured_outputpath = folder_path + f'Model/measured_{today}.feather'
reanalysis_outputpath = folder_path + f'Model/reanalysis_{today}.feather'
outputpath = folder_path + f'Model/data_{today}.feather'
outputpath_for_errors = folder_path + f'Model/Errors/error_{today}.feather'

In [None]:
def addLonLatXYtoMeasured(df, stationsLonLatXY_path = stationsLonLatXY_path):
    """Attach station coordinates to a measurements frame.

    Loads the pickled station lookup from `stationsLonLatXY_path` and, for every
    value in `df.stod`, writes the matching 4-tuple into the columns `lon`,
    `lat`, `X` and `Y`. Stations that are missing from the lookup get NaN in all
    four columns.

    Parameters
    ----------
    df : DataFrame with a `stod` column.
    stationsLonLatXY_path : path of the pickled lookup (station -> 4-tuple).

    Returns
    -------
    The same frame, modified in place.
    """
    # NOTE(review): unpickling can execute arbitrary code; only load trusted files.
    with open(stationsLonLatXY_path, 'rb') as f:
        stationsLonLatXY = pickle.load(f)

    # float('nan') avoids relying on `np` leaking in through a star import.
    missing = (float('nan'),) * 4
    coords = [stationsLonLatXY.get(stod, missing) for stod in df.stod]

    if coords:
        lon, lat, X, Y = zip(*coords)
    else:
        # zip(*[]) yields nothing, so the 4-way unpacking would raise on an empty frame.
        lon, lat, X, Y = [], [], [], []

    df['lon'], df['lat'], df['X'], df['Y'] = lon, lat, X, Y
    return df

In [None]:
def addStationToReanalysis(df, stationnsLonLatXY_path = stationsLonLatXY_path):
    """Label each reanalysis row with the station located at its (lon, lat).

    Adds a `LonLat` column holding the `(lon, lat)` tuple of every row, then maps
    it through the inverse of the pickled station lookup to fill a `stod`
    column. Rows whose coordinates match no station get NaN.

    Parameters
    ----------
    df : DataFrame with `lon` and `lat` columns.
    stationnsLonLatXY_path : path of the pickled lookup (station -> tuple whose
        first two items are lon and lat). The misspelled name is kept so that
        existing keyword callers continue to work.

    Returns
    -------
    The same frame, modified in place (both `LonLat` and `stod` are kept).
    """
    df['LonLat'] = list(zip(df.lon, df.lat))

    # Use the argument: previously the body read the module-level path and
    # silently ignored whatever the caller passed in.
    # NOTE(review): unpickling can execute arbitrary code; only load trusted files.
    with open(stationnsLonLatXY_path, 'rb') as f:
        stationsLonLatXY = pickle.load(f)

    # Invert the lookup; if two stations share coordinates the later one wins.
    # NOTE(review): the match is on exact float equality of lon/lat - confirm the
    # reanalysis grid points carry exactly the pickled values.
    station_by_lonlat = {coords[:2]: station for station, coords in stationsLonLatXY.items()}

    def getStation(lonlat):
        return station_by_lonlat.get(lonlat, float('nan'))

    tqdm.pandas(desc = "Adding stations to reanalysis...")
    df['stod'] = df.LonLat.progress_map(getStation)

    return df

In [None]:
def addElevationCircles(stod):
    """Return the entry stored under `stod` in the module-level `ec` lookup."""
    circles = ec[stod]
    return circles

In [None]:
def addStationElevations(stod):
    """Return the entry stored under `stod` in the module-level `se` lookup."""
    elevation = se[stod]
    return elevation

In [None]:
def addElevation(df):
    """Add per-station elevation columns to a measurements frame.

    Maps `df.stod` through `addStationElevations` into `station_elevation` and
    through `addElevationCircles` into `elevations`, showing a progress bar for
    each pass.

    The earlier version also built a temporary `XYd` column that was dropped
    without ever being read, and re-loaded the elevation circles into an unused
    local; both were dead work and have been removed.

    Parameters
    ----------
    df : DataFrame with a `stod` column.

    Returns
    -------
    The same frame, modified in place.
    """
    tqdm.pandas(desc = 'Adding station elevations...')
    df['station_elevation'] = df.stod.progress_map(addStationElevations)

    tqdm.pandas(desc = 'Adding landscape elevation...')
    df['elevations'] = df.stod.progress_map(addElevationCircles)

    return df

In [None]:
def prepareMeasurements(df, stationsLonLatXY_path, decimal_places = 4):
    """Clean a raw measurements frame and enrich it with location and elevation.

    Steps: drop the `dsdev` column, rename `timi` to `time`, attach
    lon/lat/X/Y from the station lookup, attach the elevation columns, and
    round to `decimal_places`.

    Parameters
    ----------
    df : raw measurements DataFrame (needs `dsdev`, `timi` and `stod` columns).
    stationsLonLatXY_path : path of the pickled station lookup.
    decimal_places : number of decimals passed to `DataFrame.round`.

    Returns
    -------
    The prepared DataFrame.
    """
    df = df.drop(['dsdev'], axis = 1)
    df = df.rename(columns = {'timi':'time'})
    # Forward the caller's path; it used to be accepted but never used.
    df = addLonLatXYtoMeasured(df, stationsLonLatXY_path)
    df = addElevation(df)
    df = df.round(decimal_places)
    return df

In [None]:
def prepareRenalysis(df, decimal_places = 4):
    """Reshape long-format reanalysis data to one row per (lon, lat, time).

    Renames the variable columns to short codes, removes duplicate
    (lon, lat, time, height_level) rows, pivots `height_level` into the columns,
    discards the `yr_month` columns and flattens the remaining column labels to
    `<variable>_<level>`. Each row is then tagged with its station, `time` is
    converted to datetime, values are rounded to `decimal_places`, and the
    `Ri` and `N_squared` columns are computed from the 15 and 500 level values
    via `rowRichardson` and `rowBruntVaisalaSquared`.

    Returns the prepared DataFrame.
    """
    short_names = {'Wind speed':'ws', 'Wind direction': 'wd', 'Pressure':'p', 'Temperature':'t'}
    wide = (
        df.rename(columns = short_names)
          .drop_duplicates(subset = ['lon', 'lat', 'time', 'height_level'])
          .pivot(index = ['lon', 'lat', 'time'], columns = 'height_level')
          .drop(columns = 'yr_month')
    )
    # Pivoting leaves two-level column labels; join them into flat names.
    wide.columns = [f'{label[0]}_{label[1]}' for label in wide.columns]

    wide = addStationToReanalysis(wide.reset_index())
    wide.time = pd.to_datetime(wide.time)
    wide = wide.round(decimal_places)

    # Pack the inputs of the row-wise helpers into one temporary tuple column.
    tqdm.pandas(desc='Creating new column to calculate constants')
    wide['cc'] = list(zip(wide.t_15, wide.t_500, wide.p_15, wide.p_500, wide.ws_15, wide.ws_500))

    tqdm.pandas(desc='Calculating Richardson number...')
    richardson = wide.cc.progress_map(rowRichardson)
    wide['Ri'] = richardson.to_list()

    tqdm.pandas(desc='Calculating Brunt Vaisala Squared...')
    brunt_vaisala_sq = wide.cc.progress_map(rowBruntVaisalaSquared)
    wide['N_squared'] = brunt_vaisala_sq.to_list()

    return wide.drop(['cc'], axis = 1)

In [None]:
def write_in_chunks(df, file_path = outputpath, chunk_size = int(1e6)):
    """Write `df` to disk as a series of Feather files of at most `chunk_size` rows.

    Chunk `i` is written to `<file_path without its extension>_<i>.feather`.
    An empty frame produces no files.

    Parameters
    ----------
    df : DataFrame to write.
    file_path : template path; only its final extension is replaced.
    chunk_size : maximum number of rows per output file.
    """
    # splitext strips only the last extension. Splitting on the first '.'
    # truncated any path with a dot elsewhere (e.g. './data/...' or a dotted
    # directory name) and sent the chunks to the wrong location.
    base_path = os.path.splitext(file_path)[0]

    num_chunks = len(df)//chunk_size + (1 if len(df) % chunk_size > 0 else 0)
    for i in trange(num_chunks, desc = 'Writing to feather...'):
        start = i * chunk_size
        end = start + chunk_size
        chunk = df.iloc[start:end]
        chunk_filepath = base_path + f'_{str(i)}.feather'
        print(f"About to write to {chunk_filepath}!")
        feather.write_feather(chunk, chunk_filepath)

In [None]:
def merge(measured_path = measured_path, reanalysis_path = reanalysis_path):
    """Build the model dataset by joining measurements with reanalysis data.

    Reads both Feather inputs, prepares each one, sorts them by station and
    time, and inner-joins them on (`stod`, `time`). Rows with `fg <= f` are
    written (in chunks) to `outputpath_for_errors`; rows with `fg > f` are
    written to `outputpath` and returned. Rows for which neither comparison
    holds (e.g. NaN in either column) end up in neither output.
    """
    raw_measured = pd.read_feather(measured_path)
    raw_reanalysis = pd.read_feather(reanalysis_path)
    measured = prepareMeasurements(raw_measured, stationsLonLatXY_path)
    reanalysis = prepareRenalysis(raw_reanalysis)

    join_keys = ['stod', 'time']

    print("Sorting dataframes...")
    measured = measured.sort_values(join_keys)
    reanalysis = reanalysis.sort_values(join_keys)

    print("About to merge dataframes...")
    joined = pd.merge(measured, reanalysis, on = join_keys, how = 'inner')
    print("Finished merging...")

    print("Splitting by if error (fg <= f) (should not happen)")
    is_error = joined.fg <= joined.f
    is_valid = joined.fg > joined.f
    errors_df = joined[is_error]
    merged_df = joined[is_valid]

    print("About to start writing out dataframes...")
    write_in_chunks(merged_df, outputpath)
    print("Finished writing data. About to write errors...")
    write_in_chunks(errors_df, outputpath_for_errors)
    print("Finished writing errors.")

    return merged_df