In [1]:
import serial
import time
import pyvisa 
import numpy as np
from IPython.display import Image, display

# You will have to change this to whatever COM port the pico is assigned when
# you plug it in.
# On Windows you can open device manager and look at the 'Ports (COM & LPT)' dropdown
# the pico will show up as 'USB Serial Device'
PICO_PORT = 'COM3'

MHZ = 1000000

conn = None
conn = serial.Serial(PICO_PORT, baudrate = 152000, timeout = 0.1)


## Helpers

In [2]:
# conn.write(b'program\n')

In [3]:
class KeysightScope:
    """
    Helper class using pyvisa for a USB connected Keysight Oscilloscope 
    """
    def __init__(self, 
                 addr='USB?*::INSTR',
                 timeout=1, 
                 termination='\n'
                 ):
        rm = pyvisa.ResourceManager()
        devs = rm.list_resources(addr)
        assert len(devs), "pyvisa didn't find any connected devices matching " + addr
        self.dev = rm.open_resource(devs[0])
        self.dev.timeout = 15_000 * timeout
        self.dev.read_termination = termination
        self.idn = self.dev.query('*IDN?')
        self.read = self.dev.read
        self.write = self.dev.write
        self.query = self.dev.query

    def _get_screenshot(self, verbose=False):
        if verbose == True:
            print('Acquiring screen image...')
        return(self.dev.query_binary_values(':DISPlay:DATA? PNG, COLor', datatype='s'))

    def save_screenshot(self, filepath, verbose=False, inksaver=False):
        if verbose == True:
            print('Saving screen image...')
        result = self._get_screenshot()

        # Keysight scope defaults to inksaving for image save
        if inksaver == True:
            self.dev.write(':HARDcopy:INKSaver 1')
        else: 
            self.dev.write(':HARDcopy:INKSaver OFF')

        with open(f'{filepath}', 'wb+') as ofile:
            ofile.write(bytes(result))

    def set_time_delay(self, time):
        self.write(f':TIMebase:DELay {time}')

    def set_time_scale(self, time):
        self.write(f':TIMebase:SCALe {time}')

my_instrument = KeysightScope()

In [4]:
class Binary_Routines():
    def __init__(self):
        self.mirror_channels = False

    def catch_response(self):
        resp = conn.readline().decode().strip()
        return resp

    def assert_OK(self):
        resp = conn.readline().decode().strip()
        assert resp == 'ok', 'Expected "ok", received "%s"' % resp

    def debug_on(self):
        conn.write(b'debug on\n')
        self.assert_OK()

    def set_mode(self, sweep_mode: int, timing_mode: int, num_channels: int):
        conn.write(b'reset\n')
        self.assert_OK()
        conn.write(b'mode %d %d \n' % (sweep_mode, timing_mode))
        self.assert_OK()
        conn.write(b'setchannels %d\n' % num_channels)
        self.assert_OK()
        print('sending commands to %d channels' % num_channels)

    def allocate(self, start_address: int, instructions: np.ndarray):
        conn.write(b'setb %d %d\n' % (start_address, len(instructions)))
        response = conn.readline().decode()
        if not response.startswith('ready'):
            response += ''.join([r.decode() for r in conn.readlines()])
            raise Exception(f'setb command failed, response: {repr(response)}')
        print(f'Got response: {response}')

    def table_to_memory(self, instructions: np.ndarray):
        conn.write(instructions.tobytes())
        self.assert_OK(), 'table not written correctly'
        print('table written to memory')

    def end_table(self, instructions: np.ndarray):
        conn.write(b'set 4 %d\n' % len(instructions))
        self.assert_OK(), 'table not stopped correctly'
        print('table end command sent')

    def start_routine(self):
        conn.write(b'start\n')
        self.assert_OK(), 'table did not start'
        print('table executed')

Binary_Routine_Test = Binary_Routines()

In [5]:
# def assert_OK():
#     resp = conn.readline().decode().strip()
#     assert resp == 'ok', 'Expected "ok", received "%s"' % resp

# def allocate_sendb(instructions, channel):
#     ## tell setb how many instructions to expect 
#     conn.write(b'setb %d %d\n' % (channel, len(instructions)))
#     response = conn.readline().decode()
#     if not response.startswith('ready'):
#         response += ''.join([r.decode() for r in conn.readlines()])
#         raise Exception(f'setb command failed, response: {repr(response)}')
#     print(f'Got response: {response}')

# def stop_sendb(instructions):
#     conn.write(b'set 4 %d\n' % len(instructions))
#     assert_OK(), 'table not stopped correctly'
#     print('table finished')

# # def write_sendb(instructions, channel):

# def sendb(instructions: np.ndarray, channel: int = 0, multi_channel = False) -> str:
#     """Sends a table of instructions to the sweeper to be read in binary mode `setb`.

#     Parameters
#     ----------
#     instructions : np.ndarray 
#         the table of instructions to be processed.

#     channel : int  
#         instruction channel, defaults to 0.

#     multi_channel : bool 
#         bool option to send commands to more than one channel and handle channel mirroring, defaults to False.

#     """

#     try:

#         allocate_sendb(instructions, channel)

#         conn.write(instructions.tobytes())
#         assert_OK(), 'table not written correctly'
#         print('table written')
        
#         # conn.write(b'set 4 %d\n' % len(instructions))
#         # assert_OK(), 'table not stopped correctly'
#         # print('table finished')

#         # stop_sendb(instructions)

#     except Exception as e:
#         raise e


In [6]:
## slightly different logic than the ad9959.c funcs due to python vs c
def get_ftw(freq_out: float, freq_sys: float = 500 * MHZ):

    ftw = (freq_out / freq_sys) * 2**32
    
    return(int(ftw))

def get_pow(phase: float):
    
    pow = round(phase / 360 * (2**14 - 1))

    return pow

def get_asf(amp: float):
    
    asf = round(amp * 1024)

    return asf

## Single Stepping Mode

Single Stepping (mode 0) with timing: `<frequency:int 32> <amplitude:int 16> <phase:int 16> <time: int 32>`. Total of 12 bytes per channel per instruction.

In [7]:
# dt = np.dtype([('frequency', np.uint32), ('amplitude', np.uint16), ('phase', np.uint16), ('time', np.uint32)])
dt = np.dtype([
    ('frequency', '>u4'),  # Big-endian uint32
    ('amplitude', '>u2'),  # Big-endian uint16
    ('phase', '>u2'),      # Big-endian uint16
    ('time', '>u4')        # Big-endian uint32
])
f1 = get_ftw(freq_out = 100 * MHZ)
f2 = get_ftw(freq_out = 120 * MHZ)
f3 = get_ftw(freq_out = 110 * MHZ)

a1 = 1023
a2 = 500

p1 = get_pow(phase = 90)
p2 = get_pow(phase = 180)
p3 = get_pow(phase = 270)

t = 1

step_instructions = np.array([
                                # (f2, 1023, p1, t),
                                # (f2, 800, p1, t),
                                # (f2, 1023, p1, t),
                                # (f2, 200, p1, t),
                                # (f2, 800, p1, t),
                                # (f2, 1023, p1, t),
                                # (f2, 200, p1, t),
                                # (f2, 800, p1, t),
                                # (f2, 1023, p1, t),
                                # (f2, 800, p1, t)
                                (f2, a1, p1, t),
                                (f3, a2, p1, t),
                                (f1, a1, p1, t),
                                (f3, a1, p1, t),
                                 ]
                                 , dtype = dt)


In [8]:
step_bytes = step_instructions.tobytes()
import binascii
print(binascii.hexlify(step_bytes))

b'3d70a3d703ff1000000000013851eb8501f41000000000013333333303ff1000000000013851eb8503ff100000000001'


In [9]:
single_step_test = Binary_Routines()

In [10]:
# single_step_test.catch_response()
single_step_test.set_mode(sweep_mode=0, timing_mode=1, num_channels=1)
single_step_test.allocate(start_address=0, instructions=step_instructions)
single_step_test.table_to_memory(instructions=step_instructions)
single_step_test.end_table(instructions=step_instructions)

single_step_test.start_routine()

sending commands to 1 channels
Got response: ready for 48 bytes

table written to memory
table end command sent
table executed


In [11]:
# Binary_Routine_Test.catch_response()

In [12]:
conn.write(b'readtable\n')

10

In [13]:
# conn.write(b'program\n')

In [14]:
MAX_SIZE = 256
# for _ in range(step_instructions.shape[0] + 2): # + 2 to catch header and footer
for _ in range(MAX_SIZE):
    resp = Binary_Routine_Test.catch_response()
    print(resp)
    if "End of" in resp:
        break
    elif "Cannot" in resp:
        break


Instruction Table Dump:
Raw bytes for instruction 0: F2 04 D7 A3 70 3D 05 00 10 06 00 13 03 01 00
Instruction format: F2 04 D7 A3 | 70 3D | 05 00 | 10 06 00 13
Instruction 0: 4060403619 28733 1280 268828691
Raw bytes for instruction 1: F2 04 85 EB 51 38 05 00 10 06 00 10 01 01 00
Instruction format: F2 04 85 EB | 51 38 | 05 00 | 10 06 00 10
Instruction 1: 4060382699 20792 1280 268828688
Raw bytes for instruction 2: F2 04 33 33 33 33 05 00 10 06 00 13 03 01 00
Instruction format: F2 04 33 33 | 33 33 | 05 00 | 10 06 00 13
Instruction 2: 4060361523 13107 1280 268828691
Raw bytes for instruction 3: F2 04 85 EB 51 38 05 00 10 06 00 13 03 00 00
Instruction format: F2 04 85 EB | 51 38 | 05 00 | 10 06 00 13
Instruction 3: 4060382699 20792 1280 268828691
End of Instruction Table


In [None]:
x = get_ftw(freq_out=120 * MHZ, freq_sys=125 * MHZ)
len(str(x)) == len(str(4060403619)) # true
x

4123168604

In [None]:
Binary_Routine_Test.catch_response()

## Frequency Sweeps

Frequency Sweeps (mode 2) with timing: `<start frequency:int 32> <stop frequency:int 32> <delta:int 32> <rate:int 8> <time:int 32>`. Total of 17 bytes per channel per instruction.

In [None]:
dt = np.dtype([('start frequency', np.uint32), 
               ('stop frequency', np.uint32), 
               ('delta', np.uint32), 
               ('rate', np.uint8), 
               ('time', np.uint32)])

f1 = get_ftw(freq_out = 100 * MHZ)
f2 = get_ftw(freq_out = 120 * MHZ)
f3 = get_ftw(freq_out = 110 * MHZ)

d = 100
r = 1
t = 1

freq_sweep_table = np.array([
                            (f1, f3, d, r, t),
                            (f3, f2, d, r, t),
                            (f2, f1, d, r, t),
                            (f1, f1, d, r, t),
                            (f2, f2, d, r, t),
                            (f3, f3, d, r, t),
                            (f3, f1, d, r, t),
                            (f1, f2, d, r, t),
                            (f2, f3, d, r, t),
                            (f1, f1, d, r, t),
                                 ]
                                 , dtype = dt)


In [None]:
freq_sweep_test = Binary_Routines()
# print(freq_sweep_test.catch_response())
# freq_sweep_test.debug_on()
freq_sweep_test.set_mode(sweep_mode=2, timing_mode=1, num_channels=1)
freq_sweep_test.allocate(start_address=0, instructions=freq_sweep_table)
freq_sweep_test.table_to_memory(instructions=freq_sweep_table)
freq_sweep_test.end_table(instructions=freq_sweep_table)
freq_sweep_test.start_routine()

## Phase Sweeps

Phase Sweeps (mode 3) with timing: `<start phase:int 16> <stop phase:int 16> <delta:int 16> <rate:int 8> <time:int 32>`. Total of 11 bytes per channel per instruction.

In [None]:
dt = np.dtype([('start phase', np.uint16), 
               ('stop phase', np.uint16), 
               ('delta', np.uint16), 
               ('rate', np.uint8), 
               ('time', np.uint32)])

p0 = get_pow(phase = 0)
p1 = get_pow(phase = 90)
p2 = get_pow(phase = 180)
p3 = get_pow(phase = 270)
p4 = get_pow(phase = 360)

d = 1
r = 1
t = 2


channel0_instructions = np.array([
                                (0, 0, d, r, t),
                                (0, 0, d, r, t),
                                (0, 0, d, r, t),
                                (0, 0, d, r, t),
                                (0, 0, d, r, t),
                                (0, 0, d, r, t)
                                # (p0, p2, d, r, t),
                                # (p2, p1, d, r, t),
                                # (p1, p0, d, r, t),
                                # (p0, p1, d, r, t),
                                # (p1, p2, d, r, t),
                                # (p2, p0, d, r, t)
                                 ]
                                 , dtype = dt)

channel1_instructions = np.array([
                                # (0, 180, d, r, t),
                                # (180, 90, d, r, t),
                                # (90, 0, d, r, t),
                                # (0, 90, d, r, t),
                                # (90, 180, d, r, t),
                                # (180, p0, d, r, t)
                                (p0, p2, d, r, t),
                                (p2, p1, d, r, t),
                                (p1, p0, d, r, t),
                                (p0, p1, d, r, t),
                                (p1, p2, d, r, t),
                                (p2, p0, d, r, t)
                                 ]
                                 , dtype = dt)


phase_sweep_instructions = np.concatenate((channel0_instructions, channel1_instructions))

In [None]:
phase_sweep_test = Binary_Routines()
phase_sweep_test.debug_on()
phase_sweep_test.set_mode(sweep_mode=3, timing_mode=1, num_channels=2)

phase_sweep_test.allocate(start_address=0, instructions=channel0_instructions)
phase_sweep_test.table_to_memory(instructions=phase_sweep_instructions)

# phase_sweep_test.allocate(channel=1, instructions=channel1_instructions)
# phase_sweep_test.table_to_memory(instructions=channel1_instructions)

phase_sweep_test.end_table(instructions=channel0_instructions)
phase_sweep_test.start_routine()

In [None]:
# #### not always necessary, just if you want to move back to tests.ipynb
# conn.close()

In [None]:
# phase_sweep_test = Binary_Routines()
# phase_sweep_test.debug_on()
# phase_sweep_test.set_mode(sweep_mode=3, timing_mode=1, num_channels=0)

# phase_sweep_test.allocate(start_address=0, instructions=channel0_instructions)
# phase_sweep_test.table_to_memory(instructions=channel0_instructions)

# # phase_sweep_test.allocate(channel=1, instructions=channel1_instructions)
# # phase_sweep_test.table_to_memory(instructions=channel1_instructions)

# phase_sweep_test.end_table(instructions=channel0_instructions)
# phase_sweep_test.start_routine()

In [None]:
conn.write(b'readtable\n')
# phase_sweep_test.assert_OK()

In [None]:
Binary_Routine_Test.catch_response()


In [None]:
# conn.write(b'program\n')