# RodAI por nós, GPU

## Aqui
1. Visita a playlist do Roda Viva com os episódios na íntegra
2. Transcreve os novos episódios utilizando Whisper (necessário rodar em GPU)
3. Organiza a transcrição e retorna uma lista de strings em um banco de dados
4. Envia mensagem no Telegram confirmando a operação

## No git
5. Recupera as informações do vídeo no banco de dados
6. Processa as transcrições com LLM em busca de leads

## Bibliotecas

In [None]:
# Whisper (OpenAI)
!pip install git+https://github.com/openai/whisper.git
!pip install --upgrade git+https://github.com/huggingface/transformers.git accelerate
    
# Youtube Downloader
!pip install pytube

# pyMongo
!pip install pymongo

## Funções

In [None]:
# Inicializa uma lista de logs
logs = []

# Função para adicionar mensagens ao log
def loga_isso(mensagem, logs):
    print(mensagem)
    logs.append(mensagem)
    

# Retorna a URL do vídeo mais recente da playlist
def video_mais_recente(url_lista):
    from pytube import Playlist
    p = Playlist(url_lista)

    # Verificar se a lista de URLs de vídeos não está vazia antes de acessar
    if p.video_urls:
        ultima_url = p.video_urls[0]
        loga_isso(f'A URL mais recente da playlist {p.title} é {ultima_url}', logs)        
        return ultima_url
    else:
        loga_isso('A lista de vídeos está vazia', logs)

        return None


# Verifica se o vídeo já foi transcrito
def ja_foi_transcrito(url):
    pass


# Transforma o arquivo em áudio e retorna informações essenciais em um dicionário
def baixa_video(url):
    from pytube import YouTube
    import re # Utilizada para extrair o ID do vídeo

    match = re.search(r"(?<=v=)[^&#]+", url)
    id_video = match.group() if match else None

    yt = YouTube(url)
    titulo = yt.title
    duracao = yt.length
    data_publicacao = yt.publish_date
    stream = yt.streams.filter(only_audio=True)[0]
    caminho = stream.download(filename=f'{id_video}.mp3')
    infos = {'id' : id_video, 'url': url, 'titulo' : titulo, 'duracao_ss' : duracao, 'data_publicacao' : data_publicacao, 'caminho' : caminho}
    loga_isso(f'O áudio de \'{titulo}\' foi baixado com sucesso e pode ser encontrado em {caminho}.', logs)    
    
    return infos


# Converte timestamps para formato amigável
def segundos_para_hhmmss(inicio, fim):
    from datetime import timedelta
    # Verifica se 'fim' é None e, em caso afirmativo, usa o valor de 'inicio'
    fim = inicio if fim is None else fim

    # Assegura que tanto 'inicio' quanto 'fim' são numéricos
    inicio = inicio if isinstance(inicio, (int, float)) else 0
    fim = fim if isinstance(fim, (int, float)) else 0

    inicio_hhmmss = str(timedelta(seconds=inicio)).split(".")[0]
    fim_hhmmss = str(timedelta(seconds=fim)).split(".")[0]
    return (inicio_hhmmss, fim_hhmmss)


# Concatena os 'chunks' para reduzir a quantidade
def concatena_chunks(chunks):
    novos_chunks = []
    n = 8 # Número arbitrário calculado para resultar em chunks de aproximadamente 1 minuto
    for i in range(0, len(chunks), n):
        subset = chunks[i:i+n]
        texto_concatenado = ' '.join(chunk['text'] for chunk in subset)
        timestamp_inicial = subset[0]['timestamp'][0]
        timestamp_final = subset[-1]['timestamp'][1]
        novos_chunks.append({
            'timestamp': (timestamp_inicial, timestamp_final),
            'text': texto_concatenado
        })
    loga_isso(f'{len(chunks)} chunks concatenados em {len(novos_chunks)} novos chunks.', logs)
    return novos_chunks

# Converte os 'chunks' para timestamps amigáveis
def formata_chunks(chunks):
    chunks_convertidos = []
    for chunk in chunks:
        inicio, fim = chunk['timestamp']
        novo_timestamp = segundos_para_hhmmss(inicio, fim)
        chunks_convertidos.append({'timestamp': novo_timestamp, 'text': chunk['text']})
    loga_isso(f'{len(chunks_convertidos)} chunks convertidos para timestamps amigáveis.', logs)
    return chunks_convertidos

# Converte os chunks para uma lista de strings incluindo os timestamps amigáveis
def chunks_to_string(chunks):
    lista_strings = []    
    for chunk in chunks:
        inicio, fim = chunk['timestamp']
        chunk_string = f"Timestamp: {inicio} - {fim}, Texto: {chunk['text']}"        
        lista_strings.append(chunk_string)
    loga_isso(f'{len(lista_strings)} chunks convertidos para strings.', logs)
    return lista_strings


# Adiciona ao banco de dados o dicionário com as informações 
def insere_mongo(infos_video):
    from kaggle_secrets import UserSecretsClient
    user_secrets = UserSecretsClient()
    uri = user_secrets.get_secret("MONGODB_URI")

    from pymongo import MongoClient
    db = MongoClient(uri, ssl=True, tlsAllowInvalidCertificates=True)['mjd_2024']
    db_rodai = db.tri3_grupo2
        
    try:
        # Tenta inserir o documento no banco de dados
        db_rodai.insert_one(infos_video)
        
        # Se a inserção for bem-sucedida, retorna True
        loga_isso('Inserção no banco de dados bem sucedida.', logs)
        return True
    except Exception as e:
        # Se ocorrer uma exceção durante a inserção, você pode optar por imprimir a exceção
        # e retornar False para indicar que a operação falhou
        loga_isso(f"Erro ao inserir o documento: {e}", logs)
        return False

# Envia log para o Telegram
def avisa_telegram(logs):
    import requests
    from kaggle_secrets import UserSecretsClient
    user_secrets = UserSecretsClient()
    token = user_secrets.get_secret("TELEGRAM_BOT_TOKEN")    
    chat_id = user_secrets.get_secret("TELEGRAM_CHAT_ID")
    text = "\n".join(logs)
    
    url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={text}"
    response = requests.get(url)
    if response.status_code == 200:
        print("Log enviado via Telegram com sucesso.")
    else:
        print(f"Erro ao enviar log via Telegram: {response.text}")

## Carrega Whisper

In [None]:
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

model_id = "openai/whisper-large-v3"

model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
)
model.to(device)

processor = AutoProcessor.from_pretrained(model_id)

pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    max_new_tokens=128,
    chunk_length_s=30,
    batch_size=16,
    return_timestamps=True,
    torch_dtype=torch_dtype,
    device=device,
)

## Executa o script

In [None]:
# URL da playlist desejada
url_lista = 'https://www.youtube.com/playlist?list=PL6zZkMrBoEjyfLf7ezka6L0BRvvzfk8Rg' # Roda Viva Íntegra

# Recupera o vídeo mais recente da lista
# Comentar caso queira transcrever um vídeo específico
ultimo_video = video_mais_recente(url_lista) 

# Descomentar caso queira transcrever um vídeo específico
# ultimo_video = 'https://www.youtube.com/watch?v=vZPfXig4KLg'

# Converte o vídeo mais recente em mp3 e retorna infos 
infos_video = baixa_video(ultimo_video)

# Define o caminho do mp3 para transcrição
caminho_arquivo = infos_video['caminho']

# Executa o pipeline do Whisper que retorna a transcrição
result = pipe(caminho_arquivo, return_timestamps=True, generate_kwargs={"language": "portuguese"})

# Processa os blocos de transcrição (chunks) para processamento
blocos = result['chunks']
blocos_concatenados = concatena_chunks(blocos)
blocos_formatados = formata_chunks(blocos_concatenados)
blocos_string = chunks_to_string(blocos_formatados)

# Adiciona a transcrição completa ao dicionário das infos
infos_video['transcricao'] = blocos_string

# Insere o dicionário completo no banco de dados
insere_mongo(infos_video)

# Envia o log pelo Telegram
avisa_telegram(logs)