In [None]:
import gc
import torch
import torchaudio
import librosa
import numpy as np
import torch.nn as nn
import pandas as pd
import matplotlib.pyplot as plt
from torchsummary import summary
import keras
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler
from collections import Counter
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
import warnings

tqdm.pandas()

In [None]:
# Suppress every warning for the rest of the session.
# NOTE(review): this also hides deprecation warnings raised by the imported
# libraries — consider narrowing the filter to specific categories/modules.
warnings.filterwarnings("ignore")

In [None]:
# --- audio parameters -------------------------------------------------------
RATE_HZ: int = 16000            # target sample rate (Hz) used by get_transform_audio
MAX_LENGTH: int = 128000        # max number of samples kept per clip (= RATE_HZ * SECONDS)
RATE: int = 2400                # target sample rate handed to the resampler in get_wav
SILENCE_THRESHOLD: float = .01  # not referenced in the cells visible here
N_MFCC: int = 13                # MFCC count used by extract_mfcc_features / wave_to_mfcc
COL_SIZE: int = 30              # MFCC count used by extract_accoustic_features

# --- metadata files ---------------------------------------------------------
CSV_FILE_PATH: str = "bio_metadata.csv"
NATIVE_FILE_PATH: str = "native_bio_metadata.csv"
NON_NATIVE_FILE_PATH: str = "non_native_bio_metadata.csv"
ALL_SPEAKERS_PATH: str = "speakers_all.csv"

In [None]:
def extract_mfcc_features(file_name):
    """Return the time-averaged MFCC vector of an audio file.

    The file is loaded with ``librosa.load``, ``N_MFCC`` coefficients are
    computed per frame and then averaged over the frame axis.

    Args:
        file_name: path passed unchanged to ``librosa.load``.

    Returns:
        1-D array holding one mean value per coefficient, or ``None`` when
        loading or feature extraction raises (a message is printed).
    """
    try:
        signal, sr = librosa.load(file_name)
        coefficients = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=N_MFCC)
        return np.mean(coefficients.T, axis=0)
    except Exception:
        # Best effort: report the offending file and let the caller decide.
        print("Error encountered while parsing file: ", file_name)
        return None

def extract_accoustic_features(file_name):
    """Compute six time-averaged acoustic descriptors for one audio clip.

    The clip is read from ``./data/audio/<file_name>``; every descriptor is
    averaged over its frame axis before being returned.

    Args:
        file_name: file name (including extension) inside ``./data/audio/``.

    Returns:
        A 6-tuple ``(mfcc, chroma_stft, spectral_centroid,
        spectral_bandwidth, spectral_rolloff, zero_crossing_rate)`` of
        arrays, where ``mfcc`` uses ``COL_SIZE`` coefficients.
        ``None`` is returned (and a message printed) when loading or any
        feature computation raises.
    """
    try:
        signal, sr = librosa.load(f'./data/audio/{file_name}')
        spectral = librosa.feature

        def frame_mean(frames):
            # Collapse the frame axis so each feature becomes a fixed-size vector.
            return np.mean(frames.T, axis=0)

        return (
            frame_mean(spectral.mfcc(y=signal, sr=sr, n_mfcc=COL_SIZE)),
            frame_mean(spectral.chroma_stft(y=signal, sr=sr)),
            frame_mean(spectral.spectral_centroid(y=signal, sr=sr)),
            frame_mean(spectral.spectral_bandwidth(y=signal, sr=sr)),
            frame_mean(spectral.spectral_rolloff(y=signal, sr=sr)),
            frame_mean(spectral.zero_crossing_rate(signal)),
        )
    except Exception:
        print("Error encountered while parsing file: ", file_name)
        return None
    

def _relative_perturbation(values):
    """Mean absolute frame-to-frame change of ``values`` divided by their mean (0.0 if undefined)."""
    values = np.asarray(values, dtype=float)
    if values.size < 2:
        return 0.0
    baseline = float(np.mean(values))
    if baseline == 0.0:
        return 0.0
    return float(np.mean(np.abs(np.diff(values))) / baseline)


def extract_prosodic_features(file_name):
    """Extract six prosodic descriptors from an audio file.

    The previous body called ``librosa.pyin.piptrack``,
    ``librosa.effects.jitter`` and ``librosa.effects.shimmer``; librosa has no
    such attributes, so the resulting AttributeError was swallowed and the
    function always returned ``None``. This version only uses existing
    librosa calls.

    NOTE(review): jitter, shimmer and HNR are frame-level approximations
    (not cycle-to-cycle measurements as produced by e.g. Praat) — confirm
    they are adequate for the intended analysis.

    Args:
        file_name: path passed to ``librosa.load``.

    Returns:
        Tuple ``(pitch_intensity, duration, loudness, jitter, shimmer, hnr)``
        of floats:
          * pitch_intensity - mean f0 (Hz) over voiced frames, 0.0 if none
          * duration        - clip length in seconds
          * loudness        - mean RMS energy over frames
          * jitter          - relative frame-to-frame variation of the pitch period
          * shimmer         - relative frame-to-frame variation of RMS amplitude
          * hnr             - 10*log10(harmonic energy / residual energy), 0.0 if undefined
        ``None`` is returned (and a message printed) if anything raises.
    """
    try:
        audio, sample_rate = librosa.load(file_name, res_type='kaiser_fast')

        # Per-frame fundamental frequency; unvoiced frames are reported as NaN.
        f0, _, _ = librosa.pyin(
            y=audio,
            fmin=librosa.note_to_hz('C2'),
            fmax=librosa.note_to_hz('C7'),
            sr=sample_rate,
        )
        f0 = np.nan_to_num(np.asarray(f0, dtype=float), nan=0.0, posinf=0.0, neginf=0.0)
        rms = np.asarray(librosa.feature.rms(y=audio), dtype=float).ravel()

        # Align both tracks defensively before masking with the voiced frames.
        n_frames = min(f0.size, rms.size)
        f0, rms = f0[:n_frames], rms[:n_frames]
        voiced = f0 > 0

        pitch_intensity = float(np.mean(f0[voiced])) if voiced.any() else 0.0
        duration = float(librosa.get_duration(y=audio, sr=sample_rate))
        loudness = float(np.mean(rms)) if rms.size else 0.0
        jitter = _relative_perturbation(1.0 / f0[voiced])
        shimmer = _relative_perturbation(rms[voiced])

        # Harmonic-to-residual energy ratio from the harmonic/percussive split.
        harmonic = librosa.effects.harmonic(y=audio)
        harmonic_energy = float(np.sum(harmonic ** 2))
        residual_energy = float(np.sum((audio - harmonic) ** 2))
        if harmonic_energy > 0.0 and residual_energy > 0.0:
            hnr = float(10.0 * np.log10(harmonic_energy / residual_energy))
        else:
            hnr = 0.0

        return pitch_intensity, duration, loudness, jitter, shimmer, hnr
    except Exception as e:
        # Include the exception so API misuse is no longer hidden behind a generic message.
        print("Error encountered while parsing file: ", file_name, repr(e))
        return None
    

def extract_plp_features(file_name):
    """Return the mean predominant-local-pulse (PLP) value of an audio file.

    ``librosa.beat.plp`` computes a *predominant local pulse* curve (a rhythm
    descriptor). NOTE(review): if perceptual linear prediction coefficients
    were intended, a different library is needed — confirm the intent.

    The previous call passed ``n_mfcc=COL_SIZE``, which ``librosa.beat.plp``
    does not accept; the resulting TypeError was swallowed and the function
    always returned ``None``.

    Args:
        file_name: path passed to ``librosa.load``.

    Returns:
        Mean of the pulse curve, or ``None`` (with a printed message) when
        loading or feature extraction raises.
    """
    try:
        audio, sample_rate = librosa.load(file_name, res_type='kaiser_fast')
        plp = np.mean(librosa.beat.plp(y=audio, sr=sample_rate).T, axis=0)
    except Exception as e:
        # Include the exception so API misuse is no longer hidden behind a generic message.
        print("Error encountered while parsing file: ", file_name, repr(e))
        return None
    return plp

In [None]:
def get_wav(file_name):
    """Load ``./data/audio/<file_name>.wav`` and resample it to ``RATE`` Hz.

    Args:
        file_name: clip name without the ``.wav`` extension.

    Returns:
        The waveform returned by ``librosa.core.resample`` (called with
        ``scale=True``), or ``None`` (with a printed message) when loading
        or resampling raises.
    """
    try:
        samples, native_rate = librosa.load(f'./data/audio/{file_name}.wav')
        resampled = librosa.core.resample(
            y=samples,
            orig_sr=native_rate,
            target_sr=RATE,
            scale=True,
        )
    except Exception:
        print("Error encountered while parsing file: ", file_name)
        return None
    return resampled

def wave_to_mfcc(audio, sample_rate):
    """Collapse an in-memory waveform into its mean MFCC vector.

    Args:
        audio: waveform samples passed to ``librosa.feature.mfcc`` as ``y``.
        sample_rate: sample rate of ``audio``.

    Returns:
        1-D array with ``N_MFCC`` frame-averaged coefficients, or ``None``
        (with a printed message) when the computation raises.
    """
    try:
        frames = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=N_MFCC)
        return np.mean(frames.T, axis=0)
    except Exception:
        print("Error encountered while parsing audio")
        return None

def normalize_mfcc(mfcc):
    """Scale the magnitudes of an MFCC matrix with a freshly fitted MinMaxScaler.

    Args:
        mfcc: array-like accepted by ``np.abs`` and ``MinMaxScaler.fit_transform``.

    Returns:
        The result of ``MinMaxScaler().fit_transform`` applied to ``|mfcc|``.
    """
    magnitudes = np.abs(mfcc)
    scaler = MinMaxScaler()
    return scaler.fit_transform(magnitudes)

def to_categorical(y):
    """One-hot encode a sequence of labels.

    Args:
        y: iterable of hashable labels.

    Returns:
        Tuple ``(one_hot, lang_dict)`` where ``one_hot`` is the output of
        ``keras.utils.to_categorical`` for the encoded labels and
        ``lang_dict`` maps every distinct label to its column index.
    """
    # Iterating a bare set gives an order that can change between runs for
    # strings, so the label -> index mapping was not reproducible. Sorting the
    # classes makes it deterministic.
    try:
        classes = sorted(set(y))
    except TypeError:
        # Labels of mixed, non-comparable types: fall back to a stable textual order.
        classes = sorted(set(y), key=repr)
    lang_dict = {language: index for index, language in enumerate(classes)}
    indices = [lang_dict[label] for label in y]
    return keras.utils.to_categorical(indices, len(lang_dict)), lang_dict


In [None]:
# Load the native-speaker metadata from NATIVE_FILE_PATH ("native_bio_metadata.csv")
# and preview the first rows.
native_bio_metadata = pd.read_csv(NATIVE_FILE_PATH)
native_bio_metadata.head()

In [None]:
# Drop the metadata columns that the rest of the notebook does not read.
DROPPED_COLUMNS = [
    'href',
    'age',
    'age_of_english_onset',
    'other_languages',
    'birth_place',
    'age_sex',
    'length_of_english_residence',
    'english_learning_method',
]
# Reassign instead of using inplace=True, and ignore columns that are already
# gone, so re-running this cell no longer raises KeyError.
# NOTE(review): errors="ignore" also hides a misspelled column name.
native_bio_metadata = native_bio_metadata.drop(columns=DROPPED_COLUMNS, errors="ignore")
native_bio_metadata.head()

In [None]:
# Summary statistics of the remaining metadata columns.
native_bio_metadata.describe()

In [None]:
# Keep only the first line of every 'english_residence' entry.
# The .str accessor leaves missing values as NaN, whereas the previous
# apply(lambda x: x.split(...)) raised AttributeError on them.
native_bio_metadata['english_residence'] = (
    native_bio_metadata['english_residence'].str.split('\n').str[0]
)
native_bio_metadata.sample(10)

In [None]:
# Number of speakers per 'english_residence' value (class balance check).
native_bio_metadata['english_residence'].value_counts()

In [None]:
# (rows, columns) of the metadata frame at this point of the notebook.
native_bio_metadata.shape

In [None]:
# NOTE: this re-defines (and therefore shadows) the to_categorical function
# already defined in an earlier cell of this notebook; one of the two copies
# should be removed.
def to_categorical(y):
    """One-hot encode a sequence of labels.

    Args:
        y: iterable of hashable labels.

    Returns:
        Tuple ``(one_hot, lang_dict)`` where ``one_hot`` is the output of
        ``keras.utils.to_categorical`` for the encoded labels and
        ``lang_dict`` maps every distinct label to its column index.
    """
    # Iterating a bare set gives an order that can change between runs for
    # strings, so the label -> index mapping was not reproducible. Sorting the
    # classes makes it deterministic.
    try:
        classes = sorted(set(y))
    except TypeError:
        # Labels of mixed, non-comparable types: fall back to a stable textual order.
        classes = sorted(set(y), key=repr)
    lang_dict = {language: index for index, language in enumerate(classes)}
    indices = [lang_dict[label] for label in y]
    return keras.utils.to_categorical(indices, len(lang_dict)), lang_dict

In [None]:
# Add a 'file' column holding the relative path of each clip:
# "data/audio/<language_num>.wav".
audio_paths = native_bio_metadata['language_num'].map("data/audio/{}.wav".format)
native_bio_metadata.loc[:, 'file'] = audio_paths
native_bio_metadata.head()

In [None]:
# Class distribution of 'english_residence' before resampling.
native_bio_metadata['english_residence'].value_counts()

In [None]:
# Balance the 'english_residence' classes.
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
from collections import Counter

# X keeps 'english_residence' on purpose: later cells read it back from X_resampled.
X = native_bio_metadata.drop(columns=['native_language', 'sex'])
y = native_bio_metadata['english_residence']

# Over-sampled variant. It used to be stored in X_resampled / y_resampled and
# was then immediately overwritten by the under-sampler below, so it never had
# any effect; it now lives under its own names.
ros = RandomOverSampler(random_state=42)
X_oversampled, y_oversampled = ros.fit_resample(X, y)

# Under-sampled variant of the ORIGINAL data — this is what the rest of the
# notebook consumes (unchanged behaviour).
rus = RandomUnderSampler(random_state=42)
X_resampled, y_resampled = rus.fit_resample(X, y)

In [None]:
# Per-class counts after resampling (the identical line used to be printed twice).
print(sorted(Counter(y_resampled).items()))

In [None]:
# Class distribution inside the resampled frame (X_resampled still carries the label column).
X_resampled['english_residence'].value_counts()

In [None]:
# Preview of the resampled metadata.
X_resampled.head()

In [None]:
def get_transform_audio(file):
    """Load an audio file as a mono waveform at ``RATE_HZ``, truncated to ``MAX_LENGTH`` samples.

    Args:
        file: path (or path-like) of the audio file; converted with ``str``.

    Returns:
        1-D numpy array with at most ``MAX_LENGTH`` samples.
    """
    audio, rate = torchaudio.load(str(file))
    # torchaudio returns (channels, samples). squeeze(0) alone only removes
    # the channel axis for mono input; with stereo files the later
    # [:MAX_LENGTH] slice cut channels instead of samples. Averaging the
    # channels first is a no-op for mono and downmixes everything else.
    if audio.dim() > 1 and audio.size(0) > 1:
        audio = audio.mean(dim=0, keepdim=True)
    transform = torchaudio.transforms.Resample(rate, RATE_HZ)
    audio = transform(audio).squeeze(0).numpy()
    return audio[:MAX_LENGTH]

In [None]:
# Columns available in the resampled frame.
X_resampled.columns

In [None]:
# Model inputs: the clip identifiers ('language_num'), later turned into file names.
# NOTE: this rebinds X, which held the metadata DataFrame in the resampling cell.
X = X_resampled['language_num'].values

In [None]:
# Targets: the 'english_residence' label of every resampled row.
# NOTE: this rebinds y, which held the un-resampled label Series in the resampling cell.
y = X_resampled['english_residence'].values

In [None]:
# 80/20 train/test split with a fixed seed, stratified on the label so both
# splits keep the class proportions of y.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Sanity check of the resulting split sizes.
X_train.shape, X_test.shape, y_train.shape, y_test.shape

In [None]:
# Tally how many samples of each class ended up in either split.
train_counter, test_counter = Counter(y_train), Counter(y_test)

print(f"Train Count: {train_counter}")
print(f"Test Count: {test_counter}")

In [None]:
# Build ONE label -> index mapping from all labels and reuse it for both
# splits. Encoding train and test with two independent to_categorical() calls
# produced two independent mappings, so the same class could end up in
# different one-hot columns for training and evaluation.
_, lang_dict = to_categorical(y)
_one_hot_rows = np.eye(len(lang_dict), dtype=np.float32)
y_train_cat = _one_hot_rows[[lang_dict[label] for label in y_train]]
y_test_cat = _one_hot_rows[[lang_dict[label] for label in y_test]]

In [None]:
# Extract the acoustic features (mean MFCC + mean chroma) of every training clip.
X_train_acoustic = []

for clip_id in tqdm(X_train, desc="train features"):
    features = extract_accoustic_features(clip_id + '.wav')
    if features is None:
        # extract_accoustic_features returns None on failure. Unpacking that
        # raised an opaque TypeError, and silently skipping the clip would
        # misalign the features with y_train_cat, so fail with a clear message.
        raise RuntimeError(f"Could not extract acoustic features for '{clip_id}.wav'")
    # Only the first two of the six returned descriptors are used by the model.
    mfcc, chroma_stft = features[:2]
    X_train_acoustic.append([
        mfcc,
        chroma_stft,
    ])
    

In [None]:
# Extract the acoustic features (mean MFCC + mean chroma) of every test clip.
X_test_acoustic = []

for clip_id in tqdm(X_test, desc="test features"):
    features = extract_accoustic_features(clip_id + '.wav')
    if features is None:
        # extract_accoustic_features returns None on failure. Unpacking that
        # raised an opaque TypeError, and silently skipping the clip would
        # misalign the features with y_test_cat, so fail with a clear message.
        raise RuntimeError(f"Could not extract acoustic features for '{clip_id}.wav'")
    # Only the first two of the six returned descriptors are used by the model.
    mfcc, chroma_stft = features[:2]
    X_test_acoustic.append([
        mfcc,
        chroma_stft,
    ])

In [None]:
class AcousticDataset(Dataset):
    """Dataset pairing per-clip feature arrays with their label rows.

    Args:
        X: indexable collection; each item is a sequence of 1-D feature
           arrays that are concatenated into a single vector on access.
        y: indexable collection of labels, one entry per item of ``X``.
           In this notebook each entry is a one-hot row, which the training
           loop converts to a class index with ``argmax``.
    """

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        """Number of samples (length of ``X``)."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for sample ``idx``.

        ``features`` is the float32 concatenation of the sample's feature
        arrays; ``label`` is ``y[idx]`` converted to an int64 tensor.
        """
        flat = np.concatenate(self.X[idx], axis=0)
        return (
            torch.tensor(flat, dtype=torch.float32),
            torch.tensor(self.y[idx], dtype=torch.long),
        )

In [None]:
# Wrap the extracted features and one-hot labels in datasets / data loaders.
train_dataset = AcousticDataset(X_train_acoustic, y_train_cat)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = AcousticDataset(X_test_acoustic, y_test_cat)
# Validation data is only evaluated, never trained on, so shuffling brings no
# benefit; a fixed order keeps evaluation deterministic.
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

In [None]:
class AudioClassifier(nn.Module):
    """1-D convolutional classifier over a flat feature vector.

    Four stride-2 ``Conv1d`` + ReLU blocks are followed by mean pooling over
    the remaining length axis and a three-layer MLP that outputs class logits.

    Args:
        input_size: length of the input feature vector. It is not used to
            build the layers (the pooling makes the network length-agnostic)
            and is kept for backward compatibility. With kernel 3 / stride 2
            and no padding in all four convolutions, inputs need at least
            31 features.
        num_classes: number of output logits. ``None`` (the default) resolves
            to ``len(lang_dict)`` when the model is instantiated; it used to
            be frozen at class-definition time, so it could go stale when
            ``lang_dict`` was rebuilt.
    """

    def __init__(self, input_size=42, num_classes=None):
        super().__init__()
        if num_classes is None:
            # lang_dict is the notebook-level label mapping.
            num_classes = len(lang_dict)
        # Convolutional feature extractor: channels 1 -> 32 -> 64 -> 128 -> 256.
        self.conv = nn.Sequential(
            nn.Conv1d(1, 32, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Conv1d(64, 128, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Conv1d(128, 256, kernel_size=3, stride=2),
            nn.ReLU(),
        )
        # Classification head: 256 -> 128 -> 64 -> num_classes logits.
        self.linear = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes),
        )

    def forward(self, x):
        """Map a ``(batch, features)`` tensor to ``(batch, num_classes)`` logits."""
        x = x.unsqueeze(1)   # add the single input channel expected by Conv1d
        x = self.conv(x)
        x = x.mean(-1)       # average over the remaining length axis
        x = self.linear(x)
        return x
    
model = AudioClassifier()
# torchsummary defaults to device="cuda" and feeds a CUDA tensor whenever a GPU
# is visible; the model is never moved off the CPU in this notebook, so pin
# the summary to the CPU to avoid a device-mismatch error on GPU machines.
# 42 is the length of one feature vector (concatenated MFCC and chroma means).
summary(model, (42,), device="cpu")

In [None]:
# Loss and optimiser: cross-entropy over class logits, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Per-epoch mean losses, kept for plotting.
n_epochs = 500
train_losses = []
val_losses = []

for epoch in range(n_epochs):
    # ---- training pass ----
    model.train()
    train_loss = 0.0
    for data, target in train_loader:
        optimizer.zero_grad()
        # Targets arrive one-hot encoded; CrossEntropyLoss expects class indices.
        target_indices = torch.argmax(target, dim=1)
        output = model(data)
        loss = criterion(output, target_indices)
        loss.backward()
        optimizer.step()
        # Weight by batch size so the epoch value is a per-sample mean.
        train_loss += loss.item() * data.size(0)
    train_loss = train_loss / len(train_loader.dataset)
    train_losses.append(train_loss)

    # ---- validation pass ----
    model.eval()
    val_loss = 0.0
    # No gradients are needed for evaluation; without no_grad() every
    # validation batch built an autograd graph for nothing.
    with torch.no_grad():
        for data, target in val_loader:
            target_indices = torch.argmax(target, dim=1)
            output = model(data)
            loss = criterion(output, target_indices)
            val_loss += loss.item() * data.size(0)
    val_loss = val_loss / len(val_loader.dataset)
    val_losses.append(val_loss)

    print(f"Epoch {epoch+1}/{n_epochs} Train Loss: {train_loss:.4f} Val Loss: {val_loss:.4f}")


In [None]:
# Plot the per-epoch training and validation loss. Title and axis labels are
# added so the figure can be read on its own.
fig, ax = plt.subplots()
ax.plot(train_losses, label='train')
ax.plot(val_losses, label='val')
ax.set(title='Training vs. validation loss', xlabel='epoch', ylabel='cross-entropy loss')
ax.legend()
plt.show()


In [None]:
# Top-1 accuracy of the model on the validation loader.
# Switch to eval mode explicitly instead of relying on the state the training
# cell happened to leave the model in.
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for data, target in val_loader:
        # Targets are one-hot rows; compare class indices.
        target_indices = torch.argmax(target, dim=1)
        predicted = model(data).argmax(dim=1)
        total += target.size(0)
        correct += (predicted == target_indices).sum().item()

print(f"Accuracy: {correct / total:.2f}")