# R4openEO use case 1, machine learning UDF implementation using Eco4alps training data

## Author btufail@eurac.edu
## Date 2022/11/10

### Import libraries and packages

In [1]:
import openeo
from openeo.processes import process
from openeo.processes import eq
from openeo.processes import quantiles, mean, min, median, sd

### Connect and Login

In [2]:
openeoHost = "https://openeo.eurac.edu"
# Connect to the back-end
connection = openeo.connect("https://openeo.eurac.edu").authenticate_oidc(client_id="openEO_PKCE")

Authenticated using refresh token.


### Discover the available collections:

In [None]:
connection.list_collections()

### Discover the available processes:

In [None]:
connection.list_processes()

### Load data

In [3]:
connection = openeo.connect(openeoHost).authenticate_oidc(client_id="openEO_PKCE")
load1 = connection.load_collection(collection_id = "S2_L2A_ALPS",
                                   spatial_extent = {"east": 11.371689015081296,"south": 46.44500631201319,"north": 46.52435165332716,"west": 11.253252336416189}, 
                                   temporal_extent = ["2021-05-01T00:00:00Z","2021-10-31T00:00:00Z"],
                                   bands = ["B02", "B03", "B04", "B05", "B06", "B07", "B08", "B8A", "B11", "B12", "CLOUD_MASK"],
                                   properties = {})

Authenticated using refresh token.


### Masking

In [4]:
def reducer1(data, context):
    arrayelement2 = process("array_element", data = data, index = 0)
    eq2 = process("eq", x = arrayelement2, y = 1)
    return eq2

In [5]:
zero_mask = load1.band('B04') == 0
stack = load1.filter_bands(["B02", "B03", "B04", "B05", "B06", "B07", "B08", "B8A", "B11", "B12"]).mask(zero_mask)
clmask = load1.process("filter_bands", data = load1, bands = ["CLOUD_MASK"])

In [6]:
cloudmask = clmask.reduce_dimension(reducer = reducer1, dimension = "bands")
load1 = stack.mask(cloudmask)

### Feature generation

In [7]:
def compute_features(cube,band_name):
    def reducer1(data, context):
        array1 = process("array_element", data = data, label = band_name)
        return array1
    
    cube_band = cube.reduce_dimension(reducer = reducer1, dimension = "bands")
    
    cube_quantiles = cube_band.apply_dimension(
            process=lambda d: quantiles(d, [0.1, 0.25, 0.5, 0.75, 0.9]),
            dimension="DATE",
            target_dimension="variable"
        )
    
    cube_quantiles.metadata = cube_quantiles.metadata.add_dimension("bands", "bands")
    cube_quantiles_renamed = cube_quantiles.rename_labels(dimension='bands',target=[band_name + '_Q10',band_name + '_Q25',band_name + '_Q50',band_name + '_Q75',band_name + '_Q90'])
    
    cube_band_mean_time = cube_band.reduce_dimension(reducer=mean,dimension='DATE')
    cube_band_mean_time = cube_band_mean_time.add_dimension(name='bands',label=band_name + '_AVG')
    
    cube_band_median_time = cube_band.reduce_dimension(reducer=median,dimension='DATE')
    cube_band_median_time = cube_band_median_time.add_dimension(name='bands',label=band_name + '_MED')

    cube_band_min_time = cube_band.reduce_dimension(reducer=min,dimension='DATE')
    cube_band_min_time = cube_band_min_time.add_dimension(name='bands',label=band_name + '_MIN')

    cube_band_std_time = cube_band.reduce_dimension(reducer=sd,dimension='DATE')
    cube_band_std_time = cube_band_std_time.add_dimension(name='bands',label=band_name + '_STD')
    
    def reducer4(data, context):
        bgqa0sh1p = process("array_element", data = data, label = band_name + "_Q25")
        gsuv3t22h = process("array_element", data = data, label = band_name + "_Q75")
        r3f7c22u9 = process("subtract", x = gsuv3t22h, y = bgqa0sh1p)
        return r3f7c22u9
    
    inter_q_diff = cube_quantiles_renamed.reduce_dimension(reducer = reducer4, dimension = "bands")
    inter_q_diff = inter_q_diff.add_dimension(name='bands',label=band_name + '_IQR')
    
    result_0 = inter_q_diff.merge_cubes(cube_quantiles_renamed).merge_cubes(cube_band_mean_time).merge_cubes(cube_band_median_time).merge_cubes(cube_band_min_time).merge_cubes(cube_band_std_time)
    return result_0

b02_cube = compute_features(load1,'B02')   
b03_cube = compute_features(load1,'B03')
b04_cube = compute_features(load1,'B04')
b05_cube = compute_features(load1,'B05')
b06_cube = compute_features(load1,'B06')
b07_cube = compute_features(load1,'B07')
b08_cube = compute_features(load1,'B08')
b8A_cube = compute_features(load1,'B8A')
b11_cube = compute_features(load1,'B11')
b12_cube = compute_features(load1,'B12')
result = b02_cube.merge_cubes(b03_cube).merge_cubes(b04_cube).merge_cubes(b05_cube).merge_cubes(b06_cube).merge_cubes(b07_cube).merge_cubes(b08_cube).merge_cubes(b8A_cube).merge_cubes(b11_cube).merge_cubes(b12_cube)

### Running UDF through reduce dimension 

In [8]:
udf_reducer = lambda data: data.run_udf(udf="https://raw.githubusercontent.com/Open-EO/r4openeo-usecases/main/uc1-ml-landcover/eco4alps/udf/reduce_udf_eco4alps.R", runtime="R")
udf_result = result.reduce_dimension(reducer=udf_reducer,dimension='bands')

### Running graph and saving results

In [10]:
result = udf_result.save_result(format='GTIFF')

In [8]:
result = result.save_result(format='NetCDF')

In [11]:
job = connection.create_job(result,title="complete_udf_pg")
job_id = job.job_id
if job_id:
    print("Batch job created with id: ",job_id)
    job.start_job()
else:
    print("Error! Job ID is None")

Batch job created with id:  6c5462b1-27c9-4847-80a2-ac1bad033fb0


In [48]:
ST_S2_rgb_mask = job.get_results()
ST_S2_rgb_mask.download_files()

[PosixPath('/home/btufail@eurac.edu/Documents/SInCohMap/result.tiff'),
 PosixPath('/home/btufail@eurac.edu/Documents/SInCohMap/process.json')]

In [None]:
print(result.to_json())