# Request Data from Copernicus (GloFAS; ERA5) and Save Merged Data to Cloud Object Storage (COS)

This notebook assumes that, when it runs, no data has been persisted yet. It is therefore the *initial* notebook of the pipeline and ideally runs only once: on later pipeline runs, historic data is already in place, so running this notebook again is unnecessary.

Steps covered in this notebook:
1. Retrieve parameters
2. Set up Cloud Object Storage connection
3. Set up Copernicus credentials (w/ configuration file)
4. **Retrieve ERA5 and GloFAS for given timeframe**
5. Handle both NetCDF files (open, interpolate, reset_index, to_dataframe)
6. **Merge both datasets on latitude, longitude, time**
7. Serialize result and persist with Cloud Object Storage

In [1]:
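# TODO: Create a software configuration in Watson Studio so these packages are not installed manually on each run (wasted resources)
# %pip installs into the environment of the running kernel, unlike !pip, which may resolve to a different interpreter
%pip install cdsapi netCDF4 xarray ibm_watson_studio_pipelines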


In [1]:
# data sources
import cdsapi

# data manipulation
import xarray as xr  # opens the NetCDF files through the netCDF4 backend installed above
import numpy as np
import pandas as pd

# serialization
import pickle
import json

# remotes
from ibm_botocore.client import Config  # Config from the IBM fork of botocore, matching ibm_boto3
from ibm_watson_studio_pipelines import WSPipelines
import ibm_boto3

# misc
import os
import warnings
warnings.filterwarnings("ignore")




### Retrieve parameters & set up Cloud Object Storage connection

**Note**: If you run this notebook outside of a Watson Studio Pipeline execution, make sure to set the environment variables that the pipeline environment would otherwise pass to the notebook.
Refer to `credentials.py`.
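The contents of `credentials.py` are not part of this notebook. A minimal sketch of what it might contain, assuming it only needs to export the environment variables read by the cells below: the variable names and JSON keys are taken from this notebook, while every value is a placeholder you must replace.

```python
# credentials.py (hypothetical sketch; replace every placeholder with your own values)
import json
import os


def set_env_variables_for_credentials():
    """Export the variables that the Watson Studio Pipeline would otherwise provide."""
    # Keys mirror what the notebook reads from PROJECT_COS_CREDENTIALS
    project_cos = {
        "AUTH_ENDPOINT": "<iam-auth-endpoint>",
        "ENDPOINT_URL": "<project-cos-endpoint-url>",
        "API_KEY": "<project-cos-api-key>",
        "BUCKET": "<project-bucket>",
    }
    # Keys mirror what the notebook reads from MLOPS_COS_CREDENTIALS
    mlops_cos = {
        "ENDPOINT_URL": "<mlops-cos-endpoint-url>",
        "API_KEY": "<mlops-cos-api-key>",
        "CRN": "<mlops-cos-instance-crn>",
        "BUCKET": "<mlops-bucket>",
    }
    # The notebook parses these two variables with json.loads, so store them as JSON strings
    os.environ["PROJECT_COS_CREDENTIALS"] = json.dumps(project_cos)
    os.environ["MLOPS_COS_CREDENTIALS"] = json.dumps(mlops_cos)
    # Plain string values
    os.environ["CLOUD_API_KEY"] = "<ibm-cloud-api-key>"
    os.environ["CDS_USER_ID"] = "<cds-user-id>"
    os.environ["CDS_API_KEY"] = "<cds-api-key>"
```

Keep this file out of version control, since it holds secrets once the placeholders are filled in.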

In [4]:
# Uncomment the two lines below and put your credentials in credentials.py to run locally.
# from credentials import set_env_variables_for_credentials
# set_env_variables_for_credentials()

In [3]:
## Retrieve COS credentials from global pipeline parameters

# Read the JSON strings from the environment and parse them into dicts
project_cos_credentials = json.loads(os.getenv('PROJECT_COS_CREDENTIALS'))
mlops_cos_credentials = json.loads(os.getenv('MLOPS_COS_CREDENTIALS'))

## PROJECT COS
AUTH_ENDPOINT = project_cos_credentials['AUTH_ENDPOINT']
ENDPOINT_URL = project_cos_credentials['ENDPOINT_URL']
API_KEY_COS = project_cos_credentials['API_KEY']
BUCKET_PROJECT_COS = project_cos_credentials['BUCKET']

## MLOPS COS (AUTH_ENDPOINT from the project credentials is reused for this instance)
ENDPOINT_URL_MLOPS = mlops_cos_credentials['ENDPOINT_URL']
API_KEY_MLOPS = mlops_cos_credentials['API_KEY']
CRN_MLOPS = mlops_cos_credentials['CRN']
BUCKET_MLOPS = mlops_cos_credentials['BUCKET']

In [4]:
CLOUD_API_KEY = os.getenv('CLOUD_API_KEY')

In [5]:
def get_mlops_cos_resource():
    """Return an ibm_boto3 S3 resource for the MLOps COS instance."""
    return ibm_boto3.resource(
        service_name='s3',
        ibm_api_key_id=API_KEY_MLOPS,
        ibm_service_instance_id=CRN_MLOPS,
        ibm_auth_endpoint=AUTH_ENDPOINT,
        config=Config(signature_version='oauth'),
        endpoint_url=ENDPOINT_URL_MLOPS)


def save_df_to_cos(df, filename, key):
    """Pickle `df` to the local file `filename` and upload it to the MLOps COS bucket as `key`."""
    try:
        with open(filename, 'wb') as file:
            pickle.dump(df, file)
        get_mlops_cos_resource().Bucket(BUCKET_MLOPS).upload_file(filename, key)
        print(f"Dataframe {filename} uploaded successfully")
    except Exception as e:
        print(e)
        print(f"Dataframe upload for {filename} failed")


def save_binary_to_cos(filename, key):
    """Upload the existing local file `filename` to the MLOps COS bucket as `key`."""
    try:
        get_mlops_cos_resource().Bucket(BUCKET_MLOPS).upload_file(filename, key)
        print(f"File {filename} uploaded successfully")
    except Exception as e:
        print(e)
        print(f"File upload for {filename} failed")


def check_if_file_exists(filename):
    """Return True if an object with key `filename` exists in the MLOps COS bucket."""
    mlops_client = ibm_boto3.client(
        service_name='s3',
        ibm_api_key_id=API_KEY_MLOPS,
        ibm_service_instance_id=CRN_MLOPS,
        ibm_auth_endpoint=AUTH_ENDPOINT,
        config=Config(signature_version='oauth'),
        endpoint_url=ENDPOINT_URL_MLOPS)

    # Paginate (one listing returns at most 1000 keys) and filter by prefix;
    # a page without matches has no 'Contents' entry, hence .get(...)
    paginator = mlops_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=BUCKET_MLOPS, Prefix=filename):
        for obj in page.get('Contents', []):
            if obj['Key'] == filename:
                return True
    return False

### Set up Copernicus credentials (w/ configuration file)

In [6]:
# @hidden_cell
# Read your Copernicus (CDS) user ID and API key from the environment
CDS_USER_ID = os.getenv("CDS_USER_ID")
CDS_API_KEY = os.getenv("CDS_API_KEY")

In [7]:
# Write the Copernicus credentials file (~/.cdsapirc) that cdsapi reads
with open(os.path.join(os.path.expanduser('~'), '.cdsapirc'), 'w') as f:
    f.write('url: https://cds.climate.copernicus.eu/api/v2\n')
    f.write(f'key: {CDS_USER_ID}:{CDS_API_KEY}\n')

In [8]:
# Ensure the Copernicus config is in the right place (the key is masked so it does not end up in the notebook output)
!sed 's/^key: .*/key: <redacted>/' ~/.cdsapirc

url: https://cds.climate.copernicus.eu/api/v2
key: <redacted>

In [9]:
copernicus = cdsapi.Client()

### Retrieve ERA5 and GloFAS for given timeframe

In [10]:
# Request area in CDS order [North, West, South, East] (degrees):
# 34°N to 72°N and 25°E to 40°E, i.e. a longitude strip of eastern Europe
europe = [72, 25, 34, 40]
# Zero-padded day numbers '01'..'31'; nonexistent dates such as 31 April are not returned
days = [f'{d:02d}' for d in range(1, 32)]

# Extend with further months ('may', ..., 'december') to cover more of the year
months = ['january', 'february', 'march', 'april']
years = ['2023']

# All 24 hourly time steps ('00:00' to '23:00'), kept for reference
all_hours = [f'{h:02d}:00' for h in range(24)]

# Only request 00:00, i.e. one ERA5 time step per day, to keep downloads small
hours = ['00:00']
hours

['00:00']
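Because the CDS `area` list is positional, swapping two entries silently requests a different region. A minimal sketch of a sanity check, assuming the [North, West, South, East] order noted above and a region that does not cross the antimeridian (the helper `describe_area` is hypothetical):

```python
def describe_area(bounds):
    """Return ((south, north), (west, east)) for a CDS area list [North, West, South, East]."""
    north, west, south, east = bounds
    # Catch the most common mistake: entries given in the wrong order
    if not south < north:
        raise ValueError("expected [North, West, South, East]: South must be below North")
    if not west < east:
        raise ValueError("expected West < East (region must not cross the antimeridian)")
    return (south, north), (west, east)


# Latitude and longitude ranges covered by the request (positive longitudes are east)
print(describe_area([72, 25, 34, 40]))  # ((34, 72), (25, 40))
```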

In [11]:
def download_glofas_historic(client, bounds, years, months, days, download_path):
    glofas_format = ".netcdf4.zip"
    target = f'{download_path}{glofas_format}'
    if os.path.exists(target):
        # See the note where this function is called for why an existing file must not be downloaded again
        print(f"Target filename already exists in target path ({target})... cancelling download")
        return
    client.retrieve(
        'cems-glofas-historical',
        {
            'system_version': 'version_3_1',
            'variable': 'river_discharge_in_the_last_24_hours',
            'format': 'netcdf4.zip',
            'hyear': years,
            'hmonth': months,
            'hday': days,
            'hydrological_model': 'lisflood',
            'product_type': 'intermediate',
            'area': bounds,
        },
        target)

In [12]:
# Download ERA5-Land hourly data (at the requested hours) for soil temperature level 1,
# volumetric soil water layer 1 and total precipitation
def download_era5_historic(client, bounds, years, months, days, hours, download_path):
    era5_format = ".netcdf.zip"
    target = f'{download_path}{era5_format}'
    if os.path.exists(target):
        # See the note where this function is called for why an existing file must not be downloaded again
        print(f"Target filename already exists in target path ({target})... cancelling download")
        return
    client.retrieve(
        'reanalysis-era5-land',
        {
            'variable': [
                'soil_temperature_level_1', 'total_precipitation', 'volumetric_soil_water_layer_1',
            ],
            'year': years,
            # CDS datasets do not share a uniform request format: unlike GloFAS, ERA5-Land expects
            # zero-padded month numbers (e.g. '01') instead of names (e.g. 'january').
            # Assumes `months` starts at January and is consecutive, as in this notebook.
            'month': [f'{i + 1:02d}' for i in range(len(months))],
            'day': [f'{i + 1:02d}' for i in range(len(days))],
            'time': hours,
            'format': 'netcdf.zip',
            'area': bounds,
        },
        target)
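The ERA5 request derives month numbers from the *length* of `months`, so it is only correct when the list starts at January and is consecutive; `['march', 'april']` would request months '01' and '02'. A minimal sketch of a lookup-based conversion that avoids this assumption (`MONTH_NAMES` and `month_names_to_numbers` are hypothetical names, not part of cdsapi):

```python
# Month names in the form GloFAS accepts, in calendar order
MONTH_NAMES = [
    'january', 'february', 'march', 'april', 'may', 'june',
    'july', 'august', 'september', 'october', 'november', 'december',
]


def month_names_to_numbers(names):
    """Convert e.g. ['march', 'april'] to ['03', '04'], whichever month the list starts with."""
    # list.index raises ValueError for an unknown month name
    return [f'{MONTH_NAMES.index(name.lower()) + 1:02d}' for name in names]


print(month_names_to_numbers(['january', 'february', 'march', 'april']))
# ['01', '02', '03', '04']
print(month_names_to_numbers(['march', 'april']))
# ['03', '04']
```

Inside `download_era5_historic`, the `'month'` entry could then be `month_names_to_numbers(months)`.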

In [13]:
import time

def download_era5_historic_extended(client, bounds, years, months, days, hours, download_path):
    """Download additional ERA5-Land variables with one request per month and year.

    Unlike download_era5_historic, `months` holds month numbers here (e.g. ["02"]).
    """
    months = [f'{int(m):02d}' for m in months]

    for year in years:
        for month in months:
            print(f"Downloading ERA5 data for {month}.{year} (mm.YYYY)")

            client.retrieve(
                'reanalysis-era5-land',
                {
                    'variable': [
                        'runoff', 'skin_reservoir_content', 'skin_temperature',
                        'soil_temperature_level_1', 'soil_temperature_level_2', 'soil_temperature_level_3',
                        'soil_temperature_level_4', 'surface_runoff', 'total_precipitation',
                        'volumetric_soil_water_layer_1', 'volumetric_soil_water_layer_2', 'volumetric_soil_water_layer_3',
                        'volumetric_soil_water_layer_4',
                    ],
                    'year': year,
                    'month': month,
                    'day': [f'{i + 1:02d}' for i in range(len(days))],
                    'time': hours,
                    'format': 'netcdf.zip',
                    'area': bounds,
                },
                f'{download_path}_{month}_{year}.netcdf.zip')
            # Pause between consecutive requests
            print("Waiting 30 seconds...")
            time.sleep(30)

In [16]:
# NOTE: cdsapi has no notion of the files in the current working directory.
# Passing a download path and filename where a file already exists causes a seemingly infinite loop in the download process:
# the cell never finishes running and resources are wasted.
# This is no problem on CPDaaS, since the working directory is bound to the runtime (no persistent filesystem) and in production the file cannot already exist.
download_glofas_historic(
    copernicus,
    bounds=europe,
    years=years,
    months=months,
    days=days,
    download_path="glofas_2023"
)

Target filename already exists in target path (glofas_2023.netcdf4.zip)... cancelling download


In [17]:
# NOTE: as for GloFAS above, never pass a download path where the file already exists (the download would loop seemingly forever).
download_era5_historic(
    copernicus,
    bounds=europe,
    years=years,
    months=months,
    days=days,
    hours=hours,
    download_path="era5_2023"
)

Target filename already exists in target path (era5_2023.netcdf.zip)... cancelling download


In [None]:
# download_era5_historic_extended(
#     copernicus,
#     bounds=europe,
#     years=years,
#     months=["02"],
#     days=days,
#     hours=hours,
#     download_path="era5_2023_extended"
# )

In [None]:
# save_binary_to_cos('era5_2023.netcdf.zip', 'era5_2023.netcdf.zip')
# save_binary_to_cos('glofas_2023.netcdf4.zip', 'glofas_2023.netcdf4.zip')
!ls -lh

### Handle ERA5/GloFAS NetCDF files (open, interpolate, reset_index, to_dataframe)

In [20]:
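# -p: do not fail if the directories already exist
!mkdir -p era5 glofas

In [21]:
# -o: overwrite previously extracted files without prompting
!unzip -o era5_2023.netcdf.zip -d era5 && unzip -o glofas_2023.netcdf4.zip -d glofas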

Archive:  era5_2023.netcdf.zip
  inflating: era5/data.nc            
Archive:  glofas_2023.netcdf4.zip
 extracting: glofas/data.nc          

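As a pure-Python alternative to the shell cells above that is safe to re-run, a minimal sketch using the standard-library `zipfile` module; the helper `extract_once` is hypothetical and simply skips members that were already extracted:

```python
import zipfile
from pathlib import Path


def extract_once(archive, target_dir):
    """Extract `archive` into `target_dir`, skipping files that already exist; return extracted names."""
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)  # like `mkdir -p`
    extracted = []
    with zipfile.ZipFile(archive) as zf:
        for name in zf.namelist():
            if not (target / name).exists():
                zf.extract(name, target)
                extracted.append(name)
    return extracted


# Usage with the archive names from this notebook:
# extract_once('era5_2023.netcdf.zip', 'era5')
# extract_once('glofas_2023.netcdf4.zip', 'glofas')
```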

In [22]:
e5 = xr.open_dataset('era5/data.nc')  # ERA5-Land features
f = xr.open_dataset('glofas/data.nc')  # GloFAS river discharge (target)

#### Handle ERA5 Data

**Data**: Total Precipitation; Volumetric Soil Water Layer 1; Soil Temperature Level 1

**Mission**: We requested the above-mentioned variables for roughly the same coordinates as the GloFAS data (the two grids are offset by about 0.05°). Let's have a quick look at the dataset and prepare it for a training split, version control, and more.
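The cell below uses `interp_like` to move the ERA5 values onto the GloFAS grid. A minimal sketch of what that does on two hypothetical toy grids offset by 0.05° (the datasets and values are made up; xarray's interpolation requires SciPy to be installed):

```python
import numpy as np
import xarray as xr

# Hypothetical "ERA5-like" grid with points on multiples of 0.1°
era5_like = xr.Dataset(
    {"tp": (("latitude", "longitude"), np.array([[0.0, 1.0], [2.0, 3.0]]))},
    coords={"latitude": [72.0, 72.1], "longitude": [25.0, 25.1]},
)
# Hypothetical "GloFAS-like" grid, offset by 0.05° from the grid above
glofas_like = xr.Dataset(
    {"dis24": (("latitude", "longitude"), np.array([[5.0]]))},
    coords={"latitude": [72.05], "longitude": [25.05]},
)

# Linearly interpolate every ERA5 variable onto the GloFAS coordinates;
# the result takes the GloFAS latitude/longitude values exactly
era5_on_glofas = era5_like.interp_like(glofas_like)
print(era5_on_glofas["tp"].values)
```

The point in the middle of the four ERA5 cells receives their average (about 1.5), and its coordinates are now identical to GloFAS's, which is what makes the later merge on latitude/longitude possible.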


In [23]:
e5

In [24]:
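# Interpolate ERA5 onto the GloFAS coordinates (latitude, longitude, time) so both datasets share one grid;
# the supplementary 'expver' dimension does not exist in GloFAS and is kept (handled below)
e5_interp = e5.interp_like(f)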

In [25]:
e5_interp

In [26]:
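# Collapse the supplementary 'expver' dimension: the request returned a mixture of final ERA5 (expver=1)
# and preliminary ERA5T (expver=5) data (see https://confluence.ecmwf.int/display/CUSF/ERA5+CDS+requests+which+return+a+mixture+of+ERA5+and+ERA5T+data).
# Keep the expver=1 values and fill their gaps with the expver=5 values.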
e5_combine = e5_interp.sel(expver=1).combine_first(e5_interp.sel(expver=5))
e5_combine.load()
e5_combine
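A minimal sketch of the `combine_first` step above on a hypothetical toy array: final ERA5 (expver=1) covers the first two days and preliminary ERA5T (expver=5) covers the rest, so filling the gaps of one from the other yields a single series without the `expver` dimension.

```python
import numpy as np
import xarray as xr

# Hypothetical 4-day series: expver=1 is missing the last two days, expver=5 the first two
da = xr.DataArray(
    [[1.0, 2.0, np.nan, np.nan],
     [np.nan, np.nan, 3.0, 4.0]],
    dims=("expver", "time"),
    coords={"expver": [1, 5], "time": np.arange(4)},
)

# sel() with a scalar drops the expver dimension; combine_first keeps the
# expver=1 values and fills their NaNs from expver=5
combined = da.sel(expver=1).combine_first(da.sel(expver=5))
print(combined.values)  # [1. 2. 3. 4.]
```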

### Merge both datasets on latitude, longitude, time

In [27]:
## Join the predictand (GloFAS river discharge, dis24) onto the interpolated ERA5 feature table
# Convert the features (X) and the target variable (y) to dataframes
X = e5_combine.to_dataframe()
y = f['dis24'].to_dataframe()

# Reset the index to include the coordinates as columns
X.reset_index(inplace=True)
y.reset_index(inplace=True)

In [28]:
# Merge features and predictand on their common coordinates (time, latitude, longitude); pd.merge defaults to an inner join
data = pd.merge(X, y, on=['time', 'latitude', 'longitude'])
data

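| | time | latitude | longitude | stl1 | tp | swvl1 | step | surface | valid_time | dis24 |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2023-01-01 | 72.05 | 24.95 | NaN | NaN | NaN | 1 days | 0.0 | 2023-01-02 | NaN |
| 1 | 2023-01-01 | 72.05 | 25.05 | NaN | NaN | NaN | 1 days | 0.0 | 2023-01-02 | NaN |
| 2 | 2023-01-01 | 72.05 | 25.15 | NaN | NaN | NaN | 1 days | 0.0 | 2023-01-02 | NaN |
| 3 | 2023-01-01 | 72.05 | 25.25 | NaN | NaN | NaN | 1 days | 0.0 | 2023-01-02 | NaN |
| 4 | 2023-01-01 | 72.05 | 25.35 | NaN | NaN | NaN | 1 days | 0.0 | 2023-01-02 | NaN |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 6851547 | 2023-04-30 | 33.95 | 39.65 | NaN | NaN | NaN | 1 days | 0.0 | 2023-05-01 | 8.03125 |
| 6851548 | 2023-04-30 | 33.95 | 39.75 | NaN | NaN | NaN | 1 days | 0.0 | 2023-05-01 | 0.28125 |
| 6851549 | 2023-04-30 | 33.95 | 39.85 | NaN | NaN | NaN | 1 days | 0.0 | 2023-05-01 | 0.15625 |
| 6851550 | 2023-04-30 | 33.95 | 39.95 | NaN | NaN | NaN | 1 days | 0.0 | 2023-05-01 | 0.18750 |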

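`pd.merge` matches the float coordinate keys by exact equality, which is why ERA5 was first interpolated onto the GloFAS grid; with the default inner join, rows whose keys differ even slightly are dropped without warning. A minimal sketch on two hypothetical frames (the names `features_demo` and `target_demo` are illustrative, chosen so they do not overwrite `X` and `y`):

```python
import pandas as pd

# Hypothetical feature rows on the GloFAS grid
features_demo = pd.DataFrame({
    "time": pd.to_datetime(["2023-01-01", "2023-01-01"]),
    "latitude": [72.05, 72.05],
    "longitude": [24.95, 25.05],
    "tp": [0.1, 0.2],
})
# Hypothetical target rows; the second longitude is off by a tiny float error
target_demo = pd.DataFrame({
    "time": pd.to_datetime(["2023-01-01", "2023-01-01"]),
    "latitude": [72.05, 72.05],
    "longitude": [24.95, 25.05 + 1e-9],
    "dis24": [1.5, 2.5],
})

# Default how="inner": only rows whose three keys match exactly survive
merged = pd.merge(features_demo, target_demo, on=["time", "latitude", "longitude"])
print(len(merged))  # 1
```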

In [29]:
# Record the most recent day covered by the data ('2023-04-30') so that later runs can merge newer data more efficiently
most_recent_covered_day = data['time'].max().strftime('%Y-%m-%d')
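How a later run uses this value is not shown in this notebook. A minimal sketch, assuming a later run only needs the days after `most_recent_covered_day` up to some end date (the helper `dates_after` and the end date are hypothetical):

```python
from datetime import date, timedelta


def dates_after(most_recent_day, end_day):
    """Return ISO dates from the day after `most_recent_day` up to and including `end_day`."""
    start = date.fromisoformat(most_recent_day) + timedelta(days=1)
    end = date.fromisoformat(end_day)
    # An end date before `start` gives an empty list (nothing left to fetch)
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days + 1)]


print(dates_after("2023-04-30", "2023-05-03"))
# ['2023-05-01', '2023-05-02', '2023-05-03']
```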

### Serialize Merged Dataset

In [30]:
# Pickle and save data

FILENAME = "era5-glofas-merged.pkl"

save_df_to_cos(data, FILENAME, FILENAME)

Dataframe era5-glofas-merged.pkl uploaded successfully


### Persist with Cloud Object Storage

The serialized dataset is uploaded to COS (by `save_df_to_cos` above) because the filesystem of CPDaaS runtimes is temporary and therefore unfit to hold our data. The cell below checks that the upload succeeded.

In [32]:
files_copied_in_cos = check_if_file_exists(FILENAME)
files_copied_in_cos

True
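A later notebook would need to read the pickle back from COS. A minimal sketch, assuming `bucket` is an `ibm_boto3` `Bucket` object built with the same `ibm_boto3.resource(...)` arguments as in `save_df_to_cos`, and that the helper name `load_df_from_cos` is hypothetical:

```python
import pickle


def load_df_from_cos(bucket, key, filename):
    """Download object `key` from `bucket` to the local file `filename` and unpickle it."""
    bucket.download_file(key, filename)
    with open(filename, "rb") as file:
        # Only unpickle files you wrote yourself: pickle can execute arbitrary code
        return pickle.load(file)


# Usage (bucket = <ibm_boto3 resource for the MLOps COS instance>.Bucket(BUCKET_MLOPS)):
# data = load_df_from_cos(bucket, FILENAME, FILENAME)
```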

### Hand-off to Next Notebook

In [33]:
validation_params = {}
validation_params['most_recent_day_in_data'] = most_recent_covered_day
validation_params['serialized_data_filename'] = FILENAME
validation_params['files_copied_in_cos'] = files_copied_in_cos

In [34]:
pipelines_client = WSPipelines.from_apikey(apikey=CLOUD_API_KEY)
pipelines_client.store_results(validation_params)

Running outside of Watson Studio Pipeline - storing results in the local filesystem for testing purposes...

  output paths:
    - "most_recent_day_in_data": .ibm_watson_studio_pipelines/results/most_recent_day_in_data
    - "serialized_data_filename": .ibm_watson_studio_pipelines/results/serialized_data_filename
    - "files_copied_in_cos": .ibm_watson_studio_pipelines/results/files_copied_in_cos


<ibm_cloud_sdk_core.detailed_response.DetailedResponse at 0x146e4b490>