# **Exercise session 3: Convolutional neural network (CNN) and support vector machine (SVM) to detect $CH_4$ emissions from satellite data**

## Exercise 3.3 Batch jobs and server-side computations using openEO (1 pt)

So far you have been using openEO for synchronous execution of your requests: you submitted a request and the result came as a direct response to your request. This is not feasible for heavier work. Instead, one should submit the requests as batch jobs. Familiarise yourself with the Batch-job-combine-tasks.ipynb file.
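The difference between the two execution modes can be sketched as follows. This is a minimal illustration, not the course's reference solution: the helper names `run_synchronously` and `run_as_batch_job` are hypothetical, and `cube` is assumed to be an openEO data cube such as the ones returned by `connection.load_collection(...)`.

```python
def run_synchronously(cube, target):
    """Small request: the result comes back as the direct response to the request."""
    cube.download(target)
    return target


def run_as_batch_job(cube, target, title="batch job"):
    """Heavier request: register a job on the backend, wait for it, then fetch the file."""
    job = cube.create_job(title=title)
    job.start_and_wait()  # blocks while polling the backend for the job status
    job.get_results().download_file(target)
    return job
```

The batch variant keeps the work on the backend as a registered job, so a long-running computation does not depend on a single open HTTP request.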

Furthermore, instead of downloading large amounts of raw data, one can process the data directly on the server side. You can list all available processes by running `connection.list_processes()`. For more details, see the [EO Cookbook](https://openeo.org/documentation/1.0/cookbook/#temporal-mean-reduce-dimension). Note that the cookbook gives two alternatives for computing the temporal mean; we recommend using the `reduce_dimension` function.
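As a hedged sketch of the `reduce_dimension` route, the helper below builds one reduced cube per statistic. The function name `temporal_statistics` is hypothetical, the temporal dimension is assumed to be called `"t"` (the usual default, but worth checking in the collection metadata), and the reducer names are assumed to be available as processes on the backend (verify with `connection.list_processes()`).

```python
# Reducer names are openEO process ids; "sd" is the standard-deviation process.
REDUCERS = ("min", "max", "mean", "sd")


def temporal_statistics(cube, dimension="t"):
    """Return one reduced cube per statistic, collapsing the given dimension."""
    return {
        name: cube.reduce_dimension(dimension=dimension, reducer=name)
        for name in REDUCERS
    }


# Possible usage (needs an authenticated connection, so it is left as a comment):
# stats = temporal_statistics(s5_CH4)
# stats["mean"].save_result(format="JSON").create_job(title="CH4 mean").start_and_wait()
```

Nothing is computed when the function returns: each entry is only a description of the computation, which the backend evaluates once it is downloaded or submitted as a job.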

Task: fix an area of interest and a time period and load the min, max, mean and standard deviation of CH4. Perform this task as a batch job using server-side processes. You should save the output in the JSON format. Note that the output is computed per pixel.
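To make "computed per pixel" concrete, here is a small plain-Python illustration of what a temporal reduction does. The grid values are made up for the example, `per_pixel_statistics` is a hypothetical helper, and the use of the sample standard deviation is an assumption about how the backend defines it.

```python
import statistics

# Three made-up 2x2 grids (one per time step); None marks a missing retrieval.
time_series = [
    [[1850.0, 1860.0], [None, 1900.0]],
    [[1870.0, 1840.0], [1880.0, 1910.0]],
    [[1860.0, None], [1890.0, 1920.0]],
]


def per_pixel_statistics(grids):
    """Reduce a list of equally shaped 2-D grids along time, pixel by pixel."""
    n_rows, n_cols = len(grids[0]), len(grids[0][0])
    result = [[None] * n_cols for _ in range(n_rows)]
    for r in range(n_rows):
        for c in range(n_cols):
            # Collect this pixel's values over time, skipping missing ones.
            values = [g[r][c] for g in grids if g[r][c] is not None]
            if not values:
                continue  # pixel never observed: leave the result as None
            result[r][c] = {
                "min": min(values),
                "max": max(values),
                "mean": statistics.fmean(values),
                # Sample standard deviation; needs at least two valid values.
                "sd": statistics.stdev(values) if len(values) > 1 else None,
            }
    return result


stats = per_pixel_statistics(time_series)
print(stats[0][0])
```

The output has the same spatial shape as each input grid: the time axis is gone and every pixel carries its own four statistics.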

In [2]:
import pathlib
import rasterio
import matplotlib.pyplot as plt
import openeo
import pandas as pd
import xarray as xr
import os
import joblib
import json

In [15]:
connection = openeo.connect(url="openeo.dataspace.copernicus.eu")
connection.authenticate_oidc_device()

OidcDeviceCodePollTimeout: Timeout (300.0s) while polling for access token.

In [11]:
# print(connection.list_collections())
for collection in connection.list_collections():
    print(collection['id'])

SENTINEL3_OLCI_L1B
SENTINEL3_SLSTR
SENTINEL_5P_L2
SENTINEL2_L1C
SENTINEL2_L2A
SENTINEL1_GRD
COPERNICUS_30
LANDSAT8_L2


In [15]:
# Load the coordinates and labels of the CH4 plumes coming from gas infrastructure,
# oil infrastructure and coal mines from [all TROPOMI detected plumes for 2021. (Schuit et al. 2023)]
# (https://zenodo.org/records/8087134).
download_path = "coursedata/users/nguyenb5"

CH4_plumes = pd.read_csv("Schuit_etal2023_TROPOMI_all_plume_detections_2021.csv")

# Filter out rows where estimated_source_type is "Unclassified"
CH4_plumes = CH4_plumes[CH4_plumes['estimated_source_type'] != 'Unclassified']

# For each plume, compute the spatial extent of an area 1 deg x 1 deg around the plume
CH4_plumes['west'] = CH4_plumes['lon'] - 0.5
CH4_plumes['east'] = CH4_plumes['lon'] + 0.5
CH4_plumes['south'] = CH4_plumes['lat'] - 0.5
CH4_plumes['north'] = CH4_plumes['lat'] + 0.5

print("Number of data points:", len(CH4_plumes))

CH4_plumes.head()

Number of data points: 2944


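| Unnamed: 0 | date | time_UTC | lat | lon | source_rate_t/h | uncertainty_t/h | estimated_source_type | west | east | south | north |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 20210101 | 06:00:45 | 36.75 | 109.76 | 32 | 16 | Coal | 109.26 | 110.26 | 36.25 | 37.25 |
| 1 | 20210101 | 06:00:55 | 37.53 | 110.75 | 39 | 22 | Coal | 110.25 | 111.25 | 37.03 | 38.03 |
| 2 | 20210101 | 07:37:49 | 20.89 | 85.22 | 4 | 2 | Coal | 84.72 | 85.72 | 20.39 | 21.39 |
| 3 | 20210101 | 07:38:15 | 23.3 | 90.79 | 51 | 15 | Landfill/Urban | 90.29 | 91.29 | 22.8 | 23.8 |
| 4 | 20210101 | 07:38:27 | 23.56 | 86.44 | 25 | 12 | Coal | 85.94 | 86.94 | 23.06 | 24.06 |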
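The `west`/`east`/`south`/`north` columns describe one rectangle per plume. One way to turn such rectangles into GeoJSON geometries for a spatial aggregation is sketched below. The helper names are hypothetical, and whether the `*_geojson.json` files used later in this notebook were produced this way is an assumption.

```python
import json


def bbox_to_feature(west, south, east, north, properties=None):
    """Build a GeoJSON Feature holding a rectangular Polygon (lon/lat order)."""
    ring = [
        [west, south],
        [east, south],
        [east, north],
        [west, north],
        [west, south],  # GeoJSON rings repeat the first position at the end
    ]
    return {
        "type": "Feature",
        "properties": properties or {},
        "geometry": {"type": "Polygon", "coordinates": [ring]},
    }


def plumes_to_feature_collection(rows):
    """Convert rows with west/south/east/north keys into a FeatureCollection."""
    features = [
        bbox_to_feature(
            row["west"], row["south"], row["east"], row["north"],
            properties={"estimated_source_type": row["estimated_source_type"]},
        )
        for row in rows
    ]
    return {"type": "FeatureCollection", "features": features}


# Bounding boxes of the first two rows of the output above.
rows = [
    {"west": 109.26, "south": 36.25, "east": 110.26, "north": 37.25,
     "estimated_source_type": "Coal"},
    {"west": 110.25, "south": 37.03, "east": 111.25, "north": 38.03,
     "estimated_source_type": "Coal"},
]
collection = plumes_to_feature_collection(rows)
print(json.dumps(collection)[:80])
```

With the DataFrame from the cell above, `coal_plumes.to_dict("records")` would give rows in the expected shape, and `json.dump(collection, file)` would write the result to disk.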


In [16]:
# Filter the data for each category and select the first 20 rows
gas_plumes = CH4_plumes[CH4_plumes['estimated_source_type'] == 'Gas'].head(20)
oil_plumes = CH4_plumes[CH4_plumes['estimated_source_type'] == 'Oil'].head(20)
coal_plumes = CH4_plumes[CH4_plumes['estimated_source_type'] == 'Coal'].head(20)
# We are not required to use landfill/urban plumes; classifying gas, oil and coal is enough
landfill_urban_plumes = CH4_plumes[CH4_plumes['estimated_source_type'] == 'Landfill/Urban'].head(20)

**1. Define the datacube**

In [11]:
period = ("2021-01-01", "2021-01-31")

s5_CH4 = connection.load_collection(
    "SENTINEL_5P_L2",
    # spatial_extent=optional_bounding_box_for_all_plume_rectangles,
    temporal_extent=period,
    bands=["CH4"],  # as before, server-side computations for S5 accept only one band at a time
)

s5_CO = connection.load_collection(
    "SENTINEL_5P_L2",
    # spatial_extent=optional_bounding_box_for_all_plume_rectangles,
    temporal_extent=period,
    bands=["CO"],  # as before, server-side computations for S5 accept only one band at a time
)

s5_SO2 = connection.load_collection(
    "SENTINEL_5P_L2",
    # spatial_extent=optional_bounding_box_for_all_plume_rectangles,
    temporal_extent=period,
    bands=["SO2"],  # as before, server-side computations for S5 accept only one band at a time
)

s5_NO2 = connection.load_collection(
    "SENTINEL_5P_L2",
    # spatial_extent=optional_bounding_box_for_all_plume_rectangles,
    temporal_extent=period,
    bands=["NO2"],  # as before, server-side computations for S5 accept only one band at a time
)

s3_all_bands = connection.load_collection(
    "SENTINEL3_SLSTR",
    # spatial_extent=optional_bounding_box_for_all_plume_rectangles,
    temporal_extent=period,
    bands=["S5", "S6"],  # as before, server-side computations for S3 accept only a few bands at a time
)
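Since the Sentinel-5P cubes differ only in the band name, the repeated calls could also be written as a loop. This is a sketch under the assumption that one cube per band is wanted; `load_band_cubes` is a hypothetical helper, not part of the openEO client.

```python
def load_band_cubes(connection, collection_id, bands, temporal_extent, spatial_extent=None):
    """Load one single-band cube per entry in `bands`, keyed by band name."""
    return {
        band: connection.load_collection(
            collection_id,
            spatial_extent=spatial_extent,
            temporal_extent=temporal_extent,
            bands=[band],  # one band per cube, as in the cell above
        )
        for band in bands
    }


# Possible usage with the connection and period defined in this notebook:
# s5_cubes = load_band_cubes(connection, "SENTINEL_5P_L2", ["CH4", "CO", "SO2", "NO2"], period)
# s5_CH4 = s5_cubes["CH4"]
```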
    

**3. Add reducer and initiate the job**

### Divide and conquer between 5 accounts

Each account can run 2 concurrent jobs.
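One possible way to plan this split is sketched below. Only the pairing of account 1 with `s5_CH4` is taken from this notebook; the round-robin assignment of the remaining cubes and the helper `plan_jobs` are assumptions made for illustration.

```python
cubes = ["s5_CH4", "s5_CO", "s5_SO2", "s5_NO2", "s3_all_bands"]
plume_types = ["gas", "coal", "oil"]
MAX_CONCURRENT = 2  # per-account limit on concurrent jobs


def plan_jobs(cubes, plume_types, n_accounts, max_concurrent):
    """Give each account whole cubes (round-robin) and split its jobs into waves."""
    per_account = {account: [] for account in range(1, n_accounts + 1)}
    for index, cube in enumerate(cubes):
        account = index % n_accounts + 1
        per_account[account].extend((cube, plume) for plume in plume_types)
    # A wave is a group of jobs small enough to run at the same time.
    return {
        account: [jobs[i:i + max_concurrent] for i in range(0, len(jobs), max_concurrent)]
        for account, jobs in per_account.items()
    }


plan = plan_jobs(cubes, plume_types, n_accounts=5, max_concurrent=MAX_CONCURRENT)
for account, waves in plan.items():
    print(account, waves)
```

With five cubes and three plume types, each account ends up with three jobs, run as one wave of two followed by one wave of one.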

#### Account 1: Cube s5_CH4

In [17]:
datacube_name = "s5_CH4"
datacube = s5_CH4
plume_type = "gas"

print(f"Starting aggregation for {datacube_name}_{plume_type}.json")

# Load the GeoJSON geometries for this source type
with open(f"{download_path}/assignment3_Task4/{plume_type}_geojson.json", 'r') as file:
    plumetype_geojson = json.load(file)

# Server side: average the band over each geometry
aggregation = datacube.aggregate_spatial(
    geometries=plumetype_geojson,
    reducer="mean",
)
aggregation = aggregation.save_result(format="JSON")

# Submit as a batch job, wait for completion, then download the result
job = aggregation.create_job(title="aggregation")
job.start_and_wait()
results = job.get_results()
results.download_file(f"{download_path}/assignment3_Task4/{datacube_name}_{plume_type}.json")
print(f"Finished aggregating {datacube_name}_{plume_type}.json")

In [None]:
datacube_name = "s5_CH4"
datacube = s5_CH4
plume_type = "coal"

print(f"Starting aggregation for {datacube_name}_{plume_type}.json")

# Load the GeoJSON geometries for this source type
with open(f"{download_path}/assignment3_Task4/{plume_type}_geojson.json", 'r') as file:
    plumetype_geojson = json.load(file)

# Server side: average the band over each geometry
aggregation = datacube.aggregate_spatial(
    geometries=plumetype_geojson,
    reducer="mean",
)
aggregation = aggregation.save_result(format="JSON")

# Submit as a batch job, wait for completion, then download the result
job = aggregation.create_job(title="aggregation")
job.start_and_wait()
results = job.get_results()
results.download_file(f"{download_path}/assignment3_Task4/{datacube_name}_{plume_type}.json")
print(f"Finished aggregating {datacube_name}_{plume_type}.json")

In [None]:
datacube_name = "s5_CH4"
datacube = s5_CH4
plume_type = "oil"

print(f"Starting aggregation for {datacube_name}_{plume_type}.json")

# Load the GeoJSON geometries for this source type
with open(f"{download_path}/assignment3_Task4/{plume_type}_geojson.json", 'r') as file:
    plumetype_geojson = json.load(file)

# Server side: average the band over each geometry
aggregation = datacube.aggregate_spatial(
    geometries=plumetype_geojson,
    reducer="mean",
)
aggregation = aggregation.save_result(format="JSON")

# Submit as a batch job, wait for completion, then download the result
job = aggregation.create_job(title="aggregation")
job.start_and_wait()
results = job.get_results()
results.download_file(f"{download_path}/assignment3_Task4/{datacube_name}_{plume_type}.json")
print(f"Finished aggregating {datacube_name}_{plume_type}.json")
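The three cells above differ only in `plume_type`, so the pattern could be wrapped in a function. The sketch below is one way to do it: `aggregate_plumes` is a hypothetical helper, and using the file name as the job title is an illustrative choice rather than something the notebook does.

```python
import json
from pathlib import Path


def aggregate_plumes(datacube, datacube_name, plume_type, folder, reducer="mean"):
    """Run one aggregate_spatial batch job and return the path of the downloaded JSON."""
    folder = Path(folder)
    with open(folder / f"{plume_type}_geojson.json", "r") as file:
        geometries = json.load(file)

    target = folder / f"{datacube_name}_{plume_type}.json"
    aggregation = datacube.aggregate_spatial(geometries=geometries, reducer=reducer)
    aggregation = aggregation.save_result(format="JSON")
    # A distinct title makes the jobs easier to tell apart on the backend.
    job = aggregation.create_job(title=f"{datacube_name}_{plume_type}")
    job.start_and_wait()
    job.get_results().download_file(target)
    return target


# Possible usage with the names defined in this notebook:
# for plume_type in ("gas", "coal", "oil"):
#     aggregate_plumes(s5_CH4, "s5_CH4", plume_type, f"{download_path}/assignment3_Task4")
```

Because `start_and_wait()` blocks, such a loop runs the jobs one after another. Running two at once per account would need the jobs to be started without waiting and polled separately.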