forked from PFTL/SimpleDaq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
simple_daq.py
129 lines (105 loc) · 4.25 KB
/
simple_daq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
Simple DAQ Controller
=====================
Python For The Lab revolves around controlling a simple DAQ device built on top of an Arduino.
The DAQ device is capable of generating up to two analog outputs in the range 0-3.3V and of acquiring
several analog inputs.
For pedagogical reasons, the Python for the Lab course assumes that the device can only generate its
output value by value and not as a sequence. This forces the developer to think about how to implement
a solution purely in Python.
"""
import serial
from time import sleep, time
class SimpleDaq():
    """ Controller for the serial devices that ships with Python for the Lab.

    Messages are exchanged as text lines over a serial port opened with the
    settings stored in :attr:`DEFAULTS`. The object can also be used as a
    context manager, in which case the port is closed when the ``with`` block
    exits::

        with SimpleDaq('/dev/ttyACM0') as daq:
            print(daq.idn())
    """
    DEFAULTS = {'write_termination': '\n',
                'read_termination': '\n',
                'encoding': 'ascii',
                'baudrate': 9600,
                'write_timeout': 1,
                'read_timeout': 1,
                }
    """Dictionary storing the defaults to communicate through the serial port.
    """

    OUTPUT_FULL_SCALE_VOLTS = 3.3
    """Voltage that corresponds to the largest integer output value. """

    OUTPUT_MAX_COUNT = 4095
    """Integer sent to the device for a full-scale output. """

    rsc = None
    """Resource. It is the actual library providing low level communication. """

    def __init__(self, port):
        """ Automatically initializes the communication with the device.

        :param port: Serial port identifier, passed as-is to :meth:`initialize`.
        """
        self.initialize(port)

    def __enter__(self):
        """ Returns the controller itself so it can be bound by ``with ... as``.
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """ Closes the port when leaving a ``with`` block; exceptions propagate.
        """
        self.finalize()

    def initialize(self, port):
        """ Opens the serial port with the DEFAULTS.

        If a port is already held by this object it is closed first, so calling
        this method again does not leave the previous handle open.

        :param port: Serial port identifier handed to ``serial.Serial``.
        """
        self.finalize()
        self.rsc = serial.Serial(port=port,
                                 baudrate=self.DEFAULTS['baudrate'],
                                 timeout=self.DEFAULTS['read_timeout'],
                                 write_timeout=self.DEFAULTS['write_timeout'])
        # NOTE(review): presumably gives the device time to become ready after
        # the port is opened -- confirm against the firmware.
        sleep(0.5)

    def idn(self):
        """Identify the device.

        :return str: The answer of the device to the ``IDN`` message.
        """
        return self.query("IDN")

    def get_analog_value(self, channel):
        """Gets the value from an analog port.

        :param int channel: Channel number to read.
        :return int: The value read, converted with ``int()``.
        :raises ValueError: If the answer of the device is not an integer.
        """
        query_string = 'IN:CH{}'.format(channel)
        return int(self.query(query_string))

    def set_analog_value(self, channel, value):
        """ Sets a voltage to an output port.

        The voltage is scaled linearly so that :attr:`OUTPUT_FULL_SCALE_VOLTS`
        maps to :attr:`OUTPUT_MAX_COUNT` and is truncated to an integer. No
        range check is performed on the result.

        :param int channel: Channel number. Range depends on device
        :param value: The output value. Either a quantity exposing
            ``m_as('V')`` (e.g. a pint Quantity) or a plain number in Volts.
        """
        # Quantities are converted to Volts; bare numbers are taken as Volts.
        volts = value.m_as('V') if hasattr(value, 'm_as') else float(value)
        counts = int(volts / self.OUTPUT_FULL_SCALE_VOLTS * self.OUTPUT_MAX_COUNT)
        write_string = 'OUT:CH{}:{}'.format(channel, counts)
        self.write(write_string)

    def finalize(self):
        """ Closes the communication with the device.

        Does nothing if the port was never opened.
        """
        if self.rsc is not None:
            self.rsc.close()

    def query(self, message):
        """Sends a message to the device and reads the output.

        :param str message: Message sent to the device. It should generate an
            output, or it will timeout waiting to read from it.
        :return str: The message read from the device
        """
        self.write(message)
        return self.read()

    def write(self, message):
        """ Writes a message to the device using the DEFAULTS end of line and encoding.

        :param str message: The message to send to the device
        :raises Warning: If the port has not been initialized.
        """
        if self.rsc is None:
            raise Warning("Trying to write to device before initializing")
        encoding = self.DEFAULTS['encoding']
        msg = (message + self.DEFAULTS['write_termination']).encode(encoding)
        self.rsc.write(msg)
        # NOTE(review): fixed pause after every write, presumably to let the
        # device process the command -- confirm it is still required.
        sleep(0.1)

    def read(self):
        """ Reads from the device using the DEFAULTS end of line and encoding.

        Bytes are read one at a time until the read termination is received.
        The returned string still contains the termination character.

        :return str: The message received from the device.
        :raises Warning: If the port has not been initialized.
        :raises Exception: If the termination is not received within
            ``DEFAULTS['read_timeout']`` seconds.
        """
        if self.rsc is None:
            raise Warning("Trying to read from device before initializing")
        encoding = self.DEFAULTS['encoding']
        read_termination = self.DEFAULTS['read_termination'].encode(encoding)
        timeout = self.DEFAULTS['read_timeout']
        line = bytearray()
        t0 = time()
        while True:
            new_char = self.rsc.read(size=1)
            line += new_char
            # Check for the terminator before the clock, so that a complete
            # line is never discarded just because it arrived late.
            if new_char == read_termination:
                break
            if time() - t0 > timeout:
                raise Exception("Readout time reached when reading from the device")
        return line.decode(encoding)
if __name__ == "__main__":
    # Manual smoke test against a device attached to /dev/ttyACM0: identify
    # it, read input channel 0 on request, then drive output channel 0 to 3 V.
    import pint

    ur = pint.UnitRegistry()
    d = SimpleDaq('/dev/ttyACM0')
    try:
        print(d.query('IDN'))
        input('Press to read value')
        print(d.query('IN:CH0'))
        out_value = ur('3.0V')
        d.set_analog_value(0, out_value)
    finally:
        # Release the serial port even if one of the exchanges above fails.
        d.finalize()