<a href="https://colab.research.google.com/github/Nmg1994/Crop_mapping/blob/main/Crop_mapping_Point_based_models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Intalling necessary packages

In [None]:
!pip install tqdm
!pip install earthengine-api

# Importing necessary libraries/modules

In [5]:
import numpy as np
import seaborn as sns
import matplotlib.pylab as plt
import pandas as pd
from tqdm import tqdm
import ast
import math
import random
from sklearn.ensemble import RandomForestClassifier
import sklearn.metrics
from sklearn.metrics import roc_curve, auc
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_validate
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import jaccard_score
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import cross_val_score
import datetime
from datetime import timedelta
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import confusion_matrix

# Mounting Google Drive

In [None]:
from google.colab import drive
drive.mount("/content/drive")

# Connecting to Google Earth Engine (GEE) through Earth Engine Python API

In [None]:
import ee
ee.Authenticate()
ee.Initialize(project='ee-navidmehdizade73nm')

# Setting the period for downloading the Sentinel-2 satellite images

In [None]:
def prompt_for_date(prompt_message):
    while True:
        date_str = input(prompt_message)
        try:
            # Try to create a datetime object from the input
            datetime.datetime.strptime(date_str, '%Y-%m-%d')
            return date_str
        except ValueError:
            # If there is a ValueError, it means the format is incorrect
            print("Incorrect date format, should be YYYY-MM-DD. Please try again.")

# Prompting the user for start and end dates
start_date_str = prompt_for_date("Enter the start date (YYYY-MM-DD): ") # e.g., 2021-04-01
end_date_str = prompt_for_date("Enter the end date (YYYY-MM-DD): ") # e.g., 2021-10-31

start_date = ee.Date(start_date_str)
end_date = ee.Date(end_date_str)

# Calculating number of 5-day intervals in the given period for downloading the Sentinel-2
number_of_intervals = end_date.difference(start_date, 'day').divide(5).ceil().getInfo()
print('number_of_intervals: ', number_of_intervals)

# Extracting bands' values of the Sentinel-2 satellite imagery for the point samples

In [None]:
# Initializing point shapefile after uploading the samples into assets of the GEE
point_shapefile = ee.FeatureCollection('projects/ee-navidmehdizade73nm/assets/LandC_Crops_samples_quebec')

# Function to extract maximum values of Sentinel-2 bands for a given date range
def extract_max_values(feature, start_date, end_date):
    filtered_collection = ee.ImageCollection('COPERNICUS/S2_SR_HARMONIZED') \
        .filterDate(start_date, end_date) \
        .filterBounds(feature.geometry()) \
        .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 20))

    renamed_collection = filtered_collection.select(['B2', 'B3', 'B4', 'B8'], ['Blue', 'Green', 'Red', 'NIR']) # extracting values of four bands, namely Blue, Green, Red, and NIR

    max_values = renamed_collection \
        .reduce(ee.Reducer.max()) \
        .reduceRegion(
            reducer=ee.Reducer.max(),
            geometry=feature.geometry(),
            scale=10
        )

    return feature.set(max_values)

# Function to process a specific interval
def process_interval(index, start_date, end_date, point_shapefile):
    interval = 5  # days
    offset = start_date.advance(interval * index, 'day')
    interval_start = offset
    interval_end = offset.advance(interval, 'day')

    if interval_end.difference(end_date, 'day').gt(0):
        interval_end = end_date

    processed_interval = point_shapefile.map(
        lambda feature: extract_max_values(feature, interval_start, interval_end)
    )

    task = ee.batch.Export.table.toDrive(
        collection=processed_interval,
        description=f'Processed_Interval_{index}',
        folder='Point_samples',
        fileFormat='CSV'
    )
    task.start()

    print(f'Processed interval {index}')

for ind in range(number_of_intervals):
    process_interval(ind, start_date, end_date, point_shapefile)

print('Pay attention: Although it appears that the code has finished executing, the .csv files will take a few minutes to appear in your Google Drive, as they are still being prepared and processed in Google Earth Engine!')


# Preprocessing the extracted bands' value and saving them in one excel file

In [None]:
# defining two functions to extract the coordinate of the samples
def extract_latitude(s):
    try:
        parts = s.split('[')[1].split(']')[0]
        lat = parts.split(',')[1]
        return float(lat)
    except Exception as e:
        print(f"Error processing string: {s}, Error: {e}")
        return None

def extract_longitude(s):
    try:
        parts = s.split('[')[1].split(']')[0]
        lon = parts.split(',')[0]
        return float(lon)
    except Exception as e:
        print(f"Error processing string: {s}, Error: {e}")
        return None

# reading the .csv files, extracting the coordinate of the samples, and preprocessing them to save all files into one excel file
for t in range(number_of_intervals):
  S2_5_days_df = pd.read_csv(f"/content/drive/My Drive/Point_samples/Processed_Interval_{t}.csv")

  S2_5_days_df['latitude'] = S2_5_days_df['.geo'].apply(extract_latitude)
  S2_5_days_df['longitude'] = S2_5_days_df['.geo'].apply(extract_longitude)
  S2_5_days_df.drop(columns=['.geo', 'system:index'], inplace=True)

  replacements = {
      'Blue_max': f'S2_Img{t}_Blue',
      'Green_max': f'S2_Img{t}_Green',
      'NIR_max': f'S2_Img{t}_NIR',
      'Red_max': f'S2_Img{t}_Red'
  }

  S2_5_days_df.rename(columns=replacements, inplace=True)

  if t == 0:
    Preprocessed_Point_samples = S2_5_days_df
  else:
    S2_5_days_df = S2_5_days_df.drop(columns= ['CID','latitude','longitude'])
    Preprocessed_Point_samples = pd.concat([Preprocessed_Point_samples, S2_5_days_df], axis=1)


Preprocessed_Point_samples.to_csv('/content/Preprocessed_Point_samples.csv', index=False)

In [None]:
# Conducting undersampling to make the balance among the classes
DF_Sentinel = pd.read_csv('/content/Preprocessed_Point_samples.csv')
DF_Sentinel_CID_sorted = DF_Sentinel.sort_values(by = 'CID', axis = 0, ascending=True).dropna().reset_index(drop = True)
print('DF_Sentinel_CID_sorted shape: ', DF_Sentinel_CID_sorted.shape)
Num_img_S2 = sum(('Blue') in column for column in DF_Sentinel_CID_sorted.columns)

num_uni_CID = DF_Sentinel_CID_sorted['CID'].unique()

for u in num_uni_CID:
 print(f'Count of sample with CID {u}: ', (DF_Sentinel_CID_sorted['CID'] == u).sum())
 if u == num_uni_CID[0]:
  Min_num_sample = (DF_Sentinel_CID_sorted['CID'] == u).sum()
 else:
  if (DF_Sentinel_CID_sorted['CID'] == u).sum() < Min_num_sample:
    Min_num_sample = (DF_Sentinel_CID_sorted['CID'] == u).sum()

print('Min_num_sample: ', Min_num_sample)

for w in num_uni_CID:
  sampled_DF_Sentinel_CID_sorted = DF_Sentinel_CID_sorted[DF_Sentinel_CID_sorted['CID'] == w].sample(Min_num_sample, replace=False)

  if w == num_uni_CID[0]:
    DF_Sentinel_CID_equal_samples = sampled_DF_Sentinel_CID_sorted
  else:
    DF_Sentinel_CID_equal_samples = pd.concat([DF_Sentinel_CID_equal_samples, sampled_DF_Sentinel_CID_sorted], axis=0, ignore_index=True)

In [None]:
# Defining a function to preprocess data and make them suitable for feeding in the ML models
def preprocessing_data(samples_dataframe, Prediction_purpose = False):

  print('samples_dataframe shape: ', samples_dataframe.shape)
  samples_dataframe = samples_dataframe.sort_values(by = ['CID']).reset_index(drop = True)

  sample_id = list (range(1,1 + samples_dataframe.shape[0]))
  samples_dataframe['Sample_ID']=sample_id

  Counts_img_S2 = sum(('Blue') in column for column in samples_dataframe.columns)
  print('Counts_img_S2: ', Counts_img_S2)

  S2_Img_ind = 0
  Img_ind_S2 = [str(number) for number in list(range(0, int(Counts_img_S2)))]

  for m_S2 in Img_ind_S2:
    col_title_S2 = '(S2_Img'+ m_S2+'_)'
    DF_each_img = samples_dataframe.filter(regex='(CID)|(Sample_ID)|'+ col_title_S2).reset_index(drop = True)
    DF_each_img['Image_index'] = S2_Img_ind
    DF_each_img.columns = DF_each_img.columns.str.replace('S2_Img'+ m_S2+'_', 'S2_')

    if(m_S2 == '0'):
      Samples_preprocessed = DF_each_img
    else:
      Samples_preprocessed = pd.concat([Samples_preprocessed, DF_each_img], axis=0, ignore_index=True)


    S2_Img_ind += 1


  raw_bands = ['S2_Blue', 'S2_Green', 'S2_NIR', 'S2_Red']
  Samples_preprocessed[raw_bands] = Samples_preprocessed[raw_bands].astype('float64')

  # Calculation of vegetation indices (VI), NDWI, and BI
  # NDVI
  Samples_preprocessed['NDVI'] = Samples_preprocessed.apply(lambda row: ((row.S2_NIR - row.S2_Red) / (row.S2_NIR + row.S2_Red)), axis=1)
  # GNDVI
  Samples_preprocessed['GNDVI'] = Samples_preprocessed.apply(lambda row: ((row.S2_NIR - row.S2_Green) / (row.S2_NIR + row.S2_Green)), axis=1)
  # GSAVI
  Samples_preprocessed['GSAVI'] = Samples_preprocessed.apply(lambda row: ((row.S2_NIR - row.S2_Green) / (row.S2_NIR + row.S2_Green + 0.5)) * 1.5, axis=1)
  # GOSAVI
  Samples_preprocessed['GOSAVI'] = Samples_preprocessed.apply(lambda row: ((row.S2_NIR - row.S2_Green) / (row.S2_NIR + row.S2_Green + 0.16)), axis=1)
  # SAVI
  Samples_preprocessed['SAVI'] = Samples_preprocessed.apply(lambda row: ((row.S2_NIR - row.S2_Red) / (row.S2_NIR + row.S2_Red + 0.5)) * 1.5, axis=1)
  # OSAVI
  Samples_preprocessed['OSAVI'] = Samples_preprocessed.apply(lambda row: ((row.S2_NIR - row.S2_Red) / (row.S2_NIR + row.S2_Red + 0.16)) * 1.16, axis=1)
  # EVI
  Samples_preprocessed['EVI'] = Samples_preprocessed.apply(lambda row: 2.5 * ((row.S2_NIR - row.S2_Red) / (row.S2_NIR + 6 * row.S2_Red - 7.5 * row.S2_Blue + 1)) if (row.S2_NIR + 6 * row.S2_Red - 7.5 * row.S2_Blue + 1) != 0 else 0, axis=1)

  # NDWI
  Samples_preprocessed['NDWI'] = Samples_preprocessed.apply(lambda row: ((row.S2_Green - row.S2_NIR) / (row.S2_Green + row.S2_NIR)), axis=1)
  # BI
  Samples_preprocessed['BI'] = Samples_preprocessed.apply(lambda row: math.sqrt(((row.S2_Blue) ** 2) + ((row.S2_Green) ** 2) + ((row.S2_NIR) ** 2) + ((row.S2_Red) ** 2)), axis=1)


  if Prediction_purpose == False:
    Samples_preprocessed = Samples_preprocessed.sort_values(['CID', 'Sample_ID'], ascending=[True, True], axis = 0).reset_index(drop = True)
  else:
    Samples_preprocessed = Samples_preprocessed.sort_values(['Sample_ID','Image_index'], ascending=[True, True], axis = 0).reset_index(drop = True)

  print('Samples_preprocessed:\n', Samples_preprocessed)
  return Samples_preprocessed


In [None]:
# Splitting the data into training and test datasets and applying one-hot encoded on the labels
def Preprocessing_ML(samples_with_labels, label_column = 'CID', Sample_ID= 'Sample_ID'):

  # Using one-hot encoding
  encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
  one_hot_encoded = encoder.fit_transform(np.array(samples_with_labels[label_column]).reshape(-1,1))
  one_hot_array = np.array(one_hot_encoded)
  print('one_hot_array shape:', one_hot_array.shape)

  # Splitting training and testing datasets with the portion 70:30
  X = np.array(samples_with_labels.drop(columns= [label_column, Sample_ID]))
  Y = np.array(one_hot_array)

  Features_training, Features_testing, labels_training, labels_testing = train_test_split(X, Y, test_size=0.3, shuffle=True, random_state=0)

  # Convert input features to a float array
  Features_training = Features_training.astype(float)
  Features_testing = Features_testing.astype(float)

  # Convert one-hot encoded labels to a numerical array (dtype=float)
  labels_training = labels_training.astype(float)
  labels_testing = labels_testing.astype(float)

  # Training datasets
  print('Features_training shape: ', Features_training.shape)
  print('labels_training shape: ', labels_training.shape)

  # Testing datasets
  print('Features_testing shape: ', Features_testing.shape)
  print('labels_testing shape: ', labels_testing.shape)

  return Features_training, Features_testing, labels_training, labels_testing

In [None]:
Samples_preprocessed_aggregated = preprocessing_data(samples_dataframe = DF_Sentinel_CID_equal_samples)
Features_training, Features_testing, landcover_training, landcover_testing = Preprocessing_ML(samples_with_labels = Samples_preprocessed_aggregated, label_column = 'CID', Sample_ID= 'Sample_ID')

landcover_training_GBM = np.argmax(landcover_training, axis=1)
print('landcover_training_GBM shape: ', landcover_training_GBM.shape)

landcover_testing_GBM = np.argmax(landcover_testing, axis=1)
print('landcover_testing_GBM shape: ', landcover_testing_GBM.shape)

# Finding optimized values for the hyper-parameters in the RF model

In [None]:
parameters = {'n_estimators':np.arange(100,501,100), 'max_depth':np.arange(1,30,4)}
RF_classifier = RandomForestClassifier(criterion="entropy",random_state=0, bootstrap = True)
RF_model_gs = GridSearchCV(RF_classifier, parameters, scoring='accuracy', cv=5, return_train_score=True, n_jobs=-1, verbose=1)
RF_model_gs.fit(Features_training, landcover_training)

RF_model_gs.best_params_

n_estimators_RF = RF_model_gs.best_params_['n_estimators']
max_depth_RF = RF_model_gs.best_params_['max_depth']

print("n_estimators_RF:", n_estimators_RF)
print("max_depth_RF:", max_depth_RF)

# Extracting the cross-validation results
cv_results = RF_model_gs.cv_results_

mean_val_scores = cv_results['mean_test_score']
std_val_scores = cv_results['std_test_score']
params = cv_results['params']

# Creating a DataFrame to store the results
RF_Hyperparameters_results = pd.DataFrame({'Parameters': params, 'Mean Accuracy': mean_val_scores, 'Std Dev Accuracy': std_val_scores})

RF_Hyperparameters_results['Parameters'] = RF_Hyperparameters_results['Parameters'].apply(ast.literal_eval)

# Extracting 'max_depth' and 'n_estimators' from 'Parameters' column
RF_Hyperparameters_results['max_depth'] = RF_Hyperparameters_results['Parameters'].apply(lambda x: x['max_depth'])
RF_Hyperparameters_results['n_estimators'] = RF_Hyperparameters_results['Parameters'].apply(lambda x: x['n_estimators'])

# Pivot the dataframe to create a matrix suitable for heatmap
pivot_table = RF_Hyperparameters_results.pivot('n_estimators', 'max_depth', 'Mean Accuracy')

# Plotting the heatmap
plt.figure(figsize=(10, 2.25))
sns.heatmap(pivot_table, annot=True, cmap='coolwarm', fmt=".5f")
plt.title('Mean test accuracy score heatmap on validation datasets')
plt.ylabel('n_estimators')
plt.xlabel('max_depth')
plt.show()

# RF model

In [None]:
def RF_classification_model(Original_samples_with_only_features, Features_training, Features_testing, landcover_training, landcover_testing, number_of_intervals, Prediction_only = False, max_depth= 29, min_samples_leaf = 1, n_estimators = 200):

  RF_model = RandomForestClassifier(criterion="entropy", random_state=0, bootstrap=True, max_depth= max_depth, min_samples_leaf = min_samples_leaf, n_estimators = n_estimators)#**RF_model_gs.best_params_)
  cv_scores = cross_val_score(RF_model, Features_training, landcover_training, cv=5, scoring='accuracy', n_jobs=-1)

  if Prediction_only == False:
    # Output the results
    print(f"CV Scores: {cv_scores}")
    print(f"Mean CV Score: {cv_scores.mean()}")
    print(f"Standard Deviation of CV Scores: {cv_scores.std()}")

  RF_model.fit(Features_training, landcover_training)

  # Get feature importances
  importances = RF_model.feature_importances_
  feature_names = Original_samples_with_only_features.columns

  # Removing 'Image_index' feature from the importance values and corresponding feature names
  importances_filtered = np.delete(importances, feature_names.tolist().index('Image_index'))
  feature_names_filtered = [name for name in feature_names if name != 'Image_index']

  # Scaling the importance values using MinMaxScaler
  scaler = MinMaxScaler()
  importances_normalized_filtered = scaler.fit_transform(importances_filtered.reshape(-1, 1)).flatten()

  # Calculating the sum of importances
  total_importance = np.sum(importances_normalized_filtered)

  # Calculating the relative importance of each feature
  relative_importance = importances_normalized_filtered / total_importance

  feature_importance_dict = dict(zip(feature_names_filtered, relative_importance))

  # Sorting feature importances in descending order
  indices = relative_importance.argsort()[::-1]

  # Printing and plotting the sorted relative feature importances
  for idx in indices:
      print(f"Feature: {feature_names_filtered[idx]}, Relative Importance: {relative_importance[idx]}")

  # Setting font properties
  font = {'family': 'serif', 'weight': 'normal', 'size': 12}

  # Printing and plotting the sorted relative feature importances
  plt.figure(figsize=(10, 6))
  sns.barplot(x=[feature_names_filtered[idx] for idx in indices], y=relative_importance[indices], palette="viridis", legend=False)
  plt.xticks(rotation=90)
  plt.xlabel("Feature", fontdict=font)
  plt.ylabel("Relative Importance", fontdict=font)
  plt.title("Random Forest Relative Feature Importances (Normalized to 1)", fontdict=font)
  plt.tight_layout()
  plt.show()


  # plotting the heatmap of features importance
  features = list(feature_importance_dict.keys())
  importance_values = list(feature_importance_dict.values())

  # Plotting heatmap of feature importance with font settings
  plt.figure(figsize=(8,2))
  sns.heatmap(data=[importance_values], annot=True, fmt='.4f', cmap='YlGnBu', xticklabels=features, yticklabels=False, annot_kws={'fontproperties': font, 'rotation': 90}, cbar_kws={'aspect': 8})
  plt.xlabel('Features', fontdict=font)
  plt.ylabel('Importance', fontdict=font)
  plt.title('Feature Importance Heatmap', fontdict=font)
  plt.show()

  #Testing Accuracy
  landcover_testing_predicted = RF_model.predict(Features_testing)

  if Prediction_only == False:
    print("Recall score for test dataset: ", sklearn.metrics.recall_score(landcover_testing, landcover_testing_predicted, average = 'macro'))
    print("Precision score for test dataset: ", sklearn.metrics.precision_score(landcover_testing, landcover_testing_predicted, average = 'macro'))
    print("F1 score for test dataset: ", sklearn.metrics.f1_score(landcover_testing, landcover_testing_predicted, average = 'macro'))
    print("ROC-AUC score for test dataset: ", sklearn.metrics.roc_auc_score(landcover_testing, landcover_testing_predicted, average = 'macro', multi_class = 'ovr'))
    print(sklearn.metrics.classification_report(landcover_testing, landcover_testing_predicted))

    roc_auc_scores = []

    # Iterating over each class
    for class_index in range(landcover_testing.shape[1]):
        # Extracting the true labels and predicted probabilities for the current class
        y_true_class = np.array([1 if label == class_index else 0 for label in np.argmax(landcover_testing, axis=1)])
        y_pred_class = np.array([1 if label == class_index else 0 for label in np.argmax(landcover_testing_predicted, axis=1)])  # Predicted probabilities for the current class

        # Calculate the ROC-AUC score for the current class
        roc_auc_each_class = sklearn.metrics.roc_auc_score(y_true_class, y_pred_class)

        # Now, roc_auc_scores list contains the ROC-AUC score for each class
        print(f'ROC-AUC scores for class {class_index}:', roc_auc_each_class)

        # Appending the ROC-AUC score to the list
        roc_auc_scores.append(roc_auc_each_class)

  landcover_testing_predicted_prob = RF_model.predict_proba(Features_testing)
  # Stacking the probabilities for each class
  combined_probabilities = np.stack(landcover_testing_predicted_prob, axis=-1)

  # This will give an array of shape (number of samples, 2, number of classes)
  # We need to take the probabilities corresponding to the positive class (usually the second element in the innermost arrays)
  positive_class_probs = combined_probabilities[:, 1, :]
  # Assigning the label of the class with the highest probability
  predicted_labels = np.argmax(positive_class_probs, axis=1)
  predicted_labels_df = pd.DataFrame(predicted_labels.reshape(-1,1), columns=['CID']).reset_index(drop = True)

  Y_true = np.argmax(landcover_testing, axis=1)
  True_labels_df = pd.DataFrame(Y_true.reshape(-1,1), columns=['CID']).reset_index(drop = True)

  Predicted_CID_list = []
  True_CID_list = []

  for jj in range(int(len(predicted_labels_df) / number_of_intervals)):
    starting_ind = jj*number_of_intervals
    ending_ind = (jj+1)*number_of_intervals
    Predicted_CID = predicted_labels_df.iloc[starting_ind:ending_ind,0].value_counts().idxmax()
    Predicted_CID_list.append(Predicted_CID)

    True_CID = True_labels_df.iloc[starting_ind:ending_ind,0].value_counts().idxmax()
    True_CID_list.append(True_CID)

  Reshaped_Cropland_predicted_samples = pd.DataFrame(Predicted_CID_list, columns=['CID']).reset_index(drop = True)

  confusion_matrix = sklearn.metrics.confusion_matrix(np.array(True_CID_list).reshape(-1), np.array(Predicted_CID_list).reshape(-1))

  # class labels
  class_labels = ['Pasture', 'Wetland', 'Water', 'Shrubland', 'Forest', 'Urban', 'Barren', 'Apple', 'Small fruits', 'Canola', 'Soy', 'Corn', 'Other crops']

  # Plotting confusion matrix
  plt.figure(figsize=(12, 12))
  sns.heatmap(confusion_matrix, annot=True, fmt='d', cmap='Reds', cbar=False, xticklabels=class_labels, yticklabels=class_labels)
  plt.xlabel('Predicted Label')
  plt.ylabel('True Label')
  plt.title('Confusion matrix of the RF model for the test dataset')
  plt.xticks(ha='right')
  plt.yticks(rotation=0, ha='right')
  plt.tick_params(axis='x',  top=True, bottom=False, labeltop=True, labelbottom=False)
  plt.show()

  return Reshaped_Cropland_predicted_samples

In [None]:
# Applying RF model and predicting class labels for the test dataset
RF_classification_model(Original_samples_with_only_features = Samples_preprocessed_aggregated.drop(columns= ['CID','Sample_ID']), Features_training = Features_training, Features_testing = Features_testing, landcover_training = landcover_training, landcover_testing = landcover_testing, number_of_intervals= Num_img_S2)


# Finding optimized values for the hyper-parameters in the GBM model

In [None]:
GBM_parameters = {'n_estimators':np.arange(100,501,100), 'learning_rate':[0.1, 0.2, 0.4, 0.5, 0.6, 0.8, 1]}
GBM_classifier = GradientBoostingClassifier(random_state=0)
GBM_model_gs = GridSearchCV(GBM_classifier, GBM_parameters, scoring='accuracy', cv=5, return_train_score=True, n_jobs=-1, verbose=2)
GBM_model_gs.fit(Features_training, landcover_training_GBM)

GBM_model_gs.best_params_

learning_rate_GBM = GBM_model_gs.best_params_['learning_rate']
n_estimators_GBM = GBM_model_gs.best_params_['n_estimators']

print("Learning rate:", learning_rate_GBM)
print("Number of estimators:", n_estimators_GBM)

# Extracting the cross-validation results
cv_results_GBM = GBM_model_gs.cv_results_

mean_val_scores_GBM = cv_results_GBM['mean_test_score']
std_val_scores_GBM = cv_results_GBM['std_test_score']
params_GBM = cv_results_GBM['params']

# Creating a DataFrame to store the results
GBM_Hyperparameters_results = pd.DataFrame({'Parameters': params_GBM, 'Mean Accuracy': mean_val_scores_GBM, 'Std Dev Accuracy': std_val_scores_GBM})

import ast
GBM_Hyperparameters_results['Parameters'] = GBM_Hyperparameters_results['Parameters'].apply(ast.literal_eval)

# Extracting 'learning_rate' and 'n_estimators' from 'Parameters' column
GBM_Hyperparameters_results['learning_rate'] = GBM_Hyperparameters_results['Parameters'].apply(lambda x: x['learning_rate'])
GBM_Hyperparameters_results['n_estimators'] = GBM_Hyperparameters_results['Parameters'].apply(lambda x: x['n_estimators'])

# Pivotting the dataframe to create a matrix suitable for heatmap
pivot_table = GBM_Hyperparameters_results.pivot('n_estimators', 'learning_rate', 'Mean Accuracy')

# Plotting the heatmap
plt.figure(figsize=(10, 2.25))
sns.heatmap(pivot_table, annot=True, cmap='coolwarm', fmt=".5f")
plt.title('Mean test accuracy score heatmap on validation datasets')
plt.ylabel('n_estimators')
plt.xlabel('learning_rate')
plt.show()

# GBM model

In [None]:
def GBM_classification_model(Original_samples_with_only_features, Features_training, Features_testing, landcover_training, landcover_testing, number_of_intervals, Prediction_only = False, learning_rate = 0.4, n_estimators= 500):

  GBM_model = GradientBoostingClassifier(learning_rate = learning_rate, n_estimators=n_estimators)
  cv_scores = cross_val_score(GBM_model, Features_training, landcover_training, cv=5, scoring='accuracy', n_jobs=-1)

  if Prediction_only == False:
    # Output the results
    print(f"CV Scores: {cv_scores}")
    print(f"Mean CV Score: {cv_scores.mean()}")
    print(f"Standard Deviation of CV Scores: {cv_scores.std()}")

  GBM_model.fit(Features_training, landcover_training)

  #Testing Accuracy
  landcover_testing_predicted = GBM_model.predict(Features_testing)
  landcover_testing_predicted_prob = GBM_model.predict_proba(Features_testing)
  print(landcover_testing_predicted_prob.shape)

  if Prediction_only == False:
    print("Recall score for test dataset: ", sklearn.metrics.recall_score(landcover_testing, landcover_testing_predicted, average = 'macro'))
    print("Precision score for test dataset: ", sklearn.metrics.precision_score(landcover_testing, landcover_testing_predicted, average = 'macro'))
    print("F1 score for test dataset: ", sklearn.metrics.f1_score(landcover_testing, landcover_testing_predicted, average = 'macro'))
    print("ROC-AUC score for test dataset: ", sklearn.metrics.roc_auc_score(landcover_testing, landcover_testing_predicted_prob, average = 'macro', multi_class = 'ovr'))
    print(sklearn.metrics.classification_report(landcover_testing, landcover_testing_predicted))

    roc_auc_scores = []

    # Iterating over each class
    for class_index in range(13):
        # Extracting the true labels and predicted probabilities for the current class
        y_true_class = np.array([1 if label == class_index else 0 for label in landcover_testing])
        y_pred_class = np.array([1 if label == class_index else 0 for label in np.argmax(landcover_testing_predicted_prob, axis= 1)])  # Predicted probabilities for the current class

        # Calculating the ROC-AUC score for the current class
        roc_auc_each_class = sklearn.metrics.roc_auc_score(y_true_class, y_pred_class)

        # Now, roc_auc_scores list contains the ROC-AUC score for each class
        print(f'ROC-AUC scores for class {class_index}:', roc_auc_each_class)

        # Appending the ROC-AUC score to the list
        roc_auc_scores.append(roc_auc_each_class)


  predicted_labels = np.argmax(landcover_testing_predicted_prob, axis=1)
  predicted_labels_df = pd.DataFrame(predicted_labels.reshape(-1,1), columns=['CID']).reset_index(drop = True)

  Predicted_CID_list = []

  for jj in range(int(len(predicted_labels_df) / number_of_intervals)):
    starting_ind = jj*number_of_intervals
    ending_ind = (jj+1)*number_of_intervals
    Predicted_CID = predicted_labels_df.iloc[starting_ind:ending_ind,0].value_counts().idxmax()
    Predicted_CID_list.append(Predicted_CID)

  Reshaped_Cropland_predicted_samples = pd.DataFrame(Predicted_CID_list, columns=['CID']).reset_index(drop = True)

  return Reshaped_Cropland_predicted_samples

In [None]:
#Applying GBM model and predicting class labels for the test dataset
GBM_classification_model(Original_samples_with_only_features = Samples_preprocessed_aggregated.drop(columns= ['CID','Sample_ID']), Features_training = Features_training, Features_testing = Features_testing, landcover_training = landcover_training_GBM, landcover_testing = landcover_testing_GBM, number_of_intervals= Num_img_S2)


# Assessing the impact of per-class sample sizes on the accuracy of point-based ML models

In [None]:
train_scores = []
test_scores = []

train_scores_GBM = []
test_scores_GBM = []

sample_sizes = np.append(np.arange(30,int(DF_Sentinel_CID_equal_samples.shape[0] / len(DF_Sentinel_CID_equal_samples['CID'].unique())),30), int(DF_Sentinel_CID_equal_samples.shape[0] / len(DF_Sentinel_CID_equal_samples['CID'].unique())))

for batch_size in sample_sizes:

  for each_class in DF_Sentinel_CID_equal_samples['CID'].unique():

    sampled_batch_each_class = DF_Sentinel_CID_equal_samples[DF_Sentinel_CID_equal_samples['CID'] == each_class].sample(batch_size, replace=False)

    if each_class == DF_Sentinel_CID_equal_samples['CID'].unique()[0]:
      DF_Sentinel_CID_equal_samples_batch_size = sampled_batch_each_class
    else:
      DF_Sentinel_CID_equal_samples_batch_size = pd.concat([DF_Sentinel_CID_equal_samples_batch_size, sampled_batch_each_class], axis=0, ignore_index=True)


  samples_preprocessed_batches = preprocessing_data(samples_dataframe = DF_Sentinel_CID_equal_samples_batch_size)

  Features_training, Features_testing, landcover_training, landcover_testing = Preprocessing_ML(samples_with_labels = samples_preprocessed_batches, label_column = 'CID', Sample_ID= 'Sample_ID')

  landcover_training_GBM = np.argmax(landcover_training, axis=1)
  print('landcover_training_GBM shape: ', landcover_training_GBM.shape)

  landcover_testing_GBM = np.argmax(landcover_testing, axis=1)
  print('landcover_testing_GBM shape: ', landcover_testing_GBM.shape)

  #RF model
  RF_model = RandomForestClassifier(criterion="entropy", random_state=0, bootstrap=True, max_depth= 25, min_samples_leaf = 1, n_estimators = 500)#**RF_model_gs.best_params_)
  RF_model.fit(Features_training, landcover_training)

  landcover_training_predicted = RF_model.predict(Features_training)
  landcover_testing_predicted = RF_model.predict(Features_testing)

  # Calculating accuracy scores
  train_accuracy = sklearn.metrics.f1_score(landcover_training, landcover_training_predicted, average = 'macro')
  test_accuracy = sklearn.metrics.f1_score(landcover_testing, landcover_testing_predicted, average = 'macro')

  # Appending accuracy scores to lists
  train_scores.append(train_accuracy)
  test_scores.append(test_accuracy)

  #GBM model
  GBM_model = GradientBoostingClassifier(learning_rate = 0.4, n_estimators= 500)
  GBM_model.fit(Features_training, landcover_training_GBM)

  landcover_training_predicted_GBM = GBM_model.predict(Features_training)
  landcover_testing_predicted_GBM = GBM_model.predict(Features_testing)

  # Calculating accuracy scores
  train_accuracy_GBM = sklearn.metrics.f1_score(landcover_training_GBM, landcover_training_predicted_GBM, average = 'macro')
  test_accuracy_GBM = sklearn.metrics.f1_score(landcover_testing_GBM, landcover_testing_predicted_GBM, average = 'macro')

  # Appending accuracy scores to lists
  train_scores_GBM.append(train_accuracy_GBM)
  test_scores_GBM.append(test_accuracy_GBM)


print('The highest F1-score accuracy was achieved using the RF model on the sample size: ', sample_sizes[np.argmax(np.array(test_scores))])
print('Testing F1-score accuracy value: ', np.max(np.array(test_scores)))

print('The highest F1-score accuracy was achieved using GBM model on the sample size: ', sample_sizes[np.argmax(np.array(test_scores_GBM))])
print('Testing F1-score accuracy value: ', np.max(np.array(test_scores_GBM)))

sns.set(font_scale=1)
sns.set_style("darkgrid")
plt.figure(figsize=(10, 3))
# Plotting the accuracy scores over the number of samples
plt.plot(sample_sizes, train_scores, linestyle='--', color='blue', label='RF training accuracy')
plt.plot(sample_sizes, test_scores, color = 'blue', label='RF testing accuracy')

plt.plot(sample_sizes, train_scores_GBM, linestyle='--', color='orange', label='GBM training accuracy')
plt.plot(sample_sizes, test_scores_GBM, color = 'orange', label='GBM testing accuracy')
plt.xlabel('Sample size')
plt.ylabel('F1-score value')
plt.title('F1-score accuracy of the ML models based on the sample size')
plt.xticks(sample_sizes)
plt.legend()
plt.show()