<a href="https://colab.research.google.com/github/MyatToe/100days-python/blob/main/SNN_080623.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install snntorch --quiet

In [None]:
# imports
import snntorch as snn
from snntorch import surrogate
from snntorch import functional as SF
from snntorch import utils

import torch
import torch.nn as nn

import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import pandas as pd
import numpy as np
import csv
import time

import matplotlib.pyplot as plt
import itertools
import random
import statistics
import tqdm
print('Input library finished')

In [None]:
# start = time.time()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('device:', device)


In [None]:
# Seed
torch.manual_seed(0)
random.seed(0)
np.random.seed(0)

In [None]:
class CustomDataset(Dataset):
    def __init__(self,csv_path_inp,csv_path_outp):
        df_inp = pd.read_csv(csv_path_inp)
        df_outp = pd.read_csv(csv_path_outp)
        self.inp = df_inp.iloc[:,:].values
        self.outp = df_outp.iloc[:,:].values
        
    def __len__(self):
        return len(self.inp)
    
    def __getitem__(self,idx):
        inp = torch.FloatTensor(self.inp[idx])
        outp = torch.FloatTensor(self.outp[idx])
        # Convert the input and output to spikes using a suitable method
        inp_spikes = self.convert_to_spikes(inp)
        outp_spikes = self.convert_to_spikes(outp)
        return inp_spikes, outp_spikes

    def convert_to_spikes(self, data):
        # Implement your spike conversion logic here
        threshold = 0.5  # Example threshold for spike generation
        spikes = torch.where(data >= threshold, torch.ones_like(data), torch.zeros_like(data))
        return spikes

### Need to convert construct Model
class SNNNet(nn.Module):
    def __init__(self):
        super(SNNNet, self).__init__()
        self.spike_relu = nn.ReLU().to(device)  # ReLU activation for spike generation
        self.spike_tanh = nn.Tanh().to(device)  # Tanh activation for spike generation
        self.spike_linear1 = nn.Linear(16, 12).to(device)
        self.spike_linear2 = nn.Linear(12, 12).to(device)
        self.spike_linear3 = nn.Linear(12, 12).to(device)
        self.spike_linear4 = nn.Linear(12, 12).to(device)

    def forward(self, x):
        spikes = x.float().to(device)  # Assuming the input is spike-encoded
        h1 = self.spike_tanh(self.spike_linear1(spikes.view(-1, 16)))  # Spike-based linear and activation
        h2 = self.spike_tanh(self.spike_linear2(h1))
        h3 = self.spike_tanh(self.spike_linear3(h2))
        h4 = self.spike_linear4(h3)
        return h4
print('Defining neural network is finished')

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
csv_path_inp1 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_230522/simple_manuevers_inp_deleted_150.csv'
csv_path_outp1 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_230522/simple_manuevers_outp_deleted_150.csv'
estimate_1 = CustomDataset(csv_path_inp1, csv_path_outp1)
dataloader_1 = DataLoader(estimate_1, batch_size=1)

csv_path_inp2 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_230522/20230324_LQR_KP2 (3)_inp_deleted_250.csv'
csv_path_outp2 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_230522/20230324_LQR_KP2 (3)_outp_deleted_250.csv'
estimate_2 = CustomDataset(csv_path_inp2, csv_path_outp2)
dataloader_2 = DataLoader(estimate_2, batch_size=1)

csv_path_inp3 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_230522/simple_manuevers_inp_deleted_300.csv'
csv_path_outp3 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_230522/simple_manuevers_outp_deleted_300.csv'
estimate_3 = CustomDataset(csv_path_inp3, csv_path_outp3)
dataloader_3 = DataLoader(estimate_3, batch_size=1)

csv_path_inp4 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_230522/20230324_LQR_KP2 (3)_inp_deleted_500.csv'
csv_path_outp4 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_230522/20230324_LQR_KP2 (3)_outp_deleted_500.csv'
estimate_4 = CustomDataset(csv_path_inp4, csv_path_outp4)
dataloader_4 = DataLoader(estimate_4, batch_size=1)

csv_path_inp5 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_230522/8_windy_maneuver_inp.csv'
csv_path_outp5 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_230522/8_windy_maneuver_outp.csv'
estimate_5 = CustomDataset(csv_path_inp5, csv_path_outp5)
dataloader_5 = DataLoader(estimate_5, batch_size=1)

csv_path_inp9 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_230517/20221103_LQR_KP2 (2)_inp_half.csv'
csv_path_outp9 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_230517/20221103_LQR_KP2 (2)_outp_half.csv'
estimate_9 = CustomDataset(csv_path_inp9, csv_path_outp9)
dataloader_9 = DataLoader(estimate_9, batch_size=1)

csv_path_inp10 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_deleted/simple_manuevers_inp_deleted_150.csv'
csv_path_outp10 = '/content/drive/MyDrive/Dataset_KP2/Dataset_KP2/10Hz_deleted/simple_manuevers_outp_deleted_150.csv'
estimate_10 = CustomDataset(csv_path_inp10, csv_path_outp10)
dataloader_10 = DataLoader(estimate_10, batch_size=1)

print('Data path showing finished')

In [None]:
lr = 1e-2
model = SNNNet().to(device)
loss_fn = nn.MSELoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

def train_150(dataloader, model, loss_fn, optimizer, epochs):
    size = len(dataloader.dataset)
    for batch, (inp, outp) in enumerate(dataloader):
        inp, outp = inp.to(device), outp.to(device)
        optimizer.zero_grad()
        
        # Spike neural network forward pass
        spikes = model(inp)
        loss = loss_fn(spikes, outp)
        
        # Spike neural network backward pass
        loss.backward()
        optimizer.step()
        
        if batch % 10 == 0:
            # torch.save(model, f'./230522_Rand_KP2_PI-DNN_{epochs}_{batch}.pt')
            loss, current = loss.item(), batch * len(inp)
            print(f"loss: [{loss:>12f}] [{current:>5d}/{size:>5d}]")
            with open('./loss_data_Test_230522_ND_training_150_5.csv','a', newline='') as f:
                wr = csv.writer(f)
                wr.writerow([loss])        
                
def train_250(dataloader, model, loss_fn, optimizer, epochs):
    size = len(dataloader.dataset)
    for batch, (inp, outp) in enumerate(dataloader):
        inp, outp = inp.to(device), outp.to(device)
        optimizer.zero_grad()
        
        # Spike neural network forward pass
        spikes = model(inp)
        loss = loss_fn(spikes, outp)
        
        # Spike neural network backward pass
        loss.backward()
        optimizer.step()
        
        if batch % 10 == 0:
            # torch.save(model, f'./230522_Rand_KP2_PI-DNN_{epochs}_{batch}.pt')
            loss, current = loss.item(), batch * len(inp)
            print(f"loss: [{loss:>12f}] [{current:>5d}/{size:>5d}]")
            with open('./loss_data_Test_230522_ND_training_150_5.csv','a', newline='') as f:
                wr = csv.writer(f)
                wr.writerow([loss])    
                
def train_300(dataloader, model, loss_fn, optimizer, epochs):
    size = len(dataloader.dataset)
    for batch, (inp, outp) in enumerate(dataloader):
        inp, outp = inp.to(device), outp.to(device)
        optimizer.zero_grad()
        
        # Spike neural network forward pass
        spikes = model(inp)
        loss = loss_fn(spikes, outp)
        
        # Spike neural network backward pass
        loss.backward()
        optimizer.step()
        
        if batch % 10 == 0:
            # torch.save(model, f'./230522_Rand_KP2_PI-DNN_{epochs}_{batch}.pt')
            loss, current = loss.item(), batch * len(inp)
            print(f"loss: [{loss:>12f}] [{current:>5d}/{size:>5d}]")
            with open('./loss_data_Test_230522_ND_training_150_5.csv','a', newline='') as f:
                wr = csv.writer(f)
                wr.writerow([loss])      
                
def train_500(dataloader, model, loss_fn, optimizer, epochs):
    size = len(dataloader.dataset)
    for batch, (inp, outp) in enumerate(dataloader):
        inp, outp = inp.to(device), outp.to(device)
        optimizer.zero_grad()
        
        # Spike neural network forward pass
        spikes = model(inp)
        loss = loss_fn(spikes, outp)
        
        # Spike neural network backward pass
        loss.backward()
        optimizer.step()
        
        if batch % 10 == 0:
            # torch.save(model, f'./230522_Rand_KP2_PI-DNN_{epochs}_{batch}.pt')
            loss, current = loss.item(), batch * len(inp)
            print(f"loss: [{loss:>12f}] [{current:>5d}/{size:>5d}]")
            with open('./loss_data_Test_230522_ND_training_150_5.csv','a', newline='') as f:
                wr = csv.writer(f)
                wr.writerow([loss])       
                
                
def train(dataloader, model, loss_fn, optimizer, epochs):
    size = len(dataloader.dataset)
    for batch, (inp, outp) in enumerate(dataloader):
        inp, outp = inp.to(device), outp.to(device)
        optimizer.zero_grad()
        
        # Spike neural network forward pass
        spikes = model(inp)
        loss = loss_fn(spikes, outp)
        
        # Spike neural network backward pass
        loss.backward()
        optimizer.step()
        
        if batch % 10 == 0:
            # torch.save(model, f'./230522_Rand_KP2_PI-DNN_{epochs}_{batch}.pt')
            loss, current = loss.item(), batch * len(inp)
            print(f"loss: [{loss:>12f}] [{current:>5d}/{size:>5d}]")
            with open('./loss_data_Test_230522_ND_training_150_5.csv','a', newline='') as f:
                wr = csv.writer(f)
                wr.writerow([loss])    
    print('Data is loaded, Loss defined.')
    

In [None]:
# train_batch = iter(dataloader_2)
# num_timesteps = 100  # Define the number of time steps

# # Run a single forward-pass
# with torch.no_grad():
#     for feature, label in train_batch:
#         feature = torch.swapaxes(input=feature, axis0=0, axis1=1)
#         label = torch.swapaxes(input=label, axis0=0, axis1=1)
#         feature = feature.to(device)
#         label = label.to(device)

#         # Initialize membrane potential and spike recording
#         spk_recording = []
#         mem = None

#         # Simulate spiking neural network over time steps
#         for step in range(num_timesteps):
#             mem, spk = model(feature, mem)
#             spk_recording.append(spk)

#         # Convert spike recordings to membrane potentials
#         mem = model.spike_to_membrane(mem)

#         # Plot
#         plt.plot(mem[:, 0, 0].cpu(), label="Output")
#         plt.plot(label[:, 0, 0].cpu(), '--', label="Target")
#         plt.title("Untrained Output Neuron")
#         plt.xlabel("Time")
#         plt.ylabel("Membrane Potential")
#         plt.legend(loc='best')
        plt.show()

In [None]:
epochs = 1
for t in range(epochs):    
    
    print(f"Epoch {t+1}\n-------------------------------")    
    # print(f"Episode 1 {t+1} Training")
    # start = time.time()
    # train(dataloader_5, model, loss_func_MSE, optimizer, t)
    # print("time :", time.time() - start)
    # print('\n') 
    
    print(f"Episode 1 {t+1} Training")
    start = time.time()
    train(dataloader_2, model, loss_fn, optimizer, t)
    print("time :", time.time() - start)
    print('\n') 
    torch.save(model, f'./230522_KP2_DNN_{epochs}_250_7.pt')
    
    with open('./Training Time_230522_ND_250.csv','a', newline='') as f:
                wr = csv.writer(f)
                wr.writerow([time.time() - start])
print("Done!")

## prediction code