# 1. Data Preparation


In [1]:
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.datasets import mnist
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score

# Load MNIST dataset
(X_train, y_train), (X_test, y_test) = mnist.load_data()

def preprocess_data(X, y, n_samples=None):
    """Normalize images and convert labels to one-hot encoding"""
    if n_samples is not None:
        X = X[:n_samples]
        y = y[:n_samples]
    
    # Flatten and normalize images (0-1 range)
    X_flat = X.reshape(X.shape[0], -1).astype('float32')
    X_norm = MinMaxScaler().fit_transform(X_flat)
    
    # Convert labels to one-hot encoding
    y_onehot = np.zeros((len(y), 10))
    y_onehot[np.arange(len(y)), y] = 1
    
    return X_norm, y_onehot

# Preprocess training and test data
X_train_prep, y_train_prep = preprocess_data(X_train, y_train, n_samples=10000)
X_test_prep, y_test_prep = preprocess_data(X_test, y_test)

2025-06-18 15:54:16.361847: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1750251256.399348   11715 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1750251256.410875   11715 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1750251256.439572   11715 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1750251256.439659   11715 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1750251256.439666   11715 computation_placer.cc:177] computation placer alr

**Explanation:**

-  Loads MNIST dataset (28x28 grayscale digits 0-9)
-  Normalizes pixel values to [0,1] range using MinMaxScaler

- Converts labels to one-hot encoded vectors

- Uses 10,000 samples for training (can be adjusted)



# 2. Model Architecture

In [2]:
class PredictiveCodingModel:
    def __init__(self, layer_sizes=[784, 256, 10], learning_rate=0.001, n_iterations=5):
        """Initialize model with layer sizes, learning rate, and iterations"""
        self.layer_sizes = layer_sizes
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        
        # Initialize weights with He initialization (good for sigmoid/ReLU)
        self.weights = []
        for i in range(len(layer_sizes)-1):
            std = np.sqrt(2. / layer_sizes[i])  # He initialization std
            self.weights.append(np.random.normal(0, std, (layer_sizes[i+1], layer_sizes[i])))
        
        # Initialize small lateral connection weights
        self.lateral_weights = []
        for size in layer_sizes[1:]:
            self.lateral_weights.append(0.01 * np.random.randn(size, size))

**Layer Structure:**

- Input Layer: 784 units (28x28 flattened image)

- Hidden Layer: 256 units with lateral connections

- Output Layer: 10 units (one per digit class)

# 3. Core Functions

Sigmoid Activation


In [3]:
    def sigmoid(self, x):
        """Numerically stable sigmoid function"""
        return np.where(x >= 0, 
                       1 / (1 + np.exp(-x)), 
                       np.exp(x) / (1 + np.exp(x)))
    
    def sigmoid_derivative(self, x):
        """Derivative of sigmoid with clipping for stability"""
        return np.clip(x * (1 - x), 1e-8, 1)

Forward Pass


In [4]:
    def forward_pass(self, x):
        """Compute predictions for each layer"""
        layer_predictions = []
        current_activity = x
        
        for i in range(len(self.weights)):
            # Compute prediction: σ(W·x)
            prediction = self.sigmoid(np.dot(self.weights[i], current_activity))
            layer_predictions.append(prediction)
            current_activity = prediction
        
        return layer_predictions

# 4. Predictive Coding Update Process


In [5]:
    def update_states_and_weights(self, x, y):
        """Core predictive coding update with lateral connections"""
        # Initialize states (input + predictions)
        states = [x.copy()]  # Input layer state
        predictions = self.forward_pass(x)
        states.extend(predictions)
        states[-1] = y.copy()  # Replace top prediction with true label
        
        # Initialize errors for each layer
        errors = [np.zeros_like(s) for s in states]
        
        # Iterative inference (n_iterations)
        for _ in range(self.n_iterations):
            # Top-down error propagation
            for l in range(len(self.weights), 0, -1):
                # Compute prediction error
                if l == len(self.weights):
                    # Output layer error: target - prediction
                    errors[l] = states[l] - predictions[l-1]
                else:
                    # Hidden layer error: top-down + lateral - prediction error
                    error_from_above = np.dot(self.weights[l].T, errors[l+1])
                    if l > 0:
                        lateral_error = np.dot(self.lateral_weights[l-1], errors[l])
                        errors[l] = error_from_above + 0.1 * lateral_error - (states[l] - predictions[l-1])
                    else:
                        errors[l] = error_from_above - (states[l] - predictions[l-1])
                
                # Update states with clipped gradients
                state_update = 0.1 * self.learning_rate * np.clip(errors[l], -10, 10)
                if l > 0:
                    state_update *= self.sigmoid_derivative(states[l])
                states[l] += state_update
            
            # Recompute predictions with updated states
            for l in range(len(self.weights)):
                predictions[l] = self.sigmoid(np.dot(self.weights[l], states[l]))
        
        # Weight updates
        for l in range(len(self.weights)):
            # Update forward weights: ΔW ∝ error * σ'(state) * input
            weight_update = np.outer(
                np.clip(errors[l+1], -5, 5) * self.sigmoid_derivative(states[l+1]), 
                states[l]
            )
            self.weights[l] += self.learning_rate * np.clip(weight_update, -0.1, 0.1)
            
            # Update lateral weights (if not input layer)
            if l > 0:
                lateral_update = np.outer(
                    np.clip(errors[l], -5, 5), 
                    np.clip(errors[l], -5, 5)
                )
                self.lateral_weights[l-1] += 0.01 * self.learning_rate * lateral_update
        
        return predictions, errors

**Key Concepts:**

- **Predictions:** Each layer's estimate of next layer's state

- **Errors:** Difference between top-down predictions and actual states

- **State Updates:** Adjustments to neuron activations based on errors

- **Weight Updates:** Changes to connection strengths based on errors

# 5. Training Process


In [6]:
    def train(self, X, y, epochs=5, batch_size=32):
        """Training loop with mini-batches"""
        n_samples = X.shape[0]
        
        for epoch in range(epochs):
            # Shuffle data each epoch
            permutation = np.random.permutation(n_samples)
            X_shuffled = X[permutation]
            y_shuffled = y[permutation]
            
            epoch_loss = 0
            correct = 0
            
            # Mini-batch training
            for i in range(0, n_samples, batch_size):
                batch_X = X_shuffled[i:i+batch_size]
                batch_y = y_shuffled[i:i+batch_size]
                
                batch_loss = 0
                batch_correct = 0
                
                for j in range(batch_X.shape[0]):
                    # Process each sample in batch
                    predictions, errors = self.update_states_and_weights(batch_X[j], batch_y[j])
                    
                    # Compute regularized loss
                    reg_loss = 0
                    for w in self.weights:
                        reg_loss += 0.001 * np.sum(w**2)  # L2 regularization
                    
                    loss = 0.5 * np.sum(errors[-1]**2) + reg_loss
                    batch_loss += loss
                    
                    # Track accuracy
                    predicted_class = np.argmax(predictions[-1])
                    true_class = np.argmax(batch_y[j])
                    if predicted_class == true_class:
                        batch_correct += 1
                
                epoch_loss += batch_loss / batch_size
                correct += batch_correct
            
            # Epoch statistics
            accuracy = correct / n_samples
            print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/n_samples:.4f}, Accuracy: {accuracy:.4f}")

# 6. Evaluation


In [7]:
    def predict(self, X):
        """Generate predictions for input data"""
        predictions = []
        for x in X:
            layer_predictions = self.forward_pass(x)
            predictions.append(layer_predictions[-1])  # Return output layer
        return np.array(predictions)

# Initialize and train model
pc_model = PredictiveCodingModel(layer_sizes=[784, 256, 10], 
                                learning_rate=0.001,
                                n_iterations=5)

# Train for 20 epochs (can reduce to 10 if needed)
pc_model.train(X_train_prep, y_train_prep, epochs=20, batch_size=32)

# Evaluate on test set
y_pred_probs = pc_model.predict(X_test_prep)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test_prep, axis=1)
accuracy = accuracy_score(y_true, y_pred)
print(f"Test Accuracy: {accuracy:.4f}")

IndentationError: unexpected indent (3392337019.py, line 1)