In [2]:
!git clone https://github.com/Soundboy007/MaskRCNN_training.git

'git' is not recognized as an internal or external command,
operable program or batch file.


In [2]:
#!/usr/bin/python
# -- coding: utf-8 --
import cv2
import datetime
import glob
import matplotlib.pyplot as plt
import numpy as np
import os
import random as r
import skimage.io as io
import string
import torch
import torch.utils.data
import torchvision
import transforms as T
import utils
from engine import train_one_epoch, evaluate
from PIL import Image
from skimage.transform import rotate, AffineTransform, warp
from skimage.util import random_noise
from skimage.filters import gaussian
from torchvision.transforms import ToTensor
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor


class Dataset(torch.utils.data.Dataset):
    """Instance-segmentation dataset read from ``root/images`` and ``root/masks``.

    Images and masks are paired by position in the sorted directory listings,
    so both folders must contain matching files only. In a mask, 0 is the
    background and every other pixel value identifies one object instance.
    """

    def __init__(self, root, transforms=None):
        self.root = root
        self.transforms = transforms
        # sorting both listings keeps image i aligned with mask i
        self.imgs = sorted(os.listdir(os.path.join(root, "images")))
        self.masks = sorted(os.listdir(os.path.join(root, "masks")))

    def __getitem__(self, idx):
        """Return ``(img, target)`` for sample ``idx``, after the optional transforms."""
        img_path = os.path.join(self.root, "images", self.imgs[idx])
        mask_path = os.path.join(self.root, "masks", self.masks[idx])
        img = Image.open(img_path).convert("RGB")
        # the mask is NOT converted to RGB: each pixel value is an instance id
        mask = np.array(Image.open(mask_path))

        target = self._build_target(mask, idx)

        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    @staticmethod
    def _build_target(mask, idx):
        """Turn an instance-id mask array into the detection target dict."""
        obj_ids = np.unique(mask)
        # Drop the background id explicitly. Slicing off the first id would
        # discard a real instance whenever the mask has no background pixel.
        obj_ids = obj_ids[obj_ids != 0]

        # one boolean mask per instance
        masks = mask == obj_ids[:, None, None]

        num_objs = len(obj_ids)
        boxes = []
        for i in range(num_objs):
            pos = np.where(masks[i])
            boxes.append([int(np.min(pos[1])), int(np.min(pos[0])),
                          int(np.max(pos[1])), int(np.max(pos[0]))])

        # reshape keeps the (N, 4) layout even when there are no instances,
        # so the area computation below does not fail on an empty mask
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])

        return {
            "boxes": boxes,
            # there is only one foreground class
            "labels": torch.ones((num_objs,), dtype=torch.int64),
            "masks": torch.as_tensor(masks, dtype=torch.uint8),
            "image_id": torch.tensor([idx]),
            "area": area,
            # suppose all instances are not crowd
            "iscrowd": torch.zeros((num_objs,), dtype=torch.int64),
        }

    def __len__(self):
        return len(self.imgs)


class AnnotationToolBox(object):

    '''
    ANNOTATION CLASS with following functionalities:
    (a) Training: modelTraining;
    (b) Inference: imageMask, dirMask; 
    (c) Data Augmentation: transformImage, transformDir, rotateImage, affineImage, flipLRImage, flipUDImage, 
        noiseImage, blurImage, zoomImage, dullImage, brightImage, and hueImage;
    (d) Video to Frames: video2frames.
    '''

    def __init__(self):
        self.device = (torch.device('cpu') if torch.cuda.is_available() else torch.device('cpu'))  # Using GPU if available

    
    def modelTraining(self, root, num_epochs=1, num_test=50):
        """Function that returns a Mask R-CNN fine-tuned on the given dataset

        Input:
            root: a folderPath that contains Images and corresponding Masks in two
                separate folders. Please ensure that there are only image files in
                both folders and that their sorted file names line up pairwise.
            num_epochs: number of training epochs (default 1).
            num_test: number of images held out for evaluation (default 50). When
                the dataset does not have more than num_test images, one fifth of
                it is held out instead so that the training split is never empty.
        Output: the trained model, also saved as 'model<timestamp>.pth' in the
            current directory
        Raises: ValueError if num_test is negative or root/images is empty
        root
            --images
                    --1.png
                    ...
            --masks
                    --1_mask.png
                    ...

        WARNING: every file in root/masks is overwritten in place with a 0/1
        version of itself (any non-zero pixel becomes 1) before training.

        The helper modules imported at the top of this file (transforms, utils,
        engine) must be importable.

        Typical usage example:

        obj = AnnotationToolBox()
        model = obj.modelTraining('dataset/PenFudenDataset')
        """

        if num_test < 0:
            raise ValueError('num_test must not be negative')

        # binarise the masks on disk: non-zero -> 1, zero stays 0
        for name in glob.glob(root + '/masks/*'):
            # read inside a with-block so the file is closed before it is rewritten
            with Image.open(name) as handle:
                mask = np.array(handle)
            (_, mask) = cv2.threshold(mask, 0, 0XFF, cv2.THRESH_BINARY)
            mask //= 255
            cv2.imwrite(name, mask)

        def get_instance_segmentation_model(num_classes):
            # load an instance segmentation model pre-trained on COCO
            model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)

            # get the number of input features for the classifier
            in_features = model.roi_heads.box_predictor.cls_score.in_features
            # replace the pre-trained head with a new one
            model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

            # now get the number of input features for the mask classifier
            in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
            hidden_layer = 256
            # and replace the mask predictor with a new one
            model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask,
                                                               hidden_layer,
                                                               num_classes)

            return model

        def get_transform(train):
            transforms = []
            # converts the image, a PIL image, into a PyTorch Tensor
            transforms.append(T.ToTensor())
            if train:
                # during training, randomly flip the training images
                # and ground-truth for data augmentation
                transforms.append(T.RandomHorizontalFlip(0.5))
            return T.Compose(transforms)

        # use our dataset and defined transformations
        dataset = Dataset(root, get_transform(train=True))
        dataset_test = Dataset(root, get_transform(train=False))

        total = len(dataset)
        if total == 0:
            raise ValueError('No images found in %s' % os.path.join(root, 'images'))
        if num_test >= total:
            # holding out num_test images would leave nothing to train on
            num_test = total // 5

        # split the dataset in train and test set (fixed seed: reproducible split)
        torch.manual_seed(1)
        indices = torch.randperm(total).tolist()
        split = total - num_test
        dataset = torch.utils.data.Subset(dataset, indices[:split])
        dataset_test = torch.utils.data.Subset(dataset_test, indices[split:])

        # define training and validation data loaders
        data_loader = torch.utils.data.DataLoader(
            dataset, batch_size=2, shuffle=True, num_workers=0,
            collate_fn=utils.collate_fn)

        data_loader_test = torch.utils.data.DataLoader(
            dataset_test, batch_size=1, shuffle=False, num_workers=0,
            collate_fn=utils.collate_fn)

        # our dataset has two classes only - background and person
        num_classes = 2

        # get the model using our helper function
        model = get_instance_segmentation_model(num_classes)
        # move model to the right device
        model.to(self.device)

        # construct an optimizer
        params = [p for p in model.parameters() if p.requires_grad]
        optimizer = torch.optim.SGD(params, lr=0.005,
                                    momentum=0.9, weight_decay=0.0005)

        # and a learning rate scheduler which decreases the learning rate by
        # 10x every 3 epochs
        lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                                       step_size=3,
                                                       gamma=0.1)

        for epoch in range(num_epochs):
            # train for one epoch, printing every 10 iterations
            train_one_epoch(model, optimizer, data_loader, self.device, epoch, print_freq=10)
            # update the learning rate
            lr_scheduler.step()
            # evaluate on the test dataset (skipped when nothing was held out)
            if num_test > 0:
                evaluate(model, data_loader_test, device=self.device)

        # str(time) contains ':' which is not allowed in Windows file names,
        # so build the timestamp from digits, '_' only
        stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        torch.save(model, 'model' + stamp + '.pth')

        return model
    
    def imageMask(self, modelPath, imagePath):
        """
        Returns Mask of the image using given model
        Input: Path of the model, Path of the image
        Output: Stores Mask alongside the image
        """

        try:
            model = torch.load(modelPath, map_location=self.device)  # Reading the model from path provided
            image = cv2.imread(imagePath)  # Reading the image from path provided

            background = np.zeros(image.shape[:-1])  # Storing background image for mask

            image = ToTensor()(image)
            model.eval()

            with torch.no_grad():
                prediction = model([image.to(self.device)])  # Using model to find human instances in the image

            try:
                foreground = []
                for i in range(len(prediction[0]['masks'])):
                    foreground.append(np.array(Image.fromarray(prediction[0]['masks'
                            ][i, 0].mul(0XFF).byte().cpu().numpy())))

                human_instance = foreground[0]
                for i in foreground[1:]:
                    human_instance = cv2.add(i, human_instance)  # Adding all human instances into the image
            except:

                human_instance = background

            # thresholding the mask to rid of extra segmentation

            human_instance = cv2.merge((human_instance, human_instance,
                    human_instance))
            (_, mask) = cv2.threshold(human_instance, 100, 0XFF,
                    cv2.THRESH_BINARY)

            # writing the image mask in the same folder

            cv2.imwrite(imagePath.split('.')[0] + '_m.png', mask)

            return mask
        
        except:
            None

    def dirMask(self, modelPath, folderPath):
        """
        Returns all masks of images present in the folder
        Input: Path of model, Path of the folder containing images
        Output: Masks of the images stored in the folder
        """

        folderPath += '/*'
        for name in glob.glob(folderPath):
            print('Writing mask for: ' + name)
            self.imageMask(modelPath, name)
            
    def video2frames(self, videoPath, frameStep=5):
        """
        Saves every frameStep-th frame of a video into a folder
        Input: path of the video, frameStep (save one frame out of this many, default 5)
        Output: Nothing is returned; frames are written as PNG files with random
                names into '<video path without extension>/videoFrames/'
        Raises: ValueError if frameStep is smaller than 1
        """

        if frameStep < 1:
            raise ValueError('frameStep must be at least 1')

        # splitext only strips the extension, so a dot elsewhere in the path
        # (e.g. './clips/a.mp4') no longer truncates the output location
        path = os.path.join(os.path.splitext(videoPath)[0], 'videoFrames')
        try:
            # an already existing folder is fine and gets reused
            os.makedirs(path, exist_ok=True)
        except OSError:
            print('Creation of the directory %s failed' % path)
            return
        print('Saving frames into the directory %s' % path)

        alphabet = (string.ascii_uppercase + string.ascii_lowercase
                    + string.digits)
        fg_video = cv2.VideoCapture(videoPath)  # foreground video path here
        numFrames = 0

        try:
            while fg_video.isOpened():
                numFrames += 1
                (Dret, Dframe) = fg_video.read()

                if not Dret:
                    # end of the video (or a read error)
                    break

                if numFrames % frameStep == 0:
                    x = ''.join(r.choice(alphabet) for _ in range(16))
                    cv2.imwrite(os.path.join(path, 'videoFrame_' + x + '.png'),
                                Dframe)
        finally:
            # release the capture even when reading or writing fails
            fg_video.release()
        
    def transformImage(self, imagePath, num_transforms=1):
        """
        Returns 'num_transforms' number of images with randomized transforms
        Input: Path of image, number of images
        Output: Returns images with various transforms applied and stores alongside image
        """

        try:
            if type(imagePath) is str:
                image = io.imread(imagePath)
            mid_img = image.copy()

            for i in range(num_transforms):
                if r.choice([1, 0]):  # print('rotate')
                    mid_img = self.rotateImage(mid_img)
                if r.choice([1, 0]):  # print('flipLR')
                    mid_img = self.flipLRImage(mid_img)
                if r.choice([1, 0]):  # print('flipUD')
                    mid_img = self.flipUDImage(mid_img)
                if r.choice([1, 0]):  # print('noise')
                    mid_img = self.noiseImage(mid_img)
                if r.choice([1, 0]):  # print('blur')
                    mid_img = self.blurImage(mid_img)
                if r.choice([1, 0]):  # print('hue')
                    mid_img = self.hueImage(mid_img)

        #         if r.choice([1,0]): mid_img = self.affineImage(mid_img)
        #         if r.choice([1,0]): mid_img = self.zoomImage(mid_img)
        #         if r.choice([1,0]): mid_img = self.dullImage(mid_img)
        #         if r.choice([1,0]): mid_img = self.brightImage(mid_img)

                x = ''.join(r.choice(string.ascii_uppercase
                            + string.ascii_lowercase + string.digits)
                            for _ in range(16))
                io.imsave(imagePath.split('.')[0] + '_' + x + '.png',
                          mid_img)

            return mid_img
        
        except:
            None

    def transformDir(self, folderPath, num_transformations=1):
        """
        Generates 'num_transforms' number of images with randomized transforms for all images in the directory
        Input: Path of folder containing images, num_transformation (number of randomized pictures for one image)
        Output: Returns images with various transforms applied and stores in the folder alongside the images
        """

        folderPath += '/*'
        for name in glob.glob(folderPath):
            print('Randomizing image: ' + name)
            self.transformImage(name, num_transformations)

    def rotateImage(self, image, angle = None):

        """Function that returns a rotated copy of a given image.

        When no angle is given, a whole number of degrees between 0 and 360 is
        picked at random. The image is rotated with mode='wrap'. The function
        accepts a NumPy array or an image path as input.

        Typical usage example:

        obj = AnnotationToolBox()
        bar = obj.rotateImage(image)
        test = obj.rotateImage('PNGImages/FudanPed00001.png', 100)
        """

        if angle is None:
            angle = r.randint(0, 360)

        # a string is treated as a path and loaded; anything else is used as is
        pixels = io.imread(image) if type(image) is str else image

        return rotate(pixels, angle=angle, mode='wrap')

    def affineImage(self, image, translation = None):

        """Function that returns a translated (shifted) version of the input image.

        The image is shifted with an affine transform and mode='wrap'.
        translation is an (x, y) pair, i.e. (columns, rows); when it is omitted,
        x is drawn from [0, width] and y from [0, height]. The function accepts
        a NumPy array or an image path as input.

        Typical usage example:

        obj = AnnotationToolBox()
        bar = obj.affineImage(image)
        test = obj.affineImage('PNGImages/FudanPed00001.png', (100, 50))
        """

        if isinstance(image, str):
            image = io.imread(image)

        if translation is None:
            height, width = image.shape[:2]
            # AffineTransform expects (x, y): x runs along the width (shape[1]),
            # y along the height (shape[0]); the bounds used to be swapped
            translation = (r.randint(0, width), r.randint(0, height))

        transform = AffineTransform(translation=translation)
        wrapShift = warp(image, transform, mode='wrap')

        return wrapShift

    def flipLRImage(self, image):

        """Function that returns a Left-Right flipped image for a given image. The function accepts a NumPy array as an input image.

        Typical usage example:

        obj = AnnotationToolBox()
        bar = obj.filpLRImage(image)
        """

        if type(image) is str:
            image = io.imread(image)
            
        flipLR = np.fliplr(image)
        
        return flipLR

    def flipUDImage(self, image):

        """Function that returns a flipped upside down image for a given image. The function accepts a NumPy array as an input image.

        Typical usage example:

        obj = AnnotationToolBox()
        bar = obj.flipUDImage(image)
        """

        if type(image) is str:
            image = io.imread(image)
            
        flipUD = np.flipud(image)
        
        return flipUD

    def noiseImage(self, image, sigma = None):

        """Function that returns the input image with random noise added.

        sigma is the standard deviation of the noise: its square is passed to
        random_noise as the variance. When sigma is omitted it is drawn
        uniformly from [0, 0.3], i.e. the variance lies in [0, 0.09]. The
        function accepts a NumPy array or an image path as input.

        Typical usage example:

        obj = AnnotationToolBox()
        bar = obj.noiseImage(image)
        test = obj.noiseImage('PNGImages/FudanPed00001.png', 0.15)
        """

        if sigma is None:
            sigma = r.uniform(0, 0.3)

        # a string is treated as a path and loaded; anything else is used as is
        pixels = io.imread(image) if type(image) is str else image

        return random_noise(pixels, var=sigma ** 2)

    def blurImage(self, image, sigma = None):

        """Function that returns the input image with Gaussian blur applied.

        sigma is the standard deviation of the blur kernel; when omitted it is
        drawn uniformly from [0, 2]. Three-dimensional inputs are treated as
        colour images (the last axis is not blurred across); two-dimensional
        inputs are blurred as plain grayscale images. The function accepts a
        NumPy array or an image path as input.

        Typical usage example:

        obj = AnnotationToolBox()
        bar = obj.blurImage(image)
        test = obj.blurImage('PNGImages/FudanPed00001.png', 1.5)
        """

        sigma = r.uniform(0, 2) if sigma is None else sigma

        if isinstance(image, str):
            image = io.imread(image)

        # only a 3-D array has a channel axis; flagging a 2-D grayscale image
        # as multichannel would make its columns count as channels
        channel_axis = -1 if image.ndim == 3 else None

        try:
            blurred = gaussian(image, sigma=sigma, channel_axis=channel_axis)
        except TypeError:
            # older scikit-image releases only know the 'multichannel' flag,
            # which newer releases replaced with 'channel_axis'
            blurred = gaussian(image, sigma=sigma,
                               multichannel=channel_axis is not None)

        return blurred

    def zoomImage(self, image):

        """Function that returns a zoomed image of the input image.

        A random crop of the image is resized back to the original height and
        width. The crop starts in the top/left half and ends in the
        bottom/right half, and is never empty. The function accepts a NumPy
        array or an image path as input.

        Typical usage example:

        obj = AnnotationToolBox()
        bar = obj.zoomImage(image)
        """

        if isinstance(image, str):
            image = io.imread(image)

        height, width = image.shape[:2]

        # the '+ 1' keeps the end strictly after the start, so the crop always
        # contains at least one row and one column
        top = r.randint(0, height // 2)
        bottom = r.randint(height // 2 + 1, height)
        left = r.randint(0, width // 2)
        right = r.randint(width // 2 + 1, width)

        # cv2.resize takes dsize as (width, height), and the interpolation flag
        # must be passed by keyword: the third positional argument is 'dst'
        zoomed = cv2.resize(image[top:bottom, left:right], (width, height),
                            interpolation=cv2.INTER_LINEAR)

        return zoomed

    def dullImage(self, image, degree = None):

        """Function that returns a darker version of the input image.

        degree is subtracted from every value of the image with cv2.subtract;
        when omitted it is a random whole number between 0 and 100. The
        function accepts a NumPy array or an image path as input.

        Typical usage example:

        obj = AnnotationToolBox()
        bar = obj.dullImage(image)
        test = obj.dullImage('PNGImages/FudanPed00001.png', 80)
        """

        if degree is None:
            degree = r.randint(0, 100)

        # a string is treated as a path and loaded; anything else is used as is
        pixels = io.imread(image) if type(image) is str else image

        # an array shaped like the image with every entry equal to degree
        offset = np.ones_like(pixels) * degree

        return cv2.subtract(pixels, offset)

    def brightImage(self, image, degree = None):

        """Function that returns a lighter version of the input image.

        degree is added to every value of the image with cv2.add; when omitted
        it is a random whole number between 0 and 100. The function accepts a
        NumPy array or an image path as input.

        Typical usage example:

        obj = AnnotationToolBox()
        bar = obj.brightImage(image)
        test = obj.brightImage('PNGImages/FudanPed00001.png', 80)
        """

        if degree is None:
            degree = r.randint(0, 100)

        # a string is treated as a path and loaded; anything else is used as is
        pixels = io.imread(image) if type(image) is str else image

        # an array shaped like the image with every entry equal to degree
        offset = np.ones_like(pixels) * degree

        return cv2.add(pixels, offset)

    def hueImage(self, image):

        """Function that returns the input image with a changed hue.

        The [R, G, B] values are flipped to create [B, G, R] pixels to create a hue transform. The function accepts a NumPy array as an input image.

        Typical usage example:

        obj = Annotation()
        bar = obj.brightImage(image)
        """

        if type(image) is str:
            image = io.imread(image)
            
        hueChange = image[..., ::-1]
        
        return hueChange