# Bag of Colors

This is my personal attempt at making a bag of colors implementation. It was made into an interactive notebook, so that it ends up well documented and easy to pick up by someone else. 

For more details on this algorithm, please see the original paper:

> Christian Wenger, Matthijs Douze, Hervé Jégou, "Bag-of-colors for improved image search". Online: <https://dl.acm.org/citation.cfm?id=2072298.2072034>


In [None]:
import os
from os import path
import random
import numpy as np
from PIL import Image, ImageCms, ImageFile
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import faiss
import h5py as h5

In [None]:
%matplotlib inline
ImageFile.LOAD_TRUNCATED_IMAGES = True

#### Constants

In my experiments, I have used the data set from the [ImageCLEF](http://imageclef.org) 2018 caption challenge. Any other sufficiently large image data set will work as well. Nevertheless, checking other implementation details is recommended: the image chunking block size was adjusted to work for smaller images.

In [None]:
N_BLOCKS = 256
BLOCK_SIZE = 10 # original is 16

# update constants to point to a directory of images
DATA_DIR = "training_data"
TEST_DATA_DIR = "testing_data"
# we'll pick a small sample of the data set for experimentation purposes 
N_DATA = 25
SAMPLE_FILES = [path.join(DATA_DIR, fname) for (i, fname) in zip(range(N_DATA), os.listdir(DATA_DIR))]
# all files!
ALL_FILES = [path.join(DATA_DIR, fname) for (i, fname) in enumerate(os.listdir(DATA_DIR))]
ALL_FILES.sort()
assert len(ALL_FILES) > 10000
TEST_FILES = [path.join(TEST_DATA_DIR, fname) for (i, fname) in enumerate(os.listdir(TEST_DATA_DIR))]
TEST_FILES.sort()

Now, let's pick up one image and show how it should be adapted for the algorithm.

In [None]:
sample_image = Image.open(SAMPLE_FILES[4])
sample_image = sample_image.resize([160, 160])
_ = plt.imshow(sample_image)

Here's what the article says about BoCs:

1. Resize each image to 256×256 pixels, convert it to CIE-Lab and split it in blocks of 16×16 pixels (i.e., 256 blocks in total).

2. For each block, find the most occurring color. Ties are randomly resolved. If this color corresponds to less than 5 occurrences (out of 256), select an arbitrary color from the block.

3. At this point, we have extracted 256 Lab colors per image. The set of 256×$N$ colors from all images is clustered using a k-means algorithm, producing a $k_c$ Lab colors palette.

My data set is already converted to a smaller dimension, so I'll be using blocks of 10x10 on 160x160 images instead (still 256 blocks in total).

In [None]:
# create color profiles for RGB <-> LAB conversions
srgb_profile = ImageCms.createProfile("sRGB")
lab_profile  = ImageCms.createProfile("LAB")

rgb2lab_transform = ImageCms.buildTransformFromOpenProfiles(srgb_profile, lab_profile, "RGB", "LAB")
lab2rgb_transform = ImageCms.buildTransformFromOpenProfiles(lab_profile, srgb_profile, "LAB", "RGB")

In [None]:
def convert_to_cielab(img: Image) -> Image:
    # first make sure it's RGB
    if img.mode != "RGB":
        img = img.convert("RGB")

    # then apply transformation
    return ImageCms.applyTransform(img, rgb2lab_transform)

def convert_to_rgb(img: Image) -> Image:
    return ImageCms.applyTransform(img, lab2rgb_transform)

In [None]:
sample_image_cie = convert_to_cielab(sample_image)

Let's turn color block extraction into a function.


In [None]:
def extract_dominant_colors(image: np.ndarray) -> np.ndarray:
    """
    Args: 
      image: np.array [W, H, C = 3] dtype=uint8
    Returns: np.array [256, 3] dtype=uint8
    """
    assert len(image.shape) == 3
    (w, h, c) = image.shape
    assert c == 3
    
    def dominant_color(block, occurrence_threshold=4):
        """
        Args:
        block: np.array [W, H, 3]
        occurrence_threshold: int if most occurring color is less than this,
        pick a random color from the block instead. Using 4 instead of 5
        because the blocks are also a bit smaller
        """
        block = np.reshape(block, [-1, 3])
        hist = {}
        
        for color in block:
            [c,i,e] = color
            key = (c, i, e)
            if key in hist:
                hist[key] += 1
            else:
                hist[key] = 1
        
        (color, count) = max(hist.items(), key=lambda e:e[1])
        if count < occurrence_threshold:
            # not significant enough, choose a random color
            return list(random.choice(block))
        return list(color)
    
    colors = np.zeros([N_BLOCKS, 3], dtype=np.uint8)
    k = 0
    for i in range(0, w, BLOCK_SIZE):
        for j in range(0, h, BLOCK_SIZE):
            block = image[i: i + BLOCK_SIZE, j: j + BLOCK_SIZE]
            dcolor = dominant_color(block)
            colors[k] = dcolor
            k += 1
    return colors

Let's see what colors we get with the sample image.

In [None]:
sample_colors = extract_dominant_colors(np.array(sample_image_cie))
s = sample_colors
#s = np.sort(sample_colors, axis=0)
#s = np.unique(sample_colors, axis=0)
s = np.reshape(s, [16, 16, 3])
sample_colors_img = Image.fromarray(s, 'LAB')
sample_colors_img = convert_to_rgb(sample_colors_img)
plt.figure(figsize=(3, 3))
plt.imshow(sample_image)
plt.figure(figsize=(3, 3))
_ = plt.imshow(sample_colors_img)

This looks ok on this end!

### Visual color codebook generation

Now that we can fetch the dominant colors of each image, let's produce a color vocabulary (codebook) with k-means clustering. I'll be using Faiss for this, by accumulating all colors into a 2-dimensional array. Let's experiment with multiple values of $k$.

In [None]:
def collect_dominant_colors(files: list) -> np.ndarray:
    """Collect the dominant colors of the set into a single ndarray.
    Args:
      files: list of image file names
    Returns:
      np.ndarray [N * 256, 3] dtype=f32
    """
    all_colors = np.zeros([len(files) * 256, 3], dtype=np.float32)
    for i, file in enumerate(files):
        img = Image.open(file).resize([160, 160])
        img = np.array(convert_to_cielab(img))
        colors = extract_dominant_colors(img)
        all_colors[i * 256: (i + 1) * 256] = colors.astype(np.float32)
    return all_colors

In [None]:
def generate_codebook(colors, k, niter=25, gpu_res=None, gpu_device=None) -> (np.ndarray, faiss.Index):
    """
    Args:
      colors : np.ndarray [N, 3] of colors
      k : the size of the codebook
      niter : number of k-means clustering iterations
      gpu_res : faiss.GpuResources or None, required for a GPU backed index
      gpu_device : int or None, whether to make a GPU backed index
    Returns: tuple (centroids, index)
      centroids : np.array [k, 3]
      index : faiss.Index trained with the codebook (L2 metric)
    """
    # we'll use the Clustering API so that we can choose
    # the clustering index
    cp = faiss.ClusteringParameters()
    cp.niter = niter
    cp.verbose = False
    cp.spherical = False
    clus = faiss.Clustering(3, k, cp)
    index = faiss.IndexFlatL2(3)
    if gpu_res is not None and gpu_device is not None:
        index = faiss.index_cpu_to_gpu(gpu_res, gpu_device, index)

    clus.train(colors, index)
    obj = faiss.vector_float_to_array(clus.obj)
    loss = obj[-1]
    print("Finished training codebook of size {}. Loss: {}".format(k, loss))
    centroids = faiss.vector_float_to_array(clus.centroids).reshape([k, 3])
    return centroids, index

In [None]:
x_colors = collect_dominant_colors(SAMPLE_FILES)

In [None]:
kmeans_16 = generate_codebook(x_colors, 16, niter=50)
kmeans_32 = generate_codebook(x_colors, 32, niter=50)
kmeans_64 = generate_codebook(x_colors, 64, niter=50)
kmeans_128 = generate_codebook(x_colors, 128, niter=50)
kmeans_256 = generate_codebook(x_colors, 256, niter=50)

We now have the codebook in the index. We can use it directly to build our bags of colors. We can also see how the codebook looks like.

In [None]:
def view_codebook(centroids, figsize=(6,2)):
    # sort the colors, so that they look pretty
    carr = centroids.tolist()
    carr = sorted(carr, key=lambda v: v[2])
    carr = np.reshape(np.array(carr, dtype=np.float32), [-1, 16, 3])
    # convert to image
    codebook_img = Image.fromarray(carr, 'LAB')
    codebook_img = convert_to_rgb(codebook_img)
    plt.figure(figsize=figsize)
    _ = plt.imshow(codebook_img)

In [None]:
view_codebook(kmeans_16[0])
view_codebook(kmeans_32[0])
view_codebook(kmeans_64[0])
view_codebook(kmeans_128[0])
view_codebook(kmeans_256[0], figsize=(12, 2))


In [None]:
kmeans_16 = None
kmean_32 = None
kmeans_64 = None
kmeans_128 = None
kmeans_256 = None

Ok, time to use more of the data set! Since our data set is too big for k-means clustering, we'll pick a random portion to serve as a template set.

In [None]:
N_TEMPLATE = 25000 # twenty-five thousand
RANDOM_SEED = 386104

print("Using {} template samples".format(N_TEMPLATE))
random.seed(RANDOM_SEED)
TEMPLATE_FILES = random.sample(ALL_FILES, k=N_TEMPLATE)

In [None]:
template_colors = collect_dominant_colors(TEMPLATE_FILES)

Power up! We'll use the GPU for building and retrieving from these codebooks. 

In [None]:
res = faiss.StandardGpuResources()

In [None]:
kmeans_64 = generate_codebook(template_colors, 64, niter=100, gpu_res=res, gpu_device=0)
kmeans_128 = generate_codebook(template_colors, 128, niter=100, gpu_res=res, gpu_device=0)
kmeans_256 = generate_codebook(template_colors, 256, niter=100, gpu_res=res, gpu_device=0)
kmeans_512 = generate_codebook(template_colors, 512, niter=100, gpu_res=res, gpu_device=0)

In [None]:
view_codebook(kmeans_64[0])
view_codebook(kmeans_128[0])
view_codebook(kmeans_256[0])
view_codebook(kmeans_512[0])

### Bag of Color generation

For each color in an image, look for the nearest color in the codebook, and increment that position in the bag.

In [None]:
def generate_bags(codebook: faiss.Idex, files: iterable) -> np.ndarray:
    """Generate the bags of colors.
    Args:
      codebook: faiss.Index containing the codebook
      files: list of file names (length N)
    Returns:
      np.array [N, k]
    """
    assert codebook.ntotal > 0
    all_bags = np.zeros([len(files), codebook.ntotal], dtype=np.float32)
    for i, file in enumerate(files):
        img = Image.open(file).resize([160, 160])
        img = np.array(convert_to_cielab(img), dtype=np.float32).reshape([-1, 3])
        # batch search for the code of pixels
        codes = codebook.assign(img, 1)
        for j in range(len(img)):
            all_bags[i, codes[j]] += 1
    return all_bags

In [None]:
sample_bags = generate_bags(kmeans_256[1], SAMPLE_FILES)

Let's see how a bag looks like.

In [None]:
def view_bag(x: np.ndarray):
    # the histogram of the data
    plt.figure(figsize=(8, 2))
    plt.bar(range(len(x)), x, facecolor='blue', alpha=0.75)

In [None]:
view_bag(sample_bags[0])
view_bag(sample_bags[12])
view_bag(sample_bags[14])

Bags are often sparse, with some colors of high frequency. These bags can be normalized to attenuate this effect.

## Bag normalization techniques

In [None]:
def max_normalize(bocs: np.ndarray) -> np.ndarray:
    """Linearly normalize the bags so that the maximum of each bag is 1."""
    return bocs / np.max(bocs, axis=1, keepdims=True)

In [None]:
def tf_idf_normalize(bocs: np.ndarray) -> np.ndarray:
    """tf-idf normalization."""
    tf = bocs / np.sum(1e-10 + bocs, axis=1, keepdims=True)
    dcount = np.sum(bocs.astype(np.bool).astype(np.float), axis=0)
    idf = np.log(len(bocs) / dcount)
    return tf * idf

In [None]:
def power_normalize(bocs: np.ndarray) -> np.ndarray:
    """Power-law and L1 vector normalization."""
    # element-wise square root, then L1 normalization
    o = np.sqrt(bocs)
    o /= np.sum(o, axis=1, keepdims=True)
    return o

### Examples

In [None]:
# max normalization
nbags = max_normalize(sample_bags)
view_bag(nbags[0])
view_bag(nbags[1])
view_bag(nbags[2])

In [None]:
# td-idf normalization
nbags = tf_idf_normalize(sample_bags)
view_bag(nbags[0])
view_bag(nbags[1])
view_bag(nbags[2])

In [None]:
# power-law + L1 normalization
nbags = power_normalize(sample_bags)
view_bag(nbags[0])
view_bag(nbags[1])
view_bag(nbags[2])

### Save all bags in the training set

This code will save the outcome in an hdf5 file.

In [None]:
OUTPUT_FILE = "bocs-256-train.h5"
z = generate_bags(kmeans_256[1], ALL_FILES)
# no normalization, this can be done later
k = z.shape[1]
n_samples = len(ALL_FILES)
with h5.File(OUTPUT_FILE, mode='w') as f:
    f.create_dataset('data', data=z, shape=[n_samples, k], dtype='float32')
    h5ids = f.create_dataset('id', shape=[n_samples], dtype=h5.special_dtype(vlen=str))
    for (i, bag) in enumerate(z):
        h5set[i] = bag
        h5ids[i] = path.basename(ALL_FILES[i])[:-4]
print("Bags of colors (palette of size {}) was saved in {}".format(k, OUTPUT_FILE))

### Save all bags in the testing set

In [None]:
OUTPUT_FILE = "bocs-256-test.h5"
z = generate_bags(kmeans_256[1], TEST_FILES)
k = z.shape[1]
n_samples = len(TEST_FILES)
with h5.File(OUTPUT_FILE, mode='w') as f:
    f.create_dataset('data', data=z, shape=[n_samples, k], dtype='float32')
    h5ids = f.create_dataset('id', shape=[n_samples], dtype=h5.special_dtype(vlen=str))
    for i, filename in enumerate(TEST_FILES):
        h5ids[i] = path.basename(TEST_FILES[i])[:-4]
print("Bags of colors (palette of size {}) was saved in {}".format(k, OUTPUT_FILE))

That's all, folks!