In [None]:
import json
import torch
import requests
import os
from transformers import CLIPProcessor, CLIPModel
from dotenv import load_dotenv
from PIL import Image
import logging
from io import BytesIO
from langchain_openai import ChatOpenAI
import openai
import re

# Load variables from the .env file into the process environment
load_dotenv()

# Read the OpenAI API key from the environment (None if the variable is unset)
openai.api_key = os.getenv("OPENAI_API_KEY")

# Load the CLIP model and processor; the model is moved to the GPU when CUDA is available
device = "cuda" if torch.cuda.is_available() else "cpu"
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# Pipeline settings: data file names and the GitHub location of the image folder
input_file = "output_data.jsonl"
keywords_file = "keyword.json"
embedding_file = "keyword_embeddings.json"
rag_dataset_file = "rag_dataset.json"
username = "KimEunOh"
repo = "image"
branch = "main"
folder_path = "test"

In [None]:
# 1. Extract the assistant "content" values from the output_data JSONL file
def extract_assistant_content(input_file):
    """Collect every non-empty assistant message from a JSONL chat log.

    Each non-blank line of ``input_file`` is parsed as a JSON object. For
    every entry of its "messages" list whose "role" is "assistant", the
    whitespace-stripped "content" is collected when it is not empty.

    Args:
        input_file: Path to a UTF-8 encoded JSONL file.

    Returns:
        A list of stripped assistant content strings, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    contents = []
    with open(input_file, "r", encoding="utf-8") as infile:
        for line in infile:
            line = line.strip()
            if not line:
                # Blank lines (e.g. a trailing newline) are not JSON; skip them
                # instead of letting json.loads("") abort the whole extraction.
                continue
            data = json.loads(line)
            # "messages" may be missing or JSON null; treat both as empty.
            for message in data.get("messages") or []:
                if message.get("role") != "assistant":
                    continue
                # "content" may be JSON null; ``or ""`` keeps .strip() safe.
                content = (message.get("content") or "").strip()
                if content:
                    contents.append(content)
    return contents


def clean_text(text):
    """
    Strip markdown markers, bracket characters and surplus whitespace from text.

    The substitutions run in a fixed order (markdown emphasis/code markers,
    header hashes, round/square/curly brackets, angle brackets, whitespace
    collapsing) and the result is trimmed. A warning is printed when nothing
    is left after cleaning; the (possibly empty) cleaned string is returned.
    """
    # (pattern, replacement) pairs, applied strictly in this order.
    substitutions = (
        (r"\*\*|\*|__|_|`", ""),  # bold, italic, underline, inline code markers
        (r"#+\s?", ""),  # header hashes plus one optional following whitespace
        (r"[()\[\]{}]", ""),  # round, square and curly brackets
        (r"[<>]", ""),  # angle brackets
        (r"\s+", " "),  # collapse whitespace runs (including newlines)
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)

    cleaned = text.strip()
    if not cleaned:
        print("Warning: Text is empty after cleaning.")
    return cleaned


def truncate_text(text, max_length=1000):
    """
    Limit the input text length.

    Text of at most ``max_length`` characters is returned unchanged; longer
    text is cut to its first ``max_length`` characters and "..." is appended.
    """
    fits = len(text) <= max_length
    return text if fits else text[:max_length] + "..."


def extract_keywords_with_llm(contents, output_file):
    """
    Extract deepfake-clue keywords with GPT and save them to a JSON file.

    Every item of ``contents`` is cleaned with ``clean_text``, shortened with
    ``truncate_text`` and sent to the chat model. The model's stripped reply
    is stored as the item's keywords. If the request fails for an item, the
    error is printed and logged and a placeholder keyword string is stored
    instead, so one failure does not abort the batch.

    Args:
        contents: Iterable of raw text items to analyse.
        output_file: Path of the JSON file that receives the results.

    Returns:
        A list of ``{"content": <original text>, "keywords": <str>}`` dicts,
        one per input item, in input order (also written to ``output_file``).
    """
    fallback_keywords = "default, keyword, placeholder"
    keywords_data = []
    # Created lazily inside the try block so a construction failure (e.g. a
    # missing API key) is still handled per item, but reused once it succeeds.
    gpt = None
    for idx, content in enumerate(contents):
        cleaned_content = clean_text(content)
        truncated_content = truncate_text(cleaned_content, max_length=1000)
        messages = [
            {
                "role": "system",
                "content": "You are an AI model trained to analyze text and extract deepfake indicators or clues. These clues can include unnatural features, inconsistencies, or anomalies commonly found in deepfake media.",
            },
            {
                "role": "user",
                "content": f"""
                Analyze the following text to extract key deepfake clues. Focus on identifying abnormalities or inconsistencies in facial features, textures, lighting, or expressions that might suggest digital manipulation.

                Text:
                {truncated_content}

                Extracted Clues:
                1. Identify any unnatural features, such as misaligned eyes, uneven lighting, or irregular skin texture.
                2. Include specific details about abnormalities, such as asymmetrical facial features, inconsistent reflections, or poorly rendered details like teeth or hair.
                3. Mention lighting inconsistencies, shadow mismatches, or environmental factors that suggest editing.
                4. List these clues as specific and concise keywords separated by commas.

                Keywords:
                """,
            },
        ]
        # Pre-bind so the KeyError handler can log it even when the failure
        # happened before the model returned anything.
        response = None
        try:
            if gpt is None:
                gpt = ChatOpenAI(
                    temperature=0,
                    model_name="gpt-4o",  # model name
                )
            response = gpt.invoke(messages)
            keywords = response.content.strip()
        except KeyError as ke:
            # The response did not have the expected shape
            print(f"KeyError for item {idx}: {ke}")
            logging.error(f"KeyError for item {idx}: {ke} - Response: {response}")
            keywords = fallback_keywords
        except Exception as e:
            # Any other failure: keep going with the placeholder keywords
            print(f"Error extracting keywords for content at index {idx}: {e}")
            logging.error(f"Error extracting keywords for content at index {idx}: {e}")
            keywords = fallback_keywords
        else:
            print(f"Keywords for item {idx}: {keywords}")
        keywords_data.append({"content": content, "keywords": keywords})
    # Save as JSON
    with open(output_file, "w", encoding="utf-8") as outfile:
        json.dump(keywords_data, outfile, ensure_ascii=False, indent=4)
    return keywords_data


# 3. Embed the keywords from keyword.json with CLIP and store the embeddings
def embed_keywords_with_clip(keyword_file, output_file):
    """Embed each entry's "keywords" string with the CLIP text encoder.

    Reads a JSON list from ``keyword_file``, computes the text features of
    every entry's "keywords" value with the module-level ``clip_processor``
    and ``clip_model``, writes the resulting list to ``output_file`` as JSON
    and returns it.

    Returns:
        A list of ``{"keywords": <str>, "embedding": <nested list>}`` dicts,
        where "embedding" is ``get_text_features(...).cpu().tolist()``.
    """
    with open(keyword_file, "r", encoding="utf-8") as source:
        entries = json.load(source)

    def _embed(phrase):
        # Tokenise the single phrase and run it through the text encoder
        # without tracking gradients.
        tokens = clip_processor(
            text=[phrase], return_tensors="pt", padding=True, truncation=True
        ).to(device)
        with torch.no_grad():
            features = clip_model.get_text_features(**tokens)
        return {"keywords": phrase, "embedding": features.cpu().tolist()}

    embeddings = [_embed(entry["keywords"]) for entry in entries]

    # Save as JSON
    with open(output_file, "w", encoding="utf-8") as target:
        json.dump(embeddings, target, ensure_ascii=False, indent=4)
    return embeddings


# 4. Build the list of image URLs from a GitHub folder listing
def get_github_image_urls(username, repo, branch, folder_path, timeout=10):
    """List raw-download URLs for the images in a GitHub repository folder.

    Queries the GitHub contents API for ``folder_path`` on ``branch`` and
    builds a ``raw.githubusercontent.com`` URL for every file entry whose
    name ends in .jpg, .jpeg or .png (case-insensitive).

    Args:
        username: Owner of the repository.
        repo: Repository name.
        branch: Branch (ref) to list.
        folder_path: Folder inside the repository.
        timeout: Seconds to wait for the API before giving up, so a stalled
            connection cannot hang the pipeline.

    Returns:
        A list of image URLs; an empty list if the request fails or the
        response is not a directory listing (the error is printed).
    """
    api_url = f"https://api.github.com/repos/{username}/{repo}/contents/{folder_path}?ref={branch}"
    try:
        response = requests.get(api_url, timeout=timeout)
        response.raise_for_status()
        files = response.json()
        if not isinstance(files, list):
            # A directory listing is a JSON array; anything else (e.g. a
            # single object) cannot be iterated as file entries.
            raise ValueError(
                f"expected a directory listing, got {type(files).__name__}"
            )
        image_urls = [
            f"https://raw.githubusercontent.com/{username}/{repo}/{branch}/{folder_path}/{file['name']}"
            for file in files
            if isinstance(file, dict)
            # Skip sub-directories and other non-file entries; entries
            # without a "type" field are kept.
            and file.get("type", "file") == "file"
            and file.get("name", "").lower().endswith((".jpg", ".jpeg", ".png"))
        ]
        return image_urls
    except Exception as e:
        # Best effort: report the problem and let the caller continue with no images.
        print(f"Error fetching image URLs from GitHub: {e}")
        return []


# 5. Embed each image and compare it with the stored keyword text embeddings
def embed_image_and_calculate_similarity(
    image_urls, keyword_embeddings, top_k=5, timeout=10
):
    """Rank keyword entries by CLIP similarity for every image URL.

    Each image is downloaded, embedded with the module-level CLIP model and
    compared (cosine similarity) with every keyword embedding. The ``top_k``
    most similar keyword entries are kept per image. Images that cannot be
    downloaded or processed are reported with ``print`` and skipped.

    Args:
        image_urls: Iterable of image URLs to download.
        keyword_embeddings: List of ``{"keywords": ..., "embedding": ...}``
            dicts, as produced by ``embed_keywords_with_clip``.
        top_k: Number of best-matching keyword entries kept per image.
        timeout: Seconds to wait for each image download.

    Returns:
        A list of ``{"image_url": <str>, "relevant_keywords": [...]}`` dicts,
        where each relevant keyword is ``{"keywords": ..., "similarity": <float>}``
        sorted by descending similarity.
    """
    results = []
    if not image_urls:
        return results

    # The keyword tensors do not depend on the image, so build them once
    # instead of once per image.
    try:
        keyword_tensors = [
            (entry["keywords"], torch.tensor(entry["embedding"]))
            for entry in keyword_embeddings
        ]
    except Exception as e:
        # Unusable keyword embeddings mean no image can be scored; report it
        # once and return the empty result without downloading anything.
        print(f"Error preparing keyword embeddings: {e}")
        return results

    for image_url in image_urls:
        try:
            response = requests.get(image_url, timeout=timeout)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content)).convert("RGB")
            inputs = clip_processor(images=image, return_tensors="pt").to(device)
            with torch.no_grad():
                image_embedding = clip_model.get_image_features(**inputs).cpu()

            # Compute the similarity against every keyword entry
            similarities = [
                {
                    "keywords": keywords,
                    "similarity": torch.nn.functional.cosine_similarity(
                        keyword_tensor, image_embedding
                    ).item(),
                }
                for keywords, keyword_tensor in keyword_tensors
            ]
            most_relevant = sorted(
                similarities, key=lambda x: x["similarity"], reverse=True
            )[:top_k]
            results.append({"image_url": image_url, "relevant_keywords": most_relevant})
        except Exception as e:
            # Best effort: a broken image must not stop the remaining ones.
            print(f"Error processing image {image_url}: {e}")
            continue
    return results


# 6. Build one analysis prompt per image from its most relevant keywords
def generate_prompts(results):
    """Turn similarity results into image/prompt pairs.

    For every item of ``results`` the "keywords" values of its
    "relevant_keywords" entries are joined with ", " and inserted, together
    with the item's "image_url", into a fixed instruction template.

    Returns:
        A list of ``{"image_url": <str>, "prompt": <str>}`` dicts in input
        order; the prompt text has its surrounding whitespace stripped.
    """

    def _render(image_url, relevant_keywords):
        # The template text is emitted verbatim; only the URL and the joined
        # keyword checklist are interpolated.
        prompt = f"""
            Analyze the image at the following URL: {image_url}.  
            Using the following keywords as a checklist: {', '.join(relevant_keywords)}.  

            Instructions:  
            1. For each keyword, verify whether the described clue is present in the image. Clearly state which keywords (if any) correspond to observed anomalies.  
            2. If no anomalies are found, conclude that the image does not show signs of deepfake manipulation.  
            3. Provide a balanced judgment and explain your reasoning, ensuring not to assume manipulation unless supported by clear evidence.  

            Response Format:  
            1. Observations: List the observations for each keyword, explicitly noting whether it matches an observed anomaly or not.  
            2. Conclusion: State whether the image is likely a deepfake (Yes/No).  
            3. Explanation: Provide a brief explanation, citing specific observations or lack thereof.

            Example Response:  
            Observations:  
            - "Mismatched shadows": Not observed.  
            - "Unnatural facial features": Slight asymmetry in the eyes.  
            - "Smooth skin texture": Observed on the cheeks.  

            Conclusion: No.  

            Explanation: While some minor anomalies were noted (e.g., asymmetry in the eyes), these are consistent with natural variations in human features and do not strongly indicate manipulation.
            """
        return {"image_url": image_url, "prompt": prompt.strip()}

    return [
        _render(
            item["image_url"],
            [match["keywords"] for match in item["relevant_keywords"]],
        )
        for item in results
    ]


# 7. Write the image-URL/prompt pairs to the rag_dataset JSON file
def save_rag_dataset(prompts, output_file):
    """Serialise ``prompts`` to ``output_file`` as indented UTF-8 JSON.

    Non-ASCII characters are written as-is (``ensure_ascii=False``) and the
    file is overwritten if it already exists. Returns ``None``.
    """
    with open(output_file, "w", encoding="utf-8") as dataset_file:
        json.dump(
            prompts,
            dataset_file,
            indent=4,
            ensure_ascii=False,
        )

In [None]:
"""
Run this cell only when new output_data has been added (toggle the comments with Ctrl + /).
"""

# # Step-by-step execution

# # Extract the contents

# assistant_contents = extract_assistant_content(input_file)

# # Generate the keywords

# keywords_data = extract_keywords_with_llm(assistant_contents, keywords_file)

In [None]:
# Embed the keywords (reads keywords_file, writes embedding_file)
keyword_embeddings = embed_keywords_with_clip(keywords_file, embedding_file)
# Build the image list

image_urls = get_github_image_urls(username, repo, branch, folder_path)

# Embed the images and compute their similarity to the keyword embeddings

results = embed_image_and_calculate_similarity(image_urls, keyword_embeddings)

In [10]:
# Generate the prompts and write them to rag_dataset_file
prompts = generate_prompts(results)
save_rag_dataset(prompts, rag_dataset_file)

print(f"RAG dataset saved to {rag_dataset_file}")

RAG dataset saved to rag_dataset.json
