In [2]:
import boto3
import io
import numpy as np
import json

# Load boto references
s3_client = boto3.client('s3')
s3 = boto3.resource('s3')
sm_runtime = boto3.Session().client('sagemaker-runtime')

## Create people database

Define the input event wehich specifies the input images, inference endpoint and output people database.

In [3]:
import sagemaker
from sagemaker.utils import name_from_base
sagemaker_session = sagemaker.Session()

event = {
    'InputBucket': 'virtual-concierge-index-ap-southeast-2',
    'InputPrefix': '',
    'EndpointName': 'sagemaker-mxnet-2019-04-28-00-04-50-699' # TODO: Replace this with your endpoint
}

event['OutputBucket'] = sagemaker_session.default_bucket()
event['OutputKey'] = 'virtual-concierge/people.npz'

json.dumps(event)

INFO:sagemaker:Created S3 bucket: sagemaker-us-east-1-882607831196


'{"InputBucket": "virtual-concierge-index-ap-southeast-2", "InputPrefix": "", "EndpointName": "sagemaker-mxnet-2019-04-28-00-04-50-699", "OutputBucket": "sagemaker-us-east-1-882607831196", "OutputKey": "virtual-concierge/people.npz"}'

In [4]:
#!aws s3 rm s3://sagemaker-us-east-1-882607831196/virtual-concierge/people.npz

Load existing vecs names and checksums

In [5]:
def get_etag(event):
    try:
        people_head = s3_client.head_object(Bucket=event['OutputBucket'], Key=event['OutputKey'])
        return people_head['ETag'].strip('"')
    except Exception as e:
        print('Unable to get etag', e)
        return ''

def load_file(event):
    try:
        # Attempt to load people from s3
        people_object = s3.Object(event['OutputBucket'], event['OutputKey'])
        payload = people_object.get()['Body'].read()
        f = io.BytesIO(payload)
        people = np.load(f)
        return people['vecs'].tolist(), people['names'].tolist(), people['keys'].tolist(), set(people['checksums'])
    except Exception as e:
        print('Initialize new file', e)
        return [], [], [], set()
        
# Save the npz to temp file and get payload
def save_file(event, vecs, names, keys, checksums):
    from tempfile import TemporaryFile
    outfile = TemporaryFile()
    np.savez(outfile, vecs=vecs, names=names, keys=keys, checksums=list(checksums))
    outfile.seek(0)
    payload = outfile.read()
    resp = s3_client.put_object(Bucket=event['OutputBucket'], Key=event['OutputKey'], Body=payload)
    return resp['ETag'].strip('"')    

Download the existing file from s3

In [6]:
%%time

vecs, names, keys, checksums = load_file(event)
print('loaded count: {}'.format(len(checksums)))

loaded count: 63
CPU times: user 14.3 ms, sys: 4.62 ms, total: 18.9 ms
Wall time: 47.2 ms


Query s3 for keys that have changed

In [7]:
%%time

def get_new_contents(event, checksums, batch_size=10, batch_limit=100):    
    print('getting contents batch size: {}, limit: {}'.format(batch_size, batch_limit))
    contents = []
    is_truncated = False
    
    def filter_by_checksum(response, checksums):
        return [(content['Key'], content['ETag'].strip('"')) for content in response['Contents']
                if content['Size'] > 0 and not content['ETag'].strip('"') in checksums]
    
    # Get the first response
    response = s3_client.list_objects_v2(
        Bucket=event['InputBucket'],
        Prefix=event.get('InputPrefix'),
        MaxKeys=batch_size
    )
    contents += filter_by_checksum(response, checksums)

    # Get remaining response
    while response['IsTruncated']:
        response = s3_client.list_objects_v2(
            ContinuationToken=response['NextContinuationToken'],
            Bucket=event['InputBucket'],
            Prefix=event.get('InputPrefix'),
            MaxKeys=batch_size
        )
        contents += filter_by_checksum(response, checksums)
        if len(contents) > batch_limit:
            print('Reached limit: {}'.format(len(contents)))
            is_truncated = True
            break
    
    return contents, is_truncated

contents, is_truncated = get_new_contents(event, checksums)
print('Added {} contents, truncated: {}'.format(len(contents), is_truncated))

getting contents batch size: 10, limit: 100
Added 0 contents, truncated: False
CPU times: user 55 ms, sys: 2.57 ms, total: 57.6 ms
Wall time: 3.02 s


Downloading new images and vectorize

In [8]:
%%time

def download_contents(event, contents, vecs, names, keys, checksums=None):
    for (key, checksum) in contents:
        if checksums and checksum in checksums:
            print('Skip', checksum)
            continue
        # Get image object
        image_object = s3.Object(event['InputBucket'], key)
        # Get name from meta data or last part of filename
        name = image_object.metadata.get('fullname') or key.split('/')[-1].split('.')[0]
        # Call endpoint to crop boudning box and return vector 
        payload = image_object.get()['Body'].read()
        response = sm_runtime.invoke_endpoint(EndpointName=event['EndpointName'],
                                              ContentType='application/x-image',
                                              Body=payload)
        vec = json.loads(response['Body'].read().decode())
        try:
            # Attempt to replace key, or else, append to list
            index = keys.index(key)
            vecs[index] = vec
            names[index] = name
            print('Updated', index, name)
        except ValueError:
            print('Added', name)
            vecs.append(vec)
            names.append(name)
            keys.append(key)
        if checksums:
            checksums.add(checksum)
        
print('downloading contents: {}'.format(len(contents)))
download_contents(event, contents, vecs, names, keys, checksums)

downloading contents: 0
CPU times: user 226 µs, sys: 0 ns, total: 226 µs
Wall time: 138 µs


Upload updated file back to s3

In [12]:
people_etag = ''

if len(contents) > 0:
    print('uploading file: {}/{}'.format(event['OutputBucket'], event['OutputKey']))
    people_etag = save_file(event, vecs, names, keys, checksums)

In [13]:
# Return the new items added
response = {
    'Added': len(contents),
    'IsTruncated': is_truncated,
    'Total': len(keys),
    'Unique': len(checksums),
    'ETag': people_etag,
}
response

{'Added': 0, 'IsTruncated': False, 'Total': 63, 'Unique': 63, 'ETag': ''}

## Upadate Device Shadow with file to download

Send a message to IoT shadow `thing_name` to inform it to download new model

In [None]:
import boto3
import json

client = boto3.client('iot-data')

# TODO: Replace this with your thing name
thing_name = 'deeplens_9pJ_x7I6QtO8FsjQfab2tQ'

Send a message to IoT given a thing name to update their people model

In [None]:
# Set the properties to send to shadow
props = {
    'people': {
        'Etag': people_etag,
        'Bucket': event['OutputBucket'],
        'Key': event['OutputKey']
    }
}
props

shadow = {
    'state': {
        'desired' : props
    }    
}
response = client.update_thing_shadow(
    thingName=thing_name,
    payload=json.dumps(shadow)
)
shadow = json.loads(response["payload"].read())
shadow['state']['desired']

### Get Device Shadow delta, download and report back

Get the shing shadow delta, and inspect for `people` data to download new model

This code would typically be run at the edge in a greengrass lambda.

In [None]:
import hashlib

people_path = 'models/people.npz'

In [None]:
%%time

def download_model(people_path, props):
    try:
        with open(people_path,'rb') as f:
            people_etag = hashlib.md5(f.read()).hexdigest()
            print('read etag', people_etag)
    except:
        people_etag = ''
        
    if people_etag == props['Etag']:
        print('people unchanged')
    else:
        print('downloading to', people_path)
        with open(people_path,'wb') as f:
            response = s3_client.get_object(Bucket=props['Bucket'], Key=props['Key'])   
            f.write(response['Body'].read())

# Read the shadow, call download model if we have 
response = client.get_thing_shadow(thingName=thing_name)
shadow = json.loads(response["payload"].read())
delta = shadow['state'].get('delta')
print('shadow delta', delta)

if delta and 'people' in delta:
    download_model(people_path, shadow['state']['desired']['people'])

Update the report value in shadow once we have downloaded the file

In [None]:
# Set the reported state to update
shadow = {
    'state': {
        'reported': props
    }    
}
response = client.update_thing_shadow(
    thingName=thing_name,
    payload=json.dumps(shadow)
)
shadow = json.loads(response["payload"].read())
shadow['state']['reported']