In [None]:
import os
from terrain_classification.svm_classification.train_svm import SVMClassification
# from terrain_classification.svm_classification.predict_svm import PredictSVM
import numpy as np
from sklearn.metrics import ConfusionMatrixDisplay

In [None]:

from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score


In [None]:
parent_dir = os.path.abspath(os.path.join(os.getcwd(), "terrain_classification"))

def create_label_dict(files:dict=None, parent_directory:str=None):
    """
    Map every data file path to the terrain label it was recorded on.

    Args:
        files (dict): Maps a terrain label (e.g. "sand") to an iterable of
            file names for that terrain. ``None`` or an empty dict yields an
            empty result.
        parent_directory (str): Directory that contains ``data/force_torque``.
            Defaults to ``<current working directory>/terrain_classification``.

    Returns:
        dict: ``{<parent>/data/force_torque/<label>/<file name>: <label>}``.
    """
    if parent_directory is None:
        parent_directory = os.path.abspath(os.path.join(os.getcwd(), "terrain_classification"))

    # The signature advertises ``files=None``; treat that as "nothing to label"
    # instead of failing on ``None.keys()``.
    if not files:
        return {}

    data_root = os.path.join(parent_directory, 'data/force_torque')
    return {
        os.path.join(data_root, terrain, file_name): terrain
        for terrain, file_names in files.items()
        for file_name in file_names
    }

# Trial numbers per terrain: 1-6 are used for fitting, 7-8 are held out.
# (An alternative split that was tried: trials 1-4 for training, 5-8 for validation.)
train_files = [str(trial_no) for trial_no in range(1, 7)]
validate_files = [str(trial_no) for trial_no in range(7, 9)]

training_file_list = [f"trial{file_no}.csv" for file_no in train_files]
validation_file_list = [f"trial{file_no}.csv" for file_no in validate_files]

# Every terrain uses the same trial numbers, so all keys share one list.
# "concrete" is currently disabled in both mappings.
files_to_train = dict.fromkeys(("sand", "wood", "gravel", "clay"), training_file_list)
files_to_validate = dict.fromkeys(("sand", "wood", "gravel", "clay"), validation_file_list)

# {absolute csv path: terrain label} for each split.
train_data_labels = create_label_dict(files_to_train, parent_dir)
validation_data_labels = create_label_dict(files_to_validate, parent_dir)

In [None]:
def adjust_same_length(features_, labels_):
    """
    Balance the classes by down-sampling every label to the rarest label's count.

    For each unique label (in the sorted order returned by ``np.unique``) the
    first ``min_count`` samples are kept, where ``min_count`` is the size of
    the smallest class. Nothing is padded; surplus samples are dropped.

    Args:
        features_ (array-like): Feature matrix with one row per sample.
        labels_ (array-like): One label per row of ``features_``.

    Returns:
        tuple: ``(features, labels, True)`` holding the balanced subset grouped
        by label, or ``(None, None, False)`` when ``labels_`` is empty.
    """
    if len(labels_) == 0:
        print("Cannot adjust since the length is zero")
        return None, None, False

    # Accept plain lists as well: the element-wise comparison and the fancy
    # indexing below only work on ndarrays.
    features_ = np.asarray(features_)
    labels_ = np.asarray(labels_)

    unique_labels, label_counts = np.unique(labels_, return_counts=True)
    min_count = label_counts.min()

    # Every class has at least ``min_count`` samples by construction, so each
    # one contributes exactly ``min_count`` row indices.
    indices = np.concatenate(
        [np.where(labels_ == label)[0][:min_count] for label in unique_labels]
    )
    return features_[indices], labels_[indices], True

In [None]:
# Build the (class-balanced) training set for the SVM.
svm_classifier = SVMClassification(train_data_labels)

# Signal components taken per leg; only the first entry of `components` is used
# below. Other combinations that were tried:
#   ['x','z'], ['x','z','tigh'], ['x','z','hip'],
#   ['x','z','calf','tigh'], ['x','z','calf','tigh','hip']
components = [['z']]

# One entry per leg: "<leg>-contact" -> ["<leg>-<component>", ...]
cmp = {
    f'{leg}-contact': [f'{leg}-{comp}' for comp in components[0]]
    for leg in ('fl', 'fr', 'rl', 'rr')
}

comb_legs = False  # combining the legs is currently not working, keep it off
norm_data = True
use_org_sig = True

# Augmentations to apply. Other sets that were tried: wavelet+derivative,
# wavelet+derivative+fft, correlation+derivative, and no augmentation at all.
aug_param = {'wavelet': None}

svm_classifier.create_feature_matrix_and_label(
    normalize_data=norm_data,
    components=cmp,
    combine_legs=comb_legs,
    data_padding_size=80,
    augmetation_params=aug_param,
    use_original_signal=use_org_sig,
)

# Keep a copy of the unbalanced data before the classes are equalised.
unfiltered_train_fm = svm_classifier.feature_matrix.copy()
unfiltered_train_labels = svm_classifier.labels.copy()
(svm_classifier.feature_matrix,
 svm_classifier.labels,
 success_) = adjust_same_length(svm_classifier.feature_matrix, svm_classifier.labels)

if not success_:
    raise AttributeError("Feature matrix not computed")

# Report the class distribution after balancing.
unique_labels, label_counts = np.unique(svm_classifier.labels, return_counts=True)
for label, count in zip(unique_labels, label_counts):
    print(f"Label {label}: {count} samples")

print(f"ratio {max(label_counts)/min(label_counts)}")
print(f"feature matrix shape: {svm_classifier.feature_matrix.shape}")
print(f"labels shape: {svm_classifier.labels.shape}")
print(f"original data shape: {np.array(svm_classifier.original_data).shape}")

In [None]:
# Train the SVM with sklearn (notebook was run with Python 3.8).
report_dir = os.path.join(parent_dir, "svm_classification")

# The search is pinned to a single (C, gamma) pair here. The wider grid
# C in {1e3, 1e2, 1e1, 1} x gamma in {1e-4, 1e-2, 1e-3} is what the
# parameter-sweep cell further down uses.
C = [1000]
gamma = [0.0001]

svm_classifier.train_classifier(
    C=C,
    gamma=gamma,
    find_best_parameters=True,
    save_model=False,
    model_file_name="3class_model",
    save_report=False,
    file_path=None,
)

print("Training completed.")
print("Report: \n")
svm_classifier.print_classification_report()


In [None]:

# Build the held-out feature matrix with the same settings as the training set.
validation_classifier = SVMClassification(validation_data_labels)
validation_classifier.create_feature_matrix_and_label(
    normalize_data=norm_data,
    components=cmp,
    combine_legs=comb_legs,
    data_padding_size=80,
    augmetation_params=aug_param,
    use_original_signal=use_org_sig,
)

# Scaled features / labels of the full validation set, taken before balancing.
org_validation_fm = svm_classifier.scaler.transform(validation_classifier.feature_matrix.copy())
org_validation_labels = validation_classifier.labels.copy()

(validation_classifier.feature_matrix,
 validation_classifier.labels,
 success_) = adjust_same_length(validation_classifier.feature_matrix, validation_classifier.labels)

if not success_:
    raise AttributeError("Feature matrix not computed")

print(f"validation feature matrix shape: {validation_classifier.feature_matrix.shape}")

# Transform with the training classifier's scaler rather than fitting a new
# one on the validation data (presumably it was fitted by train_classifier —
# TODO confirm).
features_ = svm_classifier.scaler.transform(validation_classifier.feature_matrix)
true_labels_ = validation_classifier.labels
predictions_ = svm_classifier.latest_model.predict(features_)

print("Classification Report:")
print(classification_report(true_labels_, predictions_))
cm = ConfusionMatrixDisplay.from_predictions(
    true_labels_,
    predictions_,
    display_labels=svm_classifier.latest_model.classes_,
    cmap='Blues',
)

# An F-beta score (beta=0.5) per class was also tried here via sklearn's fbeta_score.

# from sklearn.metrics import fbeta_score
# f05 = fbeta_score(true_labels_, predictions_, beta=0.5, average=None)
# print(f"F-beta score (beta=0.5): {f05}")

In [None]:
# Testing MLP
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report

def convert_labels(labels):
    """
    Encode terrain names as the integer class ids used by the MLP.

    Mapping: sand -> 0, gravel -> 1, wood -> 2, clay -> 3
    ("concrete" is currently not mapped). Any other label raises ``KeyError``.

    Returns:
        np.ndarray: integer array with one id per entry of ``labels``.
    """
    label_map = {'sand': 0, 'gravel': 1, 'wood': 2, 'clay': 3}
    class_ids = list(map(label_map.__getitem__, labels))
    return np.array(class_ids, dtype=int)

# Define MLP model
class MLP(nn.Module):
    """
    Fully connected classifier.

    Each hidden layer is a ``Linear -> ReLU -> Dropout`` triple; a final
    ``Linear`` layer produces one raw score (logit) per output class.
    """

    def __init__(self, input_size, hidden_layers, output_size, dropout=0.1):
        """
        Args:
            input_size (int): Number of input features.
            hidden_layers (list): List of integers specifying the number of neurons in each hidden layer.
                An empty list gives a single linear layer.
            output_size (int): Number of output classes.
            dropout (float): Dropout probability applied after every hidden activation.
        """
        super().__init__()
        layers = []
        prev_size = input_size

        # Add hidden layers dynamically
        for hidden_size in hidden_layers:
            layers.append(nn.Linear(prev_size, hidden_size))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            prev_size = hidden_size

        # Add the output layer
        layers.append(nn.Linear(prev_size, output_size))

        # Keep the attribute name `net` so the parameter names stay "net.<index>.*".
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the logits for the input batch ``x``."""
        return self.net(x)

class FocalLoss(nn.Module):
    """
    Focal loss built on multi-class cross-entropy.

    Per sample: ``alpha * (1 - p_t) ** gamma * CE`` with ``p_t = exp(-CE)``,
    where ``CE`` is the unreduced cross-entropy of the logits against the
    class-index targets.

    Args:
        alpha (float): Constant scale applied to every sample's loss.
        gamma (float): Exponent of the ``(1 - p_t)`` modulating factor.
        reduction (str): 'mean' or 'sum'; any other value returns the
            per-sample losses unreduced.
    """

    def __init__(self, alpha=0.25, gamma=2.0, reduction='mean'):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        """Compute the focal loss of logits ``inputs`` against class ids ``targets``."""
        ce_per_sample = nn.functional.cross_entropy(inputs, targets, reduction='none')
        p_true = torch.exp(-ce_per_sample)
        modulator = (1 - p_true) ** self.gamma
        focal = self.alpha * modulator * ce_per_sample

        if self.reduction == 'mean':
            return focal.mean()
        if self.reduction == 'sum':
            return focal.sum()
        return focal

def train_MLP(train_features, train_labels, validation_features, validation_labels, print_confusion_matrix=False,
              epochs=1500, random_state=None):
    """
    Train an MLP with full-batch gradient descent and score it on held-out data.

    The training data is split 80/20 (stratified) into a train and a test part,
    standardised with a scaler fitted on the train part only, and used to fit a
    256-128-64 ``MLP`` with Adam and cross-entropy loss.

    Args:
        train_features (np.ndarray): Feature matrix, one row per sample.
        train_labels: Terrain names for ``train_features`` (see ``convert_labels``).
        validation_features (np.ndarray): Held-out feature matrix.
        validation_labels: Terrain names for ``validation_features``.
        print_confusion_matrix (bool): When True and the validation accuracy is
            at least 0.5, print the classification report and draw the
            confusion matrix for the validation set.
        epochs (int): Number of full-batch optimisation steps.
        random_state (int | None): Seed for the train/test split and for torch
            (weight initialisation, dropout). ``None`` keeps the run unseeded.

    Returns:
        float: Accuracy on the validation set.
    """
    if random_state is not None:
        torch.manual_seed(random_state)

    X = train_features
    y = convert_labels(train_labels)

    # Classes present in the training labels define the size of the output layer.
    op_class = np.unique(y)

    # Preprocess
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y,
                                                        random_state=random_state)

    # Fit the scaler on the training part only; reuse it for test and validation.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    X_validation = scaler.transform(validation_features)

    # Convert to tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long)
    X_validation_tensor = torch.tensor(X_validation, dtype=torch.float32)
    y_validation_tensor = torch.tensor(convert_labels(validation_labels), dtype=torch.long)

    input_size = X.shape[1]
    hidden_layers = [256, 128, 64]  # Three hidden layers
    output_size = len(op_class)

    model = MLP(input_size, hidden_layers, output_size)

    # Training setup
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
    # NOTE(review): the learning rate is multiplied by 0.1 every 50 epochs, so
    # it becomes vanishingly small long before `epochs` is reached — confirm
    # this schedule is intended.
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)

    # Training loop: one optimisation step on the whole training set per epoch.
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        outputs = model(X_train_tensor)
        loss = criterion(outputs, y_train_tensor)
        loss.backward()
        optimizer.step()
        scheduler.step()

    # Evaluation
    model.eval()
    with torch.no_grad():
        test_preds = model(X_test_tensor).argmax(dim=1)
        valid_preds = model(X_validation_tensor).argmax(dim=1)

        test_score = accuracy_score(y_test_tensor, test_preds)
        validation_score = accuracy_score(y_validation_tensor, valid_preds)
        print(f"Test score: {test_score:.2f} Validation score: {validation_score:.2f}")

        if print_confusion_matrix and validation_score >= 0.5:
            print(classification_report(y_validation_tensor, valid_preds))
            # Label order matches the ids assigned by convert_labels.
            ConfusionMatrixDisplay.from_predictions(y_validation_tensor, valid_preds, display_labels=['sand', 'gravel', 'wood', 'clay'], cmap='Blues')
    return validation_score

In [None]:
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torch.optim as optim

class TerrainDataset(Dataset):
    """Map-style dataset of (feature vector, class id) pairs held as tensors."""

    def __init__(self, features, labels):
        """Copy ``features`` into a float32 tensor and ``labels`` into an int64 tensor."""
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        """Number of samples (length of the feature tensor's first dimension)."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at position ``idx``."""
        sample = self.features[idx]
        target = self.labels[idx]
        return sample, target
    
def train_MLP_with_batches(train_features, train_labels, validation_features, validation_labels, 
                          batch_size=32, print_confusion_matrix=False, epochs=1500, verbose=False):
    """
    Train an MLP with mini-batches and score it on held-out data.

    The training data is split 80/20 (stratified, ``random_state=42``) into a
    train and a test part, standardised with a scaler fitted on the train part
    only, and used to fit a 256-128-64 ``MLP`` with Adam and cross-entropy loss.

    Args:
        train_features (np.ndarray): Feature matrix, one row per sample.
        train_labels: Terrain names for ``train_features`` (see ``convert_labels``).
        validation_features (np.ndarray): Held-out feature matrix.
        validation_labels: Terrain names for ``validation_features``.
        batch_size (int): Mini-batch size for all three data loaders.
        print_confusion_matrix (bool): Draw confusion matrices for the test and
            validation sets after training.
        epochs (int): Number of passes over the training loader.
        verbose (bool): Print train / validation metrics every 50 epochs.

    Returns:
        float: Accuracy on the validation set (same contract as ``train_MLP``).

    Raises:
        ValueError: If the training split is too small to form one full batch.
    """
    # Convert labels
    X = train_features
    y = convert_labels(train_labels)

    # Classes present in the training labels define the size of the output layer.
    op_class = np.unique(y)

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

    # Scale features: fit on the training part only, reuse for test/validation.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    X_validation = scaler.transform(validation_features)

    # Convert validation labels
    y_validation = convert_labels(validation_labels)

    # Create datasets
    train_dataset = TerrainDataset(X_train, y_train)
    test_dataset = TerrainDataset(X_test, y_test)
    validation_dataset = TerrainDataset(X_validation, y_validation)

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)

    # With drop_last=True a training split smaller than one batch yields no
    # batches at all; fail with a clear message instead of training nothing.
    if len(train_loader) == 0:
        raise ValueError(
            f"Training split has {len(train_dataset)} samples, fewer than "
            f"batch_size={batch_size}; no full batch can be formed."
        )

    # Model setup
    input_size = X.shape[1]
    hidden_layers = [256, 128, 64]
    output_size = len(op_class)

    model = MLP(input_size, hidden_layers, output_size)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
    # NOTE(review): the learning rate is multiplied by 0.1 every 50 epochs, so
    # it becomes vanishingly small long before `epochs` is reached — confirm
    # this schedule is intended.
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)

    # Training loop with batches
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct_predictions = 0
        total_samples = 0

        for batch_features, batch_labels in train_loader:
            optimizer.zero_grad()

            # Forward pass
            outputs = model(batch_features)
            loss = criterion(outputs, batch_labels)

            # Backward pass
            loss.backward()
            optimizer.step()

            # Statistics
            running_loss += loss.item()
            predicted = outputs.detach().argmax(dim=1)
            total_samples += batch_labels.size(0)
            correct_predictions += (predicted == batch_labels).sum().item()

        scheduler.step()

        # Progress report every 50 epochs; skipped entirely unless requested so
        # no validation pass is computed just to be thrown away.
        if verbose and epoch % 50 == 0:
            epoch_loss = running_loss / len(train_loader)
            epoch_acc = correct_predictions / total_samples
            val_loss, val_acc = evaluate_model(model, validation_loader, criterion)
            print(f"Epoch {epoch}: Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}, "
                  f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    # Final evaluation
    test_loss, test_acc = evaluate_model(model, test_loader, criterion)
    val_loss, val_acc = evaluate_model(model, validation_loader, criterion)

    print(f"Final - Test Acc: {test_acc:.2f}, Validation Acc: {val_acc:.2f}")

    if print_confusion_matrix:
        # Generate confusion matrices
        test_preds, test_true = get_predictions(model, test_loader)
        val_preds, val_true = get_predictions(model, validation_loader)

        ConfusionMatrixDisplay.from_predictions(test_true, test_preds, display_labels=op_class, cmap='Blues')
        ConfusionMatrixDisplay.from_predictions(val_true, val_preds, display_labels=op_class, cmap='Blues')

    return val_acc

def evaluate_model(model, data_loader, criterion):
    """
    Evaluate ``model`` on every batch of ``data_loader``.

    Args:
        model: Module mapping a feature batch to per-class scores.
        data_loader: Iterable of ``(features, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.

    Returns:
        tuple: ``(avg_loss, accuracy)`` where ``avg_loss`` is the mean of the
        per-batch loss values and ``accuracy`` is the fraction of correctly
        classified samples. Both are ``nan`` when the loader yields no samples.

    Note:
        The model is switched to eval mode and left in it.
    """
    model.eval()
    total_loss = 0.0
    correct_predictions = 0
    total_samples = 0
    batch_count = 0

    with torch.no_grad():
        for batch_features, batch_labels in data_loader:
            outputs = model(batch_features)
            loss = criterion(outputs, batch_labels)

            total_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            total_samples += batch_labels.size(0)
            correct_predictions += (predicted == batch_labels).sum().item()
            # Count batches while iterating so loaders without __len__ work too.
            batch_count += 1

    # Nothing to average: report "undefined" instead of dividing by zero.
    if batch_count == 0 or total_samples == 0:
        return float('nan'), float('nan')

    avg_loss = total_loss / batch_count
    accuracy = correct_predictions / total_samples

    return avg_loss, accuracy

def get_predictions(model, data_loader):
    """
    Run ``model`` over every batch and collect predicted and true class ids.

    The model is put into eval mode and gradients are disabled. The predicted
    class of a sample is the index of its largest output score.

    Returns:
        tuple: ``(predictions, true_labels)`` as two flat Python lists, in
        loader order.
    """
    model.eval()
    predicted_ids, true_ids = [], []

    with torch.no_grad():
        for inputs, targets in data_loader:
            scores = model(inputs)
            batch_pred = scores.data.max(1)[1]

            predicted_ids += list(batch_pred.cpu().numpy())
            true_ids += list(targets.cpu().numpy())

    return predicted_ids, true_ids

In [None]:
# Sweep the MLP over component sets and augmentation sets.
# To evaluate a single configuration, leave exactly one entry in each list.
#
# NOTE: this cell rebinds the globals `norm_data` (now a list) and `aug_param`
# (now the loop variable). Re-run the SVM setup cell before re-using them there.
norm_data = [True]

# Component sets to evaluate. Others that were tried: ['z'], ['x','z'],
# ['x','z','tigh'], ['x','z','hip'], ['x','z','calf','tigh'].
comp_ = [
         ['x','z', 'calf', 'tigh', 'hip']
        ]

# Augmentation sets to evaluate. Others that were tried: wavelet only,
# wavelet+derivative, correlation only, wavelet+correlation.
augumentations = [
                 {'wavelet': None, 'derivative':None, 'fft': None},
                ]

# Training is stochastic and is repeated until the validation accuracy reaches
# 0.5. Cap the retries so a configuration that never gets there cannot hang the
# notebook. The cap is an arbitrary safety bound; raise it if needed.
# NOTE(review): retrying until the validation score passes means the reported
# score is selected on the validation set — treat it as optimistic.
max_attempts = 20

for aug_param in augumentations:
    print(f"-----\nAugmentations: {aug_param}\n-----")
    for cmp_ in comp_:
        print(f"Components: {cmp_}")
        for nd in norm_data:

            test_comp = {
                'fl-contact': [f'fl-{comp}' for comp in cmp_],
                'fr-contact': [f'fr-{comp}' for comp in cmp_],
                'rl-contact': [f'rl-{comp}' for comp in cmp_],
                'rr-contact': [f'rr-{comp}' for comp in cmp_],
            }
            # Pass the loop value `nd`, not the whole `norm_data` list: a
            # non-empty list is always truthy, so [False] would be ignored.
            train_classifier = SVMClassification(train_data_labels)
            train_classifier.create_feature_matrix_and_label(normalize_data=nd,
                            components=test_comp,
                            combine_legs = False,
                            data_padding_size=80,
                            augmetation_params=aug_param,
                            use_original_signal=True,
                            )

            validation_classifier = SVMClassification(validation_data_labels)
            validation_classifier.create_feature_matrix_and_label(normalize_data=nd,
                            components=test_comp,
                            combine_legs = False,
                            data_padding_size=80,
                            augmetation_params=aug_param,
                            use_original_signal=True,
                            )
            val_score = 0.0
            counter = 0
            while val_score < 0.5 and counter < max_attempts:
                val_score = train_MLP(train_classifier.feature_matrix, train_classifier.labels,
                                      validation_classifier.feature_matrix, validation_classifier.labels, print_confusion_matrix=True)
                counter += 1

            if val_score >= 0.5:
                print(f"Number of attempts to reach validation score of 0.5: {counter}")
            else:
                print(f"Validation score stayed below 0.5 after {counter} attempts (last score: {val_score:.2f})")


In [None]:
# Evaluate multiple parameter combinations for the SVM classifier.
#
# NOTE: this cell rebinds the globals `norm_data`, `aug_param`, `C`, `gamma`,
# `svm_classifier` and `validation_classifier`; later cells see the objects
# from the last configuration of the sweep.
norm_data = [True]
comp_ = [
         ['z'],
         ['x','z'],
         ['x','z', 'tigh'],
         ['x','z', 'hip'],
         ['x','z', 'calf', 'tigh'],
         ['x','z', 'calf', 'tigh', 'hip']
        ]

# Augmentation sets to evaluate. The wavelet+derivative entry used to be listed
# twice, which only repeated the same grid search; it now appears once.
# Also tried earlier: correlation only, wavelet+correlation.
augumentations = [
                 {'wavelet': None,},
                 {'wavelet': None, 'derivative':None,},
                 {'wavelet': None, 'derivative':None, 'fft': None},
                ]

# Search grid; identical for every configuration, so it is defined once.
C = [1e3, 1e2, 1e1, 1]
gamma = [ 1e-4, 1e-2, 1e-3,]

for aug_param in augumentations:
    print(f"\nAugmentations: {aug_param}\n-----")
    for cmp_ in comp_:
        for nd in norm_data:

            test_comp = {
                'fl-contact': [f'fl-{comp}' for comp in cmp_],
                'fr-contact': [f'fr-{comp}' for comp in cmp_],
                'rl-contact': [f'rl-{comp}' for comp in cmp_],
                'rr-contact': [f'rr-{comp}' for comp in cmp_],
            }
            # Pass the loop value `nd`, not the whole `norm_data` list: a
            # non-empty list is always truthy, so [False] would be ignored.
            svm_classifier = SVMClassification(train_data_labels)
            svm_classifier.create_feature_matrix_and_label(normalize_data=nd,
                            components=test_comp,
                            combine_legs = False,
                            data_padding_size=80,
                            augmetation_params=aug_param,
                            use_original_signal=True,
                            )

            svm_classifier.feature_matrix, svm_classifier.labels, success_ = adjust_same_length(svm_classifier.feature_matrix, svm_classifier.labels)
            # Same guard as the single-run cells: stop before training on None.
            if not success_:
                raise AttributeError("Feature matrix not computed")

            svm_classifier.train_classifier(C=C, gamma=gamma, find_best_parameters=True, 
                                            save_model=False, model_file_name="3class_model", 
                                            save_report=False, file_path=None)

            validation_classifier = SVMClassification(validation_data_labels)
            validation_classifier.create_feature_matrix_and_label(normalize_data=nd,
                            components=test_comp,
                            combine_legs = False,
                            data_padding_size=80,
                            augmetation_params=aug_param,
                            use_original_signal=True,
                            )
            validation_classifier.feature_matrix, validation_classifier.labels, success_ = adjust_same_length(validation_classifier.feature_matrix, validation_classifier.labels)
            if not success_:
                raise AttributeError("Feature matrix not computed")

            # Scale the validation features with the training classifier's scaler.
            features_ = svm_classifier.scaler.transform(validation_classifier.feature_matrix)
            true_labels_ = validation_classifier.labels
            predictions_ = svm_classifier.latest_model.predict(features_)
            validation_score_ = accuracy_score(true_labels_, predictions_)
            print(f"comps: {cmp_} | train_score: {svm_classifier.classification_report['report']['accuracy']:.2f} "
                  f"val_score: {validation_score_:.2f} ") 
                

In [None]:
# from terrain_classification.svm_classification.predict_svm import SVMPredictor
# from terrain_classification.svm_classification.train_svm import SVMClassification
# from sklearn.model_selection import train_test_split
# from sklearn.metrics import classification_report
# from sklearn.preprocessing import StandardScaler
# import numpy as np
# import os
# from joblib import load
# # Create an instance of the PredictSVM class
# parent_dir = os.path.abspath(os.path.join(os.getcwd(), "terrain_classification"))
# file_name = "3class_model"
# # file_name = "gravel_concrete_model"
# model_file = os.path.join(parent_dir, "svm_classification/models", f"{file_name}.joblib")
# scaler_path = os.path.join(parent_dir, "svm_classification/models", f"{file_name}-scaler.pkl")
# # scaler = load(scaler_path)
# predictor = SVMPredictor(model_path=model_file)
# two_class_model = load(os.path.join(parent_dir, "svm_classification/models", "2class_model.joblib"))

In [None]:
# #plot samples from each class
# import matplotlib.pyplot as plt

# step_dict = np.array(test_classifier.original_data)
# print(step_dict.shape)
# # print(step_dict.keys())
# plt.figure(figsize=(10, 6))
# for i, label in enumerate(np.unique(test_classifier.labels)):
#     # if label == 'sand':
#     #     continue
#     indices = np.where(test_classifier.labels == label)[0]
#     print(f"Label {label}: {len(indices)} samples")
#     if len(indices) > 0:
#         # Select a random sample from the indices
#         index = np.random.choice(indices)
#         print(f"index: {index}")
#         # Plot the sample
#         # plt.plot(step_dict[index][:, 0], step_dict[index][:, 1], label=label)
#         plt.plot(step_dict[index], label=label)
# plt.title("Samples from each class")
# plt.xlabel("X-axis")
# plt.ylabel("Y-axis")
# plt.legend()
# plt.show()

In [None]:
#Random Forest Classifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

# Random forest baseline, fitted and scored on the unscaled feature matrices of
# `svm_classifier` / `validation_classifier`.
#
# Parameters recorded from earlier searches:
#   Best Parameters: {'max_depth': None, 'max_features': 'log2', 'min_samples_leaf': 1, 'min_samples_split': 2, 'n_estimators': 200}
#   Best Parameters with combined legs: {'max_depth': 20, 'max_features': 'sqrt', 'min_samples_leaf': 1, 'min_samples_split': 5, 'n_estimators': 100}
# The (currently disabled) search was a GridSearchCV with cv=10 and
# scoring='f1_weighted' over:
#   n_estimators [100, 200, 500], max_depth [10, 20, None],
#   min_samples_split [2, 5, 10], min_samples_leaf [1, 2, 4],
#   max_features ['sqrt', 'log2']
rf = RandomForestClassifier(
    n_estimators=20,
    max_depth=None,
    max_features='log2',
    min_samples_split=5,
    random_state=42,
)
rf.fit(svm_classifier.feature_matrix, svm_classifier.labels)
predictions_rf = rf.predict(validation_classifier.feature_matrix)

print("Random Forest Classification Report:")
print(classification_report(validation_classifier.labels, predictions_rf))

In [None]:
#Data preprocessing for MLP
from terrain_classification.wavelet_analysis import WaveletAnalysis
from terrain_classification.data_augmentation import DataAugmentation

wa = WaveletAnalysis()
da = DataAugmentation()

# Raw signals come from the current `svm_classifier`; labels come from the
# unbalanced copy saved in the SVM setup cell.
# NOTE(review): both must originate from the same run for the rows to line up
# — confirm when cells are executed out of order.
org_data = np.array(svm_classifier.original_data)
print(f"org shape: {org_data.shape}")
org_labels = unfiltered_train_labels

# Three derived views of every signal: wavelet analysis, low-pass filtered
# copy, and a noisy copy (0.1 is the second argument passed to add_noise).
wave_results = np.array(wa.perform_analysis(org_data))
fft_signals = [svm_classifier.data_augmentation.low_pass_filter(signal) for signal in org_data]
noise_signals = [svm_classifier.data_augmentation.add_noise(signal, 0.1) for signal in org_data]

# One flat row per sample: original | noisy | low-pass | wavelet.
final_feature_matrix = []
final_labels = []
for feature, fft_sig, wav_res, noi, label in zip(org_data, fft_signals, wave_results, noise_signals, org_labels):
    row_parts = (feature.flatten(), noi.flatten(), fft_sig.flatten(), wav_res.flatten())
    final_feature_matrix.append(np.hstack(row_parts))
    final_labels.append(label)

final_feature_matrix = np.array(final_feature_matrix)
final_labels = np.array(final_labels)

print(f'feature size: {final_feature_matrix.shape}')
print(f'label size: {final_labels.shape}')

In [None]:
# from scipy import fft
# from terrain_classification.wavelet_analysis import WaveletAnalysis
# from terrain_classification.data_augmentation import DataAugmentation

# wa = WaveletAnalysis()
# da = DataAugmentation()

# def feature_augmentation (names, signals):
#     augmented_sig = []
#     for name in names:
#         if name == 'fft':
#             # for sig in signals:
#             fft_sig = da.low_pass_filter(signals)
#             augmented_sig.append(fft_sig.flatten())

#         if name == 'wavelet':
#             wave_result = wa.perform_analysis(signals)

#             for res in wave_result:
#                 augmented_sig.append(res.flatten())

#     return augmented_sig

# org_data = np.array(svm_classifier.original_data)
# # org_labels = np.array(unfiltered_labels)
# print(f"org shape: {org_data.shape}")
# org_labels = unfiltered_train_labels
# # org_labels = org_labels.reshape(-1, 1)

# wave_results = np.array(wa.perform_analysis(org_data))
# # print(f'wave shape: {wave_results.shape}')
# fft_signals = []
# for signal in org_data:
#     fft_signal = svm_classifier.data_augmentation.low_pass_filter(signal)
#     fft_signals.append(fft_signal)

# # augmented_ = feature_augmentation(['wavelet'], org_data)
# print(f"lenght: {len(fft_signals[0])}")
# final_feature_matrix = []
# final_labels = []

# aug_types = ['wavelets']
# for data, label in zip (org_data, org_labels):
#     aug_ = feature_augmentation(aug_types, data)
#     print(f'aug shape: {np.array(aug_).shape}')

# # for data_, fft_sig, wav_res, label in zip(org_data, fft_signals, wave_results, org_labels):
#     # final_feature_matrix.append(np.hstack((data_.flatten(), fft_sig.flatten())))
# # for feature, fft_sig, wav_res, label in zip(org_data, fft_signals, wave_results, org_labels):
# #     final_feature_matrix.append(np.hstack((feature.flatten(), fft_sig.flatten(), wav_res.flatten())))
# #     # for aug_typ in aug_types:
# #     #     augmented_ = feature_augmentation(aug_typ, data)
# #     final_labels.append(label)

# final_feature_matrix = np.array(final_feature_matrix)
# final_labels = np.array(final_labels)
# print(f'feature size: {final_feature_matrix.shape}')
# print(f'label size: {final_labels.shape}')

In [None]:
from sklearn.ensemble import VotingClassifier

# Hard (majority) vote between `rf` and the SVM held in
# svm_classifier.latest_model. VotingClassifier.fit trains its own clones of
# both members on the training features/labels.
hard_vote_members = [
    ('rf', rf),
    ('svc', svm_classifier.latest_model),
]
voting_clf = VotingClassifier(estimators=hard_vote_members, voting='hard')
voting_clf.fit(svm_classifier.feature_matrix, svm_classifier.labels)

# Evaluate the fitted ensemble on test_classifier's features and labels.
predictions_voting = voting_clf.predict(test_classifier.feature_matrix)
print("Voting Classifier Classification Report:")
print(classification_report(test_classifier.labels, predictions_voting))
ConfusionMatrixDisplay.from_predictions(
    test_classifier.labels,
    predictions_voting,
    display_labels=voting_clf.classes_,
    cmap='Blues',
)

In [None]:
# Two- and three-class ensemble
from sklearn.ensemble import VotingClassifier

# NOTE(review): with hard voting and weights of 10 vs 1 the 'svc3' vote always
# outweighs 'svc2', and VotingClassifier.fit refits clones of BOTH members on
# svm_classifier.labels, so the two-class model is retrained on whatever
# classes those labels contain. Confirm this is the intended behaviour.
two_three_members = [
    ('svc3', predictor.model),
    ('svc2', two_class_model),
]
two_three_ensemble = VotingClassifier(
    estimators=two_three_members,
    voting='hard',
    weights=[10, 1],
    verbose=2,
)
two_three_ensemble.fit(svm_classifier.feature_matrix, svm_classifier.labels)

# Evaluate on test_classifier's features and labels.
predictions_ensemble = two_three_ensemble.predict(test_classifier.feature_matrix)
print("Ensemble Classifier Classification Report:")
print(classification_report(test_classifier.labels, predictions_ensemble))
ConfusionMatrixDisplay.from_predictions(
    test_classifier.labels,
    predictions_ensemble,
    cmap='Blues',
)

In [None]:
# Custom ensemble prediction
def custom_ensemble_predict(X, confidence_threshold=0.7):
    """Combine the three-class and two-class SVCs into one label per sample.

    The three-class model (``predictor.model``) is consulted first. When its
    highest class probability for a sample exceeds ``confidence_threshold``
    its prediction is kept; otherwise the sample falls back to
    ``two_class_model`` and that model's most probable class is used.

    Each probability vector is only ever used to index the ``classes_`` array
    of the model that produced it, so the argmax position always refers to the
    right label set.

    Both models are read from the notebook's global namespace and must expose
    ``predict_proba`` and ``classes_`` (for scikit-learn's ``SVC`` that needs
    ``probability=True`` -- presumably how both were trained; verify).

    Args:
        X: Feature matrix accepted by both models' ``predict_proba``.
        confidence_threshold: Minimum top-class probability (exclusive) the
            three-class model needs for its prediction to be accepted.
            Defaults to 0.7.

    Returns:
        list: One predicted class label per row of ``X``.
    """
    # Class probabilities from both models, one row per sample.
    prob_three_class = predictor.model.predict_proba(X)
    prob_two_class = two_class_model.predict_proba(X)

    three_class_labels = predictor.model.classes_
    two_class_labels = two_class_model.classes_

    final_predictions = []
    for prob_three, prob_two in zip(prob_three_class, prob_two_class):
        if max(prob_three) > confidence_threshold:
            # High confidence from the three-class SVC: trust it directly.
            final_predictions.append(three_class_labels[np.argmax(prob_three)])
        else:
            # Low confidence: let the two-class SVC refine the decision.
            final_predictions.append(two_class_labels[np.argmax(prob_two)])
    return final_predictions

# Run the custom ensemble on features_ and score it against true_labels_.
custom_predictions = custom_ensemble_predict(features_)

print("Custom Ensemble Classifier Classification Report:")
custom_report = classification_report(true_labels_, custom_predictions)
print(custom_report)
ConfusionMatrixDisplay.from_predictions(
    true_labels_,
    custom_predictions,
    cmap='Blues',
)

In [None]:

# #to train with cuML, we need to convert the labels to integers
# l_ = ['sand', 'gravel', 'concrete']
# min_len = [len(np.where(svm_classifier.labels == l)[0]) for l in l_ if len(np.where(svm_classifier.labels == l)[0]) > 0]

# print(f"min_len {min_len}")
# new_ids = np.empty(0)

# if use_sand:
#     new_ids = np.concatenate((new_ids, np.where(svm_classifier.labels == 'sand')[0]))
# if use_gravel:
#     new_ids = np.concatenate((new_ids, np.where(svm_classifier.labels == 'gravel')[0]))
# if use_concrete:
#     new_ids = np.concatenate((new_ids, np.where(svm_classifier.labels == 'concrete')[0]))

# new_ids = new_ids.astype(int)
# new_labels = svm_classifier.labels[new_ids]
# new_features = svm_classifier.feature_matrix[new_ids]

# print(f"new labels shape {new_labels.shape}")
# print(f"new features shape {new_features.shape}")


In [None]:
def convert_labels(labels, label_map=None):
    """Convert string terrain labels to integer class codes.

    Args:
        labels: Iterable of label values (e.g. a list or 1-D array of str).
        label_map: Optional mapping from label to integer code. When omitted
            the default ``{'sand': 0, 'gravel': 1, 'concrete': 2}`` is used.
            Pass an explicit mapping when the data contains other terrain
            classes; the default knows only these three.

    Returns:
        numpy.ndarray: Integer array with one code per entry of ``labels``.

    Raises:
        KeyError: If any label has no entry in the mapping. The message lists
            the offending labels and the labels the mapping does know.
    """
    if label_map is None:
        label_map = {'sand': 0, 'gravel': 1, 'concrete': 2}

    # Materialise once so generators/iterators can be scanned twice.
    labels = list(labels)

    # Report every unknown label up front instead of failing on the first one
    # with a bare KeyError that gives no hint about the valid label set.
    unknown = sorted({str(label) for label in labels if label not in label_map})
    if unknown:
        raise KeyError(
            f"No integer code defined for label(s) {unknown}; "
            f"known labels: {list(label_map)}"
        )

    return np.array([label_map[label] for label in labels], dtype=int)

In [None]:
# use miniconda python 3.12

# Hold out 20% of the samples (fixed seed). The string labels of each split
# are kept as orig_label_* and integer-encoded copies are stored in new_y_*.
(
    new_X_train,
    new_X_test,
    orig_label_train,
    orig_label_test,
) = train_test_split(
    svm_classifier.feature_matrix,
    svm_classifier.labels,
    test_size=0.2,
    random_state=42,
)
# new_X_train, new_X_test, new_y_train, new_y_test = train_test_split(new_features, new_labels, test_size=0.2, random_state=42)
# print(f"X type {new_X_train.dtype}")
# print(f"y type {new_y_train.dtype}")
new_y_train = convert_labels(orig_label_train)
new_y_test = convert_labels(orig_label_test)

# Standardise features using statistics learned from the training split only,
# then apply the same transform to the test split.
scaler = StandardScaler()
scaler.fit(new_X_train)
new_X_train = scaler.transform(new_X_train)
new_X_test = scaler.transform(new_X_test)


In [None]:

from cuml.svm import SVC as cmSVC
# NOTE(review): this rebinds `accuracy_score`, shadowing the scikit-learn
# function imported near the top of the notebook for every cell run after
# this one.
from cuml.metrics import accuracy_score

# Hyper-parameter grid for the cuML SVC sweep; commented lines are
# alternative grids that were tried.
# c = [0.1, 1, 10, 100]
c = [1000, 10, 100]
g = [0.001, 0.01, 0.1, 1]
# g = ['scale', 'auto']
# g = [0.01, 1]
kernel = ['rbf']
# kernel = ['rbf', 'poly', 'sigmoid']

# Exhaustive sweep: train one SVC per (kernel, C, gamma) combination and
# report its accuracy on the held-out split.
# NOTE(review): configurations are compared on new_X_test itself, so the best
# score is an optimistic estimate for the configuration finally chosen.
for k_ in kernel:
    for c_ in c:
        for g_ in g:
            new_svc = cmSVC(kernel=k_, C=c_, gamma=g_, cache_size=2000)
            new_svc.fit(new_X_train, new_y_train)
            new_y_pred = new_svc.predict(new_X_test)
            # cuML's accuracy_score signature is (ground_truth, predictions).
            grid_accuracy = accuracy_score(new_y_test, new_y_pred)
            print(f"Trying c: {c_} g: {g_} kernel: {k_} -> Accuracy score: {grid_accuracy:.4f}")
        # Separator after each C value ...
        print("----------------------")
    # ... and a second one after each kernel.
    print("----------------------")

print("All Done")

In [None]:
from joblib import dump
from cuml.svm import SVC as cmSVC

# from sklearn.externals import joblib

# Train a single cuML SVC with one fixed hyper-parameter set on the scaled
# training split.
final_svc_params = dict(kernel='rbf', C=100, gamma=0.01, cache_size=2000)
new_svc = cmSVC(**final_svc_params)
new_svc.fit(new_X_train, new_y_train)
print("Training done")
# Export of the trained model is currently disabled:
# sk_model = new_svc.to_sklearn()
# dump(sk_model, os.path.join(parent_dir, 'svm_classification/cpu_test_model.joblib'))

In [None]:
from joblib import load
# import cuml
from sklearn.metrics import classification_report

# Load the persisted model. joblib.load unpickles the file, so only load
# model files that come from a trusted source.
trained_model_path = os.path.join(parent_dir, 'svm_classification/models/trained_test_model.joblib')
loaded_model = load(trained_model_path)
print(f"loaded model {loaded_model}")

# NOTE(review): the report is computed on the training split
# (new_X_train / orig_label_train), not on the held-out test split --
# confirm this is intended.
predictions_ = loaded_model.predict(new_X_train)
train_split_report = classification_report(orig_label_train, predictions_)
print(train_split_report)
# print(f"Accuracy: {accuracy_score(new_y_test, predictions_):.4f}")