### **[INFO] Make sure to run on the GPU runtime type!**

<img src="https://www.kdnuggets.com/wp-content/uploads/colab-settings-1.png" alt="Drawing" style="width: 300px; border: 1px solid red" width=300/>



# Prepare the Data

In [ ]:
<a href="https://colab.research.google.com/github/ialhashim/EmojiNetTutorial/blob/master/EmojiNetTutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

dataset_size = 1000
num_datasets = 1

print('Will generate', dataset_size * num_datasets, 'samples')

In [None]:
!sudo apt-get install imagemagick libmagickwand-dev libmagickcore-6.q16-3-extra
!pip install Wand

In [None]:
%pylab inline

In [None]:
#
# 2) Prepare input and output data folders
#
import os
os.makedirs('emoji', exist_ok=True)
os.makedirs('png', exist_ok=True)
os.makedirs('data', exist_ok=True)

#
# 3) Download Emoji SVG files, create high-res PNGs
#

def emoji_svg_file(emoji_filename):
  return './emoji/'+emoji_filename+'.svg'

def emoji_png_file(emoji_filename):
  return './png/'+emoji_filename+'.png'

def download_emoji_svg(emoji_filename):
  from urllib import request
  import os.path
  emoji_path = 'emoji/'+emoji_filename+'.svg'
  f = open(emoji_path, 'wb')
  f.write(request.urlopen('https://raw.githubusercontent.com/googlefonts/noto-emoji/main/svg/'+emoji_filename+'.svg').read())
  f.close()
  print('Downloaded: ' + emoji_filename)
  
def download_all_emoji(emoji_set):
  for key in emoji_set.keys():
    download_emoji_svg(emoji_set[key])
    
def convert_svg_to_png(emoji_set):
  import os
  for key in emoji_set.keys():
    filename = emoji_set[key]
    cmd = ''
    if os.name == 'nt':
      cmd = 'magick convert -density 384 -background none emoji/'+filename+'.svg png/'+filename+'.png'
      os.system(cmd)
    else:
      cmd = 'convert -density 384 -background none emoji/'+filename+'.svg png/'+filename+'.png'
      os.system(cmd)
    print('Converted ' + filename, cmd)
    
location = {
		'camping': 'emoji_u1f3d5',
		'beach': 'emoji_u1f3d6',
		'desert': 'emoji_u1f3dc',
		'park': 'emoji_u1f3de',
		'factory': 'emoji_u1f3ed',
		'hills': 'emoji_u1f304',
		'city': 'emoji_u1f307',
		'golf': 'emoji_u26f3',
		'night': 'emoji_u1f306'}

actor = {
		'man_walk': 'emoji_u1f6b6_1f3fe_200d_2642',
		'man_run': 'emoji_u1f3c3_1f3fb_200d_2642',
		'man_sport': 'emoji_u1f3cb_1f3fb_200d_2642',
		'man_police': 'emoji_u1f46e_1f3fd_200d_2642',
		'man_hello': 'emoji_u1f64b_1f3fe_200d_2642',
		'man_engineer': 'emoji_u1f468_1f3fc_200d_1f527',
		'man_turban': 'emoji_u1f473_1f3fe_200d_2642',
		'man_suit': 'emoji_u1f935_1f3fe',
		'man_clown': 'emoji_u1f939_1f3fb_200d_2642',
		'man_construction': 'emoji_u1f477_200d_2642',
		'man_doctor': 'emoji_u1f468_1f3ff_200d_2695',
		'women_bicycle': 'emoji_u1f6b4_1f3fb_200d_2640',
		'women_hijab': 'emoji_u1f9d5_1f3fc',
		'women_doctor': 'emoji_u1f469_1f3fc_200d_2695',
		'animal_camel': 'emoji_u1f42a',
		'animal_cat': 'emoji_u1f408',
		'animal_bird': 'emoji_u1f426',
		'animal_tree': 'emoji_u1f333',
		'animal_car': 'emoji_u1f697',
		'animal_bus': 'emoji_u1f68c'}

danger = {
		'danger1': 'emoji_u1f4a3',
		'danger2': 'emoji_u1f52b',
		'danger3': 'emoji_u1f52a',
		'danger4': 'emoji_u2622'}

# Download all Emoji files
download_all_emoji(location)
download_all_emoji(actor)
download_all_emoji(danger)

# Create high res pngs
convert_svg_to_png(location)
convert_svg_to_png(actor)
convert_svg_to_png(danger)

# test resulting PNG
#from google.colab import files
#files.download('png/emoji_u1f697.png')

from random import random
from math import cos, sin, floor, sqrt, pi, ceil

def euclidean_distance(a, b):
    dx = a[0] - b[0]
    dy = a[1] - b[1]
    return sqrt(dx * dx + dy * dy)
  
# https://github.com/emulbreh/bridson
def poisson_disc_samples(width, height, r, k=5, distance=euclidean_distance, random=random):
    tau = 2 * pi
    cellsize = r / sqrt(2)

    grid_width = int(ceil(width / cellsize))
    grid_height = int(ceil(height / cellsize))
    grid = [None] * (grid_width * grid_height)

    def grid_coords(p):
        return int(floor(p[0] / cellsize)), int(floor(p[1] / cellsize))

    def fits(p, gx, gy):
        yrange = list(range(max(gy - 2, 0), min(gy + 3, grid_height)))
        for x in range(max(gx - 2, 0), min(gx + 3, grid_width)):
            for y in yrange:
                g = grid[x + y * grid_width]
                if g is None:
                    continue
                if distance(p, g) <= r:
                    return False
        return True

    p = width * random(), height * random()
    queue = [p]
    grid_x, grid_y = grid_coords(p)
    grid[grid_x + grid_y * grid_width] = p

    while queue:
        qi = int(random() * len(queue))
        qx, qy = queue[qi]
        queue[qi] = queue[-1]
        queue.pop()
        for _ in range(k):
            alpha = tau * random()
            d = r * sqrt(3 * random() + 1)
            px = qx + d * cos(alpha)
            py = qy + d * sin(alpha)
            if not (0 <= px < width and 0 <= py < height):
                continue
            p = (px, py)
            grid_x, grid_y = grid_coords(p)
            if not fits(p, grid_x, grid_y):
                continue
            queue.append(p)
            grid[grid_x + grid_y * grid_width] = p
    return [p for p in grid if p is not None]

# Draw a gradient
def interpolate_color(minval, maxval, val, color_palette):
  max_index = len(color_palette)-1
  v = float(val-minval) / float(maxval-minval) * max_index
  i1, i2 = int(v), min(int(v)+1, max_index)
  (r1, g1, b1), (r2, g2, b2) = color_palette[i1], color_palette[i2]
  f = v - i1
  return int(r1 + f*(r2-r1)), int(g1 + f*(g2-g1)), int(b1 + f*(b2-b1))

def draw_vt_gradient(draw, rect, color_func, color_palette):
  (max_x, max_y) = rect
  minval, maxval = 1, len(color_palette)
  delta = maxval - minval
  for y in range(0, max_y+1):
    f = y / float(max_y)
    val = minval + f * delta
    color = color_func(minval, maxval, val, color_palette)
    draw.line([(0, y), (max_x, y)], fill=color)

# Create a sky graident image
def create_sky_gradient_image(size):
  from PIL import Image, ImageDraw
  BLUE, WHITE, WHITE = ((28, 146, 210), (255, 255, 255), (255, 255, 255))
  image = Image.new("RGB", size)
  draw = ImageDraw.Draw(image)
  draw_vt_gradient(draw, size, interpolate_color, [BLUE, WHITE, WHITE])
  return image

# Generate random locations filled with random actors
def generate_random_location(locations, actors, dangers, count, s = 126):
  import time
  t = time.time()

  from PIL import Image, ImageDraw, ImageFont
  import random    

  # Cache backgrounds
  cached_locations = []
  for key in locations.keys():
    bg = Image.open(emoji_png_file(locations[key]))
    cached_locations.append(bg)
    cached_locations.append(bg.transpose(Image.FLIP_LEFT_RIGHT))

  # Generate a background
  backgrounds = []
  for i in range(count):
    margin = int(s*0.1)
    location_img = cached_locations[random.randint(0, len(cached_locations)-1)]
    location_img = location_img.resize((s+margin,s+margin), Image.BILINEAR)
    img = create_sky_gradient_image((s,s))
    img.paste(location_img,(int(-margin/2),int(-margin/2)),location_img)
    
    backgrounds.append(img)
    
  # Cache actors
  cached_actors = []
  actor_size = int(s * 0.3)
  for key in actors.keys():
    fg = Image.open(emoji_png_file(actors[key])).resize((actor_size,actor_size), Image.BILINEAR)
    cached_actors.append(fg)
    #cached_actors.append(fg.transpose(Image.FLIP_LEFT_RIGHT))
  
  # Generate some nice random sampling coordinates
  samples = []
  for i in range(10):
    samples.append(poisson_disc_samples(s,s,int(s * 0.2)))
  
  # Add actors to the backgrounds
  imgs = []
  for bg in backgrounds:
    positions = samples[random.randint(0, len(samples)-1)]
    for p in positions:
      if p[1] < s * 0.4: continue
      actor_img = cached_actors[random.randint(0, len(cached_actors)-1)]
      bg.paste(actor_img, (int(p[0]-actor_size/2),int(p[1]-actor_size/2)), actor_img)
    imgs.append(bg)
  
  # Load and cache dangerous items
  cached_danger = []
  danger_size = int(actor_size)
  for key in dangers:
    d = Image.open(emoji_png_file(dangers[key])).resize((danger_size,danger_size), Image.BILINEAR)
    cached_danger.append(d)
  
  # Add a dangerous item
  masks = []
  for img in imgs:
    # Get a random location in the bottom half of the screen
    positions = samples[random.randint(0, len(samples)-1)]
    good_positions = []
    for p in positions:
      if p[1] > s * 0.4:
        good_positions.append(p)
    if len(good_positions) == 0: good_positions = positions
    p = good_positions[random.randint(0, len(good_positions)-1)]
    item = cached_danger[random.randint(0, len(cached_danger)-1)]
    
    img.paste(item, (int(p[0]-danger_size/2),int(p[1]-danger_size/2)), item)
    
    # Simulate CCTV footage by drawing some text
    d = ImageDraw.Draw(img)
    d.text((10,10), time.strftime("%Y-%m-%d %H:%M"), fill=(255,255,255,128))
    d.text((10,20), time.strftime("Camera 07"), fill=(255,255,255,128))
    
  
    mask = Image.new("L", (s,s))
    mask.paste((255), (int(p[0]-danger_size/2),int(p[1]-danger_size/2)), item)
    masks.append(mask)
    
  elapsed = time.time() - t
  print("("+str(len(imgs))+") Scenes created in (" + str(round(elapsed,3)) + " s).")

  return imgs, masks

# Test generation function
imgs, masks = generate_random_location(location, actor, danger, 3)

In [None]:
figure(figsize=(8,8)); imshow(imgs[0])
figure(figsize=(8,8)); imshow(masks[0])

In [None]:
# Save to disk
from os.path import join
import h5py
import numpy as np

for di in range(num_datasets):
  h5_dataset_filename = join('data', 'dataset'+f'{di:03}'+'.hdf5')
  print("Creating ["+h5_dataset_filename+"] ...")

  with h5py.File(h5_dataset_filename, "w") as dataset:
    # Generate large dataset
    count = dataset_size
    imgs, masks = generate_random_location(location, actor, danger, count)

    for i in range(0, len(imgs)):
      img, mask = imgs[i], masks[i]
      filename = f'{i:05}'
      dataset.create_dataset(filename + ".rgb", data=np.array(img), compression="gzip", compression_opts=9)
      dataset.create_dataset(filename + ".mask", data=np.array(mask), compression="gzip", compression_opts=9)
      
  print("Done.")

# Train the neural network

In [None]:
#========================================================================================
# Hyperparameters
#========================================================================================
mynetname           = 'emojinet'
resolution          = 128
epochs              = 5
batch_size          = 64
lrate               = 0.0001
validation_split    = 0.2

eN                  = 32
dN                  = 64

# Specify GPUs
#import os
#os.environ['CUDA_VISIBLE_DEVICES']='0'

#========================================================================================
# Input
#========================================================================================
import os
import cv2, h5py
import numpy as np
from urllib import request
def load(resolution,count=10):
    x,y = [],[]

    for di in range(count):  
        dataset_filename = './data/dataset'+f'{di:03}'+'.hdf5'

        # Open dataset and collect all data
        print("Loading: " + dataset_filename)
        with h5py.File(dataset_filename, "r") as dataset:
            keys = []
            for key in dataset.keys(): keys.append(key.split(".")[0])
            keys = list(set(keys))

            for key in keys:
                rgb, mask = dataset.get(key+'.rgb')[()], dataset.get(key+'.mask')[()]

                rgb = cv2.resize(rgb, dsize=(resolution, resolution))
                mask = cv2.resize(mask, dsize=(resolution, resolution))

                x.append(rgb)
                y.append(mask.reshape((mask.shape[0],mask.shape[1],1)))

    x = np.stack(x)
    y = np.stack(y)

    return x,y

import numpy
x,y = load(resolution=resolution, count=1)

# Modify inputs
x = x / 255
y = y / 255

# Augment

print('x: ' + str(len(x)) + ' images of shape ' + str(x[0].shape))
print('y: ' + str(len(y)) + ' images of shape ' + str(y[0].shape))

#========================================================================================
# Outputs
#========================================================================================
import time, pathlib

# Define output folder name
runID = str(int(time.time())) + '-n' + str(len(x)) + '-r' + str(resolution) + \
		'-eN' + str(eN) + '-dN' + str(dN) + '-e' + str(epochs) + \
		'-bs' + str(batch_size) + '-lr' + str(lrate) + '-' + mynetname

runPath = './Graphs/' + runID

print('Output: ' + runPath)
pathlib.Path(runPath).mkdir(parents=True, exist_ok=True) 
pathlib.Path(runPath+'/checkpoints').mkdir(parents=True, exist_ok=True)
pathlib.Path(runPath+'/samples').mkdir(parents=True, exist_ok=True) 

#========================================================================================
# Utils
#========================================================================================
import numpy, cv2

# Predict for a single image
def predict(model, image):
    prediction = model.predict(image.reshape(1, image.shape[0], image.shape[1], image.shape[2]))
    return numpy.tile(prediction,3)

# Save image to PNG
def save_png(image, filename):
    if image.shape[2] == 3:
        image = cv2.cvtColor(image.astype(numpy.uint8), cv2.COLOR_RGB2BGR)
    cv2.imwrite(filename, numpy.clip(image,0,255))

# Loss function for UNet
from keras.losses import binary_crossentropy
import keras.backend as K

def dice_coeff(y_true, y_pred):
    smooth = 1.
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    score = (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)
    return score

def dice_loss(y_true, y_pred):
    loss = 1 - dice_coeff(y_true, y_pred)
    return loss

def bce_dice_loss(y_true, y_pred):
    loss = binary_crossentropy(y_true, y_pred) + dice_loss(y_true, y_pred)
    return loss

#========================================================================================
# Neural network
#========================================================================================
import keras as keras
from keras.models import Model
from keras.layers import Input, Conv2D, Activation, Concatenate, LeakyReLU, MaxPooling2D, UpSampling2D
from keras import losses
from keras.utils.vis_utils import plot_model
from tensorflow.keras.optimizers import Adam

# Expected input and output
in_shape = x.shape[1:]
out_shape = y.shape[1:]

# How to set the initial random weights of Keras layers
initializer = 'he_uniform'

# Define the input layer:
input_img = Input(shape=in_shape, name='input_img')

# Encoder
encoder = Conv2D(eN, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='ENC_CONV0')(input_img)
encoder = LeakyReLU(alpha=0.1)(encoder)
encoder = Conv2D(eN, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='ENC_CONV1')(encoder)
encoder = LeakyReLU(alpha=0.1)(encoder)
POOL1   = MaxPooling2D((2, 2), name='POOL1') (encoder)
encoder = Conv2D(eN*2, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='ENC_CONV2')(POOL1)
encoder = LeakyReLU(alpha=0.1)(encoder)
POOL2   = MaxPooling2D((2, 2), name='POOL2') (encoder)
encoder = Conv2D(eN*2, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='ENC_CONV3')(POOL2)
encoder = LeakyReLU(alpha=0.1)(encoder)
POOL3   = MaxPooling2D((2, 2), name='POOL3') (encoder)
encoder = Conv2D(eN*4, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='ENC_CONV4')(POOL3)
encoder = LeakyReLU(alpha=0.1)(encoder)
POOL4   = MaxPooling2D((2, 2), name='POOL4') (encoder)
encoder = Conv2D(eN*4, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='ENC_CONV5')(POOL4)
encoder = LeakyReLU(alpha=0.1)(encoder)
encoder = MaxPooling2D((2, 2), name='POOL5') (encoder)
encoder = Conv2D(eN*8, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='ENC_CONV6')(encoder)
encoder = LeakyReLU(alpha=0.1)(encoder)

# Decoder
decoder = UpSampling2D((2, 2), name='UPSAMPLE5')(encoder)
decoder = keras.layers.concatenate([decoder,POOL4], name='CONCAT5')
decoder = Conv2D(dN*16, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='DEC_CONV5A')(decoder)
decoder = LeakyReLU(alpha=0.1)(decoder)
decoder = Conv2D(dN*16, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='DEC_CONV5B')(decoder)
decoder = LeakyReLU(alpha=0.1)(decoder)
decoder = UpSampling2D((2, 2), name='UPSAMPLE4')(decoder)
decoder = keras.layers.concatenate([decoder,POOL3], name='CONCAT4')
decoder = Conv2D(dN*8, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='DEC_CONV4A')(decoder)
decoder = LeakyReLU(alpha=0.1)(decoder)
decoder = Conv2D(dN*8, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='DEC_CONV4B')(decoder)
decoder = LeakyReLU(alpha=0.1)(decoder)
decoder = UpSampling2D((2, 2), name='UPSAMPLE3')(decoder)
decoder = keras.layers.concatenate([decoder,POOL2], name='CONCAT3')
decoder = Conv2D(dN*6, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='DEC_CONV3A')(decoder)
decoder = LeakyReLU(alpha=0.1)(decoder)
decoder = Conv2D(dN*6, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='DEC_CONV3B')(decoder)
decoder = LeakyReLU(alpha=0.1)(decoder)
decoder = UpSampling2D((2, 2), name='UPSAMPLE2')(decoder)
decoder = keras.layers.concatenate([decoder,POOL1], name='CONCAT2')
decoder = Conv2D(dN*4, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='DEC_CONV2A')(decoder)
decoder = LeakyReLU(alpha=0.1)(decoder)
decoder = Conv2D(dN*4, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='DEC_CONV2B')(decoder)
decoder = LeakyReLU(alpha=0.1)(decoder)
decoder = UpSampling2D((2, 2), name='UPSAMPLE1')(decoder)
decoder = keras.layers.concatenate([decoder,input_img], name='CONCAT1')
decoder = Conv2D(64, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='DEC_CONV1A')(decoder)
decoder = LeakyReLU(alpha=0.1)(decoder)
decoder = Conv2D(32, kernel_size=3, strides=1, padding='same', kernel_initializer=initializer, name='DEC_CONV1B')(decoder)
decoder = LeakyReLU(alpha=0.1)(decoder)
decoder = Conv2D(1, (1, 1), activation='sigmoid')(decoder)

# Single GPU
model = Model(inputs=input_img, outputs=decoder, name=runID)
model.summary()
#plot_model(model, to_file='./Graphs/'+runID+'.png', show_shapes=True, show_layer_names=True)

# Optimizer:
optimizer = Adam(lr=lrate)

# Compile model with specified loss function:
model.compile(loss=bce_dice_loss, optimizer=optimizer)

# Tensorboard:
tbCallBack = keras.callbacks.TensorBoard(log_dir=runPath, histogram_freq=0, write_graph=True, write_images=True)

# Checkpoints:
checkpointCallBack = keras.callbacks.ModelCheckpoint(filepath=runPath+'/checkpoints/weights.{epoch:02d}-{val_loss:.2f}.hdf5', save_weights_only=True, verbose=1, period=50)

# Test prediction result during training:
def testPrediction(epoch):
	if epoch % 5 == 0:
		for i in range(0,len(x),int(len(y)/5)):
			if epoch == 0:
				save_png(x[i]*255,runPath+'/samples/i'+f'{i:04}'+'.0.rgb.png')
				save_png(y[i]*255,runPath+'/samples/i'+f'{i:04}'+'.1.mask.png')
			prediction = predict(model, x[i])*255
			save_png(prediction.reshape(resolution,resolution,3), runPath+'/samples/i'+f'{i:04}'+'.e'+f'{epoch:05}'+'.png')
testPredictCallBack = keras.callbacks.LambdaCallback(on_epoch_end=lambda epoch, logs: testPrediction(epoch))

# Start training:
history = model.fit(x, y, callbacks=[tbCallBack,checkpointCallBack,testPredictCallBack], epochs=epochs, batch_size=batch_size, validation_split=validation_split)

# Save trained model:
model.save(runPath+'/model.h5')

# Test the model


In [None]:
imgs, masks = generate_random_location(location, actor, danger, 20)

In [None]:
from skimage.transform import resize
idx = 3

input = resize(numpy.array( imgs[idx] ), (128,128), anti_aliasing=True)
output = predict(model, input)

figure(); imshow(input)
figure(); imshow(output[0])