In [1]:
from import_all import *
from utilities import *

In [2]:
import sys
import threading, queue
import sys

In [3]:
#----Connect to instrument----#
# Load the station from the YAML configuration file, then instantiate the two
# source meters through the station's load_* helpers.
# How the rest of the notebook uses them:
#   source_meter_sweeping -> smua/smub become the "tg" and "bg" channels
#   source_meter_fixed    -> smub/smua become the "ds" and "temperature" channels
station = Station(config_file="electrochemistry.station.yaml")
source_meter_sweeping = station.load_keithley1()
source_meter_fixed = station.load_keithley2()

Connected to: Keithley Instruments Inc. 2636B (serial:4629010, firmware:3.4.0) in 0.54s
Connected to: Keithley Instruments Inc. 2614B (serial:4070377, firmware:3.0.4) in 0.47s


In [4]:
#---Save paths----#
# Both paths are joined to file names by plain string concatenation further down,
# so each one must end with a path separator.
csv_save_path = "../csv/250916/"
db_save_path = "../db/250916/"
device_name = "250916 S121 Gr-hbn 5um PEO"  # master database file name; also used as the sample name and in the csv file name
xpmnt_name = "reopen"  # experiment name within the database; each run is saved as a separate csv

In [5]:
####################################
#----Configure arbitrary sweeper----#
####################################

#----Set gate and drain source parameters
# Note: with more than one "independent" variable, plotting with plottr-inspectr raises an
# error, although the csv will be fine.
# An arbitrary number of sweepers can be added, as long as the channel exists.
#
# Keys of each sweeper dict, as read by the cells below:
#   "channel"         instrument channel object; its curr() / volt() are read every iteration
#   "name"            label matched by set_voltage / set_2voltage; a name containing
#                     "temperature" marks the thermometer channel in the run loop
#   "independent"     True  -> the channel voltage is saved as an independent parameter
#                     False -> the channel voltage is saved as a measured reading
#   "manual",
#   "manual_vRange"   not read anywhere in this notebook — presumably consumed by helper
#                     code elsewhere; TODO confirm
#   "threshold_check" True -> the measured current is compared against "threshold"
#   "threshold"       current limit (presumably in amperes — TODO confirm). A negative value
#                     trips when the reading drops below it; otherwise it trips when the
#                     reading rises above it (see threshold_check()).

ch1 = { "channel": source_meter_sweeping.smua,
        "name": "tg",
        "independent":  False,
        "manual": False,
        "manual_vRange": [],
        "threshold_check": False,
        "threshold":100e-12

}

ch2 = { "channel": source_meter_sweeping.smub,
        "name": "bg",
        "independent":  False,
        "manual": False,
        "manual_vRange": [],
        "threshold_check": False,
        "threshold":2200e-12
}

ch3 = { "channel": source_meter_fixed.smub,
        "name": "ds",
        "independent": False,
        "manual": False,
        "manual_vRange": [],
        "threshold_check": False,
        "threshold":0

}

# Thermometer channel: biased at temp_voltage, and its current is converted to a
# temperature through rToT(voltage / current) in the run loop.
tempDict = { "channel": source_meter_fixed.smua,
        "name": "temperature",
        "independent": False,
        "manual": False,
        "manual_vRange": [],
        "threshold_check": False,
        "threshold":0

}


#----parameters----#
# sweepers_save_order: passed to setup_database_registers_arb and iterated, in this order,
#                      when readings are collected.
# sweepers:            used for name lookups and independent-voltage logging; sweepers[0] and
#                      sweepers[1] are the two channels ramped to 0 V when a threshold trips.
sweepers_save_order = [ch1, ch2, ch3,tempDict]
sweepers = [ch1,ch2,ch3,tempDict]
write_period = 1  # assigned to meas_forward.write_period
rampPoints = 100  # not read in this notebook — presumably a ramp setting for helper code; TODO confirm
rampDt = 0.005    # not read in this notebook — presumably a ramp setting for helper code; TODO confirm

temp_measure = True   # True -> bias the thermometer channel and log a temperature column
temp_voltage = 0.005  # bias applied to the thermometer channel through its volt() setter





In [6]:
#----Connect to database----#
# One database file per device: <db_save_path><device_name>_arbSweeper.db
initialise_or_create_database_at(f'{db_save_path}{device_name}_arbSweeper.db')
#Set up experiment object
test_exp = load_or_create_experiment(
    experiment_name=xpmnt_name,
    sample_name=device_name,
)

# Build the measurement object from the sweeper list.
# NOTE: `time` is rebound here to the object returned by the helper. The run loop calls it
# as time() for the timestamp and resets it with time.reset_clock(), so after this cell
# `time` no longer refers to a `time` module if one was imported earlier.
meas_forward, time, independent_params = setup_database_registers_arb(station, test_exp, sweepers_save_order,time_independent = True)
meas_forward.write_period = write_period

In [7]:
def sinewave(nPoints, amplitude, cycles):
    """Return ``nPoints`` samples of a sine wave covering ``cycles`` full periods.

    Samples start at phase 0 and are evenly spaced; the end point
    (phase ``2*pi*cycles``) is excluded, so consecutive waveforms join
    without a repeated sample.

    Parameters
    ----------
    nPoints : int
        Number of samples to generate. ``0`` yields an empty array.
    amplitude : float
        Peak value of the waveform.
    cycles : float
        Number of periods spanned by the samples.

    Returns
    -------
    numpy.ndarray
        Array of length ``nPoints``.
    """
    length = np.pi * 2 * cycles
    # Build the phase grid from integer indices. np.arange with a float step
    # (length / nPoints) can return nPoints + 1 samples because of rounding,
    # and divides by zero when nPoints is 0.
    phase = length * np.arange(nPoints) / nPoints
    return amplitude * np.sin(phase)

In [8]:
def set_voltage(name, voltage):
    """Ramp the sweeper channel called ``name`` to ``voltage``.

    Parameters
    ----------
    name : str
        Value of the ``"name"`` key of one of the dicts in ``sweepers``.
    voltage : float
        Target voltage, passed to ``ramp_voltage``.

    Raises
    ------
    ValueError
        If no sweeper has that name. Previously this surfaced as an
        ``UnboundLocalError`` because the channel variable was never assigned.
    """
    matches = [sweeper["channel"] for sweeper in sweepers if sweeper["name"] == name]
    if not matches:
        known = ", ".join(repr(sweeper["name"]) for sweeper in sweepers)
        raise ValueError(f"no sweeper named {name!r}; known names: {known}")
    # If a name is listed more than once, the last entry wins (as before).
    ramp_voltage(matches[-1], voltage)

def set_2voltage(name1, name2, voltage1, voltage2):
    """Ramp two sweeper channels together to ``voltage1`` and ``voltage2``.

    The present output of each channel is read with ``volt()`` and handed to
    ``ramp_two_voltage`` as the starting point of the ramp.

    Parameters
    ----------
    name1, name2 : str
        ``"name"`` values of the two sweepers to ramp.
    voltage1, voltage2 : float
        Target voltages for ``name1`` and ``name2`` respectively.

    Raises
    ------
    ValueError
        If either name is not present in ``sweepers``. Previously this
        surfaced as an ``UnboundLocalError``.
    """
    # Later entries override earlier ones, matching the old last-match-wins scan.
    channels = {sweeper["name"]: sweeper["channel"] for sweeper in sweepers}
    missing = [name for name in (name1, name2) if name not in channels]
    if missing:
        known = ", ".join(repr(name) for name in channels)
        raise ValueError(f"no sweeper named {missing!r}; known names: {known}")

    chan1 = channels[name1]
    chan2 = channels[name2]
    ramp_two_voltage(
        chan1,
        chan2,
        voltage1,
        voltage2,
        reset=False,
        x_initial=chan1.volt(),
        y_initial=chan2.volt(),
        rampdV=1e-3,
    )


In [9]:
def handle_input(data):
    """Parse one typed command line and apply it to the instruments.

    Supported commands (tokens separated by any amount of whitespace):

    * ``set <name> <voltage>``: ramp one channel via ``set_voltage``
    * ``set2 <name1> <name2> <voltage1> <voltage2>``: ramp two channels
      together via ``set_2voltage``

    Malformed or unknown commands print ``improper input``. Failures while
    applying a command are reported and swallowed, so a typo cannot abort the
    logging loop that calls this function.

    Parameters
    ----------
    data : str
        Raw line typed by the user.
    """
    # split() without an argument tolerates repeated spaces and stray tabs.
    tokens = data.split()
    if not tokens:
        print('improper input')
        return
    command, args = tokens[0], tokens[1:]

    if command == 'set':
        if len(args) < 2:
            print('improper input')
            return
        try:
            set_voltage(args[0], float(args[1]))
        except Exception as err:  # not a bare except: Ctrl-C / kernel interrupt must still propagate
            print('failed setting voltage')
            print(tokens)
            print(err)

    elif command == 'set2':
        # Needs four arguments; checking up front avoids an IndexError
        # that used to escape and kill the logging loop.
        if len(args) < 4:
            print('improper input')
            return
        name1, name2, voltage1, voltage2 = args[:4]
        try:
            set_2voltage(name1, name2, float(voltage1), float(voltage2))
        except Exception as err:
            print('failed setting two voltage')
            print(tokens)
            print(err)

    else:
        print('improper input')
             
            
         

In [10]:
def threshold_check(n1, n2):
    """Tell whether reading ``n2`` has gone past threshold ``n1``.

    A negative threshold acts as a lower limit: the check trips when the
    reading is below it. A zero or positive threshold acts as an upper
    limit: the check trips when the reading is above it. A reading exactly
    equal to the threshold never trips.

    Returns ``True`` when tripped, ``False`` otherwise.
    """
    # n1 is the threshold, n2 is the measured value
    tripped = (n2 < n1) if n1 < 0 else (n2 > n1)
    return bool(tripped)

In [11]:
# Live logger: records every channel until the user types 'quit' or a current
# threshold trips. While it runs, typed 'set' / 'set2' commands are applied
# through handle_input().

# Bias the thermometer channel before logging starts; the same voltage is used
# below to turn the measured current into a resistance (V / I).
if temp_measure:
    tempDict["voltage"] = temp_voltage
    tempDict["channel"].volt(temp_voltage)

with meas_forward.run() as forward_saver:
    time.reset_clock()
    stop_event = threading.Event()

    # Typed lines travel from the input thread to the logging loop through this queue.
    q = queue.Queue()

    def user_input():
        """Forward each typed line to the logging loop until the run is stopped."""
        while not stop_event.is_set():  # Runs until stop_event is set
            uin = input("Enter input: ")
            q.put(uin)  # Send the input to the queue
            sleep(0.001)

    # Daemon thread: it may still be blocked in input() when the loop ends.
    thread1 = threading.Thread(target=user_input, daemon=True)
    thread1.start()
    threshold_met = False
    uin = None

    while not stop_event.is_set():

        t = time()
        get_readings = []
        for sweeper in sweepers_save_order:
            y = sweeper["channel"].curr()
            get_readings.append((sweeper["channel"].curr, y))

            # Latch the trip flag. Assigning the result directly let a later
            # channel that is within limits clear a trip raised by an earlier one.
            if sweeper["threshold_check"] and threshold_check(sweeper["threshold"], y):
                threshold_met = True

            if not sweeper["independent"]:
                get_readings.append((sweeper["channel"].volt, sweeper["channel"].volt()))

            if "temperature" in sweeper["name"] and temp_measure:
                v = sweeper["voltage"]
                try:
                    temperature = rToT(v/y)
                except Exception:  # e.g. zero current; keep logging with a sentinel value
                    temperature = 0
                    print("TEMP MEASUREMENT FAILED")
                get_readings.append((sweeper["channel"].temperature, temperature))

        # Voltages of channels flagged "independent". Only those channels are
        # queried, and the list works for any number of sweepers. A separate
        # name is used so the `independent_params` returned by the database
        # setup cell is not overwritten.
        independent_readings = [
            (sweeper["channel"].volt, sweeper["channel"].volt())
            for sweeper in sweepers
            if sweeper["independent"]
        ]

        forward_saver.add_result(
            *independent_readings,
            *get_readings,
            (time, t)
            )

        data_forward = forward_saver.dataset

        if threshold_met:
            # Safety shutdown: bring the first two sweeper channels back to 0 V.
            ramp_two_voltage(sweepers[0]["channel"], sweepers[1]["channel"], 0, 0, reset=False, x_initial = sweepers[0]["channel"].volt(), y_initial = sweepers[1]["channel"].volt())
            print('quitting...')
            data_forward.to_pandas_dataframe().to_csv(f"{csv_save_path}{device_name}_{xpmnt_name}_forward_{data_forward.run_id}.csv")
            stop_event.set()
            # Leave immediately so a queued 'set' command cannot re-apply
            # voltage after the shutdown ramp.
            break

        # Waiting on the queue also paces the loop (at most 0.2 s per iteration).
        try:
            uin = q.get(timeout = 0.2)
        except queue.Empty:
            continue  # No input available yet, loop back

        # Normalise before comparing so 'Quit' / ' quit ' also stop the run;
        # the raw line is still what gets passed to handle_input().
        if uin.strip().lower() == 'quit':
            print('quitting...')
            data_forward.to_pandas_dataframe().to_csv(f"{csv_save_path}{device_name}_{xpmnt_name}_forward_{data_forward.run_id}.csv")
            stop_event.set()
            break
        elif uin:
            handle_input(uin)

Starting experimental run with id: 7. 
ramping <Keithley2600Channel: keithley1_smua of Keithley2600: keithley1> and <Keithley2600Channel: keithley1_smub of Keithley2600: keithley1> to 3.5 and 3.7
ramping <Keithley2600Channel: keithley1_smua of Keithley2600: keithley1> and <Keithley2600Channel: keithley1_smub of Keithley2600: keithley1> to 4.5 and 4.6
ramping <Keithley2600Channel: keithley1_smua of Keithley2600: keithley1> and <Keithley2600Channel: keithley1_smub of Keithley2600: keithley1> to 6.0 and 6.1
ramping <Keithley2600Channel: keithley1_smua of Keithley2600: keithley1> and <Keithley2600Channel: keithley1_smub of Keithley2600: keithley1> to 7.0 and 7.1
ramping <Keithley2600Channel: keithley1_smua of Keithley2600: keithley1> and <Keithley2600Channel: keithley1_smub of Keithley2600: keithley1> to 7.5 and 7.6
ramping <Keithley2600Channel: keithley1_smua of Keithley2600: keithley1> and <Keithley2600Channel: keithley1_smub of Keithley2600: keithley1> to 0.0 and 0.0
quitting...
