# Lego CV Recognizer

In [1]:
from __future__ import print_function
import os

import numpy as np
import cv2

import mxnet as mx
from mxnet import nd, autograd, gluon, optimizer
from mxnet.image import color_normalize

import matplotlib.pyplot as plt
import matplotlib.image as mpimg
%matplotlib inline


### Data

In [None]:
def load_sample_image(directory):
    """Read the first image file (by sorted file name) found in ``directory``.

    ``mpimg.imread`` needs the path of an image *file*; the dataset locations
    used here are folders, so one representative file is picked from each.

    Args:
        directory: path of a folder that holds image files.

    Returns:
        The array produced by ``mpimg.imread`` for the selected file.

    Raises:
        IOError: if ``directory`` holds no file with a known image extension.
    """
    image_names = sorted(
        name for name in os.listdir(directory)
        if name.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp'))
    )
    if not image_names:
        raise IOError('no image files found in %r' % (directory,))
    return mpimg.imread(os.path.join(directory, image_names[0]))


# NOTE(review): assumes the image files sit directly inside each dataset
# folder (no nested sub-folders) -- confirm against the real dataset layout.
mFig = load_sample_image('datasets/minifigure/')
bricks = load_sample_image('datasets/bricks/')

In [None]:
# Display the sample minifigure image on its own figure.
fig, ax = plt.subplots()
ax.imshow(mFig)
plt.show()

In [None]:
# Device context used for the network parameters and every batch: GPU 0.
ctx = mx.gpu(0)

In [None]:
# Width of the network's final layer (a single output unit).
num_outputs = 1
# Number of images per batch delivered by both iterators below.
batch_size = 64

# Training iterator: shuffled, with random mirroring and random rotation.
train_data = mx.io.ImageRecordIter(path_imgrec='lego_train.rec',
                                   min_img_size=512,
                                   data_shape=(3, 512, 512),
                                   rand_crop=False,
                                   shuffle=True,
                                   batch_size=batch_size,
                                   # The rotation augmentation option is called
                                   # `max_rotate_angle` (random angle in [-v, v]);
                                   # `max_rotation` is not a documented option, so
                                   # the augmentation was not being requested.
                                   max_rotate_angle=15,
                                   rand_mirror=True)

# Validation iterator: same image geometry, no augmentation and no shuffling.
test_data = mx.io.ImageRecordIter(path_imgrec='lego_val.rec',
                                  min_img_size=512,
                                  data_shape=(3, 512, 512),
                                  batch_size=batch_size)

# Make sure both iterators start from their first batch.
train_data.reset()
test_data.reset()

In [None]:
# Pull one training batch and preview its first four images side by side.
batch = train_data.next()
data = batch.data[0]
print(data.shape)

fig, axes = plt.subplots(1, 4)
for i, ax in enumerate(axes):
    pixels = data[i].asnumpy().astype(np.uint8)
    # The array is channel-first; imshow wants the channel axis last.
    ax.imshow(np.transpose(pixels, (1, 2, 0)))

### CNN

In [None]:
# Five convolutional layers with interleaved max-pooling, followed by three
# fully connected layers; the last one has `num_outputs` sigmoid units.
net = gluon.nn.Sequential()
with net.name_scope():
    # Layers are created inside the name scope, in network order, and then
    # appended one by one.
    layers = [
        # Convolution 1 + pooling
        gluon.nn.Conv2D(channels=96, kernel_size=11, strides=(4, 4), activation='relu'),
        gluon.nn.MaxPool2D(pool_size=3, strides=2),
        # Convolution 2 + pooling
        gluon.nn.Conv2D(channels=192, kernel_size=5, activation='relu'),
        gluon.nn.MaxPool2D(pool_size=3, strides=(2, 2)),
        # Convolutions 3, 4 and 5, then pooling
        gluon.nn.Conv2D(channels=384, kernel_size=3, activation='relu'),
        gluon.nn.Conv2D(channels=384, kernel_size=3, activation='relu'),
        gluon.nn.Conv2D(channels=256, kernel_size=3, activation='relu'),
        gluon.nn.MaxPool2D(pool_size=3, strides=2),
        # Flatten and apply the fully connected layers
        gluon.nn.Flatten(),
        gluon.nn.Dense(4096, activation="relu"),
        gluon.nn.Dense(4096, activation="relu"),
        gluon.nn.Dense(num_outputs, activation="sigmoid"),
    ]
    for layer in layers:
        net.add(layer)

In [None]:
# Xavier-initialise every parameter of the network on the chosen device.
net.initialize(mx.init.Xavier(magnitude=2.24), ctx=ctx)

In [None]:
# Binary cross-entropy on values that have already been passed through a
# sigmoid (`from_sigmoid=True`), matching the sigmoid activation on the
# network's last layer.
sigmoid_binary_cross_entropy = gluon.loss.SigmoidBinaryCrossEntropyLoss(from_sigmoid=True)

# Alternative logistic loss configured for the 'binary' label format.
logistic_loss = gluon.loss.LogisticLoss(label_format='binary')
# Backward-compatible alias for the original, misspelled variable name.
logisitc_loss = logistic_loss

In [None]:
# Plain SGD over all network parameters.
trainer = gluon.Trainer(
    params=net.collect_params(),
    optimizer='sgd',
    optimizer_params={'learning_rate': 1e-4},
)

In [None]:
# Module-level metric accumulator (RMSE) shared by every call to `evaluate`.
metric = mx.metric.create(['rmse'])


def evaluate(net, data_iter, ctx):
    """Score ``net`` on every batch of ``data_iter`` with the shared ``metric``.

    Args:
        net: callable model applied to each data slice.
        data_iter: batch iterator; it is reset before being consumed.
        ctx: list of device contexts the batches are split across.

    Returns:
        The value of ``metric.get()`` after all batches have been processed.
    """
    data_iter.reset()
    for batch in data_iter:
        inputs = gluon.utils.split_and_load(batch.data[0], ctx_list=ctx, batch_axis=0)
        targets = gluon.utils.split_and_load(batch.label[0], ctx_list=ctx, batch_axis=0)
        predictions = [net(shard) for shard in inputs]
        metric.update(targets, predictions)
    result = metric.get()
    # Clear the accumulator so the next evaluation starts from scratch.
    metric.reset()
    return result


### Train

In [None]:
epochs = 100
# Weight of the newest batch in the exponentially smoothed loss.
smoothing_constant = .01
moving_loss = 0

for e in range(epochs):
    # Rewind the iterator at the start of every epoch (including the first).
    train_data.reset()
    for i, batch in enumerate(train_data):
        # The iterator already yields NDArrays, so they are split across the
        # device list directly without an extra copy.
        data = gluon.utils.split_and_load(batch.data[0], ctx_list=[ctx])
        label = gluon.utils.split_and_load(batch.label[0], ctx_list=[ctx])

        with autograd.record():
            losses = [sigmoid_binary_cross_entropy(net(x), y)
                      for x, y in zip(data, label)]
        # Back-propagate every shard, not only the last one produced.
        for loss in losses:
            loss.backward()
        # Normalise the gradient by the real number of samples in this batch
        # instead of a hard-coded constant.
        trainer.step(batch.data[0].shape[0])

        curr_loss = sum(nd.mean(loss).asscalar() for loss in losses) / len(losses)
        if i == 0 and e == 0:
            # Seed the running average with the very first batch loss.
            moving_loss = curr_loss
        else:
            moving_loss = ((1 - smoothing_constant) * moving_loss
                           + smoothing_constant * curr_loss)

    # `evaluate` reports the RMSE metric, so lower values are better here.
    test_accuracy = evaluate(net, test_data, [ctx])
    train_accuracy = evaluate(net, train_data, [ctx])
    print("Epoch %s. Loss: %s, Train_acc %s, Test_acc %s" % (e, moving_loss, train_accuracy, test_accuracy))

### Test

In [None]:
def get_image(url, show=False):
    """Download an image and convert it into a network-ready array.

    Args:
        url: address of the image; fetched with ``mx.test_utils.download``.
        show: when true, draw the RGB image with matplotlib (axes hidden).

    Returns:
        A numpy array of shape ``(1, 3, 512, 512)`` laid out as
        (batch, channel, height, width) with channels in RGB order, or
        ``None`` when OpenCV could not read the downloaded file.
    """
    fname = mx.test_utils.download(url)
    bgr = cv2.imread(fname)
    # `cv2.imread` signals an unreadable file by returning None; this must be
    # checked before the colour conversion, which cannot accept None.
    if bgr is None:
        return None
    img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    if show:
        plt.imshow(img)
        plt.axis('off')
    img = cv2.resize(img, (512, 512))
    # (height, width, channel) -> (channel, height, width)
    img = np.transpose(img, (2, 0, 1))
    # Prepend the batch axis.
    return img[np.newaxis, :]