In [None]:
# STEP 1 - Install the crawling library, then download and organise the dataset (4 classes)
!pip install icrawler

In [None]:
#Baixar Dataset de imagens
from icrawler.builtin import GoogleImageCrawler
import os

# Search keywords, one per class; each keyword also gives the class folder its name.
classes = ['laptop', 'bola de futebol', 'smart tv', 'violao']

# Root folder that receives one sub-folder of downloaded images per class.
base_dir = './images'


def _ensure_dir(path):
    """Create `path` (including missing parents) unless something already exists there."""
    if os.path.exists(path):
        return
    os.makedirs(path)


def _download_class(keyword, target_dir):
    """Crawl Google Images for `keyword` and store the downloads under `target_dir`."""
    crawler_options = dict(
        feeder_threads=1,
        parser_threads=1,
        downloader_threads=4,
        storage={'root_dir': target_dir},
    )
    # Ask for at most 500 images of at least 200x200 pixels.
    crawl_options = dict(keyword=keyword, max_num=500, min_size=(200, 200))
    GoogleImageCrawler(**crawler_options).crawl(**crawl_options)


_ensure_dir(base_dir)

for class_name in classes:
    print(f"Coletando imagens para a classe: {class_name}")

    # Folder names use underscores instead of spaces (e.g. 'smart tv' -> 'smart_tv').
    class_dir = os.path.join(base_dir, class_name.replace(' ', '_'))
    _ensure_dir(class_dir)

    _download_class(class_name, class_dir)
    print("-" * 30)

print("Coleta de imagens concluída!")
print(f"As imagens foram salvas em: {base_dir}")

In [None]:
#ETAPA 2 - Encodificando as imagens

import itertools
import os

import matplotlib.pylab as plt
import numpy as np

import tensorflow as tf
import tensorflow_hub as hub

# Report the library versions in use, then whether TensorFlow can see a GPU.
print("TF version:", tf.__version__)
print("Hub version:", hub.__version__)
gpu_devices = tf.config.list_physical_devices('GPU')
gpu_status = "available" if gpu_devices else "NOT AVAILABLE"
print("GPU is", gpu_status)

In [None]:
# TF Hub handle of the pre-trained module wrapped by the hub layer further down.
MODULE_HANDLE = "https://tfhub.dev/google/imagenet/mobilenet_v2_100_224/feature_vector/4"
# Height and width (pixels) the data generators resize every image to.
IMAGE_SIZE = (224, 224)
print("Using {} with input size {}".format(MODULE_HANDLE, IMAGE_SIZE))
# Number of images per batch produced by the data generators.
BATCH_SIZE = 32
# Units of the first Dense layer, whose output the feature extractor exposes.
N_FEATURES = 256

In [None]:
#hide
# Root folder of the image dataset, read by flow_from_directory below.
data_dir = '/content/images'

In [None]:
# Settings shared by both generators: scale pixels by 1/255 and hold out 20% for validation.
datagen_kwargs = {"rescale": 1. / 255, "validation_split": .20}
# Every batch is resized to IMAGE_SIZE with bilinear interpolation.
dataflow_kwargs = {
    "target_size": IMAGE_SIZE,
    "batch_size": BATCH_SIZE,
    "interpolation": "bilinear",
}

# Validation split: never shuffled.
valid_datagen = tf.keras.preprocessing.image.ImageDataGenerator(**datagen_kwargs)
valid_generator = valid_datagen.flow_from_directory(
    data_dir, subset="validation", shuffle=False, **dataflow_kwargs)

# Training split: optional augmentation (off by default); without it the
# validation generator configuration is reused as-is.
do_data_augmentation = False
augmentation_kwargs = {
    "rotation_range": 40,
    "horizontal_flip": True,
    "width_shift_range": 0.2,
    "height_shift_range": 0.2,
    "shear_range": 0.2,
    "zoom_range": 0.2,
}
if do_data_augmentation:
  train_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
      **augmentation_kwargs, **datagen_kwargs)
else:
  train_datagen = valid_datagen
train_generator = train_datagen.flow_from_directory(
    data_dir, subset="training", shuffle=True, **dataflow_kwargs)

In [None]:
import tensorflow as tf
import tensorflow_hub as hub
from keras.layers import Layer
from keras.saving import register_keras_serializable

# 1. Register the custom layer so that models containing it can be saved and loaded
@register_keras_serializable()
class HubLayerWrapper(Layer):
    """Keras layer that wraps a TensorFlow Hub module loaded through `hub.KerasLayer`.

    Args:
        module_handle: handle passed to `hub.KerasLayer`.
        trainable: whether the wrapped hub layer is trainable.
        **kwargs: forwarded to the base `Layer` constructor.
    """

    def __init__(self, module_handle, trainable=False, **kwargs):
        super().__init__(**kwargs)
        wrapped = hub.KerasLayer(module_handle, trainable=trainable)
        wrapped.trainable = trainable
        self.module_handle = module_handle
        self.hub_layer = wrapped

    def call(self, inputs, training=False):
        """Run the wrapped hub layer on `inputs`, forwarding the `training` flag."""
        return self.hub_layer(inputs, training=training)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        base_config = super().get_config()
        return {
            **base_config,
            'module_handle': self.module_handle,
            # Reports the wrapped layer's flag, overriding the base config entry.
            'trainable': self.hub_layer.trainable,
        }

# 2. Assemble the full model with the Functional API on top of the wrapped hub module
print("Building model with", MODULE_HANDLE)


def _l2_dense(units):
    """Dense layer with no activation and an L2 kernel penalty of 1e-4."""
    return tf.keras.layers.Dense(
        units, kernel_regularizer=tf.keras.regularizers.l2(0.0001))


# Input: images of IMAGE_SIZE with 3 channels.
inputs = tf.keras.layers.Input(shape=IMAGE_SIZE + (3,))

# Frozen hub module.
hub_layer = HubLayerWrapper(MODULE_HANDLE, trainable=False)(inputs)

# Dropout -> Dense(N_FEATURES) -> Dropout -> Dense(number of classes).
dropout_1 = tf.keras.layers.Dropout(rate=0.2)(hub_layer)
dense_1 = _l2_dense(N_FEATURES)(dropout_1)
dropout_2 = tf.keras.layers.Dropout(rate=0.2)(dense_1)
outputs = _l2_dense(train_generator.num_classes)(dropout_2)

# Full classifier.
model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.summary()

# Feature extractor: same graph, cut at the first Dense layer.
feature_extractor = tf.keras.Model(inputs=inputs, outputs=dense_1)

In [None]:
# Define optimiser and loss
# Base learning rate: 0.003 scaled by BATCH_SIZE / 512.
lr = 0.003 * BATCH_SIZE / 512
SCHEDULE_LENGTH = 500  # not read anywhere in the visible notebook
SCHEDULE_BOUNDARIES = [200, 300, 400]

# Decay learning rate by a factor of 10 at each of SCHEDULE_BOUNDARIES.
# The values previously went lr*0.1 -> lr*0.001, i.e. a factor of 100 at the
# second boundary, which contradicted the stated schedule.
lr_schedule = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
    boundaries=SCHEDULE_BOUNDARIES,
    values=[lr, lr * 0.1, lr * 0.01, lr * 0.001])
optimizer = tf.keras.optimizers.SGD(learning_rate=lr_schedule, momentum=0.9)

# The output Dense layer has no activation, so the loss is told it receives logits.
loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=True)

model.compile(optimizer=optimizer,
              loss=loss_fn,
              metrics=['accuracy'])

In [None]:
# Whole batches available in each split. Floor division yields 0 when a split
# holds fewer images than one batch, so clamp to at least one step.
steps_per_epoch = max(1, train_generator.samples // train_generator.batch_size)
validation_steps = max(1, valid_generator.samples // valid_generator.batch_size)

# Train for 5 epochs and keep only the metrics dictionary of the returned History.
hist = model.fit(
    train_generator,
    epochs=5, steps_per_epoch=steps_per_epoch,
    validation_data=valid_generator,
    validation_steps=validation_steps).history

In [None]:
#hide
def _plot_history(metric, ylabel, ylim):
    """Plot the training and validation curves of one metric stored in `hist`.

    `hist` is the `.history` dict of `model.fit`, which holds one value per
    epoch, so the x axis is labelled in epochs. A legend tells the curves apart.
    """
    fig, ax = plt.subplots()
    ax.plot(hist[metric], label="training")
    ax.plot(hist["val_" + metric], label="validation")
    ax.set_xlabel("Epoch")
    ax.set_ylabel(ylabel)
    ax.set_ylim(ylim)
    ax.legend()
    return ax


_plot_history("loss", "Loss (training and validation)", [0, 2])
_plot_history("accuracy", "Accuracy (training and validation)", [0, 1]);

In [None]:
import os
import tensorflow as tf

# Folder that receives the saved models (created on first use)
caminho_local = '/content/modelos_salvos/'
if not os.path.exists(caminho_local):
    os.mkdir(caminho_local)

# Target files for the feature extractor and for the full classifier
feature_extractor_path = f"{caminho_local}bit_feature_extractor.keras"
saved_model_path = f"{caminho_local}bit_model"

# Feature extractor: the model cut at its third-from-last layer
embedding_layer = model.layers[-3]
feature_extractor = tf.keras.Model(inputs=model.inputs, outputs=embedding_layer.output)
feature_extractor.save(feature_extractor_path)

# Full model
model.save(f"{saved_model_path}.keras")

In [None]:
#ETAPA 3 - VETORIZAÇÃO DE IMAGEM

#hide
import tensorflow as tf
from pathlib import Path
import numpy as np
import os
from tqdm import tqdm
tqdm.pandas()

In [None]:
from pathlib import Path
import numpy as np

# Recursively collect every JPG and PNG under the dataset folder, pattern by pattern.
extensions = ['*.jpg', '*.png']
img_paths = [
    path
    for ext in extensions
    for path in Path('/content/images').rglob(ext)
]

# Shuffle in place (no seed is set, so the order differs between runs).
np.random.shuffle(img_paths)

In [None]:
def load_img(path):
    """Load an image file as a float32 batch of shape (1, 224, 224, 3) scaled to [0, 1].

    Args:
        path: path of the image file, as a string.

    Returns:
        A float32 tensor with a leading batch axis of size 1.
    """
    # Read the raw file bytes
    raw = tf.io.read_file(path)

    # Decode any supported format to 3 channels. expand_animations=False keeps
    # GIF input 3-D (first frame only) instead of a 4-D stack of frames.
    img = tf.io.decode_image(raw, channels=3, expand_animations=False)

    # Scale uint8 [0, 255] to float32 [0, 1] BEFORE resizing. resize_with_pad
    # returns float32, and convert_image_dtype does not rescale a tensor that is
    # already float32, so converting after the resize left pixels in [0, 255]
    # while the training generators fed the model values scaled by 1/255.
    img = tf.image.convert_image_dtype(img, tf.float32)

    # Resize with padding to the model input size
    img = tf.image.resize_with_pad(img, 224, 224)

    # Add the batch axis
    return img[tf.newaxis, ...]

In [None]:
#hide-output
# 1 -> use the feature extractor saved by this notebook; 0 -> load the hub module directly.
TRANSFER_LEARNING_FLAG = 1
if not TRANSFER_LEARNING_FLAG:
  module_handle = "https://tfhub.dev/google/imagenet/mobilenet_v2_100_224/feature_vector/4"
  module = hub.load(module_handle)
else:
  # safe_mode=False is passed so loading is not restricted to "safe" deserialization.
  module = tf.keras.models.load_model('/content/modelos_salvos/bit_feature_extractor.keras', safe_mode=False)

In [None]:
# Folder that receives the feature-vector files; created with parents if missing.
imgvec_path = '/content/img_vectors/'
Path(imgvec_path).mkdir(parents=True, exist_ok=True)

In [None]:
# Encode up to 5000 images and write one feature-vector file per image.
for filename in tqdm(img_paths[:5000]):
    try:
        # Load and preprocess the image
        img = load_img(str(filename))

        # Run the feature extractor and drop the batch axis
        features = module(img)
        feature_set = np.squeeze(features)

        # Name the output after BOTH the class folder and the image stem. Image
        # names alone (e.g. 000001.jpg) can repeat in every class folder, so
        # naming by basename only made classes overwrite each other's vectors.
        image_path = Path(filename)
        outfile_name = f"{image_path.parent.name}__{image_path.stem}.npz"
        out_path_file = os.path.join(imgvec_path, outfile_name)
        # NOTE: the content is plain text written by np.savetxt; the '.npz'
        # suffix is kept only because the indexing cell globs for '*.npz'.
        np.savetxt(out_path_file, feature_set, delimiter=',')

    except Exception as e:
        # Best effort: report the failing file and move on to the next one
        print(f"Erro ao processar o arquivo {filename}: {e}")
        continue

In [None]:
# METADADOS E INDEXAÇÃO

#hide
import pandas as pd
import glob
import os
import numpy as np
from tqdm import tqdm
tqdm.pandas()
!pip install -q annoy
import json
from annoy import AnnoyIndex
from scipy import spatial
import pickle
from IPython.display import Image as dispImage

In [None]:
# Preview one dataset image (assumes a file named 000001.jpg exists in the 'violao' class folder).
test_img = '/content/images/violao/000001.jpg'
dispImage(test_img)

In [None]:
import pandas as pd
import os
import glob
from pathlib import Path

# Every JPG and PNG found one level below the dataset root (one folder per class)
image_patterns = ('/content/images/*/*.jpg', '/content/images/*/*.png')
all_images = [found for pattern in image_patterns for found in glob.glob(pattern)]

# Lookup tables keyed by the image's position in `all_images`.
# Note: a later cell re-initialises both dictionaries to empty before building the index.
file_index_to_file_name = {}
file_index_to_product_id = {}

for i, path in enumerate(all_images):
  image_path = Path(path)
  file_index_to_file_name[i] = image_path.name
  # The category is the name of the folder that contains the image (e.g. 'laptop', 'violao')
  file_index_to_product_id[i] = image_path.parent.name

# Sanity check: how many images were found
print(f"Total de imagens encontradas: {len(all_images)}")

In [None]:
def match_id(fname):
  """Return the first index label of `styles` whose `id` equals `fname`.

  NOTE(review): `styles` is not defined anywhere in the visible notebook
  (presumably a pandas DataFrame from an earlier version), so calling this
  raises NameError unless it is defined elsewhere - confirm or remove.
  """
  matching_labels = styles.index[styles.id == fname].values
  return matching_labels[0]

In [None]:
# Defining data structures as empty dict
# These are keyed by vector index and filled by the indexing loop in the next cell.
# NOTE: this discards anything an earlier cell stored under the same names.
file_index_to_file_name = {}
file_index_to_file_vector = {}
file_index_to_product_id = {}

# Configuring annoy parameters
# dims is the vector length given to AnnoyIndex; it has to match the stored feature vectors.
dims = 256
n_nearest_neighbors = 20  # not read anywhere in the visible notebook
# Number of trees passed to t.build().
# NOTE(review): 10000 looks very large for a dataset of this size - confirm.
trees = 10000

# Reads all file names which stores feature vectors
allfiles = glob.glob('/content/img_vectors/*.npz')

# Empty index using the angular metric
t = AnnoyIndex(dims, metric='angular')

In [None]:
# Load every stored vector, record its metadata and add it to the index.
# tqdm wraps the list itself (not the enumerate iterator) so the bar knows the total.
for findex, fname in enumerate(tqdm(allfiles)):
  file_vector = np.loadtxt(fname)
  file_name = os.path.basename(fname)
  file_index_to_file_name[findex] = file_name
  file_index_to_file_vector[findex] = file_vector

  # Category of the item. A vector file named '<category>__<image-stem>.npz'
  # carries its class in the name. For any other name fall back to the parent
  # folder name; for files directly under the vector folder that is the vector
  # folder itself, not a class, which is why the name prefix is preferred.
  category, separator, _ = Path(fname).stem.rpartition('__')
  if not separator:
    category = Path(fname).parts[-2]
  file_index_to_product_id[findex] = category

  t.add_item(findex, file_vector)

In [None]:
#hide-output
# Build the index with `trees` trees, then write it to 't.ann' in the working directory.
t.build(trees)
t.save('t.ann')

In [None]:
import os

# Folder inside Colab where the index and the lookup tables are stored
file_path = '/content/indices_salvos/'

# Create the folder if it does not exist yet
if not os.path.exists(file_path):
    os.mkdir(file_path)

In [None]:
# Persist the index next to the lookup tables.
t.save(file_path+'indexer.ann')

# Pickle each lookup table inside a context manager so every file handle is
# flushed and closed immediately instead of being left open.
lookup_tables = {
    "file_index_to_file_name.p": file_index_to_file_name,
    "file_index_to_file_vector.p": file_index_to_file_vector,
    "file_index_to_product_id.p": file_index_to_product_id,
}
for table_file, table in lookup_tables.items():
    with open(file_path + table_file, "wb") as fh:
        pickle.dump(table, fh)

In [None]:
#Etapa 5 - TESTE LOCAL
#hide
from PIL import Image
import matplotlib.image as mpimg

In [None]:
import os
from PIL import Image

# Use uma nova URL de imagem mais confiável
img_addr = 'https://upload.wikimedia.org/wikipedia/commons/e/e5/Viol%C3%B3_el%C3%A8ctric.jpg'

# Baixe a imagem e verifique se o download foi bem-sucedido
!wget -q -O img.jpg $img_addr
test_img = 'img.jpg'

if os.path.getsize(test_img) > 0:
    print("Download da imagem concluído com sucesso!")
    topK = 4

    # Processamento da imagem
    test_vec = np.squeeze(module(load_img(test_img), training=False))

    basewidth = 224
    img = Image.open(test_img)
    wpercent = (basewidth/float(img.size[0]))
    hsize = int((float(img.size[1])*float(wpercent)))

    # Redimensione a imagem com a sintaxe correta
    img = img.resize((basewidth,hsize), Image.Resampling.LANCZOS)
    img
else:
    print("Erro: O download da imagem falhou ou o arquivo está vazio.")

In [None]:
# ETAPA 6 - CHAMADA DE API
#hide
import os
import time

In [None]:
import os
from pathlib import Path

import shutil

# Root folder of the deployable project
root_path = '/content/projeto_salvo'

# Create the folder if it does not exist yet
Path(root_path).mkdir(parents=True, exist_ok=True)

# utils.py and app.py read the model, the index, the lookup tables and the
# images from root_path, but the earlier cells wrote them to other folders.
# Copy them over so the app can find them.
artifact_files = [
    '/content/modelos_salvos/bit_feature_extractor.keras',
    '/content/indices_salvos/indexer.ann',
    '/content/indices_salvos/file_index_to_file_name.p',
    '/content/indices_salvos/file_index_to_product_id.p',
]
for artifact in artifact_files:
    if os.path.exists(artifact):
        shutil.copy2(artifact, root_path)
    else:
        print(f"Artefato não encontrado, não copiado: {artifact}")

# The app globs for images under <root_path>/images/<class>/.
if os.path.isdir('/content/images'):
    shutil.copytree('/content/images', os.path.join(root_path, 'images'), dirs_exist_ok=True)
else:
    print("Pasta de imagens não encontrada, não copiada: /content/images")

In [None]:
%%writefile utils.py
### ----utils.py---- ###

import os
import tensorflow as tf
from annoy import AnnoyIndex

root_path = '/content/projeto_salvo'

class Encoder:
  encoder = tf.keras.models.load_model(os.path.join(root_path, 'bit_feature_extractor.keras'), safe_mode=False)

class Indexer:
  dims = 256
  topK = 6
  indexer = AnnoyIndex(dims, 'angular')
  indexer.load(os.path.join(root_path, 'indexer.ann'))

encoder = Encoder()
indexer = Indexer()

In [None]:
%%writefile app.py

# streamlit has to be installed from a notebook cell: a '!pip' shell magic is
# written verbatim by %%writefile and is not valid Python inside app.py.

import streamlit as st
import pandas as pd
import numpy as np
from PIL import Image
from annoy import AnnoyIndex
import glob
import os
import tensorflow as tf
import pickle
from pathlib import Path
import time
from utils import encoder, indexer

# Define variables for the app
root_path = '/content/projeto_salvo'
topK = 6
query_path = '/content/user_query.jpg'

# Unwrap the model and the index held by the objects imported from utils.py.
# NOTE(review): the loading itself appears to happen when utils is imported,
# above this block, so these timings and this try/except likely do not cover
# it - confirm.
try:
    start_time = time.time()
    encoder = encoder.encoder
    print("---Encoder--- %s seconds ---" % (time.time() - start_time))

    start_time = time.time()
    t = indexer.indexer
    print("---Indexer--- %s seconds ---" % (time.time() - start_time))

except Exception as e:
    st.error(f"Error loading model or indexer: {e}")
    st.stop()


def _load_mapping(file_name):
    """Unpickle one lookup table from root_path, closing the file afterwards."""
    with open(os.path.join(root_path, file_name), 'rb') as fh:
        return pickle.load(fh)


# Load the mappings (previously the file handles were opened and never closed)
try:
    file_index_to_file_name = _load_mapping('file_index_to_file_name.p')
    file_index_to_product_id = _load_mapping('file_index_to_product_id.p')
except Exception as e:
    st.error(f"Error loading mappings: {e}")
    st.stop()

# Map image file name -> full path for every JPG/PNG under <root_path>/images/<class>/
path_dict = {}
all_images = glob.glob(os.path.join(root_path, 'images', '*', '*.jpg')) + glob.glob(os.path.join(root_path, 'images', '*', '*.png'))
for file_path in all_images:
    file_name = Path(file_path).name
    path_dict[file_name] = file_path

def load_img(path):
    """Load an image file as a float32 batch of shape (1, 224, 224, 3) scaled to [0, 1]."""
    raw = tf.io.read_file(path)
    # expand_animations=False keeps GIF input 3-D (first frame only).
    img = tf.io.decode_image(raw, channels=3, expand_animations=False)
    # Scale uint8 [0, 255] to float32 [0, 1] BEFORE resizing: resize_with_pad
    # returns float32 and convert_image_dtype does not rescale a tensor that is
    # already float32, so converting afterwards left pixels in [0, 255].
    img = tf.image.convert_image_dtype(img, tf.float32)
    img = tf.image.resize_with_pad(img, 224, 224)
    return img[tf.newaxis, ...]

st.title("Image Similarity App")

uploaded_file = st.file_uploader("Choose an image...", type=['jpg', 'jpeg', 'png'])


def _build_image_lookups(image_paths):
    """Index image paths by '<class-folder>__<stem>' and by bare stem."""
    by_qualified_stem = {}
    by_stem = {}
    for image_path in image_paths:
        p = Path(image_path)
        by_qualified_stem[f"{p.parent.name}__{p.stem}"] = image_path
        by_stem.setdefault(p.stem, []).append(image_path)
    return by_qualified_stem, by_stem


def _resolve_image_path(indexed_name, by_qualified_stem, by_stem):
    """Map a name stored in the index to an image path, or None if it cannot be resolved.

    The index stores the vector file name (ending in '.npz'), not the image
    file name, so an exact match against `path_dict` fails for it. The stem is
    therefore matched instead: first as '<class-folder>__<stem>', then as a
    bare stem, but only when exactly one image has that stem.
    """
    if indexed_name in path_dict:
        return path_dict[indexed_name]
    stem = Path(indexed_name).stem
    if stem in by_qualified_stem:
        return by_qualified_stem[stem]
    candidates = by_stem.get(stem, [])
    return candidates[0] if len(candidates) == 1 else None


if uploaded_file is not None:
    # The query is written as JPEG, which cannot store alpha or palette modes
    # (e.g. an RGBA PNG upload), so normalise to RGB before saving.
    image = Image.open(uploaded_file).convert('RGB')
    image.save(query_path)
    st.image(image, caption='Uploaded Image.', use_column_width=True)
    st.write("")
    st.write("Top similar images...")

    # Encode the query image
    start_time = time.time()
    test_vec = np.squeeze(encoder(load_img(query_path), training=False))
    st.write("---Encoding--- %s seconds ---" % (time.time() - start_time))

    # Nearest neighbours in the index
    start_time = time.time()
    nns = t.get_nns_by_vector(test_vec, n=topK)
    st.write("---SimilarityIndex--- %s seconds ---" % (time.time() - start_time))

    img_files = []
    img_captions = []

    start_time = time.time()
    by_qualified_stem, by_stem = _build_image_lookups(all_images)
    for i in nns:
        # Get the file name from the index
        img_name = file_index_to_file_name[i]

        # Resolve it to a full image path; skip entries that cannot be located
        img_path = _resolve_image_path(img_name, by_qualified_stem, by_stem)
        if img_path is None:
            continue

        try:
            img_file = Image.open(img_path)
            img_files.append(img_file)
            # Use the product ID from the index to create a caption
            caption = file_index_to_product_id[i]
            img_captions.append(caption)
        except FileNotFoundError:
            st.warning(f"Image not found for index {i}: {img_name}")

    if img_files:
        st.image(img_files, caption=img_captions, width=200)
    else:
        st.warning("None of the nearest neighbours could be matched to an image on disk.")
    st.write("---Output--- %s seconds ---" % (time.time() - start_time))

In [None]:
%%writefile app.py

import streamlit as st
import pandas as pd
import numpy as np
from PIL import Image
from annoy import AnnoyIndex
import glob
import os
import tensorflow as tf
import pickle
from pathlib import Path
import time
from utils import encoder, indexer

# Paths and settings used by the app
root_path = '/content/projeto_salvo'
topK = 6
query_path = '/content/user_query.jpg'

# Unwrap the model and the index held by the objects imported from utils.py.
# NOTE(review): the loading itself appears to happen when utils is imported,
# above this block, so these timings and this try/except likely do not cover
# it - confirm.
try:
    start_time = time.time()
    encoder = encoder.encoder
    print("---Encoder--- %s seconds ---" % (time.time() - start_time))

    start_time = time.time()
    t = indexer.indexer
    print("---Indexer--- %s seconds ---" % (time.time() - start_time))

except Exception as e:
    st.error(f"Erro ao carregar o modelo ou indexador: {e}")
    st.stop()


def _load_mapping(file_name):
    """Unpickle one lookup table from root_path, closing the file afterwards."""
    with open(os.path.join(root_path, file_name), 'rb') as fh:
        return pickle.load(fh)


# Load the mappings (previously the file handles were opened and never closed)
try:
    file_index_to_file_name = _load_mapping('file_index_to_file_name.p')
    file_index_to_product_id = _load_mapping('file_index_to_product_id.p')
except Exception as e:
    st.error(f"Erro ao carregar os mapeamentos: {e}")
    st.stop()

# Map image file name -> full path for every JPG/PNG under <root_path>/images/<class>/
path_dict = {}
all_images = glob.glob(os.path.join(root_path, 'images', '*', '*.jpg')) + glob.glob(os.path.join(root_path, 'images', '*', '*.png'))
for file_path in all_images:
    file_name = Path(file_path).name
    path_dict[file_name] = file_path

def load_img(path):
    """Load an image file as a float32 batch of shape (1, 224, 224, 3) scaled to [0, 1]."""
    raw = tf.io.read_file(path)
    # expand_animations=False keeps GIF input 3-D (first frame only).
    img = tf.io.decode_image(raw, channels=3, expand_animations=False)
    # Scale uint8 [0, 255] to float32 [0, 1] BEFORE resizing: resize_with_pad
    # returns float32 and convert_image_dtype does not rescale a tensor that is
    # already float32, so converting afterwards left pixels in [0, 255].
    img = tf.image.convert_image_dtype(img, tf.float32)
    img = tf.image.resize_with_pad(img, 224, 224)
    return img[tf.newaxis, ...]

st.title("Image Similarity App")

uploaded_file = st.file_uploader("Choose an image...", type=['jpg', 'jpeg', 'png'])


def _build_image_lookups(image_paths):
    """Index image paths by '<class-folder>__<stem>' and by bare stem."""
    by_qualified_stem = {}
    by_stem = {}
    for image_path in image_paths:
        p = Path(image_path)
        by_qualified_stem[f"{p.parent.name}__{p.stem}"] = image_path
        by_stem.setdefault(p.stem, []).append(image_path)
    return by_qualified_stem, by_stem


def _resolve_image_path(indexed_name, by_qualified_stem, by_stem):
    """Map a name stored in the index to an image path, or None if it cannot be resolved.

    The index stores the vector file name (ending in '.npz'), not the image
    file name, so an exact match against `path_dict` fails for it. The stem is
    therefore matched instead: first as '<class-folder>__<stem>', then as a
    bare stem, but only when exactly one image has that stem.
    """
    if indexed_name in path_dict:
        return path_dict[indexed_name]
    stem = Path(indexed_name).stem
    if stem in by_qualified_stem:
        return by_qualified_stem[stem]
    candidates = by_stem.get(stem, [])
    return candidates[0] if len(candidates) == 1 else None


if uploaded_file is not None:
    # The query is written as JPEG, which cannot store alpha or palette modes
    # (e.g. an RGBA PNG upload), so normalise to RGB before saving.
    image = Image.open(uploaded_file).convert('RGB')
    image.save(query_path)
    st.image(image, caption='Uploaded Image.', use_column_width=True)
    st.write("")
    st.write("Top similar images...")

    # Encode the query image
    start_time = time.time()
    test_vec = np.squeeze(encoder(load_img(query_path), training=False))
    st.write("---Encoding--- %s seconds ---" % (time.time() - start_time))

    # Nearest neighbours in the index
    start_time = time.time()
    nns = t.get_nns_by_vector(test_vec, n=topK)
    st.write("---SimilarityIndex--- %s seconds ---" % (time.time() - start_time))

    img_files = []
    img_captions = []

    start_time = time.time()
    by_qualified_stem, by_stem = _build_image_lookups(all_images)
    for i in nns:
        # Get the file name stored in the index
        img_name = file_index_to_file_name[i]

        # Resolve it to a full image path; skip entries that cannot be located
        img_path = _resolve_image_path(img_name, by_qualified_stem, by_stem)
        if img_path is None:
            continue

        try:
            img_file = Image.open(img_path)
            img_files.append(img_file)
            # Use the product ID from the index as the caption
            caption = file_index_to_product_id[i]
            img_captions.append(caption)
        except FileNotFoundError:
            st.warning(f"Imagem não encontrada para o índice {i}: {img_name}")

    if img_files:
        st.image(img_files, caption=img_captions, width=200)
    else:
        st.warning("Nenhum dos vizinhos mais próximos pôde ser associado a uma imagem em disco.")
    st.write("---Output--- %s seconds ---" % (time.time() - start_time))

In [34]:
# Install the packages needed for this session
!pip install streamlit
!pip install pyngrok

# Colab helper for reading user secrets, plus the ngrok client
from google.colab import userdata
from pyngrok import ngrok
import os
import time

# Read the ngrok auth token from the Colab secrets instead of hard-coding it.
# NOTE: a failure here is only printed; the cell carries on without a token.
try:
    NGROK_AUTH_TOKEN = userdata.get("NGROK_AUTH_TOKEN")
    ngrok.set_auth_token(NGROK_AUTH_TOKEN)
except Exception as e:
    print(f"Erro ao obter o token ngrok. Verifique se ele está configurado nos Segredos do Colab: {e}")

# Close any ngrok tunnel that may already be running
ngrok.kill()

# Open an ngrok tunnel to port 8501 and print its public URL
print("Iniciando o túnel ngrok...")
start_time = time.time()
ngrok_tunnel = ngrok.connect(8501)
print("---ngrok--- %s seconds ---" % (time.time() - start_time))
print("Link para o seu aplicativo Streamlit:", ngrok_tunnel.public_url)

# Start the Streamlit app in headless mode
!streamlit run app.py --server.headless=true

[34m  Stopping...[0m
[34m  Stopping...[0m
Exception ignored in atexit callback: <function shutdown at 0x78058a437060>
Traceback (most recent call last):
  File "/usr/lib/python3.12/logging/__init__.py", line 2259, in shutdown
    h.acquire()
  File "/usr/lib/python3.12/logging/__init__.py", line 973, in acquire
    self.lock.acquire()
  File "/usr/local/lib/python3.12/dist-packages/streamlit/web/bootstrap.py", line 43, in signal_handler
    server.stop()
  File "/usr/local/lib/python3.12/dist-packages/streamlit/web/server/server.py", line 509, in stop
    self._runtime.stop()
  File "/usr/local/lib/python3.12/dist-packages/streamlit/runtime/runtime.py", line 329, in stop
    async_objs.eventloop.call_soon_threadsafe(stop_on_eventloop)
  File "/usr/lib/python3.12/asyncio/base_events.py", line 844, in call_soon_threadsafe
    self._check_closed()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 545, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError