### Import Libraries

In [6]:
# Standard libraries
import csv
import json
import os
import re
import shutil

# Data processing and analysis
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, recall_score

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Image processing and OCR
from PIL import Image, ImageDraw
import cv2
import pytesseract

# Transformers and huggingface_hub
from huggingface_hub import hf_hub_download
from transformers import DetrFeatureExtractor, TableTransformerForObjectDetection

# PyTorch
import torch


In [7]:
# Constants
# NOTE(review): both paths are absolute and machine-specific; edit them before
# running this notebook anywhere else.
DATA_PATH = 'C:/Users/nick2/Desktop/Table Transformer/'
TESSERACT_PATH = r'C:/Program Files/Tesseract-OCR/tesseract'

# Read the labelled CSV into a DataFrame and add an empty 'Pred' column
# (every row starts as None).
test_labels = pd.read_csv(os.path.join(DATA_PATH, 'Labeled Data.csv'))
test_labels['Pred'] = None

# Make the test dataset folder the process-wide working directory.
# The os.listdir() call below relies on this having happened first.
working_directory = os.path.join(DATA_PATH, 'Test Dataset')
os.chdir(working_directory)

# Tell pytesseract which Tesseract executable to launch.
pytesseract.pytesseract.tesseract_cmd = TESSERACT_PATH

# List every entry (files and folders alike) of the working directory and
# turn each name into a full path.
files = os.listdir()
concatenated_paths = [os.path.join(working_directory, filename) for filename in files]


# 1. Table Recognition (Nick, Theo, Kose)

In [8]:
# File extensions (lower-case, leading dot included) accepted as input images.
VALID_EXTENSIONS = {'.jpg', '.png'}
# Colour triples used by plot_results as outline colours for detection boxes.
COLORS = [
    [0.000, 0.447, 0.741], [0.850, 0.325, 0.098], [0.929, 0.694, 0.125],
    [0.494, 0.184, 0.556], [0.466, 0.674, 0.188], [0.301, 0.745, 0.933]
]

# Define Model and Feature Extractor
# The feature extractor is built with its library defaults (no arguments);
# the model is loaded from the "microsoft/table-transformer-detection"
# pretrained checkpoint. Both are module-level globals used by the functions
# defined below in this cell.
feature_extractor = DetrFeatureExtractor()
model = TableTransformerForObjectDetection.from_pretrained("microsoft/table-transformer-detection")

# Functions
def get_valid_images(directory):
    """Return full paths of the image files located directly in ``directory``.

    An entry is kept when it is a regular file and its extension, compared
    case-insensitively, belongs to ``VALID_EXTENSIONS``. Sub-directories are
    not descended into. Paths are returned in ``os.listdir`` order.
    """
    image_paths = []
    for entry in os.listdir(directory):
        candidate = os.path.join(directory, entry)
        if not os.path.isfile(candidate):
            continue
        extension = os.path.splitext(entry)[1].lower()
        if extension in VALID_EXTENSIONS:
            image_paths.append(candidate)
    return image_paths

def process_directory(directory, plot=False):
    """Run table detection on every valid image of ``directory``.

    Each image is handed to ``process_image`` with ``directory`` as the base
    folder for cropped tables. Images for which ``process_image`` returns a
    falsy value are left out.

    Args:
        directory: Folder whose direct image files are processed.
        plot: Forwarded to ``process_image``.

    Returns:
        A DataFrame with one row per image that produced a record.
    """
    print(f"Processing images in directory: {os.path.basename(directory)}")

    per_image_records = (
        process_image(image_path, base_save_path=directory, plot=plot)
        for image_path in get_valid_images(directory)
    )
    kept_records = [record for record in per_image_records if record]

    return pd.DataFrame(kept_records)


def process_image(file_name, base_save_path, buffer=50, plot=True, threshold=0.94):
    """Detect tables in one page image, save a padded crop of each, and summarise.

    NOTE(review): a later cell of this notebook defines another function that
    is also called ``process_image`` (it takes a single ``image_path``). Once
    that cell has run, this name refers to the other function, so this cell
    must be re-executed before detection is run again.

    Args:
        file_name: Path of the image to analyse.
        base_save_path: Folder in which the "<stem> Cropped Images" directory
            is created when at least one table is detected.
        buffer: Number of pixels added on every side of a detected box before
            cropping; the padded box is clipped to the image bounds.
        plot: When true and at least one table was found, display the
            detections with ``plot_results``.
        threshold: Minimum detection score passed to the post-processing step.
            Defaults to the previously hard-coded 0.94.

    Returns:
        ``{'FileName', 'Page Number', 'Number of tables'}`` when the file stem
        consists of exactly two underscore-separated parts, otherwise ``None``.
        Crops are written in both cases.
    """
    base_file_name = os.path.basename(file_name).rsplit('.', 1)[0]
    file_parts = base_file_name.split('_')

    # Decode the file once and derive the OpenCV (BGR) array from the same
    # pixels. A separate cv2.imread() returns None for unreadable paths and
    # honours EXIF orientation while PIL does not, so the predicted boxes
    # could refer to a differently oriented image than the one being cropped.
    with Image.open(file_name) as opened:
        image = opened.convert("RGB")
    width, height = image.size
    img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

    encoding = feature_extractor(images=image, return_tensors="pt")

    with torch.no_grad():
        outputs = model(**encoding)

    results = feature_extractor.post_process_object_detection(
        outputs, threshold=threshold, target_sizes=[(height, width)]
    )[0]
    num_tables = len(results['scores'])

    boxes = results['boxes']  # Direct extraction from results
    if len(boxes) > 0:
        specific_save_path = os.path.join(base_save_path, f"{base_file_name} Cropped Images")
        os.makedirs(specific_save_path, exist_ok=True)

        for index, box in enumerate(boxes.tolist()):
            x1, y1, x2, y2 = (int(coordinate) for coordinate in box)
            # Pad the box, then clip it to the image.
            x1 = max(0, x1 - buffer)
            y1 = max(0, y1 - buffer)
            x2 = min(width, x2 + buffer)
            y2 = min(height, y2 + buffer)

            cropped_img = img_cv[y1:y2, x1:x2]
            if cropped_img.size == 0:
                # A degenerate box cannot be encoded as an image.
                continue

            crop_path = os.path.join(specific_save_path, f"{base_file_name}_table_{index}.png")
            # cv2.imwrite reports failure through its return value only.
            if not cv2.imwrite(crop_path, cropped_img):
                print(f"Warning: could not write cropped table to {crop_path}")

    if plot and num_tables > 0:
        plot_results(image, results['scores'], results['labels'], results['boxes'])

    if len(file_parts) == 2:
        return {'FileName': file_parts[0], 'Page Number': file_parts[1], 'Number of tables': num_tables}
    return None

def plot_results(pil_img, scores, labels, boxes):
    """Display ``pil_img`` with every detection outlined and captioned.

    Args:
        pil_img: Image to show as the background.
        scores: Detection scores; must provide ``.tolist()``.
        labels: Class ids, looked up in ``model.config.id2label``; must
            provide ``.tolist()``.
        boxes: Boxes as (xmin, ymin, xmax, ymax); must provide ``.tolist()``.
    """
    plt.figure(figsize=(16,10))
    plt.imshow(pil_img)
    axes = plt.gca()
    palette = COLORS * 100
    detections = zip(scores.tolist(), labels.tolist(), boxes.tolist(), palette)
    for confidence, class_id, corners, edge_color in detections:
        left, top, right, bottom = corners
        outline = plt.Rectangle((left, top), right - left, bottom - top,
                                fill=False, color=edge_color, linewidth=3)
        axes.add_patch(outline)
        caption = f'{model.config.id2label[class_id]}: {confidence:0.2f}'
        axes.text(left, top, caption, fontsize=15,
                  bbox=dict(facecolor='yellow', alpha=0.5))
    plt.axis('off')
    plt.show()

# Main execution
all_data = []

for path in concatenated_paths:
    if os.path.isdir(path):
        df = process_directory(path, plot=False)
        all_data.append(df)
    elif os.path.isfile(path) and os.path.splitext(path)[1].lower() in VALID_EXTENSIONS:
        parent_dir = os.path.dirname(path)  # This is where our cropped images would be saved
        data = process_image(path, base_save_path=parent_dir, plot=False)
        if data:
            all_data.append(pd.DataFrame([data]))

# Concatenate all data into the master dataframe
master_df = pd.concat(all_data, ignore_index=True)

Some weights of the model checkpoint at microsoft/table-transformer-detection were not used when initializing TableTransformerForObjectDetection: ['model.backbone.conv_encoder.model.layer3.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer4.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer2.0.downsample.1.num_batches_tracked']
- This IS expected if you are initializing TableTransformerForObjectDetection from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TableTransformerForObjectDetection from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Processing images in directory: 112461
Processing images in directory: 112540
Processing images in directory: 112697
Processing images in directory: 112761
Processing images in directory: 114158
Processing images in directory: 114824
Processing images in directory: 115109
Processing images in directory: 127535
Processing images in directory: 129866
Processing images in directory: 130135
Processing images in directory: 132241
Processing images in directory: 132302
Processing images in directory: 132577
Processing images in directory: 134934
Processing images in directory: 135347


## 1.2 Evaluation of Table Prediction Accuracy (Theo)

# 2.1 Perform Table Data Extraction Using Morphological Operations and Tesseract (Nick, Jiyun)

In [19]:
def find_subdirs_with_name(path, keyword):
    """Recursively collect directories under ``path`` whose name contains ``keyword``.

    The match is a case-sensitive substring test on the directory name only
    (not on the full path). Results are full paths, in ``os.walk`` order.
    """
    matching_dirs = []
    for root, dirs, _files in os.walk(path):
        for directory in dirs:
            if keyword in directory:
                matching_dirs.append(os.path.join(root, directory))
    return matching_dirs

def find_image_files_in_dir(path):
    """Recursively collect image files found under ``path``.

    A file counts as an image when its lower-cased name ends with one of the
    known image extensions. Results are full paths, in ``os.walk`` order.
    """
    image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff')
    image_paths = []
    for root, _dirs, files in os.walk(path):
        for file in files:
            # str.endswith accepts a tuple and is true if any suffix matches.
            if file.lower().endswith(image_extensions):
                image_paths.append(os.path.join(root, file))
    return image_paths


def process_image(image_path):
    """Binarise a table image and build a mask from its vertical/horizontal lines.

    NOTE(review): this definition reuses the name ``process_image`` of the
    table-detection function defined in an earlier cell and replaces it once
    this cell has run; re-run the earlier cell before detecting tables again.

    Args:
        image_path: Path of the (cropped) table image; it is read as grayscale.

    Returns:
        A tuple ``(b_image, vertical_horizontal_lines)``:
        ``vertical_horizontal_lines`` is the Otsu-thresholded line mask and
        ``b_image`` is the inverted XOR of the grayscale image with that mask.

    Raises:
        OSError: If OpenCV cannot read ``image_path``. ``cv2.imread`` signals
            failure by returning ``None`` rather than raising.
    """
    image = cv2.imread(image_path, 0)
    if image is None:
        raise OSError(f"Could not read image: {image_path}")

    # Invert the grayscale image, then binarise it with Otsu's threshold.
    img_bin = 255 - image
    _, img_bin_otsu = cv2.threshold(img_bin, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Both kernels take their length from the image width.
    line_length = img_bin_otsu.shape[1] // 150

    # Vertical Line extraction
    vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, line_length))
    eroded_image = cv2.erode(img_bin_otsu, vertical_kernel, iterations=5)
    vertical_lines = cv2.dilate(eroded_image, vertical_kernel, iterations=5)

    # Horizontal Line extraction
    hor_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (line_length, 1))
    image_2 = cv2.erode(img_bin_otsu, hor_kernel, iterations=5)
    horizontal_lines = cv2.dilate(image_2, hor_kernel, iterations=5)

    # Combining
    vertical_horizontal_lines = cv2.addWeighted(vertical_lines, 0.5, horizontal_lines, 0.5, 0)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
    vertical_horizontal_lines = cv2.erode(~vertical_horizontal_lines, kernel, iterations=3)
    _, vertical_horizontal_lines = cv2.threshold(vertical_horizontal_lines, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    b_image = cv2.bitwise_not(cv2.bitwise_xor(image, vertical_horizontal_lines))
    return b_image, vertical_horizontal_lines


def extract_bounding_boxes(b_image, vertical_horizontal_lines, plot=False, max_width=1000, max_height=500):
    """Return candidate cell boxes found in the line mask, ordered top to bottom.

    Args:
        b_image: Image the boxes refer to; only used for the optional preview.
        vertical_horizontal_lines: Binary mask whose contours are extracted.
        plot: When true, show ``b_image`` with the kept boxes drawn on it.
        max_width: Boxes at least this wide are discarded (default 1000).
        max_height: Boxes at least this tall are discarded (default 500).

    Returns:
        A list of ``[x, y, w, h]`` boxes sorted by ``y``; empty when the mask
        contains no contours.
    """
    contours, _ = cv2.findContours(vertical_horizontal_lines, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    if len(contours) == 0:
        # Nothing to sort or unpack; the caller treats this as "no table".
        return []

    # Compute each bounding rectangle once and sort by top edge. sorted() is
    # stable, so rectangles sharing a y keep their contour order.
    bounding_rects = sorted((cv2.boundingRect(contour) for contour in contours),
                            key=lambda rect: rect[1])

    boxes = []
    for x, y, w, h in bounding_rects:
        # Widen the box by one pixel to the left without leaving the image:
        # a negative x would turn into a wrap-around index when slicing.
        right_edge = x + w
        x = max(0, x - 1)
        w = right_edge - x
        if w < max_width and h < max_height:
            boxes.append([x, y, w, h])

    if plot:
        preview = b_image.copy()
        for x, y, w, h in boxes:
            cv2.rectangle(preview, (x, y), (x + w, y + h), (0, 0, 255), 1)
        plt.imshow(preview, cmap='gray')
        plt.title("Identified contours")
        plt.show()

    return boxes

def _group_boxes_into_rows(boxes, mean_height):
    """Split boxes (already ordered by y) into rows of vertically close boxes."""
    rows = []
    current_row = [boxes[0]]
    previous_box = boxes[0]
    for box in boxes[1:]:
        # A box belongs to the current row while its top edge lies within
        # half the mean box height of the previous box's top edge.
        if box[1] <= previous_box[1] + mean_height / 2:
            current_row.append(box)
        else:
            rows.append(current_row)
            current_row = [box]
        previous_box = box
    # Always flush the row being built; otherwise a final box that starts a
    # new row (or a lone box) would be lost.
    rows.append(current_row)
    return rows


def _assign_boxes_to_columns(rows, total_cells):
    """Place every box of every row into the column with the nearest centre."""
    # Column centres are taken from the first row only.
    centers = np.sort(np.array([int(box[0] + box[2] / 2) for box in rows[0]]))

    grid = []
    for row in rows:
        cells = [[] for _ in range(total_cells)]
        for box in row:
            # NOTE(review): the reference point is x + w/4 whereas the centres
            # above use x + w/2 -- kept as in the original; confirm intent.
            distance = abs(centers - (box[0] + box[2] / 4))
            cells[int(np.argmin(distance))].append(box)
        grid.append(cells)
    return grid


def _ocr_cell(b_image, cell_boxes, kernel):
    """OCR each box of one cell; every piece is prefixed with a space."""
    text = ''
    for x, y, w, h in cell_boxes:
        # Clamp the origin: a negative start index would wrap around.
        roi = b_image[max(y, 0):y + h, max(x, 0):x + w]
        if roi.size == 0:
            # OpenCV cannot process an empty region; record it as blank.
            text += " "
            continue
        border = cv2.copyMakeBorder(roi, 2, 2, 2, 2, cv2.BORDER_CONSTANT, value=[255, 255])
        resizing = cv2.resize(border, None, fx=2, fy=2, interpolation=cv2.INTER_CUBIC)
        dilation = cv2.dilate(resizing, kernel, iterations=1)
        erosion = cv2.erode(dilation, kernel, iterations=2)
        text += " " + pytesseract.image_to_string(erosion).strip()
    return text


def extract_text_from_boxes(b_image, boxes):
    """Arrange cell boxes into a table grid and OCR the content of every cell.

    Args:
        b_image: Image from which each cell region is cut before OCR.
        boxes: ``[x, y, w, h]`` boxes ordered top to bottom, as produced by
            ``extract_bounding_boxes``.

    Returns:
        A DataFrame with one row per detected table row and as many columns
        as the widest row has boxes. Cells without a box hold ``' '``; other
        cells hold the OCR text of their boxes, each prefixed with a space.
        An empty DataFrame is returned when ``boxes`` is empty.
    """
    if len(boxes) == 0:
        return pd.DataFrame()

    mean_height = np.mean([box[3] for box in boxes])
    rows = _group_boxes_into_rows(boxes, mean_height)

    # The widest row determines the number of columns.
    total_cells = max(len(row) for row in rows)
    grid = _assign_boxes_to_columns(rows, total_cells)

    # The structuring element is identical for every cell, so build it once.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 1))
    cell_texts = [
        _ocr_cell(b_image, cell_boxes, kernel) if cell_boxes else ' '
        for cells in grid
        for cell_boxes in cells
    ]

    return pd.DataFrame(np.array(cell_texts).reshape(len(rows), total_cells))

def ocr_image_to_text(image_path, extracted_dir, counter):
    """OCR one cropped table image and write the result as a CSV file.

    The CSV goes into the parent folder of ``extracted_dir`` and is named
    ``output.csv`` when ``counter`` is 0, ``output<counter>.csv`` otherwise.
    An image yielding at most one box is reported as containing no table and
    nothing is written.

    Args:
        image_path: Path of the cropped table image.
        extracted_dir: Folder the image was found in.
        counter: Number of tables written so far.

    Returns:
        ``counter`` unchanged when no table was found, ``counter + 1`` after
        a CSV has been written.
    """
    b_image, vertical_horizontal_lines = process_image(image_path)
    boxes = extract_bounding_boxes(b_image, vertical_horizontal_lines)

    # Guard clause: a single box (or none) cannot form a table.
    if not len(boxes) > 1:
        print("No Table Detected.")
        return counter

    print('Table Extracted!')
    dataframe = extract_text_from_boxes(b_image, boxes)

    csv_name = "output.csv" if counter == 0 else f"output{counter}.csv"
    csv_output_path = os.path.join(os.path.dirname(extracted_dir), csv_name)
    dataframe.to_csv(csv_output_path, index=False, header=False)

    return counter + 1

def process_images_in_extracted_dirs(main_wd, output_dir):
    """OCR every image inside each "Cropped" sub-directory of ``main_wd``.

    The running number of written tables is kept in the module-level
    variable ``processed_tables_counter``, which is reset to 0 on each call
    and updated after every image.

    Args:
        main_wd: Root folder searched recursively for directories whose name
            contains "Cropped".
        output_dir: Accepted but not used by this function.
    """
    global processed_tables_counter  # shared with the rest of the notebook
    table_dirs = find_subdirs_with_name(main_wd, "Cropped")
    processed_tables_counter = 0
    for table_dir in table_dirs:
        table_images = find_image_files_in_dir(table_dir)
        for table_image in table_images:
            # ocr_image_to_text hands back the (possibly incremented) count.
            processed_tables_counter = ocr_image_to_text(
                table_image, table_dir, processed_tables_counter
            )



# Example usage
# Run OCR over every "Cropped" folder below the test dataset directory.
# NOTE(review): absolute, machine-specific path; adjust before running elsewhere.
main_wd = r'C:\Users\nick2\Desktop\Table Transformer\Test Dataset'
# Parent folder of the dataset; passed as the second argument, which
# process_images_in_extracted_dirs currently does not use.
output_directory = os.path.dirname(main_wd)
process_images_in_extracted_dirs(main_wd, output_directory)


Table Extracted!
Table Extracted!
Table Extracted!
Table Extracted!
Table Extracted!
Table Extracted!
Table Extracted!
Table Extracted!
No Table Detected.
Table Extracted!
Table Extracted!
No Table Detected.
Table Extracted!
Table Extracted!


# 2.2 Perform Table Data Extraction Using Machine Learning Techniques Tesseract (Kose)

# Table Extraction Evaluation (Jiyun)