In [1]:
# Read protocol text file (ASVspoof2017_V2_train.trn, ASVspoof2017_V2_eval.trl, ASVspoof2017_V2_dev.trl)
# From each protocol row take the file name (file_id), label (type), speaker id and spoken phrase (phrase_id?)
# Read individual files

### Import Libraries

In [2]:
import pandas as pd
import scipy.fftpack
from scipy.fftpack import dct
import librosa
import matplotlib.pyplot as plt
import numpy as np
import os
import pickle

### File Paths

In [3]:
# Protocol (metadata) text files for the dev, train and eval splits,
# given relative to the notebook's working directory.
protocol_dev_file = '../Dataset/ASVSpoof/actual/protocol_V2/ASVspoof2017_V2_dev.trl.txt'
protocol_train_file = '../Dataset/ASVSpoof/actual/protocol_V2/ASVspoof2017_V2_train.trn.txt'
protocol_eval_file = '../Dataset/ASVSpoof/actual/protocol_V2/ASVspoof2017_V2_eval.trl.txt'

In [4]:
# Folders holding the audio files of each split (despite the *_file suffix
# these are directories: they are passed to os.listdir further down).
folder_dev_file = '../Dataset/ASVSpoof/actual/ASVspoof2017_V2_dev'
folder_train_file = '../Dataset/ASVSpoof/actual/ASVspoof2017_V2_train'
folder_eval_file = '../Dataset/ASVSpoof/actual/ASVspoof2017_V2_eval'

In [5]:
# Parse the space-separated protocol files into DataFrames, one row per
# audio file; the first line of each file is used as the column header.
df_dev = pd.read_csv(protocol_dev_file, sep=' ')
df_train = pd.read_csv(protocol_train_file, sep=' ')
df_eval = pd.read_csv(protocol_eval_file, sep=' ')

### File Check

In [6]:
print(df_dev.head())

         file_id     type speaker_id phrase_id environment_id playback_id  \
0  D_1000001.wav  genuine      M0011       S06              -           -   
1  D_1000002.wav  genuine      M0011       S08              -           -   
2  D_1000003.wav  genuine      M0011       S04              -           -   
3  D_1000004.wav  genuine      M0011       S07              -           -   
4  D_1000005.wav  genuine      M0011       S10              -           -   

  recording_id  
0            -  
1            -  
2            -  
3            -  
4            -  


In [7]:
print(df_train.head())

         file_id     type speaker_id phrase_id environment_id playback_id  \
0  T_1000001.wav  genuine      M0002       S05              -           -   
1  T_1000002.wav  genuine      M0002       S09              -           -   
2  T_1000003.wav  genuine      M0002       S01              -           -   
3  T_1000004.wav  genuine      M0002       S10              -           -   
4  T_1000005.wav  genuine      M0002       S04              -           -   

  recording_id  
0            -  
1            -  
2            -  
3            -  
4            -  


In [8]:
print(df_eval.head())

         file_id   type speaker_id phrase_id environment_id playback_id  \
0  E_1000001.wav  spoof      M0029       S03            E19         P22   
1  E_1000002.wav  spoof      M0027       S10            E19         P22   
2  E_1000003.wav  spoof      M0025       S01            E14         P03   
3  E_1000004.wav  spoof      M0035       S10            E12         P16   
4  E_1000005.wav  spoof      M0025       S10            E18         P05   

  recording_id  
0          R22  
1          R22  
2          R04  
3          R11  
4          R03  


In [9]:
# Binary targets taken from the protocol 'type' column:
# genuine -> 1, spoof -> 0. One shared mapping keeps the three splits in sync.
label_encoding = {'genuine' : 1, 'spoof' : 0}
dev_labels, train_labels, eval_labels = (
    protocol['type'].map(label_encoding)
    for protocol in (df_dev, df_train, df_eval)
)

In [39]:
print(dev_labels)

0       1
1       1
2       1
3       1
4       1
       ..
1705    0
1706    0
1707    0
1708    0
1709    0
Name: type, Length: 1710, dtype: int64


In [10]:
print(dev_labels.shape)
print(train_labels.shape)
print(eval_labels.shape)

(1710,)
(3014,)
(13306,)


### Find the minimum and maximum audio file duration

In [11]:
# def find_min_max_duration(folder_path):
#     min_duration = float('inf')
#     max_duration = 0
#     for file in os.listdir(folder_path):
#         file_path = os.path.join(folder_path, file)
#         y, sr = librosa.load(file_path, sr=None)
#         duration = librosa.get_duration(y=y, sr=sr)
            
#         # Update min and max durations
#         if duration < min_duration:
#             min_duration = duration
#         if duration > max_duration:
#             max_duration = duration
            
#     return min_duration, max_duration

In [12]:
# min_dev_duration, max_dev_duration = find_min_max_duration(folder_dev_file)
# min_train_duration, max_train_duration = find_min_max_duration(folder_train_file)
# min_eval_duration, max_eval_duration = find_min_max_duration(folder_eval_file)

In [13]:
# print(f'DEV     : min({min_dev_duration}), max({max_dev_duration})')
# print(f'TRAIN   : min({min_train_duration}), max({max_train_duration})')
# print(f'TEST    : min({min_eval_duration}), max({max_eval_duration})')

#### Minimum and maximum audio file duration (in seconds) of ASVspoof2017
DEV     : min(0.6855), max(10.9095); 
TRAIN   : min(0.7935), max(5.6295); 
TEST    : min(0.9445), max(7.5685); 

In [14]:
# Shortest and longest training-clip durations, hard-coded from the TRAIN
# figures listed above so the (commented-out) scan does not need to be re-run.
# max_train_duration is the common length every clip is padded/truncated to.
min_train_duration = 0.7935
max_train_duration = 5.6295

### Pad & Truncate Audio Files

In [15]:
def pad_and_truncate_audio(audio, target_duration, sample_rate):
    """Force a 1-D waveform to a fixed number of samples.

    The target length is ``int(target_duration * sample_rate)`` samples.
    Shorter inputs are zero-padded at the end, longer inputs are cut at the
    end, and inputs that already match are returned unchanged (same object).

    Parameters
    ----------
    audio : np.ndarray
        Waveform samples.
    target_duration : float
        Desired duration; multiplied by ``sample_rate`` to get a sample count.
    sample_rate : int or float
        Samples per unit of ``target_duration``.

    Returns
    -------
    np.ndarray
        Waveform with exactly the target number of samples.
    """
    wanted = int(target_duration * sample_rate)
    current = len(audio)

    if current == wanted:
        return audio
    if current > wanted:
        # Too long: keep only the leading samples.
        return audio[:wanted]
    # Too short: append trailing zeros.
    return np.pad(audio, (0, wanted - current), mode='constant')

def pad_and_truncate_files(folder_path, target_duration=None):
    """Load every file in a folder and bring each clip to a common duration.

    Files are visited in sorted name order. ``os.listdir`` makes no ordering
    guarantee, and downstream code pairs row *i* of the result with row *i*
    of the protocol DataFrame, so a deterministic order is required.

    Parameters
    ----------
    folder_path : str
        Directory whose entries are all loaded with ``librosa.load``.
    target_duration : float, optional
        Clip duration in seconds. Defaults to the notebook-level
        ``max_train_duration``.

    Returns
    -------
    (list of np.ndarray, list of int)
        The padded/truncated waveforms and the sample rate of each file,
        in the same (sorted) order.
    """
    if target_duration is None:
        target_duration = max_train_duration

    new_audio = []
    sample_rate = []
    for file in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, file)
        # sr=None keeps the file's native sample rate.
        audio, sr = librosa.load(file_path, sr=None)
        # Convert seconds to samples with the rate the file was actually
        # loaded at, rather than assuming 16 kHz.
        processed_audio = pad_and_truncate_audio(audio, target_duration, sr)
        new_audio.append(processed_audio)
        sample_rate.append(sr)

    return new_audio, sample_rate

In [16]:
# train_audio, train_sr = pad_and_truncate_files(folder_train_file)
# dev_audio, dev_sr = pad_and_truncate_files(folder_dev_file)
# eval_audio, eval_sr = pad_and_truncate_files(folder_eval_file)

### Code for LFCC

In [17]:
def pre_emphasis(signal, pre_emphasis_coef=0.97):
    """Apply a first-order pre-emphasis filter.

    Computes ``y[0] = x[0]`` and ``y[t] = x[t] - coef * x[t-1]`` for t >= 1.

    Parameters
    ----------
    signal : np.ndarray
        Input samples (must contain at least one sample).
    pre_emphasis_coef : float
        Weight of the previous sample that is subtracted.

    Returns
    -------
    np.ndarray
        Filtered signal with the same number of samples as the input.
    """
    first_sample = signal[0]
    differenced_tail = signal[1:] - pre_emphasis_coef * signal[:-1]
    return np.append(first_sample, differenced_tail)

def framing(signal, frame_size, frame_stride, sample_rate):
    """Slice a 1-D signal into overlapping frames, zero-padding the tail.

    Parameters
    ----------
    signal : np.ndarray
        Input samples.
    frame_size : float
        Frame length in seconds.
    frame_stride : float
        Hop between consecutive frame starts, in seconds.
    sample_rate : int or float
        Samples per second.

    Returns
    -------
    np.ndarray
        Array of shape ``(num_frames, frame_length)``. The last frame is
        padded with zeros when the signal does not fill it completely.
    """
    frame_length = int(round(frame_size * sample_rate))
    frame_step = int(round(frame_stride * sample_rate))
    signal_length = len(signal)

    # Samples that extend past the first frame. Clamp at zero: taking the
    # absolute value here would count the *shortfall* of a shorter-than-one-
    # frame signal as extra length and emit spurious all-zero frames.
    overhang = max(signal_length - frame_length, 0)
    num_frames = int(np.ceil(float(overhang) / frame_step)) + 1

    # Pad just enough for the last frame to be complete.
    pad_signal_length = (num_frames - 1) * frame_step + frame_length
    pad_signal = np.append(signal, np.zeros(pad_signal_length - signal_length))

    # indices[i, j] = i * frame_step + j, built by broadcasting.
    frame_starts = np.arange(num_frames) * frame_step
    indices = frame_starts[:, None] + np.arange(frame_length)[None, :]
    frames = pad_signal[indices]

    return frames

def apply_window(frames):
    """Taper every frame with a Hamming window.

    Parameters
    ----------
    frames : np.ndarray
        2-D array with one frame per row.

    Returns
    -------
    np.ndarray
        Array of the same shape, each row multiplied element-wise by a
        Hamming window whose length equals the number of columns.
    """
    samples_per_frame = frames.shape[1]
    window = np.hamming(samples_per_frame)
    return np.multiply(frames, window)

def power_spectrum(frames, NFFT):
    """Compute the power spectrum of each frame.

    Parameters
    ----------
    frames : np.ndarray
        2-D array with one (windowed) frame per row.
    NFFT : int
        FFT size passed to ``np.fft.rfft``.

    Returns
    -------
    np.ndarray
        ``|rfft(frame)|**2 / NFFT`` for every frame, one row per frame.
    """
    magnitude = np.abs(np.fft.rfft(frames, NFFT))
    scale = 1.0 / NFFT
    return scale * (magnitude ** 2)

def linear_filter_bank(sample_rate, NFFT, nfilt=26):
    """Build a bank of triangular filters spaced linearly in frequency.

    Filter edges are ``nfilt + 2`` points spread evenly from 0 to
    ``sample_rate / 2`` and then converted to FFT bin numbers.

    Parameters
    ----------
    sample_rate : int or float
        Sampling rate of the signal.
    NFFT : int
        FFT size used for the power spectrum.
    nfilt : int
        Number of triangular filters.

    Returns
    -------
    np.ndarray
        Array of shape ``(nfilt, floor(NFFT / 2 + 1))``; each row rises from
        0 at its left edge to 1 at its centre bin and falls back towards 0 at
        its right edge.
    """
    nyquist = sample_rate / 2
    edge_freqs = np.linspace(0, nyquist, nfilt + 2)

    # Frequency -> FFT bin number for every filter edge.
    bins = np.floor((NFFT + 1) * edge_freqs / sample_rate)

    n_bins = int(np.floor(NFFT / 2 + 1))
    fbank = np.zeros((nfilt, n_bins))

    # Each filter uses three consecutive edges: (left, centre, right).
    for row, (left, center, right) in enumerate(zip(bins[:-2], bins[1:-1], bins[2:])):
        start, peak, stop = int(left), int(center), int(right)

        # Rising slope.
        for k in range(start, peak):
            fbank[row, k] = (k - left) / (center - left)
        # Falling slope (value 1 at the centre bin).
        for k in range(peak, stop):
            fbank[row, k] = (right - k) / (right - center)
    return fbank

def extract_lfcc(signal, sample_rate, frame_size=0.025, frame_stride=0.01, nfilt=26, NFFT=512, num_ceps=13):
    """Compute linear-frequency cepstral coefficients (LFCC) for a waveform.

    Pipeline: pre-emphasis -> framing -> Hamming window -> power spectrum ->
    linear triangular filter bank -> log -> DCT-II (orthonormal), keeping the
    first ``num_ceps`` coefficients of every frame.

    Parameters
    ----------
    signal : np.ndarray
        1-D waveform.
    sample_rate : int or float
        Sampling rate of ``signal``.
    frame_size, frame_stride : float
        Frame length and hop, in seconds.
    nfilt : int
        Number of filters in the linear filter bank.
    NFFT : int
        FFT size.
    num_ceps : int
        Number of cepstral coefficients kept per frame.

    Returns
    -------
    np.ndarray
        Array of shape ``(num_frames, num_ceps)``.
    """
    # Time domain: emphasise, cut into frames, taper each frame.
    frames = framing(pre_emphasis(signal), frame_size, frame_stride, sample_rate)
    tapered = apply_window(frames)

    # Frequency domain: per-frame power spectrum projected onto the filters.
    spectrum = power_spectrum(tapered, NFFT)
    filters = linear_filter_bank(sample_rate, NFFT, nfilt)
    energies = np.dot(spectrum, filters.T)

    # Exact zeros would give -inf under the log; swap them for machine epsilon.
    energies[energies == 0] = np.finfo(float).eps
    log_energies = np.log(energies)

    # Cepstrum: DCT along the filter axis, truncated to num_ceps columns.
    cepstra = dct(log_energies, type=2, axis=1, norm='ortho')
    return cepstra[:, :num_ceps]

### Code to extract LFCC and MFCC

In [18]:
def extract_voice_features(audio, sr):
    """Build one flat feature vector (LFCC followed by MFCC) for a clip.

    Parameters
    ----------
    audio : np.ndarray
        1-D waveform.
    sr : int
        Sampling rate of ``audio``.

    Returns
    -------
    np.ndarray
        1-D vector: the flattened LFCC matrix from ``extract_lfcc`` followed
        by the flattened MFCC matrix from ``librosa.feature.mfcc`` (both with
        their default settings).
    """
    feature_matrices = (
        extract_lfcc(audio, sr),
        librosa.feature.mfcc(y=audio, sr=sr),
    )
    return np.concatenate([matrix.flatten() for matrix in feature_matrices])

def extract_features_from_folder(audios, srs):
    """Extract a feature vector for every (waveform, sample rate) pair.

    Parameters
    ----------
    audios : iterable of np.ndarray
        Waveforms, one per clip.
    srs : iterable of int
        Sample rate of each waveform, in the same order as ``audios``.

    Returns
    -------
    list of np.ndarray
        One flat feature vector per clip, in input order. Pairing stops at
        the shorter of the two inputs.
    """
    return [
        extract_voice_features(waveform, rate)
        for waveform, rate in zip(audios, srs)
    ]

In [19]:
# dev_features = extract_features_from_folder(dev_audio, dev_sr)
# train_features = extract_features_from_folder(train_audio, train_sr)
# eval_features = extract_features_from_folder(eval_audio, eval_sr)

### Get Names

In [20]:
def get_names(folder_path):
    """Return the names of all entries in a folder, sorted alphabetically.

    ``os.listdir`` returns entries in arbitrary, platform-dependent order;
    sorting makes the result deterministic between runs and machines.

    Parameters
    ----------
    folder_path : str
        Directory to list.

    Returns
    -------
    list of str
        Entry names (not full paths) in ascending order.
    """
    return sorted(os.listdir(folder_path))

In [21]:
# File names present in each split's audio folder, as returned by get_names.
# Further down they are only used to compare counts against the feature rows.
dev_names = get_names(folder_dev_file)
train_names = get_names(folder_train_file)
eval_names = get_names(folder_eval_file)

##### npy save paths

In [22]:
# Locations of the cached feature matrices, one .npy file per split,
# relative to the notebook's working directory.
dev_features_path = './dev_features.npy'
train_features_path = './train_features.npy'
eval_features_path = './eval_features.npy'

##### Save the extracted features as .npy files for later reuse

In [23]:
# np.save(dev_features_path, np.array(dev_features))
# np.save(train_features_path, np.array(train_features))
# np.save(eval_features_path, np.array(eval_features))

##### Load npy data

In [24]:
# Load the cached feature matrices from disk.
# NOTE(review): the extraction and np.save cells are commented out, so these
# .npy files must already exist for the notebook to run from a fresh kernel.
dev_features = np.load(dev_features_path)
train_features = np.load(train_features_path)
eval_features = np.load(eval_features_path)

In [29]:
print(dev_features.shape)
print(len(dev_names))
print(train_features.shape)
print(len(train_names))
print(eval_features.shape)
print(len(eval_names))

(1710, 10826)
1710
(3014, 10826)
3014
(13306, 10826)
13306


In [30]:
from sklearn.mixture import GaussianMixture

In [82]:
# Fit a 2-component Gaussian mixture with spherical covariances on the
# training feature matrix. No labels are passed to fit(), so the two
# components are unsupervised clusters.
# NOTE(review): despite the name, the model is fitted on all of
# train_features, not only on the genuine rows.
genuine_gmm = GaussianMixture(n_components=2, covariance_type='spherical', random_state=42)
genuine_gmm.fit(train_features)

In [83]:
# GaussianMixture.predict returns the index of the most likely mixture
# component. Those indices are arbitrary cluster ids, not the
# 1 = genuine / 0 = spoof encoding of the label series, so comparing them
# directly with the labels is meaningless. Translate every component to the
# majority training label of the rows assigned to it first.
# NOTE(review): assumes row i of train_features belongs to row i of the train
# protocol (train_labels) - confirm the feature cache was built in that order.
train_components = genuine_gmm.predict(train_features)
train_truth = train_labels.values

component_to_label = np.zeros(genuine_gmm.n_components, dtype=int)
for component in range(genuine_gmm.n_components):
    member_labels = train_truth[train_components == component]
    # A component that attracted no training rows keeps the default label 0.
    if member_labels.size:
        component_to_label[component] = int(member_labels.mean() >= 0.5)

dev_predictions = component_to_label[genuine_gmm.predict(dev_features)]
eval_predictions = component_to_label[genuine_gmm.predict(eval_features)]

In [84]:
from sklearn.metrics import accuracy_score

true_labels_dev = dev_labels.values  # ground-truth dev labels as a NumPy array (1 = genuine, 0 = spoof)

# Fraction of dev rows whose predicted value equals the ground-truth label.
accuracy_dev = accuracy_score(true_labels_dev, dev_predictions)
print(f'Accuracy on the development set: {accuracy_dev:.2f}')

Accuracy on the development set: 0.46


In [85]:
# Ground-truth eval labels as a NumPy array (1 = genuine, 0 = spoof).
true_labels_eval = eval_labels.values

# Fraction of eval rows whose predicted value equals the ground-truth label.
accuracy_eval = accuracy_score(true_labels_eval, eval_predictions)
print(f'Accuracy on the eval set: {accuracy_eval:.2f}')

Accuracy on the eval set: 0.68
