In [None]:
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import os
import datetime
import wandb
from tqdm import tqdm
import random
import time


def add_username_watermark(username="shriansh.sahu"):
    """Write *username* near the bottom-right corner of the current axes.

    The position (0.98, 0.02) is given in axes-relative coordinates via the
    axes transform, so the label stays in the corner whatever the data limits.
    """
    text_style = {
        'ha': 'right',
        'va': 'bottom',
        'transform': plt.gca().transAxes,
        'fontsize': 10,
        'color': 'gray',
        'alpha': 0.7,
    }
    plt.text(0.98, 0.02, username, **text_style)


class Identity:
    """Pass-through activation: f(z) = z."""

    def forward(self, z):
        """Return the pre-activation unchanged (the very same object)."""
        return z

    def backward(self, z):
        """Return the element-wise derivative: ones with the shape of ``z``."""
        return np.full(z.shape, 1.0)

class Sigmoid:
    """Logistic activation, 1 / (1 + exp(-z))."""

    def forward(self, z):
        """Apply the logistic function element-wise.

        The input is clipped to [-500, 500] first so that ``exp`` cannot
        overflow.
        """
        bounded = np.clip(z, -500, 500)
        return 1 / (1 + np.exp(-bounded))

    def backward(self, z):
        """Return the derivative s(z) * (1 - s(z)) evaluated at ``z``."""
        activated = self.forward(z)
        return activated * (1 - activated)

class Tanh:
    """Hyperbolic-tangent activation."""

    def forward(self, z):
        """Apply tanh element-wise."""
        return np.tanh(z)

    def backward(self, z):
        """Return the derivative 1 - tanh(z)^2 evaluated at ``z``."""
        squashed = np.tanh(z)
        return 1 - squashed ** 2

class ReLU:
    """Rectified linear activation, max(0, z)."""

    def forward(self, z):
        """Zero out negative entries and keep the rest."""
        rectified = np.maximum(0, z)
        return rectified

    def backward(self, z):
        """Return 1.0 where ``z`` is strictly positive and 0.0 elsewhere."""
        is_active = np.greater(z, 0)
        return is_active.astype(float)

class MSE:
    """Mean-squared-error loss."""

    def forward(self, y_true, y_pred):
        """Return the mean of the squared residuals over all elements."""
        residual = y_pred - y_true
        return np.mean(np.square(residual))

    def backward(self, y_true, y_pred):
        """Return d(loss)/d(y_pred): 2 * residual / number of elements."""
        residual = y_pred - y_true
        return 2 * residual / y_true.size

class BCE:
    """Binary cross-entropy loss on probabilities."""

    def forward(self, y_true, y_pred):
        """Return the mean binary cross-entropy.

        Predictions are clipped to [1e-5, 1 - 1e-5] so neither logarithm
        receives zero.
        """
        eps = 1e-5
        prob = np.clip(y_pred, eps, 1 - eps)
        positive_term = y_true * np.log(prob)
        negative_term = (1 - y_true) * np.log(1 - prob)
        return -np.mean(positive_term + negative_term)

    def backward(self, y_true, y_pred):
        """Return d(loss)/d(y_pred), using the same clipping as ``forward``."""
        eps = 1e-5
        prob = np.clip(y_pred, eps, 1 - eps)
        per_element = (1 - y_true) / (1 - prob) - y_true / prob
        return per_element / y_true.size


class Linear:
    """Fully connected layer followed by an element-wise activation.

    Gradients are *accumulated* into ``grad_weights_cumulative`` and
    ``grad_biases_cumulative`` on every ``backward`` call; this class never
    resets them itself.
    """

    def __init__(self, input_width, output_width, activation):
        """Create the layer.

        Weights are drawn uniformly from [-limit, limit] with
        limit = sqrt(6 / (input_width + output_width)); biases start at zero.
        ``activation`` is a class (not an instance) and is instantiated here.
        """
        bound = np.sqrt(6 / (input_width + output_width))
        weight_shape = (input_width, output_width)
        self.weights = np.random.uniform(-bound, bound, weight_shape)
        self.biases = np.zeros((1, output_width))
        self.activation = activation()

        # Values cached by forward() for use in backward().
        self.input = None
        self.z = None
        self.a = None

        # Running gradient sums.
        self.grad_weights_cumulative = np.zeros_like(self.weights)
        self.grad_biases_cumulative = np.zeros_like(self.biases)

    def forward(self, input_data):
        """Compute activation(input_data . weights + biases) and cache the intermediates."""
        pre_activation = np.dot(input_data, self.weights) + self.biases
        self.input = input_data
        self.z = pre_activation
        self.a = self.activation.forward(pre_activation)
        return self.a

    def backward(self, upstream_gradient):
        """Accumulate parameter gradients and return the gradient w.r.t. the layer input."""
        local_slope = self.activation.backward(self.z)
        delta = upstream_gradient * local_slope
        self.grad_weights_cumulative += np.dot(self.input.T, delta)
        self.grad_biases_cumulative += np.sum(delta, axis=0, keepdims=True)
        return np.dot(delta, self.weights.T)


class Model:
    """Sequential stack of layers paired with a loss function."""

    def __init__(self, layers, loss_function):
        """Store the layers and select the loss.

        ``loss_function`` is matched case-insensitively: ``'bce'`` selects
        ``BCE``; any other string selects ``MSE``.
        """
        if loss_function.lower() == 'bce':
            chosen_loss = BCE()
        else:
            chosen_loss = MSE()
        self.layers = layers
        self.loss_fn = chosen_loss

    def forward(self, x):
        """Feed ``x`` through every layer in order and return the final output."""
        out = x
        for layer in self.layers:
            out = layer.forward(out)
        return out

    def backward(self, y_true, y_pred):
        """Back-propagate the loss gradient through the layers, last to first."""
        grad = self.loss_fn.backward(y_true, y_pred)
        for layer in self.layers[::-1]:
            grad = layer.backward(grad)

    def train(self, x, y):
        """Run one forward and backward pass; return ``(loss, y_pred)``.

        Parameters are not changed here; gradients are only accumulated in
        the layers.
        """
        y_pred = self.forward(x)
        loss = self.loss_fn.forward(y, y_pred)
        self.backward(y, y_pred)
        return loss, y_pred

    def calculate_loss(self, x, y):
        """Return the loss on ``(x, y)`` without running a backward pass."""
        return self.loss_fn.forward(y, self.forward(x))

    def zero_grad(self):
        """Reset the accumulated gradients of every layer to zero, in place."""
        for layer in self.layers:
            for accumulated in (layer.grad_weights_cumulative,
                                layer.grad_biases_cumulative):
                accumulated[...] = 0

    def update(self, learning_rate):
        """Take one gradient-descent step using the accumulated gradients."""
        for layer in self.layers:
            weight_step = learning_rate * layer.grad_weights_cumulative
            layer.weights -= weight_step
            bias_step = learning_rate * layer.grad_biases_cumulative
            layer.biases -= bias_step

    def predict(self, x):
        """Return the network output for ``x`` (alias for ``forward``)."""
        return self.forward(x)


class XORProblem:
    """Four-sample XOR dataset plus a small full-batch training routine."""

    def __init__(self):
        """Build the XOR truth table: ``X`` is 4x2 inputs, ``y`` is 4x1 targets."""
        inputs = [[0, 0], [0, 1], [1, 0], [1, 1]]
        targets = [[0], [1], [1], [0]]
        self.X = np.array(inputs)
        self.y = np.array(targets)

    def run_test(self, model_layers, epochs=5000, lr=0.1):
        """Train a BCE model built from ``model_layers`` on XOR and report accuracy.

        Each epoch is one full-batch step (zero gradients, forward/backward,
        update). The loss is printed every 1000 epochs. Returns the trained
        model.
        """
        model = Model(layers=model_layers, loss_function='bce')

        for step in range(1, epochs + 1):
            model.zero_grad()
            loss, _ = model.train(self.X, self.y)
            model.update(lr)

            if step % 1000 == 0:
                print(f" Epoch {step} / {epochs}, Loss: {loss: .6f}")

        # Threshold the outputs at 0.5 and compare with the targets.
        binary_preds = (model.predict(self.X) > 0.5).astype(int)
        accuracy = np.mean(binary_preds == self.y) * 100
        print(f" Final Accuracy:{accuracy: .2f}%")

        converged = accuracy == 100.0
        status = " Status : Converged Successfully" if converged else "Status : Failed to converged"
        print(status)

        return model


class GradientChecker:
    """Compare back-propagated gradients with central finite differences."""

    def check(self, model, x, y, epsilon=1e-5, debug=False):
        """Run a gradient check on ``model`` for the batch ``(x, y)``.

        The analytical gradient is obtained from one zero_grad / forward /
        backward pass. The numerical gradient perturbs every weight and bias
        by +/- ``epsilon`` and uses (L(p + e) - L(p - e)) / (2e).

        Args:
            model: object exposing ``layers``, ``loss_fn``, ``zero_grad``,
                ``forward``, ``backward`` and ``calculate_loss``; each layer
                exposes ``weights``, ``biases`` and the two cumulative
                gradient arrays.
            x: input batch passed to the model.
            y: target batch.
            epsilon: finite-difference step.
            debug: when true, also print the largest absolute difference
                between the two gradient vectors.

        Returns:
            The relative error ||a - n|| / (||a|| + ||n||) as a float. The
            check is reported as passed when this is below 1e-7.

        Raises:
            RuntimeError: if the two flattened gradient vectors differ in shape.

        Side effects: the model's accumulated gradients are overwritten with
        the gradients for ``(x, y)``; parameters are restored to their
        original values.
        """
        print("\n Runnig Gradient Check")

        model.zero_grad()
        y_pred = model.forward(x)
        model.backward(y, y_pred)

        # ravel() flattens in C (row-major) order regardless of memory layout.
        analytical_grads_flat = np.concatenate([
            grad.ravel()
            for layer in model.layers
            for grad in (layer.grad_weights_cumulative, layer.grad_biases_cumulative)
        ])

        numerical_grads = []
        for layer in model.layers:
            for param_tensor in (layer.weights, layer.biases):
                # np.ndindex always walks indices in C order, matching ravel()
                # above. np.nditer defaults to memory order ('K'), which would
                # misalign the two vectors for non-C-contiguous parameters.
                for idx in np.ndindex(param_tensor.shape):
                    orig = param_tensor[idx]
                    try:
                        param_tensor[idx] = orig + epsilon
                        loss_plus = model.calculate_loss(x, y)

                        param_tensor[idx] = orig - epsilon
                        loss_minus = model.calculate_loss(x, y)
                    finally:
                        # Always put the parameter back, even if a loss
                        # evaluation fails.
                        param_tensor[idx] = orig

                    numerical_grads.append((loss_plus - loss_minus) / (2 * epsilon))

        numerical_grads_flat = np.array(numerical_grads)

        if analytical_grads_flat.shape != numerical_grads_flat.shape:
            raise RuntimeError("Shape Mismatch of analytical and numerical grad")

        numerator = np.linalg.norm(analytical_grads_flat - numerical_grads_flat)
        denominator = np.linalg.norm(analytical_grads_flat) + np.linalg.norm(numerical_grads_flat)
        # The small constant keeps the ratio defined when both gradients are zero.
        relative_error = numerator / (denominator + 1e-12)

        print(f"Shape of Analytical Gradients : {analytical_grads_flat.shape}")
        print(f" Shape of Numerical Gradient : {numerical_grads_flat.shape}")
        print(f"Relative Error : {relative_error: .2e}")
        if debug:
            max_abs_diff = np.max(np.abs(analytical_grads_flat - numerical_grads_flat))
            print(f"Max Absolute Difference : {max_abs_diff: .2e}")

        if relative_error < 1e-7:
            print(" Gradient Check Passed")
        else:
            print("Gradient Check failed")

        return float(relative_error)



class ImageBorderDataset:
    """Per-pixel binary classification dataset built from an RGB image.

    Every pixel becomes ``((x_norm, y_norm), label)`` where the coordinates
    are scaled to [0, 1] and ``label`` is 1 when the pixel has R < 150,
    G < 50 and B > 100, otherwise 0.

    Attributes set by ``__init__``: ``pixels`` (row-major list of samples),
    ``width`` and ``height`` (image size in pixels).
    """

    def __init__(self, image_path):
        """Load ``image_path`` and build the pixel samples."""
        self.pixels, self.width, self.height = self._load_and_process_image(image_path)

    def _load_and_process_image(self, image_path):
        """Return ``(samples, width, height)`` for the image at ``image_path``.

        If the file does not exist, a synthetic 50x50 image is used instead:
        the top 25 rows are [255, 165, 0] and the bottom 25 rows are
        [138, 43, 226].
        """
        try:
            # The context manager closes the underlying file handle as soon
            # as the converted copy has been materialised.
            with Image.open(image_path) as source:
                img_array = np.array(source.convert('RGB'))
        except FileNotFoundError:
            img_array = np.zeros((50, 50, 3), dtype=np.uint8)
            img_array[:25, :, :] = [255, 165, 0]
            img_array[25:, :, :] = [138, 43, 226]

        height, width, _ = img_array.shape
        return self._label_pixels(img_array), width, height

    @staticmethod
    def _label_pixels(img_array):
        """Turn an (H, W, 3) RGB array into row-major ``((x, y), label)`` samples."""
        height, width = img_array.shape[:2]
        # A one-pixel-wide (or tall) image has no extent along that axis; use
        # 1 as the divisor so its only coordinate is 0.0 rather than 0 / 0.
        x_span = max(width - 1, 1)
        y_span = max(height - 1, 1)

        red, green, blue = img_array[..., 0], img_array[..., 1], img_array[..., 2]
        is_positive = (red < 150) & (green < 50) & (blue > 100)

        return [
            ((x / x_span, y / y_span), int(is_positive[y, x]))
            for y in range(height)
            for x in range(width)
        ]

    def get_shuffled_data(self):
        """Return a shuffled copy of the samples; ``self.pixels`` is not modified."""
        shuffled = list(self.pixels)
        random.shuffle(shuffled)
        return shuffled


def evaluate_and_visualize(model, dataset, run_folder):
    """Score ``model`` on every pixel of ``dataset`` and save comparison maps.

    Predictions are thresholded at 0.5. A three-panel figure (ground truth,
    prediction, error map) is written to ``<run_folder>/prediction_maps.png``
    and logged to wandb together with the accuracy.

    Args:
        model: object with ``predict(coords)`` returning one value per sample.
        dataset: object with ``pixels`` (row-major ``((x, y), label)`` samples),
            ``width`` and ``height``.
        run_folder: existing directory that receives the image.

    Returns:
        Pixel accuracy as a percentage.
    """
    ORANGE, PURPLE, RED = [255, 165, 0], [138, 43, 226], [255, 0, 0]

    coords = np.array([p[0] for p in dataset.pixels])
    true_labels = np.array([p[1] for p in dataset.pixels])
    pred_labels = (model.predict(coords) > 0.5).astype(int).flatten()
    accuracy = np.mean(pred_labels == true_labels) * 100
    print(f"Final Pixel Accuracy: {accuracy: .2f}%")
    wandb.log({"final_accuracy": accuracy})

    true_map = true_labels.reshape(dataset.height, dataset.width)
    pred_map = pred_labels.reshape(dataset.height, dataset.width)

    def create_color_map(label_map):
        """Colour label 0 orange and label 1 purple."""
        colored = np.zeros((dataset.height, dataset.width, 3), dtype=np.uint8)
        colored[label_map == 0] = ORANGE
        colored[label_map == 1] = PURPLE
        return colored

    gt_img, pred_img = create_color_map(true_map), create_color_map(pred_map)

    # Error map: ground truth with every misclassified pixel painted red.
    wrong = true_map != pred_map
    err_img = gt_img.copy()
    err_img[wrong] = RED

    # The title reports the number of wrong pixels; interpolating the mask
    # itself would dump the whole boolean array into the title.
    wrong_count = int(np.count_nonzero(wrong))
    titles = ['Ground Truth', 'Model Prediction', f'Error Map ({wrong_count} incorrect)']

    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    for ax, img, title in zip(axes, [gt_img, pred_img, err_img], titles):
        ax.imshow(img)
        ax.set_title(title)
        ax.axis('off')
        # NOTE(review): y=0.2 differs from the 0.02 used by
        # add_username_watermark; kept as-is, confirm whether it is intended.
        ax.text(0.98, 0.2, 'shriansh.sahu', ha='right', va='bottom',
                transform=ax.transAxes, fontsize=12, color='white', alpha=0.7)

    map_path = os.path.join(run_folder, "prediction_maps.png")
    fig.savefig(map_path)
    plt.close(fig)
    wandb.log({"prediction_analysis": wandb.Image(map_path)})
    return accuracy


def plot_architecture_results(results, x_values, x_label, run_folder):
    """Plot final loss and final accuracy against an architecture parameter.

    ``results`` is a sequence of dicts with ``'loss'`` and ``'accuracy'``
    keys, one per entry of ``x_values``. The two-panel figure is saved as
    ``<run_folder>/architecture_<x_label>_analysis.png``.
    """
    losses = [entry['loss'] for entry in results]
    accuracies = [entry['accuracy'] for entry in results]

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    # (axes, series, title, y-label, extra line style) for each panel.
    panels = (
        (ax1, losses, f'Final Loss vs Model{x_label.capitalize()}', 'Final Loss', {}),
        (ax2, accuracies, f'Final Accuracy vs Model{x_label.capitalize()}', 'Final Accurcay(%)', {'color': 'g'}),
    )
    for axis, series, title, y_caption, extra_style in panels:
        axis.plot(x_values, series, marker='o', linestyle='--', **extra_style)
        axis.set_title(title)
        axis.set_xlabel(f'Model {x_label.capitalize()}')
        axis.set_ylabel(y_caption)
        axis.grid(True)

    # The right-hand panel is still the current axes here, so it is
    # watermarked first; then the left-hand panel is made current.
    add_username_watermark()
    plt.sca(ax1)
    add_username_watermark()

    plt.tight_layout()
    file_name = f"architecture_{x_label}_analysis.png"
    plot_path = os.path.join(run_folder, file_name)
    plt.savefig(plot_path)
    plt.close()


def train_model(model, dataset, config):
    """Train ``model`` on ``dataset`` one sample at a time with gradient accumulation.

    Gradients from ``config['grad_accumulation_steps']`` consecutive samples
    are summed, applied in a single update, and then cleared. A partial group
    left at the end of an epoch is applied the same way. Training stops early
    when the epoch loss has not dropped by at least
    ``config['relative_loss_threshold']`` (relative) compared with the loss
    ``config['patience']`` epochs earlier.

    Args:
        model: object with ``train``, ``update`` and ``zero_grad``.
        dataset: object with ``get_shuffled_data()`` yielding
            ``(coords, label)`` pairs.
        config: dict with keys ``name``, ``project_name``, ``epochs``,
            ``grad_accumulation_steps``, ``learning_rate``, ``patience`` and
            ``relative_loss_threshold``.

    Returns:
        dict with ``name``, ``loss`` (last epoch's mean loss, NaN if no epoch
        ran), ``accuracy``, ``time`` (seconds) and ``samples_seen``.

    Side effects: creates ``runs/<run_name>/``, writes a loss plot there,
    and opens, logs to and finishes a wandb run.
    """
    run_name = f"{config['name']}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
    run_folder = os.path.join('runs', run_name)
    os.makedirs(run_folder, exist_ok=True)
    wandb.init(project=config['project_name'], name=run_name, config=config, reinit=True)

    accumulation_steps = config['grad_accumulation_steps']
    learning_rate = config['learning_rate']

    start_time = time.time()
    epoch_losses, samples_seen_log, samples_seen = [], [], 0

    # Start from clean accumulators in case the model was used before.
    model.zero_grad()

    pbar = tqdm(range(config['epochs']), desc=f"Training {config['name']}", leave=False)
    for epoch in pbar:
        data, epoch_loss = dataset.get_shuffled_data(), 0.0
        for i, (coords, label) in enumerate(data):
            loss, _ = model.train(np.array([coords]), np.array([[label]]))
            epoch_loss += loss
            if (i + 1) % accumulation_steps == 0:
                model.update(learning_rate)
                # Without this reset every later update would re-apply all
                # gradients accumulated since training started.
                model.zero_grad()

        # Apply the leftover partial group, if there is one.
        if len(data) % accumulation_steps != 0:
            model.update(learning_rate)
            model.zero_grad()

        samples_seen += len(data)
        avg_loss = epoch_loss / len(data)
        epoch_losses.append(avg_loss)
        samples_seen_log.append(samples_seen)
        pbar.set_description(f"Epoch{epoch+1} | Loss : {avg_loss: .6f}")
        wandb.log({"epoch": epoch, "loss": avg_loss, "samples_seen": samples_seen})

        # Early stopping: compare with the loss `patience` epochs ago.
        if epoch >= config['patience']:
            reference_loss = epoch_losses[epoch - config['patience']]
            if avg_loss >= (1 - config['relative_loss_threshold']) * reference_loss:
                print(f"Early Stopping at epoch {epoch+1}.")
                break

    plt.figure()
    plt.plot(samples_seen_log, epoch_losses)
    plt.title("Loss vs Samples Seen")
    plt.xlabel("Samples Seen")
    plt.ylabel("Loss")
    plt.grid(True)
    add_username_watermark()
    loss_plot_path = os.path.join(run_folder, "loss_vs_samples.png")
    plt.savefig(loss_plot_path)
    plt.close()
    wandb.log({"loss_plot": wandb.Image(loss_plot_path)})

    final_accuracy = evaluate_and_visualize(model, dataset, run_folder)
    wandb.finish()

    # With epochs == 0 there is no recorded loss to report.
    final_loss = epoch_losses[-1] if epoch_losses else float('nan')
    return {
        'name': run_name,
        'loss': final_loss,
        'accuracy': final_accuracy,
        'time': time.time() - start_time,
        'samples_seen': samples_seen,
    }


if __name__ == "__main__":
    # Experiment driver: sanity checks, two reference architectures, then
    # depth / width / hyper-parameter sweeps on the border image.
    os.makedirs("runs", exist_ok=True)

    # ---- 1.3 Sanity checks: learn XOR, then gradient-check that model ----
    print("=" * 60)
    print("1.3 RUNNING SANITY CHECKS")
    print("=" * 60)

    xor_tester = XORProblem()
    grad_checker = GradientChecker()
    model_to_check = xor_tester.run_test([Linear(2, 8, Tanh), Linear(8, 1, Sigmoid)])
    # The gradient check uses the single sample at index 1.
    grad_checker.check(model_to_check, xor_tester.X[1:2], xor_tester.y[1:2])

    # ---- Reference architectures on the border image ----
    print("\n" + "=" * 60)
    print("TRAINING ON BORDER IMAGE")
    print("=" * 60)
    border_dataset = ImageBorderDataset('/content/border.png')
    base_config = {
        'epochs': 50,
        'grad_accumulation_steps': 32,
        'patience': 5,
        'relative_loss_threshold': 0.01,
        'learning_rate': 0.01,
    }
    # All layers of both architectures are created here, before any training.
    configurations = [
        {
            'name': 'Shallow_Wide_ReLU',
            'layers': [Linear(2, 128, ReLU), Linear(128, 1, Sigmoid)],
            'lr': 0.01,
        },
        {
            'name': 'Deep_Narrow_Tanh',
            'layers': [Linear(2, 32, Tanh), Linear(32, 32, Tanh), Linear(32, 1, Sigmoid)],
            'lr': 0.01,
        },
    ]
    for arch_config in configurations:
        train_params = {
            'project_name': "Traine Model on Border Dataset",
            'epochs': 150,
            'grad_accumulation_steps': 32,
            'patience': 10,
            'relative_loss_threshold': 0.01,
            'loss_function': 'bce',
        }
        # Architecture entries first, then the training parameters, then
        # the learning rate copied from the architecture's 'lr'.
        full_config = dict(arch_config)
        full_config.update(train_params)
        full_config['learning_rate'] = arch_config['lr']
        border_model = Model(
            layers=full_config['layers'],
            loss_function=full_config['loss_function'],
        )
        train_model(border_model, border_dataset, full_config)

    # ---- 1.4.3 Architecture experiments ----
    print("\n" + "=" * 80)
    print("1.4.3: ARCHITECTURE EXPERIMENTS")
    print("=" * 80)

    # Varying Depth
    print("\n--- Experiment: Varying Depth (Width fixed at 64) ---")
    depths = [2, 3, 4, 5]
    depth_results = []
    for d in depths:
        # d counts the Linear layers: input layer, d - 2 hidden, output layer.
        layers = [Linear(2, 64, ReLU)]
        layers.extend(Linear(64, 64, ReLU) for _ in range(d - 2))
        layers.append(Linear(64, 1, Sigmoid))
        config = dict(base_config, project_name='border-arch-depth', name=f"Depth_{d}-Width_64")
        depth_results.append(train_model(Model(layers, 'bce'), border_dataset, config))
    plot_architecture_results(depth_results, depths, "depth", "runs")

    # Varying Width
    print("\n--- Experiment: Varying Width (Depth fixed at 4) ---")
    widths = [16, 32, 64, 128]
    width_results = []
    for w in widths:
        layers = [Linear(2, w, ReLU)]
        layers.extend(Linear(w, w, ReLU) for _ in range(2))
        layers.append(Linear(w, 1, Sigmoid))
        config = dict(base_config, project_name='border-arch-width', name=f"Depth_4-Width_{w}")
        width_results.append(train_model(Model(layers, 'bce'), border_dataset, config))
    plot_architecture_results(width_results, widths, "width", "runs")

    # ---- 1.4.4 Hyper-parameter experiments ----
    print("\n" + "=" * 80)
    print("ðŸš€ 1.4.4: HYPERPARAMETER EXPERIMENTS")
    print("=" * 80)
    hparam_configs = [
        {'name': 'HighLR', 'learning_rate': 0.05, 'grad_accumulation_steps': 32},
        {'name': 'LowLR', 'learning_rate': 0.005, 'grad_accumulation_steps': 32},
        {'name': 'LargeBatch', 'learning_rate': 0.01, 'grad_accumulation_steps': 128},
        {'name': 'SmallBatch', 'learning_rate': 0.01, 'grad_accumulation_steps': 8},
    ]
    hparam_results = []
    for h_config in hparam_configs:
        model_layers = [Linear(2, 64, ReLU), Linear(64, 64, ReLU), Linear(64, 1, Sigmoid)]
        # Base settings, then the project name, then this run's overrides.
        config = dict(base_config, project_name='border-hparams')
        config.update(h_config)
        hparam_results.append(train_model(Model(model_layers, 'bce'), border_dataset, config))

    print("\n--- Hyperparameter Experimentation Summary ---")
    print(f"{'Run Name':<25} | {'Accuracy (%)':<15} | {'Final Loss':<15} | {'Time (s)':<15} | {'Samples Seen':<15}")
    print("-" * 90)
    for res in hparam_results:
        print(f"{res['name']:<25} | {res['accuracy']:<15.2f} | {res['loss']:<15.4f} | {res['time']:<15.2f} | {res['samples_seen']:<15}")

    print("\n All experiments completed.")



ModuleNotFoundError: No module named 'wandb'