In [1]:
%matplotlib inline

import datetime
import math
import warnings
import numpy as np
import pandas as pd
import scipy.linalg as la
import scipy.signal as sg
import matplotlib.pyplot as plt
import seaborn as sns
import slycot
import statsmodels.api as sm

from nfoursid.kalman import Kalman
from nfoursid.nfoursid import NFourSID
from nfoursid.state_space import StateSpace

from tqdm.auto import tqdm

from collections import Counter

from tslearn.clustering import TimeSeriesKMeans
from tslearn.datasets import UCR_UEA_datasets
from tslearn.metrics import cdist_dtw
from sklearn.metrics import silhouette_score, adjusted_rand_score
from sklearn_extra.cluster import KMedoids

from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.tsa.stattools import adfuller

sns.set_theme(style="darkgrid")

# Программный код

#### Вспомогательные функции

In [None]:
import scipy.signal as sg
import scipy.integrate as integrate
from scipy.optimize import minimize_scalar
    
def integr_of_transfer_function(alpha, beta):
    # I = integrate.quad(lambda x: abs(np.polyval(beta[::-1], np.exp(x * 1j)) / np.polyval(alpha[::-1], np.exp(x * 1j)) * 
    #                                  np.polyval(beta[::-1], np.exp(x * 1j)) / np.polyval(alpha[::-1], np.exp(x * 1j))), 0, math.pi*2, limit=500)[0]
    I = integrate.quad(lambda x: pow(abs(np.polyval(beta[::-1], np.exp(x * 1j)) / np.polyval(alpha[::-1], np.exp(x * 1j))), 2), 0, math.pi*2, limit=1000)[0]
    if I < 0:
        return pow(10,8)
    return np.sqrt(I / (math.pi*2))


def poly_mult(P, Q):
    m = len(P)
    n = len(Q)
    result = [0]*(m+n-1)
    for i in range(m):
        for j in range(n):
            result[i+j] += P[i]*Q[j]
    return result

def simplify_tf(alpha, beta):
    beta, alpha = sg.normalize(beta[::-1], alpha[::-1])
    zeros, poles, gain = sg.tf2zpk(beta, alpha)
    
    zeros_del = []
    poles_del = []
    for i in range(len(zeros)):
        for j in range(len(poles)):
            if abs(zeros[i] - poles[j]) < pow(10, -8) and abs(poles[j]) > pow(10, -8) and abs(zeros[i]) > pow(10, -8):
                zeros_del.append(i)
                poles_del.append(j)
    for elem in zeros_del:
        np.delete(zeros, elem)
    for elem in poles_del:
        np.delete(poles, elem)
    
    beta, alpha = sg.zpk2tf(zeros, poles, gain)
    
    # beta = beta / alpha[0]
    # alpha = alpha / alpha[0]
    # beta, alpha = sg.normalize(beta, alpha)
    return alpha[::-1], beta[::-1]

def check_for_stability(alpha, beta):
    zeros, poles, gain = sg.tf2zpk(beta[::-1], alpha[::-1])
    # print(poles, '\n', zeros, '\n\n')
    if len(beta) > 1:
        for i in range(len(zeros)):
            if abs(zeros[i]) >= 1 - pow(10, -5):
                # raise AttributeError("System is unstable! It has zero z = " + str(zeros[i]) + ", abs(z) = " + str(abs(zeros[i])))
                # warnings.warn("System is unstable! It has zero z = " + str(zeros[i]) + ", abs(z) = " + str(abs(zeros[i])))
                return 0
    if len(alpha) > 1:
        for j in range(len(poles)):
            if abs(poles[j]) >= 1 - pow(10, -5):
                # raise AttributeError("System is unstable! It has pole p = " + str(poles[j]) + ", abs(p) = " + str(abs(poles[j])))
                # warnings.warn("System is unstable! It has pole p = " + str(poles[j]) + ", abs(p) = " + str(abs(poles[j])))
                return 1
    return 0

def find_q_border(alpha, beta, n = 2):
    arg_max_b = minimize_scalar(lambda x: -abs(np.polyval(beta[::-1], np.exp(x * 1j)))**n, bounds=(0, 2*math.pi), method='bounded').x
    arg_max_a = minimize_scalar(lambda x: -abs(np.polyval(alpha[::-1], np.exp(x * 1j)))**n, bounds=(0, 2*math.pi), method='bounded').x
    arg_min_b = minimize_scalar(lambda x: abs(np.polyval(beta[::-1], np.exp(x * 1j)))**n, bounds=(0, 2*math.pi), method='bounded').x
    arg_min_a = minimize_scalar(lambda x: abs(np.polyval(alpha[::-1], np.exp(x * 1j)))**n, bounds=(0, 2*math.pi), method='bounded').x
    
    if abs(arg_min_a) > pow(10,-5) or abs(arg_min_a - 2*math.pi) > pow(10,-5):
        temp1 = 0
        temp2 = 2*math.pi
        i = 0
        while abs(temp1 - temp2) > pow(10,-5) and i < 20:
            temp1 = minimize_scalar(lambda x: abs(np.polyval(alpha[::-1], np.exp(x * 1j)))**n, bounds=(0, arg_min_a), method='bounded').x
            temp1 = minimize_scalar(lambda x: abs(np.polyval(alpha[::-1], np.exp(x * 1j)))**n, bounds=(arg_min_a, 2*math.pi), method='bounded').x
            arg_min_a = min((temp1,temp2))
            i += 1
            
    if abs(arg_max_a) > pow(10,-5) or abs(arg_max_a - 2*math.pi) > pow(10,-5):
        temp1 = 0
        temp2 = 2*math.pi
        i = 0
        while abs(temp1 - temp2) > pow(10,-5) and i < 20:
            temp1 = minimize_scalar(lambda x: -abs(np.polyval(alpha[::-1], np.exp(x * 1j)))**n, bounds=(0, arg_max_a), method='bounded').x
            temp1 = minimize_scalar(lambda x: -abs(np.polyval(alpha[::-1], np.exp(x * 1j)))**n, bounds=(arg_max_a, 2*math.pi), method='bounded').x
            arg_max_a = min((temp1,temp2))
            i += 1
            
    if abs(arg_min_b) > pow(10,-5) or abs(arg_min_b - 2*math.pi) > pow(10,-5):
        temp1 = 0
        temp2 = 2*math.pi
        i = 0
        while abs(temp1 - temp2) > pow(10,-5) and i < 20:
            temp1 = minimize_scalar(lambda x: abs(np.polyval(beta[::-1], np.exp(x * 1j)))**n, bounds=(0, arg_min_b), method='bounded').x
            temp1 = minimize_scalar(lambda x: abs(np.polyval(beta[::-1], np.exp(x * 1j)))**n, bounds=(arg_min_b, 2*math.pi), method='bounded').x
            arg_min_b = min((temp1,temp2))
            i += 1
            
    if abs(arg_max_b) > pow(10,-5) or abs(arg_max_b - 2*math.pi) > pow(10,-5):
        temp1 = 0
        temp2 = 2*math.pi
        i = 0
        while abs(temp1 - temp2) > pow(10,-5) and i < 20:
            temp1 = minimize_scalar(lambda x: -abs(np.polyval(beta[::-1], np.exp(x * 1j)))**n, bounds=(0, arg_max_b), method='bounded').x
            temp1 = minimize_scalar(lambda x: -abs(np.polyval(beta[::-1], np.exp(x * 1j)))**n, bounds=(arg_max_b, 2*math.pi), method='bounded').x
            arg_max_b = min((temp1,temp2))
            i += 1
            
    min_b = abs(np.polyval(beta[::-1], np.exp(arg_min_b * 1j)))**n
    max_b = abs(np.polyval(beta[::-1], np.exp(arg_max_b * 1j)))**n
    min_a = abs(np.polyval(alpha[::-1], np.exp(arg_min_a * 1j)))**n
    max_a = abs(np.polyval(alpha[::-1], np.exp(arg_max_a * 1j)))**n
    min_q = max_b / min_a
    max_q = min_b / max_a
    return max_q, min_q

def coefset_q_border(coefset, ident_method = 'ARIMA', n = 2):
    temp_min = []
    temp_max = []
    if ident_method in ('ARIMA', 'N4SID'):
        for alpha, beta in coefset:
            q = find_q_border(alpha, beta, n = n)
            temp_max.append(q[0])
            temp_min.append(q[1])
    else:
        for alpha in coefset:
            beta = np.zeros_like(alpha)
            beta[-1] = 1
            # print(alpha, beta)
            q = find_q_border(alpha, beta, n = n)
            temp_max.append(q[0])
            temp_min.append(q[1])
    # print(temp_max, temp_min)
    return min(temp_max), max(temp_min)

#### h_2

In [None]:
def h_2_norm(alpha, beta):
    # check_for_stability(alpha[::-1], beta[::-1])
    # beta, alpha = sg.normalize(beta[::-1], alpha[::-1])
    # alpha = alpha[::-1]
    # beta = beta[::-1]
    # if check_for_stability(alpha, beta):
    #     warnings.warn("h_2_norm calculated via integrating!")
    #     return integr_of_transfer_function(alpha[::-1], beta[::-1])
    A, B, C, D = sg.tf2ss(beta[::-1], alpha[::-1])
    G_c = la.solve_discrete_lyapunov(A, B.dot(B.T))
    res = np.trace(C.dot(G_c).dot(C.T) + D.dot(D.T))
    if res >= 0:
        return np.sqrt(res)
    else:
        return integr_of_transfer_function(alpha, beta)
    # G_o = la.solve_discrete_lyapunov(A.T, C.T.dot(C))
    # print(np.trace(B.T.dot(G_o).dot(B)), '\n', np.trace(D.T.dot(D)), '\n')
    # return np.sqrt(np.trace(B.T.dot(G_o).dot(B) + D.T.dot(D)))

def h_2_dist(alpha1, beta1, alpha2, beta2):
    temp1 = np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))
    temp2 = np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))
    if len(temp1) != len(temp2):
        if len(temp1) > len(temp2):
            temp2 = np.concatenate((np.zeros(len(temp1) - len(temp2)), temp2))
        elif len(temp1) < len(temp2):
            temp1 = np.concatenate((np.zeros(len(temp2) - len(temp1)), temp1))
    alpha = np.asarray(poly_mult(alpha1[::-1], alpha2[::-1]))
    beta = temp1 - temp2
    # beta = beta / alpha[0]
    # alpha = alpha / alpha[0]
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
        
    alpha, beta = simplify_tf(alpha[::-1], beta[::-1])
    
    # if len(alpha) != len(beta):
    #     print(alpha1, beta1, alpha2, beta2)
        
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    # if len(beta) < len(alpha):
    #     beta = np.concatenate(( np.zeros(len(alpha) - len(beta)), beta))
    
    return h_2_norm(alpha, beta)
    # return h_2_norm(alpha[::-1], beta[::-1])

#### h_2_mod

In [None]:
def h_2_mod_norm(alpha, beta):
    # check_for_stability(alpha[::-1], beta[::-1])
    # beta, alpha = sg.normalize(beta[::-1], alpha[::-1])
    # if check_for_stability(alpha, beta):
    #     warnings.warn("h_2_norm_mod calculated via integrating!")
    #     return integr_of_transfer_function(alpha, beta)
    A, B, C, D = sg.tf2ss(beta[::-1], alpha[::-1])
    # G_c = la.solve_discrete_lyapunov(A, B.dot(B.T))
    # res = np.trace(C.dot(G_c).dot(C.T) + D.dot(D.T))
    G_o = la.solve_discrete_lyapunov(A.T, C.T.dot(C))
    res = np.trace(B.T.dot(G_o).dot(B) + D.T.dot(D))
    if res >= 0:
        return np.sqrt(res)
    else:
        if abs(res) < pow(10,-2):
            return 0
        return integr_of_transfer_function(alpha, beta)
    # G_o = la.solve_discrete_lyapunov(A.T, C.T.dot(C))
    # print(np.trace(B.T.dot(G_o).dot(B)), '\n', np.trace(D.T.dot(D)), '\n')
    # return np.sqrt(np.trace(B.T.dot(G_o).dot(B) + D.T.dot(D)))

def h_2_mod_dist(alpha1, beta1, alpha2, beta2, q = 0.2, n = 2):
    # temp1 = np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))
    # temp2 = np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))
    # temp1 = np.asarray(poly_mult(temp1, temp1))
    # temp2 = np.asarray(poly_mult(temp2, temp2))
    # temp3 = np.asarray(poly_mult(np.asarray(poly_mult(alpha1[::-1], beta2[::-1])), np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))))
    
    temp1 = np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))
    temp2 = np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))
    temp3 = np.asarray(poly_mult(alpha1[::-1], alpha2[::-1]))
    temp4 = np.asarray(poly_mult(beta1[::-1], beta2[::-1]))
    for i in range(n-1):
        temp1 = np.asarray(poly_mult(temp1, np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))))
        temp2 = np.asarray(poly_mult(temp2, np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))))
        temp3 = np.asarray(poly_mult(temp3, np.asarray(poly_mult(alpha1[::-1], alpha2[::-1]))))
        temp4 = np.asarray(poly_mult(temp4, np.asarray(poly_mult(beta1[::-1], beta2[::-1]))))
    
    max_len = max((len(temp1), len(temp2), len(temp3), len(temp4)))
    if len(temp1) < max_len:
        temp1 = np.concatenate((np.zeros(max_len - len(temp1)), temp1))
    if len(temp2) < max_len:
        temp2 = np.concatenate((np.zeros(max_len - len(temp2)), temp2))
    if len(temp3) < max_len:
        temp3 = np.concatenate((np.zeros(max_len - len(temp3)), temp3))
    if len(temp4) < max_len:
        temp4 = np.concatenate((np.zeros(max_len - len(temp4)), temp4))
        
    alpha = temp1 + temp2 + q * temp3 + temp4 / q
    beta = temp1 - temp2
    # beta = beta / alpha[0]
    # alpha = alpha / alpha[0]
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    alpha, beta = simplify_tf(alpha[::-1], beta[::-1])
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    return h_2_mod_norm(alpha, beta)

#### sp

In [None]:
def get_psi(alpha, beta, length):
    alpha = np.append(-np.asarray(alpha[:-1]), 1)
    alpha = alpha[::-1]
    beta = beta[::-1]
    # beta = beta / beta[0]
    psi = [beta[0]]
    for i in range(1, length):
        temp = 0
        for j in range(1, len(alpha)):
            if i - j >= 0:
                temp += alpha[j]*psi[i - j]
        if i >= len(beta):
            psi.append(temp)
        else:
            psi.append(beta[i] + temp)
    return np.array(psi)

def autocov_of_arma(alpha, beta, lag, sigma = 1, L = 100):
    psi = get_psi(alpha, beta, L + lag)
    if lag > 0:
        cor = psi[:-lag].dot(np.roll(psi, -lag)[:-lag])
    else:
        cor = psi.dot(psi)
    return cor * sigma

def get_controlabiltiy(k, A, B):
    A = np.array(A)
    B = np.array(B)
    G = B
    temp = B
    if k == 0:
        return np.eye(A.shape[0])
    elif k == 1:
        return np.hstack((A, B))
    else:
        for i in range(k - 1):
            temp = A.dot(temp)
            # print(temp.T)
            G = np.hstack((temp, G))
        A_powered = np.linalg.matrix_power(A, k - 1)
        # print(A_powered, G.T)
        G = np.hstack((A_powered, G))
        return G
    
def get_corr(tau, control, initial_x, sigma2, A, B, C, D):
    control = np.array((control))
    initial_x = np.array((initial_x))
    A = np.array((A))
    B = np.array((B))
    C = np.array((C))
    D = np.array((D))
    G = get_controlabiltiy(len(control), A, B)
    temp = np.hstack((initial_x, control))
    W = temp.T.dot(temp)
    N = len(temp)
    temp = []
    if tau > 0:
        for i in range(len(initial_x), N - tau):
            diag1 = np.eye(G.shape[1], i)
            diag2 = np.eye(G.shape[1], i + tau)
            temp.append(C.dot(G).dot(diag1).dot(diag1.T).dot(W).dot(diag2).dot(diag2.T).dot(G.T).dot(C.T))
    else:
        for i in range(len(initial_x), N):
            diag = np.eye(G.shape[1], i)
            temp.append(C.dot(G).dot(diag).dot(diag.T).dot(W).dot(diag).dot(diag.T).dot(G.T).dot(C.T) + D.dot(D.T) * sigma2)
    N = len(temp)
    corr = 0
    for i in temp:
        corr += i
    return corr #/ N

def sp_rand_norm(alpha, beta, L = 10):
    cor = []
    for i in range(L):
        cor.append(autocov_of_arma(alpha, beta, i))
    temp = cor[:0:-1]
    return la.norm(np.concatenate((temp, cor)))
    # return la.norm((beta[-1]/alpha[-1]) * np.concatenate((temp, cor)))

def sp_rand_dist(alpha1, beta1, alpha2, beta2, L = 10):
    temp1 = np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))
    temp2 = np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))
    if len(temp1) != len(temp2):
        if len(temp1) > len(temp2):
            temp2 = np.concatenate((np.zeros(len(temp1) - len(temp2)), temp2))
        elif len(temp1) < len(temp2):
            temp1 = np.concatenate((np.zeros(len(temp2) - len(temp1)), temp1))
    alpha = np.asarray(poly_mult(alpha1[::-1], alpha2[::-1]))
    beta = temp1 - temp2
    beta = beta / alpha[0]
    alpha = alpha / alpha[0]
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    alpha, beta = simplify_tf(alpha[::-1], beta[::-1])
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    return sp_rand_norm(alpha, beta)

def sp_det_norm(alpha, beta, control, initial_x, sigma2, L = 10):
    beta, alpha = sg.normalize(beta[::-1], alpha[::-1])
    check_for_stability(alpha[::-1], beta[::-1])
    A, B, C, D = sg.tf2ss(beta, alpha)
    initial_x = initial_x[:A.shape[1]]
    cor = []
    for i in range(L):
        cor.append(get_corr(i, control, initial_x, sigma2, A, B, C, D))
    temp = cor[:0:-1]
    return la.norm(np.concatenate((temp, cor)))

def sp_det_dist(alpha1, beta1, alpha2, beta2, control, initial_x, sigma2 = 0.0001, L = 10):
    temp1 = np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))
    temp2 = np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))
    if len(temp1) != len(temp2):
        if len(temp1) > len(temp2):
            temp2 = np.concatenate((np.zeros(len(temp1) - len(temp2)), temp2))
        elif len(temp1) < len(temp2):
            temp1 = np.concatenate((np.zeros(len(temp2) - len(temp1)), temp1))
    alpha = np.asarray(poly_mult(alpha1[::-1], alpha2[::-1]))
    beta = temp1 - temp2
    beta = beta / alpha[0]
    alpha = alpha / alpha[0]
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    alpha, beta = simplify_tf(alpha[::-1], beta[::-1])
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    return sp_det_norm(alpha, beta, control, initial_x, sigma2, L)

#### sp_mod

In [None]:
def sp_rand_mod_norm(alpha, beta, L = 10):
    cor = []
    for i in range(L):
        cor.append(autocov_of_arma(alpha, beta, i))
    temp = cor[:0:-1]
    return la.norm(np.concatenate((temp, cor)))
    # return la.norm((beta[-1]/alpha[-1]) * np.concatenate((temp, cor)))

def sp_rand_mod_dist(alpha1, beta1, alpha2, beta2, L = 10, q = 0.2, n = 2):
    # temp1 = np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))
    # temp2 = np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))
    # temp1 = np.asarray(poly_mult(temp1, temp1))
    # temp2 = np.asarray(poly_mult(temp2, temp2))
    # temp3 = np.asarray(poly_mult(np.asarray(poly_mult(alpha1[::-1], beta2[::-1])), np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))))
    
    temp1 = np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))
    temp2 = np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))
    temp3 = np.asarray(poly_mult(alpha1[::-1], alpha2[::-1]))
    temp4 = np.asarray(poly_mult(beta1[::-1], beta2[::-1]))
    for i in range(n-1):
        temp1 = np.asarray(poly_mult(temp1, np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))))
        temp2 = np.asarray(poly_mult(temp2, np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))))
        temp3 = np.asarray(poly_mult(temp3, np.asarray(poly_mult(alpha1[::-1], alpha2[::-1]))))
        temp4 = np.asarray(poly_mult(temp4, np.asarray(poly_mult(beta1[::-1], beta2[::-1]))))
    
    
    max_len = max((len(temp1), len(temp2), len(temp3), len(temp4)))
    if len(temp1) < max_len:
        temp1 = np.concatenate((np.zeros(max_len - len(temp1)), temp1))
    if len(temp2) < max_len:
        temp2 = np.concatenate((np.zeros(max_len - len(temp2)), temp2))
    if len(temp3) < max_len:
        temp3 = np.concatenate((np.zeros(max_len - len(temp3)), temp3))
    if len(temp4) < max_len:
        temp4 = np.concatenate((np.zeros(max_len - len(temp4)), temp4))
        
    alpha = temp1 + temp2 + q * temp3 + temp4 / q
    beta = temp1 - temp2
    # beta = beta / alpha[0]
    # alpha = alpha / alpha[0]
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    alpha, beta = simplify_tf(alpha[::-1], beta[::-1])
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    return sp_rand_mod_norm(alpha, beta)

#### h_inf

In [None]:
def h_inf_norm(alpha, beta):
    beta, alpha = sg.normalize(beta[::-1], alpha[::-1])
    check_for_stability(alpha[::-1], beta[::-1])
    A, B, C, D = sg.tf2ss(beta, alpha)

    args = {
        "dico": 'D',
        "jobe": 'I',
        "equil": 'N',
        "jobd": 'D',
        "n": len(A),
        "m": np.array(B).shape[1],
        "p": np.array(D).shape[0],
        "A": A,
        "E": np.eye(len(A)),
        "B": B,
        "C": C,
        "D": D
    }

    return slycot.ab13dd(**args)[0]

def h_inf_dist(alpha1, beta1, alpha2, beta2):
    temp1 = np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))
    temp2 = np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))
    if len(temp1) != len(temp2):
        if len(temp1) > len(temp2):
            temp2 = np.concatenate((np.zeros(len(temp1) - len(temp2)), temp2))
        elif len(temp1) < len(temp2):
            temp1 = np.concatenate((np.zeros(len(temp2) - len(temp1)), temp1))
    alpha = np.asarray(poly_mult(alpha1[::-1], alpha2[::-1]))
    beta = temp1 - temp2
    beta = beta / alpha[0]
    alpha = alpha / alpha[0]
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    alpha, beta = simplify_tf(alpha[::-1], beta[::-1])
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    return h_inf_norm(alpha, beta)

#### h_inf_mod

In [None]:
def h_inf_mod_norm(alpha, beta):
    # beta, alpha = sg.normalize(beta[::-1], alpha[::-1])
    # check_for_stability(alpha[::-1], beta[::-1])
    A, B, C, D = sg.tf2ss(beta[::-1], alpha[::-1])

    args = {
        "dico": 'D',
        "jobe": 'I',
        "equil": 'N',
        "jobd": 'D',
        "n": len(A),
        "m": np.array(B).shape[1],
        "p": np.array(D).shape[0],
        "A": A,
        "E": np.eye(len(A)),
        "B": B,
        "C": C,
        "D": D
    }

    return slycot.ab13dd(**args)[0]

def h_inf_mod_dist(alpha1, beta1, alpha2, beta2, q = 0.2, n = 2):
    # temp1 = np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))
    # temp2 = np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))
    # temp1 = np.asarray(poly_mult(temp1, temp1))
    # temp2 = np.asarray(poly_mult(temp2, temp2))
    # temp3 = np.asarray(poly_mult(np.asarray(poly_mult(alpha1[::-1], beta2[::-1])), np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))))
    
    temp1 = np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))
    temp2 = np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))
    temp3 = np.asarray(poly_mult(alpha1[::-1], alpha2[::-1]))
    temp4 = np.asarray(poly_mult(beta1[::-1], beta2[::-1]))
    for i in range(n-1):
        temp1 = np.asarray(poly_mult(temp1, np.asarray(poly_mult(alpha1[::-1], beta2[::-1]))))
        temp2 = np.asarray(poly_mult(temp2, np.asarray(poly_mult(alpha2[::-1], beta1[::-1]))))
        temp3 = np.asarray(poly_mult(temp3, np.asarray(poly_mult(alpha1[::-1], alpha2[::-1]))))
        temp4 = np.asarray(poly_mult(temp4, np.asarray(poly_mult(beta1[::-1], beta2[::-1]))))
    
    
    max_len = max((len(temp1), len(temp2), len(temp3), len(temp4)))
    if len(temp1) < max_len:
        temp1 = np.concatenate((np.zeros(max_len - len(temp1)), temp1))
    if len(temp2) < max_len:
        temp2 = np.concatenate((np.zeros(max_len - len(temp2)), temp2))
    if len(temp3) < max_len:
        temp3 = np.concatenate((np.zeros(max_len - len(temp3)), temp3))
    if len(temp4) < max_len:
        temp4 = np.concatenate((np.zeros(max_len - len(temp4)), temp4))
        
    alpha = temp1 + temp2 + q * temp3 + temp4 / q
    beta = temp1 - temp2
    # beta = beta / alpha[0]
    # alpha = alpha / alpha[0]
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    alpha, beta = simplify_tf(alpha[::-1], beta[::-1])
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    return h_inf_mod_norm(alpha, beta)

#### wcep

In [None]:
def wcep_norm(alpha, beta):
    # beta, alpha = sg.normalize(beta[::-1], alpha[::-1])
    zeros, poles, gain = sg.tf2zpk(beta[::-1], alpha[::-1])
    temp1 = 1
    temp2 = 1
    temp3 = 1
    for i in range(len(poles)):
        for j in range(len(zeros)):
            temp1 *= (np.abs(1 - poles[i] * np.conj(zeros[j])) * np.abs(1 - poles[i] * np.conj(zeros[j])))
    for i in range(len(zeros)):
        for j in range(len(zeros)):
            temp2 *= (1 - zeros[i] * np.conj(zeros[j]))
    for i in range(len(poles)):
        for j in range(len(poles)):
            temp3 *= (1 - poles[i] * np.conj(poles[j]))
    if np.log(np.abs(temp1 / temp2 / temp3)) < 0:
        warnings.warn("wcep_norm error: system probably unstable")
        return 0
    else:
        return np.sqrt(np.log(np.abs(temp1 / temp2 / temp3)))

def wcep_dist(alpha1, beta1, alpha2, beta2):
    alpha1 = alpha1[::-1] # Разворачиваем, чтоб энтый был слева
    beta1 = beta1[::-1]
    alpha2 = alpha2[::-1]
    beta2 = beta2[::-1]
    
    alpha = poly_mult(beta1, alpha2)
    beta = poly_mult(beta2, alpha1)
    
    if np.count_nonzero(beta != 0) == 0:
        return 0
    
    alpha, beta = simplify_tf(alpha[::-1], beta[::-1])
    
    if np.count_nonzero(beta != 0) == 0:
        return 0

    return wcep_norm(alpha, beta)



def wcep_new_norm(poles, zeros):
    temp1 = 1
    temp2 = 1
    temp3 = 1
    for i in range(len(poles)):
        for j in range(len(zeros)):
            temp1 *= (np.abs(1 - poles[i] * np.conj(zeros[j])) * np.abs(1 - poles[i] * np.conj(zeros[j])))
    for i in range(len(zeros)):
        for j in range(i + 1, len(zeros)):
            temp2 *= np.abs(1 - zeros[i] * np.conj(zeros[j])) * np.abs(1 - zeros[i] * np.conj(zeros[j]))
        temp2 *= (1 - np.abs(zeros[i]) ** 2)
    for i in range(len(poles)):
        for j in range(i + 1, len(poles)):
            temp3 *= np.abs(1 - poles[i] * np.conj(poles[j])) * np.abs(1 - poles[i] * np.conj(poles[j]))
        temp3 *= (1 - np.abs(poles[i]) ** 2)
    if np.log(np.abs(temp1 / (temp2 * temp3))) < 0:
        return 0
    else:
        return np.sqrt(np.log(np.abs(temp1 / temp2 / temp3)))

def wcep_new_dist(alpha1, beta1, alpha2, beta2):
    beta1, alpha1 = sg.normalize(beta1[::-1], alpha1[::-1])
    zeros1, poles1, gain1 = sg.tf2zpk(beta1, alpha1)
    beta2, alpha2 = sg.normalize(beta2[::-1], alpha2[::-1])
    zeros2, poles2, gain2 = sg.tf2zpk(beta2, alpha2)
    
    poles = np.concatenate((zeros1, poles2))
    zeros = np.concatenate((zeros2, poles1))

    return wcep_new_norm(poles, zeros)




def cep_coef(alpha, beta, k):
    beta, alpha = sg.normalize(beta[::-1], alpha[::-1])
    zeros, poles, gain = sg.tf2zpk(beta.flatten(), alpha)
    if k == 0:
        return gain
    else:
        temp1 = 0
        temp2 = 0
        for i in range(len(poles)):
            temp1 += pow(poles[i], abs(k)) / abs(k)
        for i in range(len(zeros)):
            temp2 += pow(zeros[i], abs(k)) / abs(k)
    return temp1 - temp2



def wcep_cut_dist(alpha1, beta1, alpha2, beta2, L = 10): 
    cep1 = np.asarray(list(cep_coef(alpha1, beta1, i + 1) for i in range(L)))
    cep2 = np.asarray(list(cep_coef(alpha2, beta2, i + 1) for i in range(L)))
    temp = 0
    for i in range(len(cep1)):
        temp += (i + 1) * pow((np.abs(cep1[i] - cep2[i])), 2)
    return np.sqrt(temp)

#### cep

In [None]:
def cep_dist(alpha1, beta1, alpha2, beta2, L = 10):
    cep1 = np.asarray(list(cep_coef(alpha1, beta1, i + 1) for i in range(L)))
    cep2 = np.asarray(list(cep_coef(alpha2, beta2, i + 1) for i in range(L)))
    return la.norm(cep1 - cep2)

def cep_new_dist(alpha1, beta1, alpha2, beta2, L = 10):
    cep1 = np.asarray(list(cep_coef(alpha1, beta1, i) for i in range(L)))
    cep2 = np.asarray(list(cep_coef(alpha2, beta2, i) for i in range(L)))
    return la.norm(cep1 - cep2)

### Реализация кластеризации

In [None]:
def coef_estimations_of_dataset(dataset, ident_method = 'a', alpha_lenght = None, beta_lenght = None, control = None, D = None, fixed = True):
    coefs = []
    
    if ident_method == 'a':
        if alpha_lenght is None:
            raise AttributeError
    elif ident_method == 'b' or ident_method == 'b_mult':
        if bool(beta_lenght is None) ^ bool(control is None):
            raise AttributeError
        if alpha_lenght != beta_lenght:
            raise AttributeError
    elif ident_method == 'D' or ident_method == 'D_mult':
        if D is None:
            raise AttributeError
            
            
    if ident_method == 'a':
        for i in tqdm(range(len(dataset)), desc = 'coef_estimations_of_dataset: a', leave=False): 
            alpha, _ = E73(dataset[i], method = 'a', initial_alpha = np.ones(alpha_lenght))
            coefs.append(alpha)
    elif ident_method == 'AR':
        for i in tqdm(range(len(dataset)), desc = 'coef_estimations_of_dataset: AR', leave=False):
            mod = sm.tsa.arima.ARIMA(dataset[i], order=(alpha_lenght - 1, 0, 0))
            if fixed:
                res = mod.fit_constrained({'const': 0})
            else:
                res = mod.fit()
            alpha = res.polynomial_ar[::-1]
            coefs.append(alpha)
    elif ident_method == 'b':
        for i in tqdm(range(len(dataset)), desc = 'coef_estimations_of_dataset: b', leave=False):
            alpha, beta, _ = E73(dataset[i], method = 'b', initial_alpha = np.ones(alpha_lenght), initial_beta = np.ones(beta_lenght), control = control)
            coefs.append((alpha, beta))
    elif ident_method == 'D':
        for i in tqdm(range(len(dataset)), desc = 'coef_estimations_of_dataset: D', leave=False):
            alpha, beta, _ = E73(dataset[i], method = 'D', D = D, initial_teta = np.ones(len(D[0])), control = control)
            coefs.append((alpha, beta))
    elif ident_method == 'b_mult':
        for i in tqdm(range(len(dataset)), desc = 'coef_estimations_of_dataset: b_mult', leave=False):
            alpha, beta, _ = E73(dataset[i], method = 'b', initial_alpha = np.ones(alpha_lenght), initial_beta = np.ones(beta_lenght), control = control[i])
            coefs.append((alpha, beta))
    elif ident_method == 'D_mult':
        for i in tqdm(range(len(dataset)), desc = 'coef_estimations_of_dataset: D_mult', leave=False):
            alpha, beta, _ = E73(dataset[i], method = 'D', D = D, initial_teta = np.ones(len(D[0])), control = control[i])
            coefs.append((alpha, beta))
    elif ident_method == 'ARIMA':
        for i in tqdm(range(len(dataset)), desc = 'coef_estimations_of_dataset: ARIMA', leave=False):
            mod = sm.tsa.arima.ARIMA(dataset[i], order=(alpha_lenght - 1, 0, beta_lenght - 1))
            if fixed:
                res = mod.fit_constrained({'const': 0})
            else:
                res = mod.fit()
            alpha = res.polynomial_ar[::-1]
            beta = res.polynomial_ma[::-1]
            if check_for_stability(alpha, beta):
                warnings.warn("System is unstable!")
            # if len(beta) < len(alpha):
            #     beta = np.concatenate((np.zeros(len(alpha) - len(beta)), beta))
            coefs.append((alpha, beta))
    elif ident_method == 'N4SID':
        for i in tqdm(range(len(dataset)), desc = 'coef_estimations_of_dataset: N4SID', leave=False):
            alpha, beta = n4sid_ident(dataset[i], alpha_lenght - 1)
            coefs.append((alpha, beta))
    else:
        raise AttributeError('Invalid ident_method!') 
    
    
    
    return np.array(coefs, dtype=object), ident_method

def dist_matrix(coefset, metric, ident_method = 'a', q = 1, n = 2):      
    
    distances = np.zeros((len(dataset),len(dataset)))
    
    if ident_method in {'a', 'AR'}:
        # for i in range(len(coefset)):
        for i in tqdm(range(len(coefset)), desc = 'dist_matrix: models', leave=False):
            for j in range(i + 1, len(coefset)):
                alpha1 = coefset[i]
                alpha2 = coefset[j]
                beta1 = np.zeros(len(alpha1))
                beta2 = np.zeros(len(alpha2))
                beta1[-1] = 1
                beta2[-1] = 1
                if metric in (h_2_mod_dist, h_inf_mod_dist, sp_rand_mod_dist):
                    distances[i][j] = metric(alpha1, beta1, alpha2, beta2, q = q, n = n)
                else:
                    distances[i][j] = metric(alpha1, beta1, alpha2, beta2)
    elif ident_method in ['b', 'D', 'b_mult', 'D_mult', 'ARIMA', 'N4SID']:
        for i in range(len(coefset)):
            for j in range(i + 1, len(coefset)):
                alpha1 = coefset[i][0]
                beta1 = coefset[i][1]
                alpha2 = coefset[j][0]
                beta2 = coefset[j][1]
                if metric in (h_2_mod_dist, h_inf_mod_dist, sp_rand_mod_dist):
                    distances[i][j] = metric(alpha1, beta1, alpha2, beta2, q = q, n = n)
                else:
                    distances[i][j] = metric(alpha1, beta1, alpha2, beta2)
    else:
        raise AttributeError('Invalid ident_method!')
    
    distances = np.array(distances)
    return distances + distances.T

In [None]:
def get_dataset(N, initial_x, *params, sigma = 0, seed = 12345):
    rng = np.random.default_rng(seed)
    dataset = []
    index = []
    m = 0
    for param in params:
        if len(param) == 3:
            for i in range(param['count']):
                x = get_process(initial_x, param['alpha'], N) + (rng.random(N) * 2 * sigma - sigma)
                # x = sm.tsa.ArmaProcess(param['alpha'][::-1], (1,)).generate_sample(nsample=N, scale=sigma)
                dataset.append(x)
                index.append(param['M'])
            m += 1
        else:
            if len(param) != 5:
                raise AttributeError("You must give control and sigma!")
            for i in range(param['count']):
                x = get_process(initial_x, param['alpha'], N, param['beta'], param['control']) + (rng.random(N) * 2 * sigma - sigma)
                dataset.append(x)
                index.append(param['M'])
            m += 1
    return dataset, index

def get_dataset_new(N, *params, sigma = 1, rng = None):
    dataset = []
    index = []
    m = 0
    for param in params:
        if len(param) == 3:
            for i in range(param['count']):
                # x = get_process(initial_x, param['alpha'], N) + (rng.random(N) * 2 * sigma - sigma)
                # alpha = np.concatenate(((1,), -np.array(param['alpha'][-2::-1])))
                alpha = np.concatenate(((1,), np.array(param['alpha'][-2::-1])))
                if rng is None:
                    x = sm.tsa.ArmaProcess(alpha, (1,)).generate_sample(nsample=N, scale=sigma)
                else:
                    x = sm.tsa.ArmaProcess(alpha, (1,)).generate_sample(nsample=N, scale=sigma, distrvs = rng.standard_normal)
                dataset.append(x)
                index.append(param['M'])
            m += 1
        else:
            if len(param) != 4:
                raise AttributeError("Invalid count of parametrs!")
            for i in range(param['count']):
                # alpha = np.concatenate(((1,), -np.array(param['alpha'][-2::-1])))
                alpha = np.concatenate(((1,), np.array(param['alpha'][-2::-1])))
                if rng is None:
                    x = sm.tsa.ArmaProcess(alpha, param['beta'][::-1]).generate_sample(nsample=N, scale=sigma)
                else:
                    x = sm.tsa.ArmaProcess(alpha, param['beta'][::-1]).generate_sample(nsample=N, scale=sigma, distrvs = rng.standard_normal)
                # x = get_process(initial_x, param['alpha'], N, param['beta'], param['control']) + (rng.random(N) * 2 * sigma - sigma)
                dataset.append(x)
                index.append(param['M'])
            m += 1
    return dataset, index

def clustering_results(n_clusters, coefset, index, dataset = None, ident_method = 'a', alpha_lenght = None, beta_lenght = None, control = None, D = None, q = 1, n = 2):
    result = {}
    pred_labels = {}
    
    metrics = (wcep_dist, wcep_cut_dist, cep_dist, h_2_dist, h_inf_dist, sp_rand_dist, 'coef') 
    
    for metric in tqdm(metrics, desc = 'clustering_results: metrics', leave=False): 
        if metric == 'eu':
            kmedoids = KMedoids(n_clusters, method = 'pam').fit(np.array(dataset))
            result["eu"] = (silhouette_score(dataset, kmedoids.labels_), adjusted_rand_score(index, kmedoids.labels_))
            pred_labels["eu"] = kmedoids.labels_
            del kmedoids
        elif metric in (eu_n,df,df_n):
            modified_dataset = np.array(list(map(metric, dataset)))
            kmedoids = KMedoids(n_clusters, method = 'pam').fit(modified_dataset)
            result[metric.__name__] = (silhouette_score(modified_dataset, kmedoids.labels_), adjusted_rand_score(index, kmedoids.labels_))
            pred_labels[metric.__name__] = kmedoids.labels_
            del kmedoids
        elif metric == 'pc':
            modified_dataset = pc(dataset, L = 10)
            kmedoids = KMedoids(n_clusters, method = 'pam').fit(modified_dataset)
            result["pc"] = (silhouette_score(modified_dataset, kmedoids.labels_), adjusted_rand_score(index, kmedoids.labels_))
            pred_labels["pc"] = kmedoids.labels_
            del kmedoids
        elif metric == 'pc_n':
            modified_dataset = pc_n(dataset, L = 10)
            kmedoids = KMedoids(n_clusters, method = 'pam').fit(modified_dataset)
            result["pc_n"] = (silhouette_score(modified_dataset, kmedoids.labels_), adjusted_rand_score(index, kmedoids.labels_))
            pred_labels["pc_n"] = kmedoids.labels_
            del kmedoids
        elif metric in (wcep_dist,cep_dist,wcep_cut_dist,h_2_dist,h_inf_dist,sp_rand_dist,sp_rand_mod_dist, h_2_mod_dist, h_inf_mod_dist):
            dm = dist_matrix(coefset, metric, ident_method = ident_method, q = q, n = n)
            try:
                kmedoids = KMedoids(n_clusters, method = 'pam', metric = 'precomputed').fit(dm)
                result[metric.__name__[:-5]] = (silhouette_score(dm, kmedoids.labels_, metric="precomputed"), adjusted_rand_score(index, kmedoids.labels_))
                pred_labels[metric.__name__[:-5]] = kmedoids.labels_
                del kmedoids
            except ValueError:
                result[metric.__name__[:-5]] = (-2, -2)
                pred_labels[metric.__name__[:-5]] = -1 * np.ones(dataset.shape[0])
            del dm
        elif metric == 'coef':
            temp = []
            if ident_method not in ('a', 'AR'):
                for row in coefset:
                    temp.append(np.concatenate(row))
                coefset_concat = np.array(temp)
                kmedoids = KMedoids(n_clusters, method = 'pam').fit(coefset_concat)
                result["coef"] = (silhouette_score(coefset_concat, kmedoids.labels_), adjusted_rand_score(index, kmedoids.labels_))
            else:
                kmedoids = KMedoids(n_clusters, method = 'pam').fit(coefset)
                result["coef"] = (silhouette_score(coefset, kmedoids.labels_), adjusted_rand_score(index, kmedoids.labels_))
            pred_labels["coef"] = kmedoids.labels_
            del kmedoids
        elif metric == 'dtw':
            dm = cdist_dtw(dataset)
            kmedoids = KMedoids(n_clusters, method = 'pam', metric = 'precomputed').fit(dm)
            result['dtw'] = (silhouette_score(dm, kmedoids.labels_, metric="precomputed"), adjusted_rand_score(index, kmedoids.labels_))
            pred_labels['dtw'] = kmedoids.labels_
            del dm
            del kmedoids
        else:
            raise AttributeError('Invalid metric!')
    
    return result, pred_labels, coefset        


def clustering_results_for_mod(n_clusters, coefset, index, dataset = None, ident_method = 'a', alpha_lenght = None, beta_lenght = None, control = None, D = None, q = 1, n = 2):
    result = {}
    pred_labels = {}
    
    metrics = (h_2_mod_dist, h_inf_mod_dist, sp_rand_mod_dist) 
    
    for metric in tqdm(metrics, desc = 'clustering_results: metrics', leave=False):
        if metric in (wcep_dist,cep_dist,wcep_cut_dist,h_2_dist,h_inf_dist,sp_rand_dist,sp_rand_mod_dist, h_2_mod_dist, h_inf_mod_dist):
            dm = dist_matrix(coefset, metric, ident_method = ident_method, q = q, n = n)
            try:
                kmedoids = KMedoids(n_clusters, method = 'pam', metric = 'precomputed').fit(dm)
                result[metric.__name__[:-5]] = (silhouette_score(dm, kmedoids.labels_, metric="precomputed"), adjusted_rand_score(index, kmedoids.labels_))
                pred_labels[metric.__name__[:-5]] = kmedoids.labels_
                del kmedoids
            except ValueError:
                result[metric.__name__[:-5]] = (-2, -2)
                pred_labels[metric.__name__[:-5]] = -1 * np.ones(dataset.shape[0])
            del dm
        else:
            raise AttributeError('Invalid metric!')
    
    return result, pred_labels, coefset        

### Вспомогательные функции для графиков

In [None]:
def plot_dataset(dataset, index):
    fig, ax = plt.subplots(figsize = (10,7))
    colors = ['b','g','r','c','m','y','k']
    unique_index = set(index)
    for i in range(len(index)):
        if index[i] in unique_index:
            ax.plot(dataset[i], color=colors[index[i]], label = 'M' + str(index[i] + 1))
            unique_index.remove(index[i])
        else:
            ax.plot(dataset[i], color=colors[index[i]])
    ax.grid(True)
    ax.legend()
    plt.show()
    # ax2.plot(range(len(cor)), cor, 'rx', markersize = 15, label = 'given_cor')                
        
def pairplot_of_coefset(coefset, index, ident_method = 'AR', alpha_lenght = None, beta_lenght = None, control = None, D = None):
    temp = []
    # if len(coefset[0].shape) >= 2:
    # if coefset[0].shape[0] >= 2:
    if ident_method not in ('AR',):
        for row in coefset:
            temp.append(np.concatenate(row))
        coefset = np.array(temp)
    
    columns = []
    if D is not None:
        alpha_lenght = beta_lenght = len(D) // 2
    if alpha_lenght is not None:
        for i in range(alpha_lenght):
            columns.append('a' + str(i))
    if beta_lenght is not None:
        for i in range(beta_lenght):
            columns.append('b' + str(i))
    
    coefset = pd.DataFrame(coefset, columns = columns)
    coefset['true_cluster'] = index
    if D is None:
        sns.pairplot(coefset.drop(columns = ['a' + str(alpha_lenght - 1)]), hue = 'true_cluster')
        # sns.pairplot(coefset, hue = 'true_cluster')
    else:
        to_drop = []
        for i in range(len(D)):
            temp = np.count_nonzero(D[i] != 0)
            if (((D[i][-1] != 0) and (temp == 1)) or (temp == 0)):
                if i % 2 == 0:
                    to_drop.append('a' + str(i // 2))
                else:
                    to_drop.append('b' + str(i // 2))
        sns.pairplot(coefset.drop(columns = to_drop), hue = 'true_cluster')           

In [None]:
def off_warns(t = 0):
    action = ["ignore", "always"]
    # action = ["once", "always"]
    warnings.filterwarnings(action[t], category=sg.BadCoefficients) # LinAlgWarning
    warnings.filterwarnings(action[t], category=sm.tools.sm_exceptions.ConvergenceWarning)
    warnings.filterwarnings(action[t], category=la.LinAlgWarning)

In [None]:
def plot_poles(poles, name = None):
    theta = np.linspace(0, 2*np.pi, 100)

    x1 = np.cos(theta)
    x2 = np.sin(theta)

    fig, ax = plt.subplots(figsize = (5,5))
    plt.xlabel('Re' + r'$\;z$', fontsize=12)
    plt.ylabel('Im' + r'$\;z$', fontsize=12)

    ax.plot(x1, x2)
    
    labeled = []
    for pole in poles:
        if pole[0] == 0:
            if pole[0] in labeled:
                plt.plot(pole[1][0], pole[1][1], 'rx')
            else:
                plt.plot(pole[1][0], pole[1][1], 'rx', label = 'Нули ' + r'$M^{(main)}$')
                labeled.append(pole[0])
        else:
            if pole[0] in labeled:
                plt.plot(pole[1][0], pole[1][1], 'gx')
            else:
                plt.plot(pole[1][0], pole[1][1], 'gx', label = 'Нули ' + r'$M^{(1)}$')
                labeled.append(pole[0])
            
    plt.plot(-0.7, 0, 'bx', label = 'Полюса ' + r'$M^{(main)}$' + ' и ' + r'$M^{(1)}$')
    plt.plot(-0.7, 0, 'bx')
    plt.plot(-0.7, 0, 'bx')
    plt.plot(-0.7, 0, 'bx')
    plt.plot(-0.7, 0, 'bx')
    
    ax.set_aspect(1)

    plt.xlim(-1.25,1.25)
    plt.ylim(-1.25,1.25)

    plt.grid(linestyle='--')

    # plt.title('Poles', fontsize=16)
    ax.legend(loc = 'upper right')

    if name is not None:
        fig.savefig('pictures/' + name + '.pdf')
    plt.show()
    
def plot_heatmaps(coefset, index = None, ident_method = 'a', name = None):    
    metrics = (cep_dist, wcep_dist, h_2_dist, h_2_mod_dist, h_inf_dist, h_inf_mod_dist, sp_rand_dist, sp_rand_mod_dist)
    metrics_name = {'wcep':r'$\mathbf{wcep}$',
                    'cep':r'$\mathbf{cep}$',
                    'h_2':r'$\mathbf{h}_2$',
                    'h_2_mod':r'$\mathbf{h}^*_2$',
                    'h_inf':r'$\mathbf{h}_{\infty}$',
                    'h_inf_mod':r'$\mathbf{h}^*_{\infty}$',
                    'sp_rand':r'$\mathbf{sp}$',
                    'sp_rand_mod':r'$\mathbf{sp}^*$'}
    if index is not None:
        temp = []
        for i in np.argsort(index):
            temp.append(coefset[0][i])
        coefset[0] = temp
    # fig, axes = plt.subplots(2, 3, figsize = (21,10))
    fig, axes = plt.subplots(4, 2, figsize = (10,15))
    # fig, axes = plt.subplots(4, 2)
    for metric in tqdm(metrics, desc = 'plot_mean_heatmaps: metrics', leave=False):
        if metric is h_2_mod_dist:
            dm = dist_matrix(coefset[0], metric = metric, ident_method = ident_method, q = coefset[1])
        elif metric is h_inf_mod_dist:
            dm = dist_matrix(coefset[0], metric = metric, ident_method = ident_method, q = coefset[2])
        elif metric is sp_rand_mod_dist:
            dm = dist_matrix(coefset[0], metric = metric, ident_method = ident_method, q = coefset[3])
        else:
            dm = dist_matrix(coefset[0], metric = metric, ident_method = ident_method)
        ax = axes[metrics.index(metric) // 2][metrics.index(metric) % 2]
        ax.set_aspect('equal', 'box')
        ax.set_title(metrics_name[metric.__name__[:-5]], fontsize=15, loc='center')
        gx = sns.heatmap(dm, ax = ax)
    plt.subplots_adjust(wspace=0.05, hspace=0.4)
    if name is not None:
        fig.savefig('pictures/' + name + '.pdf', bbox_inches = 'tight') 
    plt.show()
        
def plot_mean_heatmaps(coefsets, indexes = None, ident_method = 'a', name = None):
    # metrics = (wcep_dist, wcep_cut_dist, cep_dist, h_2_dist, h_inf_dist, sp_rand_dist)
    metrics = (cep_dist, wcep_dist, h_2_dist, h_2_mod_dist, h_inf_dist, h_inf_mod_dist, sp_rand_dist, sp_rand_mod_dist)
    metrics_name = {'wcep':r'$\mathbf{wcep}$',
                    'cep':r'$\mathbf{cep}$',
                    'h_2':r'$\mathbf{h}_2$',
                    'h_2_mod':r'$\mathbf{h}^*_2$',
                    'h_inf':r'$\mathbf{h}_{\infty}$',
                    'h_inf_mod':r'$\mathbf{h}^*_{\infty}$',
                    'sp_rand':r'$\mathbf{sp}$',
                    'sp_rand_mod':r'$\mathbf{sp}^*$'}
    if indexes is not None:
        j = 0
        for coefset in coefsets:
            temp = []
            for i in np.argsort(indexes[j]):
                temp.append(coefset[0][i])
            coefset[0] = temp
            coefsets[j] = coefset
            j += 1
    # fig, axes = plt.subplots(2, 3, figsize = (21,10))
    fig, axes = plt.subplots(4, 2, figsize = (10,15))
    # fig, axes = plt.subplots(4, 2)
    for metric in tqdm(metrics, desc = 'plot_mean_heatmaps: metrics', leave=False):
        mean_dm = np.zeros((len(coefsets[0][0]),len(coefsets[0][0])))
        for coefset in tqdm(coefsets, desc = 'plot_mean_heatmaps: ' + str(metric.__name__[:-5]) + ' iterations', leave=False):
            if metric is h_2_mod_dist:
                dm = dist_matrix(coefset[0], metric = metric, ident_method = ident_method, q = coefset[1])
            elif metric is h_inf_mod_dist:
                dm = dist_matrix(coefset[0], metric = metric, ident_method = ident_method, q = coefset[2])
            elif metric is sp_rand_mod_dist:
                dm = dist_matrix(coefset[0], metric = metric, ident_method = ident_method, q = coefset[3])
            else:
                dm = dist_matrix(coefset[0], metric = metric, ident_method = ident_method)
            mean_dm += dm
        mean_dm = mean_dm / len(coefsets)
        ax = axes[metrics.index(metric) // 2][metrics.index(metric) % 2]
        # ax.axis('equal')
        ax.set_aspect('equal', 'box')
        ax.set_title(metrics_name[metric.__name__[:-5]], fontsize=15, loc='center')
        gx = sns.heatmap(mean_dm, ax = ax)
    plt.subplots_adjust(wspace=0.05, hspace=0.4)
    if name is not None:
        fig.savefig('pictures/' + name + '.pdf', bbox_inches = 'tight') 
    plt.show()
    
    # metrics = (wcep_dist, wcep_cut_dist, cep_dist, h_2_dist, h_inf_dist, sp_rand_dist)
    # metrics = (h_2_mod_dist, h_inf_mod_dist, sp_rand_mod_dist)
    # metrics_name = {'wcep':r'$\mathbf{wcep}$',
    #                 'cep':r'$\mathbf{cep}$',
    #                 'h_2':r'$\mathbf{h}_2$',
    #                 'h_2_mod':r'$\mathbf{h}^*_2$',
    #                 'h_inf':r'$\mathbf{h}_{\infty}$',
    #                 'h_inf_mod':r'$\mathbf{h}^*_{\infty}$',
    #                 'sp_rand':r'$\mathbf{sp}$',
    #                 'sp_rand_mod':r'$\mathbf{sp}^*$'}
    # if indexes is not None:
    #     j = 0
    #     for coefset in coefsets:
    #         temp = []
    #         for i in np.argsort(indexes[j]):
    #             temp.append(coefset[0][i])
    #         coefset[0] = temp
    #         coefsets[j] = coefset
    #         j += 1
    # fig, axes = plt.subplots(1, 3, figsize = (15,3))
    # # fig, axes = plt.subplots(4, 2)
    # for metric in tqdm(metrics, desc = 'plot_mean_heatmaps: metrics', leave=False):
    #     mean_dm = np.zeros((len(coefsets[0][0]),len(coefsets[0][0])))
    #     for coefset in tqdm(coefsets, desc = 'plot_mean_heatmaps: ' + str(metric.__name__[:-5]) + ' iterations', leave=False):
    #         if metric is h_2_mod_dist:
    #             dm = dist_matrix(coefset[0], metric = metric, ident_method = ident_method, q = coefset[1])
    #         elif metric is h_inf_mod_dist:
    #             dm = dist_matrix(coefset[0], metric = metric, ident_method = ident_method, q = coefset[2])
    #         elif metric is sp_rand_mod_dist:
    #             dm = dist_matrix(coefset[0], metric = metric, ident_method = ident_method, q = coefset[3])
    #         else:
    #             dm = dist_matrix(coefset[0], metric = metric, ident_method = ident_method)
    #         mean_dm += dm
    #     mean_dm = mean_dm / len(coefsets)
    #     # ax = axes[metrics.index(metric) // 2][metrics.index(metric) % 2]
    #     ax = axes[metrics.index(metric)]
    #     # ax.axis('equal')
    #     ax.set_aspect('equal', 'box')
    #     ax.set_title(metrics_name[metric.__name__[:-5]], fontsize=15, loc='center')
    #     gx = sns.heatmap(mean_dm, ax = ax)
    # plt.subplots_adjust(wspace=0.1, hspace=0.4)
    # if name is not None:
    #     fig.savefig('pictures/' + name + '.pdf', bbox_inches = 'tight') 
    # plt.show()
    
def plot_heatmaps_old(dataset, index, ident_method = 'a', alpha_lenght = None, beta_lenght = None, control = None, D = None):
    metrics = (wcep_dist, wcep_cut_dist, cep_dist, h_2_dist, h_inf_dist, sp_rand_dist)
    coefset, ident_method = coef_estimations_of_dataset(dataset, ident_method = ident_method, alpha_lenght = alpha_lenght, beta_lenght = beta_lenght, control = control, D = D)
    temp = []
    for i in np.argsort(index):
        temp.append(coefset[i])
    coefset = temp
    fig, axes = plt.subplots(2, 3, figsize = (21,10))
    for metric in metrics:
        dm = dist_matrix(coefset, metric = metric, ident_method = ident_method)
        ax = axes[metrics.index(metric) // 3][metrics.index(metric) % 3]
        ax.set_title(metric.__name__[:-5], fontsize=16, loc='center')
        gx = sns.heatmap(dm, ax = ax)
    plt.show()

# Численные эксперименты

#### Эксперимент 1

In [None]:
# Входные данные для создания выборки
rng = np.random.default_rng(12345)
N = 250
sigma = 0.01

# Количество повторений для усредненого результата
rep = 10
grid = 20

# Массив полюсов второй модели
poles = [0.4, 0.45, 0.5, 0.55]

# Вспомогалеьные вещи для работоспособности кода
final_result = {'wcep':[],'wcep_cut':[],'cep':[],'h_2':[],'h_2_mod':[],'q_h_2_mod':[],'h_inf':[],'h_inf_mod':[],'q_h_inf_mod':[],'sp_rand':[],'sp_rand_mod':[],'q_sp_rand_mod':[],'coef':[]}
off_warns()

# Параметры для алгоритма идентификации
ident_method = 'AR'
args = {"ident_method" :  ident_method, "alpha_lenght" : 2}

# Цикл, в котором создется набор данных и находятся результаты их кластеризации
for pole in tqdm(poles, desc = 'poles', leave=False):
    coefsets = []
    indexes = []
    # Цикл для усреднения
    for j in tqdm(range(rep), desc = 'iterations', leave=False):
        params = []
        poles_for_plot = []
        for i in range(15):
            # Параметры первой модели
            c = 0.3
            c = rng.random() * 0.02 + c - 0.01
            poles_for_plot.append((0, (c, 0)))
            M1 = {
                'M' : 0,
                'count': 1,
                'alpha': (c, 1)
            }
            params.append(M1)

            # Параметры второй модели
            c = pole
            c = rng.random() * 0.02 + c - 0.01
            poles_for_plot.append((1, (c, 0)))
            M2 = {
                'M' : 1,
                'count': 1,
                'alpha': (c, 1)
            }
            params.append(M2)

        dataset, index = get_dataset_new(N, *params, sigma = sigma, rng = rng)

        coefset, _  = coef_estimations_of_dataset(dataset, **args)
        result, pred_labels, coefset  = clustering_results(2, coefset, index, **args, dataset = dataset)
        for key in result:
            final_result[key].append(result[key][1])
        
        # выбор q1 с помощью максимизации индекса Силуэт
        temp = {'h_2_mod':[-1,-1,-1],'h_inf_mod':[-1,-1,-1],'sp_rand_mod':[-1,-1,-1]}
        q1, q2 = coefset_q_border(coefset, n = 2, ident_method = ident_method)
        for i in tqdm(range(grid), desc = 'q_coef', leave=False):
            q_coef = i / grid + 0.05
            result, pred_labels, coefset  = clustering_results_for_mod(2, coefset, index, **args, q = q1 * q_coef, n = 2)
            # print(result)
            for key in temp:
                if result[key][0] > temp[key][0]:
                    temp[key][0] = result[key][0]
                    temp[key][1] = result[key][1]
                    temp[key][2] = q1 * q_coef
        for key in temp:
            final_result[key].append(temp[key][1])
            final_result['q_'+key].append(temp[key][2])
        
        coefsets.append([coefset, temp['h_2_mod'][2], temp['h_inf_mod'][2], temp['sp_rand_mod'][2]])
        indexes.append(index)
            
    # Визуализация набора данных
    plot_dataset(dataset, index)
    
    # Визуализация результатов кластеризации для первого из полюсов второй модели
    pairplot_of_coefset(coefset, index, **args)
    plot_mean_heatmaps(coefsets, indexes = indexes, ident_method = ident_method)

# Вывод таблицы с результатами кластеризации
temp1 = []
temp2 = []
for pole in poles:
    temp1.append(pole * np.ones(rep))
    temp2.append(np.arange(rep))
arrays = [np.concatenate(temp1), np.concatenate(temp2)]

res = pd.DataFrame(final_result, index = arrays)
res.index.names = ["poles", "iterations"]

for pole in poles:
    res.loc[(pole, 'min'), :] = res.loc[pole].min()
    res.loc[(pole, 'mean'), :] = res.loc[pole].mean()
    res.loc[(pole, 'max'), :] = res.loc[pole].max()

display(res.loc(axis=0)[:,['max','mean','min']].sort_index())

#### Эксперимент 2

In [None]:
# Входные данные для создания выборки
rng = np.random.default_rng(12345)
N = 250
sigma = 0.01

# Количество повторений для усредненого результата
rep = 10
grid = 20

# Массив полюсов второй модели
zeros = [0.4, 0.45, 0.5, 0.55]

# Вспомогалеьные вещи для работоспособности кода
final_result = {'wcep':[],'wcep_cut':[],'cep':[],'h_2':[],'h_2_mod':[],'q_h_2_mod':[],'h_inf':[],'h_inf_mod':[],'q_h_inf_mod':[],'sp_rand':[],'sp_rand_mod':[],'q_sp_rand_mod':[],'coef':[]}
off_warns()

# Параметры для алгоритма идентификации
ident_method = 'ARIMA'
args = {"ident_method" :  ident_method, "alpha_lenght" : 2, "beta_lenght" : 2}

# Цикл, в котором создется набор данных и находятся результаты их кластеризации
for zero in tqdm(zeros, desc = 'zeros', leave=False):
    coefsets = []
    indexes = []
    # Цикл для усреднения
    for j in tqdm(range(rep), desc = 'iterations', leave=False):
        params = []
        poles_for_plot = []
        # controls = []
        for i in range(15):
            # Параметры первой модели
            c = 0.3
            c = rng.random() * 0.02 + c - 0.01
            poles_for_plot.append((0, (c, 0)))
            M1 = {
                'M' : 0,
                'count': 1,
                'alpha': (0.7, 1),
                'beta': (-c, 1)
            }
            params.append(M1)

            # Параметры второй модели
            c = zero
            c = rng.random() * 0.02 + c - 0.01
            poles_for_plot.append((1, (c, 0)))
            M2 = {
                'M' : 1,
                'count': 1,
                'alpha': (0.7, 1),
                'beta': (-c, 1)
            }
            params.append(M2)

        dataset, index = get_dataset_new(N, *params, sigma = sigma, rng = rng)

        coefset, _  = coef_estimations_of_dataset(dataset, **args)
        result, pred_labels, coefset  = clustering_results(2, coefset, index, **args, dataset = dataset)
        for key in result:
            final_result[key].append(result[key][1])
        
        # выбор q1 с помощью максимизации индекса Силуэт
        temp = {'h_2_mod':[-1,-1,-1],'h_inf_mod':[-1,-1,-1],'sp_rand_mod':[-1,-1,-1]}
        temp_q = 0
        q1, q2 = coefset_q_border(coefset, n = 2, ident_method = ident_method)
        for i in tqdm(range(grid), desc = 'q_coef', leave=False):
            q_coef = i / grid + 0.05
            result, pred_labels, coefset  = clustering_results_for_mod(2, coefset, index, **args, q = q1 * q_coef, n = 2)
            for key in temp:
                if result[key][0] > temp[key][0]:
                    temp[key][0] = result[key][0]
                    temp[key][1] = result[key][1]
                    temp[key][2] = q1 * q_coef
        for key in temp:
            final_result[key].append(temp[key][1])
            final_result['q_'+key].append(temp[key][2])
        
        coefsets.append([coefset, temp['h_2_mod'][2], temp['h_inf_mod'][2], temp['sp_rand_mod'][2]])
        indexes.append(index)
    
            
    # Визуализация набора данных
    plot_dataset(dataset, index)
    
    # Визуализация результатов кластеризации для первого из полюсов второй модели
    pairplot_of_coefset(coefset, index, **args)
    plot_mean_heatmaps(coefsets, indexes = indexes, ident_method = ident_method)

# Вывод таблицы с результатами кластеризации
temp1 = []
temp2 = []
for zero in zeros:
    temp1.append(zero * np.ones(rep))
    temp2.append(np.arange(rep))
arrays = [np.concatenate(temp1), np.concatenate(temp2)]

res = pd.DataFrame(final_result, index = arrays)
res.index.names = ["zeros", "iterations"]

for zero in zeros:
    res.loc[(zero, 'min'), :] = res.loc[zero].min()
    res.loc[(zero, 'mean'), :] = res.loc[zero].mean()
    res.loc[(zero, 'max'), :] = res.loc[zero].max()

# display(res.sort_index())
display(res.loc(axis=0)[:,['max','mean','min']].sort_index())

#### Эксперимент 3

In [None]:
# Входные данные для создания выборки
rng = np.random.default_rng(12345)
N = 250
sigma = 0.01

# Количество повторений для усредненого результата
rep = 10
grid = 20

# Массив полюсов второй модели
zeros = [0.4, 0.45, 0.5, 0.55]
poles = [0.4, 0.45, 0.5, 0.55]

# Вспомогалеьные вещи для работоспособности кода
final_result_q1 = {'h_2_mod':[],'q_h_2_mod':[],'h_inf_mod':[],'q_h_inf_mod':[],'sp_rand_mod':[],'q_sp_rand_mod':[]}
final_result_q2 = {'h_2_mod':[],'q_h_2_mod':[],'h_inf_mod':[],'q_h_inf_mod':[],'sp_rand_mod':[],'q_sp_rand_mod':[]}
off_warns()

# Параметры для алгоритма идентификации
ident_method = 'ARIMA'
args = {"ident_method" :  ident_method, "alpha_lenght" : 2, "beta_lenght" : 2}

# Цикл, в котором создется набор данных и находятся результаты их кластеризации
for zero in tqdm(zeros, desc = 'zeros', leave=False):
    # Цикл для усреднения
    coefsets_q1 = []
    coefsets_q2 = []
    indexes = []
    for j in tqdm(range(rep), desc = 'iterations', leave=False):
        params = []
        for i in range(15):
            # Параметры первой модели
            c = 0.3
            c = rng.random() * 0.02 + c - 0.01
            M1 = {
                'M' : 0,
                'count': 1,
                'alpha': (0.7, 1),
                'beta': (-c, 1)
            }
            params.append(M1)

            # Параметры второй модели
            c = zero
            c = rng.random() * 0.02 + c - 0.01
            M2 = {
                'M' : 1,
                'count': 1,
                'alpha': (0.7, 1),
                'beta': (-c, 1)
            }
            params.append(M2)

        dataset, index = get_dataset_new(N, *params, sigma = sigma, rng = rng)

        coefset, _  = coef_estimations_of_dataset(dataset, **args)
        
        q1, q2 = coefset_q_border(coefset, n = 2, ident_method = ident_method)
        
        temp = {'h_2_mod':[-1,-1,-1],'h_inf_mod':[-1,-1,-1],'sp_rand_mod':[-1,-1,-1]}
        for i in tqdm(range(grid), desc = 'q_coef', leave=False):
            q_coef = i / grid + 0.01
            result, pred_labels, coefset  = clustering_results_for_mod(2, coefset, index, **args, q = q1 * q_coef, n = 2)
            for key in temp:
                if result[key][0] > temp[key][0]:
                    temp[key][0] = result[key][0]
                    temp[key][1] = result[key][1]
                    temp[key][2] = q1 * q_coef
        for key in temp:
            final_result_q1[key].append(temp[key][1])
            final_result_q1['q_'+key].append(temp[key][2])
        coefsets_q1.append([coefset, temp['h_2_mod'][2], temp['h_inf_mod'][2], temp['sp_rand_mod'][2]])
            
        temp = {'h_2_mod':[-1,-1,-1],'h_inf_mod':[-1,-1,-1],'sp_rand_mod':[-1,-1,-1]}
        for i in tqdm(range(grid), desc = 'q_coef', leave=False):
            q_coef = i / grid + 1.05
            result, pred_labels, coefset  = clustering_results_for_mod(2, coefset, index, **args, q = q2 * q_coef, n = 2)
            for key in temp:
                if result[key][0] > temp[key][0]:
                    temp[key][0] = result[key][0]
                    temp[key][1] = result[key][1]
                    temp[key][2] = q2 * q_coef
        for key in temp:
            final_result_q2[key].append(temp[key][1])
            final_result_q2['q_'+key].append(temp[key][2])
            
        coefsets_q2.append([coefset, temp['h_2_mod'][2], temp['h_inf_mod'][2], temp['sp_rand_mod'][2]])
        indexes.append(index)
        
    plot_mean_heatmaps(coefsets_q1, indexes = indexes, ident_method = ident_method)
    plot_mean_heatmaps(coefsets_q2, indexes = indexes, ident_method = ident_method)
            
for pole in tqdm(poles, desc = 'poles', leave=False):
    # Цикл для усреднения
    coefsets_q1 = []
    coefsets_q2 = []
    indexes = []
    for j in tqdm(range(rep), desc = 'iterations', leave=False):
        params = []
        # controls = []
        for i in range(15):
            # Параметры первой модели
            c = 0.3
            c = rng.random() * 0.02 + c - 0.01
            M1 = {
                'M' : 0,
                'count': 1,
                'alpha': (-c, 1),
                'beta': (0.7, 1)
            }
            params.append(M1)

            # Параметры второй модели
            c = pole
            c = rng.random() * 0.02 + c - 0.01
            M2 = {
                'M' : 1,
                'count': 1,
                'alpha': (-c, 1),
                'beta': (0.7, 1)
            }
            params.append(M2)

        dataset, index = get_dataset_new(N, *params, sigma = sigma, rng = rng)

        coefset, _  = coef_estimations_of_dataset(dataset, **args)
        
        q1, q2 = coefset_q_border(coefset, n = 2, ident_method = ident_method)
        
        temp = {'h_2_mod':[-1,-1,-1],'h_inf_mod':[-1,-1,-1],'sp_rand_mod':[-1,-1,-1]}
        for i in tqdm(range(grid), desc = 'q_coef', leave=False):
            q_coef = i / grid + 0.01
            result, pred_labels, coefset  = clustering_results_for_mod(2, coefset, index, **args, q = q1 * q_coef, n = 2)
            for key in temp:
                if result[key][0] > temp[key][0]:
                    temp[key][0] = result[key][0]
                    temp[key][1] = result[key][1]
                    temp[key][2] = q1 * q_coef
        for key in temp:
            final_result_q1[key].append(temp[key][1])
            final_result_q1['q_'+key].append(temp[key][2])
            
        coefsets_q1.append([coefset, temp['h_2_mod'][2], temp['h_inf_mod'][2], temp['sp_rand_mod'][2]])
            
        temp = {'h_2_mod':[-1,-1,-1],'h_inf_mod':[-1,-1,-1],'sp_rand_mod':[-1,-1,-1]}
        temp_q = 0
        for i in tqdm(range(grid), desc = 'q_coef', leave=False):
            q_coef = i / grid + 1.05
            result, pred_labels, coefset  = clustering_results_for_mod(2, coefset, index, **args, q = q2 * q_coef, n = 2)
            for key in temp:
                if result[key][0] > temp[key][0]:
                    temp[key][0] = result[key][0]
                    temp[key][1] = result[key][1]
                    temp[key][2] = q2 * q_coef
        for key in temp:
            final_result_q2[key].append(temp[key][1])
            final_result_q2['q_'+key].append(temp[key][2])
        
        coefsets_q2.append([coefset, temp['h_2_mod'][2], temp['h_inf_mod'][2], temp['sp_rand_mod'][2]])
        indexes.append(index)
        
    plot_mean_heatmaps(coefsets_q1, indexes = indexes, ident_method = ident_method)
    plot_mean_heatmaps(coefsets_q2, indexes = indexes, ident_method = ident_method)
        
# Вывод таблицы с результатами кластеризации
temp1 = []
temp2 = []
for zero in zeros:
    for i in range(rep):
        temp1.append((zero, -0.7))
    temp2.append(np.arange(rep))
for pole in poles:
    for i in range(rep):
        temp1.append((-0.7, pole))
    temp2.append(np.arange(rep))
arrays = [temp1, np.concatenate(temp2)]

res_q1 = pd.DataFrame(final_result_q1, index = arrays)
res_q1.index.names = ["zero_pole", "iterations"]

for zero in zeros:
    res_q1.loc[((zero, -0.7), 'min'), :] = res_q1.loc[(zero, -0.7)].min()
    res_q1.loc[((zero, -0.7), 'mean'), :] = res_q1.loc[(zero, -0.7)].mean()
    res_q1.loc[((zero, -0.7), 'max'), :] = res_q1.loc[(zero, -0.7)].max()
for pole in poles:
    res_q1.loc[((-0.7, pole), 'min'), :] = res_q1.loc[(-0.7, pole)].min()
    res_q1.loc[((-0.7, pole), 'mean'), :] = res_q1.loc[(-0.7, pole)].mean()
    res_q1.loc[((-0.7, pole), 'max'), :] = res_q1.loc[(-0.7, pole)].max()

res_q2 = pd.DataFrame(final_result_q2, index = arrays)
res_q2.index.names = ["zero_pole", "iterations"]

for zero in zeros:
    res_q2.loc[((zero, -0.7), 'min'), :] = res_q2.loc[(zero, -0.7)].min()
    res_q2.loc[((zero, -0.7), 'mean'), :] = res_q2.loc[(zero, -0.7)].mean()
    res_q2.loc[((zero, -0.7), 'max'), :] = res_q2.loc[(zero, -0.7)].max()
for pole in poles:
    res_q2.loc[((-0.7, pole), 'min'), :] = res_q2.loc[(-0.7, pole)].min()
    res_q2.loc[((-0.7, pole), 'mean'), :] = res_q2.loc[(-0.7, pole)].mean()
    res_q2.loc[((-0.7, pole), 'max'), :] = res_q2.loc[(-0.7, pole)].max()


display(res_q1.loc(axis=0)[:,['max','mean','min']].sort_index())
display(res_q2.loc(axis=0)[:,['max','mean','min']].sort_index())

#### Эксперимент 4

$\alpha^{(1)}(z) = z^2 + \alpha_1^{(1)} z + \alpha_0^{(1)}$ имеет два сопряженных корня $z_1^{(1)}$ и $z_2^{(1)}$, таких что $$|z_1^{(1)}| = |z_2^{(1)}| = r \in U[0.96 \pm 0.01]$$ и $$arg(z_1^{(1)}) = -arg(z_2^{(1)}) = \varphi \in U[150 \pm 2],$$ 

$\alpha^{(2)}(z) = z^4 + \alpha_3^{(2)} z^3 + \alpha_2^{(2)} z^2 + \alpha_1^{(2)} z + \alpha_0^{(2)}$ имеет четыре корня $z_1^{(2)}$, $z_2^{(2)}$, $z_3^{(2)}$ и $z_4^{(2)}$, таких что $$|z_1^{(2)}| = |z_2^{(2)}| = r \in U[0.96 \pm 0.01], \quad arg(z_1^{(2)}) = -arg(z_2^{(2)}) = \varphi \in U[150 \pm 2]$$
и
$$|z_3^{(2)}| = |z_4^{(2)}| = r \in U[0.8 \pm 0.01], \quad arg(z_3^{(2)}) = -arg(z_4^{(2)}) = \varphi \in U[109 \pm 2]$$

In [None]:
# Входные данные для создания выборки
rng = np.random.default_rng(12345)
N = 250
sigma = 0.01

# Количество повторений для усредненого результата
rep = 10
grid = 20

# Вспомогалеьные вещи для работоспособности кода
final_result = {'wcep':[],'wcep_cut':[],'cep':[],'h_2':[],'h_2_mod':[],'q_h_2_mod':[],'h_inf':[],'h_inf_mod':[],'q_h_inf_mod':[],'sp_rand':[],'sp_rand_mod':[],'q_sp_rand_mod':[],'coef':[]}

off_warns(0)

# Параметры для алгоритма идентификации
ident_method = 'AR'
args = {"ident_method" :  ident_method, "alpha_lenght" : 5}

coefsets = []
indexes = []
# Цикл, в котором создется набор данных и находятся результаты их кластеризации
for j in tqdm(range(rep), desc = 'iterations', leave=False):
    poles_for_plot = []
    params = []
    controls = []
    for i in range(15):
        # Параметры первой модели
        r = 0.96
        r = rng.random() * 0.02 + r - 0.01
        phi = 150
        phi = rng.random() * 4 + phi - 2
        poles_for_plot.append((0, (r * math.cos(math.radians(phi)), r * math.sin(math.radians(phi)))))
        poles_for_plot.append((0, (r * math.cos(math.radians(-phi)), r * math.sin(math.radians(-phi)))))
        M1 = {
            'M' : 0,
            'count': 1,
            'alpha': (r*r, -2 * r * math.cos(math.radians(phi)), 1)
            # 'beta': (0, 0, 1)
        }
        params.append(M1)
        # controls.append(control)
        
        # Параметры второй модели
        r1 = 0.96
        r1 = rng.random() * 0.02 + r1 - 0.01
        phi1 = 150
        phi1 = rng.random() * 4 + phi1 - 2
        root11 = r1 * (math.cos(math.radians(phi1)) + math.sin(math.radians(phi1))*1j)
        root12 = root11.conjugate()
        r2 = 0.8
        r2 = rng.random() * 0.02 + r2 - 0.01
        phi2 = 109
        phi2 = rng.random() * 4 + phi2 - 2
        root21 = r2 * (math.cos(math.radians(phi2)) + math.sin(math.radians(phi2))*1j)
        root22 = root21.conjugate()
        # print(np.polynomial.polynomial.polyfromroots((root, root.conjugate())).real)

        poles_for_plot.append((1, (root11.real, root11.imag)))
        poles_for_plot.append((1, (root12.real, root12.imag)))
        poles_for_plot.append((1, (root21.real, root21.imag)))
        poles_for_plot.append((1, (root22.real, root22.imag)))
        M2 = {
            'M' : 1,
            'count': 1,
            'alpha': np.polynomial.polynomial.polyfromroots((root11, root12, root21, root22)).real
            # 'beta': (0, 0, 1)
        }
        params.append(M2)

    dataset, index = get_dataset_new(N, *params, sigma = sigma, rng = rng)

    coefset, _  = coef_estimations_of_dataset(dataset, **args)
    result, pred_labels, coefset  = clustering_results(2, coefset, index, **args, dataset = dataset)
    for key in result:
        final_result[key].append(result[key][1])

    # выбор q1 с помощью максимизации индекса Силуэт
    temp = {'h_2_mod':[-1,-1,-1],'h_inf_mod':[-1,-1,-1],'sp_rand_mod':[-1,-1,-1]}
    q1, q2 = coefset_q_border(coefset, n = 2, ident_method = ident_method)
    for i in tqdm(range(grid), desc = 'q_coef', leave=False):
        q_coef = i / grid + 0.05
        result, pred_labels, coefset  = clustering_results_for_mod(2, coefset, index, **args, q = q1 * q_coef, n = 2)
        # print(result)
        for key in temp:
            if result[key][0] > temp[key][0]:
                temp[key][0] = result[key][0]
                temp[key][1] = result[key][1]
                temp[key][2] = q1 * q_coef
    for key in temp:
        final_result[key].append(temp[key][1])
        final_result['q_'+key].append(temp[key][2])
    
    coefsets.append([coefset, temp['h_2_mod'][2], temp['h_inf_mod'][2], temp['sp_rand_mod'][2]])
    indexes.append(index)

# Визуализация набора данных
plot_dataset(dataset, index)

# Визуализация результатов кластеризации
pairplot_of_coefset(coefset, index, **args)
plot_mean_heatmaps(coefsets, indexes = indexes, ident_method = ident_method)

# Вывод таблицы с результатами кластеризации
res = pd.DataFrame(final_result)
for j in range(rep):
    res.loc['min', :] = res.min()
    res.loc['mean', :] = res.mean()
    res.loc['max', :] = res.max()

# display(res)
display(res.loc[['max','mean','min']])

# Работа с реальными данными

##### Выполнение кластеризации

In [None]:
time = datetime.datetime.now()

grid = 10

off_warns()

dataset_names = ['ECG200', 'Trace', 'Coffee', 'Car','Lightning7','Lightning2', 'Meat']
best_orders = {'ECG200':(5,0), 'Trace':(5,0), 'Plane':(5,0), 'Car':(5,0), 'Coffee':(5,0), 'Lightning7':(5,0), 'Lightning2':(5,0),'Meat':(5,0)}

for dataset_name in tqdm(dataset_names, desc = 'datasets'):
    final_result = {'wcep':[],'wcep_cut':[],'cep':[],'h_2':[],'h_2_mod':[],'h_inf':[],'h_inf_mod':[],'sp_rand':[],'sp_rand_mod':[],'coef':[]}
    ds_loader = UCR_UEA_datasets()
    list_of_ds = ds_loader.list_univariate_datasets()
    if dataset_name in list_of_ds:
        data_train, index_train, data_test, index_test = ds_loader.load_dataset(dataset_name)
        dataset = np.concatenate((data_train, data_test), axis = 0)
        dataset = dataset.reshape((dataset.shape[0], dataset.shape[1]))
        index = np.concatenate((index_train, index_test), axis = 0)
    else:
        continue
    
    index_for_df = []
    best_order = best_orders[dataset_name]
    
    if best_order[1] != 0:
        ident_method = 'ARIMA'
        args = {"ident_method" :  ident_method, "alpha_lenght" : best_order[0] + 1, "beta_lenght" : best_order[1] + 1}
    else:
        ident_method = 'AR'
        args = {"ident_method" :  ident_method, "alpha_lenght" : best_order[0] + 1, "beta_lenght" : 0}
    n_clust = len(set(index))

    coefset, _  = coef_estimations_of_dataset(dataset, **args)

    result, pred_labels, coefset  = clustering_results(n_clust, coefset, index, **args, dataset = dataset)
    for key in result:
        final_result[key].append(result[key][0])
        final_result[key].append(result[key][1])

    # выбор q1 с помощью максимизации индекса Силуэт
    q1, q2 = coefset_q_border(coefset, n = 2, ident_method = ident_method)
    
    grid = 20
    temp = {'h_2_mod':[-1,-1,-1],'h_inf_mod':[-1,-1,-1],'sp_rand_mod':[-1,-1,-1]}
    for i in tqdm(range(grid), desc = 'q_coef', leave=False):
        if abs(q1) > pow(10,-6):
            q_coef = i / grid + 0.01
            result, pred_labels, coefset  = clustering_results_for_mod(n_clust, coefset, index, **args, q = q1 * q_coef, n = 2)
            for key in temp:
                if result[key][0] > temp[key][0]:
                    temp[key][0] = result[key][0]
                    temp[key][1] = result[key][1]
                    temp[key][2] = q1 * q_coef
    for key in temp:
        final_result[key].append(temp[key][0])
        final_result[key].append(temp[key][1])
    index_for_df.append(('silhouette_score', 'b_o: ('+str(best_order[0])+', '+str(best_order[1])+')')),
    index_for_df.append(("adjusted_rand_score", 'b_o: ('+str(best_order[0])+', '+str(best_order[1])+')')),

    # Визуализация результатов кластеризации
    # print(dataset_name)
    # pairplot_of_coefset(coefset, index, **args)
    # plot_heatmaps([coefset, temp['h_2_mod'][2], temp['h_inf_mod'][2], temp['sp_rand_mod'][2]], index = index, ident_method = ident_method)

    # Вывод таблицы с результатами кластеризации
    index_for_df = np.array(index_for_df)
    ind = pd.MultiIndex.from_arrays(index_for_df.T)
    res = pd.DataFrame(final_result, index = ind)
    res.index.names = ["clust_index", "order"]
    display(res.sort_index())