In [None]:
#@title datset class

import numpy as np
import csv

class dataset():
  def __init__(self,filename=None, row=1, col=1):
    self.data=np.zeros((row,col),dtype=np.float64)
    self.data=self.load_data(filename,row,col)

  def load_data(self, filename,row, col):
    """
      Loads data from a CSV file into a numpy array.

      Parameters:
      - filename: Name of the CSV file.
      - row: Number of rows in the data.
      - col: Number of columns in the data.

      Returns:
      - Numpy array containing the loaded data.
    """

    with open(filename, newline='') as csvfile:
        spamreader = csv.reader(csvfile, delimiter=',')
        num=0
        for row in spamreader:
            #print(', '.join(row))
            if num==0:
              print(row)  # Print header row
              print(len(row)) # Print number of columns
            else:
              self.data[num-1,:]=np.array(row)
            num+=1
        print(num)  # Print number of rows read
    return self.data

class datasetIndex():
  def __init__(self,filename=None, row=1, col=1):
    self.index=np.zeros((row,col),dtype=np.float64)
    self.index=self.load_index(filename,row,col)

  def load_index(self, filename, row, col):
    """
      Loads index data from a CSV file into a numpy array.

      Parameters:
      - filename: Name of the CSV file.
      - row: Number of rows in the index data.
      - col: Number of columns in the index data.

      Returns:
      - Numpy array containing the loaded index data.
    """
    with open(filename, newline='') as csvfile:
        spamreader = csv.reader(csvfile, delimiter=',')
        num=0
        for row in spamreader:
            #print(', '.join(row))
            if num==0:
              print(row)  # Print header row
              print(len(row)) # Print number of columns
            else:
              self.index[num-1,:]=np.array(row)
            num+=1
        print(num)  # Print number of rows read
    return self.index

  def get_xy(self,ind,data):
    """
      Retrieves data (x, y) based on a given index from the dataset.

      Parameters:
      - ind: Index of interest.
      - data: Raw dataset.

      Returns:
      - data_x: Input data (features).
      - data_y: Output data (labels).
    """
    raw_data=data
    select_index=self.index[:,ind]
    bool_idx = np.isin(raw_data[:, 0], select_index)
    #print(bool_idx.shape)
    selected_rows = raw_data[bool_idx]
    #print(selected_rows[0:20,0])
    #print(select_index[0:20])
    data_x=selected_rows[:,3:38]  # Adjusted range based on your dataset structure, 3-37 are the features we need
    data_y=selected_rows[:,39]  # Adjusted based on your dataset structure, 39 is the traget
    return data_x, data_y

  def get_xy2(self,ind,data):
    """
      Retrieves alternative data (x, y) based on a given index from the dataset.

      Parameters:
      - ind: Index of interest.
      - data: Raw dataset.

      Returns:
      - data_x: Input data (features).
      - data_y: Output data (labels).
    """
    raw_data=data
    select_index=self.index[:,ind]
    bool_idx = np.isin(raw_data[:, 0], select_index)
    #print(bool_idx.shape)
    selected_rows = raw_data[bool_idx]
    #print(selected_rows[0:20,0])
    #print(select_index[0:20])
    data_x=selected_rows[:,3:-2]  # Adjusted range based on your dataset structure
    data_y=selected_rows[:,-2]  # Adjusted range based on your dataset structure
    return data_x, data_y

In [None]:
#@title load post_temp.csv and pre_temp.csv
# Pre data
predata=dataset(filename='__data_allvar_pre.csv', row=1440186, col=41)
# Post data
postdata=dataset(filename='__data_allvar_post.csv', row=1440186, col=41)

# 10-fold cross-validaiton
indexdata_train=datasetIndex(filename='spatial_train_samples.csv', row=95398, col=10)
indexdata_test=datasetIndex(filename='spatial_test_samples.csv', row=23850, col=10)

['FID', 'x', 'y', 'class', 'B2', 'B3', 'B4', 'B5', 'swir1', 'swir2', 'ndvi', 'ndmi', 'savi', 'wavi', 'vv1', 'vv2', 'vv3', 'vv4', 'vv5', 'vv6', 'vv7', 'vv8', 'vv9', 'vv10', 'vv11', 'vv12', 'vh1', 'vh2', 'vh3', 'vh4', 'vh5', 'vh6', 'vh7', 'vh8', 'vh9', 'vh10', 'vh11', 'vh12', 'coh', 'ch_rfull', 'std_rfull']
41
1440186
['FID', 'x', 'y', 'class', 'B2', 'B3', 'B4', 'B5', 'swir1', 'swir2', 'ndvi', 'ndmi', 'savi', 'wavi', 'vv1', 'vv2', 'vv3', 'vv4', 'vv5', 'vv6', 'vv7', 'vv8', 'vv9', 'vv10', 'vv11', 'vv12', 'vh1', 'vh2', 'vh3', 'vh4', 'vh5', 'vh6', 'vh7', 'vh8', 'vh9', 'vh10', 'vh11', 'vh12', 'coh', 'ch_rfull', 'std_rfull']
41
1440186
['X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'X9', 'X10']
10
95398
['X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'X9', 'X10']
10
23850


In [None]:
# check data values
#predata.data=predata.data[:-1,:]
#postdata.data=postdata.data[:-1,:]
#indexdata_train.index=indexdata_train.index[:-1,:]
#indexdata_test.index=indexdata_test.index[:-1,:]

In [None]:
# check data values
#print(predata.data[-1,:])
#print(postdata.data[-1,:])
#print(indexdata_train.index[-1,:])
#print(indexdata_test.index[-1,:])

[ 1.440185e+06  4.687800e+05  2.860440e+06  1.000000e+00  8.207031e-02
  6.336829e-02  4.095015e-02  2.361556e-01  6.084230e-02  2.290391e-02
  7.044439e-01  5.902846e-01  3.767932e-01  3.241691e-01 -9.826913e+00
 -1.083508e+01 -1.129599e+01 -9.689360e+00 -9.132193e+00 -9.107721e+00
 -8.916616e+00 -1.013159e+01 -1.069031e+01 -9.184856e+00 -9.679404e+00
 -9.455123e+00 -1.680941e+01 -1.619661e+01 -1.538779e+01 -1.596265e+01
 -1.560736e+01 -1.577439e+01 -1.575403e+01 -1.456224e+01 -1.565089e+01
 -1.473206e+01 -1.470666e+01 -1.446797e+01  5.130670e-01           nan
           nan]
[ 1.440185e+06  4.687800e+05  2.860440e+06  1.000000e+00  9.150410e-02
  6.337730e-02  3.882900e-02  1.836100e-01  5.434728e-02  1.899646e-02
  6.508800e-01  5.432190e-01  3.006090e-01  2.414356e-01 -8.903753e+00
 -9.476945e+00 -9.102501e+00 -8.831133e+00 -8.354572e+00 -7.584333e+00
 -9.715966e+00 -1.001684e+01 -1.031588e+01 -8.940706e+00 -9.599371e+00
 -7.851721e+00 -8.903753e+00 -9.476945e+00 -9.102501e+00 -8.8

In [None]:
def relative_absolute_error(true, pred):
    """
      Calculates Relative Absolute Error (RAE) between true and predicted values.

      Parameters:
      true : array-like
          True values.
      pred : array-like
          Predicted values.

      Returns:
      float
          Relative Absolute Error (RAE).
    """
    true_mean = np.mean(true)
    squared_error_num = np.sum(np.abs(true - pred))
    squared_error_den = np.sum(np.abs(true - true_mean))
    rae_loss = squared_error_num / squared_error_den
    return rae_loss

def performance(true,pred,str=None):
    """
      Evaluates performance metrics between true and predicted values.

      Parameters:
      true : array-like
          True values.
      pred : array-like
          Predicted values.
      str : str, optional
          String to print before performance metrics (default is None).

      Prints:
      RMSE : float
          Root Mean Squared Error.
      MAE : float
          Mean Absolute Error.
      RAE : float
          Relative Absolute Error.
      Pearson's correlation : float
          Pearson's correlation coefficient squared.

      Returns:
      None
    """
  if str!=None:
    print(str)
    print("RMSE:",np.sqrt(mean_squared_error(true,pred)))
    print("MAE",mean_absolute_error(true, pred))

    print("RAE",relative_absolute_error(true,pred))
    corr, _ = pearsonr(true,pred)
    print('Pearsons correlation: %.3f' % (corr*corr))


In [None]:
# norm data experiment results perfomance
# spatial; evaluate on pre_data only
from sklearn.metrics import mean_absolute_error
from scipy.stats import pearsonr
from sklearn.preprocessing import normalize

modelselection_flag=2  #custom function to get a model

for i in range(0,10):
  print("index",i)

  # Get training and testing data using custom functions indexdata_train.get_xy2 and indexdata_test.get_xy2
  train_x,train_y=indexdata_train.get_xy2(i, predata.data)
  test_x,test_y=indexdata_test.get_xy2(i,predata.data)

  tp_train_x=train_x
  tp_test_x=test_x

  # Normalize data using max normalization along axis 0
  tp_train_x,norm_train=normalize(train_x, norm="max",axis=0,return_norm=True)
  tp_test_x,norm_test=normalize(test_x, norm="max",axis=0,return_norm=True)

  # Define and train model
  model=get_model2(modelselection_flag)
  model.fit(tp_train_x,train_y)

  # Predict on training and testing sets
  pred1=model.predict(tp_train_x)
  pred2= model.predict(tp_test_x)
  #pred3= model.predict(test2_x)

  # Evaluate performance on training and testing sets
  performance(train_y,pred1,"trainset")
  performance(test_y,pred2,"validset")

index 0
trainset
RMSE: 0.0
MAE 0.0
RAE 0.0
Pearsons correlation: 1.000
validset
RMSE: 4.056222353876377
MAE 2.9092076635079045
RAE 0.738438680908676
Pearsons correlation: 0.366
index 1
trainset
RMSE: 0.0
MAE 0.0
RAE 0.0
Pearsons correlation: 1.000
validset
RMSE: 4.209629828003817
MAE 3.0572971994632905
RAE 0.7751764254541316
Pearsons correlation: 0.318
index 2
trainset
RMSE: 9.037230827736126e-05
MAE 6.23918991163235e-07
RAE 1.5832417407014778e-07
Pearsons correlation: 1.000
validset
RMSE: 4.956674197553225
MAE 3.5924744203530543
RAE 0.9099482994395973
Pearsons correlation: 0.190
index 3
trainset
RMSE: 0.0
MAE 0.0
RAE 0.0
Pearsons correlation: 1.000
validset
RMSE: 3.3368079629950973
MAE 2.4936609755126002
RAE 0.6339011562290016
Pearsons correlation: 0.522
index 4
trainset
RMSE: 0.0
MAE 0.0
RAE 0.0
Pearsons correlation: 1.000
validset
RMSE: 4.716988860735148
MAE 3.425618594741918
RAE 0.8695627459559968
Pearsons correlation: 0.297
index 5
trainset
RMSE: 0.0
MAE 0.0
RAE 0.0
Pearsons corre

In [None]:
# norm data experiment results perfomance
# spatial; evaluate on post_data only
from sklearn.metrics import mean_absolute_error
from scipy.stats import pearsonr
from sklearn.preprocessing import normalize

modelselection_flag=2  #custom function to get a model

for i in range(0,10):
  print("index",i)

  # Get training and testing data using custom functions indexdata_train.get_xy2 and indexdata_test.get_xy2
  train_x,train_y=indexdata_train.get_xy2(i, postdata.data)
  test_x,test_y=indexdata_test.get_xy2(i,postdata.data)

  tp_train_x=train_x
  tp_test_x=test_x

  # Normalize data using max normalization along axis 0
  tp_train_x,norm_train=normalize(train_x, norm="max",axis=0,return_norm=True)
  tp_test_x,norm_test=normalize(test_x, norm="max",axis=0,return_norm=True)

  # Define and train model
  model=get_model2(modelselection_flag)
  model.fit(tp_train_x,train_y)

  # Predict on training and testing sets
  pred1=model.predict(tp_train_x)
  pred2= model.predict(tp_test_x)
  #pred3= model.predict(test2_x)

  # Evaluate performance on training and testing sets
  performance(train_y,pred1,"trainset")
  performance(test_y,pred2,"validset")

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn import tree
from sklearn.ensemble import GradientBoostingRegressor
import xgboost as xgb
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import StackingRegressor
from sklearn.neural_network import MLPRegressor
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from keras import Model, layers
from keras import optimizers
from sklearn.model_selection import train_test_split, KFold,StratifiedKFold
from sklearn.metrics import mean_squared_error,r2_score
import scipy.io as scio
from sklearn.ensemble import RandomForestRegressor
import matplotlib.pyplot as plt
import csv
import math
from numpy import savetxt
from math import sqrt

def get_model2(flag):
   """
    Function to return a regression model based on the specified flag.

    Parameters:
    flag (int): Specifies which model to return.

    Returns:
    model: Instance of the selected regression model.
  """
  if flag==1:
    model = LinearRegression()
  elif flag==2:
    model=tree.DecisionTreeRegressor(max_depth=50)
  elif flag==3:
    model=GradientBoostingRegressor(n_estimators=500)
  elif flag==4:
    model=xgb.XGBRegressor(objective="reg:squarederror",max_depth=20, n_estimators=100)
  elif flag==5:
    model=SVR()
  elif flag==6:
    # Define base models for stacking
    level0 = list()
    level0.append(('knn', KNeighborsRegressor()))
    level0.append(('cart', DecisionTreeRegressor()))
    level0.append(('svm', SVR()))
    # define meta learner model
    level1 = LinearRegression()
    # define the stacking ensemble
    model = StackingRegressor(estimators=level0, final_estimator=level1)
  elif flag==7:
    #model = RandomForestRegressor(n_estimators=10,max_depth=10)
    model = RandomForestRegressor()
  elif flag==8:
    model = MLPRegressor(max_iter=5000)
  else:
        raise ValueError("Invalid flag. Please choose a flag from 1 to 8.")
  return model