# Set Path & Install Libraries

In [None]:
# Model label; it is embedded in the name of the output directory below.
model_name = "DeepSeekVL"
# Directory that is scanned for the PDF files to process.
input_path = "./Devis"
# Per-model directory that receives the extracted JSON files.
output_path = f"./output_json_{model_name}"
import os

# from google.colab import drive
# drive.mount('/content/drive')
# os.chdir('/content/drive/MyDrive/ColabWorks/LF/Oxalys/Data')

In [None]:
!sudo apt update
!sudo apt install poppler-utils

# !pip install pdf2image
# !pip install PyPDF2
# !pip install lmdeploy
# !pip install git+https://github.com/deepseek-ai/DeepSeek-VL.git
# !pip install nest_asyncio

In [None]:
import nest_asyncio
# NOTE(review): presumably needed because the notebook already runs an event
# loop and the inference pipeline drives asyncio itself — confirm it is still
# required with the installed lmdeploy version.
nest_asyncio.apply()

# Initialize DeepSeek

In [None]:
from lmdeploy import pipeline, TurbomindEngineConfig
from lmdeploy.vl import load_image

# Settings handed to the TurboMind backend.
# NOTE(review): cache_max_entry_count=0.3 presumably limits the share of GPU
# memory reserved for the k/v cache — confirm against the lmdeploy docs.
engine_config = TurbomindEngineConfig(cache_max_entry_count=0.3)
# Inference pipeline for the DeepSeek-VL 1.3B chat checkpoint; later cells call
# `pipe` with (prompt, image) tuples and read `.text` from the response.
pipe = pipeline('deepseek-ai/deepseek-vl-1.3b-chat', backend_config=engine_config)

## Example Code for Interaction
# image = load_image('https://raw.githubusercontent.com/open-mmlab/mmdeploy/main/tests/data/tiger.jpeg')
# response = pipe(('describe this image', image))
# print(response)

## Prompts

In [None]:
## Prompts
# Prompt sent together with the first page image to obtain the supplier's
# name and address. The two JSON keys requested here are the ones that
# extract_company_info looks for in the model's reply.
company_info_prompt = """
You are given the first page of a French quotation (devis). Extract the following:
- Company name (nom_fournisseur)
- Company address (adresse_fournisseur)

Respond in the following JSON format:
{
  "nom_fournisseur": "<company_name>",
  "adresse_fournisseur": "<company_address>"
}
"""

# Prompt sent together with every page image to list the products on that
# page. The field names requested here are the ones that
# extract_products_info reads from the model's reply.
product_info_prompt = """
You are given a page from a French quotation (devis). Extract all listed products. For each product, provide:
- Product code (code_produit)
- Product name (nom_produit)
- Estimated volume (volume_estime)
- Unit price excluding tax (pu_ht)
- Total amount excluding tax (montant_ligne_ht)
- Discount (remise) if mentioned

Respond with a JSON array like this:
[
  {
    "code_produit": "<product_code>",
    "nom_produit": "<product_name>",
    "volume_estime": <float>,
    "pu_ht": <float>,
    "montant_ligne_ht": <float>,
    "remise": <float>
  }
]
"""

## Processing

In [None]:
## Convert pdf to image
from pdf2image import convert_from_path
from PyPDF2 import PdfReader
from PIL import Image
import json
import time
import re


def is_image_pdf(pdf_path):
    """Return True if any page of the PDF embeds at least one image XObject.

    Args:
        pdf_path: Path of the PDF file to inspect.

    Returns:
        True as soon as a page resource whose ``/Subtype`` is ``/Image`` is
        found, False when no page has one.
    """
    reader = PdfReader(pdf_path)
    for page in reader.pages:
        # A page without a resource dictionary (or without XObjects) simply
        # has no image; it must not abort the scan with a KeyError.
        if '/Resources' not in page:
            continue
        resources = page['/Resources']
        if '/XObject' not in resources:
            continue
        x_objects = resources['/XObject'].get_object()
        for name in x_objects:
            entry = x_objects[name]
            if '/Subtype' in entry and entry['/Subtype'] == '/Image':
                return True
    return False


def convert_pdf_to_images(pdf_path, output_folder):
    """Write the pages of a PDF as PNG files and return their paths.

    When ``is_image_pdf`` reports embedded images, each embedded image is
    extracted, scaled to 80% of its size and saved. Otherwise every page is
    rendered with ``convert_from_path``.

    Args:
        pdf_path: Path of the PDF file to convert.
        output_folder: Directory that receives the PNG files; it is created
            when missing.

    Returns:
        List of PNG paths in page order. Files are named
        ``<pdf stem>_page_<n>.png``; additional images found on the same page
        get a ``_<k>`` suffix so they no longer overwrite each other.
    """
    # Local import: the file's import cell does not bring `io` into scope,
    # which made the embedded-image branch fail with a NameError.
    import io

    if output_folder:
        os.makedirs(output_folder, exist_ok=True)

    base_filename = os.path.splitext(os.path.basename(pdf_path))[0]
    image_paths = []

    if not is_image_pdf(pdf_path):
        for page_number, page_image in enumerate(convert_from_path(pdf_path), start=1):
            image_path = os.path.join(output_folder, f"{base_filename}_page_{page_number}.png")
            page_image.save(image_path, 'PNG')
            image_paths.append(image_path)
        return image_paths

    reader = PdfReader(pdf_path)
    for page_number, page in enumerate(reader.pages, start=1):
        # Pages that embed nothing are skipped instead of raising KeyError.
        if '/Resources' not in page or '/XObject' not in page['/Resources']:
            continue
        x_objects = page['/Resources']['/XObject'].get_object()

        images_on_page = 0
        for name in x_objects:
            if x_objects[name]['/Subtype'] != '/Image':
                continue
            images_on_page += 1

            # NOTE(review): `_data` looks like the stream bytes as stored in
            # the file, so Image.open presumably only succeeds for
            # self-contained formats such as JPEG — confirm.
            embedded = Image.open(io.BytesIO(x_objects[name]._data))
            # Shrink by 20%.
            embedded = embedded.resize((int(embedded.width * 0.8), int(embedded.height * 0.8)))

            # The first image keeps the plain page name; later ones get a
            # numeric suffix so every returned path is a distinct file.
            suffix = "" if images_on_page == 1 else f"_{images_on_page}"
            image_path = os.path.join(output_folder, f"{base_filename}_page_{page_number}{suffix}.png")
            embedded.save(image_path, 'PNG')
            image_paths.append(image_path)

    return image_paths

In [None]:
def _decode_json_string(raw):
    """Decode the JSON escapes of a captured string body; return it unchanged if invalid."""
    try:
        # strict=False tolerates literal control characters (e.g. a raw
        # newline) that a model may emit inside a string value.
        return json.loads(f'"{raw}"', strict=False)
    except ValueError:
        return raw


def extract_company_info(llm_output):
    """Pull the supplier name and address out of the model's reply.

    The reply is searched for the ``"nom_fournisseur"`` and
    ``"adresse_fournisseur"`` keys, so surrounding text or code fences do not
    matter. Escaped characters inside a value (``\\"``, ``\\n``, ``\\uXXXX``)
    are part of the value and are decoded, instead of cutting the value at
    the first escaped quote.

    Args:
        llm_output: Raw text returned by the model.

    Returns:
        Dict with the keys ``nom_fournisseur`` and ``adresse_fournisseur``;
        a value is None when the key is absent or its string is empty.
    """
    company_info = {}
    for key in ("nom_fournisseur", "adresse_fournisseur"):
        # A string body is any run of non-quote characters or backslash
        # escapes; DOTALL lets an escape span a literal newline.
        pattern = rf'"{key}"\s*:\s*"((?:[^"\\]|\\.)+)"'
        match = re.search(pattern, llm_output, re.DOTALL)
        company_info[key] = _decode_json_string(match.group(1)) if match else None
    return company_info

def _to_float(value):
    """Convert a JSON number or numeric string to float; return None if not possible."""
    if value is None or isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        return float(value)
    # Accept French formatting such as "1 234,56" and trailing unit symbols.
    text = str(value).replace("\u00a0", "").replace(" ", "").rstrip("%\u20ac").replace(",", ".")
    try:
        return float(text)
    except ValueError:
        return None


def _products_from_json(llm_output):
    """Parse the outermost JSON array found in the text; return [] when there is none."""
    start, end = llm_output.find("["), llm_output.rfind("]")
    if start == -1 or end <= start:
        return []
    try:
        items = json.loads(llm_output[start:end + 1], strict=False)
    except ValueError:
        return []
    if not isinstance(items, list):
        return []

    products = []
    for item in items:
        if not isinstance(item, dict):
            continue
        code, name = item.get("code_produit"), item.get("nom_produit")
        if code is None and name is None:
            # Not a product entry.
            continue
        products.append({
            "code_produit": None if code is None else str(code),
            "nom_produit": None if name is None else str(name),
            "volume_estime": _to_float(item.get("volume_estime")),
            "pu_ht": _to_float(item.get("pu_ht")),
            "montant_ligne_ht": _to_float(item.get("montant_ligne_ht")),
            "remise": _to_float(item.get("remise")),
        })
    return products


def extract_products_info(llm_output):
    """Extract the product lines from the model's reply.

    The reply is first parsed as JSON (text around the array, such as code
    fences, is ignored). This keeps products whose optional ``remise`` is
    ``null`` or missing, which a fixed six-field pattern would drop. If the
    reply is not valid JSON (e.g. truncated output), complete product objects
    are recovered with a regular expression instead.

    Args:
        llm_output: Raw text returned by the model.

    Returns:
        List of dicts with the keys ``code_produit``, ``nom_produit``,
        ``volume_estime``, ``pu_ht``, ``montant_ligne_ht`` and ``remise``.
        Numeric fields are floats, or None when the value is absent or not a
        number; a malformed number never raises.
    """
    products = _products_from_json(llm_output)
    if products:
        return products

    # Fallback: match only objects that list all six fields in prompt order.
    product_pattern = re.compile(
        r'\{\s*"code_produit"\s*:\s*"([^"]+)",\s*'
        r'"nom_produit"\s*:\s*"([^"]+)",\s*'
        r'"volume_estime"\s*:\s*([\d\.]+),\s*'
        r'"pu_ht"\s*:\s*([\d\.]+),\s*'
        r'"montant_ligne_ht"\s*:\s*([\d\.]+),\s*'
        r'"remise"\s*:\s*([\d\.]+)\s*\}',
        re.MULTILINE
    )
    return [
        {
            "code_produit": match.group(1),
            "nom_produit": match.group(2),
            # [\d\.]+ can capture text such as "1.2.3"; _to_float maps it to
            # None rather than raising and losing the whole document.
            "volume_estime": _to_float(match.group(3)),
            "pu_ht": _to_float(match.group(4)),
            "montant_ligne_ht": _to_float(match.group(5)),
            "remise": _to_float(match.group(6)),
        }
        for match in product_pattern.finditer(llm_output)
    ]




def convert_pdf_to_images(pdf_path, output_folder):
    """Render every page of a PDF to a PNG file and return the file paths.

    NOTE: a function with the same name is defined earlier in this file; this
    later definition is the one in effect once both have been executed.

    Args:
        pdf_path: Path of the PDF file to render.
        output_folder: Directory that receives the PNG files; it is created
            when missing so that saving the first page cannot fail on a
            non-existent directory.

    Returns:
        List of PNG paths in page order, named ``<pdf stem>_page_<n>.png``
        with ``n`` starting at 1.
    """
    if output_folder:
        os.makedirs(output_folder, exist_ok=True)

    base_filename = os.path.splitext(os.path.basename(pdf_path))[0]

    image_paths = []
    for page_number, page_image in enumerate(convert_from_path(pdf_path), start=1):
        image_path = os.path.join(output_folder, f"{base_filename}_page_{page_number}.png")
        page_image.save(image_path, 'PNG')
        image_paths.append(image_path)

    return image_paths

def process_pdf(pdf_path, image_folder, output_folder):
    """Extract supplier and product data from one PDF and save it as JSON.

    The PDF is converted to page images; the first image is queried for the
    company information and every image is queried for products. The result
    is written to ``<output_folder>/<pdf stem>.json``.

    Args:
        pdf_path: Path of the PDF file to process.
        image_folder: Directory that receives the intermediate page images.
        output_folder: Directory that receives the JSON file.
    """
    image_paths = convert_pdf_to_images(pdf_path, image_folder)
    pdf_name = os.path.basename(pdf_path)

    extracted_data = {
        "nom_fichier": pdf_name,
        "nom_fournisseur": None,
        "adresse_fournisseur": None,
        "produits": []
    }

    # The page position is used instead of searching the path for "_page_1":
    # that substring also occurs in "_page_10" … "_page_19" (and possibly in
    # the PDF's own name), which re-ran the company prompt on later pages and
    # overwrote the values found on the first one.
    for page_number, image_path in enumerate(image_paths, start=1):
        image = load_image(image_path)

        # First page only: company information.
        if page_number == 1:
            response = pipe((company_info_prompt, image))
            extracted_data.update(extract_company_info(response.text))

        # Every page: products.
        response = pipe((product_info_prompt, image))
        extracted_data["produits"].extend(extract_products_info(response.text))

    # Persist the result next to the other documents' outputs.
    output_file = os.path.join(output_folder, f"{os.path.splitext(pdf_name)[0]}.json")
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(extracted_data, f, ensure_ascii=False, indent=4)

    print(f"Saved to: {output_file}")


def process_folder(input_folder, image_folder, output_folder):
    """Process every PDF in a folder and write one JSON file per document.

    A document that fails is reported and still gets a JSON file with empty
    fields, so the output folder always has one entry per input PDF.

    Args:
        input_folder: Directory containing the PDF files.
        image_folder: Directory for the intermediate page images; created
            when missing.
        output_folder: Directory for the JSON results; created when missing.
    """
    start_time = time.time()

    # Both working directories must exist before the first document is
    # handled; a missing image folder would make every conversion fail.
    for folder in (image_folder, output_folder):
        if folder:
            os.makedirs(folder, exist_ok=True)

    # Case-insensitive match so "*.PDF" files are not skipped; sorted so runs
    # are reproducible regardless of the order os.listdir returns.
    pdf_names = sorted(
        name for name in os.listdir(input_folder) if name.lower().endswith('.pdf')
    )

    for filename in pdf_names:
        pdf_path = os.path.join(input_folder, filename)
        try:
            process_pdf(pdf_path, image_folder, output_folder)
        except Exception as e:
            # Batch boundary: one bad document must not stop the others.
            print(f"Fail to process: {filename}, Error: {str(e)}")
            empty_output = {
                "nom_fichier": filename,
                "nom_fournisseur": None,
                "adresse_fournisseur": None,
                "produits": []
            }
            output_file = os.path.join(output_folder, f"{os.path.splitext(filename)[0]}.json")
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(empty_output, f, ensure_ascii=False, indent=4)

    total_time = time.time() - start_time
    print(f"Total time for processing pdfs: {total_time:.2f} seconds")



# Main

In [None]:
# Run the extraction over every PDF in `input_path`: page images are written to
# 'tmp_images_folder' and the JSON results to `output_path`.
process_folder(input_path, 'tmp_images_folder', output_path)

In [None]:
# !pip freeze > requirements.txt