CatDataset에서 2개의 이미지 무작위 추출 = 1 pair <- 파일 경로는 목적에 따라 수정
모든 pair에 대한 similarity 출력

** similarity type : cosine
** 출력 : 엑셀 파일 or confusion matrix


## Data

In [1]:
import torch
import torchvision
import cv2
import numpy as np
import pandas as pd
import random

from torchvision import transforms
from torchvision.transforms.functional import to_pil_image
from torch.utils.data import Dataset
from torchvision.utils import save_image

def get_transform():
    return transforms.Compose([
        #transforms.ToPILImage(),        # Only necessary if your images are not already PIL images
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=(0, 45)),
        transforms.RandomPerspective(distortion_scale=0.6, p=0.5),
        transforms.Resize((224, 224)),
        transforms.ToTensor()            # Converts PIL Image to tensor and normalizes to [0, 1]
    ])



class CatDataset(Dataset):
    def __init__(self, directory, transform=None, num_augmentations = 10):
        """
        Args:
            directory (string): Directory with all the images and annotations.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.directory = directory
        self.transform = transform if transform else get_transform()
        self.num_augmentations = num_augmentations

        self.filenames =[]
        # Dealing with no annotation
        for f in os.listdir(directory):
            if f.endswith('.jpg'):
                annotation_file = os.path.join(directory, f + '.cat')
                if os.path.isfile(annotation_file):  # Check if the annotation file exists
                    self.filenames.append(f)  # Only add the image if the annotation exists


    def __len__(self):
        return len(self.filenames) * self.num_augmentations

    def __getitem__(self, idx):

        random.shuffle(self.filenames)

        file_idx = idx // self.num_augmentations
        img_name = os.path.join(self.directory, self.filenames[file_idx])
        annotation_name = img_name + '.cat'

        #n_idx = file_idx + 1 if not file_idx >= len(self.filenames) else 0
        #negative_name = os.path.join(self.directory, self.filenames[n_idx])
        n_idx = (file_idx + 1) % len(self.filenames)  # Use modulo to wrap around
        negative_name = os.path.join(self.directory, self.filenames[n_idx])
        negative_annotation = negative_name + '.cat'

        image = cv2.imread(img_name)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        n_image = cv2.imread(negative_name)
        n_image = cv2.cvtColor(n_image, cv2.COLOR_BGR2RGB)

         # Load annotations and compute the bounding box
        frame = pd.read_csv(annotation_name, sep =' ', header=None)
        landmarks = (frame.to_numpy()[0][1:-1]).reshape((-1, 2))

        n_frame = pd.read_csv(negative_annotation, sep =' ', header = None)
        n_landmarks = (n_frame.to_numpy()[0][1:-1]).reshape((-1,2))

        crop_img = self.__crop__(image, landmarks)
        n_img = self.__crop__(n_image, n_landmarks)

        if crop_img is None or crop_img.shape[0]==0 or crop_img.shape[1]==0:
            print(f'Invalid image encountered : {img_name}, {crop_img.shape}')


        # Apply transformation
        split = int(self.num_augmentations/2)
        transformed_images = [self.transform(crop_img) for _ in range(self.num_augmentations)]
        anchor = transformed_images[0:split]
        positive = transformed_images[split:]
        negative = [self.transform(n_img) for _ in range(split)]


        #To debug...
        #for i in range(split) :
        #    anchor_img = to_pil_image(anchor[i])
        #    positive_img = to_pil_image(positive[i])
        #    negative_img = to_pil_image(negative[i])
        #    anchor_img.save(f'/home/jeehyun/coursework/DL/MeOw-LO/debug/{file_idx}_{idx}_{i}_anchor.png')
        #    positive_img.save(f'/home/jeehyun/coursework/DL/MeOw-LO/debug/{file_idx}_{idx}_{i}_positive.png')
        #    negative_img.save(f'/home/jeehyun/coursework/DL/MeOw-LO/debug/{file_idx}_{idx}_{i}_negative.png')

        return anchor, positive, negative

    def __crop__(self, image, landmarks):


        # Calculate the angle of rotation
        left_ear = landmarks[4]
        right_ear = landmarks[7]
        angle = np.degrees(np.arctan2(right_ear[1] - left_ear[1], right_ear[0] - left_ear[0]))

        # Calculate the center for rotation
        rotation_center_x,rotation_center_y = landmarks[6]
        rotation_matrix = cv2.getRotationMatrix2D((rotation_center_x, rotation_center_y), angle, 1)

        # Perform rotation
        rotated_image = cv2.warpAffine(image, rotation_matrix, (image.shape[1], image.shape[0]))
        # Update landmarks after rotation
        landmarks_homogenous = np.hstack([landmarks, np.ones((landmarks.shape[0], 1))])  # For affine transformation
        rotated_landmarks = rotation_matrix.dot(landmarks_homogenous.T).T

        # Recalculate the bounding box on the rotated image
        new_left_ear = rotated_landmarks[4]
        new_right_ear = rotated_landmarks[7]
        width = int(new_right_ear[0] - new_left_ear[0])
        margin = int(width * 0.25)
        height = width
        x = int(new_left_ear[0]) - margin
        y = int(new_left_ear[1]) - margin

        # Ensure cropping is within the image boundaries
        x = max(0, x)
        y = max(0, y)
        x_end = min(rotated_image.shape[1], x + width + 2 * margin)
        y_end = min(rotated_image.shape[0], y + height + 2 * margin)
        crop_img = rotated_image[y:y_end, x:x_end]

        return crop_img

  from .autonotebook import tqdm as notebook_tqdm


## Train

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [3]:
import torch
import torchvision
import argparse
from torchvision.utils import make_grid
from torch.utils.data import DataLoader
#from dataload import CatDataset
import matplotlib.pyplot as plt
from torchvision.models import vit_b_16, ViT_B_16_Weights
from torch.optim import Adam
from torch.nn import TripletMarginLoss

In [4]:
# 메인 함수 정의
def main(directory, batch_size, epochs):
    print("Registering Data...")
    dataset = CatDataset(directory=directory)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    print("Finish the Loading!")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = vit_b_16(weights=ViT_B_16_Weights.IMAGENET1K_V1)
    model.head = torch.nn.Identity()
    model = model.to(device)
    triplet_loss = TripletMarginLoss(margin=1.0, p=2)
    optimizer = Adam(model.parameters(), lr=0.0001)

    model.train()
    print("Train begins...")
    for epoch in range(epochs):
        epoch_loss = 0
        for images in dataloader:
            optimizer.zero_grad()
            anchors, positives, negatives = images
            #print("Check image size")
            #print(f'Anchor : {anchors[0].shape}, Positive : {positives[0].shape}')
            anchors = [anchor.to(device) for anchor in anchors]
            positives = [positive.to(device) for positive in positives]
            negatives = [negative.to(device) for negative in negatives]
            anchor_features = torch.stack([model(anchor) for anchor in anchors])
            positive_features = torch.stack([model(positive) for positive in positives])
            negative_features = torch.stack([model(negative) for negative in negatives])
            loss = triplet_loss(anchor_features, positive_features, negative_features)
            loss.backward()
            optimizer.step()
        print(f"Epoch {epoch+1}/{epochs} completed.")


In [5]:
# 파라미터 설정 및 함수 호출
directory = '/data/etc/molo/CAT_00'
batch_size = 8
epochs = 10

main(directory, batch_size, epochs)

Registering Data...
Finish the Loading!
Train begins...


TypeError: Unexpected type <class 'numpy.ndarray'>

In [None]:
import torch
from torchvision.models import vit_b_16, ViT_B_16_Weights
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.nn import TripletMarginLoss
import torchvision.transforms as transforms

# 장치 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 모델 초기화
model = vit_b_16(weights=ViT_B_16_Weights.IMAGENET1K_V1)
model.head = torch.nn.Identity()
model = model.to(device)

# 트리플렛 손실 함수
triplet_loss = TripletMarginLoss(margin=1.0, p=2)

dataset = CatDataset(directory='/data/etc/molo/CAT_00')
data_loader = DataLoader(dataset, batch_size=16, shuffle=True)

# 옵티마이저
optimizer = Adam(model.parameters(), lr=0.0001)

# 훈련 함수
def train():
    model.train()
    total_loss = 0
    for data in data_loader:
        optimizer.zero_grad()
        
        # Unpack the data, each is a list of images
        anchors, positives, negatives = data
        
        # Move each tensor in the list to the device
        anchors = [anchor.to(device) for anchor in anchors]
        positives = [positive.to(device) for positive in positives]
        negatives = [negative.to(device) for negative in negatives]

        # Compute features for each set of images
        anchor_features = torch.stack([model(anchor) for anchor in anchors])
        positive_features = torch.stack([model(positive) for positive in positives])
        negative_features = torch.stack([model(negative) for negative in negatives])
        
        # Compute the triplet loss
        loss = triplet_loss(anchor_features, positive_features, negative_features)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()

    avg_loss = total_loss / len(data_loader)
    print(f"Average Loss: {avg_loss}")

if __name__ == '__main__':
    train()

KeyboardInterrupt: 

## Similarity

In [None]:
import torch
from sklearn.metrics.pairwise import cosine_similarity
import os
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from PIL import Image
from torchvision.io import read_image
from torchvision.transforms.functional import convert_image_dtype
from sklearn.metrics.pairwise import cosine_similarity
from torchvision.models import vit_b_16, ViT_B_16_Weights
from torchvision.transforms import Compose, Resize, ToTensor, Normalize
from itertools import combinations

### Model Loading

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = vit_b_16(weights=ViT_B_16_Weights.IMAGENET1K_V1)
model.head = torch.nn.Identity()
model = model.to(device)
model.eval()

VisionTransformer(
  (conv_proj): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
  (encoder): Encoder(
    (dropout): Dropout(p=0.0, inplace=False)
    (layers): Sequential(
      (encoder_layer_0): EncoderBlock(
        (ln_1): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (self_attention): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
        )
        (dropout): Dropout(p=0.0, inplace=False)
        (ln_2): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (mlp): MLPBlock(
          (0): Linear(in_features=768, out_features=3072, bias=True)
          (1): GELU(approximate='none')
          (2): Dropout(p=0.0, inplace=False)
          (3): Linear(in_features=3072, out_features=768, bias=True)
          (4): Dropout(p=0.0, inplace=False)
        )
      )
      (encoder_layer_1): EncoderBlock(
        (ln_1): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (self_a

### Pairwise set Loading

In [None]:
def load_images_pair(directory):
    files = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.jpg')]
    images = [Image.open(file).convert('RGB') for file in files]
    return files, images

### Loading features from images

In [None]:
def extract_features(model, images, device):
    model.eval()
    transform = get_transform()
    images_transformed = torch.stack([transform(image) for image in images]).to(device)
    with torch.no_grad():
        features = model(images_transformed)
    return features.cpu().np()

### Calculate cosine Similarity

In [None]:
""" 한 번에 하나씩 코드
def similarity_calculator(model, image1, image2, device):
    
    Args:
    image1, image2
    => feature1, feature2 (torch.Tensor): Feature vectors of shape (1, 2048)

    Returns:
    float: Cosine similarity between feature1 and feature2
    
    features = extract_features(model, [image1, image2], device)

    # feature1, feature2 지정
    feature1 = features[0]
    feature2 = features[1]

    # Calculate Cosine Similarity
    similarity = cosine_similarity([feature1], [feature2])[0][0]
    return similarity
""" 
def similarity_calculator(features):
    return cosine_similarity(features) # 모든 이미지 features 쌍 간의 코사인 유사도 계산

### Save Results

In [None]:
def save_results_to_csv(directory, files, similarities):
    print("Saving result to csv...")

    dir_name = os.path.basename(directory)
    filename = f"{dir_name}_output.csv"

    pairs = []
    for i, file1 in enumerate(files):
        for j, file2 in enumerate(files):
            if i != j:
                pairs.append({
                    "Image1": file1,
                    "Image2": file2,
                    "Cosine Similarity": similarities[i, j]
                })
    result = pd.DataFrame(pairs)
    result.to_csv(filename, index=False)
    print("Saved result to csv!")
    return result

In [None]:
def plot_confusion_matrix(similarities, directory):
    print("Saving result to matrix...")

    dir_name = os.path.basename(directory)
    filename = f"{dir_name}_confusion_matrix.png"

    plt.figure(figsize=(10, 8))
    sns.heatmap(similarities, annot=True, cmap='coolwarm')
    plt.title(f"Cosine Similarities for {dir_name}")
    plt.savefig(filename)

    print("Saved matrix!")


여기 고치고 있었음!!!

In [None]:
def save_sim_results():
    import json
    
    data = {"Image 1":, 
            "Image 2":, 
            "similarity":
                }
    # save files
    with open("file.json", "w") as f:
        json.dump(data, f)
    
    # load files
    with open("file.json", "r") as f:
        data = json.load(f)

### 실행

In [35]:
# Main script
if __name__ == "__main__":
    directory = '/data/etc/molo/CAT_00'
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = vit_b_16(weights=ViT_B_16_Weights.IMAGENET1K_V1)
    model.head = torch.nn.Identity()
    model = model.to(device)
    
    files, images = load_images_pair(directory)
    features = extract_features(model, images, device)
    similarities = similarity_calculator(features)
    save_results_to_csv(directory, files, similarities)
    plot_confusion_matrix(similarities, directory)

Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x7f285c4ed7c0>>
Traceback (most recent call last):
  File "/home/seoyeong/.miniconda3/envs/MOLO/lib/python3.8/site-packages/ipykernel/ipkernel.py", line 770, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(
KeyboardInterrupt: 


KeyboardInterrupt: 