# Улучшенная модель из курсовой работы

In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import matplotlib.pyplot as plt

# Параметры
NUM_POINTS = 5
INPUT_SIZE = 4
OUTPUT_SIZE = 4
HIDDEN_SIZE = 256
NUM_LAYERS = 4
LEARNING_RATE = 0.001
NUM_EPOCHS = 200
BATCH_SIZE = 64
MODEL_PATH = 'model_advanced/model.pth'
ERROR_THRESHOLD = 0.1  # Порог для определения "правильно предсказанных" точек

# Датасет
class TrajectoryDataset(Dataset):
    def __init__(self, directory):
        self.data = []
        self.load_data(directory)

    def load_data(self, directory):
        for filename in os.listdir(directory):
            if filename.endswith('.txt'):
                file_path = os.path.join(directory, filename)
                trajectory = np.loadtxt(file_path)
                for i in range(NUM_POINTS, len(trajectory)):
                    input_data = trajectory[i - NUM_POINTS:i]
                    output_data = trajectory[i - NUM_POINTS - 1]
                    self.data.append((input_data, output_data))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return torch.tensor(self.data[idx][0], dtype=torch.float32), torch.tensor(self.data[idx][1], dtype=torch.float32)

# Улучшенная модель
class AdvancedTrajectoryRNN(nn.Module):
    def __init__(self):
        super(AdvancedTrajectoryRNN, self).__init__()
        # Входная проекция
        self.input_projection = nn.Sequential(
            nn.Linear(INPUT_SIZE, HIDDEN_SIZE // 2),
            nn.ReLU(),
            nn.Linear(HIDDEN_SIZE // 2, HIDDEN_SIZE),
            nn.ReLU()
        )
        
        # Рекуррентный блок
        self.lstm = nn.LSTM(HIDDEN_SIZE, HIDDEN_SIZE, NUM_LAYERS, batch_first=True, dropout=0.4)
        
        # Глобальный механизм внимания
        self.attention = nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE)
        self.attention_context = nn.Parameter(torch.randn(HIDDEN_SIZE))
        
        # Batch Normalization и проекции
        self.batch_norm = nn.BatchNorm1d(HIDDEN_SIZE)
        self.residual_connection = nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE)
        
        # Выходной слой
        self.output_layers = nn.Sequential(
            nn.Linear(HIDDEN_SIZE, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, OUTPUT_SIZE)
        )

    def forward(self, x):
        # Входная проекция
        x_proj = self.input_projection(x)  # (batch_size, NUM_POINTS, HIDDEN_SIZE)
        
        # LSTM
        lstm_out, _ = self.lstm(x_proj)  # (batch_size, NUM_POINTS, HIDDEN_SIZE)
        
        # Механизм внимания
        attention_weights = torch.tanh(self.attention(lstm_out))  # (batch_size, NUM_POINTS, HIDDEN_SIZE)
        attention_weights = torch.matmul(attention_weights, self.attention_context)  # (batch_size, NUM_POINTS)
        attention_weights = torch.softmax(attention_weights, dim=1)  # (batch_size, NUM_POINTS)
        context_vector = torch.sum(lstm_out * attention_weights.unsqueeze(-1), dim=1)  # (batch_size, HIDDEN_SIZE)
        
        # BatchNorm
        normalized_out = self.batch_norm(context_vector)
        
        # Остаточная связь
        residual_out = normalized_out + self.residual_connection(normalized_out)
        
        # Выходной слой
        output = self.output_layers(residual_out)
        return output

# Функции для вычисления ошибок
def compute_mse(outputs, targets):
    return torch.mean((outputs - targets) ** 2)

def compute_mae(outputs, targets):
    return torch.mean(torch.abs(outputs - targets))

def compute_med(outputs, targets):
    # Евклидово расстояние (MED) между предсказанием и истинной точкой
    distance = torch.norm(outputs - targets, dim=1)
    return torch.mean(distance)

def compute_accuracy(outputs, targets, error_threshold=ERROR_THRESHOLD):
    # Евклидово расстояние (MED) между предсказанием и истинной точкой
    distance = torch.norm(outputs - targets, dim=1)
    # Если расстояние меньше порога, считаем точку предсказанную правильно
    correct_predictions = (distance < error_threshold).float()
    accuracy = correct_predictions.mean().item()
    return accuracy

# Основная функция
def main(directory):
    os.makedirs('model_advanced', exist_ok=True)

    dataset = TrajectoryDataset(directory)
    dataset_size = len(dataset)
    val_size = int(0.2 * dataset_size)  # 20% данных на валидацию
    train_size = dataset_size - val_size

    # Разделение на обучающую и валидационную выборки
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    model = AdvancedTrajectoryRNN()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.MSELoss()

    best_loss = float('inf')
    epoch_losses = []
    val_losses = []
    train_mse = []
    val_mse = []
    train_mae = []
    val_mae = []
    train_med = []
    val_med = []
    train_accuracy = []
    val_accuracy = []

    for epoch in range(NUM_EPOCHS):
        model.train()
        epoch_loss = 0.0
        epoch_mse = 0.0
        epoch_mae = 0.0
        epoch_med = 0.0
        epoch_accuracy = 0.0
        for inputs, targets in train_dataloader:
            inputs = inputs.view(-1, NUM_POINTS, INPUT_SIZE)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

            # Вычисляем метрики
            mse = compute_mse(outputs, targets)
            mae = compute_mae(outputs, targets)
            med = compute_med(outputs, targets)
            accuracy = compute_accuracy(outputs, targets, ERROR_THRESHOLD)

            epoch_mse += mse.item()
            epoch_mae += mae.item()
            epoch_med += med.item()
            epoch_accuracy += accuracy

        average_loss = epoch_loss / len(train_dataloader)
        average_mse = epoch_mse / len(train_dataloader)
        average_mae = epoch_mae / len(train_dataloader)
        average_med = epoch_med / len(train_dataloader)
        average_accuracy = epoch_accuracy / len(train_dataloader)

        epoch_losses.append(average_loss)
        train_mse.append(average_mse)
        train_mae.append(average_mae)
        train_med.append(average_med)
        train_accuracy.append(average_accuracy)

        # Валидация
        model.eval()
        val_loss = 0.0
        val_mse_val = 0.0
        val_mae_val = 0.0
        val_med_val = 0.0
        val_accuracy_val = 0.0
        with torch.no_grad():
            for inputs, targets in val_dataloader:
                inputs = inputs.view(-1, NUM_POINTS, INPUT_SIZE)
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                val_loss += loss.item()

                # Вычисляем метрики
                mse = compute_mse(outputs, targets)
                mae = compute_mae(outputs, targets)
                med = compute_med(outputs, targets)
                accuracy = compute_accuracy(outputs, targets, ERROR_THRESHOLD)

                val_mse_val += mse.item()
                val_mae_val += mae.item()
                val_med_val += med.item()
                val_accuracy_val += accuracy

        average_val_loss = val_loss / len(val_dataloader)
        average_val_mse = val_mse_val / len(val_dataloader)
        average_val_mae = val_mae_val / len(val_dataloader)
        average_val_med = val_med_val / len(val_dataloader)
        average_val_accuracy = val_accuracy_val / len(val_dataloader)

        val_losses.append(average_val_loss)
        val_mse.append(average_val_mse)
        val_mae.append(average_val_mae)
        val_med.append(average_val_med)
        val_accuracy.append(average_val_accuracy)

        # Сохраняем модель, если потеря на валидации улучшилась
        if average_val_loss < best_loss:
            best_loss = average_val_loss
            torch.save(model.state_dict(), MODEL_PATH)

            print(f'Модель сохранена на эпохе {epoch + 1} с потерей {average_val_loss:.4f}')
            print(f'Модель сохранена по пути: {os.path.abspath(MODEL_PATH)}')  # Путь сохраненной модели

        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch + 1}/{NUM_EPOCHS}], Train Loss: {average_loss:.4f}, Train MSE: {average_mse:.4f}, Train MAE: {average_mae:.4f}, Train MED: {average_med:.4f}, Train Accuracy: {average_accuracy:.4f}, Val Loss: {average_val_loss:.4f}, Val MSE: {average_val_mse:.4f}, Val MAE: {average_val_mae:.4f}, Val MED: {average_val_med:.4f}, Val Accuracy: {average_val_accuracy:.4f}')

    print('Обучение завершено!')

    # Визуализация
    plt.figure(figsize=(10, 5))
    plt.plot(range(1, NUM_EPOCHS + 1), epoch_losses, label='Train Loss', color='blue')
    plt.plot(range(1, NUM_EPOCHS + 1), val_losses, label='Validation Loss', color='orange')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss Over Epochs')
    plt.legend()
    plt.grid()
    plt.savefig('loss_plot.png')
    plt.show()

    plt.figure(figsize=(10, 5))
    plt.plot(range(1, NUM_EPOCHS + 1), train_mse, label='Train MSE', color='blue')
    plt.plot(range(1, NUM_EPOCHS + 1), val_mse, label='Validation MSE', color='orange')
    plt.xlabel('Epochs')
    plt.ylabel('MSE')
    plt.title('Training and Validation MSE Over Epochs')
    plt.legend()
    plt.grid()
    plt.savefig('mse_plot.png')
    plt.show()

    plt.figure(figsize=(10, 5))
    plt.plot(range(1, NUM_EPOCHS + 1), train_mae, label='Train MAE', color='blue')
    plt.plot(range(1, NUM_EPOCHS + 1), val_mae, label='Validation MAE', color='orange')
    plt.xlabel('Epochs')
    plt.ylabel('MAE')
    plt.title('Training and Validation MAE Over Epochs')
    plt.legend()
    plt.grid()
    plt.savefig('mae_plot.png')
    plt.show()

    plt.figure(figsize=(10, 5))
    plt.plot(range(1, NUM_EPOCHS + 1), train_med, label='Train MED', color='blue')
    plt.plot(range(1, NUM_EPOCHS + 1), val_med, label='Validation MED', color='orange')
    plt.xlabel('Epochs')
    plt.ylabel('MED')
    plt.title('Training and Validation MED Over Epochs')
    plt.legend()
    plt.grid()
    plt.savefig('med_plot.png')
    plt.show()

    plt.figure(figsize=(10, 5))
    plt.plot(range(1, NUM_EPOCHS + 1), train_accuracy, label='Train Accuracy', color='blue')
    plt.plot(range(1, NUM_EPOCHS + 1), val_accuracy, label='Validation Accuracy', color='orange')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.title('Training and Validation Accuracy Over Epochs')
    plt.legend()
    plt.grid()
    plt.savefig('accuracy_plot.png')
    plt.show()

# Запуск
if __name__ == "__main__":
    data_directory = 'New_traectory_norm'
    main(data_directory)


## Изменен тип сохранений. Теперь сохраняется на основании лучшей Val accuracy

In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import matplotlib.pyplot as plt

# Параметры
NUM_POINTS = 5
INPUT_SIZE = 4
OUTPUT_SIZE = 4
HIDDEN_SIZE = 256
NUM_LAYERS = 4
LEARNING_RATE = 0.001
NUM_EPOCHS = 200
BATCH_SIZE = 64
MODEL_PATH = 'model_advanced/model_ver2.pth'
ERROR_THRESHOLD = 0.1  # Порог для определения "правильно предсказанных" точек

# Датасет
class TrajectoryDataset(Dataset):
    def __init__(self, directory):
        self.data = []
        self.load_data(directory)

    def load_data(self, directory):
        for filename in os.listdir(directory):
            if filename.endswith('.txt'):
                file_path = os.path.join(directory, filename)
                trajectory = np.loadtxt(file_path)
                for i in range(NUM_POINTS, len(trajectory)):
                    input_data = trajectory[i - NUM_POINTS:i]
                    output_data = trajectory[i - NUM_POINTS - 1]
                    self.data.append((input_data, output_data))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return torch.tensor(self.data[idx][0], dtype=torch.float32), torch.tensor(self.data[idx][1], dtype=torch.float32)

# Улучшенная модель
class AdvancedTrajectoryRNN(nn.Module):
    def __init__(self):
        super(AdvancedTrajectoryRNN, self).__init__()
        # Входная проекция
        self.input_projection = nn.Sequential(
            nn.Linear(INPUT_SIZE, HIDDEN_SIZE // 2),
            nn.ReLU(),
            nn.Linear(HIDDEN_SIZE // 2, HIDDEN_SIZE),
            nn.ReLU()
        )
        
        # Рекуррентный блок
        self.lstm = nn.LSTM(HIDDEN_SIZE, HIDDEN_SIZE, NUM_LAYERS, batch_first=True, dropout=0.4)
        
        # Глобальный механизм внимания
        self.attention = nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE)
        self.attention_context = nn.Parameter(torch.randn(HIDDEN_SIZE))
        
        # Batch Normalization и проекции
        self.batch_norm = nn.BatchNorm1d(HIDDEN_SIZE)
        self.residual_connection = nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE)
        
        # Выходной слой
        self.output_layers = nn.Sequential(
            nn.Linear(HIDDEN_SIZE, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, OUTPUT_SIZE)
        )

    def forward(self, x):
        # Входная проекция
        x_proj = self.input_projection(x)  # (batch_size, NUM_POINTS, HIDDEN_SIZE)
        
        # LSTM
        lstm_out, _ = self.lstm(x_proj)  # (batch_size, NUM_POINTS, HIDDEN_SIZE)
        
        # Механизм внимания
        attention_weights = torch.tanh(self.attention(lstm_out))  # (batch_size, NUM_POINTS, HIDDEN_SIZE)
        attention_weights = torch.matmul(attention_weights, self.attention_context)  # (batch_size, NUM_POINTS)
        attention_weights = torch.softmax(attention_weights, dim=1)  # (batch_size, NUM_POINTS)
        context_vector = torch.sum(lstm_out * attention_weights.unsqueeze(-1), dim=1)  # (batch_size, HIDDEN_SIZE)
        
        # BatchNorm
        normalized_out = self.batch_norm(context_vector)
        
        # Остаточная связь
        residual_out = normalized_out + self.residual_connection(normalized_out)
        
        # Выходной слой
        output = self.output_layers(residual_out)
        return output

# Функции для вычисления ошибок
def compute_mse(outputs, targets):
    return torch.mean((outputs - targets) ** 2)

def compute_mae(outputs, targets):
    return torch.mean(torch.abs(outputs - targets))

def compute_med(outputs, targets):
    # Евклидово расстояние (MED) между предсказанием и истинной точкой
    distance = torch.norm(outputs - targets, dim=1)
    return torch.mean(distance)

def compute_accuracy(outputs, targets, error_threshold=ERROR_THRESHOLD):
    # Евклидово расстояние (MED) между предсказанием и истинной точкой
    distance = torch.norm(outputs - targets, dim=1)
    # Если расстояние меньше порога, считаем точку предсказанную правильно
    correct_predictions = (distance < error_threshold).float()
    accuracy = correct_predictions.mean().item()
    return accuracy

# Основная функция
def main(directory):
    os.makedirs('model_advanced', exist_ok=True)

    dataset = TrajectoryDataset(directory)
    dataset_size = len(dataset)
    val_size = int(0.2 * dataset_size)  # 20% данных на валидацию
    train_size = dataset_size - val_size

    # Разделение на обучающую и валидационную выборки
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    model = AdvancedTrajectoryRNN()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.MSELoss()

    best_val_accuracy = -1  # Изначально устанавливаем отрицательное значение

    epoch_losses = []
    val_losses = []
    train_accuracy = []
    val_accuracy = []

    for epoch in range(NUM_EPOCHS):
        model.train()
        epoch_loss = 0.0
        epoch_accuracy = 0.0
        for inputs, targets in train_dataloader:
            inputs = inputs.view(-1, NUM_POINTS, INPUT_SIZE)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

            # Вычисляем метрики
            accuracy = compute_accuracy(outputs, targets, ERROR_THRESHOLD)
            epoch_accuracy += accuracy

        average_loss = epoch_loss / len(train_dataloader)
        average_accuracy = epoch_accuracy / len(train_dataloader)

        epoch_losses.append(average_loss)
        train_accuracy.append(average_accuracy)

        # Валидация
        model.eval()
        val_loss = 0.0
        val_accuracy_val = 0.0
        with torch.no_grad():
            for inputs, targets in val_dataloader:
                inputs = inputs.view(-1, NUM_POINTS, INPUT_SIZE)
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                val_loss += loss.item()

                # Вычисляем метрики
                accuracy = compute_accuracy(outputs, targets, ERROR_THRESHOLD)
                val_accuracy_val += accuracy

        average_val_loss = val_loss / len(val_dataloader)
        average_val_accuracy = val_accuracy_val / len(val_dataloader)

        val_losses.append(average_val_loss)
        val_accuracy.append(average_val_accuracy)

        # Сохраняем модель, если точность на валидации улучшилась
        if average_val_accuracy > best_val_accuracy:
            best_val_accuracy = average_val_accuracy
            torch.save(model.state_dict(), MODEL_PATH)

            print(f'Модель сохранена на эпохе {epoch + 1} с точностью {average_val_accuracy:.4f}')
            print(f'Модель сохранена по пути: {os.path.abspath(MODEL_PATH)}')  # Путь сохраненной модели

        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch + 1}/{NUM_EPOCHS}], Train Loss: {average_loss:.4f}, Train Accuracy: {average_accuracy:.4f}, Val Loss: {average_val_loss:.4f}, Val Accuracy: {average_val_accuracy:.4f}')

    print('Обучение завершено!')

    # Визуализация
    plt.figure(figsize=(10, 5))
    plt.plot(range(1, NUM_EPOCHS + 1), epoch_losses, label='Train Loss', color='blue')
    plt.plot(range(1, NUM_EPOCHS + 1), val_losses, label='Validation Loss', color='orange')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss Over Epochs')
    plt.legend()
    plt.grid()
    plt.savefig('loss_plot.png')
    plt.show()

    plt.figure(figsize=(10, 5))
    plt.plot(range(1, NUM_EPOCHS + 1), train_accuracy, label='Train Accuracy', color='blue')
    plt.plot(range(1, NUM_EPOCHS + 1), val_accuracy, label='Validation Accuracy', color='orange')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.title('Training and Validation Accuracy Over Epochs')
    plt.legend()
    plt.grid()
    plt.savefig('accuracy_plot.png')
    plt.show()

if __name__ == "__main__":
    data_directory = 'New_traectory_norm'
    main(data_directory)

Модель сохранена на эпохе 1 с точностью 0.7218
Модель сохранена по пути: c:\Users\PC1\Desktop\Диплом\git_folder\Diplom_VKR\model_advanced\model_ver2.pth
