In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torch.nn.functional import normalize
from pandas import DataFrame
from datetime import datetime, timezone
import time
import requests
import math
import matplotlib.pyplot as plt
import json
import os
from enum import StrEnum
import pandas as pd

In [None]:
# Log type tags. Each one is used both as the "type" field of the entries built
# by LogStructs and as the log_type value bound in the ai_logs INSERT statements.
LEARNING_START = "learning_started"
PROCESSING_FINISH = "processing_finished"
LEARNING_SUCCESS = "learning_success"
LEARNING_FAIL = "learning_failed"

In [None]:
class LogStructs:
    """Builders for the log-entry dictionaries used throughout this notebook.

    Every entry carries the same five keys, in this order: ``line``,
    ``direction``, ``type``, ``log`` and ``timestamp``. The builders differ
    only in the type tag and in the payload stored under ``log``.
    """

    @staticmethod
    def get_time():
        """Return the current UTC time formatted with ``str()``."""
        now_utc = datetime.now(timezone.utc)
        return str(now_utc)

    @staticmethod
    def _entry(line: int, direction: int, log_type: str, payload) -> dict:
        """Assemble a single entry; the timestamp is taken when this is called."""
        entry = dict(line=line, direction=direction, type=log_type, log=payload)
        entry["timestamp"] = LogStructs.get_time()
        return entry

    @staticmethod
    def learning_start(line: int, direction: int) -> dict:
        """Entry marking the start of a run; the payload is the literal "START"."""
        return LogStructs._entry(line, direction, LEARNING_START, "START")

    @staticmethod
    def processing_finish(
        line: int, direction: int, records_count: int, time_taken: int
    ) -> dict:
        """Entry for finished data preparation, with record count and duration."""
        payload = {"records_count": records_count, "time": time_taken}
        return LogStructs._entry(line, direction, PROCESSING_FINISH, payload)

    @staticmethod
    def learning_success(
        line: int, direction: int, parameters_count: int, loss: float, time_taken: int
    ) -> dict:
        """Entry for a completed training run, with its count, loss and duration."""
        payload = {
            "parameters_count": parameters_count,
            "loss": loss,
            "time": time_taken,
        }
        return LogStructs._entry(line, direction, LEARNING_SUCCESS, payload)

    @staticmethod
    def learning_fail(line: int, direction: int, exception: str) -> dict:
        """Entry for a failed run; the payload is the exception text."""
        return LogStructs._entry(line, direction, LEARNING_FAIL, exception)

In [None]:
# Load the SQL magic and open the database connection used by the %sql calls in this notebook.
%load_ext sql
# NOTE(review): presumably pins the legacy table style for compatibility with the installed
# table-rendering dependency - confirm against the installed SQL magic version.
%config SqlMagic.style = '_DEPRECATED_DEFAULT'
# With autopandas enabled, %sql queries return pandas DataFrames.
%config SqlMagic.autopandas = True
# NOTE(review): user name and password are hard-coded in the connection URL; consider
# reading the URL from the environment instead.
%sql postgresql+psycopg://admin:admin@localhost:5432/buses

In [None]:
# Line and direction identifiers for this run, read from the process environment.
# NOTE(review): when either variable is unset, os.getenv returns None and int(None)
# raises TypeError - confirm the launcher always exports both.
LINE_NAME = int(os.getenv("line_name"))
DIRECTION = int(os.getenv("direction"))

In [None]:
# Serialise the "learning started" entry to JSON so it can be bound as :log in the INSERT below.
log = json.dumps(LogStructs.learning_start(LINE_NAME, DIRECTION))

In [None]:
# Flag the task for this line/direction as STARTED, then store the start entry in ai_logs.
# The :NAME placeholders refer to the notebook variables of the same name.
%sql UPDATE public.ai_tasks SET status = 'STARTED' WHERE line_name = :LINE_NAME AND direction = :DIRECTION
%sql INSERT INTO public.ai_logs (line, direction, log_type, log) VALUES (:LINE_NAME, :DIRECTION, :LEARNING_START, :log)

In [None]:
# Choose the compute backend: CUDA when present, otherwise Apple MPS, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

print(f"Using {device} device")

In [None]:
def get_model_data_from_db(line_name: int, optional_direction: str | int):
    if isinstance(optional_direction, str): 
        res = %sql SELECT id FROM public.directions WHERE value = :optional_direction
        direction_id = res[0][0]
    else:
        direction_id = optional_direction
    records = %sql SELECT course_loid, day_course_loid, longitude, latitude, angle, reached_meters, order_in_course, last_ping_date FROM \
                public.positions WHERE optional_direction = :direction_id AND line_name = :line_name
    course_loids = %sql SELECT DISTINCT(course_loid) FROM public.positions WHERE optional_direction = :direction_id \
                AND line_name = :line_name
    return (records, course_loids)

In [None]:
def get_stop_points(timeout: float = 30.0):
    """Download the stop list and index it by stop symbol.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled connection would block the run forever.

    Returns:
        A dict mapping each stop ``symbol`` to
        ``{"latitude": ..., "longitude": ...}``.

    Raises:
        requests.RequestException: On a network failure, a timeout, or an
            HTTP error status.
    """
    response = requests.get(
        "https://przystanki.bialystok.pl/portal/getStops.json", timeout=timeout
    )
    # Fail on an HTTP error status instead of trying to parse an error page as JSON.
    response.raise_for_status()
    stops = response.json()
    return {
        point["symbol"]: {
            "latitude": point["latitude"],
            "longitude": point["longitude"],
        }
        for point in stops["stopPoints"]
    }

In [None]:
def get_real_courses(course_loids: list[int], timeout: float = 30.0):
    """Download the stop schedule of every requested course.

    Args:
        course_loids: Course ids to fetch; one HTTP request is made per id.
        timeout: Seconds to wait for each request before giving up.

    Returns:
        A dict keyed by course id. Each value maps ``orderInCourse`` to
        ``{"scheduled_departure": ..., "symbol": ...}`` for every stopping
        of that course.

    Raises:
        requests.RequestException: On a network failure, a timeout, or an
            HTTP error status.
    """
    courses = {}
    # A single session reuses the connection across the per-course requests.
    with requests.Session() as session:
        for loid in course_loids:
            response = session.get(
                f"https://przystanki.bialystok.pl/portal/getRealCourse.json?courseId={loid}",
                timeout=timeout,
            )
            # Fail on an HTTP error status instead of parsing an error page as JSON.
            response.raise_for_status()
            course = response.json()
            courses[loid] = {
                point["orderInCourse"]: {
                    "scheduled_departure": point["scheduledDepartureSec"],
                    "symbol": point["stopPointSymbol"],
                }
                for point in course["realCourse"]["stoppings"]
            }
    return courses

In [None]:
def _seconds_since_midnight(timestamp_ms) -> int:
    """Convert an epoch timestamp in milliseconds to seconds since local midnight."""
    moment = datetime.fromtimestamp(timestamp_ms / 1000)
    return moment.hour * 3600 + moment.minute * 60 + moment.second


def prepare_objects(data: DataFrame, course_loids: DataFrame) -> tuple[zip, zip]:
    """Turn raw position records into (features, target) training pairs.

    For every position row the target stop is the next stop of the same day
    course when that stop has at least one recorded position, otherwise the
    row's own stop. The target value is the time of day of the earliest
    position recorded at the target stop minus the time of day of the row.

    Features, in order: longitude, latitude, angle, reached_meters,
    order_in_course, ping time of day (s), scheduled departure of the target
    stop, target stop order. The extended variant appends the target stop's
    latitude and longitude.

    Args:
        data: Position rows with the columns course_loid, day_course_loid,
            longitude, latitude, angle, reached_meters, order_in_course and
            last_ping_date (epoch milliseconds).
        course_loids: Frame with a ``course_loid`` column listing the courses
            whose schedules must be downloaded.

    Returns:
        Two zips of ``(features, target)`` tensor pairs: the base feature set
        and the extended one. Feature rows are L2-normalised per row by
        ``torch.nn.functional.normalize`` with its default arguments. Tensors
        are created on the notebook's selected ``device``.

    Raises:
        ValueError: If ``data`` contains no rows.
    """
    if data.empty:
        raise ValueError("No position records to prepare")

    stop_points = get_stop_points()
    courses = get_real_courses(course_loids["course_loid"].tolist())

    # Earliest ping per (day course, stop order), computed once. This replaces
    # two full-frame scans per row, which made the loop quadratic.
    first_ping_ms = (
        data.groupby(["day_course_loid", "order_in_course"])["last_ping_date"]
        .min()
        .to_dict()
    )

    parsed_x = []
    parsed_x_extended = []
    parsed_y = []
    for counter, (_, row) in enumerate(data.iterrows(), start=1):
        if counter % 10000 == 0:
            print(f"Parsed {counter}")

        next_stop = row.order_in_course + 1
        if (row.day_course_loid, next_stop) in first_ping_ms:
            predicted_stop_point = next_stop
        else:
            # No position was recorded at the next stop; fall back to the current one.
            predicted_stop_point = row.order_in_course

        ping_time = _seconds_since_midnight(row.last_ping_date)
        arrived_time = _seconds_since_midnight(
            first_ping_ms[(row.day_course_loid, predicted_stop_point)]
        )
        stop_point_data = courses[row.course_loid][predicted_stop_point]
        stop_location = stop_points[stop_point_data["symbol"]]

        features = [
            row.longitude,
            row.latitude,
            float(row.angle),
            float(row.reached_meters),
            float(row.order_in_course),
            ping_time,
            stop_point_data["scheduled_departure"],
            predicted_stop_point,
        ]
        parsed_x.append(features)
        parsed_x_extended.append(
            features + [stop_location["latitude"], stop_location["longitude"]]
        )
        # NOTE(review): both times are seconds of day, so a trip that crosses
        # midnight yields a negative target - confirm whether that can occur.
        parsed_y.append([arrived_time - ping_time])

    # Build the tensors on the device selected for the notebook rather than a
    # hard-coded "cuda", which fails on hosts without CUDA.
    parsed_x = normalize(torch.tensor(parsed_x, dtype=torch.float32, device=device))
    parsed_x_extended = normalize(
        torch.tensor(parsed_x_extended, dtype=torch.float32, device=device)
    )
    parsed_y = torch.tensor(parsed_y, dtype=torch.float32, device=device)

    return zip(parsed_x, parsed_y), zip(parsed_x_extended, parsed_y)

In [None]:
def train(model, dataset, num_epochs=100, batch_size=16, learning_rate=0.0011):

    learning_start = time.monotonic()
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    criterion = nn.L1Loss()  # Zmień na MSELoss jeśli Y jest ciągłe
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Przygotowanie do przechowywania historii strat
    history = {"epoch_loss": []}
    min_loss = float("inf")

    for epoch in range(num_epochs):
        epoch_loss = 0
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            # Forward i backward pass
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            # Sumowanie strat na potrzeby obliczenia średniej
            epoch_loss += loss.item()

        # Oblicz średnią stratę na epokę i dodaj do historii
        epoch_loss /= len(dataloader)
        history["epoch_loss"].append(epoch_loss)

        # Aktualizacja minimalnej straty
        if epoch_loss < min_loss:
            min_loss = epoch_loss

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")

    log = json.dumps(LogStructs.learning_success(LINE_NAME, DIRECTION, len(dataset[0][0]), epoch_loss, int(time.monotonic() - learning_start)))
    %sql INSERT INTO public.ai_logs (line, direction, log_type, log) VALUES (:LINE_NAME, :DIRECTION, :LEARNING_SUCCESS, :log)

    output_dir = f"{LINE_NAME}_{DIRECTION}/{epoch_loss}_{datetime.now(timezone.utc)}"
    os.makedirs(os.path.dirname(f"{output_dir}/training_history.json"), exist_ok=True)
    torch.save(model, f"{output_dir}/model.pt")
    # Zapis historii strat do pliku JSON
    with open(f"{output_dir}/training_history.json", "w") as f:
        json.dump(history, f)

    data = []
    for inputs, targets in dataset:
        row = {"inputs": inputs.tolist(), "targets": targets.tolist()}
        data.append(row)
    dataset_df = pd.DataFrame(data)
    dataset_df.to_csv(f"{output_dir}/dataset.csv", index=False)

    # Tworzenie wykresu strat i zapis do pliku
    plt.plot(range(1, num_epochs + 1), history["epoch_loss"], label="Training Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Training Loss Over Epochs")
    plt.legend()
    plt.grid(True)
    plt.savefig(f"{output_dir}/loss_plot.png")
    plt.close()

    # Generowanie raportu
    with open(f"{output_dir}/training_report.txt", "w") as f:
        f.write("Model Training Report\n")
        f.write("=====================\n")
        f.write(f"Model: {model}\n\n")
        f.write(f"Number of epochs: {num_epochs}\n")
        f.write(f"Batch size: {batch_size}\n")
        f.write(f"Learning rate: {learning_rate}\n")
        f.write(f"Final Loss: {epoch_loss:.4f}\n")
        f.write(f"Minimum Loss: {min_loss:.4f}\n")

    return model

In [None]:
def test(model, test_dataset, batch_size=16):
    """Compute and print the mean L1 loss of ``model`` over ``test_dataset``.

    The model is switched to evaluation mode and stays in it afterwards.
    Gradients are disabled while the loss is computed.

    Args:
        model: Module to evaluate.
        test_dataset: Sized collection of ``(inputs, targets)`` tensor pairs.
        batch_size: Mini-batch size handed to the DataLoader.

    Returns:
        The per-sample average loss as a float.
    """
    loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    loss_fn = nn.L1Loss()  # or nn.MSELoss() depending on your use case

    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch_inputs, batch_targets in loader:
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)

            batch_loss = loss_fn(model(batch_inputs), batch_targets)
            # Weight each batch mean by its size so the final value is a per-sample mean.
            total_loss += batch_loss.item() * batch_inputs.size(0)

    avg_test_loss = total_loss / len(test_dataset)
    print(f"Test Loss: {avg_test_loss:.4f}")

    return avg_test_loss

In [None]:
def run_script():
    try:
        data, loids = get_model_data_from_db(LINE_NAME, DIRECTION)

        processing_start = time.monotonic()
        prepared, prepared_extended = prepare_objects(data, loids)

        data_zipped = list(prepared)
        data_extended_zipped = list(prepared_extended)

        l = len(data_zipped)
        training_data, test_data = data_zipped[:math.floor(0.8*l)], data_zipped[math.floor(0.8*l):]
        etraining_data, etest_data = data_extended_zipped[:math.floor(0.8*l)], data_extended_zipped[math.floor(0.8*l):]

        log = json.dumps(LogStructs.processing_finish(LINE_NAME, DIRECTION, l, int(time.monotonic() - processing_start)))
        %sql INSERT INTO public.ai_logs (line, direction, log_type, log) VALUES (:LINE_NAME, :DIRECTION, :PROCESSING_FINISH, :log)

        net = nn.Sequential(
            nn.Linear(8, 48),
            nn.LeakyReLU(),
            nn.Linear(48,32),
            nn.LeakyReLU(),
            nn.Linear(32, 16),
            nn.LeakyReLU(),
            nn.Linear(16, 1),
        ).to(device)
        net_extraneous = nn.Sequential(
            nn.Linear(10, 56),
            nn.LeakyReLU(),
            nn.Linear(56,48),
            nn.LeakyReLU(),
            nn.Linear(48, 32),
            nn.LeakyReLU(),
            nn.Linear(32, 16),
            nn.LeakyReLU(),
            nn.Linear(16, 1),
        ).to(device)

        trained = train(net, training_data)
        trained_extended = train(net_extraneous, etraining_data)
    except Exception as e:
        log = LogStructs.learning_fail(LINE_NAME, DIRECTION, str(e))
        %sql INSERT INTO public.ai_logs (line, direction, log_type, log) VALUES (:LINE_NAME, :DIRECTION, :LEARNING_FAIL, :log)

    %sql UPDATE public.ai_tasks SET status = 'SUCCESS' WHERE line_name = :LINE_NAME AND direction = :DIRECTION;

In [None]:
# Run the pipeline for the line and direction configured at the top of the notebook.
run_script()

In [None]:
# Set the task status to SUCCESS for the configured line and direction.
# NOTE(review): this cell runs unconditionally and does not check how run_script()
# ended; run_script() also issues this same UPDATE - confirm this cell is still needed.
%sql UPDATE public.ai_tasks SET status = 'SUCCESS' WHERE line_name = :LINE_NAME AND direction = :DIRECTION