# Importing Libraries

In [1]:
import os
import json
import cv2
import numpy as np
import pytesseract
import re
from ocr_text_exctraction import (
    invert,
    custom_binarize
)
import google.generativeai as genai
from clean_museum_plaques_text import clean_extracted_text
from googleapiclient.discovery import build
from web_search import google_search_top3
import re
from urllib.parse import urljoin
import aiohttp
import mimetypes
import heapq
from io import BytesIO
from PIL import Image
from playwright.async_api import async_playwright
from web_scrapping import process_a_painting
import asyncio
from docx import Document
from docx.shared import Inches
from make_it_pretty import save_results_to_docx

# OCR Text Extraction

In [None]:
folder_name = 'museum_plaques'

# Resume from the OCR cache when it exists. Only a missing cache file means
# "start from scratch"; any other failure (for example a corrupt JSON file) is
# raised instead of being silently replaced by an empty cache.
try:
    with open("intermediate_results/ocr_extracted_text.json", "r", encoding="utf-8") as f:
        ocr_extracted_text = json.load(f)
except FileNotFoundError:
    ocr_extracted_text = {}

image_extensions = ('.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif')
tesseract_langs = 'eng+deu+ita+kor+chi_sim+jpn'

newly_extracted_text = {}
for filename in os.listdir(folder_name):
    # Skip files that are already cached and anything that is not an image.
    if filename in ocr_extracted_text:
        continue
    if not filename.lower().endswith(image_extensions):
        continue

    img_path = os.path.join(folder_name, filename)
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals an unreadable/unsupported file by returning None
        # rather than raising; record the failure instead of crashing below.
        newly_extracted_text[filename] = "Failed to extract text."
        continue

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    inverted = invert(gray)
    otsu_threshold, binary = cv2.threshold(inverted, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Strip before testing: a whitespace-only string is truthy, which would
    # skip the fallback and store an empty string instead of the failure marker.
    text = pytesseract.image_to_string(binary, lang=tesseract_langs).strip()
    if not text:  # Try custom thresholding in the case of a failure
        custom_threshold, binary = custom_binarize(inverted)
        text = pytesseract.image_to_string(binary, lang=tesseract_langs).strip()

    newly_extracted_text[filename] = text or "Failed to extract text."

ocr_extracted_text.update(newly_extracted_text)

In [None]:
# Persist the merged OCR cache. The output directory is created first so the
# very first run (when no cache exists yet) does not fail on a missing folder.
os.makedirs("intermediate_results", exist_ok=True)
with open("intermediate_results/ocr_extracted_text.json", "w", encoding="utf-8") as f:
    json.dump(ocr_extracted_text, f, ensure_ascii=False, indent=2)

# Cleaning Extracted Text

In [None]:
# Reload the OCR results (image file name -> extracted text) from disk.
ocr_text_path = "intermediate_results/ocr_extracted_text.json"
with open(ocr_text_path, "r", encoding="utf-8") as ocr_file:
    ocr_extracted_text = json.load(ocr_file)

In [None]:
# The API key is read from the environment so the secret is never stored in
# the notebook. Any key that was previously committed here should be revoked.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise RuntimeError("Set the GEMINI_API_KEY environment variable before running this cell.")

genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel("models/gemini-1.5-pro-latest")

In [None]:
# Resume from the cache of cleaned names. Only a missing file means "nothing
# cleaned yet"; other errors are raised instead of wiping the cache silently.
try:
    with open("intermediate_results/cleaned_names.json", "r", encoding="utf-8") as f:
        cleaned_names = json.load(f)
except FileNotFoundError:
    cleaned_names = {}

# Clean every OCR entry that has no cleaned name yet. Iterating over the
# persisted `ocr_extracted_text` (instead of the in-memory result of the OCR
# cell) keeps this cell working after a kernel restart and avoids re-sending
# already cleaned texts to the model when the cell is re-run.
newly_cleaned_names = {}
for img_name, ocr_text in ocr_extracted_text.items():
    if img_name in cleaned_names:
        continue
    if ocr_text == "Failed to extract text.":
        # Propagate the failure marker without calling the model.
        newly_cleaned_names[img_name] = "Failed to extract text."
        continue
    newly_cleaned_names[img_name] = clean_extracted_text(ocr_text, model)

cleaned_names.update(newly_cleaned_names)

In [None]:
# Write the merged image -> cleaned-name mapping back to disk.
cleaned_names_path = "intermediate_results/cleaned_names.json"
with open(cleaned_names_path, "w", encoding="utf-8") as cleaned_file:
    json.dump(cleaned_names, cleaned_file, ensure_ascii=False, indent=2)

# Internet Search

In [None]:
# Load the image -> painting-name mapping produced by the cleaning step.
cleaned_names_path = "intermediate_results/cleaned_names.json"
with open(cleaned_names_path, "r", encoding="utf-8") as cleaned_file:
    imgs_to_names = json.load(cleaned_file)

# `new_imgs_to_names` is an alias of the full mapping (same dict object).
new_imgs_to_names = imgs_to_names

# Inverse lookup: painting name -> image file name. When several images share
# a name, the one that appears last in the mapping wins.
new_names_to_imgs = {}
for img, painting_name in new_imgs_to_names.items():
    new_names_to_imgs[painting_name] = img


In [None]:
batch_size = 70
output_dir = "intermediate_results/batches_for_searching"
os.makedirs(output_dir, exist_ok=True)

# Scan the existing batch files to find the highest batch index and the images
# that were already assigned to a batch. `fullmatch` ignores look-alike files
# such as "batch_3.json.bak".
existing_files = os.listdir(output_dir)
pattern = re.compile(r"batch_(\d+)\.json")
existing_indices = []
already_batched_imgs = set()
for existing_name in existing_files:
    m = pattern.fullmatch(existing_name)
    if m is None:
        continue
    existing_indices.append(int(m.group(1)))
    with open(os.path.join(output_dir, existing_name), "r", encoding="utf-8") as f:
        # Batch files map image file name -> painting name; keep the keys.
        already_batched_imgs.update(json.load(f))
new_batch_start_idx = max(existing_indices, default=0) + 1

# Create and write new batches, skipping images that are already part of an
# earlier batch so that re-running the notebook does not queue them again.
items = [
    (img, painting_name)
    for img, painting_name in new_imgs_to_names.items()
    if img not in already_batched_imgs
]
for i in range(0, len(items), batch_size):
    batch = dict(items[i:i + batch_size])
    batch_num = new_batch_start_idx + (i // batch_size)
    batch_path = os.path.join(output_dir, f"batch_{batch_num}.json")
    with open(batch_path, "w", encoding="utf-8") as f:
        json.dump(batch, f, ensure_ascii=False, indent=2)

In [9]:
# Credentials come from the environment so that they are never stored in the
# notebook. Any key that was previously committed here should be revoked.
google_search_api_key = os.environ.get("GOOGLE_SEARCH_API_KEY")
CSE_ID = os.environ.get("GOOGLE_CSE_ID")
if not google_search_api_key or not CSE_ID:
    raise RuntimeError(
        "Set the GOOGLE_SEARCH_API_KEY and GOOGLE_CSE_ID environment variables before running this cell."
    )

def google_search_top3(query, api_key, cse_id):
    """Return the links of the top three custom-search results for ``query``.

    Args:
        query: Search query; it is converted to ``str`` before being sent.
        api_key: Developer key passed to the ``customsearch`` service.
        cse_id: Identifier of the custom search engine (the ``cx`` parameter).

    Returns:
        A tuple with up to three result links, or the string
        ``"Unsuccessful search"`` when the request fails or yields no items.
    """
    service = build("customsearch", "v1", developerKey=api_key)
    try:
        res = service.cse().list(q=str(query), cx=cse_id, num=3).execute()
        items = res.get("items", [])
        return tuple(item["link"] for item in items) if items else "Unsuccessful search"
    except Exception:
        # Best effort: any request/parsing error is reported as a failed search.
        # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit
        # propagate so a long search loop can still be interrupted.
        return "Unsuccessful search"

In [None]:
# Get current batch to process. One should be very careful with this step:
# the index file stores the last processed batch, so the next one is +1.
idx_file = os.path.join("intermediate_results/batches_for_searching", "last_batch_index.txt")
try:
    with open(idx_file, "r") as f:
        cur_batch_idx = int(f.read().strip()) + 1
except FileNotFoundError:
    # No batch has been processed yet. A corrupt index file now raises
    # ValueError instead of silently restarting from batch 1.
    cur_batch_idx = 1

In [None]:
# Locate and load the batch file for the current index; fail if it is absent.
batches_dir = "intermediate_results/batches_for_searching"
cur_batch_file = os.path.join(batches_dir, f"batch_{cur_batch_idx}.json")
if not os.path.exists(cur_batch_file):
    raise FileNotFoundError(f"Batch file for index {cur_batch_idx} not found.")
with open(cur_batch_file, "r", encoding="utf-8") as batch_file:
    cur_batch = json.load(batch_file)

In [None]:
# Resume from earlier search results. Only a missing file means "no results
# yet"; other errors are raised instead of discarding the accumulated results.
try:
    with open("intermediate_results/search_results.json", "r", encoding="utf-8") as f:
        search_results = json.load(f)
except FileNotFoundError:
    search_results = {}

# Search once per distinct painting name in the current batch
# (dict.fromkeys de-duplicates while preserving order).
new_search_results = {
    painting_name: google_search_top3(painting_name, google_search_api_key, CSE_ID)
    for painting_name in dict.fromkeys(cur_batch.values())
}

search_results.update(new_search_results)

# Record the batch as processed. Note that `search_results` only exists in
# memory at this point; it still has to be written to disk afterwards.
with open(idx_file, "w") as f:
    f.write(str(cur_batch_idx))

In [None]:
# Write the accumulated search results (painting name -> links) to disk.
search_results_path = "intermediate_results/search_results.json"
with open(search_results_path, "w", encoding="utf-8") as results_file:
    json.dump(search_results, results_file, indent=2, ensure_ascii=False)

# Extract Images

In [None]:
# Reload the current batch (image file name -> painting name); fail if absent.
batches_dir = "intermediate_results/batches_for_searching"
cur_batch_file = os.path.join(batches_dir, f"batch_{cur_batch_idx}.json")
if not os.path.exists(cur_batch_file):
    raise FileNotFoundError(f"Batch file for index {cur_batch_idx} not found.")
with open(cur_batch_file, "r", encoding="utf-8") as batch_file:
    cur_batch = json.load(batch_file)

# Load every search result collected so far.
search_results_file = 'intermediate_results/search_results.json'
with open(search_results_file, "r", encoding="utf-8") as results_file:
    full_search_results = json.load(results_file)

# Artwork titles are the values of the batch mapping.
cur_artwork_titles = set(cur_batch.values())

# Keep only the search results whose title belongs to the current batch.
cur_batch_search_results = {}
for title, urls in full_search_results.items():
    if title in cur_artwork_titles:
        cur_batch_search_results[title] = urls

In [None]:
try:
    with open("final_results.json", "r") as f:
        final_results = json.load(f)
except:
    final_results = {}

cur_batch_results = {}

async_groups_cur_batch = []
cur_async_group = {}
for painting_name, urls in cur_batch_search_results.items():
    if len(cur_async_group) == 3:
        async_groups_cur_batch.append(cur_async_group)
        cur_async_group = {}
    cur_async_group[painting_name] = urls

if cur_async_group:
    async_groups_cur_batch.append(cur_async_group)

async with async_playwright() as p:
    browser = await p.chromium.launch(headless=False)
    for async_group in async_groups_cur_batch:
        tasks = [
            process_a_painting(painting_name, urls, browser, cur_batch_names_to_imgs, cur_batch_results)
            for painting_name, urls in async_group.items()
        ]
        await asyncio.gather(*tasks)
    await browser.close()

final_results.update(cur_batch_results)

In [None]:
# Persist the accumulated results as JSON.
final_results_path = "final_results.json"
with open(final_results_path, "w", encoding="utf-8") as final_file:
    json.dump(final_results, final_file, indent=2, ensure_ascii=False)

# Export the results of the current batch to a Word document.
doc_path = "final_results.docx"
save_results_to_docx(doc_path, cur_batch_results)