In [None]:
import netCDF4 as nc
from netCDF4 import Dataset
import matplotlib.pyplot as plt
from scipy.stats import gaussian_kde, poisson
import tropycal.tracks as tracks
import pandas as pd
import numpy as np
import datetime as dt
import os
import re
import geopandas as gpd
import geodatasets
from shapely import geometry
import plotly.express as px

Western Pacific (WP) locations: 5–60°N 100°–180°E

In [None]:
s = re.compile('^MRI_HPB_m+\d{3}.nc$')
file_dirs = os.listdir('data')
file_dirs
data_files = [f for f in file_dirs if s.match(f)]
data_files

In [None]:
datas = [Dataset('data/' + f) for f in data_files]

In [None]:
for data in datas[:10]:
    print(np.nanmax(data['track_lat'][:]), np.nanmin(data['track_lat'][:]), np.nanmax(data['track_lon'][:]), np.nanmin(data['track_lon'][:]))

In [None]:
storms = {}
for data in datas[:10]:
    for i in range(0, data.dimensions['storm'].size):
    # for i in range(0, 10):
        track_lat_any = np.array(data.variables['track_lat'][i])[
            (np.array(data.variables['track_lat'][i]) >= 5) &
            (np.array(data.variables['track_lat'][i]) <= 60) &
            (~np.isnan(np.array(data.variables['track_lat'][i])))
        ]
        track_lon_any = np.array(data.variables['track_lon'][i])[
            (np.array(data.variables['track_lon'][i]) >= 100) & 
            (np.array(data.variables['track_lon'][i]) <= 180) &
            (~np.isnan(np.array(data.variables['track_lon'][i])))
        ]
        if len(track_lat_any) != 0 and len(track_lon_any) != 0:
            # print(data['track_lat'][i])
            pass
        else:
            continue
        storms[i] = {
            'time': data.variables['track_time'][i],
            'lat': data.variables['track_lat'][i],
            'lon': data.variables['track_lon'][i],
            'pres': data.variables['track_pres'][i],
            'wind': data.variables['track_wind'][i]
        }
# storms

In [None]:
world = gpd.read_file(geodatasets.get_path('naturalearth.land'))
storm_geo = {}
for i in storms:
    storm_geo[i] = {
        # 'time': storms[i]['time'],
        'geometry': geometry.LineString((lat, lon) for lat, lon in zip(storms[i]['lon'], storms[i]['lat']) if (not np.isnan(lat) and not np.isnan(lon))),
        'featurecla': 'storm',
        'scalerank': 1,
        'min_zoom': 1,
    }
df = pd.DataFrame.from_dict(storm_geo, orient='index')
roi = gpd.GeoDataFrame(df, geometry='geometry', crs=world.crs)
roi
# world_roi = pd.concat([world, roi])
# world_roi

In [None]:
roi.explore()

In [None]:
res = 2
lat_max = np.nanmax([np.nanmax(storms[i]['lat']) for i in storms])
lat_min = np.nanmin([np.nanmin(storms[i]['lat']) for i in storms])
lon_max = np.nanmax([np.nanmax(storms[i]['lon']) for i in storms])
lon_min = np.nanmin([np.nanmin(storms[i]['lon']) for i in storms])
grids = np.zeros((int((lat_max-lat_min)/res)+1, int((lon_max-lon_min)/res)+1))
for i in storms:
    cell_coords = {}
    for lat, lon in zip(storms[i]['lat'], storms[i]['lon']):
        if (not np.isnan(lat) and not np.isnan(lon)):
            cell_coords[int((lat-lat_min)/res), int((lon-lon_min)/res)] = 1
    for coord in cell_coords:
        grids[coord] += 1

In [None]:
grids

In [None]:
grid_cells = []
for i in range(grids.shape[0]):
    for j in range(grids.shape[1]):
        grid_cells.append({
            'geometry': geometry.Polygon([(lon_min + j*res, lat_min + i*res), (lon_min + j*res, lat_min + (i+1)*res), (lon_min + (j+1)*res, lat_min + (i+1)*res), (lon_min + (j+1)*res, lat_min + i*res)]),
            'storm_count': grids[i, j]
        })
grid_df = pd.DataFrame(grid_cells)
grid_gdf = gpd.GeoDataFrame(grid_df, geometry='geometry', crs=world.crs)
grid_gdf.explore(column='storm_count', legend=True)