In [None]:
import psycopg2
from elasticsearch import Elasticsearch
import json
from datetime import time
import logging

# Logging configuration: INFO level, each record prefixed with a timestamp and
# the level name. `logger` is the module-level logger used by every function below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# PostgreSQL connection settings, unpacked as keyword arguments into
# psycopg2.connect().
# NOTE(review): the host name and credentials are hard-coded; 'postgres' is
# presumably a container/service name -- confirm, and consider reading these
# from the environment.
DB_CONFIG = {
    'host': 'postgres',
    'database': 'bbdd_api_youtube',
    'user': 'postgres',
    'password': 'postgres',
    'port': '5432'
}

# Elasticsearch client settings, unpacked as keyword arguments into the
# Elasticsearch() constructor.
# NOTE(review): 'timeout' is presumably in seconds -- confirm against the
# installed elasticsearch client version.
ES_CONFIG = {
    'hosts': ['http://elasticsearch:9200'],
    'timeout': 30
}

# Name of the Elasticsearch index that receives the video documents.
ES_INDEX = 'videos'

def convert_time_to_seconds(time_obj):
    """Convert a time-of-day or duration value to whole seconds.

    Args:
        time_obj: A ``datetime.time`` or ``datetime.timedelta``. Any other
            value (including ``None``) is treated as "no duration".

    Returns:
        int: Total seconds. For ``time`` this is hours, minutes and seconds
        combined (microseconds are ignored); for ``timedelta`` it is the
        total span truncated to whole seconds; ``0`` for unsupported input.
    """
    # Local import: the module only imports `time` from datetime.
    from datetime import timedelta

    if isinstance(time_obj, time):
        return time_obj.hour * 3600 + time_obj.minute * 60 + time_obj.second
    if isinstance(time_obj, timedelta):
        # psycopg2 returns INTERVAL columns as timedelta; without this branch
        # such durations would silently collapse to 0.
        return int(time_obj.total_seconds())
    return 0

def create_elasticsearch_index(es_client):
    """Create the videos index with an explicit mapping if it does not exist.

    Args:
        es_client: Elasticsearch client used to check for and create the index.

    Raises:
        Exception: Any error raised by the client is logged and re-raised.

    Note:
        The mapping is only applied when the index is created here; an index
        that already exists keeps whatever mapping it was created with.
    """
    mapping = {
        "mappings": {
            "properties": {
                "id": {"type": "keyword"},
                # Full-text searchable title, plus an exact-match sub-field.
                "title_raw": {
                    "type": "text",
                    "analyzer": "standard",
                    "fields": {
                        "keyword": {"type": "keyword"}
                    }
                },
                "duration_seconds": {"type": "integer"},
                "topic": {
                    "type": "text",
                    "analyzer": "standard"
                },
                "published_at": {"type": "date"},
                # Counters are 64-bit: a 32-bit "integer" field rejects values
                # above 2,147,483,647, which popular videos' view counts exceed.
                "view_count": {"type": "long"},
                "like_count": {"type": "long"},
                "language": {"type": "keyword"},
                "id_channel": {"type": "keyword"}
            }
        },
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        }
    }

    try:
        if not es_client.indices.exists(index=ES_INDEX):
            es_client.indices.create(index=ES_INDEX, body=mapping)
            logger.info(f"Índice {ES_INDEX} creado exitosamente")
        else:
            logger.info(f"Índice {ES_INDEX} ya existe")
    except Exception as e:
        logger.error(f"Error creando índice: {e}")
        raise

def fetch_videos_from_postgres():
    """Read every row of ``public.video`` and return them as dictionaries.

    Each dictionary is keyed by column name, except that the ``duration``
    column is replaced by ``duration_seconds`` (see
    ``convert_time_to_seconds``).

    Returns:
        list[dict]: One dictionary per video row.

    Raises:
        Exception: Any connection or query error is logged and re-raised.
    """
    try:
        query = """
        SELECT 
            id,
            title_raw,
            duration,
            topic,
            published_at,
            view_count,
            like_count,
            language,
            id_channel
        FROM public.video
        """

        conn = psycopg2.connect(**DB_CONFIG)
        try:
            # The cursor context manager closes the cursor on exit, and the
            # finally clause closes the connection, even when the query fails.
            with conn.cursor() as cursor:
                cursor.execute(query)
                columns = [desc[0] for desc in cursor.description]
                rows = cursor.fetchall()
        finally:
            conn.close()

        videos = []
        for row in rows:
            video_dict = dict(zip(columns, row))
            # Replace the raw duration value with its length in seconds.
            video_dict['duration_seconds'] = convert_time_to_seconds(video_dict.pop('duration'))
            videos.append(video_dict)

        logger.info(f"Obtenidos {len(videos)} videos de PostgreSQL")
        return videos

    except Exception as e:
        logger.error(f"Error obteniendo datos de PostgreSQL: {e}")
        raise

def index_videos_to_elasticsearch(es_client, videos):
    """Index each video as its own Elasticsearch document.

    The video's ``id`` doubles as the document id. A failure on one video is
    logged and counted, and processing continues with the next one.

    Args:
        es_client: Elasticsearch client used to index the documents.
        videos: Iterable of video dictionaries as produced by
            ``fetch_videos_from_postgres``.

    Returns:
        tuple[int, int]: ``(indexed, failed)`` document counts.
    """
    # Fields copied into the document, in the order they are emitted.
    field_names = (
        'id',
        'title_raw',
        'duration_seconds',
        'topic',
        'published_at',
        'view_count',
        'like_count',
        'language',
        'id_channel',
    )
    indexed = 0
    failed = 0

    for record in videos:
        try:
            document = {}
            for name in field_names:
                value = record[name]
                if name == 'published_at':
                    # Serialise the timestamp as ISO-8601; falsy values become None.
                    value = value.isoformat() if value else None
                document[name] = value

            es_client.index(index=ES_INDEX, id=document['id'], body=document)

            indexed += 1
            # Progress message every 100 successfully indexed documents.
            if indexed % 100 == 0:
                logger.info(f"Indexados {indexed} videos...")
        except Exception as exc:
            failed += 1
            logger.error(f"Error indexando video {record.get('id', 'unknown')}: {exc}")

    return indexed, failed

def sync_postgres_to_elasticsearch():
    """Run the full PostgreSQL -> Elasticsearch synchronisation.

    Steps: connect to Elasticsearch and ping it, ensure the index exists,
    fetch the videos from PostgreSQL, index them, refresh the index and log a
    summary. Returns early (after a warning) when there are no videos.

    Raises:
        Exception: If Elasticsearch does not answer the ping, or if any step
            fails; the error is logged and re-raised.
    """
    logger.info("Iniciando sincronización de PostgreSQL a Elasticsearch")

    try:
        client = Elasticsearch(**ES_CONFIG)

        # Fail fast when the cluster is unreachable.
        reachable = client.ping()
        if not reachable:
            raise Exception("No se pudo conectar a Elasticsearch")
        logger.info("Conexión a Elasticsearch establecida")

        create_elasticsearch_index(client)

        rows = fetch_videos_from_postgres()
        if not rows:
            logger.warning("No se encontraron videos para indexar")
            return

        indexed, failed = index_videos_to_elasticsearch(client, rows)

        # Refresh so the new documents are searchable (and counted) right away.
        client.indices.refresh(index=ES_INDEX)
        total_in_index = client.count(index=ES_INDEX)['count']

        summary_lines = (
            "Sincronización completada:",
            f"- Documentos indexados exitosamente: {indexed}",
            f"- Errores durante indexación: {failed}",
            f"- Total de documentos en Elasticsearch: {total_in_index}",
        )
        for line in summary_lines:
            logger.info(line)

    except Exception as exc:
        logger.error(f"Error en la sincronización: {exc}")
        raise

# Run the synchronisation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    sync_postgres_to_elasticsearch()