In [55]:
# from ophyd.sim import motor1
from ophyd import EpicsMotor, sim, Device
from ophyd import Component as Cpt, FormattedComponent as FCpt
from ophyd.sim import make_fake_device, SynAxis
from ophyd.signal import EpicsSignal
from ophyd.status import MoveStatus
from ophyd.device import DynamicDeviceComponent
from bluesky.plans import count, scan
from bluesky.plan_stubs import mv
from bluesky.preprocessors import SupplementalData

In [56]:
from bluesky import RunEngine
RE = RunEngine({})

RE

<bluesky.run_engine.RunEngine at 0x24694aafc50>

In [57]:
from bluesky.callbacks.best_effort import BestEffortCallback
bec = BestEffortCallback()

RE.subscribe(bec)

0

In [58]:
from databroker import Broker
db = Broker.named('temp')

RE.subscribe(db.insert)

Tiled version 0.1.0b29


1

In [59]:
sd = SupplementalData()
RE.preprocessors.append(sd)
sd.baseline

[]

In [60]:
class LookupPair(Device):
    pair_name = FCpt(EpicsSignal, "{self.name_postfix}", kind="config")
    pair_val = FCpt(EpicsSignal, "{self.val_postfix}", kind="config")
    
    def __init__(self, *args, name_postfix : str, val_postfix : str, **kwargs):
        self.name_postfix = name_postfix
        self.val_postfix = val_postfix
        super().__init__(**kwargs)

In [61]:
FakeLookupPair = make_fake_device(LookupPair)

<!-- defn = OrderedDict()

classDynamicTable(Device): -->



In [62]:
class LookupTable(Device):

    pair1 = Cpt(FakeLookupPair, name_postfix = "Pos-Sel.ONST", val_postfix = "Val:1-SP")
    pair2 = Cpt(FakeLookupPair, name_postfix = "Pos-Sel.TwST", val_postfix = "Val:2-SP")
    pair3 = Cpt(FakeLookupPair, name_postfix = "Pos-Sel.THST", val_postfix = "Val:3-SP")
    pair4 = Cpt(FakeLookupPair, name_postfix = "Pos-Sel.FOST", val_postfix = "Val:4-SP")
    pair5 = Cpt(FakeLookupPair, name_postfix = "Pos-Sel.FIST", val_postfix = "Val:5-SP")
    pair6 = Cpt(FakeLookupPair, name_postfix = "Pos-Sel.SIST", val_postfix = "Val:6-SP")
    pair7 = Cpt(FakeLookupPair, name_postfix = "Pos-Sel.SEST", val_postfix = "Val:7-SP")
    pair8 = Cpt(FakeLookupPair, name_postfix = "Pos-Sel.EIST", val_postfix = "Val:8-SP")
    pair9 = Cpt(FakeLookupPair, name_postfix = "Pos-Sel.NIST", val_postfix = "Val:9-SP")
    pair10 = Cpt(FakeLookupPair, name_postfix = "Pos-Sel.TEST", val_postfix = "Val:10-SP")
    

In [63]:
from collections import OrderedDict

defn = OrderedDict({
    "pair1": (FakeLookupPair, "", {"name_postfix" : "Pos-Sel.ONST", "val_postfix" : "Val:1-SP"}),
    "pair2": (FakeLookupPair, "", {"name_postfix" : "Pos-Sel.TWST", "val_postfix" : "Val:2-SP"}),
    "pair3": (FakeLookupPair, "", {"name_postfix" : "Pos-Sel.THST", "val_postfix" : "Val:3-SP"}),
    "pair4": (FakeLookupPair, "", {"name_postfix" : "Pos-Sel.FOST", "val_postfix" : "Val:4-SP"}),
    "pair5": (FakeLookupPair, "", {"name_postfix" : "Pos-Sel.FIST", "val_postfix" : "Val:5-SP"}),
    "pair6": (FakeLookupPair, "", {"name_postfix" : "Pos-Sel.SIST", "val_postfix" : "Val:6-SP"}),
    "pair7": (FakeLookupPair, "", {"name_postfix" : "Pos-Sel.SEST", "val_postfix" : "Val:7-SP"}),
    "pair8": (FakeLookupPair, "", {"name_postfix" : "Pos-Sel.EIST", "val_postfix" : "Val:8-SP"}),
    "pair9": (FakeLookupPair, "", {"name_postfix" : "Pos-Sel.NIST", "val_postfix" : "Val:9-SP"}),
    "pair10": (FakeLookupPair, "", {"name_postfix" : "Pos-Sel.TEST", "val_postfix" : "Val:10-SP"})
})


class DynamicLookup(Device):
    lookupPairs = DynamicDeviceComponent(defn)

In [64]:

FakeLookupTable = make_fake_device(LookupTable)
FakeDynamicTable = make_fake_device(DynamicLookup)

In [65]:
class CustomMotor(SynAxis):
    pos_lookup = Cpt(FakeDynamicTable, "")

    def __init__(self, name):
        super().__init__(name=name)


    def lookup_by_name(self, name: str) -> float:
        
        pair_lst = list(self.pos_lookup.lookupPairs.get())
        for pair in pair_lst:
            if pair.pair_name == name:
                return pair.pair_val       
            
        raise ValueError (f"Could not find {name} in lookup table")
    
    def get_all_positions(self):
        pair_lst = list(self.pos_lookup.lookupPairs.get())
        length = len(pair_lst)
        print(f"{length} possible positions:")
        print("----------------------------------")
        for pair in pair_lst:
            print(f'    {pair.pair_name:_<15} : {pair.pair_val}')


    def set(self, pos: str | float) -> MoveStatus:
        if isinstance(pos, str):
            val = self.lookup_by_name(pos)
        else:
            val = pos
        return super().set(val)


motor1 = CustomMotor("Y-Axis")
motor2 = CustomMotor("X-Axis")

In [66]:
# Create custom motor
my_motor = CustomMotor("my_motor")

sd.baseline.append(my_motor)

my_motor.pos_lookup.lookupPairs.pair1.pair_name.put("out")
my_motor.pos_lookup.lookupPairs.pair1.pair_val.put(50)
my_motor.pos_lookup.lookupPairs.pair2.pair_name.put("YAG")
my_motor.pos_lookup.lookupPairs.pair2.pair_val.put(-62)
my_motor.pos_lookup.lookupPairs.pair3.pair_name.put("Cu Block")
my_motor.pos_lookup.lookupPairs.pair3.pair_val.put(-74)
my_motor.pos_lookup.lookupPairs.pair4.pair_name.put("SrTiO3")
my_motor.pos_lookup.lookupPairs.pair4.pair_val.put(-76)
my_motor.pos_lookup.lookupPairs.pair5.pair_name.put("HEO-dark")
my_motor.pos_lookup.lookupPairs.pair5.pair_val.put(-79)
my_motor.pos_lookup.lookupPairs.pair6.pair_name.put("HEO-light")
my_motor.pos_lookup.lookupPairs.pair6.pair_val.put(-82)
my_motor.pos_lookup.lookupPairs.pair7.pair_name.put("MgAl2O4")
my_motor.pos_lookup.lookupPairs.pair7.pair_val.put(-86)
my_motor.pos_lookup.lookupPairs.pair8.pair_name.put("Si")
my_motor.pos_lookup.lookupPairs.pair8.pair_val.put(-89)
my_motor.pos_lookup.lookupPairs.pair9.pair_name.put("ZnSe")
my_motor.pos_lookup.lookupPairs.pair9.pair_val.put(-92)
my_motor.pos_lookup.lookupPairs.pair10.pair_name.put("Value 10")
my_motor.pos_lookup.lookupPairs.pair10.pair_val.put(0)

In [67]:
for elem in my_motor.pos_lookup.get():
    print (elem)

FakeLookuppairsTuple(pair1=FakeFakeLookupPairTuple(pair_name='out', pair_val=50), pair2=FakeFakeLookupPairTuple(pair_name='YAG', pair_val=-62), pair3=FakeFakeLookupPairTuple(pair_name='Cu Block', pair_val=-74), pair4=FakeFakeLookupPairTuple(pair_name='SrTiO3', pair_val=-76), pair5=FakeFakeLookupPairTuple(pair_name='HEO-dark', pair_val=-79), pair6=FakeFakeLookupPairTuple(pair_name='HEO-light', pair_val=-82), pair7=FakeFakeLookupPairTuple(pair_name='MgAl2O4', pair_val=-86), pair8=FakeFakeLookupPairTuple(pair_name='Si', pair_val=-89), pair9=FakeFakeLookupPairTuple(pair_name='ZnSe', pair_val=-92), pair10=FakeFakeLookupPairTuple(pair_name='Value 10', pair_val=0))


In [68]:
my_motor.get_all_positions()

10 possible positions:
----------------------------------
    out____________ : 50
    YAG____________ : -62
    Cu Block_______ : -74
    SrTiO3_________ : -76
    HEO-dark_______ : -79
    HEO-light______ : -82
    MgAl2O4________ : -86
    Si_____________ : -89
    ZnSe___________ : -92
    Value 10_______ : 0


In [69]:
print(my_motor.lookup_by_name("ZnSe"))

-92


In [70]:
RE(scan([my_motor], my_motor, 0.0, my_motor.lookup_by_name("out"), 5))



Transient Scan ID: 1     Time: 2025-06-20 10:07:10
Persistent Unique Scan ID: '4a69ce8d-8c08-4bd9-b20f-df6beb3638ab'
New stream: 'baseline'
Start-of-run baseline readings:
+--------------------------------+--------------------------------+
|                       my_motor | 0                              |
+--------------------------------+--------------------------------+
New stream: 'primary'
+-----------+------------+------------+
|   seq_num |       time |   my_motor |
+-----------+------------+------------+
|         1 | 10:07:11.1 |      0.000 |
|         2 | 10:07:11.3 |     12.500 |
|         3 | 10:07:11.4 |     25.000 |
|         4 | 10:07:11.4 |     37.500 |
|         5 | 10:07:11.4 |     50.000 |
+-----------+------------+------------+
generator scan ['4a69ce8d'] (scan num: 1)
End-of-run baseline readings:
+--------------------------------+--------------------------------+
|                       my_motor | 50.0                           |
+-------------------------------

('4a69ce8d-8c08-4bd9-b20f-df6beb3638ab',)

In [71]:
db[-1].table('baseline', fields=['my_motor'])

Unnamed: 0_level_0,my_motor,time
seq_num,Unnamed: 1_level_1,Unnamed: 2_level_1
1,0.0,2025-06-20 14:07:11.021893024
2,50.0,2025-06-20 14:07:11.520136595


In [72]:
RE(mv(my_motor, "Si"))
RE(count([my_motor]))



Transient Scan ID: 2     Time: 2025-06-20 10:07:11
Persistent Unique Scan ID: '6c715add-878b-4970-9efb-88a7c89d13ea'
New stream: 'baseline'
Start-of-run baseline readings:
+--------------------------------+--------------------------------+
|                       my_motor | -89.0                          |
+--------------------------------+--------------------------------+
New stream: 'primary'
+-----------+------------+------------+
|   seq_num |       time |   my_motor |
+-----------+------------+------------+
|         1 | 10:07:12.1 |    -89.000 |
+-----------+------------+------------+
generator count ['6c715add'] (scan num: 2)
End-of-run baseline readings:
+--------------------------------+--------------------------------+
|                       my_motor | -89.0                          |
+--------------------------------+--------------------------------+





('6c715add-878b-4970-9efb-88a7c89d13ea',)