In [None]:
import zipfile
import os

tar_file = r'D:\PyCharmMiscProject\slo.zip'

extract_dir = 'working'

os.makedirs(extract_dir, exist_ok=True)

with zipfile.ZipFile(tar_file, 'r') as tar:
    tar.extractall(path=extract_dir)

print("Extraction completed.")

In [None]:
!pip install transformers[torch]
!pip install datasets
!pip install soundfile
!pip install hf_xet

In [1]:
from pathlib import Path
import pandas as pd

base_path = Path("working")
metadata_files = list(base_path.rglob("*.tsv"))

train_path = next(p for p in metadata_files if "train" in p.name)
validated_path = next(p for p in metadata_files if "validated" in p.name)

train_df = pd.read_csv(train_path, sep="\t")
validated_df = pd.read_csv(validated_path, sep="\t")

main_df = pd.concat([train_df, validated_df], ignore_index=True)

def ensure_mp3_extension(p):
    return p if p.endswith(".mp3") else p + ".mp3"

main_df["path"] = main_df["path"].apply(ensure_mp3_extension)

clips_dir = base_path / "clips"
main_df["audio_path"] = main_df["path"].apply(lambda p: (clips_dir / p).as_posix())

main_df["exists"] = main_df["audio_path"].apply(lambda p: Path(p).exists())

valid_df = main_df[main_df["exists"]].reset_index(drop=True)

print(f"Valid audio samples: {len(valid_df)}")

Valid audio samples: 1307


In [2]:
from transformers import (
 Wav2Vec2ForCTC,TrainingArguments, Trainer
)
from datasets import Dataset
from jiwer import wer, cer


def prepare_dataset(df, sampling_rate=16000):
    if df.empty:
        raise ValueError("Input DataFrame is empty.")
    df = df.rename(columns={'audio_path': 'audio', 'sentence': 'text'})
    ds = Dataset.from_pandas(df[["audio", "text"]])
    return ds

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
from pydub import AudioSegment
import numpy as np

def load_audio_pydub(path, target_sampling_rate=16000):
    audio = AudioSegment.from_file(path)
    if audio.frame_rate != target_sampling_rate:
        audio = audio.set_frame_rate(target_sampling_rate)
    samples = np.array(audio.get_array_of_samples()).astype(np.float32) / (2**15)
    if audio.channels > 1:
        samples = samples.reshape((-1, audio.channels)).mean(axis=1)
    return samples, target_sampling_rate

In [4]:
from transformers import Wav2Vec2Processor

processor = Wav2Vec2Processor.from_pretrained("mrshu/wav2vec2-large-xlsr-slovene")

def preprocess_single_example(example, processor, sampling_rate=16000):
        from pydub import AudioSegment
        import numpy as np

        path = example["audio"]
        text = example["text"]

        audio = AudioSegment.from_file(path)
        if audio.frame_rate != sampling_rate:
            audio = audio.set_frame_rate(sampling_rate)
        samples = np.array(audio.get_array_of_samples()).astype(np.float32) / (2**15)
        if audio.channels > 1:
            samples = samples.reshape((-1, audio.channels)).mean(axis=1)

        inputs = processor(samples, sampling_rate=sampling_rate, return_attention_mask=True, padding=True)

        with processor.as_target_processor():
            labels = processor(text).input_ids

        return {
            "input_values": inputs.input_values[0],
            "attention_mask": inputs.attention_mask[0],
            "labels": labels
        }



In [5]:
from dataclasses import dataclass
from typing import Dict, List, Union
import torch

@dataclass
class DataCollatorCTCWithPadding:
        processor: Wav2Vec2Processor
        padding: Union[bool, str] = True

        def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
            input_features = [{"input_values":f["input_values"]} for f in features]
            label_features = [{"input_ids":f["labels"]} for f in features]

            batch = self.processor.pad(
                input_features,
                padding=self.padding,
                return_tensors="pt",
            )

            with self.processor.as_target_processor():
                labels_batch = self.processor.pad(
                    label_features,
                    padding=self.padding,
                    return_tensors="pt"
                )

            labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)
            batch["labels"] = labels

            return batch

In [None]:
def quick_test_training(dataset, max_samples=100):
    print(f"Running quick test with {max_samples} samples...")

    from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC, Trainer, TrainingArguments


    class Config:
        MODEL_NAME = "mrshu/wav2vec2-large-xlsr-slovene"
        SAMPLING_RATE = 16000
        NUM_EPOCHS = 2
        BATCH_SIZE = 4
        OUTPUT_DIR = "./wav2vec2-test"

    small_dataset = dataset.select(range(min(max_samples, len(dataset))))

    processor = Wav2Vec2Processor.from_pretrained(Config.MODEL_NAME)
    model = Wav2Vec2ForCTC.from_pretrained(
        Config.MODEL_NAME,
        ignore_mismatched_sizes=True,
        vocab_size=len(processor.tokenizer)
    )

    model.freeze_feature_encoder()

    print("Processing dataset...")
    processed_dataset = small_dataset.map(
        lambda x: preprocess_single_example(x, processor),
        remove_columns=small_dataset.column_names,
        desc="Processing audio files"
    ).filter(lambda x: x is not None)

    print(f"Processed {len(processed_dataset)} examples")

    split = processed_dataset.train_test_split(test_size=0.1)
    train_dataset = split["train"]
    eval_dataset = split["test"]

    data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

    training_args = TrainingArguments(
        output_dir=Config.OUTPUT_DIR,
        per_device_train_batch_size=Config.BATCH_SIZE,
        eval_strategy="steps",
        logging_dir=f"{Config.OUTPUT_DIR}/logs",
        logging_strategy="steps",
        num_train_epochs=Config.NUM_EPOCHS,
        save_steps=100,
        eval_steps=100,
        logging_steps=5,
        save_total_limit=2,
        fp16=True,
        report_to=[],
        remove_unused_columns=False,
        dataloader_drop_last=False,
        group_by_length=True,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        tokenizer=processor.feature_extractor,
        data_collator=data_collator,
    )

    print("Starting training...")
    trainer.train()
    return trainer, processor

In [6]:
MODEL_NAME = "mrshu/wav2vec2-large-xlsr-slovene"
processor = Wav2Vec2Processor.from_pretrained(MODEL_NAME)
model = Wav2Vec2ForCTC.from_pretrained(MODEL_NAME, vocab_size=len(processor.tokenizer), ignore_mismatched_sizes=True)
model.freeze_feature_encoder()

Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at mrshu/wav2vec2-large-xlsr-slovene and are newly initialized because the shapes did not match:
- lm_head.weight: found shape torch.Size([31, 1024]) in the checkpoint and torch.Size([33, 1024]) in the model instantiated
- lm_head.bias: found shape torch.Size([31]) in the checkpoint and torch.Size([33]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
dataset = prepare_dataset(valid_df)
ds = dataset.map(lambda x: preprocess_single_example(x, processor), remove_columns=dataset.column_names)

Map:   6%|▌         | 74/1307 [00:11<03:03,  6.71 examples/s]

In [None]:
split_ds = ds.train_test_split(test_size=0.1)
train_ds = split_ds["train"]
eval_ds = split_ds["test"]

In [None]:
training_args = TrainingArguments(
    output_dir="./wav2vec2-slovene",
    eval_strategy="steps",
    eval_steps=100,
    logging_steps=50,
    save_steps=200,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=10,
    fp16=True,
    learning_rate=3e-4,
    save_total_limit=2,
    group_by_length=True,
    remove_unused_columns=False,
    report_to=[],
    logging_dir="./logs"
)

In [None]:
def compute_metrics(pred):
    pred_ids = np.argmax(pred.predictions, axis=-1)
    pred_str = processor.batch_decode(pred_ids, skip_special_tokens=True)

    label_ids = pred.label_ids
    label_ids[label_ids == -100] = processor.tokenizer.pad_token_id
    label_str = processor.batch_decode(label_ids, skip_special_tokens=True)

    return {
        "wer": wer(label_str, pred_str),
        "cer": cer(label_str, pred_str)
    }

data_collator = DataCollatorCTCWithPadding(processor=processor)


In [None]:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_ds,
    eval_dataset=eval_ds,
    tokenizer=processor.feature_extractor,
    data_collator=data_collator,
    compute_metrics=compute_metrics
)

In [None]:
trainer.train()
trainer.save_model("./wav2vec2-slovene")
processor.save_pretrained("./wav2vec2-slovene")

In [None]:
metrics = trainer.evaluate()
print(metrics)

In [None]:
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC, TrainingArguments, Trainer
from datasets import Dataset
import numpy as np
import torch
from dataclasses import dataclass
from typing import List, Dict, Union
from pydub import AudioSegment
from jiwer import wer


In [None]:
MODEL_NAME = "mrshu/wav2vec2-large-xlsr-slovene"
processor = Wav2Vec2Processor.from_pretrained(MODEL_NAME)
model = Wav2Vec2ForCTC.from_pretrained(
    MODEL_NAME,
    vocab_size=len(processor.tokenizer),
    ignore_mismatched_sizes=True
)
model.freeze_feature_encoder()

In [None]:
def load_audio(path):
    audio = AudioSegment.from_file(path).set_frame_rate(16000)
    samples = np.array(audio.get_array_of_samples()).astype(np.float32) / (2**15)
    if audio.channels > 1:
        samples = samples.reshape((-1, audio.channels)).mean(axis=1)
    return samples

In [None]:
def preprocess(batch):
    audio = load_audio(batch["audio_path"])
    inputs = processor(audio, sampling_rate=16000, return_attention_mask=True)
    with processor.as_target_processor():
        labels = processor(batch["sentence"]).input_ids
    return {
        "input_values": inputs["input_values"][0],
        "attention_mask": inputs["attention_mask"][0],
        "labels": labels
    }

In [None]:
ds = Dataset.from_pandas(valid_df[["audio_path", "sentence"]])
ds = ds.map(preprocess)

split = ds.train_test_split(test_size=0.1)
train_ds, eval_ds = split["train"], split["test"]

In [None]:
@dataclass
class DataCollator:
    processor: Wav2Vec2Processor
    def __call__(self, features: List[Dict]) -> Dict[str, torch.Tensor]:
        input = self.processor.pad([{"input_values": f["input_values"]} for f in features], return_tensors="pt")
        with self.processor.as_target_processor():
            labels = self.processor.pad([{"input_ids": f["labels"]} for f in features], return_tensors="pt")
        input["labels"] = labels["input_ids"].masked_fill(labels["attention_mask"].ne(1), -100)
        return input

In [None]:
def compute_metrics(pred):
    pred_ids = np.argmax(pred.predictions, axis=-1)
    pred_str = processor.batch_decode(pred_ids, skip_special_tokens=True)
    label_ids = pred.label_ids
    label_ids[label_ids == -100] = processor.tokenizer.pad_token_id
    label_str = processor.batch_decode(label_ids, skip_special_tokens=True)
    return {"wer": wer(label_str, pred_str)}

In [None]:
trainer = Trainer(
    model=model,
    args=TrainingArguments(
        output_dir="./slovene-model",
        per_device_train_batch_size=4,
        num_train_epochs=3,
        logging_steps=10,
        eval_steps=20,
        eval_strategy="steps",
        save_steps=50,
        save_total_limit=1,
        report_to=[]
    ),
    train_dataset=train_ds,
    eval_dataset=eval_ds,
    tokenizer=processor.feature_extractor,
    data_collator=DataCollator(processor),
    compute_metrics=compute_metrics
)

trainer.train()
print(trainer.evaluate())