In [68]:
import nidaqmx
import numpy as np
from threading import Thread
import ipywidgets as widgets
import bqplot.pyplot as plt

from bqplot import Axis, Lines, Figure, LinearScale

class SignalSender:
    """Writes a stored waveform to an NI-DAQmx analog output channel.

    The instance keeps the samples to emit together with an ``nidaqmx.Task``
    that has a single analog-output voltage channel (``Dev1/ao<chan>``).
    ``listen`` polls ``self.socket`` for a trigger and calls ``send_signal``
    every time one is seen.
    """

    def __init__(self, signal, host='localhost', socket=5557, chan=0):
        """Store the waveform and create the analog output task.

        Parameters
        ----------
        signal
            Samples passed to ``Task.write`` by ``send_signal``.
        host
            Currently unused by this class; kept so existing callers that
            pass it keep working.
        socket
            Object polled by ``listen``.
            NOTE(review): the default is a bare port number, but ``listen``
            uses this value as a context manager with a ``trigger``
            attribute, so a real socket-like object has to be supplied
            before ``listen`` can work.
        chan
            Index of the analog output line on ``Dev1``.
        """
        self.socket = socket
        self.signal = signal
        self.task = nidaqmx.Task()
        self.task.ao_channels.add_ao_voltage_chan(
            f'Dev1/ao{chan}'
        )

    def listen(self):
        """Loop forever, sending the waveform whenever a trigger is seen.

        This call never returns on its own, so it is meant to be run in a
        background thread.
        """
        with self.socket:
            while True:
                # TODO: fix to really detect the trigger
                if self.socket.trigger:
                    self.send_signal()

    def send_signal(self):
        """Write the stored waveform to the analog output task."""
        # NOTE(review): leaving the ``with`` block exits the task's context
        # manager; if that releases the task, only the first trigger can be
        # served - confirm against the nidaqmx documentation.
        with self.task:
            # Write the instance's own samples. The bare name ``signal`` is
            # not defined in this scope and raised NameError.
            self.task.write(self.signal)


In [69]:
def sine_wave(t, freq, amp, **kwargs):
    """Evaluate ``amp * sin(2*pi*freq*t)`` at the time points ``t``.

    Any additional keyword arguments are accepted and ignored.
    """
    phase = t * freq * (2*np.pi)
    return amp * np.sin(phase)

def constant(t, amp, **kwargs):
    """Return an array shaped like ``t`` in which every entry equals ``amp``.

    Any additional keyword arguments are accepted and ignored.
    """
    flat = np.ones_like(t)
    return flat * amp


class SignalGenerator(widgets.VBox):
    """Interactive waveform designer with a live preview plot.

    The widget shows numeric fields for the waveform parameters, a radio
    selector for the waveform function, a start button and a bqplot figure
    of the current signal. Changing any value recomputes ``self.data``;
    pressing the button hands a snapshot of ``self.data`` to a
    ``SignalSender`` that listens in a background thread.
    """

    # The preview always covers t in [0, _T_END].
    _T_END = 10.0
    # Bound of the amplitude field, reused as the fixed y-axis range so
    # negative amplitudes and the negative half of a sine stay visible.
    _AMP_LIMIT = 10.0

    def __init__(self, samples):
        """Build the controls and the figure, then draw the initial signal.

        Parameters
        ----------
        samples
            Initial value of the "Sample number" field (the bounded widget
            limits it to 1..100000).
        """
        # bqplot linear scales take their bounds through ``min``/``max``.
        lin_scx = LinearScale(min=0.0, max=self._T_END)
        lin_scy = LinearScale(min=-self._AMP_LIMIT, max=self._AMP_LIMIT)

        ax_x = Axis(label='X', scale=lin_scx, orientation="horizontal")
        ax_y = Axis(label='Y', scale=lin_scy, orientation="vertical")

        # Placeholder trace; update_data() below replaces it with the
        # selected waveform.
        t = np.linspace(0, self._T_END, samples)
        self.data = np.zeros_like(t)
        self.lines = Lines(
            x=t,
            y=self.data,
            scales={
                "x": lin_scx,
                "y": lin_scy
            },
        )
        self.fig = Figure(
            marks=[self.lines,],
            axes=[ax_x, ax_y],
        )

        self.func_selector = widgets.RadioButtons(
            options=[("Sine", sine_wave), ("Constant", constant)], value=sine_wave
        )
        # Restrict the observer to ``value`` so one user action triggers one
        # recomputation instead of one per changed trait.
        self.func_selector.observe(self.update_data, names='value')

        self.param_setters = {
            # Honour the constructor argument instead of a fixed 1000.
            "samples": widgets.BoundedIntText(value=samples, min=1, max=100000, description="Sample number"),
            "freq" : widgets.BoundedFloatText(value=10, min=1, max=100, description="Frequency"),
            "amp": widgets.BoundedFloatText(value=2, min=-self._AMP_LIMIT, max=self._AMP_LIMIT, description="Amplitude"),
        }
        for widget in self.param_setters.values():
            widget.observe(self.update_data, names='value')

        self.start_btn = widgets.Button(
            description='Start listening',
            disabled=False,
            button_style='info', # 'success', 'info', 'warning', 'danger' or ''
            tooltip='Starts to listen to the socket',
            icon='play'
        )
        self.start_btn.on_click(self.on_start)

        paramset = widgets.VBox(
            tuple(self.param_setters.values())
        )
        selector = widgets.HBox(
            (paramset, self.func_selector, self.start_btn)
        )
        super().__init__([
            selector,
            self.fig
        ])
        self.update_data(None)

    @property
    def params(self):
        """Current widget values as a ``{name: value}`` dict."""
        return {key: val.value for key, val in self.param_setters.items()}

    def update_data(self, event):
        """Recompute the waveform from the current settings and redraw it.

        ``event`` is the change notification from the observed widget (or
        ``None`` for the initial draw); it is not inspected.
        """
        func = self.func_selector.value
        params = self.params
        t = np.linspace(0, self._T_END, params['samples'])
        self.data = func(t, **params)
        # Send x and y to the front end together so the plot never sees
        # arrays of different lengths while the sample count changes.
        with self.lines.hold_sync():
            self.lines.x = t
            self.lines.y = self.data

    def on_start(self, event):
        """Start a background listener that emits the current signal."""
        # Create the sender first: if the DAQ task cannot be created the
        # button keeps its idle look.
        sender = SignalSender(self.data)
        self.start_btn.button_style = "danger"
        # One listener per generator; a second click would open another
        # task on the same output channel.
        self.start_btn.disabled = True
        # start() runs listen() in its own thread. run() would execute the
        # endless loop in the calling thread and block the kernel.
        listening = Thread(target=sender.listen, daemon=True)
        listening.start()

In [70]:
# Build the generator with 1000 samples; the bare name on the last line
# makes the notebook display the widget.
sig_g = SignalGenerator(samples=1000)
sig_g


SignalGenerator(children=(HBox(children=(VBox(children=(BoundedIntText(value=1000, description='Sample number'…

DaqNotFoundError: Could not find an installation of NI-DAQmx. Please ensure that NI-DAQmx is installed on this machine or contact National Instruments for support.