<a href="https://colab.research.google.com/github/alexuqt/neural-networks/blob/main/1_intent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install snntorch --quiet

In [None]:
# imports
import snntorch as snn
from snntorch import spikeplot as splt
import torch
import matplotlib.pyplot as plt
# ---
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import numpy as np
import itertools

dtype = torch.long
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# data preparation
import torch
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from torch.utils.data.dataset import random_split
from keras.preprocessing.text import Tokenizer
from google.colab import drive

# Load and preprocess your text data
drive.mount('/content/drive')
path = '/content/drive/My Drive/CTTC/RNN-data/all_tswift_lyrics.txt'
path = '/content/drive/My Drive/CTTC/RNN-data/NosotrosEnLaLuna.txt'
data = open(path).read().lower()
print('length of the corpus is:', len(data))

# Create a tokenizer
tokenizer = Tokenizer()
tokenizer.fit_on_texts([data])
encoded_data = tokenizer.texts_to_sequences([data])[0]
vocab_size = len(tokenizer.word_index) + 1

# Create input sequences and corresponding labels
sequence_length = 2  # Adjust this based on your desired sequence length
sequences = []
for i in range(sequence_length, len(encoded_data)):
    sequence = encoded_data[i - sequence_length:i + 1]
    sequences.append(sequence)
sequences = torch.LongTensor(sequences)  # Convert to PyTorch tensor

# Separate into input (X) and target (y) tensors
X = sequences[:, :-1]  # All except the last element
y = sequences[:, -1]    # Last element

# Create DataLoader for training and validation
dataset = TensorDataset(X, y)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

batch_size = 32
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
length of the corpus is: 656424


In [None]:
for data, labels in iter(train_dataloader):
  print(data.size())
  print(labels)
  break

torch.Size([32, 2])
tensor([2733,   54,  110,    8,  924, 1828,   10,   57,   69,    4, 1182,    7,
          98,    8,   17,   35,  963,  821,  931, 4374, 2121, 5477,  233,   48,
          92,  219,    8,   27, 1868,    2,   11,  237])


In [None]:
print(y[:5])

tensor([270,   5,   3, 181,  14])


# Recurrent SNN

In [None]:
# Define Network
class RSNN(nn.Module):
    def __init__(self):
        super().__init__()

        # hyperparam stuff
        num_inputs = 2
        num_hidden = 512
        beta = 0.9 # arbitrary

        # Initialize layers
        self.fc1 = nn.Linear(num_inputs, num_hidden).to(torch.float32)
        self.lif1 = snn.RLeaky(beta=beta, linear_features=num_hidden).to(torch.float32) # RLeaky instead or Leaky

        self.fc2 = nn.Linear(num_hidden, vocab_size).to(torch.float32)
        self.lif2 = snn.RLeaky(beta=beta, linear_features=vocab_size).to(torch.float32)

    def forward(self, x):

        # Initialize hidden states at t=0
        spk1, mem1 = self.lif1.init_rleaky()
        spk2, mem2 = self.lif2.init_rleaky()

        # Record the final layer
        spk2_rec = []
        mem2_rec = []

        for step in range(x.size(0)):  # time x batch x num_inputs
            # cur1 = self.fc1(x[step].flatten(1))
            cur1 = self.fc1(x[step])
            spk1, mem1 = self.lif1(cur1, spk1, mem1)
            cur2 = self.fc2(spk1)
            spk2, mem2 = self.lif2(cur2, spk2, mem2)
            spk2_rec.append(spk2)
            mem2_rec.append(mem2)

        return torch.stack(spk2_rec, dim=0), torch.stack(mem2_rec, dim=0)


# Load the network onto CUDA if available
rnet = RSNN().to(device)

In [None]:
loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(rnet.parameters(), lr=1e-3, betas=(0.9, 0.999))

num_epochs = 5
loss_hist = []
counter = 0

# Outer training loop
for epoch in range(num_epochs):
    print("Epoch", epoch)
    counter = 0
    # Minibatch training loop
    for data, targets in iter(train_dataloader):
        data = data.to(device, dtype=torch.float32)
        targets = targets.to(device)

        # forward pass
        rnet.train()

        spk_rec, _ = rnet(data.unsqueeze(0))

        # initialize the loss & sum over time
        # loss_val = torch.zeros((1), dtype=dtype, device=device)
        # loss_val += loss(spk_rec.sum(0), targets)
        loss_val = loss(spk_rec.view(-1, vocab_size), targets.view(-1))  # Reshape for CrossEntropyLoss

        # Gradient calculation + weight update
        optimizer.zero_grad()
        loss_val.backward()
        optimizer.step()

        # Store loss history for future plotting
        loss_hist.append(loss_val.item())

        # Print train/test loss/accuracy
        if counter % 10 == 0:
            print(f"Iteration: {counter} \t Train Loss: {loss_val.item()}")
        counter += 1

        if counter == 100:
          break

Epoch 0
Iteration: 0 	 Train Loss: 9.36059284210205
Iteration: 10 	 Train Loss: 9.197749137878418
Iteration: 20 	 Train Loss: 9.13675308227539
Iteration: 30 	 Train Loss: 8.981857299804688
Iteration: 40 	 Train Loss: 9.201375961303711
Iteration: 50 	 Train Loss: 9.045947074890137
Iteration: 60 	 Train Loss: 9.015724182128906
Iteration: 70 	 Train Loss: 9.016396522521973
Iteration: 80 	 Train Loss: 9.048698425292969
Iteration: 90 	 Train Loss: 8.830820083618164
Epoch 1
Iteration: 0 	 Train Loss: 8.831679344177246
Iteration: 10 	 Train Loss: 8.644898414611816
Iteration: 20 	 Train Loss: 8.9275484085083
Iteration: 30 	 Train Loss: 8.897180557250977
Iteration: 40 	 Train Loss: 8.741438865661621
Iteration: 50 	 Train Loss: 8.929606437683105
Iteration: 60 	 Train Loss: 8.804450035095215
Iteration: 70 	 Train Loss: 9.086922645568848
Iteration: 80 	 Train Loss: 9.087335586547852
Iteration: 90 	 Train Loss: 8.838024139404297
Epoch 2
Iteration: 0 	 Train Loss: 8.807150840759277
Iteration: 10 	 T

In [None]:
# Set the model to evaluation mode
rnet.eval()

# Define your input sequence as a list of word indices (adjust this based on your input)
input_sequence = [tokenizer.word_index["mi"], tokenizer.word_index["amigo"]]  # Example input, adjust as needed

# Convert the input sequence to a tensor
input_sequence = torch.tensor(input_sequence, dtype=torch.float32).to(device)

# Run inference on the input sequence
with torch.no_grad():
    output_spikes, _ = rnet(input_sequence.unsqueeze(0).unsqueeze(0))

# Convert output spikes into probabilities
output_probabilities = torch.softmax(output_spikes[-1], dim=1)
print(output_probabilities)

# Get the predicted word index (argmax)
predicted_word_index = output_probabilities.argmax(dim=1).item()

# Convert the predicted word index back to text using your vocabulary mapping
predicted_word = tokenizer.index_word[predicted_word_index]

# You now have the generated word based on your model's prediction
print(predicted_word_index, "-->", predicted_word)



tensor([[8.1919e-05, 2.2268e-04, 2.2268e-04,  ..., 8.1919e-05, 8.1919e-05,
         8.1919e-05]])
1 --> que


In [None]:
torch.set_printoptions(threshold=10_000)