# 可视图片搜索
_**使用卷积神经网络和 Elasticsearch K-最近邻(KNN)索引检索视觉上相似的图像**_


## 目录


1. [背景](#Background)
1. [下载Zalando Research数据](#Setup)
1. [准备 TensorFlow 模型](#TensorFlow-Model-Preparation)
1. [使用 SageMaker 托管模型](#Hosting-Model)
1. [在 Elasticsearch 中创建一个 KNN 索引](#ES-KNN)
1. [搜索结果评估](#Searching-with-ES-k-NN)
1. [部署全栈视觉搜索应用程序](#)
1. [拓展](#Extensions)

## 背景

在本笔记本中，我们将构建视觉图像搜索应用程序的核心组件。使用可视图像搜索，您可以查找与您所提供照片相似的照片，而无需通过语音或文本来查找这些内容。

视觉图像搜索的核心组件之一是卷积神经网络（CNN）模型，该模型生成代表查询图像和要与查询进行比较的参考项目图像的“特征向量”。参考项目特征向量通常是离线生成的，必须存储在某种数据库中，以便可以对其进行有效搜索。对于小型参考项目数据集，可以使用蛮力搜索将查询与每个参考项目进行比较。但是，蛮力搜索大型数据集极其缓慢且不可行的。

为了能够有效搜索视觉上相似的图像，我们将使用Amazon SageMaker从图像生成“特征向量”，并在Amazon Elasticsearch Service中使用KNN算法。 Amazon Elasticsearch Service的KNN使您可以在向量空间中搜索点，并通过欧几里得距离或余弦相似度（默认值为欧几里得距离）找到这些点的“最近邻居”。用例包括建议（例如，音乐应用程序中的“您可能喜欢的其他歌曲”功能），图像识别和欺诈检测。

我们将按照以下步骤构建可视图像搜索：进行一些初始设置后，我们将使用TensorFlow准备模型以生成特征向量，然后从*__feidegger__*(一种*__zalandoresearch__*数据集)生成Fashion Images的特征向量。这些特征向量将导入到Amazon Elasticsearch KNN 索引中。接下来，我们将用一些图片测试下图像查询功能，并将结果可视化。

In [None]:
#Install tqdm to have progress bar
!pip install tqdm

#install necessary pkg to make connection with elasticsearch domain
!pip install elasticsearch
!pip install requests
!pip install requests-aws4auth

# Use SageMaker version 1.72.1
!pip install sagemaker==1.72.1

In [None]:
import boto3
import re
import sagemaker
from sagemaker import get_execution_role

role = get_execution_role()

s3_resource = boto3.resource("s3")
s3 = boto3.client('s3')

In [None]:
cfn = boto3.client('cloudformation')

def get_cfn_outputs(stackname):
    outputs = {}
    for output in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Outputs']:
        outputs[output['OutputKey']] = output['OutputValue']
    return outputs

## Setup variables to use for the rest of the demo
cloudformation_stack_name = "vis-search"

outputs = get_cfn_outputs(cloudformation_stack_name)

bucket = outputs['s3BucketTraining']
es_host = outputs['esHostName']

outputs

### 下载Zalando Research数据

该数据集包含8732幅高分辨率图像，每幅图像均是Zalando商店中的衣服，图片都是白底的。

**下载Zalando Research数据**：原始数据来自：https://github.com/zalandoresearch/feidegger

 **Citation:** <br>
 *@inproceedings{lefakis2018feidegger,* <br>
 *title={FEIDEGGER: A Multi-modal Corpus of Fashion Images and Descriptions in German},* <br>
 *author={Lefakis, Leonidas and Akbik, Alan and Vollgraf, Roland},* <br>
 *booktitle = {{LREC} 2018, 11th Language Resources and Evaluation Conference},* <br>
 *year      = {2018}* <br>
 *}*

In [None]:
## Data Preparation
use_small_data=True

images_path = 'data/feidegger/fashion'

if(use_small_data):
    !wget https://us-east-1-binc.s3.amazonaws.com/ai-day-2021/visual-search/image_data_2k.tgz
    !tar -xf image_data_2k.tgz
else:
    !wget https://us-east-1-binc.s3.amazonaws.com/ai-day-2021/visual-search/image_data.tgz
    !tar -xf image_data.tgz


In [None]:
# Uploading dataset to S3

!aws s3 sync data s3://$bucket/data/ --quiet && echo upload to $bucket/data finished

## 准备TensorFlow模型

我们将使用TensorFlow后端准备一个模型，以将图像“特征化”为特征向量。 TensorFlow具有底层 Module API和高级Keras API。

我们将从预先训练的模型开始，避免花费时间和金钱从头开始训练模型。 因此，作为准备模型的第一步，我们将从Keras应用程序中导入预训练的模型。 研究人员已经对具有不同层数的各种经过预训练的CNN架构进行了试验，发现有几种好的选择。

在本笔记本中，我们将基于ResNet架构选择模型，这是一种常用的选择。 在层数的各种选择中，从18到152，我们将使用50层。 这也是一个常见的选择，可以平衡结果特征向量（嵌入）的表现力和计算效率（层数越少意味着效率越高，但表现力越低）。

In [None]:
import os
import json
import time
import tensorflow as tf
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
import sagemaker
from PIL import Image
from sagemaker.tensorflow import TensorFlow

In [None]:
# Set the channel first for better performance
from tensorflow.keras import backend
backend.set_image_data_format('channels_first')
print(backend.image_data_format())

现在，我们将导入一个ResNet50模型，该模型在Imagenet数据集上经过训练，可以在没有实际clssifier的情况下提取特征。更具体地说，我们将使用该层来生成浮点数的行向量，以作为“嵌入”或图像特征的表示。 我们还将模型另存为**export/Servo/1**下的*SavedModel*格式，以通过SageMaker TensorFlow服务API进行服务。

In [None]:
#Import Resnet50 model
model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False,input_shape=(3, 224, 224),pooling='avg')


In [None]:
model.summary()

In [None]:
#Creating the directory strcture
dirName = 'export/Servo/1'
if not os.path.exists(dirName):
    os.makedirs(dirName)
    print("Directory " , dirName ,  " Created ")
else:    
    print("Directory " , dirName ,  " already exists")    

In [None]:
#Save the model in SavedModel format
model.save('./export/Servo/1/', save_format='tf')

In [None]:
#Check the model Signature
!saved_model_cli show --dir ./export/Servo/1/ --tag_set serve --signature_def serving_default

## 使用 SageMaker 托管模型

在保存特征提取器模型后，我们将使用Sagemaker Tensorflow Serving api部署模型。Sagemaker Tensorflow Serving是用于生产环境的机器学习模型托管系统，具有灵活，高性能的特性。 使用TensorFlow Serving可以轻松部署新算法和实验，同时保持相同的服务器体系结构和API。TensorFlow Serving提供与TensorFlow模型的现成集成，但可以轻松扩展以服务于其他类型的模型和数据。我们将定义**inference.py**来自定义TensorFlow Serving API的输入数据。 我们还需要添加**requirements.txt**到此容器中，以使用额外的库。

In [None]:
#check the actual content of inference.py
!pygmentize src/inference.py

In [None]:
import tarfile

#zip the model .gz format
model_version = '1'
export_dir = 'export/Servo/' + model_version
with tarfile.open('model.tar.gz', mode='w:gz') as archive:
    archive.add('export', recursive=True)

In [None]:
#Upload the model to S3
sagemaker_session = sagemaker.Session()
inputs = sagemaker_session.upload_data(path='model.tar.gz', key_prefix='model')
inputs

将模型上传到S3之后，我们将使用TensorFlow Serving容器托管模型。 我们会使用ml.p3.16xlarge实例类型。您可能需要开一个support case以增加SageMaker托管实例类型的服务配额。 我们将使用此端点生成特征并将其导入ElasticSearch。 您还可以选择小型实例，例如“ ml.m4.xlarge”以节省成本。

In [None]:
#Deploy the model in Sagemaker Endpoint. This process will take ~10 min.
from sagemaker.tensorflow.serving import Model

sagemaker_model = Model(entry_point='inference.py', model_data = 's3://' + sagemaker_session.default_bucket() + '/model/model.tar.gz',
                                  role = role, framework_version='2.1.0', source_dir='./src' )

predictor = sagemaker_model.deploy(initial_instance_count=3, instance_type='ml.c5.xlarge')



In [None]:
# get the features for a sample image
payload = s3.get_object(Bucket=bucket,Key='data/feidegger/fashion/0VB21C000-A11@12.1.jpg')['Body'].read()
predictor.content_type = 'application/x-image'
predictor.serializer   = None
features = predictor.predict(payload)['predictions'][0]

features

## 在 Elasticsearch 中创建一个 KNN 索引

Amazon Elasticsearch Service的KNN使您可以在向量空间中搜索点，并通过欧几里得距离或余弦相似度（默认值为欧几里得距离）找到这些点的“最近邻居”。 用例包括建议（例如，音乐应用程序中的“您可能喜欢的其他歌曲”功能），图像识别和欺诈检测。

KNN需要Elasticsearch 7.1或更高版本。 OpenDistro for Elasticsearch文档中提供了有关Elasticsearch功能的完整文档，包括设置和统计信息的描述。 有关k最近邻算法的背景信息

在这一步中，我们将获取zalando图像的所有特征，并将这些特征导入到Elastichseach7.4域中。

In [None]:
#Define some utility function

#return all s3 keys
def get_all_s3_keys(bucket):
    """Get a list of all keys in an S3 bucket."""    
    keys = []

    kwargs = {'Bucket': bucket}
    while True:
        resp = s3.list_objects_v2(**kwargs)
        for obj in resp['Contents']:
            keys.append('s3://' + bucket + '/' + obj['Key'])

        try:
            kwargs['ContinuationToken'] = resp['NextContinuationToken']
        except KeyError:
            break

    return keys

In [None]:
# get all the zalando images keys from the bucket make a list
s3_uris = get_all_s3_keys(bucket)
len(s3_uris)

In [None]:
# define a function to extract image features
from time import sleep

sm_client = boto3.client('sagemaker-runtime')
ENDPOINT_NAME = predictor.endpoint

def get_predictions(payload):
    return sm_client.invoke_endpoint(EndpointName=ENDPOINT_NAME,
                                           ContentType='application/x-image',
                                           Body=payload)

def extract_features(s3_uri):
    key = s3_uri.replace(f's3://{bucket}/', '')
    payload = s3.get_object(Bucket=bucket,Key=key)['Body'].read()
    try:
        response = get_predictions(payload)
    except:
        sleep(0.1)
        response = get_predictions(payload)

    del payload
    response_body = json.loads((response['Body'].read()))
    feature_lst = response_body['predictions'][0]
    
    return s3_uri, feature_lst


In [None]:
# This process cell will take approximately 24-25 minutes on a t3.medium notebook instance
# with 3 m5.xlarge SageMaker Hosted Endpoint instances
from multiprocessing import cpu_count
from tqdm.contrib.concurrent import process_map

workers = 4 * cpu_count()
result = process_map(extract_features, s3_uris, max_workers=workers)


In [None]:
# setting up the Elasticsearch connection
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
region = 'us-east-1' # e.g. us-east-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

es = Elasticsearch(
    hosts = [{'host': es_host, 'port': 443}],
    http_auth = awsauth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)

In [None]:
#Define KNN Elasticsearch index maping
knn_index = {
    "settings": {
        "index.knn": True
    },
    "mappings": {
        "properties": {
            "zalando_img_vector": {
                "type": "knn_vector",
                "dimension": 2048
            }
        }
    }
}

In [None]:
#Creating the Elasticsearch index
es.indices.create(index="idx_zalando",body=knn_index,ignore=400)
es.indices.get(index="idx_zalando")

In [None]:
# defining a function to import the feature vectors corrosponds to each S3 URI into Elasticsearch KNN index
# This process will take around ~3 min.


def es_import(i):
    es.index(index='idx_zalando',
             body={"zalando_img_vector": i[1], 
                   "image": i[0]}
            )
    
process_map(es_import, result, max_workers=workers)

## 搜索结果评估

在这一步中，我们将使用SageMaker SDK和Boto3 SDK查询Elasticsearch以检索最近的邻居。 值得一提的是**zalando**数据集与Imagenet数据集非常相似。 现在，如果您遇到一个领域特定的问题，那么您需要在预先训练的特征提取器模型（例如VGG，Resnet，Xeception，Mobilenet等）之上训练该数据集，并创建一个新的特征提取器模型。

In [None]:
#define display_image function
def display_image(bucket, key):
    response = s3.get_object(Bucket=bucket,Key=key)['Body']
    img = Image.open(response)
    return display(img)

In [None]:
import requests
import random
from PIL import Image
import io
urls = []
# yellow pattern dess
urls.append('https://fastly.hautelookcdn.com/products/D7242MNR/large/13494318.jpg')
# T shirt kind dress
urls.append('https://fastly.hautelookcdn.com/products/M2241/large/15658772.jpg')
#Dotted pattern dress
urls.append('https://fastly.hautelookcdn.com/products/19463M/large/14537545.jpg')

img_bytes = requests.get(random.choice(urls)).content
query_img = Image.open(io.BytesIO(img_bytes))
query_img

###### SageMaker SDK 方法

In [None]:
#SageMaker SDK approach
predictor.content_type = 'application/x-image'
predictor.serializer   = None
features = predictor.predict(img_bytes)['predictions'][0]

In [None]:
import json
k = 5
idx_name = 'idx_zalando'
res = es.search(request_timeout=30, index=idx_name,
                body={'size': k, 
                      'query': {'knn': {'zalando_img_vector': {'vector': features, 'k': k}}}})

In [None]:
for i in range(k):
    key = res['hits']['hits'][i]['_source']['image']
    key = key.replace(f's3://{bucket}/','')
    img = display_image(bucket,key)

##### Boto3 方法

In [None]:
client = boto3.client('sagemaker-runtime')
ENDPOINT_NAME = predictor.endpoint # our endpoint name
response = client.invoke_endpoint(EndpointName=ENDPOINT_NAME,
                                       ContentType='application/x-image',
                                       Body=img_bytes)

response_body = json.loads((response['Body'].read()))
features = response_body['predictions'][0]



In [None]:
import json
k = 5
idx_name = 'idx_zalando'
res = es.search(request_timeout=30, index=idx_name,
                body={'size': k, 
                      'query': {'knn': {'zalando_img_vector': {'vector': features, 'k': k}}}})


In [None]:
for i in range(k):
    key = res['hits']['hits'][i]['_source']['image']
    key = key.replace(f's3://{bucket}/','')
    img = display_image (bucket,key)

# 部署一个全栈视觉搜索应用程序

In [None]:
s3_resource.Object(bucket, 'backend/template.yaml').upload_file('./backend/template.yaml', ExtraArgs={'ACL':'public-read'})


sam_template_url = f'https://{bucket}.s3.amazonaws.com/backend/template.yaml'

# Generate the CloudFormation Quick Create Link

print("单击下面的URL,以创建用于视觉搜索的后端API:\n")
print((
    'https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks/create/review'
    f'?templateURL={sam_template_url}'
    '&stackName=vis-search-api'
    f'&param_BucketName={outputs["s3BucketTraining"]}'
    f'&param_DomainName={outputs["esDomainName"]}'
    f'&param_ElasticSearchURL={outputs["esHostName"]}'
    f'&param_SagemakerEndpoint={predictor.endpoint}'
))

既然您具有一个可正常工作的Amazon SageMaker终端节点来提取图像特征并在Elasticsearch上创建了KNN索引，您就可以构建一个现实世界的，全栈，且有ML能力的Web应用程序了。 您刚刚创建的SAM模板将部署Amazon API Gateway和AWS Lambda函数。 Lambda函数运行您的代码以响应发送到API网关的HTTP请求。

In [None]:
# Review the content of the Lambda function code.
!pygmentize backend/lambda/app.py

### 一旦CloudFormation堆栈显示CREATE_COMPLETE，请继续下面的单元格：

In [None]:
# Save the REST endpoint for the search API to a config file, to be used by the frontend build

import json
api_endpoint = get_cfn_outputs('vis-search-api')['ImageSimilarityApi']

with open('./frontend/src/config/config.json', 'w') as outfile:
    json.dump({'apiEndpoint': api_endpoint}, outfile)

### 步骤 2: 部署前端服务

In [None]:
# add NPM to the path so we can assemble the web frontend from our notebook code

from os import environ

npm_path = ':/home/ec2-user/anaconda3/envs/JupyterSystemEnv/bin'

if npm_path not in environ['PATH']:
    ADD_NPM_PATH = environ['PATH']
    ADD_NPM_PATH = ADD_NPM_PATH + npm_path
else:
    ADD_NPM_PATH = environ['PATH']
    
%set_env PATH=$ADD_NPM_PATH

In [None]:
%cd ./frontend/

!npm install

In [None]:
!npm run-script build

In [None]:
hosting_bucket = f"s3://{outputs['s3BucketHostingBucketName']}"

!aws s3 sync ./build/ $hosting_bucket --acl public-read

### 步骤 3: 浏览您的前端服务，并上传图片

In [None]:
print('点击下面的URL:\n')
print(outputs['S3BucketSecureURL'] + '/index.html')

您应该看到以下页面：

![Website](pi3small.png)

在网站上，尝试将以下URL粘贴在URL文本字段中。

`https://i4.ztat.net/large/VE/12/1C/14/8K/12/VE121C148-K12@10.jpg`

## 拓展

我们使用了在Imagenet数据集上进行训练的预训练Resnet50模型。 现在，根据您的用例，您可以使用自己的数据集微调任何预先训练的模型，例如VGG，Inception和MobileNet，并将模型托管在Amazon SageMaker中。

您还可以使用Amazon SageMaker Batch转换作业从存储的S3图像中提取大量特征，然后可以使用AWS Glue将该数据导入Elasticeearch域。

### 清理资源

确保您停止笔记本实例，删除Amazon SageMaker终端节点并删除Elasticsearch域，以防止产生任何额外费用。

In [None]:
# Delete the endpoint
predictor.delete_endpoint()

In [None]:
# Empty S3 Contents
training_bucket_resource = s3_resource.Bucket(bucket)
training_bucket_resource.objects.all().delete()

hosting_bucket_resource = s3_resource.Bucket(outputs['s3BucketHostingBucketName'])
hosting_bucket_resource.objects.all().delete()