<a href="https://colab.research.google.com/github/Conite002/ARTICLES-PROCESSING/blob/main/EXAM_Conite_GBODOGBE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
# !unzip exam.zip -d .

In [6]:
import os
import pandas as pd
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
import torch
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, recall_score
from transformers import Wav2Vec2FeatureExtractor, Wav2Vec2ForSequenceClassification, Trainer, TrainingArguments


* Load data

In [104]:
# Load train and test CSVs
train_df = pd.read_csv('./FEMH/train.csv')
test_df = pd.read_csv('./FEMH/test.csv', sep=';')


print(train_df.head())
print(test_df.head())

        fname label
0  001_FD.wav    FD
1  001_NP.wav    NP
2  001_PH.wav    PH
3  001_VP.wav    VP
4  002_FD.wav    FD
   fname label
0      1    FD
1      2    NP
2      3    VP
3      4    NP
4      5    VP


In [105]:
test_df

Unnamed: 0,fname,label
0,1,FD
1,2,NP
2,3,VP
3,4,NP
4,5,VP
...,...,...
195,196,NP
196,197,PH
197,198,FD
198,199,FD


In [106]:
test_df['label'].value_counts()

Unnamed: 0_level_0,count
label,Unnamed: 1_level_1
FD,50
NP,50
VP,50
PH,50


In [107]:
label2id = {label: i for i, label in enumerate(train_df['label'].unique())}
id2label = {i: label for label, i in label2id.items()}

print(f'Label to ID: {label2id}')
print(f'ID to Label: {id2label}')

train_df['label'] = train_df['label'].map(label2id)
test_df['label'] = test_df['label'].map(label2id)
print(train_df.head())

Label to ID: {'FD': 0, 'NP': 1, 'PH': 2, 'VP': 3}
ID to Label: {0: 'FD', 1: 'NP', 2: 'PH', 3: 'VP'}
        fname  label
0  001_FD.wav      0
1  001_NP.wav      1
2  001_PH.wav      2
3  001_VP.wav      3
4  002_FD.wav      0


In [108]:
def transform_fname(x):
    x = str(x)
    if not x.endswith('.wav'):
        return x.zfill(3) + '.wav'
    return x

test_df['fname'] = test_df['fname'].apply(transform_fname)
test_df['fname']

Unnamed: 0,fname
0,001.wav
1,002.wav
2,003.wav
3,004.wav
4,005.wav
...,...
195,196.wav
196,197.wav
197,198.wav
198,199.wav


3: Preprocess Audio Files

In [109]:
def extract_features(audio_path, max_duration=3, sampling_rate=44100, dataset_type='test'):
    base_dir = './FEMH'
    if dataset_type == 'train':
        audio_path = os.path.join(base_dir, 'TrainingDataset', 'wav', audio_path)
    elif dataset_type == 'test':
        audio_path = os.path.join(base_dir, 'TestingDataset','wav', audio_path)
    else:
        raise ValueError("Invalid dataset_type. Must be 'train' or 'test'.")

    if os.path.exists(audio_path):
        audio, sr = librosa.load(audio_path, sr=sampling_rate, duration=max_duration)
        return audio
    else:
        raise FileNotFoundError(f"Audio file not found at path: {audio_path}")


train_df['features'] = train_df['fname'].apply(lambda x: extract_features(x, dataset_type='train'))
test_df['features'] = test_df['fname'].apply(lambda x: extract_features(x, dataset_type='test'))


In [110]:
print(train_df['features'][0].shape)
print(train_df['features'].isnull().sum())


(132300,)
0


In [111]:
shapes = [len(f) for f in train_df['features']]
print(set(shapes))

{109093, 119016, 101384, 110250, 132300, 112399, 132240, 130036}


We ensure all audio samples have the same length by padding or truncating.

In [112]:
def standardize_audio_length(audio, target_length=132300):
    if isinstance(audio, list):
        audio = np.array(audio)
    return librosa.util.fix_length(audio, size=target_length)

target_length = 132300
train_df['features'] = train_df['features'].apply(lambda x: standardize_audio_length(x, target_length=target_length))
test_df['features'] = test_df['features'].apply(lambda x: standardize_audio_length(x, target_length=target_length))

In [113]:
shapes = [len(f) for f in train_df['features']]
print(set(shapes))

{132300}


In [114]:
train_df

Unnamed: 0,fname,label,features
0,001_FD.wav,0,"[-0.0038452148, -0.0009765625, -0.0019226074, ..."
1,001_NP.wav,1,"[0.0010509741, 0.0010746565, 0.0012093864, 0.0..."
2,001_PH.wav,2,"[0.0009765625, 0.0009460449, 0.0009460449, 0.0..."
3,001_VP.wav,3,"[-0.0019226074, 0.0019226074, 0.0019226074, -0..."
4,002_FD.wav,0,"[-0.0029296875, -0.0028381348, -0.0028076172, ..."
...,...,...,...
195,049_VP.wav,3,"[0.00289917, -0.0009765625, -3.0517578e-05, 0...."
196,050_FD.wav,0,"[0.0067749023, 0.009643555, 0.013519287, 0.013..."
197,050_NP.wav,1,"[0.0, -0.0009765625, -0.0009765625, -0.0019226..."
198,050_PH.wav,2,"[0.0009765625, 0.0009460449, 0.0009765625, 0.0..."


Reduction de dimension

In [115]:
import numpy as np

def reduce_features_mean_pooling(features, reduction_factor=100):

    features = features[:len(features) - (len(features) % reduction_factor)]
    reduced_features = features.reshape(-1, reduction_factor).mean(axis=1)
    return reduced_features

from sklearn.decomposition import PCA

def reduce_features_pca(features, n_components=50):
    pca = PCA(n_components=n_components)
    reduced_features = pca.fit_transform(features.reshape(1, -1))
    return reduced_features.flatten()


train_df['features'] = train_df['features'].apply(lambda x: reduce_features_mean_pooling(np.array(x), reduction_factor=100))
test_df['features'] = test_df['features'].apply(lambda x: reduce_features_mean_pooling(np.array(x), reduction_factor=100))
print(len(train_df['features'][0]))



1323


In [116]:
print(len(train_df['features'][0]))
print(len(test_df['features'][0]))


1323
1323


4. Prepare Data for Wav2Vec2

In [117]:
class VoiceDataset(Dataset):
    def __init__(self, features, labels=None):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        feature = self.features[idx]
        label = self.labels[idx] if self.labels is not None else None
        return {"input_values": feature, "labels": label}

train_dataset = VoiceDataset(train_df['features'], train_df['label'])
test_dataset = VoiceDataset(test_df['features'])


5. Model Training

In [120]:
model = Wav2Vec2ForSequenceClassification.from_pretrained(
    "facebook/wav2vec2-base", num_labels=len(train_df['label'].unique())
)
model.config.mask_length = 1

training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=10,
    weight_decay=0.01,
    logging_dir="./logs",
    save_total_limit=2
)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset
)

trainer.train()



Some weights of Wav2Vec2ForSequenceClassification were not initialized from the model checkpoint at facebook/wav2vec2-base and are newly initialized: ['classifier.bias', 'classifier.weight', 'projector.bias', 'projector.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


ValueError: `mask_length` has to be smaller than `sequence_length`, but got `mask_length`: 10 and `sequence_length`: 3`

6: Model Evaluation

7: Visualization

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay

cm = confusion_matrix(y_true, y_pred)
ConfusionMatrixDisplay(cm, display_labels=train_df['label'].unique()).plot()
plt.show()


8: Save Model and Pipeline

In [None]:
model.save_pretrained("./voice_disorder_model")
feature_extractor.save_pretrained("./voice_disorder_model")


Inference Pipeline

In [None]:
def predict(audio_path):
    audio = extract_features(audio_path)
    inputs = feature_extractor(audio, return_tensors="pt", sampling_rate=44100)
    outputs = model(**inputs)
    predicted_class = np.argmax(outputs.logits.detach().numpy(), axis=1)
    return predicted_class
