# General

In [47]:
import os
import torch
import torchvision
from torchvision import models
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision.transforms import v2
from torch import nn
import random
import cv2

In [2]:
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

device

'cuda'

In [3]:
current_directory = os.getcwd()
DATA_PATH = os.path.join(current_directory, r'archive\Bridge_Crack_Image\DBCC_Training_Data_Set')
DATA_PATH_TRAIN = os.path.join(current_directory, r'archive\Bridge_Crack_Image\DBCC_Training_Data_Set\train')
DATA_PATH_VAL = os.path.join(current_directory, r"archive\Bridge_Crack_Image\DBCC_Training_Data_Set\val")

In [4]:
class DefectDataset(Dataset):

    def __init__(self, img_dir, label_dir, transform) -> None:
        super().__init__()
        self.transform = transform
        self.img_paths = []
        self.img_labels = []
        with open(label_dir) as a:
            for line in a:
                name, label = line.replace('\n', '').split(' ')
                self.img_paths.append(os.path.join(img_dir, name))
                self.img_labels.append(int(label))


    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, index) -> tuple:
        img_path = self.img_paths[index]
        image = cv2.imread(img_path)

        if self.transform:
            image = self.transform(image)

        label = self.img_labels[index]

        return (image, label)

In [5]:
transform = v2.Compose([
    v2.ToTensor(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])



In [6]:
from torch.utils.data import random_split

batch_size = 64
dataset = DefectDataset(DATA_PATH_TRAIN, DATA_PATH + r'\train.txt', transform)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
val_dataset = DefectDataset(DATA_PATH_VAL, DATA_PATH + r'\val.txt', transform)
val_loader = DataLoader(val_dataset, batch_size=180)

In [9]:
def val(model, val_loader):
    model.eval()
    tp = 0
    total = 0

    for x,y in val_loader:
        y = y.to(device)
        pre = model(x.to(device))
        pred = torch.Tensor([1 if i>0.5 else 0 for i in pre])
        tp += int(torch.eq(torch.Tensor(pred),y.to('cpu')).sum())
        total +=len(pred)
    
    return tp/total

def train(dataloader, model, loss_fn, op_fn, epoch):
    for ep in range(epoch):
        for step, (x, y) in enumerate(dataloader):
            (x, y) = (x.to(device), y.float().to(device))
            y_pred = torch.flatten(model.forward(x))
            y_pred = y_pred.to(device)
            #print('y_pred', y_pred)
            #print('y', y)
            loss = loss_fn(y_pred, y)

            op_fn.zero_grad()
            loss.backward()
            op_fn.step()

            if step%100 == 0:
                loss, current = loss.item(), (ep+1)*(step+1)*batch_size
                print(f"loss = {loss}, samples = {current}")

        acc = val(model, val_loader)
        print(f'accuracy {acc}:.2f')

# 1. R-CNN

In [9]:
model = models.resnet50(pretrained=True)
for param in model.parameters():
    param.requires_grad = False
model.fc = nn.Sequential(
    nn.Linear(model.fc.in_features, 512), 
    nn.ReLU(), 
    nn.Linear(512, 64), 
    nn.ReLU(), 
    nn.Linear(64, 1), 
    nn.Sigmoid()
    )
for param in model.fc.parameters():
    param.requires_grad = True
#print(model)
resnet = model.to(device)

loss_function = torch.nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001,)

In [87]:
train(train_loader, resnet, loss_function, optimizer, 20)

torch.Size([64, 3, 16, 16])
loss = 0.7078182697296143, samples = 64
torch.Size([64, 3, 16, 16])
torch.Size([64, 3, 16, 16])
torch.Size([64, 3, 16, 16])
torch.Size([64, 3, 16, 16])
torch.Size([64, 3, 16, 16])
torch.Size([64, 3, 16, 16])
torch.Size([64, 3, 16, 16])
torch.Size([64, 3, 16, 16])
torch.Size([64, 3, 16, 16])


KeyboardInterrupt: 

In [10]:
predicts = []
labels = []

for x,y in test_loader:
    y = y.to(device)
    pre = resnet(x.to(device))
    pred = [1 if i>0.5 else 0 for i in pre]
    predicts.append(pred)
    labels.append(y)

  return F.conv2d(input, weight, bias, self.stride,


KeyboardInterrupt: 

In [11]:
a = [i.tolist() for i in labels]

In [12]:
true = 0
count = 0
for x,y in zip(a,predicts):
    for xi,yi in zip(x,y):
        if xi==yi:
            true += 1
        count += 1
print(true/count)

0.23039772727272728


# 2. Autoencoder

In [57]:
#import torch
#from torch import nn, optim
#from torchvision import datasets, transforms
#from torch.utils.data import DataLoader
#import matplotlib.pyplot as plt
#import numpy as np
#from torchvision.datasets.folder import default_loader
import torch.nn.functional as F

In [58]:
batch_size = 64
latent_dim = 64
learning_rate = 0.0001

In [59]:
class DefectDatasetVAE(Dataset):

    def __init__(self, img_dir, label_dir, transform) -> None:
        super().__init__()
        self.transform = transform
        self.img_paths = []
        with open(label_dir) as a:
            for line in a:
                name, label = line.replace('\n', '').split(' ')
                if label == '1':
                    break
                self.img_paths.append(os.path.join(img_dir, name))


    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, index) -> tuple:
        img_path = self.img_paths[index]
        image = cv2.imread(img_path)

        if self.transform:
            image = self.transform(image)

        label = self.img_labels[index]

        return image

In [60]:
from torch.utils.data import random_split

batch_size = 64
dataset_vae = DefectDatasetVAE(DATA_PATH_TRAIN, DATA_PATH + r'\train.txt', transform)
train_loader_vae = DataLoader(dataset_vae, batch_size=batch_size)
val_dataset_vae = DefectDatasetVAE(DATA_PATH_VAL, DATA_PATH + r'\val.txt', transform)
val_loader_vae = DataLoader(val_dataset_vae, batch_size=180)
latent_dim = 64

In [61]:
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=4, stride=2, padding=1), # (3,16,16) -> (16,8,8)
            nn.ReLU(),
            nn.Conv2d(16, 64, kernel_size=4, stride=2, padding=1), # (16,8,8) -> (64,4,4)
            nn.ReLU(),
        )
        self.fc_mu = nn.Linear(64*4*4, latent_dim)
        self.fc_logvar = nn.Linear(64*4*4, latent_dim)
        
        self.fc_decode = nn.Linear(latent_dim, 64*4*4)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1), # (64,4,4) -> (32, 8, 8)
            nn.ReLU(),
            nn.ConvTranspose2d(32, 3, kernel_size=4, stride=2, padding=1), # (32, 8, 8) -> (3, 16, 16)
            nn.Sigmoid()
        )

    def encode(self, x):
        h = self.encoder(x)
        h = h.view(h.size(0), -1)
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        h = self.fc_decode(z)
        h = h.view(h.size(0), 64, 4, 4)
        return self.decoder(h)

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        x_recon = self.decode(z)
        return x_recon, mu, logvar

In [62]:
vae = VAE().to(device)

In [63]:
# функция потерь 
def loss_function_vae(recon_x, x, mu, logvar):
    recon_loss = F.mse_loss(recon_x, x, reduction='sum')
    kld_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon_loss + kld_loss
# оптимизатор
optimizer_vae = torch.optim.Adam(vae.parameters(), lr=0.001,)

In [64]:
def val_vae(model, val_loader):
    model.eval()
    tp = 0
    total = 0

    for x,y in val_loader:
        y = y.to(device)
        pre = model(x.to(device))
        pred = torch.Tensor([1 if i>0.5 else 0 for i in pre])
        tp += int(torch.eq(torch.Tensor(pred),y.to('cpu')).sum())
        total +=len(pred)
    
    return tp/total

def train_vae(dataloader, model, loss_fn, op_fn, epoch):
    for ep in range(epoch):
        for step, (x, y) in enumerate(dataloader):
            (x, y) = (x.to(device), y.float().to(device))
            x_recon, mu, logvar = model.forward(x)
            loss = loss_fn(x_recon, x, mu, logvar)
            op_fn.zero_grad()
            loss.backward()
            op_fn.step()

            if step%100 == 0:
                loss, current = loss.item(), (ep+1)*(step+1)*batch_size
                print(f"loss = {loss}, samples = {current}")

        acc = val(model, val_loader)
        print(f'accuracy {acc}:.2f')

In [66]:
train_vae(train_loader, vae, loss_function_vae, optimizer_vae, 10)

loss = 7065.75390625, samples = 64
loss = 7242.78759765625, samples = 6464
loss = 7533.87060546875, samples = 12864


KeyboardInterrupt: 

In [114]:
predicts = []
labels = []

for x,y in test_loader:
    x = x.to(device)
    x_recon,_,_ = vae.forward(x)
    diff = abs(x - x_recon)
    sums = diff.view(64, -1).sum(1)
    predicts.append(sums)
    labels.append(y)

In [132]:
preds = []
for i in predicts:
    for j in i:
        if float(j) >= 299:
            preds.append(1)
        else:
            preds.append(0)

In [144]:
labls = []
for i in labels:
    for j in i:
        labls.append(int(j))

In [148]:
a = [i.tolist() for i in labels]
true = 0
count = 0
for x,y in zip(labls,preds):
    if x==y:
        true += 1
    count += 1
print(true/count)

0.932


# 3. YOLO

In [10]:
model_yolo = models.densenet161(pretrained=True)
for param in model_yolo.parameters():
    param.requires_grad = False
model_yolo.fc = nn.Sequential(
    nn.Linear(1000, 512), 
    nn.ReLU(), 
    nn.Linear(512, 64), 
    nn.ReLU(), 
    nn.Linear(64, 1), 
    nn.Sigmoid()
    )
for param in model_yolo.fc.parameters():
    param.requires_grad = True
#print(model)
yolo = model_yolo.to(device)

loss_function = torch.nn.BCELoss()
optimizer = torch.optim.Adam(model_yolo.parameters(), lr=0.001,)

In [32]:
import numpy as np

def train_yolo(dataloader, model, loss_fn, op_fn, epoch):
    for ep in range(epoch):
        for step, (x, y) in enumerate(dataloader):
            temp = torch.zeros((64,2))
            for i, value in enumerate(y):
                temp[i][int(value)] = 1
            y = temp
            print(y.shape)
            (x, y) = (x.to(device), y.float().to(device))
            y_pred = torch.flatten(model.forward(x))
            y_pred = y_pred.to(device)
            #print('y_pred', y_pred)
            #print('y', y)
            loss = loss_fn(y_pred, y)

            op_fn.zero_grad()
            loss.backward()
            op_fn.step()

            if step%100 == 0:
                loss, current = loss.item(), (ep+1)*(step+1)*batch_size
                print(f"loss = {loss}, samples = {current}")

        acc = val(model, val_loader)
        print(f'accuracy {acc}:.2f')

In [33]:
train_yolo(train_loader, yolo, loss_function, optimizer, 20)

torch.Size([64, 2])


RuntimeError: Given input size: (1056x1x1). Calculated output size: (1056x0x0). Output size is too small

In [None]:
predicts_yolo = []
labels_yolo = []

for x,y in test_loader:
    y = y.to(device)
    pre = yolo(x.to(device))
    pred = [1 if i>0.5 else 0 for i in pre]
    predicts_yolo.append(pred)
    labels_yolo.append(y)

In [None]:
a = [i.tolist() for i in labels_yolo]
true_yolo = 0
count_yolo = 0
for x,y in zip(a,predicts_yolo):
    for xi,yi in zip(x,y):
        if xi==yi:
            true_yolo += 1
        count_yolo += 1
print(true_yolo/count_yolo)