In [None]:
import nidaqmx 
import atiiaftt as ati
import time
import numpy as np
from nidaqmx.stream_readers import AnalogMultiChannelReader
import matplotlib.pyplot as plt
import pandas as pd
from nidaqmx.constants import AcquisitionType
from pprint import pprint

start_task
close_task

read_raw_voltage
read_avg_voltage
read_raw_ft
read_avg_ft

set_bias(bias=None)

set_channel_number
set_sampling_freq
set_read_freq

get_calibration_file_path
get_tool_offset
get_channel_number
get_sampling_freq
get_read_freq
get_sample_number

In [None]:
class ftsensor:
    def __avg(lst):
        return sum(lst) / len(lst)

    def __init__(self, calibration_file_path="..\FT44764\FT44764.cal", tool_offset=[0,0,0,0,0,0]):
        self.calibration_file_path = calibration_file_path
        self.tool_offset = tool_offset
        self.channel_number = 6
        self.sampling_freq = 6400
        self.read_freq = 64
        self.sample_number = self.sampling_freq//self.read_freq
        self.raw_readings = np.zeros((self.channel_number, self.sample_number), dtype=np.float64)
        
        self.sensor = ati.FTSensor()
        self.calibrate(calibration_file_path=self.calibration_file_path)
        self.set_tool(self.tool_offset)

    def calibrate(self, calibration_file_path):
        self.sensor.createCalibration(calibration_file_path, 1)
        self.sensor.setForceUnits("N".encode("utf-8"))
        self.sensor.setTorqueUnits("N-m".encode("utf-8"))
        
    def set_tool(self, tool_offset):
        self.sensor.setToolTransform(tool_offset, "mm".encode("utf-8"), "deg".encode("utf-8"))

    def start_task(self):
        self.task = nidaqmx.Task()
        for i in range(self.channel_number):
            self.task.ai_channels.add_ai_voltage_chan(f"Dev1/ai{i}")
        self.task.timing.cfg_samp_clk_timing(self.sampling_freq, sample_mode = AcquisitionType.CONTINUOUS, samps_per_chan = self.sample_number)
        self.reader = AnalogMultiChannelReader(self.task.in_stream)
    
    def close_task(self):
        self.task.close()

    def read_raw_voltage(self):
        self.reader.read_many_sample(data=self.raw_readings, number_of_samples_per_channel=self.sample_number, timeout=1/self.read_freq)
        return self.raw_readings
    
    def read_avg_voltage(self):
        raw_voltage = self.read_raw_voltage()
        return [ftsensor.__avg(channel) for channel in raw_voltage]
    
    def read_raw_ft(self):
        raw_voltage = self.read_raw_voltage()
        return [self.sensor.convertoFT([raw_voltage[i][j] for i in range(self.channel_number)]) for j in range(self.sample_number)]
    
    def read_ft(self):
        avg_voltage = self.read_avg_voltage()
        return self.sensor.convertToFt(avg_voltage)

In [None]:
sensor = ftsensor()
sensor.start_task()
raw_voltage = sensor.read_raw_voltage()
avg_voltage = sensor.read_avg_voltage()
raw_ft = sensor.read_raw_ft()
ft = sensor.read_ft()
sensor.close_task()

In [None]:
pprint(raw_voltage)

In [None]:
CHANNEL_NUM = 6
SAMPLING_FREQ = 6400
READ_FREQ = 64
SAMPLE_NUM = SAMPLING_FREQ//READ_FREQ
values = np.zeros((CHANNEL_NUM, SAMPLE_NUM), dtype=np.float64)

DURATION = 5

sensor = ati.FTSensor()

CALFILEPATH = "..\FT44764\FT44764.cal"
INDEX = 1
cal = sensor.createCalibration(CALFILEPATH, INDEX)

sensor.setForceUnits("N".encode("utf-8"))
sensor.setTorqueUnits("N-m".encode("utf-8"))

TOOL_OFFSET = [0,0,0,0,0,0]
sensor.setToolTransform(TOOL_OFFSET, "mm".encode("utf-8"), "deg".encode("utf-8"))

with nidaqmx.Task() as task:
    for i in range(CHANNEL_NUM):
        task.ai_channels.add_ai_voltage_chan(f"Dev1/ai{i}")

    task.timing.cfg_samp_clk_timing(SAMPLING_FREQ, sample_mode = AcquisitionType.CONTINUOUS, samps_per_chan = SAMPLE_NUM)
    reader = AnalogMultiChannelReader(task.in_stream)
    
    reader.read_many_sample(data=values, number_of_samples_per_channel=SAMPLE_NUM, timeout=1/READ_FREQ)

In [None]:
current_time = time.time()
readings_inside_loop = []
while time.time() < current_time+DURATION:
    with nidaqmx.Task() as task:
        task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
        task.ai_channels.add_ai_voltage_chan("Dev1/ai1")
        task.ai_channels.add_ai_voltage_chan("Dev1/ai2")
        task.ai_channels.add_ai_voltage_chan("Dev1/ai3")
        task.ai_channels.add_ai_voltage_chan("Dev1/ai4")
        task.ai_channels.add_ai_voltage_chan("Dev1/ai5")

        readings_inside_loop.append(sensor.convertToFt(task.read()))
readings_inside_loop = np.array(readings_inside_loop)

print("Shape size: ", readings_inside_loop.shape)
std_readings_inside_loop = readings_inside_loop.transpose()[0].std()
print("Standard Deviation of Force X (N): ", std_readings_inside_loop)
avg_readings_inside_loop = readings_inside_loop.transpose()[0].mean()
print("Average of Force X (N): ", avg_readings_inside_loop)

In [None]:
current_time = time.time()
readings_outside_loop = []
with nidaqmx.Task() as task:
    task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
    task.ai_channels.add_ai_voltage_chan("Dev1/ai1")
    task.ai_channels.add_ai_voltage_chan("Dev1/ai2")
    task.ai_channels.add_ai_voltage_chan("Dev1/ai3")
    task.ai_channels.add_ai_voltage_chan("Dev1/ai4")
    task.ai_channels.add_ai_voltage_chan("Dev1/ai5")
    while time.time() < current_time+DURATION:
        readings_outside_loop.append(sensor.convertToFt(task.read()))
readings_outside_loop = np.array(readings_outside_loop)


print("Shape size: ", readings_outside_loop.shape)
std_readings_outside_loop = readings_outside_loop.transpose()[0].std()
print("Standard Deviation of Force X (N): ", std_readings_outside_loop)
avg_readings_outside_loop = readings_outside_loop.transpose()[0].mean()
print("Average of Force X (N): ", avg_readings_outside_loop)

In [None]:
readings_multi_read_one = []
with nidaqmx.Task() as read_task:
    read_task = nidaqmx.Task()
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai1")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai2")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai3")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai4")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai5")
    reader = AnalogMultiChannelReader(read_task.in_stream,)
    values_read = np.zeros(6, dtype=np.float64)



    current_time = time.time()
    while time.time() < current_time+DURATION:
        reader.read_one_sample(values_read)
        
        readings_multi_read_one.append(sensor.convertToFt(values_read.tolist()))

readings_multi_read_one = np.array(readings_multi_read_one)


print("Shape size: ", readings_multi_read_one.shape)
std_readings_multi_read_one = readings_multi_read_one.transpose()[0].std()
print("Standard Deviation of Force X (N): ", std_readings_multi_read_one)
avg_readings_multi_read_one = readings_multi_read_one.transpose()[0].mean()
print("Average of Force X (N): ", avg_readings_multi_read_one)

In [None]:
readings_multi_read_many = []
with nidaqmx.Task() as read_task:
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai1")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai2")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai3")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai4")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai5")

    sample_rate = 6400
    samples_to_acq = DURATION * sample_rate
    values_read = np.zeros((6, samples_to_acq), dtype=np.float64)
    cont_mode = AcquisitionType.CONTINUOUS
    read_task.timing.cfg_samp_clk_timing(sample_rate, sample_mode = cont_mode, samps_per_chan = samples_to_acq)               
    reader = AnalogMultiChannelReader(read_task.in_stream)

    current_time = time.time()
    reader.read_many_sample(data=values_read, number_of_samples_per_channel=samples_to_acq, timeout=11)
    test_time = time.time()
    print("time passed: ", test_time-current_time,"(s)")

readings_multi_read_many = np.array([np.array(channel) for channel in values_read]).transpose()
readings_multi_read_many = np.array([sensor.convertToFt(reading.tolist()) for reading in readings_multi_read_many])

std_readings_multi_read_many = readings_multi_read_many.transpose()[0].std()
print("Standard Deviation of Force X (N): ", std_readings_multi_read_many)
avg_readings_multi_read_many = readings_multi_read_many.transpose()[0].mean()
print("Average of Force X (N): ", avg_readings_multi_read_many)

In [None]:
sample_rate = 6400
samples_to_acq = DURATION * sample_rate
values_read = np.zeros((6, samples_to_acq), dtype=np.float64)

with nidaqmx.Task() as read_task:
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai1")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai2")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai3")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai4")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai5")
    
    read_task.timing.cfg_samp_clk_timing(sample_rate, sample_mode = AcquisitionType.CONTINUOUS, samps_per_chan = samples_to_acq)               
    
    reader = AnalogMultiChannelReader(read_task.in_stream)

    current_time = time.time()
    reader.read_many_sample(data=values_read, number_of_samples_per_channel=samples_to_acq, timeout=DURATION)
    test_time = time.time()
    print("time passed: ", test_time-current_time,"(s)")

final_readings = [[__avg(sample) for sample in channel] for channel in np.reshape(values_read, (6, 32000//100,100))]

final_ft = np.array([sensor.convertToFt([final_readings[i][j] for i in range(len(final_readings))]) for j in range(len(final_readings[0]))])

std_final_ft = final_ft.transpose()[0].std()
print("Standard Deviation of Force X (N): ", std_final_ft)
avg_final_ft = final_ft.transpose()[0].mean()
print("Average of Force X (N): ", avg_final_ft)

In [None]:
sample_rate = 6400
samples_read = 100

with nidaqmx.Task() as read_task:
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai1")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai2")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai3")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai4")
    read_task.ai_channels.add_ai_voltage_chan("Dev1/ai5")
    
    read_task.timing.cfg_samp_clk_timing(sample_rate, sample_mode = AcquisitionType.CONTINUOUS, samps_per_chan = samples_read)               
    
    read_task.start()
    reading = task.read(number_of_samples_per_channel=samples_read)
    read_task.stop()
    print(readings)

final_readings = [[__avg(sample) for sample in channel] for channel in np.reshape(values_read, (6, 32000//100,100))]

final_ft = np.array([sensor.convertToFt([final_readings[i][j] for i in range(len(final_readings))]) for j in range(len(final_readings[0]))])

std_final_ft = final_ft.transpose()[0].std()
print("Standard Deviation of Force X (N): ", std_final_ft)
avg_final_ft = final_ft.transpose()[0].mean()
print("Average of Force X (N): ", avg_final_ft)

In [None]:
print((test_reading[0]))

In [None]:
FOLDER_DIR = "..\Joey\Record_Data"

demo_df = pd.read_csv(FOLDER_DIR+"\Demo\Sensor_2.csv")

demo_df.drop(demo_df.columns[6:], axis=1, inplace=True)

print("Shape size: ", demo_df.shape)
demo_df["Force X (N)"].astype(int)
# print("Standard Deviation of Force X (N): ", demo_df["Force X (N)"].std())
# print("Average of Force X (N): ", demo_df["Force X (N)"].mean())