In [1]:
import os
import sys
import warnings

# Installed before the third-party imports, exactly as in the original cell.
warnings.filterwarnings('ignore')

# Make the project-local modules importable.
sys.path.append('../../src/spine/model')
sys.path.append('../../src/spine/dataset')

import cv2
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.callbacks import CSVLogger, TensorBoard
from tensorflow.keras.metrics import AUC, Recall, Precision
from tensorflow.keras.utils import Sequence

# Project-local star imports stay last so any name they define keeps
# taking precedence, as before.
from retinanet import *
from loss import *
from label_encoder import *
from vrt2coco import *

# NOTE(review): `data_dir` is not referenced again in the visible cells; the
# data generators are constructed with `data_path` instead — confirm which
# of the two is intended.
data_dir = '../data/json'

# Load data

# Data generator

In [2]:
# Module-level encoder instance.
# NOTE(review): `LabelEncoder` is presumably supplied by the
# `from label_encoder import *` star import; this instance is not used in the
# visible cells — confirm it is still needed.
label_encoder = LabelEncoder()

class DataGen(Sequence):
    """Keras ``Sequence`` yielding ``([images, tabular], labels)`` batches.

    Every row of ``df`` describes one sample: an ``'img_name'`` column holds
    the image file name (relative to ``path``), ``input_cols`` are the tabular
    feature columns and ``target_col`` the label columns. Images are read
    with OpenCV, resized so that the longer side equals ``img_size`` and
    padded to a square.

    Args:
        df: DataFrame with one row per image.
        path: Directory the ``'img_name'`` entries are relative to.
        input_cols: Column names of the tabular inputs.
        target_col: List of label column names.
        mode: ``'train'`` enables augmentation and (optional) per-epoch
            shuffling; any other value yields the rows in DataFrame order
            without augmentation.
        batch_size: Number of rows per batch (positive integer).
        shuffle: Reshuffle the row order after every epoch (train mode only).
        img_size: Side length in pixels of the square output images.

    Raises:
        ValueError: If ``batch_size`` is not a positive integer.

    NOTE(review): ``A`` is expected to be ``albumentations``; it is not
    imported explicitly in the visible cells — confirm it is in scope.
    """

    def __init__(self, df, path, input_cols, target_col, mode=None, batch_size=8, shuffle=True, img_size=640):
        super().__init__()
        # Fail fast on swapped positional arguments (e.g. a mode string
        # landing in `batch_size`).
        if not isinstance(batch_size, (int, np.integer)) or batch_size <= 0:
            raise ValueError(
                "batch_size must be a positive integer, got %r" % (batch_size,))

        self.df = df
        self.path = path
        self.input_cols = input_cols
        self.target_col = target_col
        self.mode = mode
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.img_size = img_size
        # Only the horizontal flip is active. Augmentations that are
        # currently disabled: Rotate(limit=15), RandomBrightnessContrast,
        # RandomScale(0.25), ChannelShuffle, ShiftScaleRotate.
        self.transform = A.Compose([A.HorizontalFlip()])
        self.on_epoch_end()

    def on_epoch_end(self):
        """Rebuild the row order; shuffled only in train mode with ``shuffle``."""
        # Always define the index array so __getitem__ works in every mode.
        self.indexes = np.arange(len(self.df))
        if self.mode == 'train' and self.shuffle:
            np.random.shuffle(self.indexes)

    def cropPad(self, img_path):
        """Load an image, resize it keeping aspect ratio and pad it to a square.

        Args:
            img_path: File name relative to ``self.path``.

        Returns:
            Array of shape ``(img_size, img_size, 3)`` as produced by
            ``cv2.imread`` followed by resize and constant-border padding.

        Raises:
            FileNotFoundError: If OpenCV cannot read the file.
        """
        full_path = os.path.join(self.path, img_path)
        img = cv2.imread(full_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError("Could not read image: %s" % full_path)

        old_size = img.shape[:2]  # (height, width)
        ratio = float(self.img_size) / max(old_size)
        # Clamp to 1 px so extremely thin images never resize to a zero side.
        new_h, new_w = (max(1, int(side * ratio)) for side in old_size)
        img = cv2.resize(img, (new_w, new_h))  # cv2 expects (width, height)

        # Split the padding as evenly as possible between the two sides.
        delta_w = self.img_size - new_w
        delta_h = self.img_size - new_h
        top, bottom = delta_h // 2, delta_h - (delta_h // 2)
        left, right = delta_w // 2, delta_w - (delta_w // 2)

        return cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT)

    def __len__(self):
        """Number of batches per epoch, counting a final partial batch."""
        return int(np.ceil(len(self.df) / self.batch_size))

    def __classes__(self):
        """Return the label columns of the whole DataFrame as a NumPy array."""
        return self.df[self.target_col].to_numpy()

    def __getitem__(self, index):
        """Build batch number ``index``.

        Returns:
            ``([X_img, X_tab], y)`` where ``X_img`` has shape
            ``(n, img_size, img_size, 3)``, ``X_tab`` holds the ``input_cols``
            values of the batch rows and ``y`` the integer labels with one
            column per entry of ``target_col``.

        Raises:
            IndexError: If ``index`` is outside ``[-len(self), len(self))``.
        """
        n_batches = len(self)
        if index < 0:
            index += n_batches
        if not 0 <= index < n_batches:
            raise IndexError("batch index out of range")

        # Select rows through the (possibly shuffled) index array so that
        # shuffling actually changes the batch composition.
        rows = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        batch_df = self.df.iloc[rows]
        n_rows = len(batch_df)

        X_img = np.zeros((n_rows, self.img_size, self.img_size, 3), dtype=int)
        augment = self.mode == 'train' and self.transform is not None
        for i, img_path in enumerate(batch_df['img_name']):
            img = self.cropPad(img_path)
            X_img[i] = self.transform(image=img)['image'] if augment else img

        X_tab = batch_df[self.input_cols].values
        # Take labels from the same rows as the images instead of searching
        # the frame by file name (slow, and ambiguous for duplicate names).
        y = batch_df[self.target_col].to_numpy().astype(int).reshape(n_rows, -1)

        return [X_img, X_tab], y

In [None]:
# Hold out 15% of the rows for testing, then 15% of the remainder for
# validation. Both splits use a fixed seed so they are reproducible.
# NOTE(review): `df_smile` is not defined in the visible cells.
train_val_df, test_df = train_test_split(df_smile, random_state=0, test_size=0.15)
train_df, val_df = train_test_split(train_val_df, random_state=0, test_size=0.15)

In [None]:
img_size = 480

# Keyword arguments are required here: the 5th and 6th positional parameters
# of DataGen are `mode` and `batch_size`, so passing `img_size, 'train'`
# positionally would set mode=480 and batch_size='train'.
# NOTE(review): `data_path`, `train_cols` and `target_col` are not defined in
# the visible cells — confirm they are created in the data-loading section.
train_datagen = DataGen(train_df, data_path, train_cols, target_col, mode='train', img_size=img_size)
val_datagen = DataGen(val_df, data_path, train_cols, target_col, img_size=img_size)
test_datagen = DataGen(test_df, data_path, train_cols, target_col, img_size=img_size)

# Train data visualization

In [None]:
# Show the first training batch in a two-row grid.
(X_img, X_tab), y = train_datagen[0]
print('Image: ', X_img.shape, 'Tabular: ', X_tab.shape, 'Label: ', y.shape)

# Derive the grid width from the batch itself: plt.subplot needs integer
# dimensions, and the global `batch_size` is only defined in a later cell.
n_cols = max(1, int(np.ceil(len(X_img) / 2)))

plt.figure(figsize=(20,20))
for idx, (img, label) in enumerate(zip(X_img, y)):
    plt.subplot(2, n_cols, idx+1)
    # Images are loaded with OpenCV (BGR); reorder channels to RGB for display.
    plt.imshow(img[:, :, [2, 1, 0]])
    plt.title('Label: %s'%(target_col[0] if label[0] else 'not ' + target_col[0]))
    plt.axis('off')
plt.tight_layout()

# Model

In [3]:
# Not referenced by any active code in the visible cells.
model_dir = "retinanet/"

num_classes = 1
# NOTE(review): DataGen takes its own `batch_size` argument (default 8); this
# value is not passed to it in the visible cells.
batch_size = 2

# Piecewise-constant learning-rate schedule: one rate per interval, hence one
# more value than there are step boundaries.
learning_rates = [2.5e-06, 0.000625, 0.00125, 0.0025, 0.00025, 2.5e-05]
learning_rate_boundaries = [125, 250, 500, 240000, 360000]
learning_rate_fn = tf.optimizers.schedules.PiecewiseConstantDecay(
    values=learning_rates,
    boundaries=learning_rate_boundaries,
)

In [4]:
# Build the detector and its loss.
# NOTE(review): `get_backbone`, `RetinaNetLoss` and `RetinaNet` are presumably
# supplied by the project star imports at the top of the notebook.
resnet50_backbone = get_backbone()
loss_fn = RetinaNetLoss(num_classes)
model = RetinaNet(num_classes, resnet50_backbone)

# SGD with momentum, driven by the learning-rate schedule defined earlier.
# No metrics are registered: only the loss is tracked during training.
optimizer = tf.optimizers.SGD(momentum=0.9, learning_rate=learning_rate_fn)
model.compile(optimizer=optimizer, loss=loss_fn)

In [5]:
# Training callbacks: CSV log, TensorBoard summaries and best-model checkpoints.

# The callbacks write below ../model; create it up front so they do not fail
# on a fresh checkout.
os.makedirs('../model', exist_ok=True)

filepath = '../model/spine.epoch{epoch:02d}-loss{val_loss:.2f}.hdf5'
csv_logger = CSVLogger("../model/training_smile_binary.csv", append=True)
tb = TensorBoard(log_dir= "../model/logs", histogram_freq=0, write_graph=True, write_images=True)
# Monitor the validation loss: the model is compiled without metrics, so no
# 'val_acc' value is ever logged, and the file name template above already
# embeds `val_loss`. Lower is better, hence mode='min'.
# NOTE(review): an earlier, disabled configuration saved weights only
# (monitor="loss", save_weights_only=True); if the model cannot be serialized
# as a whole to HDF5, restore `save_weights_only=True` here — confirm.
checkpoint = ModelCheckpoint(filepath=filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
callbacks_list = [csv_logger, tb, checkpoint]

# Training

In [None]:
# Train on the Sequence generators.
# `Model.fit` accepts a keras Sequence directly (`fit_generator` is
# deprecated). The steps per epoch are taken from `len(generator)`, which
# already counts batches; dividing that by the batch size again would visit
# only a fraction of the data each epoch.
history = model.fit(train_datagen,
                    epochs=10,
                    validation_data=val_datagen,
                    callbacks=callbacks_list,
                    verbose=1)

# Training metrics

In [None]:
# Plot the training curves recorded by Keras. Each series is drawn only if
# it exists in the history, so the cell does not raise KeyError when the
# model was compiled without an accuracy metric or trained without
# validation data.
plt.rcParams.update({'font.size': 15})
fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(20,7))

logs = history.history

for key, label in (('loss', 'train_loss'), ('val_loss', 'val_loss')):
    if key in logs:
        ax_loss.plot(logs[key], label=label)
ax_loss.set_xlabel('epoch')
ax_loss.set_title('Loss')
if ax_loss.get_legend_handles_labels()[0]:
    ax_loss.legend()

# The plotted keys are 'acc' / 'val_acc', so label them as accuracy.
for key, label in (('acc', 'train_acc'), ('val_acc', 'val_acc')):
    if key in logs:
        ax_acc.plot(logs[key], label=label)
ax_acc.set_xlabel('epoch')
if ax_acc.get_legend_handles_labels()[0]:
    ax_acc.set_title('Accuracy')
    ax_acc.legend()
else:
    ax_acc.set_title('Accuracy (no metric recorded)')

plt.tight_layout()

# Evaluation

# Test data visualization

In [None]:
# Wrap the trained model in an inference graph: a symbolic image input of
# unconstrained height/width with 3 channels is passed through the model in
# inference mode, and the raw predictions are decoded into detections.
# NOTE(review): `DecodePredictions` is presumably supplied by a project star
# import.
image = tf.keras.Input(name="image", shape=(None, None, 3))
predictions = model(image, training=False)
decoder = DecodePredictions(confidence_threshold=0.5)
detections = decoder(image, predictions)
inference_model = tf.keras.Model(outputs=detections, inputs=image)

In [None]:
def prepare_image(image):
    """Turn a single image into a one-element batch for the inference model.

    The image is passed through ``resize_and_pad_image`` (jitter disabled),
    then through the Keras ResNet ``preprocess_input`` and finally gets a
    leading batch axis.

    Args:
        image: Image tensor accepted by ``resize_and_pad_image``.

    Returns:
        Tuple ``(batched_image, ratio)`` where ``ratio`` is the third value
        returned by ``resize_and_pad_image``.
    """
    resized, _, ratio = resize_and_pad_image(image, jitter=None)
    preprocessed = tf.keras.applications.resnet.preprocess_input(resized)
    batched = tf.expand_dims(preprocessed, axis=0)
    return batched, ratio

# Run the inference model on two validation samples and draw the detections.
# `with_info=True` makes tfds.load also return the DatasetInfo object that
# `int2str` is read from; without it `dataset_info` is never defined.
# NOTE(review): `tfds` (tensorflow_datasets) is not imported explicitly in
# the visible cells — confirm it is in scope.
# NOTE(review): class names are taken from the COCO label set while the model
# above is built with num_classes = 1 — confirm this is the intended dataset.
val_dataset, dataset_info = tfds.load("coco/2017", split="validation", with_info=True, data_dir="data")
int2str = dataset_info.features["objects"]["label"].int2str

for sample in val_dataset.take(2):
    image = tf.cast(sample["image"], dtype=tf.float32)
    input_image, ratio = prepare_image(image)
    detections = inference_model.predict(input_image)
    num_detections = detections.valid_detections[0]
    class_names = [int2str(int(x)) for x in detections.nmsed_classes[0][:num_detections]]
    # Boxes are predicted on the resized image; divide by `ratio` to map
    # them back to the original image scale.
    visualize_detections(image,
                         detections.nmsed_boxes[0][:num_detections] / ratio,
                         class_names,
                         detections.nmsed_scores[0][:num_detections],)