In [3]:
import os
import time
from mistralai import Mistral

import base64
import requests
import json
import pandas as pd


from sentence_transformers import SentenceTransformer, util

MODEL_EMBEDED = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
from pretretment_file import clean_text,clean_and_format_dates

In [15]:
from dotenv import load_dotenv
load_dotenv()
Api_key= os.environ["mistral_jd"]

In [6]:
def image_to_base64(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')

In [18]:
def extract(image_path):
    # api_url = 'https://api.mistral.ai/v1/chat/completions'

    client= Mistral(api_key=Api_key)
    base64_image = image_to_base64(image_path)

    model= "pixtral-12b-2409"
    messages= [
        {
            "role": "system",
            "content": [
                {
                    "type": "text",
                    "text": "Tu es un comptable et tu dois extraire les informations d'une facture sous forme de photo en tenant compte du fait que les photos sont prise a des positions differente et souvent pas tres claire. Tu dois répondre en JSON structuré, All amounts must be floats with exactly two digits after the decimal point, using a dot as the decimal separator.Do not append any currency symbols to amounts.Any missing fields must be set to null. currency: Identify the currency used as an ISO code (EUR, USD, GBP, JPY, etc.) and not as a symbol.All dates must be in the format yyyy-mm-dd"
                } 
            ],

            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": """Décris cette image sous forme de fichier JSON structuré avec la clé : 'fields'.le format de date: YYYY-MM-DD et le format de l'heure: HH:MM:SS. Required fields:date: The payment due date,total_amount: The total amount due,
                    vendor_name: name of the supply,currency: The current of the total amount, items: description: Description of the item or service,quantity: Quantity of the item,total_price: Total price for the item,tax: Tax amount, vendor_address: Address of the supplier,
                    tax_rate: Tax rate, line_items: List of line items with description, quantity, unit price and total price. """
                },
                {
                    "type": "image_url", 
                    "image_url": f"data:image/jpeg;base64,{base64_image}"}
            ]
        }
    ]
    response = client.chat.complete(
        model=model, 
        messages=messages,
        response_format={"type": "json_object"}
    )
    content = response.choices[0].message.content
    try:
        json_data = json.loads(content)
        return json.dumps(json_data, indent=2)
    except json.JSONDecodeError:
        return json.dumps({"error": "Invalid JSON response", "raw_response": content})

In [6]:
# # IMAGE_FOLDER="../dataset/receipts/"
# all_dataframes = []
# def process_images(IMAGE_FOLDER):
#     if not os.path.exists(IMAGE_FOLDER):
#         raise ValueError(f"Le dossier {IMAGE_FOLDER} n'existe pas.")

#     for filename_clean_clean_clean_clean in os.listdir(IMAGE_FOLDER):
#         if filename.lower().endswith((".jpg", ".jpeg", ".png")):
#             image_path = os.path.join(IMAGE_FOLDER, filename)
#             print(f"📷 Traitement de : {filename}")

#             result = extract(image_path)
#             try:
#                 result = json.loads(result)
#             except json.JSONDecodeError:
#                 print(f"⚠️ Erreur de décodage JSON pour {filename}")
#                 continue
#             if "error" in result:
#                 print(f"⚠️ Erreur dans la réponse pour {filename}: {result['error']}")
#                 continue
#             if "fields" not in result:
#                 print(f"⚠️ Aucune donnée trouvée pour {filename}")
#                 continue
#             if "fields" in result and isinstance(result["fields"], list):
#                 general_info = {field["name"]: field["value"] for field in result["fields"] if field["name"] != "Items"}
#             if general_info:
#                 df_general = pd.DataFrame([general_info])
#                 df_general["filename"] = image_path
#                 all_dataframes.append(df_general)
#             else:
#                 print(f"⚠️ Données invalides ou manquantes pour {filename}")
#                 all_dataframes.append(df_general)
#             if isinstance(result, str) and "Requests rate limit exceeded" in result:
#                 time.sleep(5)
#                 continue
#     if all_dataframes:
#         final_df = pd.concat(all_dataframes, ignore_index=True)
#         return final_df
#     else:
#         print(" Aucun fichier n'a été extrait avec succès.")
#         return None

In [19]:
def process_images(IMAGE_FOLDER):
    if not os.path.exists(IMAGE_FOLDER):
        raise ValueError(f"Le dossier {IMAGE_FOLDER} n'existe pas.")
    
    all_dataframes = []
    
    for filename in os.listdir(IMAGE_FOLDER):
        if filename.lower().endswith((".jpg", ".jpeg", ".png")):
            image_path = os.path.join(IMAGE_FOLDER, filename)
            print(f"📷 Traitement de : {filename}")
            
            result = extract(image_path)
            try:
                result = json.loads(result)
            except json.JSONDecodeError:
                print(f"⚠️ Erreur de décodage JSON pour {filename}")
                continue
            if not isinstance(result, dict):
                print(f"⚠️ Réponse inattendue pour {filename}, format incorrect.")
                continue
            
            if "error" in result:
                print(f"⚠️ Erreur dans la réponse pour {filename}: {result['error']}")
                continue
            
            if "fields" not in result or not isinstance(result["fields"], dict):
                print(f"⚠️ Aucune donnée valide trouvée pour {filename}")
                continue
            
            fields = result["fields"]
            general_info = {key: value for key, value in fields.items() if key != "items"}
            
            if general_info:
                df_general = pd.DataFrame([general_info])
                # df_general["date"] = df_general["date"].apply(clean_and_format_dates)
                df_general["filename"] = image_path
                all_dataframes.append(df_general)
            else:
                print(f"⚠️ Données invalides ou manquantes pour {filename}")
                continue
            
            # if "items" in fields and isinstance(fields["items"], list):
            #     items_df = pd.DataFrame(fields["items"])
            #     items_df["filename"] = image_path
            #     all_dataframes.append(items_df)
            
            if isinstance(result, str) and "Requests rate limit exceeded" in result:
                time.sleep(5)
                continue
    
    if all_dataframes:
        final_df = pd.concat(all_dataframes, ignore_index=True)
        final_df["date"]= pd.to_datetime(final_df["date"], errors='coerce').dt.strftime('%Y-%m-%d')
        return final_df
    else:
        print("Aucun fichier n'a été extrait avec succès.")
        return None

In [20]:
json1=extract("../dataset/receipts/1075-receipt.jpg")
print(json1)

{
  "fields": {
    "date": "2014-05-22",
    "total_amount": 14.36,
    "vendor_name": "SmokeBox BBQ",
    "currency": "USD",
    "vendor_address": "2361 Whitney Ave, Hamden, CT 06518",
    "tax_rate": 0.06,
    "line_items": [
      {
        "description": "1 FXN DIET BIRCH",
        "quantity": 1,
        "unit_price": 1.5,
        "total_price": 1.5
      },
      {
        "description": "1 FULL BOX",
        "quantity": 1,
        "unit_price": 12.0,
        "total_price": 12.0
      }
    ],
    "tax": 0.86,
    "items": [
      {
        "description": "1 FXN DIET BIRCH",
        "quantity": 1,
        "total_price": 1.5
      },
      {
        "description": "1 FULL BOX",
        "quantity": 1,
        "total_price": 12.0
      }
    ]
  }
}


In [9]:
# df= process_images()
# if df is not None:
#     df.head()
# else:
#     print(" Aucune donnée extraite.")
# df

In [21]:
def get_bank_statement(bank_statement_path):
    df = pd.read_csv(bank_statement_path)  
    
    return df

In [26]:
      
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')


def match_releve_receipt(releve_df, receipt_df):
    
    date_tolerance_days = 2         
    similarity_threshold = 0.75 
    receipt_df['vendor_name_clean'] = receipt_df['vendor_name'].apply(clean_text)
    releve_df['vendor_clean'] = releve_df['vendor'].apply(clean_text)

    if 'supplier_emb' not in receipt_df.columns:
        receipt_df = receipt_df.copy()
        receipt_df['supplier_emb'] = receipt_df['vendor_name_clean'].apply(
            lambda x: model.encode(x, convert_to_tensor=True)
        )
    
    invoice_ids = []
    similarity_percentages = []
    
    for idx, row in releve_df.iterrows():
        montant = row['amount']
        date_releve = pd.to_datetime(row['date'], errors='coerce')
        vendor_text = row['vendor_clean']
        
        candidats = receipt_df[receipt_df['total_amount'] == montant].copy()
        
        if len(candidats) == 1:
            best_match = candidats.iloc[0]
            try:
                invoice_id = best_match['filename']
            except:
                invoice_id = None
            invoice_ids.append(invoice_id)
            similarity_percentages.append(100.0)
        
        elif len(candidats) > 1:
            candidats_date = candidats[
                (candidats['date'] >= date_releve) &
                (candidats['date'] <= date_releve + pd.Timedelta(days=date_tolerance_days))
            ]
            
            if len(candidats_date) == 1:
                best_match = candidats_date.iloc[0]
                try:
                    invoice_id = best_match['filename']
                except:
                    invoice_id = None
                invoice_ids.append(invoice_id)
                similarity_percentages.append(100.0)
            
            elif len(candidats_date) > 1:
                vendor_emb = model.encode(vendor_text, convert_to_tensor=True)
                candidats_date['similarity'] = candidats_date['supplier_emb'].apply(
                    lambda emb: util.cos_sim(vendor_emb, emb).item()
                )
                max_similarity = candidats_date['similarity'].max()
                best_candidate = candidats_date.loc[candidats_date['similarity'].idxmax()]
                sim_percentage = max_similarity * 100
                if max_similarity > similarity_threshold:
                    try:
                        invoice_id = best_candidate['filename']
                    except:
                        invoice_id = None
                    invoice_ids.append(invoice_id)
                    similarity_percentages.append(sim_percentage)
                else:
                    invoice_ids.append(None)
                    similarity_percentages.append(sim_percentage)
            else:
                invoice_ids.append(None)
                similarity_percentages.append(None)
        else:
            invoice_ids.append(None)
            similarity_percentages.append(None)
    
    result_df = releve_df.copy()
    result_df['invoice'] = invoice_ids
    result_df['similarity'] = similarity_percentages
    return result_df

In [None]:
if __name__ == "__main__":
    image_folder = input("📂 Entrez le chemin du dossier contenant les images : ").strip()
    releve = input("📂 Entrez le chemin du relevé bancaire : ").strip()
    releve = get_bank_statement(releve)
    
    start_time = time.time()
    
    try:
        df = process_images(image_folder)
        if not df.empty:
            print("\n🎉 Résultats finaux:")
            # print(df.to_markdown(index=False))            
            
            # Sauvegarde des résultats
            timestamp = time.strftime("%Y%m%d-%H%M%S")
            df.to_csv(f"resultats_extraction_{timestamp}.csv", index=False)
            print(f"\n💾 Résultats sauvegardés dans resultats_extraction_{timestamp}.csv")
            df_macht= match_releve_receipt(releve,df)
            df_macht.to_csv(f"resultats_matching_{timestamp}.csv", index=False)
        else:
            print("\n⚠️ Aucun résultat valide obtenu.")
            
    except Exception as e:
        print(f"\n🔥 Erreur critique: {str(e)}")
    
    print(f"\n⏱ Durée totale: {time.time() - start_time:.2f} secondes")

FileNotFoundError: [Errno 2] No such file or directory: ''

In [14]:
all_dataframes

NameError: name 'all_dataframes' is not defined