## Initialization

In [None]:
import os
import cv2
import numpy as np
from PIL import Image

import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.cross_validation import train_test_split

In [None]:
# Hight and width of the images
IMAGE_SIZE = 100

# 3 channels, Red, Green and Blue
CHANNELS = 3

## Functions

In [None]:
# Read total image from folder
def read_image():
    images = []
    labels = []
    folder_path = '//Users//jun//Desktop//Dataset//br-coins//all//'
    
    for image_name in os.listdir(folder_path):
        image_path = folder_path + image_name
        image_label = int(image_name.split('_')[0])
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        images.append(image)
        labels.append(image_label)
            
    return images, labels


# Extract coins from image
def extract_coins(img):
    # Convert to b&w
    cimg = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    # Find circles on the image
    circles = cv2.HoughCircles(
        cimg, cv2.HOUGH_GRADIENT, 2, 60, param1=300, param2=30, minRadius=30, maxRadius=50)
    
    # Convert to HSV colorspace
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    # Define color range for masking
    lower = np.array([0, 0, 0])
    upper = np.array([255, 255, 90])
    # Apply the mask
    mask = cv2.blur(cv2.inRange(hsv, lower, upper), (10, 10))
    
    frames = []
    radiuses = []
    # If circles were not found
    if circles is None:
        return None, None
    
    for circle in circles[0]:
        center_x = int(circle[0])
        center_y = int(circle[1])
        
        # If center of coin lays in masked coin range
        if not mask[center_y, center_x]:
            continue
        
        # Increase radius by C, circle detector tends to decrease radius
        radius = circle[2] + 3
        
        radiuses.append(radius)
        
        # Coordinates of upper left corner of square
        x = int(center_x - radius)
        y = int(center_y - radius)
        
        # As radius was increased the coordinates, could go out of bounds
        if y < 0:
            y = 0
        if x < 0:
            x = 0
                
        frames.append(img[y: int(y + 2 * radius), x: int(x + 2 * radius)])

    return np.array(frames), radiuses


# Rejoint images to a new one
def image_joint(image_list):
    image_size = 376
    size = 0
    img_dict = {}
    count = 0
    
    # Get the size of the biggest one, resize all images with it
    for img in image_list:
        img_dict[count] = img.shape[0] * img.shape[1]
        count += 1
        if img.shape[0] >= size:
            size = img.shape[0]
        if img.shape[1] >= size:
            size = img.shape[1] 
    
    img_dict = sorted(img_dict.items(), key=lambda item:item[1])
    new_image_list = []
    for key, item in img_dict:
        img = image_list[key]
        if img.shape[0] < size or img.shape[1] < size:
            new_img=Image.new('RGB', (size, size), '#FAFAFA')
            new_img.paste(Image.fromarray(img), box=(0, 0, img.shape[1], img.shape[0]))
            new_image_list.append(np.array(new_img))
        else:
            new_image_list.append(img)
            
    # Joint all single images to new one with original size
    new_img=Image.new('RGB', (image_size, image_size), '#FAFAFA')
    
    for i in range(len(new_image_list)):
        img = new_image_list[i]
        
        if i == 0:
            lt = rt = 0
            lb = size
            rb = size
        else:
            if lb + size <= image_size:
                lt += size
                lb += size 
            else:
                lt = 0
                rt += size
                lb = size
                rb += size
                
        new_img.paste(Image.fromarray(img), box=(lt, rt, lb, rb))
            
    return np.array(new_img)

## Data Preparation

In [None]:
# Read all images fron folder
all_image, all_label = read_image()
all_image = np.array(all_image)
all_label = np.array(all_label)

In [None]:
images = []
labels = []
for i in range(len(all_image)):
    image = all_image[i]
    label = all_label[i]

    # Extract coins from image and rejoint to a new image
    prepared, _ = extract_coins(image)

    if prepared is not None and len(prepared):
        # merge extracted coins back to on image
        image = image_joint(prepared)

        image = cv2.resize(image, (IMAGE_SIZE, IMAGE_SIZE))
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        images.append(image)
        labels.append(label)

all_image_new = np.array(images)
all_label_new = np.array(labels)

In [None]:
np.save('coins_all_image_new.npy', all_image_new)
np.save('coins_all_label_new.npy', all_label_new)
all_image_new.shape, all_label_new.shape

In [None]:
all_image_new = np.load('coins_all_image_new.npy')
all_label_new = np.load('coins_all_label_new.npy')
all_image_new.shape, all_label_new.shape

In [None]:
# Splitting
# X_train, X_test, Y_train, Y_test = train_test_split(all_image_new, all_label_new, test_size=0.2, random_state=None)
X_train = np.array([])
X_test = np.array([])
Y_train = np.array([])
Y_test = np.array([])
for label in np.unique(all_label_new):        
    index = np.argwhere(all_label_new == label)
    x_train, x_test, y_train, y_test = train_test_split(all_image_new[index[:,0]], all_label_new[index[:,0]], test_size=0.2, random_state=None)
    if len(X_train) == 0:
        X_train = x_train
        X_test = x_test
        Y_train = y_train
        Y_test = y_test
    else:
        X_train = np.concatenate((X_train, x_train), axis=0)
        X_test = np.concatenate((X_test, x_test), axis=0)
        Y_train = np.concatenate((Y_train, y_train), axis=0)
        Y_test = np.concatenate((Y_test, y_test), axis=0)

# Show first 10 images
print(Y_train[0:10])

slice = 10
plt.figure(figsize=(16, 16))
for i in range(slice):
    plt.subplot(1, slice, i+1)
    plt.imshow(X_train[i], interpolation='nearest')
    plt.axis('off')    
    
print(Y_test[0:10])

slice = 10
plt.figure(figsize=(16, 16))
for i in range(slice):
    plt.subplot(1, slice, i+1)
    plt.imshow(X_test[i], interpolation='nearest')
    plt.axis('off')    
    
print(X_train.shape, Y_train.shape, X_test.shape, Y_test.shape)

In [None]:
# Normalization
X_train = X_train.astype(np.float32, copy=False)
X_test = X_test.astype(np.float32, copy=False)

mean = X_train.mean(axis=(0, 1, 2))
std = X_train.std(axis=(0, 1, 2))

X_train -= mean
X_train /= std
X_test -= mean
X_test /= std

In [None]:
np.save('coins_X_train.npy', X_train)
np.save('coins_Y_train.npy', Y_train)
np.save('coins_X_test.npy', X_test)
np.save('coins_Y_test.npy', Y_test)

In [None]:
X_train = np.load('coins_X_train.npy')
Y_train = np.load('coins_Y_train.npy')
X_test = np.load('coins_X_test.npy')
Y_test = np.load('coins_Y_test.npy')

In [None]:
# Flatten data
X_flat_train = X_train.reshape(X_train.shape[0], IMAGE_SIZE*IMAGE_SIZE*CHANNELS)
X_flat_test = X_test.reshape(X_test.shape[0], IMAGE_SIZE*IMAGE_SIZE*CHANNELS)
X_flat_train.shape, X_flat_test.shape, Y_train.shape, Y_test.shape

## PCA

In [None]:
from sklearn.decomposition import PCA
import math

components = []
errors = []

i = 1
while i>0:
    i = i - 0.05
    pca = PCA(i, copy=False, whiten=True) 
    X_flat_train = pca.fit_transform(X_flat_train)
    X_flat_test = pca.transform(X_flat_test)
    
    # Construct grnn
    grnnet = algorithms.GRNN(std=0.5, verbose=True)
    grnnet.train(X_flat_train, Y_train)
    
    Y_pred = grnnet.predict(X_flat_test)
    error = estimators.rmsle(Y_pred, Y_test)
    
    if math.isnan(error):
        error = 1
    components.append("%.2f" % i)
    errors.append("%.3f" % error)

In [None]:
plt.plot(components, errors, color="red", linewidth=2)  
plt.xlabel("PCA Components")  
plt.ylabel("RMSLE")  
plt.title("GRNN RMSLE")   

In [None]:
pca = PCA(0.8, copy=False, whiten=True) 
X_flat_train = pca.fit_transform(X_flat_train)
X_flat_test = pca.transform(X_flat_test)

sum(pca.explained_variance_ratio_)

In [None]:
X_flat_train.shape, X_flat_test.shape, Y_train.shape, Y_test.shape

In [None]:
plt.scatter(X_flat_train[:,0], X_flat_train[:,1], marker='x')

In [None]:
plt.scatter(X_flat_test[:,0], X_flat_test[:,1], marker='x')

## GRNN

In [None]:
from sklearn import datasets, preprocessing
from sklearn.model_selection import train_test_split
from neupy import algorithms, estimators, environment

# Construct grnn
grnnet = algorithms.GRNN(std=0.5, verbose=True)
grnnet.train(X_flat_train, Y_train)

Y_pred = grnnet.predict(X_flat_test)

error = estimators.rmsle(Y_pred, Y_test)
print("GRNN RMSLE = {:.3f}\n".format(error))


In [None]:
error = np.mean(abs(np.array(Y_test) - np.array(Y_pred)))
print("Mean prediction error for images: {0:.2f} cents".format(error))

# Ensemble

In [None]:
from keras.models import load_model

cnn_model = load_model('coins_reg_cnn.final.h5')
cnn_model.summary()

In [None]:
X_train_cnn = cnn_model.predict(X_train)
X_test_cnn = cnn_model.predict(X_test)

In [None]:
X_train_grnn = grnnet.predict(X_flat_train)
X_test_grnn = grnnet.predict(X_flat_test)

In [None]:
# concatenate the two train probability arrays to one as the input of ensemble
X_train_ens = np.concatenate((X_train_cnn, X_train_grnn), axis=1)
X_test_ens = np.concatenate((X_test_cnn, X_test_grnn), axis=1)

In [None]:
np.asarray(Y_train).reshape(len(Y_train)) 

In [None]:
def on_epoch_end(network):
    pred = network.predict(X_test_ens) 
    Y_pred.append(pred)

In [None]:
from neupy.algorithms import GradientDescent
from neupy import algorithms, layers, environment
from sklearn import metrics

Y_pred = []

model_ens = algorithms.MinibatchGradientDescent(
    [layers.Input(2), layers.Tanh(1), layers.Linear(1)],
    error='mse',
    step=0.25,
    shuffle_data=True,
    batch_size=16,
    verbose=True,
    epoch_end_signal=on_epoch_end,
)

In [None]:
model_ens.architecture()

In [None]:
model_ens.train(X_train_ens, Y_train, X_test_ens, Y_test, epochs=20)

In [None]:
error = np.mean(abs(np.array(Y_test) - np.array(Y_pred)))
print("Mean prediction error for ensemble model: {0:.2f} cents".format(error))