In [None]:
from pathlib import Path
import pickle
from itertools import zip_longest
from fastai.vision.all import *
from utils import label_func, slice_model, get_embedding_from_paths

In [None]:
learn = load_learner("dogs/train/exported_resnext50_32x4d.pickle")

In [None]:
embedder = slice_model(learn.model, to_layer=-1)

## Calculate embeddings for test images

In [None]:
test_path_base = "dogs/recognition/test/"
test_paths = get_image_files(test_path_base)

In [None]:
def grouper(iterable, n, fillvalue=None):
    "Collect data into non-overlapping fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

In [None]:
# calculate embeddings in batches, for faster inference
embeddings = []
bs = 64
for batch in grouper(test_paths, bs):
    # remove filler values (may appear in last batch)
    batch = [x for x in batch if x is not None]
    es = get_embedding_from_paths(learn, embedder, batch)
    embeddings.extend(es)
embeddings = np.array(embeddings)

In [None]:
print(f"embeddings.shape: {embeddings.shape}")
print(embeddings)

In [None]:
pickle.dump(embeddings, open("test_embeddings.pickle", "wb"))

In [None]:
embedding_dict = {}
for test_path, embedding in zip(test_paths, embeddings):
    embedding_dict[test_path] = embedding

In [None]:
pickle.dump(embedding_dict, open("test_embedding_dict.pickle", "wb"))

## Enroll new images

In [None]:
def calculate_embeddings_for_dataset(learn, paths, bs=64):
    embedder = slice_model(learn.model, to_layer=-1)
    embeddings = []
    for batch in grouper(paths, bs):
        # remove filler values (may appear in last batch)
        batch = [x for x in batch if x is not None]
        es = get_embedding_from_paths(learn, embedder, batch)
        embeddings.extend(es)
    embeddings = np.array(embeddings)
    return embeddings

In [None]:
enroll_paths = get_image_files("dogs/recognition/enroll/")

In [None]:
enroll_embeddings = calculate_embeddings_for_dataset(learn, enroll_paths)

In [None]:
pickle.dump(enroll_embeddings, open("enroll_embeddings.pickle", "wb"))

In [None]:
def make_embedding_dict(paths, embeddings):
    return dict(zip(paths, embeddings))

In [None]:
enroll_embedding_dict = make_embedding_dict(enroll_paths, enroll_embeddings)

In [None]:
pickle.dump(enroll_embedding_dict, open("enroll_embedding_dict.pickle", "wb"))

In [None]:
from sklearn.preprocessing import Normalizer, LabelEncoder
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier

In [None]:
test_embeddings = embeddings

## Train classifier for embeddings
Reference: https://machinelearningmastery.com/how-to-develop-a-face-recognition-system-using-facenet-in-keras-and-an-svm-classifier/

In [None]:
trainX, trainy = enroll_embeddings, enroll_paths
testX, testy = test_embeddings, test_paths
trainy = [label_func(Path(p)) for p in trainy]
testy = [label_func(Path(p)) for p in testy]

In [None]:
in_encoder = Normalizer(norm='l2')
trainX = in_encoder.transform(trainX)
testX = in_encoder.transform(testX)

In [None]:
out_encoder = LabelEncoder()
out_encoder.fit(trainy)

trainy = out_encoder.transform(trainy)
testy = out_encoder.transform(testy)

In [None]:
def evaluate(model, trainX, trainy, testX, testy):
    # predict
    yhat_train = model.predict(trainX)
    yhat_test = model.predict(testX)
    # score
    score_train = accuracy_score(trainy, yhat_train)
    score_test = accuracy_score(testy, yhat_test)
    # summarize
    print('Accuracy: train=%.3f, test=%.3f' % (score_train*100, score_test*100))

In [None]:
# Linear SVM
model = SVC(kernel='linear', probability=True)
model.fit(trainX, trainy)
evaluate(model, trainX, trainy, testX, testy)

In [None]:
# K-nearest neighbors
# model = KNeighborsClassifier(n_neighbors=5)
# model.fit(trainX, trainy)
# evaluate(model, trainX, trainy, testX, testy)

## Unknown class

In [None]:
unknown_paths = get_image_files("dogs/recognition/unknown/enroll")
unknown_embeddings = calculate_embeddings_for_dataset(learn, unknown_paths)

In [None]:
unknownX, unknowny = unknown_embeddings, unknown_paths
unknowny = [Path(p).parent.name for p in unknowny]

unknownX = in_encoder.transform(unknownX)
# fakey = np.ones(len(unknownX)) * testy.max() + 100 # make up fake class index
# evaluate(model, trainX, trainy, unknownX, fakey)

In [None]:
prob_unknown = model.predict_proba(unknownX)
max_prob = prob_unknown.max(axis=1)
correct_unknown = len(unknowny) - len(max_prob[max_prob > 0.6])
print(f"unknown correct: {correct_unknown} / {len(unknowny)} ( {100*correct_unknown/len(unknowny)} %) ")

In [None]:
# FIXME this is actually testing the number of test items whose max prob is higher than a threshold
# we're not seeing if it's actually correct
prob_test = model.predict_proba(testX)
max_prob_test = prob_test.max(axis=1)
correct_test = len(max_prob_test[max_prob_test > 0.6])
print(f"test correct: {correct_test} / {len(testy)} ( {100*correct_test/len(testy)} %) ")

## Approximate nearest neighbor search

In [None]:
import faiss

In [None]:
dimensions = enroll_embeddings.shape[1]
metric = "cosine"
if metric == "euclidean":
    index = faiss.IndexFlatL2(dimensions)
elif metric == "cosine":
    index = faiss.IndexFlatIP(dimensions)
    faiss.normalize_L2(embeddings)
index = faiss.IndexIDMap(index)

In [None]:
# index.add(enroll_embeddings)
enroll_ids = np.array([i for i,e in enumerate(enroll_embeddings)])
index.add_with_ids(enroll_embeddings, enroll_ids)

In [None]:
def query_show(index, i, test_paths, test_embeddings, k=5):
    q = np.expand_dims(test_embeddings[i], axis=0)
    distances, neighbors = index.search(q, k)
    print(distances)
    print(neighbors)
    print(f"query class: {test_paths[i].parent.name}")
    img = PILImage.create(test_paths[i])
    show_image(img)
    for p in enroll_paths[neighbors[0]]:
        print(p)
        img = PILImage.create(p)
        show_image(img)

In [None]:
def search(index, embedding, k=5):
    q = np.expand_dims(embedding, axis=0)
    distances, neighbors = index.search(q, k)
#     query_class = test_paths[i].parent.name
    pred_paths = enroll_paths[neighbors[0]]
    pred_classes = [p.parent.name for p in pred_paths]
    max_class = max(set(pred_classes), key=pred_classes.count)
    count_max_class = pred_classes.count(max_class)
    if count_max_class > k // 2:
        # we have a winner
        return count_max_class
    else:
        # not confident enough
        return -1
    return acc

In [None]:
def query_eval(index, i, test_paths, test_embeddings, k=5):
    q = np.expand_dims(test_embeddings[i], axis=0)
    distances, neighbors = index.search(q, k)
    query_class = test_paths[i].parent.name
    pred_paths = enroll_paths[neighbors[0]]
#     print(pred_paths)
    pred_classes = [p.parent.name for p in pred_paths]
    max_class = max(set(pred_classes), key=pred_classes.count)
    count_max_class = pred_classes.count(max_class)
    if count_max_class > k // 2:
        # we have a winner
        acc = 1.0 if query_class == max_class else 0.0
    else:
        # not confident enough
        acc = 0.0
#     print(f"max_class: {max_class}  {foo}")
#     print(pred_classes)
#     acc = 1.0 if query_class == pred_class else 0.0
    return acc

In [None]:
acc = 0.0
n = len(test_paths)
for i in range(len(test_paths[:n])):
    acc += query_eval(index, i, test_paths, test_embeddings)
acc /= n
print(f"final acc: {acc}")

In [None]:
# acc = 0.0
# n = len(test_paths)
# for i in range(len(test_paths[:n])):
#     query_class_str = query_class = test_paths[i].parent.name
#     pred_class_int = search(index, test_embeddings[i])
#     NOT_FINISHED__MUST_CONVERT_CLASS_ID_TO_CLASS_STRING
# acc /= n
# print(f"final acc: {acc}")