In [3]:
import os
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

# Оценка качества модели
from sklearn.metrics import mean_squared_error

In [11]:
try:
    try:
        df = pd.read_csv('Amp_C_train.txt')
    except:
        df = pd.read_csv('/home/redalexdad/GitHub/HwTasksBmstu/Amp_C_train.txt')

except:
    for dirname, _, filenames in os.walk('/kaggle/input'):
        for filename in filenames:
            print(os.path.join(dirname, filename))
            
    df = pd.read_csv(os.path.join(dirname, filename))

In [12]:
class MemoryPolynomialModels:
    def __init__(self, df, M, K):
        self.df = self.prepare_data(df)
        self.M = M  # Глубина памяти
        self.K = K  # Степень полинома
    
    def prepare_data(self, df):
        """
        Предобработка данных: разделение на реальные и мнимые части
        """
        df.columns = df.columns.str.lower()
        df['input'] = df['input'].apply(lambda x: complex(x))
        df['output'] = df['output'].apply(lambda x: complex(x))
        df['input_real'] = df['input'].apply(lambda x: x.real)
        df['input_imag'] = df['input'].apply(lambda x: x.imag)
        df['output_real'] = df['output'].apply(lambda x: x.real)
        df['output_imag'] = df['output'].apply(lambda x: x.imag)
        df = df.drop(['input', 'output'], axis=1)
        return df
    
    def memory_polynomial(self, x_real, x_imag):
        """Memory Polynomial (MP) модель"""
        N = len(x_real)
        X = np.zeros((N, (self.M + 1) * self.K * 2), dtype=np.float64)
        for n in range(self.M, N):
            index = 0
            for m in range(self.M + 1):
                for k in range(1, self.K + 1):
                    X[n, index] = np.abs(x_real[n - m])**(k-1) * x_real[n - m]
                    X[n, index + 1] = np.abs(x_imag[n - m])**(k-1) * x_imag[n - m]
                    index += 2
        return X[self.M:]
    
    def sparse_delay_memory_polynomial(self, x_real, x_imag, delays):
        """Sparse-Delay Memory Polynomial (SDMP) модель"""
        N = len(x_real)
        X = np.zeros((N, len(delays) * self.K * 2), dtype=np.float64)
        for n in range(self.M, N):
            index = 0
            for m in delays:
                for k in range(1, self.K + 1):
                    X[n, index] = np.abs(x_real[n - m])**(k-1) * x_real[n - m]
                    X[n, index + 1] = np.abs(x_imag[n - m])**(k-1) * x_imag[n - m]
                    index += 2
        return X[self.M:]
    
    def non_uniform_memory_polynomial(self, x_real, x_imag, K_list):
        """Non-Uniform Memory Polynomial (NUMP) модель"""
        N = len(x_real)
        X = np.zeros((N, sum(K_list) * 2), dtype=np.float64)
        index = 0
        for m in range(self.M + 1):
            for k in range(1, K_list[m] + 1):
                for n in range(self.M, N):
                    X[n, index] = np.abs(x_real[n - m])**(k-1) * x_real[n - m]
                    X[n, index + 1] = np.abs(x_imag[n - m])**(k-1) * x_imag[n - m]
                index += 2
        return X[self.M:]
    
    def envelope_memory_polynomial(self, x_real, x_imag):
        """Envelope Memory Polynomial (EMP) модель"""
        N = len(x_real)
        X = np.zeros((N, (self.M + 1) * self.K), dtype=np.float64)
        amplitude = np.sqrt(x_real**2 + x_imag**2)  # Амплитуда сигнала
        for n in range(self.M, N):
            index = 0
            for m in range(self.M + 1):
                for k in range(1, self.K + 1):
                    X[n, index] = amplitude[n] * amplitude[n - m]**(k-1)
                    index += 1
        return X[self.M:]
    
    def train_model(self, X, y_real, y_imag):
        """
        Обучение модели на реальной и мнимой части
        """
        X_train, X_test, y_train_real, y_test_real = train_test_split(X, y_real, test_size=0.2, random_state=42)
        _, _, y_train_imag, y_test_imag = train_test_split(X, y_imag, test_size=0.2, random_state=42)
        
        model_real = LinearRegression()
        model_imag = LinearRegression()
        
        model_real.fit(X_train, y_train_real)
        model_imag.fit(X_train, y_train_imag)
        
        y_pred_real = model_real.predict(X_test)
        y_pred_imag = model_imag.predict(X_test)
        
        rmse_real = np.sqrt(mean_squared_error(y_test_real, y_pred_real))
        rmse_imag = np.sqrt(mean_squared_error(y_test_imag, y_pred_imag))
        
        print(f"RMSE для реальной части: {rmse_real}")
        print(f"RMSE для мнимой части: {rmse_imag}")
    
    def run(self, model_type='MP', **kwargs):
        """
        Запуск модели:
        model_type: 'MP', 'SDMP', 'NUMP', 'EMP'
        """
        x_real = self.df['input_real'].values
        x_imag = self.df['input_imag'].values
        y_real = self.df['output_real'].values
        y_imag = self.df['output_imag'].values
        
        if model_type == 'MP':
            X = self.memory_polynomial(x_real, x_imag)
        elif model_type == 'SDMP':
            delays = kwargs.get('delays', range(self.M + 1))
            X = self.sparse_delay_memory_polynomial(x_real, x_imag, delays)
        elif model_type == 'NUMP':
            K_list = kwargs.get('K_list', [self.K] * (self.M + 1))
            X = self.non_uniform_memory_polynomial(x_real, x_imag, K_list)
        elif model_type == 'EMP':
            X = self.envelope_memory_polynomial(x_real, x_imag)
        else:
            raise ValueError(f"Неизвестная модель: {model_type}")
        
        # Обучаем модель
        self.train_model(X, y_real[self.M:], y_imag[self.M:])


In [15]:
model = MemoryPolynomialModels(df, M=10, K=10)

In [16]:
%%time
model_types = ['MP', 'SDMP', 'NUMP', 'EMP']

print('='*30)
    
for name in model_types:
    print(f'Model type: {name}')
    model.run(model_type=name)
    print('-'*30)
print('='*30)

Model type: MP
RMSE для реальной части: 0.4355283025890596
RMSE для мнимой части: 0.43894716108490933
------------------------------
Model type: SDMP
RMSE для реальной части: 0.4355283025890596
RMSE для мнимой части: 0.43894716108490933
------------------------------
Model type: NUMP
RMSE для реальной части: 0.4355283025890596
RMSE для мнимой части: 0.43894716108490933
------------------------------
Model type: EMP
RMSE для реальной части: 2.085356990253707
RMSE для мнимой части: 2.0909697305838266
------------------------------
CPU times: user 2min 56s, sys: 4.55 s, total: 3min
Wall time: 2min 25s
