# Data Preprocessing

### Load the Iris dataset using scikit-learn

In [31]:
import torch
import torch.nn as nn
from sklearn import datasets
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

In [3]:
# Load the Iris dataset and pull out the feature matrix (X) and the targets (y).
iris = datasets.load_iris()
X = iris.data
y = iris.target

### Perform appropriate preprocessing (normalization, train/test split)

In [4]:
from sklearn.model_selection import train_test_split

In [19]:
# Hold out 20% of the samples as a test set. No random_state is passed, so the
# partition is not pinned to a seed and may differ from one run to the next.
Xtrain, Xtest, ytrain, ytest = train_test_split(X, y, test_size=0.2)

In [20]:
# Sanity check: print the number of rows in each of the four partitions,
# one per line, in the order Xtrain, Xtest, ytrain, ytest.
for part in (Xtrain, Xtest, ytrain, ytest):
    print(len(part))

120
30
120
30


In [17]:
# Peek at the first five training rows (all feature columns).
Xtrain[:5]

array([[7.7, 3.8, 6.7, 2.2],
       [6.5, 3. , 5.5, 1.8],
       [5. , 3.2, 1.2, 0.2],
       [4.3, 3. , 1.1, 0.1],
       [6.1, 2.8, 4.7, 1.2]])

In [23]:
# Peek at the first five test rows (all feature columns).
Xtest[:5]

array([[5.8, 4. , 1.2, 0.2],
       [6.6, 3. , 4.4, 1.4],
       [6.7, 3.1, 4.7, 1.5],
       [5.1, 3.5, 1.4, 0.3],
       [6.5, 3.2, 5.1, 2. ]])

In [24]:
# Fit the min-max scaler on the training split only, so that the test split
# does not influence the learned per-feature ranges.
scaler = MinMaxScaler()
scaler.fit(Xtrain)

In [25]:
# Apply the scaling learned from the training split to both splits
# (training data first, then test data).
Xtrain, Xtest = (scaler.transform(split) for split in (Xtrain, Xtest))

In [26]:
# Peek at the first five training rows after scaling.
Xtrain[:5]

array([[0.63636364, 0.36363636, 0.61016949, 0.58333333],
       [0.48484848, 0.40909091, 0.59322034, 0.58333333],
       [0.51515152, 0.40909091, 0.62711864, 0.54166667],
       [0.36363636, 0.45454545, 0.52542373, 0.5       ],
       [0.45454545, 0.45454545, 0.54237288, 0.58333333]])

In [27]:
# Peek at the first five test rows after scaling.
Xtest[:5]

array([[0.42424242, 0.90909091, 0.03389831, 0.04166667],
       [0.66666667, 0.45454545, 0.57627119, 0.54166667],
       [0.6969697 , 0.5       , 0.62711864, 0.58333333],
       [0.21212121, 0.68181818, 0.06779661, 0.08333333],
       [0.63636364, 0.54545455, 0.69491525, 0.79166667]])

In [28]:
# Use the entire scaled training set as a single batch for forward propagation.
inputs = Xtrain

In [None]:
### Define the SKA model
class SKA_iris(nn.Module):
    """Fully connected sigmoid network updated with the SKA entropy rule.

    Each layer computes a "knowledge" tensor ``Z = x @ W + b`` and a
    "decision" tensor ``D = sigmoid(Z)``. The per-layer entropy is derived
    from ``Z`` and the change of ``D`` between two consecutive steps, and the
    weights are updated from that quantity directly (no backpropagation).

    Nothing in this class assigns ``D_prev`` (other than resetting it to
    ``None``) or appends to ``output_history``; the surrounding training loop
    is expected to snapshot ``D`` into ``D_prev`` after each step and to
    record the mean output per step.

    Args:
        input_size: Number of input features per sample.
        layer_size: Width of every layer, in order; the last entry is the
            number of outputs. Must not be empty.
        K: Number of SKA steps; stored on the instance for the training loop.

    Raises:
        ValueError: If ``layer_size`` is empty.
    """

    # 1 / ln(2): constant scale factor of the SKA entropy formula. Kept as a
    # plain Python float so it multiplies tensors without NumPy scalar
    # dispatch getting involved.
    _INV_LN2 = 1.0 / float(np.log(2.0))

    def __init__(self, input_size=4, layer_size=(128, 64, 10, 3), K=50):
        # NOTE: ``super(SKA_iris).__init__()`` only builds an unbound super
        # object and never runs ``nn.Module.__init__``; registering the
        # parameter lists below would then raise AttributeError.
        super().__init__()

        # Copy so that neither the default nor the caller's sequence is shared.
        layer_sizes = list(layer_size)
        if not layer_sizes:
            raise ValueError("layer_size must contain at least one layer width")

        self.input_size = input_size
        self.layer_sizes = layer_sizes
        self.layer_size = layer_sizes  # alias matching the constructor argument
        self.K = K

        # One weight matrix (prev_size x size) and one bias vector per layer.
        self.weights = nn.ParameterList()
        self.biases = nn.ParameterList()
        prev_size = input_size
        for size in layer_sizes:
            self.weights.append(nn.Parameter(torch.randn(prev_size, size) * 0.01))
            self.biases.append(nn.Parameter(torch.randn(size)))
            # The output width of this layer is the input width of the next.
            prev_size = size

        n_layers = len(layer_sizes)
        # Per-layer state used for knowledge accumulation and entropy.
        self.Z = [None] * n_layers        # knowledge tensors
        self.D = [None] * n_layers        # decision probabilities
        self.D_prev = [None] * n_layers   # decisions of the previous step
        self.delta_D = [None] * n_layers  # decision shift D - D_prev
        self.entropy = [None] * n_layers  # latest entropy value (float)

        # Histories kept for the visualisation methods.
        self.entropy_history = [[] for _ in range(n_layers)]
        self.cosine_history = [[] for _ in range(n_layers)]
        self.output_history = []  # mean output distribution per step

    @property
    def bias(self):
        """Alias of ``biases``, kept for code that used the old attribute name."""
        return self.biases

    def _as_batch(self, x):
        """Return ``x`` as a 2-D tensor with the dtype/device of the first layer.

        Accepts tensors as well as array-likes such as the NumPy float64
        arrays produced by scikit-learn scalers.
        """
        ref = self.weights[0]
        x = torch.as_tensor(x, dtype=ref.dtype, device=ref.device)
        # Keep the batch dimension and flatten everything else.
        return x.reshape(x.shape[0], -1)

    def forward(self, x):
        """Run the SKA forward pass and store ``Z`` and ``D`` for every layer.

        Args:
            x: Batch of samples, first dimension is the batch.

        Returns:
            The decision tensor of the last layer, shape
            ``(batch, layer_sizes[-1])``.
        """
        x = self._as_batch(x)
        for l, (weight, bias) in enumerate(zip(self.weights, self.biases)):
            z = torch.mm(x, weight) + bias  # knowledge tensor
            d = torch.sigmoid(z)            # decision probabilities
            self.Z[l] = z
            self.D[l] = d
            x = d  # this layer's decisions feed the next layer
        return x

    def calculate_entropy(self):
        """Compute the entropy and cos(theta) alignment of every layer.

        Layers for which ``Z``, ``D`` or ``D_prev`` is still ``None`` are
        skipped. For the others, ``delta_D``, ``entropy`` and both histories
        are updated.

        Returns:
            Sum of the layer entropies as a 0-d tensor, or the integer ``0``
            when every layer was skipped.
        """
        total_entropy = 0
        for l, (z, d, d_prev) in enumerate(zip(self.Z, self.D, self.D_prev)):
            if z is None or d is None or d_prev is None:
                continue

            # Decision shift since the previous step.
            delta = d - d_prev
            self.delta_D[l] = delta

            # SKA entropy: -(1 / ln 2) * <Z, delta_D>.
            dot_product = torch.sum(z * delta)
            layer_entropy = -self._INV_LN2 * dot_product
            entropy_value = layer_entropy.item()
            self.entropy[l] = entropy_value
            self.entropy_history[l].append(entropy_value)

            # cos(theta) between Z and delta_D; 0.0 when either norm is zero.
            z_norm = torch.norm(z)
            delta_norm = torch.norm(delta)
            if z_norm > 0 and delta_norm > 0:
                cosine = (dot_product / (z_norm * delta_norm)).item()
            else:
                cosine = 0.0
            self.cosine_history[l].append(cosine)

            total_entropy += layer_entropy
        return total_entropy

    def ska_update(self, inputs, learning_rate=0.01):
        """Update weights and biases from the entropy gradient (no backprop).

        Only layers whose ``delta_D`` has been computed are updated. The
        parameters are modified in place, so they remain the same registered
        ``nn.Parameter`` objects.

        Args:
            inputs: The batch that was fed to ``forward``.
            learning_rate: Step size of the update.
        """
        # The update is a manual rule; autograd must not record it.
        with torch.no_grad():
            flat_inputs = self._as_batch(inputs)
            for l, delta in enumerate(self.delta_D):
                if delta is None:
                    continue
                # Input seen by this layer: raw batch or previous decisions.
                prev_output = flat_inputs if l == 0 else self.D_prev[l - 1]
                # Sigmoid derivative D * (1 - D).
                d_prime = self.D[l] * (1 - self.D[l])
                # Entropy gradient with respect to Z.
                gradient = -self._INV_LN2 * (self.Z[l] * d_prime + delta)
                # Batch-averaged weight update.
                dW = torch.matmul(prev_output.t(), gradient) / prev_output.shape[0]
                self.weights[l].sub_(learning_rate * dW)
                self.biases[l].sub_(learning_rate * gradient.mean(dim=0))

    def initialize_tensors(self, batch_size):
        """Reset all per-layer state and histories.

        Args:
            batch_size: Unused; kept for interface compatibility.
        """
        for l in range(len(self.layer_sizes)):
            self.Z[l] = None
            self.D[l] = None
            self.D_prev[l] = None
            self.delta_D[l] = None
            self.entropy[l] = None
            self.entropy_history[l] = []
            self.cosine_history[l] = []
        self.output_history = []

    @staticmethod
    def _save_and_flash(filename):
        """Save the current figure, show it for two seconds, then close it."""
        plt.tight_layout()
        plt.savefig(filename)
        plt.show(block=False)  # non-blocking
        plt.pause(2)
        plt.close()

    def _draw_layer_heatmap(self, data, title, filename, **heatmap_kwargs):
        """Draw a (layers x steps) heatmap with the shared axis labelling."""
        plt.figure(figsize=(12, 8))
        sns.heatmap(data,
                    xticklabels=range(1, data.shape[1] + 1),
                    yticklabels=[f"Layer {i+1}" for i in range(len(self.layer_sizes))],
                    **heatmap_kwargs)
        plt.title(title)
        plt.xlabel("Step Index K")
        plt.ylabel("Network Layers")
        self._save_and_flash(filename)

    def visualize_entropy_heatmap(self, step):
        """Plot the entropy history; the colour range runs from its minimum to 0."""
        entropy_data = np.array(self.entropy_history)
        self._draw_layer_heatmap(
            entropy_data,
            f"Layer-wise Entropy Heatmap (Step {step})",
            f"entropy_heatmap_step_{step}.png",
            cmap="Blues_r",
            vmin=np.min(entropy_data),  # scale to the data
            vmax=0.0,                   # 0 is kept as the fixed upper limit
        )

    def visualize_cosine_heatmap(self, step):
        """Plot the cos(theta) history on a diverging scale fixed to [-1, 1]."""
        cosine_data = np.array(self.cosine_history)
        self._draw_layer_heatmap(
            cosine_data,
            f"Layer-wise Cos(\u03B8) Alignment Heatmap (Step {step})",
            f"cosine_heatmap_step_{step}.png",
            cmap="coolwarm_r",
            vmin=-1.0,
            vmax=1.0,
        )

    def visualize_output_distribution(self):
        """Plot how the recorded mean output of each class evolves over the steps."""
        output_data = np.array(self.output_history)  # expected shape: [steps, classes]
        # One legend entry per plotted column; fall back to the output layer
        # width when the history is not a 2-D array.
        if output_data.ndim == 2:
            num_classes = output_data.shape[1]
        else:
            num_classes = self.layer_sizes[-1]
        plt.figure(figsize=(10, 6))
        plt.plot(output_data)  # one line per class
        plt.title('Output Decision Probability Evolution Across Steps (Single Pass)')
        plt.xlabel('Step Index K')
        plt.ylabel('Mean Sigmoid Output')
        plt.legend([f"Class {i}" for i in range(num_classes)],
                   loc='upper right', bbox_to_anchor=(1.15, 1))
        plt.grid(True)
        self._save_and_flash("output_distribution_single_pass.png")

