# Data Processing

This notebook contains a workflow to:
1. Download hourly ERA5 data from a Google Cloud Storage bucket.
1. Process the hourly ERA5 data into daily ERA5 data.
1. Upload the daily data to a Google Cloud Storage bucket.

This data processing is necessary to support an analysis of Earth's radiation budget based on daily solar fluxes at the surface and top of atmosphere, so we will process hourly averages into daily averages. ERA5 does not output top of atmosphere outgoing solar radiation or upwelling solar radiation at the surface, however these quantities can be calculated using the available fluxes at those levels (e.g. incoming radiation and net radiation at the top of atmosphere).

### Requirements

* A Google Cloud project with Cloud Storage enabled
* The following Python packages:

In [1]:
%pip install -q gsutil tqdm xarray scipy dask netCDF4 pandas joblib

Note: you may need to restart the kernel to use updated packages.


## Imports

In [1]:
import contextlib
from datetime import timedelta, date, datetime
from functools import wraps
import multiprocessing
from os import system
from sys import platform
import time
import warnings

import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import joblib
from joblib import Parallel, delayed
import xarray as xr

## Setup

Our analysis seeks a long-term estimate of the amount of outgoing radiation that Earth's surface can reflect. ERA5 has 42 years of hourly data available. A long-term climatology is typically defined as 30 years. Thus, we ingest the latest 30 years: 1991 through 2020. Since making a single request would be prohibitively large, we break the request up by day. 

In [2]:
start_date = date(1990, 1, 1)
end_date = date(2021, 1, 1)
hourly_data_bucket = "era5-single-level"
daily_data_bucket = "era5-single-level-daily"
n_jobs = 1  # number of jobs for parallelization; if 1, then serial; if negative, then (n_cpus + 1 + n_jobs) are used

xr.set_options(keep_attrs=True)
warnings.filterwarnings("ignore", category=FutureWarning)
if platform == "darwin":
    multiprocessing.set_start_method("fork", force=True)  # ipython bug workaround https://github.com/ipython/ipython/issues/12396

## Functions

In [59]:
@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Patch joblib to report into tqdm progress bar given as argument."""

    def tqdm_print_progress(self):
        if self.n_completed_tasks > tqdm_object.n:
            n_completed = self.n_completed_tasks - tqdm_object.n
            tqdm_object.update(n=n_completed)

    original_print_progress = joblib.parallel.Parallel.print_progress
    joblib.parallel.Parallel.print_progress = tqdm_print_progress

    try:
        yield tqdm_object
    finally:
        joblib.parallel.Parallel.print_progress = original_print_progress
        tqdm_object.close()


def retry(exceptions, tries=4, delay=3, backoff=2, logger=None):
    """Retry calling the decorated function using an exponential backoff.

    Args:
        exceptions: The exception to check. may be a tuple of
            exceptions to check.
        tries: Number of times to try (not retry) before giving up.
        delay: Initial delay between retries in seconds.
        backoff: Backoff multiplier (e.g. value of 2 will double the delay
            each retry).
        logger: Logger to use. If None, print.
    """
    def deco_retry(f):

        @wraps(f)
        def f_retry(*args, **kwargs):
            mtries, mdelay = tries, delay
            while mtries > 1:
                try:
                    return f(*args, **kwargs)
                except exceptions as e:
                    msg = '{}, Retrying in {} seconds...'.format(e, mdelay)
                    if logger:
                        logger.warning(msg)
                    else:
                        print(msg)
                    time.sleep(mdelay)
                    mtries -= 1
                    mdelay *= backoff
            return f(*args, **kwargs)

        return f_retry  # true decorator

    return deco_retry


def daterange(start_date, end_date):
    """Make a date range object spanning two dates.
    
    Args:
        start_date: date object to start from.
        end_date: date object to end at.
    
    Yields:
        date object for iteration.
    """
    for n in range(int ((end_date - start_date).days)):
        yield start_date + timedelta(n)


@retry(RuntimeError, backoff=1)
def get_date_data_gcs(single_date, bucket):
    """Download a dataset for a single date from Google Cloud Storage.
    
    Args:
        single_date: date object representing day to retrieve data for.
        bucket: Google Cloud Storage bucket to download from.
        
    Returns:
        Exit status of system call to get data.
        
    Raises:
        RuntimeError: if system call exit status is not 0.
    """
    download_status = system(f"gsutil -m cp -r gs://{bucket}/{single_date.strftime('%Y%m%d')}.nc \
                             ./{single_date.strftime('%Y%m%d')}.nc")
    if download_status != 0:
        raise RuntimeError("Non-zero exit status")

    return download_status


@retry(RuntimeError, backoff=1)
def put_date_data_gcs(single_date, bucket, cleanup=False):
    """Upload a dataset for a single date to Google Cloud Storage.
    
    Args:
        single_date: date object representing day to retrieve data for.
        bucket: Google Cloud Storage bucket to download from.
        cleanup: Optionally remove the file locally.
        
    Returns:
        Nothing; downloads data from Google Cloud Storage as side effect.
        
    Raises:
        RuntimeError: if system call exit status is not 0.
    """
    upload_status = system(f"gsutil -m cp -r ./{single_date.strftime('%Y%m%d')}.nc gs://{bucket}/")

    if upload_status != 0:
        raise RuntimeError("Non-zero exit status")
    
    if (upload_status == 0) & cleanup:
        system(f"rm {single_date.strftime('%Y%m%d')}.nc")

    return upload_status


def modify_units(dataset, starting_units, ending_units, conversion_factor):
    """Modify the units of a variable.
    
    Args:
        dataset: xarray Dataset
        starting_units: str of units to be modified
        ending_units: str of units after modification
        conversion_factor: numerical factor to apply to convert units
    
    Returns:
        xarray Dataset with units modified for variables with units matching the starting unit.
    """
    for variable in dataset:
        if dataset[variable].attrs["units"] == starting_units:
            dataset[variable] = dataset[variable] * conversion_factor
            dataset[variable].attrs["units"] = ending_units
    return dataset

        
def compute_daily_average(dataset):
    """Compute the daily average and an input xarray dataset."""
    return dataset.resample(time='1D').sum() / dataset.sizes["time"]


def compute_boundary_fluxes(dataset):
    """Compute missing boundary fluxes at the surface and top of atmosphere if possible.
    
    Use available radiative fluxes e.g. net solar radiation and incoming solar radiation to 
    compute outgoing solar radiation.
    
    Args:
        dataset: xarray Dataset with radiative fluxes at the surface and top of atmosphere.
        
    Returns:
        xarray Dataset with missing fluxes at the boundaries.
    """
    if ("tosr" not in dataset) and all(x in dataset for x in ["tisr", "tsr"]):
        dataset["tosr"] = dataset["tisr"] - dataset["tsr"]
        dataset["tosr"].attrs["long_name"] = "TOA outgoing solar radiation"
        dataset["tosr"].attrs["standard_name"] = "toa_outgoing_shortwave_flux"
    if ("ssru" not in dataset) and all(x in dataset for x in ["ssrd", "ssr"]):
        dataset["ssru"] = dataset["ssrd"] - dataset["ssr"]
        dataset["ssru"].attrs["long_name"] = "Surface solar radiation upwards"
        dataset["ssru"].attrs["standard_name"] = "surface_upwelling_shortwave_flux_in_air"
    return dataset


def drop_unneccesary_variables(dataset, keep_vars):
    """Drop variables not specified as necessary.
    
    Args:
        dataset: xarray Dataset.
        keep_vars: list of variables to keep.
    
    Returns:
        xarray Dataset with specified variables.
    """
    drop_vars = list(set(dataset.data_vars).symmetric_difference(set(keep_vars)))
    return dataset.drop_vars(drop_vars)


def process_hourly_data(single_date, hourly_data_bucket, daily_data_bucket, cleanup=False):
    """Process hourly average data into daily average data.
    
    Args:
        single_date: date object representing day to retrieve data for.
        hourly_data_bucket: str name of Google Cloud Storage bucket for hourly data.
        daily_data_bucket: str name of Google Cloud Storage bucket for daily data.
        cleanup: boolean option to remove downloaded data after processing.
    
    Returns:
        Exit status of system call to upload processed data to Google Cloud Storage.
    """
    download_status = get_date_data_gcs(single_date, hourly_data_bucket)
    
    with xr.open_dataset(f"./{single_date.strftime('%Y%m%d')}.nc") as ds:
        ds = compute_boundary_fluxes(ds)
        ds = drop_unneccesary_variables(ds, keep_vars=["ssrd", "ssru", "tisr", "tosr"])
        ds = compute_daily_average(ds)
        ds = modify_units(ds, "J m**-2", "W m**-2", (1 / 3600))  # 3600 seconds in an hour
        ds.to_netcdf(f"./{single_date.strftime('%Y%m%d')}.nc")

    upload_status = put_date_data_gcs(single_date, daily_data_bucket, cleanup)
    return upload_status

## Workflow

For each day between the specified start and end dates, download the hourly data, process it, and upload it to a bucket for daily data.

In [9]:
with tqdm_joblib(tqdm(total=sum(1 for _ in daterange(start_date, end_date)))) as pbar:
    Parallel(n_jobs=n_jobs,
             backend="multiprocessing")(delayed(process_hourly_data)(day, hourly_data_bucket, daily_data_bucket, cleanup=True)
                                        for day in daterange(start_date, end_date))

  0%|          | 0/3 [00:00<?, ?it/s]