In [1]:
from tamalero.FIFO import FIFO
from tamalero.ETROC import ETROC
from tamalero.LPGBT import LPGBT
from tamalero.utils import get_kcu
from tamalero.DataFrame import DataFrame
from tamalero.colors import green, red, yellow
from tamalero.ReadoutBoard import ReadoutBoard
import os
import sys
import tty
import time
import select
import pickle
import termios
import numpy as np
from tqdm import tqdm
from random import randint
from datetime import datetime, timezone

# IP address of the KCU board, passed to get_kcu() below.
# Change it if your KCU uses a different address.
KCU_IP = "192.168.0.10"

# Readout board index and configuration name handed to ReadoutBoard().
READOUTBOARD_ID = 0
READOUTBOARD_CONFIG = 'default'

# One I2C address per ETROC chip; NUM_ETROC below is derived from this list.
ETROC_I2C_ADDRESSES = [0x60]
ETROC_I2C_CHANNEL = 1
# Passed unchanged as `elinks=` to every ETROC(); the list under key 0 is also
# zipped against ETROC_I2C_ADDRESSES when the chip/e-link table is printed.
ETROC_ELINKS_MAP = {0: [0, 4, 8, 12]}

# Test parameters
# NOTE(review): TH_OFFSET, TRIGGER_*, CHARGE_FC and QINJ_COUNT are not read by
# any cell visible in this notebook — confirm they are still needed.
TH_OFFSET = 50              # Threshold offset above baseline
TRIGGER_ENABLE_MASK = 0x2
TRIGGER_DATA_SIZE = 1
TRIGGER_DELAY_SEL = 472

CHARGE_FC = 30  # presumably the injected charge in fC — TODO confirm
QINJ_COUNT = 1000

# Pixel-array dimensions: the threshold scan visits every (row, col) in
# range(PIXEL_ROW) x range(PIXEL_COL).
PIXEL_ROW = 16
PIXEL_COL = 16
NUM_ETROC = len(ETROC_I2C_ADDRESSES)

# NOTE(review): these three are not referenced by the visible cells (only by a
# commented-out `global` line) — presumably left over from an acquisition loop.
stop_acquisition = False
cosmic_data = []
hit_counter = 0



# def save_to_pickle():
#     data_to_save = {
#     'baseline': baseline_storage,
#     'NW' : nw_storage
#     }

#     with open('etroc_NW_scan.pkl', 'wb') as f:
#         pickle.dump(data_to_save)

#     print('Data saved')



# global stop_acquisition, cosmic_data, hit_counter

print('ETROC COSMIC RAY TEST - HARDWARE INITIALIZATION')

# --------------------------------------------------------------------------------------
# Step 1 - connect to the KCU
# --------------------------------------------------------------------------------------
kcu = get_kcu(KCU_IP, control_hub=True, host='localhost', verbose=False)
print(green("Successfully connected to KCU."))

# Print the LpGBT link statuses as seen by the KCU.
kcu.status()
# Additional checks, currently disabled:
# fw_ver = kcu.get_firmware_version()
# kcu.check_clock_frequencies()

# Loopback sanity check: write a known pattern to the loopback register,
# read it back and report whether the two values agree.
loopback_val = 0xABCD1234
kcu.write_node("LOOPBACK.LOOPBACK", loopback_val)
read_val = kcu.read_node("LOOPBACK.LOOPBACK").value()

loopback_ok = (read_val == loopback_val)
if not loopback_ok:
    print(red(f"KCU Loopback test FAILED: Wrote 0x{loopback_val:X}, Read 0x{read_val:X}"))
else:
    print(green(f"KCU Loopback test PASSED: Wrote 0x{loopback_val:X}, Read 0x{read_val:X}"))

# --------------------------------------------------------------------------------------
# Step 2 - create the readout board object on top of the KCU connection
# --------------------------------------------------------------------------------------
rb_settings = dict(
    rb=READOUTBOARD_ID,
    kcu=kcu,
    config=READOUTBOARD_CONFIG,
    trigger=False,
    verbose=False,
)
rb = ReadoutBoard(**rb_settings)
print(green(f"Readout Board version detected: {rb.ver}"))




ETROC COSMIC RAY TEST - HARDWARE INITIALIZATION
IPBus address: chtcp-2.0://localhost:10203?target=192.168.0.10:50001
KCU firmware version: 3.2.4
[92mSuccessfully connected to KCU.[0m
LPGBT Link Status from KCU:
[92m0x2021  r       READOUT_BOARD_0.LPGBT.DOWNLINK.READY              0x00000001[0m
[91m0x2001  r       READOUT_BOARD_0.LPGBT.UPLINK_0.READY              0x00000000[0m
[91m0x2001  r       READOUT_BOARD_0.LPGBT.UPLINK_0.FEC_ERR_CNT        0x00000FA1[0m
[91m0x1018  r       FW_INFO.RXCLK0_FREQ                               0x12884D7E[0m
[92mKCU Loopback test PASSED: Wrote 0xABCD1234, Read 0xABCD1234[0m
0x1d7 readback value is 0x04
lpGBT version found False
1
0x1d7 readback value is 0xae
lpGBT version found True
 > lpGBT v2 detected
 > Running power up within LPGBT.configure()
 > VTRx+ version detected: production
[92mReadout Board version detected: 2[0m


In [2]:
print("\n3. Initializing ETROC chips...")
# etroc_chips and chip_names are kept index-aligned with ETROC_I2C_ADDRESSES:
# exactly one entry per address, with None marking a chip that failed to
# initialize. Later cells zip the two lists together, so the alignment matters.
etroc_chips = []
chip_names = []
# rb.DAQ_LPGBT.set_dac
for i, addr in enumerate(ETROC_I2C_ADDRESSES):
    chip_name = f"Chip{i+1}"
    chip_names.append(chip_name)

    print(f"\nInitializing {chip_name} (I2C: 0x{addr:02X})...")

    etroc = None
    try:
        etroc = ETROC(
            rb,
            master='lpgbt',
            i2c_adr=addr,
            i2c_channel=ETROC_I2C_CHANNEL,
            elinks=ETROC_ELINKS_MAP,
            strict=False,
            verbose=False
        )
        # Verify communication
        if etroc.is_connected():
            # Check key registers
            scrambler_status = etroc.rd_reg("disScrambler")
            controller_state = etroc.rd_reg("controllerState")
            pll_unlock_count = etroc.rd_reg("pllUnlockCount")

            print(green(f"✓ {chip_name} connected successfully"))
            print(f"  Controller state: {controller_state} (should be 11)")
            print(f"  PLL unlock count: {pll_unlock_count}")

            if scrambler_status == 1:
                print(green("  ✓ Register communication verified"))
            else:
                print(red("  ✗ Register communication issue"))
        else:
            print(red(f"✗ {chip_name} not responding"))

    except Exception as e:
        print(red(f"✗ Failed to initialize {chip_name}: {e}"))
        # An exception may be raised after ETROC() already returned (e.g. from
        # a register read); record the chip as failed in that case too.
        etroc = None

    # Append exactly once per address so the list never drifts from chip_names.
    etroc_chips.append(etroc)

# Only chips that initialized can be configured or queried.
working_chips = [chip for chip in etroc_chips if chip is not None]

for etroc in working_chips:
    etroc.set_power_mode(mode='high', row = 0, col = 0, broadcast=True)

if len(working_chips) == len(etroc_chips):
    print(green("\n✓ Hardware initialization completed successfully!"))
else:
    n_failed = len(etroc_chips) - len(working_chips)
    print(red(f"\n✗ Hardware initialization incomplete: {n_failed} of {len(etroc_chips)} ETROC chips failed"))
print(f"Initialized {len(working_chips)} ETROC chips")
print("\nETROC to E-link Mapping:")
elink_list = ETROC_ELINKS_MAP[0]  # [0, 4, 8, 12]
for i, (addr, elink) in enumerate(zip(ETROC_I2C_ADDRESSES, elink_list)):
    chip_name = f"Chip{i+1}"
    if i < len(etroc_chips) and etroc_chips[i] is not None:
        status = "✓ Connected"
    else:
        status = "✗ Failed"
    print(f"  {chip_name} (I2C: 0x{addr:02X}) <-> E-link {elink} - {status}")

for etroc in working_chips:
    print(f"Etroc: {etroc}-Power mode: {etroc.get_power_mode(0,0)}")



3. Initializing ETROC chips...

Initializing Chip1 (I2C: 0x60)...
elinks not locked, resetting PLL and FC modules
[92m✓ Chip1 connected successfully[0m
  Controller state: 11 (should be 11)
  PLL unlock count: 0
[92m  ✓ Register communication verified[0m
[92m
✓ Hardware initialization completed successfully![0m
Initialized 1 ETROC chips

ETROC to E-link Mapping:
  Chip1 (I2C: 0x60) <-> E-link 0 - ✓ Connected
Etroc: <tamalero.ETROC.ETROC object at 0x7f12ee8882e0>-Power mode: high


In [5]:
print("\nETROC COSMIC RAY TEST - CONTINUOUS DETECTION")

print(f"2. Calibrating {PIXEL_ROW * PIXEL_COL} pixel baselines...")

# Result containers filled in by the threshold scan further down this cell.
# baseline_storage / nw_storage: chip name -> {(row, col): scanned value}
# failed_pixels: chip name -> list of (row, col) whose scan raised
# etroc_configs: (etroc, chip_name, pixel list) tuples for the chips to scan
baseline_storage = dict()
nw_storage = dict()
failed_pixels = dict()
etroc_configs = list()

def save_to_pickle(path='etroc_NW_scan_sensor_HV230_22uF(external_vref_highPower).pkl'):
    """Pickle the current scan results to ``path``.

    The saved object is a dict with three keys: ``'baseline'`` and ``'NW'``
    hold the module-level ``baseline_storage`` and ``nw_storage`` dicts, and
    ``'chip_name'`` holds the ``chip_names`` list. The globals are read at
    call time, so whatever has been collected so far is what gets written.

    Args:
        path: Output file name. Defaults to the file name this notebook has
            always written, so existing ``save_to_pickle()`` calls are
            unaffected; pass another name to keep runs from overwriting
            each other.
    """
    data_to_save = {
        'baseline': baseline_storage,
        'NW': nw_storage,
        'chip_name': chip_names,
    }

    # Binary mode is required by pickle; an existing file is overwritten.
    with open(path, 'wb') as f:
        pickle.dump(data_to_save, f)

    print('Data saved')

print("\n3. Generating test pixel configuration...")
# Every chip gets the full PIXEL_ROW x PIXEL_COL grid, in row-major order.
all_pixels_per_chip = [
    [(row, col) for row in range(PIXEL_ROW) for col in range(PIXEL_COL)]
    for _ in range(NUM_ETROC)
]

# Pair each successfully initialized chip with its name and pixel list;
# chips recorded as None are skipped.
for i, (etroc, chip_name) in enumerate(zip(etroc_chips, chip_names)):
    if etroc is not None and i < len(all_pixels_per_chip):
        etroc_configs.append((etroc, chip_name, all_pixels_per_chip[i]))

print("Test pixel assignments:")
for etroc, chip_name, pixels in etroc_configs:
    print(f"  {chip_name}: {len(pixels)} pixels")

for etroc, chip_name, test_pixels in etroc_configs:
    baseline_storage[chip_name] = {}
    nw_storage[chip_name] = {}
    failed_pixels[chip_name] = []
    print(f"\nScanning {chip_name}...")

    for pixel_row, pixel_col in tqdm(test_pixels, desc = f"{chip_name} pixels"):
        try:
            # NOTE(review): use=False presumably means the scan result is not
            # written back to the chip — confirm against ETROC.auto_threshold_scan.
            baseline, nw = etroc.auto_threshold_scan(
                row=pixel_row,
                col=pixel_col,
                broadcast=False,
                offset='auto',
                use=False,
                verbose=True
            )
        except Exception as e:
            # A failing pixel is recorded and skipped so the rest of the
            # array is still scanned.
            print(red(f"  Pixel ({pixel_row},{pixel_col}): SCAN FAILED - {e}"))
            failed_pixels[chip_name].append((pixel_row, pixel_col))
        else:
            baseline_storage[chip_name][(pixel_row, pixel_col)] = baseline
            nw_storage[chip_name][(pixel_row, pixel_col)] = nw

    # Report failures per chip, inside the loop: this covers every scanned
    # chip and never touches failed_pixels when no chip was scanned at all.
    if failed_pixels[chip_name]:
        print(red(f"  Found {len(failed_pixels[chip_name])} pixels with scan failures during sampling"))

# Persist the results once, after all chips have been scanned.
save_to_pickle()



ETROC COSMIC RAY TEST - CONTINUOUS DETECTION
2. Calibrating 256 pixel baselines...

3. Generating test pixel configuration...
Test pixel assignments:
  Chip1: 256 pixels

Scanning Chip1...


Chip1 pixels:   0%|          | 0/256 [00:00<?, ?it/s]

Chip1 pixels: 100%|██████████| 256/256 [01:26<00:00,  2.97it/s]

Data saved





In [None]:
import pickle
# Reload a saved scan result and pull out the per-chip baseline table.
# NOTE(review): this opens 'etroc_NW_scan.pkl', while save_to_pickle() in the
# scan cell writes to a different file name — confirm which file is intended.
# pickle.load can execute arbitrary code: only open files you produced yourself.
with open('etroc_NW_scan.pkl','rb') as pkl_file:
    data = pickle.load(pkl_file)
baseline = data['baseline']

# Example of walking the loaded table (disabled):
# for chip_name, pixel_data in baseline.items():
#     print(f"{chip_name}")
#     for pixel_coord, baseline_v in pixel_data.items():
#         print(f"pixel {pixel_coord} : {baseline_v}")

dict_items([('baseline', {'Chip1': {(0, 0): 419, (0, 1): 425, (1, 0): 413, (1, 1): 413}, 'Chip2': {(0, 0): 505, (0, 1): 506, (1, 0): 491, (1, 1): 492}, 'Chip3': {(0, 0): 597, (0, 1): 602, (1, 0): 587, (1, 1): 594}, 'Chip4': {(0, 0): 587, (0, 1): 580, (1, 0): 580, (1, 1): 583}}), ('NW', {'Chip1': {(0, 0): 4, (0, 1): 4, (1, 0): 4, (1, 1): 4}, 'Chip2': {(0, 0): 4, (0, 1): 3, (1, 0): 4, (1, 1): 4}, 'Chip3': {(0, 0): 5, (0, 1): 4, (1, 0): 4, (1, 1): 4}, 'Chip4': {(0, 0): 4, (0, 1): 4, (1, 0): 3, (1, 1): 4}}), ('chip_name', ['Chip1', 'Chip2', 'Chip3', 'Chip4'])])


In [1]:
from tamalero.utils import get_kcu
import uhal
import sys

# KCU address used for this check — make sure it matches your setup.
KCU_IP = "192.168.0.10"

print("Connecting to KCU to verify XML file...")
try:
    # Connect through the project's get_kcu helper.
    kcu = get_kcu(KCU_IP, control_hub=True, host='localhost', quiet=True)

    # kcu.hw exposes the underlying uhal device object; look the node up
    # there and read the description attached to it.
    node = kcu.hw.getNode("READOUT_BOARD_0.RX_FIFO_OCCUPANCY")
    description = node.getDescription()

    separator = "---------------------------------------------------------"
    print(separator)
    print(f"SUCCESS: uHAL is reading the description as: '{description}'")
    print(separator)

    # The new XML file is recognised by a unique marker in the description.
    marker_missing = "TEST_SUCCESSFUL" not in description
    if marker_missing:
        print("\n❌ Verification FAILED. uHAL is still using an old XML file.")
    else:
        print("\n✅ Verification PASSED! uHAL is using the correct new XML file.")

except uhal._core.exception as e:
    # uHAL-level failure: report the exception type along with its message.
    print(f"\n❌ An error occurred during verification: {type(e).__name__}: {e}")
    print("Verification FAILED. This might be due to a connection issue or if the node doesn't exist in the loaded XML.")
except Exception as e:
    # Anything else is reported as a plain Python error.
    print(f"\n❌ An unexpected Python error occurred: {e}")

Connecting to KCU to verify XML file...
DEBUG: uHAL is loading address table from: /home/roy/yf_temp/tamalero/address_table/aac574f/etl_test_fw.xml
---------------------------------------------------------
SUCCESS: uHAL is reading the description as: 'TEST_SUCCESSFUL_RX FIFO occupancy'
---------------------------------------------------------

✅ Verification PASSED! uHAL is using the correct new XML file.
