In [15]:
import boto3
import cv2

from botocore.config import Config
STREAM_NAME = "demo-rtsp"

my_config = Config(
    region_name='us-east-1',
)

kvs = boto3.client("kinesisvideo", config=my_config)
# kvs.list_streams()
endpoint = kvs.get_data_endpoint(
    APIName="GET_HLS_STREAMING_SESSION_URL",
    StreamARN="arn:aws:kinesisvideo:us-east-1:089757678707:stream/demo-rtsp/1709836324700"

    )['DataEndpoint']
kvam = boto3.client("kinesis-video-archived-media", endpoint_url=endpoint, config=my_config)
url = kvam.get_hls_streaming_session_url(
    StreamName=STREAM_NAME,
    #PlaybackMode="ON_DEMAND",
    PlaybackMode="LIVE"
    )['HLSStreamingSessionURL']
vcap = cv2.VideoCapture(url)

In [43]:
import cv2
import requests
import datetime
import hashlib
import hmac
from botocore.credentials import create_credential_resolver
from botocore.session import get_session
import numpy as np
import json

def sign(key, msg):
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

def get_signature_key(key, date_stamp, region_name, service_name):
    k_date = sign(('AWS4' + key).encode('utf-8'), date_stamp)
    k_region = sign(k_date, region_name)
    k_service = sign(k_region, service_name)
    k_signing = sign(k_service, 'aws4_request')
    return k_signing

def send_frame_with_signed_request(frame, service, region, host, endpoint):
    # Encode frame to JPEG
    ret, buffer = cv2.imencode('.jpg', frame)
    if not ret:
        print("Failed to encode frame")
        return
    byte_frame = buffer.tobytes()

    # AWS Signature Version 4 signing process
    session = get_session()
    credentials = create_credential_resolver(session).load_credentials()
    access_key = credentials.access_key
    secret_key = credentials.secret_key

    if access_key is None or secret_key is None:
        raise Exception('No access key or secret key available')

    t = datetime.datetime.utcnow()
    amz_date = t.strftime('%Y%m%dT%H%M%SZ')
    date_stamp = t.strftime('%Y%m%d')

    canonical_uri = endpoint
    canonical_querystring = ''
    canonical_headers = 'host:' + host + '\n' + 'x-amz-date:' + amz_date + '\n'
    signed_headers = 'host;x-amz-date'
    payload_hash = hashlib.sha256(byte_frame).hexdigest()
    canonical_request = f"POST\n{canonical_uri}\n{canonical_querystring}\n{canonical_headers}\n{signed_headers}\n{payload_hash}"

    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"
    string_to_sign = f"{algorithm}\n{amz_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}"

    signing_key = get_signature_key(secret_key, date_stamp, region, service)
    signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()

    authorization_header = f"{algorithm} Credential={access_key}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}"
    headers = {'x-amz-date': amz_date, 'Authorization': authorization_header, 'Content-Type': 'application/octet-stream'}

    # Send the POST request
    response = requests.post('https://' + host + canonical_uri, data=byte_frame, headers=headers)
    try:
        response_data = response.text  # Or parse as JSON with response.json() if expected response is JSON
        return response_data
    except Exception as e:
        print(f"Failed to get valid response: {e}")
        return None
    

In [44]:
from botocore.exceptions import NoCredentialsError

def write_to_kinesis(stream_name, data, partition_key):
    kinesis_client = boto3.client('kinesis', region_name='us-east-1')  # Adjust region if necessary
    try:
        response = kinesis_client.put_record(
            StreamName=stream_name,
            Data=data,
            PartitionKey=partition_key
        )
        print(f"Record written to Kinesis: {response}")
    except NoCredentialsError:
        print("Credentials not available for AWS Kinesis.")
    except Exception as e:
        print(f"Error writing to Kinesis: {e}")

In [52]:
STREAM_NAME = "demo-rtsp"

my_config = Config(
    region_name='us-east-1',
)

kvs = boto3.client("kinesisvideo", config=my_config)
# kvs.list_streams()
endpoint = kvs.get_data_endpoint(
    APIName="GET_HLS_STREAMING_SESSION_URL",
    StreamARN="arn:aws:kinesisvideo:us-east-1:089757678707:stream/demo-rtsp/1709836324700"

    )['DataEndpoint']
kvam = boto3.client("kinesis-video-archived-media", endpoint_url=endpoint, config=my_config)
url = kvam.get_hls_streaming_session_url(
    StreamName=STREAM_NAME,
    #PlaybackMode="ON_DEMAND",
    PlaybackMode="LIVE"
    )['HLSStreamingSessionURL']
vcap = cv2.VideoCapture(url)
while True:
    ret, frame = vcap.read()
    
    if ret:
        service = 'sagemaker'  # Update your AWS service
        region = 'us-east-1'  # Update your AWS region
        host = 'runtime.sagemaker.us-east-1.amazonaws.com'  # Update your API host
        endpoint = '/endpoints/weapon-detection-yolor-endpoint2024-03-12-16-39-14/invocations/'  # Update your endpoint path
        response_data = send_frame_with_signed_request(frame, service, region, host, endpoint)
        if response_data:
            # Define your Kinesis Data Stream name and partition key
            kinesis_stream_name = 'your_kinesis_stream_name'
            partition_key = 'example_partition_key'  # This should be something that makes sense for your data distribution
            
            # Convert your response data to a string or bytes, if it's not already
            # For example, if response_data is a string, you can encode it to bytes
            data_to_write = response_data.encode('utf-8')
            kinesis_stream_name = 'weapon-detection-cloudformation-demo-Kds-4Wah1uIxMffE'
            partition_key = 'cam-1'
            if len(json.loads(response_data)['output']['boxes']) > 1 :
                write_to_kinesis(kinesis_stream_name, data_to_write, partition_key)

vcap.release()



Record written to Kinesis: {'ShardId': 'shardId-000000000000', 'SequenceNumber': '49650110392451551685483161928792874994325185248238764034', 'ResponseMetadata': {'RequestId': 'c735a682-f097-4283-995e-6b8ce6131484', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c735a682-f097-4283-995e-6b8ce6131484', 'x-amz-id-2': 'xTP0q0GembI1PftjvHDv9ukhKE9jvM3g4frwI43VlQsoehMlHoNTKnXM1FrsmlOqGGSNSTZMpo+SSNvtOyC1MgPaCA/U+kQO', 'date': 'Wed, 13 Mar 2024 15:48:30 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '110', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
Record written to Kinesis: {'ShardId': 'shardId-000000000000', 'SequenceNumber': '49650110392451551685483161983377084675745309658017955842', 'ResponseMetadata': {'RequestId': 'e8491a4a-719d-88f0-b622-d779399ce18c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e8491a4a-719d-88f0-b622-d779399ce18c', 'x-amz-id-2': 'kR6JR5wK6ZRFbCeyVd1bi3NAYrLI7yZLqYQS3nFcnT9hptIaMhpnj/kYAIY+jKPxzRzBCi3LLqF6r743n