<a href="https://colab.research.google.com/github/JuntaoXu/2021hwsz/blob/main/mobilenetv2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive; the dataset and checkpoint paths used below live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')
# Notebook shell escape (not plain Python): installs torchmetrics into the runtime.
! pip install torchmetrics

In [None]:
import os, glob, random
# Seed the stdlib RNG that split_data() uses for shuffling and bucket assignment.
random.seed(42)

def save2txt(path, savepath='labels.txt'):
    """Write a tab-separated ``<image name><TAB><label>`` line for every ``.tif`` under *path*.

    The label is 0 when one of the directories between *path* and the image
    is named exactly ``OK`` and 1 otherwise.  Only directories *below* the
    dataset root are inspected, so an ``OK`` substring in the root path or in
    a file name can no longer flip the label.

    Images are listed in sorted path order, which makes the output (and any
    seeded split derived from it) independent of directory iteration order.

    Args:
        path: Dataset root directory, searched recursively.
        savepath: Destination text file; it is overwritten.

    Side effects:
        Prints whether *path* exists.
    """
    print(os.path.exists(path))
    pattern = os.path.join(path, '**/*.tif')
    imgfiles = sorted(glob.iglob(pattern, recursive=True))
    with open(savepath, 'w') as t:
        for imgfile in imgfiles:
            imgname = os.path.basename(imgfile)
            # Directory components relative to the dataset root (file name dropped).
            rel_dirs = os.path.relpath(imgfile, path).split(os.sep)[:-1]
            label = 0 if 'OK' in rel_dirs else 1
            t.write(imgname + '\t' + str(label) + '\n')

def split_data(txtpath):
    """Randomly split a label listing into train / validation / test files.

    The records of *txtpath* are shuffled, then each one draws an integer in
    0..9 from the global ``random`` module: 0 goes to validation, 1 to test,
    anything else to train (roughly 80/10/10).  The subsets are written next
    to *txtpath* as ``traindata1.txt``, ``valdata0.txt`` and ``testdata1.txt``,
    overwriting existing files.

    Blank lines are ignored and every record is normalised to end with a
    newline, so a final line without a trailing newline cannot be glued onto
    another record after shuffling.

    Args:
        txtpath: Path of the label listing, one record per line.
    """
    with open(txtpath, 'r') as t:
        lines = [line.rstrip('\n') + '\n' for line in t if line.strip()]

    random.shuffle(lines)

    # Insertion order is the write order: train, validation, test.
    subsets = {'traindata1.txt': [], 'valdata0.txt': [], 'testdata1.txt': []}
    for line in lines:
        p = random.randint(0, 9)
        if p == 0:
            subsets['valdata0.txt'].append(line)
        elif p == 1:
            subsets['testdata1.txt'].append(line)
        else:
            subsets['traindata1.txt'].append(line)

    savedir = os.path.dirname(txtpath)
    for filename, records in subsets.items():
        with open(os.path.join(savedir, filename), 'w') as out:
            out.writelines(records)


# Dataset root on the mounted Drive; `path` is reused by later cells to locate the split files.
path = '/content/drive/MyDrive/yema/yema_dataset_bright0215/'
savepath = os.path.join(path, 'labels.txt')
# NOTE: running this cell rewrites labels.txt and the three split files every time.
save2txt(path, savepath)
split_data(savepath)

In [None]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms

from torchmetrics.classification import BinaryAccuracy
import os, cv2
import numpy as np
import gc
from PIL import Image
# Collect unreferenced Python objects and release cached CUDA memory,
# e.g. left over from a previous run of this cell.
gc.collect()
torch.cuda.empty_cache()

# Seed PyTorch's global RNG.
torch.manual_seed(42)


# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyper-parameters
num_epochs = 200

# Image preprocessing modules
# Training pipeline: flip / rotation / sharpness / blur augmentations,
# followed by tensor conversion and normalisation.
transform = transforms.Compose([
    # transforms.Pad(4),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation((-12, 12)),
    transforms.RandomAdjustSharpness(sharpness_factor=2),
    # transforms.RandomCrop((480, 640), ),
    transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 1.5)),
    transforms.ToTensor(),
    # Scalar mean/std: presumably the images are single-channel (the model stem
    # below takes 1 input channel) -- TODO confirm against the .tif files.
    transforms.Normalize(0.45, 0.225)])

# Evaluation pipeline: no augmentation, same tensor conversion and normalisation as training.
val_transform = transforms.Compose([
    # transforms.Pad(4),
    # transforms.RandomHorizontalFlip(),
    # transforms.RandomCrop(32),
    transforms.ToTensor(),
    transforms.Normalize(0.45, 0.225)])



In [None]:
class myDataset(torch.utils.data.Dataset):
    """Image dataset driven by a tab-separated ``<file name><TAB><label>`` listing.

    Images are looked up relative to the directory that contains the listing:
    label 0 under ``OK/1`` (falling back to ``OK/2``), any other label under
    ``NG``.

    Attributes:
        labels: dict mapping file name to label.  If a file name occurs more
            than once, the last occurrence wins; kept for backward
            compatibility.
        list_IDs: file names in listing order, one entry per sample.
        data_path: directory containing the listing.
        transform: callable applied to the opened image, or a falsy value to
            return the PIL image unchanged.
    """

    def __init__(self, labels_file, transform):
        """Parse *labels_file*; blank lines are skipped.

        Args:
            labels_file: path of the listing file.
            transform: per-image transform (may be None).
        """
        labels = {}
        list_IDs = []
        targets = []
        with open(labels_file, 'r') as f:
            for line in f:
                line = line.rstrip('\n')
                if not line.strip():
                    continue
                ID, label = line.rsplit('\t', 1)
                label = int(label)
                labels[ID] = label
                list_IDs.append(ID)
                targets.append(label)
        self.labels = labels
        self.list_IDs = list_IDs
        # Per-sample labels aligned with list_IDs, so that the same file name
        # listed under both classes keeps each entry's own label instead of
        # collapsing to the last one stored in `labels`.
        self._targets = targets
        print(self.labels)
        self.data_path = os.path.split(labels_file)[0]
        self.transform = transform

    def __len__(self):
        """Return the number of listed samples."""
        return len(self.list_IDs)

    def _sample(self, index):
        """Return ``(image path, label)`` for *index* without opening the image."""
        ID = self.list_IDs[index]
        label = self._targets[index]
        if label == 0:
            # OK images live in one of two sub-folders; prefer 'OK/1'.
            # NOTE: a name present in both 'OK/1' and 'OK/2' always resolves to
            # 'OK/1' because the listing stores bare file names only.
            imgpath = os.path.join(self.data_path, 'OK', '1', ID)
            if not os.path.exists(imgpath):
                imgpath = os.path.join(self.data_path, 'OK', '2', ID)
        else:
            imgpath = os.path.join(self.data_path, 'NG', ID)
        return imgpath, label

    def __getitem__(self, index):
        """Load one sample and return ``(image, label)``."""
        imgpath, y = self._sample(index)
        X = Image.open(imgpath)
        if self.transform:
            X = self.transform(X)

        return X, y

NameError: ignored

In [None]:
# Generators
# Parameters
# DataLoader keyword arguments shared by the train, validation and test loaders
# (testModel() further down reuses the same dict).
# NOTE(review): 'shuffle': True therefore also applies to the validation and
# test loaders; the accuracy computed below does not depend on sample order.
params = {'batch_size': 20,
          'shuffle': True,
          'num_workers': 2}

# Split files written by split_data() next to labels.txt.
train_path = os.path.join(path, 'traindata1.txt')
val_path = os.path.join(path, 'valdata0.txt')
test_path = os.path.join(path, 'testdata1.txt')
# Only the training set uses the augmenting transform.
training_set = myDataset(train_path, transform)
train_loader = torch.utils.data.DataLoader(training_set, **params)

validation_set = myDataset(val_path, val_transform)
val_loader = torch.utils.data.DataLoader(validation_set, **params)

test_set = myDataset(test_path, val_transform)
test_loader = torch.utils.data.DataLoader(test_set, **params)

# Checkpoint directory; the trailing slash matters because later cells build
# file names with plain string concatenation (save_model + 'best.ckpt').
save_model = '/content/drive/MyDrive/yema/models/mv2_0215/'
if not os.path.exists(save_model):
    os.makedirs(save_model)

In [None]:
# Public names of the model definition below.
__all__ = ['MobileNetV2', 'mobilenet_v2']


# Download location of the checkpoint that mobilenet_v2(pretrained=True) loads.
model_urls = {
    'mobilenet_v2': 'https://download.pytorch.org/models/mobilenet_v2-b0353104.pth',
}


def _make_divisible(v, divisor, min_value=None):
    """
    Round a channel count to the nearest multiple of ``divisor``.

    Adapted from the original TensorFlow implementation:
    https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py

    :param v: requested (possibly fractional) channel count
    :param divisor: the result is ``min_value`` or a multiple of this value
    :param min_value: lower bound for the result; defaults to ``divisor``
    :return: the adjusted channel count, never more than 10% below ``v``
    """
    lower_bound = divisor if min_value is None else min_value
    nearest_multiple = int(v + divisor / 2) // divisor * divisor
    channels = max(lower_bound, nearest_multiple)
    # Rounding down must not remove more than 10% of the requested width.
    if channels < 0.9 * v:
        channels += divisor
    return channels

class ConvBNReLU(nn.Sequential):
    """Bias-free ``Conv2d`` followed by ``BatchNorm2d`` and an in-place ``ReLU6``."""

    def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1):
        # (k - 1) // 2 keeps the spatial size for odd kernels at stride 1.
        pad = (kernel_size - 1) // 2
        stages = (
            nn.Conv2d(in_planes, out_planes, kernel_size, stride, pad, groups=groups, bias=False),
            nn.BatchNorm2d(out_planes),
            nn.ReLU6(inplace=True),
        )
        super().__init__(*stages)

class InvertedResidual(nn.Module):
    """Inverted residual block: optional 1x1 expansion, 3x3 depthwise conv, linear 1x1 projection."""

    def __init__(self, inp, oup, stride, expand_ratio):
        super().__init__()
        self.stride = stride
        assert stride in [1, 2]

        expanded = int(round(inp * expand_ratio))
        # The skip connection is only valid when input and output shapes agree.
        self.use_res_connect = self.stride == 1 and inp == oup

        # pw (omitted when the block does not expand)
        stack = [ConvBNReLU(inp, expanded, kernel_size=1)] if expand_ratio != 1 else []
        # dw
        stack.append(ConvBNReLU(expanded, expanded, stride=stride, groups=expanded))
        # pw-linear
        stack.append(nn.Conv2d(expanded, oup, 1, 1, 0, bias=False))
        stack.append(nn.BatchNorm2d(oup))
        self.conv = nn.Sequential(*stack)

    def forward(self, x):
        out = self.conv(x)
        return x + out if self.use_res_connect else out


class MobileNetV2(nn.Module):
    """MobileNet V2 feature extractor and linear classifier with a single-channel stem."""

    def __init__(self,
                 num_classes=2,
                 width_mult=0.8,
                 inverted_residual_setting=None,
                 round_nearest=8,
                 block=None):
        """
        Build the network.
        Args:
            num_classes (int): Size of the classifier output
            width_mult (float): Multiplier applied to the channel count of every layer
            inverted_residual_setting: List of ``[t, c, n, s]`` rows (expansion ratio,
            base channels, number of repeats, stride of the first repeat)
            round_nearest (int): Channel counts are rounded to a multiple of this number;
            1 turns rounding off
            block: Callable building one inverted residual block; defaults to InvertedResidual
        """
        super().__init__()

        block = InvertedResidual if block is None else block
        stem_base = 32
        head_base = 1280

        if inverted_residual_setting is None:
            inverted_residual_setting = [
                # t, c, n, s
                [1, 16, 1, 1],
                [6, 24, 2, 2],
                [6, 32, 3, 2],
                [6, 64, 4, 2],
                [6, 96, 3, 1],
                [6, 160, 3, 2],
                [6, 320, 1, 1],
            ]

        # Only the first row is validated; the remaining rows are trusted to be [t, c, n, s] too.
        if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4:
            raise ValueError("inverted_residual_setting should be non-empty "
                             "or a 4-element list, got {}".format(inverted_residual_setting))

        # Stem
        channels = _make_divisible(stem_base * width_mult, round_nearest)
        self.last_channel = _make_divisible(head_base * max(1.0, width_mult), round_nearest)
        stages = [ConvBNReLU(1, channels, stride=2)]  # 1 input channel (grayscale) instead of 3
        # Inverted residual stages
        for expand, base_width, repeats, first_stride in inverted_residual_setting:
            stage_width = _make_divisible(base_width * width_mult, round_nearest)
            for position in range(repeats):
                step = first_stride if position == 0 else 1
                stages.append(block(channels, stage_width, step, expand_ratio=expand))
                channels = stage_width
        # Final 1x1 convolution up to the head width
        stages.append(ConvBNReLU(channels, self.last_channel, kernel_size=1))
        self.features = nn.Sequential(*stages)

        # Classifier head
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(self.last_channel, num_classes),
        )

        # Weight initialization
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out')
                if module.bias is not None:
                    nn.init.zeros_(module.bias)
                continue
            if isinstance(module, nn.BatchNorm2d):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)
                continue
            if isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, 0, 0.01)
                nn.init.zeros_(module.bias)

    def _forward_impl(self, x):
        # Kept separate from `forward` because TorchScript doesn't support inheritance:
        # a subclass needs a differently named method it can call.
        pooled = self.features(x).mean([2, 3])  # average over the two spatial axes
        return self.classifier(pooled)

    def forward(self, x):
        return self._forward_impl(x)

def mobilenet_v2(pretrained=False, progress=True, **kwargs):
    """
    Constructs a MobileNetV2 architecture from
    `"MobileNetV2: Inverted Residuals and Linear Bottlenecks" <https://arxiv.org/abs/1801.04381>`_.

    Args:
        pretrained (bool): If True, downloads the checkpoint listed in
            ``model_urls`` and loads it into the model
        progress (bool): If True, displays a progress bar of the download to stderr
        **kwargs: Forwarded unchanged to ``MobileNetV2``

    Returns:
        The constructed ``MobileNetV2`` instance.

    Note:
        ``load_state_dict`` is strict, so ``pretrained=True`` raises a
        ``RuntimeError`` when the checkpoint's tensor shapes do not match
        the model.  The MobileNetV2 in this file builds its first convolution
        with one input channel; the downloaded checkpoint is presumably the
        stock 3-channel ImageNet model -- confirm before relying on
        ``pretrained=True``.
    """
    model = MobileNetV2(**kwargs)
    if pretrained:
        # Imported here: no cell imports this helper, so the original call
        # failed with NameError before anything was downloaded.
        from torch.hub import load_state_dict_from_url

        state_dict = load_state_dict_from_url(model_urls['mobilenet_v2'],
                                              progress=progress)
        model.load_state_dict(state_dict)
    return model

# Build the network with the defaults of MobileNetV2 (no pretrained weights) and move it to `device`.
model = mobilenet_v2().to(device)



In [None]:
import copy

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
# Cosine schedule between lr=1e-4 and eta_min=1e-5; stepped once per epoch below.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=40, eta_min=0.00001)

# Train the model
total_step = len(train_loader)

# Best validation result so far.  `best_model` starts as an alias of the live
# model, so it is still usable if no validation pass ever beats `best_acc`.
best_acc = 0
best_model = model
best_epoch = 0

for epoch in range(num_epochs):
    # Validation switches the model to eval mode; switch back every epoch so
    # dropout and BatchNorm statistics stay active during training.
    model.train()
    for i, (images, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        loss.backward()
        optimizer.step()

        if (i+1) % 40 == 0:
            print ("Epoch [{}/{}], Step [{}/{}] Loss: {:.4f}"
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))

    # Validate every fifth epoch (0, 5, 10, ...).
    if epoch % 5 == 0:
        model.eval()
        with torch.no_grad():
            correct = 0
            total = 0

            for images, labels in val_loader:
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

            # An empty validation set counts as 0 accuracy instead of dividing by zero.
            acc = correct / total if total else 0.0
            acc_percent = 100 * correct / total if total else 0.0
            if acc > best_acc:
                best_acc = acc
                best_epoch = epoch
                # Snapshot the weights: a plain `best_model = model` would keep
                # tracking the live model and end up identical to the last epoch.
                best_model = copy.deepcopy(model)
                torch.save(model.state_dict(), save_model + 'best.ckpt')

            print('Accuracy of the epoch {} on the val images: {} %'.format(epoch, acc_percent))
            print('Best accuracy is {} %, from epoch {}'.format(100 * best_acc, best_epoch))

    # Update learning rate
    scheduler.step()
    




In [None]:
# Test the model
# The best and the last model go through the same steps: measure accuracy on
# the test split, print the report, then save the checkpoint.
# Each entry: (network, report template, checkpoint file name).
evaluation_runs = (
    (best_model, 'Accuracy of the best model on the test images: {} %', 'best_mv2.ckpt'),
    (model, 'Accuracy of the last model on the test images: {} %', 'last_mv2.ckpt'),
)

for net, report, ckpt_name in evaluation_runs:
    net.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = net(images)
            # Index of the larger logit is the predicted class.
            predicted = torch.max(outputs.data, 1)[1]
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        print(report.format(100 * correct / total))

    # Save the model checkpoint
    torch.save(net.state_dict(), save_model + ckpt_name)

Accuracy of the best model on the test images: 97.87798408488064 %
Accuracy of the last model on the test images: 97.87798408488064 %


Load model and test

In [None]:
import shutil



class testDataset(torch.utils.data.Dataset):
    """Evaluation dataset that also returns the path of every image.

    Reads a ``<file name> <label>`` listing, one record per line, where the
    label is the last whitespace-separated field.  File names may therefore
    contain spaces, and both tab- and space-separated listings are accepted.
    Images are looked up relative to the directory that contains the listing:
    label 0 under ``OK/1`` (falling back to ``OK/2``), any other label under
    ``NG``.

    Attributes:
        labels: dict mapping file name to label.  If a file name occurs more
            than once, the last occurrence wins; kept for backward
            compatibility.
        list_IDs: file names in listing order, one entry per sample.
        data_path: directory containing the listing.
        transform: callable applied to the opened image, or a falsy value to
            return the PIL image unchanged.
    """

    def __init__(self, labels_file, transform):
        """Parse *labels_file*; blank lines are skipped.

        Args:
            labels_file: path of the listing file.
            transform: per-image transform (may be None).
        """
        labels = {}
        list_IDs = []
        targets = []
        with open(labels_file, 'r') as f:
            for line in f:
                if not line.strip():
                    continue
                # Split on the last whitespace run only, so a file name that
                # contains spaces stays intact.
                ID, label = line.rsplit(None, 1)
                label = int(label)
                labels[ID] = label
                list_IDs.append(ID)
                targets.append(label)
        self.labels = labels
        self.list_IDs = list_IDs
        # Per-sample labels aligned with list_IDs, so that the same file name
        # listed under both classes keeps each entry's own label.
        self._targets = targets
        print(self.labels)
        self.data_path = os.path.split(labels_file)[0]
        self.transform = transform

    def __len__(self):
        """Return the number of listed samples."""
        return len(self.list_IDs)

    def _sample(self, index):
        """Return ``(image path, label)`` for *index* without opening the image."""
        ID = self.list_IDs[index]
        label = self._targets[index]
        if label == 0:
            # OK images live in one of two sub-folders; prefer 'OK/1'.
            imgpath = os.path.join(self.data_path, 'OK', '1', ID)
            if not os.path.exists(imgpath):
                imgpath = os.path.join(self.data_path, 'OK', '2', ID)
        else:
            imgpath = os.path.join(self.data_path, 'NG', ID)
        return imgpath, label

    def __getitem__(self, index):
        """Load one sample and return ``(image path, image, label)``."""
        imgpath, y = self._sample(index)
        X = Image.open(imgpath)
        if self.transform:
            X = self.transform(X)

        return imgpath, X, y

def testModel(model_path, test_labels, save_path, ok_threshold=0.9):
    """Load a checkpoint and report thresholded misclassifications on a labelled set.

    A sample is predicted as class 0 only when its softmax probability for
    class 0 is at least *ok_threshold*; otherwise it is predicted as class 1.

    Printed output:
        * for every class-0 sample predicted as 1: its probability vector;
        * for every class-1 sample predicted as 0: its image path and
          probability vector;
        * finally ``fn fp`` and ``count0 count1``, where ``fn`` counts the
          class-0 samples predicted as 1, ``fp`` the class-1 samples predicted
          as 0, and ``count0`` / ``count1`` the number of samples per label.
          NOTE(review): the fn/fp names are kept from the original code; they
          presumably treat class 0 as the positive class -- confirm.

    Args:
        model_path: path of a ``state_dict`` checkpoint for ``mobilenet_v2()``.
        test_labels: listing file understood by ``testDataset``.
        save_path: directory that is created if missing.  Nothing is written
            into it by this function.
        ok_threshold: minimum class-0 probability for a class-0 prediction.
    """
    os.makedirs(save_path, exist_ok=True)
    test_set = testDataset(test_labels, val_transform)
    test_loader = torch.utils.data.DataLoader(test_set, **params)

    model = mobilenet_v2().to(device)
    # map_location lets a checkpoint saved on GPU load on a CPU-only runtime.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    fn, fp = 0, 0
    count0, count1 = 0, 0
    with torch.no_grad():
        for img_paths, images, labels in test_loader:
            outputs = model(images.to(device))
            # Numerically stable replacement for the manual exp / sum softmax.
            probs = torch.softmax(outputs, dim=1).cpu()
            for img_path, out, label in zip(img_paths, probs, labels.tolist()):
                pred_label = 1 if out[0].item() < ok_threshold else 0
                if label == 0:
                    count0 += 1
                else:
                    count1 += 1
                if label == 0 and label != pred_label:
                    fn += 1
                    print(out)
                elif label == 1 and label != pred_label:
                    print(img_path)
                    print(out)
                    fp += 1

    print(fn, fp)
    print(count0, count1)

# Evaluate the 'best_mv2.ckpt' checkpoint saved by the test cell on the held-out test split.
model_path = '/content/drive/MyDrive/yema/models/mv2_0215/best_mv2.ckpt'
save_imgs = '/content/drive/MyDrive/yema/test_results/mv2_0215/'
testModel(model_path, test_path, save_imgs)
