In [None]:
import os
import requests
from PIL import Image
from io import BytesIO
from google_images_search import GoogleImagesSearch

# Google Custom Search API settings.
# SECURITY: credentials are read from the environment so they are never
# stored in the notebook. Any key that was previously committed in this cell
# should be treated as leaked and revoked.
API_KEY = os.environ["GOOGLE_API_KEY"]
CX = os.environ["GOOGLE_CSE_ID"]
gis = GoogleImagesSearch(API_KEY, CX)

# Search keyword
search_query = "2025 공모전 포스터 최신"

# Folder the images are saved into
save_dir = "downloaded_images"
os.makedirs(save_dir, exist_ok=True)

# Total number of images to request
total_images = 500
images_per_page = 10  # images requested per call (at most 10)
# Ceiling division, so a total that is not a multiple of the page size still
# gets its final partial page, without requesting an extra page otherwise.
num_pages = -(-total_images // images_per_page)

# URLs that have already been handled, used to skip repeated results
downloaded_urls = set()

# Run the search page by page and download each result
for page in range(num_pages):
    start_index = page * images_per_page + 1
    try:
        gis.search({'q': search_query, 'num': images_per_page, 'start': start_index})
    except Exception as e:
        # NOTE(review): the Custom Search JSON API documents a cap of 100
        # results per query, so pages past that are expected to fail; stop
        # paging instead of crashing or repeating the failing call.
        print(f"검색 실패 (start={start_index}): {e}")
        break

    page_results = gis.results()
    if not page_results:
        # An empty page means there is nothing further to fetch.
        break

    for index, image in enumerate(page_results):
        img_url = None  # keeps the failure message valid if image.url raises
        try:
            img_url = image.url

            # Skip URLs that were already seen
            if img_url in downloaded_urls:
                print(f"중복된 이미지 ({img_url})가 발견되었습니다. 건너뜁니다.")
                continue

            # Remember the URL before downloading so failures are not retried
            downloaded_urls.add(img_url)

            response = requests.get(img_url, timeout=5)
            response.raise_for_status()

            # Detect the real image format from the payload
            img = Image.open(BytesIO(response.content))
            ext = (img.format or "").lower()

            if ext in ("jpeg", "jpg", "png"):
                save_format = img.format
            else:
                # Only JPEG/PNG are kept as-is. Anything else is re-encoded as
                # JPEG so the file content actually matches its ".jpg" name.
                ext = "jpg"
                save_format = "JPEG"
                img = img.convert("RGB")

            filename = os.path.join(save_dir, f"image_{start_index + index}.{ext}")
            img.save(filename, format=save_format)
            print(f"다운로드 완료: {filename}")
        except Exception as e:
            print(f"다운로드 실패 ({img_url}): {e}")


다운로드 완료: downloaded_images/image_1.png
다운로드 완료: downloaded_images/image_2.jpeg
다운로드 완료: downloaded_images/image_3.jpeg
다운로드 완료: downloaded_images/image_4.jpeg
다운로드 완료: downloaded_images/image_5.png
다운로드 완료: downloaded_images/image_6.jpeg
다운로드 완료: downloaded_images/image_7.jpeg
다운로드 완료: downloaded_images/image_8.jpeg
다운로드 완료: downloaded_images/image_9.png
다운로드 완료: downloaded_images/image_10.jpeg
중복된 이미지 (https://dimg.donga.com/wps/NEWS/IMAGE/2024/12/11/130616823.1.jpg)가 발견되었습니다. 건너뜁니다.
중복된 이미지 (https://cdn.ibulgyo.com/news/photo/202501/422201_440198_5443.jpg)가 발견되었습니다. 건너뜁니다.
중복된 이미지 (http://www.sisanews.kr/news/photo/202501/112729_98142_3049.png)가 발견되었습니다. 건너뜁니다.
중복된 이미지 (https://cdn.beopbo.com/news/photo/202412/326670_128112_3212.jpg)가 발견되었습니다. 건너뜁니다.
다운로드 완료: downloaded_images/image_15.jpeg
다운로드 완료: downloaded_images/image_16.jpeg
다운로드 완료: downloaded_images/image_17.jpeg
다운로드 완료: downloaded_images/image_18.jpeg
다운로드 완료: downloaded_images/image_19.jpeg
다운로드 완료: downloaded_images/image

In [4]:
import os
from PIL import Image
import numpy as np
from pathlib import Path
import hashlib
from typing import List, Tuple, Dict


def load_image(image_path: str) -> np.ndarray:
    """Load an image as a 100x100 RGB array.

    Args:
        image_path: Path of the image file to open.

    Returns:
        A ``(100, 100, 3)`` array of the resized RGB image, or ``None`` when
        the file cannot be opened or decoded (the error is printed).
    """
    try:
        with Image.open(image_path) as img:
            # Normalise every mode (L, P, LA, CMYK, RGBA, ...) to RGB so the
            # result always has exactly three channels; grayscale or palette
            # images would otherwise yield 2-D arrays that cannot be indexed
            # per channel.
            if img.mode != 'RGB':
                img = img.convert('RGB')
            img = img.resize((100, 100))  # fixed size keeps histograms comparable
            return np.array(img)
    except Exception as e:
        print(f"Error loading {image_path}: {e}")
        return None


def calculate_image_signature(image: np.ndarray) -> np.ndarray:
    """Return a normalised colour-histogram signature for an image array.

    The first three channels of ``image`` are each binned into 32 buckets
    over the value range [0, 256). The three histograms are concatenated
    (channel 0, then 1, then 2) and divided by their total so the 96 entries
    sum to 1.
    """
    channel_histograms = [
        np.histogram(image[:, :, channel], bins=32, range=(0, 256))[0]
        for channel in range(3)
    ]
    counts = np.concatenate(channel_histograms)

    # Normalise so the signature does not depend on the pixel count.
    return counts / counts.sum()


def calculate_similarity(sig1: np.ndarray, sig2: np.ndarray) -> float:
    """Return the cosine similarity between two signature vectors."""
    inner_product = np.dot(sig1, sig2)
    magnitude_product = np.linalg.norm(sig1) * np.linalg.norm(sig2)
    return inner_product / magnitude_product


def find_duplicate_images(dir: str, threshold: float = 0.95) -> List[Tuple[str, str, float]]:
    """Find pairs of images in a directory whose signatures are similar.

    Args:
        dir: Directory to scan (non-recursive) for ``*.jpg``, ``*.jpeg``,
            ``*.png`` and ``*.bmp`` files.
        threshold: A pair is reported when its similarity is strictly
            greater than this value.

    Returns:
        A list of ``(path_a, path_b, similarity)`` tuples with
        ``path_a < path_b`` (string order), each unordered pair at most once,
        sorted by path so the result does not depend on filesystem order.
    """
    from itertools import combinations

    image_files = []
    for ext in ['*.jpg', '*.jpeg', '*.png', '*.bmp']:
        image_files.extend(Path(dir).glob(ext))

    # Compute one signature per readable image; unreadable files are skipped.
    signatures: Dict[str, np.ndarray] = {}
    for img_path in image_files:
        img_path = str(img_path)
        img_array = load_image(img_path)
        if img_array is None:
            continue
        signatures[img_path] = calculate_image_signature(img_array)

    # combinations() over the sorted paths visits every unordered pair
    # exactly once, so no "already processed" bookkeeping is needed.
    dup: List[Tuple[str, str, float]] = []
    for img1_path, img2_path in combinations(sorted(signatures), 2):
        similarity = calculate_similarity(signatures[img1_path], signatures[img2_path])
        if similarity > threshold:
            dup.append((img1_path, img2_path, similarity))

    return dup


def remove_duplicates(directory: str, threshold: float = 0.95, dry_run: bool = True) -> None:
    """Group similar images and delete all but the largest file per group.

    Pairs reported by ``find_duplicate_images`` are merged transitively: if
    A matches B and B matches C, then A, B and C form one group even when A
    and C were not reported as a pair. Within each group the file with the
    largest size on disk is kept.

    Args:
        directory: Directory to scan for duplicates.
        threshold: Similarity threshold forwarded to ``find_duplicate_images``.
        dry_run: When True (the default) only print what would be deleted;
            when False the files are removed with ``os.remove``.
    """
    duplicates = find_duplicate_images(directory, threshold)

    # Map every file to the single shared set object that is its group.
    group_of = {}
    for img1, img2, _similarity in duplicates:
        merged = group_of.get(img1)
        if merged is None:
            merged = {img1}
            group_of[img1] = merged
        other = group_of.get(img2, {img2})
        if merged is other:
            continue
        # Union the two groups and repoint every moved member, so a file can
        # never belong to two groups (which could delete it in one group and
        # then fail on os.path.getsize in another).
        merged.update(other)
        for member in other:
            group_of[member] = merged

    seen_group_ids = set()
    for group in group_of.values():
        # Members share one set object, so identity is enough to visit each
        # group exactly once.
        if id(group) in seen_group_ids:
            continue
        seen_group_ids.add(id(group))

        # Largest file first; ties are broken by name for a stable choice.
        files_with_size = sorted(
            ((f, os.path.getsize(f)) for f in group),
            key=lambda item: (-item[1], item[0]),
        )

        keep_file = files_with_size[0][0]
        files_to_delete = [f for f, _size in files_with_size[1:]]

        print(f"Keeping: {keep_file}")
        print("Deleting:")
        for file in files_to_delete:
            print(f"- {file}")
            if not dry_run:
                os.remove(file)  # actual deletion


if __name__ == "__main__":
    # Folder that holds the downloaded images.
    directory = "downloaded_images"

    # Detect near-duplicate images and delete them. dry_run=False means the
    # files are really removed; pass dry_run=True to only print the plan.
    remove_duplicates(directory=directory, threshold=0.95, dry_run=False)


Keeping: downloaded_images/image_30.png
Deleting:
- downloaded_images/image_18.jpeg
- downloaded_images/image_28.jpeg
- downloaded_images/image_63.jpeg
Keeping: downloaded_images/image_35.jpeg
Deleting:
- downloaded_images/image_90.jpeg
Keeping: downloaded_images/image_36.jpeg
Deleting:
- downloaded_images/image_27.jpeg
Keeping: downloaded_images/image_5.png
Deleting:
- downloaded_images/image_1.png
- downloaded_images/image_20.jpeg
Keeping: downloaded_images/image_45.jpeg
Deleting:
- downloaded_images/image_89.jpeg
Keeping: downloaded_images/image_2.jpeg
Deleting:
- downloaded_images/image_4.jpeg
Keeping: downloaded_images/image_87.jpeg
Deleting:
- downloaded_images/image_19.jpeg
Keeping: downloaded_images/image_17.jpeg
Deleting:
- downloaded_images/image_47.jpeg
Keeping: downloaded_images/image_3.jpeg
Deleting:
- downloaded_images/image_84.jpeg
Keeping: downloaded_images/image_59.png
Deleting:
- downloaded_images/image_70.jpeg
Keeping: downloaded_images/image_37.png
Deleting:
- downl

In [42]:
import os
import requests
import json
import cv2
import numpy as np
from tqdm import tqdm  # 진행률 표시
from base64 import b64encode

# --- ✅ Naver CLOVA OCR settings ---
# SECURITY: the invoke URL and secret are read from the environment so they
# are never stored in the notebook. Any secret that was previously committed
# in this cell should be treated as leaked and regenerated.
CLOVA_OCR_URL = os.environ["CLOVA_OCR_URL"]
HEADERS = {
    "X-OCR-SECRET": os.environ["CLOVA_OCR_SECRET"],
    "Content-Type": "application/json"
}

# --- ✅ 이미지 전처리 (선택적) ---
def preprocess_image(image_path, scale_factor=1.5):
    """Upscale an image and equalise its grayscale histogram.

    Args:
        image_path: Path of the image file to read with OpenCV.
        scale_factor: Factor applied to both width and height (bicubic
            interpolation). Defaults to 1.5.

    Returns:
        The single-channel, histogram-equalised image.

    Raises:
        ValueError: If OpenCV cannot read the file.
    """
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread signals failure by returning None; fail here with a
        # clear message instead of an opaque error from cv2.resize.
        raise ValueError(f"Could not read image: {image_path}")

    # Increase resolution (enlarge)
    img = cv2.resize(img, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_CUBIC)

    # Contrast adjustment on the grayscale version
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return cv2.equalizeHist(gray)

# --- ✅ OCR 요청 함수 ---
def extract_text_from_image(image_path):
    """Send one image to the CLOVA OCR endpoint and return its text.

    Args:
        image_path: Path of the image file to upload.

    Returns:
        The recognised words joined by single spaces; ``"No text detected"``
        when the response contains no image results; or ``"Error: <message>"``
        when reading the file, the HTTP request, or response parsing fails.
    """
    try:
        # --- Encode the image file as base64 ---
        with open(image_path, "rb") as f:
            image_data = f.read()
        encoded_image = b64encode(image_data).decode("utf-8")

        # Declare the real format instead of always claiming "jpg".
        # NOTE(review): "jpeg" is sent as "jpg", the value the original
        # request used; confirm accepted values against the CLOVA OCR docs.
        image_format = os.path.splitext(image_path)[1].lstrip(".").lower()
        if image_format in ("", "jpeg"):
            image_format = "jpg"

        # --- OCR request payload ---
        payload = {
            "images": [{"format": image_format, "name": os.path.basename(image_path), "data": encoded_image}],
            "requestId": "ocr-request",
            "version": "V2",
            "timestamp": 0,
            "lang": "ko",  # Korean OCR
            "resultType": "string"
        }

        # A timeout keeps one stalled request from hanging the whole batch.
        response = requests.post(
            CLOVA_OCR_URL, headers=HEADERS, data=json.dumps(payload), timeout=30
        )
        # Surface HTTP errors as "Error: ..." instead of misreporting them as
        # "No text detected".
        response.raise_for_status()
        result = response.json()

        # --- Extract the OCR result ---
        images = result.get("images") or []
        if not images:
            return "No text detected"

        fields = images[0].get("fields", [])
        words = [field["inferText"] for field in fields]  # one entry per word
        return " ".join(words).strip()

    except Exception as e:
        return f"Error: {str(e)}"

# --- ✅ Process every image in the folder ---
IMAGE_FOLDER = "downloaded_images"
OUTPUT_FILE = "ocr_extracted_texts.txt"

# Keep only entries whose lower-cased name ends with a JPEG/PNG extension.
image_suffixes = (".jpg", ".jpeg", ".png")
image_files = [
    entry for entry in os.listdir(IMAGE_FOLDER)
    if entry.lower().endswith(image_suffixes)
]

# --- Save the OCR results ---
# One "<file name>: <text>" line per image is written to OUTPUT_FILE and also
# collected in `results`.
results = []
with open(OUTPUT_FILE, "w", encoding="utf-8") as output_file:
    progress = tqdm(image_files, desc="Processing Images")
    for image_file in progress:
        image_path = os.path.join(IMAGE_FOLDER, image_file)
        text = extract_text_from_image(image_path)
        output_line = f"{image_file}: {text}\n"
        output_file.write(output_line)
        results.append(output_line)

print("\n✅ OCR 완료! 모든 텍스트가 ocr_extracted_texts.txt에 저장되었습니다.")


Processing Images: 100%|██████████| 50/50 [01:40<00:00,  2.02s/it]


✅ OCR 완료! 모든 텍스트가 ocr_extracted_texts.txt에 저장되었습니다.





In [43]:
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer

# --- ✅ Settings ---
TEXT_FILE = "ocr_extracted_texts.txt"
VECTOR_DB_PATH = "ocr_vector_db.index"

# --- ✅ Embedding model (Sentence-BERT) ---
model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")

# --- ✅ Load the OCR text file ---
# Each usable line looks like "<image file name>: <text>"; lines without the
# ": " separator are ignored.
texts = []
filenames = []
with open(TEXT_FILE, "r", encoding="utf-8") as f:
    for line in f:
        name, separator, body = line.strip().partition(": ")
        if separator:
            filenames.append(name)
            texts.append(body)

# --- ✅ Convert the texts to vectors ---
embeddings = model.encode(texts, convert_to_numpy=True)

# --- ✅ Build the FAISS vector DB ---
d = embeddings.shape[1]  # vector dimensionality
index = faiss.IndexFlatL2(d)
index.add(embeddings)

# --- ✅ Persist the index and the matching file names ---
# Row i of the index corresponds to line i of filenames.txt.
faiss.write_index(index, VECTOR_DB_PATH)
with open("filenames.txt", "w", encoding="utf-8") as f:
    f.writelines(filename + "\n" for filename in filenames)

print("\n✅ 벡터 DB 생성 완료! FAISS 인덱스가 저장되었습니다.")


modules.json:   0%|          | 0.00/229 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/122 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/4.12k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/645 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/471M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/480 [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.08M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]


✅ 벡터 DB 생성 완료! FAISS 인덱스가 저장되었습니다.


In [48]:
import numpy as np
import faiss
from transformers import AutoModel, AutoTokenizer

import torch  # used for no_grad() and the mask-weighted pooling below

# --- Settings ---
TEXT_FILE = "ocr_extracted_texts.txt"  # OCR text extracted per image
VECTOR_DB_PATH = "ocr_vector_db.index"  # where the FAISS index is written
FILENAME_DB_PATH = "filenames.txt"  # where the matching file names are written

# --- Embedding model (sentence-transformers checkpoint via transformers) ---
model_name = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
model = AutoModel.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
model.eval()  # inference only: disable dropout

# --- Load the OCR texts ---
# Each usable line looks like "<image file name>: <text>".
texts = []
filenames = []
with open(TEXT_FILE, "r", encoding="utf-8") as f:
    for line in f:
        parts = line.strip().split(": ", 1)  # split file name from text
        if len(parts) == 2:
            filenames.append(parts[0])
            texts.append(parts[1])

if not texts:
    # Fail early with a clear message instead of an opaque tokenizer error.
    raise ValueError(f"No 'filename: text' lines found in {TEXT_FILE}")

# --- Vectorise the texts ---
# NOTE(review): the model card for this checkpoint describes mean pooling
# over token embeddings (weighted by the attention mask) as the way to get
# sentence vectors with plain transformers; the first-token ([CLS]) vector
# is not what the checkpoint's pooling layer produces.
batch_size = 32  # bounded batches keep memory flat for large text files
pooled_batches = []
with torch.no_grad():  # no autograd graph is needed for inference
    for begin in range(0, len(texts), batch_size):
        batch = texts[begin:begin + batch_size]
        inputs = tokenizer(batch, return_tensors="pt", padding=True, truncation=True)
        outputs = model(**inputs)
        token_embeddings = outputs.last_hidden_state
        mask = inputs["attention_mask"].unsqueeze(-1).to(token_embeddings.dtype)
        summed = (token_embeddings * mask).sum(dim=1)
        token_counts = mask.sum(dim=1).clamp(min=1e-9)  # avoid division by zero
        pooled_batches.append((summed / token_counts).cpu().numpy())

# FAISS expects a contiguous float32 matrix.
embeddings = np.ascontiguousarray(np.concatenate(pooled_batches), dtype="float32")

# --- Build the FAISS vector DB ---
d = embeddings.shape[1]  # vector dimensionality
index = faiss.IndexFlatL2(d)
index.add(embeddings)

# --- Persist the vector DB and the file names ---
# Row i of the index corresponds to line i of FILENAME_DB_PATH.
faiss.write_index(index, VECTOR_DB_PATH)
with open(FILENAME_DB_PATH, "w", encoding="utf-8") as f:
    for filename in filenames:
        f.write(filename + "\n")

print("\n✅ 벡터 DB 생성 완료! 각 이미지의 텍스트가 개별 벡터로 저장되었습니다.")


✅ 벡터 DB 생성 완료! 각 이미지의 텍스트가 개별 벡터로 저장되었습니다.
