# Data preprocessing

In [1]:
import jetutils
from jetutils.jet_finding import JetFindingExperiment, get_double_jet_index, find_all_jets
from jetutils.definitions import DATADIR, get_index_columns, SEASONS, PRETTIER_VARNAME, UNITS, JJADOYS, FIGURES, KAPPA
from jetutils.plots import plot_seasonal, plot_trends, trends_and_pvalues, num2tex, COLORS, periodic_rolling_pl
from jetutils.data import DataHandler, flatten_by, standardize, metadata_from_da, find_spot 
from pathlib import Path
import colormaps

import numpy as np
import pandas as pd
import polars as pl
import matplotlib.pyplot as plt
from matplotlib.colors import BoundaryNorm
from matplotlib.lines import Line2D
import matplotlib.cm as cm
import cartopy.crs as ccrs
import cartopy.feature as cfeature
from cartopy.mpl.gridliner import LONGITUDE_FORMATTER, LATITUDE_FORMATTER
import xarray as xr
import glob
import os
import seaborn as sns

#%load_ext autoreload
#%autoreload 2
%matplotlib inline
print(jetutils.__file__)

Found config override file at  /home/mabeling/.jetutils.ini
Guessed N_WORKERS :  256
Guessed MEMORY_LIMIT :  8000
/home/mabeling/Jetutils-NP/jetutils/__init__.py


In [2]:
# Constants
p0 = 100000  # Pa
KAPPA = 0.286  # R_d / c_p for dry air

In [10]:
test = xr.open_dataset('/mnt/climstor/ecmwf/era5/raw/PL/data/2021/an_pl_ERA5_2021-06-24.nc')
print(test['level'])

<xarray.DataArray 'level' (level: 17)> Size: 136B
array([100., 150., 200., 250., 300., 350., 400., 450., 500., 550., 600., 650.,
       700., 750., 800., 850., 900.])
Coordinates:
  * level    (level) float64 136B 100.0 150.0 200.0 250.0 ... 800.0 850.0 900.0
Attributes:
    standard_name:  air_pressure
    long_name:      pressure_level
    units:          millibars
    positive:       down
    axis:           Z


In [None]:
def process_jet_stream_data(file_pattern, output_dir, output_filename, level_hpa):
    """
    Extracts u, v, t at a given pressure level, computes potential temperature and wind speed,
    and saves the result as a NetCDF file.

    Parameters:
        file_pattern (str): Glob pattern to match input files.
        output_dir (str): Directory to save the output NetCDF.
        output_filename (str): Name of the output file ('uvtths_plev500.nc').
        level_hpa (int): Pressure level in hPa (500).
    """
    p0 = 100000  # Pa
    KAPPA = 0.286  
    
    files = sorted(glob.glob(file_pattern))
    if not files:
        print("No files found matching pattern.")
        return

    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, output_filename)

    print("Loading multiple files...")
    ds = xr.open_mfdataset(files, combine="by_coords")

    try:
        ds = ds.sel(level=level_hpa)
        ds = ds.sel(lat=slice(20, 70)) 
        ds_east = ds.sel(lon=slice(120, 180))
        ds_west = ds.sel(lon=slice(-180, -80))
        ds = xr.concat([ds_east, ds_west], dim="lon")

        required_vars = ['t', 'u', 'v']
        if not all(var in ds.variables for var in required_vars):
            print("Missing required variables: 't', 'u', or 'v'.")
            return

        T = ds['t']
        u = ds['u']
        v = ds['v']
        p = ds['level'] * 100  # Convert hPa to Pa

        _, p_3d = xr.broadcast(T, p)

        # Potential temperature
        theta = T * (p0 / p_3d) ** KAPPA
        theta.name = "theta"

        # Wind speed magnitude
        wind_speed = np.sqrt(u ** 2 + v ** 2)
        wind_speed.name = "s"

        ds_out = xr.merge([T, u, v, theta, wind_speed])

        print("Saving combined file...")
        print("Output dataset dimensions and sizes:")
        for dim in ds_out.dims:
            print(f"  {dim}: {ds_out.dims[dim]}")
        ds_out.to_netcdf(output_path)
        print(f"Saved: {output_path}")

    except Exception as e:
        print(f"Error during processing: {e}")

In [None]:
def process_files_daily(file_pattern, output_dir, level_hpa):
    files = sorted(glob.glob(file_pattern))
    if not files:
        print("No files found matching the pattern.")
        return

    os.makedirs(output_dir, exist_ok=True)

    for file_path in files:
        print(f"Processing file: {file_path}")
        ds = xr.open_dataset(file_path)

        ds = ds.sel(level=level_hpa)
        ds = ds.sel(lat=slice(20, 70))
        ds_east = ds.sel(lon=slice(120, 180))
        ds_west = ds.sel(lon=slice(-180, -80))
        ds = xr.concat([ds_east, ds_west], dim="lon")

        required_vars = ['t', 'u', 'v']
        if not all(var in ds.variables for var in required_vars):
            print(f"Skipping {file_path}, missing required variables.")
            continue

        for day, ds_day in ds.groupby('time.day'):
            date_str = np.datetime_as_string(ds_day.time.values[0], unit='D')
            print(f"  Processing day: {date_str} ({len(ds_day.time)} time steps)")

            T = ds_day['t']
            u = ds_day['u']
            v = ds_day['v']
            p = ds_day['level'] * 100  # convert hPa to Pa
            _, p_3d = xr.broadcast(T, p)

            theta = T * (p0 / p_3d) ** KAPPA
            theta.name = "theta"
            wind_speed = np.sqrt(u ** 2 + v ** 2)
            wind_speed.name = "s"

            ds_out = xr.merge([T, u, v, theta, wind_speed])

            out_filename = f"uvtths_plev{level_hpa}_{date_str}.nc"
            out_path = os.path.join(output_dir, out_filename)
            ds_out.to_netcdf(out_path)
            print(f"    Saved {out_filename}")


In [None]:
def process_files_yearly(input_dir, output_dir, level_hpa, level_name, year):
    """
    Processes NetCDF files matching the given pattern and combines all data 
    from the specified year into a single NetCDF file.

    Parameters:
        input_dir (str): Directory where input is stored
        output_dir (str): Directory to save the combined yearly NetCDF.
        level_hpa (int): Pressure level in hPa to extract.
        year (int): Year to filter and process (e.g., 2021).
    """
    p0 = 100000  
    KAPPA = 0.286  
    
    input_dir = f'{input_dir}/{year}'
    file_pattern = os.path.join(input_dir, 'an_pl_ERA5_*.nc')
    files = sorted(glob.glob(file_pattern))
    if not files:
        print("No files found matching the pattern.")
        return

    output_dir = f'{output_dir}/{year}'

    os.makedirs(output_dir, exist_ok=True)
    datasets = []

    for file_path in files:
        print(f"Reading file: {file_path}")
        #ds = xr.open_dataset(file_path)
        with xr.open_dataset(file_path) as ds:

            if not np.any([t.astype('datetime64[Y]').astype(int) + 1970 == year for t in ds.time.values]):
                continue

            ds = ds.sel(level=level_hpa)
            ds = ds.sel(lat=slice(20, 70))

            ds_east = ds.sel(lon=slice(120, 180))
            ds_west = ds.sel(lon=slice(-180, -80))
            ds = xr.concat([ds_east, ds_west], dim="lon")

            if not all(var in ds.variables for var in ['t', 'u', 'v']):
                print(f"Skipping {file_path}, missing required variables.")
                continue

            T = ds['t']
            u = ds['u']
            v = ds['v']
            p = ds['level'] * 100
            _, p_3d = xr.broadcast(T, p)

            theta = T * (p0 / p_3d) ** KAPPA
            theta.name = "theta"
            wind_speed = np.sqrt(u ** 2 + v ** 2)
            wind_speed.name = "s"

            ds_out = xr.merge([T, u, v, theta, wind_speed])
            datasets.append(ds_out)

    if not datasets:
        print("No valid data to save.")
        return

    # Combine all daily datasets into single yearly dataset
    print(f"Combining {len(datasets)} daily datasets for year {year}...")
    yearly_ds = xr.concat(datasets, dim="time")
    yearly_ds = yearly_ds.sortby("time")

    out_filename = f"uvtths_plev{level_name}_{year}.nc"
    out_path = os.path.join(output_dir, out_filename)
    yearly_ds.to_netcdf(out_path)
    print(f"Saved combined yearly file: {out_path}")

In [None]:
process_files_yearly(
    year=2023,
    input_dir="/mnt/climstor/ecmwf/era5/raw/PL/data",
    output_dir="/scratch2/mabeling/data/ERA5/plev/mid_wind/6H/data",
    level_hpa= slice(500,500), # 150,350
    level_name = 'mid')

Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/2023/an_pl_ERA5_2023-01-01.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/2023/an_pl_ERA5_2023-01-02.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/2023/an_pl_ERA5_2023-01-03.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/2023/an_pl_ERA5_2023-01-04.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/2023/an_pl_ERA5_2023-01-05.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/2023/an_pl_ERA5_2023-01-06.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/2023/an_pl_ERA5_2023-01-07.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/2023/an_pl_ERA5_2023-01-08.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/2023/an_pl_ERA5_2023-01-09.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/2023/an_pl_ERA5_2023-01-10.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/2023/an_pl_ERA5_2023-01-11.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/2023/an_pl_ERA5_2023-01-12.nc
Reading file: /mnt/climstor/

In [None]:
#MID
YEARS = range(1990, 1992) 
for year in YEARS:
    process_files_yearly(
        year=year,
        input_dir="/mnt/climstor/ecmwf/era5/raw/PL/data",
        output_dir="/scratch2/mabeling/data/ERA5/plev/mid_wind/6H/data",
        level_hpa= slice(500,500),
        level_name = '500'
    )

In [None]:
#HIGH
YEARS = range(1997, 1998) 
for year in YEARS:
    process_files_yearly(
        year=year,
        input_dir="/mnt/climstor/ecmwf/era5/raw/PL/data",
        output_dir="/scratch2/mabeling/data/ERA5/plev/high_wind/6H/data",
        level_hpa= slice(150,350),
        level_name = 'high'
    )

Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/1997/an_pl_ERA5_1997-01-01.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/1997/an_pl_ERA5_1997-01-02.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/1997/an_pl_ERA5_1997-01-03.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/1997/an_pl_ERA5_1997-01-04.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/1997/an_pl_ERA5_1997-01-05.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/1997/an_pl_ERA5_1997-01-06.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/1997/an_pl_ERA5_1997-01-07.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/1997/an_pl_ERA5_1997-01-08.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/1997/an_pl_ERA5_1997-01-09.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/1997/an_pl_ERA5_1997-01-10.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/1997/an_pl_ERA5_1997-01-11.nc
Reading file: /mnt/climstor/ecmwf/era5/raw/PL/data/1997/an_pl_ERA5_1997-01-12.nc
Reading file: /mnt/climstor/

In [None]:
# process multiple years
# mid level
YEARS = range(2021, 2021) 
input_dir = "/mnt/climstor/ecmwf/era5/raw/PL/data/2021/"
output_dir = "/scratch2/mabeling/data/ERA5/plev/mid_wind/6H/testcase/"
level_hpa = 500

#for year in YEARS:
#    file_pattern = os.path.join(input_dir, f"an_pl_ERA5_{year}*.nc")
#    output_filename = f"uvtths_plev{level_hpa}_{year}.nc"
#    process_jet_stream_data(file_pattern, output_dir, output_filename, slice(500,500))


In [None]:

def combine_high_wind_datasets(start_year=1959, end_year=2022):
    """
    Combine yearly high wind NetCDF files from 1959 to 2022 into single dataset
    """
    
    high_wind_datasets = []
    for year in range(start_year, end_year + 1):
        print(f"Processing high wind data for year {year}...")
        
        basepath = f"/scratch2/mabeling/data/ERA5/plev/high_wind/6H/data/{year}"
        high_wind_file = f"{basepath}/uvtths_plevhigh_{year}.nc"
        
        if os.path.exists(high_wind_file):
            try:
                ds_high = xr.open_dataset(high_wind_file)
                ds_high = flatten_by(standardize(ds_high), "s")
                high_wind_datasets.append(ds_high)
                print(f"High wind data loaded for {year}")
            except Exception as e:
                print(f"Error loading high wind data for {year}: {e}")
        else:
            print(f"High wind file not found for {year}: {high_wind_file}")
    
    print(f"\nCombining high wind datasets...")
    print(f"High wind datasets found: {len(high_wind_datasets)}")
    
    # Combine datasets along time dimension
    combined_high_wind = None
    
    if high_wind_datasets:
        try:
            combined_high_wind = xr.concat(high_wind_datasets, dim='time', 
                                         coords='minimal', compat='override')
            print("High wind datasets combined successfully")
        except Exception as e:
            print(f"Error combining high wind datasets: {e}")
    
    return combined_high_wind

In [4]:
ds_high_combine = combine_high_wind_datasets()

Processing high wind data for year 1959...
[########################################] | 100% Completed | 24.33 s
  ✓ High wind data loaded for 1959
Processing high wind data for year 1960...
[########################################] | 100% Completed | 26.60 s
  ✓ High wind data loaded for 1960
Processing high wind data for year 1961...
[########################################] | 100% Completed | 26.14 s
  ✓ High wind data loaded for 1961
Processing high wind data for year 1962...
[########################################] | 100% Completed | 27.03 s
  ✓ High wind data loaded for 1962
Processing high wind data for year 1963...
[########################################] | 100% Completed | 27.53 s
  ✓ High wind data loaded for 1963
Processing high wind data for year 1964...
[########################################] | 100% Completed | 25.90 s
  ✓ High wind data loaded for 1964
Processing high wind data for year 1965...
[########################################] | 100% Completed | 26.52 s

In [5]:
output_path = "/scratch2/mabeling/data/ERA5/plev/high_wind/6H/data/combined/uvtths_plevhigh_1959_2022.nc"
ds_high_combine.to_netcdf(output_path)

In [None]:
###
#Z500 Data
###

In [None]:
def process_z_data_by_year(base_input_dir, output_dir, level_hpa, years=None, lat_bounds=None, lon_bounds=None):
    """
    Extracts z (geopotential height) at a given pressure level and saves yearly NetCDF files.
    
    Parameters:
    base_input_dir (str): Base directory containing year subdirectories.
    output_dir (str): Directory to save the output NetCDF files.
    level_hpa (int): Pressure level in hPa (500).
    years (list, optional): List of years to process. If None, process all year directories found.
    lat_bounds (tuple, optional): (min_lat, max_lat) for latitude slicing. If None, use all available.
    lon_bounds (tuple, optional): (min_lon, max_lon) for longitude slicing. If None, use all available.
    """
    
    os.makedirs(output_dir, exist_ok=True)
    level_name = str(level_hpa)
    
    if years is None:
        years = []
        for item in os.listdir(base_input_dir):
            item_path = os.path.join(base_input_dir, item)
            if os.path.isdir(item_path) and item.isdigit() and len(item) == 4:
                year = int(item)
                if year >= 1959:
                    years.append(year)
        years = sorted(years)
    
    if not years:
        print("No year directories found.")
        return
    
    print(f"Processing data for years: {years}")
    
    for year in years:
        input_dir = f'{base_input_dir}/{year}'
        file_pattern = os.path.join(input_dir, 'an_pl_ERA5_*.nc')
        year_files = sorted(glob.glob(file_pattern))
        
        if not year_files:
            print(f"No files found for year {year} in {input_dir}")
            continue
            
        print(f"\nProcessing year {year} with {len(year_files)} files...")
        
        try:
            datasets = []
            for file in year_files:
                ds = xr.open_dataset(file)
                ds = ds.sel(level=level_hpa)

                if lat_bounds is not None and lon_bounds is not None:
                    min_lat, max_lat = lat_bounds
                    min_lon, max_lon = lon_bounds
                    ds = ds.sel(lat=slice(min_lat, max_lat))
                    ds = ds.sel(lon=slice(min_lon, max_lon))
                else:
                    ds = ds.sel(lat=slice(20, 70))  
                    
                    ds_east = ds.sel(lon=slice(120, 180))
                    ds_west = ds.sel(lon=slice(-180, -80))
                    ds = xr.concat([ds_east, ds_west], dim="lon")
                
                if 'z' not in ds.variables:
                    print(f"Missing required variable 'z' in file: {file}")
                    continue

                z = ds['z'].to_dataset()
                datasets.append(z)
            
            if not datasets:
                print(f"No valid datasets found for year {year}")
                continue
            
            print(f"Combining {len(datasets)} daily datasets for year {year}...")
            yearly_ds = xr.concat(datasets, dim="time")
            yearly_ds = yearly_ds.sortby("time")
            
            year_output_dir = os.path.join(output_dir, str(year))
            os.makedirs(year_output_dir, exist_ok=True)
            
            out_filename = f"z_plev{level_name}_{year}.nc"
            out_path = os.path.join(year_output_dir, out_filename)
            yearly_ds.to_netcdf(out_path)
            print(f"Saved combined yearly file: {out_path}")
            
            print("Output dataset dimensions and sizes:")
            for dim in yearly_ds.dims:
                print(f"  {dim}: {yearly_ds.dims[dim]}")
            
        except Exception as e:
            print(f"Error processing year {year}: {e}")

In [None]:
def combine_z500_datasets(start_year=1959, end_year=2022):
    """
    Combine z500 NetCDF files from 1959 to 2022 into single dataset
    """
    z500_datasets = []
    
    for year in range(start_year, end_year + 1):
        print(f"Processing z500 data for year {year}...")
        
        basepath = f"/scratch2/mabeling/data/ERA5/plev/high_wind/6H/z500data/{year}"
        z500_file = f"{basepath}/z_plev500_{year}.nc"
        
        if os.path.exists(z500_file):
            try:
                ds_z500 = xr.open_dataset(z500_file)
                z500_datasets.append(ds_z500)
                print(f"z500 data loaded for {year}")
            except Exception as e:
                print(f"Error loading z500 data for {year}: {e}")
        else:
            print(f"Error z500 file not found for {year}: {z500_file}")
    
    print(f"\nCombining z500 datasets...")
    print(f"z500 datasets found: {len(z500_datasets)}")
    combined_z500 = None
    
    if z500_datasets:
        try:
            combined_z500 = xr.concat(z500_datasets, dim='time', 
                                         coords='minimal', compat='override')
            print("z500 datasets combined successfully")
        except Exception as e:
            print(f"Error combining z500 datasets: {e}")
    
    return combined_z500

In [10]:
ds_z500_combine = combine_z500_datasets()

Processing z500 data for year 1959...
  ✓ z500 data loaded for 1959
Processing z500 data for year 1960...
  ✓ z500 data loaded for 1960
Processing z500 data for year 1961...
  ✓ z500 data loaded for 1961
Processing z500 data for year 1962...
  ✓ z500 data loaded for 1962
Processing z500 data for year 1963...
  ✓ z500 data loaded for 1963
Processing z500 data for year 1964...
  ✓ z500 data loaded for 1964
Processing z500 data for year 1965...
  ✓ z500 data loaded for 1965
Processing z500 data for year 1966...
  ✓ z500 data loaded for 1966
Processing z500 data for year 1967...
  ✓ z500 data loaded for 1967
Processing z500 data for year 1968...
  ✓ z500 data loaded for 1968
Processing z500 data for year 1969...
  ✓ z500 data loaded for 1969
Processing z500 data for year 1970...
  ✓ z500 data loaded for 1970
Processing z500 data for year 1971...
  ✓ z500 data loaded for 1971
Processing z500 data for year 1972...
  ✓ z500 data loaded for 1972
Processing z500 data for year 1973...
  ✓ z500 d

In [11]:
output_path = "/scratch2/mabeling/data/ERA5/plev/high_wind/6H/z500data/combined/z500_1959_2022.nc"
ds_z500_combine.to_netcdf(output_path)

In [4]:
ds = xr.open_dataset("/scratch2/mabeling/data/ERA5/plev/high_wind/6H/z500data/2017/z_plev500_2017.nc")
print(ds)

<xarray.Dataset> Size: 379MB
Dimensions:  (time: 1460, lat: 101, lon: 321)
Coordinates:
  * time     (time) datetime64[ns] 12kB 2017-01-01 ... 2017-12-31T18:00:00
  * lon      (lon) float32 1kB 120.0 120.5 121.0 121.5 ... -81.0 -80.5 -80.0
  * lat      (lat) float32 404B 20.0 20.5 21.0 21.5 22.0 ... 68.5 69.0 69.5 70.0
    level    float64 8B ...
Data variables:
    z        (time, lat, lon) float64 379MB ...


In [None]:
process_z_data_by_year(
     base_input_dir="/mnt/climstor/ecmwf/era5/raw/PL/data", 
     output_dir="/scratch2/mabeling/data/ERA5/plev/high_wind/6H/z500data",
     level_hpa=500,
     years=None  # Optional: specify years, None = all
)

In [None]:
def combine_mid_wind_datasets(start_year=2000, end_year=2023):
    """
    Combine yearly mid wind NetCDF files from 2000 to 2023 into single dataset
    """
    
    # List to store mid wind datasets
    mid_wind_datasets = []
    
    # Loop through all years
    for year in range(start_year, end_year + 1):
        print(f"Processing mid wind data for year {year}...")
        
        # Define path for current year
        basepath_mid = f"/scratch2/mabeling/data/ERA5/plev/mid_wind/6H/data/{year}"
        mid_wind_file = f"{basepath_mid}/uvtths_plev500_{year}.nc"
        
        # Check if file exists before trying to open it
        if os.path.exists(mid_wind_file):
            try:
                # Load and process mid wind data
                ds_mid = xr.open_dataset(mid_wind_file)
                ds_mid = flatten_by(standardize(ds_mid), "s")
                mid_wind_datasets.append(ds_mid)
                print(f"Mid wind data loaded for {year}")
            except Exception as e:
                print(f"Error loading mid wind data for {year}: {e}")
        else:
            print(f"Mid wind file not found for {year}: {mid_wind_file}")
    
    print(f"\nCombining mid wind datasets...")
    print(f"Mid wind datasets found: {len(mid_wind_datasets)}")
    
    combined_mid_wind = None
    
    if mid_wind_datasets:
        try:
            combined_mid_wind = xr.concat(mid_wind_datasets, dim='time', 
                                        coords='minimal', compat='override')
            print("Mid wind datasets combined successfully")
        except Exception as e:
            print(f"Error combining mid wind datasets: {e}")
    
    return combined_mid_wind