https://github.com/earthpulse/eotdl/issues/190


1. Find and explore the EuroCropsDataset and stage it in the EOTDL workspace.
2. Filter the EuroCropsDataset using EOTDL functionality to create a subset of parcels,
   e.g., 8 crop classes, each with 1000 examples, for one country.
3. Run feature engineering with openEO, creating temporal metrics from S1 and S2 time series (temporally optimised for the crop classes of interest). Store the feature engineering process graph with the training datasets in EOTDL.
4. Use EOTDL functionality to train a model (for this, the features need to be retrieved). Store the model along with the openEO process graph in EOTDL.
5. Use the model to run inference (from within EOTDL?) on an openEO backend such as CDSE or openEO Platform. Make use of the feature engineering process graph stored along with the EOTDL model.

## 1. Ingest EuroCrops to EOTDL

We already have the Q0 dataset: https://www.eotdl.com/datasets/EuroCrops/. It contains a zip file, which in turn contains one zip file per country with the shapefiles (16 in total).

In [4]:
# !eotdl datasets get EuroCrops -v 1
# !unzip -o ~/.cache/eotdl/datasets/EuroCrops/v1/EuroCrops.zip -d data/

In [5]:
# from glob import glob

# zips = glob('data/*.zip')

# zips

In [6]:
# # unzip shapefiles

# import zipfile

# for zip_file in zips:
# 	with zipfile.ZipFile(zip_file, 'r') as zip_ref:
# 		zip_ref.extractall('data/')


In [7]:
# cleanup

# !rm -rf data/*.zip

In [9]:
from glob import glob

shapefiles = glob('data/**/*.shp', recursive=True)

shapefiles

['data/DE_NRW_2021_EC21.shp',
 'data/EE_2021_EC21.shp',
 'data/LV_2021_EC21.shp',
 'data/SK_2021_EC21.shp',
 'data/NL_2020_EC21.shp',
 'data/BE_VLG_2021_EC21.shp',
 'data/DK_2019_EC21.shp',
 'data/SI_2021_EC21.shp',
 'data/LT_2021_EC.shp',
 'data/AT_2021_EC21.shp',
 'data/DE_LS_2021_EC21.shp',
 'data/filtered_gdf.shp',
 'data/RO/RO_ny_EC21.shp',
 'data/SE/SE_2021_EC21.shp',
 'data/FR/FR_2018_EC21.shp',
 'data/HR/HR_2020_EC21.shp',
 'data/NA/ES_NA_2020_EC21.shp']

In [10]:
import geopandas as gpd

path = shapefiles[0]

gdf = gpd.read_file(path)

gdf.head()


Unnamed: 0,ID,INSPIRE_ID,FLIK,AREA_HA,CODE,CODE_TXT,USE_CODE,USE_TXT,D_PG,CROPDIV,EFA,ELER,WJ,DAT_BEARB,EC_trans_n,EC_hcat_n,EC_hcat_c,geometry
0,4598773,https://geodaten.nrw.de/id/inspire-lu-ts/exist...,DENWLI0544130746,1.5204,311,Winterraps,OE,Ölsaaten,N,N,N,N,2021,2021-03-12,Winter rape,winter_rapeseed_rape,3301060401,"POLYGON ((428647.74 5711831.893, 428651.689 57..."
1,4598772,https://geodaten.nrw.de/id/inspire-lu-ts/exist...,DENWLI0544130596,2.2812,131,Wintergerste,GT,Getreide,N,N,N,N,2021,2021-03-12,Winter barley,winter_barley,3301010401,"POLYGON ((427717.449 5710011.129, 427709.347 5..."
2,4598771,https://geodaten.nrw.de/id/inspire-lu-ts/exist...,DENWLI0544130402,0.8311,115,Winterweichweizen,GT,Getreide,N,N,N,N,2021,2021-03-12,Winter soft wheat,winter_common_soft_wheat,3301010101,"POLYGON ((427337.557 5710068.068, 427332.544 5..."
3,5447571,https://geodaten.nrw.de/id/inspire-lu-ts/exist...,DENWLI0548091835,4.7241,459,Grünland (Dauergrünland),GL,Dauergrünland,Y,N,N,Y,2021,2021-09-24,Grassland (permanent grassland),pasture_meadow_grassland_grass,3302000000,"POLYGON ((376283.353 5665431.25, 376308.653 56..."
4,5447586,https://geodaten.nrw.de/id/inspire-lu-ts/exist...,DENWLI0548091988,6.1005,459,Grünland (Dauergrünland),GL,Dauergrünland,Y,N,N,Y,2021,2021-09-24,Grassland (permanent grassland),pasture_meadow_grassland_grass,3302000000,"POLYGON ((376495.069 5665848.269, 376496.653 5..."


In [12]:
# columns
gdf.columns

Index(['ID', 'INSPIRE_ID', 'FLIK', 'AREA_HA', 'CODE', 'CODE_TXT', 'USE_CODE',
       'USE_TXT', 'D_PG', 'CROPDIV', 'EFA', 'ELER', 'WJ', 'DAT_BEARB',
       'EC_trans_n', 'EC_hcat_n', 'EC_hcat_c', 'geometry'],
      dtype='object')

TODO: create and ingest Q1/Q2.

## 2. Filter EuroCropsDataset

Filter the EuroCropsDataset using EOTDL functionality to create a subset of parcels, e.g., 8 crop classes, each with 1000 examples, for one country.

Filter from Q0.

In [11]:
# random country

import numpy as np

ix = np.random.randint(0, len(shapefiles))
country = shapefiles[ix]

country

'data/AT_2021_EC21.shp'

In [12]:
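# NOTE: this loads `path` (shapefiles[0], DE_NRW), not the randomly selected `country`;
# the outputs below are for DE_NRW. Use gpd.read_file(country) to load the random pick.
gdf = gpd.read_file(path)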

In [13]:
crop_classes = gdf['EC_hcat_n'].unique()

crop_classes

array(['winter_rapeseed_rape', 'winter_barley',
       'winter_common_soft_wheat', 'pasture_meadow_grassland_grass',
       'green_silo_maize', 'sugar_beet', 'summer_oats', 'potatoes',
       'winter_rye', 'summer_rye', 'unmaintained', 'beans',
       'winter_triticale', 'spring_common_soft_wheat',
       'fallow_land_not_crop', 'peas', 'alfalfa_lucerne',
       'grain_maize_corn_popcorn', 'not_known_and_other',
       'arable_land_seed_seedlings', 'miscanthus_silvergrass',
       'flowers_ornamental_plants', 'orchards_fruits',
       'legumes_dried_pulses_protein_crops', 'summer_barley', 'clover',
       'other_arable_land_crops', 'salads_lettuce_leaf_vegetables',
       'fresh_vegetables', 'celery', 'asparagus', 'tree_wood_forest',
       'winter_spelt', 'silphium_rosinweeds', 'winter_oats',
       'mangelwurzel_fodder_beet', 'winter_durum_hard_wheat',
       'strawberries', 'berries_berry_species', 'walnuts', 'chickpeas',
       'pumpkin_squash_gourd', 'winter_unspecified_cereals', 

In [14]:
# number of samples per class

# value_counts() counts rows per class and sorts by count in descending order
num_samples_per_class = gdf['EC_hcat_n'].value_counts().to_dict()

num_samples_per_class

{'pasture_meadow_grassland_grass': 314671,
 'green_silo_maize': 75863,
 'winter_common_soft_wheat': 64827,
 'winter_barley': 43119,
 'not_known_and_other': 32067,
 'flowers_ornamental_plants': 31132,
 'grain_maize_corn_popcorn': 25188,
 'winter_triticale': 18743,
 'winter_rye': 13833,
 'sugar_beet': 13284,
 'winter_rapeseed_rape': 11240,
 'unmaintained': 11060,
 'potatoes': 10534,
 'orchards_fruits': 9213,
 'fallow_land_not_crop': 7047,
 'clover': 5434,
 'other_arable_land_crops': 4741,
 'summer_oats': 3931,
 'summer_barley': 3647,
 'beans': 3550,
 'peas': 2172,
 'asparagus': 1851,
 'alfalfa_lucerne': 1830,
 'winter_spelt': 1746,
 'nurseries_nursery': 1685,
 'strawberries': 1669,
 'tree_wood_forest': 1619,
 'legumes_dried_pulses_protein_crops': 1573,
 'spring_common_soft_wheat': 1316,
 'winter_durum_hard_wheat': 1164,
 'fresh_vegetables': 1009,
 'carrots_daucus': 973,
 'arable_land_seed_seedlings': 918,
 'alliums': 882,
 'berries_berry_species': 783,
 'miscanthus_silvergrass': 758,
 'r

In [15]:
# import matplotlib.pyplot as plt

# plt.figure(figsize=(5, 25))
# plt.barh(list(num_samples_per_class.keys()), list(num_samples_per_class.values()))
# plt.tight_layout()
# plt.show()

In [16]:
# filter `samples` examples per class

# Each job runs separately, so we need to limit the number of classes and samples per class
# samples = 1000
# num_classes = 8

samples = 10
num_classes = 2

# keep classes with at least `samples` examples
classes = [class_ for class_, count in num_samples_per_class.items() if count >= samples]

# pick `num_classes` classes at random
classes = np.random.choice(classes, num_classes, replace=False)

classes


array(['beans', 'zucchini_courgette'], dtype='<U47')

In [17]:
filtered_gdf = gdf[gdf['EC_hcat_n'].isin(classes)]

filtered_gdf = filtered_gdf.groupby('EC_hcat_n').sample(n=samples, random_state=42)

filtered_gdf.head()

Unnamed: 0,ID,INSPIRE_ID,FLIK,AREA_HA,CODE,CODE_TXT,USE_CODE,USE_TXT,D_PG,CROPDIV,EFA,ELER,WJ,DAT_BEARB,EC_trans_n,EC_hcat_n,EC_hcat_c,geometry
78228,5417349,https://geodaten.nrw.de/id/inspire-lu-ts/exist...,DENWLI0546112289,2.9869,220,Acker-/Puff-/Pferdebohne,EW,Eiweißpflanzen,N,Y,N,Y,2021,2021-09-01,Field / broad / horse bean,beans,3301020100,"POLYGON ((402124.915 5689912.834, 402155.955 5..."
353731,4643636,https://geodaten.nrw.de/id/inspire-lu-ts/exist...,DENWLI0544132364,2.4801,220,Acker-/Puff-/Pferdebohne,EW,Eiweißpflanzen,N,N,N,Y,2021,2021-03-22,Field / broad / horse bean,beans,3301020100,"POLYGON ((425937.256 5713636.381, 425936.302 5..."
240449,5360953,https://geodaten.nrw.de/id/inspire-lu-ts/exist...,DENWLI0543200356,5.1461,220,Acker-/Puff-/Pferdebohne,EW,Eiweißpflanzen,N,Y,N,Y,2021,2021-08-26,Field / broad / horse bean,beans,3301020100,"POLYGON ((504871.095 5720189.902, 504868.826 5..."
559329,5261489,https://geodaten.nrw.de/id/inspire-lu-ts/exist...,DENWLI0538170588,1.749,220,Acker-/Puff-/Pferdebohne,EW,Eiweißpflanzen,N,N,N,Y,2021,2021-05-14,Field / broad / horse bean,beans,3301020100,"POLYGON ((475024.702 5774470.363, 475025.348 5..."
652144,5420930,https://geodaten.nrw.de/id/inspire-lu-ts/exist...,DENWLI0546072940,8.7275,220,Acker-/Puff-/Pferdebohne,EW,Eiweißpflanzen,N,Y,N,Y,2021,2021-09-03,Field / broad / horse bean,beans,3301020100,"POLYGON ((357547.084 5686286.642, 357561.053 5..."


In [18]:
assert len(filtered_gdf) == num_classes * samples

# save to disk
filtered_gdf.to_file('data/filtered_gdf.shp')


TODO: Use STAC/GeoDB to filter the dataset. This will return a STAC catalog with the filtered items, that can be staged with EOTDL.

Note: GeoDB only stores the STAC metadata. For this filtering, we need the actual data (crop type), which is not in the STAC metadata. Hence, we will not be able to do this filtering directly with GeoDB nor with the STAC metadata (even locally).


## 3. Feature Engineering with openEO

Run feature engineering with openEO, creating temporal metrics from S1 and S2 time series (temporally optimised for the crop classes of interest). Store the feature engineering process graph with the training datasets in EOTDL.

In [1]:
import geopandas as gpd

gdf = gpd.read_file('data/filtered_gdf.shp')

gdf.shape

(20, 18)

## Transform GeoDataFrame for MultiBackendJobManager

This function processes an input GeoDataFrame and prepares it for use with openEO's **MultiBackendJobManager**. The job manager enables launching and tracking multiple openEO jobs simultaneously, which is essential for large-scale data extractions. 

### Note

Note that in this simple example we have opted not to group the geometries into feature collections. That utility is only illustrated in the more advanced example. As a consequence, a single openEO job needs to be launched for each polygon, which makes the extraction workflow more time- and cost-intensive.
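To make the cost of the one-job-per-polygon choice concrete, the number of jobs can be estimated up front. This is only a back-of-the-envelope sketch: `estimate_job_count` and its `parcels_per_job` parameter are hypothetical names used for illustration and are not part of openEO or EOTDL.

```python
import math


def estimate_job_count(num_classes: int, samples_per_class: int, parcels_per_job: int = 1) -> int:
    """Estimate how many openEO jobs are needed for a balanced parcel subset.

    parcels_per_job is a hypothetical grouping factor: 1 means one job per
    polygon (as in this notebook), larger values model feature collections.
    """
    total_parcels = num_classes * samples_per_class
    return math.ceil(total_parcels / parcels_per_job)


print(estimate_job_count(2, 10))                          # 20
print(estimate_job_count(8, 1000))                        # 8000
print(estimate_job_count(8, 1000, parcels_per_job=50))    # 160
```

With the reduced settings used in this notebook (2 classes, 10 samples each) this gives 20 jobs, which matches the 20 rows of the job dataframe.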


### Parameters

#### Temporal Parameters:
- **Start Date:** Start of the temporal extent (e.g., `"2020-01-01"`).  
- **Number of Months:** Duration of the temporal extent in months.
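
The helper that consumes these parameters lives in the local `dataframe_utils` module and is not shown in this notebook. As a rough sketch of how a start date and a number of months could be turned into a temporal extent, assuming the end date is simply the start date shifted by whole months (`temporal_extent_from_start` is a hypothetical name, not the actual helper):

```python
import calendar
from datetime import date


def temporal_extent_from_start(start_date: str, nb_months: int) -> list[str]:
    """Return [start, end] as ISO date strings, with end = start + nb_months."""
    start = date.fromisoformat(start_date)
    # Work with a zero-based month index so that year rollover is simple arithmetic
    month_index = start.month - 1 + nb_months
    end_year = start.year + month_index // 12
    end_month = month_index % 12 + 1
    # Clamp the day so that e.g. 31 January + 1 month lands on the last day of February
    last_day = calendar.monthrange(end_year, end_month)[1]
    end = date(end_year, end_month, min(start.day, last_day))
    return [start.isoformat(), end.isoformat()]


print(temporal_extent_from_start("2020-01-01", 3))  # ['2020-01-01', '2020-04-01']
```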

In [2]:
from dataframe_utils import process_geodataframe

# Constants
start_date = "2020-01-01"
nb_months = 3

job_df = process_geodataframe(gdf, start_date, nb_months)

job_df

Unnamed: 0,geometry,temporal_extent
0,"POLYGON ((7.59447 51.35205, 7.59491 51.35223, ...","[2020-01-01, 2020-04-01]"
1,"POLYGON ((7.93138 51.56893, 7.93137 51.56893, ...","[2020-01-01, 2020-04-01]"
2,"POLYGON ((9.07038 51.6327, 9.07035 51.6327, 9....","[2020-01-01, 2020-04-01]"
3,"POLYGON ((8.63522 52.1202, 8.63522 52.1206, 8....","[2020-01-01, 2020-04-01]"
4,"POLYGON ((6.95613 51.31004, 6.95633 51.31003, ...","[2020-01-01, 2020-04-01]"
5,"POLYGON ((8.0983 51.89359, 8.09937 51.89486, 8...","[2020-01-01, 2020-04-01]"
6,"POLYGON ((8.58446 52.28925, 8.58584 52.28774, ...","[2020-01-01, 2020-04-01]"
7,"POLYGON ((8.94611 51.96644, 8.94646 51.96644, ...","[2020-01-01, 2020-04-01]"
8,"POLYGON ((8.81338 51.53074, 8.81344 51.53074, ...","[2020-01-01, 2020-04-01]"
9,"POLYGON ((8.85684 52.05215, 8.8568 52.05217, 8...","[2020-01-01, 2020-04-01]"


In [11]:
job_df.info()

<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 20 entries, 0 to 19
Data columns (total 2 columns):
 #   Column           Non-Null Count  Dtype   
---  ------           --------------  -----   
 0   geometry         20 non-null     geometry
 1   temporal_extent  20 non-null     object  
dtypes: geometry(1), object(1)
memory usage: 452.0+ bytes


## Start Job with Standardized UDPs and Feature Collection Filtering

This function initializes an openEO batch job using standardized **User-Defined Processes (UDPs)** for Sentinel-1 and Sentinel-2 data processing. It employs a spatial aggregation in order to get a time series per polygon.

### Key Features

1. **Use of Standardized UDPs**  
   - **S1 Weekly Statistics:** Computes weekly statistics from Sentinel-1 data.  
   - **S2 Weekly Statistics:** Computes weekly statistics from Sentinel-2 data.  
   - UDPs are defined in external JSON files.

2. **Spatial aggregation across polygons**  
   - An average is calculated for each individual polygon.

3. **Cube Merging**  
   - Merges Sentinel-1 and Sentinel-2 datacubes for combined analysis.

4. **Job Configuration**  
   - Outputs results in **parquet** format with filenames derived

In [12]:
import ast

import openeo
import pandas as pd
from s3proxy_utils import upload_geoparquet_file

def start_job(row: pd.Series, connection: openeo.Connection, **kwargs) -> openeo.BatchJob:

    # temporal_extent may arrive as a string (e.g. after a round trip through the
    # CSV job database), so parse it back into a list; literal_eval is safer than eval
    temporal_extent = row["temporal_extent"]
    if isinstance(temporal_extent, str):
        temporal_extent = ast.literal_eval(temporal_extent)

    # polygon over which the time series is aggregated
    geometry = row["geometry"]

    # build the Sentinel-1 and Sentinel-2 cubes from the standardized UDPs
    s1 = connection.datacube_from_process(
        "s1_weekly_statistics",
        namespace="https://raw.githubusercontent.com/earthpulse/eotdl/refs/heads/hv_openeoexample/tutorials/notebooks/openeo/s1_weekly_statistics.json",
        temporal_extent=temporal_extent,
    )

    s2 = connection.datacube_from_process(
        "s2_weekly_statistics",
        namespace="https://raw.githubusercontent.com/earthpulse/eotdl/refs/heads/hv_openeoexample/tutorials/notebooks/openeo/s2_weekly_statistics.json",
        temporal_extent=temporal_extent,
    )

    # merge both cubes and average all pixels inside the polygon
    merged = s2.merge_cubes(s1)
    result = merged.aggregate_spatial(geometries=geometry, reducer="mean")

    # create (but do not start) a batch job that writes the result as parquet
    job = result.create_job(
        out_format="parquet",
    )

    return job
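
One plausible reason why `temporal_extent` shows up as a string is that the job dataframe is persisted to a CSV file, where every cell is stored as text. The sketch below reproduces that round trip with the standard library only and parses the value back with `ast.literal_eval`. `parse_temporal_extent` is a hypothetical helper, and whether the job database serialises the column in exactly this way is an assumption.

```python
import ast
import csv
import io


def parse_temporal_extent(value):
    """Return the temporal extent as a list, parsing it first if it is a string."""
    if isinstance(value, str):
        # literal_eval only accepts Python literals, unlike eval
        value = ast.literal_eval(value)
    return list(value)


# Simulate a CSV round trip: the list is written as its string representation
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(["temporal_extent"])
writer.writerow([["2020-01-01", "2020-04-01"]])
buffer.seek(0)

stored = next(csv.DictReader(buffer))["temporal_extent"]
print(type(stored).__name__)          # str
print(parse_temporal_extent(stored))  # ['2020-01-01', '2020-04-01']
```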

In [4]:
# row = job_df.iloc[0]
# connection = openeo.connect(url="openeo.dataspace.copernicus.eu").authenticate_oidc()

# job = start_job(row, connection)

# job.start_and_wait()

### Submit Extraction Jobs

Using the openEO backend, we authenticate and submit the jobs to process the EO data.
Each job extracts Sentinel-1 and Sentinel-2 training features.

In [13]:
!rm -rf jobs.csv

In [14]:
import openeo
from openeo.extra.job_management import MultiBackendJobManager, ParquetJobDatabase, CsvJobDatabase

# file that tracks the status of every job
# job_tracker = 'jobs.parquet'
job_tracker = 'jobs.csv'

# initialize the job manager, authenticate and add the backend
manager = MultiBackendJobManager()
connection = openeo.connect(url="openeo.dataspace.copernicus.eu").authenticate_oidc()
manager.add_backend("cdse", connection=connection, parallel_jobs=2)

job_db = CsvJobDatabase(path=job_tracker)
# job_db = ParquetJobDatabase(path=job_tracker)

# create the job database from the dataframe (one row per job)
job_db.initialize_from_df(job_df)

# pass the job database object itself rather than its path
manager.run_jobs(start_job=start_job, job_db=job_db)


Authenticated using refresh token.


KeyboardInterrupt: 
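
After an interrupted run, the job tracker file can be inspected to see how far the jobs got. The following is a minimal sketch that counts jobs per status in a CSV tracker. It assumes the tracker has a `status` column; the demo file and its status values are synthetic and only illustrative, and `summarize_job_statuses` is a hypothetical helper.

```python
import csv
import tempfile
from collections import Counter
from pathlib import Path


def summarize_job_statuses(tracker_path) -> Counter:
    """Count the number of jobs per status in a CSV job tracker."""
    with open(tracker_path, newline="") as f:
        return Counter(row["status"] for row in csv.DictReader(f))


# Demo on a synthetic tracker file (not the real jobs.csv)
with tempfile.TemporaryDirectory() as tmp:
    demo_tracker = Path(tmp) / "jobs_demo.csv"
    demo_tracker.write_text(
        "id,status\nj-1,finished\nj-2,error\nj-3,not_started\nj-4,finished\n"
    )
    status_counts = summarize_job_statuses(demo_tracker)

print(dict(status_counts))
```

Pointing the same function at `jobs.csv` would give a quick overview of which jobs still need to run or failed.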

In [7]:
openeo.__version__

'0.37.0'

In [7]:
gpd.__version__

'1.0.1'

TODO: fix error, ingest resulting parquet files to EOTDL (features as a new dataset), save process graph (json) as a reusable Feature recipe (should be used for inference).

## 4. Train a model with EOTDL

TODO:
- Load parquet files with features (staged from EOTDL)
- Split train/test
- Train model
- Evaluate model
- Export model (ONNX)
- Ingest model to EOTDL
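
As a placeholder until the real features are available, the split/train/evaluate steps could look roughly like the sketch below. Everything here is an assumption: the features are synthetic random numbers rather than openEO outputs, scikit-learn's random forest is just one possible model, and `train_and_evaluate` is a hypothetical helper. Loading the parquet files, exporting to ONNX and ingesting to EOTDL are deliberately left out.

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split


def train_and_evaluate(features, labels, test_size=0.25, seed=42):
    """Stratified train/test split, random forest training, held-out accuracy."""
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=test_size, stratify=labels, random_state=seed
    )
    model = RandomForestClassifier(n_estimators=100, random_state=seed)
    model.fit(X_train, y_train)
    accuracy = accuracy_score(y_test, model.predict(X_test))
    return model, accuracy


# Synthetic stand-in for per-parcel features: 2 classes x 10 parcels, 6 features each
rng = np.random.default_rng(0)
n_per_class, n_features = 10, 6
X = np.vstack([
    rng.normal(loc=0.0, size=(n_per_class, n_features)),
    rng.normal(loc=3.0, size=(n_per_class, n_features)),
])
y = np.array(["beans"] * n_per_class + ["zucchini_courgette"] * n_per_class)

model, accuracy = train_and_evaluate(X, y)
print(f"held-out accuracy: {accuracy:.2f}")
```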

## 5. Run inference with EOTDL

TODO:
- Stage model from EOTDL
- Stage feature recipe from EOTDL
- Generate new subset of parcels
- Compute features with reusable feature recipe
- Run inference with model
- Explore results