# **Trabajo Práctico Final NLP**



## **Ejercicio 1 - RAG**
**Crear un chatbot experto en un tema a elección, usando la técnica RAG. Como fuentes de conocimiento se utilizarán al menos las siguientes fuentes:**

* **Documentos de texto**
*  **Datos numéricos en formato tabular (por ej., Dataframes, CSV, sqlite, etc.)**
*  **Base de datos de grafos (Online o local)**


`Elijo que el chatbot se especialice en historia de la Segunda Guerra Mundial`

### **Set de datos**

**`Documentos de textos`**

[**Segunda Guerra Mundial - Wikipedia**](https://es.wikipedia.org/wiki/Segunda_Guerra_Mundial)

[**Breve Historia de la Segunda Guerra Mundial - Jesús Hernández**](https://sgfm.elcorteingles.es/SGFM/dctm/MEDIA02/CONTENIDOS/201503/17/00106518013690____4_.pdf)

[**LA SEGUNDA GUERRA MUNDIAL (1939-1945) - profesoremilio**](https://profesoremilio.files.wordpress.com/2018/09/tema-8-la-segunda-guerra-mundial.pdf)

[**LA SEGUNDA GUERRA MUNDIAL (1939-1945) - IES Fray Pedro de Urbina – Departamento de Geografía e Historia**](https://www.iesfraypedro.com/files/sociales/segunda-guerra-mundial-1b.pdf)

[**La II Guerra Mundial - Sabuco**](https://www.sabuco.com/historia/IIGMb.pdf)

**`Datos tabulares`**

[**WW2 - Kaggle**](https://www.kaggle.com/datasets/ramjasmaurya/world-war-2-archive?select=events.csv)


---

Nota: de los archivos tabulares de Kaggle solo utilizo el que corresponde a los eventos. El mismo fue traducido al español para mejorar la compresión de las solicitudes.

### **Carga de datos**

In [None]:
import pandas as pd
import re

!pip install wikipedia-api
import wikipediaapi
!pip install pdfminer.six
from pdfminer.high_level import extract_text

!pip install es_core_news_sm
import spacy
from spacy.tokens import Doc

!pip install gdown
import gdown
import os
import shutil

In [None]:
# FUNCIÓN PARA EXTRAER TEXTO

def extraer_texto_pdf (pdf, pagina_inicio=None, pagina_final=None):
  """
   Función para extraer texto de un documento PDF.

  Parámetros:
  - pdf: Ruta del archivo PDF.
  - pagina_inicio: Página de inicio para la extracción (opcional).
  - pagina_final: Página final para la extracción (opcional).

  Retorna el texto extraído.
  """
  # Verificar si se especifican páginas de inicio y fin
  if pagina_inicio is not None and pagina_final is not None:
    # Extraer desde la página de inicio hasta la página de fin
    texto = extract_text(pdf, page_numbers=range(pagina_inicio, pagina_final))
  else:
    # Extraer todo el texto del PDF
    texto = extract_text(pdf)
  return texto

In [None]:
#TEXTO WIKIPEDIA

# Configuración de idioma de Wikipedia ('es')
wiki_wiki = wikipediaapi.Wikipedia(language='es', user_agent="MiAplicacion/1.0")

# Obtener la página
archivo_wiki = wiki_wiki.page("Segunda_Guerra_Mundial")

archivo = archivo_wiki.text[:-4464]

# Eliminar corchetes (referencias)
archivo = re.sub(r'\[[^\]]*\]', '', archivo)

In [None]:
# Link con archivos sobre historia de la Segunda Guera Mundial
url = 'https://drive.google.com/drive/folders/1E98wUJ9iKygWe5jLiZIHtPJ_PPaFoaKH?usp=sharing'

# Descarga carpeta 'Data'
gdown.download_folder(url, quiet=True, output='Historia_SGM')

# Crear la carpeta 'llamaindex_data' si no existe
carpeta_destino = 'llamaindex_data'
if not os.path.exists(carpeta_destino):
  os.makedirs(carpeta_destino)

# Mover todos los archivos de 'Historia_SGM' a 'llamaindex_data'
carpeta_origen = 'Historia_SGM'
for filename in os.listdir(carpeta_origen):
  ruta_origen = os.path.join(carpeta_origen, filename)
  ruta_destino = os.path.join(carpeta_destino, filename)
  shutil.move(ruta_origen, ruta_destino)

# Eliminar la carpeta 'Historia_SGM'
shutil.rmtree(carpeta_origen)

In [None]:
# Llamar a la función para extraer texto
texto_1 = extraer_texto_pdf('/content/llamaindex_data/segunda_guerra_mundial_pdf.pdf', 16,50) # TEXTO - Jesús Hernandz

texto_2 = extraer_texto_pdf('/content/llamaindex_data/tema-8-la-segunda-guerra-mundial.pdf') # TEXTO - profesoremilio

texto_3 = extraer_texto_pdf('/content/llamaindex_data/segunda-guerra-mundial-1b.pdf') # TEXTO - IES Fray Pedro de Urbina

texto_4 = extraer_texto_pdf('/content/llamaindex_data/IIGMb.pdf') # TEXTO - Sabuco

texto_5 = archivo # TEXTO - Wikipedia

# Limpieza de texto
texto_1 = re.sub(r'.*'+re.escape('BH SEGUNDA GUERRA REV')+r'.*\n', '', texto_1)
texto_1 = re.sub(r'\n\d+\n', '', texto_1)
texto_1 = re.sub(r'\bREVE HISTORIA DE LA SEGUNDA GUERRA MUNDIAL\b', '', texto_1)
texto_1 = re.sub(r'\bB\b', '', texto_1)
texto_1 = re.sub(r'\bESÚS HERNÁNDEZ\b', '', texto_1)
texto_1 = re.sub(r'\n+\s*\n+', '\n', texto_1)
texto_1 = re.sub(r'(\w+)-\s*\n\s*(\w+)', r'\1\2', texto_1)
texto_1 = re.sub(r'\bJ\b', '', texto_1.strip())

texto_2 = re.sub(r'Tema 8. LA SEGUNDA GUERRA MUNDIAL', '', texto_2)
texto_2 = texto_2.replace(' (1939-1945)','')
texto_2 = re.sub(r'\n\d+\n?', '', texto_2.strip())

texto_3 = re.sub(r'IES Fray Pedro de Urbina – Departamento de Geografía e Historia', '', texto_3)
texto_3 = texto_3.replace('LA SEGUNDA GUERRA MUNDIAL (1939-1945)', '')
texto_3 = re.sub(r'\n+\s*\n+', '\n', texto_3.strip())

texto_4 = re.sub(r'\n+\s*\n+', '\n', texto_4)
texto_4 = re.sub(r'Página \| \d+', '', texto_4.strip())

# Concateno todos los documentos
documentos = [texto_1,texto_2,texto_3,texto_4,texto_5]
doc_final = ''
for documento in documentos:
  doc_final += documento

# Crar carpeta
nombre_carpeta = "SGM"
ruta_carpeta = os.path.join("/content", nombre_carpeta)
os.makedirs(ruta_carpeta, exist_ok=True)

# Guradar archivo final en la carpeta creada
ruta_archivo = os.path.join(ruta_carpeta, 'segunda_guerra_mundial.txt')
with open(ruta_archivo, "wb") as archivo:
  archivo.write(doc_final.encode("utf-8"))

### **Datos tabulares**

In [None]:
#CARGO EL ARCHIVO EN FORMATO CSV
archivo_csv = pd.read_csv('/content/llamaindex_data/eventos.txt')
archivo_csv = archivo_csv.drop('Id', axis=1)
archvio_csv = pd.DataFrame(archivo_csv)

In [None]:
archivo_csv.head(3)

Unnamed: 0,Evento,Fecha
0,Tratado de Rapallo,16 Abr 1922
1,Tratado de Berlín,24 Abr 1926
2,Incidente de Jinan,3 May 1928


In [None]:
archivo_csv.shape

(336, 2)

### **Base de datos de grafo**

In [None]:
import networkx as nx
import matplotlib.pyplot as plt

# Crear un grafo dirigido para representar la base de datos de grafos
grafo = nx.DiGraph()

# Agregar nodos (entidades)
entidades = ["Alemania","Reino Unido","Estados Unidos","Unión Soviética","Japón","Adolf Hitler","Winston Churchill","Franklin D. Roosevelt",
    "Joseph Stalin","Ataque a Pearl Harbor","Operación Barbarroja","Batalla de Stalingrado","Día D","Conferencia de Yalta","Conferencia de Potsdam",
    "Holocausto","Campaña del Pacífico","Tratado de Versalles","Liga de Naciones","Tratado de no agresión germano-soviético",
    "Bomba atómica","Nuremberg Trials","Código Enigma","Operación Overlord"]

grafo.add_nodes_from(entidades)

# Agregar relaciones
relaciones = [
    {"source": "Alemania", "target": "Adolf Hitler", "label": "liderado_por"},
    {"source": "Reino Unido", "target": "Winston Churchill", "label": "liderado_por"},
    {"source": "Estados Unidos", "target": "Franklin D. Roosevelt", "label": "liderado_por"},
    {"source": "Unión Soviética", "target": "Joseph Stalin", "label": "liderado_por"},
    {"source": "Alemania", "target": "Unión Soviética", "label": "inicia"},
    {"source": "Japón", "target": "Ataque a Pearl Harbor", "label": "inicia"},
    {"source": "Alemania", "target": "Operación Barbarroja", "label": "inicia"},
    {"source": "Unión Soviética", "target": "Batalla de Stalingrado", "label": "participa_en"},
    {"source": "Estados Unidos", "target": "Día D", "label": "participa_en"},
    {"source": "Reino Unido", "target": "Día D", "label": "participa_en"},
    {"source": "Unión Soviética", "target": "Conferencia de Yalta", "label": "participa_en"},
    {"source": "Estados Unidos", "target": "Conferencia de Potsdam", "label": "participa_en"},
    {"source": "Alemania", "target": "Holocausto", "label": "involucrado_en"},
    {"source": "Japón", "target": "Campaña del Pacífico", "label": "participa_en"},
    {"source": "Alemania", "target": "Tratado de Versalles", "label": "firmado_con"},
    {"source": "Alemania", "target": "Liga de Naciones", "label": "miembro_de"},
    {"source": "Alemania", "target": "Tratado de no agresión germano-soviético", "label": "firmado_con"},
    {"source": "Estados Unidos", "target": "Bomba atómica", "label": "desarrolla"},
    {"source": "Alemania", "target": "Nuremberg Trials", "label": "juzgado_en"},
    {"source": "Alemania", "target": "Código Enigma", "label": "desarrolla"},
    {"source": "Estados Unidos", "target": "Operación Overlord", "label": "inicia"},
    {"source": "Reino Unido", "target": "Operación Overlord", "label": "inicia"},
]

# Agregar nodos y relaciones al grafo
for rel in relaciones:
    grafo.add_edge(rel["source"], rel["target"], label=rel["label"])

In [None]:
# Dibujar el grafo
plt.figure(figsize=(20,10))
pos = nx.spring_layout(grafo)
edge_labels = nx.get_edge_attributes(grafo, 'label')
nx.draw(grafo, pos, with_labels=True, font_weight='bold', node_size=700, node_color='skyblue', edge_color='gray', arrowsize=15)
nx.draw_networkx_edge_labels(grafo, pos, edge_labels=edge_labels)

# Mostrar el grafo
plt.show()

# **Modelo**

In [None]:
#!pip install llama_index sentence-transformers pypdf langchain python-decouple
#!pip install langchain_community
#!pip install pydantic==1.10.8
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from llama_index.embeddings import LangchainEmbedding
from llama_index.vector_stores import ChromaVectorStore
from llama_index import ServiceContext
from llama_index.storage.storage_context import StorageContext
from llama_index import VectorStoreIndex, SimpleDirectoryReader
from llama_index.text_splitter import SentenceSplitter

from jinja2 import Template
import requests
from decouple import config

#!pip install typing_extensions==4.4.0
#!pip install chromadb
import chromadb

In [None]:
def zephyr_instruct_template(messages, add_generation_prompt=True):
  # Definir la plantilla Jinja
  template_str = "{% for message in messages %}"
  template_str += "{% if message['role'] == 'user' %}"
  template_str += "<|user|>{{ message['content'] }}</s>\n"
  template_str += "{% elif message['role'] == 'assistant' %}"
  template_str += "<|assistant|>{{ message['content'] }}</s>\n"
  template_str += "{% elif message['role'] == 'system' %}"
  template_str += "<|system|>{{ message['content'] }}</s>\n"
  template_str += "{% else %}"
  template_str += "<|unknown|>{{ message['content'] }}</s>\n"
  template_str += "{% endif %}"
  template_str += "{% endfor %}"
  template_str += "{% if add_generation_prompt %}"
  template_str += "<|assistant|>\n"
  template_str += "{% endif %}"
  # Crear un objeto de plantilla con la cadena de plantilla
  template = Template(template_str)
  # Renderizar la plantilla con los mensajes proporcionados
  return template.render(messages=messages, add_generation_prompt=add_generation_prompt)

In [None]:
# Aquí hacemos la llamada el modelo
def generate_answer(prompt: str, max_new_tokens: int = 768) -> None:
  try:
    # Tu clave API de Hugging Face
    api_key = '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'
    # URL de la API de Hugging Face para la generación de texto
    api_url = "https://api-inference.huggingface.co/models/HuggingFaceH4/zephyr-7b-beta"
    # Cabeceras para la solicitud
    headers = {"Authorization": f"Bearer {api_key}"}
    # Datos para enviar en la solicitud POST
    data = {
      "inputs": prompt,
      "parameters": {
        "max_new_tokens": max_new_tokens,
        "temperature": 0.7,
        "top_k": 50,
        "top_p": 0.95
        }
      }
    # Realizamos la solicitud POST
    response = requests.post(api_url, headers=headers, json=data)
    # Extraer respuesta
    respuesta = response.json()[0]["generated_text"][len(prompt):]
    return respuesta
  except Exception as e:
    print(f"An error occurred: {e}")

In [None]:
# Esta función prepara el prompt en estilo QA
def prepare_prompt(query_str: str, nodes: list):
  TEXT_QA_PROMPT_TMPL = (
    "La información de contexto es la siguiente:\n"
    "---------------------\n"
    "{context_str}\n"
    "---------------------\n"
    "Dada la información de contexto anterior, y sin utilizar conocimiento previo, responde la siguiente pregunta.\n"
    "Pregunta: {query_str}\n"
    "Respuesta: "
  )
  # Construimos el contexto de la pregunta
  context_str = ''
  for node in nodes:
    context_str += f"{node.text}\n"
    messages = [
      {
      "role": "system",
      "content": "Eres un asistente experto en la segunda guerra mundial que siempre responde con respuestas veraces, útiles y basadas en hechos. Si la pregunta incluye otra temática, respondes cual es tu función.",
      },
      {"role": "user", "content": TEXT_QA_PROMPT_TMPL.format(context_str=context_str, query_str=query_str)},
      ]
    final_prompt = zephyr_instruct_template(messages)
    return final_prompt

In [None]:
# Configuración inicial de ChromaDB
client = chromadb.Client()
collection = client.create_collection("SegundaGuerraMundial")

# assign chroma as the vector_store to the context
vector_store = ChromaVectorStore(chroma_collection=collection)

In [None]:
# Datos tabulares (archvio_csv)

# Cargamos nuestro modelo de embeddings
print('Cargando modelo de embeddings...')
embed_model = LangchainEmbedding(HuggingFaceEmbeddings(model_name='sentence-transformers/paraphrase-multilingual-mpnet-base-v2'))

# Modeo de segmentación de texto
text_splitter = SentenceSplitter(chunk_size=1024, chunk_overlap=200)

### Seteando el service context y el storage context cambiando los valores default
svc = ServiceContext.from_defaults(embed_model=embed_model, llm=None, text_splitter=text_splitter)
stc = StorageContext.from_defaults(vector_store=vector_store)

# Construimos un índice de documentos a partir de los datos de la carpeta llamaindex_data
print('Indexando documentos...')

documents = SimpleDirectoryReader("SGM").load_data()

## Creando un nuevo index
index_documents = VectorStoreIndex.from_documents(documents, storage_context=stc, service_context=svc)
#index_tabular = VectorStoreIndex.from_documents(archvio_csv, storage_context=stc, service_context=svc)

In [None]:
# Construimos un retriever a partir del índice, para realizar la búsqueda vectorial de documentos
retriever = index.as_retriever(similarity_top_k=1)

print('Realizando llamada a HuggingFace para generar respuestas...\n')

queries = ['¿Que fue la segunda guerra mundial?',
           '¿En que año fue la segunda guerra mundial?',
           '¿Se usaron bombas nucleares durante la segunda guerra mundial',
           '¿Quien ganó el mundial de futbol 2022?']

for query_str in queries:
  # Traemos los documentos más relevantes para la consulta
  nodes = retriever.retrieve(query_str)
  final_prompt = prepare_prompt(query_str, nodes)
  print('Pregunta:', query_str)
  print('Respuesta:')
  print(generate_answer(final_prompt))
  print('-------------------------------------------------------')