In [None]:
import os
from glob import glob
import shutil
from zipfile import ZipFile
import json
from matplotlib import pyplot as plt
from PIL import Image, ImageDraw, ExifTags
import numpy as np 
import cv2
from tqdm import tqdm
import pandas as pd
from pathlib import Path
import hashlib
# Root folder that is searched for the .zip archives. The default is the original
# machine-specific location; set the MENU_BOARD_SRC environment variable to point
# the notebook at another copy of the data without editing this cell.
src = os.environ.get(
    "MENU_BOARD_SRC",
    "/home/kai/workspace/DeepDocs_Project/DataETL/source/provider=aihub/tourism_food_menu_board_data/",
)

In [None]:
# Recursively collect the path of every .zip archive found under src.
data = glob(f"{src}/**/*.zip", recursive=True)

In [None]:
# Unzip every archive into the working directory, mirroring the archive's
# sub-folder (relative to src) under "./".
for file in data:
    # relpath instead of str.replace(src, "./"): src ends with a separator while
    # os.path.dirname() never does, so for an archive sitting directly in src the
    # replace did not match and the archive was extracted into the source folder.
    rel_dir = os.path.relpath(os.path.dirname(file), src)
    dest_dir = os.path.normpath(os.path.join(".", rel_dir))
    with ZipFile(file, 'r') as zip_ref:
        zip_ref.extractall(dest_dir)
    # Report the real extraction target (the old message printed the source folder).
    print(f"Unzipped {file} to {dest_dir}")

In [None]:
# Gather every extracted .jpg and .json below the working directory as two sorted
# path lists, then show how many of each were found.
images, jsons = (
    sorted(glob(f"./**/*.{extension}", recursive=True))
    for extension in ("jpg", "json")
)
len(images), len(jsons)

In [None]:
def get_sha256(file_path):
    """Return the SHA-256 digest of a file's contents as a hex string.

    The file is hashed in fixed-size chunks so large files are never loaded
    into memory in one piece.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The 64-character lowercase hexadecimal digest.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as f:
        # iter(callable, sentinel) keeps reading until read() returns b"" (EOF).
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()

In [None]:
# Build the dataset: for every (image, json) pair, bring the image upright, turn its
# OCR regions into normalised axis-aligned boxes, store the (possibly downscaled)
# image under a name derived from its SHA-256, and finally write all records to
# data.parquet.
os.makedirs("images", exist_ok=True)
records = []
MAX_SIZE = 2048  # images whose width or height exceeds this are downscaled to fit

# Images and annotation files are paired purely by position in the two sorted
# lists, so differing counts mean zip() silently drops the surplus and the
# remaining pairs may no longer belong together.
if len(images) != len(jsons):
    print(f"Warning: {len(images)} images but {len(jsons)} json files; pairs may be misaligned")

for img_path, json_path in tqdm(zip(images, jsons), total=min(len(images), len(jsons))):
    try:
        # Deliberately not named `data`: that name holds the list of zip archives
        # built in an earlier cell and must survive this cell.
        with open(json_path, "r", encoding="utf-8") as f:
            annotation_doc = json.load(f)

        # The context manager releases the file handle for every image; convert("RGB")
        # makes grayscale/palette/CMYK inputs usable instead of failing in cvtColor.
        with Image.open(img_path) as img:
            orientation = img.getexif().get(0x0112)  # 0x0112 = EXIF "Orientation" tag
            img_np = np.array(img.convert("RGB"))
        # PIL delivers RGB, cv2.imwrite expects BGR channel order.
        img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)

        # Apply the rotation requested by the EXIF orientation (mirrored variants are
        # not handled, as before).
        if orientation == 3:
            img_np = cv2.rotate(img_np, cv2.ROTATE_180)
        elif orientation == 6:
            img_np = cv2.rotate(img_np, cv2.ROTATE_90_CLOCKWISE)
        elif orientation == 8:
            img_np = cv2.rotate(img_np, cv2.ROTATE_90_COUNTERCLOCKWISE)

        img_height, img_width = img_np.shape[:2]
        label = {"menu": []}
        for annot in annotation_doc["annotations"]:
            menu = annot["menu_information"]
            ocr = annot["ocr"]

            # x / y / width / height are treated as percentages of the image size and
            # rotation as degrees; (x, y) is the corner the region is rotated around.
            px = ocr["x"] * img_width / 100
            py = ocr["y"] * img_height / 100
            pw = ocr["width"] * img_width / 100
            ph = ocr["height"] * img_height / 100
            theta = np.deg2rad(ocr["rotation"])
            cos_t, sin_t = np.cos(theta), np.sin(theta)

            # Edge vectors of the rotated rectangle (along its width and its height).
            w_dx, w_dy = pw * cos_t, pw * sin_t
            h_dx, h_dy = -ph * sin_t, ph * cos_t
            quad = np.array(
                [
                    (px, py),                               # top-left
                    (px + w_dx, py + w_dy),                 # top-right
                    (px + w_dx + h_dx, py + w_dy + h_dy),   # bottom-right
                    (px + h_dx, py + h_dy),                 # bottom-left
                ],
                dtype=np.int32,
            )

            # Axis-aligned hull of the rotated quad, normalised by the image size.
            x_min, y_min = quad.min(axis=0)
            x_max, y_max = quad.max(axis=0)
            scaled_bbox = np.array([x_min, y_min, x_max, y_max]) / np.array(
                [img_width, img_height, img_width, img_height]
            )
            # Corners of a rotated region can land outside the image; keep the
            # normalised box inside [0, 1].
            scaled_bbox = np.clip(scaled_bbox, 0.0, 1.0)

            label["menu"].append({
                "name": {
                    "<|value|>": menu["ko"],
                    "<|bbox|>": [round(f, 3) for f in scaled_bbox.tolist()],
                },
                "price": {
                    "<|value|>": menu["price"],
                }
            })

        # Downscale (never upscale) so that neither side exceeds MAX_SIZE.
        scale = min(MAX_SIZE / img_width, MAX_SIZE / img_height)
        if scale < 1.0:
            new_size = (int(img_width * scale), int(img_height * scale))
            img_np = cv2.resize(img_np, new_size, interpolation=cv2.INTER_AREA)
        img_height, img_width = img_np.shape[:2]

        # Write under the original file name first (the .jpg suffix selects the
        # encoder), then move the file to images/<first two hex chars>/<sha256>.
        output_img_path = os.path.join("images", Path(img_path).name)
        if not cv2.imwrite(output_img_path, img_np, [cv2.IMWRITE_JPEG_QUALITY, 95]):
            raise IOError(f"cv2.imwrite failed for {output_img_path}")
        img_sha256 = get_sha256(output_img_path)
        sha256_dir = os.path.join("images", img_sha256[:2])
        os.makedirs(sha256_dir, exist_ok=True)
        rename_path = os.path.join(sha256_dir, img_sha256)
        shutil.move(output_img_path, rename_path)

        records.append({
            "image_path": rename_path,
            "width": img_width,
            "height": img_height,
            "label": json.dumps(label, ensure_ascii=False),
        })
    except Exception as e:
        # Best effort: report the failing image and carry on with the next one.
        print(f"Error processing {img_path}: {e}")
        continue

df = pd.DataFrame(records)
df.to_parquet("data.parquet", index=False)

In [None]:
def draw_bbox(image_path, label, font=None):
    """Open an image and draw each menu item's name box and caption on it.

    Args:
        image_path: Path of the image file to open.
        label: Dict with a "menu" list. For each item,
            item["name"]["<|bbox|>"] is read as [x0, y0, x1, y1] fractions of the
            image width/height and item["name"]["<|value|>"] is the caption.
        font: Optional PIL font for the captions. With the default (None) PIL's
            built-in font is used; pass a font that contains the needed glyphs
            (e.g. one created with ImageFont.truetype) to render non-Latin text.

    Returns:
        The PIL image with red rectangles (and captions where drawable).
    """
    img = Image.open(image_path)
    img_width, img_height = img.size
    draw = ImageDraw.Draw(img)
    skipped_captions = 0
    for item in label["menu"]:
        name = item["name"]
        frac = name["<|bbox|>"]
        # Fractions back to pixel coordinates of this image.
        bbox = [
            int(frac[0] * img_width),
            int(frac[1] * img_height),
            int(frac[2] * img_width),
            int(frac[3] * img_height),
        ]
        draw.rectangle(bbox, outline="red", width=2)
        try:
            draw.text((bbox[0], bbox[1]), name["<|value|>"], fill="red", font=font)
        except UnicodeEncodeError:
            # PIL's bitmap default font can only encode Latin-1, so non-Latin
            # captions (the menu names here are Korean) raise; keep the box and
            # skip only the caption instead of aborting the whole preview.
            skipped_captions += 1
    if skipped_captions:
        print(f"{skipped_captions} caption(s) not drawn for {image_path}: pass a font that supports the text")
    return img
def show_image_with_bbox(image_path, label):
    """Render the image with its label boxes on a 20x20 inch figure without axes."""
    annotated = draw_bbox(image_path, label)
    _, ax = plt.subplots(figsize=(20, 20))
    ax.imshow(annotated)
    ax.axis('off')
    plt.show()

# Visual spot check: display five randomly drawn records (fixed seed) with their boxes.
sample_df = df.sample(n=5, random_state=3213)
for image_path, label_json in zip(sample_df["image_path"], sample_df["label"]):
    show_image_with_bbox(image_path, json.loads(label_json))

In [None]:
def _leaf_schema(leaf):
    """Describe one {"<|value|>": ..., "<|bbox|>": [...]} leaf by its type names."""
    field_schema = {"<|value|>": type(leaf["<|value|>"]).__name__}
    if "<|bbox|>" in leaf:
        bbox = leaf["<|bbox|>"]
        if isinstance(bbox, (list, tuple)) and len(bbox) > 0:
            field_schema["<|bbox|>"] = f"array[{type(bbox[0]).__name__}]"
        else:
            # Empty (or non-sequence) bbox: the element type cannot be inferred.
            field_schema["<|bbox|>"] = "array[unknown]"
    return field_schema


def extract_schema(obj):
    """
    Derive a JSON-schema-like description of the value/bbox structure of `obj`.

    Dicts are walked recursively. A dict that has a "<|value|>" key is a leaf and is
    described as {"<|value|>": <type name>} plus, when "<|bbox|>" is present,
    "<|bbox|>": "array[<element type name>]" ("array[unknown]" when the bbox is
    empty). Leaves are recognised at any position, including list elements and the
    root. A list is described by the schema of its first element only ([] when the
    list is empty). Any other value is described by its type name (str, int, float,
    NoneType, ...).

    Args:
        obj: A dict, list or scalar, typically a decoded label document.

    Returns:
        A structure of dicts/lists/strings mirroring `obj`.
    """
    if isinstance(obj, dict):
        # value/bbox leaf object
        if "<|value|>" in obj:
            return _leaf_schema(obj)
        # plain (nested) dict
        return {key: extract_schema(child) for key, child in obj.items()}
    if isinstance(obj, list):
        # infer the structure from the first element; an empty list stays []
        return [extract_schema(obj[0])] if obj else []
    # scalar value
    return type(obj).__name__

In [None]:
# Sample document for trying out extract_schema: nested dicts, a list of items and
# value/bbox leaves.
json_dict = {
    "invoice": {
        "date": {"<|value|>": "2024-05-20", "<|bbox|>": [0.1, 0.2, 0.15, 0.25]},
        "vendor": {
            "name": {"<|value|>": "가나다(주)", "<|bbox|>": [0.3, 0.1, 0.38, 0.15]},
            "representative": {"<|value|>": "홍길동", "<|bbox|>": [0.4, 0.1, 0.48, 0.15]},
        },
        "items": [
            {
                "name": {"<|value|>": "상품A", "<|bbox|>": [0.1, 0.4, 0.18, 0.45]},
                "quantity": {"<|value|>": 3, "<|bbox|>": [0.2, 0.4, 0.22, 0.45]},
            },
            {
                "name": {"<|value|>": "상품B", "<|bbox|>": [0.1, 0.5, 0.18, 0.55]},
                "quantity": {"<|value|>": 1, "<|bbox|>": [0.2, 0.5, 0.22, 0.55]},
            },
        ],
    },
}

In [None]:
# Display the schema that extract_schema infers for json_dict.
extract_schema(json_dict)