In [3]:
ls

[0m[01;34msample_data[0m/  test.zip


In [4]:
!unzip /content/test.zip -d /content/test

Archive:  /content/test.zip
   creating: /content/test/404/
   creating: /content/test/545006/
   creating: /content/test/596001/
   creating: /content/test/605000/
   creating: /content/test/606/
  inflating: /content/test/test.lst  
   creating: /content/test/606/manual_transcription/
   creating: /content/test/606/manual_translations/
  inflating: /content/test/606/slides.pptx  
  inflating: /content/test/606/606.m4a  
   creating: /content/test/606/manual_transcription/sentence_segmented/
  inflating: /content/test/606/manual_transcription/ie606.srt  
  inflating: /content/test/606/manual_transcription/ie606.txt  
  inflating: /content/test/606/manual_transcription/sentence_segmented/ie606.srt  
   creating: /content/test/606/manual_translations/de/
   creating: /content/test/606/manual_translations/es/
   creating: /content/test/606/manual_translations/fr/
   creating: /content/test/606/manual_translations/sl/
  inflating: /content/test/606/manual_translations/fr/606.lst  
  infla

In [1]:
! pip install transformers==4.47 backoff srt jiwer



## Load

### Load example

In [None]:

transcription = open('/content/test/404/manual_transcription/ie404.txt', 'r')
transcription = transcription.read()


In [None]:
import srt
srt_file_path = '/content/test/404/manual_transcription/sentence_segmented/ie404.srt'

with open(srt_file_path, 'r', encoding='utf-8') as f:
  content = f.read()

parser = srt.parse(content)
trans = list(parser)
print(f"Se encontraron {len(trans)} transcripciones.")

Se encontraron 182 transcripciones.


In [None]:
for subtitle in trans:
        print("-" * 20)
        print(f"Número de secuencia: {subtitle.index}")
        print(f"Inicio: {subtitle.start}") # Un objeto datetime.timedelta
        print(f"Fin: {subtitle.end}")   # Un objeto datetime.timedelta
        # El texto del subtítulo, con las etiquetas HTML eliminadas por defecto por srt.parse
        print(f"Texto:\n{subtitle.content}")

--------------------
Número de secuencia: 1
Inicio: 0:00:00
Fin: 0:00:02.130000
Texto:
Good morning, afternoon and evening.
--------------------
Número de secuencia: 2
Inicio: 0:00:02.430000
Fin: 0:00:04.160000
Texto:
This is the conference operator.
--------------------
Número de secuencia: 3
Inicio: 0:00:04.310000
Fin: 0:00:08.950000
Texto:
The European School of Oncology welcomes you to their four hundred and forth e-session.
--------------------
Número de secuencia: 4
Inicio: 0:00:09.250000
Fin: 0:00:13.670000
Texto:
Today's e-session is the impact of oncological surgery on the outcomes.
--------------------
Número de secuencia: 5
Inicio: 0:00:13.980000
Fin: 0:00:16.930000
Texto:
Please note that this activity is CME accredited.
--------------------
Número de secuencia: 6
Inicio: 0:00:17.230000
Fin: 0:00:24.870000
Texto:
At the end of the presentation by closing the webcast window you will be directed to the CME evaluation and multiple choice test.
--------------------
Número de se

In [None]:
from pathlib import Path
import subprocess, soundfile as sf, datetime as dt

FFMPEG = "ffmpeg"          # o la ruta absoluta si no está en PATH

def m4a_to_wav(path_m4a, sr_out=16_000):
    """Convierte 1 × .m4a → .wav (mono, 16 kHz) y devuelve la ruta del WAV."""
    path_m4a = Path(path_m4a)
    wav_path = path_m4a.with_suffix(".wav")
    subprocess.run(
        [FFMPEG, "-loglevel", "error", "-y", "-i", str(path_m4a),
         "-ac", "1", "-ar", str(sr_out), str(wav_path)],
        check=True
    )
    return wav_path


def _to_seconds(t):
    """Acepta timedelta, str 'H:M:S.micros' o número ya en segundos."""
    if isinstance(t, (int, float)):
        return float(t)
    if isinstance(t, dt.timedelta):
        return t.total_seconds()
    if isinstance(t, str):
        h, m, s = t.split(":")
        return int(h) * 3600 + int(m) * 60 + float(s)
    raise TypeError(f"Tipo de tiempo no soportado: {type(t)}")


def cut_segments(wav_path, transcripciones, sr=16_000):
    """
    Extrae los fragmentos indicados por .start / .end de cada objeto `seg`
    (timedelta, str o float s) y devuelve una lista de np.ndarray.
    """
    clips = []
    with sf.SoundFile(str(wav_path)) as f:
        if f.samplerate != sr:
            raise ValueError(f"El WAV está a {f.samplerate} Hz, no a {sr}")
        for seg in transcripciones:
            start_s = _to_seconds(seg.start)
            end_s   = _to_seconds(seg.end)
            f.seek(int(start_s * sr))
            frames  = int((end_s - start_s) * sr)
            clips.append(f.read(frames, dtype="float32"))
    return clips, sr                             # sr devuelto solo una vez




In [None]:
wav_file = m4a_to_wav("/content/test/404/404.m4a")
clips, sr = cut_segments(wav_file, trans)

print(f"{len(clips)=}, {sr=}, {clips[0].shape=}")

len(clips)=182, sr=16000, clips[0].shape=(34080,)


In [None]:
clips[0]

array([ 0.        ,  0.        ,  0.        , ..., -0.00665283,
       -0.00775146, -0.0088501 ], dtype=float32)

### Load Test data

In [2]:
from pathlib import Path
import subprocess, soundfile as sf, datetime as dt

FFMPEG = "ffmpeg"          # o la ruta absoluta si no está en PATH

def m4a_to_wav(path_m4a, sr_out=16_000):
    """Convierte 1 × .m4a → .wav (mono, 16 kHz) y devuelve la ruta del WAV."""
    path_m4a = Path(path_m4a)
    wav_path = path_m4a.with_suffix(".wav")
    subprocess.run(
        [FFMPEG, "-loglevel", "error", "-y", "-i", str(path_m4a),
         "-ac", "1", "-ar", str(sr_out), str(wav_path)],
        check=True
    )
    return wav_path


def _to_seconds(t):
    """Acepta timedelta, str 'H:M:S.micros' o número ya en segundos."""
    if isinstance(t, (int, float)):
        return float(t)
    if isinstance(t, dt.timedelta):
        return t.total_seconds()
    if isinstance(t, str):
        h, m, s = t.split(":")
        return int(h) * 3600 + int(m) * 60 + float(s)
    raise TypeError(f"Tipo de tiempo no soportado: {type(t)}")


def cut_segments(wav_path, transcripciones, sr=16_000):
    """
    Extrae los fragmentos indicados por .start / .end de cada objeto `seg`
    (timedelta, str o float s) y devuelve una lista de np.ndarray.
    """
    clips = []
    with sf.SoundFile(str(wav_path)) as f:
        if f.samplerate != sr:
            raise ValueError(f"El WAV está a {f.samplerate} Hz, no a {sr}")
        for seg in transcripciones:
            start_s = _to_seconds(seg.start)
            end_s   = _to_seconds(seg.end)
            f.seek(int(start_s * sr))
            frames  = int((end_s - start_s) * sr)
            clips.append(f.read(frames, dtype="float32"))
    return clips, sr                             # sr devuelto solo una vez


In [3]:
import srt
with open('/content/test/test.lst', 'r', encoding='utf-8') as f:
  folders = f.read()
  folders = folders.splitlines()
  print(folders)

['404', '596001', '605000', '606', '545006']


In [4]:
def get_audio_segments(name, folder='test'):
  srt_file_path = f'/content/{folder}/{name}/manual_transcription/sentence_segmented/ie{name}.srt'

  with open(srt_file_path, 'r', encoding='utf-8') as f:
    content = f.read()

  parser = srt.parse(content)
  trans = list(parser)
  print(f"Se encontraron {len(trans)} transcripciones.")

  wav_file = m4a_to_wav(f"/content/{folder}/{name}/{name}.m4a")
  clips, sr = cut_segments(wav_file, trans)

  print(f"{len(clips)=}, {sr=}, {clips[0].shape=}")
  return trans, clips, sr

### Load Model

In [5]:
from transformers import AutoModelForCausalLM, AutoProcessor, GenerationConfig
# from peft import LoraConfig, get_peft_model
import torch

model_name = 'microsoft/Phi-4-multimodal-instruct'
processor = AutoProcessor.from_pretrained(model_name,trust_remote_code = True)

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="cuda",
    torch_dtype="auto",
    trust_remote_code=True,
    # if you do not use Ampere or later GPUs, change attention to "eager"
    _attn_implementation='eager',
)

generation_config = GenerationConfig.from_pretrained(model_name)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
  lambda i: encoder_checkpoint_wrapper(


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

## Basic Inference

In [None]:
user_prompt = '<|user|>'
assistant_prompt = '<|assistant|>'
prompt_suffix = '<|end|>'
speech_prompt = "Transcribe the audio clip into text"
prompt = f'{user_prompt}<|audio_1|>{speech_prompt}{prompt_suffix}{assistant_prompt}'
print(f'>>> Prompt\n{prompt}')

>>> Prompt
<|user|><|audio_1|>Transcribe the audio clip into text<|end|><|assistant|>


In [None]:
# Downlowd and open audio file
# audio, samplerate = sf.read(io.BytesIO(urlopen(audio_url).read()))

# Process with the model
inputs = processor(text=prompt, audios=[(clips[0], sr)], return_tensors='pt').to('cuda:0')


In [None]:
generate_ids = model.generate(
    **inputs,
    max_new_tokens=1000,
    generation_config=generation_config,
)
generate_ids = generate_ids[:, inputs['input_ids'].shape[1]:]
response = processor.batch_decode(
    generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
)[0]
print(f'>>> Response\n{response}')



>>> Response
Good morning, afternoon and evening.


In [None]:
trans[0].content

'Good morning, afternoon and evening.'

## Baseline

In [6]:
import torch, gc
from tqdm import tqdm                   # barra de progreso opcional

batch_size=2
device = torch.device("cuda:0")
user_prompt = '<|user|>'
assistant_prompt = '<|assistant|>'
prompt_suffix = '<|end|>'
speech_prompt = "Transcribe the audio clip into text"
prompt = f'{user_prompt}<|audio_1|>{speech_prompt}{prompt_suffix}{assistant_prompt}'
@torch.inference_mode()                      # evita gradientes
def transcribe_folder(folder, prompt=prompt):
    trans, clips, sr = get_audio_segments(folder)

    # ── agrupa los clips en lotes del mismo tamaño aproximado ──
    batch, batch_hyps = [], []
    for c in clips:
        batch.append((c, sr))
        if len(batch) == batch_size or c is clips[-1]:             # ← ajusta batch_size
            inputs = processor(
                text=[prompt] * len(batch),               # uno por clip
                audios=batch, return_tensors="pt", padding=True
            ).to(device, dtype=torch.float16)

            gen_ids = model.generate(
                **inputs,
                max_new_tokens = 1000,
                generation_config = generation_config,
            )[:, inputs["input_ids"].shape[1]:]           # quita prompt

            # mueve las ids a CPU antes de decodificar → libera VRAM
            batch_hyps.extend(
                processor.batch_decode(
                    gen_ids.cpu(), skip_special_tokens=True,
                    clean_up_tokenization_spaces=False
                )
            )

            # ── liberamos objetos grandes ───────────────────────
            del inputs, gen_ids
            torch.cuda.empty_cache()
            gc.collect()

            batch = []        # empezamos nuevo lote

    return batch_hyps, [t.content for t in trans]

# ───────────────────── bucle principal ───────────────────────────

hypotesis, transcriptions = [], []
for f in tqdm(folders):
    h, t = transcribe_folder(f)
    hypotesis.append(h)
    transcriptions.append(t)

  0%|          | 0/5 [00:00<?, ?it/s]

Se encontraron 182 transcripciones.
len(clips)=182, sr=16000, clips[0].shape=(34080,)


 20%|██        | 1/5 [26:42<1:46:51, 1602.92s/it]

Se encontraron 349 transcripciones.
len(clips)=349, sr=16000, clips[0].shape=(22560,)


 40%|████      | 2/5 [59:43<1:31:15, 1825.25s/it]

Se encontraron 486 transcripciones.
len(clips)=486, sr=16000, clips[0].shape=(102240,)


 60%|██████    | 3/5 [1:37:35<1:07:38, 2029.13s/it]

Se encontraron 274 transcripciones.
len(clips)=274, sr=16000, clips[0].shape=(17599,)


 80%|████████  | 4/5 [2:04:24<31:03, 1863.15s/it]  

Se encontraron 146 transcripciones.
len(clips)=146, sr=16000, clips[0].shape=(124320,)


100%|██████████| 5/5 [2:18:00<00:00, 1656.11s/it]


### Evaluation

In [7]:
from jiwer import wer, cer, Compose, ToLowerCase, RemovePunctuation,RemoveMultipleSpaces, Strip

# ── 1. Normalización recomendada ────────────────────────────────────────────
normalize = Compose([
    ToLowerCase(),           # “Hola” → “hola”
    RemovePunctuation(),     # quita . , ! ? …
    RemoveMultipleSpaces(),
    Strip(),
])

In [8]:
from itertools import chain
def wer_cer_by_audio(refs_audio, hyps_audio, transform=normalize):
    """
    refs_audio : list[list[str]]   → [[frase1, frase2, ...] por audio]
    hyps_audio : list[list[str]]   → misma estructura
    return      : (global_metrics, per_audio_metrics)

    global_metrics     = {'wer': float, 'cer': float}
    per_audio_metrics  = [{'audio_id': i, 'wer': float, 'cer': float}, ...]
    """
    assert len(refs_audio) == len(hyps_audio), "Nº de audios no coincide"

    # ── 1. Métricas por audio ────────────────────────────────────────────────
    per_audio = []
    for idx, (ref_seg, hyp_seg) in enumerate(zip(refs_audio, hyps_audio)):
        assert len(ref_seg) == len(hyp_seg), f"Líneas audio {idx} no cuadran"
        ref_flat = " ".join(ref_seg)
        hyp_flat = " ".join(hyp_seg)

        ref_norm = transform(ref_flat)
        hyp_norm = transform(hyp_flat)

        per_audio.append({
            "audio_id": idx,
            "wer": wer(ref_norm, hyp_norm),
            "cer": cer(ref_norm, hyp_norm),
        })

    # ── 2. Métrica global (todas las frases) ────────────────────────────────
    refs_all = list(chain.from_iterable(refs_audio))
    hyps_all = list(chain.from_iterable(hyps_audio))
    assert len(refs_all) == len(hyps_all), "Total de frases no coincide"

    refs_all_norm = [transform(r) for r in refs_all]
    hyps_all_norm = [transform(h) for h in hyps_all]

    global_metrics = {
        "wer": wer(refs_all_norm, hyps_all_norm),
        "cer": cer(refs_all_norm, hyps_all_norm),
    }

    return global_metrics, per_audio

### Results

In [12]:
glob,audio=wer_cer_by_audio(transcriptions,hypotesis)

print("Global Metrics")
print(glob)
print("Audio Metrics")
for audio in audio:
  print(audio)

Global Metrics
{'wer': 0.1386331847900335, 'cer': 0.07190126080101271}
Audio Metrics
{'audio_id': 0, 'wer': 0.0678151918559123, 'cer': 0.03504282431585544}
{'audio_id': 1, 'wer': 0.16718528995756718, 'cer': 0.08687558966774613}
{'audio_id': 2, 'wer': 0.13771210896917926, 'cer': 0.06650491229465813}
{'audio_id': 3, 'wer': 0.22954128440366972, 'cer': 0.127046760913469}
{'audio_id': 4, 'wer': 0.06047244094488189, 'cer': 0.027386839102320273}
