In [1]:
!mkdir DATA
!unzip -qq {'/content/drive/MyDrive/데이콘/반도체/open.zip'} -d /content/DATA

In [2]:
import os
# Order CUDA devices by PCI bus id so the index used below refers to a fixed physical GPU.
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
# Expose only the GPU with index 1 to this process.
# NOTE(review): on a host with a single GPU (index 0) this hides it and the notebook
# silently runs on CPU — confirm the target machine really has a second GPU.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

In [3]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from sklearn.ensemble import IsolationForest
from tqdm import tqdm
import random

In [4]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices),
    exports ``PYTHONHASHSEED`` and switches cuDNN to deterministic mode.

    Args:
        seed (int): Seed value shared by all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # manual_seed_all seeds every visible CUDA device, not only the current one.
    torch.cuda.manual_seed_all(seed)
    # Trade cuDNN autotuning speed for run-to-run reproducibility.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


seed_everything(176)  # Fix the seed

In [6]:
# Dataset class used to load the images
class CustomDataset(Dataset):
    """Image dataset driven by a CSV file that has an ``img_path`` column.

    Every sample is returned together with a constant target of ``0.``.
    """

    def __init__(self, csv_file, transform=None, root='/content/DATA'):
        """
        Args:
            csv_file (string): Path to the csv file.
            transform (callable, optional): Optional transform applied to each sample.
            root (string, optional): Directory that each ``img_path`` entry is
                resolved against, after the entry's first character is dropped.
        """
        self.df = pd.read_csv(csv_file)
        self.transform = transform
        self.root = root

    def __len__(self):
        """Return the number of rows in the CSV file."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the image at row ``idx`` and return ``(image, target)``."""
        img_path = self.df['img_path'].iloc[idx]
        # The first character of the CSV entry is dropped (presumably a leading '.'
        # of a relative path — verify against the CSV) before prefixing the root.
        # convert('RGB') guarantees three channels, so grayscale / RGBA files do not
        # break a 3-channel Normalize or the pretrained backbone.
        image = Image.open(self.root + img_path[1:]).convert('RGB')
        if self.transform:
            image = self.transform(image)
        # Constant dummy target: every sample is labelled 0.
        target = torch.tensor([0.]).float()
        return image, target

# Image preprocessing applied before the images are fed to the network / embedded
transform = transforms.Compose([
    transforms.Resize((512, 512)),
    # NOTE(review): this random flip is also active when `train_loader` / `transform`
    # are reused later for embedding extraction, so embeddings depend on the RNG state.
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    # Presumably the ImageNet channel statistics expected by the pretrained backbone — confirm.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_data = CustomDataset(csv_file='/content/DATA/train.csv', transform=transform)
# shuffle=False: batches follow the row order of the CSV file.
train_loader = DataLoader(train_data, batch_size=32, shuffle=False)

In [7]:
# Pretrained backbone with its classifier replaced by a single-logit head.
model = models.resnet50(pretrained=True)
# Size the new head from the backbone's own classifier instead of hard-coding it:
# resnet50 feeds 2048 features into `fc`, so in_features=512 fails on the first forward pass.
model.fc = nn.Linear(in_features=model.fc.in_features, out_features=1, bias=True)
model = model.to(device)

# Binary cross-entropy on raw logits (the sigmoid is applied inside the loss).
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001)
# Cosine annealing of the learning rate from 1e-3 down to eta_min over T_max scheduler steps.
scheduler = CosineAnnealingLR(optimizer, T_max=100, eta_min=0.00001)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 152MB/s]


In [14]:
def train(model, train_loader, criterion, optimizer, scheduler, num_epochs=10):
    """Train ``model`` and print the mean loss and accuracy of every epoch.

    Args:
        model: Network to optimise; called on each image batch.
        train_loader: Iterable of ``(images, labels)`` batches that supports ``len()``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimiser stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per epoch.
        num_epochs (int): Number of passes over ``train_loader``.

    Returns:
        None. Progress is reported through ``print``.
    """
    for epoch in range(num_epochs):
        print('start_',epoch)
        model.train()
        running_loss = 0.0
        running_corrects = 0
        total = 0

        for images, labels in train_loader:
            images = images.to(device)
            # Reshape the targets to a column once; the loss and the accuracy
            # comparison below both use this shape.
            labels = labels.to(device).view(-1, 1)

            outputs = model(images)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            # Threshold the sigmoid of the logits at 0.5 to obtain hard predictions.
            predictions = (torch.sigmoid(outputs) > 0.5).float()
            running_corrects += torch.sum(predictions == labels).item()
            total += labels.size(0)

        scheduler.step()

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError;
        # in that case both metrics are reported as 0.
        epoch_loss = running_loss / max(len(train_loader), 1)
        epoch_acc = running_corrects / max(total, 1)

        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}')


In [None]:
# Run model training
train(model, train_loader, criterion, optimizer, scheduler, num_epochs=5)

start_ 0


In [26]:
# Turn the network into a feature extractor
model.eval()  # switch to inference mode

# Drop the final layer so the network outputs features instead of a logit.
# Only strip while the last child is still the Linear head: re-running this cell
# would otherwise remove one more layer each time.
backbone_children = list(model.children())
if isinstance(backbone_children[-1], torch.nn.Linear):
    model = torch.nn.Sequential(*backbone_children[:-1])

# Assigning the result (instead of leaving a bare expression) avoids printing the
# whole module; eval() is repeated so the rebuilt container is in inference mode too.
model = model.to(device).eval()

# Convert images into embedding vectors
def get_embeddings(dataloader, model):
    """Run ``model`` over ``dataloader`` and stack the outputs into one array.

    Args:
        dataloader: Iterable of ``(images, target)`` batches; targets are ignored.
        model: Callable applied to each image batch after it is moved to ``device``.

    Returns:
        numpy.ndarray: One row per sample, all batches concatenated along axis 0.
    """
    embeddings = []
    with torch.no_grad():
        for images, _ in tqdm(dataloader):
            images = images.to(device)
            emb = model(images)
            # Flatten everything except the batch axis. squeeze() would also drop the
            # batch axis when a batch holds a single sample, producing a 1-D array
            # that cannot be concatenated with the 2-D batches.
            embeddings.append(emb.flatten(start_dim=1).cpu().numpy())
    return np.concatenate(embeddings, axis=0)

# Embeddings of the training images; the Isolation Forest is fitted on these later
train_embeddings = get_embeddings(train_loader, model)

100%|██████████| 7/7 [00:02<00:00,  3.30it/s]


In [28]:
# Inspect the shape of the training embedding array
train_embeddings.shape

(213, 512)

In [10]:
# type: ignore
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers


def l2_distance(z):
    """Return the squared Euclidean distance between every pair of entries of ``z``.

    ``z`` is broadcast against itself along two new axes and the squared
    differences are summed over the last axis; for a 2-D ``z`` of shape
    ``(n, d)`` the result has shape ``(n, n)``. No square root is taken.
    """
    as_rows = tf.expand_dims(z, axis=1)
    as_cols = tf.expand_dims(z, axis=0)
    pairwise_diff = as_rows - as_cols
    return tf.reduce_sum(pairwise_diff ** 2, axis=-1)


class PairwiseSimilarity(layers.Layer):
    """Layer computing ``exp(-squared_distance / sigma)`` for every pair of inputs."""

    def __init__(self, sigma=1.0):
        super().__init__()
        # Divisor applied to the squared distances before the exponential.
        self.sigma = sigma

    def call(self, z):
        """Return the pairwise similarity matrix of ``z``."""
        scaled_sq_dist = l2_distance(z) / self.sigma
        return tf.exp(-scaled_sq_dist)


class ContextualSimilarity(layers.Layer):
    """Layer scoring pairs of inputs by how much their k-nearest neighbourhoods overlap."""

    def __init__(self, k):
        super().__init__()
        # Neighbourhood size used when thresholding the distances.
        self.k = k

    def call(self, z):
        """Return the symmetrised neighbourhood-overlap similarity matrix of ``z``."""
        sq_dist = l2_distance(z)

        # top_k of the negated distances gives the k smallest distances per row in
        # ascending order; the last column is therefore the k-th smallest one.
        neg_smallest, _ = tf.math.top_k(-sq_dist, k=self.k, sorted=True)
        kth_nearest = -neg_smallest[:, -1]

        # neighbour_mask[i, j] = 1 when j lies within i's k-th smallest distance.
        neighbour_mask = tf.cast(sq_dist <= tf.expand_dims(kth_nearest, axis=-1), tf.float32)

        # Shared-neighbour counts, normalised by each row's neighbourhood size.
        shared_counts = tf.matmul(neighbour_mask, neighbour_mask, transpose_b=True)
        similarity = shared_counts / tf.reduce_sum(neighbour_mask, axis=-1, keepdims=True)

        # Keep only mutual neighbours (i in N(j) and j in N(i)) and aggregate over them.
        mutual_mask = neighbour_mask * tf.transpose(neighbour_mask)
        aggregated = tf.matmul(similarity, mutual_mask, transpose_b=True)
        similarity = aggregated / tf.reduce_sum(mutual_mask, axis=-1, keepdims=True)

        # Average with the transpose so the result is symmetric.
        return 0.5 * (similarity + tf.transpose(similarity))


class ReConPatch(keras.Model):
    """Dense embedding + projection trained with a similarity-weighted contrastive loss.

    Two copies of the embedding/projection pair are kept: a trainable one and a
    non-trainable one whose weights follow an exponential moving average (EMA) of
    the trainable weights. The EMA copy produces the pair weights ``w`` used by
    the loss; only the trainable copy receives gradients.

    Args:
        input_dim: Size of the last axis of the inputs.
        embedding_dim: Units of the embedding layer.
        projection_dim: Units of the projection layer.
        alpha: Mixing factor; ``w = alpha * pairwise + (1 - alpha) * contextual``.
        margin: Margin of the repelling term of the loss.
    """

    def __init__(self, input_dim, embedding_dim, projection_dim, alpha, margin=0.1):
        super(ReConPatch, self).__init__()
        self.alpha = alpha
        self.margin = margin

        # embedding & projection layers
        self.embedding = layers.Dense(embedding_dim)
        self.projection = layers.Dense(projection_dim)

        # ema ver of embedding & projection layers
        self.ema_embedding = layers.Dense(embedding_dim, trainable=False)
        self.ema_projection = layers.Dense(projection_dim, trainable=False)

        # initialize layers (weights must exist before the first EMA update)
        self.embedding.build((None, input_dim))
        self.projection.build((None, embedding_dim))
        self.ema_embedding.build((None, input_dim))
        self.ema_projection.build((None, embedding_dim))

        # ema operator
        self.ema = tf.train.ExponentialMovingAverage(decay=0.9)
        self.update_ema()

        self.pairwise_similarity = PairwiseSimilarity(sigma=1.0)
        self.contextual_similarity = ContextualSimilarity(k=3)

    def call(self, x):
        """Return the trainable embedding of ``x`` (the projection is not applied)."""
        return self.embedding(x)

    def train_step(self, x):
        """Run one optimisation step on a batch and return ``{"rc_loss": loss}``."""
        # fit(x, y) hands train_step an (x, y[, sample_weight]) tuple; only the
        # inputs are used here, so unwrap them instead of failing on the tuple.
        if isinstance(x, (tuple, list)):
            x = x[0]

        # Pair weights come from the EMA branch and are computed outside the tape,
        # so no gradient flows through them.
        h_ema = self.ema_embedding(x)
        z_ema = self.ema_projection(h_ema)

        p_sim = self.pairwise_similarity(z_ema)
        c_sim = self.contextual_similarity(z_ema)
        w = self.alpha * p_sim + (1 - self.alpha) * c_sim

        with tf.GradientTape() as tape:
            h = self.embedding(x)
            z = self.projection(h)

            # Contrastive loss
            # The epsilon keeps sqrt differentiable at zero distance (the diagonal).
            distances = tf.sqrt(l2_distance(z) + 1e-9)
            # Each row is normalised by its mean distance.
            delta = distances / tf.reduce_mean(distances, axis=-1, keepdims=True)
            rc_loss = tf.reduce_sum(tf.reduce_mean(
                w * (delta ** 2) + (1 - w) * (tf.nn.relu(self.margin - delta) ** 2),
                axis=-1
            ))
        # Update weights: explicit gradient / apply_gradients is the documented
        # custom train_step pattern and does not depend on optimizer.minimize.
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(rc_loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update EMA
        self.update_ema()

        return {"rc_loss": rc_loss}

    def update_ema(self):
        """Advance the moving averages and copy them into the EMA layers."""
        online_weights = self.embedding.weights + self.projection.weights
        ema_weights = self.ema_embedding.weights + self.ema_projection.weights

        self.ema.apply(online_weights)

        # Variable.assign works both eagerly and inside a traced train_step, whereas
        # Layer.set_weights goes through a NumPy conversion of the values.
        for target, source in zip(ema_weights, online_weights):
            target.assign(self.ema.average(source))

In [29]:
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer, f1_score

# Fit the Isolation Forest on the training embeddings
clf = IsolationForest(random_state=42)
clf.fit(train_embeddings)

In [30]:
# Run anomaly detection on the test data
# Inference uses a deterministic pipeline: the same steps as `transform` without the
# random horizontal flip, so the predictions do not depend on the RNG state.
test_transform = transforms.Compose([
    transforms.Resize((512, 512)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
test_data = CustomDataset(csv_file='/content/DATA/test.csv', transform=test_transform)
# shuffle=False keeps the predictions aligned with the rows of test.csv.
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

test_embeddings = get_embeddings(test_loader, model)
test_pred = clf.predict(test_embeddings)

# Map the Isolation Forest output (anomaly = -1, normal = 1) to anomaly = 1, normal = 0
test_pred = np.where(test_pred == -1, 1, 0)

100%|██████████| 4/4 [00:01<00:00,  3.90it/s]


In [32]:
# Write the predictions into the label column of the sample submission and save it
submit = pd.read_csv('/content/DATA/sample_submission.csv')
submit['label'] = test_pred
submit.to_csv('submission.csv', index=False)

# Recon

In [None]:
# Build the training dataset from the image embeddings; `train_dataset` was not
# defined anywhere before this cell. Batches yield inputs only, which is what
# ReConPatch.train_step consumes.
train_dataset = tf.data.Dataset.from_tensor_slices(
    train_embeddings.astype(np.float32)
).batch(32)

# Instantiate the ReConPatch model; the input width is taken from the embeddings
# so it always matches the backbone that produced them.
# NOTE(review): this rebinds `model`, which until here referred to the torch feature extractor.
model = ReConPatch(input_dim=train_embeddings.shape[1], embedding_dim=512, projection_dim=256, alpha=1, margin=0.1)
model.compile(optimizer='adam')

# Train the model
model.fit(train_dataset, epochs=10)

# Model evaluation (simply evaluated on the training data here)
# In real use a separate validation dataset must be prepared
print("모델 평가:")
model.evaluate(train_dataset)


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.optimizers import Adam

# Data preparation: temporary random data is generated here as an example.
# In real use, proper input data must be prepared.
num_samples = 256  # number of example samples
input_dim = 512  # example input dimension
embedding_dim = 512  # embedding dimension
projection_dim = 32  # projection dimension

x_train = np.random.random((num_samples, input_dim)).astype(np.float32)

# Create the model instance
model = ReConPatch(input_dim, embedding_dim, projection_dim, alpha=0.5)

# Compile: only an optimizer is needed, the loss is computed inside train_step
model.compile(optimizer=Adam(learning_rate=1e-3))

# Training: the model is self-supervised, so only the inputs are passed.
# ReConPatch.train_step takes the batch itself as `x` and has no use for targets.
model.fit(x_train, epochs=10, batch_size=32)