In [None]:
# import sys
# !{sys.executable} -m pip install ultralytics

In [1]:
from ultralytics import YOLO
import os
from PIL import Image
import glob
import json
from transformers import BlipProcessor, BlipForConditionalGeneration, MarianMTModel, MarianTokenizer

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Hugging Face checkpoints: BLIP produces English captions, MarianMT turns
# them into Indonesian.
_CAPTION_CHECKPOINT = "Salesforce/blip-image-captioning-base"
_TRANSLATION_CHECKPOINT = "Helsinki-NLP/opus-mt-en-id"

# Captioning: processor (image/text preprocessing) + generation model.
caption_processor = BlipProcessor.from_pretrained(_CAPTION_CHECKPOINT)
caption_model = BlipForConditionalGeneration.from_pretrained(_CAPTION_CHECKPOINT)

# Translation: model first, then its tokenizer (same load order as before).
translator = MarianMTModel.from_pretrained(_TRANSLATION_CHECKPOINT)
translator_tokenizer = MarianTokenizer.from_pretrained(_TRANSLATION_CHECKPOINT)

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


In [3]:
def generate_caption(img_url, img_obj):
    """Caption one region of an image and translate the caption to Indonesian.

    Args:
        img_url: Path (or file object) that ``PIL.Image.open`` can read.
        img_obj: Mapping holding the pixel bounds of the region under the
            keys ``"x_min"``, ``"x_max"``, ``"y_min"`` and ``"y_max"``.

    Returns:
        The caption decoded from the ``translator`` model output (Indonesian).
        Both the translated and the English caption are also printed.
    """
    ymin, ymax, xmin, xmax = img_obj["y_min"], img_obj["y_max"], img_obj["x_min"], img_obj["x_max"]

    # PIL images cannot be sliced like NumPy arrays; Image.crop() expects a
    # (left, upper, right, lower) box. The context manager releases the file
    # handle once the cropped copy exists.
    with Image.open(img_url) as raw_image:
        crop = raw_image.convert('RGB').crop((xmin, ymin, xmax, ymax))

    # Conditional captioning: the model continues this text prompt.
    text = "an image of"
    inputs = caption_processor(crop, text, return_tensors="pt")

    out = caption_model.generate(**inputs)
    caption_en = caption_processor.decode(out[0], skip_special_tokens=True)

    translated = translator.generate(**translator_tokenizer(caption_en, return_tensors="pt", padding=True))
    caption_id = translator_tokenizer.decode(translated[0], skip_special_tokens=True)
    print(caption_id, caption_en, sep='\n')
    return caption_id

In [4]:
# Load the fold-4 weights and run detection on a single page image.
# save / save_txt / save_conf make Ultralytics write the annotated image and a
# label file (including confidences) under the "detect_result" project folder.
model = YOLO("models/kfold_result/kfold_training/fold_4/weights/best.pt")
predict_options = dict(
    save=True,
    save_txt=True,
    save_conf=True,
    project="detect_result",
)
results = model.predict(source="../data/raw/source_file/504.png", **predict_options)


image 1/1 d:\Research-Methodology\yolo\..\data\raw\source_file\504.png: 640x480 5 Captions, 2 PageNumbers, 2 Pictures, 267.4ms
Speed: 8.6ms preprocess, 267.4ms inference, 18.8ms postprocess per image at shape (1, 3, 640, 480)
Results saved to [1mD:\Research-Methodology\yolo\detect_result\predict2[0m
1 label saved to D:\Research-Methodology\yolo\detect_result\predict2\labels


In [None]:
# Page images to process and their matching OCR dictionaries.
img_file_names = []
img_json = []
id2label = {0: "Caption", 1:"PageNumber", 2:"Picture"}

for i in range(504, 505):
    name = f'../data/raw/source_file/{i}.png'
    data = f'../data/raw/ocr_dict/dict_{i}.json'
    img_file_names.append(name)
    img_json.append(data)

# YOLO label directory. Ultralytics auto-increments the run folder
# (predict, predict2, ...) when it already exists, so a hard-coded
# 'detect_result/predict/labels' goes stale after the first run. Take the
# directory from the prediction results produced by the cell above instead.
label_dir = os.path.join(results[0].save_dir, 'labels') #yolo

In [None]:
def get_detected_result(image_path, label_path):
    """Read a YOLO label file and convert its boxes to pixel coordinates.

    Each non-blank line of the label file must hold six whitespace-separated
    numbers: ``class x_center y_center width height confidence``. The four
    box values are scaled by the image width/height read from ``image_path``.

    Args:
        image_path: Path of the image the labels belong to (used for its size).
        label_path: Path of the label text file.

    Returns:
        Dict mapping the class ids 0, 1 and 2 to lists of boxes; each box is a
        dict with ``x_min``, ``x_max``, ``y_min``, ``y_max`` (ints) and
        ``conf`` (float).

    Raises:
        ValueError: If a non-blank line does not contain exactly six numbers.
        KeyError: If a line carries a class id other than 0, 1 or 2.
    """
    detected = {i : list() for i in range(0, 3)}

    # Only the size is needed, so close the image file right away.
    with Image.open(image_path) as img:
        img_width, img_height = img.size

    with open(label_path, 'r') as f:
        for line in f:
            fields = line.split()
            if not fields:
                # Tolerate blank lines (e.g. a trailing newline) instead of
                # failing on the six-way unpack below.
                continue

            label, x_center, y_center, w, h, confidence = map(float, fields)

            # Centre/size form -> corner form, scaled to pixels.
            x_min = int((x_center - w / 2) * img_width)
            y_min = int((y_center - h / 2) * img_height)
            x_max = int((x_center + w / 2) * img_width)
            y_max = int((y_center + h / 2) * img_height)

            obj = dict()
            obj['x_min'] = x_min
            obj['x_max'] = x_max
            obj['y_min'] = y_min
            obj['y_max'] = y_max
            obj['conf'] = confidence
            # The class id was parsed as a float; index with an explicit int.
            detected[int(label)].append(obj)

    return detected

        #     print(f"Class {id2label[int(label)]}: (x_min={x_min}, y_min={y_min}), (x_max={x_max}, y_max={y_max})")
        #     print(f"Image width: {img_width}, image height: {img_height}")
        #     print()
        # print("=====================================================================")


In [None]:
def intersect_rect(image_obj, x_min2, x_max2, y_min2, y_max2, tolerance=2):
    """Tell whether a detected box, offset by ``tolerance``, overlaps a rectangle.

    Args:
        image_obj: Mapping with the detected box under ``x_min``, ``x_max``,
            ``y_min`` and ``y_max``.
        x_min2, x_max2, y_min2, y_max2: Bounds of the rectangle to test.
        tolerance: Offset added to every edge of the detected box before the
            comparison. Defaults to 2, the value that used to be hard-coded.

    Returns:
        True when the two rectangles overlap with positive area; rectangles
        that merely touch along an edge do not count as intersecting.
    """
    # NOTE(review): the offset is added to the min AND max edges, so the box is
    # shifted by `tolerance` rather than grown. This is kept unchanged because
    # captions inserted by append_caption sit one pixel past the picture's
    # bottom-right corner and only overlap it thanks to this shift -- confirm
    # the intended semantics before changing it.
    x_min1, x_max1 = image_obj['x_min'] + tolerance, image_obj['x_max'] + tolerance
    y_min1, y_max1 = image_obj['y_min'] + tolerance, image_obj['y_max'] + tolerance

    separated_on_x = x_max1 <= x_min2 or x_max2 <= x_min1
    separated_on_y = y_max1 <= y_min2 or y_max2 <= y_min1
    return not (separated_on_x or separated_on_y)

In [None]:
def clean_text(ocr_result, detected, caption_pos):
    """Join OCR tokens into one string, dropping text inside pictures/page numbers.

    Args:
        ocr_result: Mapping of parallel lists ``text``, ``left``, ``top``,
            ``width`` and ``height`` (one entry per OCR token).
        detected: Mapping of class id to detected boxes; classes 1
            (page number) and 2 (picture) are used as exclusion zones.
        caption_pos: Indices into ``ocr_result['text']`` whose text must always
            be kept (generated captions), even if it overlaps an exclusion zone.

    Returns:
        The kept tokens joined by single spaces, stripped of surrounding
        whitespace.
    """
    # 1: page number, 2: picture -- loop-invariant, so build it once.
    exclude_list = detected[1] + detected[2]
    forced_indices = set(caption_pos)
    kept = []

    for i, text in enumerate(ocr_result['text']):
        if i in forced_indices:
            # Keep the caption exactly once. Previously execution fell through
            # to the generic path below and could append the same text twice.
            kept.append(text)
            continue

        if text == "":
            continue

        # Do not keep OCR text that lies inside a detected picture/page number.
        x_min2 = ocr_result['left'][i]
        x_max2 = ocr_result['left'][i] + ocr_result['width'][i]
        y_min2 = ocr_result['top'][i]
        y_max2 = ocr_result['top'][i] + ocr_result['height'][i]

        is_exclude = any(
            intersect_rect(obj, x_min2, x_max2, y_min2, y_max2)
            for obj in exclude_list
        )
        if not is_exclude:
            kept.append(text)

    return " ".join(kept).strip()

In [None]:
def match_caption_img(detected):
    """Greedily pair detected captions (class 0) with pictures (class 2).

    Every caption is matched to the picture with the smallest vertical gap,
    measured either as "caption above picture" or "caption below picture".
    When two captions want the same picture, the closer caption keeps it.

    Args:
        detected: Mapping of class id to lists of boxes, each box holding at
            least ``y_min`` and ``y_max``.

    Returns:
        ``[]`` when the number of captions equals the number of pictures
        (nothing to reconcile). Otherwise a list of records
        ``{"caption": caption index or None, "img": picture index,
        "img_obj": picture box}``; pictures left without a caption get
        ``"caption": None``. Captions that end up without a picture are dropped.
    """
    captions, images = detected[0], detected[2]

    # Same count: nothing to reconcile.
    if len(captions) == len(images):
        return []

    caption_img = []  # one record per caption, positioned by caption index
    img_used = {}     # picture index -> {"caption": owner index, "value": gap}

    # Greedy caption -> picture matching.
    for caption_id, caption in enumerate(captions):
        y_min_c, y_max_c = caption['y_min'], caption['y_max']
        best_gap = float('inf')
        best_idx = None

        for idx, img in enumerate(images):
            gap_caption_above = abs(img['y_min'] - y_max_c)
            gap_caption_below = abs(y_min_c - img['y_max'])
            gap = min(gap_caption_above, gap_caption_below)
            if gap < best_gap:  # strict: the first picture wins ties
                best_gap = gap
                best_idx = idx

        record = {"caption": caption_id, "img": None, "img_obj": None}

        if best_idx is not None:
            owner = img_used.get(best_idx)
            if owner is None or best_gap < owner['value']:
                if owner is not None:
                    # More captions than pictures: the closer caption takes
                    # over and the previous owner loses its picture.
                    displaced = caption_img[owner['caption']]
                    displaced['img'] = None
                    displaced['img_obj'] = None
                record['img'] = best_idx
                record['img_obj'] = images[best_idx]
                img_used[best_idx] = {"caption": caption_id, "value": best_gap}
            # Otherwise the current owner is at least as close: this caption
            # stays unmatched instead of sharing (and re-claiming) the picture.

        caption_img.append(record)

    # More pictures than captions: unclaimed pictures need a caption later.
    for image_id, image in enumerate(images):
        if image_id not in img_used:
            caption_img.append({"caption": None, "img": image_id, "img_obj": image})

    # Pictures take priority: drop captions that ended up without one.
    return [record for record in caption_img if record["img"] is not None]

In [None]:
def append_caption(record, ocr_data, excludes):
    """Insert a generated caption into the OCR token lists near its picture.

    The caption is placed right after the OCR token that is closest (Manhattan
    distance) to the picture's centre among the tokens whose ``left``/``top``
    do not exceed that centre. If no such token exists, the caption is
    inserted at the front.

    Args:
        record: Match record with ``caption`` (the caption text) and
            ``img_obj`` (picture box with ``x_min``, ``x_max``, ``y_min``,
            ``y_max``).
        ocr_data: Mapping of parallel lists ``left``, ``top``, ``width``,
            ``height`` and ``text``; modified in place. Other lists the
            mapping may hold are not touched.
        excludes: List of indices of captions inserted so far; modified in
            place so that every entry still points at its caption.

    Returns:
        The same ``(ocr_data, excludes)`` objects.
    """
    box = record['img_obj']
    center_x = (box['x_min'] + box['x_max']) / 2
    center_y = (box['y_min'] + box['y_max']) / 2
    caption_str = record['caption']

    min_dist = float('inf')
    # Default to the front: list.insert(-1, ...) would silently place the
    # caption before the LAST element and record a useless index of -1.
    insert_index = 0

    for i in range(len(ocr_data['top'])):
        if (ocr_data['left'][i] > center_x) or (ocr_data['top'][i] > center_y):
            continue
        if i in excludes:
            continue  # never anchor on a previously inserted caption

        # Manhattan distance between the token's origin and the picture centre.
        manhattan = abs(ocr_data['top'][i] - center_y) + abs(ocr_data['left'][i] - center_x)
        if manhattan <= min_dist:  # "<=": the later token wins ties
            min_dist = manhattan
            insert_index = i + 1

    # Inserting shifts every later element by one, so previously recorded
    # caption indices at or after the insertion point must move as well.
    excludes[:] = [idx + 1 if idx >= insert_index else idx for idx in excludes]
    excludes.append(insert_index)

    # The caption's box starts one pixel past the picture's bottom-right
    # corner and reuses the picture's width/height.
    ocr_data['left'].insert(insert_index, box['x_max'] + 1)
    ocr_data['top'].insert(insert_index, box['y_max'] + 1)

    ocr_data['width'].insert(insert_index, abs(box['x_max'] - box['x_min']))
    ocr_data['height'].insert(insert_index, abs(box['y_max'] - box['y_min']))
    ocr_data['text'].insert(insert_index, caption_str)

    return ocr_data, excludes

In [None]:
# main loop: for every page, read detections + OCR, caption the pictures that
# have no caption, and collect the cleaned page text.
final_txts = []

for i, dict_path in enumerate(img_json):
    image_path = img_file_names[i]

    # Ultralytics names each label file after the image stem (504.png ->
    # 504.txt), not after the position of the image in the batch.
    image_stem = os.path.splitext(os.path.basename(image_path))[0]
    label_path = f"{label_dir}/{image_stem}.txt"
    detected_objs = get_detected_result(image_path, label_path)

    with open(f'{dict_path}','r') as file:
        ocr_data = json.load(file)

    matches = match_caption_img(detected_objs)
    excludes = []

    for record in matches:
        # Compare against None explicitly: index 0 is a valid picture/caption
        # index but is falsy, so a truthiness test skipped picture 0 and
        # treated caption 0 as missing.
        if record['img'] is not None and record['caption'] is None:
            new_caption = generate_caption(image_path, record['img_obj'])
            # Store the generated caption on the match record, then add it to
            # the OCR data so it ends up in the cleaned text.
            record['caption'] = new_caption
            ocr_data, excludes = append_caption(record, ocr_data, excludes)

    cleaned = clean_text(ocr_data, detected_objs, excludes)
    final_txts.append(cleaned)