In [None]:
#Imports & Configuration
import os
import glob
import re
from typing import Dict, List, Optional

import pandas as pd
from pypdf import PdfReader  # make sure pypdf is installed in your venv

#-----------------------------------------------------------------------------------------------------------------------------#

#PDF Ingestion & Parsing
#This part scans all your invoice PDFs, extracts text, and parses key fields.

def extract_text_from_pdf(pdf_path: str) -> str:
    """
    Extracts full text from a single PDF using pypdf.
    Returns a single string with page texts joined by newlines.
    """
    text = ""
    with open(pdf_path, "rb") as f:
        reader = PdfReader(f)
        for page in reader.pages:
            page_text = page.extract_text() or ""
            text += page_text + "\n"
    return text


def _grab(pattern: str, text: str, default: str = "") -> str:
    """
    Helper function: apply regex and return first group or default if not found.
    """
    m = re.search(pattern, text, re.IGNORECASE)
    return m.group(1).strip() if m else default


def parse_invoice_from_text(text: str) -> Dict:
    """
    Parses key invoice fields from raw text.
    Adjust regex patterns if your invoice format changes.
    """
    invoice_no = _grab(r"Invoice Number:\s*(.+)", text)
    customer_name = _grab(r"Customer Name:\s*(.+)", text)
    date_str = _grab(r"Date:\s*(.+)", text)
    total_str = _grab(r"Total Amount:\s*([\d\.]+)", text)
    payment_status = _grab(r"Payment Status:\s*(.+)", text)
    product = _grab(r"Products?:\s*(.+)", text)

    # Convert numeric fields safely
    try:
        total_val = float(total_str)
    except (TypeError, ValueError):
        total_val = 0.0

    return {
        "Invoice No": invoice_no,
        "Customer Name": customer_name,
        "Date": date_str,
        "Total_From_PDF": total_val,
        "Payment Status": payment_status,
        "Product_From_PDF": product,
        "Raw_Text": text,
    }


def build_pdf_index(pdf_dir: str) -> pd.DataFrame:
    """
    Scans all invoice PDFs in the given directory and returns a DataFrame
    with one row per invoice containing parsed fields and metadata.

    Expected file naming in pdf_dir: Invoice_1.pdf, Invoice_2.pdf, ...
    """
    rows: List[Dict] = []

    # Adjust the glob pattern if your naming differs
    pattern = os.path.join(pdf_dir, "Invoice_*.pdf")
    pdf_paths = glob.glob(pattern)

    if not pdf_paths:
        print(f"[WARN] No PDFs found in {pdf_dir} matching pattern 'Invoice_*.pdf'")

    for pdf_path in pdf_paths:
        try:
            raw_text = extract_text_from_pdf(pdf_path)
            meta = parse_invoice_from_text(raw_text)
            meta["Source_File"] = os.path.basename(pdf_path)
            rows.append(meta)
        except Exception as e:
            print(f"[ERROR] Failed to process {pdf_path}: {e}")

    pdf_df = pd.DataFrame(rows)
    return pdf_df

PDF_DIR = "."  # current folder
pdf_df = build_pdf_index(PDF_DIR)
pdf_df.head()

#-----------------------------------------------------------------------------------------------------------------------------#
def load_excel_database(excel_path: str) -> pd.DataFrame:
    """
    Loads the Excel invoice database into a pandas DataFrame.
    """
    if not os.path.exists(excel_path):
        raise FileNotFoundError(f"Excel file not found: {excel_path}")
    df = pd.read_excel(excel_path)
    return df

EXCEL_PATH = "your_database.xlsx"
excel_df = load_excel_database(EXCEL_PATH)
excel_df.head()
excel_df.columns

#-----------------------------------------------------------------------------------------------------------------------------#

def parse_question(question: str) -> Dict[str, Optional[str]]:
    """
    Parses a natural language question and returns:
      - intent: 'count_quantity', 'sum_total', or 'unknown'
      - product: normalized product name like 'Product_37', or None if not found
    """
    q = question.lower()

    # Detect product number: product 37, product_37, product37
    m = re.search(r"product[_\s]*(\d+)", q)
    product: Optional[str] = f"Product_{m.group(1)}" if m else None

    # Detect intent
    if "how many" in q or "quantity" in q:
        intent = "count_quantity"
    elif "total" in q and ("sale" in q or "amount" in q or "revenue" in q):
        intent = "sum_total"
    else:
        intent = "unknown"

    return {"intent": intent, "product": product}

#-----------------------------------------------------------------------------------------------------------------------------#
print(parse_question("How many Product 37 has been sold?"))
print(parse_question("What is the total amount of sale from Product 59?"))
#-----------------------------------------------------------------------------------------------------------------------------#


def get_invoices_from_pdfs_for_product(pdf_df: pd.DataFrame, product: str) -> List[str]:
    """
    Returns a list of invoice numbers from the PDF index that mention the given product.
    """
    if "Product_From_PDF" not in pdf_df.columns:
        return []

    # Normalize to lowercase for comparison
    mask = pdf_df["Product_From_PDF"].astype(str).str.lower() == product.lower()
    invoice_ids = (
        pdf_df.loc[mask, "Invoice No"]
        .dropna()
        .astype(str)
        .unique()
        .tolist()
    )
    return invoice_ids
#-----------------------------------------------------------------------------------------------------------------------------#

#Core QA Engine (Iteration 1)

def answer_question(
    question: str,
    pdf_df: pd.DataFrame,
    excel_df: pd.DataFrame
) -> str:
    """
    Iteration 1 (revised):
    - Parse question -> intent + product
    - Use the Excel database as the source of truth for numbers
    - (PDFs are ingested but not used as a hard filter yet)
    """
    parsed = parse_question(question)
    intent = parsed["intent"]
    product = parsed["product"]

    if product is None:
        return (
            "I couldn't identify which product you're asking about. "
            "Please refer to it like 'Product 37' or 'Product_59'."
        )

    # 1) Filter Excel by product only
    if "Product" not in excel_df.columns:
        return "The Excel database does not have a 'Product' column."

    product_mask = excel_df["Product"].astype(str).str.lower() == product.lower()
    relevant = excel_df[product_mask]

    if relevant.empty:
        return f"I couldn't find any records for {product} in the Excel database."

    # 2) Aggregate based on intent
    if intent == "count_quantity":
        if "Quantity" not in relevant.columns:
            return "The Excel database does not have a 'Quantity' column."
        total_qty = float(relevant["Quantity"].sum())
        n_invoices = int(relevant["Invoice No"].nunique())
        qty_str = str(int(total_qty)) if total_qty.is_integer() else f"{total_qty:.2f}"
        return (
            f"{qty_str} units of {product} have been sold across "
            f"{n_invoices} invoice(s), based on the Excel database."
        )

    elif intent == "sum_total":
        if "Total" not in relevant.columns:
            return "The Excel database does not have a 'Total' column."
        total_amount = float(relevant["Total"].sum())
        n_invoices = int(relevant["Invoice No"].nunique())
        return (
            f"The total sales amount for {product} is {total_amount:.2f} "
            f"across {n_invoices} invoice(s), based on the Excel database."
        )

    else:
        # Unknown intent but we have data
        sample_cols = [c for c in ["Invoice No", "Product", "Quantity", "Total"] if c in relevant.columns]
        sample = relevant[sample_cols].head()
        return (
            "I found matching records for that product, "
            "but I don't yet understand this type of question.\n\n"
            "Here is a sample of the data I used:\n"
            f"{sample.to_markdown(index=False)}"
        )


#-----------------------------------------------------------------------------------------------------------------------------#
# Build dataframes
PDF_DIR = "."
EXCEL_PATH = "your_database.xlsx"

pdf_df = build_pdf_index(PDF_DIR)
excel_df = load_excel_database(EXCEL_PATH)

# Try some questions
questions = [
    "How many Product 37 has been sold?",
    "What is the total amount of sale from Product 59?",
    "How many Product 99 have been sold?",  # try a product that may not exist
]

for q in questions:
    print("Q:", q)
    print("A:", answer_question(q, pdf_df, excel_df))
    print("-" * 80)


{'intent': 'count_quantity', 'product': 'Product_37'}
{'intent': 'sum_total', 'product': 'Product_59'}
Q: How many Product 37 has been sold?
A: 6 units of Product_37 have been sold across 1 invoice(s), based on the invoice PDFs and the Excel database.
--------------------------------------------------------------------------------
Q: What is the total amount of sale from Product 59?
A: The total sales amount for Product_59 is 4614.06 across 1 invoice(s), based on the invoice PDFs and the Excel database.
--------------------------------------------------------------------------------
Q: How many Product 99 have been sold?
A: I couldn't find any invoices mentioning Product_99 in the PDF files.
--------------------------------------------------------------------------------
