In [None]:
import imaplib
import email
from email.header import decode_header
from bs4 import BeautifulSoup
import polars as pl
import unicodedata
import ollama
import gspread
import pandas as pd
import numpy as np
import re


class EmailProcessor:
    def __init__(self, imap_server: str, email_address: str, app_password: str,
                 gsheets_key: str, credenciales_path: str = "credenciales.json", folder: str = "Facturas"):
        """
        Inicializa la clase para manejar la lectura, procesamiento y carga de correos a Google Sheets.

        Par√°metros:
        -----------
        imap_server : str
            Servidor IMAP (por ejemplo, 'imap.gmail.com')
        email_address : str
            Direcci√≥n de correo completa
        app_password : str
            Contrase√±a de aplicaci√≥n (no la del correo normal)
        gsheets_key : str
            ID del archivo de Google Sheets
        credenciales_path : str
            Ruta al archivo JSON de credenciales de servicio de Google
        folder : str
            Carpeta de correos a leer (por defecto: "Facturas")
        """
        self.imap_server = imap_server
        self.email_address = email_address
        self.app_password = app_password
        self.folder = folder
        self.gsheets_key = gsheets_key
        self.credenciales_path = credenciales_path
        self.ollama_client = ollama.Client()

    # ----------------------------------------------------------
    # 1Ô∏è‚É£ LECTURA DE CORREOS
    # ----------------------------------------------------------
    def get_unread_emails(self) -> pl.DataFrame:
        """Lee correos no le√≠dos y los devuelve como DataFrame Polars."""

        imap = imaplib.IMAP4_SSL(self.imap_server)
        imap.login(self.email_address, self.app_password)
        imap.select(f'"{self.folder}"')
        print(f"üì• Conectado a la carpeta {self.folder}")

        # def decode_subject(subject_header):
        #     decoded_subject = decode_header(subject_header)
        #     subject = decoded_subject[0][0]
        #     if isinstance(subject, bytes):
        #         subject = subject.decode(decoded_subject[0][1] or "utf-8", errors="ignore")
        #     return subject
        def decode_subject(subject_header):
            decoded_subject = decode_header(subject_header)

            part, encoding = decoded_subject[0]

            if isinstance(part, bytes):
                try:
                    # Si la codificaci√≥n es inv√°lida ‚Üí usa utf-8
                    subject = part.decode(encoding or "utf-8", errors="ignore")
                except LookupError:
                    subject = part.decode("utf-8", errors="ignore")
            else:
                subject = part or ""
            return subject

        def extract_body(message):
            body = None
            if message.is_multipart():
                for part in message.walk():
                    content_type = part.get_content_type()
                    content_disposition = str(part.get("Content-Disposition") or "")
                    if "attachment" in content_disposition:
                        continue
                    if content_type == "text/plain":
                        body = part.get_payload(decode=True).decode("utf-8", errors="ignore")
                    elif content_type == "text/html":
                        html_content = part.get_payload(decode=True).decode("utf-8", errors="ignore")
                        soup = BeautifulSoup(html_content, "html.parser")
                        body = " ".join(soup.get_text(separator=" ", strip=True).split())
            else:
                content_type = message.get_content_type()
                content = message.get_payload(decode=True).decode("utf-8", errors="ignore")
                if content_type == "text/html":
                    soup = BeautifulSoup(content, "html.parser")
                    body = " ".join(soup.get_text(separator=" ", strip=True).split())
                else:
                    body = content
            return body or ""

        status, messages = imap.search(None, "UNSEEN")
        email_data = []

        for num in messages[0].split()[::-1]:
            _, msg = imap.fetch(num, "(RFC822)")
            message = email.message_from_bytes(msg[0][1])

            subject = decode_subject(message["Subject"])
            body = extract_body(message)

            email_data.append({
                "Subject": subject,
                "From": message["From"],
                "Date": message["Date"],
                "Body": body
            })

        imap.close()
        imap.logout()
        print(f"‚úÖ {len(email_data)} correos no le√≠dos cargados desde {self.folder}")

        return pl.DataFrame(email_data)

    # ----------------------------------------------------------
    # 2Ô∏è‚É£ PROCESAMIENTO DE CORREOS
    # ----------------------------------------------------------
    def procesar_correos(self, correos_df: pl.DataFrame) -> pl.DataFrame:
        """Procesa los correos para extraer comercio, fecha, monto, etc."""
        Modelo = "gemma3:4b"
        prompts = {
            "Comercio_raw": "En qu√© comercio se realiz√≥ la compra de la siguiente factura, responda solamente con el nombre del comercio o tienda: ",
            "Fecha_raw": "En qu√© fecha se realiz√≥ la compra de la siguiente factura, responda solamente con la fecha en formato dd/mm/yyyy: ",
            "Hora_raw": "En qu√© hora se realiz√≥ la compra de la siguiente factura, responda solamente con la hora en formato de 24 horas: ",
            "Moneda_raw": "En qu√© moneda se realiz√≥ la compra de la siguiente factura, responda solamente con el nombre de la moneda: ",
            "Monto_raw": "Cu√°nto es el monto de la compra en la siguiente factura, responda solamente con la cantidad: ",
            "Anulacion_raw": "El texto de la siguiente factura contiene la palabra anulaci√≥n, responda solamente con s√≠ o no: "
        }

        # Generar columnas con modelo Ollama
        for col, prompt in prompts.items():
            correos_df = correos_df.with_columns(
                pl.col("Body").map_elements(
                    lambda t, p=prompt: self.ollama_client.generate(model=Modelo, prompt=p + t).response,
                    return_dtype=pl.String
                ).alias(col)
            )

        # Funci√≥n para remover tildes
        def quitar_tildes(texto: str) -> str:
            if texto is None:
                return None
            return "".join(c for c in unicodedata.normalize("NFD", texto) if unicodedata.category(c) != "Mn")

        # Limpieza y refinamiento
        correos_df = correos_df.with_columns(
            (pl.col("Comercio_raw").str.strip_chars().str.to_uppercase()).alias("Comercio"),
            (pl.col("Fecha_raw").str.strip_chars().str.to_uppercase()
             .str.strptime(pl.Date, "%d/%m/%Y", strict=False)).alias("Fecha_refinado"),
            (pl.col("Hora_raw").str.strip_chars().str.to_uppercase()).alias("Hora"),
            (pl.col("Monto_raw").str.strip_chars().str.replace_all(",", "").cast(pl.Float32)).alias("Monto_refinado"),
            (pl.col("Anulacion_raw").str.strip_chars().str.to_uppercase().map_elements(quitar_tildes, return_dtype=pl.String)
             .str.extract(r"(?i)\b(SI|NO)\b")).alias("Anulacion"),
            (pl.col("Moneda_raw").str.strip_chars().str.to_uppercase().map_elements(quitar_tildes, return_dtype=pl.String)).alias("Moneda_refinado")
        )

        # Transformaciones finales
        correos_df = correos_df.with_columns(
            (pl.when(pl.col("Anulacion") == "SI")
             .then(-1 * pl.col("Monto_refinado"))
             .otherwise(pl.col("Monto_refinado"))).round(2).alias("Monto"),
            (pl.when(pl.col("Moneda_refinado").str.contains_any(["CRC", "COLON", "COSTA"]))
             .then(pl.lit("CRC"))
             .when(pl.col("Moneda_refinado").str.contains_any(["USD", "DOLAR", "US"]))
             .then(pl.lit("USD"))
             .otherwise(pl.col("Moneda_refinado"))).alias("Moneda"),
            (pl.col("Fecha_refinado").dt.strftime("%d/%m/%Y")).alias("Fecha")
        ).with_columns(
            (pl.when(pl.col("Moneda") == "USD").then(pl.col("Monto")).otherwise(0)).alias("Monto_USD"),
            (pl.when(pl.col("Moneda") == "CRC").then(pl.col("Monto")).otherwise(0)).alias("Monto_CRC")
        ).sort("Fecha_refinado", descending = False )

        print( '‚úÖ Datos procesados' )

        return correos_df.select(["Comercio", "Monto_CRC", "Monto_USD", "Fecha"])

    # ----------------------------------------------------------
    # 3Ô∏è‚É£ SUBIR A GOOGLE SHEETS
    # ----------------------------------------------------------
    def subir_datos_a_gsheets(self, df_datos_agregar: pl.DataFrame) -> str:
        """Agrega datos a la hoja de c√°lculo de Google Sheets."""
        gc = gspread.service_account(filename=self.credenciales_path)
        sh = gc.open_by_key(self.gsheets_key)
        worksheet = sh.worksheet("Base")

        df_existente = (
            pd.DataFrame(worksheet.get_all_records())
            .replace("", np.nan)
            .dropna(subset=["Gasto", "Fecha"], inplace=False)
        )

        if df_existente.empty:
            raise ValueError("‚ùå La hoja est√° vac√≠a o no contiene columnas 'Gasto' y 'Fecha'.")

        ultimo_gasto = df_existente["Gasto"].iloc[-1]
        ultima_celda = worksheet.findall(str(ultimo_gasto))[-1].address

        letras = "".join(re.findall(r"[A-Za-z]+", ultima_celda))
        numeros = "".join(re.findall(r"\d+", ultima_celda))
        siguiente_posicion = letras + str(int(numeros) + 1)

        filas_para_agregar = df_datos_agregar.to_pandas().to_numpy().tolist()
        worksheet.append_rows(filas_para_agregar, table_range=siguiente_posicion, value_input_option="USER_ENTERED")

        print(f"‚úÖ Datos agregados exitosamente en: !{siguiente_posicion}")
        return siguiente_posicion

    # ----------------------------------------------------------
    # 4Ô∏è‚É£ FLUJO COMPLETO
    # ----------------------------------------------------------
    def ejecutar(self):
        """Ejecuta todo el flujo: leer correos ‚Üí procesar ‚Üí subir a Sheets."""
        correos_df = self.get_unread_emails()
        datos = self.procesar_correos(correos_df)
        self.subir_datos_a_gsheets(datos)

# =======================
# üöÄ USO DEL SCRIPT
# =======================
if __name__ == "__main__":
    processor = EmailProcessor(
        imap_server="imap.gmail.com",
        email_address="<CORREO>@gmail.com",
        app_password="< APP PASSWORD DEL CORREO >",
        gsheets_key="<Llave de hoja de calculos google sheet>",
        credenciales_path="<ARCHIVO CREDENCIALES>.json",
        folder="< NOMBRE DE FOLDER A UTILIZAR >"
    )

    processor.ejecutar()