### Import Libraries

In [None]:
# Standard libraries
import csv
import json
import os
import re
import shutil

# Data processing and analysis
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, precision_score,confusion_matrix, f1_score, recall_score, ConfusionMatrixDisplay

# Visualization
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import seaborn as sns

# Image processing and OCR
from PIL import Image, ImageDraw
import cv2
import pytesseract

# Transformers and huggingface_hub
from huggingface_hub import hf_hub_download
from transformers import DetrFeatureExtractor, TableTransformerForObjectDetection
from paddleocr import PaddleOCR, draw_ocr

# PyTorch
import torch


# Define Working Directory and Program Locations

In [None]:
# Paths, external tool locations and the list of inputs to process.

# The data root is taken to be the parent of the directory the notebook runs in.
cwd = os.getcwd()
parent_dir = os.path.dirname(cwd)

DATA_PATH = parent_dir  # Path to repository
TESSERACT_PATH = r'C:/Program Files/Tesseract-OCR/tesseract'  # Path to Tesseract OCR - see README for setup

# Labelled data; the 'Pred' column is created empty here.
test_labels = pd.read_csv(
    os.path.join(DATA_PATH, 'Capstone Project - Table Text Extraction/Labeled Data.csv')
)
test_labels['Pred'] = None

# Every later cell runs relative to the extracted data folder.
working_directory = os.path.join(DATA_PATH, 'WAMEX_DATA_EXTRACTED')
os.chdir(working_directory)

# Tell pytesseract which executable to launch.
pytesseract.pytesseract.tesseract_cmd = TESSERACT_PATH

# Entries (files and folders) of the working directory, as absolute paths.
files = os.listdir()
concatenated_paths = [os.path.join(working_directory, entry) for entry in files]

# 1. Table Detection

In [None]:
# File extensions (lower-case, with the dot) accepted as page images.
VALID_EXTENSIONS = {'.png', '.jpg'}

# RGB triples (0-1 floats) used to colour the detection rectangles when plotting.
COLORS = [
    [0.000, 0.447, 0.741],
    [0.850, 0.325, 0.098],
    [0.929, 0.694, 0.125],
    [0.494, 0.184, 0.556],
    [0.466, 0.674, 0.188],
    [0.301, 0.745, 0.933],
]

# Pre-/post-processor and the pretrained table-detection weights.
feature_extractor = DetrFeatureExtractor()
model = TableTransformerForObjectDetection.from_pretrained(
    "microsoft/table-transformer-detection"
)

# Functions
def get_valid_images(directory):
    """Return full paths of the regular files in `directory` with an accepted extension.

    Only direct children are considered (no recursion). The extension check
    is case-insensitive and uses VALID_EXTENSIONS.
    """
    image_paths = []
    for entry in os.listdir(directory):
        full_path = os.path.join(directory, entry)
        if not os.path.isfile(full_path):
            continue
        _, extension = os.path.splitext(entry)
        if extension.lower() in VALID_EXTENSIONS:
            image_paths.append(full_path)
    return image_paths


def move_jpg_to_extracted_folder(directory):
    """Move every '.jpg' file directly inside `directory` into '<directory>/jpg_extracted'.

    The extension match is case-insensitive; sub-folders are never moved.
    The destination folder is created when it is missing.
    """
    # Collect the names first so the listing is taken before the target folder exists.
    jpg_names = []
    for name in os.listdir(directory):
        if name.lower().endswith('.jpg') and os.path.isfile(os.path.join(directory, name)):
            jpg_names.append(name)

    target_dir = os.path.join(directory, "jpg_extracted")
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)

    for name in jpg_names:
        shutil.move(os.path.join(directory, name), os.path.join(target_dir, name))



def process_directory(directory, plot=False):
    """Run table detection on each valid image in `directory` and tabulate the results.

    Each image is passed to process_image with `directory` as the save
    location; falsy results are skipped. Afterwards the folder's JPG files
    are moved into its 'jpg_extracted' sub-folder.

    Returns a DataFrame with one row per image that produced a record.
    """
    print(f"Processing images in directory: {os.path.basename(directory)}")

    records = []
    for image_path in get_valid_images(directory):
        record = process_image(image_path, base_save_path=directory, plot=plot)
        if not record:
            continue
        records.append(record)

    # The originals are moved only once every image has been processed.
    move_jpg_to_extracted_folder(directory)

    return pd.DataFrame(records)


def process_image(file_name, base_save_path, buffer=30, plot=True, threshold=0.94):
    """Detect tables on one page image, save a padded crop per table, and summarise the page.

    NOTE: later cells of this notebook define other functions that are also
    called `process_image`; this detection version must be the one in scope
    when process_directory / the main detection loop run.

    Args:
        file_name: Path of the page image.
        base_save_path: Folder in which '<image name> Cropped Images' is created.
        buffer: Base padding in pixels added around every detected box.
        plot: When True and at least one table was found, show the detections.
        threshold: Minimum detection score passed to the post-processor.

    Returns:
        A dict with 'FileName', 'Page Number' and 'Number of tables' when the
        file name (without extension) consists of exactly two '_'-separated
        parts; otherwise None. Crops are written in both cases.
    """
    base_file_name = os.path.basename(file_name).rsplit('.', 1)[0]
    file_parts = base_file_name.split('_')

    image = Image.open(file_name).convert("RGB")
    width, height = image.size

    img_cv = cv2.imread(file_name)
    if img_cv is None:
        # cv2.imread reports failure by returning None instead of raising.
        # Fall back to the pixels PIL already decoded, in OpenCV's BGR order.
        img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

    encoding = feature_extractor(images=image, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**encoding)

    results = feature_extractor.post_process_object_detection(
        outputs, threshold=threshold, target_sizes=[(height, width)]
    )[0]
    boxes = results['boxes']
    num_tables = len(results['scores'])

    if len(boxes) > 0:
        specific_save_path = os.path.join(base_save_path, f"{base_file_name} Cropped Images")
        os.makedirs(specific_save_path, exist_ok=True)

        img_height, img_width = img_cv.shape[:2]
        for index, box in enumerate(boxes):
            x1, y1, x2, y2 = (int(coord) for coord in box)

            # Padding is asymmetric: 1.2x the buffer on the left, 4.5x on the
            # right, 1x above and below. The result is clamped to the image.
            x1 = max(0, x1 - int(1.2 * buffer))
            y1 = max(0, y1 - buffer)
            x2 = min(img_width, x2 + int(4.5 * buffer))
            y2 = min(img_height, y2 + buffer)

            cropped_img = img_cv[y1:y2, x1:x2]
            cv2.imwrite(f"{specific_save_path}/{base_file_name}_table_{index}.png", cropped_img)

    if plot and num_tables > 0:
        plot_results(image, results['scores'], results['labels'], results['boxes'])

    if len(file_parts) == 2:
        return {'FileName': file_parts[0], 'Page Number': file_parts[1], 'Number of tables': num_tables}
    return None

def plot_results(pil_img, scores, labels, boxes):
    """Show `pil_img` with one coloured rectangle and a 'label: score' caption per detection.

    `scores`, `labels` and `boxes` must expose `.tolist()`; each box is
    (xmin, ymin, xmax, ymax). Colours are taken from COLORS, repeated.
    """
    plt.figure(figsize=(16, 10))
    plt.imshow(pil_img)
    axes = plt.gca()

    palette = COLORS * 100
    detections = zip(scores.tolist(), labels.tolist(), boxes.tolist(), palette)
    for score, label, box, colour in detections:
        xmin, ymin, xmax, ymax = box
        outline = plt.Rectangle(
            (xmin, ymin), xmax - xmin, ymax - ymin,
            fill=False, color=colour, linewidth=3,
        )
        axes.add_patch(outline)

        caption = f'{model.config.id2label[label]}: {score:0.2f}'
        axes.text(xmin, ymin, caption, fontsize=15,
                  bbox=dict(facecolor='yellow', alpha=0.5))

    plt.axis('off')
    plt.show()

# Main execution: run table detection over every entry of the working directory.
all_data = []

for path in concatenated_paths:
    if os.path.isdir(path):
        # A folder of page images: one DataFrame for the whole folder.
        df = process_directory(path, plot=False)
        all_data.append(df)
    elif os.path.isfile(path) and os.path.splitext(path)[1].lower() in VALID_EXTENSIONS:
        # A loose image: its crops are saved next to it. A dedicated name is
        # used so the module-level `parent_dir` is not overwritten.
        image_dir = os.path.dirname(path)
        data = process_image(path, base_save_path=image_dir, plot=False)
        if data:
            all_data.append(pd.DataFrame([data]))

# Concatenate all data into the master dataframe. pd.concat raises on an
# empty list, so fall back to an empty frame with the expected columns.
if all_data:
    master_df = pd.concat(all_data, ignore_index=True)
else:
    master_df = pd.DataFrame(columns=['FileName', 'Page Number', 'Number of tables'])

# 2.1 Perform Table Data Extraction Using Morphological Operations and Tesseract

In [None]:
# Search Functions
def find_subdirs_with_name(path, keyword):
    """Walk `path` recursively and return the full paths of all folders whose name contains `keyword`."""
    matches = []
    for root, dirs, _files in os.walk(path):
        for directory in dirs:
            if keyword in directory:
                matches.append(os.path.join(root, directory))
    return matches

def find_image_files_in_dir(path):
    """Walk `path` recursively and return the full paths of files with an image extension.

    The match is case-insensitive and covers .jpg, .jpeg, .png, .bmp, .gif and .tiff.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff')
    found = []
    for root, _dirs, files in os.walk(path):
        for name in files:
            if name.lower().endswith(image_suffixes):
                found.append(os.path.join(root, name))
    return found

In [None]:

def process_image(image_path):
    """Isolate the table grid lines of an image for the Tesseract pipeline.

    The image is read with flag 0, inverted and Otsu-thresholded. Vertical
    and horizontal lines are then extracted by erode/dilate with thin
    rectangular kernels whose length is the image width // 150.

    Returns:
        (b_image, vertical_horizontal_lines): the inverted XOR of the input
        image with the line mask, and the thresholded line mask itself.
    """
    gray = cv2.imread(image_path, 0)

    inverted = 255 - gray
    _, binary = cv2.threshold(inverted, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Both kernels derive their long side from the image width.
    line_length = binary.shape[1] // 150

    def _keep_lines(kernel_size):
        """Erode then dilate (5 iterations each) with a rectangular kernel of `kernel_size`."""
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, kernel_size)
        eroded = cv2.erode(binary, kernel, iterations=5)
        return cv2.dilate(eroded, kernel, iterations=5)

    vertical_lines = _keep_lines((1, line_length))
    horizontal_lines = _keep_lines((line_length, 1))

    # Blend both line images, invert, thin with a 2x2 kernel and re-binarise.
    grid = cv2.addWeighted(vertical_lines, 0.5, horizontal_lines, 0.5, 0)
    small_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
    grid = cv2.erode(~grid, small_kernel, iterations=3)
    _, grid = cv2.threshold(grid, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    b_image = cv2.bitwise_not(cv2.bitwise_xor(gray, grid))
    return b_image, grid


def extract_bounding_boxes(b_image, vertical_horizontal_lines):
    """Return candidate cell boxes found in the line mask, ordered top to bottom.

    Args:
        b_image: Unused; kept so existing callers keep working.
        vertical_horizontal_lines: Binary line mask to search for contours.

    Returns:
        A list of [x, y, w, h] boxes sorted by y. Each box is widened by one
        pixel to the left where the image allows it. Boxes whose widened
        width is >= 1000 or whose height is >= 500 are dropped. The list is
        empty when no contour is found.
    """
    contours, _ = cv2.findContours(vertical_horizontal_lines, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

    # Sorting the rectangles directly also copes with an empty contour list,
    # which the previous zip(*sorted(...)) unpacking could not.
    rects = sorted((cv2.boundingRect(contour) for contour in contours), key=lambda rect: rect[1])

    boxes = []
    for x, y, w, h in rects:
        # The size filter is applied to the widened width (w + 1).
        if w + 1 < 1000 and h < 500:
            # Never let the left edge go negative: a negative start index
            # would wrap around when the box is used to slice the image.
            left = max(x - 1, 0)
            boxes.append([left, y, x + w - left, h])

    return boxes

def extract_text_from_boxes(b_image, boxes):
    """OCR every cell box with Tesseract and arrange the text into a table.

    Args:
        b_image: Image the boxes refer to; each box is cut out of it.
        boxes: Non-empty list of [x, y, w, h] boxes, ordered top to bottom.

    Returns:
        A DataFrame with one row per detected table row and as many columns
        as the widest row has boxes. Empty cells contain a single space; the
        texts of several boxes falling into one cell are joined with spaces.
    """
    mean_height = np.mean([box[3] for box in boxes])

    # Group boxes into rows: a box stays in the current row while its top is
    # within half the mean height of the previous box's top.
    rows = []
    current_row = [boxes[0]]
    previous_box = boxes[0]
    for box in boxes[1:]:
        if box[1] <= previous_box[1] + mean_height / 2:
            current_row.append(box)
        else:
            rows.append(current_row)
            current_row = [box]
        previous_box = box
    # Always flush the row being built; previously it was lost whenever the
    # very last box started a new row.
    rows.append(current_row)

    # Number of columns = size of the widest row.
    total_cells = max(len(row) for row in rows)

    # Column anchors: sorted horizontal centres of the boxes in the first row.
    centers = np.sort(np.array([int(box[0] + box[2] / 2) for box in rows[0]]))

    # Put each box into the column whose anchor is closest to x + w / 4.
    boxes_list = []
    for row in rows:
        cells = [[] for _ in range(total_cells)]
        for box in row:
            distance = abs(centers - (box[0] + box[2] / 4))
            cells[int(np.argmin(distance))].append(box)
        boxes_list.append(cells)

    # OCR every cell, row by row.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 1))
    dataframe_final = []
    for cells in boxes_list:
        for cell in cells:
            if not cell:
                dataframe_final.append(' ')
                continue
            text = ''
            for x, y, w, h in cell:
                # Clamp the start column: a negative start would wrap around
                # and yield an empty region.
                roi = b_image[y:y + h, max(x, 0):x + w]
                border = cv2.copyMakeBorder(roi, 2, 2, 2, 2, cv2.BORDER_CONSTANT, value=[255, 255])
                resizing = cv2.resize(border, None, fx=2, fy=2, interpolation=cv2.INTER_CUBIC)
                dilation = cv2.dilate(resizing, kernel, iterations=1)
                erosion = cv2.erode(dilation, kernel, iterations=2)
                text += " " + pytesseract.image_to_string(erosion).strip()
            dataframe_final.append(text)

    arr = np.array(dataframe_final)
    return pd.DataFrame(arr.reshape(len(rows), total_cells))

def ocr_image_to_text(image_path, extracted_dir):
    """Extract the table of one cropped image and write it to '<image name>.csv' in `extracted_dir`.

    Nothing is written when at most one box is found; in that case
    "No Table Detected." is printed. The CSV has no header and no index.
    """
    b_image, vertical_horizontal_lines = process_image(image_path)
    boxes = extract_bounding_boxes(b_image, vertical_horizontal_lines)

    # A single box (or none) cannot form a table.
    if len(boxes) <= 1:
        print("No Table Detected.")
        return

    print('Table Extracted!')
    table = extract_text_from_boxes(b_image, boxes)

    # The CSV sits in the given folder and is named after the image.
    stem = os.path.basename(image_path).rsplit('.', 1)[0]
    table.to_csv(os.path.join(extracted_dir, f"{stem}.csv"), index=False, header=False)


def process_images_in_extracted_dirs(main_wd, output_dir):
    """Run the Tesseract extraction on every image below each 'Cropped' folder under `main_wd`.

    `output_dir` is accepted but not used: each CSV is written into the
    'Cropped' folder its image was found under.
    """
    for extracted_dir in find_subdirs_with_name(main_wd, "Cropped"):
        image_paths = find_image_files_in_dir(extracted_dir)
        for image_path in image_paths:
            ocr_image_to_text(image_path, extracted_dir)


## Use of Extractor
# Run the Tesseract-based extraction over the whole working directory.
main_wd = working_directory
output_directory = os.path.dirname(main_wd)  # parent folder of the working directory
process_images_in_extracted_dirs(main_wd, output_dir=output_directory)


# 2.2 Perform Table Data Extraction Using Paddle OCR

In [None]:
# Module-level PaddleOCR engine: GPU enabled, custom text-detection settings.
ocr = PaddleOCR(
    det_db_thresh=0.3,
    det_db_box_thresh=0.5,
    det_db_unclip_ratio=1.2,
    use_gpu=True,
)

# Function to process the image
def process_image(image_path):
    """Load an image in grayscale and return it only if it appears to contain ruling lines.

    Horizontal and vertical lines are extracted from the inverted,
    Otsu-thresholded image by erode/dilate with thin rectangular kernels.
    The image qualifies when more than one horizontal or more than one
    vertical line contour is found.

    Returns:
        The grayscale image, or None when the image cannot be read or does
        not qualify.
    """
    image = cv2.imread(image_path, 0)
    if image is None:
        # cv2.imread returns None for unreadable files; callers already treat
        # None as "skip this image".
        return None

    img_bin = 255 - image
    _, img_bin_otsu = cv2.threshold(img_bin, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Both kernel lengths are derived from the image width (shape[1]).
    width = img_bin_otsu.shape[1]

    # Vertical line extraction
    vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, width // 35))
    eroded_image = cv2.erode(img_bin_otsu, vertical_kernel, iterations=1)
    vertical_lines = cv2.dilate(eroded_image, vertical_kernel, iterations=1)

    # Horizontal line extraction (dilated twice, unlike the vertical pass)
    hor_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (width // 50, 1))
    image_2 = cv2.erode(img_bin_otsu, hor_kernel, iterations=1)
    horizontal_lines = cv2.dilate(image_2, hor_kernel, iterations=2)

    horizontal_contours, _ = cv2.findContours(horizontal_lines, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
    vertical_contours, _ = cv2.findContours(vertical_lines, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

    if len(horizontal_contours) > 1 or len(vertical_contours) > 1:
        return image
    return None


# Function to plot bounding boxes on the original image
def plot_boxes_on_original_image(img, bounding_boxes_list):
    """Return a copy of `img` with a green rectangle drawn for each bounding box.

    Args:
        img: Image array; it is copied and never modified.
        bounding_boxes_list: Iterable of point sequences; element 0 of each is
            used as the top-left corner and element 2 as the bottom-right
            corner, each an (x, y) pair.

    Returns:
        The annotated copy. Previously the copy was discarded, so the
        function had no observable effect.
    """
    img_with_boxes = img.copy()

    for bounding_boxes in bounding_boxes_list:
        x1, y1 = bounding_boxes[0]
        x2, y2 = bounding_boxes[2]

        # Draw the bounding box
        cv2.rectangle(img_with_boxes, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)

    return img_with_boxes

def extract_data_from_image(img_path, ocr_instance):
    """Run PaddleOCR on an image and arrange the recognised text into a table.

    Args:
        img_path: Path of the image to read.
        ocr_instance: Object exposing `ocr(image)`; the first element of its
            result is read as a list of (box points, (text, confidence)).

    Returns:
        A DataFrame with one row per group of text lines, cells ordered left
        to right. None when process_image rejects the image or the OCR
        produces no result for it.
    """
    preprocessed_img = process_image(img_path)
    if preprocessed_img is None:
        return None

    results = ocr_instance.ocr(preprocessed_img)

    # The per-image entry can be None when no text is found (the same case
    # process_cell_region guards against); iterating it would raise TypeError.
    if not results or results[0] is None:
        return None

    # Keep top-left corner, bottom-right corner, text and confidence.
    result_list = [
        [result[0][0], result[0][2], result[1][0], result[1][1]]
        for result in results[0]
    ]

    # Order by the y coordinate of the top-left corner.
    sorted_data = sorted(result_list, key=lambda x: x[0][1])

    # An entry joins the current row while its top is within `threshold`
    # pixels of the previous entry in that row.
    threshold = 10
    rows, current_row = [], []
    for entry in sorted_data:
        if not current_row or (entry[0][1] - current_row[-1][0][1] <= threshold):
            current_row.append(entry)
        else:
            rows.append(current_row)
            current_row = [entry]

    if current_row:
        rows.append(current_row)

    # Within each row, order cells left to right and keep only the text.
    table = [sorted(row, key=lambda x: x[0][0]) for row in rows]
    table_strings = [[entry[2] for entry in row] for row in table]
    return pd.DataFrame(table_strings)


def process_images_in_extracted_dirs(main_wd):
    """Run the PaddleOCR extraction on every image below each 'Cropped' folder under `main_wd`.

    For each image that yields a table, '<image name>.csv' is written next to
    the image, without header or index.
    """
    # One OCR engine is shared by all images instead of being rebuilt per
    # image. It is created on first use, so a run without images builds none.
    ocr_instance = None

    for extracted_dir in find_subdirs_with_name(main_wd, "Cropped"):
        for image_path in find_image_files_in_dir(extracted_dir):
            if ocr_instance is None:
                ocr_instance = PaddleOCR(
                    use_gpu=True,
                    det_db_thresh=0.3,
                    det_db_box_thresh=0.5,
                    det_db_unclip_ratio=1.2
                )

            # Extract data from the image as a DataFrame
            df = extract_data_from_image(image_path, ocr_instance)
            if df is None:
                continue

            # Save the DataFrame to CSV in the same directory as the image
            base_filename = os.path.splitext(os.path.basename(image_path))[0]
            output_file_path = os.path.join(os.path.dirname(image_path), base_filename + ".csv")
            df.to_csv(output_file_path, index=False, header=False)


# Usage: run the PaddleOCR extraction over the whole working directory.
main_wd = working_directory
output_directory = os.path.dirname(main_wd)  # parent folder; not passed to the call below
process_images_in_extracted_dirs(main_wd=main_wd)



# 2.3 Perform Table Data Extraction Using Paddle OCR and Bounding Boxes from cv2

In [None]:
# PaddleOCR engine built with no arguments; this rebinds the module-level `ocr`
# name, which detect_cells_in_image passes on to process_cell_region.
ocr = PaddleOCR()


def threshold_image(image):
    """Invert `image` (255 - image) and binarise it with Otsu's threshold; return the binary image."""
    inverted = 255 - image
    otsu_flags = cv2.THRESH_BINARY + cv2.THRESH_OTSU
    # cv2.threshold returns (threshold value, image); only the image is needed.
    return cv2.threshold(inverted, 0, 255, otsu_flags)[1]

def extract_lines(img_bin_otsu, orient="horizontal"):
    """Keep only long horizontal or vertical strokes of a binarised image.

    With orient == "horizontal" the kernel is (width // 100) x 1; any other
    value selects a 2 x (width // 50) vertical kernel. The image is eroded
    and then dilated once with that kernel.
    """
    width = img_bin_otsu.shape[1]
    if orient == "horizontal":
        kernel_size = (width // 100, 1)
    else:
        kernel_size = (2, width // 50)

    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, kernel_size)
    return cv2.dilate(cv2.erode(img_bin_otsu, kernel, iterations=1), kernel, iterations=1)

def get_cell_locations(vertical_horizontal_lines):
    """Locate table cells inside a combined line image.

    Every contour wider and taller than 20 px is treated as a region. Inside
    each region the mask is inverted with a threshold of 127, and the
    external contours of the inverted patch are taken as cells.

    Returns:
        A de-duplicated list of (x, y, w, h) tuples in image coordinates.
    """
    outer_contours, _ = cv2.findContours(vertical_horizontal_lines, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

    cell_locations = []
    for outer in outer_contours:
        x, y, w, h = cv2.boundingRect(outer)
        if w <= 20 or h <= 20:
            continue  # too small to be treated as a region

        patch = vertical_horizontal_lines[y:y+h, x:x+w]
        _, patch_inverted = cv2.threshold(patch, 127, 255, cv2.THRESH_BINARY_INV)
        inner_contours, _ = cv2.findContours(patch_inverted, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        # Shift patch-relative boxes back into full-image coordinates.
        for inner in inner_contours:
            cx, cy, cw, ch = cv2.boundingRect(inner)
            cell_locations.append((x + cx, y + cy, cw, ch))

    return list(set(cell_locations))

def process_cell_region(cell_region, ocr):
    """OCR one cell and return its text lines joined by spaces, or '0' when nothing was recognised.

    `ocr` must expose `ocr(image)`; falsy entries of its result are skipped
    and the text is read from position [1][0] of each remaining item.
    """
    results = ocr.ocr(cell_region)

    text_list = []
    for entry in results:
        if not entry:
            continue
        for text_info in entry:
            text_list.append(text_info[1][0])

    if not text_list:
        return '0'
    return ' '.join(text_list)

def detect_cells_in_image(image_path):
    """Find table cells in an image and OCR each one.

    Returns:
        A list of [text, [x, y]] entries, one per detected cell, where (x, y)
        is the cell's top-left corner. None when the image cannot be read or
        when it has no vertical or no horizontal lines.
    """
    image = cv2.imread(image_path, 0)
    if image is None:
        # cv2.imread returns None for unreadable files; the caller already
        # skips falsy results.
        return None

    img_bin_otsu = threshold_image(image)
    vertical_lines = extract_lines(img_bin_otsu, "vertical")
    horizontal_lines = extract_lines(img_bin_otsu, "horizontal")

    # A grid needs both line directions.
    if not (vertical_lines.any() and horizontal_lines.any()):
        return None

    # Merge both directions and thicken the strokes with a 2x2 kernel.
    vertical_horizontal_lines = cv2.addWeighted(vertical_lines, 0.5, horizontal_lines, 0.5, 0)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
    vertical_horizontal_lines = cv2.dilate(vertical_horizontal_lines, kernel, iterations=5)

    data_t = []
    for x, y, w, h in get_cell_locations(vertical_horizontal_lines):
        cell_region = image[y:y+h, x:x+w]
        text = process_cell_region(cell_region, ocr)
        data_t.append([text, [x, y]])

    return data_t

def bucket_data_by_rows(data, tolerance=12):
    """Group (text, coord) items into rows by their y coordinate.

    An item joins the first existing row (rows checked in ascending key
    order) whose key is within `tolerance` of the item's y. Otherwise it
    starts a new row keyed by its own y.

    Returns:
        A dict mapping a row's y key to its list of (text, coord) tuples.
    """
    rows_data = {}
    for text, coord in data:
        y = coord[1]
        matching_key = next(
            (key for key in sorted(rows_data) if abs(y - key) <= tolerance),
            None,
        )
        if matching_key is None:
            rows_data[y] = [(text, coord)]
        else:
            rows_data[matching_key].append((text, coord))
    return rows_data

def convert_rows_to_dataframe(rows_data):
    """Turn rows keyed by y coordinate into a DataFrame.

    The x positions of the items in the top-most row (smallest key) define
    the columns. Each item of every row is written to the column whose x
    position is nearest; unfilled cells stay ''. Rows appear in ascending
    key order.
    """
    ordered_keys = sorted(rows_data.keys())
    header_positions = [coord[0] for _text, coord in rows_data[ordered_keys[0]]]

    table = []
    for key in ordered_keys:
        cells = [''] * len(header_positions)
        for text, coord in rows_data[key]:
            # The first of equally distant columns wins.
            nearest, _ = min(
                enumerate(header_positions),
                key=lambda pair: abs(pair[1] - coord[0]),
            )
            cells[nearest] = text
        table.append(cells)

    return pd.DataFrame(table)

def process_images_in_extracted_dirs(main_wd):
    """Run the cell-based PaddleOCR extraction on every image below each 'Cropped' folder.

    For each image that yields cells, the cells are grouped into rows and
    converted to a table. Columns whose first-row value is '0' and columns
    that are entirely empty are dropped. The result is written next to the
    image as '<image name>.csv', without header or index.
    """
    for extracted_dir in find_subdirs_with_name(main_wd, "Cropped"):
        for image_path in find_image_files_in_dir(extracted_dir):
            data = detect_cells_in_image(image_path)
            if not data:
                continue

            # Reading order: top to bottom, then left to right.
            data.sort(key=lambda item: (item[1][1], item[1][0]))
            df = convert_rows_to_dataframe(bucket_data_by_rows(data)).astype(str)

            # Drop columns whose first-row cell is '0', then columns left
            # entirely empty.
            df = df.loc[:, df.iloc[0] != '0']
            df = df.replace('', np.nan).dropna(axis=1, how='all')

            # Save to csv
            stem, _ = os.path.splitext(os.path.basename(image_path))
            df.to_csv(os.path.join(os.path.dirname(image_path), stem + ".csv"), index=False, header=False)
                

# Usage: run the cell-based PaddleOCR extraction on every "Cropped" folder
# found under the working directory.
process_images_in_extracted_dirs(working_directory)