In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import f1_score

In [2]:
def linear(x):
    return x

def linear_derivative(x):
    return np.ones_like(x)

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_derivative(x):
    s = sigmoid(x)
    return s * (1 - s)

def relu(x):
    return np.maximum(0, x)

def relu_derivative(x):
    return np.where(x > 0, 1.0, 0.0)

def tanh(x):
    return np.tanh(x)

def tanh_derivative(x):
    return 1.0 - np.tanh(x)**2

def mse(predictions, targets):
    return np.mean((predictions - targets) ** 2)

def cost_derivative(output_activations, y):
    return output_activations - y


In [4]:
class MLP:
    def __init__(self, layer_sizes, activation='relu', scaler=None, verbose=False, beta_momentum=0.90, beta_rmsprop=0.999, epsilon=1e-8):
        self.layer_sizes = layer_sizes
        self.activation_name = activation
        self.activation, self.activation_derivative = {
            'linear': (linear, linear_derivative),
            'sigmoid': (sigmoid, sigmoid_derivative),
            'tanh': (tanh, tanh_derivative),
            'relu': (relu, relu_derivative),
        }[activation]
        self.weights = [np.random.rand(y, x) for x, y in zip(layer_sizes[:-1], layer_sizes[1:])]
        self.biases = [np.random.randn(y, 1) for y in layer_sizes[1:]]
        self.scaler = scaler
        self.beta_momentum = beta_momentum
        self.beta_rmsprop = beta_rmsprop
        self.epsilon = epsilon
        # Initialize momentum and RMSprop caches
        self.vdw = [np.zeros_like(w) for w in self.weights]
        self.sdw = [np.zeros_like(w) for w in self.weights]
        self.vdb = [np.zeros_like(b) for b in self.biases]
        self.sdb = [np.zeros_like(b) for b in self.biases]
        self.verbose = verbose
        if self.verbose:
            print("Initial weights: ", self.weights)
            
    def plot_weights(self, epoch):
        for i, w in enumerate(self.weights):
            plt.figure(figsize=(40, 20))
            plt.hist(w.flatten(), bins=50)
            plt.title(f"Layer {i + 1} Weight Distribution")
            plt.xlabel("Weight Value")
            plt.ylabel("Frequency")

            filename = f"epoch_{epoch}_layer_{i + 1}_weights_distribution.png"
            filepath = f"weights_plots/multimodal-large/{filename}"

            # Save the plot to file
            plt.savefig(filepath)

            plt.clf()

    def print_final_weights_and_biases(self):
        print("Final Weights and Biases:", self.weights, self.biases)

        for i, w in enumerate(self.weights):
            plt.figure(figsize=(40, 20))
            plt.hist(w.flatten(), bins=50)
            plt.title(f"Layer {i + 1} Weight Distribution")
            plt.xlabel("Weight Value")
            plt.ylabel("Frequency")

            filename = f"final_layer_{i + 1}_weights_distribution.png"
            filepath = f"weights_plots/multimodal-large/{filename}"

            # Save the plot to file
            plt.savefig(filepath)

            plt.clf()
    
    def feedforward(self, a):
        activations = [a]
        for b, w in zip(self.biases[:-1], self.weights[:-1]):
            z = np.dot(w, a) + b
            a = self.activation(z)
            activations.append(a)
        # Linear activation for the last layer
        z = np.dot(self.weights[-1], a) + self.biases[-1]
        a = linear(z)
        activations.append(a)
        return a, activations
    
    def backprop(self, x, y):
        nabla_w = [np.zeros(w.shape) for w in self.weights]
        nabla_b = [np.zeros(b.shape) for b in self.biases]
        final_output, activations = self.feedforward(x)
        zs = [np.dot(w, act) + b for w, b, act in zip(self.weights, self.biases, activations[:-1])]
        
        # Output layer error
        delta = cost_derivative(final_output, y)
        nabla_b[-1] = delta
        nabla_w[-1] = np.dot(delta, activations[-2].T)

        # Backpropagate the error
        for l in range(2, len(self.layer_sizes)):
            z = zs[-l]
            sp = self.activation_derivative(z)
            delta = np.dot(self.weights[-l + 1].T, delta) * sp
            nabla_b[-l] = delta
            nabla_w[-l] = np.dot(delta, activations[-l - 1].T)

        return nabla_w, nabla_b
    
    def update_mini_batch(self, mini_batch, learning_rate, lambda_, n):
        nabla_w = [np.zeros(w.shape) for w in self.weights]
        nabla_b = [np.zeros(b.shape) for b in self.biases]
        for x, y in mini_batch:
            delta_nabla_w, delta_nabla_b = self.backprop(x, y)
            nabla_w = [nw + dnw for nw, dnw in zip(nabla_w, delta_nabla_w)]
            nabla_b = [nb + dnb for nb, dnb in zip(nabla_b, delta_nabla_b)]

        # Update velocities for weights
        self.vdw = [self.beta_momentum * v + (1 - self.beta_momentum) * nw for v, nw in zip(self.vdw, nabla_w)]
        self.vdb = [self.beta_momentum * v + (1 - self.beta_momentum) * nb for v, nb in zip(self.vdb, nabla_b)]

        # Update squared gradients for weights
        self.sdw = [self.beta_rmsprop * s + (1 - self.beta_rmsprop) * (nw ** 2) for s, nw in zip(self.sdw, nabla_w)]
        self.sdb = [self.beta_rmsprop * s + (1 - self.beta_rmsprop) * (nb ** 2) for s, nb in zip(self.sdb, nabla_b)]

        # Correct the bias for initial iterations for both velocity and squared gradients
        vdw_corrected = [v / (1 - self.beta_momentum ** (i + 1)) for i, v in enumerate(self.vdw)]
        vdb_corrected = [v / (1 - self.beta_momentum ** (i + 1)) for i, v in enumerate(self.vdb)]
        sdw_corrected = [s / (1 - self.beta_rmsprop ** (i + 1)) for i, s in enumerate(self.sdw)]
        sdb_corrected = [s / (1 - self.beta_rmsprop ** (i + 1)) for i, s in enumerate(self.sdb)]

        # Update weights and biases with L2 regularization, RMSprop and Momentum
        self.weights = [(1 - learning_rate * (lambda_ / n)) * w - (learning_rate / len(mini_batch)) * (
                v / (np.sqrt(s) + self.epsilon))
                        for w, v, s in zip(self.weights, vdw_corrected, sdw_corrected)]
        self.biases = [b - (learning_rate / len(mini_batch)) * (v / (np.sqrt(s) + self.epsilon))
                       for b, v, s in zip(self.biases, vdb_corrected, sdb_corrected)]

    def train(
            self,
            training_data,
            epochs,
            learning_rate,
            batch_size,
            adaptive_learning_rate=True,
            test_data=None,
            treshold_f1_train=-(np.inf),
            treshold_f1_test=-(np.inf),
            lambda_=0.0,
            update_method="batch",
            plot_interval=None,
    ):
        n = len(training_data)
        learning_rate_init = learning_rate

        for j in range(epochs):
            if j % (epochs / 100) == 0:
                print("Epoch: ", j)

                f1_train = self.calculate_f1_score(training_data)
                print(f1_train)
                if test_data:
                    if f1_train > treshold_f1_train:
                        f1_test = self.calculate_f1_score(test_data)
                        print(f1_test)
                        if f1_test > treshold_f1_test:
                            break
            # Plot weights at the specified interval
            if self.verbose and plot_interval and j % plot_interval == 0:
                self.plot_weights(epoch=j)

            np.random.shuffle(training_data)
            if update_method == "batch":
                mini_batches = [
                    training_data[k: k + batch_size] for k in range(0, n, batch_size)
                ]
                for mini_batch in mini_batches:
                    self.update_mini_batch(mini_batch, learning_rate, lambda_, n)
            elif update_method == "epoch":
                self.update_mini_batch(training_data, learning_rate, lambda_, n)
                
            if adaptive_learning_rate:
                # Learning rate schedule
                #learning_rate = learning_rate_init / (1 + 0.1 * j)
                if j % 10 == 0:
                    learning_rate *= 0.8

In [5]:
class DataScaler:
    def __init__(self, method="standardization"):
        self.method = method
        self.min = None
        self.max = None
        self.mean = None
        self.std = None

    def fit_transform(self, data):
        if self.method == "min_max":
            return self.fit_transform_min_max(data)
        elif self.method == "standardization":
            return self.fit_transform_standardization(data)
        else:
            raise ValueError("Unsupported scaling method")

    def transform(self, data):
        if self.method == "min_max":
            return self.transform_min_max(data)
        elif self.method == "standardization":
            return self.transform_standardization(data)
        else:
            raise ValueError("Unsupported scaling method")

    def inverse_transform(self, data):
        if self.method == "min_max":
            return self.inverse_transform_min_max(data)
        elif self.method == "standardization":
            return self.inverse_transform_standardization(data)
        else:
            raise ValueError("Unsupported scaling method")

    def fit_transform_min_max(self, data):
        self.min = np.min(data, axis=0)
        self.max = np.max(data, axis=0)
        return (data - self.min) / (self.max - self.min)

    def transform_min_max(self, data):
        return (data - self.min) / (self.max - self.min)

    def inverse_transform_min_max(self, data):
        return data * (self.max - self.min) + self.min

    def fit_transform_standardization(self, data):
        self.mean = np.mean(data, axis=0)
        self.std = np.std(data, axis=0)
        return (data - self.mean) / self.std

    def transform_standardization(self, data):
        return (data - self.mean) / self.std

    def inverse_transform_standardization(self, data):
        return data * self.std + self.mean

## WCZYTANIE DANYCH

In [7]:
df_train_square = pd.read_csv("mio1/regression/multimodal-large-training.csv")
X_train_square = df_train_square["x"].values.reshape(-1, 1)
y_train_square = df_train_square["y"].values.reshape(-1, 1)

df_test_square = pd.read_csv("mio1/regression/multimodal-large-test.csv")
X_test_square = df_test_square["x"].values.reshape(-1, 1)
y_test_square = df_test_square["y"].values.reshape(-1, 1)

In [8]:
# Initialize the scaler for X and y with the desired scaling method
scaler_X = DataScaler(method="min_max")
scaler_y = DataScaler(method="min_max")

In [9]:
# Fit and transform the training data
X_train_scaled = scaler_X.fit_transform(X_train_square)
y_train_scaled = scaler_y.fit_transform(y_train_square)
X_test_scaled = scaler_X.fit_transform(X_test_square)
y_test_scaled = scaler_y.fit_transform(y_test_square)

In [10]:
np.random.seed(42)

In [11]:
mlp_square_1_5 = MLP([1, 64, 32, 32, 1], scaler=scaler_y, verbose=False)

In [12]:
training_data_scaled = [
    (x.reshape(-1, 1), y) for x, y in zip(X_train_scaled, y_train_scaled)
]
test_data_scaled = [(x.reshape(-1, 1), y) for x, y in zip(X_test_scaled, y_test_scaled)]
mlp_square_1_5.train(
    training_data_scaled,
    epochs=300,
    learning_rate=1,
    batch_size=10,
    test_data=test_data_scaled,
    threshold_mse_train=20,
    threshold_mse_test=9,
    plot_interval=20,
)


# Scale the test data using the transform method (DO NOT refit scaler)
X_test_scaled = scaler_X.transform(X_test_square)

# Generate predictions on the scaled test data and inverse transform
predictions_scaled = np.array(
    [mlp_square_1_5.feedforward(x.reshape(-1, 1))[0] for x in X_test_scaled]
)
predictions = scaler_y.inverse_transform(predictions_scaled.reshape(-1, 1))

train_predictions_scaled = np.array(
    [mlp_square_1_5.feedforward(x.reshape(-1, 1))[0] for x in X_train_scaled]
)
train_predictions = scaler_y.inverse_transform(train_predictions_scaled.reshape(-1, 1))
# Calculate MSE score
for i in range(len(predictions)):
    print("predicted value: ", predictions[i], "actual value: ", y_test_square[i])
mse_score_train = mse(train_predictions, y_train_square)

print(f"Train MSE Score: {mse_score_train}")
mse_score = mse(predictions, y_test_square)

print(f"MSE Score: {mse_score}")

TypeError: train() got an unexpected keyword argument 'threshold_mse_train'