<a href="https://colab.research.google.com/github/fathursidiq/CNNtampalibrary/blob/main/Unsupervised_BiLSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import confusion_matrix, classification_report
from tensorflow.keras import layers, models, optimizers, losses
import tensorflow as tf
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from scipy.signal import hilbert
from scipy.signal import welch
import os
import glob

In [None]:
def load_data_and_target_from_folder(data_folder_path, target_folder_path, delimiter=';'):
    """Load matching data/label CSV pairs and stack them row-wise.

    Every ``*.CSV`` file in ``data_folder_path`` is paired with the file of
    the same name in ``target_folder_path``.  Data files that have no
    matching label file are skipped.  From each file only the rows at
    positions 1000-8999 are kept.

    Args:
        data_folder_path: Folder holding the signal CSV files.
        target_folder_path: Folder holding the label CSV files.
        delimiter: Column separator used when reading both kinds of file.

    Returns:
        A ``(data, pwave, swave)`` tuple of arrays, built from the columns
        whose names contain ``'data'``, ``'pwave'`` and ``'swave'``
        respectively, concatenated over all files in sorted file order.

    Raises:
        ValueError: If no data file has a matching label file.
    """
    data_files = sorted(glob.glob(os.path.join(data_folder_path, '*.CSV')))
    data_list = []
    target_list = []
    target2_list = []
    for data_file in data_files:
        file_name = os.path.splitext(os.path.basename(data_file))[0]
        target_file = os.path.join(target_folder_path, f'{file_name}.CSV')
        if not os.path.exists(target_file):
            # No labels for this recording: leave it out of all three arrays.
            continue
        df_data = pd.read_csv(data_file, delimiter=delimiter)[1000:9000]
        df_target = pd.read_csv(target_file, delimiter=delimiter)[1000:9000]
        data_list.append(df_data.filter(like='data').values)
        target_list.append(df_target.filter(like='pwave').values)
        target2_list.append(df_target.filter(like='swave').values)
    if not data_list:
        # np.concatenate([]) would fail with an unhelpful message; say what
        # was actually wrong instead.
        raise ValueError(
            f'No matching data/label CSV pairs found in '
            f'{data_folder_path!r} and {target_folder_path!r}'
        )
    data_array = np.concatenate(data_list)
    target_array = np.concatenate(target_list)
    target2_array = np.concatenate(target2_list)
    return data_array, target_array, target2_array
# Folders (under the mounted Drive path) with the input signals and labels.
data_folder_path = '/content/drive/MyDrive/thesis/filtered'
target_folder_path = '/content/drive/MyDrive/thesis/label'
# Load everything once; data_array feeds the integration cell further down.
data_array, target_array, target2_array = load_data_and_target_from_folder(
    data_folder_path=data_folder_path,
    target_folder_path=target_folder_path,
)

In [None]:
class GaussLegendreQuadrature:
    """Fixed-order Gauss-Legendre quadrature rule.

    The nodes and weights are computed once, at construction time, with
    ``np.polynomial.legendre.leggauss`` and reused for every integral.
    """

    def __init__(self, num_points=3):
        """Build the rule.

        Args:
            num_points: Number of quadrature nodes; passed straight to
                ``leggauss``, which requires a positive integer.
        """
        self.num_points = num_points
        self.points, self.weights = np.polynomial.legendre.leggauss(num_points)

    def integrate(self, function, lower=-1.0, upper=1.0):
        """Approximate the integral of ``function`` over ``[lower, upper]``.

        With the default bounds this is the plain rule on ``[-1, 1]``.

        Args:
            function: Vectorised callable.  It is called once with the array
                of nodes and must return values that broadcast against the
                weights.
            lower: Lower integration bound.
            upper: Upper integration bound.

        Returns:
            The weighted sum that approximates the integral.
        """
        # Affine map from the reference interval [-1, 1] onto [lower, upper];
        # the Jacobian of that map is the half width.
        half_width = 0.5 * (upper - lower)
        midpoint = 0.5 * (upper + lower)
        nodes = half_width * self.points + midpoint
        return half_width * np.sum(self.weights * function(nodes))

if __name__ == "__main__":
    def function_to_integrate(x):
        """Integrand for the cumulative integration (the identity)."""
        return x

    num_points = 4
    gauss_legendre = GaussLegendreQuadrature(num_points)
    # Work on a fixed window of the loaded samples.
    x_vals = np.array(data_array)[100000:400000]

    # Running total of the per-sample integrals of f(x_val + x) over [-1, 1].
    # With the identity integrand each term is analytically 2 * x.
    cumulative_results = []
    cumulative_sum = 0
    for x in x_vals:
        cumulative_sum += gauss_legendre.integrate(lambda x_val: function_to_integrate(x_val + x))
        cumulative_results.append(cumulative_sum)

    # Envelope of the integrated signal (1-D, so the default axis is fine).
    analytic_signal = hilbert(cumulative_results)
    hilbert_gauss_legendre = np.abs(analytic_signal)
    # x_vals keeps the (samples, columns) layout produced by the loader, so
    # the transform has to run along axis 0.  The default (last axis) would
    # transform across the columns of each single sample instead of along
    # the time series.
    analytic_s = hilbert(x_vals, axis=0)
    hilbert_envelope = np.abs(analytic_s)

    # Four stacked panels: raw window, cumulative integral, and both envelopes.
    plt.figure(figsize=(12, 8))
    plt.subplot(4, 1, 1)
    plt.plot(x_vals, label='Original Sine Wave')
    plt.title('Original Sine Wave Function')
    plt.xlabel('x')
    plt.ylabel('Amplitude')
    plt.legend()
    plt.subplot(4, 1, 2)
    plt.plot( np.array(cumulative_results), label='Cumulative Integration ', color='orange')
    plt.title('Cumulative Integration of Sine Wave using Gaussian-Legendre Quadrature')
    plt.xlabel('x')
    plt.ylabel('Cumulative Amplitude')
    plt.legend()
    plt.subplot(4, 1, 3)
    plt.plot( hilbert_gauss_legendre, label='Hilbert_Gauss Transform Analysis', color='green')
    plt.title('Hilbert Transform Analysis')
    plt.xlabel('Time')
    plt.ylabel('Amplitude')
    plt.legend()
    plt.subplot(4, 1, 4)
    plt.plot( hilbert_envelope, label='Hilbert Transform Analysis', color='green')
    plt.title('Hilbert Transform Analysis')
    plt.xlabel('Time')
    plt.ylabel('Amplitude')
    plt.legend()
    plt.tight_layout()
    plt.show()

In [None]:
# Rescale the envelope to [-1, 1]; the scaler expects a single feature column.
scaler = MinMaxScaler(feature_range=(-1, 1))
data = np.array(hilbert_gauss_legendre).reshape(-1, 1)
data = scaler.fit_transform(data).flatten()
# Add batch and channel axes around the 1-D series -> (1, len(data), 1).
input_signal = data[np.newaxis, :, np.newaxis]

def dice_loss(y_true, y_pred, smooth=1e-6):
    """Soft Dice loss computed over all elements of both tensors at once.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor with the same number of elements.
        smooth: Small constant added to numerator and denominator so the
            ratio stays defined when both sums are zero.

    Returns:
        ``1 - (2 * sum(y_true * y_pred) + smooth)
        / (sum(y_true) + sum(y_pred) + smooth)`` as a scalar tensor.
    """
    truth = tf.reshape(y_true, [-1])
    guess = tf.reshape(y_pred, [-1])
    overlap = tf.reduce_sum(truth * guess)
    total = tf.reduce_sum(truth) + tf.reduce_sum(guess)
    return 1 - (2. * overlap + smooth) / (total + smooth)


def create_model(input_shape, num_heads=4):
    """Build and compile the Conv1D / BiLSTM / attention sequence model.

    Args:
        input_shape: Shape of one input sample, without the batch axis.
        num_heads: Number of heads of the multi-head attention layer.

    Returns:
        A Keras model compiled with Adam (learning rate 0.001) and
        ``dice_loss``.

    NOTE(review): the two pool-by-2 stages are undone by two stride-2
    transposed convolutions, so the output length presumably only equals the
    input length when the latter is a multiple of 4 - confirm before using
    other window sizes.
    NOTE(review): self-attention memory grows quadratically with the pooled
    sequence length; verify this fits for long inputs.
    """
    inputs = layers.Input(shape=input_shape)

    # Encoder: two conv + pool stages, then a bidirectional LSTM.
    encoded = layers.Conv1D(filters=32, kernel_size=3, padding='same', activation='relu')(inputs)
    encoded = layers.MaxPooling1D(pool_size=2)(encoded)
    encoded = layers.Conv1D(filters=8, kernel_size=3, padding='same', activation='relu')(encoded)
    encoded = layers.MaxPooling1D(pool_size=2)(encoded)
    encoded = layers.Bidirectional(layers.LSTM(units=64, return_sequences=True))(encoded)

    # Self-attention: the encoded sequence is both query and value.
    attended = layers.MultiHeadAttention(num_heads=num_heads, key_dim=128)(encoded, encoded)

    # Decoder: bidirectional LSTM, then two stride-2 transposed convolutions.
    decoded = layers.Bidirectional(layers.LSTM(units=8, return_sequences=True))(attended)
    decoded = layers.Conv1DTranspose(filters=8, kernel_size=2, strides=2, padding='same', activation='relu')(decoded)
    decoded = layers.Conv1DTranspose(filters=1, kernel_size=2, strides=2, padding='same', activation='sigmoid')(decoded)

    # Per-timestep sigmoid output.
    # NOTE(review): this sigmoid is stacked on the sigmoid of the previous
    # layer - confirm that is intended.
    outputs = layers.Dense(units=1, activation='sigmoid')(decoded)

    model = models.Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer=optimizers.Adam(learning_rate=0.001), loss=dice_loss)
    return model


# One sample is (timesteps, channels): drop the leading batch axis.
input_shape = input_signal.shape[1:]
model = create_model(input_shape)
model.summary()

# NOTE(review): the targets are drawn at random (0/1 per timestep), so the
# fit below is not against real labels.
target_signal = np.random.randint(low=0, high=2, size=(1, input_shape[0], 1)).astype(float)
history = model.fit(x=input_signal, y=target_signal, epochs=50, batch_size=1, verbose=1)

# Persist the trained model to Drive.
model.save('/content/drive/MyDrive/thesis/model_unsupervised/conv_bilstm_autoencoder_with_attention.h5')


In [None]:
# Reload the trained model from the Drive path it is saved to after training.
# A bare file name would be resolved against the current working directory,
# where the file is never written.  dice_loss is a custom function, so it
# has to be handed to the loader explicitly.
model = models.load_model(
    '/content/drive/MyDrive/thesis/model_unsupervised/conv_bilstm_autoencoder_with_attention.h5',
    custom_objects={'dice_loss': dice_loss},
)
predictions = model.predict(input_signal).squeeze()
# Threshold the sigmoid outputs at 0.5 to get hard 0/1 labels.
binary_predictions = (predictions > 0.5).astype(int).flatten()
y_true = target_signal.flatten()
y_pred = binary_predictions

def plot_confusion_matrix(y_true, y_pred, classes):
    """Draw the confusion matrix of ``y_pred`` against ``y_true`` as a heatmap.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        classes: Tick labels used on both axes of the heatmap.
    """
    matrix = confusion_matrix(y_true, y_pred)
    _, ax = plt.subplots(figsize=(10, 7))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=classes,
        yticklabels=classes,
        ax=ax,
    )
    ax.set_xlabel('Predicted')
    ax.set_ylabel('True')
    ax.set_title('Confusion Matrix')
    plt.show()

# Text summary of the thresholded predictions (precision / recall / F1).
print("Classification Report:")
print(classification_report(y_true=y_true, y_pred=y_pred, target_names=['Class 0', 'Class 1']))

# Same comparison as a heatmap.
plot_confusion_matrix(y_true=y_true, y_pred=y_pred, classes=['Class 0', 'Class 1'])
