# BINARY CLASSIFICATION USING RESNET-18
## Import Packets

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import sampler

import torchvision.datasets as dset
import torchvision.transforms as T
import torch.nn.functional as F
from PIL import ImageFilter

import numpy as np
import cv2 
from PIL import Image
import matplotlib.pyplot as plt
import pandas as pd
plt.style.use('ggplot')
plt.rcParams['figure.figsize'] = [4, 3]
plt.rcParams['figure.dpi'] = 500

USE_GPU = True
dtype = torch.float32 # We will be using float throughout this tutorial.

if USE_GPU and torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Constant to control how frequently we print train loss.
print_every = 150
print('using device:', device)
print(torch.__version__)

using device: cpu
1.10.0


## Root Path

In [None]:
ROOT = './LHY_data'
data = pd.read_csv(ROOT+'/Oxford_Score_data_two/data_label.csv')
print(data.shape)
data = data.dropna(axis=0)
print(data.shape)

## Generate Training Set and Test Set

edge enhance：head，greater，ilium，acetabulum，junction

find：Y

detal：pubis，ischium

In [38]:
def default_loader(path):
        return Image.open(path).convert('L').filter(ImageFilter.BLUR)
        # 均值滤波器

class TrainingSet(Dataset): 
    def __init__(self, transform=None,target_transform=None, loader=default_loader):

        super(TrainingSet,self).__init__()
        imgs = []
        df = pd.read_csv(ROOT+'/Oxford_Score_data/data_label.csv').dropna(axis=0)
        df = pd.concat([df,df],axis=0)
        for index, row in df.iterrows():
            #id=row['index']
            img=row['index']
            labels=np.array([row['head of femur'],
                             row['greater trochanter'],
                             row['ilium'],
                             row['ischium'],
                             row['acetabulum'],
                             row['junction'],
                             row['pucis'],
                             row['Y']
                             ])
            imgs.append((img,labels))   
        self.imgs = imgs
        self.transform = transform
        self.target_transform = target_transform
        self.loader = loader
        
    def __getitem__(self, index):
        #print(index)
        fn, label = self.imgs[index]
        img = self.loader(ROOT+'/Oxford_Score_data/'+str(int(fn))+'.PNG')
        img = np.array(img)
        if index >= 973:
            img = np.rot90(img,1)
        img = cv2.resize(img, (int(128), int(128)))
        if self.transform is not None:
            img = self.transform(img) 
        return img,label

    def __len__(self):
        return len(self.imgs)

class ValidationSet(Dataset): 
    def __init__(self, transform=None,target_transform=None, loader=default_loader):

        super(ValidationSet,self).__init__()
        imgs = []
        df = pd.read_csv(ROOT+'/Oxford_Score_data/data_label.csv').dropna(axis=0)
        df = pd.concat([df,df],axis=0)
        for index, row in df.iterrows():
            # img=open(ROOT+'\\dataset\\'+str(row['index'])+'.PNG')
            img=row['index']
            labels=np.array([row['head of femur'],
                             row['greater trochanter'],
                             row['ilium'],
                             row['ischium'],
                             row['acetabulum'],
                             row['junction'],
                             row['pucis'],
                             row['Y']
                             ])
            imgs.append((img,labels))   
  
        self.imgs = imgs
        self.transform = transform
        self.target_transform = target_transform
        self.loader = loader  
        
    def __getitem__(self, index):
        #print(index)
        fn, label = self.imgs[index]
        img = self.loader(ROOT+'/Oxford_Score_data/'+fn)
        img = np.array(img)
        img = cv2.resize(img, (int(128), int(128)))
        if index >= 973:
            img = np.rot90(img,1)
        if self.transform is not None:
            img = self.transform(img) 
        return img,label

    def __len__(self):
        return len(self.imgs)

In [None]:
train_data=TrainingSet(transform=T.ToTensor())
val_data=ValidationSet(transform=T.ToTensor())
train_data.__getitem__(1100)


In [None]:
img, label = train_data.__getitem__(1100)
img = np.array(img)
plt.imshow(img.squeeze(), cmap=plt.get_cmap('gray'))

In [41]:
#图片缺失 718，722
loader_train = DataLoader(train_data, batch_size=5, sampler=sampler.SubsetRandomSampler(list(range(700))+(list(range(976,1800)))))
loader_val = DataLoader(train_data, batch_size=1, sampler=sampler.SubsetRandomSampler(list(range(700,976))))

In [42]:
print(len(loader_train))
print(len(loader_val))

305
276


## Check Accuracy

In [44]:
def check_accuracy(loader, model):
    #print('Check accuracy on validation set...')
    num_samples = 0
    model.eval()  # set model to evaluation mode
    with torch.no_grad():
        error = 0
        for x, y in loader:
            x = x.to(device=device, dtype=dtype)  # move to device, e.g. GPU
            y = y.to(device=device, dtype=dtype)
            pred = model(x)
            pred.to(device=device, dtype=dtype)
            #print(np.array(pred.cpu()))
            #print(np.array(y.cpu()))
            #error += np.sum(np.power(np.array(abs(np.round(pred.cpu(),0) - y.cpu()).cpu()),2))
            error += np.sum(np.array(abs(np.round(pred.cpu(),0) - y.cpu()).cpu()))
            num_samples += 1
        avg_error = error / num_samples
        #print('Average L1 error:', avg_error.item(), "on", num_samples, "validation images.")
    return avg_error

## Training and Loss Function

In [45]:
def train(model, optimizer, epochs=1, print_every=10):
    model = model.to(device=device)  # move the model parameters to CPU/GPU
    epoch_loss = 0
    for e in range(epochs):
        for t, (x, y) in enumerate(loader_train):
            model.train()  # put model to training mode
            x = x.to(device=device, dtype=dtype)  # move to device, e.g. GPU
            y = y.to(device=device, dtype=dtype)
            scores = model(x)
            loss = F.mse_loss(scores,y)
            epoch_loss += loss
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            #if t % print_every == 0:
               #print("Iteration",t , "loss =", loss.item())
    #print("Average loss:", epoch_loss.item() / len(loader_train))
    return epoch_loss.item() / len(loader_train)

## The ResNet-18 Structure

In [46]:
model = None
optimizer = None

def flatten(x):
    N = x.shape[0] # read in N, C, H, W
    return x.view(N, -1)  # "flatten" the C * H * W values into a single vector per image

class Flatten(nn.Module):
    def forward(self, x):
        return flatten(x)

class Residual(nn.Module):
    def __init__(self, in_channel, out_channel, use_1x1_conv=False, strides=1):
        super().__init__()

        self.conv_1 = nn.Conv2d(in_channel, out_channel, (3,3), padding=1, stride=strides)
        nn.init.kaiming_normal_(self.conv_1.weight)
        self.conv_2 = nn.Conv2d(out_channel, out_channel, (3,3), padding=1)
        nn.init.kaiming_normal_(self.conv_2.weight)
        if use_1x1_conv:
            self.conv_3 = nn.Conv2d(in_channel, out_channel, (1,1), stride=strides)
            nn.init.kaiming_normal_(self.conv_3.weight)
        else:
            self.conv_3 = None
        self.use_1x1_conv = use_1x1_conv
        
        self.bn_1 = nn.BatchNorm2d(out_channel)
        self.bn_2 = nn.BatchNorm2d(out_channel)

    def forward(self, x):
        Y = torch.relu((self.bn_1(self.conv_1(x))))
        Y = self.bn_2(self.conv_2(Y))
        if self.conv_3:
            x = self.conv_3(x)
        return torch.relu(Y + x)

model = nn.Sequential(
    # Layer 1 128x128x3 -> 128x128x64
    nn.Conv2d(1, 64, (7,7), stride=1, padding=3),   #inchanel = 1灰度图，3RGB图；
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d((3,3), stride=1, padding=1)
) 

# Layer 2 & 3 128x128x64 -> 128x128x64
model.add_module('Resnet_block_1_1', Residual(64, 64))

# Layer 4 & 5 128x128x64 -> 128x128x64
model.add_module('Resnet_block_1_2', Residual(64, 64))

# Layer 6 & 7 128x128x64 -> 64x64x128
model.add_module('Resnet_block_2_1', Residual(64, 128, use_1x1_conv=True, strides=2))

# Layer 8 & 9 64x64x128 -> 64x64x128
model.add_module('Resnet_block_2_2', Residual(128, 128))

# Layer 10 & 11 64x64x128 -> 32x32x256
model.add_module('Resnet_block_3_1', Residual(128, 256, use_1x1_conv=True, strides=2))

# Layer 12 & 13 32x32x256 -> 32x32x256
model.add_module('Resnet_block_3_2', Residual(256, 256))

# Layer 14 & 15 32x32x256 -> 16x16x512
model.add_module('Resnet_block_4_1', Residual(256, 512, use_1x1_conv=True, strides=2))

# Layer 16 & 17 16x16x512 -> 16x16x512
model.add_module('Resnet_block_4_2', Residual(512, 512))

# 16x16x512 -> 1x1x512
model.add_module('avgpool', nn.AdaptiveAvgPool2d((1, 1)))               
# 1x1x512 -> 512
model.add_module('flatten', nn.Flatten())                 
# Layer 18 FC Layer 512 -> 10
model.add_module('linear', nn.Linear(512, 8))

## Train the Network!

In [None]:
epochs = 100
min_error = 10
error = []
loss = []
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.9, nesterov=True)
for epoch in range(epochs):
    #print(f"Epoch {epoch+1} start.")
    #print("\r", end="")
    #print("Progressing Rate: {} %: " .format(np.round(epoch/epochs*100),1), "▋" * (int(epoch/epochs*100/5)), end="")
    tmp_loss = train(model, optimizer, epochs=1, print_every=5)
    print(f"Epoch {epoch+1} ends.")
    loss.append(tmp_loss)
    error.append(check_accuracy(loader_val, model))
    if error[epoch] < min_error:
        min_error = error[epoch]
        print("Saving model...")
        torch.save(model,'model_best.pth')
print("Trained Completed!")

In [111]:
def SaveResult(filtername):
    normal = []
    for i in range(len(error)):
        normal.append([error[i],loss[i]])
    info_normal = pd.DataFrame(normal,columns=['error','loss'])
    info_normal.to_csv('./FilterResult/%s.csv' %(filtername))
    print("Save %s OK!" %(filtername))

Save EDGE_ENHANCE_MORE OK!


In [None]:
SaveResult(filtername='EDGE_ENHANCE_MORE')

In [28]:
result1 = pd.read_csv('./FilterResult/Normal1000.csv')
result2 = pd.read_csv('./FilterResult/Mean1000.csv')

In [None]:
print(result1['loss'].var(),result2['loss'].var())
print(result1['error'].var(),result2['error'].var())

In [None]:
plt.clf()
plt.plot(list(range(100)), result1[['loss']], linewidth=1)
plt.plot(list(range(100)), result2[['loss']], linewidth=1)
font = {'size' : 5}
plt.legend(['Non-Filter','Mean'],fontsize=5,loc='best')
plt.title("Loss.",size=5)
plt.xlabel("Epochs",font)
plt.ylabel("Average Loss",font)
plt.yticks(size=5)
plt.xticks(size=5)
plt.savefig('./FilterResultGraph/Mean Loss1000.jpg')
plt.show()

## Load Existing Models

In [13]:
model = torch.load('model_best.pth')

In [None]:
# Check accuracy on validation set
check_accuracy(loader_val, model)

In [None]:
# Check accuracy on training set
check_accuracy(loader_train, model)

In [None]:
from torchsummary import summary
summary(model, (1,128,128))