In [1]:
import os
import xarray as xr
import numpy as np
import pandas as pd
from metpy.calc import dewpoint_from_relative_humidity
from metpy.units import units


In [2]:
path_data = "/glade/p/cisl/aiml/jtti_tornado/gridrad_storm_tracks/"
path_save = "/glade/p/cisl/aiml/jtti_tornado/gridrad_soundings/"


In [None]:
columns_to_drop = ['grid_point_latitudes_deg', 'grid_point_longitudes_deg', 'grid_point_rows', 'grid_point_columns', 'polygon_object_latlng_deg', 'polygon_object_rowcol']
for year in [d for d in os.listdir(path_data) if d.startswith('20')]:
    dates = os.listdir(os.path.join(path_data, year))
    print(year, len(dates))
    for date in dates:
        files = os.listdir(os.path.join(path_data, year, date, 'scale_314159265m2'))
        df = pd.DataFrame()
        for file in files:
            df = df.append(pd.read_pickle(os.path.join(path_data, year, date, "scale_314159265m2", file)),
                           ignore_index=True)
        df = df.drop(columns=columns_to_drop)
        df = df.reset_index(drop=True)
        df.to_parquet(os.path.join(path_save, "raw", f"{date}.parquet"))


In [3]:
def summarize_storm_ids(df, ds, date):
#     print(date)
    df_storm_ids = list(df['full_id_string'])
    df_storm_ids.sort()
#     print(f"\t- DF count of full_id_strings: {len(df_storm_ids)}")
#     print(f"\t- DF count of unique full_id_strings: {len(set(df_storm_ids))}")
    
    ds_storm_ids = list(ds['full_storm_id_strings'].values)
    ds_storm_ids = [i.decode('UTF-8') for i in ds_storm_ids]
    ds_storm_ids.sort()
#     print(f"\t- DS count of full_id_strings: {len(ds_storm_ids)}")
#     print(f"\t- DS count of unique full_id_strings: {len(set(ds_storm_ids))}")
    
    return df_storm_ids, ds_storm_ids
    

In [4]:
ds = xr.open_zarr(os.path.join("/glade/p/cisl/aiml/jtti_tornado/gridrad_radar_zarr/", "storm_radar_20110108.zarr"))

heights = list(ds['sounding_heights_m_agl'].values)
names = [i.decode('UTF-8') for i in ds['sounding_field_names'].values]

columns_flattened = []
for n in names:
    for h in heights:
        columns_flattened.append(n+'_'+str(h))


In [5]:
ds["sounding_field_names"].values

array([b'temperature_kelvins', b'pressure_pascals', b'u_wind_m_s01',
       b'v_wind_m_s01', b'relative_humidity_unitless',
       b'specific_humidity', b'virtual_potential_temperature_kelvins'],
      dtype='|S37')

In [6]:
def calc_dewpoint(df):
    for h in heights:
        df_RH = units.Quantity(np.array(df[f'relative_humidity_unitless_{h}']), "dimensionless")
        df_TMP =  units.Quantity(np.array(df[f'temperature_kelvins_{h}']), "K")
        df[f't_dewpoint_{h}'] = dewpoint_from_relative_humidity(df_TMP, df_RH) 
    return df


In [20]:
dates_all = os.listdir(os.path.join(path_save, "raw"))
dates_saved = []
dates_unavailable = []
for f in dates_all:
    date = f[:8]
    try:
        ds = xr.open_zarr(os.path.join("/glade/p/cisl/aiml/jtti_tornado/gridrad_radar_zarr/", f"storm_radar_{date}.zarr"))
        dates_saved.append(date)
    except Exception as e:
#         print(date, e)
        dates_unavailable.append(date)
        continue
dates_saved = dates_saved.sort()
dates_unavailable = dates_unavailable.sort()
 

In [21]:
dates_saved_files = os.listdir(os.path.join(path_save, "final"))
dates_saved_files = dates_saved_files.sort()
dates_saved == dates_saved_files


True

In [None]:
for f in os.listdir(os.path.join(path_save, "raw")):
    date = f[:8]
    df_new = pd.DataFrame(columns = list(df.columns) + columns_flattened)
    df = pd.read_parquet(os.path.join(path_save, "raw", f))
    try:
        ds = xr.open_zarr(os.path.join("/glade/p/cisl/aiml/jtti_tornado/gridrad_radar_zarr/", f"storm_radar_{date}.zarr"))
    except Exception as e:
        print(date, e)
        continue
    df_storm_ids, ds_storm_ids = summarize_storm_ids(df, ds, date)
    for id_string in list(set(df_storm_ids)):
        df_len = np.where(np.array(df_storm_ids) == id_string)[0].shape[0]
        ds_len = np.where(np.array(ds_storm_ids) == id_string)[0].shape[0]
        if df_len == ds_len:
            df_temp = df[df['full_id_string'] == id_string]
            ds_idx = np.where(np.array(ds_storm_ids) == id_string)[0]
            soundings = ds['sounding_matrix'].values[ds_idx].reshape(df_len, -1)
            df_temp[columns_flattened] = soundings
            df_new = df_new.append(df_temp)
        else:
            print(f"For date {date} & id_string {id_string}, DF contains {np.where(np.array(df_storm_ids) == id_string)[0].shape[0]}, DS contains {np.where(np.array(ds_storm_ids) == id_string)[0].shape[0]}")
    df_new = calc_dewpoint(df_new)
    if 0 in df_new.shape:
        print(f"Nothing to save for {date}")
    else:
        df_new.to_parquet(os.path.join(path_save, "final", f"{date}.parquet"))
    

In [None]:
dewpoint_levels = [f't_dewpoint_{h}' for h in heights]

RH0_count = 0
if df_new.isna().sum().sum() > 0:
    idx, idx_col = np.where(df_new[dewpoint_levels].isnull())
    RH0_count += len(idx)
    for ix, ixc in zip(idx, idx_col):
        height = heights[ixc]
        if df_new.at[ix, f'relative_humidity_unitless_{height}'] == 0.0:
            df_new.at[ix, f'relative_humidity_unitless_{height}'] == 1.0
            RH = units.Quantity(1.0/100., "dimensionless")
        elif df_new.at[ix, f'relative_humidity_unitless_{height}'] < 0.0:
            df_new.at[ix, f'relative_humidity_unitless_{height}'] == 1.0
            RH = units.Quantity(1.0/100., "dimensionless")
        else:
            print(f"RH not 0.0% at {ix, ixc}")
            break
        TMP = units.Quantity(df_new.at[ix, f'temperature_kelvins_{height}'] + 273.15, "K")
        df_new.at[ix, dewpoint_levels[ixc]] = np.array(dewpoint_from_relative_humidity(TMP, RH))


In [None]:
df_new.at[ix, f'relative_humidity_unitless_{height}']

In [None]:
df_new.at[ix, f't_dewpoint_{height}']

In [None]:
df_new.at[ix, f'temperature_kelvins_{height}']