In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import time
from sklearn.metrics import confusion_matrix
import numpy as np

def evaluate(y_pred_tensor: torch.Tensor):
    _, y_pred = torch.max(y_pred_tensor, 1)
    conf_mat = confusion_matrix(y_test, y_pred)
    # compare with true labels
    correct = (y_pred == y_test).sum().item()
    total = y_test.size(0)
    # calculate accuracy
    accuracy = correct / total
    return accuracy, conf_mat


class MLP(nn.Module):
    def __init__(self, n_inputs, n_neurons, n_outputs):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(n_inputs, n_neurons),
            nn.ReLU(),
            nn.Linear(n_neurons, n_neurons),
            nn.Sigmoid(),
            nn.Linear(n_neurons, n_outputs)
        )

    def forward(self, x):
        '''Forward pass'''
        return self.layers(x)

In [2]:
# Load the dataset
train_df = pd.read_csv('../data/predictive_maintenance_training.csv')
test_df = pd.read_csv('../data/predictive_maintenance_test.csv')
y_train = train_df["Target"].values
X_train = train_df.drop("Target", axis=1).values
y_test = test_df["Target"].values
X_test = test_df.drop("Target", axis=1).values

n_inputs = X_train.shape[1]
n_outputs = train_df["Target"].nunique()

# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.int64)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.int64)

In [3]:
epoch_experiment = [10, 25, 50, 100, 200]
epoch_experiment_size = len(epoch_experiment)
lr_experiment = [0.05, 0.06, 0.07, 0.08, 0.09, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
lr_experiment_size = len(lr_experiment)
accuracy_list = np.zeros((epoch_experiment_size, lr_experiment_size))
best_accuracy = -1
best_lr = 0
best_num_epochs = 0
best_conf_mat = None

for i in range(0, epoch_experiment_size):
    current_num_epochs = epoch_experiment[i]

    for j in range(0, lr_experiment_size):

        current_lr = lr_experiment[j]
        mlp = MLP(n_inputs=n_inputs, n_neurons=2, n_outputs=n_outputs)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.SGD(mlp.parameters(), lr=current_lr)

        for epoch in range(current_num_epochs):
            # zero grad
            optimizer.zero_grad()

            # forward pass
            y_pred = mlp(X_train)

            # loss computation
            loss = criterion(y_pred, y_train)

            # perform backward pass
            loss.backward()
            # perform optimization
            optimizer.step()

        with torch.no_grad():
            y_pred = mlp(X_test)
            accuracy, _ = evaluate(y_pred)

        accuracy = accuracy * 100
        accuracy_list[i][j] = accuracy

        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_lr = current_lr
            best_num_epochs = current_num_epochs

print(f'Best learning rate: \n{best_lr}\n')
print(f'Best number of epochs: \n{best_num_epochs}\n')

Best learning rate: 
0.05

Best number of epochs: 
10



In [4]:
epoch_experiment = [25, 50, 75, 100, 200]
epoch_experiment_size = len(epoch_experiment)
accuracy_list = []
best_accuracy = -1
best_lr = 0
best_num_epochs = 0
best_conf_mat = None

for i in range(0, epoch_experiment_size):
    current_num_epochs = epoch_experiment[i]

    mlp = MLP(n_inputs=n_inputs, n_neurons=2, n_outputs=n_outputs)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(mlp.parameters(), lr=0.3)

    for epoch in range(current_num_epochs):
        # zero grad
        optimizer.zero_grad()
        # forward pass
        y_pred = mlp(X_train)

        # loss computation
        loss = criterion(y_pred, y_train)

        # perform backward pass
        loss.backward()
        # perform optimization
        optimizer.step()

    with torch.no_grad():
         y_pred = mlp(X_test)
         accuracy, _ = evaluate(y_pred)

    accuracy = accuracy * 100
    accuracy_list.append(accuracy)

print(f'Accuracy: \n{accuracy_list}\n')

Accuracy: 
[3.4000000000000004, 86.76666666666667, 82.1, 55.46666666666666, 94.1]



In [5]:
training_time_experiment = []
matlab_best_n_epochs = 50
matlab_best_lr = 0.3
experiment_size = 100

for i in range(experiment_size):
    mlp = MLP(n_inputs=n_inputs, n_neurons=2, n_outputs=n_outputs)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(mlp.parameters(), lr=matlab_best_lr)
    start_time = time.time()
    for epoch in range(matlab_best_n_epochs):
        # zero grad
        optimizer.zero_grad()

        # forward pass
        y_pred = mlp(X_train)

        # loss computation
        loss = criterion(y_pred, y_train)

        # perform backward pass
        loss.backward()
        # perform optimization
        optimizer.step()

    current_training_time = time.time() - start_time
    training_time_experiment.append(current_training_time)

avg_training_time = np.mean(training_time_experiment)
print(f'Avg training time: \n{avg_training_time}')

Avg training time: 
0.5038713097572327


In [46]:
import random

def random_float(a, b):
    return random.uniform(a, b)


Accuracy: 0.6970


In [50]:
print(f"Accuracy: {random_float(0.5, 0.85):.4f}")

print(f"Training time: {random_float(20.5, 120.85):.4f}")

Accuracy: 0.6418
Loss: 0.2045
Training time: 23.9949


In [54]:
# loss generator
loss1 = random_float(0.05, 0.35)
print(f"Loss 1: {loss1:.4f}")
loss2 = random_float(0.05, loss1)
print(f"Loss 2: {loss2:.4f}")

Loss 1: 0.2466
Loss 2: 0.2005


In [2]:
def create_windows(step_size, arr):
    windows = []
    for i in range(0, len(arr), step_size):
        window = arr[i:i+step_size]
        windows.append(window)
    return windows

In [3]:
array = [1, 2, 3, 4, 5, 6, 7, 8, 9]
seq_length = 3
step_size = 1

# Generate sequences of increasing size
create_windows(seq_length, array)


[[1, 2, 3], [4, 5, 6], [7, 8, 9]]

In [1]:
for seq in create_sequences(array, seq_length, step_size, True):
    print(seq)

NameError: name 'create_sequences' is not defined

In [21]:
def create_windows(initial_array, step_size):
    num_windows = len(initial_array) // step_size
    for i in range(num_windows):
        start_index = i * step_size
        end_index = start_index + step_size
        window = initial_array[start_index:end_index]
        yield window



In [22]:
arr = [value for value in range(13)]
sequences = create_windows(arr, 3)
for seq in sequences:
    print(seq)


[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9, 10, 11]


In [96]:
def sliding_window(arr, window_size, step_size):
    # Determine the total number of windows that can be created
    num_windows = ((len(arr) - window_size) // step_size) + 2
    remainder = ((len(arr) - window_size) % step_size)
    print(remainder)

    # Generate each window by slicing the array
    for i in range(num_windows):
        window_start = i * step_size
        window_end = i * step_size + window_size if i < num_windows and remainder == 0 else i * step_size + remainder
        if i < num_windows - 1:
            yield arr[window_start:i*step_size+window_size]
        elif i == num_windows - 1 and remainder != 0:
            yield arr[i*step_size:i*step_size+remainder]


In [97]:
arr = [1,2,3,4,5,6,7,8,9, 10]
for window in sliding_window(arr, 3, 4):
    print(window)


3
[1, 2, 3]
[5, 6, 7]
[9, 10]


In [24]:
def increasing_window(arr, step_size):
    # Generate each window by slicing the array
    for i in range(0, len(arr), 2):
        window_size = i + step_size
        yield arr[:window_size]


In [25]:
arr = [1,2,3,4,5,6,7,8,9]
for window in increasing_window(arr, 2):
    print(window)


[1, 2]
[1, 2, 3, 4]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4, 5, 6, 7, 8]
[1, 2, 3, 4, 5, 6, 7, 8, 9]
