In [1]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import snntorch as snn
from snntorch import spikegen
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt
import seaborn as sns

root_folder = "./tactile_dataset/"

velocities = [30]

num_textures = 12
texture_names = [f"texture_{i:02d}" for i in range(1, num_textures + 1)]

def load_and_merge_data(root, velocity, texture):
    velocity_folder = os.path.join(root, f"pickles_{velocity}")
    texture_folder = os.path.join(velocity_folder, texture)
    
    baro_file = os.path.join(texture_folder, "full_baro.csv")
    imu_file = os.path.join(texture_folder, "full_imu.csv")
    
    baro_df = pd.read_csv(baro_file)
    
    imu_df = pd.read_csv(imu_file)
    
    imu_df['baro'] = baro_df['baro']
        
    return imu_df



In [2]:

df_list = []

# for velocity in velocities:
#     for texture in texture_names:
#         df = load_and_merge_data(root_folder, velocity, texture).iloc[0:10000]
#         if df.empty:
#             print(f"No data for Velocity: {velocity} mm/s, Texture: {texture}. Skipping...")
#             continue
#         df_list.append(df)

# merged_df = pd.concat(df_list)

sliding_window_size=500
for velocity in velocities:
    for texture in texture_names:
        merged_df = load_and_merge_data(root_folder, velocity, texture)
        if merged_df.empty:
            print(f"No data for Velocity: {velocity} mm/s, Texture: {texture}. Skipping...")
            continue

        total_samples = len(merged_df)

        num_chunks = total_samples // sliding_window_size

        truncated_df = merged_df.iloc[:num_chunks * sliding_window_size]

        averaged_df = truncated_df.values.reshape(num_chunks, sliding_window_size, merged_df.shape[1]).mean(axis=1)

        averaged_df = pd.DataFrame(averaged_df, columns=merged_df.columns)

        df_list.append(averaged_df)

final_merged_df = pd.concat(df_list, ignore_index=True)



In [3]:
X = final_merged_df[['baro','imu_ax', 'imu_ay', 'imu_az','imu_gx', 'imu_gy', 'imu_gz','imu_mx', 'imu_my', 'imu_mz']].values
y = final_merged_df['Texture']

In [4]:

print(f"Total samples collected: {X.shape[0]}")
print(f"Feature vector size: {X.shape[1]}")
print(f"Unique textures: {np.unique(y)}")

Total samples collected: 90665
Feature vector size: 10
Unique textures: [ 1.  2.  3.  4.  5.  6.  7.  8.  9. 10. 11. 12.]


In [5]:

label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)
texture_classes = label_encoder.classes_
num_classes = len(texture_classes)
print("\nEncoded texture labels:", texture_classes)

scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X)


Encoded texture labels: [ 1.  2.  3.  4.  5.  6.  7.  8.  9. 10. 11. 12.]


In [10]:
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded
)

# def count_classes(y, label_encoder):
#     unique, counts = np.unique(y, return_counts=True)
#     class_counts = dict(zip(label_encoder.inverse_transform(unique), counts))
#     return class_counts

# train_class_counts = count_classes(y_train, label_encoder)
# test_class_counts = count_classes(y_test, label_encoder)

# print("Training Set Class Distribution:")
# print(train_class_counts)
# print("\nTest Set Class Distribution:")
# print(test_class_counts)

def poisson_encoding(features, num_steps, device='cpu'):

    features_repeated = features.unsqueeze(1).repeat(1, num_steps, 1)
    
    rand_vals = torch.rand_like(features_repeated, device=device)
    
    spikes = (rand_vals < features_repeated).float()
    
    return spikes

def encode_to_spike_train(features, num_steps, device='cpu'):
    features_tensor = torch.tensor(features, dtype=torch.float32, device=device)

    spike_trains = poisson_encoding(features_tensor, num_steps=num_steps, device=device)
    
    return spike_trains

num_steps = 100
train_spike_trains = encode_to_spike_train(X_train, num_steps)
test_spike_trains = encode_to_spike_train(X_test, num_steps)

y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

batch_size = 64

train_dataset = TensorDataset(train_spike_trains, y_train_tensor)
test_dataset = TensorDataset(test_spike_trains, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [11]:


num_inputs = X_train.shape[1]
num_hidden = 100
num_outputs = num_classes
beta = 0.9

class SNNModel(nn.Module):
    def __init__(self):
        super(SNNModel, self).__init__()
        self.fc1 = nn.Linear(num_inputs, num_hidden)
        self.lif1 = snn.Leaky(beta=beta)
        self.fc2 = nn.Linear(num_hidden, num_hidden)
        self.lif2 = snn.Leaky(beta=beta)
        self.fc3 = nn.Linear(num_hidden, num_outputs)
        self.lif3 = snn.Leaky(beta=beta)

    def forward(self, x):
        mem1 = self.lif1.init_leaky()
        mem2 = self.lif2.init_leaky()
        mem3 = self.lif3.init_leaky()

        spk_rec = []
        for step in range(x.size(1)):
            input_step = x[:, step, :]
            cur1 = self.fc1(input_step)
            spk1, mem1 = self.lif1(cur1, mem1)
            cur2 = self.fc2(spk1)
            spk2, mem2 = self.lif2(cur2, mem2)
            cur3 = self.fc3(spk2)
            spk3, mem3 = self.lif3(cur3, mem3)
            spk_rec.append(spk3)
        
        out_spikes = torch.stack(spk_rec, dim=0)
        return out_spikes
    
class ANNModel(nn.Module):
    def __init__(self):
        super(ANNModel, self).__init__()
        self.fc1 = nn.Linear(num_inputs, num_hidden)
        self.lif1 = nn.ReLU()
        self.fc2 = nn.Linear(num_hidden, num_hidden)
        self.lif2 = nn.ReLU()
        self.fc3 = nn.Linear(num_hidden, num_outputs)
        self.lif3 = nn.ReLU()

    def forward(self, x):
        for step in range(x.size(1)):
            input_step = x[:, step, :]
            cur1 = self.fc1(input_step)
            spk1 = self.lif1(cur1)
            cur2 = self.fc2(spk1)
            spk2 = self.lif2(cur2)
            cur3 = self.fc3(spk2)
            out = self.lif3(cur3)
        
        return out

net = SNNModel()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)

def forward_pass(net, data):

    spk_rec = net(data)
    return spk_rec

def train_model(net, train_loader, optimizer, criterion, num_epochs=20):

    net.train()
    for epoch in range(num_epochs):
        avg_loss = 0
        total_correct = 0
        total_samples = 0
        for data, targets in train_loader:
            data = data.float()
            targets = targets.long()
            spk_rec = forward_pass(net, data)
            spk_sum = spk_rec.sum(dim=0)
            loss = criterion(spk_sum, targets)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            avg_loss += loss.item()
            _, predicted = spk_sum.max(1)
            total_correct += (predicted == targets).sum().item()
            total_samples += targets.size(0)
        acc = total_correct / total_samples
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss / len(train_loader):.4f}, Accuracy: {acc*100:.2f}%")

def evaluate_model(net, test_loader):
    net.eval()
    total_correct = 0
    total_samples = 0
    with torch.no_grad():
        for data, targets in test_loader:
            data = data.float()
            targets = targets.long()
            spk_rec = forward_pass(net, data)
            spk_sum = spk_rec.sum(dim=0)
            _, predicted = spk_sum.max(1)
            total_correct += (predicted == targets).sum().item()
            total_samples += targets.size(0)
    acc = total_correct / total_samples
    print(f"Test Accuracy: {acc*100:.2f}%")

num_epochs = 20
train_model(net, train_loader, optimizer, criterion, num_epochs)

evaluate_model(net, test_loader)

Epoch 1/20, Loss: 1.8253, Accuracy: 32.48%
Epoch 2/20, Loss: 1.6706, Accuracy: 36.58%
Epoch 3/20, Loss: 1.6432, Accuracy: 37.48%
Epoch 4/20, Loss: 1.6254, Accuracy: 37.91%
Epoch 5/20, Loss: 1.6129, Accuracy: 38.30%
Epoch 6/20, Loss: 1.6011, Accuracy: 38.73%
Epoch 7/20, Loss: 1.5874, Accuracy: 39.12%
Epoch 8/20, Loss: 1.5837, Accuracy: 39.30%
Epoch 9/20, Loss: 1.5747, Accuracy: 39.55%
Epoch 10/20, Loss: 1.5726, Accuracy: 39.51%
Epoch 11/20, Loss: 1.5627, Accuracy: 39.92%
Epoch 12/20, Loss: 1.5605, Accuracy: 39.94%
Epoch 13/20, Loss: 1.5498, Accuracy: 40.32%
Epoch 14/20, Loss: 1.5459, Accuracy: 40.46%
Epoch 15/20, Loss: 1.5425, Accuracy: 40.50%
Epoch 16/20, Loss: 1.5398, Accuracy: 40.74%
Epoch 17/20, Loss: 1.5347, Accuracy: 40.84%
Epoch 18/20, Loss: 1.5345, Accuracy: 40.86%
Epoch 19/20, Loss: 1.5288, Accuracy: 40.99%
Epoch 20/20, Loss: 1.5279, Accuracy: 41.03%
Test Accuracy: 40.71%


Temporal coding

In [21]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import snntorch as snn
from snntorch import spikegen
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt
import seaborn as sns

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Define the root folder containing the dataset
root_folder = "./tactile_dataset/"

# Define velocities and texture names
velocities = [30]
num_textures = 12
texture_names = [f"texture_{i:02d}" for i in range(1, num_textures + 1)]

def load_and_merge_data(root, velocity, texture):
    """
    Load and merge barometric and IMU data for a specific velocity and texture.
    """
    velocity_folder = os.path.join(root, f"pickles_{velocity}")
    texture_folder = os.path.join(velocity_folder, texture)
    
    baro_file = os.path.join(texture_folder, "full_baro.csv")
    imu_file = os.path.join(texture_folder, "full_imu.csv")
    
    baro_df = pd.read_csv(baro_file)
    imu_df = pd.read_csv(imu_file)
    
    # Merge barometric data into IMU dataframe
    imu_df['baro'] = baro_df['baro']
        
    return imu_df

df_list = []

# Parameters for sliding window
sliding_window_size = 500

# Load and preprocess data
for velocity in velocities:
    for texture in texture_names:
        merged_df = load_and_merge_data(root_folder, velocity, texture)
        if merged_df.empty:
            print(f"No data for Velocity: {velocity} mm/s, Texture: {texture}. Skipping...")
            continue

        total_samples = len(merged_df)
        num_chunks = total_samples // sliding_window_size
        truncated_df = merged_df.iloc[:num_chunks * sliding_window_size]
        averaged_df = truncated_df.values.reshape(num_chunks, sliding_window_size, merged_df.shape[1]).mean(axis=1)
        averaged_df = pd.DataFrame(averaged_df, columns=merged_df.columns)
        df_list.append(averaged_df)

final_merged_df = pd.concat(df_list, ignore_index=True)

# Feature extraction
X = final_merged_df[['baro','imu_ax', 'imu_ay', 'imu_az','imu_gx', 'imu_gy', 'imu_gz','imu_mx', 'imu_my', 'imu_mz']].values
y = final_merged_df['Texture']

print(f"Total samples collected: {X.shape[0]}")
print(f"Feature vector size: {X.shape[1]}")
print(f"Unique textures: {np.unique(y)}")

# Label encoding
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)
texture_classes = label_encoder.classes_
num_classes = len(texture_classes)
print("\nEncoded texture labels:", texture_classes)

# Feature scaling
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded
)

def temporal_encoding(features, num_steps, device='cpu'):
    """
    Temporal encoding where each feature fires a single spike at a time step
    inversely proportional to its value (higher value -> earlier spike).
    """
    # Convert features to tensor if not already
    if not isinstance(features, torch.Tensor):
        features = torch.tensor(features, dtype=torch.float32, device=device)
    
    # Ensure features are scaled between 0 and 1
    features = torch.clamp(features, 0, 1)
    
    # Calculate spike times: higher feature value -> earlier spike
    spike_times = torch.round((1.0 - features) * (num_steps - 1)).long()
    spike_times = torch.clamp(spike_times, 0, num_steps - 1)
    
    batch_size, num_features = spike_times.size()
    spikes = torch.zeros(batch_size, num_steps, num_features, device=device)
    
    # Vectorized assignment of spikes
    batch_indices = torch.arange(batch_size, device=device).unsqueeze(1).repeat(1, num_features)
    feature_indices = torch.arange(num_features, device=device).unsqueeze(0).repeat(batch_size, 1)
    spikes[batch_indices, spike_times, feature_indices] = 1.0
    
    return spikes

def encode_to_spike_train_temporal(features, num_steps, device='cpu'):
    """
    Wrapper for temporal encoding.
    """
    return temporal_encoding(features, num_steps=num_steps, device=device)

# Define number of time steps for temporal encoding
num_steps = 100
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Encode training and testing data
train_spike_trains = encode_to_spike_train_temporal(X_train, num_steps, device=device)
test_spike_trains = encode_to_spike_train_temporal(X_test, num_steps, device=device)

# Convert labels to tensors
y_train_tensor = torch.tensor(y_train, dtype=torch.long, device=device)
y_test_tensor = torch.tensor(y_test, dtype=torch.long, device=device)

# Create TensorDatasets
train_dataset = TensorDataset(train_spike_trains, y_train_tensor)
test_dataset = TensorDataset(test_spike_trains, y_test_tensor)

# DataLoaders
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define network parameters
num_inputs = X_train.shape[1]
num_hidden = 100
num_outputs = num_classes
beta = 0.9  # Leaky parameter

class TemporalSNNModel(nn.Module):
    def __init__(self):
        super(TemporalSNNModel, self).__init__()
        self.fc1 = nn.Linear(num_inputs, num_hidden)
        self.lif1 = snn.Leaky(beta=beta)
        self.fc2 = nn.Linear(num_hidden, num_hidden)
        self.lif2 = snn.Leaky(beta=beta)
        self.fc3 = nn.Linear(num_hidden, num_outputs)
        self.lif3 = snn.Leaky(beta=beta)

    def forward(self, x):
        # Initialize membrane potentials
        mem1 = self.lif1.init_leaky()
        mem2 = self.lif2.init_leaky()
        mem3 = self.lif3.init_leaky()

        spk_rec = []
        for step in range(x.size(1)):
            input_step = x[:, step, :]
            cur1 = self.fc1(input_step)
            spk1, mem1 = self.lif1(cur1, mem1)
            cur2 = self.fc2(spk1)
            spk2, mem2 = self.lif2(cur2, mem2)
            cur3 = self.fc3(spk2)
            spk3, mem3 = self.lif3(cur3, mem3)
            spk_rec.append(spk3)
        
        # Stack spike recordings across time
        out_spikes = torch.stack(spk_rec, dim=0)
        return out_spikes

# Initialize the network, loss function, and optimizer
net = TemporalSNNModel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)

def forward_pass(net, data):
    """
    Forward pass through the network.
    """
    spk_rec = net(data)
    return spk_rec

def train_model(net, train_loader, optimizer, criterion, num_epochs=20):
    """
    Train the SNN model.
    """
    net.train()
    for epoch in range(num_epochs):
        avg_loss = 0
        total_correct = 0
        total_samples = 0
        for data, targets in train_loader:
            data = data.float().to(device)
            targets = targets.long().to(device)
            
            # Forward pass
            spk_rec = forward_pass(net, data)
            
            # Decode output spikes by summing over time
            spk_sum = spk_rec.sum(dim=0)
            
            # Compute loss
            loss = criterion(spk_sum, targets)
            
            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            avg_loss += loss.item()
            
            # Calculate accuracy
            _, predicted = spk_sum.max(1)
            total_correct += (predicted == targets).sum().item()
            total_samples += targets.size(0)
        
        acc = total_correct / total_samples
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss / len(train_loader):.4f}, Accuracy: {acc*100:.2f}%")

def evaluate_model(net, test_loader):
    """
    Evaluate the SNN model on the test set.
    """
    net.eval()
    total_correct = 0
    total_samples = 0
    with torch.no_grad():
        for data, targets in test_loader:
            data = data.float().to(device)
            targets = targets.long().to(device)
            
            # Forward pass
            spk_rec = forward_pass(net, data)
            
            # Decode output spikes by summing over time
            spk_sum = spk_rec.sum(dim=0)
            
            # Calculate accuracy
            _, predicted = spk_sum.max(1)
            total_correct += (predicted == targets).sum().item()
            total_samples += targets.size(0)
    
    acc = total_correct / total_samples
    print(f"Test Accuracy: {acc*100:.2f}%")

# Train the model
num_epochs = 20
train_model(net, train_loader, optimizer, criterion, num_epochs)

# Evaluate the model
evaluate_model(net, test_loader)


Total samples collected: 90665
Feature vector size: 10
Unique textures: [ 1.  2.  3.  4.  5.  6.  7.  8.  9. 10. 11. 12.]

Encoded texture labels: [ 1.  2.  3.  4.  5.  6.  7.  8.  9. 10. 11. 12.]
Epoch 1/20, Loss: 2.4315, Accuracy: 11.57%
Epoch 2/20, Loss: 2.0260, Accuracy: 29.30%
Epoch 3/20, Loss: 1.7644, Accuracy: 37.56%
Epoch 4/20, Loss: 1.6160, Accuracy: 42.53%
Epoch 5/20, Loss: 1.5509, Accuracy: 44.78%


KeyboardInterrupt: 

In [22]:
evaluate_model(net, test_loader)


Test Accuracy: 45.99%
