# Find Small Images Inside Large Images

In this tutorial we will build an image search engine capable of finding small images inside bigger ones. This requires a different architecture than typical image search engines since we need to perform object detection.

As we want to find small images inside big images, simply encoding both the indexed images and the query image and matching will not work. Imagine that you have the following big image.


<div>
<img src="https://github.com/jina-ai/tutorial-notebooks/blob/main/small-images-inside-big-images/cat-bird.jpeg?raw=1" width="300" align="left"/>
</div>

It contains a scene with a cat in the background, a bird and a few other items.

Now let’s suppose that the query image is a simple bird.

<div>
<img src="https://github.com/jina-ai/tutorial-notebooks/blob/main/small-images-inside-big-images/bird.jpeg?raw=1" width="100" align="left"/>
</div>


Encoding the query image will generate embeddings that effectively represent it. However, it’s not easy to build an encoder that effectively represents the big image, since it contains a complex scene with different objects. The embeddings will not be representative enough and therefore we need to think about a better approach.

Can you think of another solution?

Encoding a complex image is not easy, but if we can identify the objects inside the big image and encode each one of them separately, we get better, more representative embeddings.

## ⏰ Installing & Importing Dependencies

We will start this tutorial by installing the necessary ***pip*** dependencies.

In [None]:
! pip install Pillow jina==3.0 torch==1.9.0 torchvision==0.10.0 transformers==4.9.1 yolov5==5.0.7 lmdb==1.2.1 matplotlib

In [None]:
# clean up
! rm -rf workspace images query
! rm -f data.zip

**Downloading and unzipping data**

In [None]:
! wget https://open-images.s3.eu-central-1.amazonaws.com/data.zip
! unzip data.zip

## **Executors**
In this section, we will start developing the necessary executors, for both query and index flows.

### **CLIPImageEncoder**
This encoder encodes an image into embeddings using the CLIP model. 
We want an executor that loads the CLIP model and uses it to encode images during both the query and index flows. 

Our executor should:
* support both **GPU** and **CPU**: That's why we will provision the `device` parameter and use it when encoding.
* be able to process documents in batches in order to use our resources effectively: To do so, we will use the parameter `batch_size`
* be able to encode the full image during the query flow and encode only chunks during the index flow: This can be achieved with `traversal_paths` and method `DocumentArray.batch`.
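
To make the last two points concrete, here is a minimal, library-free sketch. `Doc`, `select` and `batched` are hypothetical stand-ins, not part of Jina or docarray; they only mimic how a traversal path picks root documents (`@r`) or their chunks (`@c`), and how the selection is then cut into batches.

```python
from dataclasses import dataclass, field
from typing import Iterator, List


@dataclass
class Doc:
    """Hypothetical stand-in for a document that may hold chunks."""
    name: str
    chunks: List['Doc'] = field(default_factory=list)


def select(docs: List[Doc], path: str) -> List[Doc]:
    """Mimic two traversal paths: '@r' (roots) and '@c' (chunks of the roots)."""
    if path == '@r':
        return list(docs)
    if path == '@c':
        return [chunk for doc in docs for chunk in doc.chunks]
    raise ValueError(f'unsupported path in this sketch: {path}')


def batched(docs: List[Doc], batch_size: int) -> Iterator[List[Doc]]:
    """Yield consecutive slices of at most `batch_size` documents."""
    for start in range(0, len(docs), batch_size):
        yield docs[start:start + batch_size]


scene = Doc('scene', chunks=[Doc('cat'), Doc('bird'), Doc('plant')])
root_names = [d.name for d in select([scene], '@r')]
chunk_batches = [[d.name for d in b] for b in batched(select([scene], '@c'), 2)]
print(root_names, chunk_batches)
```

With a batch size of 2, the three chunks end up in two batches, while the root selection contains only the full image.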

In [None]:
from typing import Optional, Tuple

import torch
from jina import Executor, requests
from docarray import DocumentArray
from transformers import CLIPFeatureExtractor, CLIPModel


class CLIPImageEncoder(Executor):
    """Encode image into embeddings using the CLIP model."""

    def __init__(
        self,
        pretrained_model_name_or_path: str = 'openai/clip-vit-base-patch32',
        device: str = 'cpu',
        batch_size: int = 32,
        traversal_paths: str = '@r',
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.batch_size = batch_size
        self.traversal_paths = traversal_paths
        self.pretrained_model_name_or_path = pretrained_model_name_or_path

        self.device = device
        self.preprocessor = CLIPFeatureExtractor.from_pretrained(
            pretrained_model_name_or_path
        )
        self.model = CLIPModel.from_pretrained(self.pretrained_model_name_or_path)
        self.model.to(self.device).eval()

    @requests
    def encode(self, docs: DocumentArray, parameters: dict, **kwargs):
        if docs is None:
            return

        document_batches_generator = DocumentArray(
            filter(
                lambda x: x.tensor is not None,
                docs[parameters.get('traversal_paths', self.traversal_paths)],
            )
        ).batch(batch_size=parameters.get('batch_size', self.batch_size))


        with torch.inference_mode():
            for batch_docs in document_batches_generator:
                tensors_batch = [d.tensor for d in batch_docs]
                tensor = self._generate_input_features(tensors_batch)
                

                embeddings = self.model.get_image_features(**tensor)
                embeddings = embeddings.cpu().numpy()

                batch_docs.embeddings = embeddings


    def _generate_input_features(self, images):
        input_tokens = self.preprocessor(
            images=images,
            return_tensors='pt',
        )
        input_tokens = {
            k: v.to(torch.device(self.device)) for k, v in input_tokens.items()
        }
        return input_tokens

### **YoloV5Segmenter**
Since we want to retrieve small images inside bigger ones, the technique we rely on most is segmenting. Basically, we want to perform object detection on the indexed images. This generates bounding boxes around the objects detected inside each image. The detected objects are then extracted and added as chunks to the original documents.
For the detection model, we will use YoloV5, a widely used object detection model.


Our **YoloV5Segmenter** should be able to load the `ultralytics/yolov5` model from Torch Hub or, failing that, load a custom model. To achieve this, the executor accepts the parameter `model_name_or_path`, which is used when loading. We will implement the method `_load`, which checks whether the model exists in Torch Hub and otherwise loads it as a custom model.

For our use case, we will just rely on `yolov5s` (the small version of `yolov5`). Of course, for better quality, you can choose a larger model or your own custom model.

Furthermore, we want **YoloV5Segmenter** to support both **GPU** and **CPU**, and it should be able to process documents in batches. Again, this is as simple as adding the parameters `device` and `batch_size` and using them during segmenting.

To perform segmenting, we will implement method `_segment_docs` which performs the following steps:
1. For each batch (a batch consists of several images), use the model to get predictions for each image
2. Each prediction for an image can contain several detections (because YoloV5 extracts as many bounding boxes as possible, along with their confidence scores). We filter out detections whose scores are below the `confidence_threshold` to keep the quality high.

   Each detection consists of two points, top left (x1, y1) and bottom right (x2, y2), plus a confidence score and a class. We will not use the class of the detection, but it can be useful in other search applications.

3. With the remaining detections, we create crops (using the two points returned). Finally, we add these crops to the image documents as chunks.
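
The filtering and cropping steps can be sketched without YoloV5. The detections below are made up for illustration, the image is a tiny nested list rather than a real tensor, and `crop_detections` is a hypothetical helper, not part of the executor.

```python
from typing import List, Sequence, Tuple

# (x1, y1, x2, y2, confidence, class_id), the layout described above
Detection = Tuple[float, float, float, float, float, int]


def crop_detections(
    image: Sequence[Sequence[int]],
    detections: List[Detection],
    confidence_threshold: float = 0.3,
) -> List[List[List[int]]]:
    """Keep confident detections and cut their bounding boxes out of `image`."""
    crops = []
    for x1, y1, x2, y2, conf, _cls in detections:
        if conf < confidence_threshold:
            continue  # drop low-confidence boxes
        # Rows are indexed by y, columns by x
        crops.append([list(row[int(x1):int(x2)]) for row in image[int(y1):int(y2)]])
    return crops


# A 4x4 "image" whose pixel values encode their position (row * 10 + column)
image = [[row * 10 + col for col in range(4)] for row in range(4)]
detections = [
    (1, 1, 3, 3, 0.9, 0),  # confident box covering rows 1-2, columns 1-2
    (0, 0, 2, 2, 0.1, 1),  # below the threshold, so it is discarded
]
crops = crop_detections(image, detections)
print(crops)  # [[[11, 12], [21, 22]]]
```

Only the first detection survives the threshold, so a single 2x2 crop is produced. In the executor, each such crop becomes a chunk of the original document.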

In [None]:
from typing import Dict, Iterable, Optional

import torch
from jina import Executor, requests
from docarray import Document, DocumentArray


class YoloV5Segmenter(Executor):

    def __init__(
        self,
        model_name_or_path: str = 'yolov5s',
        confidence_threshold: float = 0.3,
        batch_size: int = 32,
        device: str = 'cpu',
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.model_name_or_path = model_name_or_path
        self.confidence_threshold = confidence_threshold
        self.batch_size = batch_size

        if device != 'cpu' and not device.startswith('cuda'):
            self.logger.error('Torch device not supported. Must be cpu or cuda!')
            raise RuntimeError('Torch device not supported. Must be cpu or cuda!')
        # Also covers device strings such as 'cuda:0'
        if device.startswith('cuda') and not torch.cuda.is_available():
            self.logger.warning(
                'You tried to use GPU but torch did not detect your '
                'GPU correctly. Defaulting to CPU. Check your CUDA installation!'
            )
            device = 'cpu'
        self.device = torch.device(device)
        self.model = self._load(self.model_name_or_path)

    @requests
    def segment(
        self, docs: Optional[DocumentArray] = None, parameters: Dict = {}, **kwargs
    ):

        if docs:
            document_batches_generator = DocumentArray(
                filter(
                    lambda x: x.tensor is not None,
                    docs,
                )
            ).batch(batch_size=parameters.get('batch_size', self.batch_size))
            self._segment_docs(document_batches_generator, parameters=parameters)

    def _segment_docs(self, document_batches_generator: Iterable, parameters: Dict):
        with torch.no_grad():
            for document_batch in document_batches_generator:
                images = [d.tensor for d in document_batch]
                predictions = self.model(
                    images,
                    size=640,
                    augment=False,
                ).pred

                for doc, prediction in zip(document_batch, predictions):
                    for det in prediction:
                        x1, y1, x2, y2, conf, cls = det
                        if conf < parameters.get(
                            'confidence_threshold', self.confidence_threshold
                        ):
                            continue
                        crop = doc.tensor[int(y1): int(y2), int(x1): int(x2), :]
                        doc.chunks.append(Document(tensor=crop))

    def _load(self, model_name_or_path):
        if model_name_or_path in torch.hub.list('ultralytics/yolov5'):
            return torch.hub.load(
                'ultralytics/yolov5', model_name_or_path, device=self.device
            )
        else:
            return torch.hub.load(
                'ultralytics/yolov5', 'custom', model_name_or_path, device=self.device
            )


**Indexers**

After developing the segmenter and the encoder, we need two kinds of indexers: 
1. SimpleIndexer: This indexer takes care of storing the chunks of images. It also supports vector similarity search, which is important for matching small query images against segments of the original images.

2. LMDBStorage: LMDB is a simple memory-mapped transactional key-value store. It is convenient for this example because we can use it to store the original indexed images so that we can retrieve them later. We will use it to create LMDBStorage, which offers two functionalities: indexing documents and retrieving documents by ID.

**SimpleIndexer**

To implement SimpleIndexer, we can leverage the `DocumentArray` class from docarray with SQLite as the storage backend. This allows us to persist Documents in a database and to run nearest neighbor search using `DocumentArray.match`.
Our indexer creates an instance of `DocumentArray` when it is initialized. We also specify the backend configuration, such as the connection and the table name.

To index, we implement the method `index` which is bound to the index flow. It's as simple as extending the received docs.

For search, on the other hand, we implement the method `search`. We bind it to the query flow using the decorator `@requests(on='/search')`.
In Jina, searching for query documents is done by adding the results to the `matches` attribute of each query document. Since `docs` is a `DocumentArray`, we can use its `match` method to match the queries against the indexed documents.
Read more about `match` [here](https://docarray.jina.ai/fundamentals/documentarray/matching/).
There is one more detail: the index holds the root documents, but we need to match query documents against the chunks of the indexed images. 
To do so, we use the selector `self._index['@c']`, which selects the chunks of the indexed documents and passes them to the `match` method.
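
As a rough illustration of what matching against chunks means, the sketch below ranks a few hand-written chunk embeddings by cosine distance to a query embedding. It is plain Python with made-up vectors and a hypothetical `match` helper; the executor itself relies on `DocumentArray.match`.

```python
import math
from typing import Dict, List, Tuple


def cosine_distance(a: List[float], b: List[float]) -> float:
    """Return 1 - cosine similarity; 0 means the vectors point the same way."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def match(
    query: List[float], index: Dict[str, List[float]], limit: int = 2
) -> List[Tuple[str, float]]:
    """Return the `limit` closest index entries, nearest first."""
    scored = [(chunk_id, cosine_distance(query, emb)) for chunk_id, emb in index.items()]
    return sorted(scored, key=lambda item: item[1])[:limit]


# Made-up 2-D embeddings for three chunks
chunk_index = {
    'bird_chunk': [1.0, 0.1],
    'cat_chunk': [0.1, 1.0],
    'plant_chunk': [-1.0, 0.0],
}
results = match([1.0, 0.0], chunk_index)
print([chunk_id for chunk_id, _ in results])  # ['bird_chunk', 'cat_chunk']
```

The query vector points almost exactly in the direction of `bird_chunk`, so that chunk comes first with a distance close to zero.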

In [None]:
import os
from typing import Optional

from jina import Executor, requests
from docarray import DocumentArray


class SimpleIndexer(Executor):

    FILE_NAME = 'index.db'
    TABLENAME = 'storage'
    
    def __init__(
        self,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self._index = DocumentArray(
            storage='sqlite',
            config={
                'connection': os.path.join(self.workspace, SimpleIndexer.FILE_NAME),
                'table_name': SimpleIndexer.TABLENAME,
            },
        )  # SQLite-backed storage with a custom configuration

    @requests(on='/index')
    def index(
        self,
        docs: 'DocumentArray',
        **kwargs,
    ):
        if docs:
            self._index.extend(docs)

    @requests(on='/search')
    def search(
        self,
        docs: 'DocumentArray',
        **kwargs,
    ):
        if not docs:
            return

        docs.match(self._index['@c'])


**LMDBStorage**

In order to implement the LMDBStorage, we need the following parts:

**I. Handler**

This will be a context manager that we will use when we access our LMDB database. We will create it as a standalone class.


**II. LMDBStorage constructor**

The constructor should initialize a few attributes:
* the `map_size` of the database
* the index file: again, to keep things clean, we will store the index file inside the workspace folder. Therefore we can use the `workspace` attribute.


**III. `LMDBStorage.index`**

In order to index documents, we first start a transaction (so that our storage executor is ACID-compliant). Then, we traverse the root documents. Finally, each document is serialized to bytes and added to the database, with the document ID as the key.


**IV. `LMDBStorage.search`**

Unlike search in the SimpleIndexer, we only wish to get the matched Documents by ID and return them. Actually, the matched documents will be empty and will only contain the IDs. The goal is to return full matched documents using IDs.
To accomplish this, again, we start a transaction, traverse the matched documents, get each matched document by ID and use the results to fill our documents.
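
The index and search logic can be sketched with a plain dictionary standing in for the LMDB database and JSON standing in for `Document.to_bytes`. Everything here (`store`, `index_docs`, `fill_matches`, the dict-based documents) is hypothetical and only illustrates the put-by-ID and get-by-ID pattern.

```python
import json
from typing import Dict, List

store: Dict[bytes, bytes] = {}  # stand-in for the LMDB key-value database


def index_docs(docs: List[dict]) -> None:
    """Serialize each document and store it under its ID."""
    for doc in docs:
        store[doc['id'].encode()] = json.dumps(doc).encode()


def fill_matches(matches: List[dict]) -> None:
    """Replace ID-only matches with the full stored documents, in place."""
    for match in matches:
        stored = store.get(match['id'].encode())
        if stored is not None:  # unknown IDs are left untouched
            match.update(json.loads(stored.decode()))


index_docs([{'id': 'img-1', 'uri': 'images/scene.jpg'}])
matches = [{'id': 'img-1'}, {'id': 'unknown'}]
fill_matches(matches)
print(matches)
```

The first match is filled with the stored fields, while the second one keeps only its ID because nothing was stored under it.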

In [None]:
import os
from typing import Dict, List

import lmdb
from jina import Executor, requests
from docarray import Document, DocumentArray


class _LMDBHandler:
    def __init__(self, file, map_size):
        # see https://lmdb.readthedocs.io/en/release/#environment-class for usage
        self.file = file
        self.map_size = map_size

    @property
    def env(self):
        return self._env

    def __enter__(self):
        self._env = lmdb.Environment(
            self.file,
            map_size=self.map_size,
            subdir=False,
            readonly=False,
            metasync=True,
            sync=True,
            map_async=False,
            mode=0o755,  # same value as 493, written as octal file permissions
            create=True,
            readahead=True,
            writemap=False,
            meminit=True,
            max_readers=126,
            max_dbs=0,  # means only one db
            max_spare_txns=1,
            lock=True,
        )
        return self._env

    def __exit__(self, exc_type, exc_val, exc_tb):
        if hasattr(self, '_env'):
            self._env.close()


class LMDBStorage(Executor):
    def __init__(
        self,
        map_size: int = 1048576000,  # in bytes, 1000 MB
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.map_size = map_size
        self.file = os.path.join(self.workspace, 'db.lmdb')
        if not os.path.exists(self.workspace):
            os.makedirs(self.workspace)

    def _handler(self):
        return _LMDBHandler(self.file, self.map_size)

    @requests(on='/index')
    def index(self, docs: DocumentArray, parameters: Dict, **kwargs):
        if docs is None:
            return
        with self._handler() as env:
            with env.begin(write=True) as transaction:
                for d in docs:
                    transaction.put(d.id.encode(), d.to_bytes())

    @requests(on='/search')
    def search(self, docs: DocumentArray, parameters: Dict, **kwargs):
        if docs is None:
            return

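        with self._handler() as env:
            # Searching only reads from the database, so a read-only transaction is enough
            with env.begin(write=False) as transaction:
                for doc in docs:
                    for match_doc in doc.matches:
                        match_id = match_doc.id
                        stored_bytes = transaction.get(match_id.encode())
                        if stored_bytes is None:
                            continue  # nothing stored under this ID
                        match_doc.copy_from(Document.from_bytes(stored_bytes))
                        match_doc.id = match_id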


**SimpleRanker**

You might wonder why we need a ranker at all.
A ranker is needed because we match small query images against chunks of parent documents. But how can we get back to the parent documents (the full images) given the chunks? And what if two chunks belonging to the same parent are matched?
We can solve this by aggregating the scores of chunks that belong to the same parent. In our case, the aggregation method is `min`: the scores are cosine distances, so a lower value means a closer match, and each parent keeps the score of its best-matching chunk.
So, for each query document, we perform the following:

1. We create an empty collection of parent scores. This collection will store, for each parent, a list of scores of its chunk documents.
2. For each match, since it is originally a chunk document, we can retrieve its `parent_id`. It is also a match document, so we get its match score and add that value to the parent scores collection.
3. After processing all matches, we need to aggregate the scores of each parent using the `min` metric.
4. Finally, using the aggregated score values of parents, we can create a new list of matches (this time consisting of parents, not chunks). We also need to sort the matches list by aggregated scores.

When query documents exit the SimpleRanker, they now have matches consisting of parent documents. However, parent documents just have IDs. That's why, during the previous steps, we created LMDBStorage in order to actually retrieve parent documents by IDs and fill them with data.
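
The aggregation steps above can be sketched in a few lines of plain Python. The `(parent_id, score)` tuples and the `rank_parents` helper are hypothetical; they stand in for the matched chunk documents and the executor's `rank` method.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def rank_parents(chunk_matches: List[Tuple[str, float]]) -> List[Tuple[str, float]]:
    """Aggregate chunk scores per parent with `min` and sort ascending."""
    parent_scores: Dict[str, List[float]] = defaultdict(list)
    for parent_id, score in chunk_matches:
        parent_scores[parent_id].append(score)
    aggregated = [(parent_id, min(scores)) for parent_id, scores in parent_scores.items()]
    return sorted(aggregated, key=lambda item: item[1])


# (parent_id, score of the matched chunk); lower scores are treated as better
chunk_matches = [('img-a', 0.40), ('img-b', 0.25), ('img-a', 0.10)]
ranked = rank_parents(chunk_matches)
print(ranked)  # [('img-a', 0.1), ('img-b', 0.25)]
```

Although `img-b` has the better first chunk, `img-a` wins because its second chunk has the lowest score overall.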

In [None]:
from collections import defaultdict
from typing import Dict, Iterable, Optional

from jina import Executor, requests
from docarray import Document, DocumentArray
from docarray.score import NamedScore


class SimpleRanker(Executor):
    def __init__(
        self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metric = 'cosine'

    @requests(on='/search')
    def rank(
        self, docs: DocumentArray, parameters: Dict = {}, **kwargs
    ):
        if docs is None:
            return

        for doc in docs:
            parents_scores = defaultdict(list)
            for m in doc.matches:
                parents_scores[m.parent_id].append(m.scores[self.metric].value)
 
            # Aggregate match scores for parent document and
            # create doc's match based on parent document of matched chunks
            matches = []
            for match_parent_id, scores in parents_scores.items():
                score = min(scores)
                matches.append(
                    Document(id=match_parent_id, scores={self.metric: NamedScore(value=score)})
                )

            # Sort the matches
            doc.matches.clear()
            doc.matches.extend(sorted(matches, key=lambda d: d.scores[self.metric].value))


### Enabling GPU
Usually, indexing takes some time because YoloV5Segmenter and CLIPImageEncoder can be slow. However, you can speed up indexing by enabling GPU. To do so, you'll need to enable GPUs for the notebook:

* Navigate to Edit→Notebook Settings
* Select GPU from the Hardware Accelerator drop-down
* Change the device to 'cuda' in the following line

In [None]:
device = 'cpu' # change to 'cuda'
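
If you would rather not edit the line by hand, a small sketch like the following picks the device automatically. It assumes that `torch.cuda.is_available()` reflects whether the notebook's GPU is usable, and it falls back to CPU otherwise.

```python
# Pick the device automatically; fall back to CPU when torch or CUDA is missing.
try:
    import torch

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
except ImportError:
    device = 'cpu'

print(device)
```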

## Indexing
Now, after creating executors, it's time to use them in order to build an index Flow and index our data.


### Building the index Flow
We create a Flow object and add executors one after the other with the right parameters:

1. YoloV5Segmenter: We should also specify the device
2. CLIPImageEncoder: It also receives the device parameter. Since we only encode the chunks, we specify `'traversal_paths': '@c'`
3. SimpleIndexer: We need to specify the workspace parameter
4. LMDBStorage: We also need to specify the workspace parameter. Furthermore, this executor can run in parallel to the other branch. We can achieve this using `needs='gateway'`.
5. A final executor which just waits for both branches.

After building the index Flow, we can plot it to verify that we're using the correct architecture.
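
Before plotting, the intended topology can also be written down as a plain dependency graph. The sketch below is not Jina code; it uses the standard library's `graphlib` (Python 3.9+) with the executor names used in this tutorial to check that `root_indexer` hangs directly off the gateway and that `wait_both` comes last.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each executor maps to the set of executors it waits for
index_topology = {
    'segmenter': {'gateway'},
    'encoder': {'segmenter'},
    'chunks_indexer': {'encoder'},
    'root_indexer': {'gateway'},  # parallel branch, started from the gateway
    'wait_both': {'root_indexer', 'chunks_indexer'},
}

# One valid execution order; the two branches may interleave in any way
order = list(TopologicalSorter(index_topology).static_order())
print(order)
```

Whatever order is printed, `gateway` is first, `wait_both` is last, and the segmenter, encoder and chunks indexer appear in that sequence.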

In [None]:
from jina import Flow
index_flow = Flow().add(uses=YoloV5Segmenter, name='segmenter', uses_with={'device': device}) \
  .add(uses=CLIPImageEncoder, name='encoder', uses_with={'device': device, 'traversal_paths': '@c'}) \
  .add(uses=SimpleIndexer, name='chunks_indexer', workspace='workspace') \
  .add(uses=LMDBStorage, name='root_indexer', workspace='workspace', needs='gateway') \
  .add(name='wait_both', needs=['root_indexer', 'chunks_indexer'])
index_flow

Now it's time to index the dataset that we have downloaded. Specifically, we will index the images inside the `images` folder.
This helper function will convert image files into Jina Documents and yield them:

In [None]:
from glob import glob
from jina import Document

def input_generator():
    for filename in glob('images/*.jpg'):
        doc = Document(uri=filename, tags={'filename': filename})
        doc.load_uri_to_image_tensor()
        yield doc

The final step in this section is to send the input documents to the index Flow. Note that indexing can take a while.

In [None]:
with index_flow:
    input_docs = input_generator()
    index_flow.post(on='/index', inputs=input_docs, show_progress=True)

## Searching
Now, let's build the search Flow and use it to search with sample query images.

Our Flow contains the following executors:

1. CLIPImageEncoder: It receives the device parameter. This time, since we want to encode root query documents, we specify that `'traversal_paths': '@r'`
2. SimpleIndexer: We need to specify the workspace parameter
3. SimpleRanker
4. LMDBStorage: We need to specify the workspace parameter.

In [None]:
from jina import Flow
query_flow = Flow().add(uses=CLIPImageEncoder, name='encoder', uses_with={'device': device, 'traversal_paths': '@r'}) \
  .add(uses=SimpleIndexer, name='chunks_indexer', workspace='workspace') \
  .add(uses=SimpleRanker, name='ranker') \
  .add(uses=LMDBStorage, workspace='workspace', name='root_indexer')

Let's plot our Flow

In [None]:
query_flow

We create the following helper function in order to plot the result documents:

In [None]:
import matplotlib.pyplot as plt

def show_docs(docs):
    for doc in docs[:3]:
        plt.imshow(doc.tensor)
        plt.show()

Finally, we can start querying. We will use images inside the query folder.
For each image, we will create a Jina Document. Then we send our documents to the query Flow and receive the response. 

For each query document, we can print the image and its top 3 search results.

In [None]:
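# Import the function (not the module) so the name `glob` keeps the same
# meaning as in `input_generator` above
from glob import glob

with query_flow:
    docs = [Document(uri=filename) for filename in glob('query/*.jpg')]
    for doc in docs:
        doc.load_uri_to_image_tensor()
    docs = query_flow.post('/search', docs, return_results=True)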
for doc in docs:
    print('query:')
    plt.imshow(doc.tensor)
    plt.show()
    print('results:')
    show_docs(doc.matches)