# ARTIQ & LabVIEW Calibration Test Notebook

This notebook provides interactive testing and calibration for:
- **ARTIQ** (via ZMQ Control Manager): DC electrodes, RF, DDS, Cooling parameters
- **LabVIEW** (via TCP): U_RF, Piezo, Toggles (oven, B-field, UV3, E-gun, etc.)

Features:
- Latency measurement for command round-trips
- Signal sweep tests
- Safety limit verification
- Response plotting and analysis

## 1. Setup and Imports

In [None]:
import sys
import os
import time
import json
import socket
import zmq
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any
from collections import defaultdict
import threading
import ipywidgets as widgets
from IPython.display import display, clear_output

# Make the parent of the notebook's working directory importable so the
# project-local `core` package below can be resolved.
project_root = os.path.abspath('..')
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from core import get_config
from core.enums import smile_mv_to_real_volts, real_volts_to_smile_mv

# Load configuration and echo the endpoints the connection classes will use.
# (Status glyph repaired: the original literal was mis-encoded mojibake.)
config = get_config()
print("✓ Configuration loaded")
print(f"  Master IP: {config.master_ip}")
print(f"  CMD Port: {config.cmd_port}")
print(f"  Data Port: {config.data_port}")
print(f"  Client Port: {config.client_port}")
print(f"  LabVIEW Host: {config.get('labview.host')}")
print(f"  LabVIEW Port: {config.get('labview.port')}")

## 2. Connection Classes

In [None]:
@dataclass
class LatencyMeasurement:
    """Record of a single timed command round trip.

    Attributes:
        command_type: Label identifying the command that was timed.
        timestamp: Wall-clock time (seconds) at which the command was sent.
        latency_ms: Measured round-trip duration in milliseconds.
        success: Whether the reply reported a successful status.
        details: Free-form context; the connection classes store the
            request and response dictionaries here.
    """
    command_type: str
    timestamp: float
    latency_ms: float
    success: bool
    # default_factory gives every instance its own dict (no shared default).
    details: Dict = field(default_factory=dict)

class ARTIQConnection:
    """ZMQ REQ client that sends ARTIQ commands to the Control Manager.

    Every completed round trip is timed and appended to ``latency_history``.
    """

    def __init__(self, host: str = None, port: int = None):
        """
        Args:
            host: Control Manager address; falls back to ``config.master_ip``.
            port: Control Manager port; falls back to ``config.client_port``.
        """
        self.host = host or config.master_ip
        self.port = port or config.client_port
        self.ctx = zmq.Context()
        self.socket = None
        self.connected = False
        self.latency_history: List[LatencyMeasurement] = []

    def _close_socket(self):
        """Close the current socket, if any, discarding unsent messages."""
        if self.socket is not None:
            self.socket.close(linger=0)
            self.socket = None

    def _open_socket(self):
        """Replace any existing socket with a fresh, connected REQ socket."""
        self._close_socket()  # never leak the previous socket on re-connect
        sock = self.ctx.socket(zmq.REQ)
        try:
            sock.setsockopt(zmq.RCVTIMEO, 5000)  # 5s timeout
            # Do not let a queued, undeliverable request block close().
            sock.setsockopt(zmq.LINGER, 0)
            sock.connect(f"tcp://{self.host}:{self.port}")
        except Exception:
            sock.close(linger=0)
            raise
        self.socket = sock

    def connect(self) -> bool:
        """Connect to Control Manager.

        Returns:
            True if the socket was set up, False otherwise. A ZMQ connect
            does not wait for the peer, so True does not by itself prove the
            Control Manager is reachable.
        """
        try:
            self._open_socket()
            self.connected = True
            print(f"✓ Connected to Control Manager at {self.host}:{self.port}")
            return True
        except Exception as e:
            print(f"✗ Connection failed: {e}")
            return False

    def disconnect(self):
        """Close connection."""
        self._close_socket()
        self.connected = False
        print("Disconnected from Control Manager")

    def send_command(self, action: str, params: Dict = None,
                     source: str = "CALIBRATION", exp_id: str = None) -> Tuple[Dict, float]:
        """
        Send command and measure latency.

        Args:
            action: Command name placed in the request's "action" field.
            params: Command parameters; an empty dict is sent when omitted.
            source: Value of the request's "source" field.
            exp_id: Optional experiment id; added to the request only if set.

        Returns: (response_dict, latency_ms)

        Raises:
            RuntimeError: If connect() has not succeeded.
            zmq.ZMQError: If the send or receive fails (e.g. reply timeout).
        """
        if not self.connected:
            raise RuntimeError("Not connected")

        request = {
            "action": action,
            "source": source,
            "params": params or {},
        }

        if exp_id:
            request["exp_id"] = exp_id

        start_time = time.time()          # wall clock, for the history record
        started = time.perf_counter()     # monotonic clock, for the interval
        try:
            self.socket.send_json(request)
            response = self.socket.recv_json()
        except zmq.ZMQError:
            # A REQ socket that missed its reply cannot send again; swap in a
            # fresh socket so the next command is not doomed to fail.
            try:
                self._open_socket()
            except zmq.ZMQError:
                self.connected = False
            raise
        latency_ms = (time.perf_counter() - started) * 1000

        # Record latency
        measurement = LatencyMeasurement(
            command_type=action,
            timestamp=start_time,
            latency_ms=latency_ms,
            success=response.get("status") == "success",
            details={"request": request, "response": response}
        )
        self.latency_history.append(measurement)

        return response, latency_ms

    def set_parameters(self, params: Dict) -> Tuple[Dict, float]:
        """Set ARTIQ parameters (DC, RF, Cooling, etc.)."""
        return self.send_command("SET", params)

    def get_parameters(self, param_names: List[str] = None) -> Tuple[Dict, float]:
        """Get current parameter values (all when no names are given)."""
        params = {"params": param_names} if param_names else {}
        return self.send_command("GET", params)

    def run_sweep(self, sweep_params: Dict, exp_id: str = None) -> Tuple[Dict, float]:
        """Trigger a sweep experiment."""
        return self.send_command("SWEEP", sweep_params, exp_id=exp_id)

    def get_status(self) -> Tuple[Dict, float]:
        """Get system status."""
        return self.send_command("STATUS")

    def run_secular_compare(self, params: Dict) -> Tuple[Dict, float]:
        """Run secular frequency comparison."""
        return self.send_command("COMPARE", params)

class LabVIEWConnection:
    """TCP connection to LabVIEW SMILE interface.

    Requests are newline-terminated JSON objects. Every attempted round trip
    is timed and appended to ``latency_history``, including failed ones.
    """

    def __init__(self, host: str = None, port: int = None):
        """
        Args:
            host: LabVIEW address; falls back to config 'labview.host'.
            port: LabVIEW port; falls back to config 'labview.port'.
        """
        self.host = host or config.get('labview.host', '127.0.0.1')
        self.port = port or config.get('labview.port', 5559)
        self.timeout = config.get('labview.timeout', 5.0)
        self.socket = None
        self.connected = False
        self.latency_history: List[LatencyMeasurement] = []
        self._lock = threading.Lock()  # guards _request_counter only
        self._request_counter = 0

    def connect(self) -> bool:
        """Connect to LabVIEW.

        Returns:
            True on success. On failure the new socket is closed and the
            previous socket and connection state are left untouched.
        """
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            sock.connect((self.host, self.port))
        except Exception as e:
            if sock is not None:
                sock.close()  # do not leak the half-opened socket
            print(f"✗ Connection failed: {e}")
            return False

        if self.socket is not None:
            self.socket.close()  # replace a stale socket on re-connect
        self.socket = sock
        self.connected = True
        print(f"✓ Connected to LabVIEW at {self.host}:{self.port}")
        return True

    def disconnect(self):
        """Close connection."""
        if self.socket:
            self.socket.close()
            self.socket = None
        self.connected = False
        print("Disconnected from LabVIEW")

    def _generate_request_id(self) -> str:
        """Generate unique request ID (counter plus millisecond timestamp)."""
        with self._lock:
            self._request_counter += 1
            return f"CAL_{self._request_counter:06d}_{int(time.time() * 1000)}"

    def _recv_response(self) -> str:
        """Read one complete reply and return it decoded and stripped.

        TCP may split a reply across several recv() calls, so keep reading
        until a newline arrives. Whether LabVIEW newline-terminates its
        replies is not visible here, so a buffer that already parses as JSON
        is also accepted as complete.

        Raises:
            ConnectionError: If the peer closes before sending any data.
        """
        chunks = []
        while True:
            chunk = self.socket.recv(4096)
            if not chunk:
                if chunks:
                    break  # peer closed after a partial reply; let caller parse it
                raise ConnectionError("LabVIEW closed the connection")
            chunks.append(chunk)
            if b"\n" in chunk:
                break
            try:
                json.loads(b"".join(chunks).decode('utf-8'))
            except ValueError:
                # Incomplete JSON (or a split multi-byte character): read more.
                continue
            break
        return b"".join(chunks).decode('utf-8').strip()

    def send_command(self, command: str, device: str, value: Any) -> Tuple[Dict, float]:
        """
        Send command to LabVIEW and measure latency.

        Transport and parse errors are not raised: they produce an
        ``{"status": "error", "message": ...}`` response and mark the
        connection as disconnected.

        Returns: (response_dict, latency_ms)

        Raises:
            RuntimeError: If the connection is not established.
        """
        if not self.connected:
            raise RuntimeError("Not connected")

        request = {
            "command": command,
            "device": device,
            "value": value,
            "timestamp": time.time(),
            "request_id": self._generate_request_id()
        }

        start_time = time.time()          # wall clock, for the history record
        started = time.perf_counter()     # monotonic clock, for the interval
        try:
            # Send with newline terminator
            message = json.dumps(request) + "\n"
            self.socket.sendall(message.encode('utf-8'))

            # Receive response
            response_data = self._recv_response()
            latency_ms = (time.perf_counter() - started) * 1000

            response = json.loads(response_data) if response_data else {"status": "error"}
            success = response.get("status") == "ok"

        except Exception as e:
            latency_ms = (time.perf_counter() - started) * 1000
            response = {"status": "error", "message": str(e)}
            success = False
            self.connected = False

        # Record latency
        measurement = LatencyMeasurement(
            command_type=f"{command}:{device}",
            timestamp=start_time,
            latency_ms=latency_ms,
            success=success,
            details={"request": request, "response": response}
        )
        self.latency_history.append(measurement)

        return response, latency_ms

    def set_rf_voltage(self, voltage: float) -> Tuple[Dict, float]:
        """Set U_RF voltage (0-1000V); the range is not validated here."""
        return self.send_command("set_voltage", "U_RF", round(voltage, 3))

    def set_piezo_voltage(self, voltage: float) -> Tuple[Dict, float]:
        """Set piezo voltage (0-4V); the range is not validated here."""
        return self.send_command("set_voltage", "piezo", round(voltage, 3))

    def set_toggle(self, device: str, state: bool) -> Tuple[Dict, float]:
        """Set toggle device (be_oven, b_field, bephi, uv3, e_gun)."""
        return self.send_command("set_toggle", device, bool(state))

    def set_dds_frequency(self, freq_mhz: float) -> Tuple[Dict, float]:
        """Set DDS frequency in MHz."""
        return self.send_command("set_frequency", "dds", round(freq_mhz, 6))

    def emergency_stop(self) -> Tuple[Dict, float]:
        """Send emergency stop."""
        return self.send_command("emergency_stop", "all", None)

    def get_status(self) -> Tuple[Dict, float]:
        """Query LabVIEW status."""
        return self.send_command("get_status", "all", None)

# Initialize connections
# Construction opens no network connection; that happens in connect().
artiq = ARTIQConnection()
labview = LabVIEWConnection()
# (Status glyph repaired: the original literal was mis-encoded mojibake.)
print("\n✓ Connection objects created")

## 3. Connect to Systems

In [None]:
# Connect to both systems
print("Connecting to ARTIQ (via Control Manager)...")
artiq.connect()

print("\nConnecting to LabVIEW...")
labview.connect()

# Get initial status
if artiq.connected:
    print("\n--- ARTIQ System Status ---")
    # A ZMQ connect does not wait for the peer, so this first round trip is
    # where an unreachable Control Manager actually shows up (as a timeout).
    # Report it instead of aborting the cell with a traceback.
    try:
        status, latency = artiq.get_status()
    except Exception as e:
        print(f"Status query failed: {e}")
    else:
        values = status.get('values', {})
        print(f"Latency: {latency:.2f} ms")
        print(f"Mode: {values.get('mode', 'N/A')}")
        print(f"Worker Alive: {values.get('worker_alive', 'N/A')}")

## 4. Interactive Control Panel

In [None]:
# Create interactive widgets for ARTIQ control
print("Creating ARTIQ Control Panel...")

# DC electrode controls: four sliders that share one range and differ only
# in their label.
_dc_range = dict(value=0, min=-100, max=100, step=0.1)
ec1_slider = widgets.FloatSlider(description='EC1 (V):', **_dc_range)
ec2_slider = widgets.FloatSlider(description='EC2 (V):', **_dc_range)
comp_h_slider = widgets.FloatSlider(description='Comp H (V):', **_dc_range)
comp_v_slider = widgets.FloatSlider(description='Comp V (V):', **_dc_range)

# RF voltage
rf_slider = widgets.FloatSlider(
    description='U_RF (V):', value=200, min=0, max=500, step=1,
)

# Cooling parameters
freq0_slider = widgets.FloatSlider(
    description='Freq0 (MHz):', value=212.5, min=200, max=220, step=0.001,
)
amp0_slider = widgets.FloatSlider(
    description='Amp0:', value=0.05, min=0, max=1, step=0.01,
)

# Toggles (only the B-field box starts checked)
b_field_check = widgets.Checkbox(description='B-field', value=True)
bephi_check = widgets.Checkbox(description='Bephi', value=False)
oven_check = widgets.Checkbox(description='Be+ Oven', value=False)
uv3_check = widgets.Checkbox(description='UV3', value=False)
e_gun_check = widgets.Checkbox(description='E-gun', value=False)

# Output area that the apply callback prints into
output = widgets.Output()

def apply_artiq_params(b):
    """Button callback: send every ARTIQ panel value in one SET command.

    Args:
        b: The clicked button (unused; required by the on_click signature).
    """
    # (parameter key, widget supplying its value), in the order they are sent.
    sources = (
        ('ec1', ec1_slider),
        ('ec2', ec2_slider),
        ('comp_h', comp_h_slider),
        ('comp_v', comp_v_slider),
        ('u_rf_volts', rf_slider),
        ('freq0', freq0_slider),
        ('amp0', amp0_slider),
        ('b_field', b_field_check),
        ('bephi', bephi_check),
        ('be_oven', oven_check),
        ('uv3', uv3_check),
        ('e_gun', e_gun_check),
    )
    with output:
        clear_output()
        params = {key: widget.value for key, widget in sources}
        print(f"Setting parameters: {params}")
        reply, elapsed_ms = artiq.set_parameters(params)
        print(f"Response: {reply.get('status')}, Latency: {elapsed_ms:.2f} ms")

# Button that pushes the current panel values to ARTIQ
apply_btn = widgets.Button(description='Apply to ARTIQ', button_style='primary')
apply_btn.on_click(apply_artiq_params)

# Layout: one tab per parameter group
dc_box = widgets.VBox([ec1_slider, ec2_slider, comp_h_slider, comp_v_slider])
rf_box = widgets.VBox([rf_slider])
cooling_box = widgets.VBox([freq0_slider, amp0_slider])
toggle_box = widgets.VBox([b_field_check, bephi_check, oven_check, uv3_check, e_gun_check])

artiq_panel = widgets.Tab([dc_box, rf_box, cooling_box, toggle_box])
# Titles are listed in the same order as the tab children above.
for _tab_index, _tab_title in enumerate(
        ('DC Electrodes', 'RF Voltage', 'Cooling', 'Toggles')):
    artiq_panel.set_title(_tab_index, _tab_title)

_artiq_children = [
    widgets.HTML("<h3>ARTIQ Control Panel</h3>"),
    artiq_panel,
    apply_btn,
    output,
]
display(widgets.VBox(_artiq_children))

In [None]:
# Create interactive widgets for LabVIEW control
print("Creating LabVIEW Control Panel...")

# RF Voltage (SMILE mV to real volts conversion)
smile_mv_slider = widgets.FloatSlider(value=700, min=0, max=3500, step=10, description='SMILE (mV):')
# Read-only display of the converted voltage. Its initial value is derived
# from the slider instead of being hard-coded, so the two agree before the
# slider is first moved.
real_volts_display = widgets.FloatText(
    value=round(smile_mv_to_real_volts(smile_mv_slider.value), 2),
    description='Real (V):',
    disabled=True,
)

def update_real_volts(change):
    """Observer: refresh the read-only volts field when the slider moves.

    Args:
        change: Change notification; its 'new' entry is the new slider value.
    """
    real_volts = smile_mv_to_real_volts(change['new'])
    real_volts_display.value = round(real_volts, 2)

smile_mv_slider.observe(update_real_volts, names='value')

# Piezo
piezo_slider = widgets.FloatSlider(value=0, min=0, max=4, step=0.01, description='Piezo (V):')

# DDS Frequency (entered in kHz; converted to MHz when applied)
dds_freq_slider = widgets.FloatSlider(value=0, min=0, max=500, step=0.1, description='DDS (kHz):')

# LabVIEW toggles
lv_b_field = widgets.Checkbox(value=True, description='B-field')
lv_bephi = widgets.Checkbox(value=False, description='Bephi')
lv_oven = widgets.Checkbox(value=False, description='Be+ Oven')
lv_uv3 = widgets.Checkbox(value=False, description='UV3')
lv_e_gun = widgets.Checkbox(value=False, description='E-gun')

# Output area that the LabVIEW callbacks print into
lv_output = widgets.Output()

def apply_lv_rf(b):
    """Button callback: convert the SMILE mV slider to volts and set U_RF.

    Args:
        b: The clicked button (unused).
    """
    with lv_output:
        clear_output()
        smile_mv = smile_mv_slider.value
        real_v = smile_mv_to_real_volts(smile_mv)
        print(f"Setting U_RF to {real_v:.2f}V (SMILE: {smile_mv}mV)")
        reply, elapsed_ms = labview.set_rf_voltage(real_v)
        print(f"Response: {reply.get('status')}, Latency: {elapsed_ms:.2f} ms")

def apply_lv_piezo(b):
    """Button callback: send the piezo slider value to LabVIEW.

    Args:
        b: The clicked button (unused).
    """
    with lv_output:
        clear_output()
        piezo_v = piezo_slider.value
        print(f"Setting Piezo to {piezo_v}V")
        reply, elapsed_ms = labview.set_piezo_voltage(piezo_v)
        print(f"Response: {reply.get('status')}, Latency: {elapsed_ms:.2f} ms")

def apply_lv_toggles(b):
    """Button callback: send every toggle checkbox state, one command each.

    Args:
        b: The clicked button (unused).
    """
    # (device name, checkbox supplying its state), in the order they are sent.
    checkboxes = (
        ('b_field', lv_b_field),
        ('bephi', lv_bephi),
        ('be_oven', lv_oven),
        ('uv3', lv_uv3),
        ('e_gun', lv_e_gun),
    )
    with lv_output:
        clear_output()
        # Snapshot all states first, then send them one by one.
        desired = [(device, box.value) for device, box in checkboxes]
        for device, state in desired:
            print(f"Setting {device} to {state}...")
            reply, elapsed_ms = labview.set_toggle(device, state)
            print(f"  Response: {reply.get('status')}, Latency: {elapsed_ms:.2f} ms")

def apply_lv_dds(b):
    """Button callback: send the DDS slider value (kHz) to LabVIEW in MHz.

    Args:
        b: The clicked button (unused).
    """
    with lv_output:
        clear_output()
        freq_khz = dds_freq_slider.value
        freq_mhz = freq_khz / 1000.0  # slider is in kHz, the command takes MHz
        print(f"Setting DDS to {freq_khz} kHz ({freq_mhz:.6f} MHz)")
        reply, elapsed_ms = labview.set_dds_frequency(freq_mhz)
        print(f"Response: {reply.get('status')}, Latency: {elapsed_ms:.2f} ms")

# One button per LabVIEW action, each wired to its callback.
rf_btn = widgets.Button(description='Set RF Voltage', button_style='primary')
rf_btn.on_click(apply_lv_rf)

piezo_btn = widgets.Button(description='Set Piezo', button_style='primary')
piezo_btn.on_click(apply_lv_piezo)

toggle_btn = widgets.Button(description='Set Toggles', button_style='primary')
toggle_btn.on_click(apply_lv_toggles)

dds_btn = widgets.Button(description='Set DDS', button_style='primary')
dds_btn.on_click(apply_lv_dds)

estop_btn = widgets.Button(description='EMERGENCY STOP', button_style='danger')
def estop_click(b):
    """Button callback: send the LabVIEW emergency stop and show the reply.

    A stop that could not be sent (for example, LabVIEW not connected) is
    reported explicitly so it cannot be mistaken for a completed stop.

    Args:
        b: The clicked button (unused).
    """
    with lv_output:
        clear_output()
        # (Warning glyph repaired: the original literal was mojibake.)
        print("⚠️ EMERGENCY STOP TRIGGERED!")
        try:
            response, latency = labview.emergency_stop()
        except Exception as e:
            print(f"EMERGENCY STOP NOT SENT: {e}")
            return
        print(f"Response: {response}, Latency: {latency:.2f} ms")
estop_btn.on_click(estop_click)

# Layout: one row per control group, stacked into a single column.
# NOTE: rf_box and toggle_box rebind names first used by the ARTIQ panel
# cell; that panel keeps its own references to the earlier boxes.
rf_box = widgets.HBox([smile_mv_slider, real_volts_display, rf_btn])
piezo_box = widgets.HBox([piezo_slider, piezo_btn])
dds_box = widgets.HBox([dds_freq_slider, dds_btn])
toggle_box = widgets.VBox([lv_b_field, lv_bephi, lv_oven, lv_uv3, lv_e_gun, toggle_btn])

_lv_children = [widgets.HTML("<h3>LabVIEW Control Panel</h3>")]
for _heading, _group in (
    (widgets.HTML("<b>RF Voltage:</b>"), rf_box),
    (widgets.HTML("<b>Piezo:</b>"), piezo_box),
    (widgets.HTML("<b>DDS:</b>"), dds_box),
    (widgets.HTML("<b>Toggles:</b>"), toggle_box),
):
    _lv_children += [_heading, _group]
_lv_children += [estop_btn, lv_output]

display(widgets.VBox(_lv_children))

## 5. Latency Measurement Tests

In [None]:
def _summarize_latencies(samples_ms: List[float]) -> Dict:
    """Return summary statistics for a non-empty list of latencies in ms."""
    return {
        'mean': np.mean(samples_ms),
        'std': np.std(samples_ms),
        'min': np.min(samples_ms),
        'max': np.max(samples_ms),
        'median': np.median(samples_ms),
        'p95': np.percentile(samples_ms, 95),
        'p99': np.percentile(samples_ms, 99),
        'samples': len(samples_ms)
    }


def _run_latency_command(connection, cmd: Tuple) -> Optional[Tuple]:
    """Execute one test command; return (response, latency_ms) or None.

    None means the command type (or the connection type) is not supported.
    """
    cmd_type = cmd[0]
    if isinstance(connection, ARTIQConnection):
        if cmd_type == "SET":
            return connection.set_parameters(cmd[1])
        if cmd_type == "GET":
            return connection.get_parameters()
        if cmd_type == "STATUS":
            return connection.get_status()
    elif isinstance(connection, LabVIEWConnection):
        if cmd_type == "SET_RF":
            return connection.set_rf_voltage(cmd[1])
        if cmd_type == "SET_PIEZO":
            return connection.set_piezo_voltage(cmd[1])
        if cmd_type == "SET_TOGGLE":
            return connection.set_toggle(cmd[1], cmd[2])
        if cmd_type == "STATUS":
            return connection.get_status()
    return None


def run_latency_test(connection, test_commands: List[Tuple], num_iterations: int = 10) -> Dict:
    """
    Run latency test for given commands.

    Commands sharing a command type are pooled into one statistics entry.
    Unsupported command types are skipped. A sample is discarded when the
    command raises, or when the connection reports itself disconnected
    right after the call: LabVIEWConnection swallows transport errors and
    returns the time spent failing, which is not a round-trip latency.

    Args:
        connection: ARTIQ or LabVIEW connection
        test_commands: List of (command_type, *args) tuples
        num_iterations: Number of times to repeat each command

    Returns:
        Dictionary mapping command type to its statistics (mean, std, min,
        max, median, p95, p99, samples). Command types with no valid sample
        are omitted.
    """
    results = defaultdict(list)

    print(f"Running {num_iterations} iterations for {len(test_commands)} commands...")

    for i in range(num_iterations):
        for cmd in test_commands:
            cmd_type = cmd[0]
            try:
                outcome = _run_latency_command(connection, cmd)
            except Exception as e:
                print(f"Error in {cmd_type}: {e}")
                results[cmd_type].append(None)
                continue

            if outcome is None:
                continue  # unsupported command: nothing to record

            _, latency = outcome
            if not connection.connected:
                # The call "returned" only because the transport failed.
                print(f"Error in {cmd_type}: connection lost")
                latency = None
            results[cmd_type].append(latency)

        if (i + 1) % 5 == 0:
            print(f"  Completed {i + 1}/{num_iterations} iterations")

    # Calculate statistics
    stats = {}
    for cmd_type, latencies in results.items():
        valid_latencies = [sample for sample in latencies if sample is not None]
        if valid_latencies:
            stats[cmd_type] = _summarize_latencies(valid_latencies)

    return stats

# (Status glyph repaired: the original literal was mis-encoded mojibake.)
print("✓ Latency test function defined")

In [None]:
# Run ARTIQ latency test
if not artiq.connected:
    print("ARTIQ not connected!")
else:
    # Both SET entries share the "SET" command type, so their samples are
    # pooled into a single statistics entry.
    artiq_test_commands = [
        ("STATUS",),
        ("GET",),
        ("SET", {'ec1': 10.0}),
        ("SET", {'u_rf_volts': 200.0}),
    ]

    artiq_stats = run_latency_test(artiq, artiq_test_commands, num_iterations=10)

    # Per-command report
    print("\n=== ARTIQ Latency Statistics ===")
    for cmd, stat in artiq_stats.items():
        print(f"\n{cmd}:")
        print(f"  Mean: {stat['mean']:.2f} ms")
        print(f"  Std:  {stat['std']:.2f} ms")
        print(f"  Min:  {stat['min']:.2f} ms")
        print(f"  Max:  {stat['max']:.2f} ms")
        print(f"  P95:  {stat['p95']:.2f} ms")
        print(f"  P99:  {stat['p99']:.2f} ms")

In [None]:
# Run LabVIEW latency test
if not labview.connected:
    print("LabVIEW not connected!")
else:
    # Each tuple is (command_type, *args) as run_latency_test expects.
    lv_test_commands = [
        ("STATUS",),
        ("SET_RF", 100.0),
        ("SET_PIEZO", 1.0),
        ("SET_TOGGLE", "b_field", True),
    ]

    lv_stats = run_latency_test(labview, lv_test_commands, num_iterations=10)

    # Per-command report
    print("\n=== LabVIEW Latency Statistics ===")
    for cmd, stat in lv_stats.items():
        print(f"\n{cmd}:")
        print(f"  Mean: {stat['mean']:.2f} ms")
        print(f"  Std:  {stat['std']:.2f} ms")
        print(f"  Min:  {stat['min']:.2f} ms")
        print(f"  Max:  {stat['max']:.2f} ms")
        print(f"  P95:  {stat['p95']:.2f} ms")
        print(f"  P99:  {stat['p99']:.2f} ms")

## 6. Plot Latency Results

In [None]:
def plot_latency_comparison(artiq_stats: Dict, lv_stats: Dict):
    """Plot latency comparison between ARTIQ and LabVIEW.

    Draws a 2x2 figure: mean latency bars with std error bars, per-command
    box plots built from the recent ``latency_history`` of the module-level
    ``artiq`` and ``labview`` connections, system-wide mean/P95/P99
    averages, and a summary table. The figure is shown and saved to
    ``latency_comparison.png``.

    Args:
        artiq_stats: Per-command statistics from run_latency_test (ARTIQ).
        lv_stats: Per-command statistics from run_latency_test (LabVIEW).
    """

    def lv_stat_key(history_label: str) -> str:
        """Map a LabVIEW history label to the matching stats key.

        LabVIEWConnection records history as "<command>:<device>", while
        run_latency_test keys its statistics by test-command name. Without
        this translation no LabVIEW box would ever be drawn.
        """
        command, _, device = history_label.partition(':')
        if command == 'set_toggle':
            return 'SET_TOGGLE'
        known = {
            ('set_voltage', 'U_RF'): 'SET_RF',
            ('set_voltage', 'piezo'): 'SET_PIEZO',
            ('get_status', 'all'): 'STATUS',
        }
        return known.get((command, device), history_label)

    fig, axes = plt.subplots(2, 2, figsize=(14, 10))

    # 1. Bar chart of mean latencies
    ax1 = axes[0, 0]

    artiq_cmds = list(artiq_stats.keys())
    artiq_means = [artiq_stats[k]['mean'] for k in artiq_cmds]
    artiq_stds = [artiq_stats[k]['std'] for k in artiq_cmds]

    lv_cmds = list(lv_stats.keys())
    lv_means = [lv_stats[k]['mean'] for k in lv_cmds]
    lv_stds = [lv_stats[k]['std'] for k in lv_cmds]

    x1 = np.arange(len(artiq_cmds))
    x2 = np.arange(len(lv_cmds))

    width = 0.35
    # LabVIEW bars are shifted right of the ARTIQ group, with a 0.5 gap.
    lv_offset = len(artiq_cmds) + 0.5
    ax1.bar(x1 - width/2, artiq_means, width, yerr=artiq_stds,
            label='ARTIQ', alpha=0.8, capsize=5)
    ax1.bar(x2 + width/2 + lv_offset, lv_means, width, yerr=lv_stds,
            label='LabVIEW', alpha=0.8, capsize=5, color='orange')

    ax1.set_ylabel('Latency (ms)')
    ax1.set_title('Mean Command Latency (with Std Dev)')
    ax1.set_xticks(list(x1) + list(x2 + lv_offset))
    ax1.set_xticklabels(artiq_cmds + lv_cmds, rotation=45, ha='right')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # 2. Box plot of latency distributions
    ax2 = axes[0, 1]

    # Extract raw latency data from history. The window of 40 matches the
    # 10 iterations x 4 commands used by the latency test cells.
    artiq_latencies = defaultdict(list)
    for m in artiq.latency_history[-40:]:  # Last 40 measurements
        artiq_latencies[m.command_type].append(m.latency_ms)

    lv_latencies = defaultdict(list)
    for m in labview.latency_history[-40:]:
        lv_latencies[lv_stat_key(m.command_type)].append(m.latency_ms)

    # Build data, labels and colours together so each box gets the colour
    # of its own system even when some commands have no history samples.
    box_data = []
    box_labels = []
    box_colors = []
    for prefix, color, cmds, samples in (
        ('A', 'lightblue', artiq_cmds, artiq_latencies),
        ('L', 'lightyellow', lv_cmds, lv_latencies),
    ):
        for cmd in cmds:
            if cmd in samples:
                box_data.append(samples[cmd])
                box_labels.append(f'{prefix}:{cmd}')
                box_colors.append(color)

    if box_data:
        # Tick labels are set separately because boxplot's `labels` keyword
        # is deprecated in newer Matplotlib releases.
        bp = ax2.boxplot(box_data, patch_artist=True)
        ax2.set_xticklabels(box_labels, rotation=45, ha='right')
        for patch, color in zip(bp['boxes'], box_colors):
            patch.set_facecolor(color)
        ax2.set_ylabel('Latency (ms)')
        ax2.set_title('Latency Distribution')
        ax2.grid(True, alpha=0.3)

    # 3. Percentile comparison
    ax3 = axes[1, 0]

    percentiles = ['mean', 'p95', 'p99']
    x = np.arange(len(percentiles))
    width = 0.35

    # Average across all commands for each system
    artiq_avg = [np.mean([artiq_stats[k][p] for k in artiq_stats]) for p in percentiles]
    lv_avg = [np.mean([lv_stats[k][p] for k in lv_stats]) for p in percentiles]

    ax3.bar(x - width/2, artiq_avg, width, label='ARTIQ', alpha=0.8)
    ax3.bar(x + width/2, lv_avg, width, label='LabVIEW', alpha=0.8, color='orange')
    ax3.set_ylabel('Latency (ms)')
    ax3.set_title('Average Latency Percentiles')
    ax3.set_xticks(x)
    ax3.set_xticklabels(percentiles)
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Add values on bars
    for i, (artiq_value, lv_value) in enumerate(zip(artiq_avg, lv_avg)):
        ax3.text(i - width/2, artiq_value, f'{artiq_value:.1f}',
                 ha='center', va='bottom', fontsize=9)
        ax3.text(i + width/2, lv_value, f'{lv_value:.1f}',
                 ha='center', va='bottom', fontsize=9)

    # 4. Summary table
    ax4 = axes[1, 1]
    ax4.axis('off')

    table_data = [['System', 'Command', 'Mean (ms)', 'P95 (ms)', 'P99 (ms)']]
    for system, stats in (('ARTIQ', artiq_stats), ('LabVIEW', lv_stats)):
        for cmd, stat in stats.items():
            table_data.append([system, cmd, f"{stat['mean']:.2f}",
                               f"{stat['p95']:.2f}", f"{stat['p99']:.2f}"])

    table = ax4.table(cellText=table_data, loc='center', cellLoc='center')
    table.auto_set_font_size(False)
    table.set_fontsize(9)
    table.scale(1.2, 1.5)

    n_cols = len(table_data[0])

    # Style header row
    for col in range(n_cols):
        table[(0, col)].set_facecolor('#4CAF50')
        table[(0, col)].set_text_props(weight='bold', color='white')

    # Alternate row colors
    for row in range(1, len(table_data)):
        color = '#f0f0f0' if row % 2 == 0 else 'white'
        for col in range(n_cols):
            table[(row, col)].set_facecolor(color)

    ax4.set_title('Latency Summary Table', pad=20)

    plt.tight_layout()
    plt.savefig('latency_comparison.png', dpi=150, bbox_inches='tight')
    plt.show()
    # (Status glyph repaired: the original literal was mis-encoded mojibake.)
    print("\n✓ Plot saved to latency_comparison.png")

# Generate plots if stats are available
# Both latency cells must have run (and found their system connected) for
# these two notebook globals to exist.
try:
    if all(name in globals() for name in ('artiq_stats', 'lv_stats')):
        plot_latency_comparison(artiq_stats, lv_stats)
    else:
        print("Run latency tests first to generate stats!")
except Exception as e:
    print(f"Plotting error: {e}")

## 7. RF Voltage Sweep Test

In [None]:
def run_rf_sweep_test(start_v: float, stop_v: float, steps: int, dwell_ms: float = 100):
    """
    Run RF voltage sweep and measure latency at each step.

    Each voltage is sent through the module-level ``artiq`` and ``labview``
    connections, whichever are connected when the sweep starts. A system
    that is not connected at the start gets an empty latency list. A system
    that drops out mid-sweep gets NaN for the remaining steps, so its list
    stays the same length as 'voltages' and can still be plotted against it.

    Args:
        start_v: Starting voltage (V)
        stop_v: Ending voltage (V)
        steps: Number of steps
        dwell_ms: Dwell time at each step (ms)

    Returns:
        Dict with lists 'voltages', 'artiq_latencies', 'labview_latencies'
        (ms) and 'set_times' (wall-clock seconds, taken after each step).
    """
    voltages = np.linspace(start_v, stop_v, steps)
    results = {
        'voltages': [],
        'artiq_latencies': [],
        'labview_latencies': [],
        'set_times': []
    }

    # Decide once which systems take part in this sweep.
    use_artiq = artiq.connected
    use_labview = labview.connected

    print(f"Running RF sweep: {start_v}V to {stop_v}V in {steps} steps")

    for i, v in enumerate(voltages):
        print(f"  Step {i+1}/{steps}: Setting {v:.1f}V...", end='\r')

        # Set via ARTIQ
        if use_artiq:
            if artiq.connected:
                _, artiq_lat = artiq.set_parameters({'u_rf_volts': v})
            else:
                artiq_lat = float('nan')  # dropped mid-sweep: keep alignment
            results['artiq_latencies'].append(artiq_lat)

        # Set via LabVIEW
        if use_labview:
            if labview.connected:
                _, lv_lat = labview.set_rf_voltage(v)
            else:
                lv_lat = float('nan')  # dropped mid-sweep: keep alignment
            results['labview_latencies'].append(lv_lat)

        results['voltages'].append(v)
        results['set_times'].append(time.time())

        if dwell_ms > 0:
            time.sleep(dwell_ms / 1000.0)

    # (Status glyph repaired: the original literal was mis-encoded mojibake.)
    print("\n✓ Sweep complete!")
    return results

# Example sweep (adjust parameters as needed)
sweep_results = run_rf_sweep_test(
    start_v=100,
    stop_v=300,
    steps=20,
    dwell_ms=50
)

In [None]:
# Plot sweep results
fig, axes = plt.subplots(2, 1, figsize=(12, 8))
ax1, ax2 = axes

# Top panel: voltage vs time, with time measured from the first step
times = np.array(sweep_results['set_times']) - sweep_results['set_times'][0]
ax1.plot(times, sweep_results['voltages'], 'b.-', linewidth=2, markersize=8)
ax1.set(xlabel='Time (s)', ylabel='RF Voltage (V)', title='RF Voltage Sweep Profile')
ax1.grid(True, alpha=0.3)

# Bottom panel: latency vs voltage, one line per system that recorded data
if sweep_results['artiq_latencies']:
    ax2.plot(
        sweep_results['voltages'],
        sweep_results['artiq_latencies'],
        'b.-',
        label='ARTIQ',
        linewidth=2,
        markersize=8,
    )
if sweep_results['labview_latencies']:
    ax2.plot(
        sweep_results['voltages'],
        sweep_results['labview_latencies'],
        'orange',
        marker='.',
        linestyle='-',
        label='LabVIEW',
        linewidth=2,
        markersize=8,
    )
ax2.set(xlabel='RF Voltage (V)', ylabel='Command Latency (ms)',
        title='Command Latency vs RF Voltage')
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('rf_sweep_test.png', dpi=150, bbox_inches='tight')
plt.show()

## 8. Toggle Response Test

In [None]:
def run_toggle_test(device: str, cycles: int = 10, on_time_ms: float = 100):
    """
    Test toggle response time by cycling a device on/off.

    Commands go through the module-level ``labview`` connection. The OFF
    command is sent even when the dwell is interrupted (for example by a
    kernel interrupt), so an aborted test does not leave the device
    switched on. The device ends in the OFF state regardless of its state
    before the test.

    Args:
        device: Device name (b_field, bephi, be_oven, uv3, e_gun)
        cycles: Number of on/off cycles
        on_time_ms: Time to keep device on (ms)

    Returns:
        Dict with lists 'cycle' (1-based), 'on_latency', 'off_latency' and
        'total_cycle_time', all times in ms.
    """
    results = {
        'cycle': [],
        'on_latency': [],
        'off_latency': [],
        'total_cycle_time': []
    }

    print(f"Testing {device} toggle response ({cycles} cycles)...")

    for i in range(cycles):
        cycle_start = time.time()

        # Turn ON
        _, on_lat = labview.set_toggle(device, True)

        try:
            # Wait
            time.sleep(on_time_ms / 1000.0)
        finally:
            # Turn OFF, even if the wait above was interrupted
            _, off_lat = labview.set_toggle(device, False)

        cycle_time = (time.time() - cycle_start) * 1000

        results['cycle'].append(i + 1)
        results['on_latency'].append(on_lat)
        results['off_latency'].append(off_lat)
        results['total_cycle_time'].append(cycle_time)

        print(f"  Cycle {i+1}: ON={on_lat:.1f}ms, OFF={off_lat:.1f}ms, Total={cycle_time:.1f}ms")

    # (Status glyph repaired: the original literal was mis-encoded mojibake.)
    print("\n✓ Toggle test complete!")
    return results

# Test B-field toggle
toggle_results = run_toggle_test('b_field', cycles=5, on_time_ms=200)

In [None]:
# Plot toggle results
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Latency per cycle
ax1 = axes[0]
x = toggle_results['cycle']
ax1.plot(x, toggle_results['on_latency'], 'g.-', label='ON latency', linewidth=2, markersize=10)
ax1.plot(x, toggle_results['off_latency'], 'r.-', label='OFF latency', linewidth=2, markersize=10)
ax1.set_xlabel('Cycle Number')
ax1.set_ylabel('Latency (ms)')
ax1.set_title('Toggle Response Latency')
ax1.legend()
ax1.grid(True, alpha=0.3)
ax1.set_xticks(x)

# Histogram of latencies
ax2 = axes[1]
# Not used by this cell; kept in case a later cell reads it.
all_latencies = toggle_results['on_latency'] + toggle_results['off_latency']
ax2.hist(toggle_results['on_latency'], bins=10, alpha=0.6, label='ON', color='green')
ax2.hist(toggle_results['off_latency'], bins=10, alpha=0.6, label='OFF', color='red')
ax2.set_xlabel('Latency (ms)')
ax2.set_ylabel('Count')
ax2.set_title('Toggle Latency Distribution')
ax2.legend()
ax2.grid(True, alpha=0.3)

# Add statistics text
mean_on = np.mean(toggle_results['on_latency'])
mean_off = np.mean(toggle_results['off_latency'])
std_on = np.std(toggle_results['on_latency'])
std_off = np.std(toggle_results['off_latency'])

# (Plus-minus sign repaired: the original literal was mis-encoded mojibake
# and would have been drawn garbled on the saved figure.)
stats_text = f"ON:  {mean_on:.1f} ± {std_on:.1f} ms\nOFF: {mean_off:.1f} ± {std_off:.1f} ms"
ax2.text(0.95, 0.95, stats_text, transform=ax2.transAxes,
         verticalalignment='top', horizontalalignment='right',
         bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

plt.tight_layout()
plt.savefig('toggle_test.png', dpi=150, bbox_inches='tight')
plt.show()

## 9. Safety Limit Verification

In [None]:
def test_parameter_limits():
    """Send in-range and out-of-range values and check each response status.

    Each case is sent through ``artiq.set_parameters``. A case passes when
    the response ``status`` equals the expected one, or when a rejection was
    expected and the status came back as ``'error'``.

    Returns:
        A list with one dict per case, holding the keys ``param``,
        ``value``, ``expected``, ``actual``, ``passed`` and ``latency_ms``.
    """
    # Each entry: (param_name, test_value, expected_result)
    cases = (
        ('u_rf_volts', 600, 'rejected'),   # Over 500V limit
        ('u_rf_volts', 200, 'success'),    # Valid
        ('ec1', 150, 'rejected'),          # Over 100V limit
        ('ec1', 50, 'success'),            # Valid
        ('piezo', 5, 'rejected'),          # Over 4V limit
        ('piezo', 2, 'success'),           # Valid
        ('freq0', 250, 'rejected'),        # Over 220 MHz limit
        ('freq0', 210, 'success'),         # Valid
    )

    print("Testing Parameter Safety Limits\n" + "="*50)

    outcomes = []
    for param, value, expected in cases:
        reply, latency = artiq.set_parameters({param: value})
        actual = reply.get('status', 'unknown')

        # An 'error' status is accepted as a valid form of rejection.
        if actual == expected:
            passed = True
        else:
            passed = expected == 'rejected' and actual == 'error'

        status_symbol = "‚úì" if passed else "‚úó"
        print(f"{status_symbol} {param}={value}: {actual} (expected: {expected}, {latency:.1f}ms)")

        outcomes.append(dict(
            param=param,
            value=value,
            expected=expected,
            actual=actual,
            passed=passed,
            latency_ms=latency,
        ))

    passed_count = len([entry for entry in outcomes if entry['passed']])
    print(f"\n{passed_count}/{len(outcomes)} tests passed")

    return outcomes

# Run the safety-limit checks. The per-case result dicts are kept in
# `limit_results`, which generate_test_report() includes when it is defined.
limit_results = test_parameter_limits()

## 9b. Pressure Safety Monitor

The LabVIEW interface includes a **pressure safety monitor** that:
- Monitors pressure from SMILE/LabVIEW via Y:/ telemetry files
- Triggers immediate kill switch when threshold is exceeded
- Automatically switches off piezo voltage and e-gun
- Notifies server (Control Manager) of the alert

**Default threshold**: 5×10⁻⁹ mbar (configurable in settings.yaml)

In [None]:
# Check pressure monitoring status: query the monitor through the LabVIEW
# connection and print its state, the last reading (if any) and its counters.
if labview.connected:
    pressure_status = labview.get_pressure_status()
    
    print("=== Pressure Monitor Status ===")
    print(f"Running:           {pressure_status['running']}")
    print(f"Alert Active:      {pressure_status['alert_active']}")
    print(f"Threshold:         {pressure_status['threshold_mbar']:.2e} mbar")
    print(f"Check Interval:    {pressure_status['check_interval_seconds']}s")
    
    # The leading blank lines below are written as "\n" escapes: a plain
    # (single-quoted) string literal may not span physical lines.
    if pressure_status['last_pressure_mbar']:
        print(f"\nLast Pressure:     {pressure_status['last_pressure_mbar']:.2e} mbar")
        age = pressure_status['pressure_age_seconds']
        print(f"Data Age:          {age:.1f}s")
    else:
        print("\nNo pressure data received yet")
    
    print("\nStatistics:")
    stats = pressure_status['stats']
    print(f"  Checks:          {stats['checks']}")
    print(f"  Alerts Triggered: {stats['alerts_triggered']}")
    print(f"  Max Pressure:    {stats['max_pressure_seen']:.2e} mbar")
else:
    print("LabVIEW not connected - cannot check pressure monitor")

In [None]:
# Interactive pressure threshold control
pressure_output = widgets.Output()

# Log-scale slider; min/max are base-10 exponents.
threshold_slider = widgets.FloatLogSlider(
    value=5e-9,
    base=10,
    min=-10,  # 1e-10
    max=-8,   # 1e-8
    step=0.1,
    description="Threshold (mbar):",
    readout_format='.2e',
)


def set_threshold_clicked(b):
    """Button callback: push the slider value to LabVIEW and read it back.

    All output is routed into ``pressure_output``, which is cleared first.
    """
    with pressure_output:
        clear_output()

        if not labview.connected:
            print("LabVIEW not connected!")
            return

        new_threshold = threshold_slider.value
        labview.set_pressure_threshold(new_threshold)
        print(f"‚úì Pressure threshold set to {new_threshold:.2e} mbar")

        # Verify by querying the monitor again
        status = labview.get_pressure_status()
        print(f"  Verified: {status['threshold_mbar']:.2e} mbar")


set_threshold_btn = widgets.Button(description="Set Threshold", button_style='primary')
set_threshold_btn.on_click(set_threshold_clicked)

threshold_panel = [
    widgets.HTML("<h4>Pressure Safety Configuration</h4>"),
    threshold_slider,
    set_threshold_btn,
    pressure_output,
]
display(widgets.VBox(threshold_panel))

### Pressure Alert Simulation (Test Mode)

⚠️ **WARNING**: This simulates a pressure spike to test the safety system.
The piezo and e-gun will be **immediately turned off** when the alert triggers.

In [None]:
# Test pressure alert response
import tempfile
from pathlib import Path

test_output = widgets.Output()

def test_pressure_alert(b):
    """Button callback: simulate a pressure spike and report the monitor's reaction.

    Steps performed (all output goes to ``test_output``):
      1. Set the piezo to 1 V through LabVIEW.
      2. Read the current threshold and telemetry directory from
         ``labview.get_pressure_status()``.
      3. Write a telemetry file holding a pressure 10x above the threshold.
      4. Wait briefly, then query the status again and report whether the
         alert is active.

    The simulated telemetry file is always removed afterwards, even when a
    step after writing it raises.
    """
    with test_output:
        clear_output()
        
        if not labview.connected:
            print("‚ùå LabVIEW not connected!")
            return
        
        # First, turn on piezo at a low voltage
        # NOTE(review): nothing in this function sets the piezo back to 0 V;
        # if the alert does not fire it stays at 1 V - confirm this is intended.
        print("1. Setting piezo to 1V (safe value)...")
        labview.set_piezo_voltage(1.0)
        time.sleep(0.5)
        
        # Get current threshold
        status = labview.get_pressure_status()
        threshold = status['threshold_mbar']
        
        print(f"2. Current threshold: {threshold:.2e} mbar")
        print("3. Simulating pressure spike...")
        
        # Simulate by writing to telemetry file
        pressure_dir = Path(status['pressure_dir'])
        pressure_dir.mkdir(parents=True, exist_ok=True)
        
        # Write dangerous pressure value
        dangerous_pressure = threshold * 10  # 10x above threshold
        timestamp = time.time()
        filepath = pressure_dir / f"test_pressure_{int(timestamp)}.dat"
        
        try:
            with open(filepath, "w") as f:
                f.write(f"{timestamp},{dangerous_pressure}\n")
            
            print(f"   Written {dangerous_pressure:.2e} mbar to {filepath.name}")
            print("4. Waiting for monitor to detect...")
            
            time.sleep(0.3)
            
            # Check if alert triggered
            status = labview.get_pressure_status()
            
            if status['alert_active']:
                print("\nüö® ALERT TRIGGERED!")
                print(f"   Pressure: {status['last_pressure_mbar']:.2e} mbar")
                print("   Piezo and e-gun should be killed!")
                
                # Verify piezo is off
                print("\n5. Verifying piezo is off...")
                print("   (Kill switch triggered - check LabVIEW if piezo is 0V)")
            else:
                print("\n‚ö†Ô∏è Alert not yet active (may need more time)")
        finally:
            # Cleanup must not be skipped: a leftover file would keep a fake
            # over-threshold reading in the monitored telemetry directory.
            filepath.unlink(missing_ok=True)

test_alert_btn = widgets.Button(
    description="TEST PRESSURE ALERT",
    button_style='danger',
    tooltip="Simulates pressure spike to test safety system"
)
test_alert_btn.on_click(test_pressure_alert)

display(widgets.VBox([
    widgets.HTML("<h4>‚ö†Ô∏è Pressure Alert Test (DANGER: Will kill piezo)</h4>"),
    test_alert_btn,
    test_output
]))

## 11. Save Test Report

In [None]:
def _mean_or_none(values):
    """Return the mean of *values* as a plain float, or None when it is empty."""
    return float(np.mean(values)) if len(values) else None


def generate_test_report():
    """Generate and save a comprehensive test report.

    Collects the connection state plus whichever result variables exist in
    the notebook's global namespace (``artiq_stats``, ``lv_stats``,
    ``sweep_results``, ``toggle_results``, ``limit_results``) and writes them
    to ``calibration_report_<YYYYmmdd_HHMMSS>.json`` in the working directory.

    Averages and the voltage range are reported as ``None`` when the
    underlying list is empty, so the file never contains ``NaN`` and an empty
    sweep does not raise.

    Returns:
        A ``(report, filename)`` tuple: the report dict and the path written.
    """
    # One clock reading so the report timestamp and the filename agree.
    now = datetime.now()
    env = globals()

    sweep_section = None
    if 'sweep_results' in env:
        voltages = sweep_results['voltages']
        sweep_section = {
            # min()/max() raise ValueError on an empty sequence.
            'voltage_range': [min(voltages), max(voltages)] if len(voltages) else None,
            'steps': len(voltages),
            'avg_artiq_latency_ms': _mean_or_none(sweep_results['artiq_latencies']),
            'avg_labview_latency_ms': _mean_or_none(sweep_results['labview_latencies']),
        }

    toggle_section = None
    if 'toggle_results' in env:
        toggle_section = {
            'avg_on_latency_ms': _mean_or_none(toggle_results['on_latency']),
            'avg_off_latency_ms': _mean_or_none(toggle_results['off_latency']),
        }

    report = {
        'timestamp': now.isoformat(),
        'artiq_connected': artiq.connected,
        'labview_connected': labview.connected,
        'latency_statistics': {
            'artiq': env.get('artiq_stats', {}),
            'labview': env.get('lv_stats', {}),
        },
        'sweep_test': sweep_section,
        'toggle_test': toggle_section,
        'limit_tests': env.get('limit_results', []),
    }
    
    # Save to JSON
    filename = f"calibration_report_{now.strftime('%Y%m%d_%H%M%S')}.json"
    with open(filename, 'w') as f:
        json.dump(report, f, indent=2)
    
    print(f"‚úì Test report saved to: {filename}")
    return report, filename

# Build the report, write it to disk, and keep both the dict and the filename
report, filename = generate_test_report()

# Print a short human-readable summary of the saved report
print("\n=== Test Report Summary ===" )
print(f"Timestamp: {report['timestamp']}")
print(f"ARTIQ Connected: {report['artiq_connected']}")
print(f"LabVIEW Connected: {report['labview_connected']}")

# Same listing for both systems; a system with no statistics is skipped.
latency_sections = (
    ("\nARTIQ Latencies:", 'artiq'),
    ("\nLabVIEW Latencies:", 'labview'),
)
for heading, system_key in latency_sections:
    per_command = report['latency_statistics'][system_key]
    if not per_command:
        continue
    print(heading)
    for cmd, stats in per_command.items():
        print(f"  {cmd}: {stats['mean']:.1f} ¬± {stats['std']:.1f} ms")

## 12. Cleanup and Disconnect

In [None]:
# Apply safety defaults before disconnecting.
# The steps are chained with try/finally so that a failure in an earlier step
# (e.g. the ARTIQ reset raising) cannot skip the LabVIEW reset or leave either
# connection open. The original exception still propagates afterwards.
print("Applying safety defaults...")

try:
    if artiq.connected:
        # Reset to safe values
        artiq.set_parameters({
            'u_rf_volts': 0,
            'ec1': 0, 'ec2': 0,
            'comp_h': 0, 'comp_v': 0,
            'piezo': 0,
            'be_oven': False,
            'e_gun': False,
            'uv3': False,
        })
        print("‚úì ARTIQ parameters reset")
finally:
    try:
        if labview.connected:
            labview.set_rf_voltage(0)
            labview.set_piezo_voltage(0)
            labview.set_toggle('be_oven', False)
            labview.set_toggle('e_gun', False)
            labview.set_toggle('uv3', False)
            print("‚úì LabVIEW parameters reset")
    finally:
        # Disconnect
        print("\nDisconnecting...")
        try:
            artiq.disconnect()
        finally:
            labview.disconnect()

# Only reached when every step above succeeded.
print("\n‚úì Cleanup complete!")

---

## Quick Reference

### ARTIQ Parameters (via Control Manager)
| Parameter | Range | Description |
|-----------|-------|-------------|
| `u_rf_volts` | 0-500V | RF voltage (real volts) |
| `ec1`, `ec2` | ±100V | Endcap electrodes |
| `comp_h`, `comp_v` | ±100V | Compensation electrodes |
| `freq0`, `freq1` | 200-220 MHz | Raman frequencies |
| `amp0`, `amp1` | 0-1 | Raman amplitudes |
| `piezo` | 0-4V | Piezo voltage |
| `b_field` | True/False | B-field toggle |
| `be_oven` | True/False | Be+ oven |
| `e_gun` | True/False | Electron gun |
| `uv3` | True/False | UV3 laser |

### LabVIEW Commands
| Command | Device | Range | Description |
|---------|--------|-------|-------------|
| `set_voltage` | `U_RF` | 0-1000V | RF voltage |
| `set_voltage` | `piezo` | 0-4V | Piezo voltage |
| `set_toggle` | `b_field` | True/False | B-field |
| `set_toggle` | `be_oven` | True/False | Be+ oven |
| `set_toggle` | `e_gun` | True/False | E-gun (30s max) |
| `set_frequency` | `dds` | 0-500 kHz | DDS frequency |

### Safety Notes
- **Piezo**: 10 second maximum ON time (kill switch protected)
- **E-gun**: 30 second maximum ON time (kill switch protected)
- **U_RF**: 500V maximum via ARTIQ, 1000V via LabVIEW
- Always use EMERGENCY STOP if needed