## Pickle-DL 모델
24/5/31 최종업뎃 <br>
24/6/12 새 모델 시도 & 메타데이터



### Requirements

In [None]:
# !pip install torch torchvision
# !pip install ftfy regex tqdm
# !git clone https://github.com/openai/CLIP.git
# !pip install git+https://github.com/openai/CLIP.git
# !pip install faiss-cpu

In [None]:
import os
# Set KMP_DUPLICATE_LIB_OK before the heavy libraries are imported.
# NOTE(review): presumably a workaround for the "duplicate OpenMP runtime"
# abort seen when several libraries each bundle OpenMP — confirm it is
# still needed in this environment.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

### 1. EfficientNet(B0) + faiss (0.6)

In [None]:
from PIL import Image, ExifTags

def correct_image_rotation(img):
    """Return ``img`` rotated upright according to its EXIF Orientation tag.

    Args:
        img: A PIL image (any object exposing ``getexif()`` or ``_getexif()``
            and ``rotate(angle, expand=...)``).

    Returns:
        The result of ``img.rotate(...)`` when the Orientation tag is 3, 6 or
        8; otherwise ``img`` itself. Missing or unreadable EXIF data is
        treated as "nothing to correct".
    """
    orientation_tag = 0x0112  # numeric id of the EXIF "Orientation" tag
    # Angle handed to rotate() for each handled orientation value. Mirrored
    # orientations (2, 4, 5, 7) are left untouched, exactly as before.
    angle_by_orientation = {3: 180, 6: 270, 8: 90}

    try:
        # Prefer the public getexif(). The private _getexif() exists only on
        # some file-backed image classes, so an image returned by convert()
        # used to fall into the except branch and was never corrected.
        read_exif = getattr(img, "getexif", None) or img._getexif
        exif = read_exif()
        orientation = exif.get(orientation_tag) if exif else None
    except (AttributeError, KeyError, IndexError):
        # Best effort: without usable EXIF data the image is returned as is.
        return img

    if not isinstance(orientation, int):
        return img
    angle = angle_by_orientation.get(orientation)
    if angle is None:
        return img
    # NOTE(review): the rotated image may still carry the original
    # Orientation tag, so avoid passing the result through this function a
    # second time — confirm against the Pillow version in use.
    return img.rotate(angle, expand=True)

def get_image_embedding(image_path, model, device):
    """Compute the embedding vector of one image file.

    Args:
        image_path: Path of the image to embed.
        model: Network called on the preprocessed image batch.
        device: Torch device the input tensor is moved to.

    Returns:
        The flattened model output as a NumPy array, or ``None`` when the file
        cannot be identified as an image.
    """
    try:
        with Image.open(image_path) as raw_img:
            # Fix the orientation on the freshly opened image, before
            # convert(): the EXIF data is read from the file-backed image,
            # and doing it after convert() left the rotation unapplied.
            upright = correct_image_rotation(raw_img)
            img = upright.convert('RGB')
    except UnidentifiedImageError:
        print(f"Cannot identify image file {image_path}. Skipping.")
        return None

    img_tensor = preprocess(img).unsqueeze(0).to(device)

    # Inference only: eval mode, no gradient tracking.
    model.eval()
    with torch.no_grad():
        embedding_vector = model(img_tensor).cpu().numpy().flatten()
    return embedding_vector

# 이미지 그룹 출력 시에도 회전 보정 적용
def plot_image_groups(groups, max_groups=200):
    """Show each group of images as one row of subplots.

    Args:
        groups: Iterable of groups, each a sequence of image file paths.
        max_groups: Maximum number of groups to draw. Defaults to 200, the
            limit that used to be hard-coded.
    """
    for idx, group in enumerate(groups):
        if idx >= max_groups:
            break  # nothing past the limit is drawn, so stop iterating
        print("=================group {}=================".format(idx))
        plt.figure(figsize=(10,10))
        for i, image_path in enumerate(group):
            plt.subplot(1, len(group), i + 1)
            # Release the file handle as soon as the image has been drawn.
            with Image.open(image_path) as img:
                plt.imshow(correct_image_rotation(img))  # EXIF rotation fix
            plt.axis('off')
        plt.show()

# Example folder path
# folder_path = r"C:\Users\ben81\zflip_camera"
folder_path = r"C:\Users\ben81\zflip_random2"

# Use the GPU when available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
base_model = base_model.to(device)

# Embed every image in the folder
embeddings, image_paths = get_embeddings_from_folder(folder_path, base_model, device)
if len(image_paths) == 0:
    # Fail with a clear message instead of an IndexError on shape[1] below.
    raise ValueError(f"No readable images found in {folder_path}")

# Cosine similarity with FAISS: inner product of L2-normalised vectors
index = faiss.IndexFlatIP(embeddings.shape[1])
faiss.normalize_L2(embeddings)
index.add(embeddings)

D, I = index.search(embeddings, k=len(embeddings))  # similarity of every image to every image

# Greedy grouping: each unvisited image seeds a group and pulls in every
# unvisited image whose similarity to the seed is at least `threshold`.
threshold = 0.65
groups = []
visited = set()

for i in range(len(embeddings)):
    if i in visited:
        continue
    group = [image_paths[i]]
    visited.add(i)
    for j in range(len(I[i])):
        if D[i][j] < threshold:
            break  # results are sorted by decreasing similarity
        neighbor = int(I[i][j])
        # The seed is already in `visited`, so it is skipped by index rather
        # than by assuming it sits in column 0 (an exact duplicate can tie
        # with the seed and take that slot).
        if neighbor in visited:
            continue
        group.append(image_paths[neighbor])
        visited.add(neighbor)
    groups.append(group)

# Show the result
print(device)
plot_image_groups(groups)


## 2. CLIP

In [None]:
import torch
import clip
from PIL import Image
import os

# Load the CLIP "ViT-B/32" model together with the preprocess callable that
# clip.load returns; both are used as globals by the helper functions below.
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)

# 이미지 파일들을 로드하고 전처리
def load_images(image_folder):
    """Load and preprocess every PNG/JPEG image found directly in a folder.

    Args:
        image_folder: Directory to scan (sub-directories are not visited).

    Returns:
        A list of ``(image_path, tensor)`` pairs, where ``tensor`` is the
        preprocessed image with a leading batch dimension, moved to the
        global ``device``.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')
    images = []
    for file in os.listdir(image_folder):
        # Compare case-insensitively and require the dot, so "IMG.JPG" is
        # picked up and a name that merely ends in "jpg" is not.
        if not file.lower().endswith(image_extensions):
            continue
        image_file = os.path.join(image_folder, file)
        # Close the file handle once the tensor has been built.
        with Image.open(image_file) as pil_image:
            image = preprocess(pil_image).unsqueeze(0).to(device)
        images.append((image_file, image))

    return images

# 이미지 임베딩 계산
def get_image_features(images):
    """Encode preprocessed images with the global CLIP model.

    Args:
        images: Iterable of ``(image_path, tensor)`` pairs.

    Returns:
        A list of ``(image_path, features)`` pairs in the same order, where
        ``features`` is the output of ``model.encode_image``.
    """
    def _encode(tensor):
        # Gradients are never needed for feature extraction.
        with torch.no_grad():
            return model.encode_image(tensor)

    return [(path, _encode(tensor)) for path, tensor in images]

# 텍스트 쿼리 임베딩 계산
def get_text_features(text_query):
    """Encode a single text query with the global CLIP model.

    Args:
        text_query: The query string.

    Returns:
        The output of ``model.encode_text`` for the one-element batch.
    """
    tokens = clip.tokenize([text_query])
    tokens = tokens.to(device)
    with torch.no_grad():
        return model.encode_text(tokens)

# 유사도 계산
def calculate_similarity(image_features, text_features):
    """Compute the cosine similarity between each image and the text features.

    Args:
        image_features: Iterable of ``(image_path, features)`` pairs.
        text_features: Text feature tensor compared against every image.

    Returns:
        A list of ``(image_path, similarity_tensor)`` pairs in input order.
    """
    return [
        (path, torch.cosine_similarity(features, text_features, dim=1))
        for path, features in image_features
    ]

# 이미지 검색 함수
def search_images(image_folder, text_query):
    """Rank the images of a folder by similarity to a text query.

    Args:
        image_folder: Directory whose images are searched.
        text_query: Text the images are compared with.

    Returns:
        A list of ``(image_path, similarity)`` pairs with ``similarity`` as a
        plain Python number, sorted from most to least similar.
    """
    image_features = get_image_features(load_images(image_folder))
    text_features = get_text_features(text_query)

    # .item() turns each one-element tensor into a scalar that can be sorted.
    scored = [
        (image_file, sim.item())
        for image_file, sim in calculate_similarity(image_features, text_features)
    ]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)

# 해시태그 임베딩 계산
def get_hashtag_features(hashtags):
    """Encode a list of hashtag strings with the global CLIP model.

    Args:
        hashtags: List of hashtag strings, tokenized as one batch.

    Returns:
        The output of ``model.encode_text`` for the whole batch.
    """
    tokens = clip.tokenize(hashtags)
    tokens = tokens.to(device)
    with torch.no_grad():
        return model.encode_text(tokens)

# 해시태그 생성 함수
def generate_hashtags(image_folder, hashtags, top_k=5):
    """Assign the best-matching hashtags to every image in a folder.

    Args:
        image_folder: Directory whose images are tagged.
        hashtags: Candidate hashtag strings.
        top_k: Number of hashtags kept per image (default 5). Capped at
            ``len(hashtags)`` so a short candidate list no longer makes
            ``topk`` raise.

    Returns:
        A dict mapping each image path to its hashtags, best match first.
    """
    images = load_images(image_folder)
    if not hashtags:
        # Nothing to rank against; skip the encoders entirely.
        return {image_file: [] for image_file, _ in images}

    image_features = get_image_features(images)
    hashtag_features = get_hashtag_features(hashtags)
    k = max(0, min(top_k, len(hashtags)))

    image_hashtags = {}
    for image_file, image_feature in image_features:
        similarity = torch.cosine_similarity(image_feature, hashtag_features, dim=1)
        top_indices = similarity.topk(k).indices.cpu().tolist()
        image_hashtags[image_file] = [hashtags[i] for i in top_indices]

    return image_hashtags
    

### 1. CLIP 이미지 검색

In [None]:
# Folder containing the images to search
image_folder = 'C:\\Users\\ben81\\GitHub\\Pickle-DL\\pickle_clip'
# Text query the images are ranked against
text_query = "travel image"

# Run the search; results come back sorted from most to least similar
results = search_images(image_folder, text_query)

# Print every image with its similarity score
for image_file, similarity in results:
    print(f"Image: {image_file}, Similarity: {similarity}")




### 2. CLIP 해시태그 자동분배

영어 해시태그가 한글보다 더 잘 작동함. 필요할 경우 한글 해시태그를 영어로 번역해서 저장하는 것도 방법.

In [None]:
# Folder containing the images to tag
image_folder = 'C:\\Users\\ben81\\GitHub\\Pickle-DL\\pickle_clip' 

# Candidate hashtags. The commented-out lists are alternative candidate sets
# (an earlier English set and a Korean set) kept for comparison.
# hashtags = [
#     "#nature", "#travel", "#food", "#art", "#technology",
#     "#fashion", "#sports", "#music", "#animals", "#people"
# ]
hashtags = [
    "#character", "#people", "#travel", "#technology", "#art",
    "#animals", "#computer", "#fashion", "#game", "#mobile"
]
# hashtags = [
#     "#캐릭터","#사람","#여행","#기술","#예술","#동물","#컴퓨터","#패션", "#게임", "#모바일"
# ]

# Assign hashtags to every image in the folder
image_hashtags = generate_hashtags(image_folder, hashtags)

# Print the hashtags chosen for each image
for image_file, tags in image_hashtags.items():
    print(f"Image: {image_file}, Hashtags: {tags}")


### 유사도 검색: CLIP + faiss (0.8)

성능이 매우 별로다... threshold를 줄이면 대참사가 난다.

In [None]:
import torch
import torch.nn as nn
from PIL import Image, UnidentifiedImageError
import numpy as np
import os
import faiss
import matplotlib.pyplot as plt
import clip

# Load the CLIP "ViT-B/32" model and the preprocess callable returned with it,
# on the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model, preprocess = clip.load("ViT-B/32", device=device)

def get_image_embedding(image_path, model, preprocess, device):
    """Compute the CLIP image embedding of one file.

    Args:
        image_path: Path of the image to embed.
        model: Model providing ``encode_image``.
        preprocess: Callable turning a PIL image into a tensor.
        device: Torch device the input tensor is moved to.

    Returns:
        The flattened embedding as a NumPy array, or ``None`` when the file
        cannot be identified as an image.
    """
    try:
        rgb_image = Image.open(image_path).convert('RGB')
    except UnidentifiedImageError:
        print(f"Cannot identify image file {image_path}. Skipping.")
        return None

    batch = preprocess(rgb_image).unsqueeze(0).to(device)
    with torch.no_grad():
        features = model.encode_image(batch)
    return features.cpu().numpy().flatten()

def get_embeddings_from_folder(folder_path, model, preprocess, device):
    """Embed every JPEG/PNG image found directly in a folder.

    Args:
        folder_path: Directory to scan (sub-directories are not visited).
        model: Passed through to ``get_image_embedding``.
        preprocess: Passed through to ``get_image_embedding``.
        device: Passed through to ``get_image_embedding``.

    Returns:
        ``(embeddings, image_paths)``: a float32 array with one row per
        embedded image and the matching list of paths. Files that could not
        be embedded are left out of both.
    """
    image_extensions = ('.jpg', '.jpeg', '.png')
    embeddings = []
    image_paths = []
    # os.listdir returns names in arbitrary order; sorting keeps the result
    # reproducible from run to run.
    for file_name in sorted(os.listdir(folder_path)):
        # Case-insensitive match that requires the dot: "IMG.JPG" is kept,
        # a name that merely ends in "jpg" is not.
        if not file_name.lower().endswith(image_extensions):
            continue
        image_path = os.path.join(folder_path, file_name)
        embedding = get_image_embedding(image_path, model, preprocess, device)
        if embedding is not None:
            embeddings.append(embedding)
            image_paths.append(image_path)
    return np.array(embeddings, dtype=np.float32), image_paths

def plot_image_groups(groups):
    """Draw every group of images as one row of subplots.

    Args:
        groups: Iterable of groups, each a sequence of image file paths.
    """
    group_number = 0
    for group in groups:
        print("=================group {}=================".format(group_number))
        plt.figure(figsize=(20, 20))
        columns = len(group)
        for column, image_path in enumerate(group, start=1):
            plt.subplot(1, columns, column)
            plt.imshow(Image.open(image_path))
            plt.axis('off')
        plt.show()
        group_number += 1

# Folder path
folder_path = "C:\\Users\\ben81\\GitHub\\Pickle-DL\\pickle"

# Embed every image in the folder
embeddings, image_paths = get_embeddings_from_folder(folder_path, model, preprocess, device)
if len(image_paths) == 0:
    # Fail with a clear message instead of an IndexError on shape[1] below.
    raise ValueError(f"No readable images found in {folder_path}")

# Cosine similarity with FAISS: inner product of L2-normalised vectors
index = faiss.IndexFlatIP(embeddings.shape[1])
faiss.normalize_L2(embeddings)
index.add(embeddings)

D, I = index.search(embeddings, k=len(embeddings))  # similarity of every image to every image

# Greedy grouping: each unvisited image seeds a group and pulls in every
# unvisited image whose similarity to the seed is at least `threshold`.
threshold = 0.8
groups = []
visited = set()

for i in range(len(embeddings)):
    if i in visited:
        continue
    group = [image_paths[i]]
    visited.add(i)
    for j in range(len(I[i])):
        if D[i][j] < threshold:
            break  # results are sorted by decreasing similarity
        neighbor = int(I[i][j])
        # The seed is already in `visited`, so it is skipped by index rather
        # than by assuming it sits in column 0 (an exact duplicate can tie
        # with the seed and take that slot).
        if neighbor in visited:
            continue
        group.append(image_paths[neighbor])
        visited.add(neighbor)
    groups.append(group)

# Show the result
plot_image_groups(groups)

In [None]:
# Inspect the shape of the first embedding vector
embeddings[0].shape

# 모델 새로 만들기

In [None]:
import torch
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import models
import torchvision.datasets as datasets
import random
import os

# Load the pretrained EfficientNet-B0 model
model = models.efficientnet_b0(pretrained=True)
# Replace the classifier head with Identity so the model outputs features
model.classifier = nn.Identity()

# Image preprocessing: resize to (256, 224), convert to a tensor, then
# normalise each channel with the mean/std given below.
preprocess = transforms.Compose([
    transforms.Resize((256,224)),  
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])



class RotationDataset(torch.utils.data.Dataset):
    """Dataset yielding ``(image, rotated_image)`` tensor pairs from one folder.

    Each item is the transformed image together with a copy of it rotated by
    a randomly chosen multiple of 90 degrees.

    Args:
        root_dir: Directory containing the training images.
        transform: Callable applied to the RGB PIL image; its result is what
            gets rotated, so it must be accepted by
            ``transforms.functional.rotate``.
    """

    # Same file types the rest of this notebook treats as images.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, root_dir, transform):
        self.root_dir = root_dir
        self.transform = transform
        # Keep only regular image files. An unfiltered listing also returns
        # sub-directories and stray files, which made __getitem__ crash in
        # the middle of an epoch. Sorted for a reproducible index order.
        self.image_paths = sorted(
            name
            for name in os.listdir(root_dir)
            if name.lower().endswith(self.IMAGE_EXTENSIONS)
            and os.path.isfile(os.path.join(root_dir, name))
        )

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_name = self.image_paths[idx]
        image_path = os.path.join(self.root_dir, image_name)
        # Close the file handle once the image has been transformed.
        with Image.open(image_path) as raw_image:
            image = self.transform(raw_image.convert('RGB'))

        # Rotate by a random multiple of 90 degrees (0 leaves it unchanged).
        # NOTE(review): with a non-square input, 90/270 degree rotations
        # presumably crop/pad rather than swap height and width — confirm
        # that is intended.
        angle = random.choice([0, 90, 180, 270])
        rotated_image = transforms.functional.rotate(image, angle)

        return image, rotated_image


# Build the dataset and data loader
train_data = RotationDataset(root_dir=r"C:\Users\ben81\zflip_random1", transform=preprocess)
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)

# Loss: mean squared error between the two embeddings of each pair
# NOTE(review): both embeddings come from the same trainable model, so a
# constant output would presumably also minimise this loss — confirm the
# embeddings stay discriminative after training.
criterion = nn.MSELoss()

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
# NOTE(review): neither the model nor the batches are moved to a device in
# this cell, so training presumably runs on the CPU — confirm.
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, rotated_images in train_loader:
        optimizer.zero_grad()

        # Embed the original and the rotated batch with the same model.
        # This rebinds the notebook-level name `embeddings` to a tensor.
        embeddings = model(images)
        rotated_embeddings = model(rotated_images)

        # Loss between the two sets of embeddings
        loss = criterion(embeddings, rotated_embeddings)

        # Backpropagate and update the weights
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Report the mean batch loss of the epoch
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")



In [None]:
import torch

# Save only the model weights (state_dict)
torch.save(model.state_dict(), 'model_weights.pth')

# Alternative: save the whole model object (architecture + weights)
# torch.save(model, 'full_model.pth')

In [None]:
import torch
# Pick CUDA when available, otherwise the CPU, and display the choice
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
import torch
import torch.nn as nn
from torchvision import models, transforms
from PIL import Image, UnidentifiedImageError
import numpy as np
import os
import faiss
import matplotlib.pyplot as plt

# Load EfficientNet-B0 and strip the classifier so the model outputs embeddings.
# The pretrained weights are requested here because the fine-tuned checkpoint
# load below is disabled; with pretrained=False and no checkpoint the
# embeddings came from a randomly initialised network.
base_model = models.efficientnet_b0(pretrained=True)
base_model.classifier = nn.Identity()  # Remove the classification layer
# To evaluate the fine-tuned weights instead, uncomment:
# base_model.load_state_dict(torch.load('model_weights.pth'))
# base_model.eval()
# Image preprocessing: resize, convert to a tensor, normalise each channel
preprocess = transforms.Compose([
    transforms.Resize((256, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def get_image_embedding(image_path, model, device):
    """Compute the embedding vector of one image file.

    Args:
        image_path: Path of the image to embed.
        model: Network called on the preprocessed image batch.
        device: Torch device the input tensor is moved to.

    Returns:
        The flattened model output as a NumPy array, or ``None`` when the file
        cannot be identified as an image.
    """
    try:
        rgb_image = Image.open(image_path).convert('RGB')
    except UnidentifiedImageError:
        print(f"Cannot identify image file {image_path}. Skipping.")
        return None

    batch = preprocess(rgb_image).unsqueeze(0).to(device)

    # Inference only: eval mode, no gradient tracking.
    model.eval()
    with torch.no_grad():
        output = model(batch)
    return output.cpu().numpy().flatten()

def get_embeddings_from_folder(folder_path, model, device):
    """Embed every JPEG/PNG image found directly in a folder.

    Args:
        folder_path: Directory to scan (sub-directories are not visited).
        model: Passed through to ``get_image_embedding``.
        device: Passed through to ``get_image_embedding``.

    Returns:
        ``(embeddings, image_paths)``: a float32 array with one row per
        embedded image and the matching list of paths. Files that could not
        be embedded are left out of both.
    """
    image_extensions = ('.jpg', '.jpeg', '.png')
    embeddings = []
    image_paths = []
    # os.listdir returns names in arbitrary order; sorting keeps the result
    # reproducible from run to run.
    for file_name in sorted(os.listdir(folder_path)):
        # Case-insensitive match that requires the dot: "IMG.JPG" is kept,
        # a name that merely ends in "jpg" is not.
        if not file_name.lower().endswith(image_extensions):
            continue
        image_path = os.path.join(folder_path, file_name)
        embedding = get_image_embedding(image_path, model, device)
        if embedding is not None:
            embeddings.append(embedding)
            image_paths.append(image_path)
    # float32 is pinned explicitly, matching the CLIP variant of this helper.
    return np.array(embeddings, dtype=np.float32), image_paths

def plot_image_groups(groups):
    """Draw the first 200 groups of images, one row of subplots per group.

    Args:
        groups: Iterable of groups, each a sequence of image file paths.
    """
    for idx, group in enumerate(groups):
        if idx >= 200:
            continue  # groups past the limit are not drawn
        print("=================group {}=================".format(idx))
        plt.figure(figsize=(20, 20))
        columns = len(group)
        for column, image_path in enumerate(group, start=1):
            plt.subplot(1, columns, column)
            plt.imshow(Image.open(image_path))
            plt.axis('off')
        plt.show()

# Example folder path
folder_path = r"C:\Users\ben81\zflip_random2"

# Use the GPU when available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
base_model = base_model.to(device)

# Embed every image in the folder
embeddings, image_paths = get_embeddings_from_folder(folder_path, base_model, device)
if len(image_paths) == 0:
    # Fail with a clear message instead of an IndexError on shape[1] below.
    raise ValueError(f"No readable images found in {folder_path}")

# Cosine similarity with FAISS: inner product of L2-normalised vectors
index = faiss.IndexFlatIP(embeddings.shape[1])
faiss.normalize_L2(embeddings)
index.add(embeddings)

D, I = index.search(embeddings, k=len(embeddings))  # similarity of every image to every image

# Greedy grouping: each unvisited image seeds a group and pulls in every
# unvisited image whose similarity to the seed is at least `threshold`.
threshold = 0.65
groups = []
visited = set()

for i in range(len(embeddings)):
    if i in visited:
        continue
    group = [image_paths[i]]
    visited.add(i)
    for j in range(len(I[i])):
        if D[i][j] < threshold:
            break  # results are sorted by decreasing similarity
        neighbor = int(I[i][j])
        # The seed is already in `visited`, so it is skipped by index rather
        # than by assuming it sits in column 0 (an exact duplicate can tie
        # with the seed and take that slot).
        if neighbor in visited:
            continue
        group.append(image_paths[neighbor])
        visited.add(neighbor)
    groups.append(group)

# Show the result
print(device)
plot_image_groups(groups)

In [None]:
import cv2
import torch
import torch.nn as nn
from torchvision import models, transforms
from PIL import Image, UnidentifiedImageError
import numpy as np
import os
import faiss
import matplotlib.pyplot as plt

# Build EfficientNet-B0 without pretrained weights, replace the classifier
# with Identity so the model outputs embeddings, then load the weights saved
# in 'model_weights.pth'.
# NOTE(review): torch.load is called without map_location — confirm the
# checkpoint device matches the machine this cell runs on.
base_model = models.efficientnet_b0(pretrained=False)
base_model.classifier = nn.Identity()  # Remove the classification layer
base_model.load_state_dict(torch.load('model_weights.pth'))
base_model.eval()

# Image preprocessing: resize, convert to a tensor, normalise each channel
preprocess = transforms.Compose([
    transforms.Resize((256, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def get_orientation(image):
    """Estimate the dominant edge direction of an image, in degrees.

    Args:
        image: Colour image array in BGR channel order, which is what the
            caller in this cell passes.

    Returns:
        The angle, as a float in degrees, of the first principal axis of the
        Canny edge pixels, computed with ``arctan2(y, x)`` on (x, y) pixel
        coordinates. Returns 0.0 when fewer than two edge pixels are found.
    """
    # The input is BGR, so use the BGR conversion (the RGB one swapped the
    # red and blue channel weights).
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 50, 150)
    points = np.argwhere(edges > 0)
    if len(points) < 2:
        # No usable edges (e.g. a flat image): PCA cannot run, so report
        # "no rotation" instead of crashing.
        return 0.0
    points = np.fliplr(points)  # (row, col) -> (x, y)
    mean, eigenvectors = cv2.PCACompute(points.astype(np.float32), mean=np.array([]))
    # NOTE(review): the sign of a principal axis is presumably arbitrary, so
    # the angle may be off by 180 degrees — confirm this is acceptable.
    angle = np.arctan2(eigenvectors[0, 1], eigenvectors[0, 0])
    return float(np.degrees(angle))

def align_image(image, angle):
    """Rotate an image about its centre, keeping the original size.

    Args:
        image: Image array; its first two dimensions are height and width.
        angle: Rotation angle in degrees, passed to
            ``cv2.getRotationMatrix2D``.

    Returns:
        The rotated image, resampled with cubic interpolation; pixels outside
        the source are filled by replicating the border.
    """
    height, width = image.shape[:2]
    pivot = (width // 2, height // 2)
    rotation = cv2.getRotationMatrix2D(pivot, angle, 1.0)
    return cv2.warpAffine(
        image,
        rotation,
        (width, height),
        flags=cv2.INTER_CUBIC,
        borderMode=cv2.BORDER_REPLICATE,
    )

def get_image_embedding(image_path, model, device):
    """Embed one image file after aligning it to its dominant edge direction.

    Args:
        image_path: Path of the image to embed.
        model: Network called on the preprocessed image batch.
        device: Torch device the input tensor is moved to.

    Returns:
        The flattened model output as a NumPy array, or ``None`` when the file
        cannot be identified as an image.
    """
    try:
        pil_rgb = Image.open(image_path).convert('RGB')
    except UnidentifiedImageError:
        print(f"Cannot identify image file {image_path}. Skipping.")
        return None

    # OpenCV helpers work on BGR arrays; convert there and back.
    bgr = cv2.cvtColor(np.array(pil_rgb), cv2.COLOR_RGB2BGR)
    tilt = get_orientation(bgr)
    aligned_bgr = align_image(bgr, -tilt)
    aligned_rgb = cv2.cvtColor(aligned_bgr, cv2.COLOR_BGR2RGB)
    batch = preprocess(Image.fromarray(aligned_rgb)).unsqueeze(0).to(device)

    # Inference only: eval mode, no gradient tracking.
    model.eval()
    with torch.no_grad():
        return model(batch).cpu().numpy().flatten()

def get_embeddings_from_folder(folder_path, model, device):
    """Embed every JPEG/PNG image found directly in a folder.

    Args:
        folder_path: Directory to scan (sub-directories are not visited).
        model: Passed through to ``get_image_embedding``.
        device: Passed through to ``get_image_embedding``.

    Returns:
        ``(embeddings, image_paths)``: a float32 array with one row per
        embedded image and the matching list of paths. Files that could not
        be embedded are left out of both.
    """
    image_extensions = ('.jpg', '.jpeg', '.png')
    embeddings = []
    image_paths = []
    # Sorted because os.listdir order is arbitrary and the result should be
    # reproducible from run to run.
    for file_name in sorted(os.listdir(folder_path)):
        # Require the dot and ignore case: "IMG.JPG" is kept, a name that
        # merely ends in "jpg" is skipped.
        if file_name.lower().endswith(image_extensions):
            image_path = os.path.join(folder_path, file_name)
            embedding = get_image_embedding(image_path, model, device)
            if embedding is not None:
                embeddings.append(embedding)
                image_paths.append(image_path)
    # float32 is pinned explicitly, matching the CLIP variant of this helper.
    return np.array(embeddings, dtype=np.float32), image_paths

def plot_image_groups(groups):
    """Draw the first 200 groups of images, one row of subplots per group.

    Args:
        groups: Iterable of groups, each a sequence of image file paths.
    """
    display_limit = 200
    for idx, group in enumerate(groups):
        within_limit = idx < display_limit
        if not within_limit:
            continue
        print("=================group {}=================".format(idx))
        plt.figure(figsize=(20, 20))
        position = 0
        for image_path in group:
            position += 1
            plt.subplot(1, len(group), position)
            picture = Image.open(image_path)
            plt.imshow(picture)
            plt.axis('off')
        plt.show()

# Example folder path
folder_path = r"C:\Users\ben81\zflip_camera"

# Use the GPU when available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
base_model = base_model.to(device)

# Embed every image in the folder
embeddings, image_paths = get_embeddings_from_folder(folder_path, base_model, device)
if len(image_paths) == 0:
    # Fail with a clear message instead of an IndexError on shape[1] below.
    raise ValueError(f"No readable images found in {folder_path}")

# Cosine similarity with FAISS: inner product of L2-normalised vectors
index = faiss.IndexFlatIP(embeddings.shape[1])
faiss.normalize_L2(embeddings)
index.add(embeddings)

D, I = index.search(embeddings, k=len(embeddings))  # similarity of every image to every image

threshold = 0.65
groups = []
visited = set()


def _rotated_query_embeddings(image_path, model, device, angles=(90, 180, 270)):
    """Return L2-normalised embeddings of rotated copies of one image.

    The image is rotated with PIL by each angle in ``angles`` and passed
    through the same ``preprocess`` and ``model`` used for the index. The PCA
    alignment done by ``get_image_embedding`` is not applied here.
    NOTE(review): alignment is skipped on the assumption that it would undo
    the rotation being probed — confirm this is the desired behaviour.
    """
    with Image.open(image_path) as source:
        rgb = source.convert('RGB')
    vectors = []
    model.eval()
    with torch.no_grad():
        for angle in angles:
            rotated = rgb.rotate(angle, expand=True)
            batch = preprocess(rotated).unsqueeze(0).to(device)
            vectors.append(model(batch).cpu().numpy().flatten())
    queries = np.ascontiguousarray(np.array(vectors), dtype=np.float32)
    faiss.normalize_L2(queries)
    return queries


for i in range(len(embeddings)):
    if i in visited:
        continue
    group = [image_paths[i]]
    visited.add(i)
    for j in range(len(I[i])):
        if D[i][j] < threshold:
            break  # results are sorted by decreasing similarity
        neighbor = int(I[i][j])
        # The seed is already in `visited`, so it is skipped by index rather
        # than by assuming it sits in column 0.
        if neighbor in visited:
            continue
        group.append(image_paths[neighbor])
        visited.add(neighbor)

    if len(group) == 1:
        # Nothing matched the image as stored: retry with rotated copies and
        # adopt the single best unvisited match that reaches the threshold.
        # The earlier version passed degrees to cv2.rotate, handed an image
        # array to a function expecting a folder path, and appended a
        # neighbour unconditionally (even one already placed in a group).
        rotated_embeddings = _rotated_query_embeddings(group[0], base_model, device)
        rotated_D, rotated_I = index.search(rotated_embeddings, k=len(embeddings))

        best_score = threshold
        min_dist_idx = None
        for scores, neighbors in zip(rotated_D, rotated_I):
            for score, neighbor in zip(scores, neighbors):
                if score < best_score:
                    break  # the rest of this row can only be worse
                neighbor = int(neighbor)
                if neighbor not in visited:
                    best_score = float(score)
                    min_dist_idx = neighbor
                    break

        if min_dist_idx is not None:
            group.append(image_paths[min_dist_idx])
            visited.add(min_dist_idx)

    groups.append(group)

# Show the result
print(device)
plot_image_groups(groups)


In [None]:
from PIL import Image, ExifTags

def print_exif_data(image_path):
    """Print the EXIF tags of an image file as ``name: value`` lines.

    Tags from the main image directory and from the Exif sub-directory are
    printed; GPS data, when present, is printed as one nested dict. Prints
    "No EXIF data found" when the image carries no EXIF tags.

    Args:
        image_path: Path of the image file to inspect.
    """
    exif_ifd_tag = 0x8769  # pointer to the Exif sub-directory
    gps_ifd_tag = 0x8825   # pointer to the GPS sub-directory

    # Use the public getexif() (the private _getexif() is missing on some
    # image formats) and close the file when done.
    with Image.open(image_path) as img:
        exif = img.getexif()
        exif_data = dict(exif)
        exif_data.update(exif.get_ifd(exif_ifd_tag))
        gps_data = exif.get_ifd(gps_ifd_tag)
        if gps_data:
            exif_data[gps_ifd_tag] = dict(gps_data)

    if not exif_data:
        print("No EXIF data found")
        return

    for tag, value in exif_data.items():
        # Fall back to the numeric id when the tag has no known name.
        tag_name = ExifTags.TAGS.get(tag, tag)
        print(f"{tag_name}: {value}")

# Example image path
image_path =r"C:\Users\ben81\zflip_random2\20230809_185725.jpg"
print_exif_data(image_path)
