In [3]:
# update gdown, used to download stuff from google drive
!pip install -q --upgrade gdown

In [4]:
# download dataset
!gdown -q -O dataset.zip 1Mrx0OKnBFteOw1q8IZy-n8x9q8cxZwhT

In [5]:
# unzip dataset
!unzip -q -o dataset.zip

In [6]:
import pathlib

import cv2
import numpy as np
import numpy.typing as npt
import tensorflow as tf

from loguru import logger
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import LabelEncoder

DAEDALUS2_DIR = pathlib.Path("/workspaces/playground/playground/daedalus")

TRAIN_DATASET_DIR = DAEDALUS2_DIR / "post-processed"
IMAGE_SHAPE = (112, 112)
CLASS_COUNT = 2996
BATCH_SIZE = 256

DATABASE_DIR = DAEDALUS2_DIR / "features_database"
MODEL_WEIGHTS_PATH = DAEDALUS2_DIR / "feature_extractor" / "weights"

RNG_SEED = 42

# ensure directories exist
assert TRAIN_DATASET_DIR.exists()
MODEL_WEIGHTS_PATH.parent.mkdir(parents=True, exist_ok=True)


In [7]:
def check_if_images_have_same_shape(
    dataset_dir: pathlib.Path = TRAIN_DATASET_DIR,
) -> None:
    paths = dataset_dir.rglob("*.jpg")
    imgs = [cv2.imread(str(p)) for p in paths]
    shapes = [img.shape for img in imgs]
    return np.all(np.asarray(shapes)), shapes[0]


# check_if_images_have_same_shape()


In [8]:
def load_dataset(
    dataset_dir: pathlib.Path = TRAIN_DATASET_DIR,
    rng_seed: int = RNG_SEED,
    batch_size: int = BATCH_SIZE,
) -> tf.data.Dataset:
    ds = tf.keras.utils.image_dataset_from_directory(
        directory=dataset_dir,
        batch_size=None,
        image_size=IMAGE_SHAPE,
        label_mode="categorical",
    )

    return (
        ds.cache()
        .shuffle(
            buffer_size=ds.cardinality().numpy(),
            seed=rng_seed,
            reshuffle_each_iteration=True,
        )
        .batch(batch_size, drop_remainder=True)
        .prefetch(tf.data.AUTOTUNE)
    )


# load_dataset().element_spec


In [9]:
def create_models() -> tf.keras.Model:
    base = tf.keras.applications.resnet.ResNet50(
        weights="imagenet",
        input_shape=IMAGE_SHAPE + (3,),
        include_top=False,
    )

    for layer in base.layers:
        layer.trainable = False

    # the arch is not particularly important
    flatten = tf.keras.layers.Flatten()(base.output)
    dense1 = tf.keras.layers.Dense(512, activation="relu")(flatten)
    dense1 = tf.keras.layers.BatchNormalization()(dense1)
    dense2 = tf.keras.layers.Dense(256, activation="relu")(dense1)
    dense2 = tf.keras.layers.BatchNormalization()(dense2)
    output = tf.keras.layers.Dense(256)(dense2)

    feature_extractor = tf.keras.Model(
        inputs=base.input,
        outputs=output,
        name="feature_extractor",
    )

    softmax = tf.keras.layers.Dense(CLASS_COUNT, "softmax")(output)
    classifier = tf.keras.Model(
        inputs=base.input,
        outputs=softmax,
        name="classifier",
    )

    classifier.compile(
        loss="categorical_crossentropy",
        optimizer="adam",
        metrics="accuracy",
    )

    return feature_extractor, classifier


In [10]:
def load_or_create_feature_extractor(
    train_dataset_dir: pathlib.Path = TRAIN_DATASET_DIR,
    model_weights_path: pathlib.Path = MODEL_WEIGHTS_PATH,
) -> tf.keras.Model:
    feature_extractor, classifier = create_models()

    try:
        classifier.load_weights(model_weights_path).expect_partial()
    except tf.errors.NotFoundError:
        ds = load_dataset(dataset_dir=train_dataset_dir)
        classifier.fit(ds, epochs=2)
        classifier.save_weights(model_weights_path, save_format="tf")

    return feature_extractor


In [11]:
def load_image_and_extract_features(
    image_path: pathlib.Path,
    feature_extractor: tf.keras.Model,
) -> npt.NDArray[np.float32]:
    img = cv2.imread(str(image_path))
    batched_image = img.reshape(1, *img.shape)
    return feature_extractor.predict(batched_image).flatten()
    

In [12]:
def add_new_instance_to_database(
    image_path: pathlib.Path,
    instance_label: str,
    database_dir: pathlib.Path,
    feature_extractor: tf.keras.Model,
) -> None:
    """
    `instance_path`: path to the image to be added to the database
    `instance_label`: the name or identifier of the the instance
    `database_dir`: location of the database
    `feature_extractor`: a pre-trained neural network that generated feature vectors
    """

    logger.info(f"storing new instance, label={instance_label}, path={image_path}")

    feature_vector = load_image_and_extract_features(
        image_path=image_path,
        feature_extractor=feature_extractor,
    )

    # create, if necessary, the label dir
    label_dir = database_dir / instance_label
    label_dir.mkdir(parents=True, exist_ok=True)

    # store the image on the database
    stored_image_path = label_dir / image_path.name

    if stored_image_path.exists():
        logger.warning(
            "there is already an instance with this filename in the database, overwriting"
        )

    stored_image_path.write_bytes(image_path.read_bytes())

    # store feature vector on the database
    feature_vector_path = stored_image_path.with_suffix(".feature_vector")
    np.save(
        file=feature_vector_path,
        arr=feature_vector,
    )


In [13]:
def populate_database_with_train_dataset(
    feature_extractor: tf.keras.Model,
    train_dataset_dir: pathlib.Path = TRAIN_DATASET_DIR,
    database_dir: pathlib.Path = DATABASE_DIR,
) -> None:
    for label_dir in train_dataset_dir.iterdir():
        for image_path in label_dir.iterdir():
            add_new_instance_to_database(
                image_path=image_path,
                instance_label=label_dir.name,
                database_dir=database_dir,
                feature_extractor=feature_extractor,
            )

populate_database_with_train_dataset(
    feature_extractor=load_or_create_feature_extractor()
)


2023-03-09 14:43:36.003586: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-03-09 14:43:36.392251: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1525] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 46712 MB memory:  -> device: 0, name: NVIDIA RTX A6000, pci bus id: 0000:61:00.0, compute capability: 8.6
2023-03-09 14:43:38.385 | INFO     | __main__:add_new_instance_to_database:14 - storing new instance, label=Vitali_Klitschko, path=/workspaces/playground/playground/daedalus/post-processed/Vitali_Klitschko/Vitali_Klitschko_0003_0001.jpg
2023-03-09 14:43:39.694956: I tensorflow/stream_executor/cuda/cuda_dnn.cc:368] Loaded cuDNN version 8101


In [31]:
def load_feature_vectors_and_labels(
    database_dir: pathlib.Path = DATABASE_DIR,
) -> KNeighborsClassifier:
    label_dirs = sorted(database_dir.iterdir())
    label_names = [path.name for path in label_dirs]
    label_encoder = LabelEncoder().fit(label_names)

    paths = list(database_dir.rglob("*.npy"))

    features = np.array([np.load(p) for p in paths])

    labels = [p.parent.name for p in paths]
    encoded_labels = label_encoder.transform(labels)
    
    return features, encoded_labels

features, labels = load_feature_vectors_and_labels()
features.shape, labels.shape


((12000, 256), (12000,))

In [33]:
# !gdown -q -O marquinho_treino.jpg 1EgvzTNEWTXvegURlmJAt8OXOtrKAlQEb

6051.14s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


In [2]:
def classify_instance(
    instance_path: pathlib.Path,
    feature_extractor: tf.keras.Model,
) -> int:
    database_features, database_labels = load_feature_vectors_and_labels()
    model = KNeighborsClassifier().fit(database_features, database_labels)

    img = cv2.imread(str(instance_path))
    batched_image = img.reshape(1, *img.shape)
    instance_features = feature_extractor.predict(batched_image).flatten()

    return model.predict([instance_features])

classify_instance(
    pathlib.Path("/workspaces/playground/playground/daedalus/features_database/Aaron_Eckhart/Aaron_Eckhart_0001_0000.jpg"),
    feature_extractor=load_or_create_feature_extractor()
)

NameError: name 'pathlib' is not defined