# Development Notebook for "Race Track" Inference Demo

This demo uses Jupyter widgets (`ipywidgets`) to display a simulated race between devices running inference on a dataset. The widgets only render in Jupyter Notebook, or in JupyterLab with the extra widget configuration in place.

This code/demo is still under development by Andres Meza (anmeza@ucsd.edu).  

In [1]:
!pip3 install nest-asyncio==1.5.5
import nest_asyncio
nest_asyncio.apply()



In [8]:
import ipywidgets as widgets
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

from matplotlib import pyplot as plt

from PIL import Image
from PIL import ImageFont
from PIL import ImageDraw

import logging
import threading
import time
import datetime

import plotting 
import numpy as np
from axi_stream_driver import NeuralNetworkOverlay

import asyncio
import nest_asyncio
nest_asyncio.apply()
import matplotlib.pyplot as plt
%matplotlib inline

In [34]:
#I: Utility Functions
def get_html_heading(text, text_align="center"):
    """Wrap ``text`` in a bold, black, 32px monospace ``<h1>`` element.

    ``text`` is inserted verbatim (no HTML escaping), so callers may embed
    markup such as line breaks in it.

    Args:
        text: Heading content.
        text_align: CSS ``text-align`` value for the heading.

    Returns:
        The HTML snippet as a string.
    """
    css_rules = (
        "font-family:monospace,monospace",
        "color:black",
        "font-size:32px",
        f"text-align:{text_align}",
        "font-weight: bold",
    )
    inline_css = ";".join(css_rules)
    return f'<h1 style="{inline_css}">{text}</h1>'

def get_html_label(text, text_align="center", font_size="30px"):
    """Wrap ``text`` in a bold, black monospace ``<h5>`` element.

    ``text`` is inserted verbatim (no HTML escaping).

    Args:
        text: Label content.
        text_align: CSS ``text-align`` value for the label.
        font_size: CSS ``font-size`` value, including its unit.

    Returns:
        The HTML snippet as a string.
    """
    css_rules = (
        "font-family:monospace,monospace",
        "color:black",
        f"font-size:{font_size}",
        f"text-align:{text_align}",
        "font-weight: bold",
    )
    inline_css = ";".join(css_rules)
    return f'<h5 style="{inline_css}">{text}</h5>'

def generate_test_race_track(length=200):
    """Create a synthetic race track made of numbered placeholder images.

    For every index in ``range(length)`` a 500x500 light-grey RGB image with
    the index drawn on it is written to the current working directory as
    ``digit_number_img_<index>.jpg``.

    Args:
        length: Number of images to generate. Defaults to 200, the count that
            used to be hard-coded.

    Returns:
        The list of written file paths, in index order.
    """
    #S: The font does not depend on the index, so load it once
    #font = ImageFont.truetype("ARIALN.TTF", 400)
    font = ImageFont.load_default()

    img_paths = list()
    for i in range(length):
        img = Image.new('RGB', (500,500), (250,250,250))
        draw = ImageDraw.Draw(img)

        #S: Numbers with two or more digits start further left than
        #S# single digits
        x_offset = 80 if i >= 10 else 150
        draw.text((x_offset, 0), str(i), (0,0,0), font=font)

        img_path = 'digit_number_img_'+str(i)+'.jpg'
        img.save(img_path)
        img_paths.append(img_path)
    return img_paths

def generate_cifar10_race_track(length=None):
    """Write the first ``length`` images of ``x_test`` to disk as PNG files.

    Each image ``x_test[i]`` is saved as ``cifar10/cifar_<i>.png`` relative to
    the current working directory; the directory is created when missing.

    Relies on the notebook-level names ``x_test`` and (when ``length`` is
    omitted) ``num_samples``, which must be defined before this is *called*.
    The default is resolved at call time rather than in the signature, so the
    defining cell no longer fails when ``num_samples`` does not exist yet.

    Args:
        length: Number of images to export. ``None`` means ``num_samples``.

    Returns:
        The list of written file paths, in index order.
    """
    import os  # local import: `os` is not among the notebook's top-level imports

    if length is None:
        length = num_samples

    #S: plt.imsave does not create missing directories
    os.makedirs('cifar10', exist_ok=True)

    img_paths = list()
    for i in range(length):
        fname = 'cifar10/cifar_'+str(i)+'.png'
        #Matplotlib interprets the image correctly, where PIL .fromArray doesn't... probably a format thing? Just using mpl since it works
        plt.imsave(fname, x_test[i])
        img_paths.append(fname)
    return img_paths

#I: Racer class
class Racer:
    """One participant of the race together with its widget panel.

    A racer "runs" over a race track (a list of image file paths). For every
    image it displays the picture, awaits :meth:`inf` (by default a sleep of
    ``inf_time`` seconds) and advances its progress bar.

    Attributes set by ``__init__``:
        name: Text shown in the panel heading (inserted as raw HTML).
        inf_time: Per-inference time in seconds used by the default ``inf``.
        default_img: Bytes of the image shown before the race starts.
        loop: Event loop obtained via ``asyncio.get_event_loop()`` at
            construction time; ``run_race`` drives its coroutine on it.
    """

    def __init__(self, name, inf_time, default_img_path):
        self.name        = name
        self.inf_time    = inf_time
        self.default_img = self._read_bytes(default_img_path)
        self.build_ui()
        self.loop = asyncio.get_event_loop()

    @staticmethod
    def _read_bytes(path):
        """Return the full contents of ``path`` and close the file afterwards."""
        with open(path, "rb") as img_file:
            return img_file.read()

    def build_ui(self):
        """Create all widgets of the racer panel and store them on ``self``."""
        #S: Build the racer's name label
        self.uie_name = widgets.HTML(value = get_html_heading(self.name, text_align="left"),
                                     layout=widgets.Layout(width='100%',
                                                           padding="0px 0px 0px 0px",
                                                           margin="0px auto 0px auto"))

        #S: Build the racer's standing 🏆
        self.uie_standing = widgets.HTML(value = get_html_label("--/-- 🏆", text_align="right", font_size="35px"),
                                         layout=widgets.Layout(width='100%',
                                                               padding="10px 0px 0px 0px",
                                                               margin="0px auto 0px auto"))

        #S: Build horizontal container for the racer's name and standing labels
        self.uie_racer_info = widgets.HBox([self.uie_name, self.uie_standing],
                                           layout=widgets.Layout(width='100%',
                                                                 height='auto',
                                                                 padding="0px 0px 0px 0px",
                                                                 margin="0px 0px 0px 0px"))

        #S: Build the racer's image element and
        #S# set it to the provided default image
        #S# NOTE(review): format is fixed to 'jpg' although the notebook also
        #S# feeds PNG files into this widget -- confirm this is intended
        self.uie_img = widgets.Image(value=self.default_img,
                                     format='jpg',
                                     width=300,
                                     height=400,
                                     layout=widgets.Layout(padding="0px 0px 0px 0px",
                                                           margin="50px auto 0px auto"))

        #S: Build the racer's progress label
        self.uie_pbar_lbl_prcnt = widgets.HTML(value = get_html_label("0%", text_align="right"),
                                               layout=widgets.Layout(width='100%',
                                                                     padding="0px 0px 0px 0px",
                                                                     margin="0px auto 0px auto"))

        #S: Build the racer's state label
        self.uie_pbar_lbl_state = widgets.HTML(value = get_html_label("🚦 Waiting", text_align="left"),
                                               layout=widgets.Layout(width='100%',
                                                                     padding="0px 0px 0px 0px",
                                                                     margin="0px auto 0px auto"))

        #S: Build horizontal container for the racer's state and progress labels
        self.uie_pbar_lbls = widgets.HBox([self.uie_pbar_lbl_state, self.uie_pbar_lbl_prcnt],
                                          layout=widgets.Layout(width='100%',
                                                                height='auto',
                                                                padding="0px 0px 0px 0px",
                                                                margin="0px 0px 0px 0px"))

        #S: Build the racer's progress bar
        #S# NOTE(review): `valign` is passed through to Layout unchanged from
        #S# the original code -- confirm Layout actually honours it
        self.uie_pbar = widgets.FloatProgress(
            value=0,
            min=0,
            max=1.0,
            description='',
            bar_style='info',
            style={'bar_color': '#207097'},
            orientation='horizontal',
            layout=widgets.Layout(width='100%',
                                  padding="0px 0px 0px 0px",
                                  margin="50px auto 0px auto",
                                  valign="bottom")
        )

        #S: Build the top level container for the racer's ui elements
        self.uie = widgets.VBox([self.uie_racer_info,
                                 self.uie_img,
                                 self.uie_pbar,
                                 self.uie_pbar_lbls
                                ],
                                layout=widgets.Layout(width='100%',
                                                      padding="0px 10px 0px 10px",
                                                      margin="0px 10px 0px 0px"),
                                box_style="info")

    def get_ui(self):
        """Return the top level widget of this racer's panel."""
        return self.uie

    def update_uie_standing(self, standing, total_racers, inf_time=None):
        """Show the racer's final place and, when known, its inference time.

        The label follows the template
        "<time>ms per inf - <racer's place>/<# of racers> 🏆", where the place
        is zero-padded to the width of ``total_racers``. Nothing is changed
        when ``standing`` exceeds ``total_racers``.

        Args:
            standing: 1-based place of the racer.
            total_racers: Number of racers in the race.
            inf_time: Per-inference time in seconds. Falls back to
                ``self.inf_time`` when omitted; when that is ``None`` as well
                the time prefix is left out.
        """
        if standing > total_racers:
            return

        if inf_time is None:
            inf_time = self.inf_time

        #S: Zero-pad only the place. The padding used to be applied to the
        #S# whole label, which is always longer than the pad width, so it
        #S# never had any effect.
        place = str(standing).rjust(len(str(total_racers)), '0')
        stnd_str = f"{place}/{total_racers} 🏆"
        if inf_time is not None:
            time_ms = inf_time*1000  # seconds -> milliseconds
            stnd_str = f"{time_ms:.2f}ms per inf - " + stnd_str

        #S: Update the ui element displaying the racer's standing
        self.uie_standing.value = get_html_label(stnd_str, text_align="right", font_size="35px")

    def reset_uie_standing(self):
        """Put the standing label back to its placeholder text."""
        stnd_str = "--/-- 🏆"
        self.uie_standing.value = get_html_label(stnd_str, text_align="right", font_size="35px")

    def _set_progress(self, completed, track_length):
        """Update progress bar and percent label for ``completed`` of ``track_length`` images."""
        fraction = completed/track_length
        self.uie_pbar.value = fraction
        self.uie_pbar_lbl_prcnt.value = get_html_label(f"{round(fraction*100, 1)}%", text_align="right")

    def _set_state(self, state_text):
        """Show ``state_text`` in the racer's state label."""
        self.uie_pbar_lbl_state.value = get_html_label(state_text, text_align="left")

    def race_update(self, racer_info, track_info):
        """Refresh the whole panel from externally supplied race information.

        Args:
            racer_info: ``(state, standing, total_racers)``; the state entry
                is not used.
            track_info: ``(track_position, track_length, img_path)`` with a
                0-based ``track_position``.
        """
        #S: Unpack race information
        _r_state, r_standing, total_racers = racer_info
        track_position, track_length, img_path = track_info

        #S: Display current image being evaluated
        self.uie_img.value = self._read_bytes(img_path)

        #S: Update ui elements displaying racer's progress
        self._set_progress(track_position+1, track_length)

        #S: Update the racer's standing. The inference time is a required
        #S# part of the label, so pass the racer's own value (omitting it
        #S# used to raise a TypeError here).
        self.update_uie_standing(r_standing, total_racers, self.inf_time)

        #S: Derive the displayed state from the progress bar value
        progress = self.uie_pbar.value
        if 1 > progress > 0:
            self._set_state("🏎️ Running")
        elif progress == 1:
            self._set_state("🏁 Finished")
        else:
            self._set_state("⚠️ Unknown")

    def race(self, race_track, log=False):
        """Run the complete race over ``race_track`` and keep the panel in sync.

        Args:
            race_track: List of image file paths to process in order.
            log: When true, configure the ``logging`` module and emit a start
                and a finish message for this racer.
        """
        #S: Configure logging object and log thread start
        if log:
            log_format = "%(asctime)s: %(message)s"
            logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")
            logging.info("Racer \'%s\': starting", self.name)

        #S: Update the ui element displaying the racer's state
        #S# to running
        self._set_state("🏎️ Running")

        #S: Store the "track length" (i.e., the number of test images)
        #S# for use in determining racer's progress
        track_length = len(race_track)

        #S: "Run" inference on test images and update
        #S# ui elements accordingly
        self.run_race(race_track, track_length)

        #S: Update the ui element displaying the racer's state
        #S# to finished
        self._set_state("🏁 Finished")

        #S: Log thread finish
        if log:
            logging.info("Racer \'%s\': finishing", self.name)

    def run_race(self, race_track, track_length):
        """Reset the standing label, then drive the race coroutine to completion."""
        self.reset_uie_standing()
        return self.loop.run_until_complete(self.__async__run_race(race_track, track_length))

    async def __async__run_race(self, race_track, track_length):
        """Process the track image by image, updating the progress after each one.

        The leading double underscore makes this name private to the class
        (name mangling), so a subclass cannot reach it as
        ``self.__async__run_race``.
        """
        for i, img_path in enumerate(race_track):
            #S: Run the inference and the image update together and wait
            #S# for both before reporting progress
            await asyncio.gather(self.inf(img=None, i=i), self.upd_img(img_path))

            #S: Update ui elements displaying racer's progress
            self._set_progress(i+1, track_length)

    async def upd_img(self, img_path=None):
        """Display the image stored at ``img_path`` in the panel."""
        self.uie_img.value = await self.read_img(img_path)

    async def read_img(self, img_path=None):
        """Return the bytes of the image stored at ``img_path``."""
        return self._read_bytes(img_path)

    async def inf(self, img=None, i=0):
        """Simulate one inference by sleeping for ``self.inf_time`` seconds.

        ``img`` and ``i`` are unused here; they are part of the hook signature
        so subclasses can run a real inference on sample ``i``.
        ``self.inf_time`` is deliberately not checked for ``None``.
        """
        await asyncio.sleep(self.inf_time)
        return #result
    
    

class pynq_Racer(Racer):
    """Racer that runs every sample through a ``NeuralNetworkOverlay``.

    Instead of sleeping for a fixed time, each step calls the overlay's
    ``predict`` on one sample and accumulates the latency it reports. After a
    race ``inf_time`` holds the mean latency per sample.

    Args:
        name: Text shown in the panel heading.
        default_img_path: Image displayed before the race starts.
        alt_data_x: Indexable collection of input samples. Although it
            defaults to ``None``, a real dataset is required because its
            second element's ``shape`` is read here.
        alt_data_y: Labels belonging to ``alt_data_x``; ``shape[1]`` is
            passed to the overlay. Required for the same reason.
    """

    def __init__(self, name, default_img_path, alt_data_x=None, alt_data_y=None):
        #S: There is no fixed inference time for this racer, it is measured
        super(pynq_Racer,self).__init__(name, None, default_img_path)
        self.x_test = alt_data_x
        self.y_test = alt_data_y
        self.nn = NeuralNetworkOverlay('design_1.bit', self.x_test[1].shape, self.y_test.shape[1])
        self.t_time = 0

    def run_race(self, race_track, track_length):
        """Run the race and store the mean measured latency in ``inf_time``."""
        self.reset_uie_standing()
        self.inf_time = 0
        self.t_time = 0
        self.loop.run_until_complete(self.__async__run_race(race_track, track_length))

        #S: An empty track has nothing to average; keep inf_time at 0
        #S# instead of dividing by zero
        if track_length:
            self.inf_time = self.t_time/track_length

    async def __async__run_race(self, race_track, track_length):
        """Process the track image by image, updating the progress after each one.

        Defined again in this class because double-underscore names are
        mangled per class: ``self.__async__run_race`` inside ``pynq_Racer``
        does not resolve to the method of the same name in ``Racer``.
        """
        for i, img_path in enumerate(race_track):

            #S: Run the inference and the image update together and wait
            #S# for both before reporting progress
            await asyncio.gather(asyncio.ensure_future(self.inf(img=None,i=i)),asyncio.ensure_future(self.upd_img(img_path)))

            #S: Update ui elements displaying racer's progress
            self.uie_pbar.value = (i+1)/track_length
            self.uie_pbar_lbl_prcnt.value = get_html_label(f"{round((i+1)/track_length*100, 1)}%", text_align="right")

    async def inf(self, img=None, i=0):
        """Run inference on sample ``i`` of this racer's own dataset.

        Reads ``self.x_test`` (the data handed to the constructor) rather
        than a notebook-level ``x_test``, so the racer works with whatever
        dataset it was given.
        """
        return await self.run_pred(self.x_test[i])

    async def run_pred(self, x):
        """Predict on ``x``, add the reported latency to ``t_time`` and return the output.

        NOTE(review): ``latency`` is treated as seconds by the standings
        display -- confirm against the overlay driver.
        """
        y_hw, latency, throughput = self.nn.predict(x, profile=True)
        self.t_time += latency
        return y_hw
    
            
class Race_Event:
    """A race between several racers over one shared race track.

    Args:
        racers: Racer objects. Each must offer ``get_ui()``,
            ``race(race_track, log)``, ``update_uie_standing(...)`` and an
            ``inf_time`` attribute.
        race_track: List of image file paths every racer processes.
    """

    def __init__(self, racers, race_track):
        self.racers     = racers
        self.race_track = race_track
        self.build_ui()

    def build_ui(self):
        """Arrange the racers' panels in a two-column grid."""
        #S: Build the top level container for the race's ui elements.
        #S# (An HBox used to be created here first and was overwritten
        #S# immediately; only the grid was ever displayed.)
        self.uie = widgets.GridBox([racer_x.get_ui() for racer_x in self.racers],
                                   layout=widgets.Layout(grid_template_columns='48.5% 48.5%',
                                                         grid_template_rows='auto auto',
                                                         grid_gap='15px 1%'),
                                   box_style="")

    def get_ui(self):
        """Return the top level widget of the race."""
        return self.uie

    def begin_race_v1(self, log=False):
        """Run all racers in parallel threads, then publish the standings.

        Racers are ranked by ascending ``inf_time`` once every thread has
        finished; racers with equal times keep their original order.

        Args:
            log: When true, configure the ``logging`` module, log start and
                finish of the race and forward the flag to every racer.
        """
        #S: Configure logging object and log race start
        if log:
            log_format = "%(asctime)s: %(message)s"
            logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")
            logging.info("Race: starting")

        #S: Initialize, store, and start racer threads
        threads = list()
        for racer_x in self.racers:
            racer_thread = threading.Thread(target=racer_x.race, args=(self.race_track,log))
            threads.append(racer_thread)
            racer_thread.start()

        #S: Run join on racer threads so this process waits
        #S# for all racers to finish before continuing
        for racer_thread in threads:
            racer_thread.join()

        #S: Log race finished
        if log:
            logging.info("Race: finishing")

        #S: Determine race standings (fastest inference time first)
        r_inf_times = sorted(((i, r.inf_time) for i, r in enumerate(self.racers)),
                             key=lambda entry: entry[1])

        #S: Update racer's standings on ui
        total_racers = len(self.racers)
        for place, (r_index, r_inf_time) in enumerate(r_inf_times, start=1):
            self.racers[r_index].update_uie_standing(place, total_racers, r_inf_time)

    #TODO: Add in begin_race_v2(...) into this notebook

In [40]:
#S: Build the "race track" (i.e. list of image file paths) from the cifar10 images
race_track = generate_cifar10_race_track()

#S: Factor for writing the simulated inference times below in milliseconds
ms = 0.001

#S: Create Racers
#S# racer1 measures its inference time through the overlay and needs the
#S# x_test / y_test arrays to exist in the kernel already
racer1 = pynq_Racer(name="hls4ml - </br> Pynq-Z2 XC7Z020 SoC </br>",
                    default_img_path="logos/hls4ml_logo.png",
                    alt_data_x=x_test,
                    alt_data_y=y_test)
#S# Simulated stand-in for racer1 with a fixed inference time:
#racer1 = Racer("hls4ml - </br> XC7Z020 SoC </br> 44,330uJ per Inference", 27.3*ms, "logos/hls4ml_logo.png")
racer2 = Racer(name="STMicroelectronics -</br> Nucleo-H7A3ZI-Q </br>",
               inf_time=70.36*ms,
               default_img_path="logos/STMicro_logo.png")
racer3 = Racer(name="Andes - </br> AndesCore™ D45 </br>",
               inf_time=227.69*ms,
               default_img_path="logos/Andes_logo.png")
racer4 = Racer(name="Renesas - </br> RX65N-Cloud-Kit </br> ",
               inf_time=289.35*ms,
               default_img_path="logos/Renesas_logo.png")

#S: Create Race
race_event = Race_Event(racers=[racer1, racer2, racer3, racer4],
                        race_track=race_track)

#S: Get race ui and display it (jupyter renders the cell's last expression)
race_event.get_ui()

GridBox(children=(VBox(box_style='info', children=(HBox(children=(HTML(value='<h1 style="font-family:monospace…

In [41]:
#S: Run race without logging
race_event.begin_race_v1(log=False)

In [42]:
#S: Run race with logging
#S# (start/finish messages of the race and of every racer go through the logging module)
race_event.begin_race_v1(log=True)

21:11:11: Race: starting
21:11:11: Racer 'hls4ml - </br> Pynq-Z2 XC7Z020 SoC </br>': starting
21:11:11: Racer 'STMicroelectronics -</br> Nucleo-H7A3ZI-Q </br>': starting
21:11:11: Racer 'Andes - </br> AndesCore™ D45 </br>': starting
21:11:11: Racer 'Renesas - </br> RX65N-Cloud-Kit </br> ': starting
21:11:17: Racer 'hls4ml - </br> Pynq-Z2 XC7Z020 SoC </br>': finishing
21:11:21: Racer 'STMicroelectronics -</br> Nucleo-H7A3ZI-Q </br>': finishing
21:11:36: Racer 'Andes - </br> AndesCore™ D45 </br>': finishing
21:11:42: Racer 'Renesas - </br> RX65N-Cloud-Kit </br> ': finishing
21:11:42: Race: finishing
