# 사용자 입력

In [3]:
# Free-text query; it is embedded for the subtitle search and scanned for a Pokémon name.
user_input = "피카츄가 싸운다"

# 자막 정보를 이용하여 Timeline 추출

In [2]:
from sentence_transformers import SentenceTransformer

# Load the KoE5 sentence-embedding model by its Hugging Face Hub id.
model = SentenceTransformer("nlpai-lab/KoE5")

In [None]:
import os
import re
import pickle
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity


# Query texts to look for, and their embeddings
finding_text = [user_input]
finding_embeddings = model.encode(finding_text)

# Minimum average cosine similarity for a subtitle to count as a match
threshold = 0.3

# Folder holding the subtitle files
subtitle_folder_path = "/home/aikusrv03/pokemon/LCY/moogeun/output_folder/"
# Folder where per-file embedding caches are stored (created if missing)
embedding_cache_folder = "/home/aikusrv03/pokemon/LCY/minjun/embedding_cache"
os.makedirs(embedding_cache_folder, exist_ok=True)

# Timestamp-line detector
def is_time_format(line):
    """Return True when *line* is a seconds timestamp such as ``109.590s``.

    Surrounding whitespace is ignored. Only the ``<digits>[.<digits>]s`` form
    is recognised; clock-style ``HH:MM:SS`` values do not match.
    """
    candidate = line.strip()
    return bool(re.match(r"^\d+(\.\d+)?s$", candidate))

# Sort key for subtitle file names
def extract_number(file_name):
    """Return the first run of digits in *file_name* as an int (``'014'`` -> 14).

    Names containing no digits yield ``float('inf')`` so they sort last.
    """
    found = re.search(r'\d+', file_name)
    if found is None:
        return float('inf')
    return int(found.group())

# Load cached embeddings for a subtitle file, or build and cache them
def load_or_create_embeddings(file_path, cache_path):
    """Return ``(embeddings, subtitles)`` for one subtitle file, using a pickle cache.

    If ``cache_path`` exists its contents are unpickled and returned as-is.
    Otherwise ``file_path`` is parsed into cues: a line accepted by
    ``is_time_format`` starts a new cue, and the non-blank lines that follow,
    joined by single spaces, form its text. Each cue text is encoded with the
    module-level ``model`` and the result is pickled to ``cache_path``.

    Args:
        file_path: Path of the UTF-8 subtitle text file.
        cache_path: Path of the pickle file used as the cache.

    Returns:
        A 2-tuple ``(embeddings, subtitles)``: ``embeddings`` is a list of
        ``(time, embedding)`` pairs and ``subtitles`` the parallel list of
        ``(time, text)`` pairs.

    Raises:
        ValueError: If the cached object is not a 2-tuple.
    """
    if os.path.exists(cache_path):
        with open(cache_path, "rb") as f:
            print(f"Loading cached embeddings for {file_path}")
            # NOTE(review): pickle.load can execute arbitrary code from the
            # file; only point cache_path at caches this script wrote itself.
            loaded_data = pickle.load(f)
        if isinstance(loaded_data, tuple) and len(loaded_data) == 2:
            return loaded_data  # (embeddings, subtitles)
        raise ValueError(f"Cached data format invalid: {loaded_data}")

    embeddings = []
    subtitles = []

    def flush_cue(cue_time, cue_lines):
        """Encode and record one cue; a cue lacking a time or any text is skipped."""
        if not (cue_time and cue_lines):
            return
        subtitle = " ".join(cue_lines)
        embeddings.append((cue_time, model.encode(subtitle)))
        subtitles.append((cue_time, subtitle))

    current_time = None
    current_dialogue = []
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            line = line.strip()
            if is_time_format(line):
                flush_cue(current_time, current_dialogue)
                current_time = line
                current_dialogue = []
            elif line:
                # Blank separator lines carry no dialogue; keeping them would
                # embed text-less cues and pad cue texts with extra spaces.
                current_dialogue.append(line)
    # The final cue has no following timestamp line to trigger its flush.
    flush_cue(current_time, current_dialogue)

    # Write to a temporary file and rename it into place, so an interrupted
    # run cannot leave a truncated cache that fails to load next time.
    tmp_cache_path = f"{cache_path}.tmp"
    with open(tmp_cache_path, "wb") as f:
        pickle.dump((embeddings, subtitles), f)
    os.replace(tmp_cache_path, cache_path)

    return embeddings, subtitles

# Subtitle .txt files in the folder, ordered by the first number in their names
file_names = [entry for entry in os.listdir(subtitle_folder_path) if entry.endswith(".txt")]
file_names.sort(key=extract_number)

# Matching cues, each stored as "file<TAB>time<TAB>text"
results = []
# Walk the subtitle files in numeric order
for file_name in file_names:
    print(f"Processing: {file_name}")

    file_path = os.path.join(subtitle_folder_path, file_name)
    # One cache file per subtitle file
    cache_path = os.path.join(embedding_cache_folder, f"{file_name}.pkl")

    # Reuse cached embeddings when present, otherwise compute them
    embeddings, subtitles = load_or_create_embeddings(file_path, cache_path)

    # Score each cue against every query text and average the scores
    for (time, subtitle_embedding), (time_text, subtitle) in zip(embeddings, subtitles):
        similarities = [
            cosine_similarity([subtitle_embedding], [ft_emb])[0][0]
            for ft_emb in finding_embeddings
        ]
        avg_similarity = sum(similarities) / len(similarities)

        if avg_similarity >= threshold:
            results.append(f"{file_name}\t{time_text}\t{subtitle}")

# Show the matches
for result in results:
    print(result)

# Persist the matches, one per line
output_file_path = "subtitles_timeclips.txt"
with open(output_file_path, "w", encoding="utf-8") as output_file:
    output_file.writelines(f"{result}\n" for result in results)

print(f"\n필터링된 결과가 '{output_file_path}'에 저장되었습니다.")

# 이미지 임베딩을 활용하여 포켓몬 찾기

In [None]:
# Load the Pokémon name mapping file (tab-separated, despite the .csv extension)
pokemon_name_map_file = 'pokemon_name_map.csv'
# Read as UTF-8 explicitly, matching how the other text files in this notebook are opened.
with open(pokemon_name_map_file, 'r', encoding='utf-8') as f:
    # Drop the line terminator first so the last column does not keep a "\n".
    name_list = [x.rstrip('\n').split('\t') for x in f]
# Rows too short to hold columns 1 and 3 (e.g. a trailing blank line) are skipped.
name_list = [x for x in name_list if len(x) > 3]

# Korean names (column 1) and a Korean -> English lookup (column 3)
pokemon_kor = [x[1] for x in name_list]
pokemon_kor_to_eng = {x[1]: x[3] for x in name_list}

# Find the first name contained in the user input. Reverse lexicographic order
# tries a longer name before any shorter name that is its prefix.
pokemon_name_korean = None
for name in sorted(pokemon_kor, reverse=True):
    # An empty name is a substring of every string, so it must not count as a hit.
    if name and name in user_input:
        pokemon_name_korean = name
        break

if pokemon_name_korean is None:
    print("포켓몬이 없습니다!")

print(pokemon_name_korean)

In [None]:
import os

import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from PIL import Image
import torch

from extract_embedding import extract_embedding

In [8]:
# Cosine similarity helper
# NOTE: this reuses the name of sklearn's cosine_similarity imported in an earlier cell.
def cosine_similarity(vec1, vec2):
    """Return the cosine similarity of *vec1* and *vec2*.

    Computed as ``np.dot(vec1, vec2)`` divided by the product of the two
    ``np.linalg.norm`` values; the result has whatever shape ``np.dot`` yields.
    """
    numerator = np.dot(vec1, vec2)
    denominator = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    return numerator / denominator

In [9]:
# Load the embeddings of detection images taken from the anime.
# Each line is split on tabs into a record; dict() below needs exactly two fields per record.
embedding_path = '/home/aikusrv03/pokemon/LCY/lcy/openclip/embeddings_flatten_anime_new.txt'
with open(embedding_path, 'r') as f:
    embedding_list = [record.split('\t') for record in f]
embedding_dict = dict(embedding_list)

In [None]:
# Load the reference ("target") images that the anime detections are compared with
target_pokemon = pokemon_kor_to_eng[pokemon_name_korean]
pokemon_image_dir = '/home/aikusrv03/pokemon/LCY/data/image_1026/class'
pokemon_image_list = os.listdir(pokemon_image_dir)
target_key = target_pokemon.lower()
target_pokemon_image = [name for name in pokemon_image_list if target_key in name.lower()]
print(target_pokemon_image)

# Fail early with a clear message; with no reference images the averaging
# below would otherwise stop with a bare ZeroDivisionError.
if not target_pokemon_image:
    raise ValueError(f"No reference images for {target_pokemon!r} in {pokemon_image_dir}")

image_list = [Image.open(os.path.join(pokemon_image_dir, image_name)) for image_name in target_pokemon_image]
target_embedings = [extract_embedding(image) for image in image_list]

# Convert the reference embeddings to NumPy once instead of once per detection.
target_arrays = [target_embedding.detach().numpy() for target_embedding in target_embedings]

cos_sim_list = []

# Average cosine similarity of every detection against all reference images
for image_name, embedding in tqdm(embedding_list):
    # NOTE(review): eval executes whatever text the embeddings file contains;
    # only use files produced by this project. If the values are plain list
    # literals, ast.literal_eval would be a safer parser — confirm the format.
    embedding = np.array(eval(embedding))
    cos_sim_sum = 0
    for target_array in target_arrays:
        cos_sim_sum += cosine_similarity(target_array, embedding)
    ave_cos_sim = cos_sim_sum / len(target_arrays)
    # ave_cos_sim is indexed here, so it is expected to be a one-element array.
    cos_sim_list.append((ave_cos_sim[0], image_name))
# Highest similarity first
cos_sim_list = sorted(cos_sim_list, reverse=True)
print(cos_sim_list)

In [None]:
# Visualize the most similar detections in a 10x10 grid
fig, axes = plt.subplots(10, 10, figsize=(20, 25))
fig.suptitle("Images Sorted by Cosine Similarity", fontsize=16)

for i, ax in enumerate(axes.flat):
    ax.axis('off')
    # Fewer than 100 results: leave the remaining cells blank instead of
    # indexing past the end of cos_sim_list.
    if i >= len(cos_sim_list):
        continue
    similarity, image_path = cos_sim_list[i]
    ax.imshow(Image.open(image_path), cmap='gray')
    ax.set_title(f"Sim: {similarity:.2f}")

plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()

In [None]:
# Keep only detections whose average similarity exceeds this cut-off
threshold = 0.6
filtered_cos_sim_list = [entry for entry in cos_sim_list if entry[0] > threshold]

# Order the remaining detections by image path
sorted_data = sorted(filtered_cos_sim_list, key=lambda entry: entry[1])

print(len(sorted_data))

In [None]:
# Load the subtitle matches ("file<TAB>time<TAB>text" per line)
subtitles_timeclips_file_path = 'subtitles_timeclips.txt'
subtitles_timelines = []
# The file is written as UTF-8, so read it back with the same encoding.
with open(subtitles_timeclips_file_path, 'r', encoding='utf-8') as f:
    for x in f:
        if not x.strip():
            continue  # tolerate blank lines
        fields = x.rstrip('\n').split('\t')
        # Episode key: the three characters that follow "Pokemon" in the file name.
        episode = fields[0].split('Pokemon')[1][:3]
        # Whole seconds as a string: drop the trailing "s" unit, then any fraction,
        # so both "109.590s" and "12s" can be passed to int() later.
        whole_seconds = fields[1].rstrip('s').split('.')[0]
        subtitles_timelines.append((episode, whole_seconds))

# Group the matched times by episode key
subtitles_timelines_dict = {}
for key, value in subtitles_timelines:
    subtitles_timelines_dict.setdefault(key, []).append(value)

print(subtitles_timelines_dict)

In [None]:
import cv2
from collections import defaultdict

def get_second_from_name(file_name):
    """Return the text before the first ``'sec'`` in the parent-folder part of *file_name*.

    The path is split on ``'/'`` and its second-to-last component is used;
    the result is returned as a string.
    """
    parent_folder = file_name.rsplit('/', 2)[-2]
    return parent_folder.partition('sec')[0]

# Build per-anime time clips: runs of detections spaced exactly `clip_step` apart
# are merged into one (start_time, end_time) pair.
# NOTE(review): the spacing of 5 is taken from the original check; its unit is
# whatever number precedes "sec" in the detection folder names — confirm.
clip_step = 5

# Collect the distinct detection times of each anime. A set removes repeated
# times (several detections can share one time), which would otherwise break runs.
seconds_by_anime = defaultdict(set)
for data in sorted_data:
    file_name = data[1]
    anime_name = file_name.split('/')[-3]
    seconds_by_anime[anime_name].add(int(get_second_from_name(file_name)))

timeclip_dict = defaultdict(list)

for anime_name, seconds in seconds_by_anime.items():
    start_time = None
    end_time = None
    # Sort numerically: sorted_data is ordered by path string, which puts
    # "100sec" before "20sec".
    for second in sorted(seconds):
        if end_time is not None and second == end_time + clip_step:
            end_time = second  # extends the current run
            continue
        if start_time is not None:
            timeclip_dict[anime_name].append((start_time, end_time))
        start_time = second
        end_time = second
    # Flush the final run, which has no later detection to close it.
    if start_time is not None:
        timeclip_dict[anime_name].append((start_time, end_time))

print(timeclip_dict)

In [None]:
import cv2
import matplotlib.pyplot as plt

# Cut video clips out of each animation using the time clips

animation_dir = '/home/aikusrv03/pokemon/LCY/data/[01] Kanto Chapter [01-82]'
animation_list = os.listdir(animation_dir)

output_video_dir = f'/home/aikusrv03/pokemon/LCY/data/video_output/1223_{pokemon_name_korean}2'
os.makedirs(output_video_dir, exist_ok = True)

# Padding added before and after each clip; it is multiplied by fps, so it is in seconds.
frame_padding_ratio = 1


def _write_clips(cap, clip_specs, fps, frame_size, animation, output_dir):
    """Read ``cap`` once from its first frame and write every clip in ``clip_specs``.

    ``clip_specs`` must be sorted by start frame. Each item is
    ``(start_time, end_time, start_frame, end_frame)`` and the frame range is
    inclusive. Clips may overlap: a frame is written to every clip whose range
    contains it. All writers are released even if the video ends early.
    """
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # codec
    active = []  # (end_frame, writer) for clips currently being written
    next_idx = 0
    current_frame = 0
    try:
        # Stop as soon as nothing is being written and nothing is left to start.
        while cap.isOpened() and (active or next_idx < len(clip_specs)):
            ret, frame = cap.read()
            if not ret:
                break

            # Open a writer for every clip that starts at this frame.
            while next_idx < len(clip_specs) and clip_specs[next_idx][2] <= current_frame:
                start_time, end_time, start_frame, end_frame = clip_specs[next_idx]
                video_name = f'{animation}_{start_time}_{end_time}_{start_frame}_{end_frame}.mp4'
                out = cv2.VideoWriter(os.path.join(output_dir, video_name), fourcc, fps, frame_size)
                active.append((end_frame, out))
                next_idx += 1

            for _, out in active:
                out.write(frame)

            # Close the clips whose last frame was just written.
            still_active = []
            for end_frame, out in active:
                if end_frame <= current_frame:
                    out.release()
                else:
                    still_active.append((end_frame, out))
            active = still_active

            current_frame += 1
    finally:
        for _, out in active:
            out.release()


for animation, timeclips in timeclip_dict.items():

    print(f"Processing {animation}...")

    animation_num = animation.split(' ')[-1]

    # Check for subtitle hits before opening the video, so skipping an
    # animation never leaves a capture handle open.
    if animation_num not in subtitles_timelines_dict:
        continue
    subtitles_time = subtitles_timelines_dict[animation_num]

    animation_file_name = [x for x in animation_list if animation in x][0]
    input_video_path = os.path.join(animation_dir, animation_file_name)

    cap = cv2.VideoCapture(input_video_path)
    try:
        if not cap.isOpened():
            print(f"Could not open {input_video_path}, skipping.")
            continue

        fps = cap.get(cv2.CAP_PROP_FPS)  # frames per second
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))  # frame width
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))  # frame height

        # A set removes identical clips, which would map to the same output file.
        clip_specs = set()

        for timeclip in tqdm(timeclips):

            start_time, end_time = timeclip

            # Keep the clip only if some subtitle hit falls inside it.
            # NOTE(review): clip times are divided by 10 and subtitle seconds
            # are shifted by 20 before comparing; presumably clip times are in
            # tenths of a second and subtitles are offset from the video by
            # 20 s — confirm.
            if not any(start_time / 10 <= int(time) - 20 <= end_time / 10 for time in subtitles_time):
                continue

            start_frame = int(start_time / 10 * fps)
            end_frame = int(end_time / 10 * fps)

            start_frame = int(max(0, start_frame - fps * frame_padding_ratio))
            end_frame = int(end_frame + fps * frame_padding_ratio)

            # Carry the times with the frames so each file is named after its own clip.
            clip_specs.add((start_time, end_time, start_frame, end_frame))

        if not clip_specs:
            continue

        # The video is read sequentially, so clips must be handled in frame order.
        ordered_specs = sorted(clip_specs, key=lambda spec: (spec[2], spec[3], spec[0], spec[1]))
        _write_clips(cap, ordered_specs, fps, (width, height), animation, output_video_dir)
    finally:
        # Release the capture of every animation, including skipped ones.
        cap.release()

print("Finished")

In [None]:
import mediapy as media
import random

# Pick one of the generated clips at random and display it inline as a GIF
video_list = os.listdir(output_video_dir)

video_path = os.path.join(output_video_dir, random.choice(video_list))
# Open the clip only to read its frame rate
with media.VideoReader(video_path) as reader:
    fps = reader.fps
media.show_video(media.read_video(video_path), fps=fps, codec='gif')