# 2. VGG16 Image Embeddings

_created by Austin Poor_

In this notebook, I use a pretrained VGG-16 model to create image embeddings for each of the film stills.

The notebook [1.format-images.ipynb](./1.format-images.ipynb) places uniformly formatted images in an S3 bucket. This notebook pulls them down, processes them, and then uploads the results (as individual parquet files) to another S3 bucket.

In [None]:
# Install this notebook's Python dependencies quietly (-q) from requirements.txt (-r).
!pip install -qr requirements.txt

In [1]:
import datetime as dt
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor

import boto3
import numpy as np
from PIL import Image
from tqdm import tqdm

import pyarrow as pa
import pyarrow.parquet as pq

import tensorflow as tf
from tensorflow.keras.applications.vgg16 import preprocess_input

In [3]:
# Local scratch directory for downloaded images and generated parquet files.
tmp_dir = Path("./tmp")
tmp_dir.mkdir(exist_ok=True)

# Start from a clean slate: delete any files left over from a previous run.
# A plain loop is used because this is done purely for its side effect.
for leftover in tmp_dir.glob("*"):
    if leftover.is_file():
        leftover.unlink()

[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,

In [4]:
# Input: S3 bucket holding the formatted film stills.
SOURCE_BUCKET = "apoor-clean-movie-stills"
# Output: S3 bucket that receives one parquet file of VGG16 features per still.
DEST_BUCKET = "apoor-vgg-movie-vecs"

# Module-level S3 client; with no arguments boto3 resolves credentials and
# region from its default configuration chain.
s3 = boto3.client("s3")

In [5]:
batch_size = 1000  # keys per ListObjectsV2 request; S3 returns at most 1,000 per call

In [6]:
# Fixed input size (rows, cols, channels) the convolutional base is built for.
# NOTE(review): the recorded summary output below shows 300x300 inputs while
# this cell requests 500x500 -- confirm which size the formatted stills use.
input_shape = (500, 500, 3)

# VGG16 convolutional base only (no classifier head), with ImageNet weights.
vgg16 = tf.keras.applications.VGG16(
    input_shape=input_shape,
    include_top=False,
    weights='imagenet',
)
vgg16.summary()

Model: "vgg16"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 300, 300, 3)]     0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 300, 300, 64)      1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 300, 300, 64)      36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 150, 150, 64)      0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 150, 150, 128)     73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 150, 150, 128)     147584    
_________________________________________________________________
block2_pool (MaxPooling2D)   (None, 75, 75, 128)       0     

In [7]:
def iter_keys(bucket: str, batch_size: int = 1_000) -> [str]:
    """Iterates through the keys in an S3 `bucket`, one page at a time.

    Pages are fetched with `list_objects_v2`; each page after the first
    resumes right after the last key of the previous page (`StartAfter`).

    :param bucket: Name of the S3 bucket to list
    :param batch_size: The max number of keys to return at a time.
        Note: S3 will return a maximum of 1,000 keys at a time.
    :yields: Non-empty lists of object keys from `bucket`. Nothing is
        yielded when the bucket is empty.
    """
    last_key = ""
    while True:
        resp = s3.list_objects_v2(
            Bucket=bucket,
            MaxKeys=batch_size,
            StartAfter=last_key,
        )
        # "Contents" is omitted from the response when no keys are returned.
        keys = [c["Key"] for c in resp.get("Contents", [])]
        if keys:
            yield keys
        # Stop when S3 reports no more pages; the `not keys` guard also
        # prevents an infinite loop should a truncated page ever be empty.
        if not resp.get("IsTruncated", False) or not keys:
            break
        last_key = keys[-1]
            
            
def download_object(bucket: str, key: str, tmp_dir: Path) -> Path:
    """Downloads a single file from an S3 bucket
    and stores it in a temporary directory.

    The local path mirrors the object key, so a key such as
    `"a/b.jpg"` is saved to `tmp_dir / "a" / "b.jpg"`. Any missing
    sub-directories are created first.

    :param bucket: The bucket to search in S3
    :param key: The object's key in `bucket`
    :param tmp_dir: The temporary directory to save the
        downloaded file.
    :returns: A path to the downloaded file in `tmp_dir`
    """
    res = s3.get_object(Bucket=bucket, Key=key)
    filename = tmp_dir / key
    # Keys may contain "/" separators; ensure the matching local dirs exist.
    filename.parent.mkdir(parents=True, exist_ok=True)
    body = res["Body"]
    try:
        filename.write_bytes(body.read())
    finally:
        # Release the underlying HTTP connection even if the write fails.
        body.close()
    return filename
    
    
def batch_download(bucket: str, keys: [str], tmp_dir: Path) -> [Path]:
    """Downloads a batch of objects from an S3 bucket concurrently.

    Each key is fetched with `download_object`, fanned out over a
    `concurrent.futures.ThreadPoolExecutor`. If any download raises, the
    exception propagates to the caller.

    :param bucket: S3 bucket where objects are stored
    :param keys: List of object keys stored in `bucket`
    :param tmp_dir: Local directory to save downloaded images
    :returns: List of paths for locally stored objects
        downloaded from S3, in the same order as `keys`.
    """
    def curried_download(key: str) -> Path:
        return download_object(bucket, key, tmp_dir)

    with ThreadPoolExecutor() as pool:
        # Executor.map yields results in input order, so result i matches keys[i].
        return list(pool.map(curried_download, keys))

    
def clean_tmp_files(paths: [Path]):
    """Deletes each file in `paths` using `pathlib.Path.unlink`.

    :param paths: Paths (or path strings) of files that should be deleted.
    :raises FileNotFoundError: if any of the files does not exist.
    """
    # Plain loop: this runs only for its side effect, so no list is built.
    for p in paths:
        Path(p).unlink()
    
    
def load_image(path: Path) -> np.ndarray:
    """Loads a JPEG image at `path` as a 3-channel (RGB) image.

    Uses `tf.io.read_file` and `tf.image.decode_jpeg`. Decoding is forced
    to 3 channels so that grayscale JPEGs come out with the same channel
    count as colour ones (and as VGG16 expects), which keeps every image
    in a batch stackable.

    :param path: Path to a JPEG image file
    :returns: uint8 tensor (array-like; usable with numpy functions)
        with dimensions (img_height, img_width, 3)
    """
    return tf.image.decode_jpeg(tf.io.read_file(str(path)), channels=3)

    
def load_images(paths: [Path]) -> np.ndarray:
    """Loads a group of images and stacks them into a single batch array.

    Every image must have the same dimensions.

    :param paths: List of paths to image files being loaded
    :returns: A single numpy ndarray with dimensions
        (n_images, img_height, img_width, img_color_channels)
    :raises ValueError: if `paths` is empty or the images differ in shape.
    """
    # np.stack adds the leading batch axis itself (replaces expand_dims + concatenate).
    return np.stack([np.asarray(load_image(p)) for p in paths], axis=0)


def format_input(data: np.ndarray) -> np.ndarray:
    """Prepares a batch of images for VGG16 using Keras's own
    `vgg16.preprocess_input`.

    Per the Keras docs, the images are converted RGB -> BGR and each channel
    is zero-centred against the ImageNet means (no scaling). A numpy input
    whose dtype is already compatible may be overwritten in place.

    :param data: ndarray of images with shape (n, height, width, channels)
    :returns: The preprocessed batch, in the form VGG16 expects.
    """
    prepared = preprocess_input(data)
    return prepared


def vgg_process(data: np.ndarray):
    """Runs the module-level `vgg16` model over a batch of images.

    :param data: Preprocessed image batch (see `format_input`)
    :returns: ndarray of VGG16 outputs, one entry per input image
    """
    features = vgg16.predict(data)
    return features


def format_output(data: np.ndarray) -> np.ndarray:
    """Flattens each embedding so that a batch can be stored one row per image.

    :param data: ndarray of image embeddings whose first axis indexes images
        (e.g. the 4-dimensional VGG16 output)
    :returns: 2-D ndarray of shape (n_images, product of the remaining dims)
    """
    n_images, *_trailing = data.shape
    flat = data.reshape((n_images, -1))
    return flat


def make_arrow_table(row: np.ndarray, filename: Path) -> pa.Table:
    """Wraps a flat numpy array in a single-column arrow Table.

    The column is named after `filename`'s stem (its name without the
    final extension), and `row` supplies the column's values.

    :param row: A flat numpy array
    :param filename: Path or string whose stem becomes the column name
    :returns: An arrow table with one column, `<stem>`, holding `row`
    """
    column_name = Path(filename).stem
    return pa.table({column_name: row})


def write_parquet(row: np.ndarray, filename: Path) -> Path:
    """Writes the data in `row` as an arrow table,
    to a parquet file.

    Saves the file to `filename` with its extension changed
    to `.parquet`.

    In the arrow table, `row`'s key is the stem of
    `filename`.

    For example, if `filename = "dir/test.jpg"` then
    the result will be a parquet file: `dir/test.parquet`
    which stores an arrow table with the key `test`.

    Only the name of `filename` is used; the source file itself is never
    read, so it need not still exist.

    :param row: Data to be stored in a parquet file
    :param filename: Source data's filename (`Path` or `str`)
    :returns: Path to the newly created parquet file
    """
    # Accept plain strings too, as in the docstring example.
    filename = Path(filename)
    table = make_arrow_table(row, filename)
    new_filename = filename.with_suffix(".parquet")
    pq.write_table(table, str(new_filename))
    return new_filename


def write_parquets(data: np.ndarray, filenames: [Path]) -> [Path]:
    """Writes each row of `data` to its own parquet file.

    Row i of `data` is paired with `filenames[i]` and handed to
    `write_parquet`. Pairing uses `zip`, so any excess rows or
    filenames are silently ignored.

    :param data: a 2-D ndarray with data to be stored
        as parquet files.
    :param filenames: List of source `Paths` corresponding
        to the rows in `data`.
    :returns: A list of `Paths` to the newly created parquet files,
        in the same order as the rows.
    """
    written = []
    for row, source in zip(data, filenames):
        written.append(write_parquet(row, source))
    return written

def upload_parquet_file(bucket: str, filename: Path):
    """Uploads one parquet file to S3 under its bare file name.

    Directory components are dropped from the key: uploading
    `"path/to/file.ext"` stores the object as `file.ext`. Files with the
    same name from different directories therefore map to the same key.

    :param bucket: S3 bucket to store the file
    :param filename: Local `Path` of the file to upload
    """
    s3.upload_file(str(filename), bucket, filename.name)


def upload_parquet_files(bucket: str, filenames: [Path]):
    """Uploads several parquet files to S3 in parallel.

    Each file goes through `upload_parquet_file` on a
    `concurrent.futures.ThreadPoolExecutor`. The result iterator is drained,
    so the first upload error (in input order) is re-raised here.

    :param bucket: S3 bucket to upload files
    :param filenames: List of `Path`s to parquets being uploaded
    """
    with ThreadPoolExecutor() as pool:
        for _ in pool.map(lambda f: upload_parquet_file(bucket, f), filenames):
            pass

In [None]:
start_time = dt.datetime.now()
print(f"START TIME: {start_time}")
print(f"Loading batches of {batch_size:,d} images.\n")

bar = tqdm(
    enumerate(iter_keys(SOURCE_BUCKET, batch_size)),
    desc="Starting...",
    unit="batches",
    ncols=80,
)

for i, image_keys in bar:
    loop_start = dt.datetime.now()

    # 1. Pull this batch of stills down into the local scratch directory.
    bar.set_description("Downloading images")
    image_paths = batch_download(SOURCE_BUCKET, image_keys, tmp_dir)

    # 2. Decode and preprocess into a single VGG16-ready batch.
    bar.set_description("Loading images")
    input_data = load_images(image_paths)
    input_data = format_input(input_data)

    # The pixels are in memory now, so the local image files can go.
    bar.set_description("Removing tmp files")
    clean_tmp_files(image_paths)

    # 3. Embed, then flatten to one row per image.
    bar.set_description("Embedding with VGG16")
    encoding = vgg_process(input_data)
    output_data = format_output(encoding)

    # 4. One parquet file per image. Only the (already deleted) image
    #    paths' names are used to build the output names.
    bar.set_description("Saving to parquet")
    parquet_paths = write_parquets(output_data, image_paths)

    # bar.write prints above the progress bar; a plain print() would garble it.
    bar.write("> Uploading encodings...")
    bar.set_description("Uploading embeddings")
    upload_parquet_files(DEST_BUCKET, parquet_paths)

    bar.set_description("Removing tmp files")
    clean_tmp_files(parquet_paths)

    bar.write(f"[{i:4,d}] COMPLETED IN {dt.datetime.now() - loop_start}")

print(f"\nFULL TIME TO COMPLETE: {dt.datetime.now() - start_time}")