In [None]:
from ipynb.fs.full.utils import *

In [None]:
def add_to_collection(c_id, image, max_faces, image_name=None, bucket=None):
    if image_name is None:
        image_name = '.'.join(image.split('.')[:-1])
    
    client = boto3.client('rekognition')
    
    response = None
    if bucket is None:
        image_bytes = open(image,'rb')

        try:
            response = client.index_faces(
                CollectionId = c_id, 
                Image = {
                    'Bytes': image_bytes.read(),
                },
                ExternalImageId = image_name, 
                MaxFaces = max_faces,
                DetectionAttributes = ['ALL']
            )
            image_bytes.close()
            
        except Exception as e:
            print("Couldn't index faces in image: {}.".format(repr(e)))
    else:
        try:
            response = client.index_faces(
                CollectionId = c_id, 
                Image = {'S3Object':{'Bucket':bucket,'Name':image}},
                ExternalImageId = image_name, 
                MaxFaces = max_faces,
                DetectionAttributes=['ALL']
            )
        except Exception as e:
            print("Couldn't index faces in image: {}.".format(repr(e)))
    
    if response is None:
        return [], []
        
    indexed_faces = [
            {
                **face['Face'], 
                **face['FaceDetail']
            }
        for face in response['FaceRecords']]

    unindexed_faces = [
        face['FaceDetail']
        for face in response['UnindexedFaces']
    ]

    print(
        'Indexed {} faces in {}. Could not index {} faces.'.format(
            len(indexed_faces),
            image_name, 
            len(unindexed_faces)
        )
    )
    return indexed_faces, unindexed_faces

In [None]:
def list_collection_faces(c_id, max_results):
    client = boto3.client('rekognition')
    
    faces = []
    try:
        response = client.list_faces(
            CollectionId = c_id, 
            MaxResults = max_results
        )
        faces = [face['FaceId'] for face in response['Faces']]
        
        print('Found {} faces in collection {}.'.format(len(faces), c_id))
    except Exception as e:
        print('Couldn\'t list faces in collection {}: {}.'.format(c_id, repr(e)))

    return faces

In [None]:
def search_faces(c_id, face_id, threshold=90, max_faces=1):
    client = boto3.client('rekognition')
        
    faces = []
    try:
        response = client.search_faces(
            CollectionId = c_id, 
            FaceId = face_id,
            FaceMatchThreshold = threshold, 
            MaxFaces = max_faces
        )
        faces = [face['Face'] for face in response['FaceMatches']]
        print('Found {} faces in {} that match {}.'.format(len(faces), c_id, face_id))
    except Exception as e:
        print('Couldn\'t search for faces in {} that match {}: {}'.format(c_id, face_id, repr(e)))

    return faces

In [None]:
def search_faces_by_image(
    c_id,
    image, 
    threshold, 
    max_faces, 
    bucket=None
 ):
    client = boto3.client('rekognition')

    response = None
    if bucket is None:
        image_bytes = open(image,'rb')
        try:
            response = client.search_faces_by_image(
                CollectionId = c_id, 
                Image = {
                    'Bytes': image_bytes.read()
                },
                FaceMatchThreshold = threshold,
                MaxFaces = max_faces
            )
            image_bytes.close()
            
        except Exception as e:
            print('Couldn\'t search for faces in {} that are in {}: {}.'.format(image, c_id, repr(e)))
    else:
        try:
            response = client.search_faces_by_image(
                CollectionId = c_id, 
                Image = {
                    'S3Object':{
                        'Bucket':bucket,
                        'Name':image
                    }
                },
                FaceMatchThreshold = threshold,
                MaxFaces = max_faces
            )
        except Exception as e:
            print('Couldn\'t search for faces in {} that are in {}: {}.'.format(image, c_id, repr(e)))

    collection_faces = []
    try:
        if response is not None:
            for face in response['FaceMatches']:
                face_info = face['Face'] 
                face_info['Similarity'] = face['Similarity']
                collection_faces.append(face_info)
                  
        print('Found {} faces in the collection'.format(len(collection_faces)))
    except Exception as e:
        print('Couldn\'t search for faces in {}: {}.'.format(c_id, repr(e)))
              
    return collection_faces

In [None]:
def delete_faces(c_id, face_ids):
    client = boto3.client('rekognition')
    
    deleted_ids = []
    try:
        response = self.rekognition_client.delete_faces(
            CollectionId = c_id, 
            FaceIds = face_ids
        )
        deleted_ids = response['DeletedFaces']
        print('Deleted {} faces from collection {}.'.format(len(deleted_ids), c_id))
    except Exception as e:
        print('Couldn\'t delete faces from {}: {}.'.format(c_id, repr(e)))

    return deleted_ids


In [None]:
def create_collection(c_id):
    client = boto3.client('rekognition')

    response = None
    try:
        response = client.create_collection(
            CollectionId = c_id
        )
        
        response['CollectionId'] = c_id
        
        print('Created collection {}.'.format(c_id))
    except Exception as e:
        print('Couldn\'t create collection {}: {}.'.format(c_id, repr(e)))

    return response

In [None]:
def list_all_collections(max_results):
    client = boto3.client('rekognition')
    
    collections = []
    try:
        response = client.list_collections(MaxResults = max_results)
        collections = [col_id for col_id in response['CollectionIds']]
        
    except Exception as e:
        print('Couldn\'t list collections: {}.'.format(repr(e)))
   
    return collections

In [None]:
def delete_collection(c_id):
    client = boto3.client('rekognition')
        
    try:
        client.delete_collection(CollectionId=c_id)
        return 0
    except Exception as e:
        print('Couldn\'t delete collection {}: {}.'.format(c_id, repr(e)))
        return -1

In [None]:
def display_found(bucket, target, found):

    get_object(bucket, target, "../images/" + target)
    target_image = Image.open("../images/" + target)

    for face in found:
        print('Found: {}'.format(face['ExternalImageId']))
        exists_in_s3 = False
        try:
            external_id = face['ExternalImageId'] + '.jpg'
            get_object(bucket, external_id, "../images/" + external_id)
            face_image = Image.open("../images/" + external_id)
            exists_in_s3 = True
        except Exception:
            try:
                external_id = face['ExternalImageId'] + '.jpeg'
                get_object(bucket, external_id, "../images/" + external_id)
                face_image = Image.open("../images/" + external_id)
                exists_in_s3 = True
            except Exception:
                try:
                    external_id = face['ExternalImageId'] + '.png'
                    get_object(bucket, external_id, "../images/" + external_id)
                    face_image = Image.open("../images/" + external_id)
                    exists_in_s3 = True
                except Exception:
                    pass
        print(exists_in_s3)
        if not exists_in_s3:
            continue
        
        # copy target image so we can resize
        target_disp = target_image.copy()
        
        # resize to 800,600
        if face_image.size[0] > face_image.size[1]:
            face_image = resize(
                face_image, 
                width = 800, 
                height=int(face_image.size[1]*800/face_image.size[0])
            )
        else:
            face_image = resize(
                face_image, 
                height = 800, 
                width=int(face_image.size[0]*800/face_image.size[1])
            )
            
        if target_disp.size[0] > target_disp.size[1]:
            target_disp = resize(
                target_disp, 
                width = 800, 
                height=int(target_disp.size[1]*800/target_disp.size[0])
            )
        else:
            target_disp = resize(
                target_disp, 
                height = 800, 
                width=int(target_disp.size[0]*800/target_disp.size[1])
            )
            
        # extract bounding box
        box = face['BoundingBox']
        left = face_image.width * box['Left']
        top = face_image.height * box['Top']
        right = (face_image.width * box['Width']) + left
        bottom = (face_image.height * box['Height']) + top
        
        # draw a box around the matching face
        draw = ImageDraw.Draw(face_image)
        draw.rectangle([left, top, right, bottom], outline='aqua', width=6)
        
        # draw text similarity on the matching face
        similarity = 'MS: %.3f' % face['Similarity']
        fnt_size = int(20 * face_image.width/800)
        try:
            fnt = ImageFont.truetype("Pillow/Tests/fonts/FreeMono.ttf", fnt_size)
        except Exception:
            fnt=ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf", fnt_size)
        
        text_fill = (255,255,255,128)
        w, h = fnt.getsize(similarity)
        pos_y = int(top)
        pos_x = int(left)
        draw.rectangle((pos_x, pos_y, pos_x + w, pos_y + h), fill='black')
        draw.text((pos_x, pos_y), similarity, font=fnt, fill=text_fill)
        
        concat = concat_horizontal(face_image, target_disp)
        target_disp.close()
        face_image.close()
        
        print('showing....')
        concat.show()
        
    target_image.close()