## **Import Dependencies**

In [1]:
from google.colab import drive
from google.colab import files
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
%pip install retry

Collecting retry
  Downloading retry-0.9.2-py2.py3-none-any.whl.metadata (5.8 kB)
Collecting py<2.0.0,>=1.4.26 (from retry)
  Downloading py-1.11.0-py2.py3-none-any.whl.metadata (2.8 kB)
Downloading retry-0.9.2-py2.py3-none-any.whl (8.0 kB)
Downloading py-1.11.0-py2.py3-none-any.whl (98 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/98.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m98.7/98.7 kB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: py, retry
Successfully installed py-1.11.0 retry-0.9.2


In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import geopandas as gpd
import ee
import geemap

from tqdm.auto import tqdm
import os
from glob import glob
import json

import logging
import multiprocessing
import requests
import shutil
from retry import retry

import warnings
warnings.filterwarnings('ignore')

plt.rcParams['font.family'] = 'DeJavu Serif'
plt.rcParams['font.serif'] = 'Times New Roman'

In [4]:
# Change the default working directory (optional)
os.chdir('/content/drive/MyDrive/GITHUB/MSM-Research/Landscape-Classification')
print('Current working directory:', os.getcwd())

Current working directory: /content/drive/MyDrive/GITHUB/MSM-Research/Landscape-Classification


## **Initialize an EE Map Object**

In [5]:
# ee.Authenticate()
ee.Initialize(
    opt_url="https://earthengine-highvolume.googleapis.com",
    project='ee-geonextgis'
)

In [6]:
Map = geemap.Map(basemap="Esri.WorldImagery")
Map

Map(center=[0, 0], controls=(WidgetControl(options=['position', 'transparent_bg'], widget=SearchDataGUI(childr…

## **Import World Administrative Boundary Layer**

In [7]:
# Import World Administrative Layer
world = ee.FeatureCollection('projects/ee-geonextgis/assets/world_administrative_boundaries')

# Filter the countries belonging to Europe
europe = world.filter(ee.Filter.eq('continent', 'Europe'))
europe_country_names = europe.aggregate_array('name').getInfo()

# Specify the countries to remove
countries_to_remove = {'Russian Federation', 'Belarus', 'Ukraine', 'Moldova, Republic of', 'Svalbard and Jan Mayen Islands'}
europe_country_names = list(set(europe_country_names).difference(countries_to_remove))
europe_country_names.append('Turkey')
europe = world.filter(ee.Filter.inList('name', europe_country_names))

style = {'fillColor': '00000000', 'color': 'red', 'width': 1}
Map.addLayer(europe.style(**style), {}, 'Europe')
Map.centerObject(europe, 5)

## **Calculate the Area and Number of Samples per Country**

In [31]:
# Calculate the area for each country
area = europe.map(lambda f: ee.Feature(None, {'country': f.get('name'), 'area': f.area()}))
area = geemap.ee_to_df(area)
area['area'] = (area['area'] // 1000000).astype('int')
area = area[['country', 'area']]

# Calculate the area proportion
area['area_proportion'] = ((area['area'] / area['area'].sum())).round(4)

# Calculate the number of samples per country
total_n_samples = 25000 # total number of samples
area['n_of_samples'] = (area['area_proportion'] * total_n_samples).astype('int')

# Drop the columns where number of samples is 0
area = area[area['n_of_samples']>0]
area.sort_values(by='n_of_samples', ascending=False, inplace=True)

print(area.shape)
area.head()

(41, 4)


Unnamed: 0,country,area,area_proportion,n_of_samples
0,Turkey,779333,0.1345,3362
32,France,546646,0.0943,2357
30,Spain,505656,0.0873,2182
17,Sweden,443345,0.0765,1912
38,Germany,355924,0.0614,1535


## **Import the CORINE LULC Product for 2018**

In [9]:
# Import the CORINE LULC 2018
corine_2018 = ee.Image('COPERNICUS/CORINE/V20/100m/2018')\
                .select('landcover')

# Extract all the class values from the image
class_values = corine_2018.get('landcover_class_values').getInfo()
class_palette = corine_2018.get('landcover_class_palette').getInfo()

print('Number of classes in the CORINE LULC:', len(class_values))
Map.addLayer(corine_2018, {}, 'CORINE Land Cover')

Number of classes in the CORINE LULC: 44


## **Generate Equal Number of Samples from Each Class**

In [10]:
# Function to generate sample points
def generateSamplePoints(image,
                         country_name,
                         band_name,
                         scale=100,
                         seed=42,
                         save=False,
                         output_folder=None):
    """
    Generate stratified sample points for a given country from a land use/land cover (LULC) image.

    Args:
        image (ee.Image): The Earth Engine image containing LULC data.
        country_name (str): Name of the country for which to generate sample points.
        band_name (str): Name of the band containing the LULC classification.
        scale (int, optional): Scale in meters for sampling. Default is 100.
        seed (int, optional): Random seed for reproducibility. Default is 42.
        save (bool, optional): Whether to save the samples as a shapefile. Default is False.
        output_folder (str, optional): Folder to save the output shapefile if `save` is True. Default is None.

    Returns:
        geopandas.GeoDataFrame: A GeoDataFrame containing the stratified sample points with their respective attributes.

    """
    country_geom = europe.filter(ee.Filter.eq('name', country_name)).first().geometry()
    country_n_samples = area[area['country']==country_name].values[0][-1]

    country_lulc = image.clip(country_geom)

    # Calculate the frequency histogram
    freq_hist = country_lulc.reduceRegion(
        reducer=ee.Reducer.frequencyHistogram(),
        geometry=country_geom,
        scale=100,
        bestEffort=True,
        maxPixels=1e10,
        tileScale=8
    )

    # Extract the class values
    country_class_values = [int(i) for i in freq_hist.getInfo()[band_name].keys()]

    # Calculate the sample per class
    sample_per_calss = country_n_samples // len(country_class_values)

    # Generate samples for each class
    lulc_samples = country_lulc.stratifiedSample(
        numPoints=sample_per_calss,
        classBand=band_name,
        region=country_geom,
        scale=scale,
        seed=seed,
        classValues=country_class_values,
        classPoints=[sample_per_calss for i in range(len(country_class_values))],
        dropNulls=True,
        tileScale=8,
        geometries=True
    )

    if lulc_samples.size().getInfo() > 0:
        lulc_samples_gdf = geemap.ee_to_gdf(lulc_samples)

        if save!=False:
            if os.path.exists(output_folder)==False:
                os.makedirs(output_folder)

            file_name = f'{country_name}_Samples.shp'
            out_file_path = os.path.join(output_folder, file_name)
            lulc_samples_gdf.to_file(out_file_path, driver='ESRI Shapefile')
            print(f'{file_name} saved successfully at {out_file_path}.')

        return lulc_samples_gdf

    else:
        print(f'{country_name}: No sample has been generated.')

In [11]:
# # Apply the function over all the countries
# for country in tqdm(sorted(area['country'])):
#     generateSamplePoints(
#         image=corine_2018,
#         country_name=country,
#         band_name='landcover',
#         scale=100,
#         seed=42,
#         save=True,
#         output_folder='datasets/shapefiles'
#     )

## **Prepare the Seasonal Sentinel Image Datasets**

In [12]:
# Function to remove clouds from Sentine-2 imagery
def maskS2CloudsAndShadow(image, cloud_perc=10):

    cloudProb = image.select("MSK_CLDPRB")
    cloud = cloudProb.lt(cloud_perc)
    scl = image.select("SCL")
    saturated = scl.eq(1)
    cloudShadow = scl.eq(3)
    cirrus = scl.eq(10)

    # Create a mask
    mask = cloud.And(saturated.neq(1)).And(cloudShadow.neq(1)).And(cirrus.neq(1))

    return image.updateMask(mask)

# Function to perform edge masking on Sentinel-1 images
def edgeMasking(image, min_thresh=30, max_thresh=45):

    angle = image.select("angle")
    mask = angle.gt(min_thresh).And(angle.lt(max_thresh))

    return image.updateMask(mask)

In [13]:
# Function to prepare Sentinel 1 and 2 composite
def prepareSentinelComposite(region,
                             s1_band_names,
                             s2_band_names,
                             start_year,
                             end_year,
                             start_month,
                             end_month):

    # Read the Sentinel-2 and Sentinel-1 images from the Earth Engine
    sentinel_2 = ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED")\
        .filterBounds(region.bounds())\
        .filter(ee.Filter.calendarRange(start_year, end_year, 'year'))\
        .filter(ee.Filter.calendarRange(start_month, end_month, 'month'))\
        .filterMetadata("CLOUDY_PIXEL_PERCENTAGE", "less_than", 10)\
        .map(maskS2CloudsAndShadow)\
        .select(s2_band_names)\
        .median()\
        .clip(region.bounds())

    # Prepare Vegetation Indices
    ndvi = sentinel_2.normalizedDifference(['B8', 'B4']).rename('NDVI')
    savi = sentinel_2.expression(
        '((NIR - RED) / (NIR + RED + L)) * (1 + L)',
        {
            'NIR': sentinel_2.select('B8'),
            'RED': sentinel_2.select('B4'),
            'L': 0.5
        }
    ).rename('SAVI')

    sentinel_1 = ee.ImageCollection("COPERNICUS/S1_GRD")\
        .filterBounds(region.bounds())\
        .filter(ee.Filter.calendarRange(start_year, end_year, 'year'))\
        .filter(ee.Filter.calendarRange(start_month, end_month, 'month'))\
        .filter(ee.Filter.listContains("transmitterReceiverPolarisation", "VV"))\
        .filter(ee.Filter.listContains("transmitterReceiverPolarisation", "VH"))\
        .filter(ee.Filter.eq('instrumentMode', 'IW'))\
        .filter(ee.Filter.eq("orbitProperties_pass", "DESCENDING"))\
        .map(edgeMasking)\
        .select(s1_band_names)\
        .mean()\
        .clip(region.bounds())

    # Combine both the Image
    sentinel_combined = sentinel_2.addBands(ndvi)\
                                  .addBands(savi)\
                                  .addBands(sentinel_1).setDefaultProjection('EPSG:3857', None, 10)\

    return sentinel_combined

## **Prepare the Topographic Features**

In [14]:
# Function to prepare topographic variables (e.g., elevation, slope)
def prepareTopographicFeatures(region):

    # Read the Copernicus Digital Elevation Model (Glo-30 DEM)
    glo30 = ee.ImageCollection("projects/sat-io/open-datasets/GLO-30").mosaic()\
              .setDefaultProjection('EPSG:3857', None, 30)\
              .select('b1')

    # Clip the elevation
    elevation = glo30.clip(region.bounds()).rename('Elevation')

    # Derive the slope
    slope = ee.Terrain.slope(elevation).rename('Slope')

    return elevation.addBands(slope)

## **Prepare the LULC Source (CORINE, Dynamic World, ESRI)**

In [15]:
# Function to prepare Sentinel 1 and 2 composite
def prepareLULCSource(region,
                      start_year,
                      end_year):

    # Prepare the CORINE LULC Dataset
    corine = ee.Image('COPERNICUS/CORINE/V20/100m/2018')\
        .select('landcover')\
        .rename('CORINE')\
        .setDefaultProjection('EPSG:3857', None, 100)\
        .clip(region.bounds())

    # Prepare the Dynamic World LULC dataset
    dynamic_world = ee.ImageCollection("GOOGLE/DYNAMICWORLD/V1")\
        .filterBounds(region.bounds())\
        .filter(ee.Filter.calendarRange(start_year, end_year, 'year'))\
        .select("label")\
        .mode()\
        .rename('DW')\
        .setDefaultProjection('EPSG:3857', None, 10)\
        .clip(region.bounds())

    # Prepare the ESRI LULC dataset
    esri = ee.ImageCollection("projects/sat-io/open-datasets/landcover/ESRI_Global-LULC_10m_TS")\
        .filterBounds(region.bounds())\
        .filter(ee.Filter.calendarRange(start_year, end_year, 'year'))\
        .mode()\
        .remap([1,2,4,5,7,8,9,10,11],[1,2,3,4,5,6,7,8,9])\
        .rename('ESRI')\
        .setDefaultProjection('EPSG:3857', None, 10)\
        .clip(region.bounds())

    # Prepare ESA LULC dataset
    esa = ee.ImageCollection("ESA/WorldCover/v200")\
            .first()\
            .rename('ESA')\
            .setDefaultProjection('EPSG:3857', None, 10)\
            .clip(region.bounds())

    # Combine both the Image
    lulc_combined = corine.addBands(dynamic_world).addBands(esri).addBands(esa)

    return lulc_combined

In [16]:
## Example usage
# Define the Sentinel-2 and Sentinel-1 band names
s2_band_names = ['B2', 'B3', 'B4', 'B5', 'B6', 'B7', 'B8', 'B11', 'B12']
s1_band_names = ["VH", "VV"]

# Define the start, end month, and year
start_year = 2017
end_year = 2019
start_month = 6
end_month = 8

test_region = europe.filter(ee.Filter.eq('name', 'Albania')).geometry()

test_sentinel_composite = prepareSentinelComposite(
    region=test_region,
    s1_band_names=s1_band_names,
    s2_band_names=s2_band_names,
    start_year=start_year,
    end_year=end_year,
    start_month=start_month,
    end_month=end_month
)

test_topographic_features = prepareTopographicFeatures(test_region)

test_lulc_combined = prepareLULCSource(
    region=test_region,
    start_year=start_year,
    end_year=end_year
)

Map.addLayer(test_sentinel_composite, {"min": 0, "max": 4000, "bands": ["B8", "B4", "B3"]}, "Test Sentinel Image")
Map.addLayer(test_topographic_features, {'min': 0, 'max': 90, 'bands': ['Slope']}, 'Test Topographic Features')
Map.addLayer(test_lulc_combined, {}, "Test LULC Image")

### **Data Extraction Module**

In [17]:
# Read all the dataframe path
country_samples_path = glob(os.path.join(os.getcwd(), 'datasets/shapefiles', '*.shp'))
print('The number of shapefiles:', len(country_samples_path))

# Define the Sentinel-2 and Sentinel-1 band names
s2_band_names = ['B2', 'B3', 'B4', 'B5', 'B6', 'B7', 'B8', 'B11', 'B12']
s1_band_names = ["VH", "VV"]

# Define the start, end month of four seasons in a dictionary
season_dict = {
    'Summer': [6, 8], # June to August
    'Autumn': [9, 11], # September to November
    'Winter': [12, 2], # December to February
    'Spring': [3, 5] # March to May
}

The number of shapefiles: 37


#### **Seasonal Patches**

In [18]:
# # Iterate over all the countries and download the seasonal data
# for path in tqdm(sorted(country_samples_path[:])):
#     country = path.split("/")[-1].split("_")[0]
#     print("******************************************************************************************")
#     print(country)

#     # Read the geodataframe
#     sample_gdf = gpd.read_file(path)

#     # Convert the geodataframe into a json object
#     sample_json = json.loads(sample_gdf.to_json())["features"]

#     # Iterate over all the seasons
#     for season, months in tqdm(season_dict.items()):
#         print('Season:', season)
#         start_year = 2017
#         end_year = 2019
#         start_month = months[0]
#         end_month = months[1]

#         # Set the output directories
#         out_season_dir = os.path.join(f'datasets//seasonal_patches/{season}')
#         out_country_dir = os.path.join(out_season_dir, country)
#         print(out_season_dir, out_country_dir)
#         for p in [out_season_dir, out_country_dir]:
#             if os.path.exists(p):
#                 print(f"{p} directory is already existed.")
#             else:
#                 os.mkdir(p)
#                 print(f"{p} directory has been successfully created!")

#         # Prepare the datasets
#         region = europe.filter(ee.Filter.eq('name', country)).geometry().bounds()

#         sentinel_composite = prepareSentinelComposite(
#             region=region,
#             s1_band_names=s1_band_names,
#             s2_band_names=s2_band_names,
#             start_year=start_year,
#             end_year=end_year,
#             start_month=start_month,
#             end_month=end_month
#         )

#         # Set output image parameters
#         img_params = {
#             "count": len(sample_gdf),
#             "buffer": 1270,
#             "dimensions": "256x256",  # The dimension of each image chip
#             "format": "GEO_TIFF",  # The output image format, can be png, jpg, ZIPPED_GEO_TIFF, GEO_TIFF, NPY
#             "processes": 30,  # How many processes to used for parallel processing
#         }

#         # Create a Function for Downloading Image
#         @retry(tries=10, delay=1, backoff=2)
#         def getResult(id, type, props, geom):

#             index = id
#             landcover = props["landcover"]
#             coords = ee.Geometry.Point(geom["coordinates"])

#             region = coords.buffer(img_params["buffer"]).bounds()

#             if img_params["format"] in ["png", "jpg"]:
#                 img_url = sentinel_composite.getThumbURL(
#                     {
#                         "region": region,
#                         "dimensions": img_params["dimensions"],
#                         "format": img_params["format"],
#                     }
#                 )

#             else:
#                 img_url = sentinel_composite.getDownloadURL(
#                     {
#                         "region": region,
#                         "dimensions": img_params["dimensions"],
#                         "format": img_params["format"],
#                     }
#                 )


#             if img_params["format"] == "GEO_TIFF":
#                 ext = "tif"
#             else:
#                 ext = img_params["format"]

#             r_img = requests.get(img_url, stream=True)
#             if r_img.status_code != 200:
#                 r_img.raise_for_status()

#             basename = str(index).zfill(len(str(img_params["count"])))
#             img_filename = f"{out_country_dir}/{basename}_{landcover}_Image.{ext}"

#             with open(img_filename, "wb") as out_file:
#                 shutil.copyfileobj(r_img.raw, out_file)
#             print("Done (Image): ", basename)

#         # Extract the patches
#         logging.basicConfig()
#         items = [list(i.values()) for i in sample_json]

#         pool = multiprocessing.Pool(img_params["processes"])
#         pool.starmap(getResult, items)

#         pool.close()

In [19]:
# # Convert the Folders into Zip File in Google Drive
# # Create a folder to store zip files
# seasons = ['Summer', 'Autumn', 'Winter', 'Spring']
# for season in tqdm(seasons):
#     out_zip_dir = f'datasets/seasonal_patches_zip/{season}'

#     if os.path.exists(out_zip_dir)==False:
#         os.mkdir(out_zip_dir)
#         print("Zip directory has been successfuly created.")

#     else:
#         print("Zip directory is already existed.")


#     # Read all the season folder paths
#     season_f_paths = glob(f'datasets/seasonal_patches/{season}/*')

#     for country in tqdm(season_f_paths):
#         f_path = next((i for i in season_f_paths if country in i), None)
#         print(f_path)

#         if f_path != None:
#             # Replace 'folder_path' with the path to your folder
#             output_file_name = f_path.split("/")[-1]
#             print(f'Season: {output_file_name}')
#             output_file_path = os.path.join(out_zip_dir, output_file_name)

#             # Create a ZIP file
#             shutil.make_archive(output_file_path, 'zip', f_path, verbose=True)
#             print(f"Folder {f_path} has been zipped to {output_file_path}.zip")

#             # Delete the folder
#             shutil.rmtree(f_path, ignore_errors=True)
#             print(f"Folder {f_path} deleted!")

#         else:
#             continue

#### **Topographic Features and Labels**

In [20]:
# # Iterate over all the countries and download the seasonal data
# for path in tqdm(sorted(country_samples_path)):
#     country = path.split("/")[-1].split("_")[0]
#     print("******************************************************************************************")
#     print(country)

#     # Read the geodataframe
#     sample_gdf = gpd.read_file(path)

#     # Convert the geodataframe into a json object
#     sample_json = json.loads(sample_gdf.to_json())["features"]

#     # Set the output directories
#     out_country_topo_dir = os.path.join(f'datasets//topo_patches/{country}')
#     out_country_labels_dir = os.path.join(f'datasets//label_patches/{country}')

#     for p in [out_country_topo_dir, out_country_labels_dir]:
#         if os.path.exists(p):
#             print(f"{p} directory is already existed.")
#         else:
#             os.mkdir(p)
#             print(f"{p} directory has been successfully created!")

#     # Set output image parameters
#     img_params = {
#         "count": len(sample_gdf),
#         "buffer": 1270,
#         "dimensions": "256x256",  # The dimension of each image chip
#         "format": "GEO_TIFF",  # The output image format, can be png, jpg, ZIPPED_GEO_TIFF, GEO_TIFF, NPY
#         "processes": 30,  # How many processes to used for parallel processing
#     }

#     # Create a Function for Downloading Image
#     @retry(tries=10, delay=1, backoff=2)
#     def getResult(id, type, props, geom):

#         index = id
#         landcover = props["landcover"]
#         coords = ee.Geometry.Point(geom["coordinates"])

#         region = coords.buffer(img_params["buffer"]).bounds()

#         # Prepare the topographic features and label
#         topo_features = prepareTopographicFeatures(region)
#         labels = prepareLULCSource(region, start_year=2017, end_year=2019)

#         if img_params["format"] in ["png", "jpg"]:
#             img_url = topo_features.getThumbURL(
#                 {
#                     "region": region,
#                     "dimensions": img_params["dimensions"],
#                     "format": img_params["format"],
#                 }
#             )

#             mask_url = labels.getThumbURL(
#                 {
#                     "region": region,
#                     "dimensions": img_params["dimensions"],
#                     "format": img_params["format"],
#                 }
#             )

#         else:
#             img_url = topo_features.getDownloadURL(
#                 {
#                     "region": region,
#                     "dimensions": img_params["dimensions"],
#                     "format": img_params["format"],
#                 }
#             )

#             mask_url = labels.getDownloadURL(
#                 {
#                     "region": region,
#                     "dimensions": img_params["dimensions"],
#                     "format": img_params["format"],
#                 }
#             )


#         if img_params["format"] == "GEO_TIFF":
#             ext = "tif"
#         else:
#             ext = img_params["format"]

#         r_img = requests.get(img_url, stream=True)
#         r_mask = requests.get(mask_url, stream=True)

#         if r_img.status_code != 200:
#             r_img.raise_for_status()

#         if r_mask.status_code != 200:
#             r_mask.raise_for_status()

#         basename = str(index).zfill(len(str(img_params["count"])))
#         img_filename = f"{out_country_topo_dir}/{basename}_{landcover}_Topo.{ext}"
#         mask_filename = f"{out_country_labels_dir}/{basename}_{landcover}_Labels.{ext}"

#         with open(img_filename, "wb") as out_file:
#             shutil.copyfileobj(r_img.raw, out_file)
#         print("Done (Image): ", basename)

#         with open(mask_filename, "wb") as out_file:
#             shutil.copyfileobj(r_mask.raw, out_file)
#         print("Done (Mask): ", basename)

#     # Extract the patches
#     logging.basicConfig()
#     items = [list(i.values()) for i in sample_json]

#     pool = multiprocessing.Pool(img_params["processes"])
#     pool.starmap(getResult, items)

#     pool.close()

In [21]:
# # Convert the Folders into Zip File in Google Drive
# # Create a folder to store zip files

# out_zip_dir = f'datasets/label_patches_zip'

# if os.path.exists(out_zip_dir)==False:
#     os.mkdir(out_zip_dir)
#     print("Zip directory has been successfuly created.")

# else:
#     print("Zip directory is already existed.")


# # Read all the season folder paths
# f_paths = glob(f'datasets/label_patches/*')

# for country in tqdm(f_paths):
#     f_path = next((i for i in f_paths if country in i), None)
#     print(f_path)

#     if f_path != None:
#         # Replace 'folder_path' with the path to your folder
#         output_file_name = f_path.split("/")[-1]
#         print(f'Season: {output_file_name}')
#         output_file_path = os.path.join(out_zip_dir, output_file_name)

#         # Create a ZIP file
#         shutil.make_archive(output_file_path, 'zip', f_path, verbose=True)
#         print(f"Folder {f_path} has been zipped to {output_file_path}.zip")

#         # Delete the folder
#         shutil.rmtree(f_path, ignore_errors=True)
#         print(f"Folder {f_path} deleted!")

#     else:
#         continue