**Mount Google Drive**

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the project paths used by the later
# cells are all located under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


**Import Libraries**

In [5]:
import os
import torch
import torchvision
from matplotlib import pyplot as plt
from pathlib import Path
import numpy as np
import cv2
from PIL import Image
import fitz
import copy
from bs4 import BeautifulSoup
import pandas as pd
import json
from types import SimpleNamespace

In [7]:
import sys

# Project folder on Google Drive; adding it to sys.path makes the modules
# stored there importable from this notebook.
folder_path = "/content/drive/MyDrive/VTS Advance Data Analytic/Project/OCR/user/"
# Guard the append so that re-running this cell does not pile up duplicates.
if folder_path not in sys.path:
    sys.path.append(folder_path)

In [8]:
from doclayout_yolo import YOLOv10
from huggingface_hub import snapshot_download

  from .autonotebook import tqdm as notebook_tqdm


In [9]:
# Download the pre-trained DocLayout-YOLO layout model repository from the
# Hugging Face Hub into the project's models folder on Drive (local_dir).
# The value returned by snapshot_download is kept in layout_model_dir.
layout_model_dir = snapshot_download('juliozhao/DocLayout-YOLO-DocStructBench', local_dir='/content/drive/MyDrive/VTS Advance Data Analytic/Project/OCR/user/models/DocLayout-YOLO-DocStructBench')

Fetching 3 files:  67%|██████▋   | 2/3 [00:17<00:08,  8.63s/it]


KeyboardInterrupt: 

In [None]:
# Index of the CUDA device to use when a GPU is available.
device_id = 0
# Fall back to the CPU when the runtime has no CUDA device, so the layout
# model is not asked to run on a GPU that does not exist.
device = torch.device(f"cuda:{device_id}" if torch.cuda.is_available() else "cpu")

In [None]:
# Settings read by LayoutParser: weights file, detection thresholds and the
# device to run on, exposed as attributes of a SimpleNamespace.
layout_cfg = SimpleNamespace(
    layout_model_path=os.path.join(
        "/content/drive/MyDrive/VTS Advance Data Analytic/Project/OCR/user/models",
        "DocLayout-YOLO-DocStructBench",
        "doclayout_yolo_docstructbench_imgsz1024.pt",
    ),
    layout_confidence=0.25,
    layout_iou_threshold=0.45,
    device=device,
)

In [None]:
class LayoutParser:
    """Detect layout regions on a page image with a DocLayout-YOLO model.

    Detections are filtered with NMS and returned both as a list of region
    dictionaries and as an annotated visualization image.
    """

    def __init__(self, args):
        """
        Load the model and read the detection settings.
        Args:
            args: Object exposing the attributes ``layout_model_path``,
                ``layout_confidence``, ``layout_iou_threshold`` and ``device``.
        """
        self.model = YOLOv10(args.layout_model_path)
        self.confidence = args.layout_confidence
        self.iou_threshold = args.layout_iou_threshold
        self.device = args.device
        # Class id -> layout category name reported in the results.
        self.id_to_names = {
            0: 'title',
            1: 'plain text',
            2: 'abandon',
            3: 'figure',
            4: 'figure_caption',
            5: 'table',
            6: 'table_caption',
            7: 'table_footnote',
            8: 'isolate_formula',
            9: 'formula_caption'
        }

    def colormap(self, N=256, normalized=False):
        """
        Generate the color map.
        Args:
            N (int): Number of labels (default is 256).
            normalized (bool): If True, return colors normalized to [0, 1]. Otherwise, return [0, 255].
        Returns:
            np.ndarray: Color map array of shape (N, 3).
        """
        def bitget(byteval, idx):
            """
            Get the bit value at the specified index.
            Args:
                byteval (int): The byte value.
                idx (int): The index of the bit.
            Returns:
                bool: True when the bit is set (used as 0/1 in the shifts below).
            """
            return ((byteval & (1 << idx)) != 0)

        cmap = np.zeros((N, 3), dtype=np.uint8)
        for i in range(N):
            r = g = b = 0
            c = i
            # Spread the bits of the label index over the three channels,
            # filling each channel from its most significant bit downwards.
            for j in range(8):
                r = r | (bitget(c, 0) << (7 - j))
                g = g | (bitget(c, 1) << (7 - j))
                b = b | (bitget(c, 2) << (7 - j))
                c = c >> 3
            cmap[i] = np.array([r, g, b])

        if normalized:
            cmap = cmap.astype(np.float32) / 255.0

        return cmap

    def visualize_bbox(self, image_path, bboxes, classes, scores, alpha=0.3):
        """
        Visualize layout detection results on an image and collect the regions.
        Args:
            image_path (str | PIL.Image.Image | np.ndarray): Path to the input image,
                or the image itself. In-memory images are passed through
                ``cv2.COLOR_RGB2BGR`` (first and third channel swapped) before drawing.
            bboxes (sequence): Bounding boxes, each as [x_min, y_min, x_max, y_max].
            classes (sequence): Class IDs corresponding to the bounding boxes.
            scores (sequence): Confidence scores corresponding to the bounding boxes.
            alpha (float): Transparency factor for the filled color (default is 0.3).
        Returns:
            tuple: ``(result, image)`` where ``result`` is a list of dicts with the
            keys ``type``, ``bbox``, ``score`` and ``roi_img`` (crop taken from the
            un-annotated image), and ``image`` is the annotated np.ndarray.
        Raises:
            FileNotFoundError: If ``image_path`` is a path that OpenCV cannot read.
        """
        if isinstance(image_path, (Image.Image, np.ndarray)):
            image = np.array(image_path)
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # Convert RGB to BGR for OpenCV
        else:
            image = cv2.imread(image_path)
            if image is None:
                # cv2.imread signals failure by returning None; fail with a clear
                # message instead of an AttributeError on the copy below.
                raise FileNotFoundError(f"Could not read image: {image_path}")
        ori_image = image.copy()
        overlay = image.copy()

        cmap = self.colormap(N=len(self.id_to_names), normalized=False)

        result = []
        # Iterate over each bounding box
        for i, bbox in enumerate(bboxes):
            x_min, y_min, x_max, y_max = map(int, bbox)
            class_id = int(classes[i])
            class_name = self.id_to_names[class_id]
            # Store a plain Python float so the result does not keep a reference
            # to the detector's scalar object (e.g. a device tensor).
            score = float(scores[i])
            roi_img = ori_image[y_min:y_max, x_min:x_max, :]
            result.append({
                "type": class_name,
                "bbox": [x_min, y_min, x_max, y_max],
                "score": score,
                "roi_img": roi_img
            })
            text = class_name + f":{score:.3f}"

            color = tuple(int(c) for c in cmap[class_id])
            # Filled box goes on the overlay (blended later), outline on the image.
            cv2.rectangle(overlay, (x_min, y_min), (x_max, y_max), color, -1)
            cv2.rectangle(image, (x_min, y_min), (x_max, y_max), color, 2)

            # Add the class name with a background rectangle
            (text_width, text_height), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.9, 2)
            cv2.rectangle(image, (x_min, y_min - text_height - baseline), (x_min + text_width, y_min), color, -1)
            cv2.putText(image, text, (x_min, y_min - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)

        # Blend the overlay with the original image
        cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0, image)

        return result, image

    def predict(self, input_img):
        """
        Run layout detection on one image.
        Args:
            input_img: Image passed to the model's ``predict`` and to ``visualize_bbox``.
        Returns:
            tuple: ``(dict_result, vis_result)`` as returned by ``visualize_bbox``.
        """
        det_res = self.model.predict(
            input_img,   # Image to predict
            imgsz=1024,        # Prediction image size
            conf=self.confidence,          # Confidence threshold
            device=self.device    # Device to use (e.g., 'cuda:0' or 'cpu')
        )[0]
        det_boxes = det_res.__dict__['boxes']
        boxes = det_boxes.xyxy
        classes = det_boxes.cls
        scores = det_boxes.conf

        # Class-agnostic NMS on top of the model output. Indexing with the 1-D
        # index tensor keeps `boxes` two-dimensional even for a single
        # detection, so no extra expand_dims step is needed afterwards.
        indices = torchvision.ops.nms(
            boxes=torch.Tensor(boxes),
            scores=torch.Tensor(scores),
            iou_threshold=self.iou_threshold,
        )
        boxes, scores, classes = boxes[indices], scores[indices], classes[indices]

        dict_result, vis_result = self.visualize_bbox(input_img, boxes, classes, scores)

        return dict_result, vis_result

class DocumentExtractor:
    """Run layout detection on every page of a PDF or on a single image file."""

    # Extensions (lower-case, including the dot) handled as one-page images.
    IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")

    def __init__(self, layout_cfg, table_cfg=None, text_cfg=None):
        """
        Args:
            layout_cfg: Settings object handed to ``LayoutParser``.
            table_cfg: Accepted for interface compatibility; not used here.
            text_cfg: Accepted for interface compatibility; not used here.
        """
        self.layout_cfg = layout_cfg
        self.layout_parser = LayoutParser(self.layout_cfg)

    def get_layout(self, img):
        """Return ``(dict_result, vis_result)`` from the layout parser for ``img``."""
        dict_result, vis_result = self.layout_parser.predict(img)
        return dict_result, vis_result

    def pdf2img(self, page, page_scale=4):
        """
        Render a PDF page to an image array.
        Args:
            page: Page object taken from the opened ``fitz`` document.
            page_scale (int | float): Zoom factor applied on both axes (default 4).
        Returns:
            np.ndarray: Rendered page after the RGB -> BGR conversion.
        """
        mat = fitz.Matrix(page_scale, page_scale)
        pm = page.get_pixmap(matrix=mat, alpha=False)
        img = Image.frombytes("RGB", [pm.width, pm.height], pm.samples)
        img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
        return img

    def __call__(self, filepath):
        """
        Extract the layout of every page of ``filepath``.
        Args:
            filepath (str): Path to a ``.pdf``, ``.jpg``, ``.jpeg`` or ``.png`` file
                (extension matched case-insensitively).
        Returns:
            dict: ``document_name``, ``filepath``, ``num_pages`` and ``pages``; each
            entry of ``pages`` holds ``page_id`` and ``layout``. A file with any
            other extension yields a result with zero pages.
        Raises:
            ValueError: If an image file cannot be read by OpenCV.
        """
        pages = []
        # splitext handles both 3- and 4-letter extensions (".jpg" and ".jpeg").
        extension = os.path.splitext(filepath)[1].lower()

        if extension in self.IMAGE_EXTENSIONS:
            # cv2.imread already returns the same channel order as pdf2img, so
            # the array is used as-is.
            img = cv2.imread(filepath)
            if img is None:
                raise ValueError(f"Could not read image file: {filepath}")
            pages.append(img)

        elif extension == ".pdf":
            with fitz.open(filepath) as pdf:
                for pg in range(pdf.page_count):
                    pages.append(self.pdf2img(pdf[pg]))

        result = {
            'document_name': os.path.splitext(os.path.basename(filepath))[0],
            'filepath': filepath,
            'num_pages': len(pages),
            'pages': [],
        }
        for page_id, img in enumerate(pages):
            dict_result, _vis_result = self.get_layout(img)
            result['pages'].append({'page_id': page_id, 'layout': dict_result})

        return result


In [None]:
def show_image(img):
    """Display ``img`` on a 32x32 inch, 150 dpi matplotlib figure with the axes hidden."""
    _, axes = plt.subplots(figsize=(32, 32), dpi=150)
    axes.imshow(img)
    axes.axis('off')
    plt.show()

In [None]:
# Build the extractor once; its constructor creates the LayoutParser, which
# loads the model weights referenced by layout_cfg.
document_extractor = DocumentExtractor(layout_cfg)

In [None]:
# Name of the intern whose document sub-folder and log file are used by the
# cells below.
user_name = 'Duc'

In [None]:
import os
import pandas as pd

# Root folder of the raw documents; update_log looks for one sub-folder per
# intern (base_path/<user_name>) inside it.
base_path = "/content/drive/MyDrive/VTS Advance Data Analytic/Project/OCR/user/dataset/raw_document"

# List of intern names: none is defined in this cell; the single `user_name` set in an earlier cell is used.

# Collect log records for the PDF files of a folder
def collect_files_from_folder(folder_path):
    """Build one log record per PDF file found directly inside ``folder_path``.

    Args:
        folder_path (str): Folder to scan (not recursive).
    Returns:
        list[dict]: Records with the keys ``File Name``, ``File Path``,
        ``Status`` (always "Not Processed") and ``Table Count`` (always 0),
        sorted by file name. Empty when the folder does not exist or is not
        a directory.
    """
    if not os.path.isdir(folder_path):
        return []

    file_data = []
    # os.listdir returns entries in arbitrary order; sort so that the log
    # produced from these records is stable between runs.
    for file in sorted(os.listdir(folder_path)):
        full_path = os.path.join(folder_path, file)
        # Only regular files: a directory whose name ends in ".pdf" is skipped.
        if file.endswith(".pdf") and os.path.isfile(full_path):
            file_data.append({
                "File Name": file,
                "File Path": full_path,
                "Status": "Not Processed",
                "Table Count": 0  # No tables extracted yet
            })
    return file_data

# Update the intern's log file
def update_log(user_name, base_path):
    """Synchronise ``<user_name>_log.csv`` with the PDFs in ``base_path/user_name``.

    Creates the log when it does not exist yet. Otherwise appends files that
    are new in the folder and sets ``Status`` to "Deleted" for logged files
    that are no longer in the folder.

    Args:
        user_name (str): Intern name; selects the sub-folder and the log file.
        base_path (str): Folder containing the per-intern document folders.
    """
    log_columns = ["File Name", "File Path", "Status", "Table Count"]

    # Folder to scan and log file to maintain
    user_path = os.path.join(base_path, user_name)
    log_file_path = f"/content/drive/MyDrive/VTS Advance Data Analytic/Project/OCR/user/file_logs/raw_logs/{user_name}_log.csv"

    # Files currently present in the folder. The explicit column list keeps
    # the schema when no PDF is found; without it the frame has no columns,
    # the "File Name" lookups below fail and a new log is written as an empty
    # file that read_csv cannot load.
    current_files = collect_files_from_folder(user_path)
    current_df = pd.DataFrame(current_files, columns=log_columns)

    # Create the log when it does not exist yet
    if not os.path.exists(log_file_path):
        print(f"Creating new log file for {user_name}.")
        os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
        current_df.to_csv(log_file_path, index=False)
        return

    # Read the existing log; a completely empty file is treated as an empty log
    try:
        existing_df = pd.read_csv(log_file_path)
    except pd.errors.EmptyDataError:
        existing_df = pd.DataFrame(columns=log_columns)

    # Older logs may not have the "Table Count" column yet
    if "Table Count" not in existing_df.columns:
        existing_df["Table Count"] = 0

    # New files: present in the folder but not in the log
    new_files = current_df[~current_df["File Name"].isin(existing_df["File Name"])]

    # Removed files: present in the log but no longer in the folder
    removed_files = existing_df[~existing_df["File Name"].isin(current_df["File Name"])]

    if not new_files.empty:
        print(f"Adding {len(new_files)} new files to log for {user_name}.")
        existing_df = pd.concat([existing_df, new_files], ignore_index=True)

    if not removed_files.empty:
        print(f"Marking {len(removed_files)} files as deleted for {user_name}.")
        existing_df.loc[existing_df["File Name"].isin(removed_files["File Name"]), "Status"] = "Deleted"

    # Save the updated log
    existing_df.to_csv(log_file_path, index=False)
    print(f"Log for {user_name} updated successfully.")

# Update the log of the selected intern (user_name); only this one intern is
# processed here.
update_log(user_name, base_path)

In [None]:
import os
import pandas as pd
from tqdm import tqdm
import cv2
import numpy as np
from PIL import Image

# NOTE: no mock is defined in this cell; process_file calls the
# `document_extractor` instance created in an earlier cell.
# Base paths.
# NOTE(review): this rebinds `base_path`, which the log-update cell assigned to
# ".../dataset/raw_document"; here it points one level higher (".../dataset").
base_path = "/content/drive/MyDrive/VTS Advance Data Analytic/Project/OCR/user/dataset"
log_path = "/content/drive/MyDrive/VTS Advance Data Analytic/Project/OCR/user/file_logs/raw_logs"
output_path_base = "/content/drive/MyDrive/VTS Advance Data Analytic/Project/OCR/user/dataset/table_ocr/image/local"

# Process a single file
def process_file(file_name, user_name, log_file_path, output_path):
    """Extract the table crops of one document and record the outcome in the log.

    Args:
        file_name (str): Document file name inside the intern's raw-document folder.
        user_name (str): Intern name (sub-folder of ``base_path/raw_document``).
        log_file_path (str): CSV log with the columns ``File Name``, ``Status``
            and ``Table Count``.
        output_path (str): Folder that receives ``<stem>_table<k>.png`` crops.
    Returns:
        None. Files whose log status is already "Processed" are skipped.
    """
    log_df = pd.read_csv(log_file_path)

    # Skip files that the log already marks as processed
    is_target = log_df["File Name"] == file_name
    file_row = log_df[is_target]
    if not file_row.empty and file_row.iloc[0]["Status"] == "Processed":
        print(f"Skipping {file_name} as it is already Processed.")
        return

    # Run the layout extraction
    file_path = os.path.join(base_path, "raw_document", user_name, file_name)
    extracted_result = document_extractor(file_path)

    os.makedirs(output_path, exist_ok=True)

    file_stem = os.path.splitext(file_name)[0]
    table_count = 0
    for page in extracted_result['pages']:
        for box in page['layout']:
            if box['type'] != 'table':
                continue
            table_count += 1
            roi_img = box['roi_img']

            output_file_name = f"{file_stem}_table{table_count}.png"
            output_file_path = os.path.join(output_path, output_file_name)

            if not isinstance(roi_img, np.ndarray) or roi_img.size == 0:
                print(f"Invalid ROI image for {output_file_name}. Skipping...")
                continue

            # The page array given to the layout parser is already BGR, and
            # visualize_bbox swaps the channels of array inputs once more, so
            # the crop arrives here as RGB. cv2.imwrite expects BGR: swap back,
            # otherwise red and blue are exchanged in the saved PNG.
            bgr_roi = cv2.cvtColor(roi_img, cv2.COLOR_RGB2BGR)
            # cv2.imwrite reports failure through its return value.
            if cv2.imwrite(output_file_path, bgr_roi):
                print(f"Saved table image to {output_file_path}")
            else:
                print(f"Failed to save table image to {output_file_path}")

    # Update the log
    if is_target.any():
        log_df.loc[is_target, "Status"] = "Processed"
        # Output names restart at table1 on every run, so the count is set,
        # not accumulated on top of a previous value.
        log_df.loc[is_target, "Table Count"] = table_count
        log_df.to_csv(log_file_path, index=False)
    else:
        print(f"{file_name} is not listed in {log_file_path}; its status was not recorded.")

    print(f"Processed {file_name}: {table_count} tables extracted.")

# Process every PDF file of one intern
def process_user_files(user_name):
    """Run ``process_file`` on each PDF in the intern's raw-document folder.

    Args:
        user_name (str): Intern name; selects the document folder, the log
            file and the output folder.
    Returns:
        None. Nothing is processed when the log file or the document folder
        is missing.
    """
    user_folder = os.path.join(base_path, "raw_document", user_name)
    log_file_path = os.path.join(log_path, f"{user_name}_log.csv")
    output_path = os.path.join(output_path_base, user_name)

    if not os.path.exists(log_file_path):
        print(f"Log file for {user_name} does not exist.")
        return

    if not os.path.isdir(user_folder):
        print(f"Document folder for {user_name} does not exist: {user_folder}")
        return

    # Filter before building the progress bar so it counts PDFs only, and sort
    # because os.listdir returns entries in arbitrary order.
    pdf_names = sorted(name for name in os.listdir(user_folder) if name.endswith(".pdf"))
    for file_name in tqdm(pdf_names):
        process_file(file_name, user_name, log_file_path, output_path)

# Run the table extraction for the selected intern (user_name); only this one
# intern is processed here.
print(f"Processing files for {user_name}...")
process_user_files(user_name)