In [10]:
import torch
import torch.nn as nn
import torch as th
import pandas as pd
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset, random_split
from torch.optim import Adam
from torch.nn import MSELoss
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
import time
import numpy as np


# Define the Naive Fourier KAN Layer class
class NaiveFourierKANLayer(th.nn.Module):
    """Layer mapping each input feature through a learned truncated Fourier series.

    For every (output, input) pair the layer holds ``gridsize`` cosine and
    ``gridsize`` sine coefficients for the harmonics ``k = 1..gridsize``.
    An output unit is the sum, over all inputs and harmonics, of
    ``a * cos(k * x) + b * sin(k * x)``, plus an optional bias.
    """

    def __init__(self, inputdim, outdim, gridsize, addbias=True):
        """Create the coefficient tensor and, if requested, the bias.

        Args:
            inputdim: Size of the last dimension of the input.
            outdim: Size of the last dimension of the output.
            gridsize: Number of harmonics per (output, input) pair.
            addbias: When true, a ``(1, outdim)`` zero-initialised bias
                parameter is created and added in ``forward``.
        """
        super().__init__()
        self.inputdim = inputdim
        self.outdim = outdim
        self.gridsize = gridsize
        self.addbias = addbias

        # Index 0 of the leading axis holds cosine weights, index 1 sine
        # weights; the random draw is scaled down by sqrt(inputdim * gridsize).
        init_scale = np.sqrt(inputdim) * np.sqrt(self.gridsize)
        self.fouriercoeffs = th.nn.Parameter(
            th.randn(2, outdim, inputdim, gridsize) / init_scale
        )
        if self.addbias:
            self.bias = th.nn.Parameter(th.zeros(1, outdim))

    def forward(self, x):
        """Apply the layer to ``x`` whose last dimension is ``inputdim``.

        Leading dimensions are flattened for the computation and restored
        afterwards, so the result has shape ``x.shape[:-1] + (outdim,)``.
        """
        result_shape = x.shape[0:-1] + (self.outdim,)
        flat = th.reshape(x, (-1, self.inputdim))

        # Harmonic indices 1..gridsize, shaped to broadcast against
        # (batch, 1, inputdim, 1).
        harmonics = th.reshape(
            th.arange(1, self.gridsize + 1, device=flat.device),
            (1, 1, 1, self.gridsize),
        )
        phase = harmonics * th.reshape(flat, (flat.shape[0], 1, flat.shape[1], 1))

        cos_weights = self.fouriercoeffs[0:1]
        sin_weights = self.fouriercoeffs[1:2]

        # Broadcasting yields (batch, outdim, inputdim, gridsize); reduce the
        # input and harmonic axes to get (batch, outdim).
        out = th.sum(th.cos(phase) * cos_weights, (-2, -1))
        out = out + th.sum(th.sin(phase) * sin_weights, (-2, -1))
        if self.addbias:
            out = out + self.bias
        return th.reshape(out, result_shape)

def _evaluate(model, loader, loss_fn, device):
    """Return (mean per-batch loss, stacked targets, stacked predictions) for ``loader``."""
    model.eval()
    total_loss = 0.0
    y_true, y_pred = [], []
    with th.no_grad():
        for X_batch, y_batch in loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            batch_pred = model(X_batch)
            total_loss += loss_fn(batch_pred, y_batch).item()
            y_true.append(y_batch.cpu().numpy())
            y_pred.append(batch_pred.cpu().numpy())
    return total_loss / len(loader), np.concatenate(y_true), np.concatenate(y_pred)


def _plot_history(train_losses, test_losses, epoch_rmse):
    """Show train/test loss and test RMSE per epoch as two side-by-side plots."""
    plt.figure(figsize=(12, 5))

    # Plot Training and Testing Loss
    plt.subplot(1, 2, 1)
    plt.plot(range(1, len(train_losses) + 1), train_losses, label='Training Loss')
    plt.plot(range(1, len(test_losses) + 1), test_losses, label='Testing Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Epoch vs Loss')
    plt.legend()

    # Plot RMSE
    plt.subplot(1, 2, 2)
    plt.plot(range(1, len(epoch_rmse) + 1), epoch_rmse, label='RMSE', marker='o')
    plt.xlabel('Epoch')
    plt.ylabel('RMSE')
    plt.title('Epoch vs RMSE')
    plt.legend()

    plt.tight_layout()
    plt.show()


def demo(csv_path='RRR_data2.csv', epochs=100, batch_size=10, hidden=130,
         gridsize=5, lr=1e-3, train_fraction=0.8, print_every=10):
    """Train a two-layer Fourier KAN regressor on a CSV file and report metrics.

    The CSV must contain the input columns ``px``, ``py``, ``pz`` and the
    target columns ``theta_1``, ``theta_2``, ``theta_3``. The rows are split
    randomly into train and test parts, both sides are standardized, and the
    model is trained with Adam on an MSE loss. Per-epoch train/test loss and
    test RMSE (in original target units) are collected, printed periodically
    and plotted at the end, together with RMSE and R2 on the whole dataset.

    Args:
        csv_path: Path of the CSV file to load.
        epochs: Number of training epochs.
        batch_size: Mini-batch size for both loaders.
        hidden: Width of the hidden layer between the two Fourier layers.
        gridsize: Number of harmonics used by each Fourier layer.
        lr: Adam learning rate.
        train_fraction: Fraction of rows assigned to the training split.
        print_every: Print metrics every this many epochs (0 disables it).

    Raises:
        ValueError: If the split would leave the train or test part empty.
    """
    # Load data
    data = pd.read_csv(csv_path)
    features = data[['px', 'py', 'pz']].values
    targets = data[['theta_1', 'theta_2', 'theta_3']].values

    # Split row indices first so the scalers never see test rows.
    n_samples = len(features)
    train_size = int(train_fraction * n_samples)
    if train_size == 0 or train_size == n_samples:
        raise ValueError(
            f'Cannot split {n_samples} rows with train_fraction={train_fraction}: '
            'both the train and the test part must be non-empty'
        )
    perm = th.randperm(n_samples)
    train_idx, test_idx = perm[:train_size], perm[train_size:]

    # Standardize using statistics of the training rows only; fitting on the
    # full dataset would leak test-set mean/variance into training.
    scaler_X = StandardScaler().fit(features[train_idx.numpy()])
    scaler_y = StandardScaler().fit(targets[train_idx.numpy()])

    # Convert to tensors
    X = th.tensor(scaler_X.transform(features), dtype=th.float32)
    y = th.tensor(scaler_y.transform(targets), dtype=th.float32)

    train_loader = DataLoader(
        TensorDataset(X[train_idx], y[train_idx]), batch_size=batch_size, shuffle=True
    )
    test_loader = DataLoader(
        TensorDataset(X[test_idx], y[test_idx]), batch_size=batch_size, shuffle=False
    )

    device = "cuda" if th.cuda.is_available() else "cpu"

    # Two stacked Fourier layers: inputs -> hidden -> outputs.
    model = th.nn.Sequential(
        NaiveFourierKANLayer(X.shape[1], hidden, gridsize),
        NaiveFourierKANLayer(hidden, y.shape[1], gridsize),
    ).to(device)

    loss_fn = MSELoss()
    optimizer = Adam(model.parameters(), lr=lr)

    # Lists to store metrics
    train_losses = []
    test_losses = []
    epoch_rmse = []

    # Training loop
    for epoch in range(1, epochs + 1):
        model.train()
        epoch_train_loss = 0.0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(X_batch), y_batch)
            loss.backward()
            optimizer.step()
            epoch_train_loss += loss.item()

        # Calculate average train loss
        epoch_train_loss /= len(train_loader)
        train_losses.append(epoch_train_loss)

        # Validation
        epoch_test_loss, y_true, y_pred = _evaluate(model, test_loader, loss_fn, device)
        test_losses.append(epoch_test_loss)

        # RMSE is reported in the original (un-standardized) target units.
        rmse = np.sqrt(mean_squared_error(
            scaler_y.inverse_transform(y_true),
            scaler_y.inverse_transform(y_pred),
        ))
        epoch_rmse.append(rmse)

        # Print epoch metrics
        if print_every and epoch % print_every == 0:
            print(f'Epoch {epoch}, Train Loss: {epoch_train_loss}, Test Loss: {epoch_test_loss}, RMSE: {rmse}')

    # Calculate final RMSE and R2 score on the whole dataset
    model.eval()
    with th.no_grad():
        predictions = model(X.to(device))
        predictions = scaler_y.inverse_transform(predictions.cpu().numpy())
        y_original = scaler_y.inverse_transform(y.cpu().numpy())
        final_rmse = np.sqrt(mean_squared_error(y_original, predictions))
        final_r2 = r2_score(y_original, predictions)
        print(f'Final RMSE: {final_rmse}')
        print(f'Final R2 Score: {final_r2}')

    # Plotting epoch vs loss and epoch vs RMSE
    _plot_history(train_losses, test_losses, epoch_rmse)

# Run the training demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    demo()


Epoch 10, Train Loss: 0.07259460911154747, Test Loss: 0.10674368747406536, RMSE: 0.3651912808418274
Epoch 20, Train Loss: 0.05361318092603805, Test Loss: 0.039542679105781846, RMSE: 0.22265422344207764
Epoch 30, Train Loss: 0.06796069824173626, Test Loss: 0.050936807899011505, RMSE: 0.2525542378425598


KeyboardInterrupt: 