# 1. import

In [None]:
import os
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"]="1"

os.environ['CUDA_HOME']='/home/j-j11a210/.conda/envs/voice_strength'
os.environ['LD_LIBRARY_PATH']='/home/j-j11a210/.conda/envs/voice_strength/lib'


# 음성 데이터 처리
import librosa
import numpy as np
import pandas as pd

# 데이터 시각화
from matplotlib import pyplot as plt

# 모델 관련
import sklearn
from sklearn import preprocessing # AttributeError: module 'sklearn' has no attribute 'preprocessing'
import tensorflow as tf
# import keras
import joblib

import os
# from datetime import datetime

# 2. GPU 인식되는지 확인

In [None]:
from tensorflow.python.client import device_lib

print(device_lib.list_local_devices())
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

# 3. 이것저것 필요한 변수 선언

In [None]:
# sample rate at loaded
SAMPLE_RATE = 16000

# mfcc parameter
N_FFT = 400
HOP_LENGTH = 160
N_MELS = 64

# slicing window parameter
# 입력에 대해 일정 단위로 잘라서 모델에 넣는다.
WINDOW_SECOND = 30
HOP_SECOND = 0

# result data
mfcc_list = []
feature_list = []

model = None

# 4. 데이터 가공 관련 메소드


In [None]:
def load_audio(file_path):
    # 오디오 파일 로드
    assert os.path.isfile(file_path), "Wrong path to audio file"
    amplitude, sr = librosa.load(file_path, sr=SAMPLE_RATE)


    # WINDOW_SECOND가 10s이므로 파일이 10초보다 짧거나 HOP_SECOND의 배수가 아닌 경우 패딩을 추가합니다.
    window_frame = WINDOW_SECOND * SAMPLE_RATE

    if len(amplitude) % window_frame != 0:
      added_frame = window_frame - (len(amplitude) % window_frame)
      amplitude = librosa.util.fix_length(data=amplitude, size=len(amplitude) + added_frame)


    # input audio의 길이가 WINDOW_SECOND 보다 짧은 경우 (480000,)의 1차원 배열
    # print("amplitude shape: " + str(amplitude.shape))

    return amplitude

In [None]:
def extract_mfcc(frame):
    print("frame shape: " + str(frame.shape))
    mfcc = librosa.feature.mfcc(y=frame, sr=SAMPLE_RATE, n_fft=N_FFT, hop_length=HOP_LENGTH, n_mels=N_MELS)

    # (n_mfcc, time_steps)를 전치시켜 시간에 따른 특성을 학습하도록 한다.
    mfcc = mfcc.T
    print("mfcc shape: " + str(mfcc.shape)) # (3001, 20)

    # Z-score로 정규화 후 리턴 (keras의 예제에서도 이 방식을 선택)
    return sklearn.preprocessing.minmax_scale(mfcc, axis=1)

In [None]:
# 슬라이딩 윈도우를 적용해 오디오를 일정 크기의 프레임 단위로 나누는 함수
def audio_sliding_window(file_path, window_second=WINDOW_SECOND, hop_second=HOP_SECOND):
    amplitude = load_audio(file_path)

    # window_second(초)와 hop_second(초)를 샘플 단위로 변환
    window_samples = window_second * SAMPLE_RATE
    hop_samples = window_samples # 이전: hop_second * SAMPLE_RATE

    # 프레임 단위로 오디오를 슬라이딩 윈도우 기법으로 분할
    frame_list = librosa.util.frame(amplitude, frame_length=window_samples, hop_length=window_samples)
    # print("sliced frame shape: " + str(frame_list.shape))

    return frame_list.T

In [None]:
def create_train_set(directory, audio_file_list):
    feature_list = []
    
    # 디렉토리 내 모든 오디오 파일에 대해
    audio_file_count = len(audio_file_list)
    for index, audio_file in enumerate(audio_file_list):
      print(str(index) + "/" + str(audio_file_count) + ": " + audio_file + " pre-prossessing...")


      # 오디오를 WINDOW_SECOND초 단위로 자른다.
      audio_full_path = os.path.join(directory, audio_file)
      frame_list = audio_sliding_window(audio_full_path, WINDOW_SECOND, HOP_SECOND)

      for frame in frame_list:
        new_frame = extract_mfcc(frame)
        feature_list.append(new_frame)
        print("new_frame shape: " + str(new_frame.shape))

    return np.array(feature_list)

---

# Main

## 1) 학습셋 만들기

In [None]:
# 디렉토리에서 wav 파일 목록을 불러온다.
directory = r'/home/j-j11a210/EchoNote/AI/train_data'

audio_full_path_list = []

for (path, dir, files) in os.walk(directory):
    print(path)
    for filename in files:
        if filename.endswith('.wav'):
            audio_full_path_list.append(os.path.join(path, filename))

In [None]:
# 학습셋을 생성한다.
train_set = create_train_set(directory, audio_full_path_list)

In [None]:
train_set.shape

In [None]:
plt.figure(figsize=(16, 6))
librosa.display.specshow(train_set[0].T, sr=SAMPLE_RATE, x_axis='time') # 그릴 때는 전치
plt.title("Original Data")
plt.colorbar()
plt.show()

## 2) 모델 구축 및 학습

In [None]:
def create_autoencoder(input_shape):
    input_layer = tf.keras.layers.Input(shape=input_shape)
    encoded = tf.keras.layers.Dense(128, activation='relu')(input_layer)
    encoded = tf.keras.layers.Dense(64, activation='relu')(encoded)

    decoded = tf.keras.layers.Dense(128, activation='relu')(encoded)
    decoded = tf.keras.layers.Dense(input_shape[1], activation='sigmoid')(decoded)

    autoencoder = tf.keras.models.Model(input_layer, decoded)
    autoencoder.compile(optimizer='adam', loss='mae')

    return autoencoder

In [None]:
# 모델 생성 및 학습
model = create_autoencoder(input_shape=(train_set.shape[1], train_set.shape[2]))

In [None]:
model.summary()

In [None]:
train_history = model.fit(
    train_set,
    train_set,
    epochs=50,
    batch_size=32,
    validation_split=0.2,
    shuffle=True
)

## 3) 학습 결과 확인

In [None]:
plt.plot(train_history.history["loss"], label="Training Loss")
plt.plot(train_history.history["val_loss"], label="Validation Loss")
plt.legend()
plt.show()

## 4) 재구성 오류 계산을 통한 이상 현상 감지
오토인코더는 입력 데이터를 압축한 후 다시 복원하는 과정을 거친다. **입력 데이터(원본 데이터)**를 인코더로 압축하여 저차원 잠재 공간(z)으로 변환, 그 후, 디코더를 통해 잠재 공간(z)에서 복원된 데이터를 생성한다.

재구성 오류는 입력 데이터와 복원된 데이터 간의 차이를 나타낸다.
재구성 오류(Reconstruction Error)란, 원본 데이터와 오토인코더가 복원한 데이터 간의 차이를 나타내는 값으로, 오토인코더가 원본 데이터를 얼마나 잘 복원했는지를 나타낸다.

정상적인 데이터는 오토인코더가 잘 복원할 수 있어 재구성 오류가 작다. 오토인코더가 학습에 사용한 정상적인 패턴에 가까운 데이터를 입력하면 복원 과정에서 큰 손실 없이 재구성할 수 있다.

반면, 이상치 데이터는 복원이 잘 되지 않아 재구성 오류가 크다. 이상치는 학습 중에 경험하지 못한 패턴일 가능성이 높으므로, 오토인코더가 이를 잘 복원하지 못해 큰 오류가 발생한다.

---

### 그래프에 대한 설명
각 loss에 대한 데이터 수를 의미하는 것 같다. loss가 0.00에 가까운 sample이 대다수임을 볼 수 있다. 또한 이상 경계값인 0.04를 넘는 샘플이 거의 없다. 이런 이유로 해당 데이터에서 anomaly가 발견되지 않았다고 할 수 있다?

In [None]:
# mae loss를 계산해보자.

# 모델을 이용해 입력 데이터를 복원한다.
train_pred = model.predict(train_set)

In [None]:
train_pred.shape

In [None]:
# 모델이 복원한 결과와 실제 원본값의 차이를 계산한다.
# 차이가 클수록 오토인코더가 복원하는 데 어려움을 겪었다는 의미
# == 해당 데이터가 이상치일 가능성이 있다는 의미
# ↓↓↓↓↓↓ 생각보다 오래걸림
train_mae_loss = np.mean(np.abs(train_pred - train_set), axis=-1)

# ?
plt.hist(train_mae_loss[0], bins=50)
# plt.xlabel("Train mae loss")
# plt.ylabel("mae samples")
# plt.show()

In [None]:
train_mae_loss.shape

## 5) 재구성이 잘 되었는지 시각화하여 보기

In [None]:
'''
mfcc라서 그냥 matplotlib으로 바로 그리면 안 되고
librosa.display.specshow 메서드를 써서 그려야 한다.
keras의 예제는 mfcc를 쓰지 않았기에 그냥 matplotlib으로 그린 것.
'''

plt.figure(figsize=(16, 6))
librosa.display.specshow(train_set[100].T, sr=SAMPLE_RATE, x_axis='time', hop_length=HOP_LENGTH, n_fft=N_FFT)
plt.title('Original Data: {}'.format(audio_full_path_list[0]))
plt.colorbar()
plt.show()


plt.figure(figsize=(16, 6))
librosa.display.specshow(train_pred[100].T, sr=SAMPLE_RATE, x_axis='time', hop_length=HOP_LENGTH, n_fft=N_FFT)
plt.title("Reconstitution Data")
plt.colorbar()
plt.show()

## 6) 이상 탐지 임계값 설정
평균 재구성 오류에 3배의 표준편차를 더한 값을 임계값으로 사용한다?

통계적으로 정규 분포를 가정하면, 평균 ± 3표준편차 안에 약 99.7%의 데이터가 포함된다? 즉, 정상적인 범위는 평균 재구성 오류에서 3표준편차 내의 값이라고 볼 수 있으며, 그보다 큰 값들은 이상치로 간주할 수 있다?

In [None]:
train_mae_loss.shape

In [None]:
plt.plot(train_mae_loss[5])

In [None]:
plt.plot(sklearn.preprocessing.minmax_scale(train_mae_loss[5], axis=0))

In [None]:
train_mae_loss

In [None]:
## 일반적인 방식
# # Get reconstruction loss threshold.
# threshold = np.mean(train_mse_loss) + 3 * np.std(train_mse_loss)
# print("Reconstruction error threshold: ", threshold)

threshold_mean = np.mean(train_mae_loss, axis=-1) + 3 * np.std(train_mae_loss)
threshold_max = np.max(train_mae_loss)
print("Reconstruction error threshold_mean: ", threshold_mean)
print("Reconstruction error threshold_max: ", threshold_max)

In [None]:
## 각 차원에 대한 임계값 설정
# 임계값은 세로 방향으로 계산해야 한다.
thresholds = np.mean(train_mae_loss, axis=0) + 3 * np.std(train_mae_loss, axis=0)
print("Reconstruction error thresholds: ", thresholds)

In [None]:
# 각 time frame 마다 임계값이 생성되었음을 확인
thresholds.shape

## 7) 모델 저장

In [None]:
# @keras.saving.register_keras_serializable()
model.save(r'/home/j-j11a210/EchoNote/AI/audio_anomaly_detection_mae.keras')

---

In [None]:
# 실 데이터 불러오기
test_data = create_train_set(r'/home/j-j11a210/EchoNote/AI', ['hanseokwon44k.wav'])
test_data.shape

In [None]:
test_predict_list = model.predict(test_data, batch_size=32)

In [None]:
test_predict_list.shape

In [None]:
plt.figure(figsize=(16, 6))
librosa.display.specshow(test_data[0].T, sr=SAMPLE_RATE, x_axis='time', hop_length=HOP_LENGTH, n_fft=N_FFT)
plt.title('Original Data')
plt.colorbar()
plt.show()


plt.figure(figsize=(16, 6))
librosa.display.specshow(test_predict_list[0].T, sr=SAMPLE_RATE, x_axis='time', hop_length=HOP_LENGTH, n_fft=N_FFT)
plt.title("Reconstitution Data")
plt.colorbar()
plt.show()

In [None]:
mae_loss_list = np.mean(np.abs(test_data - test_predict_list), axis=-1) # (n, 3001)
print(mae_loss_list.shape)

In [None]:
plt.title("MAE loss")
plt.plot(sklearn.preprocessing.minmax_scale(mae_loss_list[0], axis=0))
# plt.axhline(y=threshold_max, color='r', linewidth=1)
# plt.axhline(y=threshold_mean, color='g', linewidth=1)
plt.show()

In [None]:
test_predict = model.predict(test_data)
test_mae_loss = np.mean(np.abs(test_predict - test_data), axis=-1)
test_threshold = np.mean(test_mae_loss) + 3 * np.std(test_mae_loss)

# librosa.display.specshow(, sr=SAMPLE_RATE, x_axis='time')
for time_frame in test_mae_loss:
    plt.plot(time_frame)
    plt.axhline(y=test_threshold, color='r', linestyle='--')
    plt.title('Reconstruction Error for New Audio')
    plt.show()

In [None]:
test_mae_loss

In [None]:
# 7. 이상치 탐지
print(test_mae_loss.shape)
print("loss: ", test_mae_loss)
anomalies_new = np.where(test_mae_loss > threshold)[0]
print("Detected anomalies at:", anomalies_new)

# 8. 이상치가 발생한 시점 출력
# 예를 들어 이상치가 발생한 프레임이 몇 초인지 계산
frame_duration = len(test_data) / len(test_mae_loss)  # 한 프레임이 차지하는 시간 비율 계산
anomalous_times = anomalies_new * frame_duration
print("Anomalies detected at (seconds):", anomalous_times)

In [None]:
# test_predict = model.predict(test_data)
# test_mae_loss = np.mean(np.power(test_predict - test_data, 2), axis=1)
# threshold = np.mean(test_mae_loss) + 3 * np.std(test_mae_loss)

# # librosa.display.specshow(, sr=SAMPLE_RATE, x_axis='time')
# for time_frame in test_mae_loss:
#     plt.plot(time_frame)
#     plt.axhline(y=threshold, color='r', linestyle='--')
#     plt.title('Reconstruction Error for New Audio')
#     plt.show()

# 타임스탬프 생성 (예: 1초 간격의 타임스탬프)
timestamps = pd.date_range(start='2024-10-01', periods=len(test_mae_loss), freq='S')

# 각 차원에 대한 이상치 탐지
anomalies = np.any(test_mae_loss > thresholds, axis=1)

# 이상치가 발생한 시각 추출
anomaly_timestamps = timestamps[anomalies]

# 결과 출력
print("Detected anomalies in test data:", anomalies)
print("Number of anomalies detected:", np.sum(anomalies))
print("Timestamps of detected anomalies:", anomaly_timestamps)

# 이상치 시각화
plt.figure(figsize=(12, 6))
for i in range(20):  # 13개의 MFCC 계수에 대한 시각화
    plt.subplot(4, 5, i + 1)  # 3x5 서브플롯
    plt.hist(test_mae_loss[:, i], bins=30, alpha=0.7, color='blue', edgecolor='black')
    plt.axvline(x=thresholds[i], color='red', linestyle='--', label='Threshold')
    plt.title(f'MFCC {i + 1}')
    plt.xlabel('Reconstruction Error')
    plt.ylabel('Frequency')
    plt.legend()
plt.tight_layout()
plt.show()

In [None]:
def sec_to_timestamp(sec):
    hour = sec // (60*60)
    sec %= 60*60

    minute = sec // 60
    sec %= 60

    return '%02d:%02d:%02d' % (hour, minute, sec)