In [None]:
# Section 5.3
# Contains the extraction process of 4 features used in Void system
# feature extraction

# !pip install pydub

import numpy as np
import scipy.io.wavfile as wav
import scipy.signal as ssig
import scipy.stats as stats
from scipy.signal import find_peaks
import os
import matplotlib.pyplot as plt
import math
import librosa
import librosa
from sklearn import svm
from sklearn import svm
import pickle
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import NearestCentroid
from pydub import AudioSegment

def _stft(y):
    n_fft, hop_length, _ = _stft_parameters()
    return librosa.stft(y=y, n_fft=n_fft, hop_length=hop_length)


def _stft_parameters():
    # n_fft = (num_freq - 1) * 2
    n_fft = 2048
    hop_length = 128
    # hop_length = int(frame_shift_ms / 1000 * sample_rate)
    # win_length = int(frame_length_ms / 1000 * sample_rate)
    win_length = 512
    return n_fft, hop_length, win_length


def LinearityDegreeFeatures(power_normal):
    # Calculate signal power linearity degree features
    # input: power_normal
    # output: FV_LDF

    # Normalize power_vec as power_normal:
    #power_normal = power_vec / np.sum(power_vec)
    # From power_normal, calculate cumulative distribution of spectral power power_cdf:
    power_cdf = np.cumsum(power_normal)
    # Compute the correlation coefficients of power_cdf and store the results as rho:
    pearson_co = stats.pearsonr(power_cdf, np.arange(power_cdf.size))
    rho = pearson_co[0]
    #print("rho =", rho)
    # Compute the quadratic coefficients of power_cdf and store the results as q:
    #x_values = np.arange(0, 8+8/(power_cdf.size-1), 8/(power_cdf.size-1))
    x_values = np.arange(0, 8+7/(power_cdf.size-1), 8/(power_cdf.size-1))
    #parameter_2 = np.polyfit(x_values, power_cdf, 2)
    parameter_2 = np.polyfit(power_cdf, x_values, 2)
    q = parameter_2[0]
    #print("q =", q)
    # Form rho and q as FV_LDF:
    FV_LDF = np.array([rho, q])
 
    # Plot power_cdf and its estimation if necessary:
    plt.rc('text', usetex=True)
    plt.rc('font', family='serif')
    plt.figure()
    plt.plot(x_values, power_cdf, color='red', marker='d', label=r'$\textbf{pow}_\textbf{cdf}$')
    plt.plot(x_values, np.polyval(parameter_2, x_values), color='blue',
        marker='o', label=r'$\textbf{fitting curve to pow}_\textbf{cdf}$')
    plt.xlabel(r'\textbf{Frequency (kHz)}', fontsize=16)
    plt.ylabel(r'$\textbf{pow}_\textbf{cdf}$', fontsize=16)
    # The coordinate of starting point and end point of the arrow should be chosen manually:
    plt.annotate('rho = %.3f \n q = %.3f' % (rho, q), xy=(0.4, np.polyval(parameter_2, 0.4)-0.1),
        xytext=(1, 0.3), arrowprops=dict(arrowstyle="->",connectionstyle="arc3"), fontsize=12)
    plt.legend(fontsize=14)
    plt.grid()
    plt.show()

    # Return power_cdf for plotting figures:
    return power_cdf, FV_LDF


def HighPowerFrequencyFeatures(FV_LFP, omega):
    # Calculate high power frequency
    # input: FV_LFP, omega
    # output: FV_HPF

    # 1. Find peaks from FV_LFP (returns the indices of found peaks):
    peaks_idx, _ = find_peaks(FV_LFP, height=0)
    # Obtain corresponding values of the peaks:
    peaks_val = FV_LFP[peaks_idx]
    # 2. Compute the threshold of selecting peaks using omega:
    T_peak = omega * max(peaks_val)
    # 3. Remove peaks lower than T_peak (insignificant peaks):
    peaks_idx = peaks_idx[np.where(peaks_val >= T_peak)]
    peaks_val = FV_LFP[peaks_idx]
    # 4. Obtain the number of remaining peaks:
    N_peak = peaks_idx.size
    # 5. Compute the mean of the locations of remaining peaks:
    mu_peak = peaks_idx.mean()
    # 6. Compute the standard deviation of the locations of remaining peaks:
    sigma_peak = np.std(peaks_idx)
    # 7. Use a 6-order polynomial to fit FV_LFP and take first 32 estimatied values as P_est:
    parameter_6 = np.polyfit(np.arange(FV_LFP.size), FV_LFP, 6)
    value_est = np.polyval(parameter_6, np.arange(FV_LFP.size))
    '''
    plt.figure()
    plt.plot(np.arange(FV_LFP.size), FV_LFP, 'r')
    plt.plot(np.arange(FV_LFP.size), value_est, 'b')
    plt.show()
    '''
    P_est = value_est[0:32]
    # Construct FV_HPF (insert N_peak, mu_peak and sigma_peak in fornt of P_est):
    FV_HPF = np.insert(P_est, 0, [N_peak, mu_peak, sigma_peak])
    return FV_HPF


def lpc_to_lpcc(lpc):
    # Based on given LPC, calculate LPCC:
    lpcc = []
    order = lpc.size - 1
    # The 1st element equals ln(order):
    lpcc.append(math.log(order))
    lpcc.append(lpc[1])
    for i in range(2, order+1):
        sum_1 = 0
        for j in range(1, i):
            sum_1 += j / i * lpc[i-j-1] * lpcc[j]
        c = -lpc[i-1] + sum_1
        lpcc.append(c)
    return lpcc[1:13]


def extract_lpcc(wav_path, order):
    y, _ = librosa.load(wav_path, sr=16000)
    lpc = librosa.lpc(y,order=order)
    lpcc = np.array(lpc_to_lpcc(lpc))
    return lpcc


# https://www.cnblogs.com/klchang/p/9280509.html
def calc_stft(signal, sample_rate=16000, frame_size=512, frame_stride=128, winfunc=np.hamming, NFFT=2048):

    # Calculate the number of frames from the signal
    signal_length = len(signal)
    frame_step = frame_stride
    num_frames = 1 + int(np.ceil(float(np.abs(signal_length - frame_size)) / frame_step))
    # zero padding
    pad_signal_length = num_frames * frame_step + frame_size
    z = np.zeros((pad_signal_length - signal_length))
    # Pad signal to make sure that all frames have equal number of samples
    # without truncating any samples from the original signal
    pad_signal = np.append(signal, z)

    # Slice the signal into frames from indices
    indices = np.tile(np.arange(0, frame_size), (num_frames, 1)) + \
            np.tile(np.arange(0, num_frames * frame_step, frame_step), (frame_size, 1)).T
    frames = pad_signal[indices.astype(np.int32, copy=False)]
    # Get windowed frames
    frames *= winfunc(frame_size)
    # Compute the one-dimensional n-point discrete Fourier Transform(DFT) of
    # a real-valued array by means of an efficient algorithm called Fast Fourier Transform (FFT)
    mag_frames = np.absolute(np.fft.rfft(frames, NFFT))
    # Compute power spectrum
    pow_frames = (1.0 / NFFT) * ((mag_frames) ** 2)
    '''
    # Calculate the number of frames from the signal
    frame_length = frame_size * sample_rate
    frame_step = frame_stride * sample_rate
    signal_length = len(signal)
    frame_length = int(round(frame_length))
    frame_step = int(round(frame_step))
    num_frames = 1 + int(np.ceil(float(np.abs(signal_length - frame_length)) / frame_step))
    # zero padding
    pad_signal_length = num_frames * frame_step + frame_length
    z = np.zeros((pad_signal_length - signal_length))
    # Pad signal to make sure that all frames have equal number of samples
    # without truncating any samples from the original signal
    pad_signal = np.append(signal, z)

    # Slice the signal into frames from indices
    indices = np.tile(np.arange(0, frame_length), (num_frames, 1)) +
        np.tile(np.arange(0, num_frames * frame_step, frame_step), (frame_length, 1)).T
    frames = pad_signal[indices.astype(np.int32, copy=False)]
    # Get windowed frames
    frames *= winfunc(frame_length)
    # Compute the one-dimensional n-point discrete Fourier Transform(DFT) of
    # a real-valued array by means of an efficient algorithm called Fast Fourier Transform (FFT)
    mag_frames = np.absolute(np.fft.rfft(frames, NFFT))
    # Compute power spectrum
    pow_frames = (1.0 / NFFT) * ((mag_frames) ** 2)
    '''
    return pow_frames


def load_audio_file(file_path):
    # Check the file extension
    file_ext = os.path.splitext(file_path)[1]
    if file_ext.lower() == '.mp3':
        # Load MP3 file
        audio = AudioSegment.from_mp3(file_path)
        # Construct the output file path by replacing the extension with '.wav'
        output_file_path = os.path.splitext(file_path)[0] + '.wav'
        # Convert MP3 to WAV and save the file
        audio.export(output_file_path, format="wav")
    elif file_ext.lower() == '.wav':
        # Load WAV file
        audio = AudioSegment.from_wav(file_path)
        output_file_path = file_path  # No need to save if it's already in WAV format
    else:
        raise ValueError("Unsupported file format. Only MP3 and WAV are supported.")

    return output_file_path


In [None]:
dataset_labels = ['eval']

for dataset in dataset_labels:
    feature_file = os.path.join(os.getcwd(), 'feature_label_{}.npy'.format(dataset))

    # If all the features and labels are extracted and stored into './feature_label.txt', then directly load this file;
    # Otherwise, generate this file first:
    if os.path.isfile(feature_file):
        print("The features has already been extracted!")
    else:
        print('Feature extraction for ' + dataset + ' has not been done. Extract Void features...')
        # PREPARATION:
        # Path to data:
        data_path = os.path.join(os.getcwd(),'audio',dataset)
        # Protocol of data:
        protocol = os.path.join(os.getcwd(),'ASVspoof2017_V2_eval.trl.txt')
        # Load the filenames and corresponding labels:
        fp = open(protocol)
        lines = fp.readlines()
        name_seq = []
        label_seq = []
        for line in lines:
            str_list = line.split()
            name_seq.append(str_list[0])
            label_seq.append(str_list[1])

        # Initialize parameters:
        W = 14
        # Peak selection threshold:
        omega = 0.3
        # Number of points in each signal segment (window size):
        nperseg = 512
        # Hop length of the window is 25% nperseg (with 75% overlap):
        noverlap = 512-128
        # Number of FFT points:
        nfft = 2048
        # Calculate the number of segments k in S_pow:
        k = int((nfft/2 + 1) / W)

        # Create an empty Numpy array to store extracted features as well as corresponding labels:
        fl = np.zeros((len(name_seq), 98))

        for name_idx in np.arange(len(name_seq)):
            #Obtain the name of current file:
            filename = name_seq[name_idx]
            # Obtain the label of current file:
            label = label_seq[name_idx]

            # ------ Stage 1: Signal transformation ------
            # Read the input signal:
            output_file_path=load_audio_file(os.path.join(data_path, filename))
            signal, _ = librosa.load(output_file_path, sr=16000)

            # Compute STFT for the input signal:
            sig_stft = _stft(signal)

            # Compute S_pow from STFT:
            S_pow = np.sum(np.abs(sig_stft)**2/nfft, axis=1)

            # ------ Stage 2: Feature Extraction ------
            # Calculate the sum of power in each segment (in total k segments):
            power_vec = np.zeros(k)
            for i in np.arange(k):
                power_vec[i] = np.sum(S_pow[i*W:(i+1)*W])
            # Normalize power_vec as power_normal:
            power_normal = power_vec / np.sum(power_vec)

            # Feature 1: FV_LFP - low frequencies power features
            FV_LFP = power_normal[0:48] * 100
            #print(FV_LFP)

            # Feature 2: FV_LDF - signal power linearity degree features
            _, FV_LDF = LinearityDegreeFeatures(power_normal)
            #FV_LDF = np.zeros(2)

            # Feature 3: FV_HPF - high power frequency features
            FV_HPF = HighPowerFrequencyFeatures(FV_LFP, omega)
            #FV_HPF = np.zeros(35)

            # Feature 4: FV_LPC - linear prediction cesptrum coefficients
            FV_LPC = extract_lpcc(os.path.join(data_path, filename), 12)
            #FV_LPC = np.zeros(12)

            # ------ Stage 3: Attack Detection ------
            # Normalize each sub-feature:
            '''
            mean_LFP = np.mean(FV_LFP)
            FV_LFP = (FV_LFP - mean_LFP) / (FV_LFP.max() - FV_LFP.min())
            mean_LDF = np.mean(FV_LDF)
            FV_LDF = (FV_LDF - mean_LDF) / (FV_LDF.max() - FV_LDF.min())
            mean_HPF = np.mean(FV_HPF)
            FV_HPF = (FV_HPF - mean_HPF) / (FV_HPF.max() - FV_HPF.min())
            mean_LPC = np.mean(FV_LPC)
            FV_LPC = (FV_LPC - mean_LPC) / (FV_LPC.max() - FV_LPC.min())
            '''
            # Construct the final feature of length 97 (= 2 + 35 + 12 + 48):
            FV_Void = np.concatenate((FV_LDF, FV_HPF, FV_LPC, FV_LFP))
            #FV_Void = np.concatenate((FV_LDF, FV_LPC))
            '''
            print("Extracted Void feature for {} is:".format(filename))
            print(FV_Void)
            print("--------------------------------------------")
            '''

            if label == 'genuine':
                label = 1
            else:
                label = 0
            fl[name_idx, 0:97] = FV_Void
            fl[name_idx, 97] = label
        np.save(feature_file, fl)

print("Feature extraction FINISHED!")

Feature extraction for eval has not been done. Extract Void features...
Feature extraction FINISHED!


In [None]:
def audio_label(s):
    it = {b'genuine':1, b's/////////////////////poof':0}
    return it[s]

In [None]:
# Import evaluation data for testing:
file_eval = os.path.join(os.getcwd(),'feature_labels','feature_label_eval.npy')
# If all the features and labels are extracted and stored into './feature_label.txt', then directly load this file;
# Otherwise, generate this file first:
if os.path.isfile(file_eval):
    # Load the extracted features and corresponding labels:
    data_eval = np.load(file_eval)
else:
    print('Feature extraction has not been done. Please extract Void features using data_preparation.py!')
# Load the Void features and corresponding labels:
x_eval, y_eval = np.split(data_eval, indices_or_sections=(97,), axis=1)

# Load the the SVM classifier model:
file_model = os.path.join(os.getcwd(),'svm.pkl')
if os.path.isfile(file_model):
    # Load the extracted features and corresponding labels:
    with open(file_model, 'rb') as f:
      classifier=pickle.load(f)
else:
    print('Feature extraction has not been done. Please extract Void features using data_preparation.py!')

# Prediction result of SVM classifier on EVALUATION set of ASVspoof 2017 v2 dataset:
print("Results on EVALUATION SET:")
result_pred = classifier.predict(x_eval)
for i in np.arange(y_eval.size):
        # The i-th predicted label:
        label_pred = result_pred[i]
        # The actual label for i-th sample:
        label_actual = y_eval[i]

        print("predict, actual:", label_pred, label_actual)

Results on EVALUATION SET:
predict, actual: 0.525418295544118 [1.]
predict, actual: 0.7776566384841299 [1.]
predict, actual: 0.41728831386440346 [1.]
predict, actual: 0.45483333005977916 [0.]
predict, actual: 0.5910411211363287 [0.]
predict, actual: 0.7022448416281751 [0.]
predict, actual: 0.32840359087963494 [0.]
