In [2]:
from tqdm import tqdm
import joblib
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import LinearSVC
from scipy.stats import skew, kurtosis
from dotenv import load_dotenv
import pyloudnorm as pyln
from scipy.io.wavfile import write
from pathlib import Path
import os
import re
import polars as pl
import numpy as np
from pprint import pprint
import altair as alt
import streamlit as st
import soundfile as sf
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from torchaudio.transforms import MFCC
from datasets import load_dataset, get_dataset_split_names, get_dataset_config_names, Audio, concatenate_datasets
from transformers import pipeline
from kokoro import KPipeline
from google import genai
from google.genai import types
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

def extract_path(sample_audio):
    """Derive a ``filename`` field from a sample's audio path.

    The last four characters of ``sample_audio["audio"]["path"]`` are dropped
    (presumably a ``.wav``-style extension -- confirm against the dataset).

    Returns:
        dict: ``{"filename": <path without its last four characters>}``.
    """
    audio_path = sample_audio["audio"]["path"]
    return dict(filename=audio_path[:-4])

def prepare_dataset(split):
    """Load one split of ``mispeech/speechocean762`` reduced to the needed columns.

    Args:
        split: Name of the dataset split to load (the notebook uses
            ``"train"`` and ``"test"``).

    Returns:
        The loaded split restricted to the ``speaker``, ``audio`` and
        ``accuracy`` columns, with the ``audio`` column cast to
        ``Audio(sampling_rate=16000)``.
    """
    columns_to_keep = ["speaker", "audio", "accuracy"]
    return (
        load_dataset("mispeech/speechocean762", split=split)
        .select_columns(columns_to_keep)
        .cast_column("audio", Audio(sampling_rate=16000))
    )

def map_accuracy_to_category(sample):
    """Bucket a sample's ``accuracy`` score into an integer ``label``.

    Buckets (lower bound inclusive):
        >= 9 -> 4 (Excellent)
        >= 7 -> 3 (Good)
        >= 5 -> 2 (Understandable)
        >= 3 -> 1 (Poor)
        else -> 0 (Extremely poor)

    Returns:
        dict: ``{"label": <int in 0..4>}``.
    """
    score = sample["accuracy"]
    # Ordered from the highest threshold down so the first match wins.
    for label, lower_bound in ((4, 9), (3, 7), (2, 5), (1, 3)):
        if score >= lower_bound:
            return {"label": label}
    return {"label": 0}
    
def extract_mfcc_torchaudio(audio_array, n_mfcc=13, sample_rate=16000):
    """Compute MFCC features for a waveform with torchaudio's ``MFCC`` transform.

    Args:
        audio_array: Waveform passed straight to the transform
            (presumably a float tensor -- confirm against callers).
        n_mfcc: Number of cepstral coefficients to keep.
        sample_rate: Sampling rate of ``audio_array`` in Hz.

    Returns:
        Whatever the ``MFCC`` transform returns for ``audio_array``.
    """
    # 400-sample FFT window with a 160-sample hop (25 ms / 10 ms at 16 kHz),
    # 23 mel bands, and no centre padding of the frames.
    mel_settings = dict(n_fft=400, hop_length=160, n_mels=23, center=False)
    transform = MFCC(sample_rate=sample_rate, n_mfcc=n_mfcc, melkwargs=mel_settings)
    return transform(audio_array)

def pad_vector_torch(sequence: torch.Tensor, max_len, padding_value = 0.0) -> torch.Tensor:
    """Right-pad a tensor along its last dimension up to ``max_len``.

    The current length is read from dimension 1, so the function is meant for
    2-D inputs where dimension 1 is also the last dimension.

    Args:
        sequence: Tensor to pad.
        max_len: Target size of dimension 1.
        padding_value: Fill value for the appended positions.

    Returns:
        A new padded tensor when ``sequence.size(1) < max_len``; otherwise
        ``sequence`` itself, unchanged (it is never truncated).
    """
    time_steps = sequence.size(1)
    if time_steps < max_len:
        pad_width = max_len - time_steps
        # (0, pad_width): nothing on the left, pad_width values on the right
        # of the last dimension.
        return torch.nn.functional.pad(sequence, (0, pad_width), value=padding_value)
    return sequence

    
# Load both published splits, then merge them into one dataset that carries
# a `filename` key and a categorical `label` for every sample.
train_set, test_set = (prepare_dataset(split_name) for split_name in ("train", "test"))
dataset = (
    concatenate_datasets([train_set, test_set])
    .map(extract_path)
    .map(map_accuracy_to_category)
)

# Output folders: one for the decoded waveforms, one for the loudness-normalised copies
raw_dir, norm_dir = Path("raw_wavs"), Path("normalized_wavs")
for out_dir in (raw_dir, norm_dir):
    out_dir.mkdir(exist_ok=True)

# Dump every decoded sample to raw_wavs/<filename>.wav
for sample in dataset:
    audio = sample["audio"]
    raw_audio_file = raw_dir / f"{sample['filename']}.wav"
    write(raw_audio_file, audio["sampling_rate"], audio["array"])
    
# Accumulators; they are turned into a DataFrame once the loop has finished
filenames, audio_array, normalized_audio_array = [], [], []

# Bring every raw WAV to a target loudness of -23 dB LUFS, measuring with
# pyloudnorm's ITU-R BS.1770 meter
for raw_audio_file in raw_dir.glob("*.wav"):
    filename = raw_audio_file.stem
    raw_audio, rate = sf.read(raw_audio_file)

    # Integrated loudness of the untouched signal drives the gain applied below
    loudness = pyln.Meter(rate).integrated_loudness(raw_audio)
    norm_audio = pyln.normalize.loudness(raw_audio, loudness, -23.0)

    # Persist the normalised copy as normalized_wavs/<filename>_norm.wav
    norm_audio_file = norm_dir / f"{filename}_norm.wav"
    sf.write(norm_audio_file, norm_audio, rate)

    filenames.append(filename)
    audio_array.append(raw_audio)
    normalized_audio_array.append(norm_audio)
    
# One row per WAV that was read back from disk
audio_df = pl.DataFrame(
    {
        "filename": filenames,
        "audio_array": audio_array,
        "normalized_audio_array": normalized_audio_array,
    }
)

# Attach the waveforms to the dataset metadata via `filename` (inner join:
# rows without a match on either side are dropped) and keep only the
# columns used downstream.
selected_col = ["speaker","filename", "normalized_audio_array", "accuracy", "label"]
dataset_df = (
    dataset.to_polars()
    .join(audio_df, on="filename", how="inner")
    .select(pl.col(selected_col))
)

# MFCC tensor per utterance plus its number of frames
dataset_df = dataset_df.with_columns(
    mfcc=pl.col("normalized_audio_array").map_elements(
        lambda samples: extract_mfcc_torchaudio(torch.tensor(samples, dtype=torch.float32)),
        return_dtype=pl.Object,
    )
).with_columns(
    mfcc_time=pl.col("mfcc").map_elements(
        lambda feats: feats.numpy().shape[1], return_dtype=pl.Int64
    )
)

# The longest utterance sets the common padded length
max_len_pad = dataset_df.get_column("mfcc_time").max()

# Right-pad every MFCC tensor to that length and record the resulting size
dataset_df = dataset_df.with_columns(
    padded_audio=pl.col("mfcc").map_elements(
        lambda feats: pad_vector_torch(feats, max_len_pad), return_dtype=pl.Object
    )
).with_columns(
    padded_audio_time=pl.col("padded_audio").map_elements(
        lambda feats: feats.shape[1], return_dtype=pl.Int64
    )
)

# 80/20 random split with a fixed seed
train_df, test_df = train_test_split(dataset_df, test_size=0.2, random_state=99)

In [127]:
def plot_speaker_distribution(df, title_chart, color=None):
    """Bar chart of the number of rows per ``speaker``.

    Args:
        df: Polars DataFrame with a ``speaker`` column.
        title_chart: Chart title.
        color: Value handed to ``alt.ColorValue`` for the bars.

    Returns:
        The Altair bar chart (speaker on x, row count on y, with tooltips).
    """
    per_speaker = df.group_by(pl.col("speaker")).agg(pl.len().alias("count"))

    x_axis = alt.X("speaker", type="nominal").title("Speaker")
    y_axis = alt.Y("count", type="quantitative").title("Count")
    bars = alt.Chart(per_speaker).mark_bar().encode(
        x_axis,
        y_axis,
        tooltip=["speaker", "count"],
        color=alt.ColorValue(color),
    )
    return bars.properties(title=title_chart)

def _plot_count_distribution(df, column, axis_title, title_chart, color=None, opacity=1.0):
    """Shared implementation: bar chart of row counts per distinct ``column`` value.

    Args:
        df: Polars DataFrame containing ``column``.
        column: Name of the column to group by (encoded as ordinal on x).
        axis_title: Title shown on the x axis.
        title_chart: Chart title.
        color: Value handed to ``alt.ColorValue`` for the bars.
        opacity: Bar opacity passed to ``mark_bar``.

    Returns:
        The Altair bar chart.
    """
    value_counts = df.group_by(pl.col(column)).agg(pl.len().alias("count"))

    return alt.Chart(value_counts).mark_bar(opacity=opacity).encode(
        alt.X(column, type="ordinal").title(axis_title),
        alt.Y("count", type="quantitative").title("Count"),
        color=alt.ColorValue(color),
        tooltip=[column, "count"],
    ).properties(
        title=title_chart
    )


def plot_accuracy_distribution(df, title_chart, color=None, opacity=1.0):
    """Bar chart of the number of rows per ``accuracy`` score.

    Args:
        df: Polars DataFrame with an ``accuracy`` column.
        title_chart: Chart title.
        color: Value handed to ``alt.ColorValue`` for the bars.
        opacity: Bar opacity.

    Returns:
        The Altair bar chart.
    """
    return _plot_count_distribution(
        df, "accuracy", "Accuracy", title_chart, color=color, opacity=opacity
    )


def plot_label_distribution(df, title_chart, color=None, opacity=1.0):
    """Bar chart of the number of rows per ``label`` category.

    Args:
        df: Polars DataFrame with a ``label`` column.
        title_chart: Chart title.
        color: Value handed to ``alt.ColorValue`` for the bars.
        opacity: Bar opacity.

    Returns:
        The Altair bar chart.
    """
    return _plot_count_distribution(
        df, "label", "Label", title_chart, color=color, opacity=opacity
    )
    

# Training charts are drawn in red, testing charts in light blue.
# Every title names the quantity that the chart actually shows.
speaker_count_train_chart = plot_speaker_distribution(train_df, title_chart="Speaker Distribution (Training)",color="#F75A5A")
speaker_count_test_chart = plot_speaker_distribution(test_df, title_chart="Speaker Distribution (Testing)", color="#AFDDFF")

accuracy_count_train_chart = plot_accuracy_distribution(df=train_df, color="#F75A5A", title_chart="Accuracy Distribution (Training)")
accuracy_count_test_chart = plot_accuracy_distribution(df=test_df, color="#AFDDFF", title_chart="Accuracy Distribution (Testing)")

label_count_train_chart = plot_label_distribution(train_df, title_chart="Label Distribution (Training)",color="#F75A5A")
label_count_test_chart = plot_label_distribution(test_df, title_chart="Label Distribution (Testing)", color="#AFDDFF")

In [4]:
# Stack the two speaker charts vertically (`&` is Altair's vertical-concat operator)
combined_chart = speaker_count_train_chart & speaker_count_test_chart
combined_chart

In [134]:
# Training and testing accuracy distributions, side by side
alt.hconcat(accuracy_count_train_chart, accuracy_count_test_chart)

In [135]:
# Training and testing label distributions, side by side
alt.hconcat(label_count_train_chart, label_count_test_chart)

In [124]:
# Padded MFCC features and labels of the held-out split
# (the same two names are assigned again further down in this cell).
test_mfccs, test_labels = test_df["padded_audio"], test_df["label"]


class MFCCDataset(Dataset):
    """Index-addressable pairs of (MFCC features, label) for a ``DataLoader``.

    Args:
        mfcc_list: Indexable collection of MFCC arrays/tensors; each item is
            expected to have shape ``(n_mfcc, time)``.
        label_list: Indexable collection of integer class labels, aligned
            with ``mfcc_list``.
    """

    def __init__(self, mfcc_list, label_list):
        self.mfccs = mfcc_list  # items: np.array or torch.Tensor with shape (n_mfcc, time)
        self.labels = label_list

    def __len__(self):
        """Number of samples, taken from the feature collection."""
        return len(self.mfccs)

    def __getitem__(self, idx):
        """Return ``(x, y)``: a float32 feature tensor with a leading channel axis and a long label."""
        # torch.as_tensor converts arrays and re-uses tensors that already are
        # float32. torch.tensor() on an existing tensor would copy it and emit
        # PyTorch's copy-construct UserWarning. The returned tensor may share
        # storage with the stored item, so do not modify it in place.
        x = torch.as_tensor(self.mfccs[idx], dtype=torch.float32).unsqueeze(0)  # shape: (1, n_mfcc, time)
        y = torch.tensor(self.labels[idx], dtype=torch.long)
        return x, y
    

class CNNMFCCModel(torch.nn.Module):
    """Two-block 2-D CNN classifier over single-channel MFCC maps.

    Expected input shape: ``(batch, 1, n_mfcc, time_steps)``.

    Args:
        num_classes: Number of output logits.
        n_mfcc: Height of the input map (number of MFCC coefficients).
        time_steps: Width of the input map (number of frames).

    For backward compatibility, ``n_mfcc`` and ``time_steps`` may be omitted.
    They are then read from module-level variables of the same names, which
    is what the original implementation always did.

    Raises:
        NameError: If a dimension is omitted and no module-level variable of
            that name exists.
    """

    def __init__(self, num_classes, n_mfcc=None, time_steps=None):
        super().__init__()

        if n_mfcc is None or time_steps is None:
            # Legacy path: pick up the notebook-level globals.
            legacy = globals()
            try:
                n_mfcc = legacy["n_mfcc"] if n_mfcc is None else n_mfcc
                time_steps = legacy["time_steps"] if time_steps is None else time_steps
            except KeyError as missing:
                raise NameError(
                    f"CNNMFCCModel needs {missing.args[0]!r}: pass it explicitly "
                    "or define it at module level"
                ) from None

        self.conv1 = torch.nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.pool1 = torch.nn.MaxPool2d(2)
        self.conv2 = torch.nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.pool2 = torch.nn.MaxPool2d(2)
        self.dropout = torch.nn.Dropout(0.5)
        self.bn1 = torch.nn.BatchNorm2d(16)

        # The Linear layer needs the flattened size after conv/pooling.
        # The padded 3x3 convolutions keep height and width; each of the two
        # 2x2 max-pools halves them (rounding down), hence the // 4.
        self.flatten_dim = 32 * (n_mfcc // 4) * (time_steps // 4)

        self.fc1 = torch.nn.Linear(self.flatten_dim, 128)
        self.fc2 = torch.nn.Linear(128, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``."""
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool1(x)
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = x.view(x.size(0), -1)  # flatten everything except the batch axis
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)

def evaluate(model, dataloader, device=None):
    """Compute classification accuracy and collect predictions over a loader.

    Args:
        model: Module mapping an input batch to logits of shape
            ``(batch, num_classes)``.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        device: Device the batches are moved to. Defaults to the device of
            the model's first parameter (CPU for a parameter-free model), so
            the function no longer depends on a global ``device`` variable.

    Returns:
        tuple: ``(accuracy, predictions)`` where ``accuracy`` is the fraction
        of correct predictions (``0.0`` if the loader yields no samples) and
        ``predictions`` is a NumPy array of predicted class ids in iteration
        order.

    Side effects:
        Switches ``model`` to eval mode and leaves it there.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    correct = 0
    total = 0
    all_preds = []  # predictions of every batch, in iteration order

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)  # index of the largest logit = predicted class
            all_preds.extend(preds.cpu().numpy())

            correct += (preds == labels).sum().item()
            total += labels.size(0)

    # Guard against an empty loader instead of raising ZeroDivisionError.
    accuracy = correct / total if total else 0.0
    return accuracy, np.array(all_preds)

# Seed PyTorch so weight initialisation and batch shuffling are repeatable
# (same seed value the notebook already uses for train_test_split).
torch.manual_seed(99)

train_mfccs = train_df["padded_audio"]
train_labels = train_df["label"]
test_mfccs = test_df["padded_audio"]
test_labels = test_df["label"]

# Read the input geometry from the data instead of hard-coding it, so the
# model keeps matching the features if the padded length ever changes.
n_mfcc, time_steps = train_mfccs[0].shape

# Labels are 0-based category ids, so the output layer needs max + 1 units
# even if some category happens to be missing from the training split.
num_classes = int(train_labels.max()) + 1

train_dataset = MFCCDataset(train_mfccs, train_labels)
# Reshuffle the training samples every epoch.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = MFCCDataset(test_mfccs, test_labels)
# Keep the test order fixed so predictions stay aligned with `test_labels`.
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

device = torch.device("cpu")
model = CNNMFCCModel(num_classes=num_classes).to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=2)

train_losses = []
train_accuracies = []
test_accuracies = []

for epoch in range(5):
    model.train()
    running_loss = 0.0
    with tqdm(train_loader, desc=f"Epoch {epoch+1}", unit="batch") as pbar:
        # Count batches explicitly: `pbar.n` is only refreshed when the bar
        # redraws, so it can lag behind the real batch count.
        for step, (inputs, labels) in enumerate(pbar, start=1):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            pbar.set_postfix(loss=running_loss / step)

    avg_loss = running_loss / len(train_loader)
    # The plateau scheduler only acts when it is fed the monitored metric.
    scheduler.step(avg_loss)

    train_accuracy, _ = evaluate(model, train_loader)
    test_accuracy, _ = evaluate(model, test_loader)

    train_losses.append(avg_loss)
    train_accuracies.append(train_accuracy)
    test_accuracies.append(test_accuracy)

    print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}, "
          f"Train Acc: {train_accuracy:.2%}, Test Acc: {test_accuracy:.2%}")


_, y_pred_CNNMFCC = evaluate(model, test_loader)

# Build the path portably (the previous backslash literal only resolved to
# nested folders on Windows) and make sure the folder exists before saving.
cnn_model_path = Path("models") / "self-trained" / "cnn_mfcc_model.pth"
cnn_model_path.parent.mkdir(parents=True, exist_ok=True)
torch.save(model.state_dict(), cnn_model_path)

Epoch 1: 100%|█████████████████████████████████████████████████████████| 125/125 [00:30<00:00,  4.04batch/s, loss=1.11]


Epoch 1, Loss: 1.1134, Train Acc: 56.70%, Test Acc: 51.90%


Epoch 2: 100%|████████████████████████████████████████████████████████| 125/125 [00:32<00:00,  3.88batch/s, loss=0.947]


Epoch 2, Loss: 0.9468, Train Acc: 60.92%, Test Acc: 54.50%


Epoch 3: 100%|████████████████████████████████████████████████████████| 125/125 [00:34<00:00,  3.59batch/s, loss=0.882]


Epoch 3, Loss: 0.8820, Train Acc: 63.75%, Test Acc: 55.60%


Epoch 4: 100%|████████████████████████████████████████████████████████| 125/125 [00:41<00:00,  3.00batch/s, loss=0.815]


Epoch 4, Loss: 0.8152, Train Acc: 65.88%, Test Acc: 55.10%


Epoch 5: 100%|████████████████████████████████████████████████████████| 125/125 [00:41<00:00,  3.00batch/s, loss=0.738]


Epoch 5, Loss: 0.7376, Train Acc: 71.65%, Test Acc: 56.00%


In [125]:
# Score the CNN's test-set predictions against the true test labels
y_pred = y_pred_CNNMFCC
y_test = test_labels

acc = accuracy_score(y_test, y_pred)
# Replace 'weighted' with 'macro' or 'micro' if needed
precision_val, recall_val, f1_val = (
    metric(y_test, y_pred, average='weighted')
    for metric in (precision_score, recall_score, f1_score)
)

# Single-row summary with the column layout used for the model comparison table
cnn_evaluation = pl.DataFrame(
    dict(
        zip(
            ("Model", "Accuracy", "Precision", "Recall", "F1 Score"),
            ("CNN", acc, precision_val, recall_val, f1_val),
        )
    )
)


In [89]:
def extract_statistical_features_to_polars(mfcc: np.ndarray, prefix: str = "mfcc"):
    """Summarise an MFCC matrix into a single-row Polars DataFrame.

    For each row of ``mfcc`` (one row per coefficient, one column per time
    step) the following statistics are taken along the time axis: mean, std,
    min, max, median, skew and kurtosis, plus the mean and std of the
    first-order differences between consecutive time steps.

    Args:
        mfcc: Array-like of shape ``(n_mfcc, time_steps)``.
        prefix: Prefix for the generated column names.

    Returns:
        A one-row ``pl.DataFrame`` with columns named
        ``{prefix}_{stat}_{NN}``, where ``NN`` is the 1-based, zero-padded
        coefficient index; columns are grouped by statistic.
    """
    coeffs = np.array(mfcc)  # shape: (n_mfcc, time_steps)
    n_coeffs = coeffs.shape[0]
    frame_deltas = np.diff(coeffs, axis=1)

    # (statistic name, one value per coefficient); this order fixes the column order
    summaries = [
        ("mean", np.mean(coeffs, axis=1)),
        ("std", np.std(coeffs, axis=1)),
        ("min", np.min(coeffs, axis=1)),
        ("max", np.max(coeffs, axis=1)),
        ("median", np.median(coeffs, axis=1)),
        ("skew", skew(coeffs, axis=1)),
        ("kurt", kurtosis(coeffs, axis=1)),
        ("delta_mean", np.mean(frame_deltas, axis=1)),
        ("delta_std", np.std(frame_deltas, axis=1)),
    ]

    return pl.DataFrame(
        {
            f"{prefix}_{stat_name}_{idx + 1:02}": [values[idx]]
            for stat_name, values in summaries
            for idx in range(n_coeffs)
        }
    )

# One single-row feature frame per utterance, computed from the unpadded MFCCs
mfcc_sample = dataset_df["mfcc"].to_numpy()
all_features = list(map(extract_statistical_features_to_polars, mfcc_sample))

# Stack the rows, then append the label column alongside the features
final_df = pl.concat(
    [pl.concat(all_features, how="vertical"), dataset_df.select("label")],
    how="horizontal",
)

In [128]:
X = final_df.drop("label")
y = final_df["label"]

le = LabelEncoder()
y = le.fit_transform(y)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=99)

models = {
    "GradientBoostingClassifier": GradientBoostingClassifier(),
    "KNeighborsClassifier": KNeighborsClassifier(),
    "LinearSVC": LinearSVC(max_iter=100),
    "RandomForestClassifier": RandomForestClassifier()
}

# Make sure the output folder exists before anything is written to it.
model_dir = Path("models/self_trained/models")
model_dir.mkdir(parents=True, exist_ok=True)

accuracy = []
precision = []
recall = []
f1_scores = []

for name, model in models.items():
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    acc = accuracy_score(y_test, y_pred)
    precision_val = precision_score(y_test, y_pred, average='weighted')  # Replace 'weighted' with 'macro' or 'micro' if needed
    recall_val = recall_score(y_test, y_pred, average='weighted')
    f1_val = f1_score(y_test, y_pred, average='weighted')

    accuracy.append((name, acc))
    precision.append((name, precision_val))
    recall.append((name, recall_val))
    f1_scores.append((name, f1_val))

    # Persist every fitted model. This call used to sit after the loop,
    # so only the last model in `models` was ever written to disk.
    joblib.dump(model, model_dir / f"{name}.pkl")

results_df = pl.DataFrame({
    "Model": list(models),
    "Accuracy": [acc for _, acc in accuracy],
    "Precision": [prec for _, prec in precision],
    "Recall": [rec for _, rec in recall],
    "F1 Score": [f1 for _, f1 in f1_scores],
})

# Append the CNN row computed earlier, then export the ranking.
results_df = pl.concat([results_df, cnn_evaluation], how="vertical")

evaluation_csv = Path("files") / "model_evaluation.csv"
evaluation_csv.parent.mkdir(parents=True, exist_ok=True)
results_df.sort(by="Accuracy", descending=True).write_csv(evaluation_csv)

shape: (5, 5)
┌────────────────────────────┬──────────┬───────────┬────────┬──────────┐
│ Model                      ┆ Accuracy ┆ Precision ┆ Recall ┆ F1 Score │
│ ---                        ┆ ---      ┆ ---       ┆ ---    ┆ ---      │
│ str                        ┆ f64      ┆ f64       ┆ f64    ┆ f64      │
╞════════════════════════════╪══════════╪═══════════╪════════╪══════════╡
│ GradientBoostingClassifier ┆ 0.6      ┆ 0.593953  ┆ 0.6    ┆ 0.577982 │
│ KNeighborsClassifier       ┆ 0.597    ┆ 0.593356  ┆ 0.597  ┆ 0.588536 │
│ LinearSVC                  ┆ 0.595    ┆ 0.586527  ┆ 0.595  ┆ 0.570806 │
│ RandomForestClassifier     ┆ 0.593    ┆ 0.604433  ┆ 0.593  ┆ 0.548148 │
│ CNN                        ┆ 0.56     ┆ 0.551202  ┆ 0.56   ┆ 0.518905 │
└────────────────────────────┴──────────┴───────────┴────────┴──────────┘
