# Dependencies

In [None]:
! pip install git+https://github.com/openai/whisper.git
! pip install jiwer
! pip install pytorch-lightning
! pip install evaluate
! pip install mutagen
! pip install transformers

Collecting git+https://github.com/openai/whisper.git
  Cloning https://github.com/openai/whisper.git to /tmp/pip-req-build-o6hpcfl2
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-req-build-o6hpcfl2
  Resolved https://github.com/openai/whisper.git to commit c0d2f624c09dc18e709e37c2ad90c039a4eb72a2
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone


In [None]:
import os, numpy as np, pathlib
import torch, torch.nn
import pandas as pd
import whisper
import torchaudio, torchaudio.transforms
from pytorch_lightning import LightningModule, Trainer, seed_everything
from pytorch_lightning.callbacks import LearningRateMonitor, ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt, seaborn as sns
import evaluate
from transformers.optimization import get_linear_schedule_with_warmup
import jiwer
from whisper.normalizers import EnglishTextNormalizer
import warnings
warnings.filterwarnings('ignore')
import mutagen
from transformers import get_linear_schedule_with_warmup, AutoProcessor, AutoModelForSpeechSeq2Seq, pipeline
from torch.optim import AdamW
from pathlib import Path
import openpyxl

#### Configuration

In [None]:
# Notebook-wide constants. File names from the transcription CSV are joined
# onto DATASET_DIR to locate the .wav files.
DATASET_DIR = "dataset"
SAMPLE_RATE = 16000
AUDIO_MAX_LENGTH = 160000  # Maximum audio length in samples (10 seconds * 16000)
TEXT_MAX_LENGTH = 200  # Maximum transcript length (compared against len(text), i.e. characters)

TRAIN_RATE = 0.8  # 80% for training
VAL_RATE = 0.1    # 10% for validation
TEST_RATE = 0.1   # 10% for testing

TRAIN_BATCH_SIZE = 64  # Batch size for training
EVAL_BATCH_SIZE = 16    # Batch size for evaluation
MAX_TRAIN_STEPS = 150

# Fixed seed so shuffling and any stochastic steps are repeatable across runs.
SEED = 3407
seed_everything(SEED, workers=True)
# Use the GPU when one is visible to torch, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

class Config:
    """Hyperparameters for the Whisper training runs.

    NOTE(review): the code that reads these fields (the fine-tuning module
    and the from-scratch trainer) is currently commented out in this
    notebook, so the values have no effect on the zero-shot evaluation.
    """

    sample_rate = 16000  # Same value as the module-level SAMPLE_RATE

    # Optimizer settings.
    learning_rate = 1e-5
    weight_decay = 0.01
    adam_epsilon = 1e-8

    warmup_steps = 10  # 10 warmup steps out of 150 total steps
    max_steps = 150           # Minimum of 150 training steps
    eval_steps = 25           # Evaluate every 25 steps
    save_steps = 50           # Save a checkpoint every 50 steps
    train_batch_size = 8      # Training batch size
    eval_batch_size = 4       # Evaluation batch size
    num_worker = 2            # Number of workers for data loading
    gradient_accumulation_steps = 2  # Accumulate gradients to simulate a larger batch

INFO:lightning_fabric.utilities.seed:Seed set to 3407


#### Utility Functions

In [None]:
def load_wave(wave_path, sample_rate: int = 16000):
    """Read an audio file and return its samples as a torch tensor.

    Args:
        wave_path: Path of the audio file, forwarded to ``whisper.load_audio``.
        sample_rate: Value passed to ``whisper.load_audio`` as ``sr``.

    Returns:
        A tensor wrapping the NumPy array produced by ``whisper.load_audio``.
    """
    samples = whisper.load_audio(wave_path, sr=sample_rate)
    tensor = torch.from_numpy(samples)
    return tensor


def calculate_err(data):
    """Compute WER/CER on raw and normalized transcripts.

    Adds two columns to ``data`` in place: ``text_clean`` and
    ``predict_clean``, obtained by applying ``EnglishTextNormalizer`` to the
    ``text`` and ``predict`` columns.

    Args:
        data: DataFrame with a ``text`` (reference) and a ``predict``
            (hypothesis) column.

    Returns:
        Tuple ``(data, wer_original, wer_normalized, cer_original,
        cer_normalized)``.
    """
    normalize = EnglishTextNormalizer()
    for raw_col, clean_col in (('text', 'text_clean'), ('predict', 'predict_clean')):
        data[clean_col] = data[raw_col].apply(normalize)

    # Scores on the untouched strings.
    refs_raw = list(data['text'])
    hyps_raw = list(data['predict'])
    wer_original = jiwer.wer(refs_raw, hyps_raw)
    cer_original = jiwer.cer(refs_raw, hyps_raw)

    # Scores after normalization.
    refs_clean = list(data['text_clean'])
    hyps_clean = list(data['predict_clean'])
    wer_normalized = jiwer.wer(refs_clean, hyps_clean)
    cer_normalized = jiwer.cer(refs_clean, hyps_clean)

    return (data, wer_original, wer_normalized, cer_original, cer_normalized)


def extract_aud(audio_path, mymodel):
    """Transcribe one audio file and return only the recognized text.

    Args:
        audio_path: Path handed to ``mymodel.transcribe``.
        mymodel: Object exposing ``transcribe(path, language=...,
            without_timestamps=...)`` that returns a mapping with a
            ``"text"`` entry.

    Returns:
        The ``"text"`` entry of the transcription result.
    """
    transcription = mymodel.transcribe(
        audio_path,
        language="id",
        without_timestamps=True,
    )
    return transcription["text"]

#### Dataset Loading

In [None]:
TRANSCRIPTION_DIR = "korpus_tvri.csv"

def load_custom_dataset():
    """Build the list of usable (audio_path, transcript) pairs.

    Reads the CSV at ``TRANSCRIPTION_DIR`` and, for each row, resolves the
    audio file ``<DATASET_DIR>/<Nama Data>.wav``. A pair is kept only when
    the transcript has at most ``TEXT_MAX_LENGTH`` characters and the audio
    has at most ``AUDIO_MAX_LENGTH`` samples (duration * ``SAMPLE_RATE``).

    Rows with a missing / non-string transcript, or whose audio file type
    ``mutagen`` cannot determine, are skipped with a warning instead of
    crashing the whole load.

    Returns:
        list[tuple[str, str]]: ``(audio_path, text)`` for every kept row, in
        CSV order.
    """
    df = pd.read_csv(TRANSCRIPTION_DIR)

    audio_transcript_pair_list = []
    for name, text in zip(df["Nama Data"], df["Transkrip Suara (Bahasa Indonesia)"]):
        audio_path = os.path.join(DATASET_DIR, f"{name}.wav")
        print(f"{audio_path}")

        # Empty CSV cells are read as NaN (a float); len() would fail on them.
        if not isinstance(text, str):
            print(f"Warning: skipping {audio_path}: transcript is missing or not text.")
            continue

        # mutagen.File returns None when it cannot determine the file type.
        audio = mutagen.File(audio_path)
        if audio is None:
            print(f"Warning: skipping {audio_path}: unrecognized audio format.")
            continue

        # Duration (seconds) -> sample count at the configured sample rate.
        audio_length = audio.info.length * SAMPLE_RATE
        if len(text) <= TEXT_MAX_LENGTH and audio_length <= AUDIO_MAX_LENGTH:
            audio_transcript_pair_list.append((audio_path, text))

    return audio_transcript_pair_list


def split_dataset(audio_transcript_pair_list, train_rate=0.8, val_rate=0.1, test_rate=0.1):
    """Shuffle the pairs reproducibly and split them into train/val/test.

    The shuffle is seeded with ``SEED`` (this reseeds NumPy's global RNG) and
    is performed on a copy, so the caller's list keeps its original order.
    Rates that do not sum to 1.0 are rescaled proportionally; the test split
    receives whatever remains after the train and validation sizes are
    rounded down.

    Args:
        audio_transcript_pair_list: Sequence of ``(audio_path, text)`` pairs.
        train_rate: Fraction (or relative weight) of the training split.
        val_rate: Fraction (or relative weight) of the validation split.
        test_rate: Fraction (or relative weight) of the test split.

    Returns:
        tuple[list, list, list]: ``(train_list, val_list, test_list)``.

    Raises:
        ValueError: If any rate is negative or the rates sum to zero.
    """
    total_rate = train_rate + val_rate + test_rate
    if min(train_rate, val_rate, test_rate) < 0 or total_rate <= 0:
        raise ValueError(
            "Dataset split rates must be non-negative and sum to a positive value, "
            f"got train={train_rate}, val={val_rate}, test={test_rate}."
        )

    # Work on a copy: shuffling in place would silently reorder the caller's list.
    shuffled = list(audio_transcript_pair_list)
    np.random.seed(SEED)
    np.random.shuffle(shuffled)

    if abs(total_rate - 1.0) > 1e-9:
        print(f"Warning: Dataset split rates ({total_rate}) do not sum to 1.0. Adjusting.")
        train_rate /= total_rate
        val_rate /= total_rate
        test_rate /= total_rate

    dataset_size = len(shuffled)

    train_size = int(train_rate * dataset_size)
    val_size = int(val_rate * dataset_size)
    # The remainder goes to test so that no sample is dropped by rounding.
    test_size = dataset_size - train_size - val_size

    print(f"Train size: {train_size}, Val size: {val_size}, Test size: {test_size}")

    val_end = train_size + val_size
    train_list = shuffled[:train_size]
    val_list = shuffled[train_size:val_end]
    test_list = shuffled[val_end:]

    return train_list, val_list, test_list

#### Dataset Class

In [None]:
class CustomSpeechDataset(torch.utils.data.Dataset):
    """Map-style dataset turning (audio_path, text) pairs into training items.

    Each item holds the log-mel spectrogram of the audio plus the decoder
    input token ids and the corresponding target labels.
    """

    def __init__(self, audio_info_list, tokenizer, sample_rate):
        """Store the pair list, the tokenizer and the audio sample rate.

        Args:
            audio_info_list: Sequence of ``(audio_path, text)`` pairs.
            tokenizer: Object providing ``encode``,
                ``sot_sequence_including_notimestamps`` and ``eot``.
            sample_rate: Sample rate passed to ``load_wave``.
        """
        super().__init__()

        self.audio_info_list = audio_info_list
        self.tokenizer = tokenizer
        self.sample_rate = sample_rate

    def __len__(self):
        """Return the number of (audio, transcript) pairs."""
        return len(self.audio_info_list)

    def __getitem__(self, id):
        """Build the model inputs for the pair at index ``id``.

        Returns:
            dict with
            ``"input_ids"``: log-mel spectrogram of the padded/trimmed audio,
            ``"dec_input_ids"``: start-of-transcript prompt followed by the
            transcript tokens,
            ``"labels"``: ``dec_input_ids`` shifted left by one position with
            the end-of-text token appended.
        """
        audio_path, text = self.audio_info_list[id]

        waveform = load_wave(audio_path, sample_rate=self.sample_rate)
        waveform = whisper.pad_or_trim(waveform.flatten())
        mel = whisper.log_mel_spectrogram(waveform)

        # Tokenize the transcript once and reuse the result.
        text_tokens = self.tokenizer.encode(text)
        dec_input_ids = [*self.tokenizer.sot_sequence_including_notimestamps] + text_tokens
        # Targets are the decoder inputs shifted by one, terminated by EOT.
        labels = dec_input_ids[1:] + [self.tokenizer.eot]

        return {
            "input_ids": mel,
            "labels": labels,
            "dec_input_ids": dec_input_ids
        }

class WhisperDataCollatorWithPadding:
    """Collate dataset items into a batch, padding the token sequences.

    Labels and decoder inputs are right-padded to the longest sequence in the
    batch. Labels are padded with ``label_pad_id`` and decoder inputs with
    ``pad_token_id``.
    """

    def __init__(self, pad_token_id: int = 50257, label_pad_id: int = -100):
        """Configure the padding values.

        Args:
            pad_token_id: Token id used to pad decoder inputs. The default,
                50257, is the value the original code used as the EOT id.
            label_pad_id: Value used to pad labels (default -100).
        """
        self.pad_token_id = pad_token_id
        self.label_pad_id = label_pad_id

    def __call__(self, features):
        """Turn a list of dataset items into batched tensors.

        Args:
            features: Non-empty list of dicts with ``"input_ids"`` (tensor),
                ``"labels"`` and ``"dec_input_ids"`` (token id sequences).

        Returns:
            dict with tensors ``"labels"``, ``"dec_input_ids"`` (both padded
            to a common length) and ``"input_ids"`` (items stacked along a
            new leading batch dimension).

        Raises:
            ValueError: If ``features`` is empty.
        """
        if not features:
            raise ValueError("Cannot collate an empty list of features.")

        # Stack the per-item spectrograms along a new batch dimension.
        input_ids = torch.stack([f["input_ids"] for f in features])

        labels = [f["labels"] for f in features]
        dec_input_ids = [f["dec_input_ids"] for f in features]

        # One common length for both tensors so they stay aligned position-wise.
        max_label_len = max(len(seq) for seq in labels + dec_input_ids)

        def _pad(seq, fill):
            return np.pad(seq, (0, max_label_len - len(seq)), 'constant', constant_values=fill)

        batch = {
            "labels": [_pad(lab, self.label_pad_id) for lab in labels],
            "dec_input_ids": [_pad(e, self.pad_token_id) for e in dec_input_ids]
        }

        batch = {k: torch.tensor(np.array(v), requires_grad=False) for k, v in batch.items()}
        batch["input_ids"] = input_ids

        return batch

#### Whisper Finetuning Class

In [None]:
# class WhisperModelModule(LightningModule):

#     def __init__(self, cfg, model_name="tiny", lang="id", train_dataset=[], eval_dataset=[]):
#         super().__init__()
#         self.options = whisper.DecodingOptions(language="id", without_timestamps=True, task="transcribe")
#         self.tokenizer = whisper.tokenizer.get_tokenizer(True, language="id", task=self.options.task)

#         self.model = whisper.load_model(model_name)

#         self.loss_fn = torch.nn.CrossEntropyLoss(ignore_index=-100)
#         self.metrics_wer = evaluate.load("wer")
#         self.metrics_cer = evaluate.load("cer")

#         self.cfg = cfg
#         self.train_dataset = train_dataset
#         self.eval_dataset = eval_dataset

#     def forward(self, x):
#         return self.model(x)

#     def training_step(self, batch, batch_id):
#         input_ids = batch["input_ids"]
#         labels = batch["labels"].long()
#         dec_input_ids = batch["dec_input_ids"].long()

#         with torch.no_grad():
#             audio_features = self.model.encoder(input_ids)

#         out = self.model.decoder(dec_input_ids, audio_features)

#         loss = self.loss_fn(out.view(-1, out.size(-1)), labels.view(-1))

#         self.log("train/loss", loss, on_step=True, prog_bar=True, logger=True)

#         return loss

#     def validation_step(self, batch, batch_id):
#         input_ids = batch["input_ids"]
#         labels = batch["labels"].long()
#         dec_input_ids = batch["dec_input_ids"].long()
#         audio_features = self.model.encoder(input_ids)
#         out = self.model.decoder(dec_input_ids, audio_features)

#         loss = self.loss_fn(out.view(-1, out.size(-1)), labels.view(-1))

#         predicted_ids = torch.argmax(out, dim=2)

#         o_list, l_list = [], []
#         for pred, ref in zip(predicted_ids, labels):
#             o_list.append(self.tokenizer.decode([token.item() for token in pred if token.item() != -100 and token.item() != self.tokenizer.eot]))
#             l_list.append(self.tokenizer.decode([token.item() for token in ref if token.item() != -100]))

#         cer = self.metrics_cer.compute(references=l_list, predictions=o_list)
#         wer = self.metrics_wer.compute(references=l_list, predictions=o_list)


#         self.log("val/loss", loss, on_step=True, prog_bar=True, logger=True)
#         self.log("val/cer", cer, on_step=True, prog_bar=True, logger=True)
#         self.log("val/wer", wer, on_step=True, prog_bar=True, logger=True)

#         return {
#             "cer": cer,
#             "wer": wer,
#             "loss": loss
#         }


#     def configure_optimizers(self):

#         model = self.model
#         no_decay = ["bias", "LayerNorm.weight"]
#         optimizer_grouped_parameters = [
#             {
#                 "params": [p for n, p in model.named_parameters()
#                             if not any(nd in n for nd in no_decay)],
#                 "weight_decay": self.cfg.weight_decay,
#             },
#             {
#                 "params": [p for n, p in model.named_parameters()
#                             if any(nd in n for nd in no_decay)],
#                 "weight_decay": 0.0,
#             },
#         ]
#         optimizer = AdamW(optimizer_grouped_parameters,
#                           lr=self.cfg.learning_rate,
#                           eps=self.cfg.adam_epsilon)

#         scheduler = get_linear_schedule_with_warmup(
#             optimizer, num_warmup_steps=self.cfg.warmup_steps,
#             num_training_steps=self.cfg.max_steps
#         )

#         return [optimizer], [{"scheduler": scheduler, "interval": "step", "frequency": 1}]

#     def train_dataloader(self):
#         dataset = CustomSpeechDataset(self.train_dataset, self.tokenizer, self.cfg.sample_rate)
#         return torch.utils.data.DataLoader(dataset,
#                           batch_size=self.cfg.train_batch_size,
#                           drop_last=True, shuffle=True, num_workers=self.cfg.num_worker,
#                           collate_fn=WhisperDataCollatorWithPadding()
#                           )

#     def val_dataloader(self):
#         dataset = CustomSpeechDataset(self.eval_dataset, self.tokenizer, self.cfg.sample_rate)
#         return torch.utils.data.DataLoader(dataset,
#                           batch_size=self.cfg.eval_batch_size,
#                           num_workers=self.cfg.num_worker,
#                           collate_fn=WhisperDataCollatorWithPadding()
#                           )

#### Whisper From Scratch

In [None]:
# class WhisperFromScratch:

#     def __init__(self, model_name='tiny'):
#         self.model = whisper.load_model(model_name)
#         self._reset_parameters()
#         self.options = whisper.DecodingOptions(language="id", without_timestamps=True, task="transcribe")
#         self.tokenizer = whisper.tokenizer.get_tokenizer(True, language="id", task=self.options.task)

#     def _reset_parameters(self):
#         for m in self.model.modules():
#             if isinstance(m, torch.nn.Linear):
#                 torch.nn.init.xavier_uniform_(m.weight)
#                 if m.bias is not None:
#                     torch.nn.init.constant_(m.bias, 0)
#             elif isinstance(m, torch.nn.Conv1d):
#                  torch.nn.init.xavier_uniform_(m.weight)
#                  if m.bias is not None:
#                     torch.nn.init.constant_(m.bias, 0)
#             elif isinstance(m, torch.nn.ConvTranspose1d):
#                  torch.nn.init.xavier_uniform_(m.weight)
#                  if m.bias is not None:
#                     torch.nn.init.constant_(m.bias, 0)
#             elif isinstance(m, torch.nn.LayerNorm):
#                 torch.nn.init.constant_(m.weight, 1)
#                 torch.nn.init.constant_(m.bias, 0)


#     def forward(self, mel, tokens):
#         audio_features = self.model.encoder(mel)
#         logits = self.model.decoder(tokens, audio_features)
#         return logits

#     def compute_loss(self, mel, labels):
#         labels = labels.to(self.model.device)

#         dec_input_ids = labels.clone()
#         dec_input_ids[dec_input_ids == -100] = self.tokenizer.sot_sequence_including_notimestamps[0]

#         max_token_id = self.model.decoder.token_embedding.num_embeddings - 1
#         dec_input_ids = torch.clamp(dec_input_ids, max=max_token_id)

#         with torch.no_grad():
#           audio_features = self.model.encoder(mel.to(self.model.device))

#         logits = self.model.decoder(dec_input_ids, audio_features)

#         loss = torch.nn.functional.cross_entropy(logits.view(-1, logits.size(-1)), labels.view(-1), ignore_index=-100) # Use -100 for padding

#         return loss

#     def train_from_scratch(self, train_loader, val_loader, max_steps=150, learning_rate=1e-4, device='cuda'):
#         optimizer = AdamW(self.model.parameters(), lr=learning_rate)
#         scheduler = get_linear_schedule_with_warmup(
#             optimizer, num_warmup_steps=1, num_training_steps=max_steps
#         )

#         self.model.to(device)
#         self.model.train()

#         train_losses = []
#         val_losses = []
#         best_val_loss = float('inf')

#         train_iter = iter(train_loader)

#         for step in tqdm(range(max_steps)):
#             try:
#                 batch = next(train_iter)
#             except StopIteration:
#                 train_iter = iter(train_loader)
#                 batch = next(train_iter)

#             mel = batch["input_ids"].to(device)
#             labels = batch["labels"].to(device) # Labels are the targets

#             loss = self.compute_loss(mel, labels)

#             loss.backward()
#             torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0) # Gradient clipping
#             optimizer.step()
#             scheduler.step()
#             optimizer.zero_grad()

#             train_losses.append(loss.item())

#             if (step + 1) % Config().eval_steps == 0: # Evaluate every 25 steps
#                 self.model.eval()
#                 val_loss = 0
#                 with torch.no_grad():
#                     for val_batch in val_loader:
#                         val_mel = val_batch["input_ids"].to(device)
#                         val_labels = val_batch["labels"].to(device) # Labels are the targets
#                         val_loss += self.compute_loss(val_mel, val_labels).item()
#                 val_loss /= len(val_loader)
#                 val_losses.append(val_loss)
#                 print(f"Step {step+1}: Train Loss = {train_losses[-1]:.4f}, Val Loss = {val_loss:.4f}")

#                 if val_loss < best_val_loss:
#                     best_val_loss = val_loss
#                     torch.save(self.model.state_dict(), "best_scratch_model_indonesia.pth")

#                 self.model.train()

#         return train_losses, val_losses


#     def transcribe(self, audio_path):
#         self.model.eval()
#         with torch.no_grad():
#             result = self.model.transcribe(audio_path, language="id", without_timestamps=True)
#         return result["text"]

#### Split Dataset

In [None]:
# Load every usable pair, then route all of them to the test split
# (train and validation rates are 0 for this evaluation-only run).
audio_transcript_pair_list = load_custom_dataset()
train_list, val_list, test_list = split_dataset(
    audio_transcript_pair_list,
    train_rate=0,
    val_rate=0,
    test_rate=1,
)

# Sanity check: the three splits together must account for every pair.
total_samples = sum(map(len, (train_list, val_list, test_list)))
print(f"Train samples: {len(train_list)}")
print(f"Val samples: {len(val_list)}")
print(f"Test samples: {len(test_list)}")
print(f"Is total samples match dataset: {total_samples == len(audio_transcript_pair_list)}")

dataset/TVRI_BS_071119_0001.wav
dataset/TVRI_BS_071119_0002.wav
dataset/TVRI_BS_071119_0003.wav
dataset/TVRI_BS_071119_0004.wav
dataset/TVRI_BS_071119_0005.wav
dataset/TVRI_BS_071119_0006.wav
dataset/TVRI_BS_071119_0007.wav
dataset/TVRI_BS_071119_0010.wav
dataset/TVRI_BS_071119_0013.wav
dataset/TVRI_BS_071119_0019.wav
dataset/TVRI_BS_071119_0020.wav
dataset/TVRI_BS_071119_0022.wav
Train size: 0, Val size: 0, Test size: 9
Train samples: 0
Val samples: 0
Test samples: 9
Is total samples match dataset: True


#### Training

In [None]:
# log_output_dir = "logs"
# check_output_dir = "artifacts"
# train_name = "whisper"
# model_name = "medium"
# lang = "id"

In [None]:
# cfg = Config()

# Path(log_output_dir).mkdir(exist_ok=True)
# Path(check_output_dir).mkdir(exist_ok=True)

# tflogger = TensorBoardLogger(
#     save_dir=log_output_dir,
#     name=train_name,
# )

# checkpoint_callback = ModelCheckpoint(
#     dirpath=f"{check_output_dir}/checkpoint",
#     filename="checkpoint-{epoch:04d}",
#     save_top_k=-1 # all model save
# )

In [None]:
# options = whisper.DecodingOptions(language="id", without_timestamps=True, task="transcribe")
# tokenizer = whisper.tokenizer.get_tokenizer(True, language="id", task=options.task)

# train_dataset_scratch = CustomSpeechDataset(train_list, tokenizer, SAMPLE_RATE)
# val_dataset_scratch = CustomSpeechDataset(val_list, tokenizer, SAMPLE_RATE)

# data_collator_scratch = WhisperDataCollatorWithPadding()

# train_loader_scratch = torch.utils.data.DataLoader(
#     train_dataset_scratch,
#     batch_size=Config().train_batch_size,
#     shuffle=True,
#     collate_fn=data_collator_scratch
# )

# val_loader_scratch = torch.utils.data.DataLoader(
#     val_dataset_scratch,
#     batch_size=Config().eval_batch_size,
#     shuffle=False,
#     collate_fn=data_collator_scratch
# )

# print("STARTING FROM-SCRATCH TRAINING...")

# model_scratch = WhisperFromScratch('tiny')
# train_losses_scratch, val_losses_scratch = model_scratch.train_from_scratch(
#     train_loader_scratch,
#     val_loader_scratch,
#     max_steps=Config().max_steps,
#     learning_rate=Config().learning_rate,
#     device=DEVICE
# )

# print("FROM-SCRATCH TRAINING FINISHED.")

In [None]:
# checkpoint_path = "artifacts/checkpoint/checkpoint-epoch=0005.ckpt"
# state_dict = torch.load(checkpoint_path)
# state_dict = state_dict['state_dict']

# whisper_fine_tuned_model = WhisperModelModule(cfg)
# whisper_fine_tuned_model.load_state_dict(state_dict)

In [None]:
# options = whisper.DecodingOptions(language="id", without_timestamps=True, task="transcribe")
# tokenizer = whisper.tokenizer.get_tokenizer(True, language="id", task=options.task)
# dataset = CustomSpeechDataset(val_list, tokenizer, SAMPLE_RATE)
# loader = torch.utils.data.DataLoader(dataset, batch_size=2, collate_fn=WhisperDataCollatorWithPadding())

# refs = []
# res = []
# for b in tqdm(loader):
#     input_ids = b["input_ids"].half()
#     labels = b["labels"].long()
#     with torch.no_grad():
#         results = whisper_fine_tuned_model.model.decode(input_ids, options)
#         for r in results:
#             res.append(r.text)

#         for l in labels:
#             filtered_l = [token.item() for token in l if token.item() != -100 and token.item() != tokenizer.eot]
#             ref = tokenizer.decode(filtered_l)
#             refs.append(ref)

In [None]:
# cer_metrics = evaluate.load("cer")
# cer_metrics.compute(references=refs, predictions=res)

In [None]:
# model_path = 'best_scratch_model_indonesia.pth'
# state_dict = torch.load(model_path)
# whisper_from_scratch_model = WhisperFromScratch('tiny')
# whisper_from_scratch_model.model.load_state_dict(state_dict)

In [None]:
# options = whisper.DecodingOptions(language="id", without_timestamps=True, task="transcribe")
# tokenizer = whisper.tokenizer.get_tokenizer(True, language="id", task=options.task)
# dataset = CustomSpeechDataset(val_list, tokenizer, SAMPLE_RATE)
# loader = torch.utils.data.DataLoader(dataset, batch_size=2, collate_fn=WhisperDataCollatorWithPadding())

# refs = []
# res = []
# for b in tqdm(loader):
#     input_ids = b["input_ids"].half()
#     labels = b["labels"].long()
#     with torch.no_grad():
#         results = whisper_from_scratch_model.model.decode(input_ids, options)
#         for r in results:
#             res.append(r.text)

#         for l in labels:
#             filtered_l = [token.item() for token in l if token.item() != -100 and token.item() != tokenizer.eot]
#             ref = tokenizer.decode(filtered_l)
#             refs.append(ref)

In [None]:
# cer_metrics = evaluate.load("cer")
# cer_metrics.compute(references=refs, predictions=res)

#### Evaluation

In [None]:
test_df = pd.DataFrame(test_list, columns=['audio_path', 'text'])

# EVALUATION 1: ZERO-SHOT (BASELINE)

print("EVALUATING ZERO-SHOT MODEL...")

# Build the ASR pipeline once, outside the loop: constructing it loads the
# model, which is loop-invariant and far too costly to repeat per audio file.
transcriber = pipeline(
  "automatic-speech-recognition",
  model="cahya/whisper-medium-id"
)
# Force the decoder prompt to Indonesian transcription.
transcriber.model.config.forced_decoder_ids = (
  transcriber.tokenizer.get_decoder_prompt_ids(
    language="id",
    task="transcribe"
  )
)

zero_shot_predictions = []
for audio_path in tqdm(test_df['audio_path'], total=len(test_df), desc="Transcribing Zero-Shot"):
    print(audio_path)
    prediction = transcriber(audio_path)["text"]
    zero_shot_predictions.append(prediction)

test_df['predict_zero_shot'] = zero_shot_predictions
# calculate_err expects a 'predict' column; rename() returns a new frame, so
# test_df itself keeps the 'predict_zero_shot' column name.
_, wer_zero_shot_orig, wer_zero_shot_norm, cer_zero_shot_orig, cer_zero_shot_norm = calculate_err(test_df.rename(columns={'predict_zero_shot': 'predict'}))

print(f"Zero-Shot WER (Original): {wer_zero_shot_orig:.4f}")
print(f"Zero-Shot WER (Normalized): {wer_zero_shot_norm:.4f}")
print(f"Zero-Shot CER (Original): {cer_zero_shot_orig:.4f}")
print(f"Zero-Shot CER (Normalized): {cer_zero_shot_norm:.4f}")

In [None]:
# Side-by-side view of the normalized reference and zero-shot transcripts.
normalizer = EnglishTextNormalizer()

prediction_df = test_df.copy().assign(
    text_clean=lambda frame: frame['text'].apply(normalizer),
    predict_clean=lambda frame: frame['predict_zero_shot'].apply(normalizer),
)
prediction_df[["text_clean", "predict_clean"]]

Unnamed: 0,text_clean,predict_clean
0,dari hasil investigasi tersebut penyidik menet...,dari hasil investigasi tersebut penyidik menet...
1,dan saudara berikut indonesia hari ini selengk...,dan saudara berikut indonesia hari ini selengk...
2,yakni brigadir am dalam kasus kematian mahasis...,yakni bergadir am dalam kasus kematian mahasis...
3,menteri pendidikan dan kebudayaan nadiem makar...,menteri pendidikan dan kemudayaan nadima karim...
4,hasil investigasi penyidik bareskrim polri men...,hasil investigasi penyidik baris krim polri me...
5,segera akan dilakukan penahanan dan berkas per...,segera akan dilakukan penahanan dan berkas per...
6,dengan cara ketik pga spasi 7 c dan juga dukun...,dengan cara ketik pga spasi tujuh c dan juga d...
7,selamat malam anda menyaksikan indonesia hari ...,selamat malam anda menyaksikan indonesia hari ...
8,dan saya ardianto wijaya selain dua topik utam...,dan saya adyanto vijaya selain dua topik utama...


#### Display Results

In [None]:
# Header line for the metrics table printed below.
print("\n--- Evaluation Results ---")
# Disabled three-model comparison table (needs the from-scratch and fine-tuned
# metrics, whose cells are commented out in this notebook).
# results_data = {
#     'Model': ['Zero-Shot', 'From-Scratch', 'Fine-Tuned'],
#     'WER (Original)': [wer_zero_shot_orig, wer_scratch_orig if 'wer_scratch_orig' in locals() else None, wer_fine_tuned_orig],
#     'WER (Normalized)': [wer_zero_shot_norm, wer_scratch_norm if 'wer_scratch_norm' in locals() else None, wer_fine_tuned_norm],
#     'CER (Original)': [cer_zero_shot_orig, cer_scratch_orig if 'cer_scratch_orig' in locals() else None, cer_fine_tuned_orig],
#     'CER (Normalized)': [cer_zero_shot_norm, cer_scratch_norm if 'cer_scratch_norm' in locals() else None, cer_fine_tuned_norm]
# }

# Active table: a single row holding the zero-shot WER/CER values, both on the
# original strings and on the normalized strings.
results_data = {
    'Model': ['Zero-Shot'],
    'WER (Original)': [wer_zero_shot_orig],
    'WER (Normalized)': [wer_zero_shot_norm],
    'CER (Original)': [cer_zero_shot_orig],
    'CER (Normalized)': [cer_zero_shot_norm]
}

results_df = pd.DataFrame(results_data)
# to_string(index=False) prints the table as plain text without the row index.
print(results_df.to_string(index=False))

# # Compare improvement relative to zero-shot baseline
# print("\n--- Improvement Relative to Zero-Shot ---")
# if 'wer_scratch_norm' in locals():
#     wer_scratch_improvement = ((wer_zero_shot_norm - wer_scratch_norm) / wer_zero_shot_norm) * 100 if wer_zero_shot_norm != 0 else 0
#     cer_scratch_improvement = ((cer_zero_shot_norm - cer_scratch_norm) / cer_zero_shot_norm) * 100 if cer_zero_shot_norm != 0 else 0
# else:
#     wer_scratch_improvement = None
#     cer_scratch_improvement = None


# wer_fine_tuned_improvement = ((wer_zero_shot_norm - wer_fine_tuned_norm) / wer_zero_shot_norm) * 100 if wer_zero_shot_norm != 0 else 0
# cer_fine_tuned_improvement = ((cer_zero_shot_norm - cer_fine_tuned_norm) / cer_zero_shot_norm) * 100 if cer_zero_shot_norm != 0 else 0

# improvement_data = {
#     'Model': ['From-Scratch', 'Fine-Tuned'],
#     'WER Improvement (%)': [wer_scratch_improvement, wer_fine_tuned_improvement],
#     'CER Improvement (%)': [cer_scratch_improvement, cer_fine_tuned_improvement]
# }
# improvement_df = pd.DataFrame(improvement_data)
# print(improvement_df.to_string(index=False))

# fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# fig.suptitle('ASR Model Performance Comparison', fontsize=16)

# # WER Comparison
# sns.barplot(x='Model', y='WER (Normalized)', data=results_df, ax=axes[0, 0])
# axes[0, 0].set_title('Normalized WER Comparison')
# axes[0, 0].set_ylabel('WER')

# # CER Comparison
# sns.barplot(x='Model', y='CER (Normalized)', data=results_df, ax=axes[0, 1])
# axes[0, 1].set_title('Normalized CER Comparison')
# axes[0, 1].set_ylabel('CER')

# # Combined WER vs CER
# results_melted = results_df.melt(id_vars='Model', value_vars=['WER (Normalized)', 'CER (Normalized)'], var_name='Metric', value_name='Rate')
# sns.barplot(x='Model', y='Rate', hue='Metric', data=results_melted, ax=axes[1, 0])
# axes[1, 0].set_title('Normalized WER vs CER Comparison')
# axes[1, 0].set_ylabel('Rate')

# # Step-based training curves (Loss vs Steps)
# # Assuming train_losses_scratch and val_losses_scratch are available from previous execution
# if 'train_losses_scratch' in locals() and 'val_losses_scratch' in locals():
#     steps_scratch = range(1, len(train_losses_scratch) + 1)
#     eval_steps_indices = [(i + 1) for i in range(len(train_losses_scratch)) if (i + 1) % 25 == 0]
#     val_steps = [step for step in steps_scratch if step in eval_steps_indices]


#     axes[1, 1].plot(steps_scratch, train_losses_scratch, label='Train Loss (Scratch)')
#     if val_steps:
#         axes[1, 1].plot(val_steps, val_losses_scratch, label='Validation Loss (Scratch)', marker='o')
#     axes[1, 1].set_title('From-Scratch Training Loss vs Steps')
#     axes[1, 1].set_xlabel('Steps')
#     axes[1, 1].set_ylabel('Loss')
#     axes[1, 1].legend()
# else:
#     axes[1, 1].set_title('From-Scratch Training Loss vs Steps (Data Not Available)')


# plt.tight_layout(rect=[0, 0.03, 1, 0.95])
# plt.show()

# # Save results to CSV
# results_df.to_csv('asr_evaluation_results.csv', index=False)
# if 'train_losses_scratch' in locals() and 'val_losses_scratch' in locals():
#     training_progress_df = pd.DataFrame({
#         'Step': steps_scratch,
#         'Train Loss (Scratch)': train_losses_scratch,
#     })
#     if val_steps:
#          training_progress_df['Validation Loss (Scratch)'] = pd.Series(val_losses_scratch, index=[i-1 for i in val_steps]) # Align val loss with steps
#     training_progress_df.to_csv('scratch_training_progress.csv', index=False)


# print("\nEvaluation results saved to 'asr_evaluation_results.csv'")
# if 'train_losses_scratch' in locals() and 'val_losses_scratch' in locals():
#     print("From-scratch training progress saved to 'scratch_training_progress.csv'")

# print("\n--- Experiment Summary ---")
# print(f"Dataset size: {len(audio_transcript_pair_list)} samples")
# print(f"Train samples: {len(train_list)}")
# print(f"Val samples: {len(val_list)}")
# print(f"Test samples: {len(test_list)}")
# print(f"Max training steps: {cfg.max_steps}")
# print(f"Eval every: {cfg.eval_steps} steps")
# print(f"Fine-tuned model checkpoint saved to: artifacts/checkpoint")
# print(f"From-scratch model checkpoint saved to: {'best_scratch_model_indonesia.pth' if os.path.exists('best_scratch_model_indonesia.pth') else 'N/A'}")


--- Evaluation Results ---
    Model  WER (Original)  WER (Normalized)  CER (Original)  CER (Normalized)
Zero-Shot        0.382114          0.169355        0.092233          0.046061
