In [1]:
"""
Control class for pulse generation using Swabian pulse streamer

Reference: 
    [1] examples in 'pulsestreamer' python package
    [2] 'pi3diamond' control software in Sen Yang group

Author: ChunTung Cheung 
Email: ctcheung1123@gmail.com
Created:  2023-01-11
Modified: 2023-01-11
"""



from pulsestreamer import findPulseStreamers
from pulsestreamer import PulseStreamer
#import enum types 
from pulsestreamer import TriggerStart, TriggerRearm
#import class Sequence and OutputState for advanced sequence building
from pulsestreamer import Sequence, OutputState

CHANNEL_MAP = {
    'ch0':0,'ch1':1,'ch2':2,'ch3':3,'ch4':4,'ch5':5,'ch6':6,'ch7':7,'ch8':8,
    '0':0,'1':1,'2':2,'3':3,'4':4,'5':5,'6':6,'7':7,'8':8,
    0:0,1:1,2:2,3:3,4:4,5:5,6:6,7:7,8:8,
}
import numpy as np
HIGH=1
LOW=0
INF = np.iinfo(np.int64).max

class PulseGenerator(PulseStreamer):

    def __init__(self, ip="", chmap=CHANNEL_MAP):
        if ip == "":
            devices = findPulseStreamers()
            # DHCP is activated in factory settings
            if devices !=[]:
                print("Detected Pulse Streamer 8/2: ")
                print(devices)
                print("------------------------------------------------------\n")
                #Connect to the first discovered Pulse Streamer
                ip = devices[0][0]
            else:
                # if discovery failed try to connect by the default hostname
                # IP address of the pulse streamer (default hostname is 'pulsestreamer')
                print("No Pulse Streamer found")
                ip = 'pulsestreamer'
        super().__init__(ip)
        self.chmap = chmap
        self.seq = Sequence()
        self.chmap.update(CHANNEL_MAP)

    def setTrigger(self, start=TriggerStart.IMMEDIATE, rearm=TriggerRearm.MANUAL):
        #Default: Start the sequence after the upload and disable the retrigger-function
        
        # classTriggerStart(enumeration)
        # This enumeration describes the selectable start modes of the Pulse Streamer

        # IMMEDIATE
        # Trigger immediately after a sequence is uploaded. (default)

        # SOFTWARE
        # Trigger by calling startNow() method.

        # HARDWARE_RISING
        # External trigger on the rising edge.

        # HARDWARE_FALLING
        # External trigger on the falling edge.

        # HARDWARE_RISING_AND_FALLING
        # External trigger on rising and falling edges.

        super().setTrigger(start=start, rearm=rearm)
        
    def stream(self, n_runs=1, state_i=OutputState.ZERO(), state_f=OutputState.ZERO()):
        #run the sequence 
        #n_runs = 'INFIITE' # repeat the sequence all the time

        # #reset the device - all outputs 0V
        # super().reset()

        #set constant state of the device
        super().constant(state_i) #all outputs 0V

        super().stream(self.seq, n_runs, state_f)

    def reset(self):
        #reset system and start next sequence
        input("\nPress ENTER to reset system and start delay-compensated sequence")
        super().reset()

    def setDigital(self, ch, pulse_patt):
        self.seq.setDigital(self.chmap[ch], pulse_patt)
        # super().setDigital(self.chmap[ch], pulse_patt)

    def setAnalog(self, ch, pulse_patt):
        self.seq.setAnalog(self.chmap[ch], pulse_patt)
        # super().setAnalog(self.chmap[ch], pulse_patt)
    
    def plotSeq(self):
        print(self.seq.getData())
        print("\nThe channel pulse pattern are shown in a Pop-Up window. To proceed with streaming the sequence, please close the sequence plot.")
        self.seq.plot()

    def seqTranslator(self, seq):
        # unfinished!!!!
        # translate "time-separated" sequence to "channel-seperate" sequence
        seq_temp = Sequence()
        for (channels, duration) in seq:
            for ch in channels:
                self.setDigital()
        return None


In [2]:
PS_ip = "169.254.8.2"
PSch_Laser = 0 # trigger the laser
PSch_DAQClock = 1 # as the clock for DAQ
PSch_DAQstart = 3 # trigger pulse to start the DAQ
chmap = {"laser":PSch_Laser, "clock":PSch_DAQClock, "daqtrig":PSch_DAQstart}
pg  = PulseGenerator(ip=PS_ip, chmap=chmap)

Connect to Pulse Streamer via JSON-RPC.
IP / Hostname: 169.254.8.2
Pulse Streamer 8/2 firmware: v1.7.2
Client software: v1.7.0


In [3]:
# period = 100E6# ns
# duty = 0.5
# sampling_rate = 1E-6 # GHz
n_sample = 50
# sampling_rate = 0.5E-3 # GHz
sampling_rate = 5E-7 # GHz
clock_period = 1.0/sampling_rate
trigger_rate = sampling_rate/100.0
trigger_period = 1.0/trigger_rate

off_begin = 0.0
on_time = 0.5*clock_period
off_end = 0.5*clock_period
seq_clock = [(off_begin, LOW), (on_time, HIGH), (off_end, LOW)]*n_sample
seq_laser = [(0.0, LOW), (clock_period/2.0, HIGH), (clock_period/2.0, LOW)]
seq_daqtrig = seq_clock
# pg.setDigital("laser", [(period*duty, HIGH), (period*(1-duty), LOW)])
pg.setDigital("clock", seq_clock)
pg.setDigital("daqtrig", seq_daqtrig)
pg.setDigital("laser", seq_laser)

In [4]:
# pg.setTrigger(start=TriggerStart.SOFTWARE)
# pg.setTrigger(start=TriggerStart.IMMEDIATE, rearm=TriggerRearm.AUTO)
pg.setTrigger()

In [5]:
pg.stream(n_runs=INF)

In [100]:
pg.startNow()

0

In [6]:
pg.reset()