# Decibel data prediction with real data

In [1]:
import os
import numpy as np
import sounddevice as sd
import scipy.io.wavfile as wav
import librosa
from pydub import AudioSegment
from tensorflow.keras.models import load_model
import tensorflow as tf



In [None]:
# Show the current working directory; the functions below write to relative
# paths such as "./5sec_cut_result_demo/" and "./dB_result_demo/".
os.getcwd()

In [2]:
def record_audio(duration=20, fs=16000, filename='record_demo.wav'):
    """Record single-channel 16-bit audio and store it as a WAV file.

    Args:
        duration: Recording length in seconds.
        fs: Sampling rate in Hz.
        filename: Path of the WAV file to write.

    Returns:
        The ``filename`` that was written.
    """
    print("Recording audio for", duration, "seconds...")

    frame_count = int(duration * fs)
    samples = sd.rec(frame_count, samplerate=fs, channels=1, dtype='int16')
    # Block until the recording buffer has been completely filled.
    sd.wait()

    wav.write(filename, fs, samples)
    print("recording is finished")
    return filename

In [3]:
def audio_chunking(filename, chunk_ms=5000, output_dir="./5sec_cut_result_demo/"):
    """Split a WAV file into consecutive fixed-length chunks and export them.

    Chunks are written as ``record_part_0001.wav``, ``record_part_0002.wav``,
    ... inside ``output_dir``. The last chunk is shorter when the audio length
    is not a multiple of ``chunk_ms``.

    Args:
        filename: Path of the WAV file to split.
        chunk_ms: Chunk length in milliseconds (positive int). Defaults to
            5000, i.e. 5-second chunks.
        output_dir: Folder that receives the chunk files; created if missing.

    Returns:
        ``output_dir``, so the caller can pass it to the next pipeline stage.

    Raises:
        ValueError: If ``chunk_ms`` is not positive.
    """
    if chunk_ms <= 0:
        raise ValueError("chunk_ms must be a positive number of milliseconds")

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_dir, exist_ok=True)

    audio = AudioSegment.from_wav(filename)
    total_ms = len(audio)  # segment length, in the same millisecond units used for slicing

    # NOTE: the chunk prefix is intentionally fixed to "record" (not derived
    # from ``filename``) so that file names stay the same as before and a new
    # run overwrites the chunks of the previous one.
    prefix = "record"

    for counter, start in enumerate(range(0, total_ms, chunk_ms), start=1):
        end = min(start + chunk_ms, total_ms)
        chunk = audio[start:end]
        chunk_path = os.path.join(output_dir, f"{prefix}_part_{counter:04d}.wav")
        chunk.export(chunk_path, format="wav")

    return output_dir

In [4]:
def handle_dB_NaN(dB, threshold=-80):
    """
    Fill runs of ``threshold`` values in a dB sequence, in place.

    Every maximal run of consecutive entries equal to ``threshold`` is
    overwritten with the mean of the last non-threshold value before the run
    and the first non-threshold value after it. A missing neighbour counts as
    0: a run at the very start uses ``(0 + next) / 2`` and a run reaching the
    end uses ``previous / 2``.

    Parameters:
    - dB: np.array
        The array of decibel values. It is modified in place.
    - threshold: float
        The value in the dB array that marks an entry to be replaced.

    Returns:
    - np.array
        The same ``dB`` object after the replacement.
    """
    def _fill(first, stop, level):
        # Overwrite dB[first:stop] element by element with ``level``.
        for pos in range(first, stop):
            dB[pos] = level

    last_idx = len(dB) - 1
    prev_valid = 0      # most recent non-threshold value; 0 until one is seen
    run_start = None    # start index of the threshold run currently open

    for idx in range(len(dB)):
        current = dB[idx]

        if current != threshold:
            if run_start is not None:
                # The run is closed on both sides: use the mean of its neighbours.
                _fill(run_start, idx, (prev_valid + current) / 2)
                run_start = None
            prev_valid = current
            continue

        if run_start is None:
            if idx == last_idx:
                # A lone threshold value in the final position.
                dB[idx] = prev_valid / 2
            run_start = idx
        elif idx == last_idx:
            # The open run reaches the end of the data.
            _fill(run_start, idx + 1, prev_valid / 2)

    return dB

In [5]:
def dB_npy(output_dir):
    """Convert every WAV file in ``output_dir`` to a dB vector saved as .npy.

    For each ``.wav`` file the RMS envelope is computed at 16 kHz, converted
    to dB relative to its maximum, passed through ``handle_dB_NaN`` and saved
    as float32 to ``./dB_result_demo/dB_demo_<n>`` where ``<n>`` counts from 0
    in sorted file-name order. Names of files that still contain the value -80
    after processing are written to ``dB_error_log.txt``.

    Args:
        output_dir: Folder containing the WAV chunks to convert.

    Returns:
        The folder the .npy files were written to (``'./dB_result_demo/'``).
    """
    errorLogPath = "dB_error_log.txt"
    output_folder = './dB_result_demo/'

    # os.listdir returns entries in arbitrary order; sort so the numeric
    # suffix of each saved file follows the chunk order deterministically.
    file_list_wav = sorted(
        name for name in os.listdir(output_dir) if name.endswith('.wav')
    )

    os.makedirs(output_folder, exist_ok=True)

    error_log = []

    for file_num, filename in enumerate(file_list_wav):
        audio_data, _ = librosa.load(os.path.join(output_dir, filename), sr=16000)
        rms_values = librosa.feature.rms(y=audio_data)[0]
        # NOTE(review): RMS is an amplitude, yet power_to_db is applied to it.
        # Left unchanged because the downstream model presumably expects
        # features computed this way -- confirm before switching to
        # amplitude_to_db.
        dB = librosa.power_to_db(rms_values, ref=np.max)

        # Replace runs of the -80 floor value, then store as float32.
        processed_dB = handle_dB_NaN(dB).astype(np.float32)
        np.save(os.path.join(output_folder, 'dB_demo_' + str(file_num)), processed_dB)

        # Any -80 left over means the floor value could not be filled in.
        if -80 in processed_dB:
            error_log.append(filename)

    with open(errorLogPath, "w") as f:
        f.writelines(f"{name}\n" for name in error_log)

    return output_folder

In [6]:
def load_saved_model(model_path):
    """Load the saved model at ``model_path``, print its summary and return it."""
    restored = load_model(model_path)
    # Print the layer overview so the loaded architecture can be checked.
    restored.summary()
    return restored

In [22]:
def preprocess_files(file_list_pitch, path, n_frames=157):
    """Load saved .npy feature vectors and stack them into one model batch.

    Each file is loaded with ``np.load`` and reshaped to ``(1, n_frames, 1)``.
    Files that do not exist, or whose array does not hold exactly
    ``n_frames`` values (for example a shorter trailing audio chunk), are
    reported on stdout and skipped instead of aborting the whole batch.

    Args:
        file_list_pitch: File names (relative to ``path``) to load.
        path: Folder containing the files.
        n_frames: Number of values each feature vector must contain.
            Defaults to 157.

    Returns:
        A float32 array of shape ``(n_loaded, n_frames, 1)``, or an empty
        array when there is nothing usable to stack.
    """
    # Nothing was requested at all.
    if not file_list_pitch:
        print("No files to process.")
        return np.array([])

    reshaped_files = []

    for file in file_list_pitch:
        # os.path.join keeps the path construction OS-independent.
        file_path = os.path.join(path, file)

        if not os.path.exists(file_path):
            print(f"File {file_path} not found.")
            continue

        data = np.load(file_path)
        # reshape() would raise ValueError on a wrong-sized array; skip it so
        # one bad file does not discard every other prediction input.
        if data.size != n_frames:
            print(f"File {file_path} skipped: expected {n_frames} values, got {data.size}.")
            continue

        reshaped_files.append(data.reshape(1, n_frames, 1))

    # Every requested file was missing or unusable.
    if not reshaped_files:
        print("No data to stack.")
        return np.array([])

    return np.vstack(reshaped_files).astype('float32')


In [23]:
def predict_with_model(model_best, file_list_pitch, path):
    """Run the model on the preprocessed files and derive binary labels.

    Args:
        model_best: Model object exposing ``predict``.
        file_list_pitch: File names handed to ``preprocess_files``.
        path: Folder containing those files.

    Returns:
        ``(labels, probabilities)`` where ``labels`` is a list holding 1 for
        every row whose first output is >= 0.5 and 0 otherwise, and
        ``probabilities`` is the raw ``predict`` output. ``(None, None)`` when
        preprocessing produced no data.
    """
    batch = preprocess_files(file_list_pitch, path)

    # Guard clause: skip prediction entirely when there is nothing to feed in.
    if batch.size == 0:
        print("No valid data to predict.")
        return None, None

    probabilities = model_best.predict(batch)

    labels = []
    for row in probabilities:
        if row[0] >= 0.5:
            labels.append(1)
        else:
            labels.append(0)

    return labels, probabilities

In [24]:
def evaluate_voice_phishing_risk(probabilities, labels):
    """Print an overall voice-phishing risk level for a set of predictions.

    A file counts as voice phishing when its label is 0 and its class-0
    probability (``1 - probabilities[i][0]``) is at least 0.6. The score P is
    the fraction of such files multiplied by their mean class-0 probability,
    and is mapped to a risk level: [0.2, 0.4) warning, [0.4, 0.6) danger,
    [0.6, 1.0] very dangerous, anything else normal.

    Args:
        probabilities: Per-file model outputs, indexed as ``[i][0]``.
        labels: Per-file labels (0 or 1), aligned with ``probabilities``.

    Returns:
        None. The counts, P and the risk level are printed to stdout.
    """
    flagged = 0
    flagged_confidence = 0.0

    for position, label in enumerate(labels):
        if label == 0:
            class0_prob = 1 - probabilities[position][0]
            if class0_prob >= 0.6:
                flagged += 1
                flagged_confidence += class0_prob

    file_count = len(labels)

    # With no flagged file the score is defined as zero (also avoids 0/0).
    probability_P = 0.0
    if flagged > 0:
        probability_P = (flagged / file_count) * (flagged_confidence / flagged)

    # The bands are disjoint; whatever falls outside all of them is normal.
    risk_level = "정상"
    if 0.6 <= probability_P <= 1.0:
        risk_level = "보이스피싱 매우 위험"
    elif 0.4 <= probability_P < 0.6:
        risk_level = "보이스피싱 위험"
    elif 0.2 <= probability_P < 0.4:
        risk_level = "보이스피싱 경고"

    report = (
        ("Number of voice phishing files:", flagged),
        ("Total number of files:", file_count),
        ("Probability P:", probability_P),
        ("Risk Level:", risk_level),
    )
    for caption, value in report:
        print(caption, value)

In [25]:
def main(model_path='C:\\Users\\user\\Jinwoo\\FINAL_230809_decibel_model.h5'):
    """Run the demo pipeline: record, chunk, convert to dB, predict, evaluate.

    Args:
        model_path: Path of the saved model file to load. Defaults to the
            location used by the original demo.
    """
    # Record audio and split it into chunks.
    filename = record_audio()
    output_dir = audio_chunking(filename)

    # Convert the chunks to dB feature vectors saved as .npy files. dB_npy
    # writes them to './dB_result_demo/' (not to the WAV chunk folder), so the
    # prediction input must be read from there; use its return value when it
    # provides one.
    npy_dir = dB_npy(output_dir) or './dB_result_demo/'

    # Load the saved model.
    model_best = load_saved_model(model_path)

    # Collect the feature files to predict on.
    file_list_pitch = sorted(
        name for name in os.listdir(npy_dir) if name.endswith('.npy')
    )

    print("\n=== Predictions ===\n")
    # predict_with_model returns (labels, probabilities), in that order.
    labels, probabilities = predict_with_model(model_best, file_list_pitch, npy_dir)

    if labels is not None and probabilities is not None:
        print("\n=== Risk Evaluation ===\n")
        # evaluate_voice_phishing_risk expects (probabilities, labels).
        evaluate_voice_phishing_risk(probabilities, labels)
    else:
        print("Prediction was not performed due to lack of valid data.")

# Run the demo pipeline only when this file is executed directly, not when it
# is imported as a module.
if __name__ == '__main__':
    main()

Recording audio for 20 seconds...
recording is finished
Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 157, 1)]          0         
                                                                 
 normalization (Normalizatio  (None, 157, 1)           315       
 n)                                                              
                                                                 
 Conv1D_1 (Conv1D)           (None, 157, 64)           256       
                                                                 
 Conv1D_2 (Conv1D)           (None, 157, 64)           12352     
                                                                 
 Conv1D_3 (Conv1D)           (None, 157, 64)           12352     
                                                                 
 Conv1D_4 (Conv1D)           (None, 157, 64)           12352     
     