# Section 2 - v2.0

Updated code.\
Updates:
- Layers are now plotted through ipyleaflet
- Introduced new functions to download, load and plot forecast data at fixed steps from today
- Implemented a widget to select the step to plot in the ipyleaflet

To do:
- add one colormap for each layer in the plot

In [2]:
import cfgrib
from datetime import datetime
from ecmwf.opendata import Client
from ipyleaflet import Choropleth, Map, basemap_to_tiles, LayersControl, LegendControl
import ipywidgets as widgets
from IPython.display import display
from localtileserver import get_leaflet_tile_layer, TileClient
# from Magics import macro as magics
# from Magics.macro import *
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import rasterio
# import requests
import rioxarray as rxr
import xarray as xr

In [3]:
# parameters to be imported from main widget
# dates_to_download = ['20230823', '20230824'] #only last five days are available
dates_to_download = [0, -1, -2] #today, yesterday, the day before yesterday

initial_lat_lon = (38, -90) #first cyclone position
final_lat_lon = (60, 90) #last cyclone position

In [4]:
# variable initialization
# variables to visualize
variables = ['msl', '2t', 'tp'] #, 10fgg15] #list of variables to visualize, 10fgg15 doesn't work right now

# get mean coordinates to center the plot
mean_lat = (initial_lat_lon[0] - final_lat_lon[0])/2
mean_lon = (initial_lat_lon[1] - final_lat_lon[1])/2

coord = [mean_lat, mean_lon]

#steps to download
stepsdict = {
    "base": [24, 48, 120, 240],
    "10fgg15": ["0-24", "24-48", "96-120", "216-240"]
}

## Download

**CAREFUL**: with this function we are downloading the 10 days ahead forecasts, so only the forecast 10 days from the the date provided

In [5]:
def dwnl_atmdata(variables, dates):
    """
    Function which replicates the code written by Laura
    It downloads 10 day ahead forecasts from the dates specified in dates
    It need the dates from which the forecast starts
    """
    fnames = []
    today = datetime.today().strftime('%Y%m%d')
    for var in variables:            
        for d in dates:
            if var == "10fgg15":
                rqt = {
                    "date": str(d), #date start of the forecast
                    "time": 0,      #time start of the forecast, can be 0 or 12
                    "step": "216-240",    #step of the forecast: 10 days
                    "stream": "enfo",
                    "type": "ep",
                    "param": var,
                }
            else:
                rqt = {
                    "date": str(d), #date start of the forecast
                    "time": 0,      #time start of the forecast, can be 0 or 12
                    "step": 240,    #step of the forecast: 10 days
                    "stream": "oper",
                    "type": "fc",
                    "levtype": "sfc",
                    "param": var,
                }
            filename = f"data/atm/{var}_{rqt['date']}_{today}_time{rqt['time']}_step{rqt['step']}_{rqt['stream']}_{rqt['type']}.grib"
            if not os.path.exists(filename):
                client = Client(source = "ecmwf", beta = True)
                client.retrieve(
                    request = rqt,
                    target = filename
                )
            fnames.append(filename)
    return(fnames)

def dwnl_atmdata_step(variables, stepsdict):
    """
    Function which downloads 1, 2, 5 and 10 days ahead forecasts from today
    """
    fnames = []
    today = datetime.today().strftime('%Y%m%d')
    for var in variables:
        if var == "10fgg15": steps = stepsdict["10fgg15"]
        else: steps = stepsdict["base"]
        for s in steps:
            if var == "10fgg15":
                rqt = {
                    "date": 0,      #date start of the forecast: today
                    "time": 0,      #time start of the forecast, can be 0 or 12
                    "step": s,      #step of the forecast: 1, 2, 5, 10 days
                    "stream": "enfo",
                    "type": "ep",
                    "param": var,
                }
            else:
                rqt = {
                    "date": 0,      #date start of the forecast: today
                    "time": 0,      #time start of the forecast, can be 0 or 12
                    "step": s,      #step of the forecast: 1, 2, 5, 10 days
                    "stream": "oper",
                    "type": "fc",
                    "levtype": "sfc",
                    "param": var,
                }
            filename = f"data/atm/{var}_{rqt['date']}_{today}_time{rqt['time']}_step{rqt['step']}_{rqt['stream']}_{rqt['type']}.grib"
            if not os.path.exists(filename):
                client = Client(source = "ecmwf", beta = True)
                client.retrieve(
                    request = rqt,
                    target = filename
                )
            fnames.append(filename)
    return(fnames)

In [6]:
# fnames = dwnl_atmdata(variables, dates_to_download)

In [7]:
fnames = dwnl_atmdata_step(variables, stepsdict)

## Load

In [8]:
# Load data
def gen_raster(var, filename):
    """
    Generates .tiff raster files from .grib files downloaded from ECMWF's Open Data through dwnl_atmdata

    var: str
        Code of the variable
    d: str
        Date of the downloaded variable file

    Returns:
    tiffpath: str
        Path to the created .tiff file
    """
    tiffpath = ".".join([filename.split(".")[0], "tiff"])
    if not os.path.exists(tiffpath):
        print(var, " conversion")
        f = xr.load_dataset(filename, engine = "cfgrib")
        f = f.rio.write_crs("epsg:4326")
        f.rio.to_raster(tiffpath)
        f.close()
    return(tiffpath)

def load_atmdata(varlst, fnames):
    """
    varlst: str, list of str
        List containing strings of the variables to load
    fnames: str, list of str
        List containing the paths to downloaded data
    
    Returns:
    vardict: dict
        A dictionary of rasterio DatasetReader objects, each assigned to the corresponding variable
    """
    vardict = {}
    for i, var in enumerate(varlst):
        lst = []
        tool = [x for x in fnames if var in x] #filenames containing the variable
        for filename in tool:
            lst.append(rasterio.open(gen_raster(var, filename)))
        vardict[f"{var}"] = lst
        print(var, " loaded")
    return(vardict)

In [9]:
vardict = load_atmdata(variables, fnames)

msl  loaded
2t  loaded
tp  loaded


## Plot

In [75]:
def get_colordict(array, cmap):
    """
    array: numpy.array
        Array containing the values of which the quantiles will be obtained
    cmap: str
        Name of the matplotlib colormap associated to the array

    Returns:
    dict
        Dictionary containing Hex codes as keys and the array quantiles as values
    """
    n_col = 8
    cmap = plt.get_cmap(cmap, n_col)
    custom_palette = [mpl.colors.rgb2hex(cmap(i)) for i in range(cmap.N)]
    quantiles = [round(np.quantile(array, q)) for q in np.linspace(0,1,n_col)]
    return(dict(zip(quantiles, custom_palette)))

#plot the data with ipyleaflet
def plot_atmdata_date(vardict, date, coord):
    """
    vardict: dict
        Dictionary containing the rasters uploaded trhough load_atmdata
    date: str, list of str
        Dates considered
    coord: list or tuple
        Coordinates of the map central point. Provide them as lat, lon

    Returns:
    m: ipyleaflet.Map
    """
    namedict = {
        "msl": "Mean sea level pressure [Pa]", #meansealevelpressure
        "2t": "2 meter temperature [K]",
        "tp": "Total Precipitation [m]",
        "10fgg15": "10 metre wind gust of at least 15 m/s [%]",
    }
    palettedict = {
        "msl": "cool", #meansealevelpressure
        "2t": "RdBu_r",
        "tp": "PuBu",
        "10fgg15": "winter",
    }
    m = Map(center=(coord[0], coord[1]), zoom = 3)
    for var in vardict.keys():
        r = [x for x in vardict[var] if date in x.name][0]
        print("Plotting ", namedict[var])
        client = TileClient(r)
        t = get_leaflet_tile_layer(client, name = namedict[var], opacity = 0.7, palette = palettedict[var])
        m.add_layer(t)
    m.add_control(LayersControl())
    m.layout.height = "700px"
    return(m)

def plot_atmdata_step(vardict, step, coord):
    """
    vardict: dict
        Dictionary containing the rasters uploaded trhough load_atmdata
    step: int
        Index of the desired step inside the list defined in stepsdict
    coord: list or tuple
        Coordinates of the map central point. Provide them as lat, lon

    Returns:
    m: ipyleaflet.Map
    """
    namedict = {
        "msl": "Mean sea level pressure [Pa]",
        "2t": "2 meter temperature [K]",
        "tp": "Total Precipitation [m]",
        "10fgg15": "10 metre wind gust of at least 15 m/s [%]",
    }
    palettedict = {
        "msl": "cool",
        "2t": "RdBu_r",
        "tp": "PuBu",
        "10fgg15": "winter",
    }
    m = Map(center=(coord[0], coord[1]), zoom = 3)
    for var in vardict.keys():
        if var == "10fgg15": steps = stepsdict["10fgg15"]
        else: steps = stepsdict["base"]
        r = [x for x in vardict[var] if f"step{steps[step]}" in x.name][0]
        print("Plotting ", namedict[var])
        client = TileClient(r)
        t = get_leaflet_tile_layer(client, name = namedict[var], opacity = 0.7, palette = palettedict[var], n_colors = 8)
        m.add_layer(t)

        # find a way to add a colormap
        # in leaflet you can do it as:
        # m.add_colorbar(colors=custom_palette, vmin=r.read(1).ravel().min(), vmax=r.read(1).ravel().max())
        #
        # doing as below works but the legends created are too big:
        # colordict = get_colordict(r.read(1).ravel(), palettedict[var])
        # legend = LegendControl(legend = colordict, name = namedict[var], position="topright")
        # legend = LegendControl({"low":"#24dbff", "medium":"#A55", "High":"#500"}, name = namedict[var], position="bottomleft")
        # m.add_control(legend)
    m.add_control(LayersControl())
    m.layout.height = "700px"
    return(m)

In [44]:
# plot_atmdata_date(vardict, dates_to_download[0], coord)

In [76]:
# plot only the first forecast step available (24h from today)
plot_atmdata_step(vardict, 0, coord)

Plotting  Mean sea level pressure [Pa]
Plotting  2 meter temperature [K]
Plotting  Total Precipitation [m]


Map(center=[-11.0, -90.0], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title', 'zoom_o…

## Plot and widgets

To make widget text bold, use this unicode converter to get the bold text then paste it as the description: http://slothsoft.net/UnicodeMapper/?lang=en-us

In [34]:
# widget to select forecast step
vec_forecast = [f"{x}h from today" for x in stepsdict["base"]]
widget_sel_forecast = widgets.Dropdown(options = vec_forecast, description = 'Forecast')

# function to update the plot
def update_plot(select_forecast):
    tooldict = {
        "24h from today": 0,
        "48h from today": 1,
        "120h from today": 2,
        "240h from today": 3
    }
    s = tooldict[select_forecast]
    display(plot_atmdata_step(vardict, s, coord))

In [23]:
# launch the interactive plot
widgets.interactive(update_plot, select_forecast = widget_sel_forecast)

interactive(children=(Dropdown(description='Forecast timestep', options=('24h from today', '48h from today', '…

## Old code

In [46]:
widgets.interactive(update_plot, select_var=widget_select_ssvar, select_timestep=widget_sel_timestep,
                    select_lat=latitude_slider, 
                    select_lon=longitude_slider)

interactive(children=(Dropdown(description='Sea surface variable', options=('msl', '2t'), value='msl'), Select…

In [47]:
widgets.interactive(update_plot, 
                    select_var=widget_select_atmvar, select_timestep=widget_sel_timestep,
                    select_lat=latitude_slider, 
                    select_lon=longitude_slider)

interactive(children=(Dropdown(description='Atmospheric variable', options=('tp',), value='tp'), SelectionSlid…

In [7]:
# LOAD AND DATA
all_variables = ['msl', '2t', 'tp']
gtp = []
gmsl = []
g2t = []
print('Loading data...')
for var in all_variables:
    print(var)
    for t in dates_to_download:
        filename = var + '_' + str(t) + 'time0_step240_oper_fc.grib'
        #plotname =  var + '_' + str(t) + 'time0_step240_oper_fc' 
        
        #output_grafic = output(output_formats = ['png'], output_name = plotname, 
        #                       output_name_first_page_number = "off" )

        #Loading Grib data
        gvar = mgrib(grib_input_file_name=filename)

        if var == 'tp':
            gtp.append(gvar)
        elif var == 'msl':
            gmsl.append(gvar)
        elif var == '2t':
            g2t.append(gvar)
print('Done!')

Loading data...
msl
2t
tp
Done!


In [9]:
widgets selection lat lon
longitude = np.arange(-180,185,5)
longitude_slider = widgets.SelectionRangeSlider(
    options = longitude, index = (0,len(longitude)-1), description='Longitude',
    orientation='horizontal', disabled=False)
latitude = np.arange(-90,95,5)
latitude_slider = widgets.SelectionRangeSlider(
    options = latitude, index = (0,len(latitude)-1), description='Latitude',
    orientation='horizontal', disabled=False)

widget select sea surface variable
widget_select_ssvar = widgets.Dropdown(description='Sea surface variable', options=('msl', '2t'))

# widget select atm variable
widget_select_atmvar = widgets.Dropdown(description='Atmospheric variable', options=('tp',))
                        # la coma del final és important; sinó agafa cada lletra per separat!!
# widget select timestep
vec_timesteps = list(range(0, len(dates_to_download))) 
widget_sel_timestep = widgets.SelectionSlider(options=vec_timesteps, description='Timestep')