In [4]:
# %load ../ipython_profiles/profile_6bma/startup/00-prep.py
# Setup script for using BlueSky at 6-BM-A
# Path of the startup file this cell was loaded from; it is only used in the
# banner messages printed by this cell.
__file__ = "../ipython_profiles/profile_6bma/startup/00-prep.py"
# Banner separator: the same emoji repeated 30 times.
_sep = u"🙈"*30
print(f'{_sep}\nInitializing IPython environment using {__file__}\n')

# -----
# Metadata/document store: `db` is looked up by the databroker configuration
# name "mongodb_config".
print('\nConfig the meta-data handler...\n')
from databroker import Broker
db = Broker.named("mongodb_config")

# -----
# RunEngine: created with an empty metadata dict, then two subscribers are
# registered on it (db.insert first, BestEffortCallback second).
print('\nCreate RunEngine RE')
import bluesky
from bluesky import RunEngine
from bluesky.callbacks.best_effort import BestEffortCallback
print('*** subscribe both mongodb and callback to RE')
RE = RunEngine({})
RE.subscribe(db.insert)
RE.subscribe(BestEffortCallback())

print('*** add beamline specific meta-data')
import os
from datetime import datetime
import apstools
import ophyd
import socket
import getpass

# Fall back to generic labels when the lookups return an empty value.
HOSTNAME = socket.gethostname() or 'localhost'
USERNAME = getpass.getuser() or '6-BM-A user'

# Session-wide entries stored in RE.md: beamline/proposal identifiers,
# who/where/which process started the session, the versions of the main
# packages and the session start time.
RE.md.update({
    'beamline_id': 'APS 6-BM-A',
    'proposal_id': 'internal test',
    'pid': os.getpid(),
    'login_id': f"{USERNAME}@{HOSTNAME}",
    'BLUESKY_VERSION': bluesky.__version__,
    'OPHYD_VERSION': ophyd.__version__,
    'apstools_VERSION': apstools.__version__,
    'SESSION_STARTED': datetime.now().isoformat(" "),
})

print("*** checking beam status")
import apstools.devices as APS_devices
aps = APS_devices.ApsMachineParametersDevice(name="APS")

from ophyd import EpicsSignalRO


def instrument_in_use():
    """Return the current value of the PV ``6bm:instrument_in_use``.

    A new read-only EPICS signal is created on every call and read
    immediately with ``.get()``.
    """
    return EpicsSignalRO("6bm:instrument_in_use",
                         name="instrument_in_use",
                         ).get()


# Default to "online".  Without this assignment the flag only existed after a
# failed connection, so every later ``if offline_testmode`` raised NameError
# whenever the PV could actually be reached.
offline_testmode = False
try:
    RE.md['INSTRUMENT_IN_USE'] = instrument_in_use()
except TimeoutError as e:
    # The PV could not be reached: report it and continue in offline mode.
    print(f"---receving error: {e}---")
    print("---switching to offline test mode---")
    offline_testmode = True
    RE.md['INSTRUMENT_IN_USE'] = False
finally:
    # Always show the metadata collected so far, online or offline.
    from pprint import pprint
    print("\nCurrent registered metadata:")
    pprint(RE.md)
    print("***Update entries in RE.md if necessary")

import apstools.synApps_ophyd
calcs = apstools.synApps_ophyd.userCalcsDevice("6bma1:", name="calcs", )
# Callable returning the truth value of calcs.calc1.val at call time.
hutch_light_on = lambda : bool(calcs.calc1.val.get())

# conducting experiment mode
if offline_testmode:
    in_production = False
else:
    # Both helpers are callables and must be *called*:
    #   - ``instrument_in_use.get()`` looked up ``.get`` on the function
    #     object and raised AttributeError;
    #   - ``not hutch_light_on`` tested the function object itself, which is
    #     always truthy, so in_production could never become True.
    in_production = (
        aps.inUserOperations
        and (instrument_in_use() in (1, "6-BM-A"))
        and (not hutch_light_on())
    )

# testing mode, supersedes in_production
in_dryrun = True

print(f"Done with {__file__}\n{_sep}\n")

🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈
Initializing IPython environment using ../ipython_profiles/profile_6bma/startup/00-prep.py


Config the meta-data handler...



  return yaml.load(f)



Create RunEngine RE
*** subscribe both mongodb and callback to RE
*** add beamline specific meta-data
*** checking beam status
---receving error: Failed to connect to 6bm:instrument_in_use---
---switching to offline test mode---

Current registered metadata:
{'BLUESKY_VERSION': '1.5.0',
 'INSTRUMENT_IN_USE': False,
 'OPHYD_VERSION': '1.3.1',
 'SESSION_STARTED': '2019-04-22 13:05:47.514605',
 'apstools_VERSION': '2019.0321.1',
 'beamline_id': 'APS 6-BM-A',
 'login_id': 'chenzhang@chenz3-mbp.local',
 'pid': 9349,
 'proposal_id': 'internal test'}
***Update entries in RE.md if necessary
Done with ../ipython_profiles/profile_6bma/startup/00-prep.py
🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈



In [6]:
# %load ../ipython_profiles/profile_6bma/startup/01-devices.py
# define devices used for tomo scan at 6BM
# Path of the startup file this cell was loaded from; it is only used in the
# banner messages printed by this cell.
__file__ = "../ipython_profiles/profile_6bma/startup/01-devices.py"
# Banner separator: the same emoji repeated 30 times.
_sep = u"🙉"*30
print(f'{_sep}\nInitializing devices on Python end with {__file__}')

# ----
# Shutter
# Both branches below define `A_shutter` and `suspend_A_shutter`; only the
# production branch additionally creates and installs `suspend_APS_current`.
from bluesky.suspenders import SuspendFloor
print("Setting up Shutter")
if in_production:
    # define the real shutter used at 6BMA@APS
    # NOTE: 
    #   this requires connection to the hardware, otherwise a connection error will be raised

    A_shutter = APS_devices.ApsPssShutterWithStatus(
            "6bmb1:rShtrA:",
            "PA:06BM:STA_A_FES_OPEN_PL",
            name="A_shutter",
        )
    # Bare attribute access; the value is not used by this script.
    A_shutter.pss_state
    # no scans until A_shutter is open
    # (created here with threshold 1, but NOT installed on RE in this file)
    suspend_A_shutter = SuspendFloor(A_shutter.pss_state, 1)
    # NOTE:
    # since tomo scan take dark field images with shutter closed, the
    # suspender installation for A_shutter is located in the plan for
    # granular control.
    
    # no scans if aps.current is too low
    # (suspend threshold 2, resume threshold 10; installed on RE right away)
    suspend_APS_current = SuspendFloor(aps.current, 2, resume_thresh=10)
    RE.install_suspender(suspend_APS_current)

else:
    # for testing during dark time (no beam, shutter closed by APS)
    A_shutter = APS_devices.SimulatedApsPssShutterWithStatus(name="A_shutter")
    # Same suspender definition as in production; it is not installed here either.
    suspend_A_shutter = SuspendFloor(A_shutter.pss_state, 1)
    print("---use simulated shutter---")

# ----
# Motors
from ophyd import MotorBundle
from ophyd import Component
from ophyd import EpicsMotor

class TomoStage(MotorBundle):
    """Bundle of the three EPICS motors used by the tomography stage.

    Components (attribute -> PV string passed to ``EpicsMotor``):
      * ``preci`` -> ``6bmpreci:m1`` (marked as the rotation motor below)
      * ``samX``  -> ``6bma1:m19``
      * ``samY``  -> ``6bma1:m18``
    """
    #rotation
    preci = Component(EpicsMotor, "6bmpreci:m1", name='preci')    
    # sample motors -- presumably X/Y translations, judging by the names only
    samX = Component(EpicsMotor, "6bma1:m19", name='samX')
    samY = Component(EpicsMotor, "6bma1:m18", name="samY")

print("\nSetting up motors")
if in_production or in_dryrun:
    # EPICS-backed stage (see TomoStage for the PV names).
    tomostage = TomoStage(name='tomostage')

else:
    # Simulated stage for sessions without any motor access.
    # `sim` is imported locally because the module-level
    # `from ophyd import sim` only appears further down in this file, so the
    # name was not yet defined when this branch ran.
    from ophyd import sim

    # ophyd devices require a name; the bare MotorBundle() call had none.
    tomostage = MotorBundle(name='tomostage')
    # NOTE: all three axes share the single simulated motor `sim.motor`,
    # exactly as in the original assignments.
    tomostage.preci = sim.motor
    tomostage.samX = sim.motor
    tomostage.samY = sim.motor
    print("using simulated motors")

# Short aliases, defined for BOTH branches (they used to exist only for the
# EPICS-backed stage although step_scan() relies on `preci`).
samx  = tomostage.samX
samy  = tomostage.samY
preci = tomostage.preci


# ----
# Area Detector
#   Initialize the area detector here, but allow user to update the results
#   later
import os
import datetime
from pathlib import Path, PureWindowsPath

print("\nSetting up area detector")
# production control ENV vars
ADPV_prefix = "1idPG2"   # AreaDetector prefix

# Experiment description; the VALUES, in this order, form the output folder.
config_experiment = {
    "OUTPUT_ROOT" : "Y:\\",     # The control os is windows...
    "CYCLE" : "2019-1",
    "EXPID" : "internal_apr19",
    "USER"  : "tomo",
    'SAMPLE' : "test",
}

def get_file_path(config_dict=config_experiment):
    """Return the Windows output directory described by *config_dict*.

    The values of *config_dict* are joined, in insertion order, into a
    Windows-style path that always ends with exactly one backslash.

    Parameters
    ----------
    config_dict : dict
        Mapping whose values are the successive path components (root first).
        Values are converted with ``str`` so non-string entries are accepted.

    Returns
    -------
    str
        Backslash-separated directory path with a trailing backslash.
    """
    # Build the Windows path directly from the components instead of joining
    # them with "/" and round-tripping through a host-OS Path object.
    segments = [str(value) for value in config_dict.values()]
    folder = str(PureWindowsPath(*segments))
    # A bare drive root ("Y:\\") already ends with a separator; do not double it.
    return folder if folder.endswith('\\') else folder + '\\'

FILE_PATH = get_file_path(config_experiment)
FILE_PREFIX = config_experiment['SAMPLE']

print("***Make sure update FILE_PATH and FILE_PREFIX before experiment")
print("***--- FILE_PATH can also be generated by passing config_experiment to get_file_path()")
print("***--- read the config dictionary config_experiment for more details")

# make area detector
from ophyd import AreaDetector
from ophyd import SingleTrigger
from ophyd import ADComponent
from ophyd import PointGreyDetectorCam
from ophyd import ProcessPlugin
from ophyd import TIFFPlugin
from ophyd import HDF5Plugin
from ophyd import sim

class PointGreyDetector6BM(SingleTrigger, AreaDetector):
    """Point Grey area detector used at 6BM.

    Declares four components, each addressed by a PV suffix:
    ``cam`` ("cam1:"), ``proc1`` ("Proc1:"), ``tiff1`` ("TIFF1:") and
    ``hdf1`` ("HDF1:").

    NOTE: ``hdf1`` captures the module-level ``FILE_PATH`` when this class
    statement runs; changing ``FILE_PATH`` afterwards does not change the
    value passed here.
    """
    # cam component
    cam = ADComponent(PointGreyDetectorCam, "cam1:")
    
    # proc plugin
    proc1 = ADComponent(ProcessPlugin, suffix="Proc1:")
    
    # tiff plugin
    tiff1 = ADComponent(
        TIFFPlugin,
        suffix="TIFF1:",
        )

    # hdf5 plugin
    # NOTE(review): `root` and `write_path_template` are normally provided by
    # one of ophyd's file-store mixins rather than by the plain HDF5Plugin
    # used here -- confirm that instantiating this class on real hardware
    # accepts these two keyword arguments.
    hdf1 = ADComponent(
            HDF5Plugin, 
            suffix="HDF1:",
            root='/',                          # for databroker
            write_path_template=FILE_PATH,     # for EPICS AD
        )

# Area Detector (AD) config block
# NOTE:  user need to update the entry in these dictionary before
#        running the experiment
# Values copied into det.cam.stage_sigs when the detector is set up.
config_cam = dict(
    num_images=1,               # number of images (nFrame)
    image_mode="Multiple",
    trigger_mode="Internal",
    acquire_time=0.05,          # exposure time (fExposureTime)
    acquire_period=0.05+0.01,
    gain=5,                     # detector gain [0~30]
)

# Values copied into det.proc1.stage_sigs when the detector is set up.
config_proc1 = dict(
    enable=1,                   # toggle on proc1
    enable_filter=1,            # enable filter
    num_filter=5,               # change number_filtered in proc1 (same as nFrame)
    reset_filter=1,             # reset number_filtered
)

# Values copied into det.tiff1.stage_sigs when the detector is set up.
config_tiff1 = dict(
    enable=0,                   # disabled by default
    nd_array_port="PROC1",      # TIFF plugin input port
    file_write_mode="Stream",   # write mode
    auto_save="Yes",            # turn on file save
    file_path=FILE_PATH,        # output folder
    file_name=FILE_PREFIX,      # image name prefix
)

# Values copied into det.hdf1.stage_sigs when the detector is set up.
config_hdf1 = dict(
    enable=0,                   # disabled by default
    nd_array_port="PROC1",      # HDF5 plugin input port
    auto_save="Yes",
    file_path=FILE_PATH,        # output folder
    file_name=FILE_PREFIX,      # image name prefix
)

if not offline_testmode:
    # Real detector, addressed through the AreaDetector PV prefix.
    det = PointGreyDetector6BM(f"{ADPV_prefix}:", name='det')

    # Hint kept from the original script:
    #   det.read_attrs.append('tiff1')   # this is very important
    #   det.read_attrs[-1] = 'hdf1'      # switch to HDF5 output

    # Seed the stage signals of every component from the config dicts;
    # a TimeoutError is reported instead of aborting the startup.
    try:
        det.cam.stage_sigs.update(config_cam)
        det.proc1.stage_sigs.update(config_proc1)
        det.tiff1.stage_sigs.update(config_tiff1)
        det.hdf1.stage_sigs.update(config_hdf1)
    except TimeoutError as _exc:
        print(f"{_exc}\n !! Could not connect with area detector {det}")

    # Reminder shown whether or not the block above succeeded.
    print("\n".join([
        f"***Area Detector {det} is primed.",
        "***Adjust settings (dict) in",
        "***--- config_cam",
        "***--- config_proc1",
        "***--- config_tiff1",
        "***--- config_hdf1",
        "***before the acutal scan",
    ]))
else:
    # No hardware available: fall back to the ophyd simulated detector.
    det = sim.noisy_det
    print("***Ophyd virtual detector is used for offline dev")


def set_output_type(output='tiff', n_images=None):
    """Select which file plugin of the global ``det`` writes the images.

    The chosen plugin gets ``enable=1``, ``num_capture=n_images`` and
    ``capture=1`` in its ``stage_sigs``; the other plugin gets ``enable=0``.
    ``det.read_attrs`` is rebuilt so that it contains the chosen plugin and
    not the other one.

    Parameters
    ----------
    output : str
        'tif' / 'tiff' for the TIFF plugin, 'hdf' / 'hdf1' / 'hdf5' for the
        HDF5 plugin (case-insensitive).
    n_images : int, optional
        Number of frames the plugin should capture.  When omitted, a
        module-level ``n_images`` is used if one exists (the only way the
        previous implementation could work), otherwise 1.

    Raises
    ------
    ValueError
        If *output* is not one of the recognised format names.  ``det`` is
        left untouched in that case.
    """
    if n_images is None:
        # The body used to read an undefined name `n_images`, i.e. it raised
        # NameError unless a global of that name happened to exist.
        n_images = globals().get("n_images", 1)

    fmt = output.lower()
    if fmt in ('tif', 'tiff'):
        chosen_name, chosen, other = 'tiff1', det.tiff1, det.hdf1
    elif fmt in ('hdf', 'hdf1', 'hdf5'):
        chosen_name, chosen, other = 'hdf1', det.hdf1, det.tiff1
    else:
        # Validate before touching det, so a bad argument has no side effect.
        raise ValueError(f"Unknown output format {output}")

    chosen.stage_sigs.update(
        enable=1,
        num_capture=n_images,
        capture=1,
    )
    other.stage_sigs["enable"] = 0

    # NOTE: det is a global, so this change is visible in the whole workspace.
    # A new list is assigned (instead of appending in place) so the update
    # also takes effect if `read_attrs` hands out a copy on access.
    kept = [me for me in det.read_attrs if me not in ('tiff1', 'hdf1')]
    det.read_attrs = kept + [chosen_name]

# Closing banner for this startup file (file path plus the emoji separator).
print(f"Done with {__file__}\n{_sep}\n")


🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉
Initializing devices on Python end with ../ipython_profiles/profile_6bma/startup/01-devices.py
Setting up Shutter
---use simulated shutter---

Setting up motors

Setting up area detector
***Make sure update FILE_PATH and FILE_PREFIX before experiment
***--- FILE_PATH can also be generated by passing config_experiment to get_file_path()
***--- read the config dictionary config_experiment for more details
***Ophyd virtual detector is used for offline dev
Done with ../ipython_profiles/profile_6bma/startup/01-devices.py
🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉🙉



In [8]:
# %load ../ipython_profiles/profile_6bma/startup/02-plans.py
# predefined plans for tomo scan at 6bma
# Path of the startup file this cell was loaded from; it is only used in the
# banner messages printed by this cell.
__file__ = "../ipython_profiles/profile_6bma/startup/02-plans.py"
# Banner separator: the same emoji repeated 30 times.
_sep = u"🙊"*30
print(f'{_sep}\nCaching pre-defined scan plans with {__file__}')

import numpy                 as np
import bluesky.plans         as bp
import bluesky.preprocessors as bpp
import bluesky.plan_stubs    as bps


# ----
# Dark/White flat field
# NOTE:
#    det is a global var that refers to the detector
def collect_background(n_images, n_frames, output='tiff'):
    """Plan: take one background reading of the global detector ``det``.

    Steps, in order:
      1. set ``det.cam.acquire`` to 0,
      2. select the output plugin with ``set_output_type(output)``,
      3. put ``n_frames`` into the proc1 filter and cam image-count stage
         signals,
      4. stage ``det``, open a run and do a single ``trigger_and_read``.

    Parameters
    ----------
    n_images :
        Accepted for the caller's convenience; it is not referenced anywhere
        in this function body.
    n_frames :
        Written to ``proc1.num_filter`` and ``cam.num_images`` stage signals.
    output : str
        Format name forwarded to ``set_output_type``.
    """
    yield from bps.mv(det.cam.acquire, 0)

    set_output_type(output)

    det.proc1.stage_sigs.update(
        reset_filter=1,
        num_filter=n_frames,
    )
    det.cam.stage_sigs.update(
        trigger_mode="Internal",
        image_mode="Multiple",
        num_images=n_frames,
    )

    def _single_reading():
        yield from bps.trigger_and_read([det])

    # Same wrapping as stacking @stage_decorator over @run_decorator.
    staged_run = bpp.stage_decorator([det])(bpp.run_decorator()(_single_reading))
    return (yield from staged_run())


# ----
# Projections (step)
def step_scan(n_images, n_frames, angs, output='tiff'):
    """Plan: collect projections by stepping the global ``preci`` motor.

    For every angle in *angs* the plan sets a checkpoint, moves ``preci`` to
    the angle and does one ``trigger_and_read`` of the global ``det``; all
    readings happen inside one staged run.

    Parameters
    ----------
    n_images :
        Accepted for the caller's convenience; it is not referenced anywhere
        in this function body.
    n_frames :
        Written to ``proc1.num_filter`` and ``cam.num_images`` stage signals.
    angs : iterable
        Angles at which images are taken.
    output : str
        Format name forwarded to ``set_output_type``.
    """
    set_output_type(output)

    det.proc1.stage_sigs.update(
        enable=1,             # toggle on proc1
        reset_filter=1,       # reset number_filtered
        num_filter=n_frames,
    )
    det.cam.stage_sigs["num_images"] = n_frames

    def _rotate_and_read():
        for angle in angs:
            yield from bps.checkpoint()
            yield from bps.mv(preci, angle)
            yield from bps.trigger_and_read([det])

    # Same wrapping as stacking @stage_decorator over @run_decorator.
    staged_run = bpp.stage_decorator([det])(bpp.run_decorator()(_rotate_and_read))
    return (yield from staged_run())


# ----
# Projections (fly)
def fly_scan():
    """Placeholder for the fly-scan projection plan.

    Not implemented yet: the function does nothing and returns ``None``.
    """
    return None


# ----
# Example bundled tomo characterization scan
# Example input for tomo_step(); units as noted per entry.
config_tomo_step = dict(
    n_white=10,
    n_dark=10,
    samOutDist=-5.00,              # mm
    omega_step=0.25,               # degrees
    acquire_time=0.05,             # sec
    acquire_period=0.05+0.01,      # sec
    time_wait=(0.05+0.01)*2,       # sec
    omega_start=-180,              # degrees
    omega_end=180,                 # degrees
    n_frames=5,                    # proc.n_filters, cam.n_images
    output="tiff",                 # output format ['tiff', 'hdf5']
)


def tomo_step(config_dict):
    """Master step-scan plan to hand to RE (placeholder, not implemented).

    Intended sequence:

    1. pre-white-field background collection
    2. projection collection
    3. post-white-field background collection
    4. post-dark-field background collection

    See ``config_tomo_step`` for the expected keys of *config_dict*.
    Currently the function ignores its argument and returns ``None``.
    """
    return None



# ----
# Fly scan bundle example
# Example input for tomo_fly().
# This dict used to be assigned to `config_tomo_step`, which silently replaced
# the step-scan configuration defined earlier in this file; it now has the
# name the tomo_fly docstring refers to.
config_tomo_fly = {
    "n_white"        :  10,
    "n_dark"         :  10,
}
def tomo_fly(config_dict):
    """
    The master plan pass to RE for

    1. pre-white-field background collection
    2. projection collection
    3. post-white-field background collection
    4. post-dark-field background collection

    NOTE:
    see config_tomo_fly for key inputs

    Placeholder: not implemented yet, *config_dict* is ignored and the
    function returns None.
    """
    pass


# Closing banner for this startup file (file path plus the emoji separator).
print(f"Done with {__file__}\n{_sep}\n")


🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊
Caching pre-defined scan plans with ../ipython_profiles/profile_6bma/startup/02-plans.py
Done with ../ipython_profiles/profile_6bma/startup/02-plans.py
🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊🙊

