<a href="https://colab.research.google.com/github/DavidP0011/apps/blob/main/app_transc_01_files_path_collect.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# INICIALIZACIÓN

In [1]:
# @title dpm_ini_utils.py (GitHub)

import os         # Para operaciones del sistema y manejo de rutas.
import sys        # Para interactuar con el sistema, en particular con sys.path.
import importlib  # Para realizar importaciones dinámicas.
import tempfile   # Para crear archivos temporales.
import requests   # Para realizar peticiones HTTP y descargar el módulo.

# URL raw del módulo en GitHub (debe incluir las funciones ini_load_dpm_libs, ini_environment_identification e ini_google_drive_instalation)
github_url = "https://raw.githubusercontent.com/DavidP0011/utils/main/dpm_ini_utils.py"

# Solicitar la descarga sin utilizar caché, para asegurar que se obtiene la versión actualizada.
headers = {"Cache-Control": "no-cache", "Pragma": "no-cache"}
response = requests.get(github_url, headers=headers)
if response.status_code != 200:
    raise Exception(f"Error al descargar el módulo desde GitHub. Código: {response.status_code}")

# Guardar el contenido descargado en un archivo temporal.
temp_dir = tempfile.gettempdir()
module_path = os.path.join(temp_dir, "dpm_ini_utils.py")
with open(module_path, "wb") as f:
    f.write(response.content)

# Agregar el directorio temporal a sys.path si aún no está incluido, para permitir la importación.
if temp_dir not in sys.path:
    sys.path.insert(0, temp_dir)

# Invalidar cachés y eliminar versiones previas del módulo para forzar la recarga.
importlib.invalidate_caches()
module_name = "dpm_ini_utils"
if module_name in sys.modules:
    del sys.modules[module_name]

# Importar y recargar el módulo.
module = importlib.import_module(module_name)
module = importlib.reload(module)

# Intentar importar las funciones deseadas desde el módulo.
ini_install_libraries = getattr(module, "ini_install_libraries", None)
ini_load_dpm_libs = getattr(module, "ini_load_dpm_libs", None)
ini_environment_identification = getattr(module, "ini_environment_identification", None)
ini_google_drive_instalation = getattr(module, "ini_google_drive_instalation", None)

# Verificar que las funciones se hayan importado correctamente.
if ini_install_libraries is None:
    print("La función ini_install_libraries no se encontró en el módulo.")
else:
    print("Función ini_install_libraries cargada correctamente.")

if ini_load_dpm_libs is None:
    print("La función ini_load_dpm_libs no se encontró en el módulo.")
else:
    print("Función ini_load_dpm_libs cargada correctamente.")

if ini_environment_identification is None:
    print("La función ini_environment_identification no se encontró en el módulo.")
else:
    print("Función ini_environment_identification cargada correctamente.")

if ini_google_drive_instalation is None:
    print("La función ini_google_drive_instalation no se encontró en el módulo.")
else:
    print("Función ini_google_drive_instalation cargada correctamente.")


Función ini_install_libraries cargada correctamente.
Función ini_load_dpm_libs cargada correctamente.
Función ini_environment_identification cargada correctamente.
Función ini_google_drive_instalation cargada correctamente.


In [2]:
# @title IDENTIFICACION DE ENTORNO, INSTALACIÓN GOOGLE DRIVE

# Detectar el entorno de ejecución
ini_environment_identificated = ini_environment_identification()
print(f"[INFO ℹ️] Entorno detectado: {ini_environment_identificated}", flush=True)

GCP_json_keyfile_local = r"C:/api_keys/XXX.json"
GCP_json_keyfile_colab = "/content/drive/MyDrive/ANIMUM DIRECCION/DIRECCION BI/NOTEBOOKS/api_keys/animum-dev-apps-google-colab.json"
GCP_json_keyfile_GCP_secret_id = "notebook-vm"

# Montar Google Drive si entorno_identificado_str es Colab
params = {"entorno_identificado_str": ini_environment_identificated}
ini_google_drive_instalation(params)

[INFO ℹ️] Entorno detectado: COLAB
Mounted at /content/drive
[INFO ℹ️] Google Drive montado correctamente.


In [2]:
# @title INSTALACION DE LIBRERIAS
from IPython import get_ipython
import os
packages = [
    # Ejemplos originales:
    {
        "name": "whisper",
        "import_name": "whisper",
        # Instalación desde GitHub:
        "install_cmd": "pip install git+https://github.com/openai/whisper.git"
    },
    {
        "name": "ffmpeg-python",
        "import_name": "ffmpeg",
        "pip_name": "ffmpeg-python"
    },
    {
        "name": "FFmpeg",  # Paquete del sistema
        "is_system": True,
        "check_cmd": "ffmpeg -version",
        "install_cmds": [
            "apt-get update",
            "apt-get install -y ffmpeg",
            "ffmpeg -version"
        ]
    },

    {"name": "rapidfuzz", "import_name": "rapidfuzz", "pip_name": "rapidfuzz"},
    {"name": "pycountry", "import_name": "pycountry", "pip_name": "pycountry"},
    {"name": "phonenumbers", "import_name": "phonenumbers", "pip_name": "phonenumbers"},
    {"name": "deep_translator", "import_name": "deep_translator", "pip_name": "deep_translator"},
    {"name": "requests", "import_name": "requests", "pip_name": "requests"},
    {"name": "beautifulsoup4", "import_name": "bs4", "pip_name": "beautifulsoup4"},
    {"name": "googletrans", "import_name": "googletrans", "pip_name": "googletrans", "version": "4.0.0-rc1"},
]

# Ejecuta la función para cada paquete
for pkg in packages:
    ini_install_libraries(pkg)



[START ▶️] Verificando instalación de whisper...
[INSTALLATION [INFO ℹ️]] whisper no está instalado. Procediendo con la instalación...
[INSTALLATION [COMMAND ▶️]] Ejecutando comando personalizado: pip install git+https://github.com/openai/whisper.git
[END [FINISHED ✅]] Proceso de instalación finalizado para whisper.


[START ▶️] Verificando instalación de ffmpeg-python...
[INSTALLATION [INFO ℹ️]] ffmpeg-python no está instalado. Procediendo con la instalación...
[INSTALLATION [COMMAND ▶️]] Ejecutando: pip install --upgrade ffmpeg-python
[END [FINISHED ✅]] Proceso de instalación finalizado para ffmpeg-python.


[START ▶️] Verificando instalación de FFmpeg...
[INSTALLATION [SUCCESS ✅]] FFmpeg ya está instalado.

[START ▶️] Verificando instalación de rapidfuzz...
[INSTALLATION [INFO ℹ️]] rapidfuzz no está instalado. Procediendo con la instalación...
[INSTALLATION [COMMAND ▶️]] Ejecutando: pip install --upgrade rapidfuzz
[END [FINISHED ✅]] Proceso de instalación finalizado para rapidfuzz.

In [None]:
# @title IMPORTACIÓN DE LIBRERÍAS DPM
import sys
import os
import importlib
import datetime
import inspect
import pandas as pd



# Configuración para importar librerías personalizadas
config = [
    {
        "module_host": "github",
        # Se utiliza la URL raw para obtener el contenido real del archivo
        "module_path": "https://raw.githubusercontent.com/DavidP0011/utils/main/dpm_tables.py",
        "selected_functions_list": []
    },
    {
        "module_host": "github",
        # Se utiliza la URL raw para obtener el contenido real del archivo
        "module_path": "https://raw.githubusercontent.com/DavidP0011/utils/main/dpm_GCP_utils.py",
        "selected_functions_list": []
    },
    {
        "module_host": "github",
        "module_path": "https://raw.githubusercontent.com/DavidP0011/utils/main/dpm_SQL.py",
        "selected_functions_list": []
    }
]

# Cargar las librerías personalizadas
ini_load_dpm_libs(config)



🔹🔹🔹 [START ▶️] Iniciando carga de módulo dpm_tables.py 🔹🔹🔹

[EXTRACTION [START ▶️]] Descargando módulo desde GitHub: https://raw.githubusercontent.com/DavidP0011/utils/main/dpm_tables.py
[EXTRACTION [SUCCESS ✅]] Archivo descargado y guardado en: /tmp/dpm_tables.py
[LOAD [START ▶️]] Importando módulo: dpm_tables
[LOAD [SUCCESS ✅]] Módulo 'dpm_tables' importado correctamente.

[METRICS [INFO 📊]] Informe de carga del módulo:
  - Módulo: dpm_tables
  - Ruta: /tmp/dpm_tables.py
  - Fecha de última modificación (último commit en GitHub o mod. local): 2025-03-11 17:42:43+01:00
  - Objetos importados:
      • fields_name_format (function): Formatea nombres de campos de datos según configuraciones específicas.
      • table_DF_to_various_targets (function): Escribe un DataFrame en distintos destinos (archivo local, Google Sheets, BigQuery o GCS)
      • table_various_sources_to_DF (function): Extrae datos desde distintos orígenes (archivo, Google Sheets, BigQuery o GCS) y los convierte en un D

In [None]:
# @title IMPORTACIÓN DE LIBRERÍAS WHISPER Y FFMPEG

# Verificar e instalar el paquete 'whisper'
try:
    import whisper
except ImportError:
    print("El paquete 'whisper' no está instalado. Procediendo con la instalación...")
    !pip install git+https://github.com/openai/whisper.git
    import whisper

# Verificar e instalar el paquete 'ffmpeg-python'
try:
    import ffmpeg
except ImportError:
    print("El paquete 'ffmpeg-python' no está instalado. Procediendo con la instalación...")
    !pip install ffmpeg-python

# Instalar y verificar 'ffmpeg' a nivel del sistema
import os
if os.system("ffmpeg -version") != 0:
    print("FFmpeg no está instalado. Procediendo con la instalación...")
    !apt-get update
    !apt-get install -y ffmpeg
    !ffmpeg -version
else:
    print("FFmpeg ya está instalado.")



El paquete 'whisper' no está instalado. Procediendo con la instalación...
Collecting git+https://github.com/openai/whisper.git
  Cloning https://github.com/openai/whisper.git to /tmp/pip-req-build-fmgh_a8f
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-req-build-fmgh_a8f
  Resolved https://github.com/openai/whisper.git to commit 517a43ecd132a2089d85f4ebc044728a71d49f6e
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting tiktoken (from openai-whisper==20240930)
  Downloading tiktoken-0.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch->openai-whisper==20240930)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch->open

## GBQ DATASETS SCHEMA

In [None]:
# @title GBQ INFO GLOBAL


# Configuración
params_dic = {
    "spreadsheet_source_table_id": "1aJCGTJtDu_ODqBc4zUcrpQ-q6PE_HN0rO4mwYMIhCXw",
    "spreadsheet_source_table_worksheet_name": "DATA",

    "ini_environment_identificated": ini_environment_identificated,
    "json_keyfile_GCP_secret_id": GCP_json_keyfile_GCP_secret_id,
    "json_keyfile_colab": GCP_json_keyfile_colab
}



full_info_from_GBQ_df = table_various_sources_to_DF(params_dic)
display(full_info_from_GBQ_df)

[AUTHENTICATION [INFO] 🔐] Entorno local/Colab detectado. Usando json_keyfile_colab.
[EXTRACTION [START ⏳]] Extrayendo datos de Google Sheets...
[EXTRACTION [SUCCESS ✅]] Datos extraídos con éxito de la hoja 'DATA'.


Unnamed: 0,project_id,dataset_id,table_name,field_name,field_type,num_rows,num_columns,size_mb,fecha_actualizacion_GBQ,fecha_actualizacion_df
0,animum-dev-datawarehouse,IMDb_01raw_01,name_basics_raw,nconst,STRING,14235647,6,865.54,10/03/2025 19:43:37,11/03/2025 17:00:05
1,animum-dev-datawarehouse,IMDb_01raw_01,name_basics_raw,primaryName,STRING,14235647,6,865.54,10/03/2025 19:43:37,11/03/2025 17:00:05
2,animum-dev-datawarehouse,IMDb_01raw_01,name_basics_raw,birthYear,INTEGER,14235647,6,865.54,10/03/2025 19:43:37,11/03/2025 17:00:05
3,animum-dev-datawarehouse,IMDb_01raw_01,name_basics_raw,deathYear,STRING,14235647,6,865.54,10/03/2025 19:43:37,11/03/2025 17:00:05
4,animum-dev-datawarehouse,IMDb_01raw_01,name_basics_raw,primaryProfession,STRING,14235647,6,865.54,10/03/2025 19:43:37,11/03/2025 17:00:05
...,...,...,...,...,...,...,...,...,...,...
9688,animum-dev-datawarehouse,vl_01raw_01,MA_PROVINCIAS,Hora_creacion,DATETIME,1033,10,0.06,10/03/2025 5:22:56,11/03/2025 17:00:05
9689,animum-dev-datawarehouse,vl_01raw_01,MA_PROVINCIAS,Usuario_creacion,STRING,1033,10,0.06,10/03/2025 5:22:56,11/03/2025 17:00:05
9690,animum-dev-datawarehouse,vl_01raw_01,MA_PROVINCIAS,Fecha_modificacion,DATETIME,1033,10,0.06,10/03/2025 5:22:56,11/03/2025 17:00:05
9691,animum-dev-datawarehouse,vl_01raw_01,MA_PROVINCIAS,Hora_modificacion,DATETIME,1033,10,0.06,10/03/2025 5:22:56,11/03/2025 17:00:05


In [None]:
# @title GBQ DATASETS
print("Datasets disponibles:\n")
for dataset in full_info_from_GBQ_df['dataset_id'].unique():
    print(f"{dataset}")

Datasets disponibles:

IMDb_01raw_01
IMDb_02st_01
IMDb_raw_01
IMDb_staging_01
cd2_01raw_01
facebook_ads_raw_v01
facebook_ads_raw_v01_facebook_ads
facebook_ads_raw_v01_facebook_ads_source
fivetran_metadata
fivetran_metadata_fivetran_platform
fivetran_metadata_stg_fivetran_platform
google_ads_raw_01
google_ads_raw_01_google_ads
google_ads_raw_01_google_ads_source
hubspot_raw_v01
hubspot_raw_v01_hubspot
hubspot_raw_v01_stg_hubspot
mkt_02st_01
mkt_03BI_01
tablas_mapeo
tp_02st_01
tp_03bi_01
vl_01raw_01


# DECLARACIÓN DE FUNCIONES

In [None]:
# @title get_video_properties_dic()
def get_video_properties_dic(params: dict) -> dict:
    """
    Obtiene las propiedades técnicas de un archivo de video.
    Args:
        params (dict):
            - file_path (str): Ruta completa del archivo de video.
    Returns:
        dict: Diccionario con las siguientes claves:
            - file_name (str): Nombre del archivo de video.
            - file_path (str): Ruta completa del archivo de video.
            - file_size_mb (float): Tamaño del archivo en megabytes.
            - duration_s (float): Duración del video en segundos.
            - video_codec (str): Códec de video utilizado.
            - audio_codec (str): Códec de audio utilizado.
            - resolution (str): Resolución del video (ancho x alto).
            - audio_sample_rate (int): Frecuencia de muestreo del audio en Hz.
            - error (str): Mensaje de error en caso de fallo (opcional).
    """
    file_path = params.get("file_path")
    try:
        probe = ffmpeg.probe(file_path)
        video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
        audio_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'audio'), None)

        return {
            "file_name": os.path.basename(file_path),
            "file_path": file_path,
            "file_size_mb": float(probe['format']['size']) / (1024 * 1024),
            "duration_s": float(probe['format']['duration']),
            "video_codec": video_stream['codec_name'] if video_stream else None,
            "audio_codec": audio_stream['codec_name'] if audio_stream else None,
            "resolution": f"{video_stream['width']}x{video_stream['height']}" if video_stream else None,
            "audio_sample_rate": int(audio_stream['sample_rate']) if audio_stream else None,
        }
    except Exception as e:
        return {
            "file_name": os.path.basename(file_path),
            "file_path": file_path,
            "file_size_mb": None,
            "duration_s": None,
            "video_codec": None,
            "audio_codec": None,
            "resolution": None,
            "audio_sample_rate": None,
            "error": str(e)
        }

In [None]:
# @title orchestrate_transcription_to_GSpread()
def orchestrate_transcription_to_GSpread(params: dict):
    """
    Orquesta el proceso de transcripción de videos en una carpeta y actualiza
    una hoja de Google Sheets con columnas fijas, incluyendo hasta 10 partes
    de transcripción (de 50k caracteres cada una).

    Escribe cada transcripción inmediatamente, para no perder avances si
    la sesión se interrumpe.

    Args:
        params (dict):
            - folder_path (str): Ruta de la carpeta con los videos.
            - model_size (str): Tamaño del modelo Whisper ("tiny", "base", "small", "medium", "large").
            - sheet_id (str | None): ID de la hoja de Google Sheets; si no se da, se crea una nueva.
            - verbose (bool): Mostrar mensajes detallados durante el proceso.
    """
    import gspread
    from google.colab import auth
    from google.auth.transport.requests import Request
    from google.auth import default
    from datetime import datetime
    import os
    import time
    import ffmpeg
    import pandas as pd
    from whisper import load_model
    from IPython.display import display

    # ------------------------------------------------------------------------
    # 1) Autenticar en Google y abrir/crear la hoja
    # ------------------------------------------------------------------------
    auth.authenticate_user()
    creds, _ = default()
    creds.refresh(Request())
    gspread_client = gspread.authorize(creds)

    sheet_id = params.get("sheet_id")
    if sheet_id:
        try:
            spreadsheet = gspread_client.open_by_key(sheet_id)
        except gspread.SpreadsheetNotFound:
            raise ValueError("El ID proporcionado no corresponde a una hoja válida.")
    else:
        folder_name = os.path.basename(params["folder_path"])
        sheet_name = "Transcripciones de Videos"
        spreadsheet = gspread_client.create(f"{folder_name} - {sheet_name}")

    sheet = spreadsheet.sheet1

    # ------------------------------------------------------------------------
    # 2) Definir cabecera fija y verificar si la hoja ya la tiene
    # ------------------------------------------------------------------------
    HEADERS = [
        "file_name", "file_path", "file_size_mb", "duration_s",
        "video_codec", "audio_codec", "resolution", "audio_sample_rate",
        "transcription_date",
        "transcription_part_1", "transcription_part_2", "transcription_part_3",
        "transcription_part_4", "transcription_part_5", "transcription_part_6",
        "transcription_part_7", "transcription_part_8", "transcription_part_9",
        "transcription_part_10"
    ]

    existing_values = sheet.get_all_values()
    if existing_values:
        current_headers = existing_values[0]
        # Chequeamos si 'file_name' no existe o si la cantidad de columnas
        # es distinta a la que esperamos (19), limpiamos y reescribimos.
        if ("file_name" not in current_headers) or (len(current_headers) != len(HEADERS)):
            sheet.clear()
            sheet.update("A1", [HEADERS])
    else:
        # Hoja en blanco
        sheet.update("A1", [HEADERS])

    # ------------------------------------------------------------------------
    # 3) Leer registros existentes para saber qué archivos se han transcrito
    # ------------------------------------------------------------------------
    all_rows = sheet.get_all_records()
    processed_files = {
        row.get("file_name"): row.get("transcription_date", "N/A")
        for row in all_rows
        if row.get("file_name") is not None
    }

    # ------------------------------------------------------------------------
    # 4) Listar archivos y mostrar estado
    # ------------------------------------------------------------------------
    folder_path = params["folder_path"]
    video_files = [
        f for f in os.listdir(folder_path)
        if f.lower().endswith((".mp4", ".mkv", ".avi", ".mov"))
    ]
    status_data = []
    for video in video_files:
        status = "Transcrito" if video in processed_files else "No transcrito"
        date = processed_files.get(video, "N/A")
        status_data.append({"file_name": video, "status": status, "transcription_date": date})

    status_df = pd.DataFrame(status_data)
    print("\nEstado de los vídeos:")
    display(status_df)

    # ------------------------------------------------------------------------
    # 5) Cargar modelo Whisper
    # ------------------------------------------------------------------------
    model = load_model(params.get("model_size", "medium"))
    if params.get("verbose", False):
        print(f"\nModelo '{params.get('model_size', 'medium')}' cargado correctamente.")

    # ------------------------------------------------------------------------
    # 6) Transcribir cada video y volcar de inmediato a la hoja
    # ------------------------------------------------------------------------
    max_chars = 50000  # troceamos la transcripción en bloques de 50k para evitar errores en GSheet

    for video in video_files:
        # Saltar si ya está transcrito
        if video in processed_files:
            if params.get("verbose", False):
                print(f"Saltando archivo ya procesado: {video}")
            continue

        video_path = os.path.join(folder_path, video)

        # Obtener metadatos del video (necesita get_video_properties_dic)
        properties = get_video_properties_dic({"file_path": video_path})

        # Transcribir
        transcription = ""
        try:
            if params.get("verbose", False):
                print(f"\nProcesando: {video}")
            result = model.transcribe(video_path, language='es')
            transcription = result["text"]
        except Exception as e:
            print(f"Error transcribiendo {video}: {e}")
            transcription = ""

        # Trocear en bloques de 50k
        transcription_parts = [
            transcription[i : i + max_chars]
            for i in range(0, len(transcription), max_chars)
        ]

        # Asignar un máximo de 10 partes
        if len(transcription_parts) > 10:
            # Concatenar el resto en la última parte
            sobrante = "".join(transcription_parts[10:])
            transcription_parts[9] += sobrante
            transcription_parts = transcription_parts[:10]

        # Preparar lista de 10 partes (rellenando con "")
        transcription_parts += [""] * (10 - len(transcription_parts))

        # Construir la fila en el orden de HEADERS
        row_values = [
            properties.get("file_name"),
            properties.get("file_path"),
            properties.get("file_size_mb"),
            properties.get("duration_s"),
            properties.get("video_codec"),
            properties.get("audio_codec"),
            properties.get("resolution"),
            properties.get("audio_sample_rate"),
            datetime.now().strftime("%Y-%m-%d %H:%M:%S"),   # transcription_date
        ] + transcription_parts  # las 10 columnas

        # Insertar la fila en la hoja
        reintentos = 3
        for intento in range(reintentos):
            try:
                sheet.append_row(row_values, value_input_option="USER_ENTERED")
                if params.get("verbose", False):
                    print(f"Actualizado en Google Sheets: {video}")
                break
            except gspread.exceptions.APIError as e:
                if intento < reintentos - 1:
                    print(f"Error al actualizar Google Sheets. Reintentando ({intento + 1}/{reintentos})...")
                    time.sleep(5)
                else:
                    print(f"Error persistente al actualizar Google Sheets: {e}")

    print(f"\nProceso completado. Hoja de cálculo disponible en: {spreadsheet.url}")


In [None]:
#@title Configuración
import os
import time
import ffmpeg
import pandas as pd
from whisper import load_model

# Diccionario de configuración
folder_path = r"/content/drive/Shareddrives/AREA TIC COMPARTIDO USUARIOS EXTERNOS/ERP v6.0 - PROYECTO G-F (AYESA)/50 - BC/10 - DAF/Videos Sesiones DAF/" #@param {type:"string"}
model_size = "medium" #@param ["tiny", "base", "small", "medium", "large"]
sheet_id = "18esoAyRzemkA3GLVTDV6k45U5KqQzkrULYt3sE99KiQ" # o None
param = {
    "folder_path": folder_path,
    "model_size": model_size,
    "sheet_id": sheet_id,  # ID de la hoja de Google Sheets,
    "verbose": True  # Activar detalles adicionales
}


# Ejecutar el proceso orquestado
orchestrate_transcription_to_GSpread(param)


Estado de los vídeos:


Unnamed: 0,file_name,status,transcription_date
0,03 - Gestión Financiera 2-20240607_100241.mp4,No transcrito,
1,03 - Gestión Financiera 2 20240607.mp4,No transcrito,
2,01 - Ventas y Cobros-20240604_120138 - v1.0 20...,No transcrito,
3,01 - Ventas y Cobros-20240604_100548 - v1.0 20...,No transcrito,
4,07 - Sesión análisis Tesorería 2-20240618_...,No transcrito,
5,06 - Sesión análisis Tesoreria 1-20240617_1...,No transcrito,
6,05 - Compras y pagos 2-20240612_113249.mp4,No transcrito,
7,05 - Compras y pagos 2-20240612_100906.mp4,No transcrito,
8,04 - Compras y pagos-20240611_115156.mp4,No transcrito,
9,04 - Compras y pagos-20240611_105212.mp4,No transcrito,


100%|█████████████████████████████████████| 1.42G/1.42G [00:15<00:00, 95.8MiB/s]
  checkpoint = torch.load(fp, map_location=device)



Modelo 'medium' cargado correctamente.

Procesando: 03 - Gestión Financiera 2-20240607_100241.mp4
Actualizado en Google Sheets: 03 - Gestión Financiera 2-20240607_100241.mp4

Procesando: 03 - Gestión Financiera 2 20240607.mp4
Actualizado en Google Sheets: 03 - Gestión Financiera 2 20240607.mp4

Procesando: 01 - Ventas y Cobros-20240604_120138 - v1.0 20240624.mp4
Actualizado en Google Sheets: 01 - Ventas y Cobros-20240604_120138 - v1.0 20240624.mp4

Procesando: 01 - Ventas y Cobros-20240604_100548 - v1.0 20240604.mp4
Actualizado en Google Sheets: 01 - Ventas y Cobros-20240604_100548 - v1.0 20240604.mp4

Procesando: 07 - Sesión análisis Tesorería  2-20240618_100526.mp4
Actualizado en Google Sheets: 07 - Sesión análisis Tesorería  2-20240618_100526.mp4

Procesando: 06 - Sesión análisis Tesoreria  1-20240617_100524.mp4
Actualizado en Google Sheets: 06 - Sesión análisis Tesoreria  1-20240617_100524.mp4

Procesando: 05 - Compras y pagos 2-20240612_113249.mp4
Actualizado en Goo

In [None]:
# @title video_whisper_transcribe_to()
def video_whisper_transcribe_to(params: dict) -> None:
    """
    Orquesta la transcripción de archivos de video en una carpeta utilizando el modelo Whisper y actualiza
    una hoja de Google Sheets con los metadatos técnicos y la transcripción dividida en hasta 10 partes.

    Args:
        params (dict):
            - folder_path (str): Ruta de la carpeta que contiene los videos. [OBLIGATORIO]
            - model_size (str): Tamaño del modelo Whisper ("tiny", "base", "small", "medium", "large"). Default: "medium".
            - sheet_id (str | None): ID de la hoja de Google Sheets; si no se proporciona, se creará una nueva.
            - verbose (bool): Indica si se muestran mensajes detallados. Default: False.
            - ini_environment_identificated (str): Identificador del entorno. Opciones: "LOCAL", "COLAB", "COLAB_ENTERPRISE" o un project_id.
            - json_keyfile_local (str): (Requerido en entorno LOCAL) Ruta al archivo JSON de credenciales.
            - json_keyfile_colab (str): (Requerido en entornos COLAB) Ruta al archivo JSON de credenciales.
            - json_keyfile_GCP_secret_id (str): (Requerido en entornos GCP) Secret ID del JSON de credenciales en Secret Manager.

    Returns:
        None

    Raises:
        ValueError: Si faltan parámetros obligatorios o se produce un error durante la autenticación.
    """
    import os
    import json
    import time
    import ffmpeg
    from datetime import datetime
    from google.oauth2.service_account import Credentials

    # ────────────────────────────── VALIDACIÓN DE PARÁMETROS ──────────────────────────────
    if not params.get("folder_path"):
        raise ValueError("[VALIDATION [ERROR ❌]] Falta el parámetro 'folder_path' en params.")
    folder_path_str = params.get("folder_path")
    if not os.path.exists(folder_path_str):
        raise ValueError(f"[VALIDATION [ERROR ❌]] La ruta indicada no existe: {folder_path_str}")
    model_size_str = params.get("model_size", "medium")
    verbose_bool = params.get("verbose", False)

    print("[START ▶️] Inicio del proceso de transcripción de videos.", flush=True)

    # ────────────────────────────── AUTENTICACIÓN Y ACCESO A GOOGLE SHEETS ──────────────────────────────
    env_str = params.get("ini_environment_identificated")
    if not env_str:
        raise ValueError("[VALIDATION [ERROR ❌]] Falta el parámetro 'ini_environment_identificated' en params.")

    if env_str == "LOCAL":
        json_keyfile_local_str = params.get("json_keyfile_local")
        if not json_keyfile_local_str:
            raise ValueError("[VALIDATION [ERROR ❌]] En entorno LOCAL se debe proporcionar 'json_keyfile_local' en params.")
        print("[AUTHENTICATION [START ▶️]] Iniciando autenticación en entorno LOCAL mediante JSON de credenciales...", flush=True)
        try:
            creds = Credentials.from_service_account_file(json_keyfile_local_str)
            print("[AUTHENTICATION [SUCCESS ✅]] Autenticación en entorno LOCAL completada.", flush=True)
        except Exception as error_local:
            raise ValueError(f"[AUTHENTICATION [ERROR ❌]] Error durante la autenticación en entorno LOCAL: {error_local}")
    elif env_str == "COLAB":
        json_keyfile_colab_str = params.get("json_keyfile_colab")
        if not json_keyfile_colab_str:
            raise ValueError("[VALIDATION [ERROR ❌]] En entornos COLAB se debe proporcionar 'json_keyfile_colab' en params.")
        print("[AUTHENTICATION [START ▶️]] Iniciando autenticación en entorno COLAB mediante JSON de credenciales...", flush=True)
        try:
            creds = Credentials.from_service_account_file(json_keyfile_colab_str)
            print("[AUTHENTICATION [SUCCESS ✅]] Autenticación en entorno COLAB completada.", flush=True)
        except Exception as error_colab:
            raise ValueError(f"[AUTHENTICATION [ERROR ❌]] Error durante la autenticación en COLAB: {error_colab}")
    elif env_str == "COLAB_ENTERPRISE" or (env_str not in ["LOCAL", "COLAB"]):
        # Para COLAB_ENTERPRISE o cuando se pasa un project_id directamente
        if env_str == "COLAB_ENTERPRISE":
            project_id_env = os.environ.get("GOOGLE_CLOUD_PROJECT")
            if not project_id_env:
                raise ValueError("[VALIDATION [ERROR ❌]] No se encontró la variable de entorno 'GOOGLE_CLOUD_PROJECT'.")
        else:
            project_id_env = env_str
        json_keyfile_GCP_secret_id_str = params.get("json_keyfile_GCP_secret_id")
        if not json_keyfile_GCP_secret_id_str:
            raise ValueError("[VALIDATION [ERROR ❌]] En entornos GCP se debe proporcionar 'json_keyfile_GCP_secret_id' en params.")
        print("[AUTHENTICATION [START ▶️]] Iniciando autenticación en entorno GCP mediante Secret Manager...", flush=True)
        try:
            from google.cloud import secretmanager
            client_sm = secretmanager.SecretManagerServiceClient()
            secret_name = f"projects/{project_id_env}/secrets/{json_keyfile_GCP_secret_id_str}/versions/latest"
            response = client_sm.access_secret_version(name=secret_name)
            secret_string = response.payload.data.decode("UTF-8")
            secret_info = json.loads(secret_string)
            creds = Credentials.from_service_account_info(secret_info)
            print(f"[AUTHENTICATION [SUCCESS ✅]] Autenticación en entorno GCP completada. (Secret Manager: {json_keyfile_GCP_secret_id_str})", flush=True)
        except Exception as error_gcp:
            raise ValueError(f"[AUTHENTICATION [ERROR ❌]] Error durante la autenticación en GCP: {error_gcp}")

    try:
        import gspread
        gspread_client = gspread.authorize(creds)
    except Exception as error_gs:
        raise Exception(f"[AUTHENTICATION [ERROR ❌]] Error al autorizar Google Sheets: {error_gs}")

    # ────────────────────────────── SUBFUNCIÓN: Obtener propiedades del video ──────────────────────────────
    def _get_video_properties_dic(file_path_str: str) -> dict:
        """
        Extrae las propiedades técnicas del video.

        Args:
            file_path_str (str): Ruta completa del archivo de video.

        Returns:
            dict: Diccionario con metadatos técnicos.
        """
        try:
            print(f"[EXTRACTION [START ▶️]] Extrayendo metadatos de: {os.path.basename(file_path_str)}", flush=True)
            probe_dic = ffmpeg.probe(file_path_str)
            video_stream_dic = next((stream for stream in probe_dic['streams'] if stream['codec_type'] == 'video'), None)
            audio_stream_dic = next((stream for stream in probe_dic['streams'] if stream['codec_type'] == 'audio'), None)
            properties_dic = {
                "file_name": os.path.basename(file_path_str),
                "file_path": file_path_str,
                "file_size_mb": float(probe_dic['format']['size']) / (1024 * 1024),
                "duration_s": float(probe_dic['format']['duration']),
                "video_codec": video_stream_dic['codec_name'] if video_stream_dic else None,
                "audio_codec": audio_stream_dic['codec_name'] if audio_stream_dic else None,
                "resolution": f"{video_stream_dic['width']}x{video_stream_dic['height']}" if video_stream_dic else None,
                "audio_sample_rate": int(audio_stream_dic['sample_rate']) if audio_stream_dic else None,
            }
            print("[EXTRACTION [SUCCESS ✅]] Metadatos extraídos correctamente.", flush=True)
            return properties_dic
        except Exception as error_meta:
            print(f"[EXTRACTION [ERROR ❌]] Error al extraer metadatos: {error_meta}", flush=True)
            return {
                "file_name": os.path.basename(file_path_str),
                "file_path": file_path_str,
                "file_size_mb": None,
                "duration_s": None,
                "video_codec": None,
                "audio_codec": None,
                "resolution": None,
                "audio_sample_rate": None,
                "error": str(error_meta)
            }

    # ────────────────────────────── APERTURA O CREACIÓN DE LA HOJA DE CÁLCULO ──────────────────────────────
    sheet_id_str = params.get("sheet_id")
    try:
        if sheet_id_str:
            spreadsheet = gspread_client.open_by_key(sheet_id_str)
            print(f"[LOAD [INFO ℹ️]] Hoja encontrada con ID: {sheet_id_str}", flush=True)
        else:
            folder_name_str = os.path.basename(folder_path_str)
            sheet_name_str = "Transcripciones de Videos"
            spreadsheet = gspread_client.create(f"{folder_name_str} - {sheet_name_str}")
            print(f"[LOAD [INFO ℹ️]] Hoja creada: {spreadsheet.title}", flush=True)
    except Exception as error_sheet:
        raise Exception(f"[LOAD [ERROR ❌]] Error al acceder/crear la hoja de cálculo: {error_sheet}")

    sheet = spreadsheet.sheet1

    # ────────────────────────────── CONFIGURACIÓN DE CABECERAS ──────────────────────────────
    HEADERS_list = [
        "file_name", "file_path", "file_size_mb", "duration_s",
        "video_codec", "audio_codec", "resolution", "audio_sample_rate",
        "transcription_date",
        "transcription_part_1", "transcription_part_2", "transcription_part_3",
        "transcription_part_4", "transcription_part_5", "transcription_part_6",
        "transcription_part_7", "transcription_part_8", "transcription_part_9",
        "transcription_part_10"
    ]
    try:
        existing_values_list = sheet.get_all_values()
        if existing_values_list:
            current_headers_list = existing_values_list[0]
            if ("file_name" not in current_headers_list) or (len(current_headers_list) != len(HEADERS_list)):
                sheet.clear()
                sheet.update("A1", [HEADERS_list])
                print("[LOAD [INFO ℹ️]] Cabecera actualizada en la hoja.", flush=True)
        else:
            sheet.update("A1", [HEADERS_list])
            print("[LOAD [INFO ℹ️]] Cabecera inicializada en la hoja.", flush=True)
    except Exception as error_headers:
        print(f"[LOAD [ERROR ❌]] Error al configurar la cabecera: {error_headers}", flush=True)

    # ────────────────────────────── LECTURA DE REGISTROS EXISTENTES ──────────────────────────────
    all_rows_list = sheet.get_all_records()
    processed_files_dic = {
        row.get("file_name"): row.get("transcription_date", "N/A")
        for row in all_rows_list if row.get("file_name") is not None
    }

    # ────────────────────────────── LISTADO DE ARCHIVOS DE VIDEO ──────────────────────────────
    video_files_list = [
        file_name for file_name in os.listdir(folder_path_str)
        if file_name.lower().endswith((".mp4", ".mkv", ".avi", ".mov"))
    ]
    print(f"[EXTRACTION [INFO ℹ️]] Se encontraron {len(video_files_list)} archivos de video.", flush=True)

    try:
        import pandas as pd
        status_data_list = []
        for video_file_str in video_files_list:
            status_str = "Transcrito" if video_file_str in processed_files_dic else "No transcrito"
            date_str = processed_files_dic.get(video_file_str, "N/A")
            status_data_list.append({"file_name": video_file_str, "status": status_str, "transcription_date": date_str})
        status_df = pd.DataFrame(status_data_list)
        print("\n[METRICS [INFO ℹ️]] Estado de los vídeos:", flush=True)
        display(status_df)
    except Exception as error_df:
        print(f"[METRICS [WARNING ⚠️]] No se pudo mostrar el DataFrame de estado: {error_df}", flush=True)

    # ────────────────────────────── CARGA DEL MODELO WHISPER ──────────────────────────────
    try:
        print(f"[LOAD [START ▶️]] Cargando modelo Whisper '{model_size_str}'...", flush=True)
        from whisper import load_model
        model = load_model(model_size_str)
        print(f"[LOAD [SUCCESS ✅]] Modelo '{model_size_str}' cargado correctamente.", flush=True)
    except Exception as error_model:
        raise Exception(f"[LOAD [ERROR ❌]] Error al cargar el modelo Whisper: {error_model}")

    max_chars_int = 50000  # Máximo de caracteres por bloque

    # ────────────────────────────── PROCESAMIENTO DE CADA VIDEO ──────────────────────────────
    for video_file_str in video_files_list:
        if video_file_str in processed_files_dic:
            if verbose_bool:
                print(f"[TRANSFORMATION [INFO ℹ️]] Saltando archivo ya procesado: {video_file_str}", flush=True)
            continue

        video_path_str = os.path.join(folder_path_str, video_file_str)
        properties_dic = _get_video_properties_dic(video_path_str)

        # ────────────────────────────── TRANSCRIPCIÓN DEL VIDEO ──────────────────────────────
        transcription_str = ""
        try:
            print(f"[TRANSFORMATION [START ▶️]] Transcribiendo: {video_file_str}", flush=True)
            result_dic = model.transcribe(video_path_str, language='es')
            transcription_str = result_dic.get("text", "")
            print(f"[TRANSFORMATION [SUCCESS ✅]] Transcripción completada: {video_file_str}", flush=True)
        except Exception as error_trans:
            print(f"[TRANSFORMATION [ERROR ❌]] Error transcribiendo {video_file_str}: {error_trans}", flush=True)
            transcription_str = ""

        # Troceo de la transcripción en bloques de 50k caracteres, limitado a 10 partes
        transcription_parts_list = [
            transcription_str[i : i + max_chars_int]
            for i in range(0, len(transcription_str), max_chars_int)
        ]
        if len(transcription_parts_list) > 10:
            sobrante_str = "".join(transcription_parts_list[10:])
            transcription_parts_list[9] += sobrante_str
            transcription_parts_list = transcription_parts_list[:10]
        transcription_parts_list += [""] * (10 - len(transcription_parts_list))

        # Construcción de la fila para la hoja de cálculo
        row_values_list = [
            properties_dic.get("file_name"),
            properties_dic.get("file_path"),
            properties_dic.get("file_size_mb"),
            properties_dic.get("duration_s"),
            properties_dic.get("video_codec"),
            properties_dic.get("audio_codec"),
            properties_dic.get("resolution"),
            properties_dic.get("audio_sample_rate"),
            datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        ] + transcription_parts_list

        # ────────────────────────────── ACTUALIZACIÓN EN GOOGLE SHEETS ──────────────────────────────
        reintentos_int = 3
        for intento_int in range(reintentos_int):
            try:
                sheet.append_row(row_values_list, value_input_option="USER_ENTERED")
                if verbose_bool:
                    print(f"[LOAD [SUCCESS ✅]] Actualizado en Google Sheets: {video_file_str}", flush=True)
                break
            except Exception as error_append:
                if intento_int < reintentos_int - 1:
                    print(f"[LOAD [WARNING ⚠️]] Error al actualizar Google Sheets. Reintentando ({intento_int + 1}/{reintentos_int})...", flush=True)
                    time.sleep(5)
                else:
                    print(f"[LOAD [ERROR ❌]] Error persistente al actualizar Google Sheets: {error_append}", flush=True)

    print(f"\n[END [FINISHED ✅]] Proceso completado. Hoja de cálculo disponible en: {spreadsheet.url}", flush=True)


In [None]:
# @title TRANSCRIBE A SPREADSHEET
params_colab = {
    "folder_path": "/content/drive/Shareddrives/AREA ACADEMICO/MOCADI/Vídeo Correcciones  /Animación de Personajes 3D - Aca 1 - L06/NIVEL BAJO",
    "model_size": "small",
    "sheet_id": "1kr0WWfXQmiluErKkA0SUP7e9IYXqhKGTznazvsdfRbA",                         # ID de la hoja de Google
    "verbose": True,

    "ini_environment_identificated": ini_environment_identificated,
    "json_keyfile_local": GCP_json_keyfile_local,
    "json_keyfile_colab": GCP_json_keyfile_colab,
    "json_keyfile_GCP_secret_id": GCP_json_keyfile_GCP_secret_id,
}

video_whisper_transcribe_to(params_colab)

[START ▶️] Inicio del proceso de transcripción de videos.
[AUTHENTICATION [START ▶️]] Iniciando autenticación en entorno COLAB mediante JSON de credenciales...
[AUTHENTICATION [SUCCESS ✅]] Autenticación en entorno COLAB completada.


Exception: [LOAD [ERROR ❌]] Error al acceder/crear la hoja de cálculo: ('invalid_scope: Invalid OAuth scope or ID token audience provided.', {'error': 'invalid_scope', 'error_description': 'Invalid OAuth scope or ID token audience provided.'})

In [None]:
# @title scrap_videofiles_local_dir_to_gspread()
def scrap_videofiles_local_dir_to_gspread(params: dict) -> None:
    """
    Busca archivos de video en una carpeta, ya sea en un sistema de archivos local o en Google Drive,
    extrae sus propiedades técnicas y envía los resultados a una hoja de Google Sheets.

    La estrategia de búsqueda se selecciona según el valor de 'ini_environment_identificated' y la existencia
    de la ruta en el sistema de archivos. Si la ruta existe localmente, se realiza un escaneo con os.walk;
    de lo contrario, se asume que se trata de una ruta de Google Drive y se utiliza la API de Drive para listar archivos.

    Args:
        params (dict):
            - video_files_root_path (str): Ruta del directorio raíz o URL del folder de Google Drive.
            - video_files_target_search_folder (list): Lista de subcarpetas de interés dentro del directorio raíz.
            - video_files_target_search_extension (list): Lista de extensiones de archivo a buscar (ej.: [".mp4"]).
            - ini_environment_identificated (str): Identificador del entorno. Ej.: "LOCAL", "COLAB", "COLAB_ENTERPRISE", etc.
            - json_keyfile_local (str): Ruta al archivo JSON de credenciales para entornos LOCAL.
            - json_keyfile_colab (str): Ruta al archivo JSON de credenciales para entornos COLAB.
            - json_keyfile_GCP_secret_id (str): Secret ID del JSON de credenciales en Secret Manager (para entornos GCP).
            - destination_files_path_table_spreadsheet_url (str): URL de la hoja de Google Sheets destino.
            - destination_files_path_table_spreadsheet_worksheet (str): Nombre de la pestaña en la hoja destino.

    Returns:
        None

    Raises:
        ValueError: Si falta algún parámetro obligatorio o no se encuentran archivos.
        Exception: Si ocurre un error al interactuar con Google Sheets.
    """
    import os
    import re
    import subprocess
    import json
    import pandas as pd
    from datetime import datetime
    from time import time
    from dpm_google import gspread_initialize_client, gspread_df_to_sheet
    from googleapiclient.discovery import build  # Para Drive API

    print("\n[START ▶️] Inicio del proceso de scrap de videos a Google Sheets.\n", flush=True)
    start_time_float = time()

    # ────────────────────────────── VALIDACIÓN DE PARÁMETROS ──────────────────────────────
    video_files_root_path_str = params.get('video_files_root_path')
    target_folders_list = params.get('video_files_target_search_folder', [])
    file_exts_list = params.get('video_files_target_search_extension', [])
    ini_env_str = params.get("ini_environment_identificated")
    json_keyfile_local_str = params.get("json_keyfile_local")
    json_keyfile_colab_str = params.get("json_keyfile_colab")
    json_keyfile_GCP_secret_id_str = params.get("json_keyfile_GCP_secret_id")
    spreadsheet_url_str = params.get('destination_files_path_table_spreadsheet_url')
    worksheet_name_str = params.get('destination_files_path_table_spreadsheet_worksheet')

    if not video_files_root_path_str:
        raise ValueError("[VALIDATION [ERROR ❌]] El parámetro 'video_files_root_path' es obligatorio.")
    if not file_exts_list:
        raise ValueError("[VALIDATION [ERROR ❌]] El parámetro 'video_files_target_search_extension' es obligatorio y debe contener al menos una extensión.")
    if not ini_env_str:
        raise ValueError("[VALIDATION [ERROR ❌]] El parámetro 'ini_environment_identificated' es obligatorio.")
    if not json_keyfile_local_str:
        raise ValueError("[VALIDATION [ERROR ❌]] El parámetro 'json_keyfile_local' es obligatorio.")
    if not json_keyfile_colab_str:
        raise ValueError("[VALIDATION [ERROR ❌]] El parámetro 'json_keyfile_colab' es obligatorio.")
    if not json_keyfile_GCP_secret_id_str:
        raise ValueError("[VALIDATION [ERROR ❌]] El parámetro 'json_keyfile_GCP_secret_id' es obligatorio.")
    if not spreadsheet_url_str:
        raise ValueError("[VALIDATION [ERROR ❌]] El parámetro 'destination_files_path_table_spreadsheet_url' es obligatorio.")
    if not worksheet_name_str:
        raise ValueError("[VALIDATION [ERROR ❌]] El parámetro 'destination_files_path_table_spreadsheet_worksheet' es obligatorio.")

    print("[INFO ℹ️] Parámetros validados correctamente.\n", flush=True)

    # ────────────────────────────── SELECCIÓN DE MÉTODO DE BÚSQUEDA ──────────────────────────────
    # Si la ruta existe localmente, se utiliza os.walk. De lo contrario, se asume que es una ruta de Google Drive.
    def _find_files_in_folders(root_str: str, folders_list: list, exts_list: list) -> pd.DataFrame:
        results_list = []
        for dirpath, dirnames, filenames in os.walk(root_str):
            current_folder_str = os.path.basename(dirpath)
            if folders_list and current_folder_str not in folders_list:
                continue
            for file_str in filenames:
                _, file_ext_str = os.path.splitext(file_str)
                if file_ext_str.lower() in [ext.lower() for ext in exts_list]:
                    file_path_str = os.path.join(dirpath, file_str)
                    results_list.append({
                        "video_file_path": file_path_str,
                        "video_file_name": file_str
                    })
                    print(f"[INFO ℹ️] ➤ Archivo encontrado: {file_str} (Ruta: {file_path_str})", flush=True)
        if not results_list:
            print("[WARNING ⚠️] No se encontraron archivos que coincidan con los criterios especificados.\n", flush=True)
            return pd.DataFrame()
        return pd.DataFrame(results_list)

    def _find_files_in_drive(folder_url: str, exts_list: list, creds) -> pd.DataFrame:
        # Se asume que folder_url es la URL de la carpeta en Drive; se extrae el ID.
        m = re.search(r'/d/([a-zA-Z0-9_-]+)', folder_url)
        folder_id = m.group(1) if m else folder_url
        drive_service = build('drive', 'v3', credentials=creds)
        query = f"'{folder_id}' in parents and trashed = false"
        results = drive_service.files().list(q=query, fields="files(id, name)").execute()
        files = results.get('files', [])
        results_list = []
        for f in files:
            name = f.get('name', '')
            _, file_ext_str = os.path.splitext(name)
            if file_ext_str.lower() in [ext.lower() for ext in exts_list]:
                results_list.append({
                    "video_file_path": f.get('id'),  # Para Drive se usa el ID del archivo
                    "video_file_name": name
                })
                print(f"[INFO ℹ️] ➤ Archivo encontrado: {name} (Drive ID: {f.get('id')})", flush=True)
        if not results_list:
            print("[WARNING ⚠️] No se encontraron archivos en Drive que coincidan con los criterios.", flush=True)
            return pd.DataFrame()
        return pd.DataFrame(results_list)

    # Se requiere autenticar previamente para usar tanto Sheets como Drive.
    # Aquí reutilizamos el cliente de Sheets (gspread) que se inicializará más adelante para obtener las credenciales.
    # Para el listado en Drive, se crearán a partir del mismo método de autenticación en gspread_initialize_client.
    drive_scan_required = not os.path.exists(video_files_root_path_str)
    if drive_scan_required:
        print(f"[INFO ℹ️] La ruta '{video_files_root_path_str}' no se encontró localmente; se asume que es una ruta de Google Drive.\n", flush=True)
    else:
        print(f"[INFO ℹ️] Se encontró la ruta local '{video_files_root_path_str}'. Usando búsqueda local.\n", flush=True)

    # ────────────────────────────── AUTENTICACIÓN PARA GOOGLE SHEETS Y DRIVE ──────────────────────────────
    # Se utiliza la estrategia de autenticación definida por las nuevas keys.
    client = gspread_initialize_client({
        "ini_environment_identificated": ini_env_str,
        "json_keyfile_local": json_keyfile_local_str,
        "json_keyfile_colab": json_keyfile_colab_str,
        "json_keyfile_GCP_secret_id": json_keyfile_GCP_secret_id_str
    })
    if not client:
        raise Exception("[AUTHENTICATION [ERROR ❌]] No se pudo autenticar con Google Sheets.")
    print("[SUCCESS ✅] Cliente autenticado correctamente.\n", flush=True)
    # Extraer las credenciales del cliente para usarlas con la API de Drive.
    creds = client.auth.token  if hasattr(client, "auth") else None

    # ────────────────────────────── OBTENCIÓN DE ARCHIVOS ──────────────────────────────
    if drive_scan_required:
        df_paths = _find_files_in_drive(video_files_root_path_str, file_exts_list, client.auth.credentials)
    else:
        df_paths = _find_files_in_folders(video_files_root_path_str, target_folders_list, file_exts_list)

    if df_paths.empty:
        raise ValueError("[VALIDATION [ERROR ❌]] No se encontraron archivos que coincidan con los criterios especificados.")
    print(f"[INFO ℹ️] Total de archivos encontrados: {len(df_paths)}\n", flush=True)

    # ────────────────────────────── EXTRACCIÓN DE PROPIEDADES DE VIDEO ──────────────────────────────
    def _extract_video_properties(file_path_str: str) -> dict:
        try:
            # En Drive, dado que solo tenemos el ID, se requeriría descargar el archivo; aquí se asume que
            # los archivos locales se procesan con ffprobe. Para Drive, se debería implementar la descarga previa.
            if drive_scan_required:
                raise NotImplementedError("La extracción de propiedades para archivos en Drive requiere descarga previa.")
            file_size_int = os.path.getsize(file_path_str) // (1024 * 1024)
            result = subprocess.run([
                'ffprobe', '-v', 'error', '-print_format', 'json', '-show_streams', '-show_format', file_path_str
            ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            info_dic = json.loads(result.stdout)
            video_codec_str = audio_codec_str = None
            video_bitrate_int = audio_bitrate_int = 0
            video_width_int = video_height_int = None
            video_fps_float = None
            audio_channels_int = audio_sample_rate_int = None
            duration_float = 0.0

            if 'streams' in info_dic:
                for stream in info_dic['streams']:
                    if stream.get('codec_type') == 'video':
                        video_codec_str = stream.get('codec_name')
                        video_bitrate_int = int(stream.get('bit_rate', 0)) // 1000
                        video_width_int = stream.get('width')
                        video_height_int = stream.get('height')
                        if 'r_frame_rate' in stream:
                            num_int, den_int = map(int, stream['r_frame_rate'].split('/'))
                            video_fps_float = num_int / den_int if den_int != 0 else None
                    elif stream.get('codec_type') == 'audio':
                        audio_codec_str = stream.get('codec_name')
                        audio_bitrate_int = int(stream.get('bit_rate', 0)) // 1000
                        audio_channels_int = stream.get('channels')
                        audio_sample_rate_int = int(stream.get('sample_rate', 0))
            if 'format' in info_dic:
                duration_float = float(info_dic['format'].get('duration', 0))
                duration_ms_int = int(duration_float * 1000)
                duration_hms_str = "{:02d}:{:02d}:{:02d}".format(
                    int(duration_float) // 3600, (int(duration_float) % 3600) // 60, int(duration_float) % 60
                )
            else:
                duration_ms_int = 0
                duration_hms_str = "00:00:00"

            return {
                "file_name": os.path.basename(file_path_str),
                "file_path": file_path_str,
                "file_creation_date": datetime.fromtimestamp(os.path.getctime(file_path_str)).strftime('%Y-%m-%d %H:%M:%S'),
                "file_last_modified_date": datetime.fromtimestamp(os.path.getmtime(file_path_str)).strftime('%Y-%m-%d %H:%M:%S'),
                "file_scrap_date": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                "file_size_mb": file_size_int,
                "duration_hms": duration_hms_str,
                "duration_ms": duration_ms_int,
                "video_codec": video_codec_str,
                "video_bitrate_kbps": video_bitrate_int,
                "video_fps": video_fps_float,
                "video_resolution": f"{video_width_int}x{video_height_int}" if video_width_int and video_height_int else None,
                "audio_codec": audio_codec_str,
                "audio_bitrate_kbps": audio_bitrate_int,
                "audio_channels": audio_channels_int,
                "audio_sample_rate_hz": audio_sample_rate_int,
            }
        except Exception as error_prop:
            print(f"[EXTRACTION [ERROR ❌]] Error al obtener propiedades del video: {error_prop}", flush=True)
            return {
                "file_name": os.path.basename(file_path_str),
                "file_path": file_path_str,
                "file_creation_date": "unknown",
                "file_last_modified_date": "unknown",
                "file_scrap_date": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                "file_size_mb": 0,
                "duration_hms": "00:00:00",
                "duration_ms": 0,
                "video_codec": "unknown",
                "video_bitrate_kbps": 0,
                "video_fps": 0,
                "video_resolution": "unknown",
                "audio_codec": "unknown",
                "audio_bitrate_kbps": 0,
                "audio_channels": 0,
                "audio_sample_rate_hz": 0,
            }

    print("[START ▶️] Extrayendo propiedades de los videos...\n", flush=True)
    df_video_props = df_paths['video_file_path'].apply(_extract_video_properties).apply(pd.Series)
    print("[SUCCESS ✅] Propiedades extraídas correctamente.\n", flush=True)

    df_paths_properties = df_video_props[[
        "file_name",
        "file_path",
        "file_creation_date",
        "file_last_modified_date",
        "file_scrap_date",
        "file_size_mb",
        "duration_hms",
        "duration_ms",
        "video_codec",
        "video_bitrate_kbps",
        "video_fps",
        "video_resolution",
        "audio_codec",
        "audio_bitrate_kbps",
        "audio_channels",
        "audio_sample_rate_hz",
    ]]

    # ────────────────────────────── RESPALDO LOCAL ──────────────────────────────
    backup_csv_path_str = "video_files_backup.csv"
    df_paths_properties.to_csv(backup_csv_path_str, index=False)
    print(f"[INFO ℹ️] Datos respaldados localmente en: {backup_csv_path_str}\n", flush=True)

    # ────────────────────────────── ESTADÍSTICAS ──────────────────────────────
    total_videos_int = len(df_paths_properties)
    print(f"[METRICS [INFO ℹ️]] Número total de videos procesados: {total_videos_int}", flush=True)
    process_duration_float = time() - start_time_float
    print(f"[METRICS [INFO ℹ️]] Duración total del proceso: {process_duration_float:.2f} segundos\n", flush=True)

    # ────────────────────────────── ENVÍO A GOOGLE SHEETS ──────────────────────────────
    print("[START ▶️] Enviando datos a Google Sheets...\n", flush=True)
    gspread_df_to_sheet({
        'client': client,
        'spreadsheet_url': spreadsheet_url_str,
        'worksheet_name': worksheet_name_str,
        'df': df_paths_properties,
        'row_filter_list': list(range(len(df_paths_properties))),
        'col_filter_list': '',
        'include_header': True
    })
    print("[SUCCESS ✅] Datos enviados exitosamente.\n", flush=True)

    os.remove(backup_csv_path_str)
    print(f"[INFO ℹ️] Respaldo local eliminado: {backup_csv_path_str}\n", flush=True)
    print("[END [FINISHED ✅]] Proceso completado.\n", flush=True)


In [None]:
# @title PATH FILES A SPREADSHEET
# Ejemplo de uso para scrap_videofiles_local_dir_to_gspread()

params_example = {
    "video_files_root_path": "/content/drive/MyDrive/Videos",
       # Puede ser una ruta local (ej.: "C:/videos") o una URL de carpeta de Google Drive (ej.: "https://drive.google.com/drive/folders/ID_FOLDER")
    "video_files_target_search_folder": ["Subcarpeta1", "Subcarpeta2"],  # Subcarpetas de interés (opcional)
    "video_files_target_search_extension": [".mp4", ".mkv"],            # Extensiones de video a buscar

    "ini_environment_identificated": ini_environment_identificated,
       # Ej.: "LOCAL" para escanear localmente o "DRIVE" (u otro valor distinto a "LOCAL") para escanear Google Drive
    "json_keyfile_local": GCP_json_keyfile_local,                        # Ruta al JSON de credenciales para LOCAL
    "json_keyfile_colab": GCP_json_keyfile_colab,                        # Ruta al JSON de credenciales para COLAB
    "json_keyfile_GCP_secret_id": GCP_json_keyfile_GCP_secret_id,          # Secret ID para autenticación en GCP

    "destination_files_path_table_spreadsheet_url": "https://docs.google.com/spreadsheets/d/ID_HOJA_DESTINO",
    "destination_files_path_table_spreadsheet_worksheet": "Videos"       # Nombre de la pestaña destino
}

scrap_videofiles_local_dir_to_gspread(params_example)
