# Visual image search
_**Using a Convolutional Neural Net and Elasticsearch k-Nearest Neighbors Index to retrieve visually similar images**_

---


## Contents


1. [Background](#Background)
1. [Setup](#Setup)
1. [GluonCV(MXNet) Model Preparation](#GluonCV(MXNet)-Model-Preparation)
1. [SageMaker Model Hosting (BYOC)](#SageMaker-Model-Hosting-(BYOC))
1. [Build a KNN Index in Elasticsearch](#Build-a-KNN-Index-in-Elasticsearch)
1. [Evaluate Index Search Results](#Evaluate-Index-Search-Results)
1. [Extensions](#Extensions)

## Background
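In this notebook, we will build the core components of a visual image search application. Visual image search is used in interfaces where, instead of asking for something by voice or text, you show a photo of an example of what you are looking for.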

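One of the core components of visual image search is a convolutional neural network (CNN) model. It generates "feature vectors" that represent both the query image and the reference item images the query is compared against. Reference item feature vectors are usually generated offline and must be stored in some kind of database so that they can be searched efficiently. For small reference datasets, brute-force search, which compares the query against every reference item, is feasible. For large datasets, however, it is not, because brute-force search becomes very slow.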
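For a small reference set, brute-force search is just a linear scan: compute the distance from the query to every reference vector and keep the k smallest. The sketch below is a minimal, standard-library-only illustration; the function names and toy vectors are hypothetical. Its cost grows linearly with the number of reference items, which is why large catalogs need an approximate k-NN index instead.

```python
import heapq
import math
from typing import Dict, List, Sequence, Tuple


def euclidean(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean (L2) distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def brute_force_knn(query: Sequence[float],
                    references: Dict[str, Sequence[float]],
                    k: int) -> List[Tuple[str, float]]:
    """Return the k reference ids closest to `query`, nearest first.

    Every reference vector is compared with the query, so the cost is
    O(N * d) for N references of dimension d.
    """
    distances = ((euclidean(query, vec), ref_id) for ref_id, vec in references.items())
    # heapq.nsmallest keeps only k candidates in memory while scanning
    best = heapq.nsmallest(k, distances)
    return [(ref_id, dist) for dist, ref_id in best]


# Toy 3-dimensional "feature vectors" (the ResNet-50 features used later have 2048 dims)
refs = {
    "a.png": [0.0, 0.0, 1.0],
    "b.png": [0.0, 1.0, 0.0],
    "c.png": [0.0, 0.9, 0.1],
}
print(brute_force_knn([0.0, 1.0, 0.0], refs, k=2))
```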

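To search efficiently for visually similar images, we will use Amazon SageMaker to generate "feature vectors" from images and use the k-NN feature of Amazon Elasticsearch Service. k-NN for Amazon Elasticsearch Service lets you search for points in a vector space and find the "nearest neighbors" of those points by Euclidean distance or cosine similarity (Euclidean distance is the default). Use cases include recommendations (for example, an "other songs you might like" feature in a music application), image recognition, and fraud detection.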
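The two metrics mentioned above can be compared directly. This small sketch (the function names are illustrative, not part of the k-NN plugin) computes both. It also shows a useful identity: for L2-normalized vectors, the squared Euclidean distance equals `2 - 2 * cos`, so both metrics produce the same ranking once the feature vectors are normalized.

```python
import math
from typing import List, Sequence


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean distance."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between a and b (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def l2_normalize(v: Sequence[float]) -> List[float]:
    """Scale v to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


a = [3.0, 4.0]
b = [4.0, 3.0]
cos = cosine_similarity(a, b)
# On unit-length vectors: ||a - b||^2 == 2 - 2 * cos(a, b)
d = l2_distance(l2_normalize(a), l2_normalize(b))
print(cos, d ** 2, 2 - 2 * cos)
```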

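These are the steps we follow to build visual image search. After some initial setup, we will use MXNet to prepare a model that generates feature vectors, and then generate feature vectors for the images in a terrain image dataset. These feature vectors are imported into an Amazon Elasticsearch k-NN index. Next, we will explore some test image queries and visualize the results.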


In [None]:
# Install tqdm for progress bars
!pip install tqdm -i https://opentuna.cn/pypi/web/simple

# Install the packages needed to connect to the Elasticsearch domain
!pip install elasticsearch==7.13.0 -i https://opentuna.cn/pypi/web/simple
!pip install requests -i https://opentuna.cn/pypi/web/simple
!pip install requests-aws4auth -i https://opentuna.cn/pypi/web/simple

!pip install gluoncv -i https://opentuna.cn/pypi/web/simple

In [None]:
import boto3
import re
import sagemaker
from sagemaker import get_execution_role

role = get_execution_role()

s3_resource = boto3.resource("s3")
s3 = boto3.client('s3')

In [None]:
cfn = boto3.client('cloudformation')

def get_cfn_outputs(stackname):
    outputs = {}
    for output in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Outputs']:
        outputs[output['OutputKey']] = output['OutputValue']
    return outputs

## Setup variables to use for the rest of the demo
cloudformation_stack_name = "vis-search"

outputs = get_cfn_outputs(cloudformation_stack_name)

bucket = outputs['s3BucketTraining']
es_host = outputs['esHostName']

outputs

In [None]:
import os
print(os.path.abspath('.'))


In [None]:
%cd /home/ec2-user/SageMaker/sagemaker-visual-image-search


In [None]:
#!wget https://sagemaker-cn-northwest-1-456370280007.s3.cn-northwest-1.amazonaws.com.cn/visual-search/datas.pkl
!wget https://sagemaker-cn-northwest-1-456370280007.s3.cn-northwest-1.amazonaws.com.cn/visual-search/datas.zip

In [None]:
!mkdir -p images

In [None]:
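'''
# Alternative: extract the images from datas.pkl (disabled; the zip archive above is used instead)

import joblib
import matplotlib.pyplot as plt
from tqdm import tqdm

datas = joblib.load(open('datas.pkl', 'rb'))

print(type(datas))

for i in tqdm(range(datas.shape[0])):
#     im = Image.fromarray(datas[i]*255)
#     im = im.convert('L')
#     im.save('images/'+str(i)+'.png')
    plt.imshow(datas[i])
    plt.axis('off')
    plt.savefig('images/'+str(i)+'.png')
    plt.close()  # free the figure so memory does not grow with every image

'''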


In [None]:
!unzip -o -d ./images datas.zip

In [None]:
%matplotlib inline
from PIL import Image

import matplotlib.pyplot as plt

In [None]:
!aws s3 cp --recursive images/ s3://$bucket/images/


## GluonCV(MXNet) Model Preparation

We will use the MXNet backend to prepare a model that "featurizes" images into feature vectors. MXNet has a native Module API as well as the higher-level Gluon API.

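We start from a pre-trained model rather than spending time training one from scratch. So, as the first step in preparing the model, we import a pre-trained model from the GluonCV model zoo. Researchers have experimented with a variety of pre-trained CNN architectures with different numbers of layers and found several good options.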

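In this notebook, we choose a model based on the ResNet architecture, a common choice. Among the available depths, which range from 18 to 152 layers, we use 50 layers. This is also a common choice: it balances the expressiveness of the resulting feature vectors (embeddings) against computational efficiency (fewer layers mean higher efficiency, at the cost of less expressiveness).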
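Why does the index built later use `"dimension": 2048`? In ResNet-50 the last convolutional stage outputs 2048 channels, and global average pooling collapses each channel's spatial map into a single number, so an embedding taken just before the classification layer is a 2048-dimensional vector (we assume the endpoint returns this pooled layer, which is consistent with the index mapping). The sketch below shows global average pooling on a toy `[channel][row][col]` activation in plain Python; it illustrates the operation and is not the code that runs inside the endpoint container.

```python
from typing import List

# A feature map laid out as [channel][row][col]
FeatureMap = List[List[List[float]]]


def global_average_pool(feature_map: FeatureMap) -> List[float]:
    """Average each channel over its spatial grid, giving one value per channel."""
    pooled = []
    for channel in feature_map:
        values = [v for row in channel for v in row]
        pooled.append(sum(values) / len(values))
    return pooled


# Toy activation: 3 channels of 2x2. ResNet-50 would produce 2048 channels
# of 7x7 for a 224x224 input, hence a 2048-dimensional embedding.
toy = [
    [[1.0, 2.0], [3.0, 4.0]],
    [[0.0, 0.0], [0.0, 0.0]],
    [[5.0, 5.0], [5.0, 5.0]],
]
print(global_average_pool(toy))  # one number per channel
```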


In [None]:
!wget -O endpoint/model.zip "https://sagemaker-cn-northwest-1-456370280007.s3.cn-northwest-1.amazonaws.com.cn/visual-search/model.zip"

In [None]:
!cd endpoint && unzip -o -q model.zip

In [None]:
import mxnet as mx
from mxnet.gluon import nn
from mxnet import gluon, image, init, nd
from gluoncv.model_zoo import get_model
from gluoncv.data.transforms.presets.imagenet import transform_eval

In [None]:
num_gpus = 0
ctx = [mx.gpu(i) for i in range(num_gpus)] if num_gpus > 0 else [mx.cpu()]

model_name = 'ResNet50_v2'
classes = 5

finetune_net = get_model(model_name, pretrained=True)
with finetune_net.name_scope():
    finetune_net.output = nn.Dense(classes)
finetune_net.output.initialize(init.Xavier(), ctx = ctx)
finetune_net.collect_params().reset_ctx(ctx)
finetune_net.hybridize()

finetune_net.load_parameters('endpoint/model/model-0000.params')

In [None]:
ctx

In [None]:
input_pic = './images/100.png'

# Load Images
img = image.imread(input_pic)

# Transform
img = transform_eval(img).copyto(ctx[0])
    
finetune_net(img)

## SageMaker Model Hosting (BYOC)
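Next, starting from the AWS-provided mxnet-inference container image, we use a custom Dockerfile to build our own inference image (bring your own container, BYOC), and then deploy an inference endpoint.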

In [None]:
import os
print(os.path.abspath('.'))

%cd /home/ec2-user/SageMaker/sagemaker-visual-image-search/endpoint


In [None]:
!sh ./build_and_push.sh image-embedding

In [None]:
def is_endpoint_running(endpoint_name):
    """
    Return the status of the named endpoint (for example "InService" or "Creating"),
    or None if no endpoint with that name exists.
    """
    client = boto3.client('sagemaker')
    endpoints = client.list_endpoints()
    endpoint_name_list = [(ep["EndpointName"], ep["EndpointStatus"]) for ep in endpoints["Endpoints"]]
    for check_name in endpoint_name_list:
        if endpoint_name == check_name[0]:
            return check_name[1]
    return None

from botocore.exceptions import ClientError

def deploy_endpoint():
    """Create the SageMaker model, endpoint config and endpoint, skipping parts that already exist."""
    if is_endpoint_running(endpoint_name) is not None:
        print("Endpoint already exists; skipping deployment.")
        return

    sm = boto3.Session().client('sagemaker')
    primary_container = {'Image': endpoint_ecr_image_path}
    print("model_name: ", endpoint_name)
    print("endpoint_ecr_image_path: ", endpoint_ecr_image_path)

    # Create the model; report (but tolerate) an error such as "model already exists"
    try:
        sm.create_model(ModelName=endpoint_name,
                        ExecutionRoleArn=role,
                        PrimaryContainer=primary_container)
    except ClientError as e:
        print("Skipping create_model: {}".format(e))

    # Create the endpoint config; report (but tolerate) an error such as "config already exists"
    endpoint_config_name = endpoint_name + '-config'
    try:
        sm.create_endpoint_config(EndpointConfigName=endpoint_config_name,
                                  ProductionVariants=[{
                                      'InstanceType': instance_type,
                                      'InitialVariantWeight': 1,
                                      'InitialInstanceCount': 1,
                                      'ModelName': endpoint_name,
                                      'VariantName': 'AllTraffic'}])
    except ClientError as e:
        print("Skipping create_endpoint_config: {}".format(e))

    # Create the endpoint. This call is asynchronous: it returns while the endpoint is still "Creating"
    try:
        sm.create_endpoint(EndpointName=endpoint_name,
                           EndpointConfigName=endpoint_config_name)
    except ClientError as e:
        print("!!! Cannot create endpoint - Exception is >> {}".format(e))
        raise

    print("<<< Started model endpoint deployment: " + str(endpoint_name))



In [None]:
account = !aws sts get-caller-identity --query Account --output text
endpoint_name = "image-embedding"
endpoint_ecr_image_path = account[0] + ".dkr.ecr.cn-northwest-1.amazonaws.com.cn/image-embedding"
instance_type = "ml.m5.xlarge"

In [None]:
deploy_endpoint()
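`create_endpoint` returns as soon as SageMaker accepts the request; the endpoint then stays in the `Creating` state for several minutes, and the feature-extraction cells below fail until it reports `InService`. A minimal polling sketch, assuming you pass in a zero-argument status function such as `lambda: is_endpoint_running(endpoint_name)`; the helper name `wait_until_in_service` is hypothetical. boto3 also provides a built-in waiter, `boto3.client('sagemaker').get_waiter('endpoint_in_service')`, which can be used instead.

```python
import time
from typing import Callable, Optional


def wait_until_in_service(get_status: Callable[[], Optional[str]],
                          poll_seconds: float = 30.0,
                          timeout_seconds: float = 1800.0,
                          sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_status() until it returns 'InService'.

    Raises RuntimeError if the endpoint reports 'Failed', and TimeoutError
    if it is not ready within timeout_seconds.
    """
    waited = 0.0
    while True:
        status = get_status()
        print("Endpoint status:", status)
        if status == "InService":
            return status
        if status == "Failed":
            raise RuntimeError("Endpoint deployment failed")
        if waited >= timeout_seconds:
            raise TimeoutError("Endpoint not InService after {}s".format(timeout_seconds))
        sleep(poll_seconds)
        waited += poll_seconds


# Usage in this notebook (not run here):
# wait_until_in_service(lambda: is_endpoint_running(endpoint_name))
```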

## Build a KNN Index in Elasticsearch


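The k-NN feature of Amazon Elasticsearch Service (now called Amazon OpenSearch Service) lets you search for points in a vector space and find the "nearest neighbors" of those points by Euclidean distance or cosine similarity (Euclidean distance is the default). Use cases include recommendations (for example, an "other songs you might like" feature in a music application), image recognition, and fraud detection.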

k-NN requires Elasticsearch 7.1 or later. For background on the k-nearest neighbors algorithm, see [Sagemaker K-Nearest Neighbors (k-NN) Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/k-nearest-neighbors.html) and [k-Nearest Neighbor (k-NN) search in Amazon OpenSearch Service](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/knn.html).

In this step, we extract features for all the terrain images and import them into an Elasticsearch 7.4 domain.
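Each document indexed in this section pairs a 2048-dimensional feature vector with the S3 URI of its image, and a search sends a `knn` query against the same field. A small sketch of both request bodies, matching the `geo_img_vector` mapping and the query used later in this notebook. The helper names and the dimension check are hypothetical additions, meant to catch a vector whose length does not match the mapping before it reaches the cluster.

```python
from typing import Dict, List, Sequence

DIMENSION = 2048  # must equal "dimension" in the knn_vector mapping
FIELD = "geo_img_vector"


def _check_dimension(vector: Sequence[float]) -> List[float]:
    """Reject vectors whose length differs from the index mapping."""
    if len(vector) != DIMENSION:
        raise ValueError("expected {} values, got {}".format(DIMENSION, len(vector)))
    return [float(x) for x in vector]


def build_document(vector: Sequence[float], image_uri: str) -> Dict:
    """Body for es.index(): the feature vector plus the S3 URI of the image."""
    return {FIELD: _check_dimension(vector), "image": image_uri}


def build_knn_query(vector: Sequence[float], k: int) -> Dict:
    """Body for es.search(): return the k nearest neighbors of `vector`."""
    return {
        "size": k,
        "query": {"knn": {FIELD: {"vector": _check_dimension(vector), "k": k}}},
    }
```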

In [None]:
#Define some utility function

# Return the S3 URIs of all .png images in a bucket
def get_all_s3_keys(bucket):
    """Get a list of s3:// URIs for all .png objects in an S3 bucket."""
    keys = []

    kwargs = {'Bucket': bucket}
    while True:
        resp = s3.list_objects_v2(**kwargs)
        # 'Contents' is absent from the response when the bucket is empty
        for obj in resp.get('Contents', []):
            if obj['Key'].endswith('.png'):
                keys.append('s3://' + bucket + '/' + obj['Key'])

        try:
            kwargs['ContinuationToken'] = resp['NextContinuationToken']
        except KeyError:
            break

    return keys

In [None]:
# Collect the S3 URIs of all terrain (geo) images in the bucket into a list
s3_uris = get_all_s3_keys(bucket)

In [None]:
len(s3_uris)

In [None]:
s3_uris[:10]

In [None]:
# define a function to extract image features
from time import sleep
import json

sm_client = boto3.client('sagemaker-runtime')
ENDPOINT_NAME = 'image-embedding'  # predictor.endpoint

def get_predictions(payload):
    return sm_client.invoke_endpoint(EndpointName=ENDPOINT_NAME,
                                           ContentType='application/json',  # 'application/x-image'
                                           Body=payload)

def extract_features(s3_uri):
    key = s3_uri.replace(f's3://{bucket}/', '')
    payload = json.dumps({'bucket' : bucket, 'image_uri' : key, 'content_type': "application/json"})  # s3.get_object(Bucket=bucket,Key=key)['Body'].read()
    try:
        response = get_predictions(payload)
    except:
        sleep(0.1)
        response = get_predictions(payload)

    del payload
    response_body = json.loads((response['Body'].read()))
    feature_lst = response_body  # response_body['predictions'][0]
    
    return s3_uri, feature_lst
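`extract_features` retries exactly once after a fixed 0.1 s pause, so a burst of throttling errors from the endpoint can still abort the whole run. A more forgiving option is exponential backoff with jitter. The helper below is a generic, hypothetical sketch; `response = call_with_backoff(lambda: get_predictions(payload))` could replace the try/except in `extract_features`. It retries on any exception by default, so narrow `retry_on` (for example to botocore's `ClientError`) if you only want to retry service errors.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_backoff(fn: Callable[[], T],
                      max_attempts: int = 5,
                      base_delay: float = 0.1,
                      max_delay: float = 5.0,
                      retry_on: Tuple[Type[BaseException], ...] = (Exception,),
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn(), retrying failures with exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: re-raise the last error
            # Upper bound doubles each attempt (0.1 s, 0.2 s, 0.4 s, ...), capped at max_delay;
            # sleeping a random fraction of it spreads out retries from parallel workers
            sleep(random.uniform(0, min(max_delay, base_delay * 2 ** attempt)))
    raise AssertionError("unreachable")
```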


In [None]:
resulti = extract_features(s3_uris[0])
print(resulti)

In [None]:
s3_uris[0]

In [None]:
%%time
# This cell takes approximately 24-25 minutes on a t3.medium notebook instance
# with 3 m5.xlarge SageMaker hosted endpoint instances. If it fails, run it again.
from multiprocessing import cpu_count
from tqdm.contrib.concurrent import process_map

workers = 4  # TODO if your endpoint instance is large enough: 2 * cpu_count()
result = process_map(extract_features, s3_uris, max_workers=workers, chunksize=4)  # TODO use 100 images for sample, s3_uris[:100]
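Each call to `extract_features` spends most of its time waiting on the network, so threads are a reasonable alternative to worker processes and avoid sending work between processes. A sketch using the standard library's `concurrent.futures` (the wrapper name `map_concurrently` is hypothetical). `executor.map` preserves input order, so results still line up with `s3_uris`. If you want to keep the progress bar, `tqdm.contrib.concurrent` also offers `thread_map` with the same call shape as `process_map`.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

A = TypeVar("A")
R = TypeVar("R")


def map_concurrently(fn: Callable[[A], R], items: Iterable[A], max_workers: int = 8) -> List[R]:
    """Apply fn to every item using a thread pool; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fn, items))


# Usage in this notebook (not run here):
# result = map_concurrently(extract_features, s3_uris, max_workers=workers)
```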

In [None]:
# setting up the Elasticsearch connection
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
region = 'cn-northwest-1' # e.g. us-east-1/cn-northwest-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

es = Elasticsearch(
    hosts = [{'host': es_host, 'port': 443}],
    http_auth = awsauth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)

In [None]:
# Define the k-NN Elasticsearch index mapping
knn_index = {
    "settings": {
        "index.knn": True
    },
    "mappings": {
        "properties": {
            "geo_img_vector": {
                "type": "knn_vector",
                "dimension": 2048
            }
        }
    }
}

In [None]:
#Creating the Elasticsearch index
#es.indices.delete(index="idx_geo")
es.indices.create(index="idx_geo",body=knn_index,ignore=400)
es.indices.get(index="idx_geo")

In [None]:
%%time
# Define a function that imports the feature vector corresponding to each S3 URI into the Elasticsearch k-NN index
# This takes around 3 minutes. If it fails, run it again.


def es_import(i):
    es.index(index='idx_geo',
             body={"geo_img_vector": i[1]['predictions'][0], 
                   "image": i[0]}
            )
    
_ = process_map(es_import, result, max_workers=workers)
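`es_import` sends one HTTP request per image. The `elasticsearch` Python client also ships `elasticsearch.helpers.bulk(client, actions)`, which batches many documents per request. The sketch below only builds the action dictionaries (`_index` plus `_source`) from the `(s3_uri, response_body)` tuples in `result`; the generator name is hypothetical, and the commented call shows how it could be handed to the helper.

```python
from typing import Dict, Iterable, Iterator, Tuple


def knn_bulk_actions(results: Iterable[Tuple[str, Dict]],
                     index: str = "idx_geo") -> Iterator[Dict]:
    """Yield one bulk 'index' action per (s3_uri, endpoint_response) pair."""
    for s3_uri, body in results:
        yield {
            "_index": index,
            "_source": {
                "geo_img_vector": body["predictions"][0],
                "image": s3_uri,
            },
        }


# Usage in this notebook (not run here):
# from elasticsearch import helpers
# helpers.bulk(es, knn_bulk_actions(result), chunk_size=200)
```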

## Evaluate Index Search Results

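In this step, we use the SageMaker SDK or the Boto3 SDK to get the query image's features and then query Elasticsearch to retrieve the nearest neighbors. If your problem is very domain-specific, you will need to train on that dataset starting from a pre-trained feature-extractor model (such as VGG, ResNet, Xception, or MobileNet) and build a new feature-extractor model.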

In [None]:
#define display_image function
def display_image(bucket, key):
    response = s3.get_object(Bucket=bucket,Key=key)['Body']
    img = Image.open(response)
    return display(img)

In [None]:
import requests
import random
from PIL import Image
import io
# urls = []
# # yellow pattern dess
# urls.append('https://fastly.hautelookcdn.com/products/D7242MNR/large/13494318.jpg')
# # T shirt kind dress
# urls.append('https://fastly.hautelookcdn.com/products/M2241/large/15658772.jpg')
# #Dotted pattern dress
# urls.append('https://fastly.hautelookcdn.com/products/19463M/large/14537545.jpg')

# img_bytes = requests.get(random.choice(urls)).content
# query_img = Image.open(io.BytesIO(img_bytes))
# query_img

###### SageMaker SDK Method

In [None]:
'''
#SageMaker SDK approach
predictor.content_type = 'application/x-image'
predictor.serializer   = None
features = predictor.predict(img_bytes)['predictions'][0]
'''
# Boto3 approach
tmp_bucket = bucket
s3_uri = f's3://{bucket}/images/3100.png'
key = s3_uri.replace(f's3://{tmp_bucket}/', '')
payload = json.dumps({'bucket' : tmp_bucket, 'image_uri' : key, 'content_type': "application/json"})  # s3.get_object(Bucket=bucket,Key=key)['Body'].read()
response = get_predictions(payload)
response_body = json.loads((response['Body'].read()))
features = response_body['predictions'][0]



In [None]:
key

In [None]:
import json
k = 5
idx_name = 'idx_geo'
res = es.search(request_timeout=30, index=idx_name,
                body={'size': k, 
                      'query': {'knn': {'geo_img_vector': {'vector': features, 'k': k}}}})
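The search response follows the standard Elasticsearch shape: `res['hits']['hits']` is a list of hits, each with a relevance `_score` and the stored `_source`. A small sketch that turns it into `(bucket, key, score)` tuples ready for `display_image`. The helper name is hypothetical, and it assumes the `image` field holds a full `s3://bucket/key` URI, as written by `es_import` above.

```python
from typing import Dict, List, Tuple


def parse_knn_hits(response: Dict) -> List[Tuple[str, str, float]]:
    """Extract (bucket, key, score) from an Elasticsearch search response."""
    parsed = []
    for hit in response["hits"]["hits"]:
        uri = hit["_source"]["image"]
        if not uri.startswith("s3://"):
            raise ValueError("not an S3 URI: " + uri)
        # "s3://bucket/images/1.png" -> ("bucket", "images/1.png")
        bucket_name, _, key = uri[len("s3://"):].partition("/")
        parsed.append((bucket_name, key, hit["_score"]))
    return parsed


# Usage in this notebook (not run here):
# for b, key, score in parse_knn_hits(res):
#     print(key, score)
#     display_image(b, key)
```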

In [None]:
for i in range(k):
    key = res['hits']['hits'][i]['_source']['image'] 
    key = key.replace(f's3://{bucket}/','')
    if key.startswith('s3'):
        key = 'images'+key
    print(key)
    img = display_image(bucket,key)
   

## Deploying a full-stack visual search application

In [None]:
sam_template_url = 'https://sagemaker-cn-northwest-1-456370280007.s3.cn-northwest-1.amazonaws.com.cn/visual-search/template-cn.yaml'



# Generate a CloudFormation quick-create link: open it in a browser to start the CloudFormation wizard, then wait for the stack to finish

print("Click the URL below to create the backend API for visual search:\n")
print((
    'https://cn-northwest-1.console.amazonaws.cn/cloudformation/home?region=cn-northwest-1#/stacks/create/review'
    f'?templateURL={sam_template_url}'
    '&stackName=vis-search-api'
    f'&param_BucketName={outputs["s3BucketTraining"]}'
    f'&param_DomainName={outputs["esDomainName"]}'
    f'&param_ElasticSearchURL={outputs["esHostName"]}'
    f'&param_SagemakerEndpoint=image-embedding'  # TODO predictor.endpoint
))

BucketName = f"s3://{outputs['s3BucketTraining']}"
DomainName = outputs['esDomainName']
ElasticSearchURL = outputs['esHostName']
SagemakerEndpoint = "image-embedding"

Now that you have a working Amazon SageMaker endpoint for extracting image features and a k-NN index on Elasticsearch, you are ready to build a real-world, full-stack, ML-powered web app. The stack you launch from the SAM template deploys an Amazon API Gateway API and an AWS Lambda function. The Lambda function runs your code in response to HTTP requests sent to the API Gateway.
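The real handler lives in `backend/lambda/app.py` (you can review it in the next cell). For orientation only, here is a hypothetical, heavily simplified sketch of the request/response side of such a function, assuming API Gateway's Lambda proxy integration, where the function returns a dict with `statusCode`, `headers`, and a string `body`. The `make_handler` factory, the `search_similar` callback, the `url` request field, and the permissive CORS header are all assumptions, not taken from the deployed code.

```python
import json
from typing import Any, Callable, Dict, List

JSON_HEADERS = {
    "Content-Type": "application/json",
    "Access-Control-Allow-Origin": "*",  # assumption: lets the static frontend on another origin call the API
}


def _response(status: int, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Lambda proxy integration response: statusCode, headers and a string body."""
    return {"statusCode": status, "headers": JSON_HEADERS, "body": json.dumps(payload)}


def make_handler(search_similar: Callable[[str], List[str]]):
    """Build a hypothetical Lambda handler around a search_similar(url) callback.

    search_similar would extract features for the image at url (via the
    SageMaker endpoint) and return the URLs of its nearest neighbors.
    """
    def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
        try:
            # With proxy integration the HTTP body arrives as a string (or None)
            image_url = json.loads(event.get("body") or "{}")["url"]
        except (json.JSONDecodeError, KeyError, TypeError):
            return _response(400, {"error": "expected a JSON body with a 'url' field"})
        return _response(200, {"images": search_similar(image_url)})
    return handler
```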

In [None]:
# Review the content of the Lambda function code.
#!pygmentize backend/lambda/app.py

### Once the CloudFormation Stack shows CREATE_COMPLETE, proceed to this cell below:

In [None]:
%cd /home/ec2-user/SageMaker/sagemaker-visual-image-search

In [None]:
# Save the REST endpoint for the search API to a config file, to be used by the frontend build

import json
api_endpoint = get_cfn_outputs('vis-search-api')['ImageSimilarityApi']
print(api_endpoint)
#api_endpoint = 'https://s42995b1j7.execute-api.cn-northwest-1.amazonaws.com.cn/Prod'

with open('./frontend/src/config/config.json', 'w') as outfile:
    json.dump({'apiEndpoint': api_endpoint}, outfile)

## Step 2: Deploy frontend services

In [None]:
# add NPM to the path so we can assemble the web frontend from our notebook code

from os import environ

npm_path = ':/home/ec2-user/anaconda3/envs/JupyterSystemEnv/bin'

if npm_path not in environ['PATH']:
    ADD_NPM_PATH = environ['PATH']
    ADD_NPM_PATH = ADD_NPM_PATH + npm_path
else:
    ADD_NPM_PATH = environ['PATH']
    
%set_env PATH=$ADD_NPM_PATH

In [None]:
%cd ./frontend/

!npm install

In [None]:
!npm run-script build

In [None]:
hosting_bucket = f"s3://{outputs['s3BucketHostingBucketName']}"

!aws s3 sync ./build/ $hosting_bucket --acl public-read

## Step 3: Browse your frontend service, and upload an image

In [None]:
print('Click the URL below:\n')
print(outputs['WebsiteURL'] + '/index.html')

You should see the following page:

![final_app](./pics/final_app.png)

On the website, try pasting the following URL in the URL text field.

`https://sagemaker-cn-northwest-1-456370280007.s3.cn-northwest-1.amazonaws.com.cn/visual-search/images/100.png`

## Extensions

We started from a ResNet-50 model pre-trained on the ImageNet dataset. Depending on your use case, you can fine-tune any pre-trained model, such as VGG, Inception, or MobileNet, on your own dataset and host the model in Amazon SageMaker.

You can also use an Amazon SageMaker Batch Transform job to extract features in bulk from the images stored in S3, and then use AWS Glue to import that data into the Elasticsearch domain.
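Features produced in bulk can be loaded through the Elasticsearch `_bulk` API, which expects newline-delimited JSON: an action line followed by a document line for each record, with the body ending in a newline. A sketch that formats `(s3_uri, vector)` pairs that way; the function name and input shape are assumptions, since the Batch Transform output format depends on your inference container.

```python
import json
from typing import Iterable, Sequence, Tuple


def to_bulk_ndjson(records: Iterable[Tuple[str, Sequence[float]]], index: str = "idx_geo") -> str:
    """Format (s3_uri, vector) pairs as an Elasticsearch _bulk request body."""
    lines = []
    for s3_uri, vector in records:
        lines.append(json.dumps({"index": {"_index": index}}))  # action line
        lines.append(json.dumps({"geo_img_vector": list(vector), "image": s3_uri}))  # document line
    # Every line, including the last, must be terminated by a newline
    return "".join(line + "\n" for line in lines)
```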


### Cleanup

Make sure that you stop the notebook instance, delete the Amazon SageMaker endpoint and delete the Elasticsearch domain to prevent any additional charges.

In [None]:
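# Delete the endpoint, endpoint config and model created above
# sm = boto3.client('sagemaker')
# sm.delete_endpoint(EndpointName=endpoint_name)
# sm.delete_endpoint_config(EndpointConfigName=endpoint_name + '-config')
# sm.delete_model(ModelName=endpoint_name)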

In [None]:
# Empty S3 Contents
# training_bucket_resource = s3_resource.Bucket(bucket)
# training_bucket_resource.objects.all().delete()

# hosting_bucket_resource = s3_resource.Bucket(outputs['s3BucketHostingBucketName'])
# hosting_bucket_resource.objects.all().delete()