<a href="https://colab.research.google.com/github/EdwinKestler/NecroTalk/blob/main/Diario_de_orantes_para_la_automatizacion_de_formularios.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Speaker diarization and automatic form filling.**

# ASR (OpenAI Whisper)

In this section we use Whisper, OpenAI's new release, to convert a sample YouTube podcast into a words <> timestamps mapping.

In [None]:
!apt install ffmpeg

In [None]:
!apt install sox libsndfile1

In [None]:
!pip install --upgrade hydra-core llvmlite omegaconf --ignore-installed

In [None]:
!pip3 install torch torchvision torchaudio yt-dlp

In [None]:
# Installing Whisper and WhisperX

!pip install git+https://github.com/openai/whisper.git
!pip install git+https://github.com/m-bain/whisperX.git

In [None]:
# A 20-minute podcast from the official YC YouTube channel.
# -f keeps rm quiet when no earlier download exists.
!rm -f ./audio.wav
!yt-dlp -xv --audio-format wav -o audio.wav -- https://youtu.be/ZdiedkI4uks

In [None]:
# Convert audio.wav to a mono, 16 kHz file named audio_16k.wav.
!rm -f ./audio_16k.wav
!ffmpeg -i audio.wav -ac 1 -ar 16000 audio_16k.wav

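As a quick sanity check on the conversion, the standard-library `wave` module can confirm that a file is mono at 16 kHz. This is a minimal sketch; `describe_wav` and `is_mono_16k` are hypothetical helper names that do not appear elsewhere in this notebook, and the check assumes an uncompressed PCM WAV file.

```python
import os
import wave


def describe_wav(path):
    """Return (channels, sample_rate_hz, duration_seconds) for a PCM WAV file."""
    with wave.open(path, "rb") as wav:
        channels = wav.getnchannels()
        rate = wav.getframerate()
        duration = wav.getnframes() / rate
    return channels, rate, duration


def is_mono_16k(path):
    """True when the file has one channel sampled at 16000 Hz."""
    channels, rate, _ = describe_wav(path)
    return channels == 1 and rate == 16000


# Only inspect the converted file if the ffmpeg step has already produced it.
if os.path.exists("./audio_16k.wav"):
    print(describe_wav("./audio_16k.wav"), is_mono_16k("./audio_16k.wav"))
```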

Now we process the audio.

In [None]:
from whisper import load_model

# Larger models give considerably better transcriptions and better-aligned (word, timestamp) mappings.
model = load_model("large-v2")

# beam_size is None by default (greedy decoding). Setting it to a value such
# as 5 improves transcription quality but increases runtime considerably.
results = model.transcribe('./audio_16k.wav', beam_size=None)

In [None]:
# WhisperX results in better word timestamps by using wav2vec based forced alignment.
import whisperx

device = 'cuda'
alignment_model, metadata = whisperx.load_align_model(language_code=results["language"], device=device)
result_aligned = whisperx.align(results["segments"], alignment_model, metadata, './audio_16k.wav', device)

Now we generate the word dictionary for the conversation.

In [None]:
# Storing words <> timestamps mapping in a file.
import json

with open('./word_ts.text', 'w+') as f:
    for line in result_aligned['word_segments']:
        line_temp = line.copy()
        # Check that the 'word' key is in the dictionary before stripping
        if 'word' in line_temp:
            line_temp['word'] = line_temp['word'].strip()
            f.write(f'{json.dumps(line_temp)}\n')
        else:
            print(f"Key 'word' not found in line: {line}")
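The file written above holds one JSON object per line (the JSON Lines layout). The sketch below round-trips a few made-up word segments through an in-memory buffer to show that layout and how entries without timestamps can be set aside when reading. The sample data and the helper names `write_word_segments` and `read_word_segments` are assumptions for illustration only.

```python
import io
import json


def write_word_segments(segments, fh):
    """Write one JSON object per line, stripping whitespace around each word."""
    for seg in segments:
        if "word" not in seg:
            continue
        row = dict(seg, word=seg["word"].strip())
        fh.write(json.dumps(row) + "\n")


def read_word_segments(fh):
    """Split rows into those with start/end timestamps and those without."""
    kept, skipped = [], []
    for line in fh:
        line = line.strip()
        if not line:
            continue
        row = json.loads(line)
        if "start" in row and "end" in row:
            kept.append(row)
        else:
            skipped.append(row)
    return kept, skipped


# Hypothetical word segments; the second one has no timestamps.
sample = [
    {"word": " Hello", "start": 0.10, "end": 0.42},
    {"word": "8"},
    {"word": "world.", "start": 0.50, "end": 0.91},
]

buffer = io.StringIO()
write_word_segments(sample, buffer)
buffer.seek(0)
kept, skipped = read_word_segments(buffer)
print(len(kept), "aligned,", len(skipped), "skipped")
```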

In [None]:
!python -m pip install git+https://github.com/NVIDIA/NeMo.git@main

In [None]:
!pip install --upgrade Cython jiwer braceexpand webdataset librosa sentencepiece
!pip install --upgrade youtokentome pyannote-audio transformers pandas inflect editdistance

In [None]:
!pip install -U pytorch-lightning
!pip install nemo-toolkit[nlp,asr,tts]


In [None]:
import os
import json

diarize_manifest = {
  'audio_filepath': './audio_16k.wav',
  'offset': 0,
  'duration': None,
  'label': "infer",
  'text': "-",
  'num_speakers': None,
  'rttm_filepath': './diarized/pred_rttms/audio_16k.rttm',
  'uniq_id': ""
}

if not os.path.exists('./manifest.json'):
  with open('./manifest.json', 'w') as f:
    f.write(json.dumps(diarize_manifest))

In [None]:
import os
import wget
from omegaconf import OmegaConf
from nemo.collections.asr.models import ClusteringDiarizer

MODEL_CONFIG = os.path.join('./','diar_infer_meeting.yaml')
if not os.path.exists(MODEL_CONFIG):
    config_url = "https://raw.githubusercontent.com/NVIDIA/NeMo/main/examples/speaker_tasks/diarization/conf/inference/diar_infer_meeting.yaml"
    MODEL_CONFIG = wget.download(config_url, './')

config = OmegaConf.load(MODEL_CONFIG)

In [None]:
config.num_workers = 4
config.batch_size = 32

config.diarizer.manifest_filepath = './manifest.json'
config.diarizer.out_dir = os.path.join('./', 'diarized')
config.diarizer.speaker_embeddings.model_path = 'titanet_large'
config.diarizer.speaker_embeddings.parameters.window_length_in_sec = [1.5, 1.0, 0.5]
config.diarizer.speaker_embeddings.parameters.shift_length_in_sec = [0.75, 0.5, 0.25]
config.diarizer.speaker_embeddings.parameters.multiscale_weights = [0.33, 0.33, 0.33]
config.diarizer.speaker_embeddings.parameters.save_embeddings = False

config.diarizer.ignore_overlap = False
config.diarizer.oracle_vad = False  # Not using oracle VAD; the VAD model below is used instead
config.diarizer.collar = 0.25

config.diarizer.vad.model_path = 'vad_multilingual_marblenet'

In [None]:

model = ClusteringDiarizer(cfg=config)

In [None]:
model.diarize()

In [None]:
# Reading timestamps <> Speaker Labels mapping

speaker_ts = []
with open('./diarized/pred_rttms/audio_16k.rttm', 'r') as f:
    lines = f.readlines()
    for line in lines:
        line_list = line.split(' ')
        s = int(float(line_list[5]) * 1000)
        e = s + int(float(line_list[8]) * 1000)
        speaker_ts.append([s, e, int(line_list[11].split('_')[-1])])
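The indices 5, 8 and 11 used above suggest that the RTTM fields in this file are separated by runs of spaces, so `split(' ')` yields empty strings between them. A minimal sketch of a spacing-independent parser follows, assuming the usual RTTM field order (type, file id, channel, onset, duration, two placeholders, speaker name). The sample line and the helper name `parse_rttm_line` are made up for illustration.

```python
def parse_rttm_line(line):
    """Return [start_ms, end_ms, speaker_id] from one RTTM SPEAKER line."""
    fields = line.split()  # split on any run of whitespace
    if len(fields) < 8 or fields[0] != "SPEAKER":
        raise ValueError(f"Not an RTTM SPEAKER line: {line!r}")
    start_ms = int(float(fields[3]) * 1000)           # onset in seconds
    end_ms = start_ms + int(float(fields[4]) * 1000)  # onset + duration
    speaker_id = int(fields[7].split("_")[-1])        # 'speaker_1' -> 1
    return [start_ms, end_ms, speaker_id]


# Hypothetical line with irregular spacing between fields.
sample_line = "SPEAKER audio_16k 1   0.500   2.250 <NA> <NA> speaker_1 <NA> <NA>"
print(parse_rttm_line(sample_line))  # [500, 2750, 1]
```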


We process the text and align it with the audio.

In [None]:

!pip install protobuf==3.20

In [None]:
import os
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"

In [None]:
!pip install deepmultilingualpunctuation

In [None]:
# Reading words <> timestamps mapping, which we saved earlier
import json

word_ts = []
with open('./word_ts.text', 'r+') as f:
    for line in f:
        line_temp = json.loads(line)
        if 'start' not in line_temp or 'end' not in line_temp:
            print(f"Skipping malformed entry: {line_temp}")
            continue
        word_ts.append(line_temp)

Skipping malformed entry: {'word': '1.'}
Skipping malformed entry: {'word': '8'}
Skipping malformed entry: {'word': '1'}
Skipping malformed entry: {'word': '2'}
Skipping malformed entry: {'word': '5.'}
Skipping malformed entry: {'word': '2'}
Skipping malformed entry: {'word': '2'}
Skipping malformed entry: {'word': '2'}
Skipping malformed entry: {'word': '2'}
Skipping malformed entry: {'word': '90'}
Skipping malformed entry: {'word': '140.'}
Skipping malformed entry: {'word': '15'}


In [None]:
def get_word_ts_anchor(s, e, option='start'):
  if option == 'end':
    return e
  elif option == 'mid':
    return (s + e) / 2
  return s

def get_words_speaker_mapping(wrd_ts, spk_ts, word_anchor_option='start'):
    # spk_ts holds [start_ms, end_ms, speaker] turns in chronological order.
    s, e, sp = spk_ts[0]
    wrd_pos, turn_idx = 0, 0
    wrd_spk_mapping = []
    for wrd_dict in wrd_ts:
        ws, we, wrd = int(wrd_dict['start'] * 1000), int(wrd_dict['end'] * 1000), wrd_dict['word']
        wrd_pos = get_word_ts_anchor(ws, we, word_anchor_option)
        # Advance to the turn that contains the anchor. Stop at the last turn so
        # a word that falls after it ends cannot cause an infinite loop.
        while wrd_pos > float(e) and turn_idx < len(spk_ts) - 1:
            turn_idx += 1
            s, e, sp = spk_ts[turn_idx]
        wrd_spk_mapping.append({'word': wrd, 'start_time': ws, 'end_time': we, 'speaker': sp})
    return wrd_spk_mapping
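To see how the anchor option changes the result, here is a small self-contained sketch of the same idea on toy data. The name `assign_speakers` and the sample words and turns are hypothetical; the sketch keeps the last turn for any word that falls after it ends.

```python
def assign_speakers(words, turns, anchor="start"):
    """Map each word to the speaker of the turn containing its anchor time.

    words: dicts with 'word', 'start', 'end' (seconds).
    turns: [start_ms, end_ms, speaker] lists in chronological order.
    """
    mapping, turn_idx = [], 0
    for w in words:
        ws, we = int(w["start"] * 1000), int(w["end"] * 1000)
        pos = {"start": ws, "mid": (ws + we) / 2, "end": we}[anchor]
        # Move forward until the anchor lies inside the current turn.
        while pos > turns[turn_idx][1] and turn_idx < len(turns) - 1:
            turn_idx += 1
        mapping.append((w["word"], turns[turn_idx][2]))
    return mapping


# Toy data: two speaker turns and four words.
turns = [[0, 1000, 0], [1000, 2500, 1]]
words = [
    {"word": "Hi", "start": 0.25, "end": 0.5},
    {"word": "there.", "start": 0.75, "end": 1.25},  # straddles the turn boundary
    {"word": "Hello!", "start": 1.5, "end": 2.0},
    {"word": "Bye.", "start": 3.0, "end": 3.5},      # after the last turn ends
]

print(assign_speakers(words, turns, "start"))
print(assign_speakers(words, turns, "end"))
```

With the `start` anchor the word that straddles the boundary stays with speaker 0; with the `end` anchor it moves to speaker 1.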

In [None]:
for i, wrd_dict in enumerate(word_ts):
    if 'start' not in wrd_dict:
        print(f"No 'start' key in dictionary at index {i}: {wrd_dict}")

In [None]:
wsm = get_words_speaker_mapping(word_ts, speaker_ts, 'start')

In [None]:
from deepmultilingualpunctuation import PunctuationModel

punct_model = PunctuationModel()
words_list = list(map(lambda x: x['word'], wsm))

labled_words = punct_model.predict(words_list)


In [None]:
# Whisper already punctuates the text in most cases, so we give priority
# to its punctuation marks over the PunctuationModel results.
import re

ending_puncts = '.?!'
model_puncts = '.,;:!?'

# An acronym such as U.S.A. already ends in a period, so it must not receive a second one.
is_acronym = lambda x: re.fullmatch(r"\b(?:[a-zA-Z]\.){2,}", x)

for word_dict, labeled_tuple in zip(wsm, labled_words):
    word = word_dict['word']
    if word and labeled_tuple[1] in ending_puncts and (word[-1] not in model_puncts or is_acronym(word)):
        word += labeled_tuple[1]
        if word.endswith('..'): word = word.rstrip('.')
        word_dict['word'] = word
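The merge rule above can be tried on individual words without loading the punctuation model. The sketch below mirrors that rule in a single function; `merge_punctuation` is a hypothetical name, and the (word, label) pairs are made up, with `"0"` assumed to be the label for "no punctuation".

```python
import re

ENDING_PUNCTS = ".?!"
MODEL_PUNCTS = ".,;:!?"
ACRONYM = re.compile(r"(?:[a-zA-Z]\.){2,}")


def merge_punctuation(word, predicted):
    """Append a predicted sentence-ending mark unless the word already has punctuation."""
    if not word or predicted not in ENDING_PUNCTS:
        return word
    if word[-1] in MODEL_PUNCTS and not ACRONYM.fullmatch(word):
        return word  # keep the punctuation Whisper produced
    merged = word + predicted
    if merged.endswith(".."):
        merged = merged.rstrip(".")
    return merged


# Hypothetical (word, predicted label) pairs.
pairs = [("hello", "."), ("hello,", "."), ("really", "?"), ("ok", "0")]
print([merge_punctuation(w, p) for w, p in pairs])
# ['hello.', 'hello,', 'really?', 'ok']
```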

In [None]:
sentence_ending_punctuations = '.?!'

def get_first_word_idx_of_sentence(word_idx, word_list, speaker_list, max_words):
  is_word_sentence_end = lambda x: x >= 0 and word_list[x][-1] in sentence_ending_punctuations
  left_idx = word_idx
  while (left_idx > 0 and word_idx - left_idx < max_words and
          speaker_list[left_idx - 1] == speaker_list[left_idx] and
          not is_word_sentence_end(left_idx - 1)):
      left_idx -= 1

  return left_idx if left_idx == 0 or is_word_sentence_end(left_idx - 1) else -1

def get_last_word_idx_of_sentence(word_idx, word_list, max_words):
  is_word_sentence_end = lambda x: x >= 0 and word_list[x][-1] in sentence_ending_punctuations
  right_idx = word_idx
  while (right_idx < len(word_list) and right_idx - word_idx < max_words and
          not is_word_sentence_end(right_idx)):
      right_idx += 1

  return right_idx if right_idx == len(word_list) - 1 or is_word_sentence_end(right_idx) else -1

def get_realigned_ws_mapping_with_punctuation(word_speaker_mapping, max_words_in_sentence = 50):
  is_word_sentence_end = lambda x: x >= 0 and word_speaker_mapping[x]['word'][-1] in sentence_ending_punctuations
  wsp_len = len(word_speaker_mapping)

  words_list, speaker_list = [], []
  for k, line_dict in enumerate(word_speaker_mapping):
      word, speaker = line_dict['word'], line_dict['speaker']
      words_list.append(word)
      speaker_list.append(speaker)

  k = 0
  while k < len(word_speaker_mapping):
      line_dict = word_speaker_mapping[k]
      if k < wsp_len - 1 and speaker_list[k] != speaker_list[k + 1] and not is_word_sentence_end(k):
          left_idx = get_first_word_idx_of_sentence(k, words_list, speaker_list, max_words_in_sentence)
          right_idx = get_last_word_idx_of_sentence(k, words_list, max_words_in_sentence - k + left_idx - 1) if left_idx > -1 else -1
          if min(left_idx, right_idx) == -1:
              k += 1
              continue

          spk_labels = speaker_list[left_idx: right_idx + 1]
          mod_speaker = max(set(spk_labels), key=spk_labels.count)
          if spk_labels.count(mod_speaker) < len(spk_labels) // 2:
              k += 1
              continue

          speaker_list[left_idx: right_idx + 1] = [mod_speaker] * (right_idx - left_idx + 1)
          k = right_idx

      k += 1

  k, realigned_list = 0, []
  while k < len(word_speaker_mapping):
      line_dict = word_speaker_mapping[k].copy()
      line_dict['speaker'] = speaker_list[k]
      realigned_list.append(line_dict)
      k += 1


  return realigned_list
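The core of the realignment is a majority vote: when a sentence contains a speaker change, every word in it is relabeled with the most frequent speaker in that sentence. A minimal sketch of just that step, using hypothetical helper names and made-up labels:

```python
from collections import Counter


def majority_speaker(labels):
    """Most frequent label, or None if it covers fewer than len(labels) // 2 words."""
    speaker, count = Counter(labels).most_common(1)[0]
    return speaker if count >= len(labels) // 2 else None


def relabel_sentence(labels):
    """Give the whole sentence to the majority speaker when there is one."""
    winner = majority_speaker(labels)
    return list(labels) if winner is None else [winner] * len(labels)


# "So what do you think?" where the diarizer switched speakers two words early.
print(relabel_sentence([0, 0, 0, 1, 1]))  # [0, 0, 0, 0, 0]
# Five different speakers in five words: no clear majority, labels are kept.
print(relabel_sentence([0, 1, 2, 3, 4]))  # [0, 1, 2, 3, 4]
```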

In [None]:
def get_sentences_speaker_mapping(word_speaker_mapping, spk_ts):
  s, e, spk = spk_ts[0]
  prev_spk = spk

  snts = []
  snt = {'speaker': f'Speaker {spk}', 'start_time': s, 'end_time': e, 'text': ''}

  for wrd_dict in word_speaker_mapping:
      wrd, spk = wrd_dict['word'], wrd_dict['speaker']
      s, e = wrd_dict['start_time'], wrd_dict['end_time']
      if spk != prev_spk:
          snts.append(snt)
          snt = {'speaker': f'Speaker {spk}', 'start_time': s, 'end_time': e, 'text': ''}
      else:
          snt['end_time'] = e
      snt['text'] += wrd + ' '
      prev_spk = spk

  snts.append(snt)
  return snts

def get_speaker_aware_transcript(sentences_speaker_mapping):
  with open('diarization.txt', 'w') as f:
    for sentence_dict in sentences_speaker_mapping:
        sp = sentence_dict['speaker']
        text = sentence_dict['text']
        f.write(f'\n\n{sp}: {text}')

In [None]:
wsm = get_realigned_ws_mapping_with_punctuation(wsm)
ssm = get_sentences_speaker_mapping(wsm, speaker_ts)
get_speaker_aware_transcript(ssm)
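Grouping consecutive words by speaker can also be expressed with `itertools.groupby`. The sketch below is an alternative formulation on toy data, not the notebook's function: `group_turns` is a hypothetical name, and unlike `get_sentences_speaker_mapping` it leaves no trailing space in the text and takes the first turn from the first word rather than from the RTTM timestamps.

```python
from itertools import groupby


def group_turns(word_speaker_mapping):
    """Merge consecutive words from the same speaker into one turn."""
    turns = []
    for speaker, group in groupby(word_speaker_mapping, key=lambda w: w["speaker"]):
        group = list(group)
        turns.append({
            "speaker": f"Speaker {speaker}",
            "start_time": group[0]["start_time"],
            "end_time": group[-1]["end_time"],
            "text": " ".join(w["word"] for w in group),
        })
    return turns


# Toy word-to-speaker mapping with times in milliseconds.
wsm_demo = [
    {"word": "Hi.", "start_time": 0, "end_time": 400, "speaker": 0},
    {"word": "Hello", "start_time": 500, "end_time": 800, "speaker": 1},
    {"word": "there.", "start_time": 800, "end_time": 1200, "speaker": 1},
    {"word": "Bye.", "start_time": 1500, "end_time": 1900, "speaker": 0},
]

for turn in group_turns(wsm_demo):
    print(f"{turn['speaker']}: {turn['text']}")
```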

Now we process the form with ChatGPT.

In [None]:
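# The code below uses openai.Completion, which is only available in openai releases before 1.0.
!pip install --upgrade "openai<1.0"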

In [None]:
# Read the content of the file into the variable
with open('diarization.txt', 'r') as file:
    file_content = file.read()

# Now, `file_content` contains the text from `diarization.txt`

In [None]:

# Keep only the first 1000 words of the transcript to limit the prompt size.
words = file_content.split()
file_content = ' '.join(words[:1000])

In [None]:
import openai

with open('colab_key_diarization.txt', 'r') as file:
    openai.api_key = file.read().strip()

In [None]:
# Your list of questions
questions = [
    "question1: in the conversation, how many people speak?",
    "question2: in the conversation, what is the conversation about?",
    "question4: in the conversation, what relevant data is there?",
    "Question5: in the conversation, what news does the conversation reveal?",
    "question6: in the conversation, who is pregnant?",
    "question7: in the conversation, what is the diagnosis?",
    "question8: what did they have for dinner last night, and how many pills must they take?",
    "question9: what did Juan say, word for word?"
    # Add more questions as necessary
]

# Create a file to write the questions and answers
with open("FormularioDePreguntasSobreConversacion.txt", "w") as f:
    for question in questions:
        # Create a prompt with your file content and question
        prompt = f"The following is the transcript of a conversation between two people. Answer the questions with exactly what the people say: {file_content}\nQ: {question}\n"

        # Generate a response from OpenAI's GPT model.
        # openai.Completion is the pre-1.0 interface of the openai package.
        # text-davinci-003 has since been retired by OpenAI;
        # gpt-3.5-turbo-instruct is its listed replacement for this endpoint.
        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=prompt,
            temperature=0.2,
            max_tokens=150,
        )

        # Write the question and answer to the file
        answer = response.choices[0].text.strip()
        f.write(question + "\n")
        f.write(answer + "\n\n")
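One way to exercise the form-filling loop without calling the API is to pass the model call in as a function. The sketch below is an assumption-laden variant, not the notebook's code: `build_prompt`, `fill_form`, `fake_ask` and the prompt wording are hypothetical, and `fake_ask` simply stands in for a real completion request.

```python
import io

PROMPT_TEMPLATE = (
    "The following is the transcript of a conversation. Answer the question "
    "using exactly what the speakers say: {transcript}\nQ: {question}\n"
)


def build_prompt(transcript, question, max_words=1000):
    """Truncate the transcript to max_words words and insert it into the template."""
    words = transcript.split()
    return PROMPT_TEMPLATE.format(transcript=" ".join(words[:max_words]), question=question)


def fill_form(questions, transcript, ask, out):
    """Write each question followed by the answer returned by ask(prompt)."""
    for question in questions:
        answer = ask(build_prompt(transcript, question)).strip()
        out.write(f"{question}\n{answer}\n\n")


def fake_ask(prompt):
    # Stand-in for a real model call; returns a fixed placeholder answer.
    return " (model answer would go here) "


out = io.StringIO()
fill_form(
    ["question1: how many people speak?"],
    "Speaker 0: Hi. Speaker 1: Hello.",
    fake_ask,
    out,
)
print(out.getvalue())
```

Swapping `fake_ask` for a function that wraps the actual completion request leaves the file-writing logic unchanged.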