In [1]:
import numpy as np
from xdac import XDAC
import csv
from datetime import datetime
import matplotlib
matplotlib.use("TkAgg")  # Interactive backend for PyCharm/scripts
import matplotlib.pyplot as plt
from time import time, sleep
# UNLOCK XDAC BEFORE USE!
# Create the module-level `xdac` handle that every function in this notebook
# reads from and writes to, then unlock it before any channel is touched.
xdac = XDAC()
xdac.unlock()

'OK:120:20:U:1'

In [2]:
def calculate_virp(channel_number):
    '''
    Read one channel's voltage and current and derive resistance and power.

    Args:
        channel_number: channel identifier passed straight to the XDAC reads.

    Returns:
        tuple: (voltage, current, resistance, power). Resistance is
        1000 * voltage / current, or infinity when the current reads exactly 0;
        power is 0.001 * voltage * current. The 1000 / 0.001 factors presumably
        convert a current reported in mA into Ω and W - confirm against the
        XDAC documentation.
    '''
    # The driver hands readings back as text, so convert before doing arithmetic.
    voltage = float(xdac.read_single_channel_voltage(channel_number))
    current = float(xdac.read_single_channel_current(channel_number))

    if current != 0:
        resistance = 1000 * voltage / current
    else:
        # Avoid a ZeroDivisionError on an open / unpowered channel.
        resistance = float('inf')

    power = 0.001 * voltage * current
    return voltage, current, resistance, power

In [3]:
def initialise_csv(channel_numbers, filename=None):
    '''
    Create a CSV log file and write the multi-channel header row.

    Args:
        channel_numbers (list): channels to log. Each one contributes four
            columns (voltage, current, resistance, power), in the order given.
        filename (str, optional): path of the CSV file to create. When None,
            a timestamped name of the form
            "A13_A14_feedbackcontrol_<YYYYmmdd_HHMMSS>.csv" is generated.

    Returns:
        str: path of the file that was created.

    Note:
        The file is opened with mode 'w', so an existing file at that path is
        truncated.
    '''
    # Only fall back to the timestamped name when the caller supplied none.
    # (Previously an explicit filename was silently replaced.)
    if filename is None:
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"A13_A14_feedbackcontrol_{stamp}.csv"

    # First column is always time, followed by four columns per channel.
    headers = ["Time (s)"]
    for ch in channel_numbers:
        headers.extend([
            f"Ch{ch}_Voltage (V)",
            f"Ch{ch}_Current (mA)",
            f"Ch{ch}_Resistance (Ω)",
            f"Ch{ch}_Power (W)"
        ])

    # newline='' lets the csv module control line endings itself.
    with open(filename, mode='w', newline='', encoding='utf-8') as f:
        csv.writer(f).writerow(headers)

    print(f"CSV file initialised: {filename}")
    return filename

In [4]:
def save_data_to_csv(data, filename, channel_numbers):
    '''
    Append the newest sample of every channel to the CSV file as one row.

    Args:
        data (dict): nested measurement store of the form
            {
                "times": [...],
                "ch_1": {"voltages": [...], "currents": [...],
                         "resistances": [...], "powers": [...]},
                "ch_2": {...},
                ...
            }
        filename (str): path of the CSV file to append to
        channel_numbers (list): channels to write, in column order

    The row is: latest time, then voltage, current, resistance and power of
    each channel. Nothing is written when data["times"] is empty (the file is
    still opened in append mode).
    '''
    metric_keys = ("voltages", "currents", "resistances", "powers")

    with open(filename, mode="a", newline="", encoding="utf-8") as handle:
        out = csv.writer(handle)

        # No samples recorded yet: leave the file as it is.
        if not data["times"]:
            return

        record = [data["times"][-1]]

        # Only the most recent value of each series goes into the row.
        for number in channel_numbers:
            series = data[f"ch_{number}"]
            record += [series[key][-1] for key in metric_keys]

        out.writerow(record)

In [5]:
def initialise_plots(channel_numbers):
    '''
    Build the live 2x2 figure (voltage, current, resistance, power) and the
    empty data store that goes with it. All channels share the same four
    subplots, one line per channel per subplot.

    Args:
        channel_numbers (list)

    Returns:
        tuple: (fig, axs, data, lines) where:
            fig: matplotlib figure object
            axs: flattened array of the 4 subplot axes
            data: {"times": [], "ch_<n>": {"voltages": [], "currents": [],
                   "resistances": [], "powers": []}, ...}
            lines: dict of line objects keyed "ch_<n>_voltage",
                   "ch_<n>_current", "ch_<n>_resistance", "ch_<n>_power"
    '''
    plt.ion()  # interactive mode so the figure can be refreshed while running

    n_channels = len(channel_numbers)

    # The layout is the same either way; only the figure width differs
    # between the single-channel and the overlaid multi-channel case.
    figsize = (10, 8) if n_channels == 1 else (12, 8)
    fig, axs = plt.subplots(2, 2, figsize=figsize)
    axs = axs.ravel()  # 2D grid -> flat sequence of 4 axes

    data = {"times": []}
    lines = {}

    colors = ['r-', 'g-', 'b-', 'm-', 'c-', 'y-', 'k-', 'orange', 'purple', 'brown']

    # Line-key suffix for each subplot, in subplot order.
    quantities = ("voltage", "current", "resistance", "power")

    for position, ch in enumerate(channel_numbers):
        # Fresh, empty series for this channel.
        data[f"ch_{ch}"] = {
            "voltages": [],
            "currents": [],
            "resistances": [],
            "powers": []
        }

        style = colors[position % len(colors)]  # wrap around if many channels
        tag = f"Ch{ch}"

        # One empty line per quantity, drawn on that quantity's subplot.
        for ax, quantity in zip(axs, quantities):
            lines[f"ch_{ch}_{quantity}"], = ax.plot([], [], style, label=tag)

    # (title, y-axis label) for each subplot, in subplot order.
    panel_text = (
        ("Voltage", "Voltage (V)"),
        ("Current", "Current (mA)"),
        ("Resistance", "Resistance (Ω)"),
        ("Power", "Power (W)"),
    )
    for ax, (title, ylabel) in zip(axs, panel_text):
        ax.set_title(title, fontsize=10)
        ax.set_xlabel("Time (s)", fontsize=10)
        ax.set_ylabel(ylabel, fontsize=10)
        ax.grid(True, alpha=0.3)
        ax.tick_params(axis='both', which='major', labelsize=9)

    # A legend is only useful when several channels are overlaid.
    if n_channels > 1:
        for ax in axs:
            ax.legend(fontsize=9)

    plt.tight_layout()
    fig.canvas.draw()
    fig.show()

    return fig, axs, data, lines

In [6]:
def update_plots(axs, data, lines, channel_numbers, measurements, last_plot_time, plot_interval=2.0):
    '''
    Record one new sample for every channel and, if it is due, redraw the plots.

    The sample is always appended to `data` (timestamped with time()). The
    figure itself is only refreshed when at least `plot_interval` seconds have
    passed since `last_plot_time`, or when this is the very first sample, so
    that frequent sampling does not cause excessive redrawing.

    Args:
        axs: the subplot axes to rescale
        data (dict): shared measurement store; mutated in place
        lines (dict): line objects keyed "ch_<n>_<quantity>"
        channel_numbers (list)
        measurements (list): one (v, i, r, p) tuple per channel, in the same
            order as channel_numbers
        last_plot_time (float): time of the previous refresh
        plot_interval (float): minimum seconds between refreshes

    Returns:
        float: the time of this call if the plots were refreshed, otherwise
        `last_plot_time` unchanged
    '''
    now = time()
    data["times"].append(now)  # time axis is shared by all channels

    # Store the new sample of each channel.
    for idx, ch in enumerate(channel_numbers):
        voltage, current, resistance, power = measurements[idx]
        store = data[f"ch_{ch}"]
        store["voltages"].append(voltage)
        store["currents"].append(current)
        store["resistances"].append(resistance)
        store["powers"].append(power)

    interval_elapsed = now - last_plot_time >= plot_interval
    first_sample = len(data["times"]) == 1
    if not (interval_elapsed or first_sample):
        return last_plot_time  # too soon: keep the data, skip the redraw

    # (line-key suffix, data-store key) for each plotted quantity.
    series_map = (
        ("voltage", "voltages"),
        ("current", "currents"),
        ("resistance", "resistances"),
        ("power", "powers"),
    )
    for ch in channel_numbers:
        store = data[f"ch_{ch}"]
        for suffix, key in series_map:
            lines[f"ch_{ch}_{suffix}"].set_data(data["times"], store[key])

    # Rescale every subplot so the new points are visible.
    for ax in axs:
        ax.relim()
        ax.autoscale_view()

    # Short pause so the GUI event loop can draw and stay responsive.
    plt.pause(0.01)

    return now

In [7]:
def feedback_loop(channel_numbers, target_resistances, tolerance_resistance,
                  fail_resistance, update_time=2.0, voltage_step=0.1,
                  filename=None, hold_duration=None,
                  fig=None, axs=None, data=None, lines=None):
    
    """
    Step each channel's voltage up or down until its measured resistance sits
    within a tolerance band around its target, logging and plotting every cycle.

    Each cycle: read all channels, update plots and CSV, then for every channel
    move the voltage by one `voltage_step` towards the target band (down when
    the resistance is too high, up when it is too low), and finally sleep for
    `update_time` seconds.

    Args:
        channel_numbers (list)
        target_resistances (list): one target per channel, same order
        tolerance_resistance (float): half-width of the target band, same for ALL channels
        fail_resistance (float): resistance above which a channel is zeroed and
            the loop stops, same for ALL channels
        update_time (float): seconds to sleep between cycles, same for ALL channels
        voltage_step (float): voltage change per cycle, same for ALL channels
        filename (str): CSV to append to; None creates a new file via initialise_csv
        hold_duration (float): seconds to keep regulating after all channels first
            reach their target band; None means run until failure or Ctrl-C
        fig, axs, data, lines: pass the values returned by a previous call to keep
            using the same real-time plots; if any is None a new set is created

    Returns:
        last_voltages (dict): last measured voltage for each channel
        last_resistances (dict): last measured resistance for each channel
        filename: CSV filename, to be passed to subsequent loops
        fig, axs, data, lines: needed for continuation of real-time plots

    Raises:
        ValueError: if channel_numbers and target_resistances differ in length

    Notes:
        - The loop ends in three ways: a channel exceeds fail_resistance, the hold
          timer expires, or the user interrupts. In all three cases the function
          returns normally; KeyboardInterrupt is caught here and not re-raised.
        - On failure only the failing channel is set to 0 V; the other channels
          are left at whatever voltage they were last set to.
    """
    
    if len(channel_numbers) != len(target_resistances):
        raise ValueError("channel_numbers and target_resistances must have the same length")
    
    # Create a new CSV file on the first call; later calls append to the given one
    if filename is None:
        filename = initialise_csv(channel_numbers)
    else:
        print(f"Appending data to existing file: {filename}")

    # Create the plots on the first call; later calls reuse the objects passed in
    if fig is None or axs is None or data is None or lines is None:
        plt.ion() 
        fig, axs, data, lines = initialise_plots(channel_numbers)
    
    # 0 is far in the past relative to time(), so the first update_plots call
    # always refreshes the figure
    last_plot_time = 0  

    last_voltages = {}
    last_resistances = {}

    # seed the return values with the present voltage; resistance is unknown
    # until the first measurement cycle, so it starts at 0
    for ch in channel_numbers:
        last_voltages[ch] = float(xdac.read_single_channel_voltage(ch))
        last_resistances[ch] = 0

    # set once, the first time every channel is inside its target band
    hold_start_time = None

    # Start feedback loop
    try:
        while True:

            # read every channel and remember the latest values for the return
            measurements = []
            for ch in channel_numbers:
                v, i, r, p = calculate_virp(ch)
                measurements.append((v, i, r, p))
                last_voltages[ch] = v
                last_resistances[ch] = r

            last_plot_time = update_plots(axs, data, lines, channel_numbers, measurements, last_plot_time)
            save_data_to_csv(data, filename, channel_numbers)

            any_channel_failed = False

            # stays True only if every channel is within tolerance this cycle
            all_targets_reached = True

            for i, ch in enumerate(channel_numbers):
                v, _, r, _ = measurements[i]
                target_r = target_resistances[i]

                # 0 means "inside the tolerance band, leave the voltage alone"
                delta = 0

                if r > target_r + tolerance_resistance:
                    delta = -voltage_step
                elif r < target_r - tolerance_resistance:
                    delta = voltage_step

                if delta != 0:

                    # NOTE(review): the step is applied to the *measured* voltage,
                    # not to the previously commanded one - confirm this is intended
                    new_v = round(v + delta, 3)

                    # ensure voltage never drops below 1.1 V (hard-coded floor)
                    if new_v < 1.1:
                        new_v = 1.1
                    
                    xdac.set_channel_voltage(ch, new_v)

                # stop feedback loop if any channel has too high resistance;
                # this overrides the adjustment made just above for this channel
                if r > fail_resistance:
                    print(f"Channel {ch} resistance too high. Voltage set to 0 V.")
                    xdac.set_channel_voltage(ch, 0)
                    any_channel_failed = True
                    
                # if any channel is not within target resistance, group condition fails
                if abs(r - target_r) > tolerance_resistance:
                    all_targets_reached = False

            # remaining channels in the list are still processed before this exit
            if any_channel_failed:
                break

            # Start timing after all channels reach target resistance and if hold_duration is provided.
            # The timer is never reset: if a channel drifts out of band the elapsed
            # check is simply skipped until all channels are back inside it.
            if all_targets_reached and hold_duration is not None:
                if hold_start_time is None:
                    hold_start_time = time()
                    print("Target resistance reached, starting hold timer...")
                else:
                    # Only check elapsed time if hold_start_time is set
                    elapsed = time() - hold_start_time
                    if elapsed >= hold_duration:
                        print(f"Max duration of {hold_duration} s reached. Exiting feedback loop.")
                        break
            
            # Disabled experiment, kept for reference: dynamic update time based on
            # average distance from the target resistances. It refers to
            # base_update_time / fastest_update_time, which are not parameters of
            # this function, so it cannot be re-enabled as written.
            #total_distance_ratio = sum(
                #abs(measurements[i][2] - target_resistances[i]) / tolerance_resistance
                #for i in range(len(channel_numbers))
            #)
            #avg_distance_ratio = total_distance_ratio / len(channel_numbers)
            #scale_factor = max(avg_distance_ratio, 1)
            #sleep_time = max(base_update_time / scale_factor, fastest_update_time)
            
            sleep(update_time)

    except KeyboardInterrupt:
        # Ctrl-C: re-apply the last measured voltage to every channel, then
        # fall through to the normal return
        print("\nFeedback loop stopped by user.")
        for ch in channel_numbers:
            xdac.set_channel_voltage(ch, last_voltages[ch])

    return last_voltages, last_resistances, filename, fig, axs, data, lines

In [8]:
def series_feedback(channel_numbers, feedback_schedule, tolerance_resistance,
                    fail_resistance, filename=None):
    """
    Run a series of feedback loops, one per schedule step, sharing a single CSV
    file and a single set of live plots across all steps.

    Args:
        channel_numbers (list): list of channel numbers
        feedback_schedule (iterable of tuples):
            [([target1_ch1, target1_ch2, ...], hold_duration, update_time, voltage_step), ...]
            The final target should have hold_duration=None to indicate it runs indefinitely.
        tolerance_resistance (float): passed unchanged to every step
        fail_resistance (float): passed unchanged to every step
        filename (str or None): CSV to append to; None lets the first step create one

    Returns:
        last_voltages (dict), last_resistances (dict), filename
        (the two dicts are empty if the schedule is empty)

    Raises:
        ValueError: if any step is not a 4-tuple or its target list does not
            have one entry per channel. The whole schedule is checked before the
            first step runs, so a mistake in a late step cannot abort the
            sequence halfway through with the channels still being driven.

    Note:
        feedback_loop returns normally after a Ctrl-C or after a channel exceeds
        fail_resistance, so in both cases this function moves on to the next step.
    """

    # Materialise the schedule so it can be validated up front and then replayed.
    steps = list(feedback_schedule)

    for i, (target_resistances, _hold, _update, _step) in enumerate(steps):
        if len(target_resistances) != len(channel_numbers):
            raise ValueError(
                f"Step {i+1}: number of target resistances ({len(target_resistances)}) "
                f"does not match number of channels ({len(channel_numbers)})."
            )

    last_voltages = {}
    last_resistances = {}

    # plotting objects are created by the first step and reused by later ones
    fig = axs = data = lines = None

    for i, (target_resistances, hold_duration, update_time, voltage_step) in enumerate(steps):

        print(f"\n--- Step {i+1}: Target resistance = {target_resistances} Ω ---")
        print(f"Update time = {update_time}s, Voltage step = {voltage_step}V")

        if hold_duration is not None:
            print(f"Holding target for {hold_duration} seconds after reaching it...")
        else:
            print("Final target: running feedback loop until keyboard interrupt.")

        # Run the feedback loop for this step, threading the CSV name and plot
        # objects through so every step extends the same log and figure
        last_voltages, last_resistances, filename, fig, axs, data, lines = feedback_loop(
            channel_numbers=channel_numbers,
            target_resistances=target_resistances,
            tolerance_resistance=tolerance_resistance,
            fail_resistance=fail_resistance,
            update_time=update_time,
            voltage_step=voltage_step,
            filename=filename,
            hold_duration=hold_duration,
            fig=fig,
            axs=axs,
            data=data,
            lines=lines
        )

        print(f"Step {i+1} complete. Voltages: {last_voltages}, Resistances: {last_resistances}")

    # at the very end, leave interactive mode and show the final plot
    plt.ioff()
    plt.show()

    return last_voltages, last_resistances, filename

In [9]:
def constant_voltage(channel_numbers, set_voltages, update_time, voltage_step, filename=None):
    '''
    Ramp one or more channels up to fixed voltages, then hold them there while
    logging and plotting, until interrupted with Ctrl-C.

    The ramp starts from 0 V for every channel and rises by `voltage_step` per
    cycle until each channel reaches its set voltage. Channels whose set voltage
    is not above 0 are not ramped and are set directly in the hold phase.

    Args:
        channel_numbers (list)
        set_voltages (list): one voltage per channel, same order
        update_time (float): seconds to wait between setting and reading
        voltage_step (float): voltage ramp step size; must be positive when any
            set voltage is above 0
        filename (str or None): CSV to append to; None creates a new file

    Returns:
        str: the CSV filename used

    Raises:
        ValueError: if the two lists differ in length, or if voltage_step is
            not positive while a ramp is required (the ramp would never finish)
    '''
    if len(channel_numbers) != len(set_voltages):
        raise ValueError("channel_numbers and set_voltages must have the same length")

    # A zero/negative step can never reach a positive target: fail before any
    # file is created or any channel is driven.
    if voltage_step <= 0 and any(target > 0 for target in set_voltages):
        raise ValueError("voltage_step must be positive to ramp up to set_voltages")

    # Initialise CSV filename for saving
    if filename is None:
        filename = initialise_csv(channel_numbers)

    # Initialise plots
    fig, axs, data, lines = initialise_plots(channel_numbers)
    last_plot_time = 0  # ensure first plot happens immediately

    def record_sample(previous_plot_time):
        # Read every channel, then push the sample to the plots and the CSV.
        measurements = [calculate_virp(ch) for ch in channel_numbers]
        refreshed = update_plots(axs, data, lines, channel_numbers, measurements, previous_plot_time)
        save_data_to_csv(data, filename, channel_numbers)
        return refreshed

    # One ramp value per channel (previously `[0,0] * n`, which built a list
    # twice as long as needed and only worked because zip() truncates).
    current_voltages = [0.0] * len(channel_numbers)

    # Both phases sit inside the same try/finally so that Ctrl-C during the
    # ramp is handled exactly like Ctrl-C during the hold.
    try:
        # Phase 1: ramp up
        while any(v < target for v, target in zip(current_voltages, set_voltages)):
            for i, ch in enumerate(channel_numbers):
                if current_voltages[i] < set_voltages[i]:
                    # min() stops the last step from overshooting the target
                    current_voltages[i] = min(current_voltages[i] + voltage_step, set_voltages[i])
                xdac.set_channel_voltage(ch, round(current_voltages[i], 3))

            sleep(update_time)  # small wait for readings before the next ramp step
            last_plot_time = record_sample(last_plot_time)

        # Phase 2: hold at the set voltage, re-applying it every cycle
        while True:
            for i, ch in enumerate(channel_numbers):
                xdac.set_channel_voltage(ch, round(set_voltages[i], 3))

            sleep(update_time)  # small wait for readings
            last_plot_time = record_sample(last_plot_time)

    except KeyboardInterrupt:
        print("\n Constant voltage loop stopped by user.")

    finally:
        plt.ioff()
        plt.show()

    return filename

In [10]:
# Channel 1 setup before a run: current setting of 300 (presumably a limit in
# mA - confirm against the XDAC documentation), then a 1 V output.
xdac.set_channel_current(1, 300)
xdac.set_channel_voltage(1, 1)

0

In [19]:
# Spot-check: read back channel 1 voltage (the driver returns it as a string).
xdac.read_single_channel_voltage(1)

'0.93828'

In [20]:
# Spot-check: read back channel 1 current (the driver returns it as a string).
xdac.read_single_channel_current(1)

'23.87500'

In [13]:
# Channel 2 setup before a run: current setting of 300 (presumably a limit in
# mA - confirm against the XDAC documentation), then a 1 V output.
xdac.set_channel_current(2, 300)
xdac.set_channel_voltage(2, 1)

0

In [16]:
# Spot-check: read back channel 2 voltage (the driver returns it as a string).
xdac.read_single_channel_voltage(2)

'0.92715'

In [18]:
# Spot-check: read back channel 2 current (the driver returns it as a string).
xdac.read_single_channel_current(2)

'23.29875'

In [None]:
# Channels driven in this run; every target list below is ordered to match.
channel_numbers = [1, 2]

# feedback_schedule is a list of tuples, one per step:
#   ([target_ch1, target_ch2, ...], hold_duration, update_time, voltage_step)
# hold_duration is the number of seconds to keep regulating once all channels
# are within tolerance. The final step uses hold_duration=None, so it runs
# until Ctrl-C (or until a channel exceeds fail_resistance).

feedback_schedule = [
    ([60, 60], 30, 0.5, 0.25), 
    ([58.5, 62.5], 30, 0.5, 0.25),
    ([55, 65], 30, 0.5, 0.25),
    ([52.5, 67.5], 30, 0.5, 0.25),       
    ([50, 70], 30, 0.5, 0.25),       
    ([49, 71], 30, 0.25, 0.125),
    ([48, 72], 30, 0.25, 0.125),
    ([47, 73], 30, 0.25, 0.125),
    ([46, 74], 30, 0.25, 0.125),
    ([45, 75], None, 0.25, 0.125)      
]

# filename=None makes the first step create a new timestamped CSV, which the
# remaining steps then append to.
last_voltages, last_resistances, filename = series_feedback(
    channel_numbers=channel_numbers,
    feedback_schedule=feedback_schedule,
    tolerance_resistance=0.25,
    fail_resistance=100,
    filename=None,
)


--- Step 1: Target resistance = [60, 60] Ω ---
Update time = 0.5s, Voltage step = 0.25V
Holding target for 30 seconds after reaching it...
CSV file initialised: A13_A14_feedbackcontrol_20250911_114948.csv
Target resistance reached, starting hold timer...
Max duration of 30 s reached. Exiting feedback loop.
Step 1 complete. Voltages: {1: 14.52324, 2: 14.46406}, Resistances: {1: 60.07638174325351, 2: 60.09633072788179}

--- Step 2: Target resistance = [58.5, 62.5] Ω ---
Update time = 0.5s, Voltage step = 0.25V
Holding target for 30 seconds after reaching it...
Appending data to existing file: A13_A14_feedbackcontrol_20250911_114948.csv
Target resistance reached, starting hold timer...
Max duration of 30 s reached. Exiting feedback loop.
Step 2 complete. Voltages: {1: 13.58477, 2: 15.55781}, Resistances: {1: 58.57808297444577, 2: 62.5068954087526}

--- Step 3: Target resistance = [55, 65] Ω ---
Update time = 0.5s, Voltage step = 0.25V
Holding target for 30 seconds after reaching it...
Ap

In [None]:
# 1st time feedback loop
# (feedback_loop takes a single `update_time`; the older base_update_time /
# fastest_update_time keywords are not accepted and raise TypeError)
last_voltages, last_resistances, csv_filename, fig, axs, data, lines = feedback_loop(
    channel_numbers=[1, 2], 
    target_resistances=[45, 55],
    tolerance_resistance=0.5, 
    fail_resistance=100, 
    update_time=0.2,
    voltage_step=0.05,
    filename=None                  # set to None to create filename for 1st time
    )

In [25]:
# Ramp channels 1 and 2 up to 2 V and 4 V in 0.1 V steps (1 s per step), then
# hold until Ctrl-C. filename=None creates a new CSV; its name is returned.
constant_voltage(channel_numbers=[1, 2], 
                 set_voltages=[2, 4], 
                 update_time=1.0,
                 voltage_step=0.1,
                 filename=None)

CSV file initialised: heater_20250830_153104.csv

 Constant voltage loop stopped by user.


'heater_20250830_153104.csv'

In [None]:
# 2nd time feedback loop
# (feedback_loop takes a single `update_time`; the older base_update_time /
# fastest_update_time keywords are not accepted and raise TypeError)
last_voltages, last_resistances, csv_filename, fig, axs, data, lines = feedback_loop(
    channel_numbers=[1, 2],             # MUST keep same
    target_resistances=[59, 45],        # CHANGE
    tolerance_resistance=1.0,           # can keep same
    fail_resistance=200,                # use higher R to ensure loop does not break
    update_time=0.2,                    # can keep same
    voltage_step=0.05,                  # can keep same, do not go below 0.05 V
    filename=csv_filename               # use captured filename
    )

Appending data to existing file: c1r6_c1r8_gnd23_cryo_20250910_152509.csv

Feedback loop stopped by user.


In [None]:
# More loops
# (feedback_loop takes a single `update_time`; the older base_update_time /
# fastest_update_time keywords are not accepted and raise TypeError)
# NOTE(review): csv_filename must come from a run with the same channel list,
# otherwise the appended rows will not line up with the file's header.
last_voltages, last_resistances, csv_filename, fig, axs, data, lines = feedback_loop(
    channel_numbers=[1],             # MUST keep same
    target_resistances=[50],        # CHANGE
    tolerance_resistance=1.0,           # can keep same
    fail_resistance=200,                # use higher R to ensure loop does not break
    update_time=5.0,                    # can keep same
    voltage_step=0.1,                   # can keep same
    filename=csv_filename               # use captured filename
    )

Appending data to existing file: heater_20250830_135834.csv

 Feedback loop stopped by user.


In [None]:
# last feedback loop to RAMP down to V = 0
# (feedback_loop takes a single `update_time`; the older base_update_time /
# fastest_update_time keywords are not accepted and raise TypeError)
last_voltages, last_resistances, csv_filename, fig, axs, data, lines = feedback_loop(
    channel_numbers=[1],             # MUST keep same
    target_resistances=[0],          # CHANGE
    tolerance_resistance=1.0,           # can keep same
    fail_resistance=200,                # use higher R to ensure loop does not break
    update_time=1.0,                    # can keep same
    voltage_step=0.1,                   # can keep same
    filename=None              # None starts a NEW CSV file; pass csv_filename to append instead
    )               

CSV file initialised: c2r5_gnd23_20250909_150306.csv
Channel 1 resistance too high: inf kΩ. Channel voltage set to 0 V.


In [21]:
# For safety, set channel 1 back to 0 V once the run is finished.
xdac.set_channel_voltage(1, 0)

0

In [22]:
# For safety, set channel 2 back to 0 V once the run is finished.
xdac.set_channel_voltage(2, 0)

0