In [None]:
import warnings; warnings.filterwarnings('ignore')
from keras.preprocessing.image import load_img, save_img, img_to_array, array_to_img
from keras.applications import Xception, VGG19, InceptionV3, imagenet_utils
from collections import defaultdict
import matplotlib.pyplot as plt
from annoy import AnnoyIndex
import keras.backend as K
import numpy as np
import glob, json, shears, random, six, hashlib, sys, functools

# Build Model

In [None]:
# Position in `model.layers` of the layer whose output is used as the image vector.
# It is the default `sample_layer_index` of fit_transform; -2 selects the second-to-last layer.
SAMPLE_LAYER_INDEX = -2 # default index into XCeption that will determine the image vector shape

# VGG16, VGG19, and ResNet take 224×224 images; InceptionV3 and Xception take 299×299 inputs
def resize_array(arr, shape=(299,299)):
  '''Return `arr` resampled to `shape` by round-tripping through an image object.

  The array is converted with array_to_img, resized with the image's own
  `resize(shape)` method, and converted back to an array with img_to_array.
  '''
  return img_to_array(array_to_img(arr).resize(shape))


def fit_transform(obj, sample_layer_index=SAMPLE_LAYER_INDEX):
  '''Run `obj` through the global `model` and return the output of one of its layers.

  Args:
    obj: one of
      - an array with 3 dimensions, treated as a single image;
      - an array with 4 dimensions, treated as a batch of images;
      - an object without a `shape` attribute, assumed to be a single image
        object exposing `.resize((w, h))`.
    sample_layer_index: index into `model.layers` of the layer to sample
      (defaults to SAMPLE_LAYER_INDEX).

  Returns:
    The sampled layer's output for the whole batch (the first element of the
    list returned by the backend function).

  Raises:
    ValueError: if `obj` is an array whose rank is neither 3 nor 4.
  '''
  # input shape must be n_images, h, w, colors: https://keras.io/preprocessing/image/
  rank = len(obj.shape) if hasattr(obj, 'shape') else None
  if rank == 3:
    # this is an array of a single image: resize, then add the batch axis
    arr = np.expand_dims(resize_array(obj), axis=0)
  elif rank == 4:
    # this is an array of images: resize each one independently
    arr = np.array([resize_array(i) for i in obj])
  elif rank is None:
    # this is assumed to be a single image object. The previous code resized an
    # undefined name (`img`) here and did not add the batch axis the model needs.
    arr = np.expand_dims(img_to_array(obj.resize((299,299))), axis=0) # see SO 47697622
  else:
    raise ValueError('Expected an array with 3 or 4 dimensions, got {}'.format(rank))
  # NOTE(review): preprocess_input is called with its default mode. Keras also ships
  # keras.applications.xception.preprocess_input - confirm which one Xception expects.
  # Changing the preprocessing would invalidate any vectors that were already saved.
  arr = imagenet_utils.preprocess_input(arr)
  # build a backend function mapping the model input to the requested layer's output
  sample_layer = K.function([model.input], [model.layers[sample_layer_index].output])
  # the function returns one entry per requested output; only one layer was requested
  return sample_layer([arr])[0]


# Xception instantiated with weights='imagenet'; fit_transform is attached to the
# instance as a plain attribute, so it is called without an implicit `self`.
model = Xception(weights='imagenet')
model.fit_transform = fit_transform

# Image Cleaning

In [None]:
@functools.lru_cache(maxsize=1024)
def load_image(path, color_mode='rgb'):
  '''Load the image at `path`; results are memoized per (path, color_mode).

  With color_mode 'rgb' the object returned by load_img is passed through
  unchanged. With 'grayscale' the single channel is copied into all three
  channels of a new (height, width, 3) array, which is returned instead.
  Any other color_mode raises an Exception (after the image has been loaded).
  '''
  im = load_img(path, color_mode=color_mode)
  if color_mode == 'rgb':
    return im
  if color_mode != 'grayscale':
    raise Exception('Requested color_mode is not supported', color_mode)
  gray = img_to_array(im).squeeze()
  stacked = np.zeros((gray.shape[0], gray.shape[1], 3))
  # every output channel carries the same intensity values
  for channel in range(3):
    stacked[:,:,channel] = gray
  return stacked

def clean_image(im, remove_background=False):
  '''Convert `im` to an array, optionally strip its dominant colors, and return it as an image.

  When `remove_background` is true the array is passed through
  shears.remove_dominant_colors (mask_size=0.05, n_colors_to_remove=2);
  otherwise it is only round-tripped through img_to_array / array_to_img.
  '''
  pixels = img_to_array(im)
  cleaned = (
    shears.remove_dominant_colors(pixels, mask_size=0.05, n_colors_to_remove=2)
    if remove_background else pixels
  )
  return array_to_img(cleaned)

# Model I/O Helpers

In [None]:
def uuid(*args, max_size=sys.maxsize, right_pad=True):
  '''Return an integer id.

  If the first positional argument is a string, the id is the integer value of
  the SHA-1 digest of its UTF-8 encoding, so the same string always maps to the
  same id. Otherwise the id is a random integer drawn from [0, max_size].

  Args:
    *args: optional; only the first argument is inspected.
    max_size: upper bound (inclusive) for random ids; its number of decimal
      digits is also the width used for padding.
    right_pad: when true, append '0' digits until the id has as many decimal
      digits as `max_size` (ids that are already that long are left alone).

  Returns:
    The id as an int.
  '''
  # the signature uses keyword-only arguments, so this block only runs on
  # Python 3, where text is always `str` and must be encoded before hashing
  # (the old try/bare-except always took the encode path)
  if args and isinstance(args[0], str):
    i = int(hashlib.sha1(args[0].encode('utf8')).hexdigest(), 16)
  else:
    i = random.randint(0, max_size)
  if right_pad:
    i = str(i).ljust(len(str(max_size)), '0')
  return int(i)

@functools.lru_cache(maxsize=1024)
def get_image_windows(img_path, color_mode='rgb'):
  '''Load, clean and tile the image at `img_path`; memoized per (img_path, color_mode).

  The image is loaded with load_image, passed through clean_image with its
  defaults, converted to an array divided by 255, and handed to get_windows.
  '''
  loaded = load_image(img_path, color_mode=color_mode)
  cleaned = clean_image(loaded)
  scaled = img_to_array(cleaned)/255
  return get_windows(scaled)

def get_windows(im, step=20, win=100, plot=False):
  '''Slide a `win` x `win` window over `im` and return every full 3-channel window.

  Args:
    im: np array with at least two dimensions; axis 0 is indexed by the
      vertical offset and axis 1 by the horizontal offset.
    step: distance in pixels between consecutive window origins on both axes.
    win: side length of each window.
    plot: when true, show every candidate slice with matplotlib.

  Returns:
    np.array of the windows whose shape is exactly (win, win, 3), ordered by
    horizontal offset first and vertical offset second. Slices that run past
    the image edge (or do not have 3 channels) are dropped, so the result is
    an empty array when nothing fits.
  '''
  windows = []
  n_rows, n_cols = im.shape[0], im.shape[1]
  for dx in range(0, (n_cols//step)*step, step): # x axis pass
    # the vertical offset restarts at 0 for every column; previously it kept
    # growing across columns, so only the first column was tiled properly
    for dy in range(0, (n_rows//step)*step, step): # y axis pass
      dim = im[dy:dy+win, dx:dx+win]
      if dim.shape == (win, win, 3):
        windows.append(dim)
      if plot:
        plt.imshow(dim)
        plt.title('{}-{}'.format(dx, dy))
        plt.show()
  return np.array(windows)

def get_image_vectors(img_path):
  '''Given the path to an image, return the model output for every window in that image

  The windows come from get_image_windows (default color mode) and are passed to
  model.fit_transform with its default layer, so the size of each vector is
  whatever that layer produces.
  '''
  image_windows = get_image_windows(img_path)
  return model.fit_transform(image_windows)

# Investigate feature separation given sample layer index

In [None]:
import scipy
from itertools import combinations
from multiprocessing import Pool

# Sample image and subplot grid used by process_layer_index
img_paths = glob.glob('utils/voynichese/images/*.jpg')
# fall back to None when no sample images exist so this cell still runs
im = img_paths[0] if img_paths else None
n = 40 # number of vectors to draw in the scatter grid
columns = 10
# round up so each of the n plotted vectors has a subplot row
rows = -(-n // columns)
size_scalar = 4 # size given to each subplot when computing figsize

def process_layer_index(layer_index):
  '''Vectorize the global `windows` at one model layer and plot how the vectors spread.

  Reads the module-level `windows`, `n`, `rows`, `columns` and `size_scalar`.
  Writes two files to the working directory:
    layer-index-<layer_index>-scatter.png  one scatter subplot for each of the first n vectors
    layer-index-<layer_index>-hist.png     histogram of all pairwise cosine distances

  Args:
    layer_index: index into model.layers, forwarded to model.fit_transform.

  Returns:
    [vector_length, aggregate_distance], where vector_length is the flattened
    size of the last plotted vector and aggregate_distance is the sum of all
    pairwise cosine distances.

  Raises:
    ValueError: if the model returned no vectors.
  '''
  # imported here because a bare `import scipy` does not guarantee that the
  # scipy.spatial subpackage has been loaded
  from scipy.spatial.distance import cosine

  print(' * processing layer', layer_index)
  # get this image's vectors, flattened once for both plots
  vectors = model.fit_transform(windows, layer_index)
  flat = [v.flatten().squeeze() for v in vectors]
  if not flat:
    raise ValueError('No vectors were returned for layer {}'.format(layer_index))

  # squeeze=False keeps `axes` two dimensional even for a single row or column
  fig, axes = plt.subplots(rows, columns, figsize=(columns*size_scalar,rows*size_scalar), squeeze=False)
  plotted = flat[:n]
  for odx, o in enumerate(plotted):
    axes[odx//columns][odx%columns].scatter(range(len(o)), o, s=0.01)
  plt.savefig('layer-index-{}-scatter.png'.format(layer_index))
  plt.clf()

  # get the histogram of vector similarities (drawn on the same, cleared figure)
  distances = [cosine(i, j) for i, j in combinations(flat, 2)]
  plt.hist(distances, bins=50)
  plt.savefig('layer-index-{}-hist.png'.format(layer_index))
  # close rather than clear so figures do not pile up across layer indices
  plt.close(fig)

  # find the aggregate distance of all windows
  d = np.sum(distances)
  last = plotted[-1]
  print(' * aggregate distance', layer_index, d)
  print(' * dims size', layer_index, last.shape)
  return [len(last), d]
  
# Disabled by default: sweep layer indices 1..20 and record the vector size and
# aggregate distance of each in analysis-results.json. Flip the condition to run it.
if False:
  # get this image's windows
  windows = get_image_windows(im)
  distances = {}
  shapes = {}
  # to sweep the last 20 layers instead, use negative indices (i - 20)
  for i in range(1, 21, 1):
    try:
      shape, distance = process_layer_index(i)
    except Exception as exc:
      # keep sweeping, but report why this layer failed
      print(' * could not process idx', i, exc)
      continue
    distances[i] = distance
    shapes[i] = shape

  with open('analysis-results.json', 'w') as out:
    json.dump({
      'distances': distances,
      'shapes': shapes,
    }, out)

# Vectorize Images

In [None]:
import traceback, os

# Vectorize every image of one collection, caching windows and vectors as .npy
# files in `out_dir` and recording one metadata entry per image in meta_d.json.
meta_d = {} # d[guid] = {'path': 'collection': 'idx_in_collection'}

# To process the voynichese collection instead, use:
#   img_paths = glob.glob('utils/voynichese/images/*.jpg')
#   collection = 'voynichese'
img_paths = glob.glob('data/uvm-italian-herbal/images/*.jpg')
collection = 'uvm-italian-herbal'

color_mode = 'grayscale'
out_dir = 'npy'

os.makedirs(out_dir, exist_ok=True)

# resume from earlier runs; JSON stores the integer guids as strings
if os.path.exists('meta_d.json'):
  with open('meta_d.json') as meta_file:
    meta_d = {int(k): v for k,v in json.load(meta_file).items() }

for idx, i in enumerate(img_paths):
  try:
    # the guid is derived from the path, so it is stable across runs
    guid = uuid(i)
    windows_path = os.path.join(out_dir, '{}-windows'.format(guid))
    vectors_path = os.path.join(out_dir, '{}-vectors'.format(guid))
    # np.save appends '.npy'; skip images whose two outputs already exist
    if not os.path.exists(windows_path + '.npy') or not os.path.exists(vectors_path + '.npy'):
      print(' * processing image {} of {} in {}'.format(idx+1, len(img_paths), collection))
      windows = get_image_windows(i, color_mode=color_mode)
      vectors = model.fit_transform(windows)
      np.save(windows_path, windows)
      np.save(vectors_path, vectors)
      meta_d[guid] = {'path': i, 'collection': collection, 'idx_in_collection': idx, 'image_type': 'complete'}
  except Exception:
    # print_exc writes the traceback itself and returns None, so it must not
    # be passed to print as an argument
    print(' * could not parse', i)
    traceback.print_exc()

with open('meta_d.json', 'w') as out:
  json.dump(meta_d, out)

# Index Image Vectors

In [None]:
# reload the metadata written by the vectorize step; JSON stores the integer
# guids as strings, so convert the keys back
with open('meta_d.json') as meta_file:
  meta_d = {int(k): v for k,v in json.load(meta_file).items() }
# vector length passed to AnnoyIndex; it has to equal the length of each stored vector
n_dims = 2048
# number of trees passed to AnnoyIndex.build
n_trees = 1024

In [None]:
from copy import deepcopy

# Build the Annoy index from every cached vectors file and record one metadata
# entry per window; insertion_idx is the window's position in the index.
insertion_idx = 0

# todo quantize vectors for faster i/o
if True:
  # the index is rebuilt from position 0 on every run, so window records left
  # by an earlier run point at outdated positions - remove them first
  for stale_guid in [k for k, v in meta_d.items() if v.get('image_type') == 'window']:
    del meta_d[stale_guid]

  t = AnnoyIndex(n_dims, 'angular')
  for vector_path in glob.glob(os.path.join(out_dir, '*-vectors.npy')):
    # only the vectors are needed here; the matching windows file is not read
    vectors = np.load(vector_path)
    # guid
    guid = int(os.path.basename(vector_path).split('-')[0])
    meta = meta_d[guid]
    # process the collection - each vec in vectors is a window on the given page
    for vec_idx, vec in enumerate(vectors):
      sub_guid = uuid()
      # never overwrite an existing record if the random id collides
      while sub_guid in meta_d:
        sub_guid = uuid()
      meta_d[sub_guid] = deepcopy(meta)
      meta_d[sub_guid].update({'vec_idx': vec_idx, 'insertion_idx': insertion_idx, 'image_type': 'window'})
      t.add_item(insertion_idx, vec)
      insertion_idx += 1

  t.build(n_trees)
  t.save('voynich.ann')

  with open('meta_d.json', 'w') as out: json.dump(meta_d, out)

# Query Index

In [None]:
def get_knn(guid, display=True, n=10):
  '''Given a GUID, find the insertion_idx for that GUID, use that idx to query the index, then return/display knn

  Args:
    guid: key into meta_d.
    display: when true, plot the query window followed by each neighbour.
    n: number of items requested from the index. The first hit is dropped
      (it is assumed to be the query item itself), so n-1 neighbours remain.

  Returns:
    False if the record for `guid` has no insertion_idx, otherwise a tuple
    (knn, distances): the neighbours' guids and the distances reported by the
    index, in matching order.
  '''
  m = meta_d[guid]
  insertion_idx = m.get('insertion_idx')
  # 0 is a valid position in the index, so only a missing value means "not indexed"
  if insertion_idx is None: return False
  hits, distances = u.get_nns_by_item(insertion_idx, n, include_distances=True)
  knn = [insertion_idx_to_guid[hit] for hit in hits[1:]] # skip the identity similarity
  distances = distances[1:]
  if display:
    plot_guid(insertion_idx_to_guid[insertion_idx])
    for neighbour_guid, distance in zip(knn, distances):
      title = guid_to_title(neighbour_guid)
      # NOTE(review): shown as 1 - distance; confirm this is the intended
      # similarity for the index's 'angular' metric
      similarity = round(1-distance, 2)
      plot_guid(neighbour_guid, title='img: {}  sim: {}'.format(title, similarity))
  return knn, distances

def plot_guid(guid, title=None):
  '''Show the window recorded for `guid`, titled with `title` or guid_to_title(guid).

  The window is looked up by the record's 'vec_idx' among the windows of the
  record's 'path' (get_image_windows is called with its default color mode).
  '''
  record = meta_d[guid]
  window = get_image_windows(record['path'])[record['vec_idx']]
  label = title if title else guid_to_title(guid)
  plt.imshow(window)
  plt.title(label)
  plt.show()

def guid_to_title(guid):
  '''Return a display title for `guid`: the file name, plus '-<vec_idx>' for window records'''
  record = meta_d[guid]
  name = os.path.basename(record['path'])
  if record.get('image_type') != 'window':
    return name
  return '{}-{}'.format(name, record['vec_idx'])
  
# load the saved index for querying
u = AnnoyIndex(n_dims, 'angular')
u.load('voynich.ann')
# reverse lookup from insertion idx to guid; records without an insertion idx
# (full images) all share the key -1
insertion_idx_to_guid = {
  record.get('insertion_idx', -1): guid
  for guid, record in meta_d.items()
}

In [None]:
# guids of every record marked as a window
guids = [guid for guid, record in meta_d.items() if record.get('image_type', None) == 'window']

In [None]:
# Example query: plot the window at position 21 of `guids` and its nearest
# neighbours (display defaults to True); needs at least 22 window records.
get_knn(guids[21])