In [None]:
import torch
import torchvision
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
import os
from pathlib import Path
import matplotlib
import cv2
import random
import torch._dynamo
# NOTE(review): presumably makes TorchDynamo fall back to eager execution
# instead of raising when compilation fails — confirm against the PyTorch docs.
torch._dynamo.config.suppress_errors = True



In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

In [None]:
# Import the module under an alias so the network object created below does
# not rebind (and hide) the module's own name.
from modules import model as model_module
from modules import handle_data

# create_model() returns two values: the network and a training transform.
model, train_transform = model_module.create_model()
# Annotation table; later cells read its "filename", "class_idx",
# "xmin", "ymin", "xmax" and "ymax" columns.
data = handle_data.create_df()

In [None]:
# Class names indexed by the integer stored in the "class_idx" column.
class_names = [
    "punching_hole", "welding_line", "crescent_gap",
    "water_spot", "oil_spot", "silk_spot",
    "inclusion", "rolled_pit", "crease", "waist folding"
    ]
def show_image(filename:str):
  """Plot one image and draw every annotated bounding box on it in red.

  Args:
    filename: value to look up in the "filename" column of the global
      ``data`` frame; the image is read from the class sub-directory named
      after the first matching row's class.

  Raises:
    ValueError: if ``data`` has no row with this filename.
  """
  vals = data[data["filename"] == filename]
  if vals.empty:
    # Fail with a readable message instead of an IndexError on `.values[0]`.
    raise ValueError(f"No annotations found for filename {filename!r}")

  class_dir = class_names[vals["class_idx"].values[0]]
  img = plt.imread(f"./defects location for metal surface/images/images/{class_dir}/{filename}")
  fig, ax = plt.subplots(nrows=1, ncols=1 ,figsize=(6, 6), dpi=200)
  ax.imshow(img)
  for i in range(len(vals)):
    row = vals.iloc[i]
    rect = matplotlib.patches.Rectangle(
        (row["xmin"], row["ymin"]),
        row["xmax"] - row["xmin"],
        row["ymax"] - row["ymin"],
        linewidth=1, edgecolor="r", facecolor="none")
    # NOTE(review): the text anchor is taken from the 3rd and 4th columns by
    # position — presumably xmin/ymin; confirm against the column order
    # produced by handle_data.create_df().
    ax.text(*row.iloc[2:4].values, class_names[row["class_idx"]], verticalalignment="top", color="red", fontsize=7, weight="bold")
    ax.add_patch(rect)

In [None]:
# Render the annotations of one specific image.
show_image("img_01_4406743300_00935.jpg")

In [None]:
class Data():
  """Detection dataset yielding one sample per unique filename in ``df``.

  Each item is ``(image, target, image_id, class_name)`` where ``target`` is a
  dict with the keys "boxes", "labels", "image_id", "area" and "iscrowd".
  """

  def __init__(self, df, IMG_DIR, transforms):
    """Store the annotation frame, image root and optional transform.

    Args:
      df: frame with "filename", "class_idx", "xmin", "ymin", "xmax", "ymax".
      IMG_DIR: directory prefix (with trailing separator) that contains one
        sub-directory per class name.
      transforms: callable invoked as ``transforms(image=, bboxes=, labels=)``
        and returning a dict with "image" and "bboxes"; falsy to disable.
    """
    self.df = df
    self.image_ids = self.df["filename"].unique().tolist()
    self.transforms = transforms
    self.img_dir = IMG_DIR
    self.class_names = [
    "punching_hole", "welding_line", "crescent_gap",
    "water_spot", "oil_spot", "silk_spot",
    "inclusion", "rolled_pit", "crease", "waist folding"
    ]
  def __len__(self):
    """Number of unique image filenames."""
    return len(self.image_ids)

  def __getitem__(self, idx):
    """Load image ``idx`` together with all of its box annotations.

    Raises:
      FileNotFoundError: if the image file cannot be read.
    """
    image_id = self.image_ids[idx]
    image_values = self.df[self.df["filename"] == image_id]

    # The first row's class decides which sub-directory holds the file.
    class_dir = int(image_values.iloc[0]["class_idx"])
    class_name = self.class_names[class_dir]
    image_path = self.img_dir + class_name + "/" + image_id

    image = cv2.imread(image_path, cv2.IMREAD_COLOR)
    if image is None:
      # cv2.imread signals failure by returning None; surface it right here
      # instead of swallowing it and failing later with an unrelated error.
      raise FileNotFoundError(f"Could not read image: {image_path}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
    image /= 255.0

    boxes = image_values[["xmin", "ymin", "xmax", "ymax"]].to_numpy()
    # height * width; both differences must be parenthesised.
    area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
    labels = torch.as_tensor(image_values["class_idx"].values, dtype=torch.int64)

    target = {
        # Always a tensor so callers can move every target value to a device.
        "boxes": torch.as_tensor(boxes, dtype=torch.float32),
        "labels": labels,
        "image_id": torch.tensor([idx]),
        "area": torch.as_tensor(area, dtype=torch.float32),
        # One flag per box of this image, not one per class.
        "iscrowd": torch.zeros(len(boxes), dtype=torch.int64),
    }

    if self.transforms:
      sample = self.transforms(image=image, bboxes=boxes, labels=labels)
      image = sample["image"]
      # reshape keeps an (N, 4) layout even when no box is returned.
      target["boxes"] = torch.as_tensor(sample["bboxes"], dtype=torch.float32).reshape(-1, 4)

    # as_tensor avoids a needless copy (and warning) when image is a tensor.
    return torch.as_tensor(image), target, image_id, class_name

In [None]:
def get_train_transform():
  """Build the albumentations pipeline used for training samples.

  The pipeline only converts the image to a tensor; boxes are declared in
  "pascal_voc" format with their classes carried in the "labels" field.
  """
  steps = [ToTensorV2(p=1.0)]
  box_config = {"format": "pascal_voc", "label_fields": ["labels"]}
  return A.Compose(steps, bbox_params=box_config)

In [None]:
def get_valid_transform():
  """Build the albumentations pipeline used for validation samples.

  Tensor conversion only; boxes use the "pascal_voc" format and their
  classes travel in the "labels" field.
  """
  steps = [ToTensorV2(p=1.0)]
  box_config = {"format": "pascal_voc", "label_fields": ["labels"]}
  return A.Compose(steps, bbox_params=box_config)

In [None]:
# Root directory holding one sub-directory of images per class name.
# NOTE(review): this is an absolute "/content/..." path while show_image reads
# from "./defects location for metal surface/..." — confirm both resolve to
# the same directory in the environment this notebook runs in.
path = "/content/defects location for metal surface/images/images/"
# Dataset over the full annotation table, used below for quick inspection.
fcb_dataset = Data(data, path, get_train_transform())

In [None]:
# Display the dataset object (Data defines no __repr__, so this is the default one).
fcb_dataset

In [None]:
# Index the dataset once: every fcb_dataset[0] access re-reads and
# re-processes the image from disk.
first_sample = fcb_dataset[0]
type(first_sample), len(first_sample), type(first_sample[0]), type(first_sample[1]), type(first_sample[2])

In [None]:
# Draw the ground-truth boxes of one sample picked at random among indices 0..50.
img, tar, _, class_name = fcb_dataset[random.randint(0, 50)]
bbox = tar["boxes"]
print(class_name)
print(len(bbox), bbox)
fig, ax = plt.subplots(figsize=(6, 6))
# The image is channel-first here; imshow expects channel-last.
ax.imshow(img.permute(1,2,0))
# One pass over (box, label) pairs: each box is drawn exactly once and is
# captioned with its own label rather than redrawn once per label.
for box, label in zip(bbox.tolist(), tar["labels"].tolist()):
  xmin, ymin, xmax, ymax = box
  rect = matplotlib.patches.Rectangle(
    (xmin, ymin),
    xmax - xmin,
    ymax - ymin,
    linewidth=1, edgecolor="r", facecolor="none")
  ax.text(xmin, ymax, class_names[label], verticalalignment="top", color="red", fontsize=9, weight="bold")
  ax.add_patch(rect)
# Show the finished figure once, after all boxes have been added.
plt.show()

In [None]:
# 10% of the number of rows in `data`.
len(data) * 10 / 100

In [None]:
# Manually override the class index of the rows labelled 1528 and 2330.
# A single .loc[row, column] assignment writes into `data` itself; the chained
# form data["class_idx"].loc[...] = ... may assign to a temporary copy.
data.loc[1528, "class_idx"] = 2
data.loc[2330, "class_idx"] = 9

In [None]:
# Number of unique images (taken from the end of the list) held out for validation.
NUM_VALID_IMAGES = 354

# Split by image, not by row, so all boxes of one image stay in the same set.
image_ids = data["filename"].unique()
valid_ids = image_ids[-NUM_VALID_IMAGES:]
train_ids = image_ids[:-NUM_VALID_IMAGES]
valid_df = data[data["filename"].isin(valid_ids)]
train_df = data[data["filename"].isin(train_ids)]
# Every annotation row must land in exactly one of the two frames.
assert len(train_df) + len(valid_df) == len(data)
train_df.shape, valid_df.shape

In [None]:
def collate_fn(batch):
  """Regroup a list of per-sample tuples into one tuple per field.

  Samples are kept as separate entries instead of being stacked, so images
  and targets of different sizes can share a batch.
  """
  fields = zip(*batch)
  return tuple(fields)

In [None]:
train_dataset = Data(data, path, transforms=get_train_transform()) # for testing use train_df instead of data
# Validation samples go through the validation pipeline.
valid_dataset = Data(valid_df, path, transforms=get_valid_transform())

# Let the loader reshuffle the training samples every epoch; the previously
# computed random permutation was never passed to anything.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
    # os.cpu_count() may return None; fall back to loading in the main process.
    num_workers=os.cpu_count() or 0,
    collate_fn=collate_fn
)

valid_dataloader = DataLoader(
    valid_dataset,
    batch_size=1,
    shuffle=False,
    num_workers=2,
    collate_fn=collate_fn
)

In [None]:
# Use the GPU when one is visible to torch, otherwise stay on the CPU.
if torch.cuda.is_available():
  device = "cuda"
else:
  device = "cpu"
model.to(device)

# Adam with weight decay; StepLR scales the learning rate by `gamma`
# after every `step_size` scheduler steps.
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=1e-3,
    weight_decay=0.0005,
)
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1,
)
num_epochs = 10

In [None]:
try:
  from torchinfo import summary
except ModuleNotFoundError:
  !pip install torchinfo
  from torchinfo import summary

summary(model=model,
        input_size=(1, 3, 384 , 384 ), # example of [batch_size, color_channels, height, width]
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"])

In [None]:
import warnings
# Suppresses every warning category from here on — including deprecation and
# numerical warnings that could point at real problems in the cells below.
warnings.filterwarnings("ignore")

In [None]:
import sys
from tqdm.auto import tqdm
# Epoch index and value of the lowest mean training loss seen so far.
best_epoch = 0
min_loss = sys.maxsize

for epoch in range(num_epochs):
  tk = tqdm(train_dataloader)
  model.train()
  running_loss = 0.0
  num_batches = 0
  # The two trailing fields (image ids, class names) are unused here; they are
  # bound to throwaway names so the global `image_ids` is not overwritten.
  for images, targets, _batch_ids, _batch_classes in tk:
    images = [image.to(device) for image in images]
    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

    # In train mode the model returns a dict of loss terms; optimise their sum.
    loss_dict = model(images, targets)

    losses = sum(loss for loss in loss_dict.values())
    loss_value = losses.item()

    optimizer.zero_grad()
    losses.backward()
    optimizer.step()

    running_loss += loss_value
    num_batches += 1
    tk.set_postfix(train_loss=loss_value)
  tk.close()

  # update the learning rate
  if lr_scheduler is not None:
    lr_scheduler.step()

  # Report the mean loss over the epoch rather than the last batch's loss,
  # and remember the best epoch. An empty loader yields NaN.
  epoch_loss = running_loss / num_batches if num_batches else float("nan")
  if epoch_loss < min_loss:
    min_loss = epoch_loss
    best_epoch = epoch

  print(f"Epoch #{epoch} loss: {epoch_loss}")

  ## Unnecessary while using full data
  #validation
  # model.eval();
  # with torch.inference_mode():
  #   tk = tqdm(valid_dataloader)

  #   for images, targets, image_ids, _ in tk:
  #     images = list(image.to(device) for image in images)
  #     targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
  #     val_output = model(images)
  #     val_output = [{k: v.to(device) for k, v in t.items()} for t in val_output]
  #     IOU = []
  #     for j in range(len(val_output)):
  #       a,b = val_output[j]['boxes'].cpu().detach(), targets[j]['boxes'].cpu().detach()
  #       chk = torchvision.ops.box_iou(a,b)
  #       res = np.nanmean(chk.sum(axis=1)/(chk>0).sum(axis=1))
  #       IOU.append(res)
  #       tk.set_postfix(IoU=np.mean(IOU))
  #   tk.close()

In [None]:
# Run the model on one validation sample and overlay its confident detections.
img, target, file_name, class_name = valid_dataset[5]

fig, ax = plt.subplots(figsize=(6, 6))
ax.set_title(f"Label: {class_name}", color="g")
ax.imshow(img.permute(1,2,0))

# Evaluation mode: the model returns predictions instead of losses.
model.eval()
with torch.inference_mode():
    batch = img.unsqueeze(dim=0).to(device)
    prediction = model(batch)[0]

# Move the prediction fields to NumPy once, outside the drawing loop.
scores = prediction["scores"].cpu().numpy()
pred_labels = prediction["labels"].cpu().numpy()
candidate_boxes = prediction["boxes"][:5].cpu().numpy()

# Only the first five boxes are considered, and only those scoring above 0.5.
for idx, box in enumerate(candidate_boxes):
  if not (scores[idx] > .5):
    continue
  left, top, right, bottom = box[0], box[1], box[2], box[3]
  rect = matplotlib.patches.Rectangle(
      (left, top),
      right - left,
      bottom - top,
      linewidth=1, edgecolor="r", facecolor="none")
  ax.text(left, bottom, class_names[pred_labels[idx]], verticalalignment="top", color="red", fontsize=9, weight="bold")
  ax.add_patch(rect)
plt.show()
print(prediction)

In [None]:
# Persist only the model's state dict (its parameters and buffers).
weights_path = Path("./Interface/DL4MetalSurface.pth")
# torch.save does not create missing directories, so make sure the target exists.
weights_path.parent.mkdir(parents=True, exist_ok=True)
torch.save(model.state_dict(), f=weights_path)