# Codificador y Decodificador en video

In [37]:
import os
import shutil
import subprocess
import numpy as np
import cv2
import imageio.v2 as imageio
from scipy.signal import firwin, lfilter, get_window
from scipy.fft import fft, ifft
from scipy.io import wavfile as wav
from vozyaudio import lee_audio, sonido

## Codificador

In [None]:
class PixelSoundsEncoder:
    """
    Convierte un audio en un “vídeo pixelado” codificando cada bloque de
    muestras en filas de imágenes.

    - Primer frame: metadatos (N, hop, fps, modo color).
    - Siguientes frames: bloques de audio coloreados (gris o RGB).
    - Luego empaqueta PNGs en MP4 con un batch externo.
    """
    def __init__(
        self,
        audio_path,
        frames_dir="fotogramas",
        export_dir="exports",
        fps=60,
        color_mode="color",   # 'gris' o 'color'
        map_mode="ampl",       # 'ampl', 'fft' o 'fir'
        window_type="hann",
        numcoef=101
    ):
        """
        Inicializa el codificador con:

        audio_path   : ruta al WAV de entrada.
        frames_dir   : carpeta donde escribir PNGs.
        export_dir   : carpeta de salida de vídeo.
        fps          : frames por segundo del vídeo.
        color_mode   : 'gris' o 'color'.
        map_mode     : 'ampl', 'fft' o 'fir'.
        window_type  : tipo de ventana (p.ej. 'hann').
        numcoef      : coeficientes FIR (solo para 'fir').
        """
        self.audio_path  = audio_path
        self.frames_dir  = frames_dir
        self.export_dir  = export_dir
        self.fps         = fps
        self.color_mode  = color_mode
        self.map_mode    = map_mode
        self.window_type = window_type
        # Flag para modo color: 0=gris, 1=color
        self.iscolor     = 1 if color_mode == "color" else 0

        # Leer y normalizar audio de 16-bit
        self.fs, audio = lee_audio(audio_path)
        self.audio     = audio.astype(np.float32)
        self.audio    /= np.max(np.abs(self.audio)) + 1e-12

        # Diseñar filtros FIR (baja, banda, alta)
        self.b_low  = firwin(numcoef, cutoff=2000,                   fs=self.fs)
        self.b_band = firwin(numcoef, [2000, 6000], pass_zero=False, fs=self.fs)
        self.b_high = firwin(numcoef, cutoff=6000, pass_zero=False,  fs=self.fs)

        # Preparar directorios de salida
        if os.path.exists(self.frames_dir):
            shutil.rmtree(self.frames_dir)
        os.makedirs(self.frames_dir, exist_ok=True)
        os.makedirs(self.export_dir, exist_ok=True)

    def _write_header(self, N, hop):
        # Creamos un frame cuadrado de tamaño N×N
        header = np.zeros((N, N), dtype=np.uint8)

        # Desglosar N y hop en bytes altos y bajos
        hiN, loN = (N >> 8) & 0xFF, N & 0xFF
        hiH, loH = (hop >> 8) & 0xFF, hop & 0xFF
        b_fps     = self.fps & 0xFF

        # Filas 0–4: hiN, loN, hiH, loH, fps
        header[0, :] = hiN
        header[1, :] = loN
        header[2, :] = hiH
        header[3, :] = loH
        header[4, :] = b_fps

        # Fila 5: modo de imagen (0=gris, 1=color)
        header[5, :] = self.iscolor

        # Guardar encabezado como primer frame PNG
        path = os.path.join(self.frames_dir, "frame_0000.png")
        imageio.imwrite(path, header)

    def _colorear_fila(self, fila, modo):
        """
        Dada una fila (uint8) y el modo, devuelve Nx3 RGB uint8 coloreado.
        """
        # fila puede ser (N,) o (N,3)
        if modo == 'ampl':
            # fila: (N,) uint8
            amp = fila
            r = amp
            g = 255 - amp
            b = 128 * np.ones_like(amp, dtype=np.uint8)
        elif modo == 'fft':
            # fila_fft: (N,2) uint8 con [mag_n, phase_n]
            mag_n, phase_n = fila[:,0], fila[:,1]

            # Propuesta: R=mag_n, G=phase_n, B=255
            r = mag_n
            g = phase_n
            b = 255 * np.ones_like(r, dtype=np.uint8)
        elif modo == "fir":
            # fila ya es Nx3 con low, band, high
            r = fila[:,0]
            g = fila[:,1]
            b = fila[:,2]
        else:
            raise ValueError(f"Modo desconocido para colorear: {modo}")

        # Apilar y asegurar uint8
        return np.stack([r, g, b], axis=1).astype(np.uint8)

    def generate_frames(self):
        # Determinar N según fs/fps y asegurar paridad
        N = self.fs // self.fps
        if N % 2: N += 1
        hop = N // 2

        # Escribir primer frame con metadata
        self._write_header(N, hop)

        # Ventana y número de bloques
        window   = get_window(self.window_type, N)
        n_blocks = (len(self.audio) - N) // hop
        print(f"N={N}, HOP={hop}, FPS={self.fps}, Bloques={n_blocks}")

        # Para cada bloque, crear frame y guardarlo
        for i in range(n_blocks):
            start = i * hop
            block = self.audio[start:start+N] * window

            # 1) Construir fila base según map_mode
            if self.map_mode == "ampl":
                norm = (block - block.min()) / (block.max() - block.min() + 1e-12)

                # FILA
                fila = (norm * 255).astype(np.uint8)              # (N,)

            elif self.map_mode == "fft":
                # FFT + shift (centrar DC)
                X      = fft(block, n=N)
                X      = np.fft.fftshift(X)
                mag    = np.abs(X)
                phase  = np.angle(X)

                # Normalizar magnitud y fase a rango 0–255
                mag_n   = np.round((mag   / (mag.max() + 1e-12)) * 255).astype(np.uint8)
                phase_n = np.round(((phase + np.pi) / (2*np.pi)) * 255).astype(np.uint8)

                # Fila base: [mag_n, phase_n, canal vacío]
                fila    = np.stack([mag_n, phase_n, np.zeros_like(mag_n)], axis=1)  # (N,3)

            elif self.map_mode == "fir":
                # Aplicamos filtros
                y_l = lfilter(self.b_low,  1.0, block)
                y_b = lfilter(self.b_band, 1.0, block)
                y_h = lfilter(self.b_high, 1.0, block)
                
                # Para que se mantenga entre -1 y +1
                y_l, y_b, y_h = map(lambda y: np.clip(y, -1, 1), (y_l,y_b,y_h))

                # Normalizamos a 8 bits
                r8 = np.round(y_l*127).astype(np.int8).view(np.uint8)
                g8 = np.round(y_b*127).astype(np.int8).view(np.uint8)
                b8 = np.round(y_h*127).astype(np.int8).view(np.uint8)

                # FILA
                fila = np.stack([r8, g8, b8], axis=1)         # (N,3)

            else:
                raise ValueError("map_mode debe ser 'ampl', 'fft' o 'fir'")

            # 2) Colorear o replicar según color_mode
            if self.color_mode == "gris":
                # replicar fila en 3 capas idénticas
                gris = fila if fila.ndim==1 else fila[:,0]
                img  = np.tile(gris[np.newaxis,:], (N,1))
            else:
                base_rgb = fila if fila.ndim==2 else np.stack([fila]*3,axis=1)
                colored  = self._colorear_fila(base_rgb, self.map_mode)
                img      = np.tile(colored[np.newaxis,...], (N,1,1))

            # Guardar PNG
            path = os.path.join(self.frames_dir, f"frame_{i+1:04d}.png")
            imageio.imwrite(path, img)

        print(f"[Encoder] Generados {n_blocks} fotogramas en '{self.frames_dir}/'")

    def encode_video(self, output_name=None):
        # Nombre de salida
        base = os.path.splitext(os.path.basename(self.audio_path))[0]
        name = output_name or f"{base}_{self.map_mode}_{self.color_mode}.mp4"
        out  = os.path.join(self.export_dir, name)
        if os.path.exists(out):
            os.remove(out)

        # Llamada externa para empaquetar en MP4
        subprocess.run(
            ["cmd", "/c", "generarVideo2.bat", str(self.fps), out],
            check=True
        )
        return out


## Decodificador

In [None]:
class PixelSoundsDecoder:
    """
    Decodifica un vídeo generado por PixelSoundsEncoder de vuelta a WAV.

    - Primer frame: metadatos (N, hop, fps, color_mode).
    - Siguientes frames: bloques de audio (gris o RGB) codificados.
    - Reconstruye por overlap-add y guarda WAV.
    """
    def __init__(
        self,
        frames_dir,
        output_wav,
        map_mode='ampl',    # 'ampl', 'fft' o 'fir'
        window_type='hann'  # tipo de ventana para overlap-add
    ):
        self.frames_dir   = frames_dir
        self.output_wav   = output_wav
        self.map_mode     = map_mode
        self.window_type  = window_type

        # preparar carpeta limpia para extraer frames
        if os.path.exists(frames_dir):
            shutil.rmtree(frames_dir)
        os.makedirs(self.frames_dir, exist_ok=True)

    def extract_all_frames(self, video_path, prefix="frame_", fmt="png"):
        print(f"[Decoder] Extrayendo frames de '{video_path}'...")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise IOError(f"No se pudo abrir el vídeo {video_path}")
        idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            fname = f"{prefix}{idx:04d}.{fmt}"
            cv2.imwrite(os.path.join(self.frames_dir, fname), frame)
            idx += 1
        cap.release()
        print(f"[Decoder] Total frames extraídos: {idx}")
        return idx

    def extraer_metadatos_cabecera_rows(self, frame):
        print("[Decoder] Leyendo metadatos de header...")
        # forzar uint8 y extraer canal único
        if frame.dtype != np.uint8:
            frame = np.clip(frame * 255, 0, 255).astype(np.uint8)
        if frame.ndim == 3:
            frame = frame[..., 0]

        # filas 0–1: N (hi, lo)
        hiN, loN = int(frame[0,0]), int(frame[1,0])
        # filas 2–3: hop (hi, lo)
        hiH, loH = int(frame[2,0]), int(frame[3,0])
        # fila 4: fps
        fps      = int(frame[4,0])
        # fila 5: modo color (0=gris, 1=color)
        flag     = int(frame[5,0])

        N        = (hiN << 8) | loN
        hop      = (hiH << 8) | loH
        is_color = bool(flag)

        print(f"[Decoder] Header -> N={N}, hop={hop}, fps={fps}, modo={'color' if is_color else 'gris'}")
        return N, hop, fps, is_color

    def decode(self):
        """
        Reconstruye el audio a partir de los PNG ya extraídos en self.frames_dir.
        Asume que frame_0000.png (header) y los frames de datos están presentes.
        """
        # 1) Leer y parsear header (frame_0000.png)
        frame0    = imageio.imread(os.path.join(self.frames_dir, "frame_0000.png"))
        N, hop, fps, self.is_color = self.extraer_metadatos_cabecera_rows(frame0)
        fs_recon  = N * fps   # fs = muestras por frame × frames por segundo

        # 2) Listar solo los archivos de datos
        files     = sorted(f for f in os.listdir(self.frames_dir)
                           if f.startswith("frame_") and f != "frame_0000.png")
        n_blocks  = len(files)
        print(f"[Decoder] Frames de datos: {n_blocks}")

        # 3) Preparar buffers para overlap-add
        length    = N + hop * (n_blocks - 1)
        audio     = np.zeros(length, dtype=np.float32)
        pesos     = np.zeros(length, dtype=np.float32)
        ventana   = get_window(self.window_type, N)
        print(f"[Decoder] Ventana '{self.window_type}' aplicada, primeros valores: {ventana[:5]}")

        # 4) Reconstruir bloque a bloque
        for i, fname in enumerate(files, start=1):
            print(f"[Decoder] Procesando block {i}/{n_blocks}: {fname}")
            img = imageio.imread(os.path.join(self.frames_dir, fname)).astype(np.float32) / 255.0
            row = img[0]    # primera fila de pixeles: shape (N,3)

            if not self.is_color:
                # → modo gris: colapsar los 3 canales (idénticos) a 1D
                gris  = row[:, 0]
                pix   = gris
                print(f"[Decoder]  gris block, primeros pix: {pix[:5]}")

                if self.map_mode != 'ampl':
                    raise ValueError("En modo gris solo map_mode='ampl'")
                
                
                # restaurar amplitud lineal: [0→1] → [-1→+1]
                block = pix * 2 - 1
                print(f"[Decoder]  ampl(gris) block, primeros 5 samples: {block[:5]}")
            else:
                # → modo color: row es Nx3
                pix = row
                print(f"[Decoder]  color block, pix.shape = {pix.shape}, primeros pix: {pix[:3]}")
                if self.map_mode == 'ampl':
                    # reconstruir amplitud 24-bit
                    hi   = pix[:,0].astype(np.uint32)
                    md   = pix[:,1].astype(np.uint32)
                    lo   = pix[:,2].astype(np.uint32)
                    X24  = (hi<<16)|(md<<8)|lo
                    block = (X24/(2**24-1))*2 - 1
                    print(f"[Decoder]  ampl block, primeros 5 samples: {block[:5]}")
                elif self.map_mode == 'fft':
                    # reconstruir FFT con magnitud y fase
                    mag_n   = pix[:,0]
                    phase_n = pix[:,1]
                    phase   = phase_n * 2*np.pi - np.pi
                    X       = mag_n * np.exp(1j * phase)
                    # revertir el fftshift aplicado en el encoder
                    X       = np.fft.ifftshift(X)
                    block   = np.real(ifft(X, n=N))
                    print(f"[Decoder]  fft block, primeros 5 samples: {block[:5]}")
                elif self.map_mode == 'fir':
                    # reinterpretar int8 en float
                    r8    = pix[:,0].view(np.int8).astype(np.float32) / 127.0
                    block = r8
                    print(f"[Decoder]  fir block, primeros 5 samples: {block[:5]}")
                else:
                    raise ValueError("map_mode debe ser 'ampl', 'fft' o 'fir'")

            # 5) Overlap-add con ventana
            start = (i-1) * hop
            audio[start:start+N] += block * ventana
            pesos[start:start+N] += ventana
            if i == 1:
                print(f"[Decoder]  overlap-add i=1, audio[0:5]={audio[0:5]}")

        # 6) Normalizar todo el audio
        print(f"[Decoder] Antes normalización, min/max = {audio.min():.4f}/{audio.max():.4f}")
        audio /= (pesos + 1e-12)
        print(f"[Decoder] Después normalización, min/max = {audio.min():.4f}/{audio.max():.4f}")

        # 7) Reproducir y guardar WAV
        sonido(audio, fs_recon)

        # Asegurar carpeta de salida y renombrar con modo y color
        out_dir = os.path.dirname(self.output_wav) or '.'
        os.makedirs(out_dir, exist_ok=True)
        base       = os.path.splitext(os.path.basename(self.output_wav))[0]
        mode_color = 'color' if self.is_color else 'gris'
        new_name   = f"{base}_{self.map_mode}_{mode_color}_recon.wav"
        out_path   = os.path.join(out_dir, new_name)

        # Escalar a 16-bit y escribir WAV
        scaled = np.int16(np.clip(audio, -1, 1) * 32767)
        wav.write(out_path, fs_recon, scaled)
        print(f"[Decoder] WAV reconstruido en: {out_path} ({fs_recon} Hz)")

        return audio, fs_recon



# Pruebas

In [None]:
def extraer_metadatos_cabecera_rows(frame):
    """
    Extrae N, salto y fps de un array 2D o 3D codificado por filas:
    fila 0→hiN, 1→loN, 2→hiS, 3→loS, 4→fps.
    """
    # asegurar uint8 y escala de grises
    if frame.dtype != np.uint8:
        frame = np.clip(frame * 255, 0, 255).astype(np.uint8)
    if frame.ndim == 3:
        frame = frame[..., 0]

    hiN  = int(frame[0, 0]);     loN  = int(frame[1, 0])
    hiS  = int(frame[2, 0]);     loS  = int(frame[3, 0])
    fpsb = int(frame[4, 0])

    N     = (hiN << 8) + loN
    salto = (hiS << 8) + loS
    fps   = fpsb

    return N, salto, fps


In [None]:
def borrar_carpetas():
    try:
        shutil.rmtree("fotogramas")
        shutil.rmtree("decoded")
        shutil.rmtree("exports")
    finally:
        print("borradas")
borrar_carpetas()

In [40]:
ruta_frame = os.path.join("fotogramas", "frame_0000.png")
frame_orig = imageio.imread(ruta_frame)

# Extrae y muestra los metadatos
N, salto, fps = extraer_metadatos_cabecera_rows(frame_orig)
print(f"Original → N: {N}, salto: {salto}, fps: {fps}")


Original → N: 736, salto: 368, fps: 60


In [41]:
ruta_frame = os.path.join("analisis", "frame_0000.png")
frame_orig = imageio.imread(ruta_frame)

# Extrae y muestra los metadatos
N, salto, fps = extraer_metadatos_cabecera_rows(frame_orig)
print(f"Original → N: {N}, salto: {salto}, fps: {fps}")


Original → N: 736, salto: 368, fps: 61


## Codificador

In [123]:
# Ruta al WAV de entrada
audio_path = "audios/music.wav"
# Carpeta donde se volcarán los PNG
frames_dir = "fotogramas"
# Carpeta donde se guardará el MP4 final
export_dir = "exports"

# Parámetros de codificación
fps         = 60          # frames por segundo
color_mode  = "color"     # "gris" o "color"
map_mode    = "fft"       # "ampl", "fft" o "fir"
window_type = "hann"      # tipo de ventana para framing
numcoef     = 101         # n° de coef. FIR (solo importa para 'fir')


encoder = PixelSoundsEncoder(
    audio_path=audio_path,
    frames_dir=frames_dir,
    export_dir=export_dir,
    fps=fps,
    color_mode=color_mode,
    map_mode=map_mode,
    window_type=window_type,
    numcoef=numcoef
)

In [124]:
# 2) Generar los PNG
encoder.generate_frames()
print(f"[Encoder] Frames generados en '{frames_dir}/'")

# 3) Empaquetar los frames en un MP4
video_file = encoder.encode_video()
print(f"[Encoder] Vídeo exportado en '{video_file}'")


N=736, HOP=368, FPS=60, Bloques=1196
[Encoder] Generados 1196 fotogramas en 'fotogramas/'
[Encoder] Frames generados en 'fotogramas/'
[Encoder] Vídeo exportado en 'exports\music_fft_color.mp4'


## Decodificador

In [163]:
video_path  = os.path.join("exports", "music_fir_color.mp4")   # mp4 generado por el encoder
frames_dir  = "analisis"                                    
output_wav  = os.path.join("exports", "music_recon.wav")      # WAV reconstruido
map_mode    = "fir"                                          # debe coincidir con encoder
window_type = "hann"                                          # idem

# 2) Instanciar el decoder
decoder = PixelSoundsDecoder(
    frames_dir=frames_dir,
    output_wav=output_wav,
    map_mode=map_mode,
    window_type=window_type
)

In [164]:
# 3) Extraer todos los fotogramas del vídeo
total = decoder.extract_all_frames(video_path)
print(f"[Decoder] Extraídos {total} fotogramas en '{frames_dir}/'")

[Decoder] Extrayendo frames de 'exports\music_fir_color.mp4'...
[Decoder] Total frames extraídos: 1197
[Decoder] Extraídos 1197 fotogramas en 'analisis/'


In [165]:
# 4) Reconstruir el audio y guardar el WAV
audio, fs = decoder.decode()
print(f"[Decoder] Audio reconstruido ({fs} Hz) guardado en '{output_wav}'")

[Decoder] Leyendo metadatos de header...
[Decoder] Header -> N=736, hop=368, fps=61, modo=color
[Decoder] Frames de datos: 1196
[Decoder] Ventana 'hann' aplicada, primeros valores: [0.00000000e+00 1.82197108e-05 7.28775154e-05 1.63969430e-04
 2.91488817e-04]
[Decoder] Procesando block 1/1196: frame_0001.png
[Decoder]  color block, pix.shape = (736, 3), primeros pix: [[0. 0. 0.]
 [0. 0. 0.]
 [0. 0. 0.]]


ValueError: To change to a dtype of a different size, the last axis must be contiguous