1. IoU 기반 중복 제거: IoU가 특정 임계값 이상인 바운딩 박스들은 동일한 객체로 간주하고, 신뢰도가 가장 높은 바운딩 박스만 남김
2. 중복 제거된 바운딩 박스를 JSON 파일로 저장
3. 결과를 이미지로도 보여주기


In [None]:
# Mount Google Drive at /content/drive (Colab-only helper).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install the Hugging Face hub client, Ultralytics and DocLayout-YOLO,
# and clone the DocLayout-YOLO repository.
!pip install huggingface_hub ultralytics
!pip install doclayout-yolo
!git clone https://github.com/opendatalab/DocLayout-YOLO.git

# Install the remaining dependencies (poppler for pdf2image, OCR and
# image libraries, and a pinned transformers version).
!apt-get update
!apt-get install -y poppler-utils
!pip install pdf2image
!pip install torch transformers==4.40.0 accelerate
!pip install pytesseract
!pip install opencv-python
!pip install pillow
!pip install easyocr

Collecting ultralytics
  Downloading ultralytics-8.3.38-py3-none-any.whl.metadata (35 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.12-py3-none-any.whl.metadata (9.4 kB)
Downloading ultralytics-8.3.38-py3-none-any.whl (896 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m896.3/896.3 kB[0m [31m51.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.12-py3-none-any.whl (26 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.38 ultralytics-thop-2.0.12
Collecting doclayout-yolo
  Downloading doclayout_yolo-0.0.2-py3-none-any.whl.metadata (8.9 kB)
Collecting thop>=0.1.1 (from doclayout-yolo)
  Downloading thop-0.1.1.post2209072238-py3-none-any.whl.metadata (2.7 kB)
Downloading doclayout_yolo-0.0.2-py3-none-any.whl (708 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m708.2/708.2 kB[0m [31m38.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloadi

In [None]:
from huggingface_hub import hf_hub_download
from ultralytics import YOLO
import numpy as np
import pandas as pd
import cv2
import json
from doclayout_yolo import YOLOv10
import os
import uuid
from pdf2image import convert_from_path
import easyocr
from collections import Counter
from google.colab.patches import cv2_imshow
from tqdm import tqdm
import transformers
import torch

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [None]:
# Report whether CUDA is usable and, if so, the name of device 0.
print("GPU available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU name:", torch.cuda.get_device_name(0))

GPU available: True
GPU name: NVIDIA A100-SXM4-40GB


In [None]:
# Render each page of a PDF to its own JPEG file.
def pdf_to_jpg(pdf_path, output_folder="images", dpi=300):
    """Convert every page of ``pdf_path`` to ``page_<n>.jpg`` files.

    Args:
        pdf_path: Path of the PDF to render.
        output_folder: Directory that receives the JPEGs; created if missing.
        dpi: Rendering resolution forwarded to ``convert_from_path``.

    Returns:
        The list of written image paths, in page order (pages are numbered
        from 1).
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    image_paths = []
    pages = convert_from_path(pdf_path, dpi=dpi)
    for page_no, page in enumerate(pages, start=1):
        target = os.path.join(output_folder, f"page_{page_no}.jpg")
        page.save(target, "JPEG")
        image_paths.append(target)

    print(f"Converted PDF to {len(image_paths)} JPG files.")
    return image_paths


## YOLO

In [None]:
# Intersection-over-Union of two axis-aligned boxes.
def calculate_iou(box1, box2):
    """Return the IoU of two boxes given as ``(x_min, y_min, x_max, y_max)``.

    Args:
        box1: First box as a 4-tuple of numbers.
        box2: Second box as a 4-tuple of numbers.

    Returns:
        The intersection area divided by the union area. Returns ``0.0``
        when the union area is not positive (e.g. both boxes are
        degenerate), instead of raising ``ZeroDivisionError``.
    """
    x1, y1, x2, y2 = box1
    x3, y3, x4, y4 = box2

    # Width/height of the overlap rectangle, clamped at zero for disjoint boxes.
    inter_w = max(0, min(x2, x4) - max(x1, x3))
    inter_h = max(0, min(y2, y4) - max(y1, y3))
    inter_area = inter_w * inter_h

    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x4 - x3) * (y4 - y3)
    union_area = box1_area + box2_area - inter_area

    if union_area <= 0:
        return 0.0
    return inter_area / union_area

# Remove duplicate bounding boxes (IoU-based non-maximum suppression).
def filter_duplicate_boxes(bounding_boxes, iou_threshold=0.5):
    """Keep only the most confident box among mutually overlapping boxes.

    Boxes are visited from highest to lowest confidence; a box is kept only
    if its IoU with every already-kept box is at most ``iou_threshold``.
    The previous implementation compared a new box only against the first
    overlapping kept box, so a box could evict one neighbour and still be
    added while overlapping another kept box.

    Args:
        bounding_boxes: Iterable of dicts with the keys ``x_min``, ``y_min``,
            ``x_max``, ``y_max`` and ``confidence``.
        iou_threshold: Two boxes with IoU strictly above this value are
            treated as the same object.

    Returns:
        A list with the surviving box dicts, in their original input order.
    """
    boxes = list(bounding_boxes)

    def _coords(box):
        return (box["x_min"], box["y_min"], box["x_max"], box["y_max"])

    # Stable sort: on equal confidence the earlier box wins, as before.
    by_confidence = sorted(
        range(len(boxes)),
        key=lambda idx: boxes[idx]["confidence"],
        reverse=True,
    )

    kept_indices = []
    for idx in by_confidence:
        candidate = _coords(boxes[idx])
        overlaps_kept = any(
            calculate_iou(candidate, _coords(boxes[kept])) > iou_threshold
            for kept in kept_indices
        )
        if not overlaps_kept:
            kept_indices.append(idx)

    # Report survivors in input order so downstream output stays stable.
    return [boxes[idx] for idx in sorted(kept_indices)]

# Build a letters-only suffix that is unique per index.
def generate_unique_suffix(index):
    """Return a lowercase-letter suffix (no digits) for ``index``.

    Indices 0-25 map to ``a``-``z`` exactly as before. Larger indices
    continue spreadsheet-style (``aa``, ``ab``, ..., ``zz``, ``aaa``, ...)
    instead of wrapping around, so two different indices never share a
    suffix.

    Args:
        index: Non-negative integer position.

    Returns:
        The suffix string.

    Raises:
        ValueError: If ``index`` is negative.
    """
    if index < 0:
        raise ValueError(f"index must be non-negative, got {index}")

    alphabet = "abcdefghijklmnopqrstuvwxyz"
    base = len(alphabet)

    letters = []
    remaining = index
    while True:
        remaining, offset = divmod(remaining, base)
        letters.append(alphabet[offset])
        if remaining == 0:
            break
        # Bijective numeration: there is no "zero" letter, so shift by one.
        remaining -= 1
    return "".join(reversed(letters))


# Predict layout boxes for one page image and save them as JSON.
def process_image(image_path, model, page_number, output_folder="output_results",
                  imgsz=1024, conf=0.2, device="cpu", iou_threshold=0.5):
    """Run the detector on one page and write its de-duplicated boxes.

    Args:
        image_path: Path of the page image passed to ``model.predict``.
        model: Detector exposing ``predict(...)`` and a ``names`` mapping
            from class index to class name.
        page_number: Page number; used in each ``unique_id`` and in the
            output file name.
        output_folder: Directory for the JSON file; created if missing.
        imgsz: Inference image size forwarded to ``model.predict``.
        conf: Confidence threshold forwarded to ``model.predict``.
        device: Device string forwarded to ``model.predict``. The default
            stays ``"cpu"`` for backward compatibility.
        iou_threshold: IoU threshold used to drop duplicate boxes.

    Side effects:
        Writes ``page_<page_number>_filtered_boxes.json`` into
        ``output_folder`` and prints the saved path.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Run the model prediction.
    det_res = model.predict(image_path, imgsz=imgsz, conf=conf, device=device)

    # Collect one dict per detected box (first result only).
    bounding_boxes = []
    for i, box in enumerate(det_res[0].boxes):
        class_number = int(box.cls)
        class_name = model.names[class_number]
        # The suffix is derived from the detection index, before filtering.
        unique_suffix = generate_unique_suffix(i)
        bounding_boxes.append({
            "class": class_name,
            "confidence": float(box.conf),
            "x_min": float(box.xyxy[0][0]),
            "y_min": float(box.xyxy[0][1]),
            "x_max": float(box.xyxy[0][2]),
            "y_max": float(box.xyxy[0][3]),
            "unique_id": f"{page_number}_{class_number}_{unique_suffix}"
        })

    # Drop duplicate detections.
    filtered_boxes = filter_duplicate_boxes(bounding_boxes, iou_threshold=iou_threshold)

    # Write as UTF-8 explicitly; the reader of these files opens them as UTF-8.
    json_output_path = os.path.join(output_folder, f"page_{page_number}_filtered_boxes.json")
    with open(json_output_path, "w", encoding="utf-8") as f:
        json.dump(filtered_boxes, f, indent=4)
    print(f"Saved filtered boxes for Page {page_number} to: {json_output_path}")

# Load the DocLayout-YOLO model.
def load_yolo_model():
    """Download the DocStructBench checkpoint and wrap it in ``YOLOv10``.

    Returns:
        A ``YOLOv10`` instance built from the file returned by
        ``hf_hub_download``.
    """
    weights_path = hf_hub_download(
        repo_id="juliozhao/DocLayout-YOLO-DocStructBench",
        filename="doclayout_yolo_docstructbench_imgsz1024.pt",
    )
    model = YOLOv10(weights_path)
    return model

# Main pipeline entry point.
def main(pdf_path, output_folder="output_results"):
    """Detect layout boxes on every page of a PDF.

    Loads the model, renders the PDF into the ``images`` folder, then calls
    ``process_image`` once per page (pages numbered from 1).

    Args:
        pdf_path: Path of the PDF to process.
        output_folder: Directory passed to ``process_image`` for JSON output.
    """
    print("Loading YOLO model...")
    model = load_yolo_model()

    print("Converting PDF to images...")
    image_paths = pdf_to_jpg(pdf_path, output_folder="images")

    print("Processing images...")
    page_number = 0
    for image_path in image_paths:
        page_number += 1
        process_image(image_path, model, page_number, output_folder)

# Run the detection pipeline on a single PDF.
if __name__ == "__main__":
    pdf_path = "/content/비타민 CV 프로젝트.pdf"  # path of the PDF to process
    main(pdf_path)

Loading YOLO model...
Converting PDF to images...
Converted PDF to 2 JPG files.
Processing images...

image 1/1 /content/images/page_1.jpg: 1024x736 2 titles, 3 plain texts, 1 abandon, 2 figures, 2 figure_captions, 3337.7ms
Speed: 27.8ms preprocess, 3337.7ms inference, 25.0ms postprocess per image at shape (1, 3, 1024, 736)
Saved filtered boxes for Page 1 to: output_results/page_1_filtered_boxes.json

image 1/1 /content/images/page_2.jpg: 1024x736 1 plain text, 1 abandon, 3 tables, 2709.7ms
Speed: 7.2ms preprocess, 2709.7ms inference, 1.0ms postprocess per image at shape (1, 3, 1024, 736)
Saved filtered boxes for Page 2 to: output_results/page_2_filtered_boxes.json


In [None]:
def combine_json_by_class(directory_path, output_table_file="table_json.json", output_figure_file="figure_json.json", output_plain_text_file="plain_text_json.json"):
    """
    Merge the per-page box files in a directory into one file per class.

    Every ``*_filtered_boxes.json`` file in ``directory_path`` is read and
    its boxes are grouped by their ``"class"`` name (``"table"``,
    ``"figure"`` or ``"plain text"``); boxes of any other class are ignored.
    Files are processed in page order so the output is deterministic
    (``os.listdir`` alone returns entries in arbitrary order).

    Calls ``save_json`` with three arguments (path, data, data type).

    Args:
        directory_path (str): Directory holding the per-page JSON files;
            the combined files are written there as well.
        output_table_file (str): File name for the table boxes.
        output_figure_file (str): File name for the figure boxes.
        output_plain_text_file (str): File name for the plain-text boxes.
    """
    suffix = "_filtered_boxes.json"
    buckets = {"table": [], "figure": [], "plain text": []}

    def _page_order(file_name):
        # "page_12_filtered_boxes.json" -> 12; names without a numeric page
        # part sort after the numbered ones, alphabetically.
        page_part = file_name[:-len(suffix)].rpartition("_")[2]
        if page_part.isdecimal():
            return (0, int(page_part), file_name)
        return (1, 0, file_name)

    # Find all per-page JSON files in the directory.
    json_files = sorted(
        (f for f in os.listdir(directory_path) if f.endswith(suffix)),
        key=_page_order,
    )

    for json_file in json_files:
        json_file_path = os.path.join(directory_path, json_file)

        with open(json_file_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        # Route each box to the bucket of its class name, if any.
        for box in data:
            bucket = buckets.get(box["class"])
            if bucket is not None:
                bucket.append(box)

    # Save each combined list to its own file.
    save_json(os.path.join(directory_path, output_table_file), buckets["table"], "table")
    save_json(os.path.join(directory_path, output_figure_file), buckets["figure"], "figure")
    save_json(os.path.join(directory_path, output_plain_text_file), buckets["plain text"], "plain_text")

def save_json(output_path, data, data_type):
    """
    Write ``data`` to ``output_path`` as indented JSON and report it.

    Args:
        output_path (str): Destination JSON file path.
        data (list): JSON-serializable data to store.
        data_type (str): Label used only in the printed message; it is
            capitalized with ``str.capitalize``.
    """
    label = data_type.capitalize()
    with open(output_path, "w", encoding="utf-8") as out_file:
        json.dump(data, out_file, indent=4)
    print(f"{label} data saved to: {output_path}")

# Example run
directory_path = "/content/output_results"  # directory that holds the per-page JSON files
combine_json_by_class(directory_path)


Table data saved to: /content/output_results/table_json.json
Figure data saved to: /content/output_results/figure_json.json
Plain_text data saved to: /content/output_results/plain_text_json.json


In [None]:
def crop_and_save_by_json(image_dir, json_path, output_dir):
    """
    Crop every bounding box listed in a JSON file out of its page image.

    The page number is the part of each box's ``unique_id`` before the first
    underscore; the page image is ``page_<n>.jpg`` inside ``image_dir``.
    Each crop is saved as ``<unique_id>.jpg`` in ``output_dir``.

    Compared with the earlier version, the page image is decoded once per
    run of consecutive boxes on the same page instead of once per box,
    coordinates are clamped to the image bounds, and empty crops are skipped
    instead of being passed to ``cv2.imwrite``.

    Args:
        image_dir (str): Directory containing the page images.
        json_path (str): Path of the JSON file with the bounding boxes.
        output_dir (str): Directory for the cropped images; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    with open(json_path, "r", encoding="utf-8") as f:
        bounding_boxes = json.load(f)

    # Only the most recently used page is cached, to keep memory use low.
    cached_page = None
    cached_image = None

    for box in bounding_boxes:
        # Page number is the first component of the unique id.
        page_number = int(box["unique_id"].split("_")[0])
        image_name = f"page_{page_number}.jpg"
        image_path = os.path.join(image_dir, image_name)

        if page_number != cached_page:
            cached_image = cv2.imread(image_path)
            cached_page = page_number
        image = cached_image
        if image is None:
            print(f"이미지를 불러올 수 없습니다: {image_path}")
            continue

        # Clamp to the image so negative values cannot wrap around in slicing.
        height, width = image.shape[:2]
        x_min = max(0, int(box["x_min"]))
        y_min = max(0, int(box["y_min"]))
        x_max = min(width, int(box["x_max"]))
        y_max = min(height, int(box["y_max"]))

        if x_max <= x_min or y_max <= y_min:
            print(f"Skipping empty crop for: {box['unique_id']}")
            continue

        cropped_image = image[y_min:y_max, x_min:x_max]

        save_path = os.path.join(output_dir, f"{box['unique_id']}.jpg")
        cv2.imwrite(save_path, cropped_image)
        print(f"Saved cropped image to: {save_path}")

# Example run
image_dir = "/content/images"  # directory that holds the page images
output_dir_base = "/content/cropped_images"  # base directory for the cropped images

# One combined JSON file per category
json_files = {
    "figure": "/content/output_results/figure_json.json",
    "table": "/content/output_results/table_json.json",
    "plain_text": "/content/output_results/plain_text_json.json"
}

for category, json_path in json_files.items():
    output_dir = os.path.join(output_dir_base, category)  # per-category directory (figure, table or plain_text)
    crop_and_save_by_json(image_dir, json_path, output_dir)


Saved cropped image to: /content/cropped_images/figure/1_3_c.jpg
Saved cropped image to: /content/cropped_images/figure/1_3_e.jpg
Saved cropped image to: /content/cropped_images/table/2_5_a.jpg
Saved cropped image to: /content/cropped_images/table/2_5_c.jpg
Saved cropped image to: /content/cropped_images/plain_text/2_1_b.jpg
Saved cropped image to: /content/cropped_images/plain_text/1_1_a.jpg
Saved cropped image to: /content/cropped_images/plain_text/1_1_b.jpg


In [None]:
# 1. Install the required packages first.
!pip install opencv-contrib-python
!pip install gdown

# 2. Download the EDSR model (run once).
# NOTE(review): the recorded output of this cell shows gdown failing to
# retrieve the file, so models/EDSR_x3.pb may not exist after running it.
!mkdir -p models
!gdown --id 1-HnR-AM1ndkeRh7zCJ-Fr3Tq9QKG-qUp -O models/EDSR_x3.pb

Failed to retrieve file url:

	Cannot retrieve the public link of the file. You may need to change
	the permission to 'Anyone with the link', or have had many accesses.
	Check FAQ in https://github.com/wkentaro/gdown?tab=readme-ov-file#faq.

You may still be able to access the file from the browser:

	https://drive.google.com/uc?id=1-HnR-AM1ndkeRh7zCJ-Fr3Tq9QKG-qUp

but Gdown can't. Please check connections and permissions.


## TextExtractor

In [None]:
import os
import json
import easyocr
from typing import List, Dict, Any

class TextExtractor:
    """Extract plain text from cropped images with EasyOCR (Korean + English)."""

    def __init__(self):
        """Create the EasyOCR reader for Korean and English."""
        self.reader = easyocr.Reader(['ko', 'en'])

    def extract_text_from_image(self, image_path: str) -> Dict[str, Any]:
        """OCR a single image and return its record.

        Args:
            image_path: Path of the image to read.

        Returns:
            A dict with ``data_id`` (the file name of ``image_path``),
            ``유형`` (always ``'평문'``) and ``내용`` (the recognized text
            with all whitespace runs collapsed to single spaces).
        """
        text_result = self.reader.readtext(image_path, detail=0)
        # Join the fragments, then normalize every whitespace run to one space.
        text = " ".join(" ".join(text_result).split())

        return {
            # basename handles the platform's path separator, unlike split('/').
            'data_id': os.path.basename(image_path),
            '유형': '평문',
            '내용': text
        }

    def process_directory(self, input_dir: str) -> List[Dict[str, Any]]:
        """OCR every supported image in a directory.

        Files are visited in sorted name order so the result order is
        deterministic. A failure on one image is printed and skipped so the
        remaining images are still processed.

        Args:
            input_dir: Directory containing the images.

        Returns:
            One record per successfully processed image.

        Raises:
            FileNotFoundError: If ``input_dir`` does not exist.
        """
        if not os.path.exists(input_dir):
            raise FileNotFoundError(f"Input directory not found: {input_dir}")

        image_extensions = ('.png', '.jpg', '.jpeg', '.tiff', '.bmp')
        results = []

        for filename in sorted(os.listdir(input_dir)):
            if not filename.lower().endswith(image_extensions):
                continue
            image_path = os.path.join(input_dir, filename)
            try:
                results.append(self.extract_text_from_image(image_path))
            except Exception as e:
                # Best effort: report and continue with the next file.
                print(f"Error processing {filename}: {str(e)}")

        return results

def save_results_as_json(results: List[Dict[str, Any]], output_dir: str) -> str:
    """
    Write the extraction results to ``result_plain_text.json``.

    Args:
        results: List of dictionaries containing extraction results
        output_dir: Directory that receives the file; created if missing

    Returns:
        Path of the written JSON file
    """
    os.makedirs(output_dir, exist_ok=True)

    destination = os.path.join(output_dir, "result_plain_text.json")
    with open(destination, "w", encoding="utf-8") as out_file:
        # ensure_ascii=False keeps non-ASCII text readable in the file.
        json.dump(results, out_file, ensure_ascii=False, indent=4)

    return destination


# Configuration: OCR the plain-text crops and write the result to the outputs directory
input_dir = "/content/cropped_images/plain_text"
output_dir = "/content/outputs"

# Initialize extractor (creates the EasyOCR reader)
extractor = TextExtractor()

# Process all images in input_dir
results = extractor.process_directory(input_dir)

# Save results and report where they were written
json_path = save_results_as_json(results, output_dir)
print(f"Extraction completed. Results saved to: {json_path}")

Extraction completed. Results saved to: /content/outputs/result_plain_text.json


## TableExtractor

In [None]:
class TableExtractor:
    """Split a table image into grid cells and OCR each cell.

    Pipeline (see ``process_image``): Canny edges -> probabilistic Hough
    lines -> horizontal/vertical classification plus virtual border lines ->
    line intersections -> grid of cells -> EasyOCR per cell.
    """

    def __init__(self):
        """
        Initialize the TableExtractor with necessary components
        """
        # OCR reader initialization
        self.reader = easyocr.Reader(['ko', 'en'])

    def process_image(self, image):
        """
        Process the input image and extract table cells

        Args:
            image: Image as a numpy array, or a file path to read with
                ``cv2.imread``.

        Returns:
            dict: ``{'cells': [...], 'grid_info': {'rows': int, 'cols': int}}``
            where each cell has ``row``, ``col``, ``text`` and ``coordinates``.

        Raises:
            ValueError: If ``image`` is a path that cannot be read.
        """
        if isinstance(image, str):
            loaded = cv2.imread(image)
            if loaded is None:
                raise ValueError(f"Failed to read image: {image}")
            self.image = loaded
        else:
            self.image = image

        self.result = self.image.copy()

        self.detect_lines()
        self.classify_lines_and_find_intersections()
        self.remove_duplicate_points()

        # Extract the text and the cell geometry
        data, extracted_cells = self.extract_text_from_cells()

        return self._build_result(data, extracted_cells)

    def _build_result(self, data, extracted_cells):
        """Drop fully empty rows/columns and renumber the remaining cells.

        The surviving cells are renumbered from 1, but each keeps the
        coordinates of the grid cell it originally came from. (Previously
        the coordinates were looked up with the renumbered indices, which
        pointed at the wrong cell whenever a row or column had been dropped.)

        Args:
            data: 2D list of cell texts, as returned by
                ``extract_text_from_cells``.
            extracted_cells: Cell dicts with 1-based ``row``/``col`` and
                ``coordinates``.

        Returns:
            dict: The final ``cells`` / ``grid_info`` structure.
        """
        def _is_blank(value):
            return value is None or not str(value).strip()

        n_cols = max((len(row) for row in data), default=0)

        # Original (0-based) indices of rows/columns with any non-blank cell.
        kept_rows = [
            i for i, row in enumerate(data)
            if any(not _is_blank(value) for value in row)
        ]
        kept_cols = [
            j for j in range(n_cols)
            if any(j < len(row) and not _is_blank(row[j]) for row in data)
        ]

        cell_lookup = {}
        for cell in extracted_cells:
            # Keep the first entry for a position, like a linear search would.
            cell_lookup.setdefault((cell['row'], cell['col']), cell)

        processed_cells = []
        for new_i, src_i in enumerate(kept_rows):
            for new_j, src_j in enumerate(kept_cols):
                original_cell = cell_lookup.get((src_i + 1, src_j + 1))
                if not original_cell:
                    continue
                row_values = data[src_i]
                value = row_values[src_j] if src_j < len(row_values) else ''
                processed_cells.append({
                    'row': new_i + 1,
                    'col': new_j + 1,
                    'text': '' if _is_blank(value) else value,
                    'coordinates': original_cell['coordinates']
                })

        return {
            'cells': processed_cells,
            'grid_info': {
                'rows': len(kept_rows),
                'cols': len(kept_cols)
            }
        }

    def detect_lines(self):
        """
        Detect lines in the image using Canny edge detection and Hough transform

        Returns:
            The value returned by ``cv2.HoughLinesP`` (also stored in
            ``self.lines``); the code treats ``None`` as "no lines found".
        """
        # 1. Edge detection (Canny)
        self.edges = cv2.Canny(self.image, 50, 150, apertureSize=3)

        # 2. Probabilistic Hough transform to find line segments
        self.lines = cv2.HoughLinesP(
            self.edges,
            1,
            np.pi/180,
            threshold=100,
            minLineLength=100,
            maxLineGap=10
        )

        return self.lines

    def classify_lines_and_find_intersections(self):
        """
        Classify lines as horizontal or vertical and find their intersection points

        All point/line attributes are reset first, so nothing from a
        previously processed image survives when no lines are detected.
        """
        self.intersection_points = []
        self.horizontal_lines = []
        self.vertical_lines = []
        self.filtered_end_points = []
        self.all_points = []

        if self.lines is None:
            return

        # Classify segments by angle (within 10 degrees of horizontal/vertical)
        for line in self.lines:
            x1, y1, x2, y2 = line[0]
            angle = np.abs(np.arctan2(y2 - y1, x2 - x1) * 180.0 / np.pi)

            if angle < 10 or angle > 170:
                self.horizontal_lines.append(line[0])
            elif 80 < angle < 100:
                self.vertical_lines.append(line[0])

        # Add virtual border lines just inside the image edges
        height, width = self.image.shape[:2]
        margin = 10
        self.horizontal_lines.append([margin, margin, width - margin, margin])  # top border
        self.horizontal_lines.append([margin, height - margin, width - margin, height - margin])  # bottom border
        self.vertical_lines.append([margin, margin, margin, height - margin])  # left border
        self.vertical_lines.append([width - margin, margin, width - margin, height - margin])  # right border

        # Find intersections
        self._find_intersection_points()

        # Collect end points
        self._process_end_points()

    def _find_intersection_points(self):
        """
        Calculate intersection points between horizontal and vertical lines

        Only intersections that lie on both segments (parameters ``t`` and
        ``u`` within [0, 1]) are kept. The result is de-duplicated and sorted
        by (y, x).
        """
        for h_line in self.horizontal_lines:
            for v_line in self.vertical_lines:
                x1, y1, x2, y2 = h_line
                x3, y3, x4, y4 = v_line

                denominator = ((x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4))
                if denominator != 0:
                    t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denominator
                    u = -((x1 - x2) * (y1 - y3) - (y1 - y2) * (x1 - x3)) / denominator

                    if 0 <= t <= 1 and 0 <= u <= 1:
                        x = int(x1 + t * (x2 - x1))
                        y = int(y1 + t * (y2 - y1))
                        self.intersection_points.append((x, y))
        # Sort the unique intersections top-to-bottom, then left-to-right
        self.intersection_points = sorted(set(self.intersection_points), key=lambda p: (p[1], p[0]))

    def _process_end_points(self):
        """
        Process end points of lines and combine with intersection points

        Keeps the end points lying within 10 px of the minimum or maximum x
        or y over all end points, and stores intersections plus those end
        points in ``self.all_points``.
        """
        # Collect the end points of every classified line
        end_points = []
        for line in self.horizontal_lines + self.vertical_lines:
            x1, y1, x2, y2 = line
            end_points.append((x1, y1))
            end_points.append((x2, y2))

        # Extent of all end points
        x_values = [point[0] for point in end_points]
        y_values = [point[1] for point in end_points]

        x_min, x_max = min(x_values), max(x_values)
        y_min, y_max = min(y_values), max(y_values)

        # Keep the end points near the outer extent
        self.filtered_end_points = [
            (x, y) for (x, y) in end_points
            if (x_min <= x <= x_min + 10 or x_max - 10 <= x <= x_max) or
               (y_min <= y <= y_min + 10 or y_max - 10 <= y <= y_max)
        ]

        # Combine all points
        self.all_points = self.intersection_points + self.filtered_end_points

    def remove_duplicate_points(self, distance_threshold=15):
        """
        Remove duplicate points that are within a certain distance threshold

        The first point of each cluster (in ``self.all_points`` order) is
        kept; the result is stored in ``self.unique_points``.

        Args:
            distance_threshold (int): Maximum distance between points to be considered duplicates
        """
        self.unique_points = []

        for point in self.all_points:
            is_unique = True
            for unique_point in self.unique_points:
                distance = np.linalg.norm(np.array(point) - np.array(unique_point))
                if distance <= distance_threshold:
                    is_unique = False
                    break
            if is_unique:
                self.unique_points.append(point)

    def extract_text_from_cells(self, min_height=30, min_width=30):
        """
        Extract text from each cell in the table grid and return with cell coordinates

        Args:
            min_height (int): Minimum height of cell to process
            min_width (int): Minimum width of cell to process

        Returns:
            tuple: (2D list of extracted text, list of cell information)
        """
        # Grid lines come from the distinct x / y values of the intersections.
        # NOTE(review): self.unique_points is not consulted here, so nearby
        # intersections produce thin cells that are only filtered out by the
        # min_height/min_width check below — confirm this is intended.
        self.x_coords = sorted({point[0] for point in self.intersection_points})
        self.y_coords = sorted({point[1] for point in self.intersection_points})

        data = []
        extracted_cells = []

        for i in range(len(self.y_coords) - 1):
            row = []
            for j in range(len(self.x_coords) - 1):
                # Corner coordinates of this grid cell
                top_left_x = self.x_coords[j]
                top_left_y = self.y_coords[i]
                bottom_right_x = self.x_coords[j + 1]
                bottom_right_y = self.y_coords[i + 1]

                # Crop the cell out of the image
                tile = self.image[top_left_y:bottom_right_y, top_left_x:bottom_right_x]

                cell_info = {
                    'row': i + 1,
                    'col': j + 1,
                    'coordinates': {
                        'top_left': (top_left_x, top_left_y),
                        'bottom_right': (bottom_right_x, bottom_right_y)
                    }
                }

                # Cells that are too small are recorded with empty text
                if tile.shape[0] < min_height or tile.shape[1] < min_width:
                    row.append("")
                    cell_info['text'] = ""
                    extracted_cells.append(cell_info)
                    continue

                # OCR the cell with EasyOCR
                text_result = self.reader.readtext(tile, detail=0)
                text = "\n".join(text_result).strip()
                row.append(text)
                cell_info['text'] = text
                extracted_cells.append(cell_info)

            data.append(row)

        return data, extracted_cells

In [None]:
import os
import cv2
import json
import numpy as np
import easyocr
from tqdm import tqdm

def extract_text_from_cells(cells_data):
    """Join the ``'text'`` of every cell that has one, separated by spaces.

    Cells without a ``'text'`` key are skipped; an empty input yields ``''``.
    """
    return ' '.join(cell['text'] for cell in cells_data if 'text' in cell)

def process_images_in_directory(input_dir, output_dir):
    """
    Process all images in a directory and extract text.

    Files are visited in sorted name order so the order of the saved results
    is deterministic (``os.listdir`` alone returns entries in arbitrary
    order). Unreadable images and per-image failures are reported and
    skipped. The collected results are handed to ``save_results_as_json``.

    Args:
        input_dir (str): Directory containing the images.
        output_dir (str): Directory to save the JSON file with extracted text.

    Raises:
        ValueError: If ``input_dir`` does not exist.
    """
    if not os.path.exists(input_dir):
        raise ValueError(f"Input directory does not exist: {input_dir}")

    image_extensions = (".jpg", ".jpeg", ".png", ".bmp", ".tiff")
    results = []

    # Initialize TableExtractor once to reuse the EasyOCR reader
    processor = TableExtractor()

    for file_name in tqdm(sorted(os.listdir(input_dir)), desc="Processing Images"):
        if not file_name.lower().endswith(image_extensions):
            continue

        image_path = os.path.join(input_dir, file_name)

        image = cv2.imread(image_path)
        if image is None:
            print(f"Failed to read image: {image_path}")
            continue

        try:
            result = processor.process_image(image)

            if result and 'cells' in result and 'grid_info' in result:
                extracted_text = extract_text_from_cells(result['cells'])

                json_data = {
                    "data_id": file_name,
                    "제목": file_name,  # file name used as a temporary title
                    "유형": "일반표",
                    "내용": extracted_text,
                    "요약": "",
                    "cells": result['cells'],
                    "grid": result['grid_info']
                }
                results.append(json_data)
            else:
                print(f"No valid table data found in {file_name}")

        except Exception as e:
            # Best effort: report and continue with the next image.
            print(f"Error processing {file_name}: {e}")

    # Save results as JSON
    save_results_as_json(results, output_dir)

def save_results_as_json(results, output_dir):
    """Write ``results`` to ``result_table.json`` in ``output_dir``.

    The directory is created if it does not exist; the saved path is printed.
    """
    os.makedirs(output_dir, exist_ok=True)

    destination = os.path.join(output_dir, "result_table.json")
    with open(destination, "w", encoding="utf-8") as out_file:
        # ensure_ascii=False keeps non-ASCII text readable in the file.
        json.dump(results, out_file, ensure_ascii=False, indent=4)

    print(f"Extracted text saved to: {destination}")

In [None]:
# Process every cropped table image in the directory
input_dir = "/content/cropped_images/table"
output_dir = "/content/outputs"
process_images_in_directory(input_dir, output_dir)

  df = df.replace(r'^\s*$', np.nan, regex=True)
  df = df.replace(r'^\s*$', np.nan, regex=True)
Processing Images: 100%|██████████| 2/2 [00:01<00:00,  1.42it/s]

Extracted text saved to: /content/outputs/result_table.json





In [None]:
import json

def read_json_file(file_path):
    """
    Load a JSON file and return its parsed contents.

    Any failure is reported with ``print`` and turned into ``None`` instead
    of propagating.

    Args:
        file_path (str): Path to the JSON file

    Returns:
        The parsed JSON data, or ``None`` if the file is missing, is not
        valid JSON, or any other error occurs.
    """
    data = None
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            data = json.load(handle)
    except FileNotFoundError:
        print(f"File not found: {file_path}")
    except json.JSONDecodeError:
        print(f"Error decoding JSON from file: {file_path}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")
    return data

# Read the table extraction result
file_path = "/content/outputs/result_table.json"
result = read_json_file(file_path)

## ImageToDataTable

In [None]:
from transformers import AutoProcessor, Pix2StructForConditionalGeneration
import torch
from datetime import datetime
from pytz import timezone
import pandas as pd
from PIL import Image
import os
import json
from tqdm import tqdm
import warnings


In [None]:

# Suppress all Python warnings from this point on.
warnings.filterwarnings('ignore')

# Passed as max_patches to the processor in ImageToDataTable.process_image.
MAX_PATCHES = 512

class ImageToDataTable:
    """Turn figure images into title/type/content records with Pix2Struct.

    The base ``ybelkada/pix2struct-base`` model is loaded and then overwritten
    with the state dict stored at ``model_path``.
    """

    def __init__(self, model_path: str, output_dir: str = "outputs", device: str = None):
        """Load the processor and model and prepare the output directory.

        Args:
            model_path: Path of the state-dict file loaded into the model.
            output_dir: Directory for the result JSON and the log file.
            device: Torch device string; defaults to ``"cuda"`` when
                available, otherwise ``"cpu"``.

        Raises:
            FileNotFoundError: If ``model_path`` does not exist.
        """
        self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu")
        self.output_dir = output_dir
        self.model_path = model_path

        # Fail fast: check the local weights before fetching the base model.
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model file not found at {model_path}")

        # Initialize the processor and the base model
        self.processor = AutoProcessor.from_pretrained("ybelkada/pix2struct-base")
        self.model = Pix2StructForConditionalGeneration.from_pretrained("ybelkada/pix2struct-base")

        self.model.load_state_dict(torch.load(model_path, map_location=self.device))
        self.model.to(self.device)
        self.model.eval()

        # Prepare the output directory
        os.makedirs(self.output_dir, exist_ok=True)

    def process_image(self, image_path: str):
        """Process a single image and return its parsed generated text.

        Args:
            image_path: Path of the image to process.

        Returns:
            ``(data_id, parsed)`` where ``data_id`` is the file name without
            extension and ``parsed`` is the dict from
            ``_parse_generated_text``; ``(None, None)`` if the image cannot
            be opened.
        """
        data_id = os.path.splitext(os.path.basename(image_path))[0]
        try:
            image = Image.open(image_path).convert("RGB")
        except Exception as e:
            print(f"Error loading image {data_id}: {e}")
            return None, None

        # Preprocess the image and run generation without gradients
        inputs = self.processor(images=image, return_tensors="pt", max_patches=MAX_PATCHES).to(self.device)
        with torch.no_grad():
            generated_ids = self.model.generate(
                flattened_patches=inputs.flattened_patches,
                attention_mask=inputs.attention_mask,
                max_length=1000
            )
        generated_text = self.processor.batch_decode(generated_ids, skip_special_tokens=True)[0]

        return data_id, self._parse_generated_text(generated_text)

    def _parse_generated_text(self, text: str):
        """Parse generated text into title, type and content.

        The expected layout is ``제목: <title> 유형: <type> 내용: <content>``.
        The text is split at the first ``유형:`` and, within the remainder,
        at the first ``내용:``, so a content part that itself contains either
        marker is kept whole (the earlier split/join logic truncated it).
        Without a ``유형:`` marker the raw text is returned as the content.

        Args:
            text: Text produced by the model.

        Returns:
            dict with the keys ``제목``, ``유형``, ``내용`` and ``요약``
            (``요약`` is always an empty string).
        """
        cleaned = text.replace("<pad>", "").replace("<unk>", "").replace("</s>", "")
        header, type_marker, remainder = cleaned.partition("유형:")
        if not type_marker:
            return {"제목": "", "유형": "", "내용": text, "요약": ""}

        title = header.split("제목:")[-1].strip()
        type_, _, content = remainder.partition("내용:")
        return {"제목": title, "유형": type_.strip(), "내용": content.strip(), "요약": ""}

    def process_images(self, image_paths):
        """Process several images and save the results as JSON.

        Start and end lines (timestamp in Asia/Seoul and item counts) are
        printed and appended to ``evaluation_log.txt`` in the output
        directory. Images that could not be loaded are left out.

        Args:
            image_paths: Paths of the images to process.
        """
        log_path = os.path.join(self.output_dir, "evaluation_log.txt")

        json_data = []

        with open(log_path, "a", encoding='utf-8') as log_file:
            current_time = datetime.now(timezone('Asia/Seoul'))
            print(f"Start: {current_time} Data: {len(image_paths)}")
            log_file.write(f"Start: {current_time} Data: {len(image_paths)}\n")

            for image_path in tqdm(image_paths, desc="Processing Images"):
                data_id, parsed_data = self.process_image(image_path)
                if parsed_data:
                    json_data.append({"data_id": data_id, **parsed_data})

            current_time = datetime.now(timezone('Asia/Seoul'))
            print(f"End: {current_time} Data: {len(json_data)}")
            log_file.write(f"End: {current_time} Data: {len(json_data)}\n")

        # Save the results
        self._save_results(json_data)

    def _save_results(self, json_data):
        """Write ``json_data`` to ``result_figure.json`` in the output directory."""
        json_output_path = os.path.join(self.output_dir, "result_figure.json")
        with open(json_output_path, "w", encoding='utf-8') as jf:
            json.dump(json_data, jf, ensure_ascii=False, indent=4)
        print(f"JSON results saved to {json_output_path}")


def get_image_paths(data_path: str) -> list:
    """Return the paths of supported image files directly inside ``data_path``.

    A file is supported when its name ends (case-insensitively) with one of
    ``.jpg``, ``.jpeg``, ``.png``, ``.bmp``, ``.tiff`` or ``.gif``. Only
    regular files are returned, so a sub-directory whose name happens to end
    with an image extension is ignored. Names are sorted because
    ``os.listdir`` yields entries in arbitrary order, which would otherwise
    make the processing (and output) order differ between runs.

    Args:
        data_path: Directory to scan (not recursive).

    Returns:
        List of ``os.path.join(data_path, name)`` strings, ordered by name.

    Raises:
        OSError: If ``data_path`` cannot be listed (raised by ``os.listdir``).
    """
    supported_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.gif')
    candidates = (
        os.path.join(data_path, fname)
        for fname in sorted(os.listdir(data_path))
        if fname.lower().endswith(supported_extensions)
    )
    return [path for path in candidates if os.path.isfile(path)]


# Fixed default paths (Colab runtime storage and the mounted Google Drive)
DEFAULT_DATA_PATH = "/content/cropped_images/figure"
DEFAULT_MODEL_PATH = "/content/drive/MyDrive/Colab Notebooks/cv project/deplot_k.pt"
DEFAULT_OUTPUT_DIR = "/content/outputs"

if __name__ == '__main__':
    # Collect the image paths to process; abort early when none are found
    image_paths = get_image_paths(DEFAULT_DATA_PATH)
    if not image_paths:
        raise ValueError(f"No supported image files found in {DEFAULT_DATA_PATH}")

    # Initialize ImageToDataTable and process every collected image
    image_to_table = ImageToDataTable(model_path=DEFAULT_MODEL_PATH, output_dir=DEFAULT_OUTPUT_DIR)
    image_to_table.process_images(image_paths)

preprocessor_config.json:   0%|          | 0.00/231 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/2.61k [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/851k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/3.27M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/2.20k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/4.92k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.13G [00:00<?, ?B/s]

Start: 2024-11-27 21:01:29.425512+09:00 Data: 2


Processing Images: 100%|██████████| 2/2 [00:09<00:00,  4.88s/it]

End: 2024-11-27 21:01:39.198160+09:00 Data: 2
JSON results saved to /content/outputs/result_figure.json





## llama-3.2-Korean-Bllossom-3B

- 드라이브의 모델을 불러오는 코드로 변경

In [None]:
# Helper: load a JSON file
def load_json(file_path):
    """Read the UTF-8 encoded JSON file at ``file_path`` and return the parsed data."""
    with open(file_path, mode="r", encoding="utf-8") as source:
        parsed = json.load(source)
    return parsed

# Helper: save data as a JSON file
def save_json(file_path, data):
    """Write ``data`` to ``file_path`` as UTF-8 JSON.

    Non-ASCII characters are kept unescaped and the output uses a 4-space
    indent. An existing file at ``file_path`` is overwritten.
    """
    with open(file_path, mode="w", encoding="utf-8") as target:
        json.dump(data, target, indent=4, ensure_ascii=False)


# Input / output JSON paths (all located under /content/outputs)
figure_json_file_path = "/content/outputs/result_figure.json"  # parsed figure results (input)
table_json_file_path = "/content/outputs/result_table.json"  # parsed table results (input)
figure_output_file_path = "/content/outputs/result_figure_with_summary.json"  # figure results + summaries (output)
table_output_file_path = "/content/outputs/result_table_with_summary.json"  # table results + summaries (output)

# Load the JSON data to summarize (the table results are selected here)
json_file = load_json(table_json_file_path)

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

# Model location (a directory inside the mounted Google Drive)
model_path = "/content/drive/MyDrive/cv project/llama-3.2-Korean-Bllossom-3B"

# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load the tokenizer and the causal language model from the same directory
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    torch_dtype=torch.bfloat16,  # bfloat16 weights for efficient computation on GPU
    device_map=None  # no automatic placement; the device is assigned explicitly below
)
# Move the model to the device selected above
model = model.to(device)

# Build the system prompt and the user instruction for one item
def create_prompt_and_instruction(json_data):
    """Return ``(prompt, instruction)`` for one parsed table/chart item.

    The table prompt is chosen when ``"표"`` occurs in ``json_data["유형"]``;
    every other type gets the chart prompt. The instruction embeds the
    item's ``제목``, ``유형`` and ``내용`` values. No key validation happens
    here: a missing key raises ``KeyError``.

    Args:
        json_data: Mapping with the keys ``제목``, ``유형`` and ``내용``.

    Returns:
        Tuple of the system prompt string and the user instruction string.
    """
    chart_prompt = (
        '너는 차트를 분석하여 명확하고 객관적인 요약문을 생성하는 AI이다. 아래의 차트 정보를 정리해서 하나의 문장으로 요약문을 생성하라. 차근차근 생각해보자.'
        '데이터에서 패턴, 공통점, 차이점, 이상치나 중요한 점이 있다면 이를 포함하라.'
        '입력 데이터 외의 정보를 추가로 추측하지 말아라.'
    )
    table_prompt = (
        '너는 표를 분석하여 명확하고 객관적인 요약문을 생성하는 AI이다. 아래의 표 정보를 정리해서 하나의 문장으로 요약문을 생성하라. 차근차근 생각해보자.'
        '주요 항목 간의 비교 및 공통점 혹은 차이점, 가장 두드러지는 부분도 서술하라.'
        '입력 데이터 외의 정보를 추가로 추측하지 말아라.'
    )

    item_type = json_data["유형"]
    # Anything that is not recognizably a table is summarized as a chart.
    prompt = table_prompt if "표" in item_type else chart_prompt

    instruction = "\n".join([
        f"다음은 {json_data['제목']}에 대한 설명입니다.",
        f"유형: {item_type}",
        f"내용: {json_data['내용']}",
        "위 내용을 기반으로 요약문을 작성해줘.",
    ])
    return prompt, instruction

# Generate a summary for every item of the loaded JSON file.

# Keys an item must carry before a prompt can be built from it.
required_keys = ["data_id", "제목", "유형", "내용"]

# Generation stops at the tokenizer's EOS token or at the "<|eot_id|>" token.
# Both ids are loop-invariant, so they are looked up once.
terminators = [
    tokenizer.eos_token_id,
    tokenizer.convert_tokens_to_ids("<|eot_id|>")
]

for index, json_data in enumerate(json_file):
    # Items with missing keys are reported and left untouched in the output.
    if not all(key in json_data for key in required_keys):
        print(f"[Error] JSON 항목 {index}가 필수 키 {required_keys}를 포함하지 않습니다.")
        continue

    # System prompt and user instruction for this item
    PROMPT, instruction = create_prompt_and_instruction(json_data)

    # Chat messages in the role/content format expected by the chat template
    messages = [
        {"role": "system", "content": PROMPT},
        {"role": "user", "content": instruction}
    ]

    # Render the chat template to token ids; add_generation_prompt=True
    # appends the tokens that open the assistant's reply.
    input_ids = tokenizer.apply_chat_template(
        messages,
        add_generation_prompt=True,
        return_tensors="pt"
    ).to(model.device)

    # Low-temperature sampling, capped at 512 new tokens
    outputs = model.generate(
        input_ids,
        max_new_tokens=512,
        eos_token_id=terminators,
        do_sample=True,
        temperature=0.1,
        top_p=0.9
    )

    # Decode only the newly generated tokens (everything after the prompt)
    generated_text = tokenizer.decode(outputs[0][input_ids.shape[-1]:], skip_special_tokens=True)
    print(f"[Result for JSON {index}]")
    print(generated_text)
    print("\n" + "="*80 + "\n")

    # Attach the summary to the item (mutates the entry inside json_file)
    json_data["요약"] = generated_text

# Save the updated items. They were loaded from table_json_file_path, so they
# go to the matching table output path. The name used here before,
# `output_file_path`, was never defined and raised NameError only after all
# summaries had been generated.
save_json(table_output_file_path, json_file)

print(f"업데이트된 JSON 파일이 {table_output_file_path}에 저장되었습니다.")

In [None]:
# Reload the table results, now including the generated summaries
json_file = load_json('/content/outputs/result_table_with_summary.json')

## 최종 파일 생성

In [None]:
# Result files to merge; items appear in the merged file in this order
json_paths = [
    "/content/outputs/result_plain_text.json",
    "/content/outputs/result_table_with_summary.json",
    "/content/outputs/result_figure_with_summary.json"
]

# Accumulates the items of every input file
merged_data = []

# Read and merge each JSON file. A file that cannot be read or parsed is
# reported and skipped so that the remaining files are still merged.
for json_path in json_paths:
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # OSError: missing or unreadable file.
        # ValueError: invalid JSON (json.JSONDecodeError) or undecodable bytes.
        print(f"Error loading {json_path}: {str(e)}")
        continue

    if isinstance(data, list):
        # A list contributes each of its items
        merged_data.extend(data)
    elif isinstance(data, dict):
        # A single object contributes itself as one item
        merged_data.append(data)
    else:
        # A bare scalar/null holds no item; report it instead of claiming success
        print(f"Skipped {json_path}: unsupported top-level JSON type {type(data).__name__}")
        continue

    print(f"Successfully loaded: {json_path}")

# Create output directory if it doesn't exist
output_dir = "/content/outputs"
os.makedirs(output_dir, exist_ok=True)

# Save merged data
output_path = os.path.join(output_dir, "merged_results.json")
with open(output_path, 'w', encoding='utf-8') as f:
    json.dump(merged_data, f, ensure_ascii=False, indent=4)

print(f"\nMerged data saved to: {output_path}")
print(f"Total number of items in merged file: {len(merged_data)}")

Successfully loaded: /content/outputs/result_plain_text.json
Successfully loaded: /content/outputs/result_table_with_summary.json
Successfully loaded: /content/outputs/result_figure_with_summary.json

Merged data saved to: /content/outputs/merged_results.json
Total number of items in merged file: 7
