# Land Cover Mapping Feature Extraction

This notebook serves as an example of how to use GFMap and openEO to extract point features for training a machine learning model to do Land Cover Mapping. 

The example uses the following steps:
- Load the labelled points and ditribute them into spatial hexagons.
- Define the pre-processing steps for extracting the features from Sentinel-1 and Sentinel-2 data.
- Set-up the Sentinel-1 and Sentinel-2 fetchers with GFMap and launch the openEO jobs to fetch the data.
- Combine the results from all the batch jobs into one dataframe.
- Train a random forrest classifier using the extracted features.

In [1]:
import openeo

import geopandas as gpd
import pandas as pd
import geojson
from pathlib import Path
import datetime
from typing import List
import logging

from openeo_gfmap.manager import _log
from openeo_gfmap import TemporalContext, Backend, BackendContext, FetchType
from openeo_gfmap.manager.job_splitters import split_job_hex
from openeo_gfmap.manager.job_manager import GFMAPJobManager
from openeo_gfmap.manager import _log
from openeo_gfmap.backend import cdse_connection, vito_connection
from openeo_gfmap.fetching import build_sentinel2_l2a_extractor, build_sentinel1_grd_extractor

In [2]:
_log.setLevel(logging.INFO)

stream_handler = logging.StreamHandler()
_log.addHandler(stream_handler)

formatter = logging.Formatter('%(asctime)s|%(name)s|%(levelname)s:  %(message)s')
stream_handler.setFormatter(formatter)

# Exclude the other loggers from other libraries
class MyLoggerFilter(logging.Filter):
    def filter(self, record):
        return record.name == _log.name

stream_handler.addFilter(MyLoggerFilter())

## Distribute labelled points

First, we load in a dataset with target labels. In order for the model to work, the target labels need to be integers. Also, we extract some target points from the target polygons.

In [3]:
resource_folder = Path("resources")
YEAR = 2018

input_gpkg = gpd.GeoDataFrame()
for file in resource_folder.glob("*.gpkg"):
    print("Digesting", file)
    input_gpkg = pd.concat([input_gpkg, gpd.read_file(file)], ignore_index=True, sort=False, copy=False)

input_gpkg["geometry"] = input_gpkg["geometry"].apply(lambda x: x.centroid)
input_gpkg = input_gpkg[['UID',"CODE_1_18", "CODE_2_18", "CODE_3_18", "CODE_4_18",'geometry', 'REF_TYPE']]
input_gpkg

Digesting resources\AT3304000_LC_REF.gpkg
Digesting resources\BE34057C0_LC_REF.gpkg
Digesting resources\BG0000151_LC_REF.gpkg
Digesting resources\CZ0314123_LC_REF.gpkg
Digesting resources\ES0000022_LC_REF.gpkg
Digesting resources\ES4110114_LC_REF.gpkg
Digesting resources\ES6110005_LC_REF.gpkg
Digesting resources\FR2400534_LC_REF.gpkg
Digesting resources\FR2500088_LC_REF.gpkg
Digesting resources\HUBN20034_LC_REF.gpkg
Digesting resources\HUHN20002_LC_REF.gpkg
Digesting resources\LV0536600_LC_REF.gpkg
Digesting resources\ROSCI0227_LC_REF.gpkg
Digesting resources\SE0540063_LC_REF.gpkg
Digesting resources\SKUEV0307_LC_REF.gpkg


Unnamed: 0,UID,CODE_1_18,CODE_2_18,CODE_3_18,CODE_4_18,geometry,REF_TYPE
0,004_1154542,1,12,121,1210,POINT (4441479.644 2719309.820),NONE
1,004_1159640,3,32,321,3210,POINT (4455579.832 2723493.083),TRAIN
2,004_1161222,8,81,811,8110,POINT (4451985.264 2723915.898),TRAIN
3,004_1161915,8,81,811,8110,POINT (4428891.549 2721245.698),TRAIN
4,004_1162261,1,12,122,1220,POINT (4409755.801 2720635.752),NONE
...,...,...,...,...,...,...,...
86489,004_377885,3,31,311,3110,POINT (5031171.347 2932261.964),VALI
86490,004_377894,4,41,410,4100,POINT (5032675.904 2932251.885),TRAIN
86491,004_377895,2,21,211,2110,POINT (5031120.057 2932245.202),VALI
86492,004_377898,4,41,410,4100,POINT (5036328.995 2932249.014),VALI


To extract the target point features, we use GFMap to distribute the target points over multiple hexagons. Each hexagon extraction will be performed in a separate openeo job. 
Splitting up jobs is necessary because processing a large area in one job would cause memory issues.

We use `split_job_hex` for distributing the target points over multiple hexagons.

In [4]:
input_split = split_job_hex(input_gpkg, max_points=50, grid_resolution=4)

We then create a dataframe where each row represents a single hexagon, and thus batch_job.

In [5]:
def create_job_dataframe(split_jobs: List[gpd.GeoDataFrame]) -> pd.DataFrame:
    """Create a dataframe from the split jobs, containg all the necessary information to run the job."""
    rows = []
    for job in split_jobs:
        start_date = datetime.datetime(YEAR, 1, 1)
        end_date = datetime.datetime(YEAR, 12, 31)
        rows.append(pd.Series({
            'out_prefix': 'S1S2-stats',
            'out_extension': '.csv',
            'start_date': start_date,
            'end_date': end_date,
            'geometry': job.to_json()
        }))
    return pd.DataFrame(rows)

job_df = create_job_dataframe(input_split)

In [6]:
job_df = job_df.head(1) # testing: only run one job for now

## Define feature extraction

Next, we will define wich features we want to extract from openeo.

First we define the process graph, except the actual loading of a collection. This will be done by using the GFMap specific methods.

The preprocessing is contained in a [seperate .py file](./features.py) so we can use it for inference later on.

In [7]:
from features import preprocess_features

## Fetching the data

### Set-up the Sentinel-1 and Sentinel-2 fetchers

Next we use the extractor methods of GFMap to load the collection. Using these methods allows the backend independant loading of collections (e.g. wether or not we still have to calculate the backscatter on S1 data or not).

The loaded collections are pre-processed and then aggregated for the target points.

In [8]:
def sentinel2_collection(
        row : pd.Series,
        connection: openeo.DataCube,
        geometry: geojson.FeatureCollection
    )-> openeo.DataCube:
    bands = ["B02", "B03", "B04", "B05", "B06", "B07", "B08", "B11", "B12", "SCL"]
    bands_with_platform = ["S2-L2A-" + band for band in bands]

    extraction_parameters = {
        "load_collection": {
            "eo:cloud_cover": lambda val: val <= 80.0,
        },
    }

    extractor = build_sentinel2_l2a_extractor(
        backend_context=BackendContext(Backend(row.backend_name)),
        bands=bands_with_platform,
        fetch_type=FetchType.POINT,
        **extraction_parameters
    )

    temporal_context = TemporalContext(row.start_date, row.end_date)

    s2 = extractor.get_cube(connection, geometry, temporal_context)
    s2 = s2.rename_labels("bands", bands)
    return s2

def sentinel1_collection(
        row: pd.Series,
        connection : openeo.DataCube,
        geometry: geojson.FeatureCollection,
    )-> openeo.DataCube:
    bands = ["VH", "VV"]
    bands_with_platform = ["S1-SIGMA0-" + band for band in bands]

    extractor = build_sentinel1_grd_extractor(
        backend_context=BackendContext(Backend(row.backend_name)),
        bands=bands_with_platform,
        fetch_type=FetchType.POINT,
    )

    temporal_context = TemporalContext(row.start_date, row.end_date)

    s1 = extractor.get_cube(connection, geometry, temporal_context)
    s1 = s1.rename_labels("bands", bands)
    return s1

def load_lc_features(
    row: pd.Series,
    connection : openeo.DataCube,
    **kwargs
):
    geometry = geojson.loads(row.geometry)
    
    s2_collection = sentinel2_collection(
        row=row,
        connection=connection,
        geometry=geometry
    )

    s1_collection = sentinel1_collection(
        row=row,
        connection=connection,
        geometry=geometry
    )

    features = preprocess_features(s2_collection, s1_collection)

    # Currently, aggregate_spatial and vectorcubes do not keep the band names, so we'll need to rename them later on
    global final_band_names
    final_band_names = [b.name for b in features.metadata.band_dimension.bands]

    features = features.aggregate_spatial(geometry, reducer="median")
    
    job_options = {
        "executor-memory": "3G", # Increase this value if a job fails due to memory issues
        "executor-memoryOverhead": "2G",
        "soft-errors": True
    }

    return features.create_job(
        out_format="csv",
        title=f"GFMAP_Extraction_{geometry.features[0].properties['h3index']}",
        job_options=job_options,
    )

# Global variable to store the final band names
final_band_names = None

### Launch the openEO jobs to fetch the data

In order to launch the jobs, we have to define a function that fill determine the outputfile name and create the job manager.

In [9]:
def generate_output_path(
    root_folder: Path,
    geometry_index: int,
    row: pd.Series
) -> Path:
    features = geojson.loads(row.geometry)
    h3index = features[geometry_index].properties['h3index']
    result = root_folder / f"{row.out_prefix}_{h3index}_{geometry_index}{row.out_extension}"
    print("output_path:", result)
    return result

In [10]:
base_output_path = Path("output")
base_output_path.mkdir(exist_ok=True)

timenow = datetime.datetime.now()
timestr = timenow.strftime("%Y%m%d-%Hh%M")
print(f"Timestr: {timestr}")
tracking_file = base_output_path / f"tracking_{timestr}.csv"

Timestr: 20240322-16h54


In [11]:
manager = GFMAPJobManager(
    output_dir=base_output_path / timestr,
    output_path_generator=generate_output_path,
    poll_sleep=60,
    n_threads=1,
    collection_id="LC_feature_extraction",
)

In [12]:
manager.add_backend(Backend.CDSE, cdse_connection, parallel_jobs=2)

We then run the prepared jobs.

In [13]:
manager.run_jobs(
    job_df,
    load_lc_features,
    tracking_file
)

2024-03-22 16:54:52,958|openeo_gfmap.manager|INFO:  Starting job manager using 1 worker threads.
2024-03-22 16:54:52,961|openeo_gfmap.manager|INFO:  Workers started, creating and running jobs.


Authenticated using refresh token.
DataCube(<PGNode 'dimension_labels' at 0x2252402c8c0>)
DataCube(<PGNode 'dimension_labels' at 0x2251fe48140>)


2024-03-22 17:14:33,102|openeo_gfmap.manager|INFO:  Job j-24032263ac114973a8df2544441484e4 finished successfully, queueing on_job_done...


output_path: output\20240322-16h54\S1S2-stats_8408b09ffffffff_0.csv


2024-03-22 17:14:36,961|openeo_gfmap.manager|INFO:  Added 0 items to the STAC collection.
2024-03-22 17:14:36,963|openeo_gfmap.manager|INFO:  Job j-24032263ac114973a8df2544441484e4 and post job action finished successfully.


## Combine the results

We combine all the different extractions into one dataframe to train and test the model.

In [14]:
## Run these lines to post-process older results
timestr = "20240318-20h30"
tracking_file = base_output_path / f"tracking_{timestr}.csv"

In [15]:

tracker_df = pd.read_csv(tracking_file)
df = pd.DataFrame(columns = final_band_names + ["CODE_1_18", "CODE_2_18", "CODE_3_18", "CODE_4_18", 'geometry'])

for index, row in tracker_df.iterrows():
    if row.status == "finished":
        try:
            # Get the target and geometry from the input
            geometry = gpd.read_file(row.geometry)
            geometry['id'] = geometry['id'].astype(int)
            h3index = geometry.iloc[0]['h3index']
            filename = f"S1S2-stats_{h3index}_0.csv"
            target_df = geometry[['id', "CODE_1_18", "CODE_2_18", "CODE_3_18", "CODE_4_18", 'geometry']]

            # Read the stats
            stats_df = pd.read_csv(base_output_path/timestr/filename)
            stats_df.columns = ['id'] + final_band_names

            # Merge the target and geometry with the stats
            stats_df = stats_df.merge(target_df, how='left', on='id')
            stats_df = stats_df.drop(columns=['id'])

            # Append to the dataframe
            df = pd.concat([df, stats_df])
        except FileNotFoundError as e:
            print(f"File not found: {filename}")
            pass

  df = pd.concat([df, stats_df])


Here we filter out features that contain NaN values. These often correspond to the months January and December.

In [16]:
## drop NA columns
# nan_columns = df.columns[df.isna().any()].tolist()
# print(f"Dropping columns containing NaN: {nan_columns}")
# df.drop(nan_columns, axis=1, inplace=True)

In [17]:
df.to_csv(base_output_path / timestr / "features.csv", index=False)

## Training and saving a random forrest model
The Following is just an example of local training a random forrest.

The model is converted to an ONNX model and saved. ONNX is a format to store machine learning models in a standardized way. This allows us to use the model in other applications, such as the openEO backend.

In [18]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
from skl2onnx import to_onnx

In [19]:
df = pd.read_csv(base_output_path / timestr / "features.csv")

In [20]:
X = df.drop(columns=["CODE_1_18", "CODE_2_18", "CODE_3_18", "CODE_4_18", 'geometry'])
X = X.astype(np.float32) # convert to float32 to allow ONNX conversion later on
y = df['CODE_1_18'].astype(int)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

In [21]:

rf = RandomForestClassifier(n_estimators=100, max_features=y.unique().size, random_state=42)
rf = rf.fit(X_train, y_train)

y_pred = rf.predict(X_test)
print("Accuracy on test set: "+str(accuracy_score(y_test,y_pred))[0:5])

Accuracy on test set: 0.733


In [22]:
model_output_path = base_output_path / "models"
model_output_path.mkdir(exist_ok=True)

onnx = to_onnx(model=rf, name="random_forest", X=X_train.values)

with open(base_output_path / "models" / "random_forest.onnx", "wb") as f:
    f.write(onnx.SerializeToString())

See the [inference notebook](./lc_inference.ipynb) for an example of how to use the model for inference.