In [4]:
import numpy as np
import nidaqmx
import threading
import queue

import time
import os

import sys

from PyQt6 import QtWidgets, QtCore, QtGui
from PyQt6.QtCore import Qt, pyqtSlot
from PyQt6.QtWidgets import (QWidget, QTextEdit, QSlider, QGraphicsScene, QGraphicsView, QWidget,QPushButton,QVBoxLayout,QHBoxLayout,QLabel, QCheckBox)
from PyQt6.QtGui import QImage, QPixmap

In [2]:

class PID_control():

    def __init__(self):
        self.kp = 10
        self.ki = 0
        self.kd = 0
        self.prev_1 = 0
        self.prev_2 = 0

    def process(self, val_1, val_2, aim_1, aim_2):

        back_1 = (aim_1-val_1)*self.kp + val_1
        back_2 = (aim_2-val_2)*self.kp + val_2

        return back_1, back_2


class niManager():

    def __init__(self):
        
        self.sample_rate = 10000
        self.buffer_size = 10 
        self.recovery_rate = self.sample_rate/self.buffer_size
        
        self.num_samples = 0
        self.num_samples_tot = 40*self.recovery_rate
        self.sample_array = np.zeros((7, self.num_samples_tot))

        self.deviceName = "myDaq" 
        self.num_channels = 2

        self.buffer_data = np.array((self.num_channels,self.buffer_size))

        self.data_que_1 = queue.Queue( size = 100)
        if self.num_channels == 2:
            self.data_que_2 = queue.Queue( size = 100)

        self.PID = PID_control()

    def change_num_channels(self, num):
        self.num_channels = num

    def main(self):
        event = threading.threading.Event()
        self.create_ai_read()
        self.create_ao_output()

        sampling_thread = threading.threading.Thread(target= self.ao_sampling, args=(event,))

        self.ai_read.start()

        _ = input("hit me to stop")
        event.set() 
        self.ai_read.close()
        sampling_thread.join()
        

    def create_ai_read(self):

        self.ai_read = nidaqmx.Task()

        if self.num_channels == 2:
            self.ai_read.ai_channels.add_ai_voltage_chan(os.path.join(self.deviceName,"ai0:1"), terminal_config = nidaqmx.constants.TerminalConfiguration.DIFF)
        elif self.num_channels == 1:
            self.ai_read.ai_channels.add_ai_voltage_chan(os.path.join(self.deviceName,"ai0"), terminal_config = nidaqmx.constants.TerminalConfiguration.DIFF)
        
        self.ai_read.timing.cfg_samp_clk_timing(rate=self.sample_rate, sample_mode=nidaqmx.constants.AcquisitionType.CONTINUOUS,
                                                    samps_per_chan=self.buffer_size)
        
        self.ai_stream = nidaqmx.stream_readers.AnalogMultiChannelReader(self.ai_read.in_stream)
        self.NiAlReader.register_every_n_samples_acquired_into_buffer_event(self.buffer_size, self.ai_rec_callback)

    def ai_rec_callback(self, task_idx, event_type, num_samples, callback_data):

        self.readerStreamIn.read_many_sample(self.data_buffer, self.buffer_size, timeout = nidaqmx.constants.WAIT_INFINITELY)
        
        self.sample_array[:1, self.num_samples] = np.mean(self.buffer_data,axis = -1)
        
        self.data_que.put(np.concatenate((self,num_samples, self.sample_array[:1, self.num_samples].T)))
        if self.num_samples == self.num_samples_tot:
            self.ai_read.stop()
        self.num_samples += 1

        return 0
    
    def create_ao_output(self):
    
        self.ao_write_1 = nidaqmx.Task()
        self.ao_write_1.ao_channels.add_ao_voltage_chan(os.path.join(self.deviceName,"ao0"), "", min_val=- 10.0, max_val=10.0)
            
        if self.num_channels == 2:
            self.ao_write_2 = nidaqmx.Task()
            self.ao_write_2.ao_channels.add_ao_voltage_chan(os.path.join(self.deviceName,"ao1"), "", min_val=- 10.0, max_val=10.0)

        self.NiAlWriter.start()
        for i in range(5):
            self.ao_write.write(0,1000)

    def checkLimits(self, data):
        """
        Check that writing value is in the limits
        """
        if data < -10:
            data = -10
        elif data > 10:
            data = 10

        return data

    def ao_sampling(self, event: threading.Event):

        end_of_line = len(self.signal[0,:])

        while True:

            if self.data_que.empty == True:
                time.sleep(1e-9)
            else:
                val = self.data_que.get()
                val_1 = val[1]/self.res_1
                val_2 = val[2]/self.res_2
                tuned_val = self.PID.process(val_1, val_2, self.signal[0,val[0]], self.signal[1,val[0]])
                tuned_val[0] = self.checkLimits(tuned_val[0])
                tuned_val[1] = self.checkLimits(tuned_val[1])
                
                self.ao_write_1.write(tuned_val[0])
                self.ao_write_2.write(tuned_val[1])
                self.sample_array[2, val[0]] = self.signal[0,val[0]]
                self.sample_array[3, val[0]] = self.signal[1,val[0]]
                self.sample_array[4, val[0]] = tuned_val[0]
                self.sample_array[5, val[0]] = tuned_val[1]
                self.sample_array[6, val[0]] = val[0]
                

                if (val[0] == end_of_line) & (event.is_set()):
                    break
        
        self.ao_write_1.stop()
        self.ao_write_2.stop()

        self.ao_write_1.clsoe()
        self.ao_write_2.close()

        print("Saving...")
        np.save("./current.npy", self.sample_array)

        return 0

In [3]:
class App(QWidget):

    def __init__(self, args):
        super().__init__()

        self.args = args

        self.sample_Hz = 1000

        self.tune_flag = False
        self.measure_flag = False

        #Init driver and signal pipe
        self.driver = niManager()
        self.driver.setData.connect(self.signal_NI)
        
        self.win= QWidget()


    @pyqtSlot(object)
    def signal_NI(self, data):
        self.time = np.roll(self.time,-1); self.time[-1] = data[0]*self.sample_Hz

        self.target_1 = np.roll(self.target,-1); self.target[-1] = data[1]
        self.target_2 = np.roll(self.target,-1); self.target[-1] = data[2]

        self.measured_1 = np.roll(self.measured,-1); self.measured[-1] = data[3]
        self.measured_2 = np.roll(self.measured,-1); self.measured[-1] = data[4]

        #Update plots
        self.dataLineTarget_1.setData(self.time, self.target_1)
        self.dataLineTarget_2.setData(self.time, self.target_2)
        self.dataLineMeasured_1.setData(self.time, self.measured_1)
        self.dataLineMeasured_2.setData(self.time, self.measured_2)

        #Update fields
        self.CurrentValueLabel.setText("Coil 1 target: {:.2f} A \nCoil 2 target: {:.2f} A\nCoil 1 measured: {:.2f} A \nCoil 2 measured".format(data[1],data[2],data[3],data[4]))

        #Update axis
        if data[0]*self.samplingHz>10:
            self.plotI.setXRange(data[0]*self.samplingHz-10, data[0]*self.samplingHz+10, padding=0)
            self.plotB.setXRange(data[0]*self.samplingHz-10, data[0]*self.samplingHz+10, padding=0)
    
    def init_ui(self):
        self.main_layout = QVBoxLayout()
        #Buttoms
        self.button_layout = QHBoxLayout()
        #Text fields
        self.text_layout = QHBoxLayout()
        #plots
        self.plot_layout = QHBoxLayout()

        self.main_layout.addLayout(self.button_layout)
        self.main_layout.addLayout(self.text_layout)  
        self.main_layout.addLayout(self.plot_layout)  
    
    def init_buttons(self):
        self.btn_start = QPushButton("Start")
        self.btn_start.pressed.connect(self.start_func)
        self.btn_start.setStyleSheet("background-color : green")

        self.btn_tune = QPushButton("Tune")
        self.btn_tune.pressed.connect(self.tune_func)
        self.btn_tune.setStyleSheet("background-color : green")

        self.btn_stop = QPushButton("Stop")
        self.btn_stop.pressed.connect(self.stop_func)
        self.btn_stop.setStyleSheet("background-color : red")

    def __del__(self):
        # Restore sys.stdout
        sys.stdout = sys.__stdout__

    def stop_func(self):
        self.btn_start.setStyleSheet("background-color : white")
        if (self.tune_flag == False) & (self.measure_flag == False):
            exit(0)

    def start_func(self):
        self.btn_start.setStyleSheet("background-color : white")
    
    def tune_func(self):
        self.btn_tune.setStyleSheet("background-color : white")
    
    def init_text_fields(self):
        self.path_field = QTextEdit("Write Path")
        self.text_layout.addWidget(self.path_field)

        self.print_field = QTextEdit("print field")
        self.text_layout.addWidget(self.print_field)

    
    def init_plots(self):
        self.plot_coils = pg.PlotWidget()
        self.plt_layout.addWdiget(self.plot_coils)
        
                            
        


In [None]:

app = QtWidgets.QApplication(sys.argv)
w = App(args)
sys.exit(app.exec())