In [1]:
import os
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

class UnlabeledBeefDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = [os.path.join(root_dir, fname) for fname in os.listdir(root_dir)]
    
    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        image = Image.open(image_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image

# Define transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Create the dataset and dataloader
dataset = UnlabeledBeefDataset(root_dir='C:\\Users\\data\\beef', transform=transform)
dataloader = DataLoader(dataset, batch_size=10, shuffle=True)


In [2]:
import torch.nn as nn
import torch.optim as optim
from torchvision import models

class SimCLR(nn.Module):
    def __init__(self, base_model, out_dim):
        super(SimCLR, self).__init__()
        self.encoder = base_model
        num_ftrs = self.encoder.fc.in_features
        self.encoder.fc = nn.Identity()
        self.projector = nn.Sequential(
            nn.Linear(num_ftrs, 256),
            nn.ReLU(),
            nn.Linear(256, out_dim)
        )

    def forward(self, x):
        h = self.encoder(x)
        z = self.projector(h)
        return h, z

# Initialize the model
base_model = models.resnet50(pretrained=False)
model = SimCLR(base_model, out_dim=128)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)




In [3]:
class NTXentLoss(nn.Module):
    def __init__(self, batch_size, temperature, device):
        super(NTXentLoss, self).__init__()
        self.batch_size = batch_size
        self.temperature = temperature
        self.device = device
        self.criterion = nn.CrossEntropyLoss(reduction="sum")

    def forward(self, z_i, z_j):
        N = 2 * self.batch_size
        z = torch.cat((z_i, z_j), dim=0)

        # Calculate similarity matrix
        sim = torch.mm(z, z.T) / self.temperature

        # Create masks
        mask = torch.eye(N, device=self.device, dtype=torch.bool)
        labels = torch.arange(N, device=self.device)
        labels = (labels + N // 2) % N

        # Positive and negative samples
        positive_samples = torch.cat([torch.diag(sim, self.batch_size), torch.diag(sim, -self.batch_size)], dim=0)
        negative_samples = sim[~mask].view(N, -1)

        logits = torch.cat([positive_samples.view(N, 1), negative_samples], dim=1)
        loss = self.criterion(logits, labels)
        return loss / N

# Initialize the model
base_model = models.resnet50(pretrained=False)
model = SimCLR(base_model, out_dim=128)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)



In [34]:
optimizer = optim.Adam(model.parameters(), lr=1e-4)
criterion = NTXentLoss(batch_size=10, temperature=0.5, device=device)

num_epochs = 10
model.train()

for epoch in range(num_epochs):
    running_loss = 0.0
    for images in dataloader:
        images = images.to(device)
        optimizer.zero_grad()
        _, z = model(images)
        loss = criterion(z, z)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(dataloader)}")

torch.save(model.state_dict(), 'simclr_pretrained.pth')



Epoch 1/10, Loss: 3.460002064704895
Epoch 2/10, Loss: 2.7778549989064536
Epoch 3/10, Loss: 2.91237743695577
Epoch 4/10, Loss: 2.827024459838867
Epoch 5/10, Loss: 2.924703518549601
Epoch 6/10, Loss: 2.836981455485026
Epoch 7/10, Loss: 2.8394608894983926
Epoch 8/10, Loss: 2.7996509075164795
Epoch 9/10, Loss: 2.8375885089238486
Epoch 10/10, Loss: 2.789113918940226


---------------------------------------------------------여기까지 pre-training

In [4]:
import os
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

class LabeledBeefDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = os.listdir(root_dir)
        self.image_paths = []
        self.labels = []
        for label, class_name in enumerate(self.classes):
            class_dir = os.path.join(root_dir, class_name)
            for img_name in os.listdir(class_dir):
                self.image_paths.append(os.path.join(class_dir, img_name))
                self.labels.append(label)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        image = Image.open(image_path).convert('RGB')
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

# Define transformations for supervised learning
transform_supervised = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

labeled_dataset = LabeledBeefDataset(root_dir='C:\\Users\\data\\beef_labeled', transform=transform_supervised)
labeled_dataloader = DataLoader(labeled_dataset, batch_size=16, shuffle=True)


In [5]:
import torch
import torch.nn as nn
from torchvision import models
from collections import OrderedDict

class ResNetEncoder(nn.Module):
    def __init__(self):
        super(ResNetEncoder, self).__init__()
        self.encoder = models.resnet50(pretrained=False)
        self.num_ftrs = self.encoder.fc.in_features
        self.encoder.fc = nn.Identity()  # Remove the fully connected layer

    def forward(self, x):
        x = self.encoder(x)
        return x

# Initialize ResNet model
encoder_model = ResNetEncoder()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
encoder_model = encoder_model.to(device)

# Load state dictionary
state_dict = torch.load('simclr_pretrained.pth', map_location=device)

# Filter out the projector parameters
new_state_dict = OrderedDict()
for k, v in state_dict.items():
    if k.startswith("encoder"):
        new_state_dict[k.replace("encoder.", "")] = v

# Load the filtered state dictionary into the model
encoder_model.encoder.load_state_dict(new_state_dict)


<All keys matched successfully>

In [6]:
# Modify the final layer for classification
encoder_model.encoder.fc = nn.Linear(encoder_model.num_ftrs, len(os.listdir('C:\\Users\\data\\beef_labeled')))
encoder_model = encoder_model.to(device)


In [7]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(encoder_model.parameters(), lr=1e-4)


In [20]:
num_epochs = 10
encoder_model.train()
for epoch in range(num_epochs):
    running_loss = 0.0
    for images, labels in labeled_dataloader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = encoder_model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(labeled_dataloader)}")

# Save the fine-tuned model
torch.save(encoder_model.state_dict(), 'beef_grade_classifier_finetuned.pth')


Epoch 1/10, Loss: 1.6863246361414592
Epoch 2/10, Loss: 1.5231038530667622
Epoch 3/10, Loss: 1.451739529768626
Epoch 4/10, Loss: 1.4391018350919087
Epoch 5/10, Loss: 1.3520343899726868
Epoch 6/10, Loss: 1.4046476085980732
Epoch 7/10, Loss: 1.3570273915926616
Epoch 8/10, Loss: 1.312989075978597
Epoch 9/10, Loss: 1.2872304519017537
Epoch 10/10, Loss: 1.260794421037038


In [8]:
# Load the fine-tuned model
encoder_model.load_state_dict(torch.load('beef_grade_classifier_finetuned.pth'))
encoder_model.eval()


ResNetEncoder(
  (encoder): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): Sequential(
    

In [29]:
def predict_image(image_path, model, transform):
    image = Image.open(image_path).convert('RGB')
    image = transform(image).unsqueeze(0)  # Add batch dimension
    image = image.to(device)

    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        outputs = model(image)
        _, predicted = torch.max(outputs, 1)
    return predicted.item()

# Example usage
image_path = 'C:\\Users\\2024-05-22 231838.png'
prediction = predict_image(image_path, encoder_model, transform_supervised)
class_names = os.listdir('C:\\Users\\beef_labeled')
print(f'Predicted Class: {class_names[prediction]}')


Predicted Class: 2
