# Data Extractions from OpenEO

To run the extractions, you need an account in the [Copernicus Data Space Ecosystem (CDSE)](https://dataspace.copernicus.eu/).

In [1]:
!pip install git+https://github.com/WorldCereal/prometheo.git@scaleag_augmentations --quiet
!pip install git+https://github.com/ScaleAGData/scaleag-vito.git@prometheo-integration --quiet

In [None]:
from loguru import logger
import geopandas as gpd
from pathlib import Path
from scaleagdata_vito.openeo.extract_sample_scaleag import (
    generate_input_for_extractions,
    extract
)
from scaleagdata_vito.presto.presto_df import load_dataset
from scaleagdata_vito.presto.utils import evaluate_finetuned_model
from prometheo.datasets.scaleag import ScaleAgDataset
from prometheo import finetune
from prometheo.finetune import Hyperparams
from prometheo.models.presto import param_groups_lrd
from prometheo.models.presto.wrapper import PretrainedPrestoWrapper, load_pretrained
from torch import nn
from torch.optim import AdamW, lr_scheduler
from torch.utils.data import DataLoader
from scaleagdata_vito.presto.utils import evaluate_downstream_model, get_encodings
import catboost as cb

  from .autonotebook import tqdm as notebook_tqdm


### Assess data correctness before launching the OpenEO jobs 
You can run some checks on your input file to make sure they are suitable to run the extractions successfully. In particular, it is important to check the validity of the geometries and, ideally, to also have a column containing a unique id for each sample.

In case of invalid geometries, you will be provided with both the dataframe with the failing polygons to be fixed and the one with valid geometries.


In [25]:
def check_unique_id(df_path, unique_id):
    df = gpd.read_file(df_path)
    if df[unique_id].nunique() != df.shape[0]:
        logger.info("IDs are not unique!")
        return df[df[unique_id].duplicated(keep=False)]
    else:
        logger.info("IDs are unique")
        return None

def check_valid_geometry(df):
    if isinstance(df, str):
        df = gpd.read_file(df)
    df_invalid = df[~df.geometry.is_valid]
    # Assessing wheather some invalid geometries are present
    if len(df_invalid) > 0:
        # 1) some invalid geometries are present. Attempt fixing them
        df['geometry'] = df.geometry.buffer(0)
        df_invalid = df[~df.geometry.is_valid]
        df_valid = df[df.geometry.is_valid]
        if len(df_invalid) > 0:
            # 2) Still some invalid geometries are present. Return them
            logger.info("Invalid geometries found! Returning invalid geometries")
            return df_invalid, df_valid
        else:
            # All geometries are now valid. Return fixed dataframe and empty dataframe for invalid geometries
            logger.info("Fixed some invalid geometries. All geometries are now valid")
            return gpd.GeoDataFrame(), df
    else:
        # All geometries are valid. Return empty dataframe for invalid geometries
        logger.info("All geometries are valid")
        return gpd.GeoDataFrame(), df

def _save(save_to, original_file_path, df, suffix=''):
    if suffix!='':
        filename = Path(save_to) / f"{Path(original_file_path).stem}_{suffix}.geojson"
    else:
        filename = Path(save_to) / f"{Path(original_file_path).stem}.geojson"
    logger.info(f"Saving invalid geometries to {filename}")
    Path(save_to).mkdir(parents=True, exist_ok=True)
    df.to_file(filename)

In [45]:
input_file = "/home/giorgia/Private/data/geomaize/correct/Maize_North_Ghana_valid.geojson"
invalid_geom, valid_geom = check_valid_geometry(input_file)
non_unique_ids = check_unique_id(input_file, unique_id="Field_id")

[32m2025-03-17 10:19:05[0m | [1mINFO    [0m | [36m__main__[0m - [1mAll geometries are valid[0m


[32m2025-03-17 10:19:05[0m | [1mINFO    [0m | [36m__main__[0m - [1mIDs are unique[0m


In [46]:
# save files after geometry validity check. If invalid geometries are present, save them to a separate file
if len(invalid_geom) > 0:
    _save(
        save_to="/home/giorgia/Private/data/geomaize/invalid/",
        original_file_path=input_file,
        df=invalid_geom,
        suffix='',
    )

# save valid geometries to a separate file
_save(
    save_to="/home/giorgia/Private/data/geomaize/correct/",
    original_file_path=input_file,
    df=valid_geom,
    suffix='',
)

[32m2025-03-17 10:19:18[0m | [1mINFO    [0m | [36m__main__[0m - [1mSaving invalid geometries to /home/giorgia/Private/data/geomaize/correct/Maize_North_Ghana_valid.geojson[0m


#### Provide job instructions and start OpenEO extractions

1) Indicate the following fields in order to guide the extraction
2) In the cell below you will be asked for authentication and be provided with a link. click on the link and login with your CDSE credentials.  
3) Once the extraction process will be over, you will find your extracted dataset in the output folder you indicated. You can load it by running `load_dataset` as shown below

    ```python
    job_params = dict(
        output_folder=..., # where to save the extracted dataset
        input_df=..., # input georeferenced dataset to run the extractions for 
        start_date=..., # string indicating from which date to extract data  
        end_date=..., # string indicating until which date to extract the data 
        unique_id_column=..., # name of the column in the input_df containing the unique ID of the samples  
        composite_window=..., # "month" or "dekad" are supported. Default is "dekad"
    )
    ```

In [48]:
# Dataset Parameters
task_type = "regression"
start_date="2021-07-01"
end_date="2021-10-31"
composite_window="dekad"
unique_id_column="Field_id"
input_df="/home/giorgia/Private/data/geomaize/correct/Maize_North_Ghana_valid.geojson"
output_folder="/home/giorgia/Private/data/geomaize/extractions_North_Ghana/"

job_params = dict(
    output_folder=output_folder,
    input_df=input_df,
    start_date=start_date,
    end_date=end_date,
    unique_id_column=unique_id_column,
    composite_window=composite_window,
)
extract(generate_input_for_extractions(job_params))

2025-03-17 10:19:58,193|extraction_pipeline|INFO:  Loading input dataframe from /home/giorgia/Private/data/geomaize/correct/Maize_North_Ghana_valid.geojson.
2025-03-17 10:19:58,228|extraction_pipeline|INFO:  Preparing the job dataframe.
2025-03-17 10:19:58,229|extraction_pipeline|INFO:  Performing splitting by s2 grid...

  polygons["centroid"] = polygons.geometry.centroid
2025-03-17 10:19:58,460|extraction_pipeline|INFO:  Dataframes split to jobs, creating the job dataframe...

  job["lat"] = job.geometry.centroid.y

  job["lon"] = job.geometry.centroid.x

  job["lat"] = job.geometry.centroid.y

  job["lon"] = job.geometry.centroid.x

  job["lat"] = job.geometry.centroid.y

  job["lon"] = job.geometry.centroid.x

  job["lat"] = job.geometry.centroid.y

  job["lon"] = job.geometry.centroid.x

  job["lat"] = job.geometry.centroid.y

  job["lon"] = job.geometry.centroid.x
100%|██████████| 5/5 [00:00<00:00, 24.24it/s]
2025-03-17 10:19:58,671|extraction_pipeline|INFO:  Job dataframe create

Authenticated using refresh token.


  return DataCube.load_collection(
2025-03-17 10:20:04,802 - openeo_gfmap.utils - INFO - Selected orbit state: ASCENDING. Reason: Only orbit fully covering the requested area.
  return DataCube.load_collection(
2025-03-17 10:20:31,820 - openeo_gfmap.utils - INFO - Selected orbit state: ASCENDING. Reason: Only orbit fully covering the requested area.
  return DataCube.load_collection(
2025-03-17 10:23:05,974 - openeo_gfmap.utils - INFO - Selected orbit state: ASCENDING. Reason: Only orbit fully covering the requested area.
2025-03-17 10:25:36,885|openeo_gfmap.manager|INFO:  Parsed item timeseries.parquet from job j-25031709203540c18f5f1da581de617c
2025-03-17 10:25:36,945|openeo_gfmap.manager|INFO:  Adding 1 items to the STAC collection...
2025-03-17 10:25:36,946|openeo_gfmap.manager|INFO:  Job j-25031709203540c18f5f1da581de617c and post job action finished successfully.
  return DataCube.load_collection(
2025-03-17 10:25:43,519 - openeo_gfmap.utils - INFO - Selected orbit state: ASCENDI

In [29]:
train_path = "/home/giorgia/Private/data/geomaize/extractions_2021"
valid_path = "/home/giorgia/Private/data/geomaize/extractions_2022"
test_path = "/home/giorgia/Private/data/geomaize/extractions_2023"

train_df = load_dataset(
    train_path,
    composite_window=composite_window
    )

val_df = load_dataset(
    valid_path,
    composite_window=composite_window
    )

test_df = load_dataset(
    test_path,
    composite_window=composite_window
    )

100%|██████████| 1/1 [00:00<00:00, 18.09it/s]
100%|██████████| 1/1 [00:00<00:00, 20.43it/s]
100%|██████████| 1/1 [00:00<00:00, 18.61it/s]


In [30]:

train_ds = ScaleAgDataset(
    dataframe=train_df,
    num_timesteps=train_df.available_timesteps.max(),
    task_type=task_type,
    target_name="Yield kg/H",
    compositing_window=composite_window,
)

val_ds = ScaleAgDataset(
    dataframe=val_df,
    num_timesteps=train_df.available_timesteps.max(),
    task_type=task_type,
    target_name="Kg/ha",
    compositing_window=composite_window,
)

test_ds = ScaleAgDataset(
    dataframe=test_df,
    num_timesteps=train_df.available_timesteps.max(),
    task_type=task_type,
    target_name="Kg/ha",
    compositing_window=composite_window,
)


[32m2025-03-17 10:01:58[0m | [1mINFO    [0m | [36mprometheo.datasets.scaleag[0m - [1mSetting number of outputs to 1 for regression task.[0m
[32m2025-03-17 10:01:58[0m | [1mINFO    [0m | [36mprometheo.datasets.scaleag[0m - [1mSetting number of outputs to 1 for regression task.[0m
[32m2025-03-17 10:01:58[0m | [1mINFO    [0m | [36mprometheo.datasets.scaleag[0m - [1mSetting number of outputs to 1 for regression task.[0m


In [None]:
# Fine Tuning Hyperparameters for
lr = 1e-4
batch_size = 32
epochs = 50
num_workers = 2
patience = 10
pretrained_model_path = "https://artifactory.vgt.vito.be/artifactory/auxdata-public/scaleagdata/models/presto-ss-wc_10D.pt"
output_dir = Path("/home/giorgia/Private/data/geomaize/models/")
experiment_name = "presto-ss-wc-10D-ft-dek_geomaize"

In [None]:
# Construct the model with finetuning head
model = PretrainedPrestoWrapper(
    num_outputs=train_ds.num_outputs,
    regression=True if task_type == "regression" else False,
)
model = load_pretrained(model, pretrained_model_path, strict=False)

# Reduce epochs for testing purposes
hyperparams = Hyperparams(max_epochs=epochs, batch_size=batch_size, patience=patience, num_workers=num_workers, lr=lr)


# set loss depending on the task type
if task_type == "regression":
    loss_fn = nn.MSELoss()
elif task_type == "binary":
    loss_fn = nn.BCEWithLogitsLoss()
else:
    loss_fn = nn.CrossEntropyLoss()

parameters = param_groups_lrd(model)
optimizer = AdamW(parameters, lr=hyperparams.lr)
scheduler = lr_scheduler.ExponentialLR(optimizer, gamma=0.99)

finetuned_model = finetune.run_finetuning(
            model,
            train_ds,
            val_ds,
            experiment_name=experiment_name,
            output_dir=output_dir,
            loss_fn=loss_fn,
            hyperparams=hyperparams,
        )

evaluate_finetuned_model(
    finetuned_model=finetuned_model,
    test_ds=test_ds,
    num_workers=num_workers,
    batch_size=batch_size,
)

[32m2025-03-17 10:02:13[0m | [1mINFO    [0m | [36mprometheo.utils[0m - [1mLogging setup complete. Logging to: /home/giorgia/Private/data/geomaize/models/logs/presto-ss-wc-10D-ft-dek_geomaize.log and console.[0m
[32m2025-03-17 10:02:13[0m | [1mINFO    [0m | [36mprometheo.finetune[0m - [1mUsing output dir: /data/users/Private/giorgia/data/geomaize/models[0m


Train metric: 0.048, Val metric: 0.131, Best Val Loss: 0.030 (no improvement for 9 epochs):  28%|██▊       | 14/50 [00:10<00:27,  1.29it/s]

[32m2025-03-17 10:02:24[0m | [1mINFO    [0m | [36mprometheo.finetune[0m - [1mEarly stopping![0m


Train metric: 0.048, Val metric: 0.131, Best Val Loss: 0.030 (no improvement for 9 epochs):  28%|██▊       | 14/50 [00:11<00:29,  1.23it/s]

[32m2025-03-17 10:02:24[0m | [1mINFO    [0m | [36mprometheo.finetune[0m - [1mFinetuning done[0m
[32m2025-03-17 10:02:24[0m | [1mINFO    [0m | [36mscaleagdata_vito.presto.utils[0m - [1mEvaluating the finetuned model on regression task[0m





{'RMSE': 1866.951171875,
 'R2_score': -0.15574908256530762,
 'explained_var_score': 0.041095733642578125,
 'MAPE': 0.46740350127220154}

In [None]:


cbm = cb.CatBoostRegressor(
    random_state=3,
    logging_level="Silent",
    loss_function="RMSE",
)
logger.info("Computing Presto encodings")
train_dl = DataLoader(train_ds, batch_size=32, shuffle=True, num_workers=2)
train_encodings, train_targets = get_encodings(train_dl, finetuned_model)
logger.info("Fitting Catboost model on Presto encodings")
train_dataset = cb.Pool(train_encodings, train_targets)
cbm.fit(train_dataset)

evaluate_downstream_model(finetuned_model, cbm, test_ds, num_workers=num_workers, batch_size=batch_size)

[32m2025-03-17 10:07:38[0m | [1mINFO    [0m | [36m__main__[0m - [1mComputing Presto encodings[0m
[32m2025-03-17 10:07:38[0m | [1mINFO    [0m | [36m__main__[0m - [1mFitting Catboost model on Presto encodings[0m
[32m2025-03-17 10:07:40[0m | [1mINFO    [0m | [36mscaleagdata_vito.presto.utils[0m - [1mEvaluating the finetuned model on regression task[0m


{'RMSE': 1786.345340148278,
 'R2_score': -0.058104321557600036,
 'explained_var_score': 0.08963859747725222,
 'MAPE': 1.053063968797406}

In [42]:
from scaleagdata_vito.presto.presto_utils_demo import revert_to_original_units
from scaleagdata_vito.demo.utils import prepare_data_for_cb
import numpy as np
from sklearn.metrics import mean_squared_error, r2_score, explained_variance_score, mean_absolute_percentage_error

raw_cbm = cb.CatBoostRegressor(
    random_state=3,
    logging_level="Silent",
    loss_function="RMSE",
)

train_x, train_y = prepare_data_for_cb(
    train_df,
    "Yield kg/H",
    num_time_steps=train_df.available_timesteps.max(),
)
val_x, val_y = prepare_data_for_cb(
    val_df, 
    "Kg/ha",
    num_time_steps=train_df.available_timesteps.max(),
)
test_x, test_y = prepare_data_for_cb(
    val_df, 
    "Kg/ha",
    num_time_steps=train_df.available_timesteps.max(),
)

train_pool = cb.Pool(train_x, train_y)
raw_cbm.fit(train_pool, eval_set=cb.Pool(val_x, val_y))

preds = raw_cbm.predict(test_x)
targets = revert_to_original_units(
    test_y, upper_bound=test_ds.upper_bound, lower_bound=test_ds.lower_bound
)
preds = revert_to_original_units(
    preds, upper_bound=test_ds.upper_bound, lower_bound=test_ds.lower_bound
)
print({
    "RMSE": float(np.sqrt(mean_squared_error(targets, preds))),
    "R2_score": float(r2_score(targets, preds)),
    "explained_var_score": float(explained_variance_score(targets, preds)),
    "MAPE": float(mean_absolute_percentage_error(targets, preds)),
})


{'RMSE': 53755341.12740031, 'R2_score': -0.04463906772080262, 'explained_var_score': 0.0005328617477075026, 'MAPE': 0.9037415384582995}
