In [3]:
import glob
import sys
import serial
import time
import random
import re

In [9]:
# constructs one set command
def construct_write_command(command_name, command_value):
    """Build the CLI line that assigns ``command_value`` to ``command_name``.

    Both arguments must be strings. The returned line starts with a
    four-space indent and ends with a newline.
    """
    pieces = ("    set ", command_name, " = ", command_value, "\n")
    return "".join(pieces)

# constructs one get command
def construct_read_command(command_name):
    """Build the CLI line that queries the current value of ``command_name``.

    ``command_name`` must be a string. The returned line starts with a
    four-space indent and ends with a newline.
    """
    pieces = ("    get ", command_name, "\n")
    return "".join(pieces)

# defines all possible commands for the testing tool
def define_commands():
    """Return the names of all settings the testing tool writes and reads.

    A fresh list is returned on every call. The order matters: it is the
    order in which the values from ``define_variables`` and the verdicts from
    ``define_acceptance_criteria`` are paired with these names.
    """
    names = ["boot_state", "main_altitude", "acc_threshold", "mach_timer_duration"]

    # Four timers, each with a start state, a trigger state and a duration.
    for timer in range(1, 5):
        for field in ("start", "trigger", "duration"):
            names.append(f"timer{timer}_{field}")

    # One setting per event.
    events = ("moving", "ready", "liftoff", "burnout", "apogee",
              "lowalt", "touchdown", "custom1", "custom2")
    for event in events:
        names.append("ev_" + event)

    names += ["servo1_init_pos", "servo2_init_pos", "rec_elements", "rec_speed"]
    return names

# defines all possible timer states which want to be tested and if they are allowed or not #
def define_timer_state(index):
    """Look up one timer-state test value.

    Returns a tuple ``(state, verdict, count)``: the state string at
    ``index``, its acceptance verdict ("P" = the board should accept it,
    "F" = the board should reject it) and the number of candidate states.
    """
    states = [
        "MOVING", "READY", "LIFTOFF", "MAX_V", "APOGEE",
        "POST_APOGEE", "TOUCHDOWN", "CUSTOM_1", "CUSTOM_2",
        "sdfgkj",
    ]
    # Every entry is expected to pass except the junk value at the end.
    verdicts = ["P"] * 9 + ["F"]

    chosen_state = states[index]
    chosen_verdict = verdicts[index]
    return chosen_state, chosen_verdict, len(states)

# defines all possible main altitudes which want to be tested and if they are allowed or not #
def define_main_altitude(index):
    """Look up one main-altitude test value.

    Returns a tuple ``(value, verdict, count)``: the altitude string at
    ``index``, its acceptance verdict ("P" = should be accepted, "F" = should
    be rejected) and the number of candidate values.
    """
    altitudes = "-50 50 100 200 1000 10000".split()
    # Only the negative altitude is expected to be rejected.
    verdicts = ["F"] + ["P"] * 5

    chosen_altitude = altitudes[index]
    chosen_verdict = verdicts[index]
    return chosen_altitude, chosen_verdict, len(altitudes)

# defines all possible acceleration thresholds which want to be tested and if they are allowed or not #
def define_acc_threshold(index):
    """Look up one acceleration-threshold test value.

    Returns a tuple ``(value, verdict, count)``: the threshold string at
    ``index``, its acceptance verdict ("P" = should be accepted, "F" = should
    be rejected) and the number of candidate values.
    """
    # (threshold, verdict) pairs kept side by side so they cannot drift apart.
    table = [
        ("10", "F"),
        ("30", "P"),
        ("50", "P"),
        ("100", "F"),
        ("1000", "F"),
    ]
    threshold, verdict = table[index]
    return threshold, verdict, len(table)

# defines all possible timer durations which want to be tested and if they are allowed or not #
def define_timer_duration(index):
    """Look up one timer-duration test value.

    Returns a tuple ``(value, verdict, count)``: the duration string at
    ``index``, its acceptance verdict and the number of candidate values.
    Every listed duration carries the verdict "P" (should be accepted).
    """
    durations = ["0", "10", "1000", "10000", "5000"]
    verdicts = ["P"] * len(durations)

    chosen_duration = durations[index]
    chosen_verdict = verdicts[index]
    return chosen_duration, chosen_verdict, len(durations)

# defines all possible acceleration axes which are to be tested #
def define_sim_axes(index):
    """Return the simulation axis flag at ``index``: "--x", "--y" or "--z"."""
    flags = ["--" + axis_letter for axis_letter in "xyz"]
    return flags[index]

# defines all possible noise seeds which want to be tested and if they are allowed or not #
def define_sim_noise(index):
    """Return the simulation noise-seed flag at ``index``.

    The available flags are "--ns1", "--ns10" and "--ns69".
    """
    seeds = (1, 10, 69)
    flags = [f"--ns{seed}" for seed in seeds]
    return flags[index]

# constructs the simulation command based on the axis and the noise seed #
def construct_sim_command(axis, noise):
    """Build the ``sim --hop`` CLI line for the given axis and noise indices.

    ``axis`` and ``noise`` are indices into the tables of
    ``define_sim_axes`` and ``define_sim_noise``. The returned line starts
    with a four-space indent and ends with a newline.
    """
    axis_flag = define_sim_axes(axis)
    noise_flag = define_sim_noise(noise)
    return "".join(("    sim --hop ", axis_flag, " ", noise_flag, "\n"))
    
# constructs an array of all the different variables bound to the command in the right order, based on an index #
def define_variables(main_index, threshold_index,
                     timer1_start_index, timer2_start_index, timer3_start_index, timer4_start_index,
                     timer1_trigger_index, timer2_trigger_index, timer3_trigger_index, timer4_trigger_index,
                     timer1_dur_index, timer2_dur_index, timer3_dur_index, timer4_dur_index):
    """Return the value to write for every setting, in ``define_commands`` order.

    Each ``*_index`` argument selects an entry from the matching lookup
    table (``define_main_altitude``, ``define_acc_threshold``,
    ``define_timer_state`` or ``define_timer_duration``). All remaining
    settings use fixed values.
    """
    zeros = "0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"

    # Note the argument order (all starts, all triggers, all durations)
    # differs from the output order (start, trigger, duration per timer).
    timers = (
        (timer1_start_index, timer1_trigger_index, timer1_dur_index),
        (timer2_start_index, timer2_trigger_index, timer2_dur_index),
        (timer3_start_index, timer3_trigger_index, timer3_dur_index),
        (timer4_start_index, timer4_trigger_index, timer4_dur_index),
    )

    # boot_state, main_altitude, acc_threshold, mach_timer_duration
    values = [
        "CATS_FLIGHT",
        define_main_altitude(main_index)[0],
        define_acc_threshold(threshold_index)[0],
        "0",
    ]

    for start_index, trigger_index, dur_index in timers:
        values.append(define_timer_state(start_index)[0])
        values.append(define_timer_state(trigger_index)[0])
        values.append(define_timer_duration(dur_index)[0])

    values.extend([
        zeros,                               # ev_moving
        "16,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0",  # ev_ready
        "16,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0",  # ev_liftoff
        zeros,                               # ev_burnout
        zeros,                               # ev_apogee
        zeros,                               # ev_lowalt
        "16,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0",  # ev_touchdown
        zeros,                               # ev_custom1
        zeros,                               # ev_custom2
        "0",                                 # servo1_init_pos
        "0",                                 # servo2_init_pos
        "4294967295",                        # rec_elements
        "100Hz",                             # rec_speed
    ])
    return values

# constructs an array of all the different variable acceptances in the right order, based on an index #
def define_acceptance_criteria(main_index, threshold_index,
                               timer1_start_index, timer2_start_index, timer3_start_index, timer4_start_index,
                               timer1_trigger_index, timer2_trigger_index, timer3_trigger_index, timer4_trigger_index,
                               timer1_dur_index, timer2_dur_index, timer3_dur_index, timer4_dur_index):
    """Return the acceptance verdict for every setting, in ``define_commands`` order.

    The verdicts ("P" = should be accepted, "F" = should be rejected) for the
    index-selected settings come from the same lookup tables used by
    ``define_variables``. Every fixed-value setting is expected to pass.
    """
    timers = (
        (timer1_start_index, timer1_trigger_index, timer1_dur_index),
        (timer2_start_index, timer2_trigger_index, timer2_dur_index),
        (timer3_start_index, timer3_trigger_index, timer3_dur_index),
        (timer4_start_index, timer4_trigger_index, timer4_dur_index),
    )

    # boot_state, main_altitude, acc_threshold, mach_timer_duration
    verdicts = [
        "P",
        define_main_altitude(main_index)[1],
        define_acc_threshold(threshold_index)[1],
        "P",
    ]

    for start_index, trigger_index, dur_index in timers:
        verdicts.append(define_timer_state(start_index)[1])
        verdicts.append(define_timer_state(trigger_index)[1])
        verdicts.append(define_timer_duration(dur_index)[1])

    # Nine event settings, two servo positions, rec_elements and rec_speed.
    verdicts.extend(["P"] * 13)
    return verdicts

# constructs the write and read config based on the commands above. 
# The write config is used to set all configurations
# The read config is used to check if the configuration was set correctly after rebooting.
def construct_config(main_index, threshold_index,
                     timer1_start_index, timer2_start_index, timer3_start_index, timer4_start_index,
                     timer1_trigger_index, timer2_trigger_index, timer3_trigger_index, timer4_trigger_index,
                     timer1_dur_index, timer2_dur_index, timer3_dur_index, timer4_dur_index) -> tuple:
    """Build the write lines, read lines and verdicts for one test configuration.

    Args:
        main_index, threshold_index, timerN_*_index: indices into the
            lookup tables; they are forwarded unchanged to
            ``define_variables`` and ``define_acceptance_criteria``.

    Returns:
        A 3-tuple ``(write_config, read_config, acceptance_criteria)``:
        a list of ``set`` lines, a list of ``get`` lines (both without the
        trailing newline) and a list of "P"/"F" verdicts. All three lists
        are in ``define_commands`` order.

    Raises:
        ValueError: if the command, value and verdict tables do not have the
            same length, which would pair values with the wrong settings.
    """
    indices = (main_index, threshold_index,
               timer1_start_index, timer2_start_index, timer3_start_index, timer4_start_index,
               timer1_trigger_index, timer2_trigger_index, timer3_trigger_index, timer4_trigger_index,
               timer1_dur_index, timer2_dur_index, timer3_dur_index, timer4_dur_index)

    commands = define_commands()
    variables = define_variables(*indices)
    acceptance_criteria = define_acceptance_criteria(*indices)

    # The three tables are matched purely by position, so refuse to continue
    # if they have drifted apart.
    if not (len(commands) == len(variables) == len(acceptance_criteria)):
        raise ValueError(
            "command, variable and acceptance tables differ in length: "
            f"{len(commands)}, {len(variables)}, {len(acceptance_criteria)}"
        )

    write_config = "".join(
        construct_write_command(name, value)
        for name, value in zip(commands, variables)
    )
    read_config = "".join(construct_read_command(name) for name in commands)

    return write_config.splitlines(), read_config.splitlines(), acceptance_criteria
    


# default configuration #
def get_config() -> str:
    """Return the default configuration as a block of ``set`` lines.

    The text starts with an empty line, contains one four-space-indented
    ``set name = value`` line per setting, and ends with four spaces after
    the last newline.
    """
    zeros = "0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
    settings = [
        ("boot_state", "CATS_FLIGHT"),
        ("main_altitude", "200"),
        ("acc_threshold", "45"),
        ("mach_timer_duration", "0"),
    ]
    for timer in range(1, 5):
        settings.append((f"timer{timer}_start", "MOVING"))
        settings.append((f"timer{timer}_trigger", "MOVING"))
        settings.append((f"timer{timer}_duration", "0"))
    settings += [
        ("ev_moving", zeros),
        ("ev_ready", "16,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0"),
        ("ev_liftoff", "16,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0"),
        ("ev_burnout", zeros),
        ("ev_apogee", zeros),
        ("ev_lowalt", zeros),
        ("ev_touchdown", "16,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"),
        ("ev_custom1", zeros),
        ("ev_custom2", zeros),
        ("servo1_init_pos", "0"),
        ("servo2_init_pos", "0"),
        ("rec_elements", "4294967295"),
        ("rec_speed", "100Hz"),
    ]

    body = "".join(f"    set {name} = {value}\n" for name, value in settings)
    # Leading newline and trailing indent reproduce the layout of the
    # triple-quoted literal this text was originally written as.
    return "\n" + body + "    "

# sets some desired parameter and then checks if the parameter was set correctly, depending on the acceptance criteria #
def set_parameter(config, criteria, ser):
    """Send one ``set`` line to the board and check how the board answers.

    Args:
        config: the command line, e.g. ``"    set main_altitude = 200"``.
            The line is split on single spaces, so with the four-space
            indent the setting name is word 5 and the value is word 7.
        criteria: "P" if the value is expected to be accepted, "F" if it is
            expected to be rejected.
        ser: serial connection providing ``write()`` and ``readline()``.

    Returns:
        ``(test_passed, failed_command)``. ``failed_command`` is an empty
        string unless the board's answer contradicts ``criteria``.
    """
    command_index = 5
    value_index = 7
    trailing_lines = 3  # lines read and discarded after every exchange

    ser.write(bytes(f"{config}\n", encoding="UTF-8"))

    # Placeholder that is not expected to occur in any reply: a line too
    # short to contain name and value is reported as a hard fault.
    config_var = config_val = b"dsfgaer"
    command_words = config.split(" ")
    # The guard must cover the highest index read below; the previous check
    # (> 5) let six- and seven-word lines through and raised IndexError.
    if len(command_words) > value_index:
        config_var = bytes(command_words[command_index], encoding="UTF-8")
        config_val = bytes(command_words[value_index], encoding="UTF-8")

    test_passed = False
    failed_command = ""

    returned_command = ser.readline()
    if config_var in returned_command:
        value_was_set = config_val in ser.readline()
        if value_was_set:
            test_passed = criteria == 'P'
            if not test_passed:
                failed_command = "[SET] Command [" + str(returned_command) + "] should not have been set but was."
        else:
            test_passed = criteria == 'F'
            if not test_passed:
                failed_command = "[SET] Command [" + str(returned_command) + "] should have been set but was not."
    else:
        print('[SET HARD FAULT]: ' + str(returned_command))

    # Discard the rest of the reply so the next command starts clean.
    for _ in range(trailing_lines):
        ser.readline()

    return test_passed, failed_command

# Reads out a desired parameter and checks if it is the same parameter as what was set using the write configuration.
def get_parameter(read_config, write_config, criteria, ser):
    """Query one setting and compare the reply with the value written earlier.

    Args:
        read_config: the ``get`` line to send, e.g. ``"    get main_altitude"``.
        write_config: the matching ``set`` line. It is split on single
            spaces, so with the four-space indent the setting name is word 5
            and the expected value is word 7.
        criteria: "P" if the written value is expected to be stored, "F" if
            it is expected to have been rejected.
        ser: serial connection providing ``write()`` and ``readline()``.

    Returns:
        ``(test_passed, failed_command)``. ``failed_command`` is an empty
        string unless the board's answer contradicts ``criteria``.
    """
    command_index = 5
    value_index = 7

    ser.write(bytes(f"{read_config}\n", encoding="UTF-8"))

    # Placeholder that is not expected to occur in any reply: a line too
    # short to contain name and value is reported as a hard fault.
    config_var = config_val = b"dsfgaer"
    command_words = write_config.split(" ")
    # The guard must cover the highest index read below; the previous check
    # (> 2) let short lines through and raised IndexError.
    if len(command_words) > value_index:
        config_var = bytes(command_words[command_index], encoding="UTF-8")
        config_val = bytes(command_words[value_index], encoding="UTF-8")

    test_passed = False
    failed_command = ""

    returned_command = ser.readline()
    if config_var in returned_command:
        returned_line = ser.readline()
        if config_val in returned_line:
            test_passed = criteria == 'P'
            if not test_passed:
                failed_command = "[GET] Command [" + str(returned_command) + "] was not set correctly: " + str(config_val) + " should not have been set, but " + str(returned_line) + " was set."
        else:
            test_passed = criteria == 'F'
            if not test_passed:
                failed_command = "[GET] Command [" + str(returned_command) + "] was not set correctly: " + str(config_val) + " Expected, but " + str(returned_line) + " Gotten."
        # Number of reply lines discarded, unchanged from the previous
        # implementation: 1 + 3 after a pass, 3 + 3 after a failure.
        # NOTE(review): confirm the asymmetry is intended.
        trailing_lines = 4 if test_passed else 6
    else:
        print('[GET HARD FAULT]: ' + str(returned_command))
        trailing_lines = 7

    for _ in range(trailing_lines):
        ser.readline()

    return test_passed, failed_command

# Opens up a connection to the CATS #
def open_cats_connection(ser):
    """Send a blank line to the board and discard the ten lines that follow.

    ``ser`` is a serial connection providing ``write()`` and ``readline()``.
    """
    ser.write(" \n".encode("UTF-8"))
    # Flush Datapoints #
    lines_to_discard = 10
    for _ in range(lines_to_discard):
        ser.readline()

# Reboots the system and reopens connection #
def reboot(ser):
    """Reboot the board and re-establish the CLI session on the same port.

    Sends ``reboot``, closes the port, waits two seconds, reopens the port
    and then runs ``open_cats_connection``.
    """
    reboot_command = "reboot\n".encode("UTF-8")
    ser.write(reboot_command)
    ser.close()

    wait_seconds = 2
    time.sleep(wait_seconds)

    ser.open()
    open_cats_connection(ser)

# starts the simulation #
def start_sim(axis, noise, ser):
    """Send the simulation start command for the given axis and noise indices.

    The line built by ``construct_sim_command`` already ends with a newline;
    a second newline is appended before sending.
    """
    sim_line = construct_sim_command(axis, noise)
    payload = (sim_line + "\n").encode("UTF-8")
    ser.write(payload)

# gets the flight stats and prints them #
def get_flight_stats(flight_number, ser):
    """Request the stats of ``flight_number`` and print the next 30 reply lines.

    Each line is printed exactly as ``ser.readline()`` returns it.
    """
    request = f"    stats {flight_number!s}\n\n"
    ser.write(request.encode("UTF-8"))

    remaining = 30
    while remaining > 0:
        print(ser.readline())
        remaining -= 1

# used for flight stats conversion to get the good numbers #
def convert_to_num(string):
    """Extract the number contained in ``string`` and return it as a float.

    Every character other than the digits 0-9 and "." is dropped and the
    remainder is parsed, so ``"100.2\\r\\n'"`` yields ``100.2``. A minus sign
    is dropped like any other character, so the result is never negative.
    NOTE(review): confirm no reported stat is expected to be negative.

    Raises:
        ValueError: if ``string`` contains no digits or the remaining
            characters do not form a valid number (e.g. two decimal points).
    """
    # Inside a character class "|" is a literal, not alternation; the former
    # pattern "[^0-9|.]" therefore kept pipe characters and float() failed.
    digits = re.sub(r"[^0-9.]", "", string)
    if not digits:
        raise ValueError(f"no numeric value found in {string!r}")
    return float(digits)

# saves the config #
def save_config(ser):
    """Send the ``save`` command to the board over ``ser``."""
    payload = "save\n".encode("UTF-8")
    ser.write(payload)


# compares the flight stats to some metric described below #
# Returns a true or false if the metrics are in the allowed state #
def compare_flight_stats(flight_number, axis, ser):
    """Request the stats of one flight and check them against expected values.

    Eight pending lines are discarded first. The ``stats`` command is then
    sent and 30 reply lines are read, with a 0.1 s pause before each one.
    Selected lines are checked by position; the first failed check prints a
    message and ends the comparison.

    Args:
        flight_number: number of the flight whose stats are requested; line 3
            of the reply must contain it.
        axis: expected axis number, compared with the integer found on
            line 21 of the reply.
        ser: serial connection providing ``write()`` and ``readline()``.

    Returns:
        True if every check passed, otherwise False. A checked line whose
        last word holds no parsable number counts as a failed check instead
        of raising, so one truncated reply does not abort the whole run.
    """
    # Flush serial interface #
    for _ in range(8):
        ser.readline()

    # reply line -> (message prefix, expected value, allowed absolute error,
    #                message suffix)
    numeric_checks = {
        # Check Apogee Time
        6: ("Apogee was Triggered with an error of ", 20100, 4000, " ms."),
        # Check Apogee
        7: ("Apogee was Triggered with an error of ", 100, 2, " m."),
        # Check velocity
        11: ("Max velocity was calculated with an error of ", 30, 0.5, " m/s."),
        # Check acceleration
        15: ("Max Acceleration was calculated with an error of ", 28, 1, " m/s^2."),
        # Check Pressure calibration
        18: ("Pressure was calibrated with an error of ", 98050, 100, " Pa."),
        # Check Acceleration angle calibration
        20: ("Angle was calibrated with an error of ", 1, 0.05, "."),
    }
    flight_number_line = 3
    axis_line = 21

    command = "    stats " + str(flight_number) + "\n"
    ser.write(bytes(f"{command}\n", encoding="UTF-8"))

    for i in range(0, 30):
        time.sleep(0.1)
        line_str = str(ser.readline())

        # Read Which flight stats file was read
        if i == flight_number_line:
            if str(flight_number) not in line_str:
                print("Flight " + str(flight_number) + " not recorded!")
                return False
            continue

        if i != axis_line and i not in numeric_checks:
            continue

        # The value under test is the last space-separated word of the line.
        try:
            value = convert_to_num(line_str.split(" ")[-1])
        except ValueError:
            # With a short serial timeout readline() can return an empty or
            # partial line; report it as a failure rather than crashing.
            print("Could not read a value from flight stats line " + str(i) + ": " + line_str)
            return False

        # Check Acceleration axis calibration
        if i == axis_line:
            if not (int(value) == axis):
                print("Wrong axis was calibrated!")
                return False
            continue

        prefix, expected, tolerance, suffix = numeric_checks[i]
        if not (abs(value - expected) < tolerance):
            print(prefix + str(value - expected) + suffix)
            return False

    return True
    

In [12]:
# Randomised end-to-end test run against the board on the serial port below.
# Each iteration writes a random configuration, saves and reboots, reads the
# configuration back, runs one simulated flight and checks its flight stats.
port = 'COM5'

# Starts Simulation #
with serial.Serial(port, 19200, timeout=0.02) as ser:
    # Open CLI #
    reboot(ser)
    
    # Define Testing #
    num_tests = 100
    # Variable which says where the flight stats number starts #
    # It is incremented once per iteration further down.
    sim_num = 39
    # Seed for the random indexes below #
    random.seed(2)
    
    # Dont Touch #
    successful_tests = 0
    # NOTE(review): failures and fail_count are never read in the visible code.
    failures = ["", "", "", "", ""]
    fail_count = 0
    current_failure = ""
    
    
    # Start Test Loop #
    for i in range(0, num_tests):
        
        # Set Testing Configuration #
        # Element [2] of each define_* result is the number of candidate
        # values; index 0 is passed only to obtain that count.
        # The order of the randrange() calls fixes which values a given seed
        # produces, so reordering them changes the generated test sequence.
        main_index = random.randrange(0, define_main_altitude(0)[2])
        acc_index = random.randrange(0, define_acc_threshold(0)[2])
        timer1_start_index = random.randrange(0, define_timer_state(0)[2])
        timer2_start_index = random.randrange(0, define_timer_state(0)[2])
        timer3_start_index = random.randrange(0, define_timer_state(0)[2])
        timer4_start_index = random.randrange(0, define_timer_state(0)[2])
        timer1_trigger_index = random.randrange(0, define_timer_state(0)[2])
        timer2_trigger_index = random.randrange(0, define_timer_state(0)[2])
        timer3_trigger_index = random.randrange(0, define_timer_state(0)[2])
        timer4_trigger_index = random.randrange(0, define_timer_state(0)[2])
        timer1_dur_index = random.randrange(0, define_timer_duration(0)[2])
        timer2_dur_index = random.randrange(0, define_timer_duration(0)[2])
        timer3_dur_index = random.randrange(0, define_timer_duration(0)[2])
        timer4_dur_index = random.randrange(0, define_timer_duration(0)[2])
        # write_config / read_config are lists of CLI lines; criteria holds
        # the matching "P"/"F" verdict for each line.
        write_config, read_config, criteria = construct_config(main_index, acc_index,\
                                            timer1_start_index, timer2_start_index, timer3_start_index, timer4_start_index,\
                                            timer1_trigger_index, timer2_trigger_index, timer3_trigger_index, timer4_trigger_index,\
                                            timer1_dur_index, timer2_dur_index, timer3_dur_index, timer4_dur_index)
        
        #print("Timer1 Start: " + str(timer1_start_index) + " End: " + str(timer1_trigger_index) + " Duration: " + str(timer1_dur_index))
        #print("Timer2 Start: " + str(timer2_start_index) + " End: " + str(timer2_trigger_index) + " Duration: " + str(timer2_dur_index))
        #print("Timer3 Start: " + str(timer3_start_index) + " End: " + str(timer3_trigger_index) + " Duration: " + str(timer3_dur_index))
        #print("Timer4 Start: " + str(timer4_start_index) + " End: " + str(timer4_trigger_index) + " Duration: " + str(timer4_dur_index))
        # Stays True only if every set check, get check and the simulation
        # comparison of this iteration pass.
        test_result = True
        
        # Set Testing Configuration on the Board and compare directly #
        for config_line, criteria_line in zip(write_config, criteria):
            result, current_failure = set_parameter(config_line, criteria_line, ser)
            test_result = test_result and result
            if(current_failure != ""):
                print(current_failure)
              
        # Save and Reboot #
        save_config(ser)  
        reboot(ser)
        
        
        # get Testing Configuration on the Board and compare directly #
        for read_config_line, write_config_line, criteria_line in zip(read_config, write_config, criteria):
            result, current_failure = get_parameter(read_config_line, write_config_line, criteria_line, ser)
            test_result = test_result and result
            if(current_failure != ""):
                print(current_failure)
        
        #Start Simulation with given parameters
        # Both indices cover the three entries of define_sim_axes / define_sim_noise.
        sim_axis = random.randrange(0, 3)
        sim_noise = random.randrange(0, 3)
        start_sim(sim_axis, sim_noise, ser)
        
        # Wait for simulation to conclude #
        # Fixed wait; the script does not check that the simulation has finished.
        time.sleep(30)
        
        # Compare sim results #
        sim_result = compare_flight_stats(sim_num, sim_axis, ser)
        test_result = test_result and sim_result
        # presumably each simulation is stored under the next flight number - verify on the board
        sim_num = sim_num + 1
        
        time.sleep(0.5)
        
        # Reboot #
        reboot(ser)
        
        if(test_result):
            successful_tests = successful_tests + 1
        else:
            print("Test Failed on iteration " + str(i)) 
            
    print("Testing Concluded. " + str(successful_tests) + "/" + str(num_tests) + " were successful.")
        

Flight 39 not recorded!
Test Failed on iteration 0


SerialException: ClearCommError failed (PermissionError(13, 'The device does not recognize the command.', None, 22))

In [None]:
# Manual check: send a single set command and print the next eight reply lines.
with serial.Serial(port, 19200, timeout=0.1) as ser:
    ser.write("set main_altitude=200\n".encode("UTF-8"))
    for _ in range(8):
        print(ser.readline())