In [2]:
from pickletools import uint8
import matplotlib.pyplot as plt
import numpy as np
import sys
import time
import tkinter as tk
from tkinter import ttk
import customtkinter as ctk
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from PIL import Image, ImageTk
from data_organizer_Kfall import DataOrganizer
from sklearn.model_selection import train_test_split
import serial
import time
import struct
import threading
import queue

In [3]:
# load data from ./saved_data
X_test = np.load('./saved_data/X_test.npy')
y_test = np.load('./saved_data/y_test.npy')
test_len = len(X_test)
y_pred = np.random.randint(2, size=test_len)
print(X_test.shape)
print(y_test.shape)


(190, 50, 6)
(190,)


In [10]:
class InferenceGUI:

    def __init__(self, X_test, y_test, y_pred, 
                 port='COM3', baudrate=115200, MCU = 'STM32', confidence = 0.95):
        
        self.MCU = MCU
        self.port = port
        self.baudrate = baudrate
        self.X_test = X_test
        self.y_test = y_test
        #self.y_pred = y_pred
        self.test_data = iter(zip(X_test, y_test))
        self.total = 0
        self.correct = 0
        self.confidence = confidence
        self.pred = 0
        self.queue = queue.Queue()
        
        self.setup_UI()

    def setup_UI(self):

        self.root = tk.Tk()
        self.root.geometry("1024x768")
        self.root.title("Fall Detection")
        self.root.resizable(True, True)
        self.pred_cache = tk.StringVar()
        if self.MCU == 'STM32':

            self.read_serial_data()
            self.root.protocol("WM_DELETE_WINDOW", self.on_close)
            self.root.mainloop()
        else:
            #TODO: MAXIM MCU
            pass

    def on_close(self):
        # Close the serial connection
        if hasattr(self, 'ser') and self.ser.is_open:
            self.ser.close()
        # Destroy the window
        self.root.destroy()

    def read_serial_data(self):

        self.ser = serial.Serial(port=self.port, baudrate=self.baudrate, timeout=3)   
        # flush the serial port
        self.ser.flush()
        self.ser.flushInput()
        self.ser.flushOutput()
        self.welcome_page()

        # Start a new thread to update the page
        threading.Thread(target=self.update_page, daemon=True).start()
    
    def send_host_request(self):
        # Sending the host request
        self.ser.write(b'Connect\r\n')
        #time.sleep(0.1)
    
    def send_data_to_MCU(self, data):
        self.ser.write(data)
        #time.sleep(0.1)
    

    def update_page(self):

        while True:
            if self.ser.in_waiting:
                line = self.ser.readline().decode('utf-8').strip()
                # mode 1: inference on PC
                if line == 'Mode 1 selected.':
                    self.show_inference_from_PC_page()

                # mode 2: inference on MCU
                elif line == 'Mode 2 selected.':
                    self.show_inference_from_MCU_page()
                    # output = self.ser.readline().decode('utf-8').strip().split(',') # z.B.-127,127
                    # output0 = output[0]
                    # output1 = output[1]
                    # if output0 > output1:
                    #     self.pred = 0
                    #     #self.pred_cache.set("You are safe!")
                    # else:
                    #     self.pred = 1
                        #self.pred_cache.set("You are Falling!!! Call 112!!!")

                # mode 0: idle
                elif line == 'Mode 0 selected.':
                    self.show_idle_page()

            self.root.update_idletasks()
            
    def clear_page(self):
        for widget in self.root.winfo_children():
            widget.destroy()
    
    def welcome_page(self):

        self.clear_page()

        self.welcome_label = ttk.Label(self.root, text="Please press the switch button \nto switch mode..", font=("Arial", 40))
        self.welcome_label.pack(expand=True)


    def show_inference_from_PC_page(self):
        # Clear the current content
        self.clear_page()

        # Central frame for mode selection
        self.mode_selection_frame = ttk.Frame(self.root)
        self.mode_selection_frame.pack(expand=True)

        # Label for mode selection
        self.mode_label = ttk.Label(self.mode_selection_frame, text="Select prediction mode:", font=("Arial", 40))
        self.mode_label.pack(pady=(10, 20))  # Adding some vertical padding

        # Frame to hold buttons
        self.button_frame = ttk.Frame(self.mode_selection_frame)
        self.button_frame.pack(pady=10)

        # Create a style
        style_button = ttk.Style()
        # Configure the style
        style_button.configure('TButton', font=('Arial', 25))  # Increase the font size

        # Auto mode button
        self.auto_mode_button = ttk.Button(self.button_frame, text="Auto", command=self.enter_auto_mode, style='TButton')
        self.auto_mode_button.pack(side=tk.LEFT, padx=10)

        # Manual mode button
        self.manual_mode_button = ttk.Button(self.button_frame, text="Manual", command=self.enter_manual_mode, style='TButton')
        self.manual_mode_button.pack(side=tk.RIGHT, padx=10)
    
    
    def enter_auto_mode(self):
        # Clear the current content
        self.clear_page()

        # Create the content for the "Auto" page
        self.auto_page_label = ttk.Label(self.root, text="Auto Page", font=("Arial", 40))
        self.auto_page_label.pack(expand=True)

        # Back button
        self.back_button = ttk.Button(self.root, text="Back", command=self.show_inference_from_PC_page, style='TButton')
        self.back_button.pack(pady=10)

    def enter_manual_mode(self):
        self.clear_page()

        self.top_frame = tk.Frame(self.root)
        self.top_frame.pack(side='top', fill='both', expand=True)
        # Create a frame for the left half of the window
        self.left_frame = tk.Frame(self.top_frame, bd=2, relief='solid', bg='white')
        self.left_frame.pack(side='left', fill='both', expand=True)
        self.left_frame.pack_propagate(False)  # Prevent the frame from changing size

        # Create a frame for the right half of the window
        self.right_frame = tk.Frame(self.top_frame, bd=2, relief='solid', bg='white')
        self.right_frame.pack(side='left', fill='both', expand=True)
        self.right_frame.pack_propagate(False)  # Prevent the frame from changing size

        # Back button
        self.back_button = ttk.Button(self.root, text="Back", command=self.show_inference_from_PC_page, style='TButton')
        self.back_button.pack(side=tk.LEFT, padx=120, pady=10)

        # Next button
        self.next_button = ttk.Button(self.root, text="Next Prediction", command=self.on_button_click, style='TButton')
        self.next_button.pack(side=tk.RIGHT, padx=120, pady=10)

  
        self.label = tk.Label(self.left_frame, text='Ground Truth Label:', font=("Helvetica", 24), bg='white')
        self.label.pack()

        self.label_text = tk.StringVar()
        self.label2 = tk.Label(self.left_frame, textvariable=self.label_text, font=("Helvetica", 50), bg='white')
        self.label2.place(relx=0.5, rely=0.5, anchor='center')

        self.prediction_label = tk.Label(self.right_frame, text='Prediction:', font=("Helvetica", 24), bg='white')
        self.prediction_label.pack()

        # Create a label to display the prediction
        self.prediction_text = tk.StringVar()
        self.prediction_label2 = tk.Label(self.right_frame, textvariable=self.prediction_text, font=("Helvetica", 50), bg='white')
        self.prediction_label2.place(relx=0.5, rely=0.5, anchor='center')

    def on_button_click(self):
        print('on_button_click')
        #y_pred = self.ser.readline().decode('utf-8').strip().split(',')# z.B.-127,127

        # take the highest value
        #y_pred = max(map(int, y_pred))
        x, y = next(self.test_data)
        # shape of x needs to be (1, 50, 6), and type needs to be float32
        x = np.expand_dims(x, axis=0).astype(np.float32)
        #x_bytes = struct.pack(f'{x.size}f', *x.flatten())
        x_bytes = x.tobytes()
        #print(x_bytes.shape) # (1, 50, 6)
        self.send_host_request()
        # write data to MCU
        self.send_data_to_MCU(x_bytes)
        print('sent data to MCU')
        output = self.ser.readline().decode('utf-8').strip() # z.B.-127,127
        if output == 'Data received.':
            print(output)
        #     self.total += 1
        #     if y == y_pred:
        #         self.correct += 1

        #     if y == 0:
        #         self.label_text.set('Not Falling')
        #         self.left_frame.config(bg='green')
        #         self.label.config(background='green')
        #         self.label2.config(background='green')
        #     else:
        #         self.label_text.set('Falling')
        #         self.left_frame.config(bg='red')
        #         self.label.config(background='red')
        #         self.label2.config(background='red')
        #     if y_pred == 0:
        #         self.prediction_text.set('Not Falling')
        #         self.right_frame.config(bg='green')
        #         self.prediction_label.config(background='green')
        #         self.prediction_label2.config(background='green')
        #     else:
        #         self.prediction_text.set('Falling')
        #         self.right_frame.config(bg='red')
        #         self.prediction_label.config(background='red')
        #         self.prediction_label2.config(background='red')
        # except StopIteration:
            # self.label_text.set('No more test data')


    
    # def show_inference_from_MCU_page(self):
    #     # Clear the current content
    #     self.clear_page()

        

    #     self.pred_label = ttk.Label(self.root, font=("Arial", 50, 'bold'))
    #     self.pred_label.place(relx=0.5, rely=0.5, anchor='center')


    #     self.pred_label.pack(expand=True)
    #     while True:
    #         output = self.ser.readline().decode('utf-8').strip().split(',') # z.B.-127,127
    #         output0 = int(output[0])
    #         output1 = int(output[1])
  
    #         if output0 >= output1 or output1 <= self.confidence*127:
    #             self.pred_label.config(text="You are safe!")
    #         else:
    #             self.pred_label.config(text="You are Falling!!!\nCall 112!!!")
    #             # hold for 3 seconds
    #             #time.sleep(3)
    #             print(output)
        

    def show_inference_from_MCU_page(self):
        # Clear the current content
        self.clear_page()

        # Create the label with the updated text
        self.pred_label = ttk.Label(self.root, font=("Arial", 50, 'bold'))
        self.pred_label.place(relx=0.5, rely=0.5, anchor='center')
        #output = self.ser.readline().decode('utf-8').strip().split(',') # e.g., "-127,127"
        try:
            # refresh every 50ms
            self.update_output()
            # output0 = int(output[0])
            # output1 = int(output[1])
            # #self.update_output(output0, output1)
            # if output0 >= output1 or output1 <= self.confidence * 127:
            #     self.pred_label.config(text="You are safe!")
            # else:
            #     self.pred_label.config(text="You are Falling!!! Call 112!!!")
        except (ValueError, IndexError):
            pass  # Handle errors if output is not as expected

    
    def update_output(self):
        output = self.ser.readline().decode('utf-8').strip() # e.g., "-127,127"
        
        if output != 'Mode 2 selected.' \
                and output != 'Mode 1 selected.' \
                and output != 'Mode 0 selected.' \
                and output != 'Echo' \
                and output != 'Data received.': 
            output = output.split(',') # e.g., "-127,127"
            output0 = int(output[0])
            output1 = int(output[1])
            #if output0 >= output1 or output1 <= self.confidence * 127:
            if output0 >= output1:
                self.pred_label.config(text="You are safe!")
            else:
                self.pred_label.config(text="You are Falling!!! Call 112!!!")
            
            self.root.after(50, self.update_output)
   

    def show_idle_page(self):
        # Clear the current content
        self.clear_page()

        # Create new content for idle page
        self.idle_label = ttk.Label(self.root, text="You are in Sleep Mode\nZZZ....\n\nPress the switch button \nto switch mode.", font=("Arial", 40))
        self.idle_label.pack(expand=True)



inference_gui = InferenceGUI(X_test, y_test, y_pred, 
                             port='COM3', baudrate=115200, MCU = 'STM32', confidence = 0.95)


In [134]:
classes = ['You are Falling!!! Call 112!!!', 'Your are safe!']
mode = [0, 1, 2] # 0: Sleep, 1: Inference on PC, 2: Inference on MCU
print('mode: ', mode)

ser = serial.Serial(port='COM3', baudrate=115200, timeout=3)
# flush the serial port
ser.flush()
ser.flushInput()
ser.flushOutput()

mode:  [0, 1, 2]


In [5]:
ser.close()

NameError: name 'ser' is not defined

In [136]:
#print(X_test[0].shape)
x = np.expand_dims(X_test[0], axis=0).astype(np.float32)
# Pack each float value in the flattened array
x_bytes = x.tobytes()
print(x_bytes)


b'\xecQ8>\x1f\x85\xeb\xbd\x8bl\xe7\xbe\xe8W\x8b\xc14\xaf\xf5\xc1\xb8X7Ash\x91=P\x8d\x17\xbe/\xdd\xa4\xbe\x9b\r\x9f\xc1\x84X\xac\xc1\xdc\x03\xd1@\n\xd7#<o\x12\x83\xbe\x96C\x8b\xbe\xd0y\xf2\xc1Aox\xc1PX!@B`\xe5;H\xe1\xba\xbe\x8blg\xbe\x0c\xe8\x12\xc2\xc5\xe2\x95\xc10\xae>@\x96C\x0b=\x17\xd9\xce\xbe\x81\x95\x83\xbeZ\x835\xc2\xafc\xc7\xc1\x00\xaf\xea@sh\x91=o\x12\xc3\xbe\xdd$\x86\xbe\x06\xd9G\xc2:\xd9\xd2\xc1\xfc\xad3A\xf4\xfd\xd4=\x0c\x02\xab\xbe\x9e\xef\xa7\xbe\xbcYn\xc2\xf0#\xd5\xc1\x81\xc3=A#\xdb\xf9=\xbe\x9f\x9a\xbe\xfc\xa9\xb1\xbe\xfd\xcf\x8f\xc2hy\xdc\xc1\x94\xad\x1dA5^\xba=\x98n\x92\xbe5^\xba\xbe\xb6\xe5\xa4\xc2hy\xdc\xc1x\x04\xf2@\xa6\x9b\xc4;\xc1\xca\xa1\xbeF\xb6\xb3\xbe\xde@\xbf\xc2gC\xb8\xc1\xf6\x83\xd6@\x12\x83@\xbd\xd7\xa3\xb0\xbe\xc1\xca\xe1\xbe\xdd9\xf5\xc2ZM\x91\xc1\xf6\x83\xd6@\xa6\x9b\xc4;\x1b/\x9d\xbeB`%\xbf?\xa1\x0b\xc3\xbf\xb8\xb8\xc1\x92\x84\xf7@\x12\x83\xc0=\x1dZ\xa4\xbe/\xdd$\xbfa\xd9\x12\xc3:\xd9\xd2\xc1\xe2-.A\x02+\x87=\xecQ\xb8\xbe\xb0r\x08\xbf\xa8\xcd!\xc3n\xd9

In [144]:
while True:
    if ser.in_waiting > 0:
        line = ser.readline().decode('utf-8').strip()
        #print(line) # output[0]=17 output[1]=-17
        
        if line == 'Mode 2 selected.':
            #print("Fall detection with data sent from MCU.")
            output0 = line[0]
            output1 = line[1]
        # if  'output[' in line:
        #     outputs = line.split()
        #     output0 = int(outputs[0].split('=')[1])
        #     output1 = int(outputs[1].split('=')[1])
            # print the highest probability between output0 and output1
            # if output0 > output1:
            #     print('You are safe!')
            # else:
            #     print('You are falling!!! Call 112!!!')
        if line == 'Mode 1 selected.':
            
            ser.write(b'Connect\r\n')
            #ser.write(x_bytes)
            ser.write(x_bytes)
            print('sssssssssssssssss')
            if line != 'Echo' and line != 'Mode 1 selected.':
                print(line)
            
        elif line == 'Mode 0 selected.':
            print("Sleep mode.")
      
            

Sleep mode.
sssssssssssssssss
Sleep mode.
sssssssssssssssss


KeyboardInterrupt: 