# Video Analysis Chapter

This notebook contains snippets for interacting with the Amazon Rekognition Video API.

Confirm that the `boto3` module is installed.

In [1]:
!pip3 install boto3





The `rtsp` module is a wrapper around the Open Computer Vision (OpenCV) library. Let's install it.

In [4]:
!pip3 install rtsp





Configure the global settings for this notebook.

My eufy `base_station` is at `192.168.0.70` with username `admin` and password `EYE_SEE_YOU`. Your environment will have different properties (and hopefully a stronger password).

The `cameras` dictionary maps the camera endpoint to a friendly name. You should choose names that align with your physical placement.

In [9]:
from rtsp import Client
base_station = 'rtsp://admin:EYE_SEE_YOU@192.168.0.70'
cameras = {
    'live0':'office',
    'live1':'kitchen',
    'live2':'living_room',    
}
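Because the RTSP URI embeds the username and password, it is worth masking them before the URI is printed or logged. The following is a minimal sketch using only the standard library; `build_camera_uri` and `redact_credentials` are hypothetical helper names that are not part of the `rtsp` module.

```python
from urllib.parse import urlsplit, urlunsplit

def build_camera_uri(base_station, endpoint):
    """Join the base station URI and a camera endpoint such as 'live0'."""
    return '%s/%s' % (base_station.rstrip('/'), endpoint)

def redact_credentials(uri):
    """Return the URI with any user:password component replaced by '***'."""
    parts = urlsplit(uri)
    if parts.username is None:
        # Nothing to hide, so return the URI unchanged.
        return uri
    host = parts.hostname or ''
    if parts.port is not None:
        host = '%s:%d' % (host, parts.port)
    return urlunsplit(
        (parts.scheme, '***@' + host, parts.path, parts.query, parts.fragment))

uri = build_camera_uri('rtsp://admin:EYE_SEE_YOU@192.168.0.70', 'live0')
print(redact_credentials(uri))
```

Pass the full URI to the RTSP client, and use the redacted form only in messages shown to people.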

The `get_frame` function retrieves a single image from a given camera.

In [17]:
from time import sleep
def get_frame(base_station, endpoint):
    rtsp_server_uri = '%s/%s' % (base_station, endpoint)
    with Client(rtsp_server_uri=rtsp_server_uri, verbose=False) as client:
        if not client.isOpened():
            print('{} server is down.'.format(rtsp_server_uri))
            return None
        else:
            print('{} server is up.'.format(rtsp_server_uri))

        while True:
            image = client.read()
            if image is None:
                sleep(0.100)
            else:
                return image
frames= {}
for endpoint in cameras.keys():
    frames[endpoint]= get_frame(base_station,endpoint)

frames    

rtsp://admin:EYE_SEE_YOU@192.168.0.70/live0 server is up.
rtsp://admin:EYE_SEE_YOU@192.168.0.70/live1 server is down.
rtsp://admin:EYE_SEE_YOU@192.168.0.70/live2 server is down.
rtsp://admin:EYE_SEE_YOU@192.168.0.70/live3 server is down.


{'live0': <PIL.Image.Image image mode=RGB size=1920x1080>,
 'live1': None,
 'live2': None,
 'live3': None}
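The `while True` loop in `get_frame` keeps polling until the camera returns an image, so a camera that connects but never produces a frame would block the notebook. One possible refinement is to poll against a deadline. This is a sketch only; `read_with_deadline` and its parameters are hypothetical, and `read` stands for any zero-argument callable such as `client.read`.

```python
from time import monotonic, sleep

def read_with_deadline(read, timeout_seconds=5.0, poll_seconds=0.1):
    """Call read() until it returns a non-None value or the deadline passes.

    Returns the value that was read, or None if nothing arrived in time.
    """
    deadline = monotonic() + timeout_seconds
    while monotonic() < deadline:
        image = read()
        if image is not None:
            return image
        # Give the stream a moment to buffer the next frame.
        sleep(poll_seconds)
    return None

# Inside get_frame this might replace the inner loop:
#     return read_with_deadline(client.read)
```

A monotonic clock is used so that system clock adjustments do not shorten or extend the wait.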

You'll notice that the `frames` are Python Imaging Library (PIL) images. Let's write one to disk in PNG format.

In [18]:
frames['live0'].save('live0.png', format='PNG')

Alternatively, you can ask PIL to launch a window that displays the image.

In [21]:
frames['live0'].show()

Next, create an Amazon S3 client and persist `frames['live0']` into a `BytesIO` memory stream.

In [22]:
import boto3
region_name = 'us-east-2'
s3 = boto3.client('s3', region_name=region_name)
bucket = 'ch05-video-use2'

from io import BytesIO
frame_bytes = BytesIO()
frames['live0'].save(frame_bytes, format='PNG')

You can write the `frame_bytes` into S3 using the `PutObject` API.

In [24]:
from datetime import datetime
dt = datetime.now()
object_key = 'frames/%s/%s.png' %(
    cameras['live0'],
    dt.strftime('%Y/%m/%d/%H/%M/%S.%f')
)

response = s3.put_object(
    Bucket=bucket,
    Key=object_key,
    Body=frame_bytes.getvalue(),
    Metadata={
        'Camera': cameras['live0'],        
    })

from json import dumps
print(dumps(response, indent=2))

{
  "ResponseMetadata": {
    "RequestId": "ZAGZ8CJGEY28BJ18",
    "HostId": "jdf5IN7y4hl52BJx4a1/iWKdBQ2hwW9FVuuKgTVZW424q+Ud5EU+LY7kjkE2ku6ZTkZtum1WBlY=",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "x-amz-id-2": "jdf5IN7y4hl52BJx4a1/iWKdBQ2hwW9FVuuKgTVZW424q+Ud5EU+LY7kjkE2ku6ZTkZtum1WBlY=",
      "x-amz-request-id": "ZAGZ8CJGEY28BJ18",
      "date": "Sun, 25 Dec 2022 22:10:45 GMT",
      "etag": "\"7bddaa72bff30f00441e7fa3df9719f2\"",
      "server": "AmazonS3",
      "content-length": "0"
    },
    "RetryAttempts": 0
  },
  "ETag": "\"7bddaa72bff30f00441e7fa3df9719f2\""
}
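The object key above encodes the camera name and capture time as a path, so frames from the same camera and hour share a common prefix. A small helper can keep that layout in one place. This is a sketch; `build_frame_key` and `hour_prefix` are hypothetical names introduced for illustration.

```python
from datetime import datetime

def build_frame_key(camera_name, captured_at, extension='png'):
    """Return a key like frames/office/2022/12/25/22/10/45.123456.png."""
    return 'frames/%s/%s.%s' % (
        camera_name,
        captured_at.strftime('%Y/%m/%d/%H/%M/%S.%f'),
        extension)

def hour_prefix(camera_name, captured_at):
    """Return the prefix shared by every frame from one camera in one hour."""
    return 'frames/%s/%s/' % (camera_name, captured_at.strftime('%Y/%m/%d/%H'))

captured_at = datetime(2022, 12, 25, 22, 10, 45, 123456)
print(build_frame_key('office', captured_at))
print(hour_prefix('office', captured_at))
```

A prefix of this shape could be passed as the `Prefix` argument of an S3 list request to enumerate one hour of frames.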


Amazon CloudWatch lets you create custom performance counters, called metrics. The `increment_frame_count` function increments the `FrameCount` metric for the given `camera_name` by `count` (one by default).

In [26]:
cloudwatch = boto3.client('cloudwatch', region_name=region_name)
def increment_frame_count(camera_name, count=1):
    cloudwatch.put_metric_data(
        Namespace='VideoAnalysis',
        MetricData=[
        {
            "MetricName": 'FrameCount',
            "Value":count,
            "Unit": 'Count',
            "Dimensions": [              
                {
                    "Name": 'camera_name',
                    "Value": camera_name
                },
            ]
        }
        ]
    )

increment_frame_count(cameras['live0'])
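When several cameras are polled in one pass, the counts can be sent in a single `put_metric_data` call, because `MetricData` accepts a list. The sketch below only builds the payload; `build_frame_count_metrics` is a hypothetical helper, and the metric and dimension names mirror the ones used above.

```python
def build_frame_count_metrics(frame_counts):
    """Turn {'office': 3, 'kitchen': 1} into a CloudWatch MetricData list."""
    return [
        {
            'MetricName': 'FrameCount',
            'Value': count,
            'Unit': 'Count',
            'Dimensions': [
                {'Name': 'camera_name', 'Value': camera_name},
            ],
        }
        # Sorting keeps the payload order stable between runs.
        for camera_name, count in sorted(frame_counts.items())
    ]

metric_data = build_frame_count_metrics({'office': 3, 'kitchen': 1})

# The payload could then be published with the client created earlier:
#     cloudwatch.put_metric_data(Namespace='VideoAnalysis', MetricData=metric_data)
```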

Let's use the `StartPersonTracking` API to asynchronously track people within the `path_tracking_backyard.mov` file. When the processing completes, a notification is sent to the `TOPIC_ARN`.

To run this snippet you'll need to configure the following values:
1. Set the `PUBLISHER_ROLE_ARN` to the AWS IAM role containing the `PersonTrackingAssumeRole.json` policy
1. Set the `TOPIC_ARN` to an Amazon Simple Notification Service (Amazon SNS) topic within your account

Note: If the topic doesn't start with **AmazonRekognition**, you'll need to explicitly grant the `PUBLISHER_ROLE_ARN` access to the `sns:Publish` action of the `TOPIC_ARN` resource.
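For a topic that needs the explicit grant, the permission might look like the policy document built below. This is an illustration only: the contents of `PersonTrackingAssumeRole.json` are not shown in this notebook, and `build_publish_policy` is a hypothetical helper. The resulting JSON would be attached to the publisher role as an identity policy.

```python
from json import dumps

def build_publish_policy(topic_arn):
    """Return an IAM policy document that allows sns:Publish on one topic."""
    return {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Effect': 'Allow',
                'Action': 'sns:Publish',
                'Resource': topic_arn,
            }
        ],
    }

# ACCOUNTID is a placeholder, and the topic name is made up for this example.
policy = build_publish_policy('arn:aws:sns:us-east-2:ACCOUNTID:PersonTrackingTopic')
print(dumps(policy, indent=2))
```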

In [5]:
import boto3
region_name = 'us-east-2'
bucket = 'ch05-video-use2'
rekognition = boto3.client('rekognition', region_name=region_name)

# Replace ACCOUNTID with your AWS account ID.
PUBLISHER_ROLE_ARN='arn:aws:iam::ACCOUNTID:role/PersonTrackingPublisher'
TOPIC_ARN='arn:aws:sns:us-east-2:ACCOUNTID:AmazonRekognitionPersonTrackingTopic'

job = rekognition.start_person_tracking(
    NotificationChannel={
        'RoleArn': PUBLISHER_ROLE_ARN,
        'SNSTopicArn': TOPIC_ARN
    },
    Video={
        'S3Object':{
            'Bucket': bucket,
            'Name': 'tracking/path_tracking_backyard.mov'
        }
    })

Let's pretty print the `job` response from the `StartPersonTracking` API.

In [11]:
from json import dumps
print(dumps(job, indent=2))

{
  "JobId": "117a91707136f9ab9a7212b008f6626adaa4900208bae4adac72c057d9158bc5",
  "ResponseMetadata": {
    "RequestId": "c0bb3572-cecb-4038-8cca-16a14816a5d7",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "x-amzn-requestid": "c0bb3572-cecb-4038-8cca-16a14816a5d7",
      "content-type": "application/x-amz-json-1.1",
      "content-length": "76",
      "date": "Sun, 08 Jan 2023 15:25:15 GMT"
    },
    "RetryAttempts": 0
  }
}


The `get_tracking_results` utility function waits for the Person Tracking job to complete. Afterward, it'll paginate through the detections and combine them into one logical response.

In [6]:
from time import sleep

def get_tracking_results(jobid):
    combined_response = None
    next_token = None
    while True:
        response = rekognition.get_person_tracking(JobId=jobid)
        if response['JobStatus'] == 'IN_PROGRESS':
            print('Job %s is still running...'% jobid)
            sleep(1)
            continue
        if response['JobStatus'] == 'FAILED':
            print('Job %s failed due to %s' %(
                jobid,
                response['StatusMessage']
            ))
            return None
        if response['JobStatus'] == 'SUCCEEDED':
            combined_response = response
            next_token = response.get('NextToken',None)
            break

    while next_token is not None:
        response = rekognition.get_person_tracking(
            JobId=jobid,
            NextToken=next_token)

        combined_response['Persons'].extend(response['Persons'])
        next_token = response.get('NextToken',None)

    return combined_response

    
get_tracking_results('117a91707136f9ab9a7212b008f6626adaa4900208bae4adac72c057d9158bc5')

{'JobStatus': 'SUCCEEDED',
 'VideoMetadata': {'Codec': 'h264',
  'DurationMillis': 19315,
  'Format': 'QuickTime / MOV',
  'FrameRate': 29.976701736450195,
  'FrameHeight': 568,
  'FrameWidth': 320,
  'ColorRange': 'LIMITED'},
 'Persons': [{'Timestamp': 900,
   'Person': {'Index': 0,
    'BoundingBox': {'Width': 0.09375,
     'Height': 0.21830986440181732,
     'Left': 0.90625,
     'Top': 0.4102112650871277}}},
  {'Timestamp': 967,
   'Person': {'Index': 0,
    'BoundingBox': {'Width': 0.10625000298023224,
     'Height': 0.2042253464460373,
     'Left': 0.890625,
     'Top': 0.4154929518699646}}},
  {'Timestamp': 1034,
   'Person': {'Index': 0,
    'BoundingBox': {'Width': 0.125,
     'Height': 0.20246478915214539,
     'Left': 0.871874988079071,
     'Top': 0.4225352108478546}}},
  {'Timestamp': 1100,
   'Person': {'Index': 0,
    'BoundingBox': {'Width': 0.13124999403953552,
     'Height': 0.18485915660858154,
     'Left': 0.8531249761581421,
     'Top': 0.43485915660858154}}},
  {'
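The token-following loop in `get_tracking_results` is a general pattern: request a page, collect its items, and repeat while the response carries a `NextToken`. The sketch below isolates that pattern so it can be exercised without calling AWS. `collect_pages` and `fetch_page` are hypothetical names, and the fake pages stand in for real responses.

```python
def collect_pages(fetch_page, items_key):
    """Follow NextToken links and return the items from every page.

    fetch_page is called with the current token (None for the first page)
    and must return a dict shaped like a paginated API response.
    """
    items = []
    next_token = None
    while True:
        page = fetch_page(next_token)
        items.extend(page.get(items_key, []))
        next_token = page.get('NextToken')
        if next_token is None:
            return items

# Fake two-page response used only to demonstrate the loop.
fake_pages = {
    None: {'Persons': [{'Timestamp': 900}], 'NextToken': 'page-2'},
    'page-2': {'Persons': [{'Timestamp': 967}]},
}
persons = collect_pages(lambda token: fake_pages[token], 'Persons')
print(persons)
```

With the real client, `fetch_page` would wrap `rekognition.get_person_tracking` and pass `NextToken` only when the token is not `None`.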

In [7]:
tracking_results = get_tracking_results('117a91707136f9ab9a7212b008f6626adaa4900208bae4adac72c057d9158bc5')
#tracking_results['Persons'][0]
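Each entry in `tracking_results['Persons']` carries a `Timestamp` and a `Person` with an `Index`, so the detections can be regrouped into one path per tracked person. The following is a sketch with a hypothetical `group_by_person` helper; it records the center of each bounding box in normalized coordinates and skips detections that have no `BoundingBox`.

```python
from collections import defaultdict

def group_by_person(persons):
    """Map each person Index to a list of (timestamp_ms, (center_x, center_y))."""
    paths = defaultdict(list)
    for detection in persons:
        person = detection['Person']
        box = person.get('BoundingBox')
        if box is None:
            continue
        center = (
            box['Left'] + box['Width'] / 2,
            box['Top'] + box['Height'] / 2,
        )
        paths[person['Index']].append((detection['Timestamp'], center))
    return dict(paths)

# Small hand-made sample in the same shape as the API response.
sample = [
    {'Timestamp': 900, 'Person': {'Index': 0, 'BoundingBox': {
        'Width': 0.5, 'Height': 0.5, 'Left': 0.25, 'Top': 0.25}}},
    {'Timestamp': 967, 'Person': {'Index': 1}},
    {'Timestamp': 1034, 'Person': {'Index': 0, 'BoundingBox': {
        'Width': 0.25, 'Height': 0.5, 'Left': 0.5, 'Top': 0.0}}},
]
print(group_by_person(sample))
```

Calling `group_by_person(tracking_results['Persons'])` would produce the same structure for the real job output.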

Let's map the `tracking_results` back to the originating video. Use the OpenCV `VideoCapture` class to open **MOV** files and related formats.

In [1]:
from cv2 import VideoCapture
capture = VideoCapture('tracking/path_tracking_backyard.mov')

You can select a specific frame from the video by setting the `CAP_PROP_POS_MSEC` property. This example seeks to the 900 ms mark and reads the `result` and `frame`. If the operation is successful, `result` will be **True** and `frame` will not be **None**.

In [2]:
from cv2 import CAP_PROP_POS_MSEC
capture.set(CAP_PROP_POS_MSEC, 900)
result, frame = capture.read()
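A millisecond offset can also be expressed as a frame number by multiplying by the frame rate reported in `VideoMetadata`. The arithmetic is sketched below; `timestamp_to_frame_index` is a hypothetical helper, and rounding to the nearest frame is an assumption about which frame is wanted.

```python
def timestamp_to_frame_index(timestamp_ms, frame_rate):
    """Convert a millisecond offset to the nearest zero-based frame index."""
    return int(round(timestamp_ms / 1000.0 * frame_rate))

# Frame rate taken from the VideoMetadata of the tracking response.
frame_rate = 29.976701736450195
print(timestamp_to_frame_index(900, frame_rate))

# OpenCV also exposes a frame-based position property, so the index could be
# applied with capture.set(CAP_PROP_POS_FRAMES, index) instead of milliseconds.
```

At roughly 30 frames per second, 900 ms corresponds to 0.9 × 29.98 ≈ 26.98, which rounds to frame 27.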

You can get the timestamp of each person detection from the `tracking_results` response.

In [21]:
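from cv2 import VideoCapture, CAP_PROP_POS_MSEC, cvtColor, COLOR_BGR2RGB
from PIL import Image
capture = VideoCapture('tracking/path_tracking_backyard.mov')

# Find the first detection that also includes face details.
first_detection = None
for detection in tracking_results['Persons']:
    if 'Face' in detection['Person']:
        first_detection = detection
        break

if first_detection is None:
    raise ValueError('No detection with a Face was found.')

# Seek to the detection's timestamp (milliseconds) and read that frame.
capture.set(CAP_PROP_POS_MSEC, first_detection['Timestamp'])
res, frame = capture.read()

# OpenCV returns BGR pixel data; convert to RGB before handing it to PIL.
image = Image.fromarray(cvtColor(frame, COLOR_BGR2RGB))
image.show()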


The `PersonDetection` object specifies a `BoundingBox` around the person. Let's draw a red rectangle around the finding.

In [22]:
def denormalize_bounding_box(bounding_box, image_size):
    width = int(bounding_box['Width'] * image_size[0])
    left = int(bounding_box['Left'] * image_size[0])

    height = int(bounding_box['Height'] * image_size[1])
    top = int(bounding_box['Top'] * image_size[1])
    return (top,left,height,width)

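from cv2 import cvtColor, COLOR_BGR2RGB
from PIL import Image, ImageDraw
# OpenCV frames are BGR; convert to RGB so the colors render correctly.
image = Image.fromarray(cvtColor(frame, COLOR_BGR2RGB))

(top,left,height,width) = denormalize_bounding_box(
    first_detection['Person']['BoundingBox'],
    image.size)

drawing = ImageDraw.Draw(image)
# The lower-right corner is (left + width, top + height).
drawing.rectangle(
    xy=(left,top,left+width, top+height),
    outline='red',
    width=5)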

image.save('tracking/ViewFrame.jpeg',format='JPEG')
image.show()
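The bounding box values are ratios of the frame width and height, while `ImageDraw.rectangle` expects pixel corners in `(x0, y0, x1, y1)` order. A variation on `denormalize_bounding_box` can return the corners directly. This is a sketch; `bounding_box_to_xy` is a hypothetical helper, and clamping to the image edges is an added assumption for boxes that extend past the frame.

```python
def bounding_box_to_xy(bounding_box, image_size):
    """Convert a normalized bounding box to pixel corners (x0, y0, x1, y1).

    image_size is (width, height), matching PIL's Image.size.
    """
    image_width, image_height = image_size
    x0 = round(bounding_box['Left'] * image_width)
    y0 = round(bounding_box['Top'] * image_height)
    x1 = round((bounding_box['Left'] + bounding_box['Width']) * image_width)
    y1 = round((bounding_box['Top'] + bounding_box['Height']) * image_height)
    # Keep every corner inside the image.
    x0, x1 = max(0, x0), min(image_width, x1)
    y0, y1 = max(0, y0), min(image_height, y1)
    return (x0, y0, x1, y1)

box = {'Left': 0.25, 'Top': 0.5, 'Width': 0.5, 'Height': 0.25}
print(bounding_box_to_xy(box, (320, 568)))
```

The returned tuple can be passed as the `xy` argument of `drawing.rectangle`.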

When you're finished processing the video, use the `release` method to free the underlying resources.

In [28]:
capture.release()
print('The capture variable is %s' % capture.isOpened())

The capture variable is False
