In [22]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import os
import json
from PIL import Image

In [23]:
# 특정 폴더의 모든 파일 가져오기
def get_all_files(directory):
    all_files = []
    for root, _, files in os.walk(directory):  # 하위 디렉토리 포함 탐색
        for file in files:
            all_files.append(os.path.join(root, file))  # 전체 경로 추가
    return all_files


# 예시 사용
folder_path = '.\\data\\Radiator_grille\\1.Training\\source\\Radiator_grille\\gap'  # 대상 폴더 경로
source_files = get_all_files(folder_path)


# JSON 파일들이 저장된 폴더
label_path = ".\\data\\Radiator_grille\\1.Training\\labeling\\Radiator_grille\\gap"
source_labels = get_all_files(label_path)


In [24]:
forder_test_path = ".\\data\\Radiator_grille\\2.Validation\\source\\Radiator_grille\\gap"
source_test_files = get_all_files(forder_test_path)
label_test_path= ".\\data\\Radiator_grille\\2.Validation\\labeling\\Radiator_grille\\gap"
source_test_labels = get_all_files(label_test_path)

In [25]:
# # 모델 정의
# class SimpleCNN(nn.Module):
#     def __init__(self):
#         super(SimpleCNN, self).__init__()
#         # conv 2번 / pooling 2번 / ReLU 1번
#         self.conv_Layers = nn.Sequential(
#             # 32 - 5 + 1 -> 28 => 32 * 32를 28 * 28 * 6 사이즈로 변환
#             nn.Conv2d(3, 6, 5),
#             nn.ReLU(),
#             nn.MaxPool2d(2, 2),  # stride -> 28 * 28 => 14 * 14
#             nn.Conv2d(6, 16, 5),  # 14 - 5 + 1 = 10 -> 14 * 14 => 10 * 10
#             nn.ReLU(),
#             nn.MaxPool2d(2, 2)  # stride -> 10 * 10 => 5 * 5
#         )
#         self.flatten = nn.Flatten()
#         self.fc_layer = nn.Sequential(
#             nn.Linear(5*5*16, 120),
#             nn.ReLU(),
#             nn.Linear(120, 84),
#             nn.ReLU(),
#             nn.Linear(84, 10)
#         )  # fully connected layer

#     def forward(self, x):
#         out = self.conv_Layers(x)
#         flatten = self.flatten(out)
#         fc_out = self.fc_layer(flatten)
#         return fc_out


# device = "cuda" if torch.cuda.is_available() else "cpu"
# model = SimpleCNN()
# model.to(device)
# print(device)

In [26]:
import torch.nn as nn
import torch.nn.functional as F


class ResNetLikeCNN(nn.Module):
    def __init__(self):
        super(ResNetLikeCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)  # 3x3 Conv
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(64 * 4 * 4, 128)  # Fully connected layer
        self.fc2 = nn.Linear(128, 10)  # Output layer (10 classes)

        self.pool = nn.MaxPool2d(2, 2)
        self.batchnorm1 = nn.BatchNorm2d(16)  # Batch Normalization
        self.batchnorm2 = nn.BatchNorm2d(32)
        self.batchnorm3 = nn.BatchNorm2d(64)

    def forward(self, x):
        x = F.relu(self.batchnorm1(self.conv1(x)))
        x = self.pool(x)
        x = F.relu(self.batchnorm2(self.conv2(x)))
        x = self.pool(x)
        x = F.relu(self.batchnorm3(self.conv3(x)))
        x = self.pool(x)
        x = x.view(-1, 64 * 4 * 4)  # Flatten
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


device = "cuda" if torch.cuda.is_available() else "cpu"
model = ResNetLikeCNN()
model.to(device)

ResNetLikeCNN(
  (conv1): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (fc1): Linear(in_features=1024, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=10, bias=True)
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (batchnorm1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batchnorm2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batchnorm3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)

In [27]:
import json

# JSON 파일에서 'quality'를 숫자로 변환하는 매핑 예시
quality_to_label = {"불량품": 0, "양품": 1}  # 예시로 매핑, 실제 데이터에 맞게 수정

# 새로운 데이터셋을 저장할 리스트
train_json_dataset = []

for json_file in source_labels:
    # JSON 파일 경로
    json_path = json_file

    # JSON 파일 읽기
    with open(json_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)

    # 필요한 데이터 추출
    for annotation in data['annotations']:
        # "work", "part", "quality" 데이터 추출
        work = annotation['attributes'].get('work')
        part = annotation['attributes'].get('part')
        quality = annotation['attributes'].get('quality')

        # 새로운 데이터셋 형식으로 저장
        # train_json_dataset.append({
        #     'work': work,
        #     'part': part,
        #     'quality': quality
        # })

        train_json_dataset.append(quality_to_label[quality])

    # 추출된 데이터를 새로운 JSON 파일로 저장
    # with open('extracted_dataset.json', 'w', encoding='utf-8') as f:
    #     json.dump(train_json_dataset, f, ensure_ascii=False, indent=4)

print("학습 데이터셋이 성공적으로 저장되었습니다!")

학습 데이터셋이 성공적으로 저장되었습니다!


In [28]:
import json

# JSON 파일에서 'quality'를 숫자로 변환하는 매핑 예시
quality_to_label = {"불량품": 0, "양품": 1}  # 예시로 매핑, 실제 데이터에 맞게 수정

# 새로운 데이터셋을 저장할 리스트
test_json_dataset = []

for json_file in source_test_labels:
    # JSON 파일 경로
    json_path = json_file

    # JSON 파일 읽기
    with open(json_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)

    # 필요한 데이터 추출
    for annotation in data['annotations']:
        # "work", "part", "quality" 데이터 추출
        work = annotation['attributes'].get('work')
        part = annotation['attributes'].get('part')
        quality = annotation['attributes'].get('quality')

        # 새로운 데이터셋 형식으로 저장
        # train_json_dataset.append({
        #     'work': work,
        #     'part': part,
        #     'quality': quality
        # })

        test_json_dataset.append(quality_to_label[quality])

    # 추출된 데이터를 새로운 JSON 파일로 저장
    # with open('extracted_dataset.json', 'w', encoding='utf-8') as f:
    #     json.dump(train_json_dataset, f, ensure_ascii=False, indent=4)

print("테스트 데이터셋이 성공적으로 저장되었습니다!")

테스트 데이터셋이 성공적으로 저장되었습니다!


In [29]:
# print(train_json_dataset)
# print(train_json_dataset.index(0))
# print(train_json_dataset[train_json_dataset.index(0):])
# print(source_files[train_json_dataset.index(0):])

# print(test_json_dataset)
# print(test_json_dataset.index(0))
# print(test_json_dataset[test_json_dataset.index(0):])
# print(source_test_labels[test_json_dataset.index(0):])

In [30]:
# train_data = list(zip((source_files[train_json_dataset.index(
#     "불량품"):]), train_json_dataset[train_json_dataset.index("불량품"):]))

In [31]:
# 손실함수 정의
loss_fn = nn.CrossEntropyLoss()
# # 가중치 학습1
# optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# 가중치 학습2
# optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
# 가중치 학습3
# optimizer = optim.SGD(model.parameters(), lr=0.01,
#                       momentum=0.9, weight_decay=5e-4)
# scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)
# 가중치 학습 4
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)

In [32]:
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import transforms
import torch

# 데이터셋 클래스 정의 (커스텀 데이터셋)


class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(image_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image, label


# 데이터 전처리 (Normalization 및 그레이스케일 변환)
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),  # 그레이스케일로 변환 후 3채널로 확장
    transforms.RandomHorizontalFlip(),  # 데이터 증강 : 이미지 좌우 반전
    transforms.RandomCrop(32, padding=4),  # 데이터 증강 : padding 4 후 32*32로 랜덤 크롭
    transforms.ToTensor(),  # 텐서로 변환
    transforms.RandomRotation(10),  # 10도 회전
    transforms.ColorJitter(brightness=0.2, contrast=0.2),  # 밝기 및 대비 조정

    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # 정규화
])



# CustomDataset으로 데이터셋 만들기
train_dataset = CustomDataset(
    image_paths=source_files, labels=train_json_dataset, transform=transform)
# DataLoader 생성
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = CustomDataset(
    image_paths=source_test_files, labels=test_json_dataset, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)

In [None]:
from PIL import Image, ImageFile, UnidentifiedImageError
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# 손상된 이미지를 처리하도록 설정
ImageFile.LOAD_TRUNCATED_IMAGES = True

# 손상된 이미지가 있으면 건너뛰기
def safe_load_image(image_path):
    try:
        image = Image.open(image_path)
        image.load()  # 이미지가 제대로 로드되는지 확인
        return image
    except (OSError, UnidentifiedImageError, ValueError):
        print(f"Skipping corrupted image: {image_path}")
        return None  # 손상된 이미지는 None 반환

# 데이터셋 클래스 정의 (커스텀 데이터셋)
class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.transform = transform
        self.data = []

        for img_path, label in zip(image_paths, labels):
            # 이미지 로드 및 유효성 확인
            image = safe_load_image(img_path)
            if image is not None:
                self.data.append((img_path, label))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image_path, label = self.data[idx]

        # 이미지를 다시 안전하게 로드
        image = safe_load_image(image_path)
        if self.transform:
            image = self.transform(image)

        return image, label

# 모델 학습
epochs = 5
for epoch in range(epochs):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):  # train_loader에서 가져오기
        # 손상된 이미지가 반환된 경우 건너뛰기
        if inputs is None:
            continue
        
        # 입력 데이터와 레이블을 device로 이동
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)  # 모델에 입력
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # 학습률 스케줄러 업데이트
    # scheduler.step()
    print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

print("학습 완료")



In [20]:
# 테스트하기
correct = 0
total = len(test_loader.dataset)
with torch.no_grad():  # test는 기울기 계산 X
    for (images, labels) in test_loader:
        # 손상된 이미지가 반환된 경우 건너뛰기
        if images is None:
            continue
        
        images, labels = images.to(device), labels.to(device)  # 이미지를 GPU로 이동
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        correct += (predicted == labels).sum().item()

print(f"Accuracy: {100 * correct / total}%")


Accuracy: 52.478134110787174%


# 원본 코드

In [None]:
# 특정 폴더의 모든 파일 가져오기
def get_all_files(directory):
    all_files = []
    for root, _, files in os.walk(directory):  # 하위 디렉토리 포함 탐색
        for file in files:
            all_files.append(os.path.join(root, file))  # 전체 경로 추가
    return all_files


# 예시 사용
folder_path = '.\\data\\Radiator_grille\\1.Training\\source\\Radiator_grille\\gap'  # 대상 폴더 경로
source_files = get_all_files(folder_path)


# JSON 파일들이 저장된 폴더
label_path = ".\\data\\Radiator_grille\\1.Training\\labeling\\Radiator_grille\\gap"
source_labels = get_all_files(label_path)

# 모델 정의
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        # conv 2번 / pooling 2번 / ReLU 1번
        self.conv_Layers = nn.Sequential(
            # 32 - 5 + 1 -> 28 => 32 * 32를 28 * 28 * 6 사이즈로 변환
            nn.Conv2d(3, 6, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # stride -> 28 * 28 => 14 * 14
            nn.Conv2d(6, 16, 5),  # 14 - 5 + 1 = 10 -> 14 * 14 => 10 * 10
            nn.ReLU(),
            nn.MaxPool2d(2, 2)  # stride -> 10 * 10 => 5 * 5
        )
        self.flatten = nn.Flatten()
        self.fc_layer = nn.Sequential(
            nn.Linear(5*5*16, 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, 10)
        )  # fully connected layer

    def forward(self, x):
        out = self.conv_Layers(x)
        flatten = self.flatten(out)
        fc_out = self.fc_layer(flatten)
        return fc_out


device = "cuda" if torch.cuda.is_available() else "cpu"
model = SimpleCNN()
model.to(device)
print(device)

import json

# JSON 파일에서 'quality'를 숫자로 변환하는 매핑 예시
quality_to_label = {"불량품": 0, "양품": 1}  # 예시로 매핑, 실제 데이터에 맞게 수정

# 새로운 데이터셋을 저장할 리스트
train_json_dataset = []

for json_file in source_labels:
    # JSON 파일 경로
    json_path = json_file

    # JSON 파일 읽기
    with open(json_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)

    # 필요한 데이터 추출
    for annotation in data['annotations']:
        # "work", "part", "quality" 데이터 추출
        work = annotation['attributes'].get('work')
        part = annotation['attributes'].get('part')
        quality = annotation['attributes'].get('quality')

        # 새로운 데이터셋 형식으로 저장
        # train_json_dataset.append({
        #     'work': work,
        #     'part': part,
        #     'quality': quality
        # })

        train_json_dataset.append(quality_to_label[quality])

    # 추출된 데이터를 새로운 JSON 파일로 저장
    # with open('extracted_dataset.json', 'w', encoding='utf-8') as f:
    #     json.dump(train_json_dataset, f, ensure_ascii=False, indent=4)

print("학습 데이터셋이 성공적으로 저장되었습니다!")

import json

# JSON 파일에서 'quality'를 숫자로 변환하는 매핑 예시
quality_to_label = {"불량품": 0, "양품": 1}  # 예시로 매핑, 실제 데이터에 맞게 수정

# 새로운 데이터셋을 저장할 리스트
test_json_dataset = []

for json_file in source_test_labels:
    # JSON 파일 경로
    json_path = json_file

    # JSON 파일 읽기
    with open(json_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)

    # 필요한 데이터 추출
    for annotation in data['annotations']:
        # "work", "part", "quality" 데이터 추출
        work = annotation['attributes'].get('work')
        part = annotation['attributes'].get('part')
        quality = annotation['attributes'].get('quality')

        # 새로운 데이터셋 형식으로 저장
        # train_json_dataset.append({
        #     'work': work,
        #     'part': part,
        #     'quality': quality
        # })

        test_json_dataset.append(quality_to_label[quality])

    # 추출된 데이터를 새로운 JSON 파일로 저장
    # with open('extracted_dataset.json', 'w', encoding='utf-8') as f:
    #     json.dump(train_json_dataset, f, ensure_ascii=False, indent=4)

print("테스트 데이터셋이 성공적으로 저장되었습니다!")

# 손실함수 정의
loss_fn = nn.CrossEntropyLoss()
# # 가중치 학습1
# optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# 가중치 학습2
# optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
# 가중치 학습3
optimizer = optim.SGD(model.parameters(), lr=0.01,
                      momentum=0.9, weight_decay=5e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)

from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import transforms
import torch

# 데이터셋 클래스 정의 (커스텀 데이터셋)


class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(image_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image, label


# 데이터 전처리 (Normalization 및 그레이스케일 변환)
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),  # 그레이스케일로 변환 후 3채널로 확장
    transforms.RandomHorizontalFlip(),  # 데이터 증강 : 이미지 좌우 반전
    transforms.RandomCrop(32, padding=4),  # 데이터 증강 : padding 4 후 32*32로 랜덤 크롭
    transforms.ToTensor(),  # 텐서로 변환
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # 정규화
])

# CustomDataset으로 데이터셋 만들기
train_dataset = CustomDataset(
    image_paths=source_files, labels=train_json_dataset, transform=transform)
# DataLoader 생성
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = CustomDataset(
    image_paths=source_test_files, labels=test_json_dataset, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)

from PIL import Image, ImageFile, UnidentifiedImageError
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# 손상된 이미지를 처리하도록 설정
ImageFile.LOAD_TRUNCATED_IMAGES = True

# 손상된 이미지가 있으면 건너뛰기
def safe_load_image(image_path):
    try:
        image = Image.open(image_path)
        image.load()  # 이미지가 제대로 로드되는지 확인
        return image
    except (OSError, UnidentifiedImageError, ValueError):
        print(f"Skipping corrupted image: {image_path}")
        return None  # 손상된 이미지는 None 반환

# 데이터셋 클래스 정의 (커스텀 데이터셋)
class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.transform = transform
        self.data = []

        for img_path, label in zip(image_paths, labels):
            # 이미지 로드 및 유효성 확인
            image = safe_load_image(img_path)
            if image is not None:
                self.data.append((img_path, label))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image_path, label = self.data[idx]

        # 이미지를 다시 안전하게 로드
        image = safe_load_image(image_path)
        if self.transform:
            image = self.transform(image)

        return image, label

# 모델 학습
epochs = 5
for epoch in range(epochs):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):  # train_loader에서 가져오기
        # 손상된 이미지가 반환된 경우 건너뛰기
        if inputs is None:
            continue
        
        # 입력 데이터와 레이블을 device로 이동
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)  # 모델에 입력
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # 학습률 스케줄러 업데이트
    scheduler.step()
    print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

print("학습 완료")


# 테스트하기
correct = 0
total = len(test_loader.dataset)
with torch.no_grad():  # test는 기울기 계산 X
    for (images, labels) in test_loader:
        # 손상된 이미지가 반환된 경우 건너뛰기
        if images is None:
            continue
        
        images, labels = images.to(device), labels.to(device)  # 이미지를 GPU로 이동
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        correct += (predicted == labels).sum().item()

print(f"Accuracy: {100 * correct / total}%")

# 수정코드

In [None]:
# 특정 폴더의 모든 파일 가져오기
def get_all_files(directory):
    all_files = []
    for root, _, files in os.walk(directory):  # 하위 디렉토리 포함 탐색
        for file in files:
            all_files.append(os.path.join(root, file))  # 전체 경로 추가
    return all_files


# 예시 사용
folder_path = '.\\data\\Radiator_grille\\1.Training\\source\\Radiator_grille\\gap'  # 대상 폴더 경로
source_files = get_all_files(folder_path)


# JSON 파일들이 저장된 폴더
label_path = ".\\data\\Radiator_grille\\1.Training\\labeling\\Radiator_grille\\gap"
source_labels = get_all_files(label_path)

# 모델 정의
import torch.nn as nn
import torch.nn.functional as F


class ResNetLikeCNN(nn.Module):
    def __init__(self):
        super(ResNetLikeCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)  # 3x3 Conv
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(64 * 4 * 4, 128)  # Fully connected layer
        self.fc2 = nn.Linear(128, 10)  # Output layer (10 classes)

        self.pool = nn.MaxPool2d(2, 2)
        self.batchnorm1 = nn.BatchNorm2d(16)  # Batch Normalization
        self.batchnorm2 = nn.BatchNorm2d(32)
        self.batchnorm3 = nn.BatchNorm2d(64)

    def forward(self, x):
        x = F.relu(self.batchnorm1(self.conv1(x)))
        x = self.pool(x)
        x = F.relu(self.batchnorm2(self.conv2(x)))
        x = self.pool(x)
        x = F.relu(self.batchnorm3(self.conv3(x)))
        x = self.pool(x)
        x = x.view(-1, 64 * 4 * 4)  # Flatten
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


device = "cuda" if torch.cuda.is_available() else "cpu"
model = ResNetLikeCNN()
model.to(device)

import json

# JSON 파일에서 'quality'를 숫자로 변환하는 매핑 예시
quality_to_label = {"불량품": 0, "양품": 1}  # 예시로 매핑, 실제 데이터에 맞게 수정

# 새로운 데이터셋을 저장할 리스트
train_json_dataset = []

for json_file in source_labels:
    # JSON 파일 경로
    json_path = json_file

    # JSON 파일 읽기
    with open(json_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)

    # 필요한 데이터 추출
    for annotation in data['annotations']:
        # "work", "part", "quality" 데이터 추출
        work = annotation['attributes'].get('work')
        part = annotation['attributes'].get('part')
        quality = annotation['attributes'].get('quality')

        # 새로운 데이터셋 형식으로 저장
        # train_json_dataset.append({
        #     'work': work,
        #     'part': part,
        #     'quality': quality
        # })

        train_json_dataset.append(quality_to_label[quality])

    # 추출된 데이터를 새로운 JSON 파일로 저장
    # with open('extracted_dataset.json', 'w', encoding='utf-8') as f:
    #     json.dump(train_json_dataset, f, ensure_ascii=False, indent=4)

print("학습 데이터셋이 성공적으로 저장되었습니다!")

import json

# JSON 파일에서 'quality'를 숫자로 변환하는 매핑 예시
quality_to_label = {"불량품": 0, "양품": 1}  # 예시로 매핑, 실제 데이터에 맞게 수정

# 새로운 데이터셋을 저장할 리스트
test_json_dataset = []

for json_file in source_test_labels:
    # JSON 파일 경로
    json_path = json_file

    # JSON 파일 읽기
    with open(json_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)

    # 필요한 데이터 추출
    for annotation in data['annotations']:
        # "work", "part", "quality" 데이터 추출
        work = annotation['attributes'].get('work')
        part = annotation['attributes'].get('part')
        quality = annotation['attributes'].get('quality')

        # 새로운 데이터셋 형식으로 저장
        # train_json_dataset.append({
        #     'work': work,
        #     'part': part,
        #     'quality': quality
        # })

        test_json_dataset.append(quality_to_label[quality])

    # 추출된 데이터를 새로운 JSON 파일로 저장
    # with open('extracted_dataset.json', 'w', encoding='utf-8') as f:
    #     json.dump(train_json_dataset, f, ensure_ascii=False, indent=4)

print("테스트 데이터셋이 성공적으로 저장되었습니다!")

# 손실함수 정의
loss_fn = nn.CrossEntropyLoss()
# # 가중치 학습1
# optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# 가중치 학습2
# optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
# 가중치 학습3
# optimizer = optim.SGD(model.parameters(), lr=0.01,
#                       momentum=0.9, weight_decay=5e-4)
# scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)
# 가중치 학습4
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)

from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import transforms
import torch

# 데이터셋 클래스 정의 (커스텀 데이터셋)


class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(image_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image, label


# 데이터 전처리 (Normalization 및 그레이스케일 변환)
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),  # 그레이스케일로 변환 후 3채널로 확장
    transforms.RandomHorizontalFlip(),  # 데이터 증강 : 이미지 좌우 반전
    transforms.RandomCrop(32, padding=4),  # 데이터 증강 : padding 4 후 32*32로 랜덤 크롭
    transforms.ToTensor(),  # 텐서로 변환
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # 정규화
])

# CustomDataset으로 데이터셋 만들기
train_dataset = CustomDataset(
    image_paths=source_files, labels=train_json_dataset, transform=transform)
# DataLoader 생성
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = CustomDataset(
    image_paths=source_test_files, labels=test_json_dataset, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)

from PIL import Image, ImageFile, UnidentifiedImageError
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# 손상된 이미지를 처리하도록 설정
ImageFile.LOAD_TRUNCATED_IMAGES = True

# 손상된 이미지가 있으면 건너뛰기
def safe_load_image(image_path):
    try:
        image = Image.open(image_path)
        image.load()  # 이미지가 제대로 로드되는지 확인
        return image
    except (OSError, UnidentifiedImageError, ValueError):
        print(f"Skipping corrupted image: {image_path}")
        return None  # 손상된 이미지는 None 반환

# 데이터셋 클래스 정의 (커스텀 데이터셋)
class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.transform = transform
        self.data = []

        for img_path, label in zip(image_paths, labels):
            # 이미지 로드 및 유효성 확인
            image = safe_load_image(img_path)
            if image is not None:
                self.data.append((img_path, label))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image_path, label = self.data[idx]

        # 이미지를 다시 안전하게 로드
        image = safe_load_image(image_path)
        if self.transform:
            image = self.transform(image)

        return image, label

# 모델 학습
epochs = 5
for epoch in range(epochs):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):  # train_loader에서 가져오기
        # 손상된 이미지가 반환된 경우 건너뛰기
        if inputs is None:
            continue
        
        # 입력 데이터와 레이블을 device로 이동
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)  # 모델에 입력
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # 학습률 스케줄러 업데이트
    # scheduler.step()
    print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

print("학습 완료")


# 테스트하기
correct = 0
total = len(test_loader.dataset)
with torch.no_grad():  # test는 기울기 계산 X
    for (images, labels) in test_loader:
        # 손상된 이미지가 반환된 경우 건너뛰기
        if images is None:
            continue
        
        images, labels = images.to(device), labels.to(device)  # 이미지를 GPU로 이동
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        correct += (predicted == labels).sum().item()

print(f"Accuracy: {100 * correct / total}%")