# 1. 라이브러리 설치

In [None]:
# Install the audio-processing (librosa), plotting (matplotlib) and ML (scikit-learn) packages imported below.
!pip install librosa matplotlib scikit-learn



# 2. import

In [None]:
from google.colab import drive
import librosa
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.ensemble import IsolationForest
import joblib
import os
from datetime import datetime

# 3. 구글 드라이브 연결

In [None]:
# Mount Google Drive at /content/gdrive so the dataset folder under MyDrive can be read.
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


# 4. 전역변수 선언

In [None]:
# Folder on the mounted Drive that holds the audio data, and the .wav files found in it.
directory = '/content/gdrive/MyDrive/sample_data'
audio_files = [name for name in os.listdir(directory) if name.endswith('.wav')]

# Sample rate (Hz) requested whenever an audio file is loaded.
sample_rate = 16000

# NOTE(review): not referenced by any code visible in this notebook.
target_length = 10

# Parameters forwarded to librosa.feature.mfcc.
n_fft = 400
hop_length = 160
n_mels = 64

# Sliding-window length and stride, both in seconds.
window_second = 10
hop_second = 5

# Containers for extracted results.
mfcc_list = []
feature_list = []

# Anomaly-detection model; stays None until load_model() assigns it.
model = None

---

# 5. 모델 학습 관련 코드

In [None]:
def load_audio(file_path):
    """Load an audio file and pad it so sliding windows cover every sample.

    The file is decoded at the module-level ``sample_rate``. The waveform is
    then extended with ``librosa.util.fix_length`` so that its length equals
    ``window + k * hop`` samples for some integer ``k >= 0``, where the window
    and hop come from the module-level ``window_second`` / ``hop_second``.
    Without this alignment, framing would drop whatever audio follows the
    last complete window.

    Args:
        file_path: Path to the audio file to load.

    Returns:
        1-D array of audio samples, padded as described above.

    Raises:
        FileNotFoundError: If ``file_path`` does not point to an existing file.
    """
    global sample_rate

    # Explicit check instead of ``assert`` so validation survives ``python -O``.
    if not os.path.isfile(file_path):
        raise FileNotFoundError("Wrong path to audio file")

    # librosa returns the rate it used; storing it back keeps the
    # sample-count arithmetic below (and in callers) consistent with it.
    amplitude, sample_rate = librosa.load(file_path, sr=sample_rate)

    # All length comparisons must be done in samples, not seconds.
    window_samples = window_second * sample_rate
    hop_samples = hop_second * sample_rate

    if len(amplitude) < window_samples:
        # Shorter than one window: extend to exactly one window.
        amplitude = librosa.util.fix_length(data=amplitude, size=window_samples)
    else:
        # Samples left over after the last full hop; pad up to the next hop
        # boundary so the tail of the recording gets its own window.
        leftover = (len(amplitude) - window_samples) % hop_samples
        if leftover:
            padded_size = len(amplitude) + (hop_samples - leftover)
            amplitude = librosa.util.fix_length(data=amplitude, size=padded_size)

    return amplitude

In [None]:
def extract_feature(frame):
    """Reduce one audio window to a single MFCC feature vector.

    MFCCs are computed with the module-level ``sample_rate``, ``n_fft``,
    ``hop_length`` and ``n_mels`` settings, then averaged into one vector.

    Args:
        frame: Audio samples of a single window.

    Returns:
        1-D array holding the mean of the MFCC matrix taken over its second
        axis (the frame axis, per librosa's documented output layout).
    """
    coefficients = librosa.feature.mfcc(
        y=frame,
        sr=sample_rate,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
    )
    # Use the mean MFCC values as the feature for this window.
    return coefficients.T.mean(axis=0)

In [None]:
# Split an audio file into sliding windows and extract one feature vector per window.
def audio_sliding_window(file_path, window_second=10, hop_second=5):
    """Return the MFCC feature vector of every sliding window in a file.

    NOTE: ``load_audio`` pads the signal using the module-level
    ``window_second`` / ``hop_second``, not the arguments given here, so
    passing different values may leave the signal unaligned with the windows.

    Args:
        file_path: Path to the audio file.
        window_second: Window length in seconds.
        hop_second: Stride between consecutive windows in seconds.

    Returns:
        NumPy array built from the per-window feature vectors, one row per
        window.
    """
    samples = load_audio(file_path)

    # Convert the window and stride from seconds to sample counts.
    frame_size = window_second * sample_rate
    frame_step = hop_second * sample_rate

    windows = librosa.util.frame(samples, frame_length=frame_size, hop_length=frame_step)

    # Iterating the transpose visits one window at a time.
    return np.array([extract_feature(window) for window in windows.T])

In [None]:
def train_model():
    """Fit an Isolation Forest on MFCC features of every file in ``audio_files``.

    Each file under ``directory`` is split into sliding windows using the
    module-level ``window_second`` / ``hop_second``; the per-window feature
    vectors of all files are stacked into one matrix and used for fitting.
    The fitted model is written to ``audio_anomaly_detection.pkl`` in the
    current working directory.

    Returns:
        The fitted ``IsolationForest`` instance. The module-level ``model``
        variable is NOT updated by this function.

    Raises:
        ValueError: If ``audio_files`` is empty, i.e. there is nothing to
            train on.
    """
    # Fail early with a clear message; np.vstack([]) would raise a ValueError
    # that says nothing about the missing data.
    if not audio_files:
        raise ValueError(f"No .wav files found in {directory}")

    # Collect one (n_windows, n_features) array per audio file.
    per_file_features = []
    for audio_file in audio_files:
        print(f"{audio_file} processing...")

        full_path = os.path.join(directory, audio_file)
        per_file_features.append(
            audio_sliding_window(full_path, window_second, hop_second)
        )

    # Merge the per-file feature arrays into a single training matrix.
    feature_array = np.vstack(per_file_features)

    # Train the Isolation Forest and log wall-clock timestamps around it.
    print("train started at " + str(datetime.now().timestamp()))

    fitted_model = IsolationForest()
    fitted_model.fit(feature_array)

    print("train ended at " + str(datetime.now().timestamp()))

    # Persist the model and hand it back to the caller.
    joblib.dump(fitted_model, 'audio_anomaly_detection.pkl')
    print('train and save ended')
    return fitted_model



---



# 6. 모델 예측 관련 코드

In [None]:
def load_model():
    """Restore the saved model into the module-level ``model`` variable.

    Reads ``audio_anomaly_detection.pkl`` from the current working directory
    with ``joblib.load``. Only load files you produced yourself: joblib files
    are pickle-based.
    """
    global model
    restored = joblib.load('audio_anomaly_detection.pkl')
    model = restored

In [None]:
def predict_anomaly(file_path):
    """Run the module-level ``model`` on every sliding window of a file.

    The module-level ``model`` starts out as ``None``; it must have been
    assigned (for example by ``load_model()``) before this is called.

    Args:
        file_path: Path to the audio file to score.

    Returns:
        The result of ``model.predict`` on the file's window features, one
        prediction per window.
    """
    window_features = audio_sliding_window(file_path)
    # One prediction per window.
    return model.predict(window_features)

In [None]:
def main(file_path=None):
    """Train the model, then print per-window predictions for one audio file.

    Args:
        file_path: Audio file to score. When omitted, the first entry of
            ``audio_files`` (inside ``directory``) is used so the function
            can still be called without arguments.
    """
    global model

    # predict_anomaly() reads the module-level ``model``; train_model() only
    # returns the fitted estimator, so bind it here before predicting.
    model = train_model()

    if file_path is None:
        # train_model() has already failed if audio_files were empty, so
        # indexing the first entry is safe at this point.
        file_path = os.path.join(directory, audio_files[0])

    predictions = predict_anomaly(file_path)
    print(predictions)

    # TODO: report which files/windows were flagged as anomalies
    # (prediction == -1) instead of printing the raw prediction array.


In [None]:
# Entry point: only trains and saves the model; main() is not invoked here.
if __name__ == '__main__':
    train_model()

0018_G2A3E4S0C0_JBR_000966.wavprossessing...
0018_G2A3E4S0C0_JBR_000916.wavprossessing...
0018_G2A3E4S0C0_JBR_000978.wavprossessing...
0018_G2A3E4S0C0_JBR_000937.wavprossessing...
0018_G2A3E4S0C0_JBR_000919.wavprossessing...
0018_G2A3E4S0C0_JBR_000994.wavprossessing...
0018_G2A3E4S0C0_JBR_000926.wavprossessing...
0018_G2A3E4S0C0_JBR_000939.wavprossessing...
0018_G2A3E4S0C0_JBR_000938.wavprossessing...
0018_G2A3E4S0C0_JBR_000928.wavprossessing...
0018_G2A3E4S0C0_JBR_000934.wavprossessing...
0018_G2A3E4S0C0_JBR_000949.wavprossessing...
0018_G2A3E4S0C0_JBR_000997.wavprossessing...
0018_G2A3E4S0C0_JBR_001098.wavprossessing...
0018_G2A3E4S0C0_JBR_001080.wavprossessing...
0018_G2A3E4S0C0_JBR_001023.wavprossessing...
0018_G2A3E4S0C0_JBR_001055.wavprossessing...
0018_G2A3E4S0C0_JBR_001071.wavprossessing...
0018_G2A3E4S0C0_JBR_001048.wavprossessing...
0018_G2A3E4S0C0_JBR_001083.wavprossessing...
0018_G2A3E4S0C0_JBR_001018.wavprossessing...
0018_G2A3E4S0C0_JBR_001049.wavprossessing...
0018_G2A3E

In [None]:
# Load the saved model and score each sliding window of the test recording.
load_model()
predictions = predict_anomaly('/content/hanseokwon44k.wav')

# IsolationForest.predict labels outliers as -1 and inliers as +1, so only
# windows predicted as -1 are reported. Window ``index`` starts at
# ``index * hop_second`` seconds and lasts ``window_second`` seconds.
anomalous_ranges = [
    sec_to_timestamp(index * hop_second)
    + " ~ "
    + sec_to_timestamp(index * hop_second + window_second)
    for index, predict in enumerate(predictions)
    if predict == -1
]

# One "start ~ end" line per anomalous window, each newline-terminated.
result = ''.join(time_range + "\n" for time_range in anomalous_ranges)

print(result)

[-1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1]



In [None]:
def sec_to_timestamp(sec):
    """Format a number of seconds as an ``HH:MM:SS`` string.

    Each field is zero-padded to at least two digits; the hour field is not
    wrapped at 24, so it grows as needed.

    Args:
        sec: Duration in seconds.

    Returns:
        The duration formatted as ``'HH:MM:SS'``.
    """
    # Peel off whole hours first, then whole minutes from what is left.
    hours, remainder = divmod(sec, 60 * 60)
    minutes, seconds = divmod(remainder, 60)

    return '%02d:%02d:%02d' % (hours, minutes, seconds)