In [1]:
#!/usr/bin/env python
import ast
import socket
import threading
import time
from datetime import datetime

import pyvisa
import usbtmc

In [2]:
'''
Set up communication with the instrument through VISA.
'''
# One-time host setup:
#   pip install pyvisa pyvisa-py
# Find the USB vendor/product IDs with `lsusb`, then, as super user, add a udev
# rule for that device and add the user to the group named in the rule:
#   echo 'SUBSYSTEM=="usb", ACTION=="add", ATTRS{idVendor}=="0957", ATTRS{idProduct}=="1798", GROUP="tty", MODE="0660"' >> /etc/udev/rules.d/99-com.rules
#   sudo usermod -a -G tty pi

# Create the VISA resource manager and list the resources it can see;
# the list is the last expression, so the notebook displays it.
rm = pyvisa.ResourceManager()
rm.list_resources('?*')



('ASRL/dev/ttyAMA0::INSTR', 'USB0::2391::6040::MY58103677::0::INSTR')

In [3]:
# Open the instrument session; the module-level name `inst` is what the
# helper functions further down write to and read from.
# Earlier attempts, kept for reference:
#inst = rm.open_resource('USB0::1027::60786::100031819913::0::RAW')
#USB0::2391::6040::MY58103677::INSTR #read this on the scope
#inst =  usbtmc.Instrument(2391,6040)
inst = rm.open_resource('USB0::2391::6040::MY58103677::0::INSTR')

In [4]:
# Sanity check: did we connect to the device we think we connected to?
# Printing the session shows its type and resource string.
print(inst)

USBInstrument at USB0::2391::6040::MY58103677::0::INSTR


In [5]:
# Ask the instrument to identify itself (*IDN?) and print its reply.
inst.write('*IDN?')
print(inst.read())

AGILENT TECHNOLOGIES,DSO-X 2014A,MY58103677,02.43.2018020635



In [None]:


def query(query):
    '''
    send a query to the instrument and return its one-line answer

    query: the command string to write, e.g. 'FUNC?'
    returns the answer as text with leading/trailing newlines removed

    The answer is read one byte at a time until a newline arrives. The raw
    bytes are collected first and decoded once at the end, so a character that
    is encoded as several UTF-8 bytes is decoded as a whole.
    raises UnicodeDecodeError if the collected answer is not valid UTF-8
    '''
    # send query
    inst.write(query)

    # collect return
    reply = bytearray()
    while True:
        chunk = inst.read_bytes(1)
        if not chunk:
            # Empty read: keep waiting while nothing has arrived yet, but treat
            # it as the end of the answer once some data has been collected.
            # NOTE(review): the previous guard compared the 1-byte read with a
            # 2-byte literal and could never match; an empty-read check is
            # presumably what was intended -- confirm.
            if reply:
                break
            continue
        reply += chunk
        if b'\n' in chunk:  # newline marks end of return
            break

    return reply.decode('utf-8').strip('\n')

#add the commands we want to use - see manual
HMF2525_keys = [
    # function selection and output
    'FUNC', 'OUTPut',
    # frequency / period
    'FREQuency', 'PERiod',
    # VOLTage subsystem
    'VOLTage', 'VOLTage:UNIT', 'VOLTage:HIGH', 'VOLTage:LOW', 'VOLTage:OFFSet',
    # FUNCtion:PULSe subsystem
    'FUNCtion:PULSe:WIDTh:HIGH', 'FUNCtion:PULSe:WIDTh:LOW',
    'FUNCtion:PULSe:DCYCle', 'FUNCtion:PULSe:ETIMe',
    # trigger
    'TRIGger:SOURce',  # {IMMediate | EXTernal}
    # BURSt subsystem
    'BURSt:MODE',  # TRIGgered | GATed
    'BURSt:NCYCles', 'BURSt:INTernal:PERiod',
    'BURSt:PHASe',  # 0 to 360
    'BURSt:STATe',
]


def set_single_param(key, val):
    '''
    write one parameter to the signal generator

    key: command name; only entries of HMF2525_keys are accepted
    val: value to set, converted with str()

    An accepted key is sent as "<key> <val>" after a 0.1 s pause and the
    action is reported on stdout. Any other key is reported on stdout and
    skipped; nothing is written to the instrument in that case.
    '''
    if key not in HMF2525_keys:
        print(f"ERROR: don't know wha to do with {key}: {val}, skipping")
        return

    time.sleep(0.1)
    command = ' '.join((key, str(val)))
    inst.write(command)
    print(f'   set HMF2525  {key}: {val}')
    
def set_params(parameters):
    '''
    write several parameters to the signal generator

    parameters: dictionary of key:value pairs; each pair is handed to
    set_single_param in the dictionary's iteration order
    '''
    for name, value in parameters.items():
        set_single_param(name, value)
        

def get_params():
    '''
    read back every parameter listed in HMF2525_keys from the signal generator

    For each key the query "<key>?" is sent through query(); both the query
    and the answer are printed to stdout.
    returns a dictionary of key:answer, in the order of HMF2525_keys
    '''
    readback = {}
    for name in HMF2525_keys:
        print(f'    HMF2525 query {name}?')
        answer = query(name + '?')
        readback[name] = answer
        print(answer)
    return readback


In [None]:
'''
Test communication with the signal generator.

Prints the answer to *ESR?, writes 'RAMP', then reads the selected function
back with FUNC?.
NOTE(review): the original note said this "should return FUNC = PULS", which
does not match the 'RAMP' written below -- confirm the expected answer.
'''
print('initial query',query('*ESR?'))
# NOTE(review): going by the syntax line below, selecting the ramp would
# presumably be 'FUNC RAMP'; a bare 'RAMP' is sent here -- confirm with the manual.
inst.write('RAMP')
#FUNCtion {SINusoid | SQUare | RAMP | PULSe | ARBitrary}
query('FUNC?')  # last expression of the cell, so the notebook displays the answer

In [None]:
# Send 'VOLT 2.0' straight to the instrument (bypasses set_single_param).
inst.write(f'VOLT {2.0}')
#10 is good, 14 is bad
# NOTE(review): the note above states no unit or criterion; presumably these
# are values tried for VOLT -- confirm.

In [None]:
def data_to_dict(inp):
    '''
    converts the text form of a dictionary back into a dictionary

    inp: a string assumed to be generated by casting a dictionary to a string,
         i.e. str(dict), e.g. "{'FREQuency': '1000', 'VOLTage': '2.0'}";
         surrounding whitespace (such as a trailing newline) is ignored

    The string is parsed with ast.literal_eval, which accepts Python literals
    only and never executes code. Compared with splitting the text by hand,
    this also handles the empty dictionary, values that contain quotes or
    commas, and values that are not strings.

    returns the parsed dictionary
    raises ValueError if inp is not the literal form of a dictionary
    '''
    try:
        parameters = ast.literal_eval(inp.strip())
    except (ValueError, SyntaxError) as err:
        raise ValueError(f'not a dictionary literal: {inp!r}') from err
    if not isinstance(parameters, dict):
        raise ValueError(f'not a dictionary literal: {inp!r}')
    return parameters



class client(threading.Thread):
    '''
    handler thread serving one request on one accepted TCP connection

    The thread starts itself on construction, reads a single request of at
    most BUFFER_SIZE bytes, acts on it and then closes the connection.

    Request formats (text; surrounding whitespace is ignored):
      '?'             reply with str() of the dictionary from get_params()
      'KEY = VALUE'   set one parameter, then reply 'KEY = <read-back>'
      'KEY?' / '?KEY' reply 'KEY = <answer>'
      anything else   parsed with data_to_dict() and applied with set_params();
                      no reply is sent
    An empty request (peer closed without sending) is ignored.
    '''
    def __init__(self, socket, address):
        '''
        socket: the connected socket returned by accept()
        address: the peer address returned by accept()
        '''
        threading.Thread.__init__(self)
        self.sock = socket
        self.addr = address
        self.start()

    def run(self):
        '''
        receive one request, dispatch it, and always close the socket
        '''
        try:
            # blocking until it receives data; empty data means the peer
            # closed the connection without sending a request
            data = self.sock.recv(BUFFER_SIZE).decode().strip()
            now = datetime.now()
            current_time = now.strftime("%H:%M:%S")
            print("Current Time =", current_time)
            if not data:
                print('    empty request, ignoring')
            elif data == '?':
                parameters = get_params()
                self.sock.sendall(str(parameters).encode())
                print('    sent parameters')
            elif '=' in data:
                # split at the first '=' only, so the value is kept whole
                key, _, val = data.partition('=')
                key = key.strip()
                set_single_param(key, val.strip())
                self.reply_single(key)
            elif data.startswith('?') or data.endswith('?'):
                self.reply_single(data.strip('?'))
            else:
                # str(dict) request: apply every key/value pair.
                # NOTE(review): this branch used to call parse_parameters,
                # which is not defined in this notebook; set_params is
                # presumably the intended function -- confirm.
                set_params(data_to_dict(data))
        finally:
            # one request per connection: release the socket in every case
            self.sock.close()

    def reply_single(self, key):
        '''
        ask signal generator about key, send answer to the socket
        '''
        val = query(key+'?')
        self.sock.sendall(str(key+' = '+ str(val)).encode())

# TCP server: accept connections on localhost and hand each one to a `client`
# thread. This loop blocks the cell until it is interrupted.
host = "127.0.0.1"
port = 5006
BUFFER_SIZE = 1024  # maximum number of bytes read per request by client.run

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# allow the cell to be re-run right after a previous server was stopped
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
    serversocket.bind((host, port))
    serversocket.listen(5)
    print ('server started and listening')
    while True:
        clientsocket, address = serversocket.accept()
        client(clientsocket, address)
finally:
    # release the port when the loop ends (e.g. kernel interrupt); without
    # this the listening socket stays bound and a re-run cannot bind again
    serversocket.close()