In [None]:
!pip install scikit-image
!pip install lightning
!pip install tensorboard
!pip install mlflow

In [None]:
import time
from tqdm.notebook import tqdm
from collections import defaultdict

import numpy as np
import pandas as pd
import seaborn as sns
import scipy.stats as sps
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler

import lightning.pytorch as pl
from lightning.pytorch import loggers as pl_loggers
from lightning.pytorch.callbacks import ModelCheckpoint

import torch
from torch import nn
from torch.utils.data import Dataset, TensorDataset, \
                             DataLoader, RandomSampler, SequentialSampler

from IPython.display import clear_output
from pylab import rcParams

rcParams['figure.figsize'] = 15, 7
%matplotlib inline

sns.set(font_scale=1.3, palette='Set2')

# Данные

## Биология

#### **Профиль биология**

### Загрузка данных
В этом задании вы будете работать с данными о распространении COVID-19, их можно скачать [здесь](https://www.kaggle.com/datasets/gpreda/coronavirus-2019ncov). Мы будем предказывать показатель смертности, потому что способы измерения выздоровевших и заболевших разнятся между регионами.

 В данных представлены следующие столбцы:

* `Country/Region` &mdash; страна или регион,

* `Province/State` &mdash; город или населенный пункт,
* `Latitude` &mdash; географическая широта,
* `Longitude` &mdash; географическая долгота,
* `Confirmed` &mdash; кол-во подтвержденных случаев заболевания,
* `Recovered` &mdash; кол-во подтвержденных случаев выздоровления,
* `Deaths` &mdash; кол-во смертей,
* `Date` &mdash; дата.

Выгрузим датасет.

In [None]:
!unzip archive

In [None]:
df = pd.read_csv('covid-19-all.csv', dtype={'Country/Region': str, 'Province/State': str}, low_memory=False)
df['Date'] = pd.to_datetime(df['Date'])
df.head()

Установим в качестве индекса дату.

In [None]:
df.set_index('Date', inplace=True)
df.head()

Избавимся от None если они есть.

In [None]:
df.dropna(inplace=True)
df.head()

Выберем страну и регион, где хотим предсказывать.

In [None]:
country = 'Russia'
region = 'Moscow'

df_selected = df[(df['Country/Region'] == country) & (df['Province/State'] == region)]
df_selected.head()

Удалим лишние столбцы в выбранных данных.

In [None]:
df_selected = df_selected[['Confirmed', 'Recovered', 'Deaths']]
df_selected.head()

In [None]:
data = df_selected.loc[:, ['Deaths']].values

Посмотрим на данные. Постройте графики заболевших, выздоровевших и количества смертей.

In [None]:
def plot_ts(y_to_train, y_to_test=None, y_forecast=None, ylim=None, title=""):
    """
    Функция для визуализации временного ряда и предсказания.
    """
    plt.figure(figsize=(15, 5))
    plt.plot(np.arange(len(y_to_train)), y_to_train, label='Train')

    if y_to_test is not None:
        plt.plot(np.arange(len(y_to_train), len(y_to_train) + len(y_to_test)), y_to_test, label='Test')
        if y_forecast is not None:
            plt.plot(np.arange(len(y_to_train), len(y_to_train) + len(y_forecast)), y_forecast, label='Prediction')

    if ylim is not None:
        plt.ylim(ylim)

    plt.title(title)
    plt.legend()
    plt.show()

plot_ts(df_selected['Confirmed'], title="Динамика подтвержденных случаев COVID-19")
plot_ts(df_selected['Deaths'], title="Динамика смертей от COVID-19")
plot_ts(df_selected['Recovered'], title="Динамика выздоровлений от COVID-19")

# Модели и обучение

## Вспомогательные функции

Напишите класс датасета для данных в виде последовательности.

In [None]:
class TSDataset(torch.utils.data.Dataset):
    def __init__(self, data, timesteps):
        self.data = data
        self.timesteps = timesteps

    def __len__(self):
        return len(self.data) - self.timesteps

    def __getitem__(self, index):
        X = self.data[index:index + self.timesteps][['Confirmed']].values
        y = self.data.iloc[index + self.timesteps]['Deaths']
        return torch.FloatTensor(X), torch.FloatTensor([y])

Здесь мы создаём функции для обучения, для вывода кривых обучения и для рекурсивного предсказания.

In [None]:
def evaluate_ts_model(model, start_seq, test_data, scaler, return_all=False, device='cpu'):
    '''
    Функция для проверки качества модели на обучающем отрезке ряда.

    :param model: обучаемая модель,
    :param start_seq: обучающие данные для первого предсказания,
    :param test_data: тестовые данные.
    :param return_all: возвращать все предсказания или только для 1-го магазина

    :return: результаты предсказания.
    '''
    result = []
    model.train(False)
    input_tensor = torch.FloatTensor(start_seq).to(device).unsqueeze(0)

    with torch.no_grad():
        for i in range(len(test_data)):
            # делаем предсказание, а unsqueeze нужны, чтобы сделать размерность (1, 1, 1) вместо (1)

            logits = model(input_tensor[:, i:, :]).unsqueeze(0)#.unsqueeze(2)

            # присоединяем предсказанное значение к последовательности:
            #                        (1, timestep, 1) -> (1, 1, 1)   по оси 1
            input_tensor = torch.cat((input_tensor,        logits),       1    )

            # обратное преобразование к нормальным числам
            logits = scaler.inverse_transform(logits.cpu().numpy().squeeze(0))

            # результат сохраняем
            result.append(logits.squeeze())

    if return_all:
        return np.array(result)

    return np.array(result)

Создадим класс модели.

In [None]:
class LSTM(nn.Module):
    def __init__(self, input_size, output_size, hidden_size, num_lstm_layers, use_pool=False):
        super(LSTM, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers  = num_lstm_layers
        self.input_size  = input_size

        self.lstm = nn.LSTM(input_size, hidden_size, num_lstm_layers, batch_first=True, dropout=0.2) # LSTM-модель с batch_first=True и dropout=0.2

        # Размерность пространства выхода последнего LSTM-слоя равна hidden_size.
        # Линейный слой нужен, чтобы преобразовать выход LSTM к нужному размеру output_size.
        self.fc = nn.Linear(hidden_size, output_size) # добавьте линейный слой

        # Пуллинг
        self.pool = nn.AdaptiveAvgPool1d(1) # добавьте усредняющий все выходы пуллинг-слой
        self.use_pool = use_pool


    def forward(self, input_seq):

        # инициализируем начальные скрытые состояния
        h_0 = torch.zeros(self.num_layers, input_seq.size(0), self.hidden_size).to(device=input_seq.device)
        c_0 = torch.zeros(self.num_layers, input_seq.size(0), self.hidden_size).to(device=input_seq.device)

        out, (_, _) = self.lstm(input_seq, (h_0, c_0))

        if self.use_pool:
            # берем среднее от векторов для всей последовательности
            out_to_fc = self.pool(out.transpose(1, 2)).squeeze(-1)
        else:
            # берем последний выходной вектор
            out_to_fc = out[:, -1, :]

        return self.fc(out_to_fc)

Напишем функцию для визуализации результатов предсказания.

In [None]:
def plot_results(y_to_train, y_to_val=None, y_to_test=None, y_forecast=None):
    """
        Функция для визуализации временного ряда и предсказания.

        Параметры:
            - y_to_train: pd.Series
                Временной ряд, на котором обучалась модель.
            - y_to_test: pd.Series
                Временной ряд, который предсказывает модель.
            - y_forecast: array
                Предсказания модели.
            - plot_conf_int: bool
                Надо ли строить предсказательного интервал.
            - left_bound: array
                Левая граница предсказательного интервала.
            - right_bound: array
                Правая граница предсказательного интервала.
    """
    plt.figure(figsize=(15, 5))
    plt.plot(y_to_train.index, y_to_train, label='Train', color='blue')

    if y_to_val is not None:
        plt.plot(y_to_val.index, y_to_val, label='Validation', color='orange')

    if y_to_test is not None:
        plt.plot(y_to_test.index, y_to_test, label='Test', color='red')
        if y_forecast is not None:
            plt.plot(y_to_test.index, y_forecast, label='Prediction', color='green')

    plt.xlabel('Date')
    plt.ylabel('Cases')
    plt.title("COVID-19 Cases Over Time")
    plt.legend()
    plt.show()

## Биология

Разделим выборку на тест и трейн.

In [None]:
train_time = pd.Timestamp('2020-11-01')  # Дата для разделения обучения и валидации
val_time = pd.Timestamp('2020-12-01')  # Дата для разделения валидации и теста

train_bio = df_selected[(df_selected.index < train_time)]
val_bio = df_selected[(df_selected.index >= train_time) & (df_selected.index < val_time)]
test_bio = df_selected[(df_selected.index >= val_time)]

Посмотрим на то, что получилось.

In [None]:
plot_results(train_bio, val_bio, test_bio)

Преобразуем данные.

In [None]:
scaler = MinMaxScaler(feature_range=(-1, 1))

train_bio_scaled = scaler.fit_transform(train_bio[['Confirmed', 'Deaths']])
val_bio_scaled = scaler.transform(val_bio[['Confirmed', 'Deaths']])
test_bio_scaled = scaler.transform(test_bio[['Confirmed', 'Deaths']])

Зададим длину подпоследовательности, размер батча, девайс.

In [None]:
timesteps = 7  # Длина подпоследовательности
batch_size = 32  # Размер батча

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

Преобразуем к датасету с помощью нашего класса и сделаем генератор батчей.

In [None]:
# Преобразуем данные в датасеты
train_dataset = TSDataset(pd.DataFrame(train_bio_scaled, columns=['Confirmed', 'Deaths']), timesteps)
val_dataset = TSDataset(pd.DataFrame(val_bio_scaled, columns=['Confirmed', 'Deaths']), timesteps)
test_dataset = TSDataset(pd.DataFrame(test_bio_scaled, columns=['Confirmed', 'Deaths']), timesteps)

# Создание DataLoader для батчей
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

Инициализируем модель.

In [None]:
class TSModel(pl.LightningModule):
    def __init__(self, model, lr=5e-4):
        super().__init__()
        self.lr = lr
        self.criterion = nn.MSELoss()
        self.save_hyperparameters()
        self.model = model
        self.predictions = []

    def configure_optimizers(self):
        return torch.optim.Adam(self.model.parameters(), lr=self.lr)

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x_batch, y_batch = batch
        output = self.forward(x_batch)
        loss = self.criterion(output.squeeze(), y_batch.squeeze())
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x_batch, y_batch = batch
        output = self.forward(x_batch)
        loss = self.criterion(output.squeeze(), y_batch.squeeze())
        self.log('val_loss', loss)
        return loss

    def test_step(self, batch, batch_idx):
        x_batch, y_batch = batch
        output = self.forward(x_batch)
        loss = self.criterion(output.squeeze(), y_batch.squeeze())
        self.log('test_loss', loss)
        self.predictions.append(output.detach().cpu())
        return loss

    def transfer_batch_to_device(self, batch, device, dataloader_idx):
        x_batch, y_batch = batch
        x_batch = x_batch.type(torch.float32).to(device)
        y_batch = y_batch.type(torch.float32).to(device)
        return x_batch, y_batch

In [None]:
lstm_model = LSTM(
    input_size=1,  # Количество входных признаков
    output_size=1,  # Предсказываем одно значение
    hidden_size=50,  # Размер скрытого слоя
    num_lstm_layers=2,  # Количество слоев LSTM
    use_pool=False  # Без пуллинга
)

lstm_model_1 = TSModel(
                    model=lstm_model,
                    lr=5e-4,
                    )

Обучаем её.

In [None]:
checkpoint_callback = pl.callbacks.ModelCheckpoint(monitor='val_loss', mode='min')

tb_logger = pl_loggers.TensorBoardLogger(save_dir="lightning_logs/", name="lstm_model")

trainer = pl.Trainer(logger=tb_logger,
                     accelerator='gpu',
                     max_epochs=10,
                     devices=1,
                     val_check_interval=5,
                     callbacks=[checkpoint_callback])

In [None]:
trainer.fit(lstm_model_1, train_loader, val_loader)

In [None]:
lstm_model_1.cpu()
start_seq = train_scaled[-timesteps:]
test_pred = evaluate_ts_model(lstm_model_1, start_seq, scaler.transform(test), scaler)
plot_ts(train, test, test_pred)

Создадим предсказания на тесте.

In [None]:
trainer.test(lstm_model_1, test_loader)

Сравним предсказание с реальностью.

In [None]:
# Переместите модель на CPU, если это необходимо
ts_model.cpu()

# Получите последние timesteps из обучающей выборки для начала предсказания
start_seq = train_bio_scaled[-timesteps:]

# Преобразуйте данные для предсказания
# Мы должны преобразовать их в форму, подходящую для модели (добавить размерность)
start_seq = torch.FloatTensor(start_seq).unsqueeze(0)  # Добавляем размерность для batch_size

# Получите предсказания на тестовой выборке
test_pred = evaluate_ts_model(ts_model.model, start_seq, scaler.transform(test_bio[['Confirmed', 'Deaths']]), scaler)

# Визуализируем результаты
plt.figure(figsize=(15, 5))
plt.plot(range(len(train_bio)), train_bio['Deaths'], label='Train Deaths', color='blue')
plt.plot(range(len(train_bio), len(train_bio) + len(test_bio)), test_bio['Deaths'], label='True Test Deaths', color='green')
plt.plot(range(len(train_bio), len(train_bio) + len(test_pred)), test_pred, label='Predicted Test Deaths', color='orange')
plt.title('Comparison of Real and Predicted Deaths')
plt.xlabel('Time Steps')
plt.ylabel('Deaths')
plt.legend()
plt.show()


**Вывод**