In [None]:
# @title Definicje Klas i Funkcji Pomocniczych
import numpy as np
import urllib.request
import zipfile
import os
import struct
import time
import ssl

# Definicje Klas

class Layer:
    """Fully connected (linear) layer that processes batches of row vectors."""

    def __init__(self, input_size, output_size):
        # Standard-normal draws scaled by sqrt(2 / input_size).
        init_scale = np.sqrt(2. / input_size)
        self.weights = np.random.randn(input_size, output_size) * init_scale
        self.bias = np.zeros((1, output_size))
        # Filled in by forward() / backward() and consumed by later steps.
        self.input = None
        self.weights_error = None
        self.bias_error = None

    def forward(self, input_data):
        """Remember the batch and return ``input_data @ weights + bias``."""
        self.input = input_data
        projected = np.matmul(self.input, self.weights)
        return projected + self.bias

    def backward(self, output_error):
        """Store the parameter gradients and return the gradient w.r.t. the input.

        Relies on ``self.input`` cached by the preceding ``forward`` call.
        """
        self.weights_error = np.matmul(self.input.T, output_error)
        # The bias is shared by every row of the batch, so its gradient is the column sum.
        self.bias_error = np.sum(output_error, axis=0, keepdims=True)
        return np.matmul(output_error, self.weights.T)

    def adjust(self, learning_rate):
        """Apply one in-place gradient-descent step using the stored gradients."""
        self.weights -= learning_rate * self.weights_error
        self.bias -= learning_rate * self.bias_error

class Activation:
    """Activation layer wrapping a function and the function used as its derivative."""

    def __init__(self, activation, activation_prime):
        self.activation, self.activation_prime = activation, activation_prime
        # Cached by forward(); backward() re-reads the cached input.
        self.input = self.output = None

    def forward(self, input_data):
        """Apply the activation to ``input_data``, caching both input and result."""
        self.input = input_data
        activated = self.activation(input_data)
        self.output = activated
        return activated

    def backward(self, output_error):
        """Scale the incoming error by ``activation_prime`` evaluated at the cached input."""
        local_gradient = self.activation_prime(self.input)
        return output_error * local_gradient

    def adjust(self, learning_rate):
        """No-op: this layer holds no trainable parameters."""
        return None

class Dropout:
    """Dropout layer that rescales the surviving activations at training time.

    During training each element is kept with probability ``1 - rate`` and the
    kept elements are divided by ``1 - rate``. In evaluation mode the layer is
    the identity.
    """

    def __init__(self, rate):
        """Create the layer.

        Args:
            rate: Probability of zeroing an element; must satisfy ``0 <= rate < 1``.

        Raises:
            ValueError: If ``rate`` is outside ``[0, 1)``. A rate of 1 would
                divide by zero when rescaling and fill the output with NaN.
        """
        if not 0 <= rate < 1:
            raise ValueError(f"Dropout rate must be in [0, 1), got {rate!r}")
        self.rate = rate
        self.mask = None

    def forward(self, input_data, training=True):
        """Apply dropout when ``training`` is true, otherwise return the input unchanged."""
        if not training:
            # Forget any mask from an earlier training pass so that a backward()
            # call following this evaluation pass cannot reuse a stale mask.
            self.mask = None
            return input_data

        keep_prob = 1 - self.rate
        # Build the 0/1 keep mask and fold the 1 / keep_prob rescaling into it.
        self.mask = np.random.binomial(1, keep_prob, size=input_data.shape) / keep_prob
        return input_data * self.mask

    def backward(self, output_error):
        """Propagate the error through the elements kept by the last forward pass.

        When no mask is stored (no training-mode forward has run, or the last
        forward was in evaluation mode) the layer acted as the identity, so the
        error is returned unchanged.
        """
        if self.mask is None:
            return output_error
        return output_error * self.mask

    def adjust(self, learning_rate):
        """No-op: this layer holds no trainable parameters."""
        pass


# Funkcje Aktywacji

def relu(x):
    """Rectified linear unit: element-wise maximum of 0 and ``x``."""
    rectified = np.maximum(0, x)
    return rectified

def relu_prime(x):
    """Derivative of ReLU: 1.0 where ``x`` is strictly positive, 0.0 elsewhere."""
    is_positive = x > 0
    return is_positive.astype(np.float64)

def sigmoid(x):
    """Logistic function; the input is clipped to [-500, 500] before exponentiation."""
    bounded = np.clip(x, -500, 500)
    denominator = 1 + np.exp(-bounded)
    return 1 / denominator

def sigmoid_prime(x):
    """Derivative of the logistic function, computed as ``sigmoid(x) * (1 - sigmoid(x))``."""
    activated = sigmoid(x)
    complement = 1 - activated
    return activated * complement

def tanh(x):
    """Hyperbolic tangent, delegated to ``np.tanh``."""
    squashed = np.tanh(x)
    return squashed

def tanh_prime(x):
    """Derivative of tanh, computed as ``1 - tanh(x)**2``."""
    squashed = np.tanh(x)
    return 1 - squashed ** 2

def softmax(x):
    """Row-wise softmax of a 2-D batch (normalisation runs along ``axis=1``)."""
    # Subtract each row's maximum before exponentiating so large inputs do not overflow.
    row_max = np.max(x, axis=1, keepdims=True)
    exps = np.exp(x - row_max)
    row_totals = np.sum(exps, axis=1, keepdims=True)
    return exps / row_totals

def softmax_prime(x):
    """Placeholder derivative for softmax: always returns the constant 1.

    An ``Activation`` built with this function therefore passes the incoming
    error through unchanged in ``backward``; ``x`` is ignored.
    """
    passthrough = 1
    return passthrough

# Funkcje do ładowania danych MNIST
def _download_data(url, save_path='.'):
    """Download and unpack the MNIST zip archive unless it is already unpacked.

    The archive is saved as ``<save_path>/MNIST_ORG.zip``, extracted into
    ``<save_path>/mnist_data`` and then deleted. If ``mnist_data`` already
    exists, nothing is downloaded.

    Args:
        url: Location of the zip archive.
        save_path: Directory that receives the archive and the extracted folder.

    Returns:
        Path of the directory holding the extracted files.

    Raises:
        Whatever ``urllib``/``zipfile``/``os`` raise on failure. In that case the
        temporary archive and any partially extracted directory are removed
        first, so a later call retries instead of treating a broken download
        as complete.
    """
    import shutil  # local import: only this function needs it

    zip_path = os.path.join(save_path, 'MNIST_ORG.zip')
    data_dir = os.path.join(save_path, 'mnist_data')
    if os.path.exists(data_dir):
        return data_dir

    print("Pobieranie danych MNIST...")
    # NOTE(review): certificate verification is deliberately disabled here, so an
    # HTTPS download is not authenticated. Kept as in the original; confirm whether
    # the host's certificate allows switching to ssl.create_default_context().
    ssl_context = ssl._create_unverified_context()
    try:
        with urllib.request.urlopen(url, context=ssl_context) as response, open(zip_path, 'wb') as out_file:
            # Stream in chunks instead of holding the whole archive in memory.
            shutil.copyfileobj(response, out_file)
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(data_dir)
        except BaseException:
            # data_dir did not exist before this call, so anything there now is a
            # partial extraction; remove it so the existence check above stays valid.
            shutil.rmtree(data_dir, ignore_errors=True)
            raise
    finally:
        # Remove the archive on success and on failure alike.
        if os.path.exists(zip_path):
            os.remove(zip_path)
    print("Dane gotowe.")
    return data_dir

def _load_mnist(data_dir):
    """Read the four MNIST IDX files found in ``data_dir``.

    Returns:
        Tuple ``(X_train, Y_train, y_train_labels, X_test, y_test_labels)``:
        images are float32 rows scaled by 1/255, ``Y_train`` is the one-hot
        (10-column) encoding of the training labels, and the ``*_labels``
        arrays hold the raw uint8 labels.
    """
    def read_images(path):
        # Header: four big-endian uint32 values; the first one is ignored.
        with open(path, 'rb') as handle:
            header = handle.read(16)
            _, count, height, width = struct.unpack('>IIII', header)
            pixels = np.fromfile(handle, dtype=np.uint8)
        flattened = pixels.reshape(count, height * width)
        return flattened.astype(np.float32) / 255.0

    def read_labels(path):
        # Header: two big-endian uint32 values; the first one is ignored.
        with open(path, 'rb') as handle:
            _, count = struct.unpack('>II', handle.read(8))
            labels = np.fromfile(handle, dtype=np.uint8)
        encoded = np.zeros((count, 10))
        encoded[np.arange(count), labels] = 1
        return encoded, labels

    def in_data_dir(name):
        return os.path.join(data_dir, name)

    # Files are read in the same order as before: train images/labels, then test images/labels.
    X_train = read_images(in_data_dir('train-images.idx3-ubyte'))
    Y_train, y_train_labels = read_labels(in_data_dir('train-labels.idx1-ubyte'))
    X_test = read_images(in_data_dir('t10k-images.idx3-ubyte'))
    y_test_labels = read_labels(in_data_dir('t10k-labels.idx1-ubyte'))[1]
    return X_train, Y_train, y_train_labels, X_test, y_test_labels

# Load the data once, up front; the experiment functions read these module-level arrays.
data_dir = _download_data(url='http://pduch.kis.p.lodz.pl/PSI/MNIST_ORG.zip')
(
    X_train_all,
    Y_train_all,
    y_train_labels_all,
    X_test_all,
    y_test_labels_all,
) = _load_mnist(data_dir)

print("Klasy, funkcje pomocnicze i dane MNIST gotowe.")

Pobieranie danych MNIST...
Dane gotowe.
Klasy, funkcje pomocnicze i dane MNIST gotowe.


In [None]:
# @title L4 - Zadanie 1: Implementacja Dropout
# section: main_execution (Zadanie 4.3.1)
def run_experiment_dropout(hidden_size, train_size, epochs, alpha):
    """Train a 784 -> hidden -> 10 ReLU network with Dropout(0.5) by per-sample SGD.

    Reads the module-level MNIST arrays (``X_train_all``, ``Y_train_all``,
    ``y_train_labels_all``, ``X_test_all``, ``y_test_labels_all``). Statistics
    are printed every 10th epoch and after the final one; nothing is returned.

    Args:
        hidden_size: Number of units in the hidden layer.
        train_size: Number of leading training samples to use. If it exceeds
            the number of available samples, all available samples are used.
        epochs: Number of passes over the training subset.
        alpha: Learning rate handed to every layer's ``adjust``.
    """
    print("\n" + "="*80)
    print(f"START EKSPERYMENTU (Dropout): Hidden: {hidden_size}, Train: {train_size}")
    print(f"Params: alpha={alpha}, epochs={epochs}, weights=<-0.1, 0.1>")
    print("="*80)

    # Data preparation
    X_train = X_train_all[:train_size]
    Y_train = Y_train_all[:train_size]
    y_train_labels = y_train_labels_all[:train_size]
    X_test = X_test_all
    y_test_labels = y_test_labels_all
    # Slicing silently truncates when train_size exceeds the dataset, so iterate
    # and average over the number of samples actually selected.
    n_train = len(X_train)

    # Network construction; the default Layer weights are replaced by uniform ones.
    network = [
        Layer(784, hidden_size),
        Activation(relu, relu_prime),
        Dropout(0.5),
        Layer(hidden_size, 10)
    ]
    network[0].weights = np.random.uniform(-0.1, 0.1, (784, hidden_size))
    network[3].weights = np.random.uniform(-0.1, 0.1, (hidden_size, 10))

    def forward_pass(batch, training):
        """Run ``batch`` through the network; only Dropout receives the mode flag."""
        output = batch
        for layer in network:
            if isinstance(layer, Dropout):
                output = layer.forward(output, training=training)
            else:
                output = layer.forward(output)
        return output

    start_time = time.time()
    for epoch in range(epochs):
        total_error = 0
        train_correct = 0
        indices = np.arange(n_train)
        np.random.shuffle(indices)

        for i in indices:
            x = X_train[i:i+1]
            y = Y_train[i:i+1]

            # Forward pass with Dropout enabled
            output = forward_pass(x, training=True)

            # Statistics are collected from the training-mode (dropout-on) output
            total_error += np.sum((output - y)**2)
            if np.argmax(output) == y_train_labels[i]:
                train_correct += 1

            # Gradient of the mean squared error w.r.t. the output
            error = 2 * (output - y) / y.size

            # Backward pass
            for layer in reversed(network):
                error = layer.backward(error)

            # Weight update
            for layer in network:
                layer.adjust(alpha)

        # Evaluation and logging
        if epoch % 10 == 0 or epoch == epochs - 1:
            # Dropout is disabled here, so the whole test set can go through in a
            # single batched pass instead of one Python iteration per sample.
            test_output = forward_pass(X_test, training=False)
            test_correct = np.sum(np.argmax(test_output, axis=1) == y_test_labels)

            avg_error = total_error / n_train
            train_accuracy = (train_correct / n_train) * 100
            test_accuracy = (test_correct / len(X_test)) * 100

            print(f"Iter: {epoch:3d} Error: {avg_error:.6f} Train Acc: {train_accuracy:.2f}% Test Acc: {test_accuracy:.2f}%")

    print(f"Trening zakończony w {time.time() - start_time:.2f}s")

# Run the experiments required by the assignment (same order and arguments as before).
for experiment in (
    {"hidden_size": 40, "train_size": 1000},
    {"hidden_size": 100, "train_size": 10000},
    {"hidden_size": 100, "train_size": 60000},
):
    run_experiment_dropout(epochs=350, alpha=0.005, **experiment)


START EKSPERYMENTU (Dropout): Hidden: 40, Train: 1000
Params: alpha=0.005, epochs=350, weights=<-0.1, 0.1>
Iter:   0 Error: 0.990782 Train Acc: 15.50% Test Acc: 33.72%
Iter:  10 Error: 0.623250 Train Acc: 59.00% Test Acc: 67.72%
Iter:  20 Error: 0.512055 Train Acc: 67.50% Test Acc: 75.23%
Iter:  30 Error: 0.467602 Train Acc: 72.70% Test Acc: 79.65%
Iter:  40 Error: 0.431993 Train Acc: 76.20% Test Acc: 81.36%
Iter:  50 Error: 0.399666 Train Acc: 77.80% Test Acc: 82.71%
Iter:  60 Error: 0.389508 Train Acc: 79.60% Test Acc: 83.48%
Iter:  70 Error: 0.371970 Train Acc: 81.20% Test Acc: 84.56%
Iter:  80 Error: 0.364072 Train Acc: 82.70% Test Acc: 84.90%
Iter:  90 Error: 0.342050 Train Acc: 85.00% Test Acc: 85.07%
Iter: 100 Error: 0.338579 Train Acc: 84.40% Test Acc: 85.10%
Iter: 110 Error: 0.329628 Train Acc: 86.00% Test Acc: 85.36%
Iter: 120 Error: 0.309416 Train Acc: 87.00% Test Acc: 85.61%
Iter: 130 Error: 0.324329 Train Acc: 84.20% Test Acc: 85.68%
Iter: 140 Error: 0.306630 Train Acc: 8

In [None]:
# @title L4 - Zadanie 2: Implementacja Mini-Batch
# section: main_execution (Zadanie 4.3.2)
def run_experiment_minibatch(hidden_size, train_size, epochs, alpha, batch_size):
    """Train a 784 -> hidden -> 10 ReLU network with Dropout(0.5) by mini-batch SGD.

    Reads the module-level MNIST arrays (``X_train_all``, ``Y_train_all``,
    ``y_train_labels_all``, ``X_test_all``, ``y_test_labels_all``). Statistics
    are printed every 10th epoch and after the final one; nothing is returned.

    Args:
        hidden_size: Number of units in the hidden layer.
        train_size: Number of leading training samples to use. If it exceeds
            the number of available samples, all available samples are used.
        epochs: Number of passes over the training subset.
        alpha: Learning rate handed to every layer's ``adjust``.
        batch_size: Number of samples per gradient step (also used as the
            chunk size during evaluation).
    """
    print("\n" + "="*80)
    print(f"START EKSPERYMENTU (Mini-Batch): Hidden: {hidden_size}, Train: {train_size}, Batch: {batch_size}")
    print(f"Params: alpha={alpha}, epochs={epochs}, weights=<-0.1, 0.1>")
    print("="*80)

    # Data preparation
    X_train = X_train_all[:train_size]
    Y_train = Y_train_all[:train_size]
    y_train_labels = y_train_labels_all[:train_size]
    X_test = X_test_all
    y_test_labels = y_test_labels_all
    # Slicing silently truncates when train_size exceeds the dataset; shuffling
    # indices up to the requested size would then index out of bounds.
    n_train = len(X_train)

    # Network construction; the default Layer weights are replaced by uniform ones.
    network = [
        Layer(784, hidden_size),
        Activation(relu, relu_prime),
        Dropout(0.5),
        Layer(hidden_size, 10)
    ]
    network[0].weights = np.random.uniform(-0.1, 0.1, (784, hidden_size))
    network[3].weights = np.random.uniform(-0.1, 0.1, (hidden_size, 10))

    def forward_pass(batch, training):
        """Run ``batch`` through the network; only Dropout receives the mode flag."""
        output = batch
        for layer in network:
            if isinstance(layer, Dropout):
                output = layer.forward(output, training=training)
            else:
                output = layer.forward(output)
        return output

    def count_correct(inputs, labels):
        """Count correct predictions with Dropout disabled, in chunks of ``batch_size``."""
        correct = 0
        for start in range(0, len(inputs), batch_size):
            stop = start + batch_size
            predictions = np.argmax(forward_pass(inputs[start:stop], training=False), axis=1)
            correct += np.sum(predictions == labels[start:stop])
        return correct

    start_time = time.time()
    for epoch in range(epochs):
        total_error = 0
        indices = np.arange(n_train)
        np.random.shuffle(indices)

        for i in range(0, n_train, batch_size):
            batch_indices = indices[i:i+batch_size]
            x_batch = X_train[batch_indices]
            y_batch = Y_train[batch_indices]

            # Forward pass with Dropout enabled
            output = forward_pass(x_batch, training=True)

            # The reported error is accumulated from the training-mode output
            total_error += np.sum((output - y_batch)**2)

            # Gradient of the mean squared error (mean over every element of the batch)
            error = 2 * (output - y_batch) / y_batch.size

            # Backward pass
            for layer in reversed(network):
                error = layer.backward(error)

            # Weight update
            for layer in network:
                layer.adjust(alpha)

        # Evaluation and logging
        if epoch % 10 == 0 or epoch == epochs - 1:
            train_correct = count_correct(X_train, y_train_labels)
            test_correct = count_correct(X_test, y_test_labels)

            avg_error = total_error / n_train
            train_accuracy = (train_correct / n_train) * 100
            test_accuracy = (test_correct / len(X_test)) * 100

            print(f"Iter: {epoch:3d} Error: {avg_error:.6f} Train Acc: {train_accuracy:.2f}% Test Acc: {test_accuracy:.2f}%")

    print(f"Trening zakończony w {time.time() - start_time:.2f}s")

# Run the experiments required by the assignment (same order and arguments as before).
for experiment in (
    {"hidden_size": 40, "train_size": 1000},
    {"hidden_size": 100, "train_size": 10000},
    {"hidden_size": 100, "train_size": 60000},
):
    run_experiment_minibatch(epochs=350, alpha=0.1, batch_size=100, **experiment)


START EKSPERYMENTU (Mini-Batch): Hidden: 40, Train: 1000, Batch: 100
Params: alpha=0.1, epochs=350, weights=<-0.1, 0.1>
Iter:   0 Error: 1.198729 Train Acc: 13.20% Test Acc: 12.76%
Iter:  10 Error: 0.837676 Train Acc: 49.40% Test Acc: 45.18%
Iter:  20 Error: 0.767793 Train Acc: 64.50% Test Acc: 57.81%
Iter:  30 Error: 0.712162 Train Acc: 69.00% Test Acc: 63.10%
Iter:  40 Error: 0.675672 Train Acc: 73.40% Test Acc: 66.97%
Iter:  50 Error: 0.632124 Train Acc: 75.20% Test Acc: 69.30%
Iter:  60 Error: 0.599789 Train Acc: 77.30% Test Acc: 71.14%
Iter:  70 Error: 0.583847 Train Acc: 78.40% Test Acc: 72.77%
Iter:  80 Error: 0.557539 Train Acc: 79.90% Test Acc: 73.58%
Iter:  90 Error: 0.541881 Train Acc: 81.30% Test Acc: 74.66%
Iter: 100 Error: 0.525972 Train Acc: 82.90% Test Acc: 75.22%
Iter: 110 Error: 0.504870 Train Acc: 83.60% Test Acc: 76.15%
Iter: 120 Error: 0.514933 Train Acc: 84.40% Test Acc: 77.14%
Iter: 130 Error: 0.478173 Train Acc: 86.00% Test Acc: 78.01%
Iter: 140 Error: 0.470652

In [None]:
# @title L4 - Zadanie 3: Tanh + Softmax
# section: main_execution (Zadanie 4.3.3)
def run_experiment_advanced(train_size, epochs, alpha, batch_size, hidden_size=100):
    """Train a 784 -> hidden -> 10 network with tanh hidden units and a softmax output.

    Reads the module-level MNIST arrays (``X_train_all``, ``Y_train_all``,
    ``y_train_labels_all``, ``X_test_all``, ``y_test_labels_all``). Statistics
    are printed every 10th epoch and after the final one; nothing is returned.

    Args:
        train_size: Number of leading training samples to use. If it exceeds
            the number of available samples, all available samples are used.
        epochs: Number of passes over the training subset.
        alpha: Learning rate handed to the linear layers' ``adjust``.
        batch_size: Number of samples per gradient step (also used as the
            chunk size during test evaluation).
        hidden_size: Number of units in the hidden layer (defaults to 100, the
            value that used to be hard-coded).
    """
    print("\n" + "="*80)
    print(f"START EKSPERYMENTU (Tanh+Softmax): Train: {train_size}, Alpha: {alpha}")
    print(f"Params: epochs={epochs}, batch_size={batch_size}, weights=<-0.01, 0.01>")
    print("="*80)

    # Data preparation
    X_train = X_train_all[:train_size]
    Y_train = Y_train_all[:train_size]
    y_train_labels = y_train_labels_all[:train_size]
    X_test = X_test_all
    y_test_labels = y_test_labels_all
    # Slicing silently truncates when train_size exceeds the dataset; shuffling
    # indices up to the requested size would then index out of bounds.
    n_train = len(X_train)

    # Network construction; the default Layer weights are replaced by uniform ones.
    network = [
        Layer(784, hidden_size),
        Activation(tanh, tanh_prime),
        Layer(hidden_size, 10),
        Activation(softmax, softmax_prime)
    ]
    network[0].weights = np.random.uniform(-0.01, 0.01, (784, hidden_size))
    network[2].weights = np.random.uniform(-0.01, 0.01, (hidden_size, 10))

    start_time = time.time()
    for epoch in range(epochs):
        total_error = 0
        train_correct = 0
        indices = np.arange(n_train)
        np.random.shuffle(indices)

        for i in range(0, n_train, batch_size):
            batch_indices = indices[i:i+batch_size]
            x_batch = X_train[batch_indices]
            y_batch = Y_train[batch_indices]

            # Forward pass
            output = x_batch
            for layer in network:
                output = layer.forward(output)

            # Statistics (the reported error is the summed squared difference)
            total_error += np.sum((output - y_batch)**2)
            train_correct += np.sum(np.argmax(output, axis=1) == np.argmax(y_batch, axis=1))

            # Error to propagate, averaged over the samples actually in this batch;
            # the final batch can be smaller than batch_size.
            error = (output - y_batch) / len(x_batch)

            # Backward pass. The softmax layer (network[3]) is skipped on purpose:
            # (output - y) is fed straight into the last linear layer, i.e. it is
            # treated as the gradient at the softmax inputs.
            error = network[2].backward(error)
            error = network[1].backward(error)
            error = network[0].backward(error)

            # Weight update (only the linear layers have parameters)
            network[2].adjust(alpha)
            network[0].adjust(alpha)

        # Evaluation and logging
        if epoch % 10 == 0 or epoch == epochs - 1:
            test_correct = 0
            for j in range(0, len(X_test), batch_size):
                x_test_batch = X_test[j:j+batch_size]
                y_test_labels_batch = y_test_labels[j:j+batch_size]
                output = x_test_batch
                for layer in network:
                    output = layer.forward(output)
                test_correct += np.sum(np.argmax(output, axis=1) == y_test_labels_batch)

            avg_error = total_error / n_train
            train_accuracy = (train_correct / n_train) * 100
            test_accuracy = (test_correct / len(X_test)) * 100

            print(f"Iter: {epoch:3d} Error: {avg_error:.6f} Train Acc: {train_accuracy:.2f}% Test Acc: {test_accuracy:.2f}%")

    print(f"Trening zakończony w {time.time() - start_time:.2f}s")

# Run the experiments required by the assignment (same order and arguments as before).
for experiment in (
    {"train_size": 1000, "alpha": 0.02},
    {"train_size": 10000, "alpha": 0.2},
    {"train_size": 60000, "alpha": 0.5},
):
    run_experiment_advanced(epochs=350, batch_size=100, **experiment)


START EKSPERYMENTU (Tanh+Softmax): Train: 1000, Alpha: 0.02
Params: epochs=350, batch_size=100, weights=<-0.01, 0.01>
Iter:   0 Error: 0.899920 Train Acc: 10.10% Test Acc: 13.60%
Iter:  10 Error: 0.894566 Train Acc: 43.00% Test Acc: 40.72%
Iter:  20 Error: 0.865844 Train Acc: 51.40% Test Acc: 47.71%
Iter:  30 Error: 0.742690 Train Acc: 55.60% Test Acc: 52.08%
Iter:  40 Error: 0.575852 Train Acc: 63.70% Test Acc: 61.23%
Iter:  50 Error: 0.450681 Train Acc: 74.10% Test Acc: 71.64%
Iter:  60 Error: 0.362187 Train Acc: 82.10% Test Acc: 76.92%
Iter:  70 Error: 0.300872 Train Acc: 84.80% Test Acc: 79.53%
Iter:  80 Error: 0.258297 Train Acc: 86.40% Test Acc: 81.08%
Iter:  90 Error: 0.227358 Train Acc: 88.00% Test Acc: 82.03%
Iter: 100 Error: 0.203398 Train Acc: 88.40% Test Acc: 82.53%
Iter: 110 Error: 0.184671 Train Acc: 89.10% Test Acc: 83.37%
Iter: 120 Error: 0.169520 Train Acc: 89.60% Test Acc: 83.99%
Iter: 130 Error: 0.156695 Train Acc: 90.00% Test Acc: 84.52%
Iter: 140 Error: 0.145608 T