In [None]:
%%capture
!pip install accelerate
!pip install bitsandbytes
!pip install transformers
!pip install git+https://github.com/openai/whisper.git
!pip install librosa==0.9.0 pydub noisereduce scikit-learn tensorflow torchaudio seaborn matplotlib sqlalchemy smtplib
!pip install -U funasr modelscope
!pip install addict
!pip install datasets==2.18.0
!pip install simplejson
!sudo apt update && sudo apt install ffmpeg


In [None]:
import os
import zipfile
import torch
import librosa
import torchaudio
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from transformers import pipeline
import logging
from modelscope.pipelines import pipeline as modelscope_pipeline
from modelscope.utils.constant import Tasks
from sqlalchemy import create_engine, Column, String, Integer, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.exc import IntegrityError
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import json
from transformers import (AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig)

In [None]:
config_data = json.load(open("/content/config.json"))
HF_TOKEN = config_data["HF_TOKEN"]

model_name = "meta-llama/Meta-Llama-3-8B-Instruct"

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)

tokenizer = AutoTokenizer.from_pretrained(model_name, use_auth_token=HF_TOKEN)
tokenizer.pad_token = tokenizer.eos_token

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    quantization_config=bnb_config,
    use_auth_token=HF_TOKEN
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

In [None]:
# Função de geração de respostas usando o template de chat
def get_response(prompt):
    input_data = tokenizer(
        prompt,
        padding=True,
        truncation=True,
        max_length=512,
        return_tensors="pt"
    ).to(model.device)

    input_ids = input_data['input_ids']
    attention_mask = input_data['attention_mask']

    eos_token_id = tokenizer.eos_token_id if tokenizer.eos_token_id is not None else 2

    outputs = model.generate(
        input_ids=input_ids,
        attention_mask=attention_mask,
        max_new_tokens=100,
        eos_token_id=eos_token_id,
        do_sample=True,
        temperature=0.6,
        top_p=0.9,
    )

    response = outputs[:, input_ids.shape[-1]:]
    return tokenizer.decode(response[0], skip_special_tokens=True)

In [None]:
Base = declarative_base()

class Psicologo(Base):
    __tablename__ = 'psicologos'
    id = Column(Integer, primary_key=True)
    nome = Column(String, unique=True)
    email = Column(String, unique=True)

class Paciente(Base):
    __tablename__ = 'pacientes'
    id = Column(Integer, primary_key=True)
    nome = Column(String)
    psicologo_id = Column(Integer, ForeignKey('psicologos.id'))
    psicologo = relationship("Psicologo", back_populates="pacientes")

Psicologo.pacientes = relationship("Paciente", order_by=Paciente.id, back_populates="psicologo")

class Conversa(Base):
    __tablename__ = 'conversas'
    id = Column(Integer, primary_key=True)
    paciente_id = Column(Integer, ForeignKey('pacientes.id'))
    texto = Column(Text)
    paciente = relationship("Paciente", back_populates="conversas")

Paciente.conversas = relationship("Conversa", order_by=Conversa.id, back_populates="paciente")

engine = create_engine('sqlite:///database.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

def adicionar_psicologo(nome, email):
    psicologo = Psicologo(nome=nome, email=email)
    session.add(psicologo)
    try:
        session.commit()
        print(f"Psicólogo {nome} adicionado com sucesso!")
    except IntegrityError:
        session.rollback()
        psicologo = session.query(Psicologo).filter_by(nome=nome).first()
        print(f"Psicólogo {nome} já existe.")
    return psicologo

def adicionar_paciente(nome, psicologo_nome, psicologo_email):
    psicologo = session.query(Psicologo).filter_by(nome=psicologo_nome).first()
    if not psicologo:
        psicologo = adicionar_psicologo(psicologo_nome, psicologo_email)

    paciente = Paciente(nome=nome, psicologo=psicologo)
    session.add(paciente)
    try:
        session.commit()
        print(f"Paciente {nome} adicionado com sucesso!")
    except IntegrityError:
        session.rollback()
        print(f"Erro ao adicionar o paciente {nome}.")

def listar_pacientes(): # Function para listar todos os pacientes
    pacientes = session.query(Paciente).all()
    for paciente in pacientes:
        print(f"ID: {paciente.id}, Nome: {paciente.nome}, Psicólogo: {paciente.psicologo.nome}")

def buscar_paciente_por_nome(nome): # Function para buscar paciente por nome
    paciente = session.query(Paciente).filter_by(nome=nome).first()
    if paciente:
        return paciente.id
    else:
        print(f"Paciente {nome} não encontrado.")
        return None


  Base = declarative_base()


In [None]:
inference_pipeline = modelscope_pipeline(
    task=Tasks.emotion_recognition,
    model="iic/emotion2vec_plus_large"
)

2024-07-11 02:35:22,284 - modelscope - INFO - initiate model from /root/.cache/modelscope/hub/iic/emotion2vec_plus_large
2024-07-11 02:35:22,286 - modelscope - INFO - initiate model from location /root/.cache/modelscope/hub/iic/emotion2vec_plus_large.
2024-07-11 02:35:22,288 - modelscope - INFO - initialize model from /root/.cache/modelscope/hub/iic/emotion2vec_plus_large


You are using the latest version of funasr-1.1.0
Detect model requirements, begin to install it: /root/.cache/modelscope/hub/iic/emotion2vec_plus_large/requirements.txt
install model requirements successfully




In [None]:
def enviar_email(psicologo_email, assunto, conteudo):
    from_email = "seu_email@gmail.com"  #verificar no google senhas api
    from_password = "sua_senha"

    msg = MIMEMultipart()
    msg['From'] = from_email
    msg['To'] = psicologo_email
    msg['Subject'] = assunto

    msg.attach(MIMEText(conteudo, 'plain'))

    try:
        server = smtplib.SMTP('smtp.gmail.com:587')
        server.starttls()
        server.login(from_email, from_password)
        server.send_message(msg)
        server.quit()
        print("Email enviado com sucesso!")
    except smtplib.SMTPAuthenticationError:
        print("Falha na autenticação SMTP. O email não foi enviado, mas o processo continuará.")
    except Exception as e:
        print(f"Ocorreu um erro ao enviar o email: {e}. O processo continuará.")


In [None]:
class ProcessadorAudio:
    def __init__(self, taxa_amostragem_alvo=16000, n_fft=1024, hop_length=512, n_mels=64, max_pad_len=400):
        self.taxa_amostragem_alvo = taxa_amostragem_alvo
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.n_mels = n_mels
        self.max_pad_len = max_pad_len

    def carregar_audio(self, caminho):
        waveform, sample_rate = torchaudio.load(caminho)
        if sample_rate != self.taxa_amostragem_alvo:
            resample_transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=self.taxa_amostragem_alvo)
            waveform = resample_transform(waveform)
        return waveform

    def extrair_caracteristicas(self, waveform):
        mel_spectrogram_transform = torchaudio.transforms.MelSpectrogram(
            sample_rate=self.taxa_amostragem_alvo,
            n_fft=self.n_fft,
            hop_length=self.hop_length,
            n_mels=self.n_mels
        )
        mel_spectrogram = mel_spectrogram_transform(waveform)
        mel_spectrogram = mel_spectrogram.mean(dim=0)
        pad = self.max_pad_len - mel_spectrogram.shape[-1]
        if pad > 0:
            mel_spectrogram = torch.nn.functional.pad(mel_spectrogram, (0, pad))
        else:
            mel_spectrogram = mel_spectrogram[:, :self.max_pad_len]
        return mel_spectrogram.numpy().flatten()

In [None]:
class DetectorEmocao:
    def __init__(self, modelo):
        self.modelo = modelo
        self.labels_mapping = {
            '生气/angry': 'angry',
            '开心/happy': 'happy',
            '中立/neutral': 'neutral',
            '难过/sad': 'sad',
            '<unk>': 'unknown',
        }

    def prever(self, caminho_audio):
        rec_result = self.modelo(caminho_audio, granularity="utterance", extract_embedding=False)
        for result in rec_result:
            predicted_emotion = self.labels_mapping[result['labels'][np.argmax(result['scores'])]]
            return predicted_emotion

In [None]:
class ProcessadorNLP:
    def __init__(self):
        self.analisador_sentimento = pipeline("sentiment-analysis", model="nlptown/bert-base-multilingual-uncased-sentiment")

    def analisar_sentimento(self, texto):
        resultado = self.analisador_sentimento(texto)
        sentimento = resultado[0]['label']
        mapeamento_sentimentos = {
            "1 star": "sad",
            "2 stars": "neutral",
            "3 stars": "neutral",
            "4 stars": "happy",
            "5 stars": "happy"
        }
        return mapeamento_sentimentos.get(sentimento, "neutral")

In [None]:
class GeradorResposta:
    def __init__(self):
        pass

    def gerar_resposta(self, prompt):
        return get_response(prompt)


In [None]:
class AssistenteAudioEmocional:
    def __init__(self, processador_audio, detector_emocao, processador_nlp, gerador_resposta, session):
        self.processador_audio = processador_audio
        self.detector_emocao = detector_emocao
        self.processador_nlp = processador_nlp
        self.gerador_resposta = gerador_resposta
        self.session = session

    def processar_audio(self, caminho_audio):
        logging.info("Processando áudio...")
        return self.detector_emocao.prever(caminho_audio)

    def transcrever_audio(self, caminho_audio):
        logging.info("Transcrevendo áudio...")
        os.system(f'whisper {caminho_audio} --model medium --task transcribe --language pt --output_format txt')
        with open(caminho_audio.replace('.m4a', '.txt'), 'r') as file:
            transcricao = file.read()
        return transcricao

    def processar_texto(self, texto):
        logging.info("Processando texto...")
        return self.processador_nlp.analisar_sentimento(texto)

    def gerar_resposta(self, emocao_audio, emocao_texto, transcricao):
        logging.info("Gerando resposta...")
        prompt = (f"Você é um assistente psicólogico virtual especializado em emergências psicológicas. Abaixo o conteudo da mensagem do paciente e os sentimentos detectados. "
                  f"Transcrição do áudio: {transcricao} \n"
                  f"Emoção detectada no áudio: {emocao_audio} \n"
                  f"Emoção detectada no texto transcrito: {emocao_texto} \n"
                  f"Baseado na ciencia psicologica, gere uma resposta para este paciente!Seja empatico e tome cuidado com as palavras. Forneça respostas curtas e objetivas.")
        #print(f"Prompt enviado ao modelo: {prompt}")
        return self.gerador_resposta.gerar_resposta(prompt)

    def salvar_conversa(self, paciente_id, texto):
        conversa = Conversa(paciente_id=paciente_id, texto=texto)
        self.session.add(conversa)
        self.session.commit()

    def enviar_relatorio(self, paciente_id):
        paciente = self.session.query(Paciente).filter_by(id=paciente_id).first()
        psicologo = paciente.psicologo
        conversas = self.session.query(Conversa).filter_by(paciente_id=paciente_id).all()
        historico = "\n".join([c.texto for c in conversas])

        emocao_audio = self.processar_audio(caminho_audio)
        transcricao = self.transcrever_audio(caminho_audio)
        emocao_texto = self.processar_texto(transcricao)

        assunto = f"Relatório da Análise de Sentimentos do Paciente {paciente.nome}"
        conteudo = f"Histórico da conversa:\n{historico}\n\nEmoção detectada no áudio: {emocao_audio}\nEmoção detectada no texto: {emocao_texto}"
        enviar_email(psicologo.email, assunto, conteudo)

    def lidar_interacao(self, caminho_audio, paciente_id):
        emocao_audio = self.processar_audio(caminho_audio)
        transcricao = self.transcrever_audio(caminho_audio)
        emocao_texto = self.processar_texto(transcricao)
        resposta = self.gerar_resposta(emocao_audio, emocao_texto, transcricao)
        historico = "\n".join([c.texto for c in self.session.query(Conversa).filter_by(paciente_id=paciente_id).all()])
        self.salvar_conversa(paciente_id, f"Paciente: {transcricao}\nAssistente: {resposta}")
        self.enviar_relatorio(paciente_id)
        return resposta


In [None]:
# inicialização dos componentes
processador_audio = ProcessadorAudio()
detector_emocao = DetectorEmocao(inference_pipeline)
processador_nlp = ProcessadorNLP()
gerador_resposta = GeradorResposta()

In [None]:
# criação do Assistente
assistente = AssistenteAudioEmocional(processador_audio, detector_emocao, processador_nlp, gerador_resposta, session)

In [None]:
# adicionar psicólogos e pacientes para o exemplo
adicionar_psicologo("Dr. Maria", "dr.maria@example.com")
adicionar_paciente("João da Silva", "Dr. Maria", "dr.maria@example.com")

Psicólogo Dr. Maria já existe.
Paciente João da Silva adicionado com sucesso!


In [None]:
# listar pacientes
listar_pacientes()

ID: 1, Nome: João da Silva, Psicólogo: Dr. Maria
ID: 2, Nome: João da Silva, Psicólogo: Dr. Maria


In [None]:
paciente_id = buscar_paciente_por_nome("João da Silva")

In [None]:
if paciente_id:
    caminho_audio = "/content/audio_triste.m4a"
    resposta = assistente.lidar_interacao(caminho_audio, paciente_id)
    print("Resposta do Assistente:", resposta)

rtf_avg: 0.017: 100%|[34m██████████[0m| 1/1 [00:00<00:00,  7.69it/s]
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
rtf_avg: 0.009: 100%|[34m██████████[0m| 1/1 [00:00<00:00, 11.63it/s]


Falha na autenticação SMTP. O email não foi enviado, mas o processo continuará.
Resposta do Assistente:  

Resposta:
Olá! Eu estou aqui para escutar e ajudar. Pude perceber que você está sentindo um vazio no coração e no estúdio. Isso pode ser um sinal de que você está experimentando uma perda ou um sentimento de desamparo. É normal sentir isso, especialmente em momentos difíceis. Quer falar um pouco mais sobre o que está acontecendo e como você se sente? Eu est
