In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
import os
import glob
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from pathlib import Path

### Carregando os dados, aplicando a padronização e rotação

In [None]:
# Notebook-wide configuration.
IMAGE_SIZE = (224, 224)  # (height, width) every image is resized to before entering the network
BATCH_SIZE = 32  # images per batch yielded by the directory iterators
N_FEATURES = 256  # number of features to be extracted from each image

In [None]:
# Training generator: rescales pixels by 1/255 and applies random rotations.
# ImageDataGenerator's rotation_range is expressed in DEGREES, so the former
# value of .2 rotated images by at most 0.2 degrees (effectively no augmentation).
data_gen = tf.keras.preprocessing.image.ImageDataGenerator(
    rescale=1./255,
    validation_split=.2,
    rotation_range=20
)

# Validation generator: same rescaling and the same split fraction, but without
# augmentation, so validation metrics are computed on unmodified images.
# Both generators must keep an identical validation_split so that the
# 'training' and 'validation' subsets are cut from the file list at the same point.
val_data_gen = tf.keras.preprocessing.image.ImageDataGenerator(
    rescale=1./255,
    validation_split=.2
)

data_dir = './dataset'
train_dataset = data_gen.flow_from_directory(
    data_dir,
    target_size=IMAGE_SIZE,
    batch_size=BATCH_SIZE,
    interpolation='bilinear',
    subset='training',
)

# shuffle=False keeps validation batches in a fixed order.
val_dataset = val_data_gen.flow_from_directory(
    data_dir,
    target_size=IMAGE_SIZE,
    batch_size=BATCH_SIZE,
    interpolation='bilinear',
    subset='validation',
    shuffle=False
)

### Carregando o modelo de base MobileNet usado para realizar a transferência de aprendizado

In [None]:
# MobileNet backbone without its classification head (include_top=False),
# built for IMAGE_SIZE RGB inputs and loaded with the library's default
# `weights` setting.
base_model = tf.keras.applications.MobileNet(
    input_shape=(*IMAGE_SIZE, 3),
    include_top=False,
)

# Freeze the backbone: only the layers stacked on top of it will be trained.
base_model.trainable = False

### Criando a estrutura do modelo principal

In [None]:
def _regularized_dense(units):
    """Return a Dense layer with `units` outputs, no activation and an L2 (1e-4) kernel penalty."""
    return tf.keras.layers.Dense(
        units,
        kernel_regularizer=tf.keras.regularizers.l2(0.0001)
    )


# Functional-API graph: frozen backbone -> pooled features -> N_FEATURES layer -> class scores.
inp = tf.keras.layers.Input(shape=(*IMAGE_SIZE, 3))
base = base_model(inp, training=False)  # backbone is always called in inference mode
avg_p = tf.keras.layers.GlobalAveragePooling2D()(base)
drop = tf.keras.layers.Dropout(.2)(avg_p)
flat = tf.keras.layers.Flatten()(drop)
d1 = _regularized_dense(N_FEATURES)(flat)  # N_FEATURES-wide layer, no activation
drop = tf.keras.layers.Dropout(rate=0.2)(d1)
# One output per class, no activation: the model emits raw scores (logits).
out = _regularized_dense(train_dataset.num_classes)(drop)

model = tf.keras.models.Model(inputs=inp, outputs=out)
model.build((None, *IMAGE_SIZE, 3))
model.summary()

### Compilando e realizando o treinamento

In [None]:
# The model's final Dense layer has no activation, so it outputs raw logits.
# The string loss 'categorical_crossentropy' assumes probabilities
# (from_logits=False); use the loss object with from_logits=True so the
# softmax is applied inside the loss instead.
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)

In [None]:
# Round up so the final, smaller batch is included and the step count can
# never be 0 when a subset holds fewer images than one batch.
steps_per_epoch = int(np.ceil(train_dataset.samples / train_dataset.batch_size))
validation_steps = int(np.ceil(val_dataset.samples / val_dataset.batch_size))
cbs = [
    # Stop when validation accuracy has not improved for 5 epochs and roll
    # back to the best weights seen.
    tf.keras.callbacks.EarlyStopping(patience=5,
                                     restore_best_weights=True,
                                     monitor='val_accuracy'),
    # The model is compiled with optimizer='adam' (Keras default learning
    # rate 1e-3). The former min_lr=.01 sat above that rate, so the learning
    # rate could never be lowered; the default patience of 10 also equals the
    # number of epochs trained. Use a floor below the starting rate and a
    # patience shorter than EarlyStopping's so the reduction can take effect.
    tf.keras.callbacks.ReduceLROnPlateau(patience=2, min_lr=1e-6)
]

In [None]:
# Train for up to 10 epochs with the callbacks configured above and keep only
# the per-epoch metrics dictionary (History.history) in `hist`.
fit_options = dict(
    epochs=10,
    steps_per_epoch=steps_per_epoch,
    validation_data=val_dataset,
    validation_steps=validation_steps,
    callbacks=cbs,
    verbose=2,
)
hist = model.fit(train_dataset, **fit_options).history

### Visualizando o desempenho

In [None]:
# Plot training vs. validation curves side by side: loss on the left,
# accuracy on the right. `hist` holds one value per epoch for each metric,
# so the x-axis is labelled in epochs (it was previously "Training Steps").
fig, (loss_ax, acc_ax) = plt.subplots(1, 2)

panels = (
    (loss_ax, "loss", "Loss (training and validation)"),
    (acc_ax, "accuracy", "Accuracy (training and validation)"),
)
for ax, metric, y_label in panels:
    ax.plot(hist[metric], label='train')
    ax.plot(hist["val_" + metric], label='valid')
    ax.set_xlabel("Epoch")
    ax.set_ylabel(y_label)
    ax.legend()

fig.tight_layout()

### Modelo de extração de features das imagens

In [None]:
# Sub-model that stops at the third layer from the end of `model`, i.e. the
# N_FEATURES-wide Dense layer that sits before the last Dropout and the
# classification layer.
embedding_model = tf.keras.Model(
    inputs=model.inputs,
    outputs=model.layers[-3].output
)

# `model` was trained on batches rescaled by 1/255 (see the ImageDataGenerator
# `rescale` argument), but the images this notebook loads for feature
# extraction reach the extractor with 0-255 pixel values. Apply the same
# rescaling inside the extractor so inference inputs match the training
# distribution.
# NOTE: `feature_extractor` therefore expects raw 0-255 pixel values.
raw_pixels = tf.keras.layers.Input(shape=IMAGE_SIZE + (3,))
scaled_pixels = tf.keras.layers.Lambda(lambda px: px / 255.0)(raw_pixels)
feature_extractor = tf.keras.Model(
    inputs=raw_pixels,
    outputs=embedding_model(scaled_pixels, training=False)
)

In [None]:
# Collect every .jpg found recursively under ./dataset, then shuffle the
# list in place.
img_paths = list(Path('./dataset').rglob('*.jpg'))
np.random.shuffle(img_paths)

In [None]:
def load_img(path):
    """Read a JPEG file and return it as a single-image float32 batch.

    The image is decoded to 3 channels, resized to 224x224 with padding
    (aspect ratio preserved) and given a leading batch axis.

    Args:
        path: Path of the JPEG file, as a string.

    Returns:
        A tensor of shape (1, 224, 224, 3).
    """
    raw_bytes = tf.io.read_file(path)
    decoded = tf.io.decode_jpeg(raw_bytes, channels=3)
    padded = tf.image.resize_with_pad(decoded, 224, 224)
    # NOTE(review): resize_with_pad already yields floats, so this conversion
    # does not rescale anything - pixel values stay in the 0-255 range
    # (no 1/255 normalisation happens here).
    as_float = tf.image.convert_image_dtype(padded, tf.float32)
    return as_float[tf.newaxis, ...]

In [None]:
# Directory that receives one feature-vector file per image.
feature_vectors_dir = './img_vectors'
# exist_ok=True makes the cell safe to re-run and avoids the check-then-create race.
os.makedirs(feature_vectors_dir, exist_ok=True)

### Extraindo as features das imagens com o modelo de extração (features aprendidas no treinamento do modelo principal) e salvando-as em arquivo como vetores NumPy

In [None]:
for filename in img_paths:
    img = load_img(str(filename))
    features = feature_extractor(img)
    feature_set = np.squeeze(features)
    outfile_name = os.path.basename(filename).split('.')[0] + ".npz"
    out_path_file = os.path.join(feature_vectors_dir, outfile_name)
    np.savetxt(out_path_file, feature_set, delimiter=',')

In [None]:
all_files = glob.glob('./img_vectors/*.npz')

In [None]:
# carregando os vetores e nomes das imagens atribuindo indexes equivalentes
files_name = {}
files_vector = {}
for index, file in enumerate(all_files):
    file_vector = np.loadtxt(file)
    file_name = os.path.basename(file).split('.')[0]
    
    files_vector[index] = file_vector
    files_name[index] = file_name

In [None]:
# Map each image file name (e.g. "cat.jpg") to its Path under ./dataset.
# If the same file name occurs in several sub-folders, the last one found wins.
path_dict = {
    jpg_path.name: jpg_path
    for jpg_path in Path('./dataset').rglob('*.jpg')
}

### Testando com imagem nova

In [None]:
# Load the query image and compute its feature vector.
test_img = 'test.jpg'
img_test = load_img(test_img)
features_vec = feature_extractor(img_test)
# Drop the size-1 batch axis so the vector is 1-D.
test_vec = np.asarray(features_vec).squeeze()

In [None]:
# Preview the query image. `img_test` carries a leading batch axis of size 1,
# so take its only element before casting to uint8 for display.
plt.figure(figsize=(4, 2.7))
query_pixels = img_test.numpy()[0]
plt.imshow(query_pixels.astype('uint8'))
plt.axis('off')

In [None]:
from annoy import AnnoyIndex

### Adicionando os vetores e os índices ao Annoy utilizando 10000 árvores (trees)

In [None]:
# Annoy index over N_FEATURES-dimensional vectors using the 'angular' metric.
ann = AnnoyIndex(N_FEATURES, 'angular')

# Register every stored vector under the integer index it was loaded with.
for item_id in files_vector:
    ann.add_item(item_id, files_vector[item_id])

# Build the index with 10000 trees.
ann.build(10000)

In [None]:
# Fetch the indices of the 10 nearest neighbours of the query vector.
n_neighbors = 10
top = ann.get_nns_by_vector(test_vec, n_neighbors)

In [None]:
# Draw the retrieved neighbours side by side in a single row.
n_cols = len(top)
for position, item_id in enumerate(top, start=1):
    # Stored names have no extension; add '.jpg' back to look up the file path.
    neighbour_path = path_dict[files_name[item_id] + '.jpg']

    plt.subplot(1, n_cols, position)
    plt.imshow(mpimg.imread(neighbour_path))
    plt.axis('off')