Replication of the TranStutter paper - https://www.mdpi.com/1424-8220/23/19/8033
- Multi-Class Classification
- Mel-spectrogram as input to the model.
- Followed the architecture and data pipeline exactly as specified in the paper. Made reasonable assumptions for unspecified parameters.
- Severely underperformed relative to the final accuracy reported in the paper.

| Metric        | Precision | Recall | F1-Score | Support |
|---------------|-----------|--------|----------|---------|
| Block         | 0.31      | 0.25   | 0.28     | 826     |
| Prolongation  | 0.33      | 0.14   | 0.20     | 647     |
| SoundRep      | 0.28      | 0.06   | 0.10     | 566     |
| WordRep       | 0.20      | 0.04   | 0.06     | 629     |
| Interjection  | 0.42      | 0.84   | 0.56     | 1438    |
| **Accuracy** |           |        | 0.38     | 4106    |
| **Macro Avg** | 0.31      | 0.27   | 0.24     | 4106    |
| **Weighted Avg**| 0.33      | 0.38   | 0.31     | 4106    |

**Final Accuracy Score:** 0.3830979055041403

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tqdm.notebook import tqdm
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
import librosa
import matplotlib.pyplot as plt
import seaborn as sns
from tabulate import tabulate
from collections import defaultdict
import os

In [None]:
# DATA LOADING & INITIAL FILTERING
print("Loading data and performing initial filtering (3.0s duration & MIN_THRESHOLD=2)...")

# Index every .wav clip by its bare file name so label rows can be matched
# to audio files with a dictionary lookup.
clips_dir = Path("/kaggle/input/sep-28k/clips/stuttering-clips/clips")
if clips_dir.exists():
    clips = {c.name: str(c) for c in clips_dir.glob("*.wav")}
    print(f"Total audio files found: {len(clips)}")
else:
    print(f"Directory {clips_dir} does not exist. Please check the path.")
    clips = {}

label_files = [
    "/kaggle/input/sep-28k/SEP-28k_labels.csv",
    "/kaggle/input/sep-28k/fluencybank_labels.csv",
]
existing_label_files = [f for f in label_files if Path(f).exists()]
if not existing_label_files:
    # pd.concat() on an empty list fails with an opaque "No objects to
    # concatenate" error; report the real cause instead.
    raise FileNotFoundError(
        "None of the label files were found: " + ", ".join(label_files)
    )

# ignore_index=True gives the combined frame a unique row index; without it
# each CSV contributes its own 0..n-1 labels and the index contains duplicates.
combined_df = pd.concat(
    (pd.read_csv(f) for f in existing_label_files), ignore_index=True
)

# Label columns considered for classification. The list order also defines the
# class index of each type and which type wins when several share the top value.
STUTTER_TYPES = ['Block', 'Prolongation', 'SoundRep', 'WordRep', 'Interjection']
# Minimum value the strongest label column must reach for a clip to be kept.
MIN_THRESHOLD = 2

def find_single_label_with_hierarchy(row):
    """Pick the dominant stutter type for one annotation row.

    The values of the ``STUTTER_TYPES`` columns are read from ``row`` in list
    order. If the largest value is below ``MIN_THRESHOLD`` the row is
    rejected; otherwise the type holding the largest value is chosen
    (``np.argmax`` returns the first maximum, so earlier entries of
    ``STUTTER_TYPES`` win ties).

    Returns:
        ``(one_hot_label, class_name)`` for an accepted row, or
        ``(None, None)`` for a rejected one.
    """
    scores = np.array([row[name] for name in STUTTER_TYPES])

    # Guard clause: no stutter type is rated strongly enough.
    if np.max(scores) < MIN_THRESHOLD:
        return None, None

    winner = np.argmax(scores)
    one_hot = tf.keras.utils.to_categorical(winner, num_classes=len(STUTTER_TYPES))
    return one_hot, STUTTER_TYPES[winner]

def get_file_path_and_duration(row):
    """Resolve the audio file for a label row, keeping only 3.0-second clips.

    Two file-name spellings are tried against the ``clips`` index:
    ``{Show}_{EpId}_{ClipId}.wav`` and the same name with a ``0`` prefixed to
    the episode id.

    Args:
        row: A label row providing ``Show``, ``EpId`` and ``ClipId``.

    Returns:
        The clip path as a string, or ``None`` when no matching file is
        indexed, the file cannot be read, or its duration is not exactly 3.0
        seconds.
    """
    f_name0 = f"{row['Show']}_{row['EpId']}_{row['ClipId']}.wav"
    f_name1 = f"{row['Show']}_0{row['EpId']}_{row['ClipId']}.wav"
    f_path = clips.get(f_name0) or clips.get(f_name1)
    if not f_path:
        return None

    try:
        duration = librosa.get_duration(path=f_path)
    except Exception as exc:
        # Best effort: a corrupted or unreadable clip is skipped, but it is
        # reported (as get_features does) instead of disappearing silently.
        print(f"Skipping unreadable audio file {f_path}: {exc}")
        return None

    # Strict 3.0s check: clips of any other length are excluded on purpose.
    return f_path if duration == 3.0 else None

print("Filtering for existing 3.0s audio files...")
combined_df['filename'] = combined_df.apply(get_file_path_and_duration, axis=1)
combined_df_filtered_duration = combined_df.dropna(subset=['filename']).copy()

print(f"Clips remaining after 3.0s duration filter: {len(combined_df_filtered_duration)}")
if combined_df_filtered_duration.empty:
    # Happens e.g. when the clips directory was not found. Failing here gives
    # a clear cause instead of an unrelated error further down the pipeline.
    raise RuntimeError(
        "No 3.0s audio clips matched the label files; check the clips directory and label paths."
    )

# Apply labeling and filter by MIN_THRESHOLD
print("Applying stutter type labels and filtering")
# Collect one (label, type) pair per row in a plain list. This avoids creating
# a pd.Series for every row inside DataFrame.apply and always produces exactly
# two result columns.
label_pairs = [
    find_single_label_with_hierarchy(row)
    for _, row in combined_df_filtered_duration.iterrows()
]
label_results = pd.DataFrame(
    label_pairs, index=combined_df_filtered_duration.index, columns=[0, 1]
)
combined_df_filtered_duration[['label', 'type']] = label_results
# Rows rejected by the threshold carry a None label and are dropped here.
final_combined_df = combined_df_filtered_duration.dropna(subset=['label']).copy()

print(f"Clips remaining after MIN_THRESHOLD filter: {len(final_combined_df)}")
if final_combined_df.empty:
    raise RuntimeError(
        f"No clips reached MIN_THRESHOLD={MIN_THRESHOLD} for any of {STUTTER_TYPES}."
    )


# %%

In [None]:
# THREE-WAY SPEAKER-INDEPENDENT SPLIT
print("\nPerforming three-way speaker-independent split on filtered data (60/20/20)...")

# The split is drawn over episode ids, not over individual clips, so every
# clip sharing an EpId lands in the same partition.
# NOTE(review): EpId alone is the grouping key; whether it identifies a
# speaker (or is unique across shows) is not visible here - confirm.
unique_ep_ids_final = final_combined_df['EpId'].unique()

# 60% of the ids go to training; the remaining 40% are halved into
# validation and test.
train_ids, temp_val_test_ids = train_test_split(
    unique_ep_ids_final,
    test_size=0.4,
    random_state=42,
)
val_ids, test_ids = train_test_split(
    temp_val_test_ids,
    test_size=0.5,
    random_state=42,
)

# Materialise one independent DataFrame per partition.
train_data, val_data, test_data = (
    final_combined_df[final_combined_df['EpId'].isin(partition_ids)].copy()
    for partition_ids in (train_ids, val_ids, test_ids)
)

print(f"Final training data size: {len(train_data)}")
print(f"Final validation data size: {len(val_data)}")
print(f"Final testing data size: {len(test_data)}")

# %%

In [None]:
# Mel-Spec Extraction
SAMPLING_RATE = 16000  # Hz; audio is resampled to this rate when loaded
HOP_LENGTH = 160       # samples between frames (10 ms at 16 kHz)
N_MELS = 128           # mel bands = feature rows
TIME_STEPS = 300       # frames kept per clip = feature columns


def get_features(file_path):
    """Return a fixed-size mel spectrogram for one audio file.

    The clip is loaded at ``SAMPLING_RATE`` and converted to an ``N_MELS``-band
    mel spectrogram with hop ``HOP_LENGTH``. The time axis is then zero-padded
    on the right, or truncated, to exactly ``TIME_STEPS`` frames.

    Args:
        file_path: Path of the audio file to read.

    Returns:
        A float32 array of shape ``(N_MELS, TIME_STEPS)``. If the file cannot
        be loaded or transformed, the error is printed and an all-zero array
        of the same shape and dtype is returned.
    """
    try:
        audio, _ = librosa.load(file_path, sr=SAMPLING_RATE)
        # NOTE(review): these are the raw melspectrogram values; no log/dB
        # compression is applied in this function - confirm this matches the
        # input representation the model is meant to receive.
        mel_spec = librosa.feature.melspectrogram(
            y=audio, sr=SAMPLING_RATE, n_mels=N_MELS, hop_length=HOP_LENGTH
        )
    except Exception as e:
        print(f"Error extracting features from {file_path}: {e}")
        # float32 matches the dtype of real features, so a single failed file
        # does not upcast the whole stacked dataset to float64.
        return np.zeros((N_MELS, TIME_STEPS), dtype=np.float32)

    n_frames = mel_spec.shape[1]
    if n_frames < TIME_STEPS:
        # Too short: append zero frames on the right of the time axis.
        mel_spec = np.pad(
            mel_spec, ((0, 0), (0, TIME_STEPS - n_frames)), mode='constant'
        )
    else:
        mel_spec = mel_spec[:, :TIME_STEPS]

    # Guarantee one dtype for every sample, whichever path produced it.
    return mel_spec.astype(np.float32, copy=False)

# Extract all features
def _featurize_split(frame, progress_label):
    """Return ``(features, labels)`` arrays for one data partition.

    Features come from running ``get_features`` over the ``filename`` column
    (with a progress bar titled ``progress_label``); labels are the stacked
    entries of the ``label`` column.
    """
    features = np.array(
        [get_features(path) for path in tqdm(frame['filename'], desc=progress_label)]
    )
    labels = np.array(frame['label'].tolist())
    return features, labels


X_train_np, y_train_np = _featurize_split(train_data, "Creating train features")
X_val_np, y_val_np = _featurize_split(val_data, "Creating validation features")
X_test_np, y_test_np = _featurize_split(test_data, "Creating test features")

# Append a trailing axis of size 1 to every feature array.
X_train_np, X_val_np, X_test_np = (
    stacked[..., np.newaxis] for stacked in (X_train_np, X_val_np, X_test_np)
)

print(f"\nFeature shapes (NumPy arrays):\nX_train: {X_train_np.shape}\ny_train: {y_train_np.shape}")
print(f"X_val: {X_val_np.shape}\ny_val: {y_val_np.shape}")
print(f"X_test: {X_test_np.shape}\ny_test: {y_test_np.shape}")


In [3]:
# Re-declares the feature and label constants with the same values as the
# earlier cells - presumably so the model cells below can run on their own
# (e.g. after a kernel restart); keep both copies in sync.
SAMPLING_RATE = 16000
HOP_LENGTH = 160
N_MELS = 128
TIME_STEPS = 300
STUTTER_TYPES = ['Block', 'Prolongation', 'SoundRep', 'WordRep', 'Interjection']
MIN_THRESHOLD = 2

In [None]:
# TRANSTUTTER MODEL
class PatchEmbedding(layers.Layer):
    """Embed non-overlapping spectrogram patches and add position information.

    A ``Conv2D`` whose kernel and stride both equal ``patch_size`` (VALID
    padding) maps each patch to an ``embed_dim`` vector. The patch grid is
    flattened into a sequence and a learned position embedding is added.

    Args:
        patch_size: ``(height, width)`` of one patch.
        embed_dim: Size of each patch embedding.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, patch_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.patch_size = patch_size
        # Kept so get_config() can report every constructor argument.
        self.embed_dim = embed_dim
        self.projection = layers.Conv2D(
            filters=embed_dim, kernel_size=patch_size, strides=patch_size, padding="VALID", name="patch_projection"
        )
        self.reshape = layers.Reshape(target_shape=(-1, embed_dim))
        # The table is sized from the module-level N_MELS / TIME_STEPS grid
        # plus one extra row; call() only looks up indices 0..num_patches-1,
        # so that extra row is never read here.
        self.position_embedding = layers.Embedding(
            input_dim=((N_MELS // patch_size[0]) * (TIME_STEPS // patch_size[1])) + 1,
            output_dim=embed_dim, name="patch_position_embedding"
        )

    def call(self, inputs):
        """Return position-aware patch embeddings for ``inputs``."""
        patches = self.projection(inputs)
        patches_reshaped = self.reshape(patches)
        # One position index per patch actually produced for this input.
        positions = tf.range(start=0, limit=tf.shape(patches_reshaped)[1], delta=1)
        pos_embed = self.position_embedding(positions)
        return patches_reshaped + pos_embed

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized.

        The model is checkpointed to a ``.keras`` file, which stores layer
        configs. Loading such a file may additionally require passing this
        class via ``custom_objects``.
        """
        config = super().get_config()
        config.update({"patch_size": self.patch_size, "embed_dim": self.embed_dim})
        return config

class TemporalEmbedding(layers.Layer):
    """Embed each time frame of the spectrogram as one token.

    The first two non-batch axes are swapped, the result is reshaped into a
    sequence of ``N_MELS``-sized vectors, each vector is projected to
    ``embed_dim`` with a dense layer, and a learned position embedding is
    added.

    Args:
        embed_dim: Size of each frame embedding.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can report the constructor argument.
        self.embed_dim = embed_dim
        # assumes inputs are (batch, N_MELS, TIME_STEPS, 1), the shape built
        # by the feature-extraction cell - TODO confirm for other callers.
        self.permute = layers.Permute((2, 1, 3))
        self.reshape = layers.Reshape((-1, N_MELS))
        self.projection = layers.Dense(embed_dim, name="temporal_projection")
        # Sized from the module-level TIME_STEPS plus one extra row; call()
        # only looks up indices 0..sequence_length-1.
        self.position_embedding = layers.Embedding(input_dim=TIME_STEPS + 1, output_dim=embed_dim)

    def call(self, inputs):
        """Return position-aware frame embeddings for ``inputs``."""
        x = self.permute(inputs)
        x = self.reshape(x)
        x_projected = self.projection(x)
        # One position index per frame present in this input.
        positions = tf.range(start=0, limit=tf.shape(x_projected)[1], delta=1)
        pos_embed = self.position_embedding(positions)
        return x_projected + pos_embed

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized.

        The model is checkpointed to a ``.keras`` file, which stores layer
        configs. Loading such a file may additionally require passing this
        class via ``custom_objects``.
        """
        config = super().get_config()
        config.update({"embed_dim": self.embed_dim})
        return config

def transformer_encoder(inputs, embed_dim, num_heads, ff_dim, rate=0.1):
    """Apply one pre-norm Transformer encoder block to a token sequence.

    The block is: layer norm -> multi-head self-attention -> residual add,
    followed by layer norm -> two-layer GELU feed-forward network ->
    dropout -> residual add.

    Args:
        inputs: Token tensor the block is applied to.
        embed_dim: Output width of the feed-forward network; also passed to
            the attention layer as ``key_dim``.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        rate: Dropout rate for the attention layer and the feed-forward output.

    Returns:
        The tensor produced by the second residual connection.
    """
    # NOTE(review): key_dim is given the full embed_dim rather than
    # embed_dim // num_heads - confirm this is the intended head size.
    self_attention = layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embed_dim, dropout=rate
    )

    # Attention sub-layer: normalise first, attend, then add the skip path.
    normed_tokens = layers.LayerNormalization(epsilon=1e-6)(inputs)
    attended = self_attention(normed_tokens, normed_tokens)
    after_attention = attended + inputs

    # Feed-forward sub-layer, again pre-normalised and wrapped in a residual.
    feed_forward = keras.Sequential(
        [layers.Dense(ff_dim, activation="gelu"), layers.Dense(embed_dim)]
    )
    normed_residual = layers.LayerNormalization(epsilon=1e-6)(after_attention)
    ffn_out = feed_forward(normed_residual)
    ffn_out = layers.Dropout(rate)(ffn_out)
    return ffn_out + after_attention

def build_transtutter(input_shape, num_classes, num_transformer_blocks=6, num_heads=12, embed_dim=768, ff_dim=1024, patch_size=(16,16)):
    """Assemble the two-stream TranStutter classifier.

    The same input feeds a temporal (per-frame) embedding stream and a patch
    embedding stream. Each stream runs through its own stack of
    ``num_transformer_blocks`` encoder blocks and is average-pooled over the
    sequence axis. The two pooled vectors are concatenated and classified by
    a two-layer MLP head with a softmax output.

    Args:
        input_shape: Shape of one input sample (without the batch axis).
        num_classes: Number of softmax output units.
        num_transformer_blocks: Encoder blocks per stream.
        num_heads: Attention heads per encoder block.
        embed_dim: Token embedding width used by both streams.
        ff_dim: Hidden width of each encoder's feed-forward network.
        patch_size: Patch ``(height, width)`` for the patch stream.

    Returns:
        An uncompiled ``keras.Model`` named ``"TranStutter"``.
    """

    def _run_encoder_stack(tokens):
        # Chain the encoder blocks; every block gets its own fresh layers.
        for _ in range(num_transformer_blocks):
            tokens = transformer_encoder(tokens, embed_dim, num_heads, ff_dim)
        return tokens

    inputs = layers.Input(shape=input_shape)

    # Stream 1 - Temporal Embedding Path
    temporal_embedder = TemporalEmbedding(embed_dim, name="temporal_embedder")
    temporal_x = _run_encoder_stack(temporal_embedder(inputs))

    # Stream 2 - Patch Embedding Path
    patch_embedder = PatchEmbedding(patch_size, embed_dim, name="patch_embedder")
    patch_x = _run_encoder_stack(patch_embedder(inputs))

    # Fusion & Classification
    temporal_rep = layers.GlobalAveragePooling1D(name="temporal_pooling")(temporal_x)
    patch_rep = layers.GlobalAveragePooling1D(name="patch_pooling")(patch_x)
    head = layers.Concatenate(name="fusion")([temporal_rep, patch_rep])

    # MLP Head (using GELU activation as it was most common)
    for units, layer_name in ((512, "mlp_dense_1"), (256, "mlp_dense_2")):
        head = layers.Dense(units, activation="gelu", name=layer_name)(head)
        head = layers.Dropout(0.5)(head)
    outputs = layers.Dense(num_classes, activation="softmax", name="output_layer")(head)

    return keras.Model(inputs, outputs, name="TranStutter")

In [None]:

print("\nBuilding and compiling the TranStutter model...")

# The per-sample input shape is taken from the training features (batch axis
# dropped); there is one output unit per stutter type.
INPUT_SHAPE = X_train_np.shape[1:]
NUM_CLASSES = len(STUTTER_TYPES)

model = build_transtutter(input_shape=INPUT_SHAPE, num_classes=NUM_CLASSES)

# Labels are one-hot vectors, hence the categorical cross-entropy loss.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=1e-6)
model.compile(
    optimizer=optimizer,
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)


Building and compiling the TranStutter model...


I0000 00:00:1752035753.017098      35 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 15513 MB memory:  -> device: 0, name: Tesla P100-PCIE-16GB, pci bus id: 0000:00:04.0, compute capability: 6.0


In [6]:
# Print the layer-by-layer overview of the compiled model.
model.summary()

In [None]:
# Creates tf.data.Dataset for efficient training and validation in memory-constrained environments
BATCH_SIZE = 16
AUTOTUNE = tf.data.AUTOTUNE

# Only the training pipeline shuffles; validation and test keep their order.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((X_train_np, y_train_np))
    .shuffle(buffer_size=1024 * BATCH_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(AUTOTUNE)
)

val_dataset = (
    tf.data.Dataset.from_tensor_slices((X_val_np, y_val_np))
    .batch(BATCH_SIZE)
    .prefetch(AUTOTUNE)
)

test_dataset = (
    tf.data.Dataset.from_tensor_slices((X_test_np, y_test_np))
    .batch(BATCH_SIZE)
    .prefetch(AUTOTUNE)
)

# Report the number of batches each pipeline yields per pass.
train_steps = tf.data.experimental.cardinality(train_dataset).numpy()
val_steps = tf.data.experimental.cardinality(val_dataset).numpy()
test_steps = tf.data.experimental.cardinality(test_dataset).numpy()

print(f"\nTraining with batch size: {BATCH_SIZE}")
print(f"Number of training steps per epoch: {train_steps}")
print(f"Number of validation steps per epoch: {val_steps}")
print(f"Number of test steps for final evaluation: {test_steps}")


Training with batch size: 16
Number of training steps per epoch: 950
Number of validation steps per epoch: 149
Number of test steps for final evaluation: 257


In [8]:
# Drop the NumPy feature/label arrays now that the tf.data pipelines have
# been built from them, then run the garbage collector.
# NOTE: after this cell the *_np names no longer exist; cells that read them
# (e.g. INPUT_SHAPE = X_train_np.shape[1:]) must be run before it.
import gc
del X_train_np, y_train_np, X_val_np, y_val_np, X_test_np, y_test_np
gc.collect()

982

In [None]:
# Stop once validation accuracy has not improved for 15 epochs and roll the
# model back to its best weights.
early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_accuracy",
    patience=15,
    verbose=1,
    restore_best_weights=True,
    mode='max',
)

# Save the model to disk only when validation accuracy improves.
model_checkpoint = keras.callbacks.ModelCheckpoint(
    filepath="./tranStutter_replica_aug.keras",
    save_best_only=True,
    monitor="val_accuracy",
    mode='max',
)

In [None]:
print("\nStarting model training...")
# Train for at most 150 epochs; the early-stopping callback may end the run
# sooner, and the checkpoint callback saves the best model seen on val_dataset.
history = model.fit(
    train_dataset,
    epochs=150,
    validation_data=val_dataset, 
    callbacks=[early_stopping, model_checkpoint]
)
