In [None]:
import librosa

from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
import random

from torch import nn
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from sklearn.ensemble import RandomForestClassifier

import torch
import torchmetrics
import os

import pickle

In [None]:
import torch
# Probe PyTorch's MPS backend: report when it is missing; otherwise allocate a
# one-element tensor on the MPS device and print it.
if not torch.backends.mps.is_available():
    print ("MPS device not found.")
else:
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print (x)

In [None]:
import warnings
# Silence every warning for the rest of the session.
# NOTE(review): this also hides warnings emitted while fitting models in later cells —
# consider narrowing the filter to specific categories.
warnings.filterwarnings('ignore')

In [None]:
# Torch device for tensor work: use MPS when the backend is available, otherwise fall
# back to the CPU so that later tensor allocations on `device` do not fail.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

In [None]:
class Config:
    """Static settings shared by the notebook cells (audio, paths, training, seed)."""

    # --- Audio / feature extraction ---
    SR = 32_000      # sample rate handed to librosa.load
    N_MFCC = 128     # number of MFCC coefficients requested per clip

    # --- Dataset ---
    ROOT_FOLDER = './'

    # --- Training ---
    N_CLASSES = 2
    BATCH_SIZE = 96
    N_EPOCHS = 5
    LR = 0.0003

    # --- Others ---
    SEED = 42


# Shared settings object; other cells read values as CONFIG.<NAME>.
CONFIG = Config()

In [None]:
# Read the training CSV into `df` and preview its first five rows.
df = pd.read_csv(filepath_or_buffer='./train.csv')
df.head(n=5)

In [None]:
# Redirect every sample path from ./train/ to ./bandstop_train/.
# regex=False makes this a literal substring swap on every pandas version; before
# pandas 2.0 the pattern was treated as a regular expression by default, in which the
# leading '.' matches any character.
df['path'] = df['path'].str.replace('./train/', './bandstop_train/', regex=False)
df.head()

In [None]:
# Load the stored `aug_*` feature and label arrays from ./mfcc/.
aug_mfcc = np.load('./mfcc/aug_mfcc_data.npy')
aug_label = np.load('./mfcc/aug_label_data.npy')

# Show the label array as the cell output.
aug_label

In [None]:
import ast
# CSV file locations:
#   csv_file_path1 -> labels stored as the strings 'fake' / 'real'
#   csv_file_path2 -> labels already stored in [0,1] / [1,0] form
csv_file_path1, csv_file_path2 = './train.csv', './unlabeled_data.csv'

# Load the first CSV file.
df1 = pd.read_csv(csv_file_path1)

def convert_label(label):
    """Map a textual class label to its two-element vector.

    'fake' becomes [1, 0]; every other value (e.g. 'real') becomes [0, 1].
    A fresh list is returned on each call.
    """
    if label == 'fake':
        return [1, 0]
    return [0, 1]

# Convert the text labels to vectors, then redirect each path to ./bandstop_train.
# regex=False forces a literal substring swap on every pandas version; when the pattern
# is interpreted as a regular expression its leading '.' matches any character.
df1['label'] = df1['label'].apply(convert_label)
df1['path'] = df1['path'].str.replace('./train', './bandstop_train', regex=False)
# Load the second CSV file (first 477 rows only).
# NOTE(review): 477 is a hard-coded row count — confirm it is still the intended subset.
df2 = pd.read_csv(csv_file_path2, nrows=477)
def convert_to_numpy(label):
    """Parse the text form of a label (e.g. '[0, 1]') into the Python literal it spells.

    Despite the name, the result is whatever ``ast.literal_eval`` yields (a list for
    '[0, 1]'), not a NumPy array.
    """
    parsed = ast.literal_eval(label)
    return parsed

# Parse df2's string labels into Python lists.
df2['label'] = df2['label'].apply(convert_to_numpy)
# df2 paths are used as stored (the './bandstop_unlabeled_data' rewrite stays disabled).
# Stack the two frames into one.
combined_df = pd.concat([df1, df2], ignore_index=True)

# Shuffle the rows. Seeding the shuffle keeps the row order — and therefore the seeded
# train/validation split computed from this frame — identical across runs.
combined_df = combined_df.sample(frac=1, random_state=CONFIG.SEED).reset_index(drop=True)

combined_df

In [None]:
import ast
def convert_to_numpy(label):
    """Evaluate a label string such as '[0, 1]' into the Python literal it represents."""
    return ast.literal_eval(label)

# Re-bind `df` to the first 600 rows of ./unlabeled_data.csv and parse its label column.
df = pd.read_csv('./unlabeled_data.csv', nrows=600)
df['label'] = df['label'].apply(convert_to_numpy)
df.head()

In [None]:
# Hold out 20% of combined_df for validation. The label column is passed as a second
# array, but its two split halves are discarded.
split_kwargs = dict(test_size=0.2, random_state=CONFIG.SEED)
train, val, _, _ = train_test_split(combined_df, combined_df['label'], **split_kwargs)
(train.shape, val.shape)


In [None]:
def get_mfcc_feature(df, train_mode=True):
    """Extract one frame-averaged MFCC vector per audio file listed in ``df``.

    Args:
        df: DataFrame with a 'path' column of audio file paths and, when
            ``train_mode`` is true, a 'label' column.
        train_mode: when true, labels are collected and returned with the features.

    Returns:
        ``(features, labels)`` if ``train_mode`` is true, otherwise ``features``.
        ``features`` is a list with one array per row (the MFCC matrix averaged over
        its frame axis); ``labels`` is a list of float arrays built from each row's
        'label' value.
    """
    features = []
    labels = []
    # iterrows() is a generator with no length, so the row count is passed explicitly;
    # without it tqdm can only show a running counter, not a progress bar.
    for _, row in tqdm(df.iterrows(), total=len(df)):
        # Load the wav file with librosa, resampled to CONFIG.SR.
        y, sr = librosa.load(row['path'], sr=CONFIG.SR)
        # Compute the MFCC matrix, then average it over frames: librosa returns
        # (n_mfcc, frames), so the mean over axis 0 of the transpose leaves one value
        # per coefficient.
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=CONFIG.N_MFCC)
        features.append(np.mean(mfcc.T, axis=0))

        if train_mode:
            # The label is already vector-valued; store it as a float array.
            labels.append(np.array(row['label'], dtype=float))

    if train_mode:
        return features, labels
    return features

In [None]:
# Extract features and labels for the training and validation splits.
train_mfcc, train_labels = get_mfcc_feature(df=train, train_mode=True)
val_mfcc, val_labels = get_mfcc_feature(df=val, train_mode=True)

In [None]:
# (Re)load the `aug_*` arrays; `aug_mfcc` is read from the *_mean_data file here.
aug_label = np.load('./mfcc/aug_label_data.npy')
aug_mfcc = np.load('./mfcc/aug_mfcc_mean_data.npy')

In [None]:
# 80/20 split of the aug_* feature and label arrays, seeded with CONFIG.SEED.
aug_split = train_test_split(aug_mfcc, aug_label, test_size=0.2, random_state=CONFIG.SEED)
aug_X_train, aug_X_val, aug_y_train, aug_y_val = aug_split
(aug_X_train.shape, aug_y_train.shape)

In [None]:


# Load the cached train/validation features and labels from ./mfcc/, re-binding
# train_mfcc, train_labels, val_mfcc and val_labels in that order.
train_mfcc, train_labels, val_mfcc, val_labels = (
    np.load(npy_path)
    for npy_path in (
        './mfcc/bandstop_with_unlabel_train_mfcc.npy',
        './mfcc/bandstop_with_unlabel_train_labels.npy',
        './mfcc/bandstop_with_unlabel_val_mfcc.npy',
        './mfcc/bandstop_with_unlabel_val_labels.npy',
    )
)

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import KFold

# Baseline forest (100 trees) and a shuffled 5-fold splitter, both seeded from CONFIG.SEED.
model = RandomForestClassifier(random_state=CONFIG.SEED, n_estimators=100)
kf = KFold(shuffle=True, n_splits=5, random_state=CONFIG.SEED)

# Echo the estimator and the number of folds the splitter reports.
print(model)
n_folds = kf.get_n_splits(aug_X_train)
print(n_folds)

In [None]:
# Wrap the training features in a DataFrame and print it.
train_mfcc_frame = pd.DataFrame(train_mfcc)
print(train_mfcc_frame)

## Parameter tuning

In [None]:
from sklearn.model_selection import GridSearchCV

# Hyper-parameter grid for this search pass.
params = dict(
    n_estimators=[10, 50, 100],
    max_depth=[6, 8, 10, 12],
    min_samples_leaf=[8, 12, 18],
    min_samples_split=[8, 16, 20],
)

# Create the RandomForestClassifier, then run GridSearchCV over the grid
# (cv=5, all available cores, per-fit progress logging).
rf_clf = RandomForestClassifier(random_state=CONFIG.SEED)
grid_cv = GridSearchCV(estimator=rf_clf, param_grid=params, n_jobs=-1, cv=5, verbose=2)
grid_cv.fit(X=aug_X_train, y=aug_y_train)

# The printed labels read "best hyper-parameters" and "best prediction accuracy".
print('최적 하이퍼 파라미터: ', grid_cv.best_params_)
print('최고 예측 정확도: {:.4f}'.format(grid_cv.best_score_))

In [None]:
from sklearn.model_selection import GridSearchCV

# Hyper-parameter grid for this search pass.
params = dict(
    n_estimators=[100, 150, 200],
    max_depth=[12, 15, 18, 21],
    min_samples_leaf=[6, 7, 8],
    min_samples_split=[6, 7, 8],
)

# Create the RandomForestClassifier, then run GridSearchCV over the grid
# (cv=5, all available cores, per-fit progress logging).
rf_clf = RandomForestClassifier(random_state=CONFIG.SEED)
grid_cv = GridSearchCV(estimator=rf_clf, param_grid=params, n_jobs=-1, cv=5, verbose=2)
grid_cv.fit(X=aug_X_train, y=aug_y_train)

# The printed labels read "best hyper-parameters" and "best prediction accuracy".
print('최적 하이퍼 파라미터: ', grid_cv.best_params_)
print('최고 예측 정확도: {:.4f}'.format(grid_cv.best_score_))

In [None]:
from sklearn.model_selection import GridSearchCV

# Hyper-parameter grid for this search pass.
params = dict(
    n_estimators=[140, 150, 160],
    max_depth=[21, 23, 25, 27],
    min_samples_leaf=[4, 5, 6],
    min_samples_split=[4, 5, 6],
)

# Create the RandomForestClassifier, then run GridSearchCV over the grid
# (cv=5, all available cores, per-fit progress logging).
rf_clf = RandomForestClassifier(random_state=CONFIG.SEED)
grid_cv = GridSearchCV(estimator=rf_clf, param_grid=params, n_jobs=-1, cv=5, verbose=2)
grid_cv.fit(X=aug_X_train, y=aug_y_train)

# The printed labels read "best hyper-parameters" and "best prediction accuracy".
print('최적 하이퍼 파라미터: ', grid_cv.best_params_)
print('최고 예측 정확도: {:.4f}'.format(grid_cv.best_score_))

In [None]:
from sklearn.model_selection import GridSearchCV

# Hyper-parameter grid for this search pass.
params = dict(
    n_estimators=[135, 140, 145],
    max_depth=[27, 28, 29],
    min_samples_leaf=[2, 3, 4],
    min_samples_split=[2, 3, 4],
)

# Create the RandomForestClassifier, then run GridSearchCV over the grid
# (cv=5, all available cores, per-fit progress logging).
rf_clf = RandomForestClassifier(random_state=CONFIG.SEED)
grid_cv = GridSearchCV(estimator=rf_clf, param_grid=params, n_jobs=-1, cv=5, verbose=2)
grid_cv.fit(X=aug_X_train, y=aug_y_train)

# The printed labels read "best hyper-parameters" and "best prediction accuracy".
print('최적 하이퍼 파라미터: ', grid_cv.best_params_)
print('최고 예측 정확도: {:.4f}'.format(grid_cv.best_score_))

In [None]:
from sklearn.model_selection import GridSearchCV

# Hyper-parameter grid for this search pass.
params = dict(
    n_estimators=[195, 200, 205],
    max_depth=[26, 28, 30],
    min_samples_leaf=[3, 4, 5],
    min_samples_split=[3, 4, 5],
)

# Create the RandomForestClassifier, then run GridSearchCV over the grid
# (cv=5, all available cores, per-fit progress logging). This pass fits on
# train_mfcc / train_labels rather than the aug_* split.
rf_clf = RandomForestClassifier(random_state=CONFIG.SEED)
grid_cv = GridSearchCV(estimator=rf_clf, param_grid=params, n_jobs=-1, cv=5, verbose=2)
grid_cv.fit(X=train_mfcc, y=train_labels)

# The printed labels read "best hyper-parameters" and "best prediction accuracy".
print('최적 하이퍼 파라미터: ', grid_cv.best_params_)
print('최고 예측 정확도: {:.4f}'.format(grid_cv.best_score_))

In [None]:
from sklearn.model_selection import GridSearchCV

# Hyper-parameter grid for this search pass.
# min_samples_split must be an integer >= 2 (or a float fraction) for
# RandomForestClassifier; the value 1 is rejected by scikit-learn, so every candidate
# using it failed to fit and was wasted work. The grid therefore starts at 2.
params = { 'n_estimators' : [200, 205,210],
           'max_depth' : [30,32,34],
           'min_samples_leaf' : [1,2,3],
           'min_samples_split' : [2,3]
            }

# Create the RandomForestClassifier, then run GridSearchCV over the grid
# (cv=5, all available cores, per-fit progress logging).
rf_clf = RandomForestClassifier(random_state = CONFIG.SEED)
grid_cv = GridSearchCV(rf_clf, param_grid = params, cv = 5, verbose=2, n_jobs = -1)
grid_cv.fit(train_mfcc, train_labels )

# The printed labels read "best hyper-parameters" and "best prediction accuracy".
print('최적 하이퍼 파라미터: ', grid_cv.best_params_)
print('최고 예측 정확도: {:.4f}'.format(grid_cv.best_score_))

In [None]:
# Extract MFCC features for the test CSV (train_mode=False, so no labels are returned).
test = pd.read_csv('./test.csv')
test_mfcc = get_mfcc_feature(test, False)
# Report how many feature vectors were produced instead of dumping all of them.
print(len(test_mfcc))

# Disabled validation check, kept for reference. It was previously wrapped in a bare
# string literal, which the notebook echoed as the cell's output.
# best_rf_clf = grid_cv.best_estimator_
# probs = best_rf_clf.predict_proba(val_mfcc)  # predict the probability of each class
# print(probs)
# print('accuracy: {:.4f}'.format(accuracy_score(val_labels, probs)))

In [None]:
# Cache the extracted test features on disk.
# NOTE(review): the load cell that follows reads './mfcc/test_mfcc.npy', not this file
# name — confirm which file is intended.
np.save('./mfcc/test_mfcc_[0,1]_form.npy', test_mfcc)

In [None]:
# Load cached test features from disk, replacing the in-memory `test_mfcc`.
# NOTE(review): the preceding save cell writes './mfcc/test_mfcc_[0,1]_form.npy', a
# different file — confirm this path.
test_mfcc = np.load('./mfcc/test_mfcc.npy')

In [None]:
# Report the test-set size, then score it with the best estimator of the most recent
# grid search held in `grid_cv`.
print(len(test_mfcc))
best_rf_clf = grid_cv.best_estimator_
probs = best_rf_clf.predict_proba(test_mfcc)

# NOTE(review): if `probs` is a list (one array per output) this is the row count of the
# second output's array; if it is a single 2-D array it is the number of columns —
# confirm which is intended.
print(len(probs[1]))

In [None]:
# Fill the sample-submission template with the predicted probabilities.
submit = pd.read_csv('./sample_submission.csv')
submit.info()  # call the method; printing the bound method only echoes its repr

# For multi-output targets, predict_proba returns a list with one
# (n_samples, n_classes) array per output instead of a single 2-D array. In that case
# keep column 1 of each array (presumably P(label == 1); assumes both classes were seen
# for every output — verify) so the values line up with the submission columns.
# A plain 2-D array is assigned unchanged.
submit_probs = probs
if isinstance(probs, list):
    submit_probs = np.array([per_output[:, 1] for per_output in probs]).T

submit.iloc[:, 1:] = submit_probs
submit = submit.set_index('id')
submit.head()

In [None]:
# Summarise the final frame, then write the submission CSV.
# submit.info() is called directly: printing the bound method only echoes its repr.
submit.info()
submit.to_csv('./aug_mfcc_randomforest.csv')