In [1]:
import glob
import os
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import random
import seaborn as sn
import boto3
import re
from io import BytesIO
import base64
# import tqdm
import ast
import sagemaker
import time
import json

import torch
import torch.nn as nn
import torchvision.models as models
from torch.autograd import Variable
from torchvision import transforms

In [2]:
from sagemaker.utils import name_from_base

In [3]:
role = sagemaker.get_execution_role()
sess = sagemaker.Session()
region = sess.boto_region_name
bucket = sess.default_bucket()

# Herlper functions

In [4]:
#return all s3 keys
def get_all_s3_keys(bucket, filt=None):
    """Get a list of all keys in an S3 bucket."""    
    keys = []

    kwargs = {'Bucket': bucket}
    while True:
        resp = s3.list_objects_v2(**kwargs)
        for obj in resp['Contents']:
            key = obj['Key']
            if filt is not None:
                if filt not in key:
                    continue
            keys.append('s3://' + bucket + '/' + key)

        try:
            kwargs['ContinuationToken'] = resp['NextContinuationToken']
        except KeyError:
            break

    return keys

# Make Model

In [None]:

resnet50 = models.resnet50(pretrained=True)

_ = resnet50.eval()
# _ = resnet50.cuda()

modules=list(resnet50.children())[:-1]
resnet50=nn.Sequential(*modules)
for p in resnet50.parameters():
    p.requires_grad = False

transform = transforms.Compose([transforms.ToTensor()])

device = torch.device('cuda:0' if torch.cuda.is_available() else "cpu")

resnet50 = resnet50.to(device)

device


# Test model with a single image

In [5]:
s3 = boto3.client('s3')

In [None]:
data_bucket = 'sagemaker-us-east-2-333209439517'

In [None]:
s3_uris = get_all_s3_keys(data_bucket, filt='jpg')

s3_uris[0], len(s3_uris)

In [None]:
s3_uri = s3_uris[0]
s3_uri

In [None]:
# s3_uri = 's3://sagemaker-us-east-2-333209439517/geological_similarity/andesite/012L6.jpg'

In [None]:
payload = s3.get_object(Bucket=data_bucket,Key=s3_uri.replace(f's3://{data_bucket}/', ''))['Body'].read()

In [None]:
im_file = BytesIO(payload)  # convert image to file-like object
img = Image.open(im_file)   # img is now PIL Image object

img

In [None]:
im = np.asarray(img)# convert image to numpy array

In [None]:
im.shape

In [None]:
im = np.moveaxis(im, -1, 0)

In [None]:
im.shape

In [None]:
img = transform(im) # convert to tensor
#img = img.reshape(1,3,28,28)
img = torch.unsqueeze(img, 0)
img = img.to(device)


In [None]:
img.shape

In [None]:

with torch.no_grad():
    feature = resnet50(img)

feature = feature.cpu().detach().numpy().reshape(-1)

feature.shape

# Save model to s3

In [None]:
img.shape

In [None]:
import tarfile

In [None]:
input_shape = [1, 3, 28, 28]
trace = torch.jit.trace(resnet50.float().eval(), torch.zeros(input_shape).float())

In [None]:
trace.save("model.pth")

In [None]:
with tarfile.open("model.tar.gz", "w:gz") as f:
    f.add("model.pth")

In [None]:
bucket

In [None]:
compilation_job_name = name_from_base("TorchVision-ResNet50")
prefix = compilation_job_name + "/model"

In [None]:
compilation_job_name, prefix

In [None]:
model_path = sess.upload_data(path="model.tar.gz", key_prefix=prefix)

In [None]:
model_path = 's3://sagemaker-us-east-1-333209439517/TorchVision-ResNet50-2021-09-13-00-30-10-117/model/model.tar.gz'

# Deploy model

In [None]:
import sagemaker
from sagemaker.pytorch import PyTorchModel
from sagemaker import get_execution_role, Session

In [None]:
role

In [None]:
predictor.delete_endpoint()

In [None]:
model = PyTorchModel(
    entry_point="inference.py",
    source_dir="code",
    role=role,
    model_data=model_path,
    framework_version="1.5.0",
    py_version="py3",
)

In [None]:
# SageMakerFullAccess - policy is a managed policy that includes all the necessary permissions required to perform most actions on SageMaker

In [None]:
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer

# set local_mode to False if you want to deploy on a remote
# SageMaker instance

local_mode = False

if local_mode:
    instance_type = "local"
else:
    instance_type = "ml.t2.medium" #"ml.c4.xlarge"

predictor = model.deploy(
    initial_instance_count=1,
    instance_type=instance_type,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
)

In [None]:
predictor.__dict__

In [None]:
predictor.endpoint_name

In [None]:
predictor.content_type, predictor.serializer, predictor.deserializer

In [None]:
encoded_image = base64.b64encode(payload).decode('utf-8')
req = {'inputs':encoded_image}

In [None]:
im_bytes = base64.b64decode(encoded_image)   # im_bytes is a binary image
im_file = BytesIO(im_bytes)  # convert image to file-like object
image = Image.open(im_file)   # img is now PIL Image object
im = np.asarray(image)# convert image to numpy array
print(im.shape)
image

## Predict using 'predict' function

In [None]:
res = predictor.predict(req)

In [None]:
embedding = np.array(res).reshape(-1)

In [None]:
embedding.shape

In [None]:
ENDPOINT_NAME = predictor.endpoint_name

## Predict using 'invoke_endpoint'

In [None]:
# define a function to extract image features
from time import sleep 

sm_client = boto3.client('sagemaker-runtime')

In [None]:
ENDPOINT_NAME = 'pytorch-inference-2021-09-13-14-49-32-180'

In [None]:
resp = sm_client.invoke_endpoint(EndpointName=ENDPOINT_NAME, Body=json.dumps(req),ContentType='application/json') 
embedding = json.loads((resp['Body'].read()))
embedding = list(np.array(embedding).reshape(-1))

In [None]:
len(embedding)

## Utility functions for prediction on an s3 object

In [None]:
def get_predictions(payload): 
    return sm_client.invoke_endpoint(EndpointName=ENDPOINT_NAME, 
                                     Body=payload,
                                     ContentType='application/json') 

def extract_features(s3_uri, bucket): 
    key = s3_uri.replace(f's3://{bucket}/', '') 
    payload = s3.get_object(Bucket=bucket,Key=key)['Body'].read() 

    sleep(0.1) 
    
    # get image in base64 format
    encoded_image = base64.b64encode(payload).decode('utf-8')
    # get predictions
    response = get_predictions(json.dumps({'inputs':encoded_image})) 
    # get response
    response_body = json.loads((response['Body'].read())) 
    # reshape to list of len=2048
    feature_lst = list(np.array(response_body).reshape(-1))
    
    return s3_uri, feature_lst

## Get features for all images

In [None]:
s3_uri = s3_uris[0]
s3_uri

In [None]:
uri, features = extract_features(s3_uri, data_bucket)

In [None]:
uri

In [None]:
len(features)

In [None]:
s3_resource = boto3.resource('s3')

In [None]:
s3_resource

In [None]:
bucket

In [None]:
all_features = []
for s3_uri in s3_uris:
    _, features = extract_features(s3_uri, data_bucket)
    all_features.append(features)

In [None]:
print(len(all_features))

In [None]:
f2 = [str(x) for x in all_features]

In [None]:
f2[0]

In [None]:
data = {'uris':s3_uris, 'features':all_features}

In [None]:
s3object = s3_resource.Object(bucket, 'embeddings/resnet50_pretrained_features.json')
s3object.put(Body=(bytes(json.dumps(data).encode('UTF-8'))))

In [None]:
bucket

In [None]:
import json
import boto3

s3 = boto3.client('s3')
data = 'your_json_object here'
s3.put_object(
     Body=str(json.dumps(data))
     Bucket='your_bucket_name'
     Key='your_key_here'
)

In [None]:
np_feats = np.array(all_fe)

# Connect to Elasticsearch

In [23]:
!pip install requests_aws4auth
!pip install elasticsearch=='7.13.4' #https://opensearch.org/docs/clients/index/

Collecting requests_aws4auth
  Downloading requests_aws4auth-1.1.1-py2.py3-none-any.whl (31 kB)
Installing collected packages: requests-aws4auth
Successfully installed requests-aws4auth-1.1.1
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/pytorch_p36/bin/python -m pip install --upgrade pip' command.[0m
Collecting elasticsearch==7.13.4
  Downloading elasticsearch-7.13.4-py2.py3-none-any.whl (356 kB)
[K     |████████████████████████████████| 356 kB 30.5 MB/s eta 0:00:01
Installing collected packages: elasticsearch
Successfully installed elasticsearch-7.13.4
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/pytorch_p36/bin/python -m pip install --upgrade pip' command.[0m


In [None]:
# !pip install  nltk
# !pip install jsonlines
# !pip install pandarallel
# !pip install --upgrade grpcio 
# !pip install --upgrade s3fs


In [24]:
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection

In [25]:
import elasticsearch

In [26]:
elasticsearch.__version__

(7, 13, 4)

In [27]:
region

'us-east-1'

In [28]:
service = 'es'

In [29]:
# ssm = boto3.client('ssm', region_name=region)

In [30]:
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, service, session_token=credentials.token)

In [31]:
role

'arn:aws:iam::333209439517:role/service-role/AmazonSageMaker-ExecutionRole-20210623T145063'

In [32]:
credentials

<botocore.credentials.RefreshableCredentials at 0x7f261ee1f2b0>

In [33]:
host = "search-search-all-wroz4g764qrjndse4eqbika2ia.us-east-1.es.amazonaws.com"
    
es_index = 'knn-test-1'
# "search-domainname-yourDomainEndpoint.REGION.es.amazonaws.com"

In [34]:
def connect_to_ES(esEndPoint):
    print ('Connecting to the ES Endpoint {0}'.format(esEndPoint))
    try:
        esClient = Elasticsearch(
            hosts=[{'host': esEndPoint, 'port': 443}],
            http_auth=('cac','Test123$'),
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection)
        return esClient
    except Exception as E:
        print("Unable to connect to {0}".format(esEndPoint))
        print(E)

In [35]:
es = connect_to_ES(host)
es

Connecting to the ES Endpoint search-search-all-wroz4g764qrjndse4eqbika2ia.us-east-1.es.amazonaws.com


<Elasticsearch([{'host': 'search-search-all-wroz4g764qrjndse4eqbika2ia.us-east-1.es.amazonaws.com', 'port': 443}])>

In [36]:
es.indices.exists(index=es_index)

False

In [37]:
def create_index(index):
    """
    This function will create an index using knn settings
    """
    if not es.indices.exists(index=index):
        index_settings = {
            "settings": {
                "index.knn": True,
                "index.mapping.total_fields.limit": "2000"
            },
            "mappings": {
                "properties": {
                    "embeddings": {
                        "type": "knn_vector",
                        "dimension": 2048
                    }
                }
            }
        }

        es.indices.create(index=index, body=json.dumps(index_settings))
        print("Created the elasticsearch index successufly ")
    else:
        print("elasticsearch index already exists")

In [38]:
es_index

'knn-test-1'

In [39]:
#Create the index using knn settings
create_index(es_index)

Created the elasticsearch index successufly 


In [53]:
# You can check if the index is created within your es cluster
es.indices.get_alias("*")

{'.opendistro_security': {'aliases': {}},
 '.kibana_1': {'aliases': {'.kibana': {}}},
 'knn-test-1': {'aliases': {}}}

## Load features into OpenSearch

### Load features from s3

In [6]:
s3_uris = get_all_s3_keys(bucket, filt='embeddings/resnet')

In [43]:
s3_uris

['s3://sagemaker-us-east-1-333209439517/embeddings/resnet50/resnet_vectors_andesite.json',
 's3://sagemaker-us-east-1-333209439517/embeddings/resnet50/resnet_vectors_gneiss.json',
 's3://sagemaker-us-east-1-333209439517/embeddings/resnet50/resnet_vectors_marble.json',
 's3://sagemaker-us-east-1-333209439517/embeddings/resnet50/resnet_vectors_quartzite.json',
 's3://sagemaker-us-east-1-333209439517/embeddings/resnet50/resnet_vectors_rhyolite.json',
 's3://sagemaker-us-east-1-333209439517/embeddings/resnet50/resnet_vectors_schist.json']

In [8]:
s3_uri = s3_uris[0]
s3_uri

's3://sagemaker-us-east-1-333209439517/embeddings/resnet50/resnet_vectors_andesite.json'

In [9]:
payload = s3.get_object(Bucket=bucket,Key=s3_uri.replace(f's3://{bucket}/', ''))['Body'].read()

In [11]:
type(payload)

bytes

In [12]:
data = json.loads(payload.decode('utf8').replace("'", '"'))

In [13]:
data.keys()

dict_keys(['labels', 'filenames', 'features'])

In [14]:
data['labels']

'andesite'

In [15]:
data['filenames'][0]

'/mnt/osn3/caceres/classes/geological/geological_similarity/andesite/04ZDG.jpg'

In [17]:
len(eval(data['features'][0]))

2048

In [45]:
for filename, embeddings in zip(data['filenames'], data['features']):
    embeddings = ast.literal_eval(embeddings)
    filename = filename.replace('/mnt/osn3/caceres/classes/geological', 's3://sagemaker-us-east-2-333209439517')
    print(filename, len(embeddings))
    
    break
    
# sagemaker-us-east-2-333209439517/geological_similarity

s3://sagemaker-us-east-2-333209439517/geological_similarity/andesite/04ZDG.jpg 2048


In [46]:
import tqdm

In [51]:

def ingest_data_into_es(filt):
    """
        Ingest pre-calculated vectors into OpenSearch index. 
        
        filt: string filter for keys
    """
    s3_uris = get_all_s3_keys(bucket, filt='embeddings/resnet')

    count = 0
    lost_records = 0
    total = 0
    for s3_uri in s3_uris:
        payload = s3.get_object(Bucket=bucket,Key=s3_uri.replace(f's3://{bucket}/', ''))['Body'].read()
        data = json.loads(payload.decode('utf8').replace("'", '"'))

        for filename, embeddings in tqdm.tqdm(zip(data['filenames'], data['features'])):
            try:
                embeddings = ast.literal_eval(embeddings)
                filename = filename.replace('/mnt/osn3/caceres/classes/geological', 's3://sagemaker-us-east-2-333209439517')

                # make record
                record = {}
                record['id'] = count
                record['uri'] = filename
                record['embeddings'] = embeddings

                # ingest into ES
                es.index(index=es_index, id=count, body=record)

                count += 1
            except Exception as error:
                logger.error(f"An error {error} for record {record}")
                lost_records += 1
            
            total += 1

    print(f'{lost_records} out of {total} are lost records')

    print(f'{count} out of {total} records has been processed')

    return {
        'statusCode': 200,
        'body': json.dumps(str(count) + ' records processed.')
    }

In [52]:
ingest_data_into_es()

5000it [01:17, 64.70it/s]
5000it [01:08, 72.66it/s]
4998it [01:14, 66.99it/s]
5000it [01:10, 70.50it/s]
5000it [01:13, 68.49it/s]
5000it [01:11, 70.01it/s]


NameError: name 'records' is not defined

## Check that data is indeed in ES

In [54]:
res = es.search(index=es_index, body={
                    "query": {
                            "match_all": {}
                        }},
           size=10)

In [55]:
type(res)

dict

In [56]:
res.keys()

dict_keys(['took', 'timed_out', '_shards', 'hits'])

In [57]:
res['took']

288

In [58]:
res['timed_out']

False

In [59]:
res['_shards']

{'total': 5, 'successful': 5, 'skipped': 0, 'failed': 0}

In [61]:
res['hits'].keys()

dict_keys(['total', 'max_score', 'hits'])

In [68]:
res['hits']['hits'][0].keys()

dict_keys(['_index', '_type', '_id', '_score', '_source'])

In [69]:
res['hits']['hits'][0]['_index']

'knn-test-1'

In [70]:
res['hits']['hits'][0]['_type']

'_doc'

In [71]:
res['hits']['hits'][0]['_id']

'3'

In [72]:
res['hits']['hits'][0]['_score']

1.0

In [75]:
len(res['hits']['hits'][0]['_source']['embeddings'])

2048

In [76]:
res['hits']['hits'][0]['_source']['uri']

's3://sagemaker-us-east-2-333209439517/geological_similarity/andesite/0UHNC.jpg'

## Query with embeddings

In [85]:
query_embeddings = res['hits']['hits'][0]['_source']['embeddings']
num_similar_images = 4

In [86]:
es_query ={
            "query": {
                "knn": {
                    "embeddings": {
                        "vector": query_embeddings,
                        "k": 5
                    }
                }
            }
    }

res = es.search(index=es_index, body=es_query, size=num_similar_images)



In [88]:
for hit in res['hits']['hits']:
    print(hit['_source']['uri'], hit['_score'])

s3://sagemaker-us-east-2-333209439517/geological_similarity/andesite/0UHNC.jpg 1.0
s3://sagemaker-us-east-2-333209439517/geological_similarity/andesite/OQOXQ.jpg 0.0057354467
s3://sagemaker-us-east-2-333209439517/geological_similarity/andesite/AG89G.jpg 0.0056438325
s3://sagemaker-us-east-2-333209439517/geological_similarity/andesite/H7U69.jpg 0.005360121


In [91]:
a = [hit['_source']['uri'] for hit in res['hits']['hits']][0]

In [93]:
type(a)

str

In [95]:
from urllib.parse import urlparse

In [96]:
o = urlparse(a, allow_fragments=False)
o

ParseResult(scheme='s3', netloc='sagemaker-us-east-2-333209439517', path='/geological_similarity/andesite/0UHNC.jpg', params='', query='', fragment='')

In [98]:
o.netloc, o.path.lstrip('/')

('sagemaker-us-east-2-333209439517',
 'geological_similarity/andesite/0UHNC.jpg')

In [99]:
type(o.netloc)

str

In [100]:
type(o.path.lstrip('/'))

str