## Stub Plans 

### Links Úteis

- Documentação dos plan stubs: https://blueskyproject.io/bluesky/main/plans.html#stub-plans

- Documentação da customização de planos com per_step: https://blueskyproject.io/bluesky/main/plans.html#customize-step-scans-with-per-step

- Documentação da utilização de metadados: https://blueskyproject.io/bluesky/main/metadata.html

- Documentação dos preprocessors: https://blueskyproject.io/bluesky/main/plans.html#plan-preprocessors

In [None]:
%matplotlib qt
from time import time
from typing import Any

from bluesky import RunEngine
from bluesky.callbacks import LiveTable
from bluesky.plan_stubs import abs_set, rel_set, mv, mvr, \
    stage, unstage, stop, pause, deferred_pause, sleep, \
    checkpoint, clear_checkpoint, repeat, trigger_and_read, \
    open_run, close_run
from bluesky.plans import count, scan
from bluesky.utils import install_nb_kicker
from matplotlib.pyplot import ion
from ophyd import Device, Component, ADComponent
from ophyd.sim import noisy_det, motor1, motor2, SynGauss, \
    FakeEpicsSignalWithRBV, FakeEpicsSignal, SynAxis
# NOTE(review): install_nb_kicker comes from bluesky.utils; presumably it keeps
# GUI event loops serviced while running inside a notebook — confirm in the
# bluesky documentation.
install_nb_kicker()

# Turn on matplotlib's interactive mode.
ion()
# Single RunEngine instance shared by every cell below.
RE = RunEngine()

#### Getting values from ophyd objects

In [None]:
# Show four different ways of getting values out of the simulated motor:
# read(), read_configuration(), get() on the device, and get() on its
# readback signal. Blank print() calls only separate the outputs.
print(f"Read: {motor1.read()}")
print()
print(f"Configuration: {motor1.read_configuration()}")
print()
print(f"Get Device: {motor1.get()}")
print()
print(f"Get Readback: {motor1.readback.get()}")

## mv and abs_set

In [None]:
#help(abs_set)
#help(mv)

In [None]:
# Start from position 0, then give motor1 a simulated delay of 2
# (presumably seconds — confirm against ophyd.sim.SynAxis).
RE(mv(motor1, 0))
motor1.delay = 2
print(f"Before: {motor1.read()}")
# abs_set is called without wait=True here; compare the "After" reading with
# the next cell, which passes wait=True.
RE(abs_set(motor1, 15))
print(f"After: {motor1.read()}")
# Reset the simulated delay to 0.
motor1.delay = 0

In [None]:
# Same sequence as the previous cell, but abs_set is called with wait=True.
RE(mv(motor1, 0))
motor1.delay = 2
print(f"Before: {motor1.read()}")
RE(abs_set(motor1, 15, wait=True))
print(f"After: {motor1.read()}")
# Reset the simulated delay to 0.
motor1.delay = 0

In [None]:
# Move both motors to 0, then give them different simulated delays so the
# wall-clock duration of a single combined mv can be measured.
RE(mv(motor1, 0, motor2, 0))
motor1.delay = 2
motor2.delay = 4
# Report each motor's own readback (the second line previously printed
# motor1's readback under the motor2 label).
print(f"Before motor1: {motor1.readback.get()}")
print(f"Before motor2: {motor2.readback.get()}")
start = time()
RE(mv(motor1, 4, motor2, 15))
end = time()
print(f"After motor1: {motor1.readback.get()}")
print(f"After motor2: {motor2.readback.get()}")
print(f"Duration: {end - start}")
# Reset the simulated delays to 0.
motor1.delay = 0
motor2.delay = 0

In [None]:
class SimAreaDetector(Device):
    """Simulated area-detector camera built from ophyd.sim components.

    It exposes one hinted ``value`` component (a SynGauss driven by motor1)
    and a set of configuration signals backed by FakeEpicsSignalWithRBV.
    """

    # Simulated detector value: SynGauss bound to motor1 / field "motor1",
    # with center=160, Imax=20, sigma=100 and uniform noise scaled by 0.1.
    value = ADComponent(SynGauss, motor=motor1, motor_field="motor1", center=160, 
        Imax=20, sigma=100, noise="uniform", noise_multiplier=0.1, kind="hinted")
    
    # String-valued configuration signals (string=True).
    trigger_mode = ADComponent(FakeEpicsSignalWithRBV, "TriggerMode", string=True, kind="config")
    file_name = ADComponent(FakeEpicsSignalWithRBV, "FileName", string=True, kind="config")
    file_path = ADComponent(FakeEpicsSignalWithRBV, "FilePath", string=True, kind="config")

    file_template = ADComponent(FakeEpicsSignalWithRBV, "FileTemplate", string=True, kind="config")
    auto_increment = ADComponent(FakeEpicsSignalWithRBV, "AutoIncrement", string=True, kind="config")
    auto_save = ADComponent(FakeEpicsSignalWithRBV, "AutoSave", string=True, kind="config")
    
    # Capture / exposure counts.
    num_capture = ADComponent(FakeEpicsSignalWithRBV, "NumCapture", kind="config")
    num_exposures = ADComponent(FakeEpicsSignalWithRBV, "NumExposures", kind="config")
    
    # Acquisition timing signals.
    acquire_time = ADComponent(FakeEpicsSignalWithRBV, "AcquireTime", kind="config")
    acquire_period = ADComponent(FakeEpicsSignalWithRBV, "AcquirePeriod", kind="config")

In [None]:
def area_detector_configuration(
        cam: Any, acquire_time: float, num_exposures: int,
        file_name: str, file_path: str, det_readout: float = 0.01):
    """Plan that configures a camera one signal at a time with ``mv``.

    Parameters
    ----------
    cam
        Object exposing the signals set below (``auto_save``,
        ``auto_increment``, ``file_template``, ``trigger_mode``,
        ``num_capture``, ``num_exposures``, ``acquire_time``,
        ``acquire_period``, ``file_name``, ``file_path``).
    acquire_time
        Value written to ``cam.acquire_time``.
    num_exposures
        Value written to both ``cam.num_capture`` and ``cam.num_exposures``.
    file_name, file_path
        Values written to ``cam.file_name`` and ``cam.file_path``.
    det_readout
        Added to ``acquire_time`` to obtain the acquire period.

    Yields
    ------
    The messages produced by each ``mv`` call, in the order written below.
    """
    # Fixed file-saving and trigger settings.
    yield from mv(cam.auto_save, "Yes")
    yield from mv(cam.auto_increment, "Yes")
    yield from mv(cam.file_template, "%s%s_%3.3d.hdf5")
    yield from mv(cam.trigger_mode, "External")

    # The same count is used for captures and exposures.
    yield from mv(cam.num_capture, num_exposures)
    yield from mv(cam.num_exposures, num_exposures)

    # The period is the exposure time plus the detector readout time.
    yield from mv(cam.acquire_time, acquire_time)
    yield from mv(cam.acquire_period, acquire_time + det_readout)

    yield from mv(cam.file_name, file_name)
    yield from mv(cam.file_path, file_path)

In [None]:
def get_values():
    """Return ``{attribute name: value}`` for the camera configuration.

    Reads ``read_configuration()`` from the module-level
    ``area_detector_cam`` and keeps only the ``"value"`` entry of each item.
    """
    configuration = area_detector_cam.read_configuration()
    return {name: entry["value"] for name, entry in configuration.items()}

# Build the simulated camera that get_values() reads from.
area_detector_cam = SimAreaDetector("SIM:", name="area_detector_cam")

# Snapshot the configuration, run the plan, snapshot again.
before_config = get_values()
RE(
    area_detector_configuration(
        area_detector_cam, 0.1, 15, "acquisiton_test", "/ibira/test"
    )
)
after_config = get_values()

# Print every configuration entry as "old -> new".
for key in after_config:
    print(f"{key}: {before_config[key]} -> {after_config[key]}")

In [None]:
def area_detector_configuration_simplified(
        cam: Any, acquire_time: float, num_exposures: int,
        file_name: str, file_path: str, det_readout: float = 0.01):
    """Plan that configures a camera with one multi-signal ``mv`` call.

    Parameters
    ----------
    cam
        Object exposing the signals set below (``auto_save``,
        ``auto_increment``, ``file_template``, ``trigger_mode``,
        ``num_capture``, ``num_exposures``, ``file_name``, ``file_path``,
        ``acquire_time``, ``acquire_period``).
    acquire_time
        Value written to ``cam.acquire_time``.
    num_exposures
        Value written to both ``cam.num_capture`` and ``cam.num_exposures``.
    file_name, file_path
        Values written to ``cam.file_name`` and ``cam.file_path``.
    det_readout
        Added to ``acquire_time`` to obtain the acquire period.

    Yields
    ------
    The messages of the grouped ``mv`` followed by those of the separate
    ``mv`` on ``cam.acquire_period``.
    """
    # All signals except acquire_period are passed to a single mv call.
    yield from mv(
        cam.auto_save, "Yes",
        cam.auto_increment, "Yes",
        cam.file_template, "%s%s_%3.3d.hdf5",
        cam.trigger_mode, "External",
        cam.num_capture, num_exposures,
        cam.num_exposures, num_exposures,
        cam.file_name, file_name,
        cam.file_path, file_path,
        cam.acquire_time, acquire_time
    )

    # acquire_period is set in its own mv, after acquire_time.
    yield from mv(cam.acquire_period, acquire_time + det_readout)
    

In [None]:
def get_values():
    """Return ``{attribute name: value}`` for the camera configuration.

    Reads ``read_configuration()`` from the module-level
    ``area_detector_cam`` and keeps only the ``"value"`` entry of each item.
    """
    configuration = area_detector_cam.read_configuration()
    return {name: entry["value"] for name, entry in configuration.items()}

# Build the simulated camera that get_values() reads from.
area_detector_cam = SimAreaDetector("SIM:", name="area_detector_cam")

# Snapshot the configuration, run the simplified plan, snapshot again.
before_config = get_values()
RE(
    area_detector_configuration_simplified(
        area_detector_cam, 0.1, 15, "acquisiton_test", "/ibira/test"
    )
)
after_config = get_values()

# Print every configuration entry as "old -> new".
for key in after_config:
    print(f"{key}: {before_config[key]} -> {after_config[key]}")

## mvr and rel_set

In [None]:
#help(mvr)
#help(rel_set)

In [None]:
# Start from position 14, then give motor1 a simulated delay of 2
# (presumably seconds — confirm against ophyd.sim.SynAxis).
RE(mv(motor1, 14))
motor1.delay = 2
print(f"Before: {motor1.readback.get()}")
# rel_set is called without a wait argument here; compare the "After" value
# with the mvr cell that follows.
RE(rel_set(motor1, 15))
print(f"After: {motor1.readback.get()}")
# Reset the simulated delay to 0.
motor1.delay = 0

In [None]:
# Start from position 14 with a simulated delay of 2, then apply three
# relative moves (+15, -10, +2.5) and print the readback after each one.
RE(mv(motor1, 14))
motor1.delay = 2
print(f"Before: {motor1.readback.get()}")
RE(mvr(motor1, 15))
print(f"After +15: {motor1.readback.get()}")
RE(mvr(motor1, -10))
print(f"After -10: {motor1.readback.get()}")
RE(mvr(motor1, 2.5))
print(f"After +2.5: {motor1.readback.get()}")
# Reset the simulated delay to 0.
motor1.delay = 0

In [None]:
def scan_and_rotate(detector: Any, rx: Any, ux: Any):
    """Plan that alternates a scan along ``ux`` with a relative move of ``rx``.

    Parameters
    ----------
    detector
        Object whose ``value`` attribute is handed to ``scan`` as the only
        detector.
    rx
        Motor moved to 0 at the start and then by +36 after every scan; its
        ``readback`` is printed after each move.
    ux
        Motor scanned from -1 to 1 in 5 points on every iteration.

    Yields
    ------
    The messages of the initial ``mv`` followed by ten scan / ``mvr`` pairs.
    """
    yield from mv(rx, 0)

    # Ten steps of 36 each; the loop index itself is not needed.
    for _ in range(10):
        # Use the detector passed in by the caller rather than a global name.
        yield from scan([detector.value], ux, -1, 1, 5)
        yield from mvr(rx, 36)

        print(f"Motor Rx: {rx.readback.read()}")

In [None]:
# Build the simulated camera used as the detector for the plan.
area_detector_cam = SimAreaDetector("SIM:", name="area_detector_cam")

# Pass the camera created above; the LiveTable column name matches the name
# of its `value` component.
RE(scan_and_rotate(area_detector_cam, motor1, motor2), LiveTable(['area_detector_cam_value']))

## sleep and stop

In [None]:
#help(sleep)
#help(stop)

In [None]:
class StopDevice(Device):
    """Device that prints a message after the base-class stop has run."""

    def stop(self, *, success=False):
        """Stop the device, then report it on stdout.

        ``success`` is accepted and forwarded so that the override keeps the
        same keyword-only signature as ``Device.stop`` and can be called with
        ``stop(success=...)``.
        """
        super().stop(success=success)
        print("This Device has been stopped")

In [None]:
# Instantiate the device and run the stop plan stub on it through the RunEngine.
stop_device = StopDevice(name="stop_device")
RE(stop(stop_device))

In [None]:
# Time how long the RunEngine takes to execute the sleep(1) plan stub.
start = time()
RE(sleep(1))
end = time()
print(f"Duration: {end - start}")

In [None]:
class SimTatu(Device):
    """Simulated TATU device that is stopped by writing 1 to its Stop signal."""

    # Signal written by stop(); backed by a FakeEpicsSignal with suffix "Stop".
    tatu_stop = Component(FakeEpicsSignal, "Stop", kind="config")

    def stop(self, *, success=False):
        """Write 1 to ``tatu_stop`` and wait for the set to complete.

        ``success`` is accepted so the override matches the keyword-only
        signature of ``Device.stop``; it is not used by this implementation.
        """
        print("Stopping TATU!")
        self.tatu_stop.set(1).wait()
        print("TATU has been stopped!")
        
# Print the Stop signal value before and after running the stop plan stub
# on the simulated TATU.
tatu = SimTatu("SIM:", name="tatu")
print(tatu.tatu_stop.get())
RE(stop(tatu))
print(tatu.tatu_stop.get())

## stage and unstage

In [None]:
#help(stage)
#help(unstage)

In [None]:
class StagedDevice(Device):
    """Device that prints a message whenever it is staged or unstaged."""

    def stage(self):
        """Stage via the base class, print a message, and return its result."""
        # Keep the base-class return value so callers still receive it.
        staged = super().stage()
        print("This Device is being armed")
        return staged

    def unstage(self):
        """Unstage via the base class, print a message, and return its result."""
        unstaged = super().unstage()
        print("This Device is being restored to a safe idle state")
        return unstaged

In [None]:
staged_device = StagedDevice(name="staged_device")

def stage_unstage_plan():
    """Plan that stages staged_device, prints a placeholder, then unstages it."""
    yield from stage(staged_device)
    
    # Custom plan
    print("Custom plan")
    
    # NOTE(review): unstage is only reached if nothing above raises.
    yield from unstage(staged_device)

# Execute the plan on the shared RunEngine.
RE(stage_unstage_plan())

In [None]:
class KinematicMotor(SynAxis):
    """Simulated axis that toggles two enable signals on stage and unstage."""

    # Enable flags for the two underlying motors, set to 1 in stage() and
    # back to 0 in unstage().
    enable_real_motor1 = Component(FakeEpicsSignal, "CND", kind="config", auto_monitor=True)
    enable_real_motor2 = Component(FakeEpicsSignal, "ABC", kind="config", auto_monitor=True)

    def stage(self):
        """Stage via the base class, set both enable signals to 1, and return
        the base-class result."""
        # Keep the base-class return value so callers still receive it.
        staged = super().stage()
        print("The real motors are being enabled for the kinematic motion")
        self.enable_real_motor1.set(1).wait()
        self.enable_real_motor2.set(1).wait()
        print(self.read_configuration())
        return staged

    def unstage(self):
        """Unstage via the base class, set both enable signals to 0, and
        return the base-class result."""
        unstaged = super().unstage()
        print("The real motors are being disabled for the kinematic motion")
        self.enable_real_motor1.set(0).wait()
        self.enable_real_motor2.set(0).wait()
        print(self.read_configuration())
        return unstaged

In [None]:
kin_motor = KinematicMotor(name="kin_motor")
# NOTE(review): area_detector_cam is re-created here but not used by the plan
# in this cell.
area_detector_cam = SimAreaDetector("SIM:", name="area_detector_cam")

def stage_unstage_plan():
    """Plan that stages kin_motor, moves it to 10, then unstages it."""
    yield from stage(kin_motor)
    
    # Custom plan
    yield from mv(kin_motor, 10)
    
    # NOTE(review): unstage is only reached if nothing above raises.
    yield from unstage(kin_motor)

# Execute the plan on the shared RunEngine.
RE(stage_unstage_plan())

## Controlling the Run Engine 

In [None]:
#help(checkpoint)
#help(clear_checkpoint)
#help(pause)
#help(deferred_pause)
#help(open_run)
#help(close_run)

### Pause

In [None]:
def pause_plan():
    """Plan that reads noisy_det at three motor1 positions and requests a
    pause after the first one.

    Message order: open the run; move motor1 to 5, read, checkpoint; pause;
    then for motor1 at 4 and at 14: move, read, checkpoint; close the run.
    """
    yield from open_run()

    # First point, recorded before the pause request.
    yield from mv(motor1, 5)
    yield from trigger_and_read([noisy_det])
    yield from checkpoint()

    yield from pause()

    # The remaining points share the same move / read / checkpoint sequence.
    for position in (4, 14):
        yield from mv(motor1, position)
        yield from trigger_and_read([noisy_det])
        yield from checkpoint()

    yield from close_run()

In [None]:
# Run pause_plan with a LiveTable callback showing the noisy_det column.
RE(pause_plan(), LiveTable(["noisy_det"]))

In [None]:
# Show the RunEngine state, then ask it to resume.
print(RE.state)
RE.resume()

### Deferred Pause

In [None]:
def deferred_pause_plan():
    """Plan that reads noisy_det at three motor1 positions and requests a
    deferred pause after the first one.

    Message order: open the run; move motor1 to 5, read, checkpoint;
    deferred_pause; then for motor1 at 4 and at 14: move, read, checkpoint;
    close the run.
    """
    yield from open_run()

    # First point, recorded before the deferred pause request.
    yield from mv(motor1, 5)
    yield from trigger_and_read([noisy_det])
    yield from checkpoint()

    yield from deferred_pause()

    # The remaining points share the same move / read / checkpoint sequence.
    for position in (4, 14):
        yield from mv(motor1, position)
        yield from trigger_and_read([noisy_det])
        yield from checkpoint()

    yield from close_run()

In [None]:
# Run deferred_pause_plan with a LiveTable callback showing the noisy_det column.
RE(deferred_pause_plan(), LiveTable(["noisy_det"]))

In [None]:
# Show the RunEngine state, then ask it to resume.
print(RE.state)
RE.resume()

### clear_checkpoint

In [None]:
def clear_checkpoint_plan():
    """Plan that clears the checkpoint right before requesting a pause.

    Message order: open the run; move motor1 to 5, read, clear_checkpoint;
    pause; then for motor1 at 4 and at 14: move, read, checkpoint; close the
    run.
    """
    yield from open_run()

    # First point: the checkpoint is cleared instead of being set.
    yield from mv(motor1, 5)
    yield from trigger_and_read([noisy_det])
    yield from clear_checkpoint()

    yield from pause()

    # The remaining points share the same move / read / checkpoint sequence.
    for position in (4, 14):
        yield from mv(motor1, position)
        yield from trigger_and_read([noisy_det])
        yield from checkpoint()

    yield from close_run()

In [None]:
# Run clear_checkpoint_plan with a LiveTable callback showing the noisy_det column.
RE(clear_checkpoint_plan(), LiveTable(["noisy_det"]))

In [None]:
# Display the RunEngine state after the plan above (bare expression, shown as
# the cell output).
RE.state

## repeat

In [None]:
# Print the built-in help text for the repeat plan stub.
help(repeat)

In [None]:
def repeat_count():
    """Plan that delegates to count on noisy_det with num=5 and delay=0.25."""
    yield from count([noisy_det], num=5, delay=0.25)
# Run repeat_count through the repeat stub with num=3 and delay=3, showing the
# noisy_det column in a LiveTable.
RE(repeat(repeat_count, num=3, delay=3), LiveTable(["noisy_det"]))