In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

import backend.config as config
import dataprocessing
import surfacemodelclass as sf
import util

### Data preparation
extract current data from the database and prepare for processing

In [None]:
from requests.exceptions import ChunkedEncodingError

CLASSES = config.classes

raw_data = ""
while raw_data == "":
    try:
        raw_data = dataprocessing.get_data_db()
    except ChunkedEncodingError:
        print("Couldn't get Data, retrying ...")

In [None]:
FAKE_DATE = pd.to_datetime("1.1.2000")
BATCH_SIZE = 10
n_cols = 8

NORMALIZE_SIZE = True

pavement_start = pd.Timestamp(year=2023, month=11, day=6, hour=18, minute=34)
pavement_end = pd.Timestamp(year=2023, month=11, day=6, hour=18, minute=54)
asphalt_start_1 = pd.Timestamp(year=2023, month=11, day=6, hour=19, minute=10)
asphalt_end_1 = pd.Timestamp(year=2023, month=11, day=6, hour=19, minute=16)
asphalt_start_2 = pd.Timestamp(year=2023, month=11, day=6, hour=19, minute=31)
asphalt_end_2 = pd.Timestamp(year=2023, month=11, day=6, hour=19, minute=50)


def data_preparation(data):
    global n_cols
    df = pd.read_json(data)

    df['time'] = pd.to_datetime(df['time'], format='mixed')
    for i, row in df.iterrows():
        if (row.time >= pavement_start) and (row.time <= pavement_end):
            df.at[i, 'terrain'] = config.map_to_int('pavement')
        elif (row.time >= asphalt_start_1) and (row.time <= asphalt_end_1):
            df.at[i, 'terrain'] = config.map_to_int('asphalt')
        elif (row.time >= asphalt_start_2) and (row.time <= asphalt_end_2):
            df.at[i, 'terrain'] = config.map_to_int('asphalt')

    df.dropna(subset=['terrain'], inplace=True)

    df['time_second'] = df.time.map(lambda x: pd.Timestamp(x).floor(freq='S'))
    df['time'] = df.time.map(pd.Timestamp.timestamp)

    grouped = df.groupby([df.trip_id, df.time_second])  # grouped.get_group(1)
    x = []
    y = []
    for i, (trip_seconds, table) in enumerate(grouped):
        if (i + 1) % 100 == 0:
            print("# trip seconds: " + str(i + 1))

        train_input = table.drop(columns=['terrain', 'trip_id', 'crash', 'time_second', 'latitude', 'longitude'])
        n_cols = len(train_input.columns)

        train_input = train_input.to_numpy()

        if NORMALIZE_SIZE:
            # print(f"normalizing missing {str(BATCH_SIZE - len(train_input))} rows")
            n_missing_rows = BATCH_SIZE - len(train_input)
            if n_missing_rows <= 4:
                print("Got a good sample")
            for _ in range(n_missing_rows):
                fake_array = [0] * n_cols
                train_input = np.append(train_input, fake_array)

        train_target = table.terrain.min()

        x.append(train_input)
        y.append(train_target)

    return np.array(x), np.array(y)


X, Y = data_preparation(raw_data)

### Training the Model

In [None]:
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader


def gen_dataloader(x, y, test_size=0.2, random_state=0):
    scaler = MinMaxScaler()  # TODO: choose scaler
    scaler.fit(x)

    x = torch.tensor(x)
    y = torch.tensor(y)

    train_x, test_x, train_y, test_y = train_test_split(x, y, test_size=test_size, random_state=random_state)

    train_x = torch.Tensor(train_x)
    train_y = F.one_hot(train_y.long(), len(CLASSES)).to(torch.float32)

    test_x = torch.Tensor(test_x)
    test_y = F.one_hot(test_y.long(), len(CLASSES)).to(torch.float32)

    train_dataset = TensorDataset(train_x, train_y)
    test_dataset = TensorDataset(test_x, test_y)

    train_loader = DataLoader(train_dataset)
    test_loader = DataLoader(test_dataset)

    return train_loader, test_loader


train_loader, test_loader = gen_dataloader(X, Y)  # with default values

In [None]:
from torch import optim

LR = 1e-4
MOMENTUM = 0.9
NUM_EPOCHS = 10
HIDDEN_LAYERS = 64


def train_model(model, train_loader, num_epochs=NUM_EPOCHS, lr=LR, momentum=MOMENTUM, device=None):
    global n_cols
    criterion = nn.CrossEntropyLoss()
    # criterion = nn.MSELoss()
    # criterion = nn.NLLLoss()

    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)
    # optimizer = optim.Adam(model.parameters(), lr=lr)

    print("Training STARTED")
    # model.to(device)

    for e in range(0, num_epochs):
        model.train()  # set the model in training mode
        total_train_loss = 0  # initialize the total training and validation loss

        for i, (training_input, target) in enumerate(train_loader):  # loop over the training set
            hidden = model.initHidden()
            model.zero_grad()

            output = [.0, .0, .0, .0]
            for data_row in training_input:
                for i in range(0, len(data_row), n_cols):
                    model_input = data_row[None, i:i + n_cols].float()
                    output, hidden = model(model_input, hidden)

            optimizer.zero_grad()
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            # add the loss to the total training loss so far and calculate the number of correct predictions
            total_train_loss += loss.item()

        if (e + 1) % 10 == 0:
            print("Epoch", e + 1, "Training Loss:", total_train_loss)

    print("Training FINISHED")

    return model, criterion, optimizer


model = sf.RNN(n_cols, HIDDEN_LAYERS, len(CLASSES))
model, criterion, optimizer = train_model(model=model, train_loader=train_loader)


## Testing

In [None]:
def print_training_accuracy(model, train_loader, criterion, classes):
    training_loss, class_correct, class_total = util.compute_accuracy(model=model, loader=train_loader, criterion=criterion, n_cols=n_cols)

    # average training loss
    training_loss = training_loss / len(train_loader.dataset)
    print('Training Loss: {:.6f}\n'.format(training_loss))
    for i in range(10):
        if class_total[i] > 0:
            print('Training Accuracy of %5s: %2d%% (%2d/%2d)' % (
                classes[i], 100.0 * class_correct[i] / class_total[i],
                np.sum(class_correct[i]), np.sum(class_total[i])))
        else:
            print('Training Accuracy of %5s: N/A ' % (classes[i]))

    print('\Training Accuracy (Overall): %2d%% (%2d/%2d)' % (
        100. * np.sum(class_correct) / np.sum(class_total),
        np.sum(class_correct), np.sum(class_total)))


print_training_accuracy(model, train_loader, criterion, CLASSES)

In [None]:
def print_testing_accuracy(model, test_loader, device, classes, criterion=None):
    test_loss, class_correct, class_total = util.compute_accuracy(model=model, loader=test_loader, criterion=criterion, n_cols=n_cols)

    # average test loss
    test_loss = test_loss / len(test_loader.dataset)
    print('Test Loss: {:.6f}\n'.format(test_loss))

    for i in range(10):
        if class_total[i] > 0:
            print('Test Accuracy of %5s: %2d%% (%2d/%2d)' % (
                classes[i], 100.0 * class_correct[i] / class_total[i],
                np.sum(class_correct[i]), np.sum(class_total[i])))
        else:
            print('Test Accuracy of %5s: N/A (no training examples)' % (classes[i]))

    print('\nTest Accuracy (Overall): %2d%% (%2d/%2d)' % (
        100. * np.sum(class_correct) / np.sum(class_total),
        np.sum(class_correct), np.sum(class_total)))


print_testing_accuracy(model, test_loader, criterion, CLASSES)