From 939bcd9fd17e44fb0c39299232586ce648da0208 Mon Sep 17 00:00:00 2001 From: Lightsaver7 <94392770+Lightsaver7@users.noreply.github.com> Date: Wed, 5 Apr 2023 15:58:05 +0200 Subject: [PATCH] Added functions for easier work with SCPI commands --- Examples/python/redpitaya_scpi.py | 929 +++++++++++++++++++++++++++++- 1 file changed, 905 insertions(+), 24 deletions(-) diff --git a/Examples/python/redpitaya_scpi.py b/Examples/python/redpitaya_scpi.py index fa3eaa7903..879d68e642 100755 --- a/Examples/python/redpitaya_scpi.py +++ b/Examples/python/redpitaya_scpi.py @@ -1,9 +1,11 @@ """SCPI access to Red Pitaya.""" import socket +import struct +import numpy as np -__author__ = "Luka Golinar, Iztok Jeras" -__copyright__ = "Copyright 2015, Red Pitaya" +__author__ = "Luka Golinar, Iztok Jeras, Miha Gjura" +__copyright__ = "Copyright 2023, Red Pitaya" class scpi (object): """SCPI class used to access Red Pitaya over an IP network.""" @@ -48,27 +50,27 @@ def rx_txt(self, chunksize = 4096): return msg[:-2] def rx_arb(self): - numOfBytes = 0 """ Recieve binary data from scpi server""" - str=b'' - while (len(str) != 1): - str = (self._socket.recv(1)) - if not (str == b'#'): + numOfBytes = 0 + data=b'' + while len(data) != 1: + data = self._socket.recv(1) + if data != b'#': return False - str=b'' - while (len(str) != 1): - str = (self._socket.recv(1)) - numOfNumBytes = int(str) - if not (numOfNumBytes > 0): + data=b'' + while len(data) != 1: + data = self._socket.recv(1) + numOfNumBytes = int(data) + if numOfNumBytes <= 0: return False - str=b'' - while (len(str) != numOfNumBytes): - str += (self._socket.recv(1)) - numOfBytes = int(str) - str=b'' - while (len(str) != numOfBytes): - str += (self._socket.recv(4096)) - return str + data=b'' + while len(data) != numOfNumBytes: + data += (self._socket.recv(1)) + numOfBytes = int(data) + data=b'' + while len(data) != numOfBytes: + data += (self._socket.recv(4096)) + return data def tx_txt(self, msg): """Send text string ending and append delimiter.""" @@ -79,6 +81,885 @@ def txrx_txt(self, msg): self.tx_txt(msg) return self.rx_txt() + +# SCPI command functions + + def sour_set( + self, + chan: int, + func: str = "sine", + volt: float = 1, + freq: float = 1000, + offset: float = 0, + phase: float = 0, + dcyc: float = 0.5, + data: np.ndarray = None, + burst: bool = False, + ncyc: int = 1, + nor: int = 1, + period: int = None, + trig: str = "int", + sdrlab: bool = False, + siglab: bool = False, + ) -> None: + + """ + Set the parameters for signal generator on one channel. + + Parameters + ----------- + chan (int) : + Output channel (either 1 or 2). + func (str, optional) : + Waveform of the signal (SINE, SQUARE, TRIANGLE, SAWU, + SAWD, PWM, ARBITRARY, DC, DC_NEG). + Defaults to `sine`. + volt (int, optional) : + Amplitude of signal {-1, 1} Volts. {-5, 5} for SIGNALlab 250-12. + Defaults to 1. + freq (int, optional) : + Frequency of signal. Not relevant if 'func' is "DC" or "DC_NEG". + Defaults to 1000. + offset (int, optional) : + Signal offset {-1, 1} Volts. {-5, 5} for SIGNALlab 250-12. + Defaults to 0. + phase (int, optional) : + Phase of signal {-360, 360} degrees. + Defaults to 0. + dcyc (float, optional) : + Duty cycle, where 1 corresponds to 100%. + Defaults to 0.5. + data (ndarray, optional) : + Numpy ``ndarray`` of max 16384 values, floats in range {-1,1} + (or {-5,5} for SIGNALlab). + Define the custom waveform if "func" is "ARBITRARY". + Defaults to `None`. + burst (bool, optional) : + Enable/disable Burst mode. (`True` - BURST, `False` - CONINUOUS) + Generate "nor" number of "ncyc" periods with total time "period". + Defaults to `False`. + ncyc (int, optional) : + Number of periods in one burst. + Defaults to 1. + nor (int, optional) : + Number of repeated bursts. + Defaults to 1. + period (_type_, optional) : + Total time of one burst in µs {1, 5e8}. Includes the signal and delay. + Defaults to `None`. + trig (str, optional): + Trigger source (EXT_PE, EXT_NE, INT, GATED). + Defaults to `int` (internal). + sdrlab (bool, optional): + `True` if operating with SDRlab 122-16. + Defaults to `False`. + siglab (bool, optional): + `True` if operating with SIGNALlab 250-12. + Defaults to `False`. + + The settings will work on any Red Pitaya board. If operating on a board + other than STEMlab 125-14, change the bool value of the appropriate + parameter to true (sdrlab, siglab) + + Raises + ------ + + Raises errors if the input parameters are out of range. + + """ + + ### Constants ### + waveform_list = ["SINE","SQUARE","TRIANGLE","SAWU","SAWD","PWM","ARBITRARY","DC","DC_NEG"] + trigger_list = ["EXT_PE","EXT_NE","INT","GATED"] + buff_size = 16384 + + ### Limits ### + volt_lim = 1 + offs_lim = 1 + phase_lim = 360 + freq_up_lim = 50e6 # 50 MHz + freq_down_lim = 0 + + if siglab: + volt_lim = 5 + offs_lim = 5 + elif sdrlab: + freq_down_lim = 300e3 # 300 kHz + + + + ### CHECK FOR ERRORS ### + + try: + assert chan in (1,2) + except AssertionError as channel_err: + raise ValueError("Channel needs to be either 1 or 2") from channel_err + + try: + assert func.upper() in waveform_list + except AssertionError as waveform_err: + raise ValueError(f"{func.upper()} is not a defined waveform") from waveform_err + + try: + assert freq_down_lim < freq <= freq_up_lim + except AssertionError as freq_err: + raise ValueError(f"Frequency is out of range {freq_down_lim, freq_up_lim} Hz") from freq_err + + try: + assert abs(volt) <= volt_lim + except AssertionError as ampl_err: + raise ValueError(f"Amplitude is out of range {-volt_lim, volt_lim} V") from ampl_err + + try: + assert abs(offset) <= offs_lim + except AssertionError as offs_err: + raise ValueError(f"Offset is out of range {-offs_lim, offs_lim} V") from offs_err + + try: + assert 0 <= dcyc <= 1 + except AssertionError as dcyc_err: + raise ValueError(f"Duty Cycle is out of range {0, 1}") from dcyc_err + + try: + assert abs(phase) <= phase_lim + except AssertionError as phase_err: + raise ValueError(f"Phase is out of range {-phase_lim, phase_lim} deg") from phase_err + + if data is not None: + + try: + assert data.shape[0] <= buff_size + except AssertionError as data_err: + raise ValueError(f"Data array is too long. Max length is {buff_size}") from data_err + + #try: + # assert max(absolute(data)) <= volt_lim + #except AssertionError: + # raise ValueError(f"Amplitude of data is out of range {-volt_lim, volt_lim}") + + try: + assert ncyc >= 1 + except AssertionError as ncyc_err: + raise ValueError("NCYC minimum is 1") from ncyc_err + + try: + assert nor >= 1 + except AssertionError as nor_err: + raise ValueError("NOR minimum is 1") from nor_err + + if period is not None: + try: + assert period >= 1 + except AssertionError as period_err: + raise ValueError("Minimal burst period 1 µs") from period_err + + try: + assert trig.upper() in trigger_list + except AssertionError as trig_err: + raise ValueError(f"{trig.upper()} is not a defined trigger source") from trig_err + + try: + assert not((siglab is True) and (sdrlab is True)) + except AssertionError as board_err: + raise ValueError("Please select only one board option. 'siglab' and 'sdrlab' cannot be true at the same time.") from board_err + + + + ### Variables ### + wf_data = [] + + + ### SEND COMMANDS TO RP ### + self.tx_txt(f"SOUR{chan}:FUNC {func.upper()}") + self.tx_txt(f"SOUR{chan}:VOLT {volt}") + + if func.upper() not in waveform_list[7:9]: + self.tx_txt(f"SOUR{chan}:FREQ:FIX {freq}") + + self.tx_txt(f"SOUR{chan}:VOLT:OFFS {offset}") + self.tx_txt(f"SOUR{chan}:PHAS {phase}") + + if func.upper() == "PWM": + self.tx_txt(f"SOUR{chan}:DCYC {dcyc}") + + if (data is not None) and (func.upper() == "ARBITRARY"): + for n in data: + wf_data.append(f"{n:.5f}") + cust_wf = ", ".join(map(str, wf_data)) + + self.tx_txt(f"SOUR{chan}:TRAC:DATA:DATA {cust_wf}") + + if burst: + self.tx_txt(f"SOUR{chan}:BURS:STAT BURST") + self.tx_txt(f"SOUR{chan}:BURS:NCYC {ncyc}") + self.tx_txt(f"SOUR{chan}:BURS:NOR {nor}") + + if period is not None: + self.tx_txt(f"SOUR{chan}:BURS:INT:PER {period}") + else: + self.tx_txt(f"SOUR{chan}:BURS:STAT CONTINUOUS") + + self.tx_txt(f"SOUR{chan}:TRIG:SOUR {trig.upper()}") + + #print(f"SOUR{chan} set successfully") + + def acq_set( + self, + dec: int = 1, + trig_lvl: float = 0, + trig_delay: int = 0, + trig_delay_ns: bool = False, + units: str = None, + sample_format: str = None, + averaging: bool = True, + gain: list = None, # 2 channels (double the length if 4-input) + coupling: list = None, # 2 channels + ext_trig_lvl: float = 0, + siglab: bool = False, + input4: bool = False + ) -> None: + + """ + + Set the parameters for signal acquisition. + + Parameters + ----------- + + dec (int, optional) : + Decimation (1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, + 4096, 8192, 16384, 32768, 65536) + Defaults to 1. + trig_lvl (float, optional) : + Trigger level in Volts. {-1, 1} Volts on LV gain or {-20, 20} Volts on HV gain. + Defaults to 0. + trig_delay (int, optional) : + Trigger delay in samples (if trig_delay_ns = True, then the delay is in ns) + Defaults to 0. + trig_delay_ns (bool, optional) : + Change the trigger delay to nanoseconds instead of samples. + Defaults to False. + units (str, optional) : + The units in which the acquired data will be returned. + Defaults to "VOLTS". + sample_format (str, optional) : + The format in which the acquired data will be returned. + Defaults to "ASCII". + averaging (bool, optional) : + Enable/disable averaging. When True, if decimation is higher than 1, + each returned sample is the average of the taken samples. For example, + if dec = 4, the returned sample will be the average of the 4 decimated + samples. + Defaults to True. + gain (list(str), optional) : + HV / LV - (High (1:20) or Low (1:1 attenuation)) + The first element in list applies to the SOUR1 and the second to SOUR2. + Refers to jumper settings on Red Pitaya fast analog inputs. + (1:20 and 1:1 attenuator for SIGNALlab 250-12) + Defaults to ["LV","LV"]. + coupling (list(str), optional) : + AC / DC - coupling mode for fast analog inputs. + The first element in list applies to the SOUR1 and the second to SOUR2. + (Only SIGNALlab 250-12) + Defaults to ["DC","DC"]. + ext_trig_lvl (float, optional) : + Set trigger external level in V. + (Only SIGNALlab 250-12) + Defaults to 0. + siglab (bool, optional) : + Set to True if operating with SIGNALlab 250-12. + Defaults to False. + input4 (bool, optional) : + Set to True if operating with STEMlab 125-14 4-Input. + Defaults to False. + + The settings will work on any Red Pitaya board. If operating on SIGNALlab 250-12 + or STEMlab 125-14 4-Input change the bool value of the appropriate parameter to + true (siglab, input4). This will change the available range of input parameters. + + Raises + ------ + + Raises errors if the input parameters are out of range. + + """ + + ### Constants ### + #decimation_list = [1,2,4,8,16,32,64,128,256,512,1024,2048,4096,8192,16384,32768,65536] + gain_list = ["LV","HV"] + coupling_list = ["DC","AC"] + units_list = ["RAW","VOLTS"] + format_list = ["BIN", "ASCII"] + + ### Limits ### + if input4: # Set number of channels + n = 4 + else: + n = 2 + trig_lvl_lim = 1.0 + gain_lvl = "LV" + + if gain is not None: + for i in gain: + if i.upper() == "HV": + trig_lvl_lim = 20.0 + gain_lvl = "HV" + + ### CHECK FOR ERRORS ### + #try: + # assert dec in decimation_list + #except AssertionError as dec_err: + # raise ValueError(f"Decimation needs to be a power of 2 {1, 65536}") + + try: + assert abs(trig_lvl) <= trig_lvl_lim + except AssertionError as trig_err: + raise ValueError(f"Trigger level out of range {-trig_lvl_lim, trig_lvl_lim} V", + f"for gain {gain_lvl}") from trig_err + + try: + assert trig_delay >= 0 + except AssertionError as trig_dly_err: + raise ValueError("Trigger delay cannot be less that 0") from trig_dly_err + + if units is not None: + try: + assert units.upper() in units_list + except AssertionError as unit_err: + raise ValueError(f"{units.upper()} is not a defined unit") from unit_err + + if sample_format is not None: + try: + assert sample_format.upper() in format_list + except AssertionError as format_err: + raise ValueError(f"{sample_format.upper()} is not a defined format") from format_err + + if gain is not None: + try: + assert (gain[0].upper() in gain_list) and (gain[1].upper() in gain_list) + except AssertionError as gain_err: + raise ValueError(f"{gain[0].upper()} or {gain[1].upper()} is not a defined gain") from gain_err + + if siglab and coupling is not None: + try: + assert (coupling[0].upper() in coupling_list) and (coupling[1].upper() in coupling_list) + except AssertionError as coupling_err: + raise ValueError(f"{coupling[0].upper()} or {coupling[1].upper()}", + "is not a defined coupling") from coupling_err + try: + assert abs(ext_trig_lvl) <= trig_lvl_lim + except AssertionError as ext_trig_err: + raise ValueError("External trigger level out of range", + f"{-trig_lvl_lim, trig_lvl_lim} V") from ext_trig_err + + try: + assert not((siglab is True) and (input4 is True)) + except AssertionError as board_err: + raise ValueError("Please select only one board option.", + "'siglab' and 'input4' cannot be true at the same time.") from board_err + + + ### SEND COMMANDS TO RP ### + self.tx_txt(f"ACQ:DEC {dec}") + + if averaging: + self.tx_txt("ACQ:AVG ON") + else: + self.tx_txt("ACQ:AVG OFF") + + if trig_delay_ns: + self.tx_txt(f"ACQ:TRIG:DLY:NS {trig_delay}") + else: + self.tx_txt(f"ACQ:TRIG:DLY {trig_delay}") + + if units is not None: + self.tx_txt(f"ACQ:DATA:UNITS {units.upper()}") + if sample_format is not None: + self.tx_txt(f"ACQ:DATA:FORMAT {sample_format.upper()}") + + if gain is not None: + for i in range(n): + self.tx_txt(f"ACQ:SOUR{i+1}:GAIN {gain[i].upper()}") + + self.tx_txt(f"ACQ:TRIG:LEV {trig_lvl}") + + if siglab and coupling is not None: + for i in range(n): + self.tx_txt(f"ACQ:SOUR{i+1}:COUP {coupling[i].upper()}") + + self.tx_txt(f"ACQ:TRIG:EXT:LEV {ext_trig_lvl}") + + #print("ACQ set successfully") + + def get_settings( + self, + siglab: bool = False, + input4: bool = False + ) -> str: + """ + + Retrieves the settings from Red Pitaya, prints them in console and returns + them as an array with the following sequence: + [decimation, avearge, trig_dly, trig_dly_ns, trig_lvl, buf_size, gain_ch1, gain_ch2, coup_ch1, coup_ch2, ext_trig_lvl] + , gain_ch3, gain_ch4 + Decimation - Current decimation + Average - Current averaging status (ON/OFF) + Trig_dly - Current trigger delay in samples + Trig_dly_ns - Current trigger delay in nanoseconds + Trig_lvl - Current triger level in Volts + Buf_size - Buffer size + Gain_ch1-4 - Current gain on channels (CH3 and CH4 STEMlab 125-14 4-Input only) + Coup_ch1/2 - Current coupling mode for both channels (AC/DC) (SIGNALlab only) + Ext_trig_lvl - Current external trigger level in Volts (SIGNALlab only) + + Note: The last three array elements won't exist if siglab = False + Gain of channels 3 and 4 only if input4 = True + + Parameters + ---------- + siglab (bool, optional): + Set to True if operating with SIGNALlab 250-12. + Defaults to False. + input4 (bool, optional): + Set to True if operating with STEMlab 125-14 4-Input. + Defaults to False. + + """ + + try: + assert not((siglab is True) and (input4 is True)) + except AssertionError as board_err: + raise ValueError("Please select only one board option. 'siglab' and 'input4' cannot be true at the same time.") from board_err + + + settings = [] + + if input4: # Set number of channels + n = 4 + else: + n = 2 + + settings.append(self.txrx_txt("ACQ:DEC?")) + settings.append(self.txrx_txt("ACQ:AVG?")) + settings.append(self.txrx_txt("ACQ:TRIG:DLY?")) + settings.append(self.txrx_txt("ACQ:TRIG:DLY:NS?")) + settings.append(self.txrx_txt("ACQ:TRIG:LEV?")) + settings.append(self.txrx_txt("ACQ:BUF:SIZE?")) + + for i in range(n): + settings.append(self.txrx_txt(f"ACQ:SOUR{i+1}:GAIN?")) + + if siglab: + for i in range(2): + settings.append(self.txrx_txt(f"ACQ:SOUR{i+1}:COUP?")) + + settings.append(self.txrx_txt("ACQ:TRIG:EXT:LEV?")) + + + print(f"Decimation: {settings[0]}") + print(f"Averaging: {settings[1]}") + print(f"Trigger delay (samples): {settings[2]}") + print(f"Trigger delay (ns): {settings[3]}") + print(f"Trigger level (V): {settings[4]}") + print(f"Buffer size: {settings[5]}") + + if input4: + print(f"Gain CH1/CH2/CH3/CH4: {settings[6]}, {settings[7]}, {settings[8]}, {settings[9]}") + else: + print(f"Gain CH1/CH2: {settings[6]}, {settings[7]}") + + if siglab: + print(f"Coupling CH1/CH2: {settings[8]}, {settings[9]}") + print(f"External trigger level (V): {settings[10]}") + + return settings + + def acq_data( + self, + chan: int, + start: int = None, + end: int = None, + num_samples: int = None, + old: bool = False, + lat: bool = False, + binary: bool = False, + convert: bool = False, + input4: bool = False + ) -> list: + """ + Returns the acquired data on a channel from the Red Pitaya, with the following options (for a specific channel): + - only channel => returns the whole buffer + - start and end => returns the samples between them + - start and n => returns 'n' samples from the start position + - old and n => returns 'n' oldest samples in the buffer + - lat and n => returns 'n' latest samples in the buffer + + Parameters + ---------- + chan (int) : + Input channel (either 1 or 2). + (1-4 for STEMlab 125-14 4-Input) + start (int, optional): + Start position of acquired data in the buffer {0,1,...16384} + Defaults to None. + end (int, optional): + End position of acquired data in the buffer {0,1,...16384} + Defaults to None. + n (int, optional): + Number of samples read. + old (bool, optional): + Read oldest samples in the buffer. + lat (bool, optional): + Read latest samples in the buffer. + bin (bool, optional): + Set to True if working with Binary data. + Defaults to False. + convert (bool, optional): + Set to True to convert data to a list of floats (VOLTS) or integers (RAW). + Otherwise returns a list of str (VOLTS) or int (RAW). + Defaults to False. + input4 (bool, optional) : + Set to True if operating with STEMlab 125-14 4-Input. + Defaults to False. + + + Raises + ------ + + Raises errors if the input parameters do not match one of the options. + + """ + + low_lim = 0 + up_lim = 16384 + + # Check input data for errors + if input4: + try: + assert chan in (1,2,3,4) + except AssertionError as chanel_err: + raise ValueError("Channel needs to be either 1, 2, 3 or 4") from chanel_err + else: + try: + assert chan in (1,2) + except AssertionError as chanel_err: + raise ValueError("Channel needs to be either 1 or 2") from chanel_err + + try: + assert not((old is True) and (lat is True)) + except AssertionError as arg_err: + raise ValueError("Please select only one. 'old' and 'lat' cannot be True at the same time.") from arg_err + + if start is not None: + try: + assert 16384 >= start >= 0 + except AssertionError as start_err: + raise ValueError(f"Start position out of range {low_lim, up_lim}") from start_err + + if end is not None: + try: + assert 16384 >= end >= 0 + except AssertionError as end_err: + raise ValueError(f"End position out of range {low_lim, up_lim}") from end_err + + if num_samples is not None: + try: + assert 16384 >= num_samples >= 0 + except AssertionError as sample_err: + raise ValueError(f"Sample number out of range {low_lim, up_lim}") from sample_err + + # Get data type from Red Pitaya + units = self.txrx_txt('ACQ:DATA:UNITS?') + # format = self.txrx_txt("ACQ:DATA:FORMAT?") + + + # Determine the output data + if(start is not None) and (end is not None): + self.tx_txt(f"ACQ:SOUR{chan}:DATA:STA:END? {start},{end}") + + elif(start is not None) and (num_samples is not None): + self.tx_txt(f"ACQ:SOUR{chan}:DATA:STA:N? {start},{num_samples}") + + elif old and (num_samples is not None): + self.tx_txt(f"ACQ:SOUR{chan}:DATA:OLD:N? {num_samples}") + + elif lat and (num_samples is not None): + self.tx_txt(f"ACQ:SOUR{chan}:DATA:LAT:N? {num_samples}") + + else: + self.tx_txt(f"ACQ:SOUR{chan}:DATA?") + + # Convert data + if binary: + buff_byte = self.rx_arb() + + if convert: + if units == "VOLTS": + buff = [struct.unpack('!f',bytearray(buff_byte[i:i+4]))[0] for i in range(0, len(buff_byte), 4)] + elif units == "RAW": + buff = [struct.unpack('!h',bytearray(buff_byte[i:i+2]))[0] for i in range(0, len(buff_byte), 2)] + else: + buff = buff_byte + else: + buff_string = self.rx_txt() + + if convert: + buff_string = buff_string.strip('{}\n\r').replace(" ", "").split(',') + buff = list(map(float, buff_string)) + else: + buff = buff_string + + return buff + + + def uart_set( + self, + speed: int = 9600, + bits: str = "CS8", + parity: str = "NONE", + stop: int = 1, + timeout: int = 0 + ) -> None: + """ + Configures the provided settings for UART. + + Args: + speed (int, optional): Baud rate/speed of UART connection (bits per second). Defaults to 9600. + bits (str, optional): Character size in bits (CS6, CS7, CS8). Defaults to "CS8". + parity (str, optional): Parity (NONE, EVEN, ODD, MARK, SPACE). Defaults to "NONE". + stop (int, optional): Number of stop bits (1 or 2). Defaults to 1. + timeout (int, optional): Timeout for reading from UART (in 1/10 of seconds) {0,...255}. Defaults to 0. + """ + + # Constants + speed_list = [1200,2400,4800,9600,19200,38400,57600,115200,230400,576000,921000,1000000,1152000,1500000,2000000,2500000,3000000,3500000,4000000] + database_list = ["CS6","CS7","CS8"] + parity_list = ["NONE","EVEN","ODD","MARK","SPACE"] + + + # Input Limits Check + try: + assert speed in speed_list + except AssertionError as speed_err: + raise ValueError(f"{speed} is not a defined speed for UART connection. Please check the speed table.") from speed_err + + try: + assert bits in database_list + except AssertionError as bits_err: + raise ValueError(f"{bits} is not a defined character size.") from bits_err + + try: + assert parity in parity_list + except AssertionError as parity_err: + raise ValueError(f"{parity} is not a defined parity.") from parity_err + + try: + assert stop in (1,2) + except AssertionError as stop_err: + raise ValueError("The number of stop bits can only be 1 or 2") from stop_err + + try: + assert 0 <= timeout <= 255 + except AssertionError as timeout_err: + raise ValueError(f"Timeout {timeout} is out of range [0, 255]") from timeout_err + + # Configuring UART + + self.tx_txt("UART:INIT") + self.tx_txt(f"UART:SPEED {speed}") + self.tx_txt(f"UART:BITS {bits.upper()}") + self.tx_txt(f"UART:STOPB STOP{stop}") + self.tx_txt(f"UART:PARITY {parity.upper()}") + self.tx_txt(f"UART:TIMEOUT {timeout}") + + self.tx_txt("UART:SETUP") + print("UART is configured") + + def uart_get_settings( + self + ) -> str: + """ + Retrieves the settings from Red Pitaya, prints them in console and returns + them as an array with the following sequence: + [speed, databits, stopbits, parity, timeout] + + """ + + # Configuring UART + + settings = [] + + settings.append(self.txrx_txt("UART:SPEED?")) + settings.append(self.txrx_txt("UART:BITS?")) + + stop = self.txrx_txt("UART:STOPB?") + if stop == "STOP1": + settings.append("1") + elif stop == "STOP2": + settings.append("2") + + settings.append(self.txrx_txt("UART:PARITY?")) + settings.append(self.txrx_txt("UART:TIMEOUT?")) + + print(f"Baudrate/Speed: {settings[0]}") + print(f"Databits: {settings[1]}") + print(f"Stopbits: {settings[2]}") + print(f"Parity: {settings[3]}") + print(f"Timeout (0.1 sec): {settings[4]}") + + return settings + + def uart_write_string( + self, + string: str, + word_length: bool = False + ) -> None: + """ + Sends a string of characters through UART. + + Args: + string (str, optional): String that will be sent. + word_length (bool, optional): Set to True if UART word lenght is set to 7 (ASCII) or + False if UART word length is set to 8 (UTF-8). Defaults to False. + """ + + if word_length: + # word length 7 / ASCII + code = "ascii" + else: + # word length 8 / UTF-8 + code = "utf-8" + + + # transforming and writing to UART + arr = ',#H'.join(format(x, 'X') for x in bytearray(string, f"{code}")) + self.tx_txt(f"UART:WRITE{len(string)} #H{arr}") + + print("String sent") + + def uart_read_string( + self, + length: int + ) -> str: + """ + Reads a string of data from UART and decodes it from ASCII to string. + + Args: + length (int): Length of data to read from UART. + + Returns: + str: Read data in string format. + """ + + # Check for errors + try: + assert length > 0 + except AssertionError as length_err: + raise ValueError("Length must be greater than 0.") from length_err + + self.tx_txt(f"UART:READ{length}") + res = self.rx_txt() + res = res.strip('{}\n\r').replace(" ", "").split(',') + string = "".join(chr(int(x)) for x in res) # int(x).decode("utf8") + + return string + + + def spi_set( + self, + spi_mode: str = None, + cs_mode: str = None, + speed: int = None, + word_len: int = None + ) -> None: + """ + Configures the provided settings for SPI. + + Args: + spi_mode (str, optional): Sets the mode for SPI; - LISL (Low Idle level, Sample Leading edge) + - LIST (Low Idle level, Sample Trailing edge) + - HISL (High Idle level, Sample Leading edge) + - HIST (High Idle level, Sample Trailing edge) + Defaults to LISL. + cs_mode (str, optional): Sets the mode for CS: - NORMAL (After message transmission, CS => HIGH) + - HIGH (After message transmission, CS => LOW) + Defaults to NORMAL. + speed (int, optional): Sets the speed of the SPI connection. Defaults to 5e7. + word_len (int, optional): Character size in bits (CS6, CS7, CS8). Defaults to "CS8". + """ + + # Constants + speed_max_limit = 100e6 + speed_min_limit = 1 + cs_mode_list = ["NORMAL","HIGH"] + order_list = ["MSB","LSB"] + spi_mode_list = ["LISL","LIST","HISL","HIST"] + bits_min_limit = 7 + + + # Input Limits Check + + try: + assert spi_mode.upper() in spi_mode_list + except AssertionError as spi_mode_err: + raise ValueError(f"{spi_mode} is not a defined SPI mode.") from spi_mode_err + + try: + assert cs_mode.upper() in cs_mode_list + except AssertionError as cs_err: + raise ValueError(f"{cs_mode} is not a defined CS mode.") from cs_err + + try: + assert speed_min_limit <= speed <= speed_max_limit + except AssertionError as speed_err: + raise ValueError(f"{speed} is out of range [{speed_min_limit},{speed_max_limit}].") from speed_err + + try: + assert word_len >= bits_min_limit + except AssertionError as bits_err: + raise ValueError(f"Word length must be greater than {bits_min_limit}. Current word length: {word_len}") from bits_err + + + # Configuring SPI + + self.tx_txt(f"SPI:SET:MODE {spi_mode.upper()}") + self.tx_txt(f"SPI:SET:CSMODE {cs_mode.upper()}") + self.tx_txt(f"SPI:SET:SPEED {speed}") + self.tx_txt(f"SPI:SET:WORD {word_len}") + + self.tx_txt("SPI:SET:SET") + print("SPI is configured") + + def spi_get_settings( + self + ) -> str: + """ + Retrieves the SPI settings from Red Pitaya, prints them in console and returns + them as an array with the following sequence: + [mode, csmode, speed, word_len, msg_size] + + """ + + # Configuring SPI + + self.tx_txt("SPI:SET:GET") + settings = [] + + settings.append(self.txrx_txt("SPI:SET:MODE?")) + settings.append(self.txrx_txt("SPI:SET:CSMODE?")) + settings.append(self.txrx_txt("SPI:SET:SPEED?")) + settings.append(self.txrx_txt("SPI:SET:WORD?")) + settings.append(self.txrx_txt("SPI:MSG:SIZE?")) + + print(f"SPI mode: {settings[0]}") + print(f"CS mode: {settings[1]}") + print(f"Speed: {settings[2]}") + print(f"Word length: {settings[3]}") + print(f"Message queue length: {settings[4]}") + + return settings + + # IEEE Mandated Commands def cls(self): @@ -87,7 +968,7 @@ def cls(self): def ese(self, value: int): """Standard Event Status Enable Command""" - return self.tx_txt('*ESE {}'.format(value)) + return self.tx_txt(f'*ESE {value}') def ese_q(self): """Standard Event Status Enable Query""" @@ -129,8 +1010,8 @@ def stb_q(self): def err_c(self): """Error count.""" - return rp.txrx_txt('SYST:ERR:COUN?') + return self.txrx_txt('SYST:ERR:COUN?') - def err_c(self): + def err_n(self): """Error next.""" - return rp.txrx_txt('SYST:ERR:NEXT?') + return self.txrx_txt('SYST:ERR:NEXT?')