## Import SAS & Open Source Packages
Postprocessing done in SAS Micro Analytics Services.

In [None]:
# Import Open Source packages
import threading
import time
import websocket
import json
import numpy as np
import base64
import cv2
# Import SAS Packages
import esppy
# Import helper
from helper.helpers import create_scoring_schema

### Connect to SAS Event Stream Processing

In [None]:
esp = esppy.ESP(hostname='http://localhost:9900')                  # Connect to SAS ESP
esp_project = esp.create_project('object_detection', n_threads=10) # Create a SAS ESP project
esp_project.add_continuous_query('contquery')                      # Add a Query to project

### Add a Source Window

In [None]:
# Window: Video Capture
vid_capture = esp.SourceWindow(
    autogen_key  = True,                             # create key automatically
    schema       = ('id*:int64', 'image:blob'),      # window schema (columns)
    index_type   = 'empty',                          # window index type
    insert_only  = True,                             # window accepts inserts only
    pubsub       = True                              # window can be used for publishing data
)

esp_project.windows['w_input_image'] = vid_capture   # add source window to project
esp_project

### Add a Resize Window

In [None]:
# Window: Video Resize
vid_capture_resize = esp.CalculateWindow(
    schema       = ('id*:int64','image:blob','_image_:blob'), # window schema
    algorithm    = 'ImageProcessing',                         # algorithm to be used in calculation window
    function     = 'resize',                                  # function to use
    height       = 416,                                       # resize to height = 416 pixels
    width        = 416,                                       # resize to width = 416 pixels
    input_map    = dict(imageInput='image'),                  # define column of the input image
    output_map   = dict(imageOutput='_image_')                # define column of the resized image
)

esp_project.windows['w_resize_image'] = vid_capture_resize    # add resize window to project

vid_capture.add_target(vid_capture_resize, role='data')       # connect source window to resize window
esp_project

### Add Model Request and Model Reader Windows

In [None]:
# Window: Model Reader
model_reader = esp.ModelReaderWindow()

esp_project.windows['w_read_model'] = model_reader                       # add window to project

# Window: Model Request
model_request = esp.SourceWindow(
    schema      = ('req_id*:int64', 'req_key:string', 'req_val:string'), # window schema
    index_type  = 'empty',                                               # window index type
    insert_only = True                                                   # window accepts inserts only
)

esp_project.windows['w_request_model'] = model_request                   # add window to project
model_request.add_target(model_reader, role='request')                   # connect request window to reader window
esp_project

### Create a Scoring Window

In [None]:
# Window: Model Score
model_score = esp.ScoreWindow(
    schema=create_scoring_schema(number_objects=20),    # window schema created programmatically
)
model_score.add_offline_model(
    model_type='astore'                                 # window receives an offline model (model is not trained during stream)
)
esp_project.windows['w_score_image'] = model_score      # add window to project
model_reader.add_target(model_score, role='model')      # connect model window to score window
vid_capture_resize.add_target(model_score, role='data') # connect resize window to score window
esp_project

### Create an Annotation Window

In [None]:
annotator = esp.ProceduralWindow(
    schema=(create_scoring_schema(number_objects=20)+',image_annotated:blob'),              # window schema created programmatically
    pubsub=True                                                                             # window can be used for subscribing data
)
# Use annotator plugin to visualize bounding boxes
annotator.add_cxx_plugin(source='astore', 
                         name='annotator_plugin', 
                         function='annotateImage')
annotator.set_cxx_plugin_context(cxx_name="annotator_plugin", 
                                 cxx_function="initAnnotator", 
                                 coord_source='astore', 
                                 coord_type='yolo', 
                                 in_image_field='image', 
                                 out_image_field='image_annotated', 
                                 tracker_prefix='Object', 
                                 frame_number_field='id', 
                                 scale_x='1280.0', 
                                 scale_y='720.0', 
                                 offset_x='1.0', 
                                 offset_y='1.0', 
                                 out_format='jpg', 
                                 show_text='true')
annotator.set_finalized_callback(name='annotator_plugin', 
                                 function='finalizeAnnotator')
esp_project.windows['w_annotator'] = annotator
model_score.add_target(annotator, role='data')                                              # connect scoring window with annotator window
esp_project

### Load the project

In [None]:
esp.load_project(esp_project)

### Publish the model into the ESP project

In [None]:
# Defines a simple CSV publisher
pub = model_request.create_publisher(
    blocksize=1, 
    rate=0, 
    pause=0, 
    opcode='insert', 
    format='csv'
)
pub.send('i,n,1,"usegpuesp","1"\n')                                                   # Enable GPU usage
pub.send('i,n,2,"ndevices","1"\n')                                                    # Define number of used GPUs
pub.send('i,n,3,"action","load"\n')                                                   # Call load action
pub.send('i,n,4,"type","astore"\n')                                                   # model type is astore
pub.send('i,n,5,"reference","/data/notebooks/esppy/git_ignore/Tiny-Yolov2.astore"\n') # path to astore file
pub.send('i,n,6,,\n')
pub.close()

## Define Video-Subscriber & -Publisher

In [None]:
def on_message(_, message):
    try:
        data = json.loads(message)
        imageBufferBase64 = data['events'][0]['event']['image_annotated']['image_annotated']
        nparr = np.frombuffer(base64.b64decode(imageBufferBase64), dtype=np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        cv2.imshow('frame',frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            exit()
    except Exception as e:
        print(e)
        
annotator_subscriber = annotator.create_subscriber(format='json', mode='streaming', pagesize=1, on_message=on_message)
annotator_subscriber.start()

In [None]:
video_publisher = vid_capture.create_publisher(format='csv', opcode='insert', pause=0, separator=',')
video_file = 'git_ignore/turnaround.mp4'
video_fps = 25
cap = cv2.VideoCapture(video_file)
prev = 0
while True:
    time_elapsed = time.time() - prev
    if time_elapsed > 1./video_fps:
        prev = time.time()
        ret, frame = cap.read()
        frame = cv2.resize(frame, (1280, 720))
        _, buffer = cv2.imencode('.jpg', frame)
        encoded_string = base64.b64encode(buffer)
        strToSend = 'i, n, ' + str(10) + ',' + encoded_string.decode() + ',' + '\n'
        video_publisher.send(strToSend)