# Jina Workshop @ EuroPython: Building a Neural Image Search Engine

In this workshop we will build a neural search engine for images of Pokemons.

# Downloading data and model

Skip this if you've already downloaded them.

## Download and Extract Data

For this example we're using Pokemon sprites from [veekun.com](https://veekun.com/dex/downloads). To download them run:

```sh
sh ./get_data.sh
```

## Download and Extract Pretrained Model

In this example we use [BiT (Big Transfer) model](https://github.com/google-research/big_transfer), To download it:

```sh
sh ./download.sh
```

# The problem

We want to search Pokemon by pictures! For example 

![1.png](attachment:07a57670-02d0-4b37-89d4-e40fa91fe617.png)

May return

![2.png](attachment:4fdcaf41-1adb-4eef-9770-94a112369dfe.png) ![3.png](attachment:dd9fe45f-6be4-4822-ae92-2d281b56c0ff.png)

# Code

Required imports

In [1]:
from jina.types.document.generators import from_files
%load_ext autoreload
%autoreload 2

import os
import sys
from shutil import rmtree

from jina import Flow, DocumentArray, Document

from jinahub.image.normalizer import ImageNormalizer
from jinahub.image.encoder.big_transfer import BigTransferEncoder

2021-07-06 15:58:41.569240: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-07-06 15:58:41.569262: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


Some configuration options.

- restrict the nr of docs we index
- the path to the images

In [14]:
num_docs = int(os.environ.get('JINA_MAX_DOCS', 1))
image_src = 'data/**/*.png'

Environment variables

- workspace (folder where the encoded data will be stored)
- port we will listen on

In [15]:
workspace = './workspace'
os.environ['JINA_WORKSPACE'] = workspace
os.environ['JINA_PORT'] = os.environ.get('JINA_PORT', str(45678))

We need to make sure to not index on top of an existing workspace. 

This can cause problems if you are using different configuration options between the two runs.

In [16]:
if os.path.exists(workspace):
    print(f'Workspace at {workspace} exists. Will delete')
    rmtree(workspace)

# Flows

The Flow is the main pipeline in Jina. It describes the way data should be loaded, processed, stored etc. within the system. 

It is made up of components (called Executors), which are the ones doing the specific task.

Ex. we have an Encoder Executor, which loads the model and *encodes* that data; crafter Executor, which preprocesses the data; Indexer Executor, which stores and retrieves the data etc.

## Index Flow

Depending on your need the Flow can be configured in different ways. 

While indexing (storing) data, we can optimize the pipeline to process the data in parallel

In [33]:
f = Flow.load_config('flows/index.yml')

[1;30m[0m

In [34]:
f.plot('index.png')

The Flow is a context manager (like a file handler).

We load data into the pipeline from the directory we provided above. 

`request_size` dictates how many images should be sent in one request (~batching).

In [19]:
docs = DocumentArray(from_files(image_src, size=num_docs, read_mode='rb'))

In [22]:
docs[-1].content

b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00P\x00\x00\x00P\x08\x06\x00\x00\x00\x8e\x11\xf2\xad\x00\x00\x03\x13IDATx\xda\xed\xdb\xbdq\xdb@\x10\x05`\x95\xc0v\x1c\xa2\r\xb5\xc1\xd0%8U\xa8\x12\x18\xba\x05\x85n\xc1%(T\n{e\xbf\x99\xa7\xa7\xbd#(\x92\xb8\xc3\xe1ifG\xc4\x1f9\xf8\xb4{\x7f\xa0\x1e\xe6y~p|=\x8c`@\x03\x1a\xd0\x80\x0e\x03\x1a\xd0\x80\x06t\x18\xd0\x80\x064\xa0\xc3\x80\x064\xa0\x01\x1d\x064\xa0\x01o\x11\xc7\xc7i\xe60\xe0\x05p\xa7\x1f\xd3\xfc\xf6r\xfc\x10\xbdA\xf6\x9bi\xbf\x9f?\x85B\x1a\x90\xe0\x90m%\xbc^\x11\xbb(S\xe0\xc4k\xde\xd6\x88c\x9c\xa5\x81x8\x1c\xe6]\x02*^D\x96}@e4m\x0f["6\x05\x0c\x04@\xbdN\xd3\xfc\xf8m\xfa\x84\xa7\x9dH)Z!6\x07\x8cx\xcf\xc4\xbf?@\\\x8a\xd6\x03b3@\x94f\xdcx\xc0!\x140k\xf7v\x0f\x08\x08\x05<\t`\x1c\xff\xf9\xeb\xedC\xa0\xcd\xeb\x05\xb1\t\xa0\x02\xbcgc\x05\x10\xd9\xca\x90%\xc4\xe1\x01\xf9\xc6\xb9\x93\x88\xd7Y\xfb\xc7\xe7D\xf6\x9dC\\{l\xd8\x0c\x90\xb3\xabT\x8e8\xc6\xed\xa0\x02f\xd7\xaf\x99\x85\xcd\x00\x03\xe3\xe9\xf9\xa5\n\x98\xcdR\x180\xae\xcf\xb2p7\x80\x9ca\x

In [40]:
from jina import Executor, requests
from jina.logging.logger import JinaLogger

In [41]:
class MyLogger(Executor):
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.logger = JinaLogger('MyLogger')
    
    @requests
    def log(self, docs, **kwargs):
        self.logger.warning(len(docs))

In [46]:
docs[0].content

b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00P\x00\x00\x00P\x08\x06\x00\x00\x00\x8e\x11\xf2\xad\x00\x00\x03\x8bIDATx\xda\xed\xdc\xcdq\xdb0\x10\x05`\x95\xe0\x12tu\x19\xbe\xe6\xa82|u\tj\xc1e\xf8\xa8k\x8e.\x81-\xa4\x84\\\x19/G\xeb<\xaewA\xf0\x1f\x84\x1egv\xecP\x16\'\xf8\xb4\x00\x16 \xedS\xdb\xb6\'\xc6\xf4 \x02\x01\tH@\x022\x08H@\x02\x12\x90A@\x02\x12\x90\x80\x0c\x02\x12\x90\x80\x04,6\xce\x97k\x1b\x05\x01\x9dx}zj%\x14\xe9\xf5\xf6\xa7\xbd6\x7f{!\xe7$\xf6\x80,\x1e\xee\xf6|\xee\xbe\xa6\xe0ll\tY,\x9e\x1cS\xf0,\xe4\xc3\x01*^\xfb\xf9;\xc4\xf3\x00{\xe7\xef\xd9\xab\xf10\x80]c\xbf\xe0$\xba\xec38\xb9x\xf2^\x8c5\x11\x8b\xc2\xb3\x8d\x0e\x91\x1c@\xc4\xc3\x0fam\xc4\xb2\xb2\xef\xf2\xeb\x7f\x833\xc6\xb8\x1e,\xe2\xe9\xb1\x01bQx\n\x98\x8b\xe7f\x9f9\x08\x98\x03\x08\xdd\xbfw\x98\x0c\\\x03\xb1\x08\xc0\xae\xe1w\xc01x.\xa0va\x07\xafJ@]]x\x13\x87\xc4\xcb\xfbG\x17\xb9\x80\xa9\xe8>\xa0Z\x01\xbd\x10\xb8\xac\x02\x1a\xc7\xbf \xf3\xaa\x06\xd4\x0c\xc3LS\xbc^a\x8c\x93\x86\xf9\xf9\xa8|\xa9\x1e\x10wX,\xa0\xe2\xd9e\x9d\

In [42]:
with f:
    f.post(
        on='/index',
        inputs=docs,
        request_size=64,
    )

[1;30m[0m[1;30m[0m[1;30m[0m[1;30m[0m[1;30m[0m[1;30m[0m[1;30m[0m[1;30m[0m[1;30m[0m[1;30m[1;30m[0m[1;30m[0;32m[0m[0m         logger@76588[I]:ready and listening
[1;30m[1;30m[0m[0m[1;30m[0m[0;32m[0m        crafter@76588[I]:ready and listening
[1;30m[0m[1;30m[0m[0m[1;30m[0m[1;30m[0m[1;30m[0m[0;33mBigTransferEncoder@84953[W]:You tried to use a GPU but no GPU was found on your system. Defaulting to CPU!
[0m[0;32mBigTransferEncoder@84953[I]:BiT model path: pretrained
        logger2@76588[I]:ready and listening
[0m

2021-07-06 16:42:17.638318: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2021-07-06 16:42:17.638386: W tensorflow/stream_executor/cuda/cuda_driver.cc:326] failed call to cuInit: UNKNOWN ERROR (303)
2021-07-06 16:42:17.638437: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (x1): /proc/driver/nvidia/version does not exist




2021-07-06 16:42:17.979837: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


[0;32m        encoder@76588[I]:ready and listening
[0m[0;32m        gateway@76588[I]:ready and listening
[0m[1;30m[0m           Flow@76588[I]:🎉 Flow is ready to use!
	🔗 Protocol: 		[1mGRPC[0m
	🏠 Local access:	[4m[36m0.0.0.0:33113[0m
	🔒 Private network:	[4m[36m192.168.178.144:33113[0m
	🌐 Public address:	[4m[36m2001:16b8:4626:9300:b1eb:24b7:e83:ad65:33113[0m
[1;30m[0m[1;30m[0m[1;30m[0m[1;30m[0m[0;33m       MyLogger@84910[W]:2
[0m[1;30m[0m[1;30m[0m[1;30m[0m[1;30m[0m[0;31m        logger2@84934[E]:TypeError("object of type 'NoneType' has no len()")
 add "--quiet-error" to suppress the exception details
Traceback (most recent call last):
  File "/home/cristian/envs/europython/lib/python3.7/site-packages/jina/peapods/runtimes/zmq/zed.py", line 289, in _msg_callback
    self._zmqlet.send_message(self._callback(msg))
  File "/home/cristian/envs/europython/lib/python3.7/site-packages/jina/peapods/runtimes/zmq/zed.py", line 275, in _callback
    self._pre_hook(

# Searching

When searching we need to make sure the data is processed in serial manner.

In [None]:
f = Flow.load_config('flows/query.yml')

In [None]:
f.plot('search.png')

This will activate the REST API.

You can use [Jinabox.js](https://jina.ai/jinabox.js/) to find the Pokemon which matches most clearly. Just set the endpoint to `http://127.0.0.1:45678/api/search` and drag from the thumbnails on the left or from your file manager.

In [None]:
search_image = blabla. ..

In [None]:
search_image.plot()

In [None]:
with f:
    f.post(
        on='/search',
        inputs=search_image
    )

Retrieving tags (image path)

In [23]:
...

Ellipsis

# Questions?

---

# 5 minutes break

---

# Advanced Topics


**NOTE**: After configuring these, you will need to re-index your data and search again. 

## 1. Changing Encoders

We can switch the `Encoder` easily.

This is the component that is the actual **model**. This encodes the images into a vector space upon which you can perform cosine similarity (or other linear algebra operations).


`pods/encode.yml`:

```yaml
!ImageKerasEncoder
with:
  model_name: ResNet50V2 # any model could go here
  pool_strategy: avg
  channel_axis: -1
```

## 2. Changing Crafters

**NOTE** The `CenterImageCropper` and `PngToDiskDriver` are already added. This section explains why and how we add them.

These are the components that transform your data. In this case, we crop and resize the image. You can try out other alterations to the images and see if you get better results.

In `pods/craft.yml`:

- remove `target_size: 96` from `ImageNormalizer`

```yaml
- !CenterImageCropper
with:
  target_size: 96
  channel_axis: -1
metas:
  name: img_cropper
```

We also need to specify the request paths, both for `IndexRequest` and for `SearchRequest`:

```yaml
      - !CraftDriver
        with:
          traversal_paths: ['r']
          executor: img_cropper
```

We can save an intermediary file to examine the cropped image to see if everything looks as expected. Add this to the `IndexRequest`:

```yaml
      - !PngToDiskDriver
        with:
          prefix: 'crop'
```

Now you can find the intermediary forms of the file in `workspace/`, under the folders with the given prefix.


## 3. JinaD on AWS

### What is JinaD?

...

remote flow

remote indexing

Attendants could query during the workshop

# Questions?

## 4. Optimization

In every Flow that is build with Jina, there are quite some parameters to set.
For example, if you use a pre-trained model for encoding, there is no obvious best choice for a given dataset.
Jina allows you to try out a lot of different parameters automatically in order to get the best results.

Therefor, you need to provide Jina some sort of evaluation metric.
In the pokemon dataset, there are two edition with the same pokemon, but different images: `red-blue` and `red-green`.

![1.png](attachment:cb8f8cf6-92f7-4b7f-9a75-43029261979b.png)![1.png](attachment:1a98c688-569b-4b08-aa1d-3dbb7fadd38f.png)

Thus, we will setup the following evaluation metric:
Index one edition and search with the other edition.
We have a success, if the right pokemon is at the first place in the search results.

We need to do the following steps:

- A) build an `index` and a `search` Flow, which are repeaditly runnable
- B) use the `red-blue` dataset for `index` and the `red-green` for `search`
- C) implement an `EvaluationCallback` which will calculate, if the right pokemon is in the first place
- D) set the needed `OptimizationParameter` via a `parameter.yml` file
- E) setup the optimization process itself

**NOTE**: Under the hood, we use [optuna](https://optuna.readthedocs.io/en/stable/) for hyperparameter optimization.

### Do the imports

In [None]:
import glob
from jina import Document
from jina.optimizers.flow_runner import SingleFlowRunner, MultiFlowRunner
from jina.optimizers import FlowOptimizer, MeanEvaluationCallback

### A) & B)

Jina provides the 
- `SingleFlowRunner` for making a Flow repeaditly runnable and the 
- `MultiFlowRunner` to chain multiple `SingleFlowRunner` for the `FlowOptimizer`

For setup you need:
- `flow_yaml`: the definition of a Flow
- `documents`: the Documents, which are send to the Flow in each optimization step
- `execution_method`: tell the Flow, whether `index` or `search` should be used

Beware, that we introduce `JINA_MODEL_NAME_VAR: ${{JINA_MODEL_NAME}}` in the two new Flow definition.
This variable will allow us to change the model in the Flow in each optimization step.

In [None]:
def get_input_iterator(edition, full_document):
    image_src = f'data/pokemon/main-sprites/{edition}/*.png'
    for filename in glob.iglob(image_src, recursive=True):
        if full_document:
            with open(filename, 'rb') as fp:
                yield Document(buffer=fp.read(), tags={'filename': filename})
        else:
            yield filename

def get_flows():
    index_flow = SingleFlowRunner(
        flow_yaml='flows/index_opt.yml',
        documents=get_input_iterator('red-blue', True),
        request_size=64,
        execution_method='index',
        overwrite_workspace=True
    )

    search_flow = SingleFlowRunner(
        flow_yaml='flows/query_opt.yml',
        documents=get_input_iterator('red-green', False),
        request_size=64,
        execution_method='search'
    )

    multi_flow = MultiFlowRunner([index_flow, search_flow])
    return multi_flow


### C) implement an `EvaluationCallback` which will calculate, if the right pokemon is in the first place

Since the files are named the same for both editions, we use the filename as an identifier, whether we found the right pokemon.
The `PokemonCallback` checks for each Document, whether the correct result is in the first position.

In [None]:
def get_id(filename):
    return filename.split('/')[-1].split('.')[0]

class PokemonCallback(MeanEvaluationCallback):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._eval_name = "pokedex_eval"

    def get_empty_copy(self):
        return PokemonCallback(self._eval_name)
    
    def __call__(self, response):
        self._n_docs += len(response.search.docs)
        for doc in response.search.docs:
            if doc.matches:
                document_id = get_id(doc.uri)
                first_match_id = get_id(str(doc.matches[0].tags.fields['filename']))
                if document_id == first_match_id:
                    self._evaluation_values[self._eval_name] += 1

### D) set the needed `OptimizationParameter` via a `parameter.yml` file

We defined in the `parameter_few.yml` the models, that the optimizer should try out.
For demonstation purpose, we just added three models, in order to make the optimizer run rather short.
If you want to try mode models, please use the `parameter.yml` file and increase `n_trials` parameter in the `FlowOptimizer` below

### E) setup the optimization process itself

Finally, we build the `FlowOptimizer` object.
It needs:
- `flow_runner`: the repeaditly runnable Flow object
- `parameter_yaml`: the parameters which the optimizer can change
- `evaluation_callback`: our previously defined evaluation function
- `workspace_base_dir`: A directory for temporary data
- `n_trials`: The amount of optimization steps, that should be performed
- `sampler`: the way, the `FlowOptimizer` should sample new values in each step. For more info please look at the [optuna docs](https://optuna.readthedocs.io/en/stable/reference/generated/optuna.samplers.RandomSampler.html#optuna.samplers.RandomSampler).

In [None]:

def optimize(flows):
    optimizer = FlowOptimizer(
        flow_runner=flows,
        parameter_yaml='optimize/parameters_few.yml',
        evaluation_callback=PokemonCallback(eval_name='correct'),
        workspace_base_dir='workspace',
        n_trials=3,
        sampler='RandomSampler'
    )
    result = optimizer.optimize_flow()
    result.save_parameters('optimize/best_config.yml')

flows = get_flows()
optimize(flows)