In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import librosa
import os
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV, StratifiedKFold, cross_validate
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from xgboost import XGBClassifier
import time
import tpot
from tqdm.auto import tqdm
from google.cloud import storage
import glob

from sklearn.decomposition import FastICA
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import make_pipeline, make_union, Pipeline
from sklearn.preprocessing import RobustScaler
from tpot.builtins import StackingEstimator
import joblib
import pickle



In [4]:
def initialize_empty_df():
    """Return a zero-row DataFrame that defines the feature column layout.

    Columns, in order: six spectral summary features, ``mfcc1`` .. ``mfcc20``,
    and finally ``LABEL``.
    """
    spectral_names = (
        "chroma_stft",
        "rms",
        "spectral_centroid",
        "spectral_bandwidth",
        "rolloff",
        "zero_crossing_rate",
    )
    frame = pd.DataFrame({name: [] for name in spectral_names})
    # MFCC columns are numbered from 1, not 0.
    for number in range(1, 21):
        frame[f"mfcc{number}"] = ""
    frame["LABEL"] = ""
    return frame

In [5]:
# Column layout taken from the empty template; preprocess_data passes it as `columns=` when building its DataFrame.
df_columns = initialize_empty_df().columns

In [6]:
def preprocess_data(y, sr, label):
    """Turn an audio signal into one feature row per block of ``sr`` samples.

    Parameters
    ----------
    y : array
        Audio samples (sliced along the first axis).
    sr : int
        Sampling rate; also used as the segment length in samples.
    label
        Value written to the ``LABEL`` column of every row.

    Returns
    -------
    pandas.DataFrame
        One row per complete segment with the columns of ``df_columns``.
        Samples after the last complete segment are ignored.
    """
    n_segments = int(y.shape[0] / sr)
    rows = []
    for seg_idx in range(n_segments):
        start = seg_idx * sr
        segment = y[start:start + sr]
        # Each scalar feature is the mean over everything librosa returns for the segment.
        scalar_feats = [
            np.mean(librosa.feature.chroma_stft(y=segment, sr=sr)),
            np.mean(librosa.feature.rms(y=segment)),
            np.mean(librosa.feature.spectral_centroid(y=segment, sr=sr)),
            np.mean(librosa.feature.spectral_bandwidth(y=segment, sr=sr)),
            np.mean(librosa.feature.spectral_rolloff(y=segment, sr=sr)),
            np.mean(librosa.feature.zero_crossing_rate(segment)),
        ]
        # MFCCs are averaged along axis 1, leaving one value per coefficient.
        mfcc_means = np.mean(
            librosa.feature.mfcc(y=segment, sr=sr, n_mfcc=20), axis=1
        )
        rows.append([*scalar_feats, *mfcc_means, label])

    return pd.DataFrame(rows, columns=df_columns)

In [7]:
def split_dataset(df):
    """Make a 70/30 train/test split stratified on ``LABEL`` (seed 42).

    Returns
    -------
    tuple
        ``(df_train, df_test, X_train, X_test, y_train, y_test)`` where the
        ``X_*`` frames omit ``LABEL`` and the ``y_*`` series hold ``LABEL``
        converted with ``pd.to_numeric``.
    """
    train_part, test_part = train_test_split(
        df, test_size=0.3, random_state=42, stratify=df["LABEL"]
    )

    def _features_and_target(part):
        # Separate the predictors from the numeric target for one partition.
        return part.drop(columns=["LABEL"]), pd.to_numeric(part["LABEL"])

    X_test, y_test = _features_and_target(test_part)
    X_train, y_train = _features_and_target(train_part)

    return train_part, test_part, X_train, X_test, y_train, y_test

In [8]:
def balance_train_dataset(df_train, random_state=None):
    """Undersample the larger class so fake and real rows are equally frequent.

    Parameters
    ----------
    df_train : pandas.DataFrame
        Training rows with a ``LABEL`` column; 1 marks fake rows, 0 marks real rows.
    random_state : int, optional
        Seed forwarded to ``DataFrame.sample``. The default ``None`` keeps the
        sampling unseeded.

    Returns
    -------
    X_train : pandas.DataFrame
        Balanced rows without ``LABEL`` (fake rows first, then real rows).
    y_train : pandas.Series
        ``LABEL`` of the balanced rows, converted with ``pd.to_numeric``.

    Raises
    ------
    ValueError
        If either class has no rows.
    """
    df_train_fake = df_train[df_train["LABEL"] == 1]
    df_train_real = df_train[df_train["LABEL"] == 0]

    # Count each class by its label. value_counts() orders by frequency, so
    # unpacking it positionally is only right when fake is the majority class.
    n_keep = min(len(df_train_fake), len(df_train_real))
    if n_keep == 0:
        raise ValueError(
            "balance_train_dataset needs at least one row of each class "
            "(LABEL == 1 and LABEL == 0)"
        )

    df_train_fake_under = df_train_fake.sample(n_keep, random_state=random_state)
    # Only resample the real rows when they are the majority.
    if len(df_train_real) > n_keep:
        df_train_real = df_train_real.sample(n_keep, random_state=random_state)
    df_train_under = pd.concat([df_train_fake_under, df_train_real], axis=0)

    X_train = df_train_under.drop(columns=["LABEL"])
    y_train = pd.to_numeric(df_train_under["LABEL"])

    return X_train, y_train

# Load, split, and balance the latest dataset

In [9]:
# Read the feature table from the directory above the notebook.
full_df = pd.read_csv(os.path.join(os.pardir, "full_df_2023-12-08.csv"))
# Split first so that only training rows are balanced afterwards.
df_train, df_test, X_train, X_test, y_train, y_test = split_dataset(full_df)
# Overwrites the X_train / y_train from split_dataset with the balanced versions; X_test / y_test stay as split.
X_train, y_train = balance_train_dataset(df_train)

# Basic model

In [11]:
# Baseline XGBoost classifier fitted on the balanced training set.
model = XGBClassifier(learning_rate=0.15, max_depth=10, n_estimators=2500)
# NOTE(review): stratifed_cv is created here but not used by any visible cell.
stratifed_cv = StratifiedKFold(n_splits=5)
model.fit(X_train, y_train)

### Save the model

In [23]:
# Save the fitted XGBoost model under ../models with a timestamped name, then upload that file to the bucket.
timestamp = time.strftime("%Y%m%d-%H%M%S")
model_path = os.path.join(os.pardir, "models", f"{timestamp}.h5")
# Make sure the target directory exists before writing to it.
os.makedirs(os.path.dirname(model_path), exist_ok=True)
# NOTE(review): XGBoost appears to pick its serialization format from the file extension; ".h5" is not HDF5 here. Confirm the intended format.
model.save_model(model_path)
# basename follows the platform's path separator; splitting on "/" returns the whole path when the separator is "\\".
model_filename = os.path.basename(model_path)  # e.g. "20230208-161047.h5"
client = storage.Client()
bucket = client.bucket("deep_fake_voice_recognition_elise")
blob = bucket.blob(f"models/{model_filename}")
blob.upload_from_filename(model_path)

### Load the most recent model

In [42]:
# Download the most recently updated XGBoost model from the bucket and load it.
client = storage.Client()
# Other cells in this notebook upload ".pkl" pipelines under the same prefix.
# Keep only ".h5" files so load_model never receives a joblib pickle.
blobs = [
    candidate
    for candidate in client.get_bucket("deep_fake_voice_recognition_elise").list_blobs(prefix="model")
    if candidate.name.endswith(".h5")
]
if not blobs:
    raise FileNotFoundError("No '.h5' model found in the bucket under prefix 'model'")
latest_blob = max(blobs, key=lambda x: x.updated)
latest_model_path_to_save = os.path.join(os.pardir, latest_blob.name)
# The blob name carries its folder (e.g. "models/..."); create it locally before downloading.
os.makedirs(os.path.dirname(latest_model_path_to_save), exist_ok=True)
latest_blob.download_to_filename(latest_model_path_to_save)
latest_model = XGBClassifier()
latest_model.load_model(fname=latest_model_path_to_save)

### Make a prediction

In [43]:
# Classify the demonstration clip with the model that was just downloaded.
path = os.path.join(os.pardir, "raw_data", "DEMONSTRATION", "DEMONSTRATION", "linus-original-DEMO.mp3")
y, sr = librosa.load(path)
# preprocess_data requires a label; 0 is a placeholder because LABEL is dropped on the next line.
df_demo = preprocess_data(y,sr,0)
X_demo = df_demo.drop(columns="LABEL")
# Predict with latest_model so the reloaded artifact is what gets exercised,
# not the in-memory `model` left over from the training cell.
y_pred = pd.DataFrame(latest_model.predict(X=X_demo)).value_counts(normalize=True)
y_pred

0    0.966667
1    0.033333
Name: proportion, dtype: float64

In [48]:
# Class label of the first entry of y_pred: index[0] is a tuple, so [0] extracts the label itself.
y_pred.index[0][0]

0

In [50]:
# Share of the first (most frequent) prediction. Positional access is used because
# y_pred[0] appears to resolve 0 as a class label, which is the top entry only when class 0 wins.
y_pred.iloc[0]

0.9666666666666667

In [13]:
# Hard-coded example dict with "prediction" and "probability" keys; the values are literals, not read from y_pred.
dict(prediction="REAL", probability=0.96)

{'prediction': 'REAL', 'probability': 0.96}

# Test a TPOT model (Youssef)

In [16]:
# Same four-step pipeline, spelled out with explicit step names
# (the lowercase class names that make_pipeline would generate).
pipeline = Pipeline(
    steps=[
        ("fastica", FastICA(tol=0.45)),
        ("robustscaler", RobustScaler()),
        (
            "stackingestimator",
            StackingEstimator(
                estimator=MLPClassifier(alpha=0.001, learning_rate_init=0.01)
            ),
        ),
        (
            "kneighborsclassifier",
            KNeighborsClassifier(n_neighbors=6, p=2, weights="distance"),
        ),
    ]
)
pipeline.fit(X_train, y_train)

In [12]:
# Score the fitted pipeline on the test split produced by split_dataset.
y_pred = pipeline.predict(X_test)
print("Accuracy:", accuracy_score(y_test, y_pred))
print("Precision:", precision_score(y_test, y_pred))
print("Recall:", recall_score(y_test, y_pred))  # label was misspelled "Recal:"
print("F1 score:", f1_score(y_test, y_pred))

Accuracy: 0.9152872639614303
Precision: 0.9794090863098249
Recal: 0.9192089456565609
F1 score: 0.9483546226103143


### Save the model

In [18]:
# Dump the fitted pipeline under ../models with a timestamped name, then upload that file to the bucket.
timestamp = time.strftime("%Y%m%d-%H%M%S")
model_path = os.path.join(os.pardir, "models", f"{timestamp}.pkl")
# Make sure the target directory exists before writing to it.
os.makedirs(os.path.dirname(model_path), exist_ok=True)
joblib.dump(pipeline, model_path)
# basename follows the platform's path separator; splitting on "/" returns the whole path when the separator is "\\".
model_filename = os.path.basename(model_path)  # e.g. "20230208-161047.pkl"
client = storage.Client()
bucket = client.bucket("deep_fake_voice_recognition_elise")
blob = bucket.blob(f"models/{model_filename}")
blob.upload_from_filename(model_path)

### Reload the model from the cloud

In [33]:
# Download the most recently updated pickled pipeline from the bucket and load it.
client = storage.Client()
# Other cells in this notebook upload ".h5" XGBoost files under the same prefix.
# Keep only ".pkl" files so joblib.load never receives one of those.
blobs = [
    candidate
    for candidate in client.get_bucket("deep_fake_voice_recognition_elise").list_blobs(prefix="model")
    if candidate.name.endswith(".pkl")
]
if not blobs:
    raise FileNotFoundError("No '.pkl' model found in the bucket under prefix 'model'")
latest_blob = max(blobs, key=lambda x: x.updated)
latest_model_path_to_save = os.path.join(os.pardir, latest_blob.name)
# The blob name carries its folder (e.g. "models/..."); create it locally before downloading.
os.makedirs(os.path.dirname(latest_model_path_to_save), exist_ok=True)
latest_blob.download_to_filename(latest_model_path_to_save)
# SECURITY: joblib.load unpickles the file and can execute arbitrary code; only load from a trusted bucket.
latest_model = joblib.load(latest_model_path_to_save)
latest_model

### Use the model to predict

In [34]:
# Classify the demonstration clip with the reloaded pipeline.
path = os.path.join(os.pardir, "raw_data", "DEMONSTRATION", "DEMONSTRATION", "linus-original-DEMO.mp3")
y, sr = librosa.load(path)
# preprocess_data requires a label; 0 is a placeholder because LABEL is dropped on the next line.
df_demo = preprocess_data(y,sr,0)
X_demo = df_demo.drop(columns="LABEL")
# One prediction per row of X_demo; value_counts(normalize=True) turns them into a share per predicted class.
y_pred = pd.DataFrame(latest_model.predict(X=X_demo)).value_counts(normalize=True)
y_pred

0    0.966667
1    0.033333
Name: proportion, dtype: float64

In [35]:
# Classify the "Morgan Freeman_fake.mp3" test clip with the reloaded pipeline.
path = os.path.join(os.pardir, "raw_data", "Tests", "Morgan Freeman_fake.mp3")
y, sr = librosa.load(path)
# The 0 passed as label is a placeholder (LABEL is dropped on the next line), not a statement about this clip.
df_demo = preprocess_data(y,sr,0)
X_demo = df_demo.drop(columns="LABEL")
# One prediction per row of X_demo; value_counts(normalize=True) turns them into a share per predicted class.
y_pred = pd.DataFrame(latest_model.predict(X=X_demo)).value_counts(normalize=True)
y_pred

0    0.587302
1    0.412698
Name: proportion, dtype: float64

In [36]:
# Classify the "Morgan Freeman_real.mp3" test clip with the reloaded pipeline.
path = os.path.join(os.pardir, "raw_data", "Tests", "Morgan Freeman_real.mp3")
y, sr = librosa.load(path)
# preprocess_data requires a label; 0 is a placeholder because LABEL is dropped on the next line.
df_demo = preprocess_data(y,sr,0)
X_demo = df_demo.drop(columns="LABEL")
# One prediction per row of X_demo; value_counts(normalize=True) turns them into a share per predicted class.
y_pred = pd.DataFrame(latest_model.predict(X=X_demo)).value_counts(normalize=True)
y_pred

  return pitch_tuning(


0    0.885057
1    0.114943
Name: proportion, dtype: float64

In [37]:
# Classify the "Test_Elise.m4a" recording with the reloaded pipeline.
path = os.path.join(os.pardir, "raw_data", "Tests", "Test_Elise.m4a")
# The recorded output shows a librosa deprecation warning for this load (audioread path).
y, sr = librosa.load(path)
# preprocess_data requires a label; 0 is a placeholder because LABEL is dropped on the next line.
df_demo = preprocess_data(y,sr,0)
X_demo = df_demo.drop(columns="LABEL")
# One prediction per row of X_demo; value_counts(normalize=True) turns them into a share per predicted class.
y_pred = pd.DataFrame(latest_model.predict(X=X_demo)).value_counts(normalize=True)
y_pred

  y, sr = librosa.load(path)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


0    0.826087
1    0.173913
Name: proportion, dtype: float64

In [41]:
# Classify the "Youssef_2_Female.mp3" test clip with the reloaded pipeline.
path = os.path.join(os.pardir, "raw_data", "Tests", "Youssef_2_Female.mp3")
y, sr = librosa.load(path)
# preprocess_data requires a label; 0 is a placeholder because LABEL is dropped on the next line.
df_demo = preprocess_data(y,sr,0)
X_demo = df_demo.drop(columns="LABEL")
# One prediction per row of X_demo; value_counts(normalize=True) turns them into a share per predicted class.
y_pred = pd.DataFrame(latest_model.predict(X=X_demo)).value_counts(normalize=True)
y_pred

0    1.0
Name: proportion, dtype: float64

# New TPOT model (Elise)

In [10]:
# Single XGBoost classifier; hyperparameters presumably come from a TPOT run (per the section title). TODO confirm.
pipeline_2 = XGBClassifier(learning_rate=0.5, max_depth=7, min_child_weight=19, n_estimators=100, n_jobs=1, subsample=0.9500000000000001, verbosity=0)
pipeline_2.fit(X_train, y_train)

In [12]:
# Score the fitted XGBoost classifier on the test split produced by split_dataset.
y_pred = pipeline_2.predict(X_test)
print("Accuracy:", accuracy_score(y_test, y_pred))
print("Precision:", precision_score(y_test, y_pred))
print("Recall:", recall_score(y_test, y_pred))  # label was misspelled "Recal:"
print("F1 score:", f1_score(y_test, y_pred))

Accuracy: 0.873222177581358
Precision: 0.9697502361213138
Recal: 0.8775432682035089
F1 score: 0.9213455139150267


### New model (Youssef)

In [3]:
# Load a locally stored model. The path is relative to the working directory, unlike the os.pardir-based paths in other cells.
# SECURITY: joblib.load unpickles the file; only load files from a trusted source.
latest_model = joblib.load("models/last_XGB")

In [9]:
# Classify "Nicole_to_Youssef.wav" with the model loaded in the previous cell.
# os.path.join with a single argument returns that argument, so path is just the file name (resolved against the working directory).
path = os.path.join("Nicole_to_Youssef.wav")
y, sr = librosa.load(path)
# preprocess_data requires a label; 0 is a placeholder because LABEL is dropped on the next line.
df_demo = preprocess_data(y,sr,0)
X_demo = df_demo.drop(columns="LABEL")
# One prediction per row of X_demo; value_counts(normalize=True) turns them into a share per predicted class.
y_pred = pd.DataFrame(latest_model.predict(X=X_demo)).value_counts(normalize=True)
y_pred

0    0.9
1    0.1
Name: proportion, dtype: float64

In [10]:
# Replace latest_model with a timestamped pickle loaded from a path relative to the working directory.
# SECURITY: joblib.load unpickles the file; only load files from a trusted source.
latest_model = joblib.load("models/20231212-123732.pkl")

In [11]:
# Classify the same "Nicole_to_Youssef.wav" file with the model loaded in the previous cell.
# os.path.join with a single argument returns that argument, so path is just the file name (resolved against the working directory).
path = os.path.join("Nicole_to_Youssef.wav")
y, sr = librosa.load(path)
# preprocess_data requires a label; 0 is a placeholder because LABEL is dropped on the next line.
df_demo = preprocess_data(y,sr,0)
X_demo = df_demo.drop(columns="LABEL")
# One prediction per row of X_demo; value_counts(normalize=True) turns them into a share per predicted class.
y_pred = pd.DataFrame(latest_model.predict(X=X_demo)).value_counts(normalize=True)
y_pred

1    0.766667
0    0.233333
Name: proportion, dtype: float64

In [23]:
# Class label of the first entry of y_pred: index[0] is a tuple, so [0] extracts the label itself.
y_pred.index[0][0]

1

In [24]:
# Share belonging to the first entry of y_pred, read by position instead of by its index key.
y_pred.iloc[0]

0.7666666666666667