-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
2,557 additions
and
172 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,199 +1,116 @@ | ||
|
||
import math | ||
import numpy as np | ||
import socket | ||
import time | ||
|
||
# IOC_PREFIX = "gov:" | ||
# IOC_PREFIX = "xxx:" | ||
IOC_PREFIX = "prj:" | ||
# Import matplotlib and put it in interactive mode. | ||
import matplotlib.pyplot as plt | ||
plt.ion() | ||
|
||
import numpy as np | ||
from ophyd.scaler import ScalerCH | ||
from ophyd import EpicsMotor, Signal, Component, Device | ||
from APS_BlueSky_tools.devices import use_EPICS_scaler_channels, AxisTunerMixin | ||
from ophyd import EpicsMotor, EpicsSignal | ||
from bluesky import RunEngine | ||
from bluesky.callbacks.best_effort import BestEffortCallback | ||
import bluesky.plans as bp | ||
# Make plots update live while scans run. | ||
from bluesky.utils import install_qt_kicker | ||
install_qt_kicker() | ||
|
||
from APS_BlueSky_tools.devices import use_EPICS_scaler_channels, AxisTunerMixin | ||
from APS_BlueSky_tools.plans import TuneAxis | ||
from APS_BlueSky_tools.synApps_ophyd import userCalcsDevice, swait_setup_lorentzian | ||
|
||
import datetime | ||
import time | ||
from bluesky.callbacks.fitting import PeakStats | ||
from bluesky import preprocessors as bpp | ||
from bluesky import plan_stubs as bps | ||
from bluesky import plan_patterns as bpat | ||
|
||
from bluesky.utils import ts_msg_hook | ||
class TunableEpicsMotor(EpicsMotor, AxisTunerMixin): | ||
pass | ||
|
||
# Import matplotlib and put it in interactive mode. | ||
import matplotlib.pyplot as plt | ||
plt.ion() | ||
class MyTuneAxis(TuneAxis): pass | ||
# override the default tune() method | ||
|
||
# Make plots update live while scans run. | ||
from bluesky.utils import install_qt_kicker | ||
install_qt_kicker() | ||
|
||
def myCallback(key, doc): | ||
# if key in (" start", " descriptor", " event", " stop"): | ||
print("-"*20) | ||
for k, v in doc.items(): | ||
print("\t", key, k, v) | ||
|
||
|
||
def setRandomPeak(calc, motor): | ||
swait_setup_lorentzian( | ||
calc, | ||
motor, | ||
center = -1.5 + 0.5*np.random.uniform(), | ||
noise = 0.2 + 0.2*np.random.uniform(), | ||
width = 0.001 + 0.05*np.random.uniform(), | ||
scale = 1e5, | ||
) | ||
|
||
|
||
hostname = socket.gethostname() | ||
|
||
HOST_PV_PREFIX_DICT = { | ||
'otz.aps.anl.gov': "gov:", | ||
'mint-vm': "prj:", | ||
'poof': "prj:", | ||
'enoki': "prj:", | ||
} | ||
IOC_PREFIX = HOST_PV_PREFIX_DICT.get(hostname, "xxx:") | ||
|
||
|
||
RE = RunEngine({}) | ||
RE.subscribe(BestEffortCallback()) | ||
|
||
m1 = EpicsMotor(IOC_PREFIX+"m1", name="m1") | ||
scaler = ScalerCH(IOC_PREFIX+"scaler1", name="scaler") | ||
calcs = userCalcsDevice(IOC_PREFIX, name="calcs") | ||
|
||
time.sleep(1) | ||
scaler.channels.chan01.chname.put("clock") | ||
scaler.channels.chan02.chname.put("I0") | ||
scaler.channels.chan03.chname.put("scint") | ||
scaler.channels.chan04.chname.put("Jake") | ||
scaler.channels.chan05.chname.put("") | ||
scaler.preset_time.put(0.3) | ||
|
||
scaler.match_names() | ||
use_EPICS_scaler_channels(scaler) | ||
for k, v in scaler.read().items(): | ||
print(k, v) | ||
print("scaler.preset_time", scaler.preset_time.value) | ||
# RE.msg_hook = ts_msg_hook | ||
|
||
|
||
class TunableEpicsMotor(EpicsMotor, AxisTunerMixin): | ||
pass | ||
|
||
|
||
class MyTuneAxis(TuneAxis): | ||
|
||
|
||
# override the default tune() method | ||
|
||
def tune(self, width=None, num=None, md=None): | ||
""" | ||
BlueSky plan to execute one pass through the current scan range | ||
Scan self.axis centered about current position from | ||
``-width/2`` to ``+width/2`` with ``num`` observations. | ||
If a peak was detected (default check is that max >= 4*min), | ||
then set ``self.tune_ok = True``. | ||
PARAMETERS | ||
width : float | ||
width of the tuning scan in the units of ``self.axis`` | ||
Default value in ``self.width`` (initially 1) | ||
num : int | ||
number of steps | ||
Default value in ``self.num`` (initially 10) | ||
md : dict, optional | ||
metadata | ||
""" | ||
width = width or self.width | ||
num = num or self.num | ||
|
||
if self.peak_choice not in self._peak_choices_: | ||
msg = "peak_choice must be one of {}, geave {}" | ||
msg = msg.format(self._peak_choices_, self.peak_choice) | ||
raise ValueError(msg) | ||
|
||
initial_position = self.axis.position | ||
final_position = initial_position # unless tuned | ||
start = initial_position - width/2 | ||
finish = initial_position + width/2 | ||
self.tune_ok = False | ||
|
||
tune_md = dict( | ||
width = width, | ||
initial_position = self.axis.position, | ||
time_iso8601 = str(datetime.datetime.now()), | ||
) | ||
_md = {'tune_md': tune_md, | ||
'plan_name': self.__class__.__name__ + '.tune', | ||
'tune_parameters': dict( | ||
num = num, | ||
width = width, | ||
initial_position = self.axis.position, | ||
peak_choice = self.peak_choice, | ||
x_axis = self.axis.name, | ||
y_axis = self.signal_name, | ||
) | ||
} | ||
_md.update(md or {}) | ||
if "pass_max" not in _md: | ||
self.stats = [] | ||
self.peaks = PeakStats(x=self.axis.name, y=self.signal_name) | ||
|
||
class Results(Device): | ||
"""because bps.read() needs a Device or a Signal)""" | ||
tune_ok = Component(Signal) | ||
initial_position = Component(Signal) | ||
final_position = Component(Signal) | ||
center = Component(Signal) | ||
# - - - - - | ||
x = Component(Signal) | ||
y = Component(Signal) | ||
cen = Component(Signal) | ||
com = Component(Signal) | ||
fwhm = Component(Signal) | ||
min = Component(Signal) | ||
max = Component(Signal) | ||
crossings = Component(Signal) | ||
peakstats_attrs = "x y cen com fwhm min max crossings".split() | ||
|
||
def report(self): | ||
keys = self.peakstats_attrs + "tune_ok center initial_position final_position".split() | ||
for key in keys: | ||
print("{} : {}".format(key, getattr(self, key).value)) | ||
|
||
@bpp.subs_decorator(self.peaks) | ||
def _scan(): | ||
yield from bps.open_run() | ||
|
||
position_list = np.linspace(start, finish, num) | ||
signal_list = list(self.signals) | ||
signal_list += [self.axis,] | ||
# TODO: need some hints to show the Y axis | ||
for pos in position_list: | ||
yield from bps.mv(self.axis, pos) | ||
yield from bps.trigger_and_read(signal_list) | ||
|
||
final_position = initial_position | ||
if self.peak_detected(): | ||
self.tune_ok = True | ||
if self.peak_choice == "cen": | ||
final_position = self.peaks.cen | ||
elif self.peak_choice == "com": | ||
final_position = self.peaks.com | ||
else: | ||
final_position = None | ||
self.center = final_position | ||
|
||
# add stream with results | ||
# yield from add_results_stream() | ||
stream_name = "PeakStats" | ||
results = Results(name=stream_name) | ||
|
||
for key in "tune_ok center".split(): | ||
getattr(results, key).put(getattr(self, key)) | ||
results.final_position.put(final_position) | ||
results.initial_position.put(initial_position) | ||
for key in results.peakstats_attrs: | ||
getattr(results, key).put(getattr(self.peaks, key)) | ||
|
||
yield from bps.create(name=stream_name) | ||
yield from bps.read(results) | ||
yield from bps.save() | ||
|
||
yield from bps.mv(self.axis, final_position) | ||
self.stats.append(self.peaks) | ||
yield from bps.close_run() | ||
|
||
results.report() | ||
|
||
return (yield from _scan()) | ||
|
||
|
||
def myCallback(key, doc): | ||
if key in (" start", " descriptor", " event", " stop"): | ||
print("-"*20) | ||
for k, v in doc.items(): | ||
print("\t", key, k, v) | ||
|
||
# change soft motor resolution from 200 steps/rev to 8000 (steps of 0.00025) | ||
_srev = EpicsSignal(m1.prefix+".SREV", name="_srev") | ||
_srev.put(8000) | ||
|
||
m1 = TunableEpicsMotor(IOC_PREFIX+"m1", name="m1") | ||
m1.tuner = MyTuneAxis([scaler], m1, signal_name="scint") | ||
m1.tuner.width = 0.02 | ||
m1.tuner.num = 21 | ||
|
||
noisy_calc = calcs.calc1 | ||
setRandomPeak(noisy_calc, m1) | ||
print("programmed peak signal: {}".format(calcs.calc1.channels.D.value.value)) | ||
print("programmed center: {}".format(calcs.calc1.channels.B.value.value)) | ||
sigma = calcs.calc1.channels.C.value.value | ||
fwhm = 2*math.sqrt(2* sigma**2 * math.log(2)) | ||
print("programmed sigma: {}".format(sigma)) | ||
print("programmed FWHM: {}".format(fwhm)) | ||
print("programmed noise: {}".format(calcs.calc1.channels.E.value.value)) | ||
noisy = EpicsSignal(noisy_calc.prefix, name="noisy") | ||
|
||
m1.tuner = TuneAxis([noisy], m1, signal_name=noisy.name) | ||
# m1.tuner = MyTuneAxis([det], m1, signal_name="det") | ||
m1.tuner.width = 3 | ||
m1.tuner.num = 41 | ||
|
||
m1.move(0) | ||
|
||
#RE(bp.count([scaler])) | ||
#RE(bp.scan([scaler], m1, -1, 1, 5)) | ||
# RE(bp.scan([noisy], m1, -2, 2, 5)) | ||
RE(m1.tune(), myCallback) | ||
|
||
# m1.tuner.width /= 10 | ||
# RE(m1.tune(), myCallback) | ||
# | ||
# m1.tuner.width /= 10 | ||
# RE(m1.tune(), myCallback) | ||
|
||
# time.sleep(60) |
Oops, something went wrong.