In [1]:
import random
import random
import math
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import scipy
from scipy.io.wavfile import write
from scipy.fft import fft, fftfreq
import itertools
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
# Матрица ошибок классификатора
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
sns.set_style('darkgrid')

In [2]:
from scipy.fft import fft, fftfreq
from scipy.fft import rfft, rfftfreq

In [3]:
import warnings
from tensorflow.keras.models import Sequential, Model,  Sequential, load_model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import concatenate, Input, multiply, Dense, Dropout, BatchNormalization, Flatten, Conv1D, Conv2D, LSTM, GlobalMaxPooling1D, MaxPooling1D, MaxPooling2D, RepeatVector, SpatialDropout1D, SimpleRNN, GRU, Bidirectional, Reshape
from tensorflow.keras.optimizers import Adam, RMSprop, SGD, Adadelta, Adagrad, Adamax, Nadam, Ftrl
from tensorflow.keras.preprocessing.sequence import TimeseriesGenerator, pad_sequences
from tensorflow.keras.utils import plot_model
warnings.filterwarnings('ignore')

In [4]:
import warnings
warnings.filterwarnings('ignore')

# Functions

In [16]:
# Random generators

def make_window_ma(lower = 10, upper = 100):
    """
    Draw a random size for the moving-average window.

    :param lower: int, smallest size that can be drawn (inclusive)
    :param upper: int, largest size that can be drawn (inclusive)
    :return: int, value drawn uniformly from [lower, upper]
    """
    ma_window = random.randint(lower, upper)
    return ma_window

def make_observing_window(lower = 10, upper = 300):
    """
    Draw a random length for the observation window, i.e. the window that is
    later inspected to detect non-stationarity.

    :param lower: int, shortest length that can be drawn (inclusive)
    :param upper: int, longest length that can be drawn (inclusive)
    :return: int, value drawn uniformly from [lower, upper]
    """
    observing_window = random.randint(lower, upper)
    return observing_window

def make_new_window_part(vars = (0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5)):
    """
    Pick at random the fraction of the observation window that is taken by the
    second (appended) signal when two signals are spliced together.

    :param vars: sequence of candidate fractions; one of them is chosen uniformly.
                 The default is an immutable tuple so it cannot be altered between calls.
    :return: float, the chosen fraction
    """
    # The parameter name shadows the builtin ``vars``; it is kept so that
    # existing keyword callers (``vars=...``) keep working.
    return random.choice(vars)

In [17]:
def pad_zeros(x, maxlen, how = 'pre'):
    """
    Zero-pad a 1D sequence up to ``maxlen`` elements.

    :param x: 1D array-like
    :param maxlen: int, length of the returned array
    :param how: 'pre' puts the zeros before the data, 'after' puts them behind it
    :return: numpy 1D array of length ``maxlen``
    :raises ValueError: if ``how`` is not 'pre' or 'after', or if ``x`` is longer than ``maxlen``
    """
    if how not in ('pre', 'after'):
        # Previously an unknown mode silently returned the input unpadded.
        raise ValueError("how must be 'pre' or 'after', got %r" % (how,))

    x = np.array(x)
    n_missing = maxlen - x.shape[0]
    if n_missing < 0:
        raise ValueError(
            "input of length %d does not fit into maxlen=%d" % (x.shape[0], maxlen)
        )

    zeros = np.zeros(n_missing)
    if how == 'pre':
        return np.concatenate([zeros, x])
    return np.concatenate([x, zeros])

In [18]:
def get_corr_coef(data, back_steps): # lag (shift) in samples, expected to be >= 0
    """
    Correlation coefficient between a series and a copy of itself shifted by
    ``back_steps`` samples.

    :param data: numpy 1D array
    :param back_steps: autocorrelation shift
    :return: correlation coefficient, float
    """
    overlap = len(data) - back_steps
    shifted = data[back_steps:]
    reference = data[:overlap]
    corr_matrix = np.corrcoef(shifted, reference)

    # Off-diagonal entry of the 2x2 matrix is the cross term between the two slices.
    return corr_matrix[0, 1]

def make_autocorrelation_dependence(x, steps_max_part_window = 0.5):
    """
    Autocorrelation of ``x`` for every lag from 0 up to a maximum lag given as
    a fraction of the signal length.

    :param x: numpy 1D array
    :param steps_max_part_window: largest lag as a fraction of ``len(x)``, default 0.5
    :return: numpy array with one coefficient per lag (lag 0 first)
    """
    max_lag = int(x.shape[0] * steps_max_part_window)

    coefficients = []
    for lag in range(max_lag + 1):  # the maximum lag itself is included
        coefficients.append(get_corr_coef(x, lag))

    return np.array(coefficients)

In [19]:
def make_random_signal(length, ma):
    """
    Create a smooth random signal by running a moving average over white noise.

    :param length: number of samples in the returned signal
    :param ma: size of the moving-average window
    :return: numpy 1D array
    """
    # Draw enough noise that at least ``length`` samples survive the rolling mean.
    n_noise = max(length*2, ma * 2)
    noise = np.random.normal(0, 1, n_noise)
    smoothed = pd.Series(noise).rolling(window = ma).mean().dropna().values

    # The scale is the peak magnitude of the WHOLE smoothed series, taken
    # before cropping, so the cropped signal does not necessarily reach +/-1.
    peak = np.abs(smoothed).max()
    cropped = smoothed[:length]

    return cropped/peak

In [20]:
def make_abs_spectr(signal, normalise = True):
    """
    Magnitude spectrum of a real signal, computed with the one-sided FFT (``rfft``).

    :param signal: numpy array
    :param normalise: if True, scale the spectrum so that its largest value is 1.
                      A spectrum that is zero everywhere is returned unscaled
                      instead of being divided by zero (which would produce NaN).
    :return: numpy array with the magnitudes of the one-sided FFT
    """
    spectrum = np.abs(rfft(signal))
    if normalise:
        peak = spectrum.max()
        if peak > 0:
            spectrum = spectrum / peak
    return np.asarray(spectrum)

In [14]:
def make_data_single_signal(N, lower_ma = 10, upper_ma = 100, lower_window = 10, upper_window = 300):
    """
    Build a labelled training set. Half of the examples are one smooth random
    signal (class 0); the other half are two independently generated signals
    spliced together (class 1). Examples are zero-padded to a common length.
    The shapes of X and y are printed before returning.

    :param N: total number of examples; ``int(N/2)`` examples are generated per class
    :param lower_ma: lower bound for the moving-average window of each signal
    :param upper_ma: upper bound for the moving-average window of each signal
    :param lower_window: lower bound for the observation window (signal length)
    :param upper_window: upper bound for the observation window (signal length)
    :return: tuple ``(X, y, lower_mas, upper_mas, lower_windows, upper_windows)``;
             X is the padded 2D array of signals, y the one-hot labels. The four
             trailing lists are always empty: nothing is recorded in them, they
             are only kept so the shape of the return value does not change.
    :raises ValueError: if N is too small to produce one example per class
    """
    n_per_class = int(N/2)
    if n_per_class < 1:
        raise ValueError("N must be at least 2 to build one example of each class")

    # Placeholders kept for interface compatibility (see :return:).
    lower_mas = []
    upper_mas = []
    lower_windows = []
    upper_windows = []

    # Class 0: a single signal
    one_signal = []
    for _ in range(n_per_class):
        ma = make_window_ma(lower = lower_ma, upper = upper_ma)
        window = make_observing_window(lower = lower_window, upper = upper_window)
        signal = make_random_signal(window, ma)
        one_signal.append(signal.tolist())

    # Class 1: tail of a first signal followed by the tail of a second one
    two_signals = []
    for _ in range(n_per_class):
        ma1 = make_window_ma(lower = lower_ma, upper = upper_ma)
        window = make_observing_window(lower = lower_window, upper = upper_window)
        signal1 = make_random_signal(window, ma1)

        ma2 = make_window_ma(lower = lower_ma, upper = upper_ma)
        signal2 = make_random_signal(window, ma2)

        new_window_part = make_new_window_part()
        # At least one sample must come from the second signal, otherwise the
        # example would not contain a splice at all.
        t2 = max(1, int(new_window_part * window))
        t1 = window - t2

        # Explicit start offsets instead of negative slices: ``a[-0:]`` is the
        # WHOLE array, which used to double the example length when a part was 0.
        signal = np.concatenate([signal1[window - t1:], signal2[window - t2:]])
        two_signals.append(signal.tolist())

    # One common width for both classes so they can be stacked without
    # truncating either of them.
    maxlen = max(len(s) for s in one_signal + two_signals)
    one_signal = pad_sequences(one_signal, dtype='float', maxlen=maxlen)
    two_signals = pad_sequences(two_signals, dtype='float', maxlen=maxlen)

    labels_0 = np.zeros(one_signal.shape[0])
    y_two_signals = np.ones(two_signals.shape[0])
    y = np.concatenate([labels_0, y_two_signals])
    y = to_categorical(y)
    X = np.concatenate([one_signal, two_signals])
    print(X.shape)
    print(y.shape)
    return X,y, lower_mas, upper_mas, lower_windows, upper_windows

In [15]:
def compile_and_learn2(model, X_train,  y_train, X_test, y_test,  epochs, optimizer, loss, metrics, batch_size=16):
    """
    Compile and fit a Keras model, plot the learning curves and print
    classification reports for the test and the training data.

    :param model: Keras model to train
    :param X_train: training inputs
    :param y_train: training targets, one-hot encoded (argmax over axis 1 gives the class)
    :param X_test: inputs used as validation data during fitting and for the test report
    :param y_test: targets for ``X_test``, one-hot encoded
    :param epochs: number of training epochs
    :param optimizer: optimizer passed to ``model.compile``
    :param loss: loss passed to ``model.compile``
    :param metrics: metrics passed to ``model.compile``
    :param batch_size: batch size used for fitting, default 16
    :return: the fitted model
    """
    # Compile the model
    model.compile(optimizer=optimizer,
                loss=loss,
                metrics=metrics)

    # Train the model
    history = model.fit(X_train,
                        y_train,
                        epochs=epochs,
                        batch_size=batch_size,
                        validation_data=(X_test, y_test),
                        verbose = 1)
    curves = history.history

    f, axes = plt.subplots(1, 2, sharex=False, sharey=False, figsize=(15,5))
    axes[0].plot(curves['loss'], label='Ошибка на обучающем наборе')
    axes[0].plot(curves['val_loss'], label='Ошибка на проверочном наборе')
    axes[0].set_xlabel('Эпоха')
    axes[0].set_ylabel('Ошибка')
    axes[0].legend()

    # The accuracy curves exist only when ``metrics`` asked for accuracy.
    # Skip them otherwise instead of failing with a KeyError after training.
    accuracy_curves = (('accuracy', 'Точность на обучающем наборе'),
                       ('val_accuracy', 'Точность на проверочном наборе'))
    has_accuracy = False
    for key, label in accuracy_curves:
        if key in curves:
            axes[1].plot(curves[key], label=label)
            has_accuracy = True
    axes[1].set_xlabel('Эпоха')
    axes[1].set_ylabel('Точность')
    if has_accuracy:
        axes[1].legend()

    # Reports: test data first, then training data.
    for title, features, targets in (('Тестовые данные', X_test, y_test),
                                     ('Тренировочные данные', X_train, y_train)):
        print(title)
        y_pred = np.argmax(model.predict(features), axis=1)
        print(classification_report(np.argmax(targets, axis=1), y_pred))
    return model

# Make data

In [73]:
# make_data_single_signal returns more than two values (data, labels and
# extra bookkeeping lists); only the data and the labels are needed here.
X, y, *_ = make_data_single_signal(10000 )
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)

(20000, 300)
(20000, 2)
(14000, 300)
(14000, 2)
(6000, 300)
(6000, 2)


# LSTM 1

In [76]:
# Two stacked LSTM layers followed by a small dense classifier head.
# The input shape is passed as an explicit tuple via ``shape=``; a bare int
# passed positionally is not a valid shape specification.
input = Input(shape=(X_train.shape[1],))

# LSTM layers expect (timesteps, features): add a trailing feature axis of size 1.
x = Reshape((X_train.shape[1],1))(input)

x = LSTM(64, return_sequences=True)(x)
x = LSTM(64)(x)

x = Dense(64, activation='relu')(x)
x = Dropout(0.25)(x)
x = Dense(2, activation='softmax')(x)
model = Model(input,x)

model.summary()

Model: "model_10"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_18 (InputLayer)       [(None, 300)]             0         
                                                                 
 reshape_10 (Reshape)        (None, 300, 1)            0         
                                                                 
 lstm_27 (LSTM)              (None, 300, 64)           16896     
                                                                 
 lstm_28 (LSTM)              (None, 64)                33024     
                                                                 
 dense_20 (Dense)            (None, 64)                4160      
                                                                 
 dropout_10 (Dropout)        (None, 64)                0         
                                                                 
 dense_21 (Dense)            (None, 2)                 130

In [77]:
# ``learning_rate`` is the supported argument name; ``lr`` is a deprecated alias.
model = compile_and_learn2(model, X_train, y_train, X_test, y_test, epochs = 30, optimizer = Adam(learning_rate=1e-3), loss = 'categorical_crossentropy', metrics = ['accuracy'], batch_size = 32)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30

KeyboardInterrupt: 