In [1]:
import torch 
import os
import datetime
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.init as init
import torch.nn as nn
from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.data import TensorDataset

In [2]:
class ImagePairDataset(Dataset):
    """Dataset of image pairs that share a file name across two directories.

    Every name listed in ``root_dir1`` that is a regular file in both
    ``root_dir1`` and ``root_dir2`` becomes one sample
    ``(img1_path, img2_path, label)``.

    Args:
        root_dir1: Directory holding the first image of each pair.
        root_dir2: Directory holding the second image of each pair.
        transform: Optional callable applied to each image after it has been
            converted to RGB.
        label: Label attached to every pair. Defaults to 0, the value this
            notebook uses for correctly matched pairs.
    """

    def __init__(self, root_dir1, root_dir2, transform=None, label=0):
        self.root_dir1 = root_dir1
        self.root_dir2 = root_dir2
        self.transform = transform
        self.samples = []

        # os.listdir() returns names in arbitrary, filesystem-dependent order;
        # sorting keeps sample indices stable across runs and machines.
        for filename in sorted(os.listdir(root_dir1)):
            img1_path = os.path.join(root_dir1, filename)
            img2_path = os.path.join(root_dir2, filename)
            # Keep the pair only when the file exists on both sides.
            if os.path.isfile(img1_path) and os.path.isfile(img2_path):
                self.samples.append((img1_path, img2_path, label))

    def __len__(self):
        """Return the number of image pairs found at construction time."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load and return ``(img1, img2, label)`` for the pair at ``idx``."""
        img1_path, img2_path, label = self.samples[idx]

        # convert() returns an independent copy, so the file handles can be
        # released as soon as each `with` block ends.
        with Image.open(img1_path) as raw1:
            img1 = raw1.convert("RGB")
        with Image.open(img2_path) as raw2:
            img2 = raw2.convert("RGB")

        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        return img1, img2, label


In [3]:
class WrongImagePairDataset(Dataset):
    """Dataset of mismatched image pairs that share a file name.

    Every name listed in ``root_dir1`` that is a regular file in both
    ``root_dir1`` and ``root_dir2`` becomes one sample
    ``(img1_path, img2_path, label)``.

    Args:
        root_dir1: Directory holding the first image of each pair.
        root_dir2: Directory holding the second image of each pair.
        transform: Optional callable applied to each image after it has been
            converted to RGB.
        label: Label attached to every pair. Defaults to 1, the value this
            notebook uses for wrong (mismatched) pairs.
    """

    def __init__(self, root_dir1, root_dir2, transform=None, label=1):
        self.root_dir1 = root_dir1
        self.root_dir2 = root_dir2
        self.transform = transform
        self.samples = []

        # os.listdir() returns names in arbitrary, filesystem-dependent order;
        # sorting keeps sample indices stable across runs and machines.
        for filename in sorted(os.listdir(root_dir1)):
            img1_path = os.path.join(root_dir1, filename)
            img2_path = os.path.join(root_dir2, filename)
            # Keep the pair only when the file exists on both sides; these
            # pairs are the negative examples, hence the non-zero label.
            if os.path.isfile(img1_path) and os.path.isfile(img2_path):
                self.samples.append((img1_path, img2_path, label))

    def __len__(self):
        """Return the number of image pairs found at construction time."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load and return ``(img1, img2, label)`` for the pair at ``idx``."""
        img1_path, img2_path, label = self.samples[idx]

        # convert() returns an independent copy, so the file handles can be
        # released as soon as each `with` block ends.
        with Image.open(img1_path) as raw1:
            img1 = raw1.convert("RGB")
        with Image.open(img2_path) as raw2:
            img2 = raw2.convert("RGB")

        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        return img1, img2, label


In [4]:
class CombinedImagePairDataset(Dataset):
    """Concatenation of two datasets, indexed as one continuous sequence.

    Indices ``0 .. len(dataset1) - 1`` map to ``dataset1``; the remaining
    indices map to ``dataset2``. Negative indices count from the end of the
    combined sequence, as with a Python list.

    Args:
        dataset1: First dataset (any object supporting ``len`` and indexing).
        dataset2: Second dataset, appended after ``dataset1``.
    """

    def __init__(self, dataset1, dataset2):
        self.dataset1 = dataset1
        self.dataset2 = dataset2
        self.total_length = len(dataset1) + len(dataset2)

    def __len__(self):
        """Return the combined number of samples."""
        return self.total_length

    def __getitem__(self, idx):
        """Return the sample at ``idx`` from whichever dataset owns it.

        Raises:
            IndexError: If ``idx`` falls outside the combined range.
        """
        # Normalize negative indices against the combined length; otherwise
        # idx=-1 would wrongly resolve to the last item of dataset1.
        if idx < 0:
            idx += self.total_length
        if not 0 <= idx < self.total_length:
            raise IndexError("index out of range for CombinedImagePairDataset")

        first_length = len(self.dataset1)
        if idx < first_length:
            return self.dataset1[idx]
        # Shift the index so it is relative to the start of dataset2.
        return self.dataset2[idx - first_length]

In [5]:
# class ImageComparisonModel(nn.Module):
#     def __init__(self):
#         super(ImageComparisonModel, self).__init__()
        
#         # Stream 1 for the first image modality
#         self.stream1 = nn.Sequential(
#             nn.Conv2d(3, 64, kernel_size=5, stride=1, padding=2),
#             nn.ReLU(inplace=True),
#             nn.MaxPool2d(kernel_size=2, stride=2),
#             nn.Conv2d(64, 128, kernel_size=5, stride=1, padding=2),
#             nn.ReLU(inplace=True),
#             nn.MaxPool2d(kernel_size=2, stride=2)
#         )
        
#         # Stream 2 for the second image modality
#         self.stream2 = nn.Sequential(
#             nn.Conv2d(3, 64, kernel_size=5, stride=1, padding=2),
#             nn.ReLU(inplace=True),
#             nn.MaxPool2d(kernel_size=2, stride=2),
#             nn.Conv2d(64, 128, kernel_size=5, stride=1, padding=2),
#             nn.ReLU(inplace=True),
#             nn.MaxPool2d(kernel_size=2, stride=2)
#         )
        
#         # Fusion and decision mechanism
#         self.fusion = nn.Sequential(
#             nn.Linear(32*32*128*2, 1024),  # Assuming images are 224x224 size, adjust if different
#             nn.ReLU(inplace=True),
#             nn.Dropout(0.5),
#             nn.Linear(1024, 512),
#             nn.ReLU(inplace=True),
#             nn.Linear(512, 1)  # Binary classification
#         )
        
#     def forward(self, x1, x2):
#         x1 = self.stream1(x1)
#         x2 = self.stream2(x2)
        
#         # Flatten the features from both streams
#         x1 = x1.view(x1.size(0), -1)
#         x2 = x2.view(x2.size(0), -1)
        
#         # Concatenate the features
#         x = torch.cat((x1, x2), dim=1)
        
#         x = self.fusion(x)  # 수정된 부분: fusion 레이어 추가
#         return x


In [6]:
# Fusion 레이어:

# Fusion 레이어는 두 개의 입력을 결합하고 합쳐진 특징을 생성하는 역할을 합니다. 
# 이 레이어는 모델이 입력 데이터를 효과적으로 처리하고 결합하는 데 중요한 역할을 합니다. 
# 따라서 Fusion 레이어가 입력 데이터를 잘 결합하여 모델의 성능을 향상시키는 경우, 이를 사용하는 것이 좋습니다.

# Comparison 레이어: 

# Comparison 레이어는 결합된 특징을 사용하여 최종 예측을 생성하는 역할을 합니다. 
# 이 레이어는 모델이 입력 데이터의 특징을 비교하고 결합하여 원하는 작업을 수행하는 데 중요한 역할을 합니다. 
# 따라서 Comparison 레이어가 효과적인 예측을 생성하는 경우, 이를 사용하는 것이 좋습니다.

In [7]:
class ImageComparisonModel(nn.Module):
    """Two-stream CNN that scores an image pair with a single logit.

    Each image goes through its own convolutional stream (the two streams
    do not share weights). The flattened feature maps are concatenated and
    passed through a fully connected head that outputs one raw logit per
    pair; no sigmoid is applied inside the model.

    Args:
        input_size: Spatial size of the input images, either an int for
            square images or a ``(height, width)`` tuple. Defaults to 128,
            which reproduces the original ``32*32*128*2`` input width of the
            first linear layer.

    Raises:
        ValueError: If the input is so small that the streams would produce
            an empty feature map.
    """

    def __init__(self, input_size=128):
        super(ImageComparisonModel, self).__init__()

        if isinstance(input_size, int):
            height, width = input_size, input_size
        else:
            height, width = input_size

        # Stream 1 for the first image modality
        self.stream1 = self._make_stream()

        # Stream 2 for the second image modality
        self.stream2 = self._make_stream()

        # Each stream applies two 2x2 max-pools (height and width are divided
        # by 4 overall) and ends with 128 channels.
        features_per_stream = 128 * (height // 4) * (width // 4)
        if features_per_stream == 0:
            raise ValueError("input_size is too small for two 2x2 poolings")

        # Fusion and decision mechanism
        self.comparison = nn.Sequential(
            nn.Linear(features_per_stream * 2, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(1024, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, 1)  # Binary classification (raw logit)
        )

    @staticmethod
    def _make_stream():
        """Build one conv stream: (Conv5x5 -> ReLU -> MaxPool2) twice."""
        return nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=5, stride=1, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=5, stride=1, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

    def forward(self, x1, x2):
        """Return one logit per pair, shape ``(batch, 1)``.

        Args:
            x1: Batch of first images, ``(batch, 3, H, W)``.
            x2: Batch of second images, ``(batch, 3, H, W)``.
        """
        x1 = self.stream1(x1)
        x2 = self.stream2(x2)

        # Flatten the features from both streams (keeps the batch dimension)
        x1 = torch.flatten(x1, start_dim=1)
        x2 = torch.flatten(x2, start_dim=1)

        # Concatenate the features
        x = torch.cat((x1, x2), dim=1)

        # Pass through the comparison mechanism
        x = self.comparison(x)
        return x

In [8]:
# Model initialization
# combined_model = CombinedModel()
# NOTE(review): `combined_model` is not referenced again in the visible cells;
# training uses the separate `model` instance created in a later cell —
# confirm whether this extra instance is still needed.
combined_model = ImageComparisonModel()

In [9]:
# Image preprocessing pipeline
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Resize every image to 128x128
    transforms.ToTensor(),         # Convert the image to a tensor
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize each channel with mean 0.5 and std 0.5
])

In [10]:
# Mini-batch size passed to every DataLoader in this notebook
batch_size = 16

In [11]:
# Dataset directory paths
# NOTE(review): "smlies" looks like a misspelling of "smiles"; it is left
# unchanged because it must match the directory names on disk — confirm.

chemdraw_train = "data/train/chemdraw_train"
chemdraw_test = "data/test/chemdraw_test"
chemdraw_valid = "data/valid/chemdraw_val"

chemdraw_train_smlies = "data/train/chemdraw_train_smlies"
chemdraw_test_smlies = "data/test/chemdraw_test_smlies"
chemdraw_valid_smlies = "data/valid/chemdraw_val_smlies"

chemdraw_train_smlies_wrong = "data/train/chemdraw_train_smlies_wrong"
chemdraw_test_smlies_wrong = "data/test/chemdraw_test_smlies_wrong"
chemdraw_valid_smlies_wrong = "data/valid/chemdraw_val_smlies_wrong"

In [12]:
# Build the datasets for every split (train / test / valid).
# Pairs taken from the *_smlies directories go through ImagePairDataset
# (label 0); pairs taken from the *_smlies_wrong directories go through
# WrongImagePairDataset (label 1).
correct_train_dataset, correct_test_dataset, correct_valid_dataset = [
    ImagePairDataset(image_dir, pair_dir, transform=transform)
    for image_dir, pair_dir in (
        (chemdraw_train, chemdraw_train_smlies),
        (chemdraw_test, chemdraw_test_smlies),
        (chemdraw_valid, chemdraw_valid_smlies),
    )
]

wrong_train_dataset, wrong_test_dataset, wrong_valid_dataset = [
    WrongImagePairDataset(image_dir, pair_dir, transform=transform)
    for image_dir, pair_dir in (
        (chemdraw_train, chemdraw_train_smlies_wrong),
        (chemdraw_test, chemdraw_test_smlies_wrong),
        (chemdraw_valid, chemdraw_valid_smlies_wrong),
    )
]

# Each split is the correct pairs followed by the wrong pairs.
combine_train_dataset = CombinedImagePairDataset(
    correct_train_dataset, wrong_train_dataset
)
combine_test_dataset = CombinedImagePairDataset(
    correct_test_dataset, wrong_test_dataset
)
combine_valid_dataset = CombinedImagePairDataset(
    correct_valid_dataset, wrong_valid_dataset
)

In [13]:
# Create the data loaders.
# Only the training set is shuffled. Validation and test metrics are sums over
# all samples, so a fixed order yields the same numbers while keeping the
# evaluation passes reproducible.
train_loader = DataLoader(combine_train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(combine_test_dataset, batch_size=batch_size, shuffle=False)
valid_loader = DataLoader(combine_valid_dataset, batch_size=batch_size, shuffle=False)

In [14]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate the model that is trained below and move it to that device.
model = ImageComparisonModel().to(device)

print(device)

cuda


In [15]:
# Example: hand-written contrastive loss
class ContrastiveLoss(torch.nn.Module):
    """Contrastive loss over pairs of embeddings.

    For each pair with Euclidean distance ``d`` and target ``t``::

        loss = (1 - t) * d**2 + t * max(margin - d, 0)**2

    so ``t == 0`` pulls the embeddings together and ``t == 1`` pushes them at
    least ``margin`` apart. The per-pair losses are averaged.

    Args:
        margin: Minimum distance targeted for pairs with ``t == 1``.
    """

    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, target):
        """Return the mean contrastive loss as a scalar tensor.

        Args:
            output1: Embeddings of the first items, ``(N, D)``.
            output2: Embeddings of the second items, ``(N, D)``.
            target: Pair labels, shaped ``(N,)`` or ``(N, 1)``; any numeric
                dtype is accepted.
        """
        # Work with 1-D tensors on both sides. Mixing an (N, 1) distance with
        # an (N,) target would broadcast to (N, N) and silently average over
        # every cross-combination of pairs.
        euclidean_distance = F.pairwise_distance(output1, output2)
        target = target.reshape(-1).to(euclidean_distance.dtype)

        similar_term = (1 - target) * torch.pow(euclidean_distance, 2)
        dissimilar_term = target * torch.pow(
            torch.clamp(self.margin - euclidean_distance, min=0.0), 2
        )
        return torch.mean(similar_term + dissimilar_term)

In [16]:
# Training hyperparameters
learning_rate = 0.001
num_epochs = 50

# Loss function: operates on the raw logit produced by the model
criterion = nn.BCEWithLogitsLoss()

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
print(optimizer)

Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    eps: 1e-08
    foreach: None
    lr: 0.001
    maximize: False
    weight_decay: 0
)


In [17]:
# train 중간에 accuracy 프린트하는 부분 추가하기, 더 좋은 모델이 나왔을경우 모델을 저장하는 코드를 만들자.

# 모델 코드는 새로운 폴더를 만들고 (if save else make and save) 현재 시간을 가져와 이름으로 +@ 해서 저장해주자.

# 값이 더 이상 내려가지 않는다 -> 수렴한다, 높은 loss에서 수렴했다고 해서 나쁜 모델이 아니다. 반드시 정확도(accuracy)를 확인해주자.

In [18]:
# def train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs):
#     train_losses = []
#     valid_losses = []

#     for epoch in range(num_epochs):
#         model.train()
#         running_train_loss = 0.0
        
#         # 훈련 데이터로 모델 학습
#         for inputs1, inputs2, labels in train_loader:
#             inputs1, inputs2, labels = inputs1.to(device), inputs2.to(device), labels.to(device)
#             optimizer.zero_grad()
#             outputs = model(inputs1, inputs2)
#             loss = criterion(outputs, labels.float().view(-1, 1))
#             loss.backward()
#             optimizer.step()
#             running_train_loss += loss.item() * inputs1.size(0)
        
#         # 평균 훈련 손실 계산
#         epoch_train_loss = running_train_loss / len(train_loader.dataset)
#         train_losses.append(epoch_train_loss)
        
#         model.eval()
#         running_valid_loss = 0.0
        
#         # 검증 데이터로 모델 평가
#         for inputs1, inputs2, labels in valid_loader:
#             inputs1, inputs2, labels = inputs1.to(device), inputs2.to(device), labels.to(device)
#             outputs = model(inputs1, inputs2)
#             loss = criterion(outputs, labels.float().view(-1, 1))
#             running_valid_loss += loss.item() * inputs1.size(0)
        
#         # 평균 검증 손실 계산
#         epoch_valid_loss = running_valid_loss / len(valid_loader.dataset)
#         valid_losses.append(epoch_valid_loss)
        
#         # 에포크마다 학습 결과 출력
#         print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_train_loss:.8f}, Valid Loss: {epoch_valid_loss:.8f}')

#     torch.save(model.state_dict(), 'Check_Model.pth')
#     print("모델이 저장되었습니다.")
#     return train_losses, valid_losses


In [None]:
# Comparison 레이어: train

# def train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs):
#     train_losses = []
#     valid_losses = []

#     for epoch in range(num_epochs):
#         model.train()
#         running_train_loss = 0.0
        
#         # 훈련 데이터로 모델 학습
#         for inputs1, inputs2, labels in train_loader:
#             inputs1, inputs2, labels = inputs1.to(device), inputs2.to(device), labels.to(device)
#             optimizer.zero_grad()
#             outputs = model(inputs1, inputs2)
#             loss = criterion(outputs, labels.float().view(-1, 1))
#             loss.backward()
#             optimizer.step()
#             running_train_loss += loss.item() * inputs1.size(0)
        
#         # 평균 훈련 손실 계산
#         epoch_train_loss = running_train_loss / len(train_loader.dataset)
#         train_losses.append(epoch_train_loss)
        
#         model.eval()
#         running_valid_loss = 0.0
        
#         # 검증 데이터로 모델 평가
#         for inputs1, inputs2, labels in valid_loader:
#             inputs1, inputs2, labels = inputs1.to(device), inputs2.to(device), labels.to(device)
#             outputs = model(inputs1, inputs2)
#             loss = criterion(outputs, labels.float().view(-1, 1))
#             running_valid_loss += loss.item() * inputs1.size(0)
        
#         # 평균 검증 손실 계산
#         epoch_valid_loss = running_valid_loss / len(valid_loader.dataset)
#         valid_losses.append(epoch_valid_loss)
        
#         # 에포크마다 학습 결과 출력
#         print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_train_loss:.8f}, Valid Loss: {epoch_valid_loss:.8f}')

#     torch.save(model.state_dict(), 'Check_Model.pth')
#     print("모델이 저장되었습니다.")
#     return train_losses, valid_losses

In [25]:
save_model_path = "M_check"

def save_model(model, save_path):
    """Write ``model.state_dict()`` to a timestamped file inside ``save_path``.

    The directory is created when missing. The file is named
    ``model_<YYYYmmdd_HHMMSS>.pth`` from the current local time.

    Args:
        model: Module whose ``state_dict()`` is saved.
        save_path: Directory that receives the checkpoint file.
    """
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    checkpoint_name = f"model_{timestamp}.pth"

    # Make sure the target folder exists before writing into it.
    os.makedirs(save_path, exist_ok=True)
    checkpoint_file = os.path.join(save_path, checkpoint_name)
    torch.save(model.state_dict(), checkpoint_file)

def train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs, save_path):
    """Train a two-input binary classifier and checkpoint it on improvement.

    After every epoch the model is evaluated on ``valid_loader``. When the
    validation accuracy or the validation loss beats the best value seen so
    far, the model is saved once through ``save_model`` (if ``save_path`` is
    truthy).

    Args:
        model: Module called as ``model(inputs1, inputs2)`` that returns one
            logit per sample, shape ``(batch, 1)``.
        criterion: Loss taking ``(logits, float_targets)``, both ``(batch, 1)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        train_loader: Iterable yielding ``(inputs1, inputs2, labels)`` batches.
        valid_loader: Iterable yielding ``(inputs1, inputs2, labels)`` batches.
        num_epochs: Number of passes over ``train_loader``.
        save_path: Directory for checkpoints; falsy disables saving.

    Returns:
        Tuple ``(train_losses, valid_losses, best_accuracy)``: the per-epoch
        mean losses and the best validation accuracy as a fraction in [0, 1].
    """
    # Run on the device the model already lives on, so the function does not
    # depend on a notebook-level `device` variable being defined.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    train_losses = []
    valid_losses = []
    best_valid_loss = float('inf')
    best_accuracy = 0.0
    saved_any = False

    for epoch in range(num_epochs):
        model.train()
        running_train_loss = 0.0
        train_seen = 0

        # Fit the model on the training data
        for inputs1, inputs2, labels in train_loader:
            inputs1 = inputs1.to(run_device)
            inputs2 = inputs2.to(run_device)
            targets = labels.to(run_device).float().view(-1, 1)

            optimizer.zero_grad()
            outputs = model(inputs1, inputs2)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            # criterion returns a batch mean; weight it by the batch size so
            # a smaller final batch does not skew the epoch average.
            running_train_loss += loss.item() * inputs1.size(0)
            train_seen += inputs1.size(0)

        # Mean training loss over the samples actually processed
        epoch_train_loss = running_train_loss / max(train_seen, 1)
        train_losses.append(epoch_train_loss)

        model.eval()
        running_valid_loss = 0.0
        correct = 0
        total = 0

        # Evaluate on the validation data; no gradients are needed, so skip
        # building the autograd graph.
        with torch.no_grad():
            for inputs1, inputs2, labels in valid_loader:
                inputs1 = inputs1.to(run_device)
                inputs2 = inputs2.to(run_device)
                targets = labels.to(run_device).float().view(-1, 1)

                outputs = model(inputs1, inputs2)
                loss = criterion(outputs, targets)
                running_valid_loss += loss.item() * inputs1.size(0)

                # Threshold the sigmoid probability at 0.5 to get a 0/1 prediction
                predicted = torch.round(torch.sigmoid(outputs))
                correct += (predicted == targets).sum().item()
                total += targets.size(0)

        # Mean validation loss and accuracy (guarded against an empty loader)
        epoch_valid_loss = running_valid_loss / max(total, 1)
        valid_losses.append(epoch_valid_loss)
        accuracy = correct / total if total else 0.0

        # Report the results of this epoch
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_train_loss:.8f}, Valid Loss: {epoch_valid_loss:.8f}, Accuracy: {accuracy:.8f}')

        # Track both criteria, but write at most one checkpoint per epoch.
        improved = False
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            improved = True
        if epoch_valid_loss < best_valid_loss:
            best_valid_loss = epoch_valid_loss
            improved = True
        if improved and save_path:
            save_model(model, save_path)
            saved_any = True

    # Only announce a save when a checkpoint was actually written.
    if saved_any:
        print("모델이 저장되었습니다.")
    return train_losses, valid_losses, best_accuracy

In [26]:
# Train the model.
# train_model returns (train_losses, valid_losses, best_accuracy); unpacking
# into only two names would raise ValueError once training finishes.
train_losses, valid_losses, best_accuracy = train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs=num_epochs, save_path = save_model_path)

Epoch 1/50, Train Loss: 0.69332522, Valid Loss: 0.69314975, Accuracy: 0.50000000
Epoch 2/50, Train Loss: 0.69331849, Valid Loss: 0.69315849, Accuracy: 0.50000000
Epoch 3/50, Train Loss: 0.69328434, Valid Loss: 0.69316806, Accuracy: 0.50000000
Epoch 4/50, Train Loss: 0.69327813, Valid Loss: 0.69314754, Accuracy: 0.50000000
Epoch 5/50, Train Loss: 0.69325082, Valid Loss: 0.69314725, Accuracy: 0.50000000
Epoch 6/50, Train Loss: 0.69327386, Valid Loss: 0.69314943, Accuracy: 0.50000000
Epoch 7/50, Train Loss: 0.69326459, Valid Loss: 0.69316250, Accuracy: 0.50000000
Epoch 8/50, Train Loss: 0.69319084, Valid Loss: 0.69316243, Accuracy: 0.50000000
Epoch 9/50, Train Loss: 0.69325080, Valid Loss: 0.69314993, Accuracy: 0.50000000
Epoch 10/50, Train Loss: 0.69320882, Valid Loss: 0.69315726, Accuracy: 0.50000000
Epoch 11/50, Train Loss: 0.69322731, Valid Loss: 0.69315314, Accuracy: 0.50000000
Epoch 12/50, Train Loss: 0.69321196, Valid Loss: 0.69315565, Accuracy: 0.50000000
Epoch 13/50, Train Loss: 

FileNotFoundError: [Errno 2] No such file or directory: 'data/train/chemdraw_train_smlies/US07314887-20080101-C00215.png'

In [None]:
# Visualize the training and validation loss curves
fig, ax = plt.subplots()
ax.plot(train_losses, label='Train Loss')
ax.plot(valid_losses, label='Valid Loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.set_title('Training and Validation Loss')
ax.legend()
plt.show()

In [None]:
num_epochs_improve = 5

# Model fine-tuning helper
def improve_model(model, optimizer, criterion, train_loader, valid_loader, num_epochs, save_path='improved_model.pth'):
    """Continue training ``model`` for ``num_epochs`` and save its weights.

    Every batch is treated as ``(*inputs, labels)``: the last element is the
    label tensor and all preceding tensors are passed positionally to the
    model. This covers both single-input ``(inputs, labels)`` batches and the
    image-pair ``(inputs1, inputs2, labels)`` batches used in this notebook.

    Args:
        model: Module called as ``model(*inputs)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        criterion: Loss taking ``(outputs, labels)``.
        train_loader: Iterable of training batches.
        valid_loader: Iterable of validation batches.
        num_epochs: Number of passes over ``train_loader``.
        save_path: File that receives the final ``state_dict``; falsy skips
            saving. Defaults to the original hard-coded file name.

    Returns:
        Tuple ``(train_losses, valid_losses)`` of per-epoch mean losses.
    """
    # Run on the device the model already lives on, so the function does not
    # depend on a notebook-level `device` variable being defined.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    def _batch_loss(batch):
        """Return ``(loss, batch_size)`` for one batch."""
        *inputs, labels = batch
        inputs = [tensor.to(run_device) for tensor in inputs]
        labels = labels.to(run_device)
        outputs = model(*inputs)
        # When labels and outputs hold one value per element (e.g. integer
        # labels of shape (N,) against logits of shape (N, 1)), align dtype
        # and shape so element-wise losses such as BCEWithLogitsLoss accept
        # them. Class-index labels for (N, C) outputs are left untouched.
        if labels.numel() == outputs.numel():
            labels = labels.to(outputs.dtype).reshape(outputs.shape)
        return criterion(outputs, labels), labels.size(0)

    train_losses = []
    valid_losses = []

    for epoch in range(num_epochs):
        model.train()
        running_train_loss = 0.0
        train_seen = 0
        for batch in train_loader:
            optimizer.zero_grad()
            loss, batch_size = _batch_loss(batch)
            loss.backward()
            optimizer.step()
            running_train_loss += loss.item() * batch_size
            train_seen += batch_size
        epoch_train_loss = running_train_loss / max(train_seen, 1)
        train_losses.append(epoch_train_loss)

        model.eval()
        running_valid_loss = 0.0
        valid_seen = 0
        with torch.no_grad():
            for batch in valid_loader:
                loss, batch_size = _batch_loss(batch)
                running_valid_loss += loss.item() * batch_size
                valid_seen += batch_size
        epoch_valid_loss = running_valid_loss / max(valid_seen, 1)
        valid_losses.append(epoch_valid_loss)

        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_train_loss:.4f}, Valid Loss: {epoch_valid_loss:.4f}')

    if save_path:
        torch.save(model.state_dict(), save_path)
        print("모델이 저장되었습니다.")
    return train_losses, valid_losses

In [None]:
# Fine-tune the already trained model for a few more epochs
improved_train_losses, improved_valid_losses = improve_model(model, optimizer, criterion, train_loader, valid_loader, num_epochs_improve)

In [None]:
def evaluate_model(model, criterion, test_loader):
    """Compute mean loss and accuracy of a two-input binary classifier.

    Args:
        model: Module called as ``model(inputs1, inputs2)`` that returns one
            logit per sample, shape ``(batch, 1)``.
        criterion: Loss taking ``(logits, float_targets)``, both ``(batch, 1)``.
        test_loader: Iterable yielding ``(inputs1, inputs2, labels)`` batches.

    Returns:
        Tuple ``(avg_loss, accuracy)`` where ``accuracy`` is a percentage in
        [0, 100].

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    # Run on the device the model already lives on, so the function does not
    # depend on a notebook-level `device` variable being defined.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    total_samples = 0
    total_loss = 0.0
    correct_predictions = 0

    # Evaluate in eval mode (dropout disabled) and restore the caller's mode
    # afterwards, even if a batch fails.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs1, inputs2, labels in test_loader:
                inputs1 = inputs1.to(run_device)
                inputs2 = inputs2.to(run_device)
                # Score against the true labels from the loader rather than a
                # constant all-ones target.
                targets = labels.to(run_device).float().view(-1, 1)

                outputs = model(inputs1, inputs2)
                loss = criterion(outputs, targets)
                total_loss += loss.item() * inputs1.size(0)

                # Threshold the sigmoid probability at 0.5 and compare with
                # the ground truth to count correct predictions.
                predicted_labels = torch.round(torch.sigmoid(outputs))
                correct_predictions += (predicted_labels == targets).sum().item()

                total_samples += inputs1.size(0)
    finally:
        model.train(was_training)

    if total_samples == 0:
        raise ValueError("test_loader produced no samples")

    avg_loss = total_loss / total_samples
    accuracy = correct_predictions / total_samples * 100

    return avg_loss, accuracy


In [None]:
# Evaluate the model on the test dataset
test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)
print(f'Test Loss: {test_loss:.8f}, Test Accuracy: {test_accuracy:.2f}%')