In [None]:
import os
import pickle

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import roc_auc_score, roc_curve
from sklearn.preprocessing import label_binarize
from torch.utils.data import random_split, DataLoader

In [None]:
import kagglehub

# Fetch the WM-811K wafer map dataset through kagglehub and remember where it was stored
path = kagglehub.dataset_download("qingyi/wm811k-wafer-map")

print(f"Path to dataset files: {path}")

In [None]:
# Locate LSWMD.pkl inside the directory that kagglehub reported in `path`, rather than
# hard-coding a cache location that is only valid for one user and one dataset version.
dataset_file = os.path.join(path, 'LSWMD.pkl')

# NOTE: unpickling can execute code embedded in the file, so only load pickles that
# come from a source you trust.
with open(dataset_file, 'rb') as f:
    raw_data = pd.read_pickle(f)

print(raw_data)

In [None]:
def oversampleData(wafers, defect_type):
  """Build flipped and rotated variants of every wafer labelled `defect_type`.

  For each matching row, five augmented rows are produced in this order:
  up/down flip, left/right flip, then rotations by 90, 180 and 270 degrees.
  Every variant is derived from the row's original wafer map, so the
  transforms never compound. `wafers` itself is not modified.

  Args:
    wafers: DataFrame with a 'waferMap' column holding 2-D arrays and a
      'failureType' column that is compared against `defect_type`.
    defect_type: Label whose rows should be augmented.

  Returns:
    A new DataFrame with the same columns as `wafers` containing only the
    augmented rows (five per matching wafer, grouped by source wafer). It is
    empty when no row matches.
  """
  transforms = (
      lambda wafer_map: np.flip(wafer_map, 0),   # up/down flip
      lambda wafer_map: np.flip(wafer_map, 1),   # left/right flip
      lambda wafer_map: np.rot90(wafer_map, 1),  # 90 degrees
      lambda wafer_map: np.rot90(wafer_map, 2),  # 180 degrees
      lambda wafer_map: np.rot90(wafer_map, 3),  # 270 degrees
  )

  matching = wafers.loc[wafers['failureType'] == defect_type]

  # Collect plain records and build the frame once; appending to a DataFrame
  # row by row re-allocates it on every insert.
  augmented_rows = []
  for _, row in matching.iterrows():
    original_map = row['waferMap']
    for transform in transforms:
      # np.flip / np.rot90 return views, so the variants add no pixel storage
      # of their own; a fresh dict per variant keeps the rows independent.
      new_row = row.to_dict()
      new_row['waferMap'] = transform(original_map)
      augmented_rows.append(new_row)

  return pd.DataFrame(augmented_rows, columns=wafers.columns)

In [None]:
def describeWaferData(wafer_data, classes):
  """Print one "<class>: <count>" line per entry of `classes`.

  The count is the number of rows of `wafer_data` whose 'failureType'
  compares equal to that class.
  """
  def occurrences(class_name):
    return (wafer_data['failureType'] == class_name).sum()

  for class_name in classes:
    n_rows = occurrences(class_name)
    print(": ".join((class_name, str(n_rows))))

In [None]:
def printWaferMap(wafer, prediction = ""):
  """Plot one wafer map, titled with its labelled defect type.

  Args:
    wafer: Row-like object with a 'waferMap' entry (2-D image data passed to
      `plt.imshow`) and a 'failureType' entry that is indexed as `[0][0]`.
    prediction: Optional predicted defect name. When it is neither None nor
      the empty string it is appended to the title; the default leaves the
      title as just the labelled defect.
  """
  # Generate a plot using the binary colormap
  # Wafer area is indicated by grey, defects are indicated in black
  plt.imshow(wafer['waferMap'], cmap='binary', interpolation='nearest')

  title = "Labeled Defect: " + str(wafer['failureType'][0][0])
  if prediction is not None and prediction != "":
    title += " | Predicted Defect: " + str(prediction)
  plt.suptitle(title)

  plt.show()

In [None]:
def testModel(test_loader, defect_classes):
  """Evaluate the notebook-level `model` on `test_loader` and print accuracies.

  Prints the overall accuracy (percent, rounded to 3 decimals) followed by one
  line per entry of `defect_classes` with that class's accuracy. A class with
  no samples in the loader is reported as "n/a" instead of dividing by zero.

  Relies on the globals `model` and `device`.

  Args:
    test_loader: Iterable of (images, labels) batches; labels are integer
      class indices into `defect_classes`.
    defect_classes: Class names, ordered by class index.
  """
  n_classes = len(defect_classes)
  n_correct = 0
  n_samples = 0
  n_class_correct = [0] * n_classes
  n_class_samples = [0] * n_classes

  # Evaluate in eval mode, then restore whichever mode the model was in so a
  # call made in the middle of training does not leave it switched.
  was_training = model.training
  model.eval()
  try:
    with torch.no_grad():
      for images, labels in test_loader:
        images = images.float().to(device)
        labels = labels.to(device)
        outputs = model(images)

        _, predicted = torch.max(outputs, 1)

        # Convert each batch to Python ints once instead of indexing the
        # tensors sample by sample.
        for label, pred in zip(labels.tolist(), predicted.tolist()):
          n_samples += 1
          n_class_samples[label] += 1
          if label == pred:
            n_correct += 1
            n_class_correct[label] += 1
  finally:
    model.train(was_training)

  if n_samples == 0:
    print("The test accuracy of the network is: n/a")
    return

  acc = round(100.0 * n_correct / n_samples, 3)
  print("The test accuracy of the network is: " + str(acc))

  for i in range(n_classes):
    if n_class_samples[i] == 0:
      # No sample of this class in the loader: accuracy is undefined.
      print(defect_classes[i] + " n/a")
      continue
    acc = round(100 * n_class_correct[i]/ n_class_samples[i], 3)
    print(defect_classes[i] + " " + str(acc))

In [None]:
def validateModel(test_loader, defect_classes):
  """Evaluate the notebook-level `model` on a validation loader and print accuracies.

  Prints the overall accuracy (percent, rounded to 3 decimals) followed by one
  line per entry of `defect_classes` with that class's accuracy. A class with
  no samples in the loader is reported as "n/a" instead of dividing by zero.

  Relies on the globals `model` and `device`.

  Args:
    test_loader: Iterable of (images, labels) batches to evaluate on; labels
      are integer class indices into `defect_classes`. (The parameter keeps
      its historical name; pass the validation loader here.)
    defect_classes: Class names, ordered by class index.
  """
  n_classes = len(defect_classes)
  n_correct = 0
  n_samples = 0
  n_class_correct = [0] * n_classes
  n_class_samples = [0] * n_classes

  # Evaluate in eval mode, then restore whichever mode the model was in so the
  # training loop that calls this keeps training in its previous mode.
  was_training = model.training
  model.eval()
  try:
    with torch.no_grad():
      # Iterate the loader that was passed in, not a global loader.
      for images, labels in test_loader:
        images = images.float().to(device)
        labels = labels.to(device)
        outputs = model(images)

        _, predicted = torch.max(outputs, 1)

        # Convert each batch to Python ints once instead of indexing the
        # tensors sample by sample.
        for label, pred in zip(labels.tolist(), predicted.tolist()):
          n_samples += 1
          n_class_samples[label] += 1
          if label == pred:
            n_correct += 1
            n_class_correct[label] += 1
  finally:
    model.train(was_training)

  if n_samples == 0:
    print("The validation accuracy of the network is: n/a")
    return

  acc = round(100.0 * n_correct / n_samples, 3)
  print("The validation accuracy of the network is: " + str(acc))

  for i in range(n_classes):
    if n_class_samples[i] == 0:
      # No sample of this class in the loader: accuracy is undefined.
      print(defect_classes[i] + " n/a")
      continue
    acc = round(100 * n_class_correct[i]/ n_class_samples[i], 3)
    print(defect_classes[i] + " " + str(acc))

In [None]:
# The source column is spelled 'trianTestLabel'; give it the intended name (in place on raw_data).
raw_data.rename(columns={'trianTestLabel': 'trainTestLabel'}, inplace=True)

# Columns that are not carried into the working frame.
columns_to_drop = ['dieSize','lotName','waferIndex','trainTestLabel']

# Keep only rows with a non-empty trainTestLabel, drop the unused columns and
# renumber the remaining rows from zero.
has_split_label = raw_data['trainTestLabel'].str.len() != 0
wafer_data = (
    raw_data[has_split_label]
    .drop(columns = columns_to_drop)
    .reset_index(drop = True)
)
print(wafer_data)

In [None]:
# Failure-type labels; a label's position in this list is its class index
# when per-class results are reported.
defect_classes = [
    'Center',
    'Donut',
    'Edge-Loc',
    'Edge-Ring',
    'Loc',
    'Near-full',
    'Random',
    'Scratch',
    'none',
]
num_classes = len(defect_classes)

In [None]:
# Show the array shape of the first wafer map, then plot the wafer at position 20000 as a sample
print(wafer_data['waferMap'].iloc[0].shape)
printWaferMap(wafer_data.iloc[20000])

In [None]:
# Print how many wafers carry each failure type (one "<class>: <count>" line per class)
describeWaferData(wafer_data, defect_classes)

In [None]:
# theoretical maximum size is 212x212, but i don't have enough system ram
new_size = (120, 120)

# Resize every wafer map to `new_size` in place. `idx` is the row position (used
# for progress output); `row_label` is the index label used to write the cell back.
for idx, (row_label, wafer_map) in enumerate(wafer_data["waferMap"].items()):
  # F.interpolate with a 2-D `size` wants a (batch, channel, height, width)
  # float tensor, so add two leading axes here and strip them afterwards.
  batched_map = torch.tensor(wafer_map, dtype = torch.float32).unsqueeze(0).unsqueeze(0)
  resized_array = F.interpolate(batched_map, size = new_size, mode = "nearest")
  resized_array = resized_array.squeeze(0).squeeze(0)

  # .at writes exactly one cell, so the 2-D array is stored as a single object
  # rather than being treated as values to spread over several cells.
  wafer_data.at[row_label, "waferMap"] = resized_array.numpy()

  # Plot every 10000th wafer as a visual progress check.
  if (idx % 10000 == 0):
    printWaferMap(wafer_data.iloc[idx])
    print(idx)


In [None]:
# Oversample every defect class with flipped/rotated copies; 'none' is left as is.
# NOTE: re-running this cell appends another round of augmented rows to wafer_data.
augmented_frames = []
for defect_type in defect_classes:
  if defect_type == 'none':
    # Skip rather than stop, so the result does not depend on 'none' being last.
    continue
  print(defect_type)
  augmented_frames.append(oversampleData(wafer_data, defect_type))

# Concatenate once at the end instead of rebuilding the full frame per class.
wafer_data = pd.concat([wafer_data, *augmented_frames], ignore_index=True)

In [None]:
# Assess data augmentation: print the per-class wafer counts again, now including the augmented rows
describeWaferData(wafer_data, defect_classes)

In [None]:
# Seed PyTorch's global random number generator so that operations drawing from it
# (random dataset splits, shuffling, weight initialisation) repeat across runs.
random_seed = 42
torch.manual_seed(random_seed)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyperparameters
num_epochs = 150
batch_size = 64
learning_rate = 0.001

In [None]:
class CustomDataset(torch.utils.data.Dataset):
    """Dataset view over a wafer DataFrame.

    Each item is a `(waferMap, class_index)` pair: the wafer map as a NumPy
    array and the failure type translated to its integer class index.
    """

    # Failure-type label -> integer class index. Built once for the class
    # instead of once per sample.
    FAILURE_TYPE_TO_INDEX = {'Center': 0, 'Donut' : 1, 'Edge-Loc' : 2, 'Edge-Ring' : 3, 'Loc' : 4, 'Near-full' : 5, 'Random' : 6, 'Scratch' : 7, 'none' : 8}

    def __init__(self, dataframe):
        """Keep references to the frame and its 'waferMap' / 'failureType' columns."""
        self.wafer_data = dataframe
        self.waferMap = dataframe["waferMap"]
        self.failureType = dataframe["failureType"]

    def __len__(self):
        """Number of samples, i.e. the length of the 'waferMap' column."""
        return len(self.waferMap)

    def __getitem__(self, idx):
        """Return `(waferMap, class_index)` for the row at position `idx`.

        Raises:
            KeyError: if the row's label is not in FAILURE_TYPE_TO_INDEX.
        """
        waferMap = np.array(self.waferMap.iloc[idx])

        # The label may be wrapped in a nested array (e.g. [['Center']]) or be a
        # plain string; flattening first and taking element 0 handles both.
        label = np.asarray(self.failureType.iloc[idx]).ravel()[0]
        failureType = self.FAILURE_TYPE_TO_INDEX[label]

        return waferMap, failureType

In [None]:
# Wrap the frame in the dataset class and print both lengths so they can be compared
print(len(wafer_data))
custom_wafer_data = CustomDataset(wafer_data)
print(len(custom_wafer_data))

In [None]:
# 80% / 10% split for train / validation; the test split takes whatever is left
# so that the three sizes always add up to the full dataset.
total_samples = len(custom_wafer_data)
train_size = int(0.8 * total_samples)
val_size = int(0.1 * total_samples)
test_size = total_samples - train_size - val_size

split_sizes = [train_size, val_size, test_size]
train_dataset, val_dataset, test_dataset = random_split(custom_wafer_data, split_sizes)

# Batches are reshuffled for the train and validation loaders; the test loader keeps a fixed order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
class ConvNet(nn.Module):
  """Small CNN: two 5x5 conv + 2x2 max-pool stages followed by three linear layers.

  Args:
    num_classes: Number of output logits. When omitted, falls back to
      `len(defect_classes)` from the notebook namespace.
    input_size: `(height, width)` of the single-channel input maps. The size
      of the first linear layer is derived from it; the default (120, 120)
      gives 32 * 27 * 27 = 23328 input features.

  Raises:
    ValueError: if `input_size` is too small to survive both conv/pool stages.
  """

  def __init__(self, num_classes=None, input_size=(120, 120)):
    super(ConvNet, self).__init__()
    if num_classes is None:
      num_classes = len(defect_classes)

    self.conv1 = nn.Conv2d(1, 16, 5)
    self.pool = nn.MaxPool2d(2,2)
    self.conv2 = nn.Conv2d(16, 32, 5)

    # Work out how many features reach fc1 instead of hard-coding the value
    # that only matches one input size.
    height, width = input_size
    out_height = self._side_after_conv_stages(height)
    out_width = self._side_after_conv_stages(width)
    if out_height <= 0 or out_width <= 0:
      raise ValueError("input_size " + str(tuple(input_size)) + " is too small for this network")

    self.fc1 = nn.Linear(32 * out_height * out_width, 512)
    self.fc2 = nn.Linear(512, 128)
    self.fc3 = nn.Linear(128, num_classes)

  @staticmethod
  def _side_after_conv_stages(size):
    """Return the side length left after conv1 -> pool -> conv2 -> pool."""
    for _ in range(2):
      # An unpadded 5x5 convolution trims 4 pixels; 2x2 pooling halves, rounding down.
      size = (size - 4) // 2
    return size

  def forward(self, x):
    """Map a batch of maps shaped (batch, height, width) to (batch, num_classes) logits."""
    x = x.unsqueeze(1)  # add the single channel axis expected by Conv2d
    x = self.pool(F.relu(self.conv1(x)))
    x = self.pool(F.relu(self.conv2(x)))
    x = torch.flatten(x, 1)  # keep the batch axis, flatten the rest for fc1
    x = F.relu(self.fc1(x))
    x = F.relu(self.fc2(x))
    x = self.fc3(x)
    return x

In [None]:
# Instantiate the network and place its parameters on the selected device
model = ConvNet()
model = model.to(device)

# Cross-entropy loss on the raw network outputs, optimised with plain SGD
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(), lr=learning_rate)

In [None]:
training_steps = len(train_loader)
for epoch in range(num_epochs):
  print("Current Epoch: " + str(epoch + 1))

  # Make sure the model is in training mode for the parameter updates.
  model.train()
  for images, labels in train_loader:
    images = images.float().to(device)
    labels = labels.to(device)

    outputs = model(images)
    loss = criterion(outputs, labels)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  # Validate every 10th epoch, after that epoch's updates, so the reported
  # accuracy belongs to the epoch just printed and the final weights are
  # validated as well.
  if((epoch + 1) % 10 == 0):
    validateModel(val_loader, defect_classes)

print("Training Complete")

In [None]:
# Print overall and per-class accuracy of the trained model on the test loader
testModel(test_loader,defect_classes)

In [None]:
# Get the raw network outputs (used as per-class scores) and true labels for the test set
y_true = []
y_scores = []
with torch.no_grad():
    for images, labels in test_loader:
        images = images.float().to(device)
        outputs = model(images)
        # Labels are only consumed on the host, so they are never moved to `device`.
        y_true.extend(labels.cpu().numpy())
        y_scores.extend(outputs.cpu().numpy())

y_scores = np.array(y_scores)

# Binarize the true labels for multi-class AUROC calculation
y_true_bin = label_binarize(y_true, classes=list(range(len(defect_classes))))

# Compute macro-average AUROC
auroc_macro = roc_auc_score(y_true_bin, y_scores, average='macro', multi_class='ovr')
print(f"Macro-average AUROC: {auroc_macro}")

# Compute the one-vs-rest ROC curve and AUROC for each class
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(len(defect_classes)):
    fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_scores[:, i])
    roc_auc[i] = roc_auc_score(y_true_bin[:, i], y_scores[:, i])

# Plot ROC curves, showing each class's AUROC in the legend
fig, ax = plt.subplots()
for i in range(len(defect_classes)):
    ax.plot(fpr[i], tpr[i], label=f'{defect_classes[i]} (AUC = {roc_auc[i]:.3f})')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curves for Wafer Defect Classification')
ax.legend(loc="lower right")
plt.show()