# Flux off project

The pupose of this jupyter notebook is to provide a proof of concept to map the circuit breaker trip time to the magnetic field of the device.

In [1]:
import serial
import time 
import csv
import os
import threading

def response_mtb(ser_MTB):
    response = b""
    while True:
        char = ser_MTB.read()
        if char == b'\r':
            break
        response += char
    dec_response = response.decode().strip()
    return dec_response

def send_command_mtb(ser_MTB,comm):
    time.sleep(0.2)
    ser_MTB.write((comm + "\r").encode())  # Ensure each command ends with CR

def send_command_response_mtb(ser_MTB,comm):
    send_command_mtb(ser_MTB,comm)
    return response_mtb(ser_MTB)


def response_ard(ser_ard):
    response = b""
    while True:
        char = ser_ard.read()
        if char == b'\r':
            break
        response += char
    dec_response = response.decode().strip()
    return dec_response

def sample(ser_ard):
    ser_ard.write(("Sample" + "\n").encode())  # Ensure each command ends with CR

def read_and_write_data(file_path, ser, stop_event):
    print("Writing thread started.")
    with open(file_path, "w") as file:
        writer = csv.writer(file) 
        # Write the CSV header
        writer.writerow(['Timestamp', 'Raw'])
        while not stop_event.is_set():
            data = response_ard(ser)
            data = data.split(",")
            writer.writerow([data[0],data[1]])
    print("Writing thread stopped.")

def check_condition(ser_MTB, stop_event):
    start_time = time.time()  # Record the start time
    timeout = 300  # Timeout in seconds (5 minutes)

    while send_command_response_mtb(ser_MTB, "s") != "s12" and not stop_event.is_set():
        if time.time() - start_time > timeout:
            print("Timeout reached. Stopping writing thread.")
            stop_event.set()
            break
        time.sleep(0.25)  # Adjust sleep time as needed

    if not stop_event.is_set():
        print("Condition met. Stopping writing thread.")
        stop_event.set()


def record_serial_data(port, baud_rate,amps,itter):
    try:
        ser = serial.Serial(port, baud_rate,timeout=1)
        ser.setDTR(False)
        time.sleep(1)
        ser.flushInput()
        ser.setDTR(True)
        print("Serial port opened successfully.")
        # Open serial port MTB
        ser_MTB = serial.Serial(port_MTB, baud_rate_MTB, bytesize=bytesize_MTB, parity=parity_MTB, stopbits=stopbits_MTB, timeout=1)
        print("MTB Serial port opened successfully.")

        while send_command_response_mtb(ser_MTB,"s") != "S0":
            send_command_mtb(ser_MTB,"S0")
        
        print(send_command_response_mtb(ser_MTB,"V00.00"))
        temp_str = amps.split("_")
        command = "I" + str(temp_str[0]) + "." + str(temp_str[1])
        print(send_command_response_mtb(ser_MTB,command))
        print(send_command_response_mtb(ser_MTB,"S1"))
        time.sleep(2)
        print(send_command_response_mtb(ser_MTB,"s"))
    
        # Alter the file name with the iteration value and the current value
        new_file_name = f"{amps}A_{itter}.csv"
        # Construct the full path to the file within the "samples" directory
        file_path = os.path.join(samples_directory, new_file_name)

        # Create an Event object to signal when to stop the writing thread
        stop_event = threading.Event()

        # Start the writing thread
        sample(ser)
        write_thread = threading.Thread(target=read_and_write_data, args=(file_path, ser, stop_event))
        write_thread.start()

        # Start the condition checking thread
        condition_thread = threading.Thread(target=check_condition, args=(ser_MTB, stop_event))
        condition_thread.start()

        # Once the condition is met, wait for the writing thread to finish
        write_thread.join()

        # Wait for the condition checking thread to finish
        condition_thread.join()

    
        print("Done.")

    except serial.SerialException as e:
        print(f"Serial port error: {e}")
    finally:
        if ser.is_open:
            ser.close()
            ser_MTB.close()
            print("Serial port closed.")

if __name__ == "__main__":

    # Define serial port settings
    port_MTB = "COM21"  # Change this to your serial port
    baud_rate_MTB = 2400
    bytesize_MTB = serial.EIGHTBITS
    parity_MTB = serial.PARITY_NONE
    stopbits_MTB = serial.STOPBITS_TWO  # Set to two stop bits
    
    

    current_value_amps = "19_5"

    # Create a directory called "samples" if it doesn't exist
    samples_directory = "samples/" + str(current_value_amps)
    if not os.path.exists(samples_directory):
        os.makedirs(samples_directory)
    
    # COM port and baud rate settings
    port = "COM26"
    baud_rate = 115200

    # Record serial data

    for i in range(10):
        record_serial_data(port, baud_rate,current_value_amps,i)


Serial port opened successfully.
MTB Serial port opened successfully.
s0
V  0.0
I19.50
S1
Writing thread started.
Timeout reached. Stopping writing thread.
Writing thread stopped.
Done.
Serial port closed.
Serial port opened successfully.
MTB Serial port opened successfully.
s0
V  0.0
I19.50
S1
Writing thread started.
Timeout reached. Stopping writing thread.
Writing thread stopped.
Done.
Serial port closed.
Serial port opened successfully.
MTB Serial port opened successfully.
s0
V  0.0
I19.50
S1
Writing thread started.
Timeout reached. Stopping writing thread.
Writing thread stopped.
Done.
Serial port closed.
Serial port opened successfully.
MTB Serial port opened successfully.
s0
V  0.0
I19.50
S1
Writing thread started.
Timeout reached. Stopping writing thread.
Writing thread stopped.
Done.
Serial port closed.
Serial port opened successfully.
MTB Serial port opened successfully.
s0
V  0.0
I19.50
S1
Writing thread started.
Timeout reached. Stopping writing thread.
Writing thread stopp