In [2]:
import os
import sys
import PySpin
import numpy as np
import ipywidgets as widgets
from enum import Enum

from tkinter import filedialog as fd
from tkinter import *

# Required for advanced widgets
from traitlets import traitlets

# Required for streaming
import cv2
import time
from bokeh.plotting import figure, output_file, show
from bokeh.io import output_notebook, show, push_notebook

# For async operation
import threading
import asyncio

ModuleNotFoundError: No module named '_tkinter'

## WIDGETS

In [None]:
""" WIDGETS """


class LoadedButton(widgets.Button):
    """A button that can hold a value as an attribute"""
    def __init__(self, value=None, *args, **kwargs):
        super(LoadedButton, self).__init__(*args, **kwargs)
        self.add_traits(value=traitlets.Any(value))


streaming_toggle = widgets.ToggleButton(
                        value=False,
                        description='Start Streaming',
                        disabled=False,
                        button_style='info', # 'success', 'info', 'warning', 'danger' or ''
                        tooltip='Streaming',
                        icon='play'
                    )

inference_mode = widgets.ToggleButtons(
                        options=['DETECTION', 'CLASSIFICATION'],
                        description='Mode:',
                        disabled=False,
                        button_style='', # 'success', 'info', 'warning', 'danger' or ''
                        tooltips=['This network determines the  most likely class given a set of\
                                  predetermined, trained options. Object detection can also provide a\
                                  location within the image (in the form of a "bounding box" surrounding the\
                                  class), and can detect multiple objects',
                                  'This network determines the best option from a list of predetermined options\
                                  ; the camera gives a percentage that determines the likelihood of the currently\
                                  perceived image being one of the classes it has been trained to recognize.'],
                      )

chunkdata_checkbox = widgets.Checkbox(
                    value=True,
                    description='Chunk Data',
                    disabled=False
                )

trigger_checkbox = widgets.Checkbox(
                    value=True,
                    description='Trigger Mode',
                    disabled=False
                )

inference_checkbox = widgets.Checkbox(
                    value=True,
                    description='Inference',
                    disabled=False
                )

boundingbox_checkbox = widgets.Checkbox(
                    value=True,
                    description='Bounding Box',
                    disabled=False
                )

inferencelabel_checkbox = widgets.Checkbox(
                    value=True,
                    description='Inference Label',
                    disabled=False
                )

browse_file = LoadedButton(
                description='Browse Inference Label',
                disabled=False,
                button_style='',
                tooltip='Browsw File',
                icon='file',
                value=''
            )

class InferecneNetworkType(Enum):
    # This network determines the  most likely class given a set of predetermined,
    # trained options. Object detection can also provide a location within the
    # image (in the form of a "bounding box" surrounding the class), and can
    # detect multiple objects.
    DETECTION = 1
    # This network determines the best option from a list of predetermined options;
    # the camera gives a percentage that determines the likelihood of the currently
    # perceived image being one of the classes it has been trained to recognize.
    CLASSIFICATION = 2

    
CHOSEN_INFERENCE_NETWORK_TYPE = InferecneNetworkType.DETECTION if (inference_mode.value == "DETECTION") else InferecneNetworkType.CLASSIFICATION

LABEL = []

## INFERENCE HELPER

In [None]:
""" INFERENCE HELPER """

# This function enables or disables the given chunk data type based on
# the specified entry name.
def set_chunk_enable(nodemap, entry_name, enable):
    result = True
    ptr_chunk_selector = PySpin.CEnumerationPtr(nodemap.GetNode("ChunkSelector"))
    
    ptr_entry = PySpin.CEnumEntryPtr(ptr_chunk_selector.GetEntryByName(entry_name))
    if not PySpin.IsReadable(ptr_entry):
        print('Unable to find ' + entry_name + ' in ChunkSelector...')
        return False

    ptr_chunk_selector.SetIntValue(ptr_entry.GetValue())

    # Enable the boolean, thus enabling the corresponding chunk data
    print('Enabling ' + entry_name + '...')
    ptr_chunk_enable = PySpin.CBooleanPtr(nodemap.GetNode("ChunkEnable"))
    if not PySpin.IsAvailable(ptr_chunk_enable):
        print('not available')
        return False
    
    if enable:
        if ptr_chunk_enable.GetValue():
            print('enabled')
        elif PySpin.IsWritable(ptr_chunk_enable):
            ptr_chunk_enable.SetValue(True)
            print('enabled')
        else:
            print('not writable')
            result = False
    else:
        if not ptr_chunk_enable.GetValue():
            print('disabled')
        elif PySpin.IsWritable(ptr_chunk_enable):
            ptr_chunk_enable.SetValue(False)
            print('disabled')
        else:
            print('not writable')
            result = False

    return result


# This function configures the camera to add inference chunk data to each image.
# When chunk data is turned on, the data is made available in both the nodemap
# and each image.
def configure_chunk_data(nodemap):
    result = True
    print('\n*** CONFIGURING CHUNK DATA ***')

    try:
        # Activate chunk mode
        #
        # *** NOTES ***
        # Once enabled, chunk data will be available at the end of the payload
        # of every image captured until it is disabled. Chunk data can also be
        # retrieved from the nodemap.
        
        ptr_chunk_mode_active = PySpin.CBooleanPtr(nodemap.GetNode("ChunkModeActive"))
        if not PySpin.IsWritable(ptr_chunk_mode_active):
            print('Unable to active chunk mode. Aborting...')
            return False
        
        ptr_chunk_mode_active.SetValue(True)
        print('Chunk mode activated...')

        # Enable inference related chunks in chunk data

        # Retrieve the chunk data selector node
        ptr_chunk_selector = PySpin.CEnumerationPtr(nodemap.GetNode("ChunkSelector"))
        if not PySpin.IsReadable(ptr_chunk_selector):
            print('Unable to retrieve chunk selector (enum retrieval). Aborting...')
            return False
        
        # Enable chunk data inference Frame Id
        result = set_chunk_enable(nodemap, "InferenceFrameId", True)
        if result == False:
            print("Unable to enable Inference Frame Id chunk data. Aborting...")
            return result
        
        if CHOSEN_INFERENCE_NETWORK_TYPE == InferecneNetworkType.DETECTION:
            # Detection network type

            # Enable chunk data inference bounding box
            result = set_chunk_enable(nodemap, "InferenceBoundingBoxResult", True)
            if result == False:
                print("Unable to enable Inference Bounding Box chunk data. Aborting...")
                return result
        else:
            # Enable chunk data inference result
            result = set_chunk_enable(nodemap, "InferenceResult", True)
            if result == False:
                print("Unable to enable Inference Result chunk data. Aborting...")
                return result

            # Enable chunk data inference confidence
            result = set_chunk_enable(nodemap, "InferenceConfidence", True)
            if result == False:
                print("Unable to enable Inference Confidence chunk data. Aborting...")
                return result
            
    except PySpin.SpinnakerException as ex:
        print('Unexpected exception: %s' % ex)
        return False

    return result

# This function disables each type of chunk data before disabling chunk data mode.
def disable_chunk_data(nodemap):
    print('\n*** DISABLING CHUNK DATA ***')

    result = True
    try:
        ptr_chunk_selector = PySpin.CEnumerationPtr(nodemap.GetNode("ChunkSelector"))

        if not PySpin.IsReadable(ptr_chunk_selector):
            print('Unable to retrieve chunk selector. Aborting...')
            return False

        result = set_chunk_enable(nodemap, "InferenceFrameId", False)
        if result == False:
            print('Unable to disable Inference Frame Id chunk data. Aborting...')
            return result

        if CHOSEN_INFERENCE_NETWORK_TYPE == InferecneNetworkType.DETECTION:
            # Detection network type 

            # Disable chunk data inference bounding box
            result = set_chunk_enable(nodemap, "InferenceBoundingBoxResult", False)
            if result == False:
                print('Unable to disable Inference Bounding Box chunk data. Aborting...')
                return result
        else:
            # Classification network type

            # Disable chunk data inference result
            result = set_chunk_enable(nodemap, "InferenceResult", False)
            if result == False:
                print('Unable to disable Inference Result chunk data. Aborting...')
                return result
            
            # Disable chunk data inference confidence
            result = set_chunk_enable(nodemap, "InferenceConfidence", False)
            if result == False:
                print('Unable to disable Inference Confidence chunk data. Aborting...')
                return result
        
        # Deactivate ChunkMode
        ptr_chunk_mode_active = PySpin.CBooleanPtr(nodemap.GetNode("ChunkModeActive"))
        if not PySpin.IsWritable(ptr_chunk_mode_active):
            print('Unable to deactivate chunk mode. Aborting...')
            return False

        ptr_chunk_mode_active.SetValue(False)
        print('Chunk mode deactivated...')

        # Disable Inference
        ptr_inference_enable = PySpin.CBooleanPtr(nodemap.GetNode("InferenceEnable"))
        if not PySpin.IsWritable(ptr_inference_enable):
            print('Unable to disable inference. Aborting...')
            return False

        ptr_inference_enable.SetValue(False)
        print('Inference disabled...')
    
    except PySpin.SpinnakerException as ex:
        print('Unexpected exception: %s' % ex)
        return False

    return result 

# This function enables/disables inference on the camera and configures the inference network type
def configure_inference(nodemap, is_enabled):
    if is_enabled:
        print('\n*** CONFIGURING INFERENCE (' + ("DETECTION" if ((CHOSEN_INFERENCE_NETWORK_TYPE) \
                                            == InferecneNetworkType.DETECTION) \
                                            else 'CLASSIFICATION') + ') ***')
    else:
        print('\n*** DISABLING INFERENCE ***')

    try:
        if is_enabled:
            ptr_inference_network_type_selector = PySpin.CEnumerationPtr(nodemap.GetNode("InferenceNetworkTypeSelector"))
            if not PySpin.IsWritable(ptr_inference_network_type_selector):
                print('Unable to query InferenceNetworkTypeSelector. Aborting...')
                return False
            
            network_type_string = ("Detection" if CHOSEN_INFERENCE_NETWORK_TYPE == InferecneNetworkType.DETECTION 
                                    else "Classification")

            # Retrieve entry node from enumeration node
            ptr_inference_network_type = PySpin.CEnumEntryPtr(ptr_inference_network_type_selector.GetEntryByName(network_type_string))
            if not PySpin.IsReadable(ptr_inference_network_type):
                print('Unable to set inference network type to %s' %network_type_string + ' (entry retrieval). Aborting...')
                return False
            
            inference_network_value = ptr_inference_network_type.GetNumericValue()
            ptr_inference_network_type_selector.SetIntValue(int(inference_network_value))

            print('Inference network type set to' + network_type_string + '...')

        print(('Enabling' if is_enabled else 'Disabling') + ' inference...')
        ptr_inference_enable = PySpin.CBooleanPtr(nodemap.GetNode("InferenceEnable"))
        if not PySpin.IsWritable(ptr_inference_enable):
            print('Unable to enable inference. Aborting...')
            return False
        
        ptr_inference_enable.SetValue(is_enabled)
        print('Inference '+'enabled...' if is_enabled else 'disabled...')
    
    except PySpin.SpinnakerException as ex:
        print('Unexpected exception: %s' % ex)
        return False

    return True

# This function disables trigger mode on the camera.
def disable_trigger(nodemap):
    print('\n*** IMAGE ACQUISITION ***')

    try:
        ptr_trigger_mode = PySpin.CEnumerationPtr(nodemap.GetNode("TriggerMode"))
        if not PySpin.IsWritable(ptr_trigger_mode):
            print('Unable to configure TriggerMode. Aborting...')
            return False
        
        ptr_trigger_off = PySpin.CEnumEntryPtr(ptr_trigger_mode.GetEntryByName("Off"))
        if not PySpin.IsReadable(ptr_trigger_off):
            print('Unable to query TriggerMode Off. Aborting...')
            return False
        
        print('Configure TriggerMode to ' + ptr_trigger_off.GetSymbolic())
        ptr_trigger_mode.SetIntValue(int(ptr_trigger_off.GetNumericValue()))
    
    except PySpin.SpinnakerException as ex:
        print('Unexpected exception: %s' % ex)
        return False

    return True

# This function configures camera to run in "inference sync" trigger mode.
def configure_trigger(nodemap):
    print('\n*** CONFIGURING TRIGGER ***')

    try:
        # Configure TriggerSelector 
        ptr_trigger_selector = PySpin.CEnumerationPtr(nodemap.GetNode("TriggerSelector"))
        if not PySpin.IsWritable(ptr_trigger_selector):
            print('Unable to configure TriggerSelector. Aborting...')
            return False
        
        ptr_frame_start = PySpin.CEnumEntryPtr(ptr_trigger_selector.GetEntryByName("FrameStart"))
        if not PySpin.IsReadable(ptr_frame_start):
            print('Unable to query TriggerSelector FrameStart. Aborting...')
            return False
        
        print('Configure TriggerSelector to ' + ptr_frame_start.GetSymbolic())
        ptr_trigger_selector.SetIntValue(int(ptr_frame_start.GetNumericValue()))

        # Configure TriggerSource 
        ptr_trigger_source = PySpin.CEnumerationPtr(nodemap.GetNode("TriggerSource"))
        if not PySpin.IsWritable(ptr_trigger_source):
            print('Unable to configure TriggerSource. Aborting...')
            return False

        ptr_inference_ready = PySpin.CEnumEntryPtr(ptr_trigger_source.GetEntryByName("InferenceReady"))
        if not PySpin.IsReadable(ptr_inference_ready):
            print('Unable to query TriggerSource InferenceReady. Aborting...')
            return False

        print('Configure TriggerSource to ' + ptr_inference_ready.GetSymbolic())
        ptr_trigger_source.SetIntValue(int(ptr_inference_ready.GetNumericValue()))

        # Configure TriggerMode 
        ptr_trigger_mode = PySpin.CEnumerationPtr(nodemap.GetNode("TriggerMode"))
        if not PySpin.IsWritable(ptr_trigger_mode):
            print('Unable to configure TriggerMode. Aborting...')
            return False
        
        ptr_trigger_on = PySpin.CEnumEntryPtr(ptr_trigger_mode.GetEntryByName("On"))
        if not PySpin.IsReadable(ptr_trigger_on):
            print('Unable to query TriggerMode On. Aborting...')
            return False

        print('Configure TriggerMode to ' + ptr_trigger_on.GetSymbolic())
        ptr_trigger_mode.SetIntValue(int(ptr_trigger_on.GetNumericValue()))

    except PySpin.SpinnakerException as ex:
        print('Unexpected exception: %s' % ex)
        return False

    return True


## RENDERING HELPER

In [None]:
""" REDERING HELPER """
def show_init_screen():
    image_data = cv2.imread("visual_identity.png")
    image_data = cv2.cvtColor(image_data, cv2.COLOR_BGR2RGBA)
    image_data = cv2.flip(image_data, 0)
    myImage.data_source.data['image']=[image_data]
    push_notebook()
    
def get_box_count(image):
    chunk_data = image.GetChunkData()
    box_result = chunk_data.GetInferenceBoundingBoxResult()
    box_count = box_result.GetBoxCount()
    return box_count

def get_box_result(image):
    chunk_data = image.GetChunkData()
    box_result = chunk_data.GetInferenceBoundingBoxResult()
    return box_result

# LABEL_DETECTION = ["background", "aeroplane", "bicycle",     "bird",  "boat",        "bottle", "bus", \
#                     "car",        "cat",       "chair",       "cow",   "diningtable", "dog",    "horse", \
#                     "motorbike",  "person",    "pottedplant", "sheep", "sofa",        "train",  "monitor"]



def render_bounding_box(frame, box_count, box_result):
#     sys.stdout.write('\r')
#     sys.stdout.write(str(box_count))
#     sys.stdout.flush()        
    if box_count == 0:
        return frame
    else:
        for i in range(box_count):
            box = box_result.GetBoxAt(i)
            result_frame = cv2.rectangle(frame, (box.rect.topLeftXCoord,box.rect.topLeftYCoord)\
                                               ,(box.rect.bottomRightXCoord,box.rect.bottomRightYCoord), (0,255,0), 3)
            if inferencelabel_checkbox.value == True:
                result_frame = cv2.putText(result_frame, str(LABEL[box.classId]) + ": " + str(int(box.confidence * 100)) + "%", (box.rect.topLeftXCoord,box.rect.topLeftYCoord), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 255, 0), 3)
        return result_frame
    
def image_resize(image, width = None, height = None, inter = cv2.INTER_AREA):
    # initialize the dimensions of the image to be resized and
    # grab the image size
    dim = None
    (h, w) = image.shape[:2]

    # if both the width and height are None, then return the
    # original image
    if width is None and height is None:
        return image

    # check to see if the width is None
    if width is None:
        # calculate the ratio of the height and construct the
        # dimensions
        r = height / float(h)
        dim = (int(w * r), height)

    # otherwise, the height is None
    else:
        # calculate the ratio of the width and construct the
        # dimensions
        r = width / float(w)
        dim = (width, int(h * r))

    # resize the image
    resized = cv2.resize(image, dim, interpolation = inter)

    # return the resized image
    return resized

In [None]:
""" STREAMING WINDOW VARIABLES """
image_data = cv2.imread('visual_identity.png')
image_data = cv2.cvtColor(image_data, cv2.COLOR_BGR2RGBA)
image_data = cv2.flip(image_data, 0)
image_data = image_resize(image_data, height = 600)
width = image_data.shape[1]
height = image_data.shape[0]
streaming_window = figure(x_range=(0,width), y_range=(0,height), \
                                       output_backend="webgl", width=width, height=height)
myImage = streaming_window.image_rgba(image=[image_data], x=0, y=0, dw=width, dh=height)

In [None]:
class Streaming(object):    
    def __init__(self, cam):
        self.cam = cam
        self.streaming_window = streaming_window
        self.myImage = myImage
        self.stopped = False
        self.t2 = threading.Thread(target=self.update, args=())
        
    def start_thread(self):
        self.t2.start()
        return self
    
    def update(self):
        while True:
            if self.stopped:
                return
            
            try:
                image_result = self.cam.GetNextImage()
                #  Ensure image completion
                if image_result.IsIncomplete():
                    print('Image incomplete with image status %d ...' % image_result.GetImageStatus())
                else:     
                    t1 = time.time()
                    if chunkdata_checkbox.value == True:
                        chunk_data = image_result.GetChunkData()
                        box_result = chunk_data.GetInferenceBoundingBoxResult()
                        box_count = box_result.GetBoxCount()
                    self.frame = image_result.GetNDArray()
                    self.frame = cv2.cvtColor(self.frame, cv2.COLOR_BGR2RGBA)
                    if boundingbox_checkbox.value == True:
                        self.frame = render_bounding_box(self.frame, box_count, box_result)
                    self.frame = cv2.flip(self.frame, 0)
                    self.frame = image_resize(self.frame, height = 600)
                    self.myImage.data_source.data['image']=[self.frame]
                    t2 = time.time()
                    s = f"""{int(1/(t2-t1))} FPS"""
                    self.streaming_window.title.text = s
                    push_notebook()
                image_result.Release()
            except PySpin.SpinnakerException as ex:
                print('Error: %s' % ex)
                return False
    
    def read(self):
        return self.frame
    
    def stop(self):
        #self.t2.join()
        self.stopped = True
        self.t2.join()

## GUI FILE ACCESS HELPER

In [None]:
""" GUI FILE ACCESS HELPER """ 
def update_inference_label(file_path):
    f = open(file_path,'r')
    for line in f:
        LABEL.append(line.strip())

## BUTTON CLICK HANDLER

In [None]:
""" BUTTON CLICK HANDLER """
def on_start_streaming(cam):
    t = threading.currentThread()
    nodemap_tldevice = cam.GetTLDeviceNodeMap()
    cam.Init()
    nodemap = cam.GetNodeMap()
    
    # Turn on nodes for inference
    if inference_checkbox.value == True:
        result = configure_inference(nodemap, True)
        print("config_chunkdata returns " + str(result))
    
    if trigger_checkbox.value == True:
        result = configure_trigger(nodemap)
        print("configure_trigger returns " + str(result))
    
    
    if chunkdata_checkbox.value == True:
        result = configure_chunk_data(nodemap)
        print("config_chunkdata returns " + str(result))
        
    cam.BeginAcquisition()
    stream = Streaming(cam).start_thread()
    
    while getattr(t, "do_run", True):
        time.sleep(0.5)
    
    stream.stop()
    cam.EndAcquisition()
    
    if chunkdata_checkbox.value == True:
        result = disable_chunk_data(nodemap)
        print("disable_chunkdata returns " + str(result))
        
    if trigger_checkbox.value == True:
        result = disable_trigger(nodemap)
        print("disable_trigger returns " + str(result))
    
    if inference_checkbox.value == True:
        result = configure_inference(nodemap, False)
        print("config_chunkdata returns " + str(result))
    
    cam.DeInit()
    #print("Stopping as you wish.")

def browse_file_handler(click):
    root = Tk()
    file_name = fd.askopenfilename(initialdir = os.getcwd(),title = "Select file")
    print('File ' + os.path.basename(file_name) + ' is selected.')
    root.destroy()
    click.value = file_name
    update_inference_label(file_name)

In [None]:
import asyncio
def wait_for_change(widget, value):
    future = asyncio.Future()
    def getvalue(change):
        # make the new value available
        future.set_result(change.new)
        widget.unobserve(getvalue, value)
    widget.observe(getvalue, value)
    #print(str(future) + "From the function")
    return future

In [None]:
system = PySpin.System.GetInstance()
version = system.GetLibraryVersion()
cam_list = system.GetCameras()
num_cameras = cam_list.GetSize()

# This example only works with 1 camera is connected. 
if num_cameras == 0:
    # Clear camera list before releasing system
    cam_list.Clear()

    # Release system instance
    system.ReleaseInstance()

    print('Not enough cameras!')
    exit()
elif num_cameras > 1:
    # Clear camera list before releasing system
    cam_list.Clear()

    # Release system instance
    system.ReleaseInstance()

    print('This example only works when 1 camera is connected.')
    exit()

async def f():
    t = threading.Thread(name= 't', target=on_start_streaming, args=(cam_list[0],))
    output_notebook()
    show(streaming_window, notebook_handle=True)
    while True:
#         sys.stdout.write('\r')
#         sys.stdout.write("Reading Value ")
#         sys.stdout.flush()            
        x = await wait_for_change(streaming_toggle, 'value')
#         print("Value changed to %s" %x)
        if x == True:
            print("Streaming ON")
            streaming_toggle.icon = 'stop'
            streaming_toggle.description='Stop Streaming'
            t.start() 
        else:
            t.do_run = False
            t.join() 
            show_init_screen()
            print("Streaming OFF")
            streaming_toggle.icon = 'play'
            streaming_toggle.description='Start Streaming'
            t = threading.Thread(name= 't', target=on_start_streaming, args=(cam_list[0],))

asyncio.ensure_future(f())
display(browse_file)
display(inference_checkbox)
# display(trigger_checkbox)
# display(chunkdata_checkbox)
display(boundingbox_checkbox)
display(inferencelabel_checkbox)
display(inference_mode)
display(streaming_toggle)

browse_file.on_click(browse_file_handler)