# Transcription embedders

In [None]:
-- Enable the pgvector extension (provides the `vector` type and the
-- distance operators used by match_documents).
CREATE EXTENSION IF NOT EXISTS vector;

-- Table that stores one row per transcript chunk together with its embedding.
-- IF NOT EXISTS keeps this setup script re-runnable, matching the
-- CREATE EXTENSION statement above.
CREATE TABLE IF NOT EXISTS podcast_embeddings (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    content TEXT NOT NULL,
    metadata JSONB,
    embedding vector(1536), -- text-embedding-3-small uses 1536 dimensions
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Similarity search over podcast_embeddings.
--
-- Parameters:
--   query_embedding  embedding of the search query (1536 dimensions)
--   filter           JSONB document; only rows whose metadata contains it
--                    (@>) are returned. Rows with NULL metadata never match.
--   match_count      maximum number of rows returned (defaults to 5)
--
-- Returns the matching rows ordered by ascending `<=>` distance (pgvector's
-- cosine-distance operator); `similarity` is 1 minus that distance.
--
-- NOTE(review): the Python caller appears to limit results on the client
-- side and not to pass match_count, so the default of 5 would cap the result
-- size even when a larger k is requested. Confirm against the client.
CREATE OR REPLACE FUNCTION match_documents (
    query_embedding vector(1536),
    filter JSONB DEFAULT '{}',
    match_count INT DEFAULT 5
)
RETURNS TABLE (
    id UUID,
    content TEXT,
    metadata JSONB,
    similarity FLOAT
)
LANGUAGE plpgsql
AS $$
#variable_conflict use_column
BEGIN
    RETURN QUERY
    SELECT
        pe.id,
        pe.content,
        pe.metadata,
        1 - (pe.embedding <=> query_embedding) AS similarity
    FROM podcast_embeddings AS pe
    WHERE pe.metadata @> filter
    ORDER BY pe.embedding <=> query_embedding
    LIMIT match_count;
END;
$$;

In [1]:
import json
import logging
import random
import re
import os

from pathlib import Path
from typing import List, Dict, Any
from dotenv import load_dotenv

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import SupabaseVectorStore
from supabase import create_client, Client

In [2]:
# Configure root logging once for the notebook: INFO level, timestamped lines.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Module-level logger shared by the classes defined below.
logger = logging.getLogger(__name__)

# Load environment variables from a .env file; kept as the last expression so
# the notebook still displays its return value.
load_dotenv()


True

In [3]:
class SemanticChunker:
    """Semantic chunker specialised for podcast transcripts.

    The transcript is first cut at heuristic topic boundaries (Spanish
    discourse markers and blank lines). Segments that are still longer than
    ``chunk_size`` are split further with a ``RecursiveCharacterTextSplitter``.

    Args:
        chunk_size: Maximum length in characters of a segment before it is
            handed to the text splitter; also the splitter's chunk size.
        chunk_overlap: Overlap in characters between consecutive splitter
            chunks.
        min_segment_chars: Segments shorter than this (after stripping) are
            merged into a neighbouring segment instead of being emitted on
            their own. A transcript that is shorter than this as a whole
            produces no chunks.
    """

    # Speaking rate assumed when converting word counts into minutes.
    WORDS_PER_MINUTE = 150

    def __init__(self, chunk_size: int = 1500, chunk_overlap: int = 200,
                 min_segment_chars: int = 100):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_segment_chars = min_segment_chars
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ". ", "? ", "! ", " ", ""],
            keep_separator=True
        )

    def detect_topic_boundaries(self, text: str) -> List[int]:
        """Detect topic boundaries based on patterns common in podcasts.

        Args:
            text: Full transcript text.

        Returns:
            Sorted character offsets: ``0``, the start of every pattern match,
            and finally ``len(text)``.
        """
        boundaries = [0]

        # Patterns that hint at a change of topic
        patterns = [
            r'\b(?:ahora|bueno|entonces|por otro lado|cambiando de tema|hablando de)\b',
            r'\b(?:siguiente pregunta|otra cosa|pasemos a|vamos a hablar)\b',
            r'\n\n',  # New paragraphs
        ]

        for pattern in patterns:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                boundaries.append(match.start())

        # Sort and drop duplicates, then close the last segment
        boundaries = sorted(set(boundaries))
        boundaries.append(len(text))

        return boundaries

    def _merge_short_segments(self, text: str, boundaries: List[int]) -> List[int]:
        """Remove boundaries that would create segments shorter than the minimum.

        A short segment is merged into the segment that follows it; a short
        trailing segment is merged into the one before it. This keeps the text
        of short segments in the output instead of discarding it.
        """
        if len(boundaries) <= 2:
            return boundaries

        end = boundaries[-1]
        kept = [boundaries[0]]
        for pos in boundaries[1:-1]:
            # Only close the current segment once it is long enough.
            if len(text[kept[-1]:pos].strip()) >= self.min_segment_chars:
                kept.append(pos)

        # Fold a too-short tail into the preceding segment.
        if len(kept) > 1 and len(text[kept[-1]:end].strip()) < self.min_segment_chars:
            kept.pop()
        kept.append(end)
        return kept

    def chunk_transcript(self, text: str, episode_metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Split the transcript into semantically coherent chunks.

        Args:
            text: Full transcript text.
            episode_metadata: Metadata copied into every chunk's metadata.

        Returns:
            A list of ``{"content": str, "metadata": dict}`` items. Each
            metadata dict extends ``episode_metadata`` with ``chunk_index``,
            ``estimated_timestamp_minutes``, ``chunk_type`` (``"semantic"`` or
            ``"semantic_sub"``) and ``word_count``.
        """
        boundaries = self._merge_short_segments(text, self.detect_topic_boundaries(text))
        chunks: List[Dict[str, Any]] = []

        # Estimate total duration from the word count
        total_chars = len(text)
        estimated_duration_minutes = len(text.split()) / self.WORDS_PER_MINUTE

        def emit(content: str, offset: int, chunk_type: str) -> None:
            # The timestamp is proportional to the chunk's character offset.
            progress = offset / total_chars
            chunks.append({
                "content": content,
                "metadata": {
                    **episode_metadata,
                    "chunk_index": len(chunks),
                    "estimated_timestamp_minutes": round(progress * estimated_duration_minutes, 2),
                    "chunk_type": chunk_type,
                    "word_count": len(content.split())
                }
            })

        for start_pos, end_pos in zip(boundaries, boundaries[1:]):
            segment = text[start_pos:end_pos].strip()

            # Only reached with a short segment when the whole text is short.
            if not segment or len(segment) < self.min_segment_chars:
                continue

            if len(segment) <= self.chunk_size:
                emit(segment, start_pos, "semantic")
                continue

            # Segment too long: fall back to the traditional text splitter.
            # Each piece is located inside the segment so that its timestamp
            # reflects its real position. Assumes the splitter returns pieces
            # in document order that are substrings of the segment; when a
            # piece cannot be found the running lower bound is used instead.
            search_from = 0
            for sub_chunk in self.text_splitter.split_text(segment):
                found = segment.find(sub_chunk, search_from)
                local_offset = found if found != -1 else min(search_from, len(segment))
                emit(sub_chunk, start_pos + local_offset, "semantic_sub")
                # The next piece can start at most chunk_overlap characters
                # before the end of this one.
                search_from = max(
                    local_offset + 1,
                    local_offset + len(sub_chunk) - self.chunk_overlap
                )

        return chunks

In [20]:
class PodcastIndexer:
    """Main indexing system for podcast transcripts.

    Wires together OpenAI embeddings, a Supabase vector store backed by the
    ``podcast_embeddings`` table / ``match_documents`` function, and a
    ``SemanticChunker``.
    """

    def __init__(self):
        """Create the embedding client, the Supabase client and the chunker.

        Raises:
            ValueError: If ``SUPABASE_URL`` or ``SUPABASE_KEY`` is not set in
                the environment.
        """
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-small",
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )

        # Configure Supabase
        supabase_url = os.getenv("SUPABASE_URL")
        supabase_key = os.getenv("SUPABASE_KEY")

        if not supabase_url or not supabase_key:
            raise ValueError("SUPABASE_URL y SUPABASE_KEY deben estar configurados")

        self.supabase_client: Client = create_client(supabase_url, supabase_key)
        self.vector_store = SupabaseVectorStore(
            client=self.supabase_client,
            embedding=self.embeddings,
            table_name="podcast_embeddings",
            query_name="match_documents"
        )
        self.chunker = SemanticChunker()

    def extract_episode_metadata(self, file_path: Path) -> Dict[str, Any]:
        """Read episode metadata from a transcript JSON file.

        Missing keys fall back to defaults derived from the file name. A file
        that cannot be read or parsed, or whose top-level JSON value is not an
        object, is logged and treated as having no metadata.

        Args:
            file_path: Path to the transcript JSON file.

        Returns:
            Dict with ``episode_id``, ``title``, ``filename``, ``source_file``,
            ``duration``, ``language`` and ``file_path``.
        """
        filename = file_path.stem

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except Exception as e:
            logger.error(f"Error leyendo JSON {file_path}: {str(e)}")
            data = {}

        # Valid JSON that is not an object (list, string, number...) has no
        # .get(); fall back to defaults instead of raising AttributeError.
        if not isinstance(data, dict):
            logger.error(
                f"Error leyendo JSON {file_path}: se esperaba un objeto JSON, "
                f"se obtuvo {type(data).__name__}"
            )
            data = {}

        episode_id = data.get("episode_id", filename)
        title = data.get("title", episode_id)
        duration = data.get("duration", 0)
        language = data.get("language", "unknown")
        audio_file_path = data.get("file_path", "")

        return {
            "episode_id": episode_id,
            "title": title,
            "filename": filename,
            "source_file": str(file_path),
            "duration": duration,
            "language": language,
            "file_path": audio_file_path
        }

    def process_transcript_file(self, file_path: Path) -> List[Dict[str, Any]]:
        """Read one transcript file and turn it into chunks.

        Args:
            file_path: Path to the transcript file.

        Returns:
            The chunks produced by the chunker, or an empty list when any
            error occurs (the error is logged).
        """
        logger.info(f"Procesando: {file_path}")

        try:
            # NOTE(review): the raw file text is chunked as-is. The indexer
            # globs *.json files, so JSON syntax and metadata keys presumably
            # end up in chunk content. Confirm the transcript schema and
            # extract the text field if that is the case.
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            episode_metadata = self.extract_episode_metadata(file_path)
            chunks = self.chunker.chunk_transcript(content, episode_metadata)

            logger.info(f"Generados {len(chunks)} chunks para {file_path.name}")
            return chunks

        except Exception as e:
            logger.error(f"Error procesando {file_path}: {str(e)}")
            return []

    def index_transcripts(self, transcripts_dir: str = "transcripciones"):
        """Chunk every ``*.json`` transcript in a directory and index the chunks.

        Chunks are sent to the vector store in batches of 50. A failing batch
        is logged and skipped; the final log line reports whether every batch
        succeeded.

        Args:
            transcripts_dir: Directory containing the transcript files.

        Raises:
            FileNotFoundError: If ``transcripts_dir`` does not exist.
        """
        transcripts_path = Path(transcripts_dir)

        if not transcripts_path.exists():
            raise FileNotFoundError(f"Directorio {transcripts_dir} no encontrado")

        # Sorted so that the processing order is deterministic.
        transcript_files = sorted(transcripts_path.glob("*.json"))

        # To index only a random sample of 2 files while experimenting:
        # transcript_files = random.sample(transcript_files, min(2, len(transcript_files)))

        if not transcript_files:
            logger.warning(f"No se encontraron archivos .json en {transcripts_dir}")
            return

        logger.info(f"Encontrados {len(transcript_files)} archivos de transcripción")

        all_chunks = []

        # Process every file
        for file_path in transcript_files:
            all_chunks.extend(self.process_transcript_file(file_path))

        if not all_chunks:
            logger.warning("No se generaron chunks para indexar")
            return

        logger.info(f"Total de chunks a indexar: {len(all_chunks)}")

        texts = [chunk["content"] for chunk in all_chunks]
        metadatas = [chunk["metadata"] for chunk in all_chunks]

        batch_size = 50
        total_batches = (len(texts) + batch_size - 1) // batch_size
        failed_batches = 0

        for i in range(0, len(texts), batch_size):
            current_batch = (i // batch_size) + 1

            try:
                self.vector_store.add_texts(
                    texts=texts[i:i + batch_size],
                    metadatas=metadatas[i:i + batch_size]
                )
                logger.info(f"Batch {current_batch}/{total_batches} indexado exitosamente")

            except Exception as e:
                # Best effort: keep indexing the remaining batches.
                failed_batches += 1
                logger.error(f"Error indexando batch {current_batch}: {str(e)}")

        # Do not report success when part of the data was not indexed.
        if failed_batches:
            logger.warning(
                f"Indexación finalizada con errores: "
                f"{failed_batches} de {total_batches} batches fallaron"
            )
        else:
            logger.info("Indexación completada!")

    def search_episodes(self, query: str, k: int = 5) -> List[Dict[str, Any]]:
        """Run a similarity search against the vector store.

        Args:
            query: Free-text search query.
            k: Number of documents requested from the vector store.

        Returns:
            A list of ``{"content": ..., "metadata": ...}`` dicts, or an empty
            list when the search fails (the error and traceback are logged).
        """
        try:
            logger.info(f"Buscando: '{query}' con k={k}")

            # No explicit connectivity check is performed here; the search
            # call below is what actually contacts the services.
            logger.info("Verificando conexión a Supabase...")

            docs = self.vector_store.similarity_search(query=query, k=k)
            logger.info(f"Búsqueda exitosa, encontrados {len(docs)} documentos")

            return [
                {
                    "content": doc.page_content,
                    "metadata": doc.metadata,
                }
                for doc in docs
            ]

        except Exception as e:
            import traceback
            logger.error(f"Error en búsqueda: {str(e)}")
            logger.error(f"Tipo de error: {type(e).__name__}")
            logger.error(f"Traceback completo:\n{traceback.format_exc()}")

            return []

In [None]:
# Be careful about running this again. 🚨
# Un-commenting the call below sends every transcript chunk to the vector
# store again (presumably duplicating rows already indexed — verify first).
indexer = PodcastIndexer()
# indexer.index_transcripts("../data/transcriptions")


In [22]:
# Build an indexer and run a sample similarity search with the default k=5.
# The notebook displays the returned list of {"content", "metadata"} dicts.
indexer = PodcastIndexer()
indexer.search_episodes("Iglesia catolica")

2025-07-06 19:41:31,684 - INFO - Buscando: 'Iglesia catolica' con k=5
2025-07-06 19:41:31,685 - INFO - Verificando conexión a Supabase...
2025-07-06 19:41:32,292 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-07-06 19:41:32,663 - INFO - HTTP Request: POST https://zdanxslxyfhuvwqczyig.supabase.co/rest/v1/rpc/match_documents?limit=5 "HTTP/2 200 OK"
2025-07-06 19:41:32,674 - INFO - Búsqueda exitosa, encontrados 5 documentos


[{'content': 'ahora viene la extravagancia Ya sabemos todos que la secta católica tiene por norma trapichear con huesos de muertos para ponerlos debajo de sus altares Durante la inauguración de esta sede del Opus en el CSIC buscaron los huesos más oportunos, dado que habían declarado patrón espiritual al tal Isidoro de Sevilla Oye,',
  'metadata': {'title': '20240520_190000',
   'duration': 925,
   'filename': '20240520_190000',
   'language': 'es',
   'file_path': '/Users/seedtag/projects/personal/la-historia-por-concostrina/podcast_crawler/app/../../audios/2024_05_20_19.mp3',
   'chunk_type': 'semantic',
   'episode_id': '20240520_190000',
   'word_count': 53,
   'chunk_index': 12,
   'source_file': '../data/transcriptions/20240520_190000.json',
   'estimated_timestamp_minutes': 10.06}},
 {'content': 'bueno, hay que recordar que la república no tuvo oportunidad de desarrollar plenamente esa constitución aprobada en el 31, ni de secularizar la enseñanza, porque lo impidió un golpe de 