# Data Ingest

This notebook contains a workflow to:

1. Download meteorological reanalysis data from the Copernicus [Climate Data Store](https://cds.climate.copernicus.eu/) and
2. Upload the data to a Google Cloud Storage bucket.

This data ingest is necessary to support an analysis of Earth's radiation budget under clear sky conditions concurrent with the hottest week of the year, so we will require five available variables:

* Top of Atmosphere Incident Solar Radiation (`toa_incident_solar_radiation`)
* Top of Atmosphere Net Solar Radiation, Clear Sky (`top_net_solar_radiation_clear_sky`)
* Surface Solar Radiation Downward, Clear Sky (`surface_solar_radiation_downward_clear_sky`)
* Surface Net Solar Radiation, Clear Sky (`surface_net_solar_radiation_clear_sky`)
* 2m Temperature (`2m_temperature`)

## Preliminaries

### Requirements
* A Copernicus Climate Data Store account ([Create new account](https://cds.climate.copernicus.eu/user/register))
* A Google Cloud project with Cloud Storage enabled ([Create new account](https://cloud.google.com/))
* Python packages. See `environments` directory for platform specific environment files.

## Imports

In [None]:
from utils import check_environment

check_environment("ingest")

import contextlib
from datetime import timedelta, date
from functools import wraps
import logging
import multiprocessing
from os import environ
import os
from sys import platform
import urllib.request

import cdsapi
import certifi
from google.cloud import storage
import joblib
from joblib import Parallel, delayed
from tqdm.notebook import tqdm
import urllib3
import xarray as xr

## Setup

In [None]:
start_date = date(1990, 1, 1)
end_date = date(1990, 1, 5)
hourly_data_bucket = "era5-single-level"
n_jobs = 1  # number of jobs for parallelization; if 1, then serial; if negative, then (n_cpus + 1 + n_jobs) are used

# ECMWF C3S CDS Client Configuration
try:
    if None not in (environ["CDSAPI_URL"], environ["CDSAPI_KEY"]):
        pass
except KeyError:
    # Fallback to .cdsapirc
    try:
        with open(os.path.join(os.path.expanduser("~"),".cdsapirc"), "r") as f:
            output = f.read()
    except IOError:
        logging.warning("No $CDSAPI_URL, $CDSAPI_KEY, or .cdsapirc found.")

# Certificate management
http = urllib3.PoolManager(
    cert_reqs='CERT_REQUIRED',
    ca_certs=certifi.where()
)

# Logging configuration
logging.basicConfig(filename="ingest.log", filemode="w", level=logging.INFO)

# Multiprocessing configuration for MacOS
if platform == "darwin":
    multiprocessing.set_start_method("fork", force=True)  # ipython bug workaround https://github.com/ipython/ipython/issues/12396
    
# Project ID
url = "http://metadata.google.internal/computeMetadata/v1/project/project-id"
req = urllib.request.Request(url)
req.add_header("Metadata-Flavor", "Google")
project_id = urllib.request.urlopen(req).read().decode()

## Functions

In [None]:
def retry(f):
    """Retry wrapper function used as a decorator."""
    @wraps(f)
    def wrapped(*args, **kwargs):
        while True:
            try:
                return f(*args, **kwargs)
            except Exception:
                pass
    return wrapped


@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Patch joblib to report into tqdm progress bar given as argument."""

    def tqdm_print_progress(self):
        if self.n_completed_tasks > tqdm_object.n:
            n_completed = self.n_completed_tasks - tqdm_object.n
            tqdm_object.update(n=n_completed)

    original_print_progress = joblib.parallel.Parallel.print_progress
    joblib.parallel.Parallel.print_progress = tqdm_print_progress

    try:
        yield tqdm_object
    finally:
        joblib.parallel.Parallel.print_progress = original_print_progress
        tqdm_object.close()


def daterange(start_date, end_date):
    """Make a date range object spanning two dates.
    
    Args:
      start_date: date object to start from.
      end_date: date object to end at.
    
    Yields:
      date object for iteration.
    """
    for n in range(int ((end_date - start_date).days)):
        yield start_date + timedelta(n)


def get_file_gcs(file_name, bucket_name, project_name=None, local_path="."):
    """Download a file from Google Cloud Storage.

    Args:
        file_name (str): file_name to download from gcs.
        bucket_name (str): Google Cloud Storage bucket to download from.
        project_name (str): Google Cloud project name.
        local_path (str): optional local path to download to.

    Returns:
        Nothing; downloads a file from Google Cloud Storage.
    """
    client = storage.Client()
    bucket = client.bucket(bucket_name, user_project=project_name)
    blob = bucket.blob(file_name)
    blob.download_to_filename(filename=os.path.join(local_path, file_name))


@retry
def put_file_gcs(file_name, bucket_name, project_name, local_path="."):
    """Upload a dataset for a single date to Google Cloud Storage.

    Args:
        file_name (str): file name to upload to Google Cloud Storage.
        bucket_name (str): Google Cloud Storage bucket to download from.
        project_name (str): Google Cloud project name.
        local_path (str): optional local path to download to.

    Returns:
        Nothing; uploads data to Google Cloud Storage.
    """
    client = storage.Client()
    bucket = client.bucket(bucket_name, user_project=project_name)
    blob = bucket.blob(file_name)
    blob.upload_from_filename(filename=os.path.join(local_path, file_name))
    

def check_blob_size(file_name, bucket_name, project_name=None, raise_threshold=1e+2):
    """Verify that a GCS blob is larger than a specified threshold.
    
    Args:
        single_date (date): day to retrieve data for.
        bucket_name (str): Google Cloud Storage bucket to upload to.
        project_name (str): project ID for requester pays billing.
        raise_threshold (float): file size below which an exception should 
            be raised.
        
    Returns:
        Nothing; logs an info message about the size of the blob.
        
    Raises:
        Exception: if the blob file size is less than the specified threshold
    """
    client = storage.Client()
    bucket = client.bucket(bucket_name, user_project=project_name)    
    blob = bucket.get_blob(file_name)
    if blob.size < raise_threshold:
        raise Exception(
            f"{file_name} file size is smaller than expected")
    else:
        logging.info(
            f"{file_name} size in GCS is {int(blob.size * 1e-6)}MB")


def merge_allsky_clearsky_data(single_date):
    """Merge all sky and clear sky data variables from ERA5."""
    clearsky = xr.open_dataset(f"{single_date.strftime('%Y%m%d')}_clearsky.nc")
    allsky = xr.open_dataset(f"{single_date.strftime('%Y%m%d')}.nc")
    data = xr.merge([allsky, clearsky])
    data.to_netcdf(f"{single_date.strftime('%Y%m%d')}_merged.nc")


def ingest_cds_to_gcs(single_date, bucket_name, user_project=None, cleanup=False):
    """Retrieve data from Copernicus and upload data to Google Cloud Storage.
    
    Args:
        single_date: date object representing day to retrieve data for.
        bucket_name: Google Cloud Storage bucket to upload to.
        user_project: project ID for requester pays billing.
        cleanup: Optionally remove the downloaded data after upload to GCS.
        
    Returns:
        Nothing; downloads a file, uploads it to GCS, and optionally 
        removes it.
    """
    try:
        check_blob_size(f"{single_date.strftime('%Y%m%d')}.nc",
                        bucket_name,
                        project_name=user_project,
                        raise_threshold=3.8e+8)
        return
    except Exception:
        pass

    file_name = f"{single_date.strftime('%Y%m%d')}_clearsky.nc"
    client = cdsapi.Client(progress=False, quiet=True)
    client.retrieve(
    "reanalysis-era5-single-levels",
    {
        "product_type": "reanalysis",
        "variable": [
            "surface_solar_radiation_downward_clear_sky", 
            "top_net_solar_radiation_clear_sky",
            "surface_net_solar_radiation_clear_sky", 
            "2m_temperature"
        ],
        "year": single_date.strftime("%Y"),
        "month": single_date.strftime("%m"),
        "day": single_date.strftime("%d"),
        "time": [
            "00:00", "01:00", "02:00",
            "03:00", "04:00", "05:00",
            "06:00", "07:00", "08:00",
            "09:00", "10:00", "11:00",
            "12:00", "13:00", "14:00",
            "15:00", "16:00", "17:00",
            "18:00", "19:00", "20:00",
            "21:00", "22:00", "23:00",
        ],
        "format": "netcdf",
    },
    file_name)
    
    get_file_gcs(f"{single_date.strftime('%Y%m%d')}.nc",
                bucket_name, project_name=user_project)
    merge_allsky_clearsky_data(single_date)

    os.remove(f"{single_date.strftime('%Y%m%d')}.nc")
    os.rename(f"{single_date.strftime('%Y%m%d')}_merged.nc",
              f"{single_date.strftime('%Y%m%d')}.nc")
    
    put_file_gcs(f"{single_date.strftime('%Y%m%d')}.nc", 
                 bucket_name, project_name=user_project)
    
    check_blob_size(f"{single_date.strftime('%Y%m%d')}.nc", 
                    bucket_name, project_name=user_project,
                    raise_threshold=3.8e+8)
    
    if cleanup:
        os.remove(f"{single_date.strftime('%Y%m%d')}.nc")
        os.remove(f"{single_date.strftime('%Y%m%d')}_clearsky.nc")

## Workflow

In [None]:
with tqdm_joblib(tqdm(total=sum(1 for _ in daterange(start_date, 
                                                     end_date)))) as pbar:
    Parallel(n_jobs=n_jobs, backend="multiprocessing")(
        delayed(ingest_cds_to_gcs)(day,
                                   hourly_data_bucket,
                                   user_project=project_id,
                                   cleanup=True)
        for day in daterange(start_date, end_date))