This code is adapted from [this tutorial](https://github.com/planet-os/notebooks/blob/master/aws/era5-s3-via-boto.ipynb) on accessing ERA5 data stored on a public S3 bucket as a part of Amazon Web Services (AWS) and [this tutorial](https://github.com/pangeo-data/pangeo/blob/master/notebooks/newmann_ensemble_meteorology.ipynb) that analyzes a multi-model ensemble using dask and xarray.


Using [ERA5](https://www.ecmwf.int/en/forecasts/datasets/reanalysis-datasets/era5) data downloaded from a public S3 bucket on AWS, we will utilize [xarray](http://xarray.pydata.org/en/stable/) and [dask](https://docs.dask.org/en/latest/) to plot and analyze variables.

ERA5 data is hourly at a gridded resolution of 30 km. ERA5 combines vast amounts of historical observations into global estimates using advanced modelling and data assimilation systems to provide estimates of large number of atmospheric, land and oceanic climate variables.

**Load python libraries**

In [None]:
%matplotlib inline
import boto3
import botocore
import datetime
import matplotlib.pyplot as plt
import os.path
import xarray as xr
import numpy as np
# import cartopy.crs as ccrs
# from cartopy.mpl.gridliner import LONGITUDE_FORMATTER, LATITUDE_FORMATTER
# import matplotlib.ticker as mticker
# from IPython.display import Image, display
from dask.distributed import Client, progress

**Connect to Dask Distributed Cluster**

Set the `memory_limit` parameter in `Client()` if dask doesn't auto detect your memory limit accurately later on in the notebook. You will know this is occurring if processes start to get killed due o memory limit errors.

In [None]:
from dask.distributed import Client
client = Client(processes=True, n_workers=3)
client

**Set up access to S3 bucket using `boto3` and a low-level client**

Rather than setting up access key and ID, we will use a low-level client to request data anonymously.

In [None]:
era5_bucket = 'era5-pds'
# No AWS keys required
client = boto3.client('s3', config=botocore.client.Config(signature_version=botocore.UNSIGNED))

Let's download main.nc file for the month indicated and use xarray to inspect the metadata relating to the data files.

In [None]:
date = datetime.date(2017,1,1) # update to desired date
prefix = date.strftime('%Y/%m/')

metadata_file = 'main.nc'
metadata_key = prefix + metadata_file
client.download_file(era5_bucket, metadata_key, metadata_file)
ds_meta = xr.open_dataset('main.nc', decode_times=False)
ds_meta.info()

**Download variables of interest**

Note that each variable for one month is approximately 1 GB, so this takes a minute.

In [None]:
# select variable(s) of interest
# var1 = 'precipitation_amount_1hour_Accumulation'
var2 = 'air_pressure_at_mean_sea_level'
var1 = 'air_temperature_at_2_metres'

var_list = [var1, var2]

for var in var_list:
    # file path patterns for remote S3 objects and corresponding local file
    s3_data_ptrn = '{year}/{month}/data/{var}.nc'
    data_file_ptrn = '{year}{month}_{var}.nc'

    year = date.strftime('%Y')
    month = date.strftime('%m')
    s3_data_key = s3_data_ptrn.format(year=year, month=month, var=var)
    data_file = data_file_ptrn.format(year=year, month=month, var=var)

    if not os.path.isfile(data_file): # check if file already exists
        print("Downloading %s from S3..." % s3_data_key)
        client.download_file(era5_bucket, s3_data_key, data_file)

**Open multiple data files into a single xarray dataset object**

In [None]:
## Open multiple files as a single dataset
file_pattern = '{year}{month}*.nc'
filename_pattern = file_pattern.format(year=year, month=month)

ds = xr.open_mfdataset(filename_pattern, engine='netcdf4', concat_dim='var')
print('ds size in GB {:0.2f}\n'.format(ds.nbytes / 1e9))
ds.info

The `ds.info` output above shows us that there are four dimensions to the data: lat, lon, and time0; and two data variables: air_temperature_at_2_metres, and air_pressure_at_mean_sea_level.

**Spatially subset to Western US**

It is important near the beginning of your analysis to subset as much as possible. That way, when we start computing, we aren't taking up computational space with something we aren't interested in.

In [None]:
ds = ds.sel(lat=slice(50,30), lon=slice(360-140,360-105))
print('ds size in GB {:0.2f}\n'.format(ds.nbytes / 1e9))
ds.info

See how much smaller that is? Now we will do some unit conversions so that our plots make a little more sense.

**Convert units to something more familiar**

In [None]:
# Convert mean sea level pressure units to 'hPa'
ds['air_pressure_at_mean_sea_level'] = ds.air_pressure_at_mean_sea_level/100.0
ds.air_pressure_at_mean_sea_level.attrs['units'] = 'hPa'

# and surface temperature units to 'C'
ds['air_temperature_at_2_metres'] = ds.air_temperature_at_2_metres - 273.15
ds.air_temperature_at_2_metres.attrs['units'] = 'C'

Now let's do some quick analysis - for example taking the average January surface temperature.

**Compute the mean temperature across the time axis**

In [None]:
# calculates the monthly mean along the time dimension
da_temp_mean = ds['air_temperature_at_2_metres'].mean(dim='time0')

The expressions above didn’t actually compute anything. They just build the dask task graph. To do the computations, we call the `compute` method:

In [None]:
%time da_temp_mean = da_temp_mean.compute()

**Plot Average Surface Temperature**

In [None]:
da_temp_mean.plot(figsize=(10, 6))
plt.title('January 2017 Mean Temperature')

Most of the time spent in the last calculation was loading data from disk. After we were done with this data, Dask threw it away to free up memory. If we plan to reuse the same dataset many times then we may want to `persist` it in memory.

In [None]:
air_temp = ds['air_temperature_at_2_metres'].persist()
air_temp

Now the air_temperature DataArray is resident in memory on our workers. We can repeat our computation from last time much more quickly.

In [None]:
%time temp_mean = air_temp.mean(dim='time0').compute()

And we can also modify the computation and try something new. Keeping data in memory allows to iterate quickly, which is the whole point of this exercise.

In [None]:
%time temp_std = air_temp.std(dim='time0').compute()

In [None]:
temp_std.plot(figsize=(10, 6))
plt.title('January 2017 Temperature Standard Deviation')

Now let's select various cities and plot their January temperature over time. 

In [None]:
# location coordinates
locs = [
    {'name': 'santa_barbara', 'lon': -119.6982, 'lat': 34.4208},
    {'name': 'colorado_springs', 'lon': -104.8214, 'lat': 38.8339},
    {'name': 'honolulu', 'lon': -157.835938, 'lat': 21.290014},
    {'name': 'seattle', 'lon': -122.3321, 'lat': 47.6062},
]

# convert westward longitudes to degrees east
for l in locs:
    if l['lon'] < 0:
        l['lon'] = 360 + l['lon']
locs

In [None]:
ds_locs = xr.Dataset()
air_temp_ds = air_temp.to_dataset()

# interate through the locations and create a dataset
# containing the temperature values for each location
for l in locs:
    name = l['name']
    lon = l['lon']
    lat = l['lat']
    var_name = name

    ds2 = air_temp_ds.sel(lon=lon, lat=lat, method='nearest')

    lon_attr = '%s_lon' % name
    lat_attr = '%s_lat' % name

    ds2.attrs[lon_attr] = ds2.lon.values.tolist()
    ds2.attrs[lat_attr] = ds2.lat.values.tolist()
#     ds2 = ds2.drop(('lat', 'lon'))
    ds2 = ds2.rename({var1 : var_name}).drop(('lat', 'lon'))

    ds_locs = xr.merge([ds_locs, ds2])

ds_locs.data_vars

**Do some descriptive statistics**

In [None]:
df_f = ds_locs.to_dataframe()
df_f.describe()

**Plot temperature across time for multiple cities**

In [None]:
# readability please
plt.rcParams.update({'font.size': 16})

ax = df_f.plot(figsize=(18, 10), title="ERA5 Air Temperature at 2 Meters", grid=1)
ax.set(xlabel='Date', ylabel='Air Temperature (deg C)')
plt.show()