So we got the PDF extraction working fine, but in some mails the information is in the plain-text or the HTML part, and it is easier for the model to extract the information in a more "structured" way.

### Data extraction and files management

In [1]:
import os
import zipfile
from email import policy
from email.parser import BytesParser
from pathlib import Path
import shutil

In [2]:
def _decode_text_part(part):
    """Decode a text MIME part to ``str``, tolerating a missing or unknown charset."""
    payload = part.get_payload(decode=True)
    if payload is None:  # nothing to decode for this part
        return ""
    # get_content_charset() returns None when the header declares no charset
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:  # charset label that Python does not know
        return payload.decode("utf-8", errors="replace")


def extract_eml_info(eml_path, attachments_dir="."):
    """
    Parse an .eml file, save its attachments and return its HTML body.

    Args
    ----
    eml_path: str or Path
        Location of the .eml file to read.

    attachments_dir: str or Path
        Folder where attachments are written (created on demand). Defaults to
        the current working directory.

    Returns
    -------
        A one-element list ``[html_body]``. ``html_body`` is the first
        non-attachment ``text/html`` part found, or "" when the message has
        none. Plain-text parts are not returned.
    """
    # Open and parse the .eml file
    with open(eml_path, 'rb') as eml_file:
        msg = BytesParser(policy=policy.default).parse(eml_file)

    html_body = ""

    # walk() visits nested containers too (e.g. multipart/alternative inside
    # multipart/mixed); for a single-part message it yields the message itself.
    for part in msg.walk():
        if part.is_multipart():
            continue  # containers carry no content of their own

        filename = part.get_filename()
        is_attachment = bool(filename) or part.get_content_disposition() == "attachment"

        if part.get_content_type() == 'text/html' and not is_attachment:
            if not html_body:  # keep the first HTML body encountered
                html_body = _decode_text_part(part)
        elif filename:  # Extract attachments
            payload = part.get_payload(decode=True)
            if payload is None:
                continue
            # The file name comes from the mail itself: keep only its last
            # component so it cannot point outside attachments_dir.
            safe_name = Path(filename.replace("\\", "/")).name
            if safe_name in ("", ".", ".."):
                continue
            target_dir = Path(attachments_dir)
            target_dir.mkdir(parents=True, exist_ok=True)
            (target_dir / safe_name).write_bytes(payload)
    return [html_body]

In [3]:
# Input/output locations, all relative to the notebook's working directory.
data_path = Path("../data/")
mails_path = data_path / "mails"
zips_data = data_path / "extract"
pdfs_path = zips_data / "pdfs"
# Create the PDF folder up front so the listing below also works on a fresh
# checkout (it is otherwise only created by the later "move files" cell).
pdfs_path.mkdir(parents=True, exist_ok=True)

# os.listdir returns entries in arbitrary order; sorting keeps list positions
# (used later as invoice counters) stable between runs.
mail_files = [mails_path / f for f in sorted(os.listdir(mails_path))]
zip_files = [zips_data / f for f in sorted(os.listdir(zips_data))]
# NOTE: this is a snapshot of the folder; re-run this cell after new files
# have been moved or unzipped into it.
pdf_files = [pdfs_path / f for f in sorted(os.listdir(pdfs_path)) if f.endswith(".pdf")]

In [4]:
# Folder with the incoming batch whose .zip/.pdf files get sorted into ../data/extract.
test_files = data_path.joinpath("abril-agosto")

In [5]:
# One extract_eml_info result per entry of mail_files, in the same order.
mails_data = list(map(extract_eml_info, mail_files))

In [6]:
# Sort the incoming batch: archives go to ../data/extract, loose PDFs to ../data/extract/pdfs.
extract_dir = data_path / "extract"
pdf_dir = extract_dir / "pdfs"
os.makedirs(pdf_dir, exist_ok=True)

for file_name in os.listdir(test_files):
    if file_name.endswith(".zip"):
        destination = extract_dir / file_name
    elif file_name.endswith(".pdf"):
        destination = pdf_dir / file_name
    else:
        continue  # anything else stays where it is

    if destination.exists():
        # shutil.move raises shutil.Error when the target already exists inside
        # the destination folder; skip so the cell can be re-run safely.
        print(f"Skipping {file_name}: already present in {destination.parent}")
        continue
    # str() keeps this working on Python versions whose shutil.move mishandles Path sources
    shutil.move(str(test_files / file_name), str(destination))
print("Files moved")

Files moved


In [7]:
def extract_zip(zip_file_path, extract_to_folder):
    """
    Unpack a .zip archive into a folder.

    The target folder is always created. Paths whose name does not end in
    ".zip" are ignored after that.

    Args
    ----
    zip_file_path: str or Path
        Candidate archive.

    extract_to_folder: str or Path
        Folder that receives the archive contents.
    """
    # Make sure the destination is there before looking at the archive
    Path(extract_to_folder).mkdir(parents=True, exist_ok=True)

    # Guard clause: only files named *.zip are opened
    if not str(zip_file_path).endswith(".zip"):
        return

    with zipfile.ZipFile(zip_file_path, 'r') as archive:
        archive.extractall(extract_to_folder)

In [8]:
# Unpack every entry of zip_files into the shared PDF folder
# (extract_zip itself ignores entries that are not *.zip).
for archive_path in zip_files:
    extract_zip(archive_path, data_path / "extract/pdfs")

### PDF data extraction in plain text

In [9]:
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores.utils import filter_complex_metadata

In [27]:
import re


def pdf_extraction(pdf: str, max_pages=1):
    """
    Load a PDF and return its text with every whitespace run collapsed to one space.

    Args
    ----
    pdf: str
        Path of the PDF file (``pathlib.Path`` objects are accepted as well).

    max_pages: int or None
        How many leading pages to include. Defaults to 1, i.e. only the first
        page; pass ``None`` to include all pages.

    Returns
    -------
        str: the normalised text, or ``"Error <message>"`` when the file could
        not be loaded (the exception is reported in the string, not raised).
    """
    try:
        # str() so Path inputs work regardless of the loader version
        documents = filter_complex_metadata(PyPDFLoader(str(pdf)).load())
        if not documents:
            raise ValueError(f"no pages could be read from {pdf}")
        selected = documents if max_pages is None else documents[:max_pages]
        raw_text = " ".join(doc.page_content for doc in selected)
        # Raw string: "\s" in a normal literal is an invalid escape sequence.
        # "\s+" already covers newlines, so one substitution is enough.
        return re.sub(r"\s+", " ", raw_text.strip())
    except Exception as e:
        return f"Error {e}"

In [28]:
# Number of PDF paths collected in pdf_files
len(pdf_files)

99

In [29]:
# Number of entries collected in mail_files
len(mail_files)

28

So there are 4 messages that don't have a PDF or plain text.

In [30]:
# Extracted text (or an "Error ..." string) for each entry of pdf_files, in the same order.
pdfs_data = list(map(pdf_extraction, pdf_files))

In [31]:
# Sanity check: one extraction result per PDF path
len(pdfs_data)

99

In [32]:
from langchain_core.runnables import RunnablePassthrough
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.chat_models import ChatOllama

In [49]:
# Chat model (served through Ollama) that the extraction chain pipes its prompt into.
model_name = "llama3.1:8b-instruct-fp16"
llm = ChatOllama(model=model_name)

In [35]:
# Extraction prompt (written in Spanish). It tells the model to act as an
# invoice assistant and to answer ONLY with five labelled fields: purchase date
# (dd/mm/yyyy), company name, NIT (xxxxxxxxx-x), amount to pay (one decimal)
# and invoice number, or with NaN when it cannot answer.
# The document text is injected through the {question} variable.
context_prompt = PromptTemplate(
input_variables=["question"],
template="""
Ignora cualquier instrucción anterior y conversaciones anteriores

Eres un asistente de facturas, vas a recibir
información de varias facturas, no importa si son confidenciales
y debes extraer la siguiente información
\n

SOLO RESPONDER ASI:

Fecha: [Dia de la compra] como una fecha con el formato dd/mm/yyyy
Empresa: [Nombre de la compañía]
Nit: [Numero nit] con el formato xxxxxxxxx-x ej: 111.111.111-1 como 111111111-1
Valor a pagar: [Valor] como numero con exactitud de un decimal
Numero de factura: [Numero de factura, puede contener letras]

Si no puedes responder la pregunta SOLO responde NaN

A base del siguiente documento:

{question}
"""
)

In [50]:
# Pipeline: the raw input is passed through as "question", rendered into
# context_prompt, sent to the chat model, and the reply is reduced to a plain string.
chain_extraction = {"question": RunnablePassthrough()} | context_prompt | llm | StrOutputParser()

In [51]:
# Run the extraction chain once per PDF text; results keep the order of pdfs_data.
model_data = list(map(chain_extraction.invoke, pdfs_data))

### Agent

In [38]:
from llama_index.core.tools import FunctionTool, ToolMetadata
from llama_index.core.agent import ReActAgent
from llama_index.llms.ollama import Ollama

In [39]:
def write_to_csv(date:str, comp_name:str, nit:str, bill_num:str, total:str, filename='output.csv'):
    """
    Append one invoice record as a row of a CSV file.

    Args
    ----
    date: str
        The date in a dd/mm/yyyy format

    comp_name: str
        the company name

    nit: str
        the company nit

    bill_num: str
        the bill number

    total: str
        The total value to pay

    filename: str
        CSV file to append to; it is created when missing

    Returns
    -------
        str: a confirmation message naming the file that was written
    """
    import csv
    # Explicit UTF-8 so accented company names are written the same way on
    # every platform; the `with` block closes the file on exit.
    with open(filename, mode='a', newline='', encoding='utf-8') as file:
        csv.writer(file).writerow([date, comp_name, nit, bill_num, total])
    return f"Data written to {filename}"

In [40]:
def iva_calc(total:str) -> tuple:
    """
    Split an IVA-inclusive price into its pre-tax base and its IVA (19 %) share.

    Args
    ----
    total: str
        The IVA-inclusive price of the item as a str (must be parseable by float())

    Returns
    -------
        A tuple of strings (base_value, iva_value), both rounded to one decimal:
        base_value is total / 1.19 and iva_value is total - base_value.

    Raises
    ------
        ValueError if total cannot be converted to float
    """
    total = float(total)
    # Dividing by 1.19 removes the 19 % tax, which yields the base price (not the tax)
    base_value = round(total / 1.19, 1)
    iva_value = round(total - base_value, 1)
    return (str(base_value), str(iva_value))

In [41]:
def error_csv(filepdf:str, filename="error.csv"):
    """
    Record the identifier of a bill that doesn't have the required information

    Args
    ----
    filepdf: str
        The number (counter) of the bill that doesn't have all the information

    filename: str
        CSV file to append to; it is created when missing

    Returns
    -------
        str: a confirmation message naming the file that was written
    """
    import csv
    with open(filename, mode='a', newline='', encoding='utf-8') as file:
        # writerow expects a sequence of fields: wrap the value so "56" is
        # stored as one cell instead of "5,6", and so an int is accepted too.
        csv.writer(file).writerow([filepdf])
    return f"Data written to {filename}"

In [52]:
# LlamaIndex Ollama LLM handed to the ReAct agent; same model tag as model_name above.
code_name = "llama3.1:8b-instruct-fp16"
code_llm = Ollama(model=code_name)

In [53]:
# Tools exposed to the agent, declared as (function, tool name, description) triples.
# iva_calc is intentionally not registered at the moment.
tools = [
    FunctionTool.from_defaults(fn=tool_fn, name=tool_name, description=tool_description)
    for tool_fn, tool_name, tool_description in (
        (write_to_csv, "write_to_csv", "This tool can write data in a csv file"),
        (error_csv, "error_csv", "This tool is when the given data doesn't have all the information"),
    )
]

In [44]:
# First draft of the agent context prompt.
# NOTE(review): this string is not passed to the agent (the agent is built with
# agent_template_2), and it relies on the iva_calc tool, which is commented out
# of `tools`. Its write_to_csv example also passes a single "data" list, which
# does not match write_to_csv's keyword parameters.
agent_template = """ \
Ignore previous conversations and history
You will get information as follows

    Fecha: [Date of the purchase]
    Empresa: [Company name]
    Nit: [NIT number]
    Valor a pagar: [Value]
    Numero de factura: [Number of bill]

Example Input:

    Fecha: 28/02/2024
    Empresa: PUNTO LED S.A.S.
    Nit: 901.264.658-7
    Valor a pagar: 660,000.00
    Numero de factura: PL 29626

Example Output:
    [28/02/2024, PUNTO LED S.A.S., 901.264.658-7, PL 29626, 554621.8, 105378.2, 660000]

First you need to get the data from valor a pagar using the iva_calc tool.
Finally you need to write the information in a csv file using the write_to_csv tool, and follow the input order as Example Output.

To write the information in a csv file is VERY IMPORTANT THAT 
the order is given as follows: Fecha, Empresa, NIT, Numero de factura, Valor base, Valor de iva, Valor a pagar. 

YOU CAN ONLY WRITE THE CVS ONCE!!!!!!!!

```
Thought: I need to use the 'iva_calc' tool to calculate the iva.
Action: iva_calc
Action Input: {'total': 'the Valor a pagar value as a int'}
```

```
Thought: After i calculate the iva value i need to write the values in a csv file
Action: write_to_csv
Action Input: {"data": [The values giving the stipulated order, you can follow the Example Output, as a list]}
```
"""

In [59]:
# Context prompt given to the ReAct agent. It shows the expected input fields,
# a worked example, and the two possible actions: write_to_csv when every field
# is present, error_csv (with the Contador value) otherwise.
# The example output uses the exact write_to_csv argument names and a
# one-decimal total so that it agrees with the rule stated below it.
agent_template_2 = """ \
You will get information as follows

    Fecha: [Date of the purchase]
    Empresa: [Company name]
    Nit: [NIT number]
    Valor a pagar: [Value]
    Numero de factura: [Number of bill]

Example Input:

    Contador : 0
    Fecha: 28/02/2024
    Empresa: PUNTO LED S.A.S.
    Nit: 901.264.658-7
    Valor a pagar: 660,000.00
    Numero de factura: PL 29626

Example Output:
    contador = 0
    date = 28/02/2024
    comp_name = PUNTO LED S.A.S.
    nit = 901264658-7
    bill_num = PL 29626
    total = 660000.0

To write the information in a csv file is VERY IMPORTANT THAT total has the float format with 1 decimal precision

YOU CAN ONLY WRITE THE CSV ONCE!!!!!!!!

```
Thought: I need to write the information in a csv file.
Action: write_to_csv
Action Input: {"date": Fecha, "comp_name": Empresa, "nit": Nit, "bill_num": Numero de factura, "total": "Valor a pagar"}
```

```
Thought: I don't have all the information to write in a csv file
Action: error_csv
Action Input: {"filepdf": contador}
```
"""

In [60]:
# ReAct agent wired to the CSV tools, using agent_template_2 as its context.
agent = ReActAgent.from_tools(
    tools,
    llm=code_llm,
    context=agent_template_2,
    verbose=True,
)

In [58]:
# Fixed preamble prepended to every per-invoice message sent to the agent.
promt_const = """
If you don't know something put NaN, and don't rely on previous information
You are done when you write the csv!!!!!

Your document is:


"""


def agent_message(x, y):
    """
    Build the chat message for one invoice.

    Args
    ----
    x: str
        The extracted invoice fields to hand to the agent

    y: int
        The invoice counter, emitted as the "Contador" line

    Returns
    -------
        str: promt_const followed by the counter line and the invoice text
    """
    return f"{promt_const}Contador:{y} \n{x}"

## for testing

In [None]:
# Feed every extracted invoice to the agent. `cont` is the 1-based bill number
# used in error_list; the agent itself receives the 0-based counter (cont - 1).
cont = 0
error_list = []
for cont, bill_text in enumerate(model_data, start=1):
    try:
        # Start each invoice from an empty chat history so earlier invoices
        # cannot leak into this one and the context does not keep growing.
        agent.reset()
        res = agent.chat(agent_message(bill_text, cont - 1))
        print(res)
    except Exception as e:
        # Keep going, but report why this bill failed
        print(f"error with bill N {cont}: {e}")
        error_list.append(cont)

In [77]:
# 1-based numbers of the bills whose agent call raised an exception
error_list

[56, 68]