# Modular flow system

I have decided to adapt the design system from Jina into this repo, at least for prototyping purposes. Their distributed systems approach seems quite good but is too muc complexity for me to add right away. Insetead I'm going to replicate the essential design pattern, that of Flows and Executors.

**Flows** are specific patterns of data manipulation and storage. **Executors** are the operators that transform the data within the flow. 

There are two core flows to any search system: indexing, and querying. The plan here is to make executors that can be composed into flows and then compose the flows into a UI that supports querying and, to some extent, indexing as well.

The core executors for this use case are:
 - Loader
 - Crafter
 - Encoder
 - Indexer
 - Ranker
 - Gateway
 
In this file I try to build these so that the Jupyter notebook itself can be run as a Flow for indexing and then querying. From there it should be easy to abstract the functions and classes and messaging or whatever is necessary for microservices etc.

In [None]:
# move these to main function eventually but for now we're going in notebook order
args = {
    "path": "/home/mage/Pictures/memes/",
    "query": "scary cat",
}

## Loader

The loader takes a directory or list of image files and checks them against database or checkpoint. If there is a saved checkpoint and the files haven't changed, it loads the checkpoint and sends the data directly to Ranker. If not, it sends them to Crafter. Ideally  it could send new images to Crafter and load dictionary of old images at the same time, without re-encoding old images.

The process of indexing could actually happen in the background while querying happens on the old index! This means putting the logic in the Flow rather than the Loader, I suppose.

Maybe build dictionary `{filename_timestamp : vector}` to databse as a simple version control mechanism. Then, if any filenames exist but with a different timestamp, we load those under their own key. And we can throw out any filename_timestamp that doesn't exist, before indexing. 

In [None]:
from pathlib import Path

root = Path(args['path'])

In [None]:
def slugify(filepath):
    return f'{filepath.stem}_{str(filepath.stat().st_mtime).split(".")[0]}'

In [None]:
# filenames = path.iterdir()
def get_image_files(path):
    return [(f, slugify(f)) for f in path.rglob('*') if f.suffix in ['.jpg', '.png', '.jpeg']]

In [None]:
filepaths = get_image_files(root)

In [None]:
len(filepaths)

In [None]:
filepaths[:5]

So we have a list of paths and slugified filenames from the folder. We want to see if there's an archive, so that we don't have to recalculate tensors for images we've seen before. Then we want to pass that directly to the indexer, but send the new images through the crafter and encoder first.



But I need to separate out the logic for the crafter and encoder from the simple loading of archives and pictures. This component should only provide the dictionary of archived CLIP embeddings, the treemap (eventually) and the locations of the new images to review, and let the downstream components deal with them.

In [None]:
import torch
import torchvision

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
def files_archive_loader(filepaths, root, device):
    dbpath = root/'memery.pt'
#     dbpath_backup = root/'memery.pt'
    db = db_loader(dbpath)
    
    current_slugs = [slug for path, slug in filepaths]    
    archive_db = {k:db[k] for k in db if k in current_slugs}   
    archive_slugs = [v['slug'] for v in archive_db.values()]
    new_files = [(str(path), slug) for path, slug in filepaths if slug not in archive_slugs]
    
    return(archive_db, new_files)

In [None]:
def db_loader(dbpath):
    # check for savefile or backup and extract
    if dbpath.exists():
        db = torch.load(dbpath)
#     elif dbpath_backup.exists():
#         db = torch.load(dbpath_backup)
    else:
        db = {}
    return(db)

In [None]:
def treemap_loader(treepath):
    treemap = AnnoyIndex(512, 'angular')

    if treepath.exists():
        treemap.load(str(treepath))
    else:
        treemap = None
    return(treemap)

In [None]:
archive_db, new_files = files_archive_loader(get_image_files(Path(args['path'])), root, device)

In [None]:
len(archive_db)

In [None]:
len(new_files)

In [None]:

len(new_files),len(archive_db)

## Crafter

Takes a list of image filenames and transforms them to batches of the correct dimensions for CLIP. Need to figure out a way around torchvision's loader  idiosyncrasies here: currently it just loads images from subfolders, needs to operate okay if pointed at a single folder of images, or recursively, or an arbitrary list of files.

Then, too, it would be nice to eventually putthis work on the client computer using torchscript or something. So that it only sends 224x224x3 images over the wire. And we only have to compute those once per image, since we're storing a database of finished vectors which should be even smaller


In [None]:
from torchvision.datasets import VisionDataset
from PIL import Image

In [None]:
def make_dataset(new_files):
    '''Returns a list of samples of a form (path_to_sample, class) and in 
    this case the class is just the filename'''
    samples = []
    slugs = []
    for i, f in enumerate(new_files):
        path, slug = f
        samples.append((str(path), i))
        slugs.append((slug, i))
    return(samples, slugs)

In [None]:
def pil_loader(path: str) -> Image.Image:
    # open path as file to avoid ResourceWarning (https://github.com/python-pillow/Pillow/issues/835)
    with open(path, 'rb') as f:
        img = Image.open(f)
        return img.convert('RGB')

In [None]:
class DatasetImagePaths(VisionDataset):
    def __init__(self, new_files, transforms = None):
        super(DatasetImagePaths, self).__init__(new_files, transforms=transforms)
        samples, slugs = make_dataset(new_files)
        self.samples = samples
        self.slugs = slugs
        self.loader = pil_loader
        self.root = 'file dataset'
    def __len__(self):
        return(len(self.samples))
        
    def __getitem__(self, index):
        path, target = self.samples[index]
        sample = self.loader(path)
        if self.transforms is not None:
            sample = self.transforms(sample)
        return sample, target

In [None]:
crafted = DatasetImagePaths(new_files)

In [None]:
if len(crafted) > 0:
    crafted[0][0].show()

Okay, that seems to work decently. Test with transforms, which I will just find in CLIP source code and copy over, to prevent having to import CLIP in this executor.

In [None]:
from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize

In [None]:
def clip_transform(n_px):
    return Compose([
        Resize(n_px, interpolation=Image.BICUBIC),
        CenterCrop(n_px),
        ToTensor(),
        Normalize((0.48145466, 0.4578275, 0.40821073), (0.26862954, 0.26130258, 0.27577711)),
    ])

In [None]:
crafted_transformed = DatasetImagePaths(new_files, clip_transform(224))

In [None]:
# crafted_transformed[0][0].shape

In [None]:
# to_pil = torchvision.transforms.ToPILImage()
# img = to_pil(crafted_transformed[0][0])
# img.show()

Put that all together, and wrap in a DataLoader for batching. In future, need to figure out how to pick batch size and number of workers programmatically bsed on device capabilities.

In [None]:
def crafter(new_files, device, batch_size=128, num_workers=4): 
    with torch.no_grad():
        imagefiles=DatasetImagePaths(new_files, clip_transform(224))
        img_loader=torch.utils.data.DataLoader(imagefiles, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    return(img_loader)

In [None]:
img_loader = crafter(new_files, device)

In [None]:
img_loader

## Encoder

CLIP wrapper takes batched tensors or text queries and returns batched 512-dim vectors. size of batch depends on GPU, but if we're putting all that on a server anyway it's a matter of accounting. Does batching go here though? Or in the crafter?

cool thing here is we can use one encoder for both image and text, just check type on the way in. but first probably keep it simple and make two functions.

could index previous queries as vectors in a different map and use for predictive/history -- keep a little database of previous queries already in vector format and their ranked NNs, so that the user can see history offline?

In [None]:
import clip
from tqdm import tqdm
model, _ = clip.load("ViT-B/32", device)

In [None]:
def image_encoder(img_loader, device):
    image_embeddings = torch.tensor(()).to(device)
    with torch.no_grad():
        for images, labels in tqdm(img_loader):
            batch_features = model.encode_image(images)
            image_embeddings = torch.cat((image_embeddings, batch_features)).to(device)
    
    image_embeddings = image_embeddings / image_embeddings.norm(dim=-1, keepdim=True)
    return(image_embeddings)

In [None]:
new_embeddings = image_encoder(img_loader, device)

In [None]:
def text_encoder(text, device):
    with torch.no_grad():
        text = clip.tokenize(text).to(device)
        text_features = model.encode_text(text)
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)
    return(text_features)

## Indexer

Annoy treemap or FAISS or other solutions. Given a dataset of tensors, returns a dictionary or database or treemap structure, something that is searchable for later. It would be nice to be able to diff this somehow, or make sure that it's up-to-date. Maybe keeping two copies is okay? One for backup and quick-searching, one for main search once it's indexed any new images. 

This executor `needs` both Encoder and Loader to send it the new and old vectors, respectively. So it needs to be preceded by some kind of **join_all** component that can makesure we're not missing new data before handing it over to the indexer. Hm

In [None]:
root = Path(args['path'])

In [None]:
def join_all(db, new_files, new_embeddings):
    for i, file in enumerate(new_files):
        path, slug = file
        start = len(db)
        index = i + start
        archive_db[slug] = {
            'slug': slug,
            'fpath': path,
            'embed': new_embeddings[i],
            'index': index
        }
    return(db)

In [None]:
db = join_all(archive_db,
         new_files,
         new_embeddings
        )

In [None]:
len(db)

And build treemap

In [None]:
from annoy import AnnoyIndex

In [None]:
def build_treemap(db):
    treemap = AnnoyIndex(512, 'angular')
    for v in db.values():
        treemap.add_item(v['index'], v['embed'])

    # Build the treemap, with 5 trees rn
    treemap.build(5)

    return(treemap)
    

In [None]:
t = build_treemap(db)

In [None]:
t.get_n_items(), t.get_n_trees()

In [None]:
def save_archives(root, treemap, db):
    dbpath = root/'memery.pt'
    if dbpath.exists():
#         dbpath.rename(root/'memery-bak.pt')
        dbpath.unlink()
    torch.save(db, dbpath)
    
    treepath = root/'memery.ann'
    if treepath.exists():
#         treepath.rename(root/'memery-bak.ann')
        treepath.unlink()
    treemap.save(str(treepath))
    
    return(str(dbpath), str(treepath))

In [None]:
save_archives(root, t, db)

## Ranker

Takes a query and an index and finds the nearest neighbors or most similar scores. Ideally this is just a simple Annoy `get_nns_by_vector`, or in the simple case a similarity score across all the vectors.

In [None]:
def ranker(query_vec, treemap):
    nn_indexes = treemap.get_nns_by_vector(query_vec[0], treemap.get_n_items())
    return(nn_indexes)

In [None]:
from IPython.display import Image as IMG

In [None]:
def printi(filenames, n=5):
    for im in filenames[:n]:
        display(IMG(filename=im[0], width=200))

In [None]:
def rank_5(text):
    query_vec = text_encoder(text, device)
    indexes = ranker(query_vec, t)
    filenames =[[v['fpath'] for k,v in db.items() if v['index'] == ind] for ind in indexes]
    return(filenames)

In [None]:
printi(rank_5(args['query']))

I think we have to call that a success!

## Gateway

Takes a query and processes it through either Indexing Flow or Querying Flow, passing along arguments. The main entrypoint for each iteration of the index/query process.

Querying Flow can technically process either text or image search, becuase the CLIP encoder will put them into the same embedding space. So we might as well build in a method for either, and make it available to the user, since it's impressive and useful and relatively easy to build.

Eventually the Gateway process probably needs to be quite complicated, for serving all the different users and for delivering REST APIs to different clients. For now we will run this locally, in a notebook. Then build out a GUI from there using `mediapy` or `widgets`. That should reveal the basic necessities of the UI, and then we can separate out the GUI client from the server.

In [None]:
def indexFlow(path):
    root = Path(path)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    filepaths = get_image_files(root)
    archive_db, new_files = files_archive_loader(filepaths, root, device)
    print(f"Loaded {len(archive_db)} encodings")
    print(f"Encoding {len(new_files)} new images")
    crafted_files = crafter(new_files, device)
    new_embeddings = image_encoder(crafted_files, device)
    
    db = join_all(archive_db, new_files, new_embeddings)
    print("Building treemap")
    t = build_treemap(db)
    
    print(f"Saving {len(db)}images")
    save_paths = save_archives(root, t, db)
    print("Done")
    return(save_paths)

In [None]:
save_paths = indexFlow(args['path'])

In [None]:
save_paths

To search:

In [None]:
def queryFlow(path, query): 
    root = Path(path)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    dbpath = root/'memery.pt'
    db = db_loader(dbpath)
    treepath = root/'memery.ann'
    treemap = treemap_loader(treepath)
    
    if treemap == None or db == {}:
        dbpath, treepath = indexFlow(root)
        treemap = treemap_loader(treepath)
        db = file
    
    print(f"Searching {len(db)} images")
    query_vec = text_encoder(query, device)
    indexes = ranker(query_vec, treemap)
    ranked_files = [[v['fpath'] for k,v in db.items() if v['index'] == ind] for ind in indexes]
    return(ranked_files)

        

In [None]:
ranked = queryFlow(args['path'], 'dog')

In [None]:
printi(ranked)

## Interactive process
Currently the objective is to take the following inputs:
- a location with images
- a text or image query,

and return the following outputs:
- a list of image files within that location ranked by similarity to that query,

with a minimum of duplicated effort, and a general ease-of-use for both the programmer and the casual API user.

## TODO:

- Cleanup repo
- Rough interactive GUI

- Optimize the image loader and number of trees based on memory and db size
- Type annotations

## DONE:
- _Code for joining archived data to new data_
- _Code for saving indexes to archive_
- _Flows_
