### KAN For Multivariate Classification in Predicting Visual Stimuli Given Firing Rates

In [25]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from kan import KANLayer  # Assuming kan is the module containing KANLayer
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from data_processors.pull_and_process_data import master_function
from tqdm import tqdm

# Hyper-parameters
batch_size = 16
num_epochs = 5
num_intervals = 8
k = 6
noise_scale = 0.1
scale_base = 1.0
scale_sp = 1.0
base_fun = nn.SiLU()
grid_eps = 0.0
grid_range = [0, 1]
sp_trainable = True
sb_trainable = True
# Sparsification parameters
lambda_l1 = 1e-4
lambda_entropy = 1e-4

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Pull and process data
mouse_number = 715093703
spike_df = master_function(session_number=mouse_number, output_dir="output", timesteps_per_frame=1)
spike_df = spike_df[spike_df['frame'] >= 0]

# Prepare data
X = spike_df.drop('frame', axis=1).values
y = spike_df['frame'].values.astype(int)

# Normalize the data
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)
X = X_normalized
num_classes = len(np.unique(y))

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Convert to PyTorch tensors
X_train = torch.from_numpy(X_train).float().to(device)
y_train = torch.from_numpy(y_train).long().to(device)
X_test = torch.from_numpy(X_test).float().to(device)
y_test = torch.from_numpy(y_test).long().to(device)

# Create DataLoader
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Initialize the KANLayer
in_dim = X_train.shape[1]  # 2073 in this case
out_dim = num_classes

kan_layer = KANLayer(in_dim=in_dim, out_dim=out_dim, num=num_intervals, k=k, noise_scale=noise_scale, 
                     scale_base=scale_base, scale_sp=scale_sp, base_fun=base_fun, grid_eps=grid_eps,
                     grid_range=grid_range, sp_trainable=sp_trainable, sb_trainable=sb_trainable, device=device).to(device)

# Example forward pass
def forward_pass(kan_layer, X):
    y_pred, preacts, postacts, postspline = kan_layer(X)
    return y_pred

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(kan_layer.parameters(), lr=0.01)

# Helper function to compute sparsification penalties
def compute_sparsification_penalty(kan_layer):
    l1_penalty = 0.0
    entropy_penalty = 0.0
    for param in kan_layer.parameters():
        l1_penalty += torch.sum(torch.abs(param))
        param_normalized = torch.abs(param) / torch.sum(torch.abs(param))
        entropy_penalty -= torch.sum(param_normalized * torch.log(param_normalized + 1e-10))
    return lambda_l1 * l1_penalty + lambda_entropy * entropy_penalty

# Training loop with sparsification
for epoch in range(num_epochs):
    kan_layer.train()
    train_losses = []
    train_correct = 0
    total_train = 0
    
    # Training phase
    for X_batch, y_batch in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False):
        # Forward pass
        y_pred = forward_pass(kan_layer, X_batch)
        # Compute loss
        loss = criterion(y_pred, y_batch)
        
        # Compute sparsification penalty
        sparsification_penalty = compute_sparsification_penalty(kan_layer)
        
        # Total loss
        total_loss = loss + sparsification_penalty
        
        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        
        train_losses.append(total_loss.item())
        _, predicted = torch.max(y_pred.data, 1)
        total_train += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    
    train_accuracy = 100 * train_correct / total_train
    
    # Testing phase
    kan_layer.eval()
    test_losses = []
    test_correct = 0
    total_test = 0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            y_pred = forward_pass(kan_layer, X_batch)
            loss = criterion(y_pred, y_batch)
            test_losses.append(loss.item())
            _, predicted = torch.max(y_pred.data, 1)
            total_test += y_batch.size(0)
            test_correct += (predicted == y_batch).sum().item()
    
    test_accuracy = 100 * test_correct / total_test
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {np.mean(train_losses):.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {np.mean(test_losses):.4f}, Test Accuracy: {test_accuracy:.2f}%')


Updated version 3!
Initializing workflow...
Loading existing datasets...
Loaded spike trains dataset: <class 'pandas.core.frame.DataFrame'>
Total time elapsed: 0.01 seconds


                                                            

Epoch [1/5], Train Loss: 195.3483, Train Accuracy: 20.04%, Test Loss: 4.1914, Test Accuracy: 51.10%


                                                            

Epoch [2/5], Train Loss: 163.6227, Train Accuracy: 77.78%, Test Loss: 2.0736, Test Accuracy: 58.14%


                                                            

Epoch [3/5], Train Loss: 152.0205, Train Accuracy: 86.72%, Test Loss: 1.6183, Test Accuracy: 67.97%


                                                            

Epoch [4/5], Train Loss: 145.4312, Train Accuracy: 88.05%, Test Loss: 1.5204, Test Accuracy: 70.76%


                                                            

Epoch [5/5], Train Loss: 141.8137, Train Accuracy: 88.22%, Test Loss: 1.5012, Test Accuracy: 71.86%


In [35]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from kan import KANLayer  # Assuming kan is the module containing KANLayer
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
from data_processors.pull_and_process_data import master_function
from tqdm import tqdm

# Hyper-parameters
batch_size = 16
num_epochs = 5
num_intervals = 3
k = 3
noise_scale = 0.1
scale_base = 1.0
scale_sp = 1.0
base_fun = nn.SiLU()
grid_eps = 0.0
grid_range = [0, 1]
sp_trainable = True
sb_trainable = True
# Sparsification parameters
lambda_l1 = 1e-4
lambda_entropy = 1e-4
n_components = 120  # Adjust this based on your requirements

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Pull and process data
mouse_number = 715093703
spike_df = master_function(session_number=mouse_number, output_dir="output", timesteps_per_frame=1)
spike_df = spike_df[spike_df['frame'] >= 0]

# Prepare data
X = spike_df.drop('frame', axis=1).values
y = spike_df['frame'].values.astype(int)

# Normalize the data
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)
X = X_normalized
num_classes = len(np.unique(y))

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Perform PCA to reduce dimensions
pca = PCA(n_components=n_components)
X_train_reduced = pca.fit_transform(X_train)
X_test_reduced = pca.transform(X_test)

# Convert reduced data back to PyTorch tensors
X_train_reduced = torch.from_numpy(X_train_reduced).float().to(device)
X_test_reduced = torch.from_numpy(X_test_reduced).float().to(device)
y_train = torch.from_numpy(y_train).long().to(device)
y_test = torch.from_numpy(y_test).long().to(device)

# Create DataLoader with reduced data
train_dataset_reduced = TensorDataset(X_train_reduced, y_train)
test_dataset_reduced = TensorDataset(X_test_reduced, y_test)
train_loader_reduced = DataLoader(train_dataset_reduced, batch_size=batch_size, shuffle=True)
test_loader_reduced = DataLoader(test_dataset_reduced, batch_size=batch_size, shuffle=False)

# Initialize the KANLayer with reduced input dimension
in_dim_reduced = X_train_reduced.shape[1]  # This should be equal to n_components
out_dim = num_classes

kan_layer_reduced = KANLayer(in_dim=in_dim_reduced, out_dim=out_dim, num=num_intervals, k=k, noise_scale=noise_scale, 
                             scale_base=scale_base, scale_sp=scale_sp, base_fun=base_fun, grid_eps=grid_eps,
                             grid_range=grid_range, sp_trainable=sp_trainable, sb_trainable=sb_trainable, device=device).to(device)

# Example forward pass
def forward_pass(kan_layer, X):
    y_pred, preacts, postacts, postspline = kan_layer(X)
    return y_pred

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(kan_layer_reduced.parameters(), lr=0.01)

# Helper function to compute sparsification penalties
def compute_sparsification_penalty(kan_layer):
    l1_penalty = 0.0
    entropy_penalty = 0.0
    for param in kan_layer.parameters():
        l1_penalty += torch.sum(torch.abs(param))
        param_normalized = torch.abs(param) / torch.sum(torch.abs(param))
        entropy_penalty -= torch.sum(param_normalized * torch.log(param_normalized + 1e-10))
    return lambda_l1 * l1_penalty + lambda_entropy * entropy_penalty

# Training loop with reduced data
for epoch in range(num_epochs):
    kan_layer_reduced.train()
    train_losses = []
    train_correct = 0
    total_train = 0
    
    # Training phase
    for X_batch, y_batch in tqdm(train_loader_reduced, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False):
        # Forward pass
        y_pred = forward_pass(kan_layer_reduced, X_batch)
        # Compute loss
        loss = criterion(y_pred, y_batch)
        
        # Compute sparsification penalty
        sparsification_penalty = compute_sparsification_penalty(kan_layer_reduced)
        
        # Total loss
        total_loss = loss + sparsification_penalty
        
        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        
        train_losses.append(total_loss.item())
        _, predicted = torch.max(y_pred.data, 1)
        total_train += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    
    train_accuracy = 100 * train_correct / total_train
    
    # Testing phase
    kan_layer_reduced.eval()
    test_losses = []
    test_correct = 0
    total_test = 0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader_reduced:
            y_pred = forward_pass(kan_layer_reduced, X_batch)
            loss = criterion(y_pred, y_batch)
            test_losses.append(loss.item())
            _, predicted = torch.max(y_pred.data, 1)
            total_test += y_batch.size(0)
            test_correct += (predicted == y_batch).sum().item()
    
    test_accuracy = 100 * test_correct / total_test
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {np.mean(train_losses):.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {np.mean(test_losses):.4f}, Test Accuracy: {test_accuracy:.2f}%')

Updated version 3!
Initializing workflow...
Loading existing datasets...
Loaded spike trains dataset: <class 'pandas.core.frame.DataFrame'>
Total time elapsed: 0.01 seconds


                                                             

Epoch [1/5], Train Loss: 10.2611, Train Accuracy: 38.03%, Test Loss: 2.1334, Test Accuracy: 58.98%


                                                             

Epoch [2/5], Train Loss: 7.8033, Train Accuracy: 75.70%, Test Loss: 1.8754, Test Accuracy: 62.20%


                                                             

Epoch [3/5], Train Loss: 7.0614, Train Accuracy: 80.04%, Test Loss: 1.8434, Test Accuracy: 61.27%


                                                             

Epoch [4/5], Train Loss: 6.7371, Train Accuracy: 81.42%, Test Loss: 1.8541, Test Accuracy: 60.17%


                                                             

Epoch [5/5], Train Loss: 6.5690, Train Accuracy: 81.48%, Test Loss: 1.8588, Test Accuracy: 60.25%


### KAN Symbolic layers

In [36]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from kan import KANLayer  # Assuming kan is the module containing KANLayer
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
from data_processors.pull_and_process_data import master_function
from tqdm import tqdm

# Hyper-parameters
batch_size = 16
num_epochs = 5
num_intervals = 3
k = 3
noise_scale = 0.1
scale_base = 1.0
scale_sp = 1.0
base_fun = nn.SiLU()
grid_eps = 0.0
grid_range = [0, 1]
sp_trainable = True
sb_trainable = True
lambda_l1 = 1e-4
lambda_entropy = 1e-4
n_components = 120  # Adjust this based on your requirements

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Pull and process data
mouse_number = 715093703
spike_df = master_function(session_number=mouse_number, output_dir="output", timesteps_per_frame=1)
spike_df = spike_df[spike_df['frame'] >= 0]

# Prepare data
X = spike_df.drop('frame', axis=1).values
y = spike_df['frame'].values.astype(int)

# Normalize the data
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)
X = X_normalized
num_classes = len(np.unique(y))

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Perform PCA to reduce dimensions
pca = PCA(n_components=n_components)
X_train_reduced = pca.fit_transform(X_train)
X_test_reduced = pca.transform(X_test)

# Convert reduced data back to PyTorch tensors
X_train_reduced = torch.from_numpy(X_train_reduced).float().to(device)
X_test_reduced = torch.from_numpy(X_test_reduced).float().to(device)
y_train = torch.from_numpy(y_train).long().to(device)
y_test = torch.from_numpy(y_test).long().to(device)

# Create DataLoader with reduced data
train_dataset_reduced = TensorDataset(X_train_reduced, y_train)
test_dataset_reduced = TensorDataset(X_test_reduced, y_test)
train_loader_reduced = DataLoader(train_dataset_reduced, batch_size=batch_size, shuffle=True)
test_loader_reduced = DataLoader(test_dataset_reduced, batch_size=batch_size, shuffle=False)

# Initialize the KANLayer with reduced input dimension
in_dim_reduced = X_train_reduced.shape[1]  # This should be equal to n_components
out_dim = num_classes

kan_layer_reduced = KANLayer(in_dim=in_dim_reduced, out_dim=out_dim, num=num_intervals, k=k, noise_scale=noise_scale, 
                             scale_base=scale_base, scale_sp=scale_sp, base_fun=base_fun, grid_eps=grid_eps,
                             grid_range=grid_range, sp_trainable=sp_trainable, sb_trainable=sb_trainable, device=device).to(device)

# Example forward pass
def forward_pass(kan_layer, X):
    y_pred, preacts, postacts, postspline = kan_layer(X)
    return y_pred

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(kan_layer_reduced.parameters(), lr=0.01)

# Helper function to compute sparsification penalties
def compute_sparsification_penalty(kan_layer):
    l1_penalty = 0.0
    entropy_penalty = 0.0
    for param in kan_layer.parameters():
        l1_penalty += torch.sum(torch.abs(param))
        param_normalized = torch.abs(param) / torch.sum(torch.abs(param))
        entropy_penalty -= torch.sum(param_normalized * torch.log(param_normalized + 1e-10))
    return lambda_l1 * l1_penalty + lambda_entropy * entropy_penalty

# Training loop with reduced data
for epoch in range(num_epochs):
    kan_layer_reduced.train()
    train_losses = []
    train_correct = 0
    total_train = 0
    
    # Training phase
    for X_batch, y_batch in tqdm(train_loader_reduced, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False):
        # Forward pass
        y_pred = forward_pass(kan_layer_reduced, X_batch)
        # Compute loss
        loss = criterion(y_pred, y_batch)
        
        # Compute sparsification penalty
        sparsification_penalty = compute_sparsification_penalty(kan_layer_reduced)
        
        # Total loss
        total_loss = loss + sparsification_penalty
        
        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        
        train_losses.append(total_loss.item())
        _, predicted = torch.max(y_pred.data, 1)
        total_train += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    
    train_accuracy = 100 * train_correct / total_train
    
    # Testing phase
    kan_layer_reduced.eval()
    test_losses = []
    test_correct = 0
    total_test = 0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader_reduced:
            y_pred = forward_pass(kan_layer_reduced, X_batch)
            loss = criterion(y_pred, y_batch)
            test_losses.append(loss.item())
            _, predicted = torch.max(y_pred.data, 1)
            total_test += y_batch.size(0)
            test_correct += (predicted == y_batch).sum().item()
    
    test_accuracy = 100 * test_correct / total_test
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {np.mean(train_losses):.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {np.mean(test_losses):.4f}, Test Accuracy: {test_accuracy:.2f}%')


Updated version 3!
Initializing workflow...
Loading existing datasets...
Loaded spike trains dataset: <class 'pandas.core.frame.DataFrame'>
Total time elapsed: 0.01 seconds


                                                             

Epoch [1/5], Train Loss: 10.2471, Train Accuracy: 37.84%, Test Loss: 2.2095, Test Accuracy: 55.85%


                                                             

Epoch [2/5], Train Loss: 7.7887, Train Accuracy: 75.68%, Test Loss: 1.9351, Test Accuracy: 60.51%


                                                             

Epoch [3/5], Train Loss: 7.0602, Train Accuracy: 80.44%, Test Loss: 1.8320, Test Accuracy: 62.71%


                                                             

Epoch [4/5], Train Loss: 6.7352, Train Accuracy: 80.91%, Test Loss: 1.8876, Test Accuracy: 60.34%


                                                             

Epoch [5/5], Train Loss: 6.5696, Train Accuracy: 81.55%, Test Loss: 1.8697, Test Accuracy: 61.19%


In [26]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from kan import Symbolic_KANLayer  # Assuming kan is the module containing Symbolic_KANLayer
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from data_processors.pull_and_process_data import master_function
from tqdm import tqdm

# Hyper-parameters
batch_size = 16
num_epochs = 5
num_intervals = 4
k = 3
noise_scale = 0.1
scale_base = 1.0
scale_sp = 1.0
base_fun = nn.SiLU()
grid_eps = 0.0
grid_range = [0, 1]
sp_trainable = True
sb_trainable = True
# Sparsification parameters
lambda_l1 = 1e-4
lambda_entropy = 1e-4

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Pull and process data
mouse_number = 715093703
spike_df = master_function(session_number=mouse_number, output_dir="output", timesteps_per_frame=1)
spike_df = spike_df[spike_df['frame'] >= 0]

# Prepare data
X = spike_df.drop('frame', axis=1).values
y = spike_df['frame'].values.astype(int)

# Normalize the data
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)
X = X_normalized
num_classes = len(np.unique(y))

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Convert to PyTorch tensors
X_train = torch.from_numpy(X_train).float().to(device)
y_train = torch.from_numpy(y_train).long().to(device)
X_test = torch.from_numpy(X_test).float().to(device)
y_test = torch.from_numpy(y_test).long().to(device)

# Create DataLoader
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Initialize the Symbolic_KANLayer
in_dim = X_train.shape[1]  # 2073 in this case
out_dim = num_classes

symbolic_kan_layer = Symbolic_KANLayer(in_dim=in_dim, out_dim=out_dim, device=device).to(device)

# Define symbolic functions for the layer (example, you can modify as needed)
for i in range(in_dim):
    for j in range(out_dim):
        symbolic_kan_layer.fix_symbolic(i, j, 'sin')  # Replace 'sin' with the desired symbolic function

# Example forward pass
def forward_pass(symbolic_kan_layer, X):
    y_pred, postacts = symbolic_kan_layer(X)
    return y_pred

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(symbolic_kan_layer.parameters(), lr=0.01)

# Helper function to compute sparsification penalties
def compute_sparsification_penalty(symbolic_kan_layer):
    l1_penalty = 0.0
    entropy_penalty = 0.0
    for param in symbolic_kan_layer.parameters():
        l1_penalty += torch.sum(torch.abs(param))
        param_normalized = torch.abs(param) / torch.sum(torch.abs(param))
        entropy_penalty -= torch.sum(param_normalized * torch.log(param_normalized + 1e-10))
    return lambda_l1 * l1_penalty + lambda_entropy * entropy_penalty

# Training loop with sparsification
for epoch in range(num_epochs):
    symbolic_kan_layer.train()
    train_losses = []
    train_correct = 0
    total_train = 0
    
    # Training phase
    for X_batch, y_batch in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False):
        # Forward pass
        y_pred = forward_pass(symbolic_kan_layer, X_batch)
        # Compute loss
        loss = criterion(y_pred, y_batch)
        
        # Compute sparsification penalty
        sparsification_penalty = compute_sparsification_penalty(symbolic_kan_layer)
        
        # Total loss
        total_loss = loss + sparsification_penalty
        
        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        
        train_losses.append(total_loss.item())
        _, predicted = torch.max(y_pred.data, 1)
        total_train += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    
    train_accuracy = 100 * train_correct / total_train
    
    # Testing phase
    symbolic_kan_layer.eval()
    test_losses = []
    test_correct = 0
    total_test = 0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            y_pred = forward_pass(symbolic_kan_layer, X_batch)
            loss = criterion(y_pred, y_batch)
            test_losses.append(loss.item())
            _, predicted = torch.max(y_pred.data, 1)
            total_test += y_batch.size(0)
            test_correct += (predicted == y_batch).sum().item()
    
    test_accuracy = 100 * test_correct / total_test
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {np.mean(train_losses):.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {np.mean(test_losses):.4f}, Test Accuracy: {test_accuracy:.2f}%')


Updated version 3!
Initializing workflow...
Loading existing datasets...
Loaded spike trains dataset: <class 'pandas.core.frame.DataFrame'>
Total time elapsed: 0.02 seconds


                                                              

KeyboardInterrupt: 

In [27]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from kan import Symbolic_KANLayer  # Assuming kan is the module containing Symbolic_KANLayer
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from data_processors.pull_and_process_data import master_function
from tqdm import tqdm

# Hyper-parameters
batch_size = 16
num_epochs = 5
num_intervals = 4
k = 4
noise_scale = 0.1
scale_base = 1.0
scale_sp = 1.0
base_fun = nn.SiLU()
grid_eps = 0.0
grid_range = [0, 1]
sp_trainable = True
sb_trainable = True
# Sparsification parameters
lambda_l1 = 1e-4
lambda_entropy = 1e-4

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Pull and process data
mouse_number = 715093703
spike_df = master_function(session_number=mouse_number, output_dir="output", timesteps_per_frame=1)
spike_df = spike_df[spike_df['frame'] >= 0]

# Prepare data
X = spike_df.drop('frame', axis=1).values
y = spike_df['frame'].values.astype(int)

# Normalize the data
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)
X = X_normalized
num_classes = len(np.unique(y))

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Convert to PyTorch tensors
X_train = torch.from_numpy(X_train).float().to(device)
y_train = torch.from_numpy(y_train).long().to(device)
X_test = torch.from_numpy(X_test).float().to(device)
y_test = torch.from_numpy(y_test).long().to(device)

# Create DataLoader
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define a model with a linear layer followed by a Symbolic_KANLayer
class DimensionalityReductionModel(nn.Module):
    def __init__(self, input_dim, reduced_dim, symbolic_kan_layer):
        super(DimensionalityReductionModel, self).__init__()
        self.linear = nn.Linear(input_dim, reduced_dim)
        self.symbolic_kan_layer = symbolic_kan_layer

    def forward(self, x):
        reduced_x = self.linear(x)
        y_pred, postacts = self.symbolic_kan_layer(reduced_x)
        return y_pred, postacts

# Initialize the Symbolic_KANLayer
in_dim = X_train.shape[1]  # 2073 in this case
reduced_dim = 512  # You can choose an appropriate reduced dimension
out_dim = num_classes

symbolic_kan_layer = Symbolic_KANLayer(in_dim=reduced_dim, out_dim=out_dim, device=device).to(device)

# Define symbolic functions for the layer (example, you can modify as needed)
for i in range(reduced_dim):
    for j in range(out_dim):
        symbolic_kan_layer.fix_symbolic(i, j, 'sin')  # Replace 'sin' with the desired symbolic function

# Initialize the full model
model = DimensionalityReductionModel(in_dim, reduced_dim, symbolic_kan_layer).to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Helper function to compute sparsification penalties
def compute_sparsification_penalty(symbolic_kan_layer):
    l1_penalty = 0.0
    entropy_penalty = 0.0
    for param in symbolic_kan_layer.parameters():
        l1_penalty += torch.sum(torch.abs(param))
        param_normalized = torch.abs(param) / torch.sum(torch.abs(param))
        entropy_penalty -= torch.sum(param_normalized * torch.log(param_normalized + 1e-10))
    return lambda_l1 * l1_penalty + lambda_entropy * entropy_penalty

# Training loop with sparsification
for epoch in range(num_epochs):
    model.train()
    train_losses = []
    train_correct = 0
    total_train = 0
    
    # Training phase
    for X_batch, y_batch in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False):
        # Forward pass
        y_pred, _ = model(X_batch)
        # Compute loss
        loss = criterion(y_pred, y_batch)
        
        # Compute sparsification penalty
        sparsification_penalty = compute_sparsification_penalty(symbolic_kan_layer)
        
        # Total loss
        total_loss = loss + sparsification_penalty
        
        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        
        train_losses.append(total_loss.item())
        _, predicted = torch.max(y_pred.data, 1)
        total_train += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    
    train_accuracy = 100 * train_correct / total_train
    
    # Testing phase
    model.eval()
    test_losses = []
    test_correct = 0
    total_test = 0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            y_pred, _ = model(X_batch)
            loss = criterion(y_pred, y_batch)
            test_losses.append(loss.item())
            _, predicted = torch.max(y_pred.data, 1)
            total_test += y_batch.size(0)
            test_correct += (predicted == y_batch).sum().item()
    
    test_accuracy = 100 * test_correct / total_test
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {np.mean(train_losses):.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {np.mean(test_losses):.4f}, Test Accuracy: {test_accuracy:.2f}%')


Updated version 3!
Initializing workflow...
Loading existing datasets...
Loaded spike trains dataset: <class 'pandas.core.frame.DataFrame'>
Total time elapsed: 0.02 seconds


                                                            

KeyboardInterrupt: 

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from kan import Symbolic_KANLayer  # Assuming kan is the module containing KANLayer
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from data_processors.pull_and_process_data import master_function
from tqdm import tqdm

# Hyper-parameters
batch_size = 16
num_epochs = 5
num_intervals = 4
k = 4
noise_scale = .05
scale_base = 1
scale_sp = 1.0
base_fun = nn.SiLU()
grid_eps = 1
grid_range = [0, 1]
sp_trainable = True
sb_trainable = True
reduction_dim = 120  # New dimension after reduction
# Sparsification parameters
lambda_l1 = 1e-4
lambda_entropy = 1e-4

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Pull and process data
mouse_number = 732592105

spike_df = master_function(session_number=mouse_number, output_dir="output", timesteps_per_frame=1)
spike_df = spike_df[spike_df['frame'] >= 0]

# Prepare data
X = spike_df.drop('frame', axis=1).values
y = spike_df['frame'].values.astype(int)

# Normalize the data
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)
X = X_normalized
num_classes = len(np.unique(y))

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Convert to PyTorch tensors
X_train = torch.from_numpy(X_train).float().to(device)
y_train = torch.from_numpy(y_train).long().to(device)
X_test = torch.from_numpy(X_test).float().to(device)
y_test = torch.from_numpy(y_test).long().to(device)

# Create DataLoader
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define the model
class ConnectomicsModel(nn.Module):
    def __init__(self, in_dim, reduction_dim, out_dim, num_intervals, k, noise_scale, scale_base, scale_sp, base_fun, grid_eps, grid_range, sp_trainable, sb_trainable, device):
        super(ConnectomicsModel, self).__init__()
        self.fc = nn.Linear(in_dim, reduction_dim)
        self.symbolic_kan = Symbolic_KANLayer(in_dim=reduction_dim, out_dim=out_dim, device=device)

    def forward(self, x):
        x = self.fc(x)
        y_pred, postacts = self.symbolic_kan(x)
        return y_pred

# Initialize the model
model = ConnectomicsModel(
    in_dim=X_train.shape[1], 
    reduction_dim=reduction_dim, 
    out_dim=num_classes, 
    num_intervals=num_intervals, 
    k=k, 
    noise_scale=noise_scale, 
    scale_base=scale_base, 
    scale_sp=scale_sp, 
    base_fun=base_fun, 
    grid_eps=grid_eps, 
    grid_range=grid_range, 
    sp_trainable=sp_trainable, 
    sb_trainable=sb_trainable, 
    device=device
).to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Helper function to compute sparsification penalties
def compute_sparsification_penalty(kan_layer):
    l1_penalty = 0.0
    entropy_penalty = 0.0
    for param in kan_layer.parameters():
        l1_penalty += torch.sum(torch.abs(param))
        param_normalized = torch.abs(param) / torch.sum(torch.abs(param))
        entropy_penalty -= torch.sum(param_normalized * torch.log(param_normalized + 1e-10))
    return lambda_l1 * l1_penalty + lambda_entropy * entropy_penalty

# Training loop with sparsification
for epoch in range(num_epochs):
    model.train()
    train_losses = []
    train_correct = 0
    total_train = 0
    
    # Training phase
    for X_batch, y_batch in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False):
        # Forward pass
        y_pred = model(X_batch)
        # Compute loss
        loss = criterion(y_pred, y_batch)
        
        # Compute sparsification penalty
        sparsification_penalty = compute_sparsification_penalty(model.symbolic_kan)
        
        # Total loss
        total_loss = loss + sparsification_penalty
        
        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        
        train_losses.append(total_loss.item())
        _, predicted = torch.max(y_pred.data, 1)
        total_train += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    
    train_accuracy = 100 * train_correct / total_train
    
    # Testing phase
    model.eval()
    test_losses = []
    test_correct = 0
    total_test = 0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            test_losses.append(loss.item())
            _, predicted = torch.max(y_pred.data, 1)
            total_test += y_batch.size(0)
            test_correct += (predicted == y_batch).sum().item()
    
    test_accuracy = 100 * test_correct / total_test
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {np.mean(train_losses):.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {np.mean(test_losses):.4f}, Test Accuracy: {test_accuracy:.2f}%')


## Multi-Layered KAN

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from kan import KANLayer  # Assuming kan is the module containing KANLayer
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from data_processors.pull_and_process_data import master_function
from tqdm import tqdm

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Pull and process data
mouse_number = 715093703
spike_df = master_function(session_number=mouse_number, output_dir="output", timesteps_per_frame=1)
spike_df = spike_df[spike_df['frame'] >= 0]

# Prepare data
X = spike_df.drop('frame', axis=1).values
y = spike_df['frame'].values.astype(int)

# Normalize the data
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)
X = X_normalized
num_classes = len(np.unique(y))

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Convert to PyTorch tensors
X_train = torch.from_numpy(X_train).float().to(device)
y_train = torch.from_numpy(y_train).long().to(device)
X_test = torch.from_numpy(X_test).float().to(device)
y_test = torch.from_numpy(y_test).long().to(device)

# Create DataLoader
batch_size = 16
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define a custom KAN model with an intermediate layer
class CustomKANModel(nn.Module):
    def __init__(self, in_dim, inter_dim, out_dim, num_intervals, k, noise_scale, scale_base, scale_sp, base_fun, grid_eps, grid_range, sp_trainable, sb_trainable, device):
        super(CustomKANModel, self).__init__()
        self.kan_layer1 = KANLayer(in_dim=in_dim, out_dim=inter_dim, num=num_intervals, k=k, noise_scale=noise_scale, 
                                   scale_base=scale_base, scale_sp=scale_sp, base_fun=base_fun, grid_eps=grid_eps,
                                   grid_range=grid_range, sp_trainable=sp_trainable, sb_trainable=sb_trainable, device=device).to(device)
        self.kan_layer2 = KANLayer(in_dim=inter_dim, out_dim=out_dim, num=num_intervals, k=k, noise_scale=noise_scale, 
                                   scale_base=scale_base, scale_sp=scale_sp, base_fun=base_fun, grid_eps=grid_eps,
                                   grid_range=grid_range, sp_trainable=sp_trainable, sb_trainable=sb_trainable, device=device).to(device)

    def forward(self, X):
        y_pred1, preacts1, postacts1, postspline1 = self.kan_layer1(X)
        y_pred2, preacts2, postacts2, postspline2 = self.kan_layer2(y_pred1)
        return y_pred2

# Initialize the CustomKANModel
in_dim = X_train.shape[1]  # 2073 in this case
inter_dim = 1000  # Example intermediate dimension
out_dim = num_classes
num_intervals = 8
k = 6
noise_scale = 0.1
scale_base = 1.0
scale_sp = 1.0
base_fun = nn.SiLU()
grid_eps = 1
grid_range = [0, 1]
sp_trainable = True
sb_trainable = True

kan_model = CustomKANModel(in_dim=in_dim, inter_dim=inter_dim, out_dim=out_dim, num_intervals=num_intervals, k=k, noise_scale=noise_scale, 
                            scale_base=scale_base, scale_sp=scale_sp, base_fun=base_fun, grid_eps=grid_eps, grid_range=grid_range, 
                            sp_trainable=sp_trainable, sb_trainable=sb_trainable, device=device).to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(kan_model.parameters(), lr=0.01)

# Sparsification parameters
lambda_l1 = 1e-4
lambda_entropy = 1e-4

# Helper function to compute sparsification penalties
def compute_sparsification_penalty(model):
    l1_penalty = 0.0
    entropy_penalty = 0.0
    for param in model.parameters():
        l1_penalty += torch.sum(torch.abs(param))
        param_normalized = torch.abs(param) / torch.sum(torch.abs(param))
        entropy_penalty -= torch.sum(param_normalized * torch.log(param_normalized + 1e-10))
    return lambda_l1 * l1_penalty + lambda_entropy * entropy_penalty

# Training loop with sparsification
num_epochs = 100
for epoch in range(num_epochs):
    kan_model.train()
    train_losses = []
    train_correct = 0
    total_train = 0
    
    # Training phase
    for X_batch, y_batch in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False):
        # Forward pass
        y_pred = kan_model(X_batch)
        
        # Compute loss
        loss = criterion(y_pred, y_batch)
        
        # Compute sparsification penalty
        sparsification_penalty = compute_sparsification_penalty(kan_model)
        
        # Total loss
        total_loss = loss + sparsification_penalty
        
        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        
        train_losses.append(total_loss.item())
        _, predicted = torch.max(y_pred.data, 1)
        total_train += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    
    train_accuracy = 100 * train_correct / total_train
    
    # Testing phase
    kan_model.eval()
    test_losses = []
    test_correct = 0
    total_test = 0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            y_pred = kan_model(X_batch)
            loss = criterion(y_pred, y_batch)
            test_losses.append(loss.item())
            _, predicted = torch.max(y_pred.data, 1)
            total_test += y_batch.size(0)
            test_correct += (predicted == y_batch).sum().item()
    
    test_accuracy = 100 * test_correct / total_test
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {np.mean(train_losses):.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {np.mean(test_losses):.4f}, Test Accuracy: {test_accuracy:.2f}%')

# Pruning function
def prune_kan_layer(kan_layer, threshold=1e-2):
    with torch.no_grad():
        for name, param in kan_layer.named_parameters():
            if 'weight' in name:
                param *= (torch.abs(param) > threshold).float()

# Prune the KAN layers after training
prune_kan_layer(kan_model.kan_layer1)
prune_kan_layer(kan_model.kan_layer2)

# Retrain the pruned KAN model
for epoch in range(num_epochs):
    kan_model.train()
    train_losses = []
    train_correct = 0
    total_train = 0
    
    # Training phase
    for X_batch, y_batch in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False):
        # Forward pass
        y_pred = kan_model(X_batch)
        
        # Compute loss
        loss = criterion(y_pred, y_batch)
        
        # Compute sparsification penalty
        sparsification_penalty = compute_sparsification_penalty(kan_model)
        
        # Total loss
        total_loss = loss + sparsification_penalty
        
        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        
        train_losses.append(total_loss.item())
        _, predicted = torch.max(y_pred.data, 1)
        total_train += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    
    train_accuracy = 100 * train_correct / total_train
    
    # Testing phase
    kan_model.eval()
    test_losses = []
    test_correct = 0
    total_test = 0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            y_pred = kan_model(X_batch)
            loss = criterion(y_pred, y_batch)
            test_losses.append(loss.item())
            _, predicted = torch.max(y_pred.data, 1)
            total_test += y_batch.size(0)
            test_correct += (predicted == y_batch).sum().item()
    
    test_accuracy = 100 * test_correct / total_test
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {np.mean(train_losses):.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {np.mean(test_losses):.4f}, Test Accuracy: {test_accuracy:.2f}%')


In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from kan import KANLayer  # Assuming kan is the module containing KANLayer
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from data_processors.pull_and_process_data import master_function
from tqdm import tqdm

class KANModelTrainer:
    def __init__(self, mouse_number, output_dir, timesteps_per_frame, batch_size=16, num_epochs=100, device=None):
        self.mouse_number = mouse_number
        self.output_dir = output_dir
        self.timesteps_per_frame = timesteps_per_frame
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.device = device if device else torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.criterion = nn.CrossEntropyLoss()
        self.lambda_l1 = 1e-4
        self.lambda_entropy = 1e-4

        # Set default hyperparameters
        self.in_dim = None
        self.inter_dim = 250
        self.out_dim = None
        self.num_intervals = 7
        self.k = 6
        self.noise_scale = 0.1
        self.scale_base = 1.0
        self.scale_sp = 1.0
        self.base_fun = nn.SiLU()
        self.grid_eps = 0.02
        self.grid_range = [0, 1]
        self.sp_trainable = True
        self.sb_trainable = True
        self.num_layers = 3
        self.lr = 0.01

    def set_hyperparameters(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    def prepare_data(self):
        spike_df = master_function(session_number=self.mouse_number, output_dir=self.output_dir, timesteps_per_frame=self.timesteps_per_frame)
        spike_df = spike_df[spike_df['frame'] >= 0]

        X = spike_df.drop('frame', axis=1).values
        y = spike_df['frame'].values.astype(int)

        scaler = MinMaxScaler()
        X_normalized = scaler.fit_transform(X)
        X = X_normalized
        self.out_dim = len(np.unique(y))

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
        
        self.in_dim = X_train.shape[1]

        X_train = torch.from_numpy(X_train).float().to(self.device)
        y_train = torch.from_numpy(y_train).long().to(self.device)
        X_test = torch.from_numpy(X_test).float().to(self.device)
        y_test = torch.from_numpy(y_test).long().to(self.device)

        train_dataset = TensorDataset(X_train, y_train)
        test_dataset = TensorDataset(X_test, y_test)
        self.train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        self.test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)

    def create_kan_model(self):
        class CustomKANModel(nn.Module):
            def __init__(self, in_dim, inter_dim, out_dim, num_intervals, k, noise_scale, scale_base, scale_sp, base_fun, grid_eps, grid_range, sp_trainable, sb_trainable, num_layers, device):
                super(CustomKANModel, self).__init__()
                self.layers = nn.ModuleList()
                self.layers.append(KANLayer(in_dim=in_dim, out_dim=inter_dim, num=num_intervals, k=k, noise_scale=noise_scale, 
                                            scale_base=scale_base, scale_sp=scale_sp, base_fun=base_fun, grid_eps=grid_eps,
                                            grid_range=grid_range, sp_trainable=sp_trainable, sb_trainable=sb_trainable, device=device).to(device))
                for _ in range(num_layers - 2):
                    self.layers.append(KANLayer(in_dim=inter_dim, out_dim=inter_dim, num=num_intervals, k=k, noise_scale=noise_scale, 
                                                scale_base=scale_base, scale_sp=scale_sp, base_fun=base_fun, grid_eps=grid_eps,
                                                grid_range=grid_range, sp_trainable=sp_trainable, sb_trainable=sb_trainable, device=device).to(device))
                self.layers.append(KANLayer(in_dim=inter_dim, out_dim=out_dim, num=num_intervals, k=k, noise_scale=noise_scale, 
                                            scale_base=scale_base, scale_sp=scale_sp, base_fun=base_fun, grid_eps=grid_eps,
                                            grid_range=grid_range, sp_trainable=sp_trainable, sb_trainable=sb_trainable, device=device).to(device))
            
            def forward(self, X):
                for layer in self.layers:
                    y_pred, _, _, _ = layer(X)
                    X = y_pred
                return y_pred

        self.kan_model = CustomKANModel(self.in_dim, self.inter_dim, self.out_dim, self.num_intervals, self.k, self.noise_scale, 
                                        self.scale_base, self.scale_sp, self.base_fun, self.grid_eps, self.grid_range, 
                                        self.sp_trainable, self.sb_trainable, self.num_layers, self.device).to(self.device)
        self.optimizer = optim.Adam(self.kan_model.parameters(), lr=self.lr)

    def compute_sparsification_penalty(self, model):
        l1_penalty = 0.0
        entropy_penalty = 0.0
        for param in model.parameters():
            l1_penalty += torch.sum(torch.abs(param))
            param_normalized = torch.abs(param) / torch.sum(torch.abs(param))
            entropy_penalty -= torch.sum(param_normalized * torch.log(param_normalized + 1e-10))
        return self.lambda_l1 * l1_penalty + self.lambda_entropy * entropy_penalty

    def train(self):
        for epoch in range(self.num_epochs):
            self.kan_model.train()
            train_losses = []
            train_correct = 0
            total_train = 0
            
            for X_batch, y_batch in tqdm(self.train_loader, desc=f'Epoch {epoch+1}/{self.num_epochs}', leave=False):
                y_pred = self.kan_model(X_batch)
                loss = self.criterion(y_pred, y_batch)
                sparsification_penalty = self.compute_sparsification_penalty(self.kan_model)
                total_loss = loss + sparsification_penalty
                self.optimizer.zero_grad()
                total_loss.backward()
                self.optimizer.step()
                
                train_losses.append(total_loss.item())
                _, predicted = torch.max(y_pred.data, 1)
                total_train += y_batch.size(0)
                train_correct += (predicted == y_batch).sum().item()
            
            train_accuracy = 100 * train_correct / total_train
            
            self.kan_model.eval()
            test_losses = []
            test_correct = 0
            total_test = 0
            
            with torch.no_grad():
                for X_batch, y_batch in self.test_loader:
                    y_pred = self.kan_model(X_batch)
                    loss = self.criterion(y_pred, y_batch)
                    test_losses.append(loss.item())
                    _, predicted = torch.max(y_pred.data, 1)
                    total_test += y_batch.size(0)
                    test_correct += (predicted == y_batch).sum().item()
            
            test_accuracy = 100 * test_correct / total_test
            
            print(f'Epoch [{epoch+1}/{self.num_epochs}], Train Loss: {np.mean(train_losses):.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {np.mean(test_losses):.4f}, Test Accuracy: {test_accuracy:.2f}%')

    def prune_kan_layer(self, kan_layer, threshold=1e-2):
        with torch.no_grad():
            for name, param in kan_layer.named_parameters():
                if 'weight' in name:
                    param *= (torch.abs(param) > threshold).float()

    def prune_and_retrain(self):
        for layer in self.kan_model.layers:
            self.prune_kan_layer(layer)
        
        for epoch in range(self.num_epochs):
            self.kan_model.train()
            train_losses = []
            train_correct = 0
            total_train = 0
            
            for X_batch, y_batch in tqdm(self.train_loader, desc=f'Epoch {epoch+1}/{self.num_epochs}', leave=False):
                y_pred = self.kan_model(X_batch)
                loss = self.criterion(y_pred, y_batch)
                sparsification_penalty = self.compute_sparsification_penalty(self.kan_model)
                total_loss = loss + sparsification_penalty
                self.optimizer.zero_grad()
                total_loss.backward()
                self.optimizer.step()
                
                train_losses.append(total_loss.item())
                _, predicted = torch.max(y_pred.data, 1)
                total_train += y_batch.size(0)
                train_correct += (predicted == y_batch).sum().item()
            
            train_accuracy = 100 * train_correct / total_train
            
            self.kan_model.eval()
            test_losses = []
            test_correct = 0
            total_test = 0
            
            with torch.no_grad():
                for X_batch, y_batch in self.test_loader:
                    y_pred = self.kan_model(X_batch)
                    loss = self.criterion(y_pred, y_batch)
                    test_losses.append(loss.item())
                    _, predicted = torch.max(y_pred.data, 1)
                    total_test += y_batch.size(0)
                    test_correct += (predicted == y_batch).sum().item()
            
            test_accuracy = 100 * test_correct / total_test
            
            print(f'Epoch [{epoch+1}/{self.num_epochs}], Train Loss: {np.mean(train_losses):.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {np.mean(test_losses):.4f}, Test Accuracy: {test_accuracy:.2f}%')

# Example usage
trainer = KANModelTrainer(mouse_number=715093703, output_dir="output", timesteps_per_frame=1, batch_size=16, num_epochs=6)
trainer.set_hyperparameters(inter_dim=500, num_layers=1, lr=0.01)
trainer.prepare_data()
trainer.create_kan_model()
trainer.train()
trainer.prune_and_retrain()


Updated version 3!
Initializing workflow...
Loading existing datasets...
Loaded spike trains dataset: <class 'pandas.core.frame.DataFrame'>
Total time elapsed: 0.01 seconds


                                                            

Epoch [1/6], Train Loss: 739.1743, Train Accuracy: 0.68%, Test Loss: 4.8930, Test Accuracy: 1.02%


                                                            

Epoch [2/6], Train Loss: 573.5616, Train Accuracy: 1.50%, Test Loss: 4.7925, Test Accuracy: 3.05%


                                                            

Epoch [3/6], Train Loss: 560.4291, Train Accuracy: 11.29%, Test Loss: 3.5963, Test Accuracy: 19.75%


                                                            

Epoch [4/6], Train Loss: 558.4958, Train Accuracy: 38.33%, Test Loss: 2.2982, Test Accuracy: 47.12%


                                                            

Epoch [5/6], Train Loss: 557.6904, Train Accuracy: 59.11%, Test Loss: 1.8608, Test Accuracy: 57.37%


                                                            

Epoch [6/6], Train Loss: 557.2176, Train Accuracy: 68.94%, Test Loss: 1.8196, Test Accuracy: 56.02%


                                                            

Epoch [1/6], Train Loss: 557.0008, Train Accuracy: 74.64%, Test Loss: 1.6900, Test Accuracy: 57.54%


                                                            

Epoch [2/6], Train Loss: 556.9613, Train Accuracy: 76.97%, Test Loss: 1.6114, Test Accuracy: 60.25%


                                                            

Epoch [3/6], Train Loss: 557.0994, Train Accuracy: 75.47%, Test Loss: 2.3321, Test Accuracy: 49.83%


                                                            

KeyboardInterrupt: 

In [None]:
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from kan import KANLayer  # Assuming kan is the module containing KANLayer
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from data_processors.pull_and_process_data import master_function
from tqdm import tqdm

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Pull and process data
mouse_number = 715093703
spike_df = master_function(session_number=mouse_number, output_dir="output", timesteps_per_frame=1)
spike_df = spike_df[spike_df['frame'] >= 0]

# Prepare data
X = spike_df.drop('frame', axis=1).values
y = spike_df['frame'].values.astype(int)

# Normalize the data
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)
X = X_normalized
num_classes = len(np.unique(y))

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Convert to PyTorch tensors
X_train = torch.from_numpy(X_train).float().to(device)
y_train = torch.from_numpy(y_train).long().to(device)
X_test = torch.from_numpy(X_test).float().to(device)
y_test = torch.from_numpy(y_test).long().to(device)

# Create DataLoader
batch_size = 16
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define a custom KAN model with an intermediate layer
class CustomKANModel(nn.Module):
    def __init__(self, in_dim, inter_dim, out_dim, num_intervals, k, noise_scale, scale_base, scale_sp, base_fun, grid_eps, grid_range, sp_trainable, sb_trainable, device):
        super(CustomKANModel, self).__init__()
        self.kan_layer1 = KANLayer(in_dim=in_dim, out_dim=inter_dim, num=num_intervals, k=k, noise_scale=noise_scale, 
                                   scale_base=scale_base, scale_sp=scale_sp, base_fun=base_fun, grid_eps=grid_eps,
                                   grid_range=grid_range, sp_trainable=sp_trainable, sb_trainable=sb_trainable, device=device).to(device)
        self.kan_layer2 = KANLayer(in_dim=inter_dim, out_dim=out_dim, num=num_intervals, k=k, noise_scale=noise_scale, 
                                   scale_base=scale_base, scale_sp=scale_sp, base_fun=base_fun, grid_eps=grid_eps,
                                   grid_range=grid_range, sp_trainable=sp_trainable, sb_trainable=sb_trainable, device=device).to(device)

    def forward(self, X):
        y_pred1, preacts1, postacts1, postspline1 = self.kan_layer1(X)
        y_pred2, preacts2, postacts2, postspline2 = self.kan_layer2(y_pred1)
        return y_pred2

# Initialize the CustomKANModel
in_dim = X_train.shape[1]  # 2073 in this case
inter_dim = 118  # Example intermediate dimension
out_dim = num_classes
num_intervals = 7
k = 3
noise_scale = 0.1
scale_base = 1.0
scale_sp = 1.0
base_fun = nn.SiLU()
grid_eps = 0.02
grid_range = [-1, 1]
sp_trainable = True
sb_trainable = True

kan_model = CustomKANModel(in_dim=in_dim, inter_dim=inter_dim, out_dim=out_dim, num_intervals=num_intervals, k=k, noise_scale=noise_scale, 
                            scale_base=scale_base, scale_sp=scale_sp, base_fun=base_fun, grid_eps=grid_eps, grid_range=grid_range, 
                            sp_trainable=sp_trainable, sb_trainable=sb_trainable, device=device).to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(kan_model.parameters(), lr=0.01)

# Sparsification parameters
lambda_l1 = 1e-4
lambda_entropy = 1e-4

# Helper function to compute sparsification penalties
def compute_sparsification_penalty(model):
    l1_penalty = 0.0
    entropy_penalty = 0.0
    for param in model.parameters():
        l1_penalty += torch.sum(torch.abs(param))
        param_normalized = torch.abs(param) / torch.sum(torch.abs(param))
        entropy_penalty -= torch.sum(param_normalized * torch.log(param_normalized + 1e-10))
    return lambda_l1 * l1_penalty + lambda_entropy * entropy_penalty

# Training loop with sparsification
num_epochs = 100
for epoch in range(num_epochs):
    kan_model.train()
    train_losses = []
    train_correct = 0
    total_train = 0
    
    # Training phase
    for X_batch, y_batch in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False):
        # Forward pass
        y_pred = kan_model(X_batch)
        
        # Compute loss
        loss = criterion(y_pred, y_batch)
        
        # Compute sparsification penalty
        sparsification_penalty = compute_sparsification_penalty(kan_model)
        
        # Total loss
        total_loss = loss + sparsification_penalty
        
        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        
        train_losses.append(total_loss.item())
        _, predicted = torch.max(y_pred.data, 1)
        total_train += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    
    train_accuracy = 100 * train_correct / total_train
    
    # Testing phase
    kan_model.eval()
    test_losses = []
    test_correct = 0
    total_test = 0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            y_pred = kan_model(X_batch)
            loss = criterion(y_pred, y_batch)
            test_losses.append(loss.item())
            _, predicted = torch.max(y_pred.data, 1)
            total_test += y_batch.size(0)
            test_correct += (predicted == y_batch).sum().item()
    
    test_accuracy = 100 * test_correct / total_test
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {np.mean(train_losses):.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {np.mean(test_losses):.4f}, Test Accuracy: {test_accuracy:.2f}%')

# Pruning function
def prune_kan_layer(kan_layer, threshold=1e-2):
    with torch.no_grad():
        for name, param in kan_layer.named_parameters():
            if 'weight' in name:
                param *= (torch.abs(param) > threshold).float()

# Prune the KAN layers after training
prune_kan_layer(kan_model.kan_layer1)
prune_kan_layer(kan_model.kan_layer2)

# Retrain the pruned KAN model
for epoch in range(num_epochs):
    kan_model.train()
    train_losses = []
    train_correct = 0
    total_train = 0
    
    # Training phase
    for X_batch, y_batch in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False):
        # Forward pass
        y_pred = kan_model(X_batch)
        
        # Compute loss
        loss = criterion(y_pred, y_batch)
        
        # Compute sparsification penalty
        sparsification_penalty = compute_sparsification_penalty(kan_model)
        
        # Total loss
        total_loss = loss + sparsification_penalty
        
        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        
        train_losses.append(total_loss.item())
        _, predicted = torch.max(y_pred.data, 1)
        total_train += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    
    train_accuracy = 100 * train_correct / total_train
    
    # Testing phase
    kan_model.eval()
    test_losses = []
    test_correct = 0
    total_test = 0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            y_pred = kan_model(X_batch)
            loss = criterion(y_pred, y_batch)
            test_losses.append(loss.item())
            _, predicted = torch.max(y_pred.data, 1)
            total_test += y_batch.size(0)
            test_correct += (predicted == y_batch).sum().item()
    
    test_accuracy = 100 * test_correct / total_test
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {np.mean(train_losses):.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {np.mean(test_losses):.4f}, Test Accuracy: {test_accuracy:.2f}%')

In [None]:
# Example usage
trainer = KANModelTrainer(mouse_number=715093703, output_dir="output", timesteps_per_frame=1, batch_size=16, num_epochs=100)
trainer.set_hyperparameters(inter_dim=118, num_layers=3, lr=0.01)
trainer.prepare_data()
trainer.create_kan_model()

# Number of repetitions
num_repetitions = 3

# Repeat training and pruning
for i in range(num_repetitions):
    print(f"Repetition {i+1}/{num_repetitions}")
    trainer.train()
    trainer.prune_and_retrain()
