# Tarea 3. Transformer con llamadas a funciones

Una evolución interesante de los primeros LLMs fue la incorporación de llamadas a funciones para superar las limitaciones de los modelos de lenguaje en cálculos concretos. <br>
Para esta tarea final utilizamos los ficheros fechas2_train_function.en.csv y fechas2_train_function.es.csv, en los que se añade al final del texto a reconocer un separador ‘|’ y un código de Python para llamar al sistema y pedir que ejecute esa función. <br>
Prepare un sistema como el de la tarea 1 que entrene combinando las listas de los dos idiomas para que sea bilingüe dentro del contexto simplificado de esta tarea.
En test, esta vez, en lugar de medir la tasa de error WER, ejecutaremos el código contenido en la sección separada y compararemos la fecha que genera con la de referencia de la tabla: <br>
fechas2_test_function.csv <br>
y evaluaremos la tasa de error. <br>

# Imports

In [1]:
import os
import torch
import torch, torchaudio, glob
import numpy as np
import random
import csv

from Trabajo_Utils import NoiseAug, RIRAug, identity, wer
from Trabajo_Model import *

In [2]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Fixed token-sequence length; used as the default `seq_len` of the datasets.
SEQ_LEN = 12
# Presumably the number of training epochs (not used in the visible code) — TODO confirm.
NB_EPOCHS = 5
# Presumably the DataLoader batch size (not used in the visible code) — TODO confirm.
BATCH_SIZE = 32

def seed_everything(seed):
    """Seed every random number generator in use and make cuDNN deterministic.

    Args:
        seed: Integer seed applied to Python's ``random``, NumPy and PyTorch
            (CPU and CUDA generators).
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode lets cuDNN auto-tune and possibly pick a different
    # convolution algorithm on each run, which breaks reproducibility, so it
    # has to be disabled when determinism is requested.
    torch.backends.cudnn.benchmark = False


seed_everything(42)

### Data Downloads

In [3]:
# dataset = load_dataset("FluidInference/musan", split="train")

# if not os.path.exists("musan_small"):
#     os.makedirs("musan_small", exist_ok=True)
#     for i, example in enumerate(dataset):
#         audio = example["audio"]["array"]
#         sr = example["audio"]["sampling_rate"]
#         sf.write(f"musan_small/file_{i}.wav", audio, sr)
        
# url = 'https://openslr.elda.org/resources/28/rirs_noises.zip'

# if not os.path.exists('RIRS_NOISES'):
#     if not os.path.exists('rirs_noises.zip'):
#         os.system('wget ' + url)
#     os.system('unzip -q rirs_noises.zip')
#     os.system('rm rirs_noises.zip')

In [4]:
from datetime import datetime, timedelta

def relative_day(days_to_add, current_date='21/11/2025'):
    """Return the date that lies `days_to_add` days away from `current_date`.

    Args:
        days_to_add: Day offset; anything accepted by ``int()`` (negative
            values move backwards in time).
        current_date: Reference date formatted as ``dd/mm/YYYY``.

    Returns:
        The shifted date as a ``dd/mm/YYYY`` string.
    """
    fmt = "%d/%m/%Y"
    base = datetime.strptime(current_date, fmt)
    return (base + timedelta(days=int(days_to_add))).strftime(fmt)

def next_day(day_name, current_date='21/11/2025'):
    """Return the next occurrence of a weekday strictly after `current_date`.

    Args:
        day_name: English weekday name; surrounding whitespace and letter
            case are ignored.
        current_date: Reference date formatted as ``dd/mm/YYYY``.

    Returns:
        The date of the next such weekday as a ``dd/mm/YYYY`` string. When
        `current_date` already falls on that weekday the result is one full
        week later.

    Raises:
        ValueError: If `day_name` is not a recognised weekday name.
    """
    fmt = "%d/%m/%Y"
    base = datetime.strptime(current_date, fmt)

    # ISO weekday codes: monday is 1 ... sunday is 7.
    weekdays = (
        "monday",
        "tuesday",
        "wednesday",
        "thursday",
        "friday",
        "saturday",
        "sunday",
    )
    iso_codes = {name: code for code, name in enumerate(weekdays, start=1)}

    day_name = day_name.strip().lower()
    if day_name not in iso_codes:
        raise ValueError(f"Invalid day name: {day_name}")

    # Shifting by one around the modulo yields a gap in 1..7, so the same
    # weekday maps to a full week ahead instead of zero days.
    gap = (iso_codes[day_name] - base.isoweekday() - 1) % 7 + 1
    return (base + timedelta(days=gap)).strftime(fmt)

In [None]:
class WordTokenizer:
    """Whitespace word-level tokenizer whose vocabulary is read from a CSV file.

    Ids 0-3 are reserved for the special tokens ``<pad>``, ``<sos>``,
    ``<eos>`` and ``<unk>``; every other word gets the next free id in order
    of first appearance.
    """

    def __init__(self, csv_file):
        """Create the special-token tables and add the words found in `csv_file`.

        Args:
            csv_file: Path to a CSV file with a ``txt`` column.
        """
        self.word2index = {
            '<pad>': 0,
            '<sos>': 1,
            '<eos>': 2,
            '<unk>': 3
        }
        self.index2word = {
            0: '<pad>',
            1: '<sos>',
            2: '<eos>',
            3: '<unk>'
        }

        self.build_vocab(csv_file)

    def build_vocab(self, csv_file):
        """Add every new word of the ``txt`` column of `csv_file` to the vocabulary.

        Only the part of each ``txt`` value before the first ``|`` is used
        (lower-cased and split on whitespace); whatever follows the separator
        is ignored. Words already in the vocabulary keep their ids.

        Args:
            csv_file: Path to a CSV file with a ``txt`` column.
        """
        with open(csv_file, newline='', encoding='utf-8') as f:
            for row in csv.DictReader(f):
                text = row['txt'].split('|')[0].strip().lower()
                for word in text.split():
                    if word not in self.word2index:
                        idx = len(self.word2index)
                        self.word2index[word] = idx
                        self.index2word[idx] = word

    def encode(self, text, seq_len=-1):
        """Convert `text` into a tensor of token ids wrapped in ``<sos>``/``<eos>``.

        Args:
            text: Sentence to encode; it is lower-cased and split on
                whitespace. Unknown words map to ``<unk>``.
            seq_len: Target length. Shorter sequences are right-padded with
                ``<pad>`` and longer ones are truncated. A negative value
                (the default) disables both padding and truncation.

        Returns:
            A 1-D ``torch.Tensor`` of token ids.
        """
        tokens = ['<sos>'] + text.lower().split() + ['<eos>']
        unk_id = self.word2index['<unk>']
        ids = [self.word2index.get(w, unk_id) for w in tokens]

        # A negative seq_len means "no length constraint". Without this guard
        # the default -1 would reach the slice below as ids[:-1] and silently
        # drop the <eos> token.
        if seq_len >= 0:
            if seq_len > len(ids):
                ids += [self.word2index['<pad>']] * (seq_len - len(ids))
            else:
                ids = ids[:seq_len]

        return torch.tensor(ids)

    def decode(self, ids):
        """Convert token ids back into text with the special tokens removed.

        Args:
            ids: A ``torch.Tensor`` or an iterable of integer token ids. Ids
                missing from the vocabulary are rendered as ``<unk>``.

        Returns:
            The words joined by single spaces, with ``<sos>``, ``<eos>`` and
            ``<pad>`` removed and outer whitespace stripped.
        """
        if isinstance(ids, torch.Tensor):
            ids = ids.tolist()

        words = [self.index2word.get(i, '<unk>') for i in ids]
        text = ' '.join(words)
        text = text.replace('<sos>', '').replace('<eos>', '').replace('<pad>', '')
        return text.strip()


In [None]:
class TrainDataset(torch.utils.data.Dataset):
    """Training dataset yielding (augmented waveform, target token ids) pairs.

    Rows are read from a CSV file with ``wav`` (audio path) and ``txt``
    (transcript, optionally followed by ``|`` and a function call) columns.
    """

    def __init__(
        self,
        csv_file,
        tokenizer,
        audio_len=4*16000,
        seq_len=SEQ_LEN,
        transform=None
    ):
        """Load the (wav, txt) rows of `csv_file` and print how many were read.

        Args:
            csv_file: Path to a CSV file with ``wav`` and ``txt`` columns.
            tokenizer: Object whose ``encode(text, seq_len)`` returns the
                target ids.
            audio_len: Number of samples every waveform is padded or cut to
                (presumably 4 s at 16 kHz — TODO confirm the sample rate).
            seq_len: Length passed to ``tokenizer.encode``.
            transform: Sequence of callables applied in order to the
                waveform array. ``None`` (the default) means ``[identity]``.
        """
        self.audio_len = audio_len
        self.seq_len = seq_len
        # Build the default list per instance: a list literal in the
        # signature would be one object shared (and mutable) across instances.
        self.transform = [identity] if transform is None else transform
        self.tokenizer = tokenizer

        # newline='' is what the csv module expects for files it parses.
        with open(csv_file, newline='', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            self.data = [(row['wav'], row['txt']) for row in reader]

        print(len(self.data))

    def __len__(self):
        """Return the number of rows loaded from the CSV file."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the transformed waveform and the encoded transcript of row `idx`.

        Returns:
            A tuple ``(x, y)``: ``x`` is the first channel of the audio as a
            NumPy array after all transforms, ``y`` the tensor produced by
            ``tokenizer.encode``.
        """
        wav_path, text = self.data[idx]
        # Only the transcript before the '|' separator is used as the target.
        # NOTE(review): the function-call part after '|' is not used here —
        # confirm this is intended.
        text = text.split('|')[0].strip().lower()

        # The sample rate returned by torchaudio.load is not used.
        x, _ = torchaudio.load(wav_path)

        # Force a fixed length along dimension 1 (assumed to be time, with
        # dimension 0 being channels — TODO confirm): right-pad with zeros
        # or truncate.
        if x.shape[1] < self.audio_len:
            x = torch.nn.functional.pad(
                x, (0, self.audio_len - x.shape[1]), value=0
            )
        else:
            x = x[:, :self.audio_len]

        # Keep the first channel only; the transforms receive a NumPy array.
        x = x[0].numpy()
        for t in self.transform:
            x = t(x)

        y = self.tokenizer.encode(text, self.seq_len)
        return x, y
    
class TestDataset(torch.utils.data.Dataset):
    """Evaluation dataset yielding (waveform, reference token ids) pairs.

    Rows are read from a CSV file with ``wav`` (audio path) and ``txt``
    columns. No augmentation is applied and the whole ``txt`` value is
    encoded as the reference.
    """

    def __init__(self, csv_file, tokenizer, audio_len=4*16000, seq_len=SEQ_LEN):
        """Load the (wav, txt) rows of `csv_file` and print how many were read.

        Args:
            csv_file: Path to a CSV file with ``wav`` and ``txt`` columns.
            tokenizer: Object whose ``encode(text, seq_len)`` returns the
                reference ids.
            audio_len: Number of samples every waveform is padded or cut to
                (presumably 4 s at 16 kHz — TODO confirm the sample rate).
            seq_len: Length passed to ``tokenizer.encode``.
        """
        self.audio_len = audio_len
        self.seq_len = seq_len
        self.tokenizer = tokenizer

        # newline='' is what the csv module expects for files it parses.
        with open(csv_file, newline='', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            self.data = [(row['wav'], row['txt']) for row in reader]

        print(len(self.data))

    def __len__(self):
        """Return the number of rows loaded from the CSV file."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the fixed-length waveform and the encoded text of row `idx`.

        Returns:
            A tuple ``(x, y)``: ``x`` is the first channel of the audio as a
            tensor, ``y`` the tensor produced by ``tokenizer.encode``.
        """
        wav_path, text = self.data[idx]

        # The sample rate returned by torchaudio.load is not used.
        x, _ = torchaudio.load(wav_path)
        # Force a fixed length along dimension 1 (assumed to be time, with
        # dimension 0 being channels — TODO confirm): right-pad with zeros
        # or truncate.
        if x.shape[1] < self.audio_len:
            x = torch.nn.functional.pad(
                x, (0, self.audio_len - x.shape[1]), value=0
            )
        else:
            x = x[:, :self.audio_len]

        # Keep the first channel only.
        x = x[0]
        y = self.tokenizer.encode(text, self.seq_len)
        return x, y

In [6]:
# Build the word vocabulary from the Spanish training transcripts.
# NOTE(review): only the .es CSV is read here; if the model is meant to be
# bilingual, the words of the .en CSV presumably need to be added to the
# vocabulary as well — confirm against the rest of the notebook.
tokenizer = WordTokenizer('fechas2/fechas2_train_function.es.csv')

please next thursday
next friday
friday thank you
tuesday please
tomorrow
this coming friday thanks
the day after tomorrow
please this coming tuesday
tomorrow
next monday
next thursday
this coming friday thanks
next friday please
this tuesday
please tomorrow
in three days
please next tuesday
next monday
please next friday
in three days
tuesday thank you.
in a couple of days
this monday
next friday
next friday
please in a couple of days
tuesday
in three days
please tomorrow
this coming wednesday please
please in a couple of days
please next tuesday
next friday thank you
the following tuesday
tuesday
this monday
this coming wednesday
the day after tomorrow
this coming tuesday
thursday please
wednesday
this coming monday please
in three days thanks
wednesday
wednesday
this friday
next friday thank you
in a couple of days please
this coming tuesday
in a couple of days
next friday
in three days thanks
please this coming tuesday
wednesday please
next friday
please this coming monday
tuesday 