# 음성 데이터를 통한 보이스피싱 분류

In [None]:
import json
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import VotingClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from gensim.models import Word2Vec
import nltk
import warnings
import urllib.request
from sklearn.model_selection import cross_val_score
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import RandomizedSearchCV
from transformers import BertTokenizer, BertModel
import torch
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, BertConfig
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from keras.utils import pad_sequences
warnings.filterwarnings('ignore')

데이터 전처리

In [None]:
import os
import pandas as pd
import librosa
import numpy as np
from tqdm import tqdm
from scipy.ndimage import interpolation
from pydub import AudioSegment

# wav 파일이 있는 디렉토리 설정
voice_spam1_dir = r"C:\Users\user\Downloads\voice\Fraud_wav"
voice_spam2_dir = r"C:\Users\user\Downloads\voice\Impersonate_wav"
normal_dir = r"C:\Users\user\Downloads\voice\Callcenter_wav"

# wav 파일 불러오기
voice_spam1_files = os.listdir(voice_spam1_dir)
voice_spam2_files = os.listdir(voice_spam2_dir)

# DataFrame 생성
df = pd.DataFrame()

# 크기 변화, 반전, 노이즈 추가, 크로스페이드 함수 정의
def augment_data(y, sr):
    augmented_data = []

    # 크기 변화
    for semitone in [-2, 2]:
        y_shifted = librosa.effects.pitch_shift(y, sr=sr, n_steps=semitone)
        augmented_data.append(y_shifted)

    # 반전
    y_reversed = y[::-1]
    augmented_data.append(y_reversed)

    # 노이즈 추가
    noise = np.random.randn(len(y))
    y_noisy = y + 0.005 * noise  # 노이즈 크기 조절
    augmented_data.append(y_noisy)

    # 크로스페이드
    y_crossfaded = crossfade(y, y_reversed)
    augmented_data.append(y_crossfaded)

    return augmented_data

# 크로스페이드 함수 정의
def crossfade(y1, y2):
    fade_out = np.linspace(1, 0, len(y1))
    fade_in = 1 - fade_out
    crossfaded = (fade_out * y1) + (fade_in * y2)
    return crossfaded

# 보이스피싱 데이터 증강
for file in tqdm(voice_spam1_files):
    label = 1
    filepath = os.path.join(voice_spam1_dir, file)
    y, sr = librosa.load(filepath, sr=22050)
    y = librosa.util.normalize(y)

    # 증강 함수 적용
    augmented_data = augment_data(y, sr)

    for augmented_y in augmented_data:
        mfccs = librosa.feature.mfcc(y=augmented_y, sr=sr, n_mfcc=40)
        mfccs_mean = np.mean(mfccs, axis=1)
        #df = pd.concat([df, pd.DataFrame({'filepath': [filepath], 'voice_feature': [mfccs_mean], 'voice_fishing': [label]})], ignore_index=True)
        df = pd.concat([df, pd.DataFrame({'filename': [file], 'filepath': [filepath], 'voice_feature': [mfccs_mean], 'voice_fishing': [label]})], ignore_index=True)

# 보이스피싱 데이터 증강
num_voice_spam2_augment = int(len(voice_spam1_files) / len(voice_spam2_files))
for file in tqdm(voice_spam2_files):
    label = 1
    filepath = os.path.join(voice_spam2_dir, file)
    y, sr = librosa.load(filepath, sr=22050)
    y = librosa.util.normalize(y)

    # 증강 함수 적용
    augmented_data = augment_data(y, sr)

    for augmented_y in augmented_data:
         mfccs = librosa.feature.mfcc(y=augmented_y, sr=sr, n_mfcc=40)
         mfccs_mean = np.mean(mfccs, axis=1)
         #df = pd.concat([df, pd.DataFrame({'filepath': [filepath], 'voice_feature': [mfccs_mean], 'voice_fishing': [label]})], ignore_index=True)
         df = pd.concat([df, pd.DataFrame({'filename': [file], 'filepath': [filepath], 'voice_feature': [mfccs_mean], 'voice_fishing': [label]})], ignore_index=True)

# 일반 음성 데이터 처리
num_normal_samples = len(df[df['voice_fishing'] == 1])  # 일반 음성 데이터 수를 조정
for root, dirs, files in os.walk(normal_dir):
    for file in files:
        if file.endswith('.wav') and num_normal_samples > 0:
            filepath = os.path.join(root, file)
            label = 0
            y, sr = librosa.load(filepath, sr=22050)
            y = librosa.util.normalize(y)
            mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=40)
            mfccs_mean = np.mean(mfccs, axis=1)
            #df = pd.concat([df, pd.DataFrame({'filepath': [filepath], 'voice_feature': [mfccs_mean], 'voice_fishing': [label]})], ignore_index=True)
            df = pd.concat([df, pd.DataFrame({'filename': [file], 'filepath': [filepath], 'voice_feature': [mfccs_mean], 'voice_fishing': [label]})], ignore_index=True)
            num_normal_samples -= 1


print(df)

100%|██████████| 185/185 [05:23<00:00,  1.75s/it]
100%|██████████| 141/141 [11:20<00:00,  4.83s/it]


         filename                                           filepath  \
0     voice_0.wav  C:\Users\user\Downloads\voice\Fraud_wav\voice_...   
1     voice_0.wav  C:\Users\user\Downloads\voice\Fraud_wav\voice_...   
2     voice_0.wav  C:\Users\user\Downloads\voice\Fraud_wav\voice_...   
3     voice_0.wav  C:\Users\user\Downloads\voice\Fraud_wav\voice_...   
4     voice_0.wav  C:\Users\user\Downloads\voice\Fraud_wav\voice_...   
...           ...                                                ...   
3255     0002.wav  C:\Users\user\Downloads\voice\Callcenter_wav\S...   
3256     0003.wav  C:\Users\user\Downloads\voice\Callcenter_wav\S...   
3257     0004.wav  C:\Users\user\Downloads\voice\Callcenter_wav\S...   
3258     0005.wav  C:\Users\user\Downloads\voice\Callcenter_wav\S...   
3259     0006.wav  C:\Users\user\Downloads\voice\Callcenter_wav\S...   

                                          voice_feature  voice_fishing  
0     [-326.95956, 151.90648, -53.65599, -3.662547, ...       

In [None]:
print(df['voice_fishing'].value_counts())

voice_fishing
1    1630
0    1630
Name: count, dtype: int64


음성 데이터 학습

In [None]:
# 음성 데이터 학습

#df_temp=df.drop(columns=[filepath', 'voice_fishing'])
df_temp=df.drop(columns=['filename','filepath', 'voice_fishing'])
X = df_temp
y = df['voice_fishing']

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=46) #8:2
X_train1=list(X_train['voice_feature'])
X_val1=list(X_val['voice_feature'])
y_train1 = list(y_train)
y_val1 = list(y_val)


# randomforest
rf = RandomForestClassifier(n_estimators=100, random_state=42)
parameters1 = {
    "n_estimators" : [10, 50, 1000, 2000],
    "max_features" : ["sqrt", "log2"],
    "max_depth" : [2,30, 50,70,100,150,200,300,400]
}
n_iter_search = 10
rf_rgs1 = RandomizedSearchCV(
    rf,
    param_distributions=parameters1,
    scoring="recall",
    n_jobs=-1,
    random_state=42,
    n_iter = n_iter_search
)
rf_rgs1.fit(X_train1, y_train1)
y_pred_rf1 = rf_rgs1.predict(X_val1)
print("RandomForest:")
print("Accuracy:", accuracy_score(y_val1, y_pred_rf1))
print("Recall:", recall_score(y_val1, y_pred_rf1))
print(' ')



# gradient boosting
gb = GradientBoostingClassifier(n_estimators=100, random_state=42)
parameters={
    'n_estimators': [1300,1500,1700],
    'max_depth': [15,20,22],
    'min_samples_leaf': [33,35,40],
    'min_samples_split': [500,700,900],
    'learning_rate': [0.2,0.1],
}
n_iter_search = 20
gb_kf_rgs = RandomizedSearchCV(
    gb,
    param_distributions=parameters,
    scoring="recall",
    n_jobs=-1,
    random_state=42,
    n_iter = n_iter_search
)
gb_kf_rgs.fit(X_train1, y_train1)
y_pred_gb1 = gb_kf_rgs.predict(X_val1)
print("GradientBoosting:")
print("Accuracy:", accuracy_score(y_val1, y_pred_gb1))
print("Recall:", recall_score(y_val1, y_pred_gb1))
print(' ')


# xgboost
xgb= XGBClassifier( n_estimators=100,n_jobs=-1)
parameters ={
     "n_estimators":[200,300,400],
     "learning_rate":[0.3, 0.5, 1.0],
     "max_depth" : [6,7],
     "gamma" : [0.1, 0.15],
     "subsample":[0.5, 0.6, 0.7],
     "colsample_bytree":[0.3, 0.5, 1],
}

xgb_gs =GridSearchCV(
    xgb,
    param_grid=parameters,
    scoring="recall",
    n_jobs=-1,
)
xgb_gs.fit(X_train1, y_train1)

y_pred_xgb1 = xgb_gs.predict(X_val1)
print("XGBoost:")
print("Accuracy:", accuracy_score(y_val1, y_pred_xgb1))
print("Recall:", recall_score(y_val1, y_pred_xgb1))
print(' ')

# Lightgbm
lgbm = LGBMClassifier( random_state=42, n_jobs=-1)

parameters ={
    "n_estimators":[100, 200, 300, 500, 1000],
    "learning_rate":[0.01, 0.1, 0.2, 0.5, 1],

    "max_depth" : [1,3,5,7],
    "min_split_gain" : [0, 0.1, 0.2, 0.4, 0.5],

    "subsample":[0.3, 0.5, 0.7, 0.9],
    "colsample_bytree":[0.3, 0.5, 0.7, 0.8, 0.9],

    "reg_alpha": [0, 0.01, 0.1, 0.5, 1, 10 ],
    "reg_lambda" : [0.01, 0.1, 0.5, 1, 10 ]
}
n_iter_search = 50
lgb_kf_rgs = RandomizedSearchCV(
    lgbm,
    param_distributions=parameters,
    scoring="recall",
    n_jobs=-1,
    random_state=42,
    n_iter = n_iter_search
)

lgb_kf_rgs.fit(X_train1, y_train1)
y_pred_lgbm = lgb_kf_rgs.predict(X_val1)
print("LightGBM:")
print("Accuracy:", accuracy_score(y_val1, y_pred_lgbm))
print("Recall:", recall_score(y_val1, y_pred_lgbm))

RandomForest:
Accuracy: 0.995398773006135
Recall: 0.9939024390243902
 
GradientBoosting:
Accuracy: 0.995398773006135
Recall: 0.9969512195121951
 
XGBoost:
Accuracy: 0.9938650306748467
Recall: 0.9939024390243902
 
[LightGBM] [Info] Number of positive: 1302, number of negative: 1306
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.001150 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 10200
[LightGBM] [Info] Number of data points in the train set: 2608, number of used features: 40
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.499233 -> initscore=-0.003067
[LightGBM] [Info] Start training from score -0.003067
LightGBM:
Accuracy: 0.9969325153374233
Recall: 0.9969512195121951


In [None]:
# 음성 모델 ensemble

ensemble_wav = VotingClassifier(estimators=[('rf', rf_rgs1), ('gb', gb_kf_rgs), ('xgb', xgb_gs), ('lgbm', lgb_kf_rgs)], voting='soft')
ensemble_wav.fit(X_train1, y_train1)
y_pred1 = ensemble_wav.predict(X_val1)

print("ensemble Recall:", recall_score(y_val1, y_pred1))
print("Accuracy:", accuracy_score(y_val1, y_pred1))

[LightGBM] [Info] Number of positive: 1302, number of negative: 1306
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.001139 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 10200
[LightGBM] [Info] Number of data points in the train set: 2608, number of used features: 40
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.499233 -> initscore=-0.003067
[LightGBM] [Info] Start training from score -0.003067
ensemble Recall: 0.9969512195121951
Accuracy: 0.995398773006135


# 음성 보이스피싱 판단 모델 테스트

In [None]:
import pandas as pd

# 테스트 데이터 폴더 경로
test_data_voicefishing_dir = r"C:\Users\user\Downloads\voice\Voicefishing_test" #보이스피싱 테스트 데이터 경로
test_data_callcenter_dir = r"C:\Users\user\Downloads\voice\Callcenter_test"  #콜센터 일반 대화 테스트 데이터 경로


# 테스트 데이터 파일 목록
test_files_Voicefishing = os.listdir(test_data_voicefishing_dir)
test_files_Callcenter = os.listdir(test_data_callcenter_dir)

# 결과를 저장할 DataFrame 생성
test_df = pd.DataFrame(columns=['filename','filepath', 'voice_feature'])

# 보이스피싱 테스트 데이터 처리
for file in tqdm(test_files_Voicefishing):
    # 파일 경로 설정
    filepath = os.path.join(test_data_voicefishing_dir, file)

    # 음성 파일 불러오기
    y, sr = librosa.load(filepath, sr=22050)
    y = librosa.util.normalize(y)

    # 피처 추출
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=40)
    mfccs_mean = np.mean(mfccs, axis=1)

    # 결과를 DataFrame에 추가
    test_df= pd.concat([test_df, pd.DataFrame({'filename': [file], 'filepath': [filepath], 'voice_feature': [mfccs_mean], 'voice_fishing': [label]})], ignore_index=True)

# 콜센터 일반대화 테스트 데이터 처리
for root, dirs, files in os.walk(test_data_callcenter_dir):
    for file in files:
        if file.endswith('.wav'):
            filepath = os.path.join(root, file)

            y, sr = librosa.load(filepath, sr=22050)
            y = librosa.util.normalize(y)  # 음량 조정

            mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=40)
            mfccs_mean = np.mean(mfccs, axis=1)

            # 결과를 DataFrame에 추가
            test_df= pd.concat([test_df, pd.DataFrame({'filename': [file], 'filepath': [filepath], 'voice_feature': [mfccs_mean], 'voice_fishing': [label]})], ignore_index=True)


# 앙상블 모델에 적용
test_features = list(test_df['voice_feature'])
test_predictions = ensemble_wav.predict(test_features)

# 결과 출력
for i, file in enumerate(test_files_Voicefishing + test_files_Callcenter):
    print(f"File: {file}, Prediction: {'Voicefishing' if test_predictions[i] == 1 else 'Not Voicefishing'}")

100%|██████████| 81/81 [00:29<00:00,  2.76it/s]


File: voice_144.wav, Prediction: Voicefishing
File: voice_145.wav, Prediction: Voicefishing
File: voice_146.wav, Prediction: Voicefishing
File: voice_147.wav, Prediction: Voicefishing
File: voice_149.wav, Prediction: Voicefishing
File: voice_150.wav, Prediction: Voicefishing
File: voice_152.wav, Prediction: Voicefishing
File: voice_153.wav, Prediction: Voicefishing
File: voice_154.wav, Prediction: Voicefishing
File: voice_155.wav, Prediction: Voicefishing
File: voice_156.wav, Prediction: Voicefishing
File: voice_157.wav, Prediction: Voicefishing
File: voice_158.wav, Prediction: Voicefishing
File: voice_159.wav, Prediction: Voicefishing
File: voice_160.wav, Prediction: Voicefishing
File: voice_161.wav, Prediction: Voicefishing
File: voice_162.wav, Prediction: Voicefishing
File: voice_163.wav, Prediction: Voicefishing
File: voice_164.wav, Prediction: Voicefishing
File: voice_165.wav, Prediction: Voicefishing
File: voice_166.wav, Prediction: Voicefishing
File: voice_167.wav, Prediction: V