# Support Vector Machine Classifier implementation
Created for the bachelor thesis of Jakub Dvorak (2020) | jakub.dvorak@natur.cuni.cz<br>
Department of Applied Geoinformatics and Cartography, Faculty of Science, Charles University

Adapted from EduServ18 (2020): 3D Sensing, Scene Reconstruction and Semantic Interpretation<br>
The original code was created by Martin Weinmann, Franz Rottensteiner and Dennis Wittich

__imports__ and __settings__ for this lab:

In [None]:
# IMPORTS
import os
import numpy as np
import time
import matplotlib
import matplotlib.pyplot as plt
import imageio
from sklearn.svm import SVC as SupportVectorMachine
from sklearn.model_selection import GridSearchCV as GridSearch
from sklearn.model_selection import RandomizedSearchCV as RandomizedSearch
import scipy.stats

# GLOBAL SETTINGS
PlotSize = 8                                     # Size of plots
matplotlib.rcParams['figure.figsize'] = [PlotSize*2, PlotSize]  
CMAP = matplotlib.colors.ListedColormap(['black', 'white', 'orange'])               # Color mapping 
np.set_printoptions(precision=2, suppress=True)  # Array print precision

# CLASS AND FEATURE DESCRIPTION
class_names = ['PICEA','PINUS','BACKGRD']
feature_names = ['NIR','RED','GREEN','NDVI']
num_classes = len(class_names); num_features = len(feature_names)

# PATHS TO TRAIN/TEST DATA
data_path = '../data/66_33/'
training_set_path = data_path + 'train/'         # Relative path to training patch root folder
test_set_path =     data_path + 'test/'         # Relative path to test patch root folder

num_of_training_tiles = len(os.listdir(training_set_path + 'CIR/'))
num_of_test_tiles = len(os.listdir(test_set_path + 'CIR/'))

# USE CIR OR RGB DATA
use_cir = True
use_rgb = True
patch_size = 512

## Data import and handling
Following function reads input data and creates np arrays out of it

In [None]:
def read_patch(root_folder, cir, rgb):
    ##########################################################
    # READ IMAGES as FLOAT
    if cir:
        cir_file_list = os.listdir(root_folder + 'CIR/')
        cir_list = []
        
        for file in cir_file_list:
            cir_patch = imageio.imread(root_folder + 'CIR/' + file).astype(np.float32)
            cir_patch = cir_patch * 1/255
            
            h, w = cir_patch.shape[:2]
            num_samples = h*w
            cir_list.append(cir_patch[:,:,:].reshape((num_samples, 3)))
            del cir_patch

        cir_features = np.concatenate(cir_list, axis=0)
    
    if rgb:
        rgb_file_list = os.listdir(root_folder + 'RGB/')
        rgb_list = []
        
        for file in rgb_file_list:
            rgb_patch = imageio.imread(root_folder + 'RGB/' + file).astype(np.float32)
            rgb_patch = rgb_patch * 1/255
            
            h, w = rgb_patch.shape[:2]
            num_samples = h*w
            rgb_list.append(rgb_patch[:,:,:].reshape((num_samples, 3)))
            del rgb_patch
        
        rgb_features = np.concatenate(rgb_list, axis=0)


    if cir and rgb:
        features = np.concatenate([cir_features, rgb_features], axis=1)
    elif cir:
        features = cir_features
    elif rgb:
        features = rgb_features
    else:
        print('No valid data input.')


    gt_file_list = os.listdir(root_folder + 'GT/')
    gt_list = []

    for file in gt_file_list:
        gt_patch = imageio.imread(root_folder + 'GT/' + file).astype(np.float32)
 
        h, w = gt_patch.shape[:2]
        num_samples = h*w
    
        gt_list.append(gt_patch[:,:].reshape((num_samples)))
        del gt_patch

    ground_truth = np.concatenate(gt_list, axis=0)

    ########################################################## 
    return features, ground_truth

In [None]:
a = time.time()
X, y = read_patch(training_set_path, use_cir, use_rgb)
b = time.time()
X_t, y_t = read_patch(test_set_path, use_cir, use_rgb)
c = time.time()

print('Training set loaded in ' + str(b-a) + 's')
print('Testing set loaded in ' + str(c-b) + 's')
print(X_t.shape)
print(y.shape)
print(X.max())
print(X.min())

## Visualisation function
The following function visualises IRRG, ground truth and predicted labels

In [None]:
def plot_rgb_cir_gt_pred(tile_index, data, gt, pred, num_of_tiles, cir, rgb, size):
    # Function to plot prediction vs ground truth
    
    # Split the datasets into tiles
    data_list = np.split(data, num_of_tiles)
    gt_list =   np.split(gt,   num_of_tiles)
    pred_list = np.split(pred, num_of_tiles)
    
    # Select tile by tile_index and reshape to original dimensions
    data = data_list[tile_index].reshape((size,size,data.shape[1]))
    gt   = gt_list[tile_index].reshape((size,size))
    pred = pred_list[tile_index].reshape((size,size))
    
    # Plotting
    plt.figure(facecolor='white')
    
    if cir and rgb:
        plt.subplot(1, 4, 1)
        plt.imshow(data[:,:,:3])
        plt.title('NIR Red Green composite')
        plt.axis('off')

        plt.subplot(1, 4, 2)
        plt.imshow(data[:,:,-3:])
        plt.title('Red Green Blue composite')
        plt.axis('off')
        
        plt.subplot(1, 4, 3)
        plt.imshow(gt, CMAP)
        plt.title('GT Labels')
        plt.axis('off')

        plt.subplot(1, 4, 4)
        plt.imshow(pred, CMAP)
        plt.title('Predicted Labels')
        plt.axis('off')
    
    elif cir or rgb:
        plt.subplot(1, 3, 1)
        plt.imshow(data)
        if cir:
            plt.title('NIR Red Green composite')
        else:
            plt.title('Red Green Blue composite')
        plt.axis('off')

        plt.subplot(1, 3, 2)
        plt.imshow(gt, CMAP)
        plt.title('GT Labels')
        plt.axis('off')

        plt.subplot(1, 3, 3)
        plt.imshow(pred, CMAP)
        plt.title('Predicted Labels')
        plt.axis('off')

In [None]:
#Test of the created visualisation function
plot_rgb_cir_gt_pred(3, X, y, y, num_of_training_tiles, use_cir, use_rgb, patch_size)

## Accuracy metrics
The following function returns accuracy metrics, namely overall accuracy, precision, recall and f1 score

In [None]:
def compute_quality_metrics(Y, y, C):
    # Copy the code from the last lab here
    ##########################################################
    M = np.equal(Y,y).astype(np.int)
    TP = np.array([np.sum( M * (Y==i).astype(np.int)) for i in range(C)])
    FP = np.array([np.sum( (1-M) * (Y==i)) for i in range(C)])
    FN = np.array([np.sum( (1-M) * (y==i)) for i in range(C)])

    precisions = TP/(TP+FP)
    recalls = TP/(TP+FN)
    f1_scores = 2 * precisions * recalls / (precisions + recalls)
    overall_accuracy = np.sum(Y==y) / len(y)
    mean_f1_score = np.mean(f1_scores)
    return precisions, recalls, f1_scores, overall_accuracy, mean_f1_score, TP, FP, FN
    ##########################################################

## Classifier #2
using uniformly distributed training data

In [None]:
def get_random_training_subset(X, y, N_s):
    ##########################################################
    Xy = np.hstack((X,y.reshape((-1, 1))))
    np.random.shuffle(Xy)
    
    X_s = Xy[:N_s, :-1]
    y_s = Xy[:N_s, -1]        
    ##########################################################
    return X_s, y_s

In [None]:
def get_uniform_training_subset(X, y, N_s, num_classes=3):
    ##########################################################
    N, F = X.shape
    counts = np.zeros(num_classes)
    num_drawn = 0
    X_s = np.zeros((N_s, F), dtype=np.float32)
    y_s = np.zeros((N_s,), dtype=np.float32)
    
    while(num_drawn < N_s):
        i = np.random.randint(N)
        yi = y[i]
        if yi == np.argmin(counts):
            counts[int(yi)] += 1
            X_s[num_drawn] = X[i]
            y_s[num_drawn] = yi
            num_drawn += 1
    ##########################################################
    return X_s, y_s

In [None]:
N_s = 3000
#X_rs, y_rs = get_random_training_subset(X, y, N_s)
a = time.time()
X_es, y_es = get_uniform_training_subset(X, y, N_s, 3)
b = time.time()
print(str(b-a))

plt.subplot(1,2,1)
plt.hist(y_es,bins=range(6), rwidth=0.9, align='left')
plt.title('random sampling')
plt.subplot(1,2,2)
plt.hist(y_es,bins=range(6), rwidth=0.9, align='left')
plt.title('uniformly dist.')
plt.show()

In [None]:
# Fixed parameters for the SVM
fixed_parameters_svm = {
    'kernel' : 'rbf',                        # Define the kernel function
    'cache_size' : 40000,
    }

# Tunable parameters for the SVM
tunable_parameters_svm = { 
    'C': [1e0,1e1,1e2,1e3,1e4], # scipy.stats.expon([1]),             # Define the penalty value
    'gamma': [0.1,1e0,10,100] #scipy.stats.expon(scale=.1)     # Kernel parameter
}

# Create an instance of the model that is to be optimized
model = SupportVectorMachine(**fixed_parameters_svm)

# Create the optimizer and run the optimization
opt = GridSearch(model, tunable_parameters_svm, cv = 3, scoring="f1_macro", refit=False, verbose=1, n_jobs=-1)
opt.fit(X_es, y_es) #X_es, y_es)

# Save and print optimal parameters
opt_parameters_svm = opt.best_params_

print("Best found parameters:", opt_parameters_svm)

In [None]:
# Generate the SVM classifier
parameters_svm = {
    'kernel' : 'rbf',                        # Define the kernel function
    'cache_size' : 40000,
    'C' : 1e4,
    'gamma' : 1
}

svm_r = SupportVectorMachine(**parameters_svm)

# Train the classifier
a = time.time()
svm_r.fit(X_es, y_es)
b = time.time()

print('Training finished in ' + str(b-a) + 's')

In [None]:
# Perform prediction
jobs = 388
Y_t = np.empty_like(y_t)
Y_t_list = np.split(Y_t, jobs)
X_t_list = np.split(X_t, jobs)

b = time.time()

print(Y_t_list[0].shape)
print(X_t_list[0].shape)
for i in range(jobs):
    Y_t_list[i][:] = svm_r.predict(X_t_list[i][:])
    print('Processed tile # ' + str(i))
c = time.time()

print('Inferrence finished in ' + str(c-b) + 's')

In [None]:
precisions, recalls, f1_scores, overall_accuracy, mean_f1_score,TrueP, FalseP, FalseN = compute_quality_metrics(Y_t, y_t, 3)
print('precisions [%]:      ', precisions*100)
print('recalls    [%]:      ', recalls*100)
print('f1_scores  [%]:      ', f1_scores*100)
print('')
print('overall accuracy: {:.2%}'.format(overall_accuracy))
print('mean f1 score:    {:.2%}'.format(mean_f1_score))
print('True Positive:' + str(TrueP) + '\nFalse Positive:' + str(FalseP) + '\nFalse Negative:' + str(FalseN))

In [None]:
plot_rgb_cir_gt_pred(18, X_t, y_t, Y_t, num_of_test_tiles, use_cir, use_rgb, patch_size)

In [None]:
np.savetxt(data_path + 'svm.csv', Y_t, fmt='%d')

In [None]:
data_path = '../data/33_66/'
training_set_path = data_path + 'train/'         # Relative path to training patch root folder
test_set_path =     data_path + 'test/'         # Relative path to test patch root folder

num_of_training_tiles = len(os.listdir(training_set_path + 'CIR/'))
num_of_test_tiles = len(os.listdir(test_set_path + 'CIR/'))

a = time.time()
X, y = read_patch(training_set_path, use_cir, use_rgb)
b = time.time()
X_t, y_t = read_patch(test_set_path, use_cir, use_rgb)
c = time.time()

print('Training set loaded in ' + str(b-a) + 's')
print('Testing set loaded in ' + str(c-b) + 's')

X_es, y_es = get_uniform_training_subset(X, y, N_s, 3)

# Generate the SVM classifier
parameters_svm = {
    'kernel' : 'rbf',                        # Define the kernel function
    'cache_size' : 40000,
    'C' : 1e4,
    'gamma' : 1
}

svm_r = SupportVectorMachine(**parameters_svm)

# Train the classifier
a = time.time()
svm_r.fit(X_es, y_es)
b = time.time()

print('Training finished in ' + str(b-a) + 's')

# Perform prediction
jobs = num_of_test_tiles
Y_t = np.empty_like(y_t)
Y_t_list = np.split(Y_t, jobs)
X_t_list = np.split(X_t, jobs)

b = time.time()

print(Y_t_list[0].shape)
print(X_t_list[0].shape)
for i in range(jobs):
    Y_t_list[i][:] = svm_r.predict(X_t_list[i][:])
    print('Processed tile # ' + str(i))
c = time.time()

print('Inferrence finished in ' + str(c-b) + 's')
precisions, recalls, f1_scores, overall_accuracy, mean_f1_score,TrueP, FalseP, FalseN = compute_quality_metrics(Y_t, y_t, 3)
print('precisions [%]:      ', precisions*100)
print('recalls    [%]:      ', recalls*100)
print('f1_scores  [%]:      ', f1_scores*100)
print('')
print('overall accuracy: {:.2%}'.format(overall_accuracy))
print('mean f1 score:    {:.2%}'.format(mean_f1_score))
print('True Positive:' + str(TrueP) + '\nFalse Positive:' + str(FalseP) + '\nFalse Negative:' + str(FalseN))

plot_rgb_cir_gt_pred(18, X_t, y_t, Y_t, num_of_test_tiles, use_cir, use_rgb, patch_size)

np.savetxt(data_path + 'svm.csv', Y_t, fmt='%d')

In [None]:
data_path = '../data/01_99/'
training_set_path = data_path + 'train/'         # Relative path to training patch root folder
test_set_path =     data_path + 'test/'         # Relative path to test patch root folder

num_of_training_tiles = len(os.listdir(training_set_path + 'CIR/'))
num_of_test_tiles = len(os.listdir(test_set_path + 'CIR/'))

a = time.time()
X, y = read_patch(training_set_path, use_cir, use_rgb)
b = time.time()
X_t, y_t = read_patch(test_set_path, use_cir, use_rgb)
c = time.time()

print('Training set loaded in ' + str(b-a) + 's')
print('Testing set loaded in ' + str(c-b) + 's')

X_es, y_es = get_uniform_training_subset(X, y, N_s, 3)

# Generate the SVM classifier
parameters_svm = {
    'kernel' : 'rbf',                        # Define the kernel function
    'cache_size' : 40000,
    'C' : 1e4,
    'gamma' : 1
}

svm_r = SupportVectorMachine(**parameters_svm)

# Train the classifier
a = time.time()
svm_r.fit(X_es, y_es)
b = time.time()

print('Training finished in ' + str(b-a) + 's')

# Perform prediction
jobs = num_of_test_tiles
Y_t = np.empty_like(y_t)
Y_t_list = np.split(Y_t, jobs)
X_t_list = np.split(X_t, jobs)

b = time.time()

print(Y_t_list[0].shape)
print(X_t_list[0].shape)
for i in range(jobs):
    Y_t_list[i][:] = svm_r.predict(X_t_list[i][:])
    print('Processed tile # ' + str(i))
c = time.time()

print('Inferrence finished in ' + str(c-b) + 's')
precisions, recalls, f1_scores, overall_accuracy, mean_f1_score,TrueP, FalseP, FalseN = compute_quality_metrics(Y_t, y_t, 3)
print('precisions [%]:      ', precisions*100)
print('recalls    [%]:      ', recalls*100)
print('f1_scores  [%]:      ', f1_scores*100)
print('')
print('overall accuracy: {:.2%}'.format(overall_accuracy))
print('mean f1 score:    {:.2%}'.format(mean_f1_score))
print('True Positive:' + str(TrueP) + '\nFalse Positive:' + str(FalseP) + '\nFalse Negative:' + str(FalseN))

plot_rgb_cir_gt_pred(18, X_t, y_t, Y_t, num_of_test_tiles, use_cir, use_rgb, patch_size)

np.savetxt(data_path + 'svm.csv', Y_t, fmt='%d')