# Data Transformation for SOMOSPIE
Soil moisture is a critical variable that links climate dynamics with water and food security. It regulates land-atmosphere interactions (e.g., via evapotranspiration, the loss of water to the atmosphere through evaporation and plant transpiration), and it is directly linked with plant productivity and survival. Soil moisture information is important for designing irrigation strategies that increase crop yield, and long-term soil moisture records, coupled with climate information, provide insights into trends and potential agricultural thresholds and risks. Soil moisture information is therefore key to informing and enabling precision agriculture.

Currently, soil moisture data over large areas come from remote sensing (i.e., satellites with radar sensors), which provides daily, nearly global coverage. However, satellite soil moisture datasets have a major shortcoming: their spatial resolution is coarse (generally no finer than tens of kilometers).

Other geographic datasets that are closely related to soil moisture (e.g., climatic, geological, and topographic data) are available at higher resolution. SOMOSPIE is a general-purpose tool that uses such datasets to downscale (i.e., increase the resolution of) satellite-based soil moisture products. This Jupyter Notebook is the result of a collaboration between computer scientists of the Global Computing Laboratory at the University of Tennessee, Knoxville, and soil scientists at the University of Delaware (funded by NSF awards #1724843 and #1854312).

This notebook combines terrain parameters with satellite-based soil moisture data to generate the input files for the ML prediction workflow.

## Environment Setup


In [None]:
from Pegasus.api import *
import os
from pathlib import Path
import logging

## OSN credentials and setup
Before running the workflow, specify your OSN access key and secret key in the Pegasus credentials file, `~/.pegasus/credentials.conf`, using the format below.

```
[osn]
endpoint = https://sdsc.osn.xsede.org

[USER@osn]
access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
secret_key = abababababababababababababababab
```
**Note:** Replace `USER` with your ACCESS username, and replace the placeholder keys with your own OSN access key and secret key.

In the following code cell, also specify the OSN bucket and your ACCESS username.

In [None]:
# update to an OSN bucket you have access to, for example asc190064-bucket01
osn_bucket="BUCKET"
# update to your ACCESS username
access_user="USER"

# make the credentials file readable and writable only by its owner
!chmod 600 ~/.pegasus/credentials.conf
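Before planning, it can help to confirm that the credentials file parses and that the `chmod 600` above took effect. The sketch below is a hypothetical helper (`check_pegasus_credentials` is not part of Pegasus) that uses Python's `configparser` to look for the `[osn]` endpoint and the `[USER@osn]` keys shown in the template, and flags any group or other permission bits.

```python
import configparser
import stat
from pathlib import Path


def check_pegasus_credentials(path, user):
    """Return a list of problems found in a Pegasus-style credentials file.

    Hypothetical helper: it only checks the layout shown in the template above.
    """
    path = Path(path).expanduser()
    if not path.is_file():
        return [f"{path} does not exist"]

    problems = []
    # Any group/other permission bit means the file is not owner-only (chmod 600).
    mode = stat.S_IMODE(path.stat().st_mode)
    if mode & (stat.S_IRWXG | stat.S_IRWXO):
        problems.append(f"permissions are {oct(mode)}, expected 0o600")

    parser = configparser.ConfigParser()
    parser.read(path)
    # has_option returns False (rather than raising) when the section is missing.
    if not parser.has_option("osn", "endpoint"):
        problems.append("missing endpoint in [osn]")
    section = f"{user}@osn"
    for key in ("access_key", "secret_key"):
        if not parser.has_option(section, key):
            problems.append(f"missing {key} in [{section}]")
    return problems
```

For example, `check_pegasus_credentials("~/.pegasus/credentials.conf", access_user)` returns an empty list when none of these checks finds a problem.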

## Input parameters
In the code cell below, specify the inputs to the workflow:

* **year:** Year from which to fetch soil moisture data.
* **avg_type:** Averaging type for the soil moisture values (monthly is currently the only option).
* **param_names:** Names of the terrain parameters to combine with the satellite data.
* **param_paths:** Paths to the terrain parameter files to combine with the satellite data.
* **shp_path:** Zipped shapefile of the region of interest.
* **n_tiles:** Number of tiles along each of the x and y axes into which the terrain parameter data (e.g., elevation) are split to generate evaluation files; the total number of tiles is n_tiles*n_tiles. Setting n_tiles to 0 produces a single, untiled evaluation file.
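The `n_tiles` parameter controls how many evaluation jobs the workflow creates. The hypothetical helper below reproduces the tiling loop from the workflow cell further down, as a quick way to preview the tile indices and output file names for a given `n_tiles`; it assumes nothing beyond that loop.

```python
def eval_tile_jobs(n_tiles):
    """List (output file, x index, y index) for each evaluation job.

    Mirrors the notebook's workflow cell: n_tiles == 0 yields one untiled
    eval.tif; otherwise n_tiles * n_tiles tiles are numbered consecutively,
    with the x index varying slowest.
    """
    if n_tiles == 0:
        return [("eval.tif", None, None)]
    jobs = []
    tile_count = 0
    for i in range(n_tiles):
        for j in range(n_tiles):
            jobs.append(("eval_{0:04d}.tif".format(tile_count), i, j))
            tile_count += 1
    return jobs
```

With `n_tiles = 6`, this yields 36 tiles, `eval_0000.tif` through `eval_0035.tif`.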

In [None]:
year = 2010
avg_type = 'monthly'
param_names = ['aspect', 'elevation', 'hillshading', 'slope']
param_paths = ["s3://" + access_user +"@osn/" + osn_bucket + "/TerrainParameters/OK_10m/" + param + '.tif' for param in param_names]
shp_path = "s3://" + access_user +"@osn/" + osn_bucket + "/shpFiles/OK.zip"
n_tiles = 6
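The `s3://USER@osn/BUCKET/...` URLs built above follow Pegasus's convention for S3-style storage in which, as we understand it, the `USER@osn` part selects the matching `[USER@osn]` section of `credentials.conf`. The hypothetical `parse_osn_url` helper below splits such a URL with `urllib.parse.urlsplit`, so you can check that the credentials section, bucket, and object key come out as intended.

```python
from urllib.parse import urlsplit


def parse_osn_url(url):
    """Split s3://USER@SITE/BUCKET/KEY into (credentials section, bucket, key).

    Hypothetical helper for sanity-checking the URLs built in this notebook.
    """
    parts = urlsplit(url)
    if parts.scheme != "s3" or parts.username is None:
        raise ValueError(f"expected s3://USER@SITE/BUCKET/KEY, got {url!r}")
    # The path looks like "/BUCKET/KEY"; split off the bucket name.
    bucket, _, key = parts.path.lstrip("/").partition("/")
    return f"{parts.username}@{parts.hostname}", bucket, key
```

For example, `parse_osn_url(shp_path)` should give `(access_user + "@osn", osn_bucket, "shpFiles/OK.zip")`.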

## Pegasus logging and properties
This cell sets several workflow properties, including the data configuration, which is set to `nonsharedfs` (non-shared file system) so that OSN can be used for the intermediate and output data.

In [None]:
logging.basicConfig(level=logging.DEBUG)
BASE_DIR = Path(".").resolve()

stg_out = False # Set to True to disable cleanup jobs on intermediate files

# --- Properties ---------------------------------------------------------------
props = Properties()
props["pegasus.monitord.encoding"] = "json"  
# props["pegasus.mode"] = "tutorial" # speeds up tutorial workflows - remove for production ones
props["pegasus.catalog.workflow.amqp.url"] = "amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod/workflows"
props["pegasus.data.configuration"] = "nonsharedfs"
props["pegasus.transfer.threads"] = "10"
props["pegasus.transfer.lite.threads"] = "10"
props["pegasus.integrity.checking"] = "none" # temporarily disabled due to a bug
#props["pegasus.transfer.bypass.input.staging"] = "true"
props.write() # written to ./pegasus.properties 
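`props.write()` stores these settings in `./pegasus.properties`. To double-check what was written, the minimal sketch below reads back simple `key = value` lines. It assumes that layout, skips blank lines, comments starting with `#` or `!`, and lines without `=` (such as section headers), and is not a full Java-properties parser; `read_properties` is a hypothetical helper.

```python
def read_properties(path):
    """Read simple `key = value` lines into a dict.

    Only the first "=" on a line separates the key from the value, so values
    containing "=" are kept intact.
    """
    values = {}
    with open(path) as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith(("#", "!")):
                continue
            key, sep, value = line.partition("=")
            if sep:
                values[key.strip()] = value.strip()
    return values
```

If the file uses that layout, `read_properties("pegasus.properties")["pegasus.data.configuration"]` should be `"nonsharedfs"`.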

## Replica Catalog
The input files to the workflow are specified in the Replica Catalog.

In [None]:
rc = ReplicaCatalog()

param_files = []
for param_path in param_paths:
    param_files.append(File(os.path.basename(param_path)))
    rc.add_replica(site="osn", lfn=param_files[-1], pfn=param_path)

shp_file = File(os.path.basename(shp_path))
rc.add_replica(site="osn", lfn=shp_file, pfn=shp_path)

rc.write()
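Each logical file name (LFN) in the cell above is the basename of its OSN URL, so two inputs with the same file name in different folders would be registered under the same LFN. The hypothetical `replica_entries` helper below builds the same LFN-to-(site, PFN) mapping with `posixpath.basename` and raises an error on such collisions; it can be run on `param_paths + [shp_path]` before writing the catalog.

```python
import posixpath


def replica_entries(pfns, site="osn"):
    """Map each LFN (the URL's basename) to its (site, PFN) pair.

    Raises ValueError if two PFNs share a basename.
    """
    entries = {}
    for pfn in pfns:
        lfn = posixpath.basename(pfn)
        if lfn in entries:
            raise ValueError(f"LFN {lfn!r} used by both {entries[lfn][1]} and {pfn}")
        entries[lfn] = (site, pfn)
    return entries
```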

## Transformation Catalog
This catalog specifies the container in which the workflow jobs run, along with the scripts that implement each step of the workflow.

In [None]:
# --- Container ----------------------------------------------------------
base_container = Container(
                  "base-container",
                  Container.SINGULARITY,
                  image="docker://olayap/somospie-gdal-netcdf")

# --- Transformations ----------------------------------------------------------
get_sm = Transformation(
                "get_sm.py",
                site="local",
                pfn=Path(".").resolve() / "code/get_sm.py",
                is_stageable=True,
                container=base_container,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            ).add_profiles(Namespace.CONDOR, request_memory="3GB")

generate_train = Transformation(
                "generate_train.py",
                site="local",
                pfn=Path(".").resolve() / "code/generate_train.py",
                is_stageable=True,
                container=base_container,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            ).add_profiles(Namespace.CONDOR, request_memory="3GB")

generate_eval = Transformation(
                "generate_eval.py",
                site="local",
                pfn=Path(".").resolve() / "code/generate_eval.py",
                is_stageable=True,
                container=base_container,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            ).add_profiles(Namespace.CONDOR, request_memory="8GB", request_disk="70GB")


tc = TransformationCatalog()\
    .add_containers(base_container)\
    .add_transformations(get_sm, generate_train, generate_eval)\
    .write() # written to ./transformations.yml
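All three transformations are staged from the `local` site using scripts under `./code`, and a missing script is easier to catch before planning than after. The sketch below is a quick pre-flight check with a hypothetical `missing_scripts` helper; the script names are copied from the catalog above.

```python
from pathlib import Path

# Script names as registered in the Transformation Catalog above.
SCRIPTS = ("get_sm.py", "generate_train.py", "generate_eval.py")


def missing_scripts(base_dir, names=SCRIPTS):
    """Return the scripts in `names` that are not files under base_dir/code."""
    code_dir = Path(base_dir) / "code"
    return [name for name in names if not (code_dir / name).is_file()]
```

Calling `missing_scripts(BASE_DIR)` returns an empty list when all three scripts are present.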

## Site Catalog
This catalog specifies the OSN bucket where the workflow's files are stored, and the local site where the input files and scripts reside.

In [None]:
# --- Site Catalog ------------------------------------------------- 
osn = Site("osn", arch=Arch.X86_64, os_type=OS.LINUX)

# define scratch and storage directories inside your existing OSN bucket
osn_shared_scratch_dir = Directory(Directory.SHARED_SCRATCH, path="/" + osn_bucket + "/DataTransformation/work") \
    .add_file_servers(FileServer("s3://" + access_user +"@osn/" + osn_bucket + "/DataTransformation/work", Operation.ALL),)
osn_shared_storage_dir = Directory(Directory.SHARED_STORAGE, path="/" + osn_bucket + "/DataTransformation/storage") \
    .add_file_servers(FileServer("s3://" + access_user +"@osn/" + osn_bucket + "/DataTransformation/storage", Operation.ALL),)
osn.add_directories(osn_shared_scratch_dir, osn_shared_storage_dir)

# add a local site with an optional job env file to use for compute jobs
shared_scratch_dir = "{}/work".format(BASE_DIR)
local_storage_dir = "{}/storage".format(BASE_DIR)
local = Site("local") \
    .add_directories(
    Directory(Directory.SHARED_SCRATCH, shared_scratch_dir)
        .add_file_servers(FileServer("file://" + shared_scratch_dir, Operation.ALL)),
    Directory(Directory.LOCAL_STORAGE, local_storage_dir)
        .add_file_servers(FileServer("file://" + local_storage_dir, Operation.ALL)))

#job_env_file = Path(str(BASE_DIR) + "/../tools/job-env-setup.sh").resolve()
#local.add_pegasus_profile(pegasus_lite_env_source=job_env_file)

#condorpool_site = Site("condorpool")
#condorpool_site.add_condor_profile(request_cpus=1, request_memory="9 GB", request_disk="9 GB")

sc = SiteCatalog()\
   .add_sites(osn, local)\
   .write() # written to ./sites.yml
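Each OSN directory above is described twice: once as a bucket-relative path and once as the `s3://` URL of its file server. A small helper such as the hypothetical `osn_directory` below derives both strings from the same bucket and subdirectory, so they cannot drift apart; the `DataTransformation` prefix matches the cell above.

```python
def osn_directory(user, bucket, subdir, prefix="DataTransformation"):
    """Return (bucket-relative path, s3:// file-server URL) for one OSN directory.

    Hypothetical helper; the layout mirrors the Site Catalog cell above.
    """
    key = f"{bucket}/{prefix}/{subdir}"
    return "/" + key, f"s3://{user}@osn/{key}"
```

For example, `osn_directory(access_user, osn_bucket, "work")` gives the path and URL used for the shared scratch directory.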

## Workflow
The next code cell defines the workflow, including its input, output, and intermediate files. The intermediate files (the monthly soil moisture averages) are added with **stage_out=False**, so they are not staged to the output site and are removed by cleanup jobs.

In [None]:
# --- Workflow -----------------------------------------------------------------
wf = Workflow("DataTransformation")

avg_files = [File('{0:02d}.tif'.format(month)) for month in range(1, 13)]

job_get_sm = Job(get_sm)\
                .add_args("-y", year, "-a", avg_type, "-o", *avg_files)\
                .add_outputs(*avg_files, stage_out=stg_out) # bypass_staging=False

wf.add_jobs(job_get_sm)

# Generate training files
for i, avg_file in enumerate(avg_files):
    train_file = File('{0:04d}_{1:02d}.tif'.format(year, i + 1))
    train_file_aux = File('{0:04d}_{1:02d}.tif.aux.xml'.format(year, i + 1))
    job_generate_train = Job(generate_train)\
                        .add_args("-i", avg_file, "-o", train_file, "-f", *param_files, "-p", *param_names, "-s", shp_file)\
                        .add_inputs(avg_file, *param_files, shp_file)\
                        .add_outputs(train_file, train_file_aux, stage_out=True)
    wf.add_jobs(job_generate_train)
    
# Generate eval files    
if n_tiles == 0:
    eval_file = File('eval.tif')
    eval_file_aux = File("eval.tif.aux.xml")
    job_generate_eval = Job(generate_eval)\
                    .add_args("-i", *param_files, "-p", *param_names, "-n", n_tiles, "-s", shp_file, "-o", eval_file)\
                    .add_inputs(*param_files, shp_file)\
                    .add_outputs(eval_file, eval_file_aux, stage_out=True)
    wf.add_jobs(job_generate_eval)

tile_count = 0
for i in range(n_tiles):
    for j in range(n_tiles):
        eval_path = "eval_{0:04d}.tif".format(tile_count)
        eval_file = File(eval_path)
        eval_file_aux = File(eval_path + ".aux.xml")
        job_generate_eval = Job(generate_eval)\
                        .add_args("-i", *param_files, "-p", *param_names, "-n", n_tiles, "-x", i, "-y", j, "-s", shp_file, "-o", eval_file)\
                        .add_inputs(*param_files, shp_file)\
                        .add_outputs(eval_file, eval_file_aux, stage_out=True)
        wf.add_jobs(job_generate_eval)
        
        tile_count += 1
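The workflow cell above never declares job dependencies explicitly. As far as we understand the Pegasus 5 Python API, `Workflow` infers them by default from file usage: a job that consumes a file depends on the job that produces it. The self-contained sketch below (a hypothetical `infer_parents`, not Pegasus code) applies that rule to a simplified copy of this workflow with four evaluation tiles, showing that each training job waits on `get_sm` while the evaluation jobs can start immediately.

```python
def infer_parents(jobs):
    """Map each job id to the sorted ids of jobs that produce its inputs.

    `jobs` maps job id -> (inputs, outputs), both lists of file names.
    Files with no producing job (e.g. replica catalog inputs) add no edges.
    """
    producer = {}
    for job_id, (_, outputs) in jobs.items():
        for name in outputs:
            producer[name] = job_id
    return {
        job_id: sorted({producer[name] for name in inputs if name in producer})
        for job_id, (inputs, _) in jobs.items()
    }


# Simplified copy of this notebook's workflow (names only, no Pegasus objects).
demo_year = 2010
demo_inputs = ["aspect.tif", "elevation.tif", "hillshading.tif", "slope.tif", "OK.zip"]
demo_avg = ["{0:02d}.tif".format(m) for m in range(1, 13)]
demo_jobs = {"get_sm": ([], demo_avg)}
for m, avg in enumerate(demo_avg, start=1):
    demo_jobs["train_{0:02d}".format(m)] = (
        [avg] + demo_inputs,
        ["{0:04d}_{1:02d}.tif".format(demo_year, m)],
    )
for t in range(4):
    demo_jobs["eval_{0:04d}".format(t)] = (demo_inputs, ["eval_{0:04d}.tif".format(t)])

parents = infer_parents(demo_jobs)
```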

## Visualizing the Workflow

In [None]:
try:
    wf.write()
    wf.graph(include_files=True, label="xform-id", output="graph.png")
except PegasusClientError as e:
    print(e)

# view rendered workflow
from IPython.display import Image
Image(filename='graph.png')

## Plan and submit the Workflow
Here, OSN is specified as the staging site for the `condorpool` execution site and as the output site.

In [None]:
try:
    wf.plan(staging_sites={"condorpool": "osn"}, sites=["condorpool"], output_sites=["osn"], submit=True)\
        .wait()
except PegasusClientError as e:
    print(e)


## Analyze the workflow
Pegasus reports statistics about the workflow run.

In [None]:
try:
    wf.statistics()
except PegasusClientError as e:
    print(e)

## Debug the workflow
If the workflow fails, `wf.analyze()` helps identify the cause of the error.

In [None]:
try:
    wf.analyze()
except PegasusClientError as e:
    print(e)