In [1]:
import nidaqmx as ni
import numpy as np

In [2]:
def is_device_available(device_name):
    """Return True if ``device_name`` is one of the devices NI-DAQmx reports.

    The comparison is case-insensitive: both the requested name and the names
    listed in ``ni.system.System().devices.device_names`` are lower-cased
    before matching, so ``'Dev2'`` and ``'dev2'`` are treated as the same
    device.

    Args:
        device_name: Device name to look for, e.g. ``'dev2'``.

    Returns:
        bool: True when a device with that name is listed, False otherwise.
    """
    wanted = device_name.lower()
    avail_device_names = ni.system.System().devices.device_names
    return any(dev.lower() == wanted for dev in avail_device_names)

def is_counter_available(device_handle, counter_name):
    """Return True if the device exposes a counter-output channel ``counter_name``.

    Each entry of ``device_handle.co_physical_chans.channel_names`` is reduced
    to the part after its last ``'/'`` and compared case-insensitively, in
    line with the other ``is_*_available`` helpers.

    Args:
        device_handle: Object exposing ``co_physical_chans.channel_names``.
        counter_name: Short counter name, e.g. ``'ctr0'``.

    Returns:
        bool: True when a matching counter channel is listed.
    """
    wanted = counter_name.lower()
    return any(
        chan.rsplit('/', 1)[-1].lower() == wanted
        for chan in device_handle.co_physical_chans.channel_names
    )

def is_ai_term_available(device_handle, term_name):
    """Return True if the device exposes an analog-input channel ``term_name``.

    Each entry of ``device_handle.ai_physical_chans.channel_names`` is reduced
    to the part after its last ``'/'``. Both sides are lower-cased, so the
    match is case-insensitive.

    Args:
        device_handle: Object exposing ``ai_physical_chans.channel_names``.
        term_name: Short channel name, e.g. ``'ai0'``.

    Returns:
        bool: True when a matching analog-input channel is listed.
    """
    wanted = term_name.lower()
    return any(
        chan.rsplit('/', 1)[-1].lower() == wanted
        for chan in device_handle.ai_physical_chans.channel_names
    )

def is_ao_term_available(device_handle, term_name):
    """Return True if the device exposes an analog-output channel ``term_name``.

    Each entry of ``device_handle.ao_physical_chans.channel_names`` is reduced
    to the part after its last ``'/'``. Both sides are lower-cased, so the
    match is case-insensitive.

    Args:
        device_handle: Object exposing ``ao_physical_chans.channel_names``.
        term_name: Short channel name, e.g. ``'ao0'``.

    Returns:
        bool: True when a matching analog-output channel is listed.
    """
    wanted = term_name.lower()
    return any(
        chan.rsplit('/', 1)[-1].lower() == wanted
        for chan in device_handle.ao_physical_chans.channel_names
    )

def is_pfi_term_available(device_handle, term_name):
    """Return True if the device lists a PFI terminal named ``term_name``.

    Only entries of ``device_handle.terminals`` that contain ``'PFI'`` are
    considered. Each is reduced to the part after its last ``'/'``, and the
    comparison with ``term_name`` is case-insensitive.

    Args:
        device_handle: Object exposing an iterable ``terminals`` of strings.
        term_name: Short terminal name, e.g. ``'pfi0'``.

    Returns:
        bool: True when a matching PFI terminal is listed.
    """
    wanted = term_name.lower()
    return any(
        term.rsplit('/', 1)[-1].lower() == wanted
        for term in device_handle.terminals
        if 'PFI' in term
    )


In [None]:
class ni_tasks_manager:
    """Registry of the NI-DAQmx tasks created through this class.

    All state lives in the class attribute ``tasks_dict`` and every method is
    a static method, so the class is used directly
    (``ni_tasks_manager.create_task(...)``) and is not meant to be
    instantiated.
    """

    # Maps task name -> task object for every task created via create_task().
    tasks_dict = dict()

    @staticmethod
    def create_task(task_name):
        """Create an ``ni.Task`` named ``task_name`` and register it.

        Args:
            task_name: Name for the new task; must not already be registered.

        Returns:
            The newly created task object.

        Raises:
            Exception: If the name is already registered, or if ``ni.Task``
                fails (the original error is chained as ``__cause__``).
        """
        registry = ni_tasks_manager.tasks_dict
        if task_name in registry:
            raise Exception(f'{task_name} has already been created!')

        try:
            task = ni.Task(task_name)
        except Exception as err:
            # Keep the generic Exception type, but chain the root cause so it
            # is not lost.
            raise Exception(f'Fail to create task named {task_name}.') from err
        registry[task_name] = task
        return task

    @staticmethod
    def remove_task(name=None, task=None):
        """Unregister a task, stop it if it is still running, and close it.

        The task can be identified either by ``name`` or by the ``task``
        object itself; when both are given, ``task`` takes precedence.

        Args:
            name: Registered name of the task to remove.
            task: Task object to remove (matched by identity).

        Raises:
            Exception: If the task/name is not registered, or if stopping or
                closing fails with something other than ``ni.DaqError``.
        """
        registry = ni_tasks_manager.tasks_dict
        if task is not None:
            # Look the task up by identity so that two distinct task objects
            # are never confused with each other.
            name = next(
                (key for key, registered in registry.items() if registered is task),
                None,
            )
            if name is None:
                raise Exception(f'Fail to find {task} from our record!')

        if name is None:
            print('Specify which task to be removed.')
            return

        if name not in registry:
            raise Exception(f'Fail to remove task {name}')

        task = registry.pop(name)
        try:
            try:
                if not task.is_task_done():
                    task.stop()
            finally:
                # Always attempt to close, even when the stop attempt failed.
                task.close()
        except ni.DaqError:
            # Best effort: the task is already unregistered, so only report.
            print(f'Fail to delete task {name}')
        except Exception as err:
            raise Exception(f'Fail to remove task {name}') from err

    @staticmethod
    def is_name_exist(task_name):
        """Return True if a task is registered under ``task_name``."""
        return task_name in ni_tasks_manager.tasks_dict

    @staticmethod
    def is_task_exist(task):
        """Return True if this exact ``task`` object is registered."""
        return any(
            registered is task
            for registered in ni_tasks_manager.tasks_dict.values()
        )


In [24]:
# Name of the DAQ device this notebook talks to.
device_name = 'dev2'

# Only warns when the device is not listed; execution continues either way.
if not is_device_available(device_name):
    print(f'{device_name} is not available!')

# Handle passed as `device_handle` to the is_*_available helpers.
device_handle = ni.system.Device(device_name)

In [None]:
class sample_clock:
    """Counter-output pulse train configured through ``ni_tasks_manager``.

    Attributes:
        device_name: Device name used to build ``source``.
        counter_name: Counter name used to build ``source``.
        source: Channel string ``'/<device_name>/<counter_name>'``.
        period: Pulse period; ``sample_rate`` is its reciprocal.
        duty_cycle: Duty cycle passed to the pulse channel.
        frame_size: Samples per channel for a finite run; falsy means
            continuous generation.
        mode: ``AcquisitionType.FINITE`` when ``frame_size`` is truthy at
            construction time, otherwise ``AcquisitionType.CONTINUOUS``.
        task: The configured task, or None when no task is held.
    """

    def __init__(self, device_name, counter_name, period=0.01, duty_cycle=0.9, frame_size=None):
        self.device_name = device_name
        self.counter_name = counter_name
        self.source = f'/{self.device_name}/{self.counter_name}'
        self.period = period
        self.duty_cycle = duty_cycle
        self.frame_size = frame_size
        # Decided once here; later changes to frame_size do not update it.
        self.mode = ni.constants.AcquisitionType.FINITE if self.frame_size else ni.constants.AcquisitionType.CONTINUOUS
        self.task = None

    @property
    def sample_rate(self):
        """Pulse frequency, the reciprocal of ``period``."""
        return 1/self.period

    @sample_rate.setter
    def sample_rate(self, rate):
        self.period = 1/rate

    def init_task(self):
        """Create and configure the counter-output task for this clock.

        The task is named ``clock<id(self)>``. If configuring the channel or
        the timing fails, the half-configured task is removed from the
        manager again and the error is re-raised; ``self.task`` is only
        assigned once configuration succeeded.
        """
        task = ni_tasks_manager.create_task(f'clock{id(self)}')
        try:
            task.co_channels.add_co_pulse_chan_freq(
                self.source,
                idle_state=ni.constants.Level.LOW,
                freq=self.sample_rate,
                duty_cycle=self.duty_cycle,
            )
            timing_kwargs = {'sample_mode': self.mode}
            if self.frame_size:
                # Without a frame size (continuous mode) let the driver use
                # its own default instead of passing None.
                timing_kwargs['samps_per_chan'] = self.frame_size
            task.timing.cfg_implicit_timing(**timing_kwargs)
        except Exception:
            # Do not leave a half-configured task registered under this name.
            ni_tasks_manager.remove_task(task=task)
            raise
        self.task = task

    def clear_task(self):
        """Remove the held task (if any) and reset ``task`` to None.

        Safe to call repeatedly: the attribute is kept and set to None
        rather than deleted.
        """
        if self.task is not None:
            try:
                ni_tasks_manager.remove_task(task=self.task)
            finally:
                self.task = None
    

In [121]:
# Device and counter used to build the clock's channel string.
device_name = 'dev2'
counter_name = 'ctr0'

# Pulse-train parameters.
period = 0.01   # seconds
sample_rate = 1/period  # reciprocal of the period
DutyCycle = 0.9
frame_size = 10

# print()
# task_clk.co_channels.add_co_pulse_chan_freq(
#     f'/{device_name}/{counter_name}',
#     idle_state=ni.constants.Level.LOW,
#     freq=sample_rate,
#     duty_cycle=DutyCycle,
# )
# task_clk.timing.cfg_implicit_timing(
#     sample_mode=ni.constants.AcquisitionType.FINITE,
#     samps_per_chan=frame_size,
# )

In [134]:
# Build the clock from the configuration variables defined above rather than
# repeating their literal values here.
clk = sample_clock(
    device_name,
    counter_name,
    period=period,
    frame_size=frame_size,
    duty_cycle=DutyCycle,
)
clk.init_task()

In [138]:
# Display the task object held by the clock (assigned in init_task()).
clk.task

Task(name=clock1666340143056)

In [137]:
# Display the manager's name -> task registry.
# NOTE(review): the recorded output is {} even though clk.task shows a live
# task and the execution counts are out of order — presumably the class cell
# was re-run (resetting tasks_dict) after the task was created; confirm with
# Restart & Run All.
ni_tasks_manager.tasks_dict

{}

In [107]:
# `task_clk` is not defined anywhere in the notebook's live code; the clock
# task is held by the sample_clock instance.
clk.task.stop()

In [106]:
# `task_clk` is not defined anywhere in the notebook's live code; the clock
# task is held by the sample_clock instance.
clk.task.control(ni.constants.TaskMode.TASK_COMMIT)