Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create redpitaya_scpi.py #6

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
136 changes: 136 additions & 0 deletions redpitaya_scpi.py
@@ -0,0 +1,136 @@
"""SCPI access to Red Pitaya."""

import socket

__author__ = "Luka Golinar, Iztok Jeras"
__copyright__ = "Copyright 2015, Red Pitaya"

class scpi (object):
"""SCPI class used to access Red Pitaya over an IP network."""
delimiter = '\r\n'

def __init__(self, host, timeout=None, port=5000):
"""Initialize object and open IP connection.
Host IP should be a string in parentheses, like '192.168.1.100'.
"""
self.host = host
self.port = port
self.timeout = timeout

try:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

if timeout is not None:
self._socket.settimeout(timeout)

self._socket.connect((host, port))

except socket.error as e:
print('SCPI >> connect({:s}:{:d}) failed: {:s}'.format(host, port, e))

def __del__(self):
if self._socket is not None:
self._socket.close()
self._socket = None

def close(self):
"""Close IP connection."""
self.__del__()

def rx_txt(self, chunksize = 4096):
"""Receive text string and return it after removing the delimiter."""
msg = ''
while 1:
chunk = self._socket.recv(chunksize + len(self.delimiter)).decode('utf-8') # Receive chunk size of 2^n preferably
msg += chunk
if (len(chunk) and chunk[-2:] == self.delimiter):
break
return msg[:-2]

def rx_arb(self):
numOfBytes = 0
""" Recieve binary data from scpi server"""
str=b''
while (len(str) != 1):
str = (self._socket.recv(1))
if not (str == b'#'):
return False
str=b''
while (len(str) != 1):
str = (self._socket.recv(1))
numOfNumBytes = int(str)
if not (numOfNumBytes > 0):
return False
str=b''
while (len(str) != numOfNumBytes):
str += (self._socket.recv(1))
numOfBytes = int(str)
str=b''
while (len(str) != numOfBytes):
str += (self._socket.recv(4096))
return str

def tx_txt(self, msg):
"""Send text string ending and append delimiter."""
return self._socket.send((msg + self.delimiter).encode('utf-8'))

def txrx_txt(self, msg):
"""Send/receive text string."""
self.tx_txt(msg)
return self.rx_txt()

# IEEE Mandated Commands

def cls(self):
"""Clear Status Command"""
return self.tx_txt('*CLS')

def ese(self, value: int):
"""Standard Event Status Enable Command"""
return self.tx_txt('*ESE {}'.format(value))

def ese_q(self):
"""Standard Event Status Enable Query"""
return self.txrx_txt('*ESE?')

def esr_q(self):
"""Standard Event Status Register Query"""
return self.txrx_txt('*ESR?')

def idn_q(self):
"""Identification Query"""
return self.txrx_txt('*IDN?')

def opc(self):
"""Operation Complete Command"""
return self.tx_txt('*OPC')

def opc_q(self):
"""Operation Complete Query"""
return self.txrx_txt('*OPC?')

def rst(self):
"""Reset Command"""
return self.tx_txt('*RST')

def sre(self):
"""Service Request Enable Command"""
return self.tx_txt('*SRE')

def sre_q(self):
"""Service Request Enable Query"""
return self.txrx_txt('*SRE?')

def stb_q(self):
"""Read Status Byte Query"""
return self.txrx_txt('*STB?')

# :SYSTem

def err_c(self):
"""Error count."""
return rp.txrx_txt('SYST:ERR:COUN?')

def err_c(self):
"""Error next."""
return rp.txrx_txt('SYST:ERR:NEXT?')