In [None]:
import boto3
import sagemaker
from sagemaker import get_execution_role

# IAM role of this notebook; it is passed to the SageMaker model when deploying.
role = get_execution_role()

# Two S3 handles: the low-level client (get_object, list_objects_v2) and the
# high-level resource API (Bucket / Object helpers used for uploads).
s3 = boto3.client('s3')
s3_resource = boto3.resource("s3")

In [None]:
cfn = boto3.client('cloudformation')

def get_cfn_outputs(stackname):
    """Return the Outputs of CloudFormation stack `stackname` as a dict.

    Keys are the stack's OutputKey values and values are the matching
    OutputValue strings. Only the first stack in the describe_stacks response
    is read.
    """
    stack = cfn.describe_stacks(StackName=stackname)['Stacks'][0]
    return {item['OutputKey']: item['OutputValue'] for item in stack['Outputs']}

## Setup variables to use for the rest of the demo
# Stack whose outputs supply the bucket names, the OpenSearch host and the site domain.
cloudformation_stack_name = "vis-search"
outputs = get_cfn_outputs(cloudformation_stack_name)

# Training-data bucket (images are uploaded here) and the OpenSearch host name.
bucket = outputs['s3BucketTraining']
es_host = outputs['esHostName']

outputs

In [None]:
## Data Preparation

import os 
import json
import urllib.request
from multiprocessing import cpu_count
from tqdm.contrib.concurrent import process_map

# Local working paths: images are downloaded under images_path, and the
# FEIDEGGER metadata is stored as `filename` in the current directory.
images_path = 'data/feidegger/fashion'
filename = 'metadata.json'

# Bucket handle used by upload_to_s3.
my_bucket = s3_resource.Bucket(bucket)

# Create the image directory (and any parents) on first run; a no-op afterwards.
os.makedirs(images_path, exist_ok=True)

def download_metadata(url):
    """Fetch `url` into the local file named by the global `filename`.

    Skipped when that file already exists, so re-running the cell does not
    hit the network again.
    """
    if os.path.exists(filename):
        return
    urllib.request.urlretrieve(url, filename)

# download metadata.json to local notebook
download_metadata('https://raw.githubusercontent.com/zalandoresearch/feidegger/master/data/FEIDEGGER_release_1.2.json')

def generate_image_list(filename):
    """Return the image URLs listed in a FEIDEGGER metadata JSON file.

    Args:
        filename: path to the JSON file. It must contain a list of objects,
            each with a 'url' key.

    Returns:
        list[str]: the 'url' values, in file order.
    """
    # The context manager guarantees the handle is closed, even if json.load raises.
    with open(filename, 'r') as metadata:
        data = json.load(metadata)
    return [item['url'] for item in data]


def download_image(url, dest_dir=None):
    """Download one image URL into `dest_dir` (default: the global `images_path`).

    The local file name is the last '/'-separated segment of the URL,
    including any query string. Images that already exist locally are not
    downloaded again, so re-running the notebook only fetches missing files.

    Args:
        url: image URL.
        dest_dir: target directory; defaults to `images_path`.

    Returns:
        str: the local path of the image.
    """
    if dest_dir is None:
        dest_dir = images_path
    dest = os.path.join(dest_dir, url.split("/")[-1])
    if os.path.exists(dest):
        return dest
    # Download under a temporary name and rename only when complete. An
    # interrupted download therefore never leaves a truncated file that
    # later runs would skip as "already downloaded".
    partial = dest + '.part'
    urllib.request.urlretrieve(url, partial)
    os.replace(partial, dest)
    return dest
                    
# Build the list of image URLs from the metadata file.
url_lst = generate_image_list(filename)

# Use twice as many workers as CPUs: each task mostly waits on a download.
workers = cpu_count() * 2

# downloading images to local disk; This process will take approximately 2-5 minutes on a t3.medium notebook instance
_ = process_map(
    download_image,
    url_lst,
    max_workers=workers,
    chunksize=1,
)

In [None]:
# Uploading dataset to S3
# Collect every file under ./data together with its S3 key: the walk directory
# with './' removed and '/' separators, followed by the file name.
dirName = 'data'
files_to_upload = []
for root, _subdirs, names in os.walk('./' + dirName):
    root = root.replace("\\", "/")
    key_prefix = root.replace('./', "")
    files_to_upload.extend(
        {"filename": os.path.join(root, name), "key": key_prefix + '/' + name}
        for name in names
    )

def upload_to_s3(file):
    """Upload one {'filename': ..., 'key': ...} entry of files_to_upload into my_bucket."""
    local_path, s3_key = file['filename'], file['key']
    my_bucket.upload_file(local_path, s3_key)

# uploading images to s3; This process will take approximately 2-5 minutes on a t3.medium notebook instance
_ = process_map(upload_to_s3, files_to_upload,
                max_workers=workers, chunksize=1)

In [None]:
import tensorflow.keras as keras
from tensorflow.keras.applications.resnet50 import ResNet50
from PIL import Image

In [None]:
# Images are laid out as (height, width, channels), matching input_shape below.
keras.backend.set_image_data_format('channels_last')

# ImageNet-pretrained ResNet50 without its classification head. Global average
# pooling reduces the last feature map to one vector per image.
model = ResNet50(
    input_shape=(224, 224, 3),
    include_top=False,
    weights='imagenet',
    pooling='avg',
)

In [None]:
# Create the SavedModel directory structure (<model>/<version>)
dirName = 'model/1'
if os.path.exists(dirName):
    print("Directory " , dirName ,  " already exists")
else:
    os.makedirs(dirName)
    print("Directory " , dirName ,  " Created ")

In [None]:
%time
# Save the model in SavedModel format
model.save('./model/1/', save_format='tf')

In [None]:
# Check the model Signature
!/home/ec2-user/anaconda3/envs/tensorflow2_p38/bin/saved_model_cli show --dir ./model/1/ --tag_set serve --signature_def serving_default

In [None]:
import tarfile

# Package the SavedModel directory model/<version> as a gzipped tarball
# (model.tar.gz), which is uploaded to S3 in the next cell.
model_version = '1'
export_dir = f'model/{model_version}'
with tarfile.open('model.tar.gz', 'w:gz') as archive:
    archive.add(export_dir, recursive=True)

In [None]:
# Upload model.tar.gz to S3 under the 'vis-search/tf/model' prefix. The returned
# location (model_path) is used as model_data when deploying.
sagemaker_session = sagemaker.Session()
model_path = sagemaker_session.upload_data(
    path='model.tar.gz',
    key_prefix='vis-search/tf/model',
)
model_path

In [None]:
# Deploy the model to a SageMaker endpoint. This process will take ~10 min.
from sagemaker.tensorflow import TensorFlowModel

sagemaker_model = TensorFlowModel(
    model_data=model_path,
    role=role,
    framework_version='2.8',
)

# 3 x ml.m5.xlarge: the timing estimate for the bulk feature-extraction cell
# assumes this endpoint size.
predictor = sagemaker_model.deploy(
    instance_type='ml.m5.xlarge',
    initial_instance_count=3,
)

In [None]:
from io import BytesIO
import numpy as np
import requests

sm_runtime_client = boto3.client("sagemaker-runtime")

# Helpers used to get the features for a sample image
def download_file(url, timeout=30):
    """Download `url` over HTTP(S) and return the response body.

    Args:
        url: URL to fetch.
        timeout: seconds to wait for the server before giving up. Without a
            timeout, `requests.get` can block indefinitely on an unresponsive host.

    Returns:
        bytes | None: the body when the status is 200. Otherwise a message is
        printed and None is returned.
    """
    r = requests.get(url, timeout=timeout)
    if r.status_code == 200:
        return r.content
    print("file failed to download")
    return None
    
def get_s3_obj(s3_uri):
    """Return the bytes of an object in the training bucket, given its URI.

    The key is obtained by removing the 's3://{bucket}/' prefix (the global
    training bucket). URIs that point at another bucket are not handled.
    """
    prefix = f's3://{bucket}/'
    obj = s3.get_object(Bucket=bucket, Key=s3_uri.replace(prefix, ''))
    return obj['Body'].read()

def image_preprocessing(img_bytes, return_body=True):
    """Decode image bytes into a 1x224x224x3 batch for the ResNet50 endpoint.

    The image is converted to RGB and resized to 224x224. Pixel values are
    left as the raw integers produced by PIL; no ImageNet normalisation is applied.
    NOTE(review): Keras ResNet50 weights are normally fed `preprocess_input`
    output. Indexing and queries both skip it here, so results stay
    consistent, but confirm this is intended.

    Args:
        img_bytes: encoded image (any format PIL can open).
        return_body: if true, return a JSON string '{"instances": [...]}' for
            the endpoint; otherwise return the numpy batch array.
    """
    rgb = Image.open(BytesIO(img_bytes)).convert("RGB").resize((224, 224))
    batch = np.asarray(rgb)[np.newaxis, ...]
    if not return_body:
        return batch
    return json.dumps({"instances": batch.tolist()})
    
def get_features(img_bytes, sagemaker_endpoint=predictor.endpoint_name):
    """Return the feature vector the endpoint computes for an encoded image.

    The image is turned into a JSON request by image_preprocessing and sent to
    `sagemaker_endpoint` through the SageMaker runtime client. The first entry
    of the response's "predictions" list is returned. The default endpoint
    name is captured when this function is defined.
    """
    request_body = image_preprocessing(img_bytes, return_body=True)
    reply = sm_runtime_client.invoke_endpoint(
        EndpointName=sagemaker_endpoint,
        ContentType="application/json",
        Body=request_body,
    )
    payload = json.loads(reply["Body"].read())
    return payload["predictions"][0]

# Sample image from *this* stack's training bucket. The URI is built from
# `bucket` (a stack output) and `images_path` (where the images were uploaded)
# instead of a hard-coded bucket name. With a hard-coded name, get_s3_obj cannot
# strip the prefix and the lookup targets the wrong key.
image_bytes = get_s3_obj(f's3://{bucket}/{images_path}/0000723855b24fbe806c20a1abd9d5dc.jpg?imwidth=400&filter=packshot')

features = get_features(image_bytes)
features

In [None]:
# return all s3 keys
def get_all_s3_keys(bucket, client=None):
    """Return 's3://<bucket>/<key>' URIs for every object in an S3 bucket.

    Pages through list_objects_v2 by following NextContinuationToken.

    Args:
        bucket: bucket name.
        client: S3 client to use; defaults to the notebook's global `s3` client.

    Returns:
        list[str]: object URIs in listing order; empty for an empty bucket.
    """
    client = s3 if client is None else client
    keys = []
    kwargs = {'Bucket': bucket}
    while True:
        resp = client.list_objects_v2(**kwargs)
        # 'Contents' is absent from a page with no objects (e.g. an empty bucket).
        for obj in resp.get('Contents', []):
            keys.append('s3://' + bucket + '/' + obj['Key'])
        token = resp.get('NextContinuationToken')
        if token is None:
            break
        kwargs['ContinuationToken'] = token
    return keys

In [None]:
# Get the S3 URIs of all Zalando images in the bucket. Only keys under the
# uploaded image prefix are kept. Other objects in the bucket (e.g.
# backend/function.zip, uploaded near the end of this notebook and present on a
# re-run) are not images and would make feature extraction fail.
image_uri_prefix = f's3://{bucket}/{images_path}/'
s3_uris = [uri for uri in get_all_s3_keys(bucket) if uri.startswith(image_uri_prefix)]

In [None]:
# define a function to extract image features
from time import sleep

def extract_features(s3_uri):
    """Fetch one image from the training bucket and return (s3_uri, features).

    If the endpoint call raises, it is retried once after a 0.1 s pause
    (presumably to ride out transient errors while many workers call the
    endpoint at once). A second failure propagates to the caller.
    """
    payload = get_s3_obj(s3_uri)
    try:
        features = get_features(payload)
    except Exception:
        # Catch Exception rather than using a bare `except:`, so
        # KeyboardInterrupt/SystemExit stop the run instead of triggering a retry.
        sleep(0.1)
        features = get_features(payload)
    return s3_uri, features

In [None]:
# This process cell will take approximately 24-25 minutes on a t3.medium notebook instance
# with 3 m5.xlarge SageMaker Hosted Endpoint instances
from multiprocessing import cpu_count
from tqdm.contrib.concurrent import process_map

# Each result is the (s3_uri, feature_vector) tuple returned by extract_features.
workers = cpu_count() * 2
img_feature_vectors = process_map(
    extract_features, s3_uris, max_workers=workers, chunksize=1
)

In [None]:
# setting up the OpenSearch connection: a SigV4-signed HTTPS client for the
# domain whose host name came from the stack outputs
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

aws_session = boto3.Session()
region = aws_session.region_name # e.g. us-east-1
credentials = aws_session.get_credentials()
awsauth = AWSV4SignerAuth(credentials, region)

oss = OpenSearch(
    hosts=[{'host': es_host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

In [None]:
# Define the k-NN index mapping: enable k-NN on the index and store each
# image's feature vector in a 2048-dimensional knn_vector field. The dimension
# must equal the length of the vectors returned by the ResNet50 endpoint.
knn_index = dict(
    settings={"index.knn": True},
    mappings={"properties": {"zalando_img_vector": {"type": "knn_vector", "dimension": 2048}}},
)

In [None]:
# Create the OpenSearch k-NN index. ignore=400 suppresses HTTP 400 responses,
# such as the error returned when the index already exists on a re-run.
# Then display the stored index definition.
oss.indices.create(body=knn_index, index="idx_zalando", ignore=400)
oss.indices.get(index="idx_zalando")

In [None]:
# defining a function to import the feature vector corresponding to each S3 URI into the OpenSearch KNN index
# This process will take around ~3 min.

def es_import(elem):
    """Index one (s3_uri, feature_vector) pair into 'idx_zalando'.

    The S3 URI is used as the document id. Re-running this cell therefore
    overwrites existing documents instead of adding duplicates, which would
    otherwise show up as repeated hits in k-NN search results.
    """
    oss.index(
        index='idx_zalando',
        id=elem[0],
        body={
            "zalando_img_vector": elem[1],
            "image": elem[0],
        },
    )

_ = process_map(es_import, img_feature_vectors, max_workers=workers, chunksize=1)

In [None]:
# define display_image function
def display_image(bucket, key, size=(300, 300)):
    """Fetch s3://<bucket>/<key>, resize it to `size` and render it in the notebook.

    Returns whatever IPython's display() returns.
    """
    body = s3.get_object(Bucket=bucket, Key=key)['Body']
    return display(Image.open(body).resize(size))

In [None]:
import requests
import random


# Query image: a random pick among the first 10 dataset URLs, downloaded from
# its original URL (not from S3), then embedded by the endpoint.
urls = url_lst[:10]
img_bytes = download_file(random.choice(urls))
features = get_features(img_bytes)

In [None]:
# Retrieve the k nearest neighbours of the query vector from the k-NN index.
k = 5
idx_name = 'idx_zalando'
knn_query = {
    'size': k,
    'query': {'knn': {'zalando_img_vector': {'vector': features, 'k': k}}},
}
res = oss.search(index=idx_name, body=knn_query, request_timeout=30)

In [None]:
# Show the matched images. Iterate over the hits actually returned: the search
# can return fewer than k documents (e.g. a small index), and indexing
# hits[i] for every i < k would then raise IndexError.
for hit in res['hits']['hits']:
    key = hit['_source']['image'].replace(f's3://{bucket}/', '')
    display_image(bucket, key)

In [None]:
# download ready-made lambda package for backend api
# (copied locally with the AWS CLI, then uploaded to the training bucket as
# backend/function.zip).
# NOTE(review): this object lands in `bucket` alongside the images, so a
# bucket-wide key listing on a re-run will include it.
!aws s3 cp s3://aws-ml-blog/artifacts/visual-search/function.zip ./

s3_resource.Object(bucket, 'backend/function.zip').upload_file('./function.zip')

In [None]:

from os import environ

# Append the JupyterSystemEnv bin directory to PATH for the `!npm` commands
# below (presumably npm is installed there on this notebook instance — TODO confirm).
# npm_path starts with ':' so it can be concatenated directly onto PATH.
npm_path = ':/home/ec2-user/anaconda3/envs/JupyterSystemEnv/bin'

# Append only when missing, so re-running the cell does not keep growing PATH.
if npm_path not in environ['PATH']:
    ADD_NPM_PATH = environ['PATH']
    ADD_NPM_PATH = ADD_NPM_PATH + npm_path
else:
    ADD_NPM_PATH = environ['PATH']
    
# IPython expands $ADD_NPM_PATH and sets PATH in the kernel's environment.
%set_env PATH=$ADD_NPM_PATH

In [None]:
# Move into ./frontend (a relative path: this assumes the cwd is the notebook's
# own directory; the later `%cd ../` cell returns there) and install the npm
# dependencies, omitting devDependencies.
# NOTE(review): if the build tooling is declared under devDependencies,
# `--omit=dev` would break `npm run-script build`; confirm against package.json.
%cd ./frontend/

!npm i --omit=dev

In [None]:
# Run the frontend's "build" script; the next cell syncs ./build/ to the hosting bucket.
!npm run-script build

In [None]:
# S3 URI of the static-site hosting bucket from the stack outputs.
hosting_bucket = f"s3://{outputs['s3BucketHostingBucketName']}"

# Mirror the built frontend into the hosting bucket ($hosting_bucket is expanded by IPython).
!aws s3 sync ./build/ $hosting_bucket

In [None]:
# Return to the parent directory (undoes the earlier `%cd ./frontend/`).
%cd ../

In [None]:
# The web frontend is served from the stack's `cfDomain` output.
site_url = f'https://{outputs["cfDomain"]}/index.html'
print('Click the URL below:\n')
print(site_url)