# In [None]:
import logging
import os
import re
from io import BytesIO
import cv2
import numpy as np
import pytesseract
from PIL import Image
from fastapi import FastAPI, UploadFile, File, HTTPException, Header, Depends
from fastapi.responses import HTMLResponse, JSONResponse
from pdf2image import convert_from_bytes
import pymupdf  # PyMuPDF
import uvicorn

# Application object; the route defined further down is registered on it.
app = FastAPI()

# Point Tesseract/pytesseract at a fixed Windows installation directory.
# NOTE(review): these absolute paths are hard-coded, so the module only works
# on a host where Tesseract is installed at exactly this location — confirm
# the deployment target or make the paths configurable.
os.environ['TESSDATA_PREFIX'] = r'C:\Program Files\Tesseract-OCR\tessdata'
pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'

# API_KEY_VALUE is the value that get_api_key compares the request header to.
# NOTE(review): API_KEY_NAME is not referenced in the visible part of this
# file; the header name actually used comes from get_api_key's parameter name.
# NOTE(review): the key is a literal committed in source; consider loading it
# from the environment or a secret store.
API_KEY_NAME = "api"
API_KEY_VALUE = "mokamoka"


def get_api_key(api: str = Header(...)) -> str:
    """Validate the required ``api`` request header against ``API_KEY_VALUE``.

    Intended to be used as a FastAPI dependency (``Depends(get_api_key)``).

    Args:
        api: Value of the mandatory ``api`` header.

    Returns:
        The header value, unchanged, when it matches the configured key.

    Raises:
        HTTPException: 401 when the supplied key does not match.
    """
    import hmac

    # Compare in constant time so the response latency does not reveal how
    # many leading characters of the key were correct. Encoding to bytes keeps
    # compare_digest from raising TypeError on non-ASCII header values.
    supplied = api.encode("utf-8")
    expected = API_KEY_VALUE.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return api


# "N° de SIRET : 123 456 789 00012" — captures the 14 digits, with optional
# single whitespace between the 3/3/3/5 digit groups.
siret_pattern = re.compile(r'N°\s*de\s*SIRET\s*:\s*(\d{3}\s?\d{3}\s?\d{3}\s?\d{5})', re.IGNORECASE)
# IBAN-like run: two letters (each optionally followed by a dot), two digits,
# then 20 to 23 alphanumerics; whitespace is tolerated between characters.
# The pattern has no capturing group, so findall() yields whole matches.
iban_pattern = re.compile(r'\b(?:[A-Z]\.?\s*){2}(?:\d\s*){2}(?:[A-Z0-9]\s*){20,23}\b', re.IGNORECASE)
# Account-holder label followed by an optional ':' or '-' and then 4 to 100
# letters/whitespace, which are captured as the holder name. Both the
# correct spelling "Account Owner" and the previously hard-coded misspelling
# "Acount Owner" are accepted, so existing matches keep working.
titulaire_pattern = re.compile(r'(?:Titulaire\s+du\s+compte|Titulaire|Acc?ount\s+Owner)\s*[:\-]?\s*([A-Za-z\s]{4,100})', re.IGNORECASE)

# Invoice ("facture") field extractors, keyed by the field label reported to
# the client. Every pattern is case-insensitive and exposes the field value
# as capture group 1. Insertion order is the order fields are searched in.
facture_patterns = {
    label: re.compile(source, re.IGNORECASE)
    for label, source in (
        ('DATE', r'DATE\s*:\s*(\d{2}/\d{2}/\d{4})'),
        ('N° de SIRET', r'N°\s*de\s*SIRET\s*:\s*(\d{3}\s?\d{3}\s?\d{3}\s?\d{5})'),
        ('Facture n°', r'Facture\s*n°\s*(\d+)'),
        ('Client', r'Client\s*:\s*(.+?)\s*(?:\n|Lieu\s*formation)'),
        ('SOUS-TOTAL (HT)', r'SOUS-TOTAL\s*\(HT\)\s*([\d\s\.,]+)(?:\s*€)?'),
        ('TOTAL DÛ (TTC)', r'TOTAL\s*DÛ\s*\(TTC\)\s*([\d\s\.,]+)(?:\s*€)?'),
    )
}


def preprocess_image(image_bytes: bytes) -> np.ndarray:
    """Decode an encoded image and prepare it for OCR.

    Pipeline: decode as BGR -> grayscale -> CLAHE contrast equalization ->
    non-local-means denoising -> Gaussian adaptive thresholding.

    Args:
        image_bytes: Raw bytes of an encoded image file (PNG, JPEG, ...).

    Returns:
        The single-channel thresholded image produced by
        ``cv2.adaptiveThreshold``.

    Raises:
        ValueError: If ``image_bytes`` is empty or cannot be decoded as an
            image.
    """
    if not image_bytes:
        raise ValueError("Cannot preprocess an empty image buffer")

    np_img = np.frombuffer(image_bytes, np.uint8)
    img = cv2.imdecode(np_img, cv2.IMREAD_COLOR)
    # imdecode signals undecodable data by returning None instead of raising;
    # fail here with a clear message rather than inside cvtColor.
    if img is None:
        raise ValueError("Image data could not be decoded")

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    clahe_applied = clahe.apply(gray)
    denoised = cv2.fastNlMeansDenoising(clahe_applied, None, h=30, templateWindowSize=7, searchWindowSize=21)
    # Threshold each pixel against a Gaussian-weighted 11x11 neighbourhood
    # mean minus 2, writing 255 for pixels above it.
    processed_image = cv2.adaptiveThreshold(denoised, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)
    return processed_image


def extract_text_from_image(image_bytes: bytes, psm_mode: int = 6, oem_mode: int = 3, lang: str = 'eng') -> str:
    """OCR an encoded image with Tesseract, best-effort.

    The image is first run through ``preprocess_image``; the result is handed
    to ``pytesseract.image_to_string`` with the OEM, PSM and language options
    passed on the Tesseract command line.

    Args:
        image_bytes: Raw bytes of an encoded image.
        psm_mode: Value for Tesseract's ``--psm`` option.
        oem_mode: Value for Tesseract's ``--oem`` option.
        lang: Value for Tesseract's ``-l`` option.

    Returns:
        The recognized text, or ``""`` if any step raised (the error is
        logged, not propagated).
    """
    try:
        tesseract_args = f'--oem {oem_mode} --psm {psm_mode} -l {lang}'
        ocr_input = Image.fromarray(preprocess_image(image_bytes))
        recognized = pytesseract.image_to_string(ocr_input, config=tesseract_args)
        logging.info("Text extraction successful")
        return recognized
    except Exception as e:
        # Deliberate best-effort: callers treat an empty string as "no text".
        logging.error(f"Error during text extraction: {e}")
        return ""


def extract_data_from_pdf(file_bytes: bytes) -> dict:
    """Extract invoice fields from the embedded text layer of a PDF.

    Each page's text is searched with every pattern in ``facture_patterns``;
    the first non-empty match for a field wins. Scanning stops early once
    every field has a value.

    Args:
        file_bytes: Raw bytes of the PDF document.

    Returns:
        A dict with one entry per key of ``facture_patterns``; fields that
        were not found are ``None``.

    Raises:
        Exception: Any error raised while opening or reading the PDF is
            logged and re-raised unchanged.
    """
    extracted_info = dict.fromkeys(facture_patterns)
    try:
        # The context manager closes the document even if a page fails to
        # load or a pattern search raises.
        with pymupdf.open(stream=file_bytes, filetype='pdf') as pdf_document:
            for page in pdf_document:
                text = page.get_text("text")

                for field, pattern in facture_patterns.items():
                    if extracted_info[field]:
                        continue  # already filled from an earlier page
                    match = pattern.search(text)
                    if match:
                        extracted_info[field] = match.group(1).strip()

                if all(extracted_info.values()):
                    break
    except Exception as e:
        logging.error("Error processing PDF: %s", e)
        raise

    return extracted_info


def extract_iban(text: str) -> list:
    """Return the distinct IBAN-like substrings of ``text``.

    Args:
        text: Text to scan with ``iban_pattern``.

    Returns:
        A list of unique matches in order of first appearance. An ordered
        list (rather than a set) matches the declared return type and makes
        "take the first IBAN" deterministic for callers.
    """
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(iban_pattern.findall(text)))


def clean_iban(iban: str) -> str:
    """Strip every character that is not an ASCII letter or digit.

    Args:
        iban: IBAN candidate, possibly containing spaces, dots or other
            separators.

    Returns:
        The input with only ``A-Z``, ``a-z`` and ``0-9`` kept (case is not
        changed); ``''`` for an empty input.

    Raises:
        TypeError: If ``iban`` is not a ``str``.
    """
    if isinstance(iban, str):
        return re.sub(r'[^A-Za-z0-9]', '', iban) if iban else ''
    raise TypeError("Input must be a string")


def preprocess_text(text: str) -> str:
    """Normalize OCR output before pattern matching.

    Collapses every run of whitespace (including newlines) to a single space,
    upper-cases the text, and rewrites spaced or punctuated spellings of the
    word IBAN (e.g. ``I.B.A.N`` or ``I B A N``) to ``IBAN``.

    Args:
        text: Raw text to normalize.

    Returns:
        The normalized, upper-cased text.
    """
    normalized = ' '.join(text.split()).upper()
    return re.sub(
        r'\bI\s*[\.\%\s]?\s*B\s*[\.\%\s]?\s*A\s*[\.\%\s]?\s*N\b',
        'IBAN',
        normalized,
        flags=re.IGNORECASE,
    )

def extract_titulaire_du_compte(text: str) -> str:
    """Return the account-holder name found by ``titulaire_pattern``.

    Args:
        text: Text to search.

    Returns:
        Capture group 1 of the first match with surrounding whitespace
        stripped, or ``""`` when the pattern does not match.
    """
    found = titulaire_pattern.search(text)
    return found.group(1).strip() if found else ""


def clean_titulaire(titulaire: str) -> str:
    """Remove known noise words from an extracted account-holder name.

    Each noise word is removed case-insensitively wherever it occurs (also
    inside longer words), one word after another in list order, and the
    remaining whitespace is collapsed to single spaces.

    Args:
        titulaire: Raw account-holder text.

    Returns:
        The cleaned name, without leading or trailing whitespace.

    Raises:
        TypeError: If ``titulaire`` is not a ``str``.
    """
    if not isinstance(titulaire, str):
        raise TypeError("Input must be a string")

    # Includes several misspelled variants of the same words.
    noise_tokens = (
        "APPARTEMENT",
        "MONSTEUR",
        "MONSIEUR",
        "DOMICILIATION",
        "REVOLUT",
        "BANCAIRE",
        "DOMICILLTATION",
        "DOMICITIATION",
        "DOMICLLLATION",
    )

    cleaned = titulaire
    # Sequential passes: a later token may match text that only became
    # contiguous after an earlier token was removed.
    for token in noise_tokens:
        cleaned = re.sub(re.escape(token), '', cleaned, flags=re.IGNORECASE)
    return ' '.join(cleaned.split())


def _ocr_uploaded_file(file_bytes: bytes, content_type: str) -> str:
    """Return the raw OCR text of an uploaded image, or of every page of a PDF."""
    if content_type.startswith('image/'):
        return extract_text_from_image(file_bytes)

    # PDF: rasterize each page, OCR it as a PNG, and concatenate the results.
    page_texts = []
    for page_image in convert_from_bytes(file_bytes):
        buffer = BytesIO()
        page_image.save(buffer, format='PNG')
        page_texts.append(extract_text_from_image(buffer.getvalue()))
    return "".join(page_texts)


@app.post("/IBAN_FACTURE/")
async def upload_file(file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
    """Classify an uploaded image/PDF as an invoice or a bank-details document.

    The upload is OCR'd and normalized. If a SIRET number is found the
    document is treated as an invoice ("facture"); otherwise, if an IBAN is
    found, the account holder and the first IBAN are returned.

    Args:
        file: Uploaded file; its content type must start with ``image/`` or
            ``application/pdf``.
        api_key: Validated API key, injected by the ``get_api_key`` dependency.

    Returns:
        ``{"type": "facture", "data": {...}}`` or
        ``{"type": "iban", "data": {"titulaire": ..., "iban": ...}}``.
        On an unexpected error, a 500 response ``{"error": "<message>"}``.

    Raises:
        HTTPException: 400 for a missing or unsupported content type, 404 when
            neither a SIRET nor an IBAN is found.
    """
    # content_type can be None when the client sends no Content-Type for the part.
    content_type = file.content_type or ""
    if not content_type.startswith(('image/', 'application/pdf')):
        raise HTTPException(status_code=400, detail="Invalid file type. Please upload an image or PDF.")
    # Same prefix test as the gate above, so every accepted upload is processed.
    is_pdf = content_type.startswith('application/pdf')

    try:
        file_bytes = await file.read()
        # NOTE(review): OCR runs synchronously inside this coroutine.
        text = preprocess_text(_ocr_uploaded_file(file_bytes, content_type))

        siret_match = siret_pattern.search(text)
        if siret_match:
            # Invoice: prefer the fields from the PDF text layer; for images
            # only the OCR'd SIRET is available.
            data = extract_data_from_pdf(file_bytes) if is_pdf else {}
            if not data:
                data = {"SIRET": siret_match.group(1)}
            return JSONResponse(content={"type": "facture", "data": data})

        iban_matches = extract_iban(text)
        if iban_matches:
            # Bank details: report the holder name and the first IBAN found.
            titulaire = extract_titulaire_du_compte(text)
            cleaned_titulaire = clean_titulaire(titulaire)
            cleaned_ibans = [clean_iban(iban) for iban in iban_matches]
            return JSONResponse(content={
                "type": "iban",
                "data": {
                    "titulaire": cleaned_titulaire,
                    "iban": cleaned_ibans[0] if cleaned_ibans else ""
                }
            })

        raise HTTPException(status_code=404, detail="No relevant data found.")

    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) reach the client instead
        # of being rewritten as a 500 by the generic handler below.
        raise
    except Exception as e:
        logging.exception("Unhandled error while processing upload")
        return JSONResponse(status_code=500, content={"error": str(e)})




# Script entry point: serve the app with uvicorn on port 8000.
# NOTE(review): host "0.0.0.0" binds to all network interfaces — confirm this
# exposure is intended outside of local development.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
