In [18]:
import bluesky
import bluesky.plan_stubs as bps
from apstools.synApps import SscanRecord
from ophyd import EpicsMotor
from apstools.plans import run_blocking_function
from ophyd.status import Status


In [3]:
IOC = "2idsft:"
scan1 = SscanRecord(f"{IOC}scan1", name="scan1")
scan1.wait_for_connection()

In [14]:
m_prefix = "2idbtst:"
m1 = EpicsMotor(f"{m_prefix}m1", name="m1")
m1.wait_for_connection()

In [12]:
def setup_scan1(m1, start, finish, npts, ct=1):
    # yield from bps.mv(scaler1.preset_time, ct)  # counting time/point
    yield from run_blocking_function(scan1.reset)
    yield from bps.sleep(0.2)  # arbitrary wait for EPICS to finish the reset.

    # positioners
    yield from bps.mv(
        scan1.number_points, npts,
        scan1.positioners.p1.start, start,
        scan1.positioners.p1.end, finish,
        scan1.positioners.p1.readback_pv, m1.user_readback.pvname,
        scan1.positioners.p1.setpoint_pv, m1.user_setpoint.pvname,
    )
    # # triggers (push scaler count button at each point)
    # yield from bps.mv(
    #     scan1.triggers.t1.trigger_pv, scaler1.count.pvname,
    # )
    # # detectors
    # yield from bps.mv(
    #     scan1.detectors.d01.input_pv, I0.pvname,
    #     scan1.detectors.d02.input_pv, diode.pvname,
    # )

In [None]:
def setup_scan1_2D(m1, start, finish, m2, s2, f2, p2, ct=1):
    # yield from bps.mv(scaler1.preset_time, ct)  # counting time/point
    yield from run_blocking_function(scan1.reset)
    yield from bps.sleep(0.2)  # arbitrary wait for EPICS to finish the reset.

    # positioners
    yield from bps.mv(
        scan1.number_points, npts,
        scan1.positioners.p1.start, start,
        scan1.positioners.p1.end, finish,
        scan1.positioners.p1.readback_pv, m1.user_readback.pvname,
        scan1.positioners.p1.setpoint_pv, m1.user_setpoint.pvname,
    )
    # # triggers (push scaler count button at each point)
    # yield from bps.mv(
    #     scan1.triggers.t1.trigger_pv, scaler1.count.pvname,
    # )
    # # detectors
    # yield from bps.mv(
    #     scan1.detectors.d01.input_pv, I0.pvname,
    #     scan1.detectors.d02.input_pv, diode.pvname,
    # )

In [15]:
def run_sscan():
    # Create an instance of the Status object.
    # If it is not marked as finished within 20s, it will raise a timeout exception.
    st = Status(timeout=20)

    def watch_execute_scan(old_value, value, **kwargs):
        # Watch for scan1.EXSC to change from 1 to 0 (when the scan ends).
        if old_value == 1 and value == 0:
            # mark as finished (successfully).
            st.set_finished()
            # Remove the subscription.
            scan1.execute_scan.clear_sub(watch_execute_scan)

    yield from bps.mv(scan1.execute_scan, 1)
    scan1.execute_scan.subscribe(watch_execute_scan)
    yield from run_blocking_function(st.wait)

In [33]:
# RE = bluesky.RunEngine()
a = RE(setup_scan1(m1, -0.5, 0.5, 100))

In [34]:
# Run scan
j = RE(run_sscan())

In [31]:
# This command interacts with pv via ophyd directly, without bluesky RE
scan1.execute_scan.value = 1