# RAG Manual de Tránsito

In [1]:
import os
import re
from dotenv import load_dotenv
from typing_extensions import List, TypedDict

from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import ChatPromptTemplate
from langchain.load import dumps, loads

from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.tools import tool
from langchain_core.messages import SystemMessage

from langchain_openai import OpenAIEmbeddings, ChatOpenAI

from langchain_community.vectorstores import Chroma

from langgraph.graph import START, StateGraph, MessagesState, END
from langgraph.prebuilt import ToolNode, tools_condition

### Load environment variables

In [2]:
# Path to the .env file, resolved relative to the current working directory
dotenv_path = '.env'

# Populate the process environment from that file
load_dotenv(dotenv_path)

# Read the API keys (each is None when the variable is not defined)
openai_api_key = os.environ.get('OPENAI_API_KEY')
langsmith_api_key = os.environ.get('LANGSMITH_API_KEY')

In [3]:
# Fail early with a readable message: assigning None into os.environ would
# otherwise raise an opaque "TypeError: str expected, not NoneType".
if not openai_api_key:
    raise RuntimeError(
        "OPENAI_API_KEY is not set; add it to the .env file or the environment."
    )
os.environ['OPENAI_API_KEY'] = openai_api_key

# LangSmith tracing is optional: switch it on only when a key is available,
# so the notebook still runs without a LangSmith account.
if langsmith_api_key:
    os.environ['LANGCHAIN_TRACING_V2'] = 'true'
    os.environ['LANGCHAIN_ENDPOINT'] = 'https://api.smith.langchain.com'
    os.environ['LANGCHAIN_API_KEY'] = langsmith_api_key
else:
    print("LANGSMITH_API_KEY is not set; LangSmith tracing was not configured.")

### Index files in vector store

In [4]:
# Load the local text file with the statute.
# The text contains accented Spanish characters, so decode it explicitly
# instead of relying on the platform's default encoding (which is not UTF-8
# on every OS). Assumes the processed file is UTF-8 - TODO confirm; if the
# UTF-8 decode fails, the loader falls back to detecting the encoding.
file_path = "ley-769-de-2002-codigo-nacional-de-transito_3704_0_processed.txt"
loader = TextLoader(file_path, encoding="utf-8", autodetect_encoding=True)
docs = loader.load()

def clean_section_text(text):
    """
    Strip a leading article header from a section.

    The header is the article keyword, its number, and everything that
    follows up to (but not including) the next uppercase letter. When the
    text does not start with such a header, or no uppercase letter follows
    the number, the text is left as it is. Surrounding whitespace is
    trimmed in every case.
    """
    # The pattern is anchored at the start, so it can only match at position 0
    header_pattern = r'^(?:ARTÍCULO|ARTICULO|Artículo|Articulo)\s+\d+.*?(?=[A-ZÁÉÍÓÚÑ])'
    header = re.match(header_pattern, text, flags=re.DOTALL)
    body = text[header.end():] if header else text
    return body.strip()

def extract_article_sections(text):
    """
    Split the text into one section per article marker.

    A marker is the word 'ARTÍCULO'/'ARTICULO'/'Artículo'/'Articulo'
    followed by whitespace and a number. The match is case-sensitive, so a
    lowercase 'artículo 5' is not treated as a marker. Each section runs
    from its marker to the next marker (or to the end of the text); any
    text before the first marker is not returned.

    Returns a list of (cleaned_section_text, article_number) tuples, where
    the article number is the digit string captured from the marker and the
    section text has been passed through clean_section_text.
    """
    article_pattern = r'(?:ARTÍCULO|ARTICULO|Artículo|Articulo)\s+(\d+)'
    markers = list(re.finditer(article_pattern, text))

    # Section i ends where marker i + 1 starts; the last one ends with the text
    section_ends = [marker.start() for marker in markers[1:]] + [len(text)]

    return [
        (clean_section_text(text[marker.start():end_pos]), marker.group(1))
        for marker, end_pos in zip(markers, section_ends)
    ]

# Break the statute into (section text, article number) pairs
original_text = docs[0].page_content
article_sections = extract_article_sections(original_text)

# Splitter built from the tiktoken encoder, with the chunk size and overlap below
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=300,
    chunk_overlap=50
)

# Chunk all articles in a single call: metadatas[i] is attached to every
# chunk produced from texts[i], so each chunk keeps its article number
section_texts = [section_text for section_text, _ in article_sections]
section_metadatas = [
    {"source": file_path, "source_article": article_num}
    for _, article_num in article_sections
]
splits = text_splitter.create_documents(
    texts=section_texts,
    metadatas=section_metadatas
)

# Embed the chunks and index them together with their metadata
vector_store = Chroma.from_documents(
    documents=splits,
    embedding=OpenAIEmbeddings()
)

retriever = vector_store.as_retriever()

### Create chat models

In [5]:
# generator_llm writes the final answer in `generate`; retriever_llm produces
# the alternative queries and decides whether to call the retrieval tool.
# Both run with temperature 0.
generator_llm = ChatOpenAI(model_name="gpt-4o", temperature=0)
retriever_llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)

### Define retriever tool

In [6]:
def get_unique_union(documents: list[list]):
    """Merge several retrieval result lists into one list without duplicates.

    Two documents count as the same when both their ``page_content`` and
    their ``source_article`` metadata value are equal. The first occurrence
    of each document is kept and results are returned in first-seen order,
    so the output is deterministic for a given input.

    Args:
        documents: One list of retrieved documents per query. Each document
            must expose ``page_content`` and a ``metadata`` mapping.

    Returns:
        A flat list of the original document objects (metadata intact)
        with duplicates removed. Empty input yields an empty list.
    """
    # A dict keeps insertion order and gives O(1) duplicate checks. The tuple
    # key avoids the ambiguity of concatenating content and article number,
    # and also works when the article number is not a string.
    unique_docs = {}
    for sublist in documents:
        for doc in sublist:
            key = (doc.page_content, doc.metadata.get('source_article', ''))
            unique_docs.setdefault(key, doc)
    return list(unique_docs.values())

In [7]:
# Render retrieved documents as context text tagged with article numbers
def format_context_with_articles(docs):
    """Build the context string handed to the model.

    Each document becomes "[Artículo <n>] <content>", where <n> is the
    document's 'source_article' metadata value, or 'N/A' when that key is
    missing. Entries are separated by a blank line.
    """
    return "\n\n".join(
        f"[Artículo {doc.metadata.get('source_article', 'N/A')}] {doc.page_content}"
        for doc in docs
    )

In [8]:
def _split_queries(text: str) -> list[str]:
    """Split newline-separated model output into stripped, non-empty queries."""
    return [line.strip() for line in text.split("\n") if line.strip()]


@tool(response_format="content_and_artifact")
def extraer(pregunta: str):
    """Extrae información relacionada a la pregunta"""
    # NOTE: the docstring above is runtime text - the @tool decorator exposes
    # it to the model as the tool description - so it stays in Spanish and
    # must not be reworded casually.
    #
    # Multi-query retrieval: the question is rephrased several ways, each
    # rephrasing is sent to the retriever, and the results are merged.
    # Returns (serialized context string, list of retrieved documents), as
    # required by response_format="content_and_artifact".
    template = (
        "Eres una IA asistente modelo de lenguaje. Tu tarea es generar cinco "
        "diferentes versiones de la pregunta dada por el usuario para extraer los "
        "documentos relevantes de una base de datos de vectores. Al generar múltiples "
        "perspectivas de la pregunta del usuario, tu objetivo es ayudar al usuario "
        "a superar algunas de las limitaciones de la búsqueda de similaridad basada "
        "en distancia. Escribe estas preguntas alternativas separadas por caracteres "
        "de nueva línea, sin enumerar ni listar. Pregunta original: {pregunta}"
    )

    prompt_perspectives = ChatPromptTemplate.from_template(template)

    generate_queries = (
        prompt_perspectives
        | retriever_llm
        | StrOutputParser()
        # Models often separate lines with blank lines; drop those so no empty
        # query reaches the retriever. If nothing usable comes back, fall back
        # to the user's original question.
        | (lambda output: _split_queries(output) or [pregunta])
    )

    # retriever.map() runs the retriever once per generated query and returns
    # one list of documents per query; get_unique_union merges those lists.
    retrieval_chain = generate_queries | retriever.map() | get_unique_union
    docs = retrieval_chain.invoke({"pregunta": pregunta})

    serialized = format_context_with_articles(docs)

    return serialized, docs

# Make the retrieval tool available in two places: bound to the routing model
# (so it can emit calls to `extraer`) and wrapped in a ToolNode for the graph
retriever_llm_with_tools = retriever_llm.bind_tools([extraer])
tools = ToolNode([extraer])

### Define query and generator graph nodes

In [9]:
# Step 1: Ask the tool-enabled model for an AIMessage, which may carry a tool call.
def query_or_respond(state: MessagesState):
    """Run the tool-enabled model on the conversation so far.

    Returns a state update holding the model's single response message.
    """
    ai_message = retriever_llm_with_tools.invoke(state["messages"])
    # Wrapped in a list because MessagesState appends messages rather than
    # overwriting the history
    return {"messages": [ai_message]}

# Step 2: Answer the question using the retrieved content.
def generate(state: MessagesState):
    """Produce the final answer from the latest tool output.

    Collects the run of tool messages at the end of the history, places
    their content in a system prompt, appends the conversational messages
    (human, system, and AI messages without tool calls) and invokes the
    generator model. Returns a state update holding the model's response.
    """
    history = state["messages"]

    # Walk back from the end to find where the trailing tool messages begin
    first_tool_idx = len(history)
    while first_tool_idx > 0 and history[first_tool_idx - 1].type == "tool":
        first_tool_idx -= 1
    tool_messages = history[first_tool_idx:]

    # Retrieved context, one tool message per paragraph block
    docs_content = "\n\n".join(tool_message.content for tool_message in tool_messages)

    system_message_content = (
        "Eres un asistente de leyes de tránsito en Colombia. Usa las siguientes "
        "piezas de contexto extraídas para responder la pregunta. Si la respuesta "
        "no está en el contexto, di que no sabes. Primero menciona la respuesta "
        "rápida (si la hay), y después da un poco más de detalles, mencionando "
        "el artículo en que te basaste en cada parte de la respuesta al principio "
        "de dicha parte, diciendo \"Basado en el artículo X,...\""
        "\n\n"
        "Contexto con artículos:\n\n"
        f"{docs_content}"
    )

    def is_conversational(message):
        """Keep human/system messages and AI messages that made no tool call."""
        if message.type in ("human", "system"):
            return True
        return message.type == "ai" and not message.tool_calls

    conversation_messages = [message for message in history if is_conversational(message)]
    prompt = [SystemMessage(system_message_content), *conversation_messages]

    answer = generator_llm.invoke(prompt)
    return {"messages": [answer]}

### Build Graph

In [10]:
# Graph builder that uses MessagesState as its state schema
graph_builder = StateGraph(MessagesState)

In [11]:
# Assemble and compile the application graph.
# Start from a fresh builder so this cell can be re-run on its own: adding a
# node to a builder that already contains it raises an error.
graph_builder = StateGraph(MessagesState)

graph_builder.add_node(query_or_respond)
graph_builder.add_node(tools)
graph_builder.add_node(generate)

# Flow: query_or_respond -> (tools -> generate) or END.
# tools_condition sends the run to "tools" when the model requested a tool
# call, and to END when it answered directly.
graph_builder.set_entry_point("query_or_respond")
graph_builder.add_conditional_edges(
    "query_or_respond",
    tools_condition,
    {END: END, "tools": "tools"},
)
graph_builder.add_edge("tools", "generate")
graph_builder.add_edge("generate", END)

graph = graph_builder.compile()

### Test

In [12]:
input_message = "Hola"
inputs = {"messages": [{"role": "user", "content": input_message}]}

# Print the newest message of each state emitted by the stream
for step in graph.stream(inputs, stream_mode="values"):
    latest_message = step["messages"][-1]
    latest_message.pretty_print()


Hola

¡Hola! ¿En qué puedo ayudarte hoy?


In [13]:
input_message = "una moto puede ir por la linea punteada amarilla entre carros"
inputs = {"messages": [{"role": "user", "content": input_message}]}

# Print the newest message of each state emitted by the stream
for step in graph.stream(inputs, stream_mode="values"):
    latest_message = step["messages"][-1]
    latest_message.pretty_print()


una moto puede ir por la linea punteada amarilla entre carros
Tool Calls:
  extraer (call_TWULohwqQJcauzee5N28s6wo)
 Call ID: call_TWULohwqQJcauzee5N28s6wo
  Args:
    pregunta: una moto puede ir por la linea punteada amarilla entre carros


  doc = loads(doc_str)


Name: extraer

[Artículo 68] De cuatro (4) carriles: Los carriles exteriores se utilizarán para el tránsito ordinario de vehículos, y los interiores, para maniobras de adelantamiento o para circular a mayores velocidades dentro de los límites establecidos.
PARÁGRAFO 1o. Sin perjuicio de las normas que sobre el particular se establecen en este código, las bicicletas, motocicletas, motociclos, mototriciclos y vehículos de tracción animal e impulsión humana, transitarán de acuerdo con las reglas que en cada caso dicte la autoridad de tránsito competente. En todo caso, estará prohibido transitar por los andenes o aceras, o puentes de uso exclusivo para los peatones.
PARÁGRAFO 2o. Se prohíbe el tránsito de motocicletas y motociclos por las ciclorrutas o ciclovías. En caso de infracción se procederá a la inmovilización.

[Artículo 60] OBLIGATORIEDAD DE TRANSITAR POR LOS CARRILES
DEMARCADOS. Los vehículos deben transitar, obligatoriamente, por sus respectivos carriles, dentro de las líneas de