In [None]:
# Mount Google Drive at /content/drive (uses the google.colab helper, so this
# cell only works inside a Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
import json
import os
import cv2
import copy
import random
import numpy as np 
import matplotlib.pyplot as plt
import torchvision.transforms as transforms

from PIL import Image, ImageEnhance
from torch.utils.data import DataLoader

# Directories holding the raw tiles and their annotation JSON files.
# Both paths are relative, so they resolve against the current working
# directory (presumably /content on Colab, next to the Drive mount -- confirm).
IMAGES_PATH = 'drive/MyDrive/challenge/raw'
ANNOTATIONS_PATH = 'drive/MyDrive/challenge/annotations'

class satellite_dataset(
    torch.utils.data.Dataset
    ):
  """Instance-segmentation dataset over a 9x9 grid of satellite image tiles.

  Tiles are read from ``IMAGES_PATH`` as ``{row}_{col}.png`` and their polygon
  annotations from ``ANNOTATIONS_PATH`` as ``{row}_{col}.png-annotated.json``.

  Modes:
    * ``'train'`` -- rows 0-7, only tiles that have an annotation file.
    * ``'val'``   -- row 8, only tiles that have an annotation file.
    * ``'test'``  -- rows 0-8, every tile path, annotations are ignored.

  Items:
    * ``'train'`` / ``'val'``: ``(imageTensor, target)`` where ``target`` is a
      dict with ``boxes``, ``labels``, ``masks``, ``image_id``, ``area`` and
      ``iscrowd`` tensors.
    * ``'test'``: ``(imageTensor, image)`` with ``image`` the resized PIL image.

  Image tensors are float32 in the [0, 1] range and are deliberately NOT
  mean/std normalised: torchvision detection models expect 0-1 inputs and
  apply the ImageNet normalisation themselves.

  NOTE(review): random flips are applied in ``'val'`` mode as well as in
  ``'train'`` mode, which makes validation losses vary between runs. Kept
  as-is; confirm whether that is intended.
  """

  def __init__(
      self,
      mode
      ):
    """Collect tile paths and, outside test mode, their polygons and labels.

    Args:
      mode: one of ``'train'``, ``'val'`` or ``'test'``.

    Raises:
      AssertionError: if ``mode`` is not one of the supported values.
      KeyError: if an annotation uses a label name missing from ``self.classes``.
    """

    assert mode == 'train' or mode == 'val' or mode == 'test'
    self.mode = mode
    self.classes = {
        'Background': 0,
        'Buildings': 1,
        'Houses': 2,
        'Sheds/Garages': 3
    }

    if self.mode == 'train':
      rows = [0, 1, 2, 3, 4, 5, 6, 7]
    elif self.mode == 'val':
      rows = [8]
    else:
      rows = [0, 1, 2, 3, 4, 5, 6, 7, 8]

    images, polygons, labels = [], [], []

    for row in rows:
      for col in range(9):

        image = os.path.join(IMAGES_PATH, f'{row}_{col}.png')

        if mode == 'test':
          # Test mode lists every tile and never reads annotations.
          images.append(image)
          continue

        annotationFile = os.path.join(ANNOTATIONS_PATH, f'{row}_{col}.png-annotated.json')

        if not os.path.isfile(annotationFile):
          # Tiles without an annotation file are left out of train/val.
          continue

        with open(annotationFile, 'r') as f:
          data = json.load(f)

        polygons_, labels_ = [], []

        for entry in data['labels']:

          label = entry['name']

          for annotation in entry['annotations']:
            # Force a float dtype: integer coordinates in the JSON would
            # otherwise make the in-place rescaling in __getitem__ fail.
            polygon = np.array(annotation['segmentation'], dtype=np.float64)
            labels_.append(self.classes[label])
            polygons_.append(polygon)

        images.append(image)
        polygons.append(polygons_)
        labels.append(labels_)

    self.images, self.polygons, self.labels = images, polygons, labels

  def __getitem__(
      self,
      idx
      ):
    """Load tile ``idx``, resize it to 500x500 and build its training target.

    Returns:
      ``(imageTensor, target)`` in train/val mode, ``(imageTensor, image)`` in
      test mode (``image`` being the resized PIL image).
    """

    image = Image.open(self.images[idx]).convert('RGB')

    newSize = (500, 500)
    oldSize = image.size  # PIL reports (width, height)
    image = image.resize(newSize)

    # PILToTensor keeps the raw 0-255 byte values, so scale to [0, 1] here.
    # No mean/std Normalize: the torchvision detection model does that itself.
    imageTensor = transforms.PILToTensor()(image).float() / 255.0

    if self.mode == 'test':
      return imageTensor, image

    # Fresh float copies so the stored annotations are never modified.
    polygons = [np.array(polygon, dtype=np.float64) for polygon in self.polygons[idx]]
    scaleX = newSize[0] / oldSize[0]
    scaleY = newSize[1] / oldSize[1]

    for polygon in polygons:
      # Polygons are flat arrays: x at even indices, y at odd indices.
      polygon[::2] *= scaleX
      polygon[1::2] *= scaleY

    imageTensor, polygons = self.augment(imageTensor, polygons)
    target = self._build_target(imageTensor, polygons, self.labels[idx], idx)

    return imageTensor, target

  def _build_target(
      self,
      image,
      polygons,
      labels,
      idx
      ):
    """Assemble the target dict for one image from its polygons and labels."""

    count = len(polygons)
    height, width = image.shape[-2], image.shape[-1]

    boxes = [self.get_box(polygon) for polygon in polygons]
    masks = [self.get_mask(image, polygon) for polygon in polygons]

    # Explicit shapes keep the tensors well-formed when a tile has no polygons
    # (otherwise boxes would be 1-D and the area computation would fail).
    boxesTensor = torch.tensor(np.array(boxes), dtype=torch.float32).reshape(count, 4)
    labelsTensor = torch.tensor(np.array(labels), dtype=torch.int64)
    masksTensor = torch.tensor(np.array(masks), dtype=torch.uint8).reshape(count, height, width)
    imageIDTensor = torch.tensor([idx])
    areaTensor = (boxesTensor[:, 3] - boxesTensor[:, 1]) * (boxesTensor[:, 2] - boxesTensor[:, 0])
    iscrowdTensor = torch.zeros((count,), dtype=torch.int64)

    return {
        'boxes': boxesTensor,
        'labels': labelsTensor,
        'masks': masksTensor,
        'image_id': imageIDTensor,
        'area': areaTensor,
        'iscrowd': iscrowdTensor,
        }

  def __len__(
      self
      ):
    """Number of tiles indexed for this mode."""

    return len(self.images)

  def augment(
      self,
      image,
      polygons
      ):
    """Apply the random horizontal flip, then the random vertical flip."""

    image, polygons = self.horizontal_flip(image, polygons)
    image, polygons = self.vertical_flip(image, polygons)

    return image, polygons

  def horizontal_flip(
      self,
      image,
      polygons
      ):
    """With probability 0.5 mirror the image left-right and the polygons' x.

    The polygon arrays are updated in place and the same list is returned.
    """

    if random.random() < 0.5:

      image = image.flip(-1)
      width = image.shape[-1]

      for polygon in polygons:
        polygon[::2] = width - polygon[::2]

    return image, polygons

  def vertical_flip(
      self,
      image,
      polygons
      ):
    """With probability 0.5 mirror the image top-bottom and the polygons' y.

    The polygon arrays are updated in place and the same list is returned.
    """

    if random.random() < 0.5:

      image = image.flip(-2)
      height = image.shape[-2]

      for polygon in polygons:
        polygon[1::2] = height - polygon[1::2]

    return image, polygons

  def get_box(
      self,
      polygon
      ):
    """Return the axis-aligned bounding box ``[xMin, yMin, xMax, yMax]``."""

    xPoints, yPoints = polygon[::2], polygon[1::2]

    return [float(min(xPoints)), float(min(yPoints)), float(max(xPoints)), float(max(yPoints))]

  def get_mask(
      self,
      image,
      polygon
      ):
    """Rasterise ``polygon`` into a uint8 mask the size of ``image`` (1 inside)."""

    mask = np.zeros((image.shape[-2], image.shape[-1]), dtype=np.uint8)

    # Pair up (x, y) vertices; fillPoly needs integer pixel coordinates, the
    # cast truncates the fractional part.
    coordinates = np.stack([polygon[::2], polygon[1::2]], axis=1).astype(np.int32)
    cv2.fillPoly(mask, [coordinates], color=(1))

    return mask

In [None]:
# Build one dataset per split; construction scans the tile grid and, for
# train/val, parses the annotation JSON files.
trainDataset = satellite_dataset(mode='train')
valDataset = satellite_dataset(mode='val')
testDataset = satellite_dataset(mode='test')

In [None]:
def collate_fn(batch):
  """Transpose a batch of samples into one tuple per sample field.

  A list like ``[(img0, tgt0), (img1, tgt1)]`` becomes
  ``((img0, img1), (tgt0, tgt1))``; nothing is stacked.
  """
  columns = zip(*batch)
  return tuple(columns)

class data_loader():
  """Holds the training and validation ``DataLoader`` objects.

  Attributes:
    trainLoader: shuffled batches of 4 samples, incomplete last batch dropped.
    valLoader: the whole validation set delivered as a single batch.
  """

  def __init__(
      self,
      trainDataset,
      valDataset
      ):
    """Create both loaders.

    Args:
      trainDataset: dataset yielding ``(image, target)`` training samples.
      valDataset: dataset yielding ``(image, target)`` validation samples.
    """

    trainBatchSize = 4
    # DataLoader rejects batch_size=0, which is what len() gives when no
    # validation tile has annotations; fall back to 1 so construction still
    # succeeds (the loader then simply yields no batches).
    valBatchSize = max(1, len(valDataset))
    numWorkers = 1

    self.trainLoader = torch.utils.data.DataLoader(
        trainDataset,
        batch_size=trainBatchSize,
        shuffle=True,
        drop_last=True,
        num_workers=numWorkers,
        collate_fn=collate_fn
        )

    # Validation is one full-size batch: shuffling adds nothing and there is
    # no partial batch to drop.
    self.valLoader = torch.utils.data.DataLoader(
        valDataset,
        batch_size=valBatchSize,
        shuffle=False,
        drop_last=False,
        num_workers=numWorkers,
        collate_fn=collate_fn
        )

In [None]:
# Clone the torchvision repository into the working directory; a later cell
# imports `vision.references.detection.utils` from this checkout.
! git clone 'https://github.com/pytorch/vision'

In [None]:
import torchvision
import torch.optim as optim
import vision.references.detection.utils as utils
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

class agent():
  """Fine-tunes a pretrained torchvision Mask R-CNN on the given datasets.

  The box and mask heads of ``maskrcnn_resnet50_fpn`` are replaced with heads
  for 4 classes, and the model is trained with Adam plus a StepLR schedule.
  After every training epoch the losses on the validation loader are printed.
  """

  def __init__(
      self,
      trainDataset,
      valDataset
      ):
    """Build the data loaders, model, optimizer and LR scheduler.

    Args:
      trainDataset: dataset used for optimisation.
      valDataset: dataset used for the per-epoch validation losses.
    """

    # Seed first, before any weights are initialised. torch.manual_seed seeds
    # the CPU generator and the CUDA generators, so shuffling and DataLoader
    # worker seeds are covered on GPU runs as well as CPU runs.
    torch.manual_seed(1)

    if torch.cuda.is_available():

      self.device = torch.device('cuda')
      torch.cuda.set_device(0)

    else:

      self.device = torch.device('cpu')

    self.dataLoader = data_loader(trainDataset, valDataset)

    numClasses, hiddenLayer = 4, 512

    # Swap the pretrained heads for ones sized to numClasses.
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)
    inFeatures = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(inFeatures, numClasses)
    inFeaturesMask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    model.roi_heads.mask_predictor = MaskRCNNPredictor(inFeaturesMask, hiddenLayer, numClasses)
    self.model = model
    self.model.train()

    lr, weightDecay, betas, eps = 1e-03, 0.0005, (0.9, 0.999), 1e-08
    parameters = [p for p in self.model.parameters() if p.requires_grad]
    self.optimizer = torch.optim.Adam(parameters, lr=lr, betas=betas, eps=eps, weight_decay=weightDecay, amsgrad=False)

    stepSize, gamma = 32, 0.95
    self.lrScheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, step_size=stepSize, gamma=gamma)

    self.currentEpoch, self.maxEpoch = 0, 40

  def run(
      self
      ):
    """Train for ``maxEpoch`` epochs, advancing ``currentEpoch`` each time."""

    for _ in range(self.maxEpoch):

      self.currentEpoch += 1
      self.trainEpoch()
      # NOTE(review): the scheduler is also stepped after every batch inside
      # trainEpoch, so it advances (batches + 1) times per epoch. Kept as in
      # the original schedule; confirm which of the two steps is intended.
      self.lrScheduler.step()

  def trainEpoch(
      self,
      printFreq=4
      ):
    """Run one training pass, then print the validation losses.

    Args:
      printFreq: log the running training metrics every ``printFreq`` batches.
    """

    self.model.to(self.device)

    trainLogger = utils.MetricLogger(delimiter="  ")
    trainLogger.add_meter('lr', utils.SmoothedValue(window_size=1, fmt='{value:.6f}'))
    header = 'Epoch: [{}]'.format(self.currentEpoch)

    for images, targets in trainLogger.log_every(self.dataLoader.trainLoader, printFreq, header):

      images = [image.to(self.device) for image in images]
      targets = [{k: v.to(self.device) for k, v in t.items()} for t in targets]

      # In train mode the model returns a dict of loss tensors.
      lossDict = utils.reduce_dict(self.model(images, targets))
      losses = sum(loss for loss in lossDict.values())

      self.optimizer.zero_grad()
      losses.backward()
      self.optimizer.step()

      trainLogger.update(loss=losses, **lossDict)
      trainLogger.update(lr=self.optimizer.param_groups[0]["lr"])

      # Per-batch scheduler step (see the note in run()).
      self.lrScheduler.step()

    # The model stays in train mode so that it still returns losses; no_grad
    # only disables gradient tracking for the validation pass.
    with torch.no_grad():

      for images, targets in self.dataLoader.valLoader:

        images = [image.to(self.device) for image in images]
        targets = [{k: v.to(self.device) for k, v in t.items()} for t in targets]

        valLosses = self.model(images, targets)

        lossNames = ('loss_classifier', 'loss_box_reg', 'loss_mask', 'loss_objectness', 'loss_rpn_box_reg')
        summary = '  '.join(f'{name}: {valLosses[name].item()}' for name in lossNames)
        print(f'Epoch: [{self.currentEpoch}]  [Validation]  {summary}')

In [None]:
# Build the training agent (data loaders, model, optimizer, LR scheduler).
myAgent = agent(trainDataset, valDataset)

In [None]:
# Train for the configured number of epochs, then store the model weights
# (state_dict only) on the mounted Drive.
myAgent.run()
torch.save(myAgent.model.state_dict(), '/content/drive/MyDrive/Colab Notebooks/model.pth')