In [None]:
import pandas as pd
import numpy as np

import seaborn as sns
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

Считывание выборок

In [None]:
# Locations of the two raw CSV exports loaded by the cells below
# (relative to the notebook's working directory).
SAMPLE_PATH: str = "../data/raw/spectr/Выборка_Н22_5_1.csv"  # first part
SAMPLE_REST: str = "../data/raw/spectr/СКЗУ_ВЫБОРКА_ЯНВАР-МАРТ.csv"  # second part

Первая часть

In [None]:
# Read the first raw export: ';'-separated, first 16 lines of the file skipped.
df = pd.read_csv(SAMPLE_PATH, sep=';', skiprows=16)
# The two leading columns are not used anywhere below.
df = df.drop(columns=df.columns[[0, 1]])

# The remaining columns come in pairs: every even position holds a timestamp
# (taken here from the first row), every odd position holds the readings that
# are converted to numbers further down.
column_names = pd.to_datetime(
    df.iloc[0, ::2].tolist(),
    format='%d.%m.%Y %H:%M:%S',
)
# Keep the odd-position columns, label them with the timestamps and put them
# in chronological order.
df = (
    df.iloc[:, 1::2]
    .set_axis(column_names, axis=1)
    .sort_index(axis=1)
)

# The export uses a decimal comma: swap it for a dot and parse as numbers.
# Cells that still cannot be parsed become NaN.
df = df.apply(
    lambda column: pd.to_numeric(column.str.replace(',', '.'), errors='coerce')
)

Сжать выборки (Отцентровать и усреднить)

In [None]:
# Replace every run of 257 consecutive rows in a column by its mean, so each
# column becomes 257 times shorter (the row count must be a multiple of 257).
df = df.apply(
    lambda column: column.to_numpy().reshape(-1, 257).mean(axis=1),
    axis=0,
)

Вторая часть

In [None]:
# Read the second raw export: ';'-separated, first 16 lines of the file skipped.
df_rest = pd.read_csv(SAMPLE_REST, sep=';', skiprows=16)
# The two leading columns are not used anywhere below.
df_rest = df_rest.drop(columns=df_rest.columns[[0, 1]])

# The remaining columns come in pairs: every even position holds a timestamp
# (taken here from the first row), every odd position holds the readings that
# are converted to numbers further down.
column_names = pd.to_datetime(
    df_rest.iloc[0, ::2].tolist(),
    format='%d.%m.%Y %H:%M:%S',
)
# Keep the odd-position columns, label them with the timestamps and put them
# in chronological order.
df_rest = (
    df_rest.iloc[:, 1::2]
    .set_axis(column_names, axis=1)
    .sort_index(axis=1)
)

# The export uses a decimal comma: swap it for a dot and parse as numbers.
# Cells that still cannot be parsed become NaN.
df_rest = df_rest.apply(
    lambda column: pd.to_numeric(column.str.replace(',', '.'), errors='coerce')
)

In [None]:
# Replace every run of 257 consecutive rows in a column by its mean, so each
# column becomes 257 times shorter (the row count must be a multiple of 257).
df_rest = df_rest.apply(
    lambda column: column.to_numpy().reshape(-1, 257).mean(axis=1),
    axis=0,
)

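Возьмём только первые 700 часов (т. е. до второй половины декабря 2022 года), потому что дальше в данных выборок наблюдается что-то странное. Примечание: в ячейках ниже обрезка до 700 часов фактически не выполняется — таблицы только выводятся на экран.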

In [None]:
# Display the first-part frame for a visual check.
df

In [None]:
# Display the second-part frame for a visual check.
df_rest

Объединить 

In [None]:
# Put the columns of both parts side by side in a single frame.
df = pd.concat((df, df_rest), axis='columns')

# Использование спектра для построения индикатора здоровья

In [None]:
# Load the preprocessed table from disk. This rebinds `df`, so whatever `df`
# held before this cell is no longer reachable under that name.
# NOTE(review): the step that writes RMSA.csv is not visible in this notebook — confirm where it comes from.
df = pd.read_csv("../data/processed/spectr/RMSA.csv")

In [None]:
def label_func(max_val: int) -> np.ndarray:
    """Quadratic degradation curve sampled at the integers 0..max_val.

    The value at step t is 1 - (t / max_val) ** 2, so the curve starts at 1.0
    and reaches 0.0 at t == max_val.

    Args:
        max_val: Index of the last step; the result has max_val + 1 points.

    Returns:
        1-D float array of length max_val + 1.

    Raises:
        ValueError: If max_val is negative.
    """
    if max_val < 0:
        raise ValueError(f"max_val must be non-negative, got {max_val}")
    if max_val == 0:
        # A single step: the generic formula would evaluate 0 / 0 and give NaN.
        return np.ones(1)
    steps = np.linspace(0, max_val, max_val + 1)
    return 1.0 - steps ** 2 / float(max_val) ** 2

def label_func2(max_val: int) -> np.ndarray:
    """Linear degradation curve sampled at the integers 0..max_val.

    The value at step t is 1 - t / max_val, so the curve starts at 1.0 and
    reaches 0.0 at t == max_val.

    Args:
        max_val: Index of the last step; the result has max_val + 1 points.

    Returns:
        1-D float array of length max_val + 1.

    Raises:
        ValueError: If max_val is negative.
    """
    if max_val < 0:
        raise ValueError(f"max_val must be non-negative, got {max_val}")
    if max_val == 0:
        # A single step: the generic formula would evaluate 0 / 0 and give NaN.
        return np.ones(1)
    steps = np.linspace(0, max_val, max_val + 1)
    return 1.0 - steps / float(max_val)

Аппроксимируем закон деградации (метки)

In [None]:
# One label per column of `df`: after transposition each column becomes one
# training sample, so the curve length is taken from the data instead of a
# hard-coded 2224 (which is only correct for a frame with exactly 2225 columns).
labels = label_func(df.shape[1] - 1)

Построение автоэнкодера

In [None]:
def criterion(encoder_output, encoder_target, decoder_output, decoder_target, w):
    """Combined loss: MSE on the encoder output plus weighted MSE on the reconstruction.

    Args:
        encoder_output: Tensor produced by the encoder.
        encoder_target: Target values for the encoder output.
        decoder_output: Tensor produced by the decoder.
        decoder_target: Tensor the decoder output is compared with.
        w: Weight applied to the decoder (reconstruction) term.

    Returns:
        Scalar tensor ``mse(encoder) + w * mse(decoder)``.
    """
    # The encoder may emit e.g. (batch, 1, 1) while the targets are (batch,).
    # nn.MSELoss would broadcast that pair to (batch, 1, batch) and compare
    # every prediction with every target. When both hold the same number of
    # elements, line them up one-to-one instead. Other shape combinations are
    # passed through untouched.
    if (
        encoder_output.shape != encoder_target.shape
        and encoder_output.numel() == encoder_target.numel()
    ):
        encoder_output = encoder_output.reshape(encoder_target.shape)

    criterion_mse = nn.MSELoss()
    loss_encoder = criterion_mse(encoder_output, encoder_target)
    loss_decoder = criterion_mse(decoder_output, decoder_target)
    return loss_encoder + w * loss_decoder

class CustomAutoencoder(nn.Module):
    """1-D convolutional autoencoder whose bottleneck is a single bounded value.

    The encoder squeezes a one-channel signal down to one channel of (0, 1)
    values; the decoder expands that back into a one-channel signal. For an
    input of shape (batch, 1, 296) the encoder yields (batch, 1, 1) and the
    decoder yields (batch, 1, 296).

    The encoder ends with Sigmoid -> MaxPool only. An additional Sigmoid after
    the pooling would squash the already bounded (0, 1) values into roughly
    (0.5, 0.73), which makes targets near 0 or 1 unreachable. The removed
    layer had no parameters and was last in the Sequential, so state-dict keys
    are unaffected.
    """

    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 20, kernel_size=3, stride=2, padding=1),
            nn.MaxPool1d(2, stride=2),
            nn.BatchNorm1d(20),
            nn.ReLU(),
            nn.Conv1d(20, 40, kernel_size=3, stride=2, padding=1),
            nn.MaxPool1d(2, stride=2),
            nn.BatchNorm1d(40),
            nn.ReLU(),
            nn.Conv1d(40, 80, kernel_size=3, stride=2, padding=1),
            nn.MaxPool1d(2, stride=2),
            nn.BatchNorm1d(80),
            nn.ReLU(),
            nn.Conv1d(80, 40, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm1d(40),
            nn.ReLU(),
            nn.Conv1d(40, 20, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(20),
            nn.ReLU(),
            nn.Conv1d(20, 10, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(10),
            nn.ReLU(),
            nn.Conv1d(10, 1, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(1),
            # Bound the bottleneck to (0, 1); max-pooling keeps that range.
            nn.Sigmoid(),
            nn.MaxPool1d(kernel_size=3, dilation=1, padding=1),
        )
        self.decoder = nn.Sequential(
            # ConvTranspose1d output length:
            #   L_out = (L_in - 1) * stride - 2 * padding
            #           + dilation * (kernel_size - 1) + output_padding + 1
            # With kernel_size=3, stride=2, padding=1, output_padding=1 this
            # is exactly 2 * L_in, so each of the next seven layers doubles
            # the length.
            nn.ConvTranspose1d(1, 10, kernel_size=3, stride=2, padding=1, output_padding=1),  # (batch_size, 10, 2)
            nn.BatchNorm1d(10),
            nn.ReLU(),
            nn.ConvTranspose1d(10, 20, kernel_size=3, stride=2, padding=1, output_padding=1),  # (batch_size, 20, 4)
            nn.BatchNorm1d(20),
            nn.ReLU(),
            nn.ConvTranspose1d(20, 40, kernel_size=3, stride=2, padding=1, output_padding=1),  # (batch_size, 40, 8)
            nn.BatchNorm1d(40),
            nn.ReLU(),
            nn.ConvTranspose1d(40, 80, kernel_size=3, stride=2, padding=1, output_padding=1),  # (batch_size, 80, 16)
            nn.BatchNorm1d(80),
            nn.ReLU(),

            nn.ConvTranspose1d(80, 40, kernel_size=3, stride=2, padding=1, output_padding=1),  # (batch_size, 40, 32)
            nn.BatchNorm1d(40),
            nn.ReLU(),
            nn.ConvTranspose1d(40, 20, kernel_size=3, stride=2, padding=1, output_padding=1),  # (batch_size, 20, 64)
            nn.BatchNorm1d(20),
            nn.ReLU(),

            nn.ConvTranspose1d(20, 10, kernel_size=3, stride=2, padding=1, output_padding=1),  # (batch_size, 10, 128)
            nn.BatchNorm1d(10),
            nn.ReLU(),
            # (128 - 1) * 2 - 2 * 1 + 5 * (9 - 1) + 3 + 1 = 296
            nn.ConvTranspose1d(10, 1, kernel_size=9, stride=2, padding=1, output_padding=3, dilation=5),  # (batch_size, 1, 296)
            nn.ReLU()
        )

    def forward(self, x):
        """Run the encoder, then the decoder on the encoder's output.

        Args:
            x: Input tensor of shape (batch, 1, length).

        Returns:
            Tuple ``(encoded, decoded)``.
        """
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return encoded, decoded

def train(param, data, labels, num_epochs=100, w=1.0):
    """Train a fresh CustomAutoencoder with Adam and return it.

    Args:
        param: Dict with the keys 'lr' (learning rate) and 'batch_size'.
        data: Input tensor. A 2-D tensor gets a channel dimension inserted at
            position 1; any other rank is used as given.
        labels: Encoder targets, one entry per sample in `data`.
        num_epochs: Number of passes over the data.
        w: Weight of the reconstruction term in the loss.

    Returns:
        The trained model.
    """
    model = CustomAutoencoder()
    optimizer = torch.optim.Adam(model.parameters(), lr=param['lr'])
    if data.dim() == 2:
        data = data.unsqueeze(1)

    dataset = TensorDataset(data, labels)
    # Batches are drawn in the stored order (no shuffling).
    train_loader = DataLoader(dataset, batch_size=param['batch_size'], shuffle=False)

    for epoch in range(num_epochs):
        running_loss = 0.0
        for inputs, encoder_targets in train_loader:
            optimizer.zero_grad()
            encoder_outputs, decoder_outputs = model(inputs)
            # Targets arrive as (batch,) while the encoder emits extra
            # singleton dimensions. Give both the same shape so the MSE is
            # element-wise rather than broadcast across the whole batch.
            if (
                encoder_targets.shape != encoder_outputs.shape
                and encoder_targets.numel() == encoder_outputs.numel()
            ):
                encoder_targets = encoder_targets.reshape(encoder_outputs.shape)
            loss = criterion(encoder_outputs, encoder_targets, decoder_outputs, inputs, w)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        avg_loss = running_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

    return model

# Example call
param = {
    'lr': 0.0005,
    'batch_size': 32
}

# Rows of df.T are the samples handed to the model.
data = torch.tensor(df.T.values, dtype=torch.float32)
# as_tensor accepts both the NumPy array produced by label_func and an already
# converted tensor, so re-running this cell neither fails nor triggers the
# "copy construct from a tensor" warning that torch.tensor() would emit.
labels = torch.as_tensor(labels, dtype=torch.float32)

trained_model = train(param, data, labels, num_epochs=100, w=0.001)



In [None]:
# Health indicator: the encoder output for every row of df.T.
trained_model.eval()  # inference mode: BatchNorm uses its running statistics
with torch.no_grad():
    # Transpose once and push all samples through the model as one batch,
    # instead of re-transposing the frame for each of a hard-coded 2225 rows.
    spectra = torch.tensor(df.T.to_numpy(), dtype=torch.float32).unsqueeze(1)
    HI = trained_model(spectra)[0].flatten().tolist()

In [None]:
# Disabled experiment: smooth HI with an exponentially weighted moving window
# before plotting. Kept commented out; the raw HI is plotted instead.
# NOTE(review): np.convolve reverses the kernel, so as written the largest
# weight lands on the oldest sample in the window — confirm before re-enabling.
# def exponential_moving_average(data, window):
#     weights = np.exp(np.linspace(-1., 0., window))
#     weights /= weights.sum()
#     ema = np.convolve(data, weights, mode='full')[:len(data)]
#     ema[:window] = ema[window]
#     return ema
# sns.lineplot(exponential_moving_average(HI, 24))

# Plot the raw health indicator as a line chart.
sns.lineplot(HI)

получился мусор