# Librerías y carga de datos

In [None]:
#!pip install --upgrade pip
#!pip install --upgrade git+https://github.com/huggingface/transformers
#!pip install --no-deps sentence-transformers
#!pip install ffmpeg-python pandas --quiet

In [None]:
from transformers import AudioFlamingo3ForConditionalGeneration, AutoProcessor
import os
import glob
import ffmpeg
import torch
import pandas as pd
import subprocess
from IPython.display import HTML
from base64 import b64encode
from google.colab import files
import kagglehub

In [None]:
# Download the TikTok trending dataset through kagglehub and keep the folder
# that holds the downloaded files.
path_1 = kagglehub.dataset_download("erikvdven/tiktok-trending-december-2020")

print("Path to dataset files:", path_1)


# Print every folder and file of the dataset. The file list is named
# `filenames` (not `files`) so it does not shadow the `google.colab.files`
# module imported at the top of the notebook.
for root, dirs, filenames in os.walk(path_1):
    print("Carpeta:", root)
    for d in dirs:
        print("Subcarpeta:", d)
    for f in filenames:
        print("Archivo:", f)


# Last expression of the cell: shows the top-level entries of the dataset.
os.listdir(path_1)

# Modelo nvidia/audio-flamingo-3-hf

In [None]:
# Folder inside the downloaded dataset that is searched for .mp4 clips.
video_dir = os.path.join(path_1, "videos")

# Scratch folder for the extracted audio: under /kaggle/working when that
# directory exists, otherwise under /content.
audio_dir = "/kaggle/working/audio_temp" if os.path.exists("/kaggle/working") else "/content/audio_temp"
os.makedirs(audio_dir, exist_ok=True)

# Processor and model are module-level so the helper functions below can
# reuse them without reloading the weights.
model_id = "nvidia/audio-flamingo-3-hf"
processor = AutoProcessor.from_pretrained(model_id)
model = AudioFlamingo3ForConditionalGeneration.from_pretrained(model_id, device_map="auto")


# glob.glob returns matches in arbitrary order; sort before truncating so the
# 1000-clip subset and index-based picks (e.g. video_files[3]) are the same on
# every run.
video_files = sorted(glob.glob(os.path.join(video_dir, "*.mp4")))[:1000]

def extract_audio_from_video(video_path, output_dir):
    """Extract the audio track of a video into a 16 kHz mono 16-bit PCM WAV.

    Args:
        video_path: Path of the input video handed to ffmpeg.
        output_dir: Directory where ``<video basename>.wav`` is written. It is
            created if it does not exist yet.

    Returns:
        Path of the WAV file written by ffmpeg.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status, so callers never
            receive a path to a file that was not produced.
    """
    os.makedirs(output_dir, exist_ok=True)

    filename = os.path.splitext(os.path.basename(video_path))[0]
    audio_path = os.path.join(output_dir, f"{filename}.wav")

    cmd = [
        "ffmpeg", "-y", "-i", video_path,
        "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
        audio_path,
    ]
    # Keep stderr (decoded leniently) so a failure can be reported with
    # ffmpeg's own message instead of being silently discarded.
    completed = subprocess.run(
        cmd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        text=True,
        errors="replace",
    )
    if completed.returncode != 0:
        stderr_lines = completed.stderr.strip().splitlines()
        detail = stderr_lines[-1] if stderr_lines else "no error output"
        raise RuntimeError(
            f"ffmpeg failed (exit code {completed.returncode}) for {video_path}: {detail}"
        )
    return audio_path

def _ask_audio_model(audio_path, prompt, max_new_tokens):
    """Send one text prompt plus one audio file to the model and return its reply.

    Uses the module-level ``processor`` and ``model``.

    Args:
        audio_path: Path of the audio file referenced in the chat message.
        prompt: Instruction text placed before the audio in the user turn.
        max_new_tokens: Upper bound on generated tokens passed to ``generate``.

    Returns:
        The first decoded reply with surrounding whitespace stripped.
    """
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {"type": "audio", "path": audio_path},
            ],
        }
    ]

    inputs = processor.apply_chat_template(
        conversation,
        tokenize=True,
        add_generation_prompt=True,
        return_dict=True,
    ).to(model.device)

    outputs = model.generate(**inputs, max_new_tokens=max_new_tokens)
    # Decode only what comes after the prompt's token count, so the prompt is
    # not echoed back in the reply.
    prompt_length = inputs.input_ids.shape[1]
    decoded = processor.batch_decode(outputs[:, prompt_length:], skip_special_tokens=True)
    return decoded[0].strip()


def transcribe_audio(audio_path):
    """Ask the model to transcribe the speech in an audio file.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The model's reply as stripped text (at most 400 new tokens).
    """
    return _ask_audio_model(
        audio_path,
        "Transcribe the input speech.",
        max_new_tokens=400,
    )


def detect_music(audio_path):
    """Ask the model whether an audio file contains a song, singing or music.

    Args:
        audio_path: Path of the audio file to inspect.

    Returns:
        The model's raw reply as stripped text (at most 50 new tokens). The
        prompt asks for 'yes' or 'no', but the reply is returned as-is and is
        not converted to a boolean.
    """
    return _ask_audio_model(
        audio_path,
        "Does this audio contain a song, singing, or background music? Answer 'yes' or 'no'.",
        max_new_tokens=50,
    )

# One dict per successfully processed video.
results = []

for video_path in video_files:
    clip_name = os.path.basename(video_path)
    # Best-effort batch: a failure on one clip is reported and skipped so the
    # remaining videos are still processed.
    try:
        print(f" Procesando: {clip_name}")

        audio_path = extract_audio_from_video(video_path, audio_dir)

        text = transcribe_audio(audio_path)

        is_song = detect_music(audio_path)

        results.append({
            "video_name": clip_name,
            "transcription": text,
            "is_song": is_song
        })

        print(f" {clip_name} | Música: {is_song}")

    except Exception as e:
        print(f" Error en {clip_name}: {e}")


# Pin the column schema so the frame (and the CSV header) has the expected
# columns even when every video failed and `results` is empty; otherwise a
# later df["video_name"] lookup would raise KeyError.
df = pd.DataFrame(results, columns=["video_name", "transcription", "is_song"])
csv_path = os.path.join(audio_dir, "audio_analysis_results.csv")
df.to_csv(csv_path, index=False)

print(f"\n Resultados guardados en: {csv_path}")
display(df.head())


# Validación del video con su transcripción

In [None]:
def show_video(video_path, width=480):
    """Return an HTML video player for an MP4 file, for display in the notebook.

    The whole file is read and embedded in the markup as a base64 data URL.

    Args:
        video_path: Path of the MP4 file to embed.
        width: Value of the ``width`` attribute of the ``<video>`` element.

    Returns:
        An ``IPython.display.HTML`` object wrapping the ``<video>`` element.
    """
    # Context manager so the file handle is closed as soon as the bytes are read.
    with open(video_path, "rb") as video_file:
        mp4 = video_file.read()
    data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
    return HTML(f"""
    <video width="{width}" controls>
        <source src="{data_url}" type="video/mp4">
    </video>
    """)

# Pick the clip at index 3 of the processed list as the sample to validate.
# NOTE(review): raises IndexError when fewer than four videos were found.
video_test = video_files[3]
# Kept as the final expression of the cell so the returned HTML player is
# what the cell outputs.
show_video(video_test)

In [None]:
# Look up the analysis results stored for the sample video and print them.
video_name = os.path.basename(video_test)

# Boolean mask on the file name; the frame only has rows for videos that were
# processed without error.
row = df.loc[df["video_name"] == video_name]

if row.empty:
    print(" No se encontró información para este video.")
else:
    # Only the first matching row is reported.
    print(f" Video: {video_name}")
    print(" Transcripción:\n", row.iloc[0]["transcription"])
    print(" ¿Es canción?:", row.iloc[0]["is_song"])


# Descarga de archivo CSV

In [None]:
# Imported here under its own alias so the download does not depend on the
# global name `files`, which is easy to rebind in a notebook (for example as
# an os.walk loop variable).
from google.colab import files as colab_files

# Build the path from audio_dir, the folder the CSV was written to, instead of
# hard-coding /content, which is wrong whenever audio_dir resolved to
# /kaggle/working/audio_temp.
csv_path = os.path.join(audio_dir, "audio_analysis_results.csv")
colab_files.download(csv_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>