In [None]:
from PIL import Image
import os
import cv2 
from pdf2image import convert_from_path
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import median_filter
from tqdm import tqdm
import pytesseract
import re
import csv
import concurrent.futures as CONC
from threading import Lock

In [None]:
# Work from the project root so the relative paths used by the pipeline
# ('batch_1', 'processed/', 'ocr_output.csv') resolve correctly.
# Set the OCR_PROJECT_DIR environment variable to run the notebook from another
# checkout; the original location is kept as the default.
os.chdir(os.environ.get('OCR_PROJECT_DIR', '/projects/ocr_extraction_from_images'))

In [None]:
# ------------- 1. Loading the documents -------------
def is_valide(image_path):
    """Tell whether `image_path` is a file Pillow recognises as an intact image.

    Args:
        image_path: Path of the file to check.

    Returns:
        True when Pillow can open the file and `verify()` raises nothing,
        False when opening or verifying fails for any ordinary reason
        (missing file, directory, unsupported format, corrupt data, ...).
    """
    try:
        with Image.open(image_path) as img:
            img.verify()
    except Exception:
        # Catch Exception instead of using a bare `except:` so that
        # KeyboardInterrupt / SystemExit still abort a long directory scan
        # rather than being reported as "invalid image".
        return False
    return True

def images_checking(data_path,images_extensions,pdf_extensions,pdf_output_folder=None):
    """Collect the paths of every usable image found directly inside `data_path`.

    Regular images are validated with `is_valide`. PDFs are handled separately:
    Pillow cannot read PDF files, so they are not passed through `is_valide`;
    instead each page is rendered to a PNG file with pdf2image and the paths of
    those PNG files are added to the result. The returned list therefore only
    ever contains file paths, which is what the rest of the pipeline expects.

    Args:
        data_path: Folder to scan (not recursive).
        images_extensions: Accepted image extensions, lower-case with the dot
            (e.g. ['.png', '.jpg']).
        pdf_extensions: Extensions treated as PDF (e.g. ['.pdf']).
        pdf_output_folder: Folder receiving the rendered PDF pages. When None,
            a temporary folder is created the first time a PDF is met; the
            caller is responsible for deleting it afterwards.

    Returns:
        List of image file paths, ordered by file name so that repeated runs
        process the files in the same order.
    """
    import tempfile

    valide_images = []
    # os.listdir gives no ordering guarantee; sort for reproducible results.
    for file_name in sorted(os.listdir(data_path)):
        image_path = os.path.join(data_path,file_name)
        if not os.path.isfile(image_path):
            continue
        extension = os.path.splitext(file_name)[1].lower()

        if extension in pdf_extensions:
            if pdf_output_folder is None:
                pdf_output_folder = tempfile.mkdtemp(prefix='pdf_pages_')
            else:
                os.makedirs(pdf_output_folder, exist_ok=True)
            try:
                # convert_from_path expects the PDF *path*; paths_only=True makes
                # it return the paths of the rendered page files.
                page_paths = convert_from_path(
                    image_path,
                    output_folder=pdf_output_folder,
                    fmt='png',
                    paths_only=True,
                )
            except Exception as error:
                # An unreadable PDF is skipped like an invalid image, but reported.
                print(f'Could not convert PDF {image_path}: {error}')
                continue
            valide_images.extend(page_paths)
        elif is_valide(image_path):
            if extension in images_extensions:
                valide_images.append(image_path)
            else:
                print(f'Image {image_path}, have invalid extension')
    return valide_images


In [None]:
# ------------- 2. Image preprocessing -------------
def binarizing(image_path, threshold=200):
    """Load an image from disk and convert it to a black-and-white image.

    Args:
        image_path: Path of the image file to read with OpenCV.
        threshold: Gray level (0-255) above which a pixel becomes white (255);
            every other pixel becomes black (0). Defaults to 200.

    Returns:
        The binarized single-channel image produced by `cv2.threshold`.

    Raises:
        ValueError: If OpenCV could not read `image_path`.
    """
    numerical_image = cv2.imread(image_path)
    if numerical_image is None:
        # cv2.imread does not raise on failure, it returns None; fail here with
        # the offending path instead of an opaque cvtColor error.
        raise ValueError(f'Could not read image: {image_path}')
    gray_img = cv2.cvtColor(numerical_image,cv2.COLOR_BGR2GRAY) ## Transforming to Gray Scale
    _,binarized = cv2.threshold(gray_img,threshold,255,cv2.THRESH_BINARY) ## Each pixel becomes either black or white
    return binarized

def parallel_binarizing(valide_images):
    """Binarize every image path in `valide_images` using a pool of 4 threads.

    Args:
        valide_images: Sequence of image file paths accepted by `binarizing`.

    Returns:
        List of binarized images, in the same order as `valide_images`
        (`executor.map` preserves input order).
    """
    with tqdm(total=len(valide_images), desc="Binarizing ...", ncols=120) as pbar:
        # The progress bar is shared by all worker threads; guard its counter
        # the same way parallel_ocr does.
        lock = Lock()
        def process_one(image_path):
            result = binarizing(image_path)
            with lock:
                pbar.update(1)
            return result
        with CONC.ThreadPoolExecutor(max_workers=4) as executor:
            binarized_images = list(executor.map(process_one, valide_images))
    print(f'We got {len(binarized_images)} Binarized images.')
    return binarized_images

In [None]:
def parallel_denoising(binarized_images):
    """Apply a size-3 median filter to every image using a pool of 4 threads.

    Args:
        binarized_images: Sequence of images (array-like), e.g. the output of
            `parallel_binarizing`.

    Returns:
        List of filtered arrays, in the same order as `binarized_images`.
    """
    with tqdm(total=len(binarized_images), desc="Denoising ....",colour='red', ncols=120) as pbar:
        # The progress bar is shared by all worker threads; guard its counter
        # the same way parallel_ocr does.
        lock = Lock()
        def process_one(image):
            # Each pixel is replaced by the median of its neighbourhood, which
            # smooths isolated pixels. asarray avoids copying real arrays.
            result = median_filter(np.asarray(image),3)
            with lock:
                pbar.update(1)
            return result
        with CONC.ThreadPoolExecutor(max_workers=4) as executor:
            denoised_images = list(executor.map(process_one, binarized_images))
    print(f'We got {len(denoised_images)} Denoised images.')
    return denoised_images

In [None]:
def save_processed_images(processed_images_folder,images):
    """Write each image to `processed_images_folder` as `img_<index>.jpg`.

    The files are read back later by the OCR step.

    Args:
        processed_images_folder: Destination folder; created if it is missing.
        images: Iterable of images accepted by `cv2.imwrite`.

    Raises:
        OSError: If OpenCV reports that a file could not be written.
    """
    # cv2.imwrite does not create folders and only returns False when the
    # destination is missing, so make sure the folder exists first.
    os.makedirs(processed_images_folder, exist_ok=True)
    for idx,img in enumerate(images):
        img_path = os.path.join(processed_images_folder, f'img_{idx}.jpg')
        if not cv2.imwrite(img_path, img):
            # Surface the failure instead of silently ending up with no file.
            raise OSError(f'Could not write processed image: {img_path}')

In [None]:
def _to_number(raw_value):
    """Turn an OCR'd amount or percentage such as '$ 2 150,15' or '10%' into a float."""
    cleaned = raw_value.replace('%', '').replace('$', '').replace(' ', '')
    return float(cleaned.replace(',', '.'))


def ocr_extraction(image):
    """Run Tesseract on one invoice image and pull the main fields out of the text.

    Args:
        image: Path (or file object) of the image, as accepted by `Image.open`.

    Returns:
        None when Tesseract returned no text at all. Otherwise a dict with the
        keys "numero_facture", "date", "client", "tva_perc", "valeur_nette",
        "tva" and "valeur_brute". A field that could not be found is None; the
        four numeric fields are only filled when both the VAT percentage and
        the net worth were found.
    """
    # The context manager releases the image file as soon as OCR is done.
    with Image.open(image) as img:
        text = pytesseract.image_to_string(img, lang='eng') ## send it to the ocr api
    text = text.replace('\u2003', ' ').replace('\t', ' ') ## normalise em-spaces and tabs
    if not text:
        print(f'Problem With Ocr Extraction for image {image}')
        return None
    lines = [l.strip() for l in text.split('\n') if l.strip()] ## non-empty, stripped lines

    # Invoice number and date are searched in the whole text.
    bill = re.search(r"Invoice\s+no[:\s]*([0-9\-]+)", text, re.IGNORECASE)
    bill = bill.group(1) if bill else None

    date_match = re.search(r"\b\d{2}/\d{2}/\d{4}\b", text)
    date = date_match.group(0) if date_match else None

    # The remaining details sit on the line FOLLOWING their label, so walk the
    # lines pairwise (label line, next line).
    tva_perc, net_worth, client = None, None, None
    for label_line, value_line in zip(lines, lines[1:]):
        # VAT percentage: keep the first label whose next line holds "NN%".
        if tva_perc is None and "VAT [%]" in label_line:
            match = re.search(r"(\d{1,2}%)", value_line)
            if match:
                tva_perc = match.group(1)
        # Client name: the last "Client:" label wins.
        if "Client:" in label_line:
            client = value_line.strip().strip('"\'')
        # Net worth: optional leading "$" or space, then an amount such as
        # "2 150,15" or "150,15"; the last matching label wins.
        if "Net worth" in label_line:
            match = re.search(r"([$ ]?[0-9]{1,3}(?: [0-9]{3})*[,\.][0-9]{2})", value_line)
            if match:
                net_worth = match.group(1)

    main_data = {
        "numero_facture": bill,
        "date": date,
        "client": client,
        "tva_perc": None,
        "valeur_nette": None,
        "tva": None,
        "valeur_brute": None
    }

    if tva_perc is not None and net_worth is not None:
        tva_perc = _to_number(tva_perc)
        net_worth = _to_number(net_worth)
        tva_val = tva_perc/100 * net_worth ## VAT amount = percentage / 100 * net worth
        gross_worth = net_worth + tva_val ## gross worth = net worth + VAT amount
        main_data.update({
            "tva_perc": tva_perc,
            "valeur_nette": net_worth,
            "tva": round(tva_val,2),
            "valeur_brute": round(gross_worth,2)
        })
    return main_data

In [None]:
def parallel_ocr(valide_images):
    """Run `ocr_extraction` on every image path with two worker threads.

    A yellow progress bar advances by one after each image; the bar is updated
    under a lock because it is shared by the workers.

    Args:
        valide_images: Sequence of image paths to send to `ocr_extraction`.

    Returns:
        List with one `ocr_extraction` result per input, in input order.
    """
    progress_guard = Lock()
    with tqdm(total=len(valide_images), desc="Ocr Extraction ...", ncols=120, colour='yellow') as progress:

        def extract_and_tick(path):
            extracted = ocr_extraction(path)
            progress_guard.acquire()
            try:
                progress.update(1)
            finally:
                progress_guard.release()
            return extracted

        with CONC.ThreadPoolExecutor(max_workers=2) as pool:
            results = [item for item in pool.map(extract_and_tick, valide_images)]
    return results

In [None]:
# ------------- MAIN -------------
# Pipeline configuration. The paths are relative, so they resolve against the
# current working directory.
data_path = 'batch_1'  # folder holding the raw input documents
images_extensions = ['.png','.jpg','.tiff']  # compared against the lower-cased file extension
pdf_extensions = ['.pdf']
processed_images_folder = 'processed/'  # destination of the preprocessed images

# Collect the usable input files found in data_path.
valide_images = images_checking(data_path,images_extensions,pdf_extensions)
print(f'We Have {len(valide_images)} Images in our {data_path} folder')

In [None]:
# Step 1 of preprocessing: grayscale + fixed-threshold binarization of every input image.
binarized_images = parallel_binarizing(valide_images)

In [None]:
# Step 2 of preprocessing: size-3 median filter on every binarized image.
denoised_images = parallel_denoising(binarized_images)

In [None]:
# Persist the denoised images as img_<idx>.jpg so the OCR step can read them from disk.
save_processed_images(processed_images_folder,denoised_images)

In [None]:
# Re-scan the processed folder to obtain the paths of the images that were just saved.
# NOTE(review): this rebinds `valide_images`, which previously held the raw input
# paths; re-running the binarizing cell after this one would therefore work on the
# already-processed images. Consider a distinct name for the processed paths.
valide_images = images_checking(processed_images_folder,images_extensions,pdf_extensions)

In [358]:
# OCR every processed image. Each entry is a dict of invoice fields, or None
# when ocr_extraction got no text back for that image.
results = parallel_ocr(valide_images)

Ocr Extraction ...: 100%|[33m███████████████████████████████████████████████████████████[0m| 1489/1489 [12:08<00:00,  2.04it/s][0m


In [361]:
# ocr_extraction returns None for images without any OCR text. DictWriter cannot
# serialise None rows, and results[0] may itself be None, so drop those entries
# before building the CSV.
extracted_rows = [row for row in results if row is not None]
skipped_count = len(results) - len(extracted_rows)
if skipped_count:
    print(f'{skipped_count} image(s) produced no OCR text and were left out of the CSV.')

if extracted_rows:
    with open('ocr_output.csv', mode='w', newline='', encoding='utf-8') as file:
        # Every row shares the same keys, so the first one defines the header.
        writer = csv.DictWriter(file, fieldnames=list(extracted_rows[0].keys()))
        writer.writeheader()
        writer.writerows(extracted_rows)
else:
    print('No OCR results to write; ocr_output.csv was not created.')