In [1]:
import json
import os
import random

import numpy as np
import torch
from PIL import Image

from model_architecture import get_model_instance_segmentation
from processing_inference import Processor

In [2]:
# Reproducibility: one shared seed for every RNG in use, plus the cuDNN switches.
SEED = 0

# cuDNN settings for reproducibility
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# seed torch, the stdlib RNG and NumPy's global RNG with the same value
torch.manual_seed(SEED)
random.seed(SEED)
np.random.seed(SEED)

In [3]:
# Where the dataset lives and which parts of it are scanned.
path = './CarDD_release/CarDD_COCO/'
annotations_str = 'annotations/instances_'  # prefix of each per-split annotation file
sets = [  # inspect all available images
    'train2017',
    'test2017',
    'val2017',
]

In [4]:
# Collect the relative paths of every image that has at least one "scratch" annotation.
# Annotations are expected in COCO format: each split's JSON must provide the
# 'categories', 'annotations' and 'images' lists read below.
image_names = []
for data_set in sets:
    # JSON is UTF-8 by specification; do not depend on the platform default encoding
    with open(path + annotations_str + data_set + '.json', 'r', encoding='utf-8') as file:
        curr_json = json.load(file)

    # ids of the categories named "scratch" (case-insensitive match)
    target_categories = {
        category['id']
        for category in curr_json['categories']
        if category['name'].lower() == 'scratch'
    }

    # ids of the images carrying at least one annotation of those categories
    target_image_ids = {
        annotation['image_id']
        for annotation in curr_json['annotations']
        if annotation['category_id'] in target_categories
    }

    # keep only the matching images, prefixing each file name with its split folder;
    # the loaded JSON records themselves are left unmodified
    image_names.extend(
        data_set + '/' + image['file_name']
        for image in curr_json['images']
        if image['id'] in target_image_ids
    )

In [5]:
# number of images, over all scanned splits, that have at least one scratch annotation
len(image_names)

2121

In [6]:
# Inference settings
target_size = [1200, 900]  # image size expected by the model
conf_thresh = 0.5  # post-processing confidence threshold
# prefer the GPU, but fall back to the CPU so the notebook still runs on machines without CUDA
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model_name = './model.pth'  # checkpoint file holding the trained weights, if present

In [7]:
# Load the trained weights if a checkpoint file exists; otherwise keep the freshly
# constructed ("out-of-box") model.
# NOTE: torch.load unpickles the file, so only load checkpoints from a trusted source.
trained_model_found = False
try:
    trained_model = torch.load(model_name, map_location=torch.device(device))
except FileNotFoundError:
    # Only a missing checkpoint is an expected situation. Any other failure
    # (corrupt file, unavailable device, ...) must surface instead of being
    # reported as "no model" and silently changing the predictions.
    print("No model found. Using out-of-box model.")
else:
    trained_model_found = True

# define model architecture and use loaded weights (if any)
# NOTE(review): 2 is presumably the number of classes (background + scratch) - confirm
model = get_model_instance_segmentation(2)
if trained_model_found:
    model.load_state_dict(trained_model['model_state_dict'])

# inference mode, then move to the selected device (the last expression displays the model)
model.eval()
model.to(device)

MaskRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
         

In [8]:
# Reference histogram of the number of predicted scratches per image, taken from
# the actual evaluation dataset (position k = images with k predicted scratches).
target_dist = [456, 206, 56, 15, 5, 1, 0, 0, 0, 0, 0, 0, 0]

# Accumulators for the current input dataset, one slot per detection count:
# image counts, the per-image detection counts, and the image names per count.
current_dist = [0] * len(target_dist)
predictions_num = []
images_to_take = [[] for _ in target_dist]

In [9]:
# Create the detection-count distribution of the current input dataset.
# The accumulators are reset here so that re-running this cell does not double count.
current_dist = [0] * len(target_dist)
predictions_num = []
images_to_take = [[] for _ in target_dist]

with torch.no_grad():
    for image_name in image_names:
        # the context manager releases the image's file handle once inference is done
        with Image.open(path + image_name) as img:
            processor = Processor([img], target_size, device=device)
            output = model(processor.images)
            # filter out predictions based on the confidence threshold
            output = processor.filter_outputs(output, conf_thresh)

        # count images with a certain number of detections and note the images
        # responsible for those detections; a count beyond the last bin raises IndexError
        predicted_scratches = len(output[0]['scores'])
        current_dist[predicted_scratches] += 1
        predictions_num.append(predicted_scratches)
        images_to_take[predicted_scratches].append(image_name)

In [10]:
# current_dist[k] = number of input images with k detections kept after filtering
current_dist

[809, 589, 363, 199, 81, 38, 22, 11, 4, 2, 1, 0, 2]

In [11]:
# Find how many images can be taken from each bin while preserving the proportions of
# the target distribution. The limiting bin is the one with the smallest positive
# current/target ratio; bins with no target images get ratio 0 and are ignored.
ratio = np.array([
    current_dist[i] / target_dist[i] if target_dist[i] > 0 else 0
    for i in range(len(current_dist))
])
positive_ratios = ratio[ratio > 0]
# with no usable bin at all there is nothing to sample (np.min would raise on an empty array)
min_ratio = positive_ratios.min() if positive_ratios.size > 0 else 0.0
samples = np.round(min_ratio * np.array(target_dist))
# never plan to take more images from a bin than it actually holds; this matters for a
# bin that the target expects but for which no image was found
samples = np.minimum(samples, np.array(current_dist))
samples

array([809., 365.,  99.,  27.,   9.,   2.,   0.,   0.,   0.,   0.,   0.,
         0.,   0.])

In [12]:
# For each detection count, sample the planned number of images from the respective
# bucket without replacement. The per-bucket draws are collected and joined once at the end.
selected_chunks = []
final_distribution = []
for label in range(len(samples)):
    # np.random.choice raises when asked for more items than the bucket holds
    n_take = min(int(samples[label]), len(images_to_take[label]))
    if n_take > 0:
        selected_chunks.append(
            np.random.choice(np.array(images_to_take[label]), size=n_take, replace=False)
        )
    final_distribution.append(n_take)

# an empty selection is still a (string-typed) array so downstream cells can iterate it
final_images_to_take = np.concatenate(selected_chunks) if selected_chunks else np.array([], dtype=str)

In [13]:
# sanity check: the selected images' distribution, normalized to fractions,
# should be similar to the normalized target distribution
np.asarray(final_distribution) / np.sum(final_distribution)

array([0.61708619, 0.27841342, 0.07551487, 0.02059497, 0.00686499,
       0.00152555, 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        ])

In [14]:
# target distribution normalized to fractions
np.asarray(target_dist) / np.sum(target_dist)

array([0.61705007, 0.27875507, 0.07577808, 0.0202977 , 0.0067659 ,
       0.00135318, 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        ])

In [15]:
# number of images proposed for inference (total drawn over all detection-count buckets)
len(final_images_to_take)

1311

In [16]:
# Check that the distribution of the taken images matches after predicting with the model:
# run the model on the selected images again and rebuild the detection-count histogram.
output_dist = [0] * len(target_dist)
output_predictions_num = []
with torch.no_grad():
    for image_name in final_images_to_take:
        # the context manager releases the image's file handle once inference is done
        with Image.open(path + image_name) as img:
            processor = Processor([img], target_size, device=device)
            output = model(processor.images)
            # same confidence filtering as when the images were bucketed
            output = processor.filter_outputs(output, conf_thresh)

        predicted_scratches = len(output[0]['scores'])
        output_dist[predicted_scratches] += 1
        output_predictions_num.append(predicted_scratches)

In [17]:
# output_dist[k] = number of selected images with k detections kept after filtering
output_dist

[809, 365, 99, 27, 9, 2, 0, 0, 0, 0, 0, 0, 0]

In [18]:
# distribution of the re-predicted selection, normalized to fractions
np.asarray(output_dist) / np.sum(output_dist)

array([0.61708619, 0.27841342, 0.07551487, 0.02059497, 0.00686499,
       0.00152555, 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        ])

In [19]:
# destination txt file for the image names selected for inference
# (written one name per line by the following cell)
file_name = "./data/images_for_inference.txt"

In [20]:
# Write the selected image names, one per line.
# The destination folder is created first so the write cannot fail on a missing directory.
os.makedirs(os.path.dirname(file_name) or '.', exist_ok=True)
# file_name is opened as given: the old './' prefix was redundant for a relative path
# and would have silently turned an absolute path into a relative one
with open(file_name, 'w') as file:
    file.writelines(f"{image_name}\n" for image_name in final_images_to_take)