# KAN for IRIS

Iris dataset classification with Kolmogorov-Arnold networks. For learning purposes.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_iris

from kan.utils import create_dataset_from_data
from kan import KAN 

# Use the GPU when one is available and fall back to the CPU otherwise.
# A hard-coded index such as 'cuda:9' breaks on every machine with fewer
# GPUs; to pin a specific card, restrict visibility from the outside with the
# CUDA_VISIBLE_DEVICES environment variable instead.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Device:", device)

# Default floating-point dtype; reused later when casting boolean hit masks
# before averaging them into an accuracy.
dtype = torch.get_default_dtype()
print ("Dtype:", dtype)


In [None]:
# Load Iris and convert it straight into tensors: float32 features and
# int64 labels stored as a column of shape [n, 1].
iris = load_iris()
X = torch.tensor(iris.data, dtype=torch.float32)
y = torch.tensor(iris.target, dtype=torch.long).unsqueeze(1)

# Let pykan build its dataset dictionary on the selected device.
dataset = create_dataset_from_data(X, y, device=device)

# Show the shape of every entry of the dataset dictionary.
for key, value in dataset.items():
    print(key, value.shape)

# Number of input columns, used to size the first layer of every model.
nr_features = dataset['train_input'].shape[1]
print (f'We have {nr_features} features.')


### Regression - not recommended
Let's approach this as a regression problem first. I do not recommend doing classification this way; it was just something that came up while learning.


In [None]:
# Regression KAN: nr_features inputs, two hidden layers of 16 nodes each and
# a single scalar output that is later rounded to a class index.
model_reg = KAN(
    width=[nr_features, 16, 16, 1],
    grid=5,
    k=3,
    seed=0,
).to(device)

def train_acc_reg():
    """Return the share of training rows whose rounded output equals the label.

    Reads the notebook-level `model_reg` and `dataset`; the result is a 0-d
    float tensor.
    """
    predicted = model_reg(dataset['train_input'])[:, 0].round()
    expected = dataset['train_label'][:, 0]
    return (predicted == expected).float().mean()

def test_acc_reg():
    """Return the share of test rows whose rounded output equals the label.

    Reads the notebook-level `model_reg` and `dataset`; the result is a 0-d
    float tensor.
    """
    predicted = model_reg(dataset['test_input'])[:, 0].round()
    expected = dataset['test_label'][:, 0]
    return (predicted == expected).float().mean()



In [None]:
# Fit the regression model with LBFGS and record both accuracy metrics after
# every step; the history is keyed by the metric functions' names.
regression_metrics = (train_acc_reg, test_acc_reg)
results = model_reg.fit(dataset, opt="LBFGS", steps=10, metrics=regression_metrics)

# Report the accuracies reached at the last step.
final_train = results['train_acc_reg'][-1]
final_test = results['test_acc_reg'][-1]
print(final_train, final_test)



### Classification

Now we can do classification, typical softmax on the last layer.

Notes:
- We do not add softmax to the last layer, it is added by CrossEntropyLoss()
- We are rebuilding the dataset. The trick is that for regression we wanted the labels to be unsqueezed to size `[n, 1]`, whereas now we want a vector of size `[n]`, where `n` stands for the number of examples (rows) in the dataset

In [None]:
# CrossEntropyLoss wants class indices of shape [n], so drop the trailing
# column axis of each label tensor. Only dim 1 is squeezed: a bare
# `.squeeze()` would also collapse the sample axis of a one-row split.
# Each split is checked on its own, which keeps the cell safe to re-run.
for split in ('train_label', 'test_label'):
    if dataset[split].dim() == 2:
        dataset[split] = dataset[split].squeeze(1)

# Show the shape of every entry after the reshape.
for key, value in dataset.items():
    print(key, value.shape)

nr_features = dataset['train_input'].shape[1]
# Number of distinct label values present in the full label tensor.
nr_classes = y.unique().shape[0]
print (f'We have {nr_features} features and {nr_classes} classes.')


In [None]:
# Classification KAN: nr_features inputs, hidden layers of 16 and 8 nodes,
# and one output logit per class.
layer_widths = [nr_features, 16, 8, nr_classes]
model = KAN(width=layer_widths, grid=5, k=3, seed=0).to(device)

def train_acc():
    """Return the training accuracy of the notebook-level `model`.

    A sample counts as correct when the index of its largest logit equals
    its label. The result is a 0-d tensor of the default float dtype.
    """
    logits = model(dataset['train_input'])
    hits = logits.argmax(dim=1) == dataset['train_label']
    return hits.type(dtype).mean()

def test_acc():
    """Return the test accuracy of the notebook-level `model`.

    A sample counts as correct when the index of its largest logit equals
    its label. The result is a 0-d tensor of the default float dtype.
    """
    logits = model(dataset['test_input'])
    hits = logits.argmax(dim=1) == dataset['test_label']
    return hits.type(dtype).mean()


# Train the classifier with LBFGS on cross-entropy (which applies the softmax
# itself) and log both accuracies after every step.
classification_loss = torch.nn.CrossEntropyLoss()
results = model.fit(
    dataset,
    opt="LBFGS",
    steps=10,
    metrics=(train_acc, test_acc),
    loss_fn=classification_loss,
)

# Accuracies reached at the final step.
final_train = results['train_acc'][-1]
final_test = results['test_acc'][-1]
print(final_train, final_test)

In [None]:
# The history holds one value per optimisation step for each metric; print
# both complete series.
for label, metric_name in (('Train: ', 'train_acc'), ('Test: ', 'test_acc')):
    print(label, results[metric_name])

#### Classification with adam learning

In [None]:
# Fresh classifier with the same layout as the LBFGS run, trained with Adam.
model = KAN(
    width=[nr_features,16, 8,nr_classes], 
    grid=5, 
    k=3, 
    seed=0
    ).to(device)    

# Keep the returned history: without the assignment the print below would
# report the numbers of the previous (LBFGS) run that `results` still held.
results = model.fit(
    dataset, 
    opt="Adam", 
    lr=1e-3, 
    steps=50, 
    #lamb=1e-3, 
    #lamb_entropy=5., 
    #update_grid=False
    metrics=(train_acc, test_acc), 
    loss_fn=torch.nn.CrossEntropyLoss()    
    )

# Accuracies reached at the final Adam step.
print(results['train_acc'][-1], results['test_acc'][-1])


### Learning with batches
You can learn with batches using the native `KAN.fit()`, but it behaves strangely, and I did not figure out why. 
Anyway, to bring this into Dreamer we will have to integrate it with Torch, so let's check if we can do it this way.

In [None]:
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

# Mini-batch size shared by both loaders.
batch_size = 8

# Training split: wrapped as (input, label) pairs and reshuffled every epoch.
train_dataset = TensorDataset(dataset['train_input'], dataset['train_label'])
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Test split: same wrapping, but iterated in a fixed order.
test_dataset = TensorDataset(dataset['test_input'], dataset['test_label'])
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

def calculate_accuracy(model, data_loader):
    """
    Calculate the accuracy of the model on data from data_loader.

    Args:
        model: callable mapping a batch of inputs to per-class scores of
            shape [batch, classes]. When it exposes `parameters()`, batches
            are moved to the device of its first parameter before the call.
        data_loader: iterable yielding `(inputs, labels)` batches.

    Returns:
        Accuracy in percent (0-100) as a float; 0.0 when the loader yields
        no samples.
    """
    # Follow the model's own device instead of a notebook-level global, so
    # the helper also works for models living elsewhere or without parameters.
    parameters = getattr(model, "parameters", None)
    first_param = next(iter(parameters()), None) if callable(parameters) else None
    target_device = first_param.device if first_param is not None else None

    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            if target_device is not None:
                inputs, labels = inputs.to(target_device), labels.to(target_device)
            outputs = model(inputs)
            # The predicted class is the index of the largest score.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # An empty loader would otherwise raise ZeroDivisionError.
    if total == 0:
        return 0.0
    return correct / total * 100


# Fresh classifier, this time optimised with a hand-written mini-batch loop.
model = KAN(
    width=[nr_features,16, 8,nr_classes], 
    grid=5, 
    k=3, 
    seed=0
    ).to(device) 

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)  # Decay factor of 0.95

# Training loop
epochs = 30

progress_bar = tqdm(range(epochs), desc="Training Progress", unit="epoch")
for epoch in progress_bar:
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)  # Move data to GPU if available
        # Forward pass
        outputs = model(inputs)
        
        # Compute loss
        loss = criterion(outputs, labels)
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    # Decay the learning rate once per epoch and read back the new value.
    scheduler.step()
    current_lr = scheduler.get_last_lr()[0]
    
    # Mean of the per-batch losses of this epoch (was computed twice before).
    avg_loss = running_loss / len(train_loader)
    progress_bar.set_postfix(loss=f"{avg_loss:.4f}", lr=f"{current_lr:.4f}")

# Evaluate the model
model.eval()

# NOTE: this rebinds `train_acc` / `test_acc` to plain floats, replacing the
# metric functions of the same name defined earlier in the notebook; re-run
# the cell that defines them before passing them to `fit()` again.
train_acc = calculate_accuracy(model, train_loader)
test_acc = calculate_accuracy(model, test_loader)

print(f"Train Accuracy: {train_acc:.2f}%")
print(f"Test Accuracy: {test_acc:.2f}%")


In [48]:
import numpy as np
import sys
sys.path.append('..')
import tools

class MyKan(KAN):
    """KAN backbone followed by linear output head(s).

    The KAN part has layer sizes `[inp_dim] + width`; its final features are
    projected by `mean_layer` (and by `std_layer` when `std == "learned"`).

    Args:
        inp_dim: number of input features.
        shape: output shape. An int or tuple builds a single linear head, a
            dict maps head names to shapes and builds one head per entry,
            and `None` builds no head at all.
        width: sizes of the KAN layers after the input layer.
        grid: forwarded to `KAN` as `grid`.
        k: forwarded to `KAN` as `k`.
        act: name of an attribute of `torch.nn`; only checked for existence.
        norm: accepted but not used by this class.
        dist, min_std, max_std, absmax, temp, unimix_ratio, symlog_inputs:
            stored on the instance; not read by `forward`.
        std: the string `"learned"` adds `std_layer`; any other value is
            stored as a one-element tensor on `device`.
        outscale: passed to `tools.uniform_weight_init` for the heads.
        device: device of the `std` tensor; also stored as `_device`.
        name: accepted but not used by this class.
    """

    def __init__(
        self,
        inp_dim,
        shape,
        width,
        grid = 5, 
        k = 3,
        act="SiLU",
        norm=True,
        dist="normal",
        std=1.0,
        min_std=0.1,
        max_std=1.0,
        absmax=None,
        temp=0.1,
        unimix_ratio=0.01,
        outscale=1.0,
        symlog_inputs=False,
        device="cuda",
        name="NoName",
    ):
        seed = 0

        # Pass everything by keyword, as the other KAN(...) calls in this
        # notebook do: the fourth positional parameter of KAN is not `seed`
        # (it is `mult_arity` in recent pykan releases), so a positional
        # seed silently configures something else.
        super().__init__(width=[inp_dim] + list(width), grid=grid, k=k, seed=seed)

        # The heads consume the features of the last KAN layer.
        feat_dim = width[-1]

        self._shape = (shape,) if isinstance(shape, int) else shape
        if self._shape is not None and len(self._shape) == 0:
            self._shape = (1,)
        # Fail early on an unknown activation name; the class itself is not
        # used here.
        getattr(torch.nn, act)
        self._dist = dist
        self._std = std if isinstance(std, str) else torch.tensor((std,), device=device)
        self._min_std = min_std
        self._max_std = max_std
        self._absmax = absmax
        self._temp = temp
        self._unimix_ratio = unimix_ratio
        self._symlog_inputs = symlog_inputs
        self._device = device

        # Decide on the raw argument instead of comparing a tensor to a str.
        learn_std = isinstance(std, str) and std == "learned"
        if learn_std:
            assert dist in ("tanh_normal", "normal", "trunc_normal", "huber"), dist

        if isinstance(self._shape, dict):
            # One linear head per named output.
            self.mean_layer = nn.ModuleDict()
            for head_name, head_shape in self._shape.items():
                self.mean_layer[head_name] = nn.Linear(feat_dim, int(np.prod(head_shape)))
            self.mean_layer.apply(tools.uniform_weight_init(outscale))
            if learn_std:
                self.std_layer = nn.ModuleDict()
                for head_name, head_shape in self._shape.items():
                    self.std_layer[head_name] = nn.Linear(feat_dim, int(np.prod(head_shape)))
                self.std_layer.apply(tools.uniform_weight_init(outscale))
        elif self._shape is not None:
            # Single head producing prod(shape) values.
            self.mean_layer = nn.Linear(feat_dim, int(np.prod(self._shape)))
            self.mean_layer.apply(tools.uniform_weight_init(outscale))
            if learn_std:
                self.std_layer = nn.Linear(feat_dim, int(np.prod(self._shape)))
                self.std_layer.apply(tools.uniform_weight_init(outscale))

    def forward(self, x):
        """Run the KAN backbone and apply the mean head(s).

        Returns the raw KAN features when `shape` was `None`, a dict with one
        tensor per head when `shape` was a dict, and otherwise the output of
        the single `mean_layer`.
        """
        out = super().forward(x)
        if self._shape is None:
            # No head was built for this configuration.
            return out
        if isinstance(self._shape, dict):
            # A ModuleDict is not callable; apply every head separately.
            return {head_name: head(out) for head_name, head in self.mean_layer.items()}
        return self.mean_layer(out)
    

In [None]:
# MyKan's `device` argument defaults to "cuda" and is used for the tensors it
# creates itself, so pass the notebook's device explicitly; otherwise the
# constructor fails on CPU-only machines or targets a different GPU than the
# rest of the notebook.
model = MyKan(
    inp_dim=nr_features,
    shape=nr_classes,
    width=[16, 8],
    grid=5,
    k=3,
    device=device,
    ).to(device)
print (model)

In [None]:
# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)  # Decay factor of 0.95

# Training loop
epochs = 30

progress_bar = tqdm(range(epochs), desc="Training Progress", unit="epoch")
for epoch in progress_bar:
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)  # Move data to GPU if available
        # Forward pass
        outputs = model(inputs)
        
        # Compute loss
        loss = criterion(outputs, labels)
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    # Decay the learning rate once per epoch and read back the new value.
    scheduler.step()
    current_lr = scheduler.get_last_lr()[0]
    
    # Mean of the per-batch losses of this epoch (was computed twice before).
    avg_loss = running_loss / len(train_loader)
    progress_bar.set_postfix(loss=f"{avg_loss:.4f}", lr=f"{current_lr:.4f}")

# Evaluate the model
model.eval()

# Accuracy in percent over each loader.
train_acc = calculate_accuracy(model, train_loader)
test_acc = calculate_accuracy(model, test_loader)

print(f"Train Accuracy: {train_acc:.2f}%")
print(f"Test Accuracy: {test_acc:.2f}%")

### Quick evaluation with MLP

In [None]:
# write the MLP that we want to compare to
class MLP(nn.Module):
    """Small ReLU perceptron used as a baseline for the KAN classifiers.

    Layer sizes are nr_features -> 16 -> 8 -> nr_classes, read from the
    notebook-level `nr_features` and `nr_classes` at construction time.
    `forward` returns raw logits (no softmax).
    """

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(nr_features, 16)
        self.fc2 = nn.Linear(16, 8)
        # Size the output from the data, like the KAN models do, instead of
        # hard-coding 3 classes.
        self.fc3 = nn.Linear(8, nr_classes)

    def forward(self, x):
        """Map a batch of feature rows to per-class logits."""
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x
    
mlp = MLP().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(mlp.parameters(), lr=1e-3)

# Full-batch training: every epoch is one optimiser step on the whole
# training split.
nr_epochs = 2000
last_epoch = nr_epochs - 1
for epoch in range(nr_epochs):
    optimizer.zero_grad()
    output = mlp(dataset['train_input'])
    loss = criterion(output, dataset['train_label'].squeeze())
    loss.backward()
    optimizer.step()

    # Only report on every tenth of the run and on the very last epoch.
    if epoch != last_epoch and epoch % (nr_epochs / 10) != 0:
        continue

    # Accuracy = share of rows whose arg-max logit equals the label.
    with torch.no_grad():
        train_hits = mlp(dataset['train_input']).argmax(dim=1) == dataset['train_label']
        train_acc = train_hits.type(dtype).mean()
        test_hits = mlp(dataset['test_input']).argmax(dim=1) == dataset['test_label']
        test_acc = test_hits.type(dtype).mean()

    print(f'Epoch {epoch}, Loss: {loss.item():.5f}, Train Acc: {train_acc:.3f}, Test Acc: {test_acc:.3f}')



In [None]:
import numpy as np
# Entropy of a uniform distribution over five outcomes.
softmax = np.array([0.2, 0.2, 0.2, 0.2, 0.2])
print (sum(softmax))  # the probabilities should add up to one

# Shannon entropy in nats: -sum(p * log p).
log_probs = np.log(softmax)
entropy = -np.sum(softmax * log_probs)
print (softmax, entropy)

In [None]:
# Entropy of a peaked distribution: one dominant outcome and five rare ones.
softmax = np.array([0.9, 0.02, 0.02, 0.02, 0.02, 0.02])
print (sum(softmax))  # the probabilities should add up to one

# Shannon entropy in nats: -sum(p * log p).
log_probs = np.log(softmax)
entropy = -np.sum(softmax * log_probs)
print (softmax, entropy)