# A notebook for manually implementing classic machine learning and deep learning algorithms (without using ready-made high-level functions from Sklearn/PyTorch).

# This notebook includes manual implementations of some newer models, such as BERT, AlexNet, and ResNet.

# This notebook also includes manual implementations of classic models, such as RNN, LSTM, CNN, MLP, logistic regression, ridge regression, decision tree, random forest, and XGBoost.

# This notebook is still being updated.

## BERT

In [23]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class MultiHeadSelfAttention(nn.Module):
    """Multi-head scaled dot-product attention with learned Q/K/V projections.

    Args:
        embed_size: Size of the last dimension of every input and of the output.
        num_heads: Number of attention heads; must divide ``embed_size``.

    Note:
        The projection layers map ``embed_size -> embed_size`` and are applied
        before the result is split into heads. Earlier revisions created these
        layers with ``in_features=head_dim`` and never called them, so
        checkpoints saved from that revision hold differently shaped (and
        unused) projection weights.
    """

    def __init__(self, embed_size, num_heads):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_size = embed_size
        self.num_heads = num_heads
        self.head_dim = embed_size // num_heads

        assert (
            self.head_dim * num_heads == embed_size
        ), "Embedding size needs to be divisible by num_heads"

        # Each projection acts on the full embedding; heads are carved out of
        # the projected tensor in forward().
        self.values = nn.Linear(embed_size, embed_size, bias=False)
        self.keys = nn.Linear(embed_size, embed_size, bias=False)
        self.queries = nn.Linear(embed_size, embed_size, bias=False)
        self.fc_out = nn.Linear(num_heads * self.head_dim, embed_size)

    def forward(self, values, keys, query, mask):
        """Attend from ``query`` positions over ``keys``/``values`` positions.

        Args:
            values: Tensor of shape (N, value_len, embed_size).
            keys: Tensor of shape (N, key_len, embed_size); ``key_len`` must
                equal ``value_len`` for the weighted sum to be defined.
            query: Tensor of shape (N, query_len, embed_size).
            mask: ``None`` or a tensor broadcastable to
                (N, num_heads, query_len, key_len); positions where
                ``mask == 0`` receive no attention weight.

        Returns:
            Tensor of shape (N, query_len, embed_size).
        """
        N = query.shape[0]
        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

        # Project first, then split the embedding into num_heads pieces
        values = self.values(values).reshape(N, value_len, self.num_heads, self.head_dim)
        keys = self.keys(keys).reshape(N, key_len, self.num_heads, self.head_dim)
        queries = self.queries(query).reshape(N, query_len, self.num_heads, self.head_dim)

        # Dot products are taken per head over head_dim entries, so that is the
        # dimension the 1/sqrt(d) scaling has to use.
        energy = torch.einsum("nqhd,nkhd->nhqk", queries, keys) / math.sqrt(self.head_dim)

        if mask is not None:
            # The most negative finite value of the active dtype keeps the fill
            # valid for reduced-precision tensors as well.
            energy = energy.masked_fill(mask == 0, torch.finfo(energy.dtype).min)

        attention = torch.softmax(energy, dim=3)

        out = torch.einsum("nhql,nlhd->nqhd", attention, values).reshape(
            N, query_len, self.num_heads * self.head_dim
        )

        return self.fc_out(out)

class FeedForward(nn.Module):
    """Two-layer feed-forward sub-layer: Linear -> GELU -> Linear.

    Args:
        embed_size: Size of the input and output feature dimension.
        forward_expansion: Multiplier giving the hidden width
            (``forward_expansion * embed_size``).
    """

    def __init__(self, embed_size, forward_expansion):
        super().__init__()
        inner_dim = forward_expansion * embed_size
        self.fc1 = nn.Linear(embed_size, inner_dim)
        self.fc2 = nn.Linear(inner_dim, embed_size)

    def forward(self, x):
        """Expand ``x`` to the hidden width, apply GELU, project back."""
        expanded = self.fc1(x)
        activated = F.gelu(expanded)
        return self.fc2(activated)

class TransformerBlock(nn.Module):
    """One encoder block: attention and feed-forward, each with residual + LayerNorm.

    Args:
        embed_size: Feature dimension of the block's inputs and outputs.
        num_heads: Number of attention heads.
        forward_expansion: Hidden-width multiplier for the feed-forward part.
        dropout: Dropout probability applied after each LayerNorm.
    """

    def __init__(self, embed_size, num_heads, forward_expansion, dropout):
        super().__init__()
        self.attention = MultiHeadSelfAttention(embed_size, num_heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)
        self.feed_forward = FeedForward(embed_size, forward_expansion)
        self.dropout = nn.Dropout(dropout)

    def forward(self, value, key, query, mask):
        """Return the block output for the given value/key/query tensors.

        The residual connection of the attention part is taken from ``query``.
        """
        attended = self.attention(value, key, query, mask)
        normed_attention = self.norm1(attended + query)
        x = self.dropout(normed_attention)

        transformed = self.feed_forward(x)
        normed_output = self.norm2(transformed + x)
        return self.dropout(normed_output)

class BERT(nn.Module):
    """Stack of transformer blocks over token + learned position embeddings.

    Args:
        embed_size: Embedding / hidden feature dimension.
        num_layers: Number of ``TransformerBlock`` layers.
        num_heads: Attention heads per block.
        forward_expansion: Hidden-width multiplier of each feed-forward part.
        vocab_size: Number of token ids; also the size of the output logits.
        max_length: Number of rows in the position-embedding table.
        dropout: Dropout probability used on the embeddings and in each block.
    """

    def __init__(
        self,
        embed_size,
        num_layers,
        num_heads,
        forward_expansion,
        vocab_size,
        max_length,
        dropout,
    ):
        super().__init__()
        self.embed_size = embed_size
        self.word_embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)

        blocks = []
        for _ in range(num_layers):
            blocks.append(
                TransformerBlock(embed_size, num_heads, forward_expansion, dropout)
            )
        self.layers = nn.ModuleList(blocks)

        self.fc_out = nn.Linear(embed_size, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Map token ids ``x`` of shape (N, seq_length) to per-token vocabulary logits.

        Returns:
            Tensor of shape (N, seq_length, vocab_size).
        """
        N, seq_length = x.shape

        # Position ids 0..seq_length-1, repeated for every sequence in the batch
        position_ids = torch.arange(0, seq_length).expand(N, seq_length)
        position_ids = position_ids.to(x.device)

        token_vectors = self.word_embedding(x)
        position_vectors = self.position_embedding(position_ids)
        hidden = self.dropout(token_vectors + position_vectors)

        # Self-attention: the same tensor serves as value, key and query
        for block in self.layers:
            hidden = block(hidden, hidden, hidden, mask)

        return self.fc_out(hidden)

# Demo: build a small BERT-style model, run one forward pass on a random
# batch, then take a few optimizer steps on that same batch.

# Hyperparameters
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vocab_size = 30522  # Vocabulary size for BERT (usually 30k+)
embed_size = 256  # Embedding size
num_layers = 6  # Number of transformer blocks
num_heads = 8  # Number of heads in multi-head attention
forward_expansion = 4  # Expansion factor in feed forward network
max_length = 100  # Maximum length of input sequence
dropout = 0.1  # Dropout rate

# Initialize model (arguments are positional, in the order of BERT.__init__)
model = BERT(
    embed_size,
    num_layers,
    num_heads,
    forward_expansion,
    vocab_size,
    max_length,
    dropout,
).to(device)

# Example input: one sequence of max_length random token ids
input_ids = torch.randint(0, vocab_size, (1, max_length)).to(device)  # Random input for demonstration
mask = None  # For simplicity, no mask is applied here

# Forward pass
# NOTE(review): model.eval() is not called before this pass, so dropout is
# presumably still active here — confirm if a deterministic output is wanted.
output = model(input_ids, mask)
print(output.shape)  # Should output (batch_size, seq_length, vocab_size)

# Loss and optimizer (for training purposes)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=3e-5)

# Dummy training loop for demonstration purposes: every epoch is a single
# optimizer step on the same random batch.
num_epochs = 3
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()
    outputs = model(input_ids, mask)
    # Flatten logits to (N * seq_length, vocab_size) and targets to
    # (N * seq_length,). The targets are the input ids themselves: no shift
    # and no token masking is applied.
    loss = criterion(outputs.view(-1, vocab_size), input_ids.view(-1))
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch + 1}, Loss: {loss.item()}")

torch.Size([1, 100, 30522])
Epoch 1, Loss: 10.583015441894531
Epoch 2, Loss: 10.463675498962402
Epoch 3, Loss: 10.540785789489746


## AlexNet

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

# Define AlexNet model class
class AlexNet(nn.Module):
    """AlexNet-style CNN: five conv layers followed by a three-layer classifier.

    The classifier's first Linear layer expects 256 * 6 * 6 flattened
    features, so the input resolution must reduce to a 6x6 feature map.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=1000):
        super().__init__()

        # (in_channels, out_channels, kernel, stride, padding, max-pool afterwards?)
        conv_specs = (
            (3, 96, 11, 4, 2, True),
            (96, 256, 5, 1, 2, True),
            (256, 384, 3, 1, 1, False),
            (384, 384, 3, 1, 1, False),
            (384, 256, 3, 1, 1, True),
        )
        feature_layers = []
        for in_ch, out_ch, kernel, stride, pad, pool_after in conv_specs:
            feature_layers.append(
                nn.Conv2d(in_ch, out_ch, kernel_size=kernel, stride=stride, padding=pad)
            )
            feature_layers.append(nn.ReLU(inplace=True))
            if pool_after:
                feature_layers.append(nn.MaxPool2d(kernel_size=3, stride=2))
        self.features = nn.Sequential(*feature_layers)

        flat_features = 256 * 6 * 6
        classifier_layers = [
            nn.Dropout(),
            nn.Linear(flat_features, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        ]
        self.classifier = nn.Sequential(*classifier_layers)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for an image batch."""
        feature_maps = self.features(x)
        flat = torch.flatten(feature_maps, 1)
        return self.classifier(flat)

# Setting device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Transformations for the training and testing sets: resize to the 227x227
# resolution the network is built for, then normalize each channel with
# mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.Resize((227, 227)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Load CIFAR-10 dataset (downloaded into ./data when not already present)
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=32, shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = DataLoader(testset, batch_size=32, shuffle=False, num_workers=2)

# Define the model, loss function, and optimizer
num_classes = 10
model = AlexNet(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Training the model
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(trainloader, 0):
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Print statistics
        running_loss += loss.item()
        if i % 100 == 99:    # Print every 100 mini-batches
            print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(trainloader)}], Loss: {running_loss / 100:.4f}')
            running_loss = 0.0

print('Finished Training')

# Testing the model: overall and per-class accuracy are collected in a single
# pass over the test set instead of evaluating it twice.
model.eval()
correct = 0
total = 0
classes = trainset.classes
class_correct = [0.0] * num_classes
class_total = [0.0] * num_classes

with torch.no_grad():
    for inputs, labels in testloader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        # Keep the hit mask 1-D: squeezing it would yield a 0-dim tensor for a
        # batch of one sample, which cannot be indexed per sample.
        hits = predicted == labels
        total += labels.size(0)
        correct += hits.sum().item()
        for label, hit in zip(labels.tolist(), hits.tolist()):
            class_correct[label] += hit
            class_total[label] += 1

print(f'Accuracy of the network on the 10000 test images: {100 * correct / total:.2f}%')

# Testing the model on each class
for i in range(num_classes):
    if class_total[i] == 0:
        # Avoid a ZeroDivisionError for a class without any test sample
        print(f'Accuracy of {classes[i]} : n/a (no test samples)')
        continue
    print(f'Accuracy of {classes[i]} : {100 * class_correct[i] / class_total[i]:.2f}%')

print('Testing Completed')

## ResNet

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder

# 1. Data Loading and Preprocessing for Custom Dataset
# Every image is resized to 224x224, converted to a tensor, and normalized per
# channel with mean 0.5 and std 0.5. The same pipeline is used for both splits.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Assuming your dataset is in the 'data/' directory
# NOTE(review): ImageFolder is expected to find one sub-directory per class
# under each root — confirm the on-disk layout matches.
train_dataset = ImageFolder(root='./data/train', transform=transform)
test_dataset = ImageFolder(root='./data/test', transform=transform)

# Only the training split is shuffled
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# 2. Defining the Residual Block
class BasicBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm stages plus a shortcut.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the block.
        stride: Stride of the first convolution (and of the shortcut
            projection when one is needed).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # The shortcut needs a 1x1 projection whenever the main path changes
        # the spatial size or the channel count; otherwise it is the identity.
        shape_preserved = stride == 1 and in_channels == out_channels
        if shape_preserved:
            self.downsample = None
        else:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        shortcut = x if self.downsample is None else self.downsample(x)

        main = self.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(main))
        main += shortcut
        return self.relu(main)

# 3. Defining the ResNet Model
class ResNet(nn.Module):
    """ResNet backbone: conv stem, four residual stages, global pooling, linear head.

    Args:
        block: Callable ``block(in_channels, out_channels[, stride])`` that
            returns the module used for every residual unit.
        layers: Sequence of four ints, the number of units in each stage.
        num_classes: Number of output logits (defaults to 2, cat and dog).
    """

    def __init__(self, block, layers, num_classes=2):
        super().__init__()
        self.in_channels = 64  # running channel count, advanced by _make_layer

        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages; every stage after the first starts with stride 2
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` units; only the first unit gets ``stride``."""
        first_unit = block(self.in_channels, out_channels, stride)
        self.in_channels = out_channels
        remaining_units = [block(out_channels, out_channels) for _ in range(1, blocks)]
        return nn.Sequential(first_unit, *remaining_units)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for an image batch."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        pooled = self.avgpool(x)
        return self.fc(torch.flatten(pooled, 1))

# 4. Creating the ResNet-18 Model
# ResNet-18 has [2, 2, 2, 2] blocks in each of the 4 layers
# num_classes is not passed, so the ResNet default of 2 is used.
model = ResNet(BasicBlock, [2, 2, 2, 2])

# Run on the GPU when one is available, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# 5. Loss Function and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 6. Training the Model
def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    """Train ``model`` in place for ``num_epochs`` passes over ``train_loader``.

    Each batch is moved to the device the model's parameters live on, so the
    function does not depend on a module-level ``device`` variable.

    Args:
        model: Module to optimize; it is switched to training mode.
        train_loader: Sized iterable yielding ``(inputs, labels)`` tensor pairs.
        criterion: Callable ``criterion(outputs, labels)`` returning the loss.
        optimizer: Optimizer constructed over ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.
    """
    # A model without parameters has no device of its own; batches are then
    # used where they already are.
    first_param = next(model.parameters(), None)
    target_device = None if first_param is None else first_param.device

    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for i, (inputs, labels) in enumerate(train_loader):
            if target_device is not None:
                inputs, labels = inputs.to(target_device), labels.to(target_device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            if i % 10 == 9:  # print every 10 mini-batches
                print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(train_loader)}], Loss: {running_loss / 10:.4f}')
                running_loss = 0.0

# Train the model for 10 epochs with the loader, loss, and optimizer created above
train_model(model, train_loader, criterion, optimizer, num_epochs=10)

# 7. Evaluating the Model
def evaluate_model(model, test_loader):
    """Print and return the top-1 accuracy of ``model`` on ``test_loader``.

    Each batch is moved to the device the model's parameters live on, so the
    function does not depend on a module-level ``device`` variable. The model
    is left in evaluation mode.

    Args:
        model: Module to evaluate.
        test_loader: Iterable yielding ``(inputs, labels)`` tensor pairs.

    Returns:
        Accuracy in percent as a float, or ``None`` when ``test_loader``
        yields no samples.
    """
    first_param = next(model.parameters(), None)
    target_device = None if first_param is None else first_param.device

    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            if target_device is not None:
                inputs, labels = inputs.to(target_device), labels.to(target_device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        # Nothing was evaluated; dividing would raise ZeroDivisionError
        print('Accuracy of the model on the test images: n/a (no test samples)')
        return None

    accuracy = 100 * correct / total
    print(f'Accuracy of the model on the test images: {accuracy:.2f}%')
    return accuracy

# Evaluate the model on the held-out test loader
evaluate_model(model, test_loader)

# 8. Saving the Model
# Only the state_dict (parameters and buffers) is written, not the model object
torch.save(model.state_dict(), 'resnet18_cats_dogs.pth')

## Recurrent neural network, RNN

In [18]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# Manually implemented RNN
class RNNManual:
    """Single-layer tanh RNN with hand-written backpropagation through time.

    The weights are plain leaf tensors. ``backward`` computes all gradients
    analytically and applies one gradient-descent step; autograd is not needed
    for training.

    Args:
        input_size: Length of each per-step input vector.
        hidden_size: Length of the hidden state.
        output_size: Length of the output vector.
        seq_length: Number of time steps consumed by ``forward``/``backward``.
    """

    def __init__(self, input_size, hidden_size, output_size, seq_length):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.seq_length = seq_length

        # Initialize weights. The scaling happens before requires_grad_() so
        # every weight is a leaf tensor; scaling a tensor that already requires
        # grad would yield a non-leaf result of a multiplication.
        self.Wxh = (torch.randn(hidden_size, input_size) * 0.1).requires_grad_()
        self.Whh = (torch.randn(hidden_size, hidden_size) * 0.1).requires_grad_()
        self.Why = (torch.randn(output_size, hidden_size) * 0.1).requires_grad_()
        self.bh = torch.zeros(hidden_size, requires_grad=True)
        self.by = torch.zeros(output_size, requires_grad=True)

    def forward(self, inputs):
        """Run the RNN over the first ``seq_length`` entries of ``inputs``.

        Args:
            inputs: Indexable sequence of 1-D tensors of length ``input_size``.

        Returns:
            Tuple ``(y, h)``: the output computed from the final hidden state,
            and that final hidden state.
        """
        h = torch.zeros(self.hidden_size)  # Initialize hidden state
        self.last_hs = []  # Store hidden state at each time step

        # Loop through time steps for forward propagation
        for t in range(self.seq_length):
            x = inputs[t]
            h = torch.tanh(self.Wxh @ x + self.Whh @ h + self.bh)  # Calculate new hidden state
            self.last_hs.append(h)  # Save hidden state for backpropagation

        y = self.Why @ h + self.by  # Final output
        return y, h

    def backward(self, inputs, dL_dy, learning_rate=0.001):
        """Backpropagate ``dL_dy`` through time and update all parameters in place.

        Must be called after ``forward`` on the same ``inputs``, because it
        reads the hidden states cached there.

        Args:
            inputs: The sequence that was passed to ``forward``.
            dL_dy: Gradient of the loss with respect to the output ``y``.
            learning_rate: Step size of the gradient-descent update.
        """
        # Everything below is analytic, so autograd tracking would only build
        # graphs that are thrown away.
        with torch.no_grad():
            # Gradients for output layer
            dWhy = dL_dy.view(-1, 1) @ self.last_hs[-1].view(1, -1)
            dby = dL_dy

            dWxh = torch.zeros_like(self.Wxh)
            dWhh = torch.zeros_like(self.Whh)
            dbh = torch.zeros_like(self.bh)

            # Backpropagation through time (BPTT). Only the last hidden state
            # feeds the output, so that is where the gradient enters.
            dL_dh = self.Why.T @ dL_dy
            for t in reversed(range(self.seq_length)):
                dh_raw = (1 - self.last_hs[t] ** 2) * dL_dh  # Derivative of tanh

                dbh += dh_raw
                dWxh += dh_raw.view(-1, 1) @ inputs[t].view(1, -1)
                if t > 0:
                    # Step 0 starts from an all-zero hidden state, which
                    # contributes nothing to dWhh.
                    dWhh += dh_raw.view(-1, 1) @ self.last_hs[t - 1].view(1, -1)

                dL_dh = self.Whh.T @ dh_raw

            # Gradient descent to update parameters
            self.Wxh -= learning_rate * dWxh
            self.Whh -= learning_rate * dWhh
            self.Why -= learning_rate * dWhy
            self.bh -= learning_rate * dbh
            self.by -= learning_rate * dby

# Data preparation
def toy_data():
    """Return the fixed toy sample: inputs [1], [2], [3] and target [0.1].

    Returns:
        Tuple ``(inputs, target)`` where ``inputs`` is a list of three float32
        tensors of shape (1,) and ``target`` is a float32 tensor of shape (1,).
    """
    sequence = []
    for value in (1, 2, 3):
        sequence.append(torch.tensor([value], dtype=torch.float32))
    label = torch.tensor([0.1], dtype=torch.float32)
    return sequence, label

# Hyperparameters
input_size = 1
hidden_size = 10
output_size = 1
seq_length = 3
learning_rate = 0.01
epochs = 500

# Instantiate the model
rnn = RNNManual(input_size, hidden_size, output_size, seq_length)

# Training loop: every epoch is one forward pass and one manual BPTT update on
# the same toy sample.
for epoch in range(epochs):
    inputs, target = toy_data()

    # Forward pass
    output, _ = rnn.forward(inputs)

    # Calculate loss (half squared error, so its gradient w.r.t. the output is
    # simply output - target)
    loss = (output - target).pow(2).sum() * 0.5

    # Manually clear gradients
    # NOTE(review): no autograd backward is ever called in this loop, so these
    # .grad fields are never populated; clearing them looks like a no-op.
    rnn.Wxh.grad = None
    rnn.Whh.grad = None
    rnn.Why.grad = None
    rnn.bh.grad = None
    rnn.by.grad = None

    # Backward pass: hand the loss gradient to the manual BPTT, which also
    # applies the parameter update.
    dL_dy = output - target
    rnn.backward(inputs, dL_dy, learning_rate)

    if epoch % 50 == 0:
        print(f'Epoch {epoch}, Loss: {loss.item()}')

# Testing (on the same sample that was used for training)
test_inputs, _ = toy_data()
test_output, _ = rnn.forward(test_inputs)
print('Test Output:', test_output.item())

Epoch 0, Loss: 0.008545284159481525
Epoch 50, Loss: 5.8329940657131374e-05
Epoch 100, Loss: 3.8827207049507706e-07
Epoch 150, Loss: 2.5777093792811456e-09
Epoch 200, Loss: 1.7322254741714005e-11
Epoch 250, Loss: 1.6456280782506383e-13
Epoch 300, Loss: 1.5987211554602254e-14
Epoch 350, Loss: 3.9968028886505635e-15
Epoch 400, Loss: 3.9968028886505635e-15
Epoch 450, Loss: 3.9968028886505635e-15
Test Output: 0.09999992698431015


## Long short-term memory, LSTM

In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# 1. Define the manually implemented LSTM class
class ManualLSTM(nn.Module):
    """LSTM written out gate by gate, followed by a linear read-out.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the hidden and cell states.
        output_size: Size of the prediction made from the last hidden state.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()

        # LSTM size parameters
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        # One (W, U, b) triple per gate, registered in the order: input (i),
        # forget (f), output (o), candidate cell state (c).
        for gate in ("i", "f", "o", "c"):
            setattr(self, f"W_{gate}", nn.Parameter(torch.Tensor(input_size, hidden_size)))
            setattr(self, f"U_{gate}", nn.Parameter(torch.Tensor(hidden_size, hidden_size)))
            setattr(self, f"b_{gate}", nn.Parameter(torch.Tensor(hidden_size)))

        # Output layer parameters
        self.W_y = nn.Parameter(torch.Tensor(hidden_size, output_size))
        self.b_y = nn.Parameter(torch.Tensor(output_size))

        # The tensors above are uninitialized until this call
        self.init_parameters()

    def init_parameters(self):
        """Fill every parameter with samples from U(-0.08, 0.08)."""
        for weight in self.parameters():
            nn.init.uniform_(weight, -0.08, 0.08)

    def _pre_activation(self, x_t, h_prev, W, U, b):
        """Affine part shared by all gates: ``x_t @ W + h_prev @ U + b``."""
        return x_t @ W + h_prev @ U + b

    def forward(self, x, hidden):
        """Run the LSTM over ``x`` and predict from the final hidden state.

        Args:
            x: Tensor of shape (batch, seq_length, input_size).
            hidden: Tuple ``(h_0, c_0)``, each of shape (batch, hidden_size).

        Returns:
            Tuple ``(y, (h_n, c_n))`` with ``y`` of shape (batch, output_size)
            and the hidden / cell state after the last time step.
        """
        h_prev, c_prev = hidden
        step_outputs = []

        # Walk over the time axis (dim 1)
        for x_t in x.unbind(dim=1):
            i_t = torch.sigmoid(self._pre_activation(x_t, h_prev, self.W_i, self.U_i, self.b_i))
            f_t = torch.sigmoid(self._pre_activation(x_t, h_prev, self.W_f, self.U_f, self.b_f))
            o_t = torch.sigmoid(self._pre_activation(x_t, h_prev, self.W_o, self.U_o, self.b_o))
            c_hat_t = torch.tanh(self._pre_activation(x_t, h_prev, self.W_c, self.U_c, self.b_c))

            # New cell state: keep part of the old one, add part of the candidate
            c_prev = f_t * c_prev + i_t * c_hat_t
            # New hidden state: gated view of the squashed cell state
            h_prev = o_t * torch.tanh(c_prev)

            step_outputs.append(h_prev)

        # (batch, seq_length, hidden_size)
        stacked = torch.stack(step_outputs, dim=1)

        # Use the output of the last time step for prediction
        y = stacked[:, -1, :] @ self.W_y + self.b_y
        return y, (h_prev, c_prev)

# 2. Data generation and model training
if __name__ == "__main__":
    # Seed both random number generators so the run is repeatable
    torch.manual_seed(0)
    np.random.seed(0)

    # One period of a sine wave, sampled at 100 points
    time_steps = np.linspace(0, np.pi * 2, 100)
    data = np.sin(time_steps)
    seq_length = 10

    # Sliding windows: each sample is seq_length consecutive values and its
    # target is the value that follows the window.
    window_starts = range(len(data) - seq_length)
    X = np.array([data[start:start + seq_length] for start in window_starts])
    y = np.array([data[start + seq_length] for start in window_starts])

    X = torch.Tensor(X).unsqueeze(-1)  # shape: (batch, seq_length, input_size)
    y = torch.Tensor(y).unsqueeze(-1)  # shape: (batch, output_size)

    # Model parameters
    input_size = 1
    hidden_size = 32
    output_size = 1

    # Instantiate the model
    model = ManualLSTM(input_size, hidden_size, output_size)

    # Loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)

    # Train the model: full-batch, one optimizer step per epoch
    n_epochs = 200
    n_samples = X.size(0)
    for epoch in range(n_epochs):
        # Every epoch starts from all-zero hidden and cell states
        h_0 = torch.zeros(n_samples, hidden_size)
        c_0 = torch.zeros(n_samples, hidden_size)

        # Forward pass and loss
        output, (h_n, c_n) = model(X, (h_0, c_0))
        loss = criterion(output, y)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Report the loss every 20 epochs
        epoch_number = epoch + 1
        if epoch_number % 20 == 0:
            print(f'Epoch [{epoch_number}/{n_epochs}], Loss: {loss.item():.4f}')

    # Test the model on the last seq_length points of the curve
    model.eval()
    with torch.no_grad():
        last_window = data[-seq_length:]
        test_input = torch.Tensor(last_window).unsqueeze(0).unsqueeze(-1)
        h_0 = torch.zeros(1, hidden_size)
        c_0 = torch.zeros(1, hidden_size)
        test_output, _ = model(test_input, (h_0, c_0))
        print(f'Test Output: {test_output.item():.4f}')

Epoch [20/200], Loss: 0.0644
Epoch [40/200], Loss: 0.0278
Epoch [60/200], Loss: 0.0143
Epoch [80/200], Loss: 0.0052
Epoch [100/200], Loss: 0.0005
Epoch [120/200], Loss: 0.0001
Epoch [140/200], Loss: 0.0000
Epoch [160/200], Loss: 0.0000
Epoch [180/200], Loss: 0.0000
Epoch [200/200], Loss: 0.0000
Test Output: 0.0623


## Convolutional neural network, CNN

In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np

# Data preparation
# Images are converted to tensors and normalized with mean 0.1307 and
# std 0.3081 (a single channel).
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# MNIST train / test splits, downloaded into ./data when not already present
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

# Only the training loader shuffles; the test loader uses batches of 1000
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=1000, shuffle=False)

# Manually implement convolutional layer
class ManualConv2d(nn.Module):
    """2-D convolution (cross-correlation) written without a built-in conv call.

    The kernel and bias are registered as ``nn.Parameter`` so that an
    enclosing module's ``parameters()`` and ``.to(device)`` include them.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Number of filters / output channels.
        kernel_size: Side length of the square kernel.
        stride: Step between neighbouring windows.
        padding: Zero padding added to every spatial border.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        # Initialize convolutional kernel and bias
        self.weight = nn.Parameter(
            torch.randn(out_channels, in_channels, kernel_size, kernel_size) * 0.1
        )
        self.bias = nn.Parameter(torch.randn(out_channels) * 0.1)

    def forward(self, x):
        """Convolve ``x`` of shape (batch, in_channels, height, width).

        Returns:
            Tensor of shape (batch, out_channels, out_height, out_width) on
            the same device as ``x``.
        """
        # Get input and output dimensions
        batch_size, _, height, width = x.shape
        out_height = (height - self.kernel_size + 2 * self.padding) // self.stride + 1
        out_width = (width - self.kernel_size + 2 * self.padding) // self.stride + 1

        # Add padding to input
        if self.padding > 0:
            x = F.pad(x, (self.padding, self.padding, self.padding, self.padding))

        # Allocate the accumulator next to the input, not unconditionally on CPU
        out = x.new_zeros((batch_size, self.out_channels, out_height, out_width))

        # Distance from the first to one past the last input row / column that
        # a single kernel offset touches.
        h_span = (out_height - 1) * self.stride + 1
        w_span = (out_width - 1) * self.stride + 1

        # Loop over the kernel offsets only. For a fixed offset, the strided
        # slice holds the input element every output position reads at that
        # offset, and the einsum mixes input channels into output channels.
        for kh in range(self.kernel_size):
            for kw in range(self.kernel_size):
                window = x[:, :, kh:kh + h_span:self.stride, kw:kw + w_span:self.stride]
                out = out + torch.einsum("bihw,oi->bohw", window, self.weight[:, :, kh, kw])

        return out + self.bias.view(1, -1, 1, 1)

# Manually implement max pooling layer
class ManualMaxPool2d:
    """Max pooling over square windows, written without a built-in pooling call.

    Args:
        kernel_size: Side length of the square pooling window.
        stride: Step between neighbouring windows.
    """

    def __init__(self, kernel_size, stride):
        self.kernel_size = kernel_size
        self.stride = stride

    def __call__(self, x):
        return self.forward(x)

    def forward(self, x):
        """Pool ``x`` of shape (batch, channels, height, width).

        Returns:
            Tensor of shape (batch, channels, out_height, out_width) with
            ``out = (size - kernel_size) // stride + 1`` per spatial axis,
            keeping the dtype and device of ``x``.

        Raises:
            RuntimeError: If the window is larger than a spatial dimension.
        """
        # Unpacking validates that the input is 4-D
        batch_size, channels, height, width = x.shape

        # Sliding-window views over height, then width:
        # (batch, channels, out_height, out_width, kernel_size, kernel_size)
        windows = x.unfold(2, self.kernel_size, self.stride)
        windows = windows.unfold(3, self.kernel_size, self.stride)

        # Largest element of every window
        return windows.amax(dim=(-2, -1))

# Build manual CNN model
class ManualCNN(nn.Module):
    """Small CNN: manual conv -> ReLU -> manual max-pool -> two linear layers.

    The first linear layer expects 8 * 14 * 14 flattened features, i.e. an
    8-channel 14x14 map after pooling.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = ManualConv2d(in_channels=1, out_channels=8, kernel_size=3, stride=1, padding=1)
        self.pool = ManualMaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(8 * 14 * 14, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Return 10 class logits per input image."""
        flat_features = 8 * 14 * 14

        feature_maps = F.relu(self.conv1(x))
        pooled = self.pool(feature_maps)
        flat = pooled.view(-1, flat_features)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

# Training and testing functions
def train(model, device, train_loader, optimizer, epoch):
    """Run one training epoch with cross-entropy loss.

    Args:
        model: Module to optimize; it is switched to training mode.
        device: Device every batch is moved to.
        train_loader: Loader yielding ``(data, target)`` batches; must expose
            a sized ``dataset`` attribute for the progress message.
        optimizer: Optimizer constructed over ``model``'s parameters.
        epoch: Epoch number, used only in the progress message.
    """
    model.train()
    for batch_idx, batch in enumerate(train_loader):
        features, labels = batch
        features = features.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        logits = model(features)
        batch_loss = F.cross_entropy(logits, labels)
        batch_loss.backward()
        optimizer.step()

        # Progress is reported on every 100th batch only
        if batch_idx % 100 != 0:
            continue
        print(f'Train Epoch: {epoch} [{batch_idx * len(features)}/{len(train_loader.dataset)}]\tLoss: {batch_loss.item():.6f}')

def test(model, device, test_loader):
    """Evaluate ``model`` on ``test_loader`` and print average loss and accuracy.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        device: Device every batch is moved to.
        test_loader: Loader yielding ``(data, target)`` batches; must expose a
            sized ``dataset`` attribute, which is used as the sample count.
    """
    model.eval()
    loss_total = 0
    n_correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            logits = model(data)
            # Sum (not mean) per batch so the division below gives a per-sample average
            loss_total += F.cross_entropy(logits, target, reduction='sum').item()
            predicted = logits.argmax(dim=1, keepdim=True)
            n_correct += predicted.eq(target.view_as(predicted)).sum().item()

    n_samples = len(test_loader.dataset)
    mean_loss = loss_total / n_samples
    accuracy_pct = 100. * n_correct / n_samples
    print(f'\nTest set: Average loss: {mean_loss:.4f}, Accuracy: {n_correct}/{n_samples}'
          f' ({accuracy_pct:.0f}%)\n')

# Device configuration and model initialization
# NOTE(review): this cell only builds the model and the optimizer; no call to
# train() or test() is visible in this part of the notebook.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ManualCNN().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

## Multi-Layer Perceptron, MLP

In [20]:
import torch
import torch.optim as optim
import torch.autograd as autograd
import torch.nn.functional as F
import numpy as np

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create synthetic data: 100 points in the unit square, labelled 1 when the
# two coordinates sum to more than 1 and 0 otherwise.
np.random.seed(42)
x_train = np.random.rand(100, 2)
y_train = (x_train[:, 0] + x_train[:, 1] > 1).astype(np.float32).reshape(-1, 1)

x_train = torch.tensor(x_train, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.float32).to(device)

# Define the architecture of the multilayer perceptron
input_size = 2
hidden_size = 5
output_size = 1

# Randomly initialize weights and biases
# NOTE(review): the torch RNG is not seeded here (only NumPy's is), so the
# initial weights presumably differ from run to run.
w1 = torch.randn(input_size, hidden_size, device=device, requires_grad=True)
b1 = torch.randn(hidden_size, device=device, requires_grad=True)
w2 = torch.randn(hidden_size, output_size, device=device, requires_grad=True)
b2 = torch.randn(output_size, device=device, requires_grad=True)

# Define hyperparameters
learning_rate = 0.01
num_epochs = 1000

# Start training (full-batch gradient descent: one update per epoch)
for epoch in range(num_epochs):
    # Forward propagation: sigmoid hidden layer, sigmoid output
    hidden_layer = torch.matmul(x_train, w1) + b1
    hidden_layer_activation = torch.sigmoid(hidden_layer)
    output_layer = torch.matmul(hidden_layer_activation, w2) + b2
    predictions = torch.sigmoid(output_layer)

    # Compute the loss function (Mean Squared Error)
    loss = torch.mean((predictions - y_train) ** 2)

    # Backward propagation (autograd fills the .grad of w1, b1, w2, b2)
    loss.backward()

    # Update weights using gradient descent
    # no_grad keeps the in-place updates out of the autograd graph
    with torch.no_grad():
        w1 -= learning_rate * w1.grad
        b1 -= learning_rate * b1.grad
        w2 -= learning_rate * w2.grad
        b2 -= learning_rate * b2.grad

        # Clear gradients so the next backward() does not accumulate onto them
        w1.grad.zero_()
        b1.grad.zero_()
        w2.grad.zero_()
        b2.grad.zero_()

    # Print the loss every 100 epochs
    if (epoch + 1) % 100 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')

# Test the model on one point from each side of the x0 + x1 = 1 boundary
# NOTE(review): in the recorded run the loss stays near 0.25 and both test
# predictions are below 0.5, so the model has not learned the boundary —
# consider a larger learning rate or more epochs.
test_input = torch.tensor([[0.7, 0.8], [0.1, 0.1]], dtype=torch.float32).to(device)
with torch.no_grad():
    hidden_layer = torch.matmul(test_input, w1) + b1
    hidden_layer_activation = torch.sigmoid(hidden_layer)
    output_layer = torch.matmul(hidden_layer_activation, w2) + b2
    predictions = torch.sigmoid(output_layer)
    print('Test Predictions:', predictions.cpu().numpy())


Epoch [100/1000], Loss: 0.2648
Epoch [200/1000], Loss: 0.2590
Epoch [300/1000], Loss: 0.2563
Epoch [400/1000], Loss: 0.2549
Epoch [500/1000], Loss: 0.2540
Epoch [600/1000], Loss: 0.2533
Epoch [700/1000], Loss: 0.2528
Epoch [800/1000], Loss: 0.2522
Epoch [900/1000], Loss: 0.2517
Epoch [1000/1000], Loss: 0.2512
Test Predictions: [[0.4247769 ]
 [0.45901698]]


## Logistic Regression

In [1]:
import numpy as np

class LogisticRegression:
    """Binary logistic regression trained with full-batch gradient descent.

    Parameters
    ----------
    lr : float
        Gradient-descent step size.
    n_iter : int
        Number of gradient-descent iterations performed by ``fit``.

    Attributes
    ----------
    weights : numpy.ndarray or None
        Learned coefficient vector of shape ``(n_features,)``; ``None`` until
        ``fit`` has been called.
    bias : float or None
        Learned intercept; ``None`` until ``fit`` has been called.
    """

    def __init__(self, lr=0.01, n_iter=1000):
        self.lr = lr
        self.n_iter = n_iter
        self.weights = None
        self.bias = None

    def sigmoid(self, z):
        """Return the logistic function ``1 / (1 + exp(-z))`` element-wise.

        The naive formula overflows in ``exp(-z)`` for large negative ``z``.
        Here only ``exp(-|z|)`` is evaluated, which is always in ``(0, 1]``,
        and the algebraically equivalent branch is chosen per element.
        """
        z = np.asarray(z, dtype=float)
        decay = np.exp(-np.abs(z))
        result = np.where(z >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))
        # ``[()]`` turns a 0-d result back into a scalar; arrays pass through.
        return result[()]

    def fit(self, X, y):
        """Fit weights and bias on ``X`` (n_samples, n_features) and 0/1 targets ``y``."""
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        n_samples, n_features = X.shape
        self.weights = np.zeros(n_features)
        self.bias = 0.0

        for _ in range(self.n_iter):
            y_pred = self.sigmoid(np.dot(X, self.weights) + self.bias)
            error = y_pred - y

            # Gradient of the mean log-loss w.r.t. weights and bias.
            dw = (1 / n_samples) * np.dot(X.T, error)
            db = (1 / n_samples) * np.sum(error)

            self.weights -= self.lr * dw
            self.bias -= self.lr * db

    def predict(self, X):
        """Return a list of 0/1 class labels (1 when the probability exceeds 0.5)."""
        probabilities = self.sigmoid(np.dot(X, self.weights) + self.bias)
        return (np.asarray(probabilities) > 0.5).astype(int).tolist()

# Example usage:
X = np.array([[1, 2], [2, 3], [3, 4], [4, 5]])  # Features
y = np.array([0, 0, 1, 1])  # Target

log_reg = LogisticRegression(lr=0.1, n_iter=1000)
log_reg.fit(X, y)
print(log_reg.predict(X))


[0, 0, 1, 1]


## Ridge Regression

In [2]:
class RidgeRegression:
    """Linear regression with an L2 penalty, fitted by batch gradient descent.

    ``lr`` is the step size, ``n_iter`` the number of descent steps and
    ``alpha`` the strength of the L2 penalty on the weights (the bias is not
    penalised).
    """

    def __init__(self, lr=0.01, n_iter=1000, alpha=0.1):
        self.lr, self.n_iter, self.alpha = lr, n_iter, alpha
        self.weights = None
        self.bias = None

    def fit(self, X, y):
        """Learn ``weights`` and ``bias`` from ``X`` (n_samples, n_features) and ``y``."""
        sample_count, feature_count = X.shape
        self.weights = np.zeros(feature_count)
        self.bias = 0

        step = 0
        while step < self.n_iter:
            residual = self.predict(X) - y
            # Data term plus the shrinkage term contributed by the L2 penalty.
            grad_w = (1 / sample_count) * np.dot(X.T, residual) + (self.alpha / sample_count) * self.weights
            grad_b = (1 / sample_count) * np.sum(residual)

            self.weights -= self.lr * grad_w
            self.bias -= self.lr * grad_b
            step += 1

    def predict(self, X):
        """Return the linear prediction ``X @ weights + bias``."""
        return np.dot(X, self.weights) + self.bias

# Example usage:
X = np.array([[1, 2], [2, 3], [3, 4], [4, 5]])  # Features
y = np.array([5, 7, 9, 11])  # Target

ridge_reg = RidgeRegression(lr=0.01, n_iter=1000, alpha=0.1)
ridge_reg.fit(X, y)
print(ridge_reg.predict(X))

[ 4.85180138  6.92572064  8.9996399  11.07355916]


## Support Vector Machine (SVM)

In [3]:
class SVM:
    """Linear soft-margin SVM trained by per-sample sub-gradient descent.

    The decision function is ``sign(w . x - b)``. Targets ``<= 0`` are treated
    as class -1 and all others as class +1. ``lambda_param`` scales the L2
    penalty on the weights.
    """

    def __init__(self, lr=0.001, n_iter=1000, lambda_param=0.01):
        self.lr, self.n_iter, self.lambda_param = lr, n_iter, lambda_param
        self.weights = None
        self.bias = None

    def fit(self, X, y):
        """Learn ``weights`` and ``bias`` from ``X`` (n_samples, n_features) and ``y``."""
        signed_labels = np.where(y <= 0, -1, 1)
        _, feature_count = X.shape
        self.weights = np.zeros(feature_count)
        self.bias = 0

        for _ in range(self.n_iter):
            for sample, label in zip(X, signed_labels):
                margin = label * (np.dot(sample, self.weights) - self.bias)
                if margin >= 1:
                    # Outside the margin: only the regulariser contributes.
                    self.weights -= self.lr * (2 * self.lambda_param * self.weights)
                    continue
                # Margin violated: the hinge-loss term also moves weights and bias.
                self.weights -= self.lr * (2 * self.lambda_param * self.weights - np.dot(sample, label))
                self.bias -= self.lr * label

    def predict(self, X):
        """Return the sign (-1, 0 or +1) of the decision function for each row of ``X``."""
        return np.sign(np.dot(X, self.weights) - self.bias)

# Example usage:
X = np.array([[1, 2], [2, 3], [3, 4], [4, 5]])  # Features
y = np.array([0, 1, 0, 1])  # Target

svm = SVM()
svm.fit(X, y)
print(svm.predict(X))

[-1.  1.  1.  1.]


## K-Nearest Neighbors (KNN)

In [4]:
class KNN:
    """k-nearest-neighbours classifier using Euclidean distance.

    Parameters
    ----------
    k : int
        Number of neighbours that vote on each prediction. If ``k`` exceeds
        the number of training samples, all samples vote.
    """

    def __init__(self, k=3):
        self.k = k

    def fit(self, X, y):
        """Store the training set; KNN has no training phase of its own."""
        self.X_train = np.asarray(X)
        self.y_train = np.asarray(y)

    def predict(self, X):
        """Return an array with the predicted label for each row of ``X``."""
        y_pred = [self._predict(x) for x in np.asarray(X)]
        return np.array(y_pred)

    def _predict(self, x):
        """Return the majority label among the ``k`` training points nearest to ``x``."""
        distances = np.sqrt(np.sum((self.X_train - x) ** 2, axis=1))
        # A stable sort makes equidistant neighbours resolve by training order.
        k_indices = np.argsort(distances, kind="stable")[:self.k]
        # np.unique handles any label type (negative ints, strings, ...),
        # unlike np.bincount. Its output is sorted, so argmax still breaks
        # vote ties in favour of the smallest label.
        labels, counts = np.unique(self.y_train[k_indices], return_counts=True)
        return labels[np.argmax(counts)]

# Example usage:
X = np.array([[1, 2], [2, 3], [3, 4], [4, 5]])  # Features
y = np.array([0, 0, 1, 1])  # Target

knn = KNN(k=3)
knn.fit(X, y)
print(knn.predict(X))

[0 0 1 1]


## Decision Tree

In [5]:
class DecisionTree:
    """Entropy-based classification tree for non-negative integer labels.

    Internal nodes are dicts with keys ``feature``, ``threshold``, ``left``
    and ``right``; leaves are plain class labels. Samples whose feature value
    is below the threshold go left, all others go right.
    """

    def __init__(self, max_depth=10):
        self.max_depth = max_depth
        self.tree = None

    def fit(self, X, y):
        """Build the tree from ``X`` (n_samples, n_features) and labels ``y``."""
        self.tree = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        """Recursively build a subtree; returns a node dict or a leaf label."""
        row_count, _ = X.shape
        depth_exhausted = depth >= self.max_depth
        if row_count == 0 or depth_exhausted:
            return np.bincount(y).argmax()

        chosen = self._best_split(X, y)
        if chosen["gain"] == 0:
            # No split improves purity: emit a majority-vote leaf.
            return np.bincount(y).argmax()

        lower_rows, upper_rows = chosen["groups"]
        return {
            "feature": chosen["feature"],
            "threshold": chosen["threshold"],
            "left": self._grow_tree(X[lower_rows], y[lower_rows], depth + 1),
            "right": self._grow_tree(X[upper_rows], y[upper_rows], depth + 1),
        }

    def _best_split(self, X, y):
        """Scan every (feature, unique value) pair and keep the first with the highest gain."""
        top_gain = -1
        winner = None
        for column_index in range(X.shape[1]):
            column = X[:, column_index]
            for cut in np.unique(column):
                partition = self._split(column, cut)
                score = self._information_gain(y, partition)
                if score > top_gain:
                    top_gain = score
                    winner = {"feature": column_index, "threshold": cut, "gain": score, "groups": partition}
        return winner

    def _split(self, X_column, threshold):
        """Return (indices below threshold, indices at or above threshold)."""
        below = np.flatnonzero(X_column < threshold)
        at_or_above = np.flatnonzero(X_column >= threshold)
        return below, at_or_above

    def _information_gain(self, y, groups):
        """Parent entropy minus the size-weighted entropy of the two children."""
        lower_rows, upper_rows = groups[0], groups[1]
        total = len(y)
        lower_weight = len(lower_rows) / total
        upper_weight = len(upper_rows) / total
        children_entropy = (lower_weight * self._entropy(y[lower_rows])
                            + upper_weight * self._entropy(y[upper_rows]))
        return self._entropy(y) - children_entropy

    def _entropy(self, y):
        """Shannon entropy (base 2) of the label distribution in ``y``."""
        frequencies = np.bincount(y) / len(y)
        present = frequencies[frequencies > 0]
        return -np.sum(present * np.log2(present))

    def predict(self, X):
        """Return an array with the predicted label for each row of ``X``."""
        return np.array([self._traverse_tree(row, self.tree) for row in X])

    def _traverse_tree(self, x, node):
        """Walk from ``node`` down to a leaf for sample ``x`` and return the leaf label."""
        while isinstance(node, dict):
            goes_left = x[node["feature"]] < node["threshold"]
            node = node["left"] if goes_left else node["right"]
        return node

# Example usage:
X = np.array([[1, 2], [2, 3], [3, 4], [4, 5]])  # Features
y = np.array([0, 0, 1, 1])  # Target

tree = DecisionTree(max_depth=3)
tree.fit(X, y)
print(tree.predict(X))

[0 0 1 1]


## Random Forest

In [6]:
class RandomForest:
    """Bagged ensemble of ``DecisionTree`` classifiers with majority voting.

    Parameters
    ----------
    n_trees : int
        Number of trees to train.
    max_depth : int
        Maximum depth passed to every tree.
    """

    def __init__(self, n_trees=10, max_depth=10):
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.trees = []

    def fit(self, X, y):
        """Train ``n_trees`` trees, each on a bootstrap sample of ``(X, y)``.

        Any trees from a previous ``fit`` call are discarded first, so
        refitting replaces the ensemble instead of growing it.
        """
        self.trees = []
        n_samples = len(y)
        for _ in range(self.n_trees):
            # Bootstrap: draw n_samples row indices with replacement.
            indices = np.random.choice(n_samples, size=n_samples, replace=True)
            tree = DecisionTree(max_depth=self.max_depth)
            tree.fit(X[indices], y[indices])
            self.trees.append(tree)

    def predict(self, X):
        """Return a list with the majority-vote label for each row of ``X``."""
        # Shape (n_trees, n_samples) -> (n_samples, n_trees): one row of votes per sample.
        tree_preds = np.array([tree.predict(X) for tree in self.trees])
        tree_preds = np.swapaxes(tree_preds, 0, 1)
        return [np.bincount(votes).argmax() for votes in tree_preds]

# Example usage:
X = np.array([[1, 2], [2, 3], [3, 4], [4, 5]])  # Features
y = np.array([0, 0, 1, 1])  # Target

forest = RandomForest(n_trees=5, max_depth=3)
forest.fit(X, y)
print(forest.predict(X))


[0, 0, 1, 1]


## XGBoost

In [11]:
import numpy as np

class XGBoost:
    """Minimal second-order gradient-boosted regression trees (squared-error loss).

    Each round fits a tree to the gradient and Hessian of the loss at the
    current ensemble prediction, then adds ``lr`` times the tree output to it.

    Parameters
    ----------
    n_estimators : int
        Number of boosting rounds (trees).
    lr : float
        Shrinkage applied to every tree's output.
    max_depth : int
        Maximum depth of each tree; ``0`` yields single-leaf trees.
    lambda_reg : float
        L2 regularisation added to the Hessian sum in leaf values and gains.
    """

    def __init__(self, n_estimators=100, lr=0.1, max_depth=3, lambda_reg=1.0):
        self.n_estimators = n_estimators      # Number of trees
        self.lr = lr                          # Learning rate
        self.max_depth = max_depth            # Maximum depth of each tree
        self.lambda_reg = lambda_reg          # Regularization parameter
        self.trees = []                       # Store each tree

    def fit(self, X, y):
        """Fit the ensemble on ``X`` (n_samples, n_features) and targets ``y``.

        The running prediction starts at zero, matching the start value used
        by ``predict``, and is updated after every round so that each new tree
        is fitted to the current gradient. Trees from an earlier ``fit`` call
        are discarded.
        """
        X = np.asarray(X)
        y = np.asarray(y, dtype=np.float64)
        y_pred = np.zeros_like(y)
        self.trees = []

        for _ in range(self.n_estimators):
            # First and second derivatives of the loss at the current prediction.
            gradient = self._gradient(y, y_pred)
            hessian = self._hessian(y, y_pred)

            tree = self._build_tree(X, gradient, hessian, depth=0)

            # Move the running prediction by the shrunken tree output.
            y_pred += self.lr * self._predict_tree(tree, X)
            self.trees.append(tree)

    def predict(self, X):
        """Return the sum of shrunken tree outputs for each row of ``X``."""
        X = np.asarray(X)
        pred = np.zeros(X.shape[0])
        for tree in self.trees:
            pred += self.lr * self._predict_tree(tree, X)
        return pred

    def _gradient(self, y, y_pred):
        """First derivative of ``0.5 * (y_pred - y) ** 2`` w.r.t. ``y_pred``."""
        return y_pred - y

    def _hessian(self, y, y_pred):
        """Second derivative of the squared-error loss: constant 1."""
        return np.ones_like(y_pred)

    def _leaf(self, gradient, hessian):
        """Return a leaf node with the optimal weight ``-G / (H + lambda)``."""
        return {"leaf": -np.sum(gradient) / (np.sum(hessian) + self.lambda_reg)}

    def _build_tree(self, X, gradient, hessian, depth):
        """Recursively build a tree; returns a leaf dict or a split-node dict."""
        if depth >= self.max_depth or len(X) <= 1:
            return self._leaf(gradient, hessian)

        # Try every feature and every unique value as a split point.
        best_gain = -float('inf')
        split = None
        for feature_idx in range(X.shape[1]):
            column = X[:, feature_idx]
            for threshold in np.unique(column):
                left_idx, right_idx = self._split(column, threshold)
                # Skip degenerate splits that leave one side empty.
                if len(left_idx) == 0 or len(right_idx) == 0:
                    continue

                gain = self._calc_gain(gradient, hessian, left_idx, right_idx)
                if gain > best_gain:
                    best_gain = gain
                    split = {"feature_idx": feature_idx, "threshold": threshold,
                             "left_idx": left_idx, "right_idx": right_idx}

        # No valid split exists (e.g. all rows identical): fall back to a leaf.
        if split is None:
            return self._leaf(gradient, hessian)

        left_idx, right_idx = split["left_idx"], split["right_idx"]
        left_subtree = self._build_tree(X[left_idx], gradient[left_idx],
                                        hessian[left_idx], depth + 1)
        right_subtree = self._build_tree(X[right_idx], gradient[right_idx],
                                         hessian[right_idx], depth + 1)

        return {"feature_idx": split["feature_idx"], "threshold": split["threshold"],
                "left": left_subtree, "right": right_subtree}

    def _calc_gain(self, gradient, hessian, left_idx, right_idx):
        """Return the structure-score gain of a split.

        ``0.5 * (G_L^2/(H_L+lambda) + G_R^2/(H_R+lambda) - G^2/(H+lambda))``
        """
        G_left = np.sum(gradient[left_idx])
        H_left = np.sum(hessian[left_idx])
        G_right = np.sum(gradient[right_idx])
        H_right = np.sum(hessian[right_idx])

        G_total = G_left + G_right
        H_total = H_left + H_right

        gain = 0.5 * (G_left**2 / (H_left + self.lambda_reg) + G_right**2 / (H_right + self.lambda_reg)
                      - G_total**2 / (H_total + self.lambda_reg))
        return gain

    def _split(self, X_column, threshold):
        """Return (indices below threshold, indices at or above threshold)."""
        left_idx = np.where(X_column < threshold)[0]
        right_idx = np.where(X_column >= threshold)[0]
        return left_idx, right_idx

    def _predict_tree(self, tree, X):
        """Return the output of a single tree for each row of ``X``."""
        if "leaf" in tree:
            return np.ones(X.shape[0]) * tree["leaf"]

        left_idx, right_idx = self._split(X[:, tree["feature_idx"]], tree["threshold"])

        pred = np.zeros(X.shape[0])
        pred[left_idx] = self._predict_tree(tree["left"], X[left_idx])
        pred[right_idx] = self._predict_tree(tree["right"], X[right_idx])

        return pred

# Example usage
X = np.array([[1, 2], [2, 3], [3, 4], [4, 5], [5, 6]])  # Features
y = np.array([5, 7, 9, 11, 13])  # Target

xgb = XGBoost(n_estimators=5, lr=0.1, max_depth=3, lambda_reg=1.0)
xgb.fit(X, y)
predictions = xgb.predict(X)
