diff --git a/CHANGES.rst b/CHANGES.rst
index fda58dd04..df097d33b 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -30,6 +30,11 @@
 describe future plans.
 
 release expected by 2024-04-02
 
+New Features
+------------
+
+* Add new plan for edge alignment called edge_align
+
 Fixes
 -----
diff --git a/apstools/plans/__init__.py b/apstools/plans/__init__.py
index 1337677a5..7e6770c52 100644
--- a/apstools/plans/__init__.py
+++ b/apstools/plans/__init__.py
@@ -2,6 +2,7 @@
 from .alignment import TuneResults
 from .alignment import lineup
 from .alignment import lineup2
+from .alignment import edge_align
 from .alignment import tune_axes
 from .command_list import CommandFileReadError
 from .command_list import command_list_as_table
diff --git a/apstools/plans/alignment.py b/apstools/plans/alignment.py
index ce5b19f71..f96881708 100644
--- a/apstools/plans/alignment.py
+++ b/apstools/plans/alignment.py
@@ -16,6 +16,10 @@
 import numpy as np
 import pyRestTable
 
+from scipy.optimize import curve_fit
+from scipy.special import erf
+from scipy.signal import find_peaks
+
 from bluesky import plan_stubs as bps
 from bluesky import plans as bp
 from bluesky import preprocessors as bpp
@@ -135,7 +139,8 @@ def peak_analysis():
     if det0.name not in bec.peaks[feature]:
         logger.error(
-            "No statistical analysis of scan peak for feature '%s'!" " (bec.peaks=%s, bec=%s)",
+            "No statistical analysis of scan peak for feature '%s'!"
+            " (bec.peaks=%s, bec=%s)",
             feature,
             bec.peaks,
             bec,
         )
@@ -213,10 +218,165 @@ def peak_analysis():
     scaler.stage_sigs = old_sigs
 
 
+def edge_align(detectors, mover, start, end, points, cat=None, md={}):
+    """
+    Align to the edge given mover & detector data, relative to absolute position.
+
+    This plan can be used in the queueserver, Jupyter notebooks, and IPython
+    consoles.
+
+    PARAMETERS
+    ----------
+    detectors *Readable* or [*Readable*]:
+        Detector object or list of detector objects (each is a Device or
+        Signal).
+
+    mover *Movable*:
+        Mover object, such as motor or other positioner.
+
+    start *float*:
+        Starting point for the scan. This is an absolute motor location.
+
+    end *float*:
+        Ending point for the scan. This is an absolute motor location.
+
+    points *int*:
+        Number of points in the scan.
+
+    cat *databroker.temp().v2*:
+        Catalog where bluesky data is saved and can be retrieved from.
+
+    md *dict*:
+        User-supplied metadata for this scan.
+    """
+
+    def guess_erf_params(x_data, y_data):
+        """
+        Provide an initial guess for the parameters of an error function.
+
+        Parameters
+        ----------
+        x_data : A numpy array of the values on the x_axis
+        y_data : A numpy array of the values on the y_axis
+
+        Returns
+        -------
+        guess : dict
+            A dictionary containing the guessed parameters 'low_y_data', 'high_y_data', 'width', and 'midpoint'.
+        """
+
+        # Sort data to make finding the mid-point easier and to assist in other estimations
+        y_data_sorted = np.sort(y_data)
+        x_data_sorted = np.sort(x_data)
+
+        # Estimate low and high as the first and last elements (assuming sorted data)
+        low_y_data = np.min(y_data_sorted)
+        high_y_data = np.max(y_data_sorted)
+
+        low_x_data = np.min(x_data_sorted)
+        high_x_data = np.max(x_data_sorted)
+
+        # Estimate wid as a fraction of the range. This is very arbitrary and might need tuning!
+        width = (
+            (high_x_data - low_x_data) / 10
+        )  # This is a guess and might need adjustment based on your data's characteristics
+
+        # Estimate the midpoint of the x values
+        midpoint = x_data[int(len(x_data) / 2)]
+
+        return [low_y_data, high_y_data, width, midpoint]
+
+    def erf_model(x, low, high, width, midpoint):
+        """
+        Create error function for fitting and simulation
+
+        Parameters
+        ----------
+        x : input upon which error function is evaluated
+        low : min value of error function
+        high : max value of error function
+        width : "spread" of error function transition region
+        midpoint: location of error function's "center"
+        """
+        return (high - low) * 0.5 * (1 - erf((x - midpoint) / width)) + low
+
+    def check_signal_change(y_data, window_size=5, std_multiplier=5):
+        """
+        Checks if the signal has a significant change or if it's just noise.
+
+        Parameters
+        ----------
+        x_data : numpy.array
+            The x-axis values of the signal.
+        y_data : numpy.array
+            The y-axis values of the signal.
+        window_size : int
+            The size of the window for the moving average filter.
+        std_multiplier : float
+            The multiplier for the standard deviation to set the threshold.
+
+        Returns
+        -------
+        bool
+            True if a significant change is detected, False otherwise.
+        """
+        # Smooth the data
+        y_smoothed = np.convolve(
+            y_data, np.ones(window_size) / window_size, mode="valid"
+        )
+
+        print(f"y_0: {y_smoothed[0]}")
+
+        # Calculate the standard deviation of the smoothed data
+        std_dev = np.std(y_smoothed)
+        print(std_dev)
+
+        # Find peaks and troughs which are above/below the std_multiplier * std_dev
+        peaks, _ = find_peaks(
+            y_smoothed, height=y_smoothed[0] + std_multiplier * std_dev
+        )
+        troughs, _ = find_peaks(
+            -y_smoothed, threshold=y_smoothed[0] + std_multiplier * std_dev
+        )
+        print(troughs)
+        # print(f"peaks: {len(peaks)}")
+        print(f"troughs: {len(troughs)}")
+
+        # Check for significant changes
+        if len(peaks) > 0 or len(troughs) > 0:
+            return True
+        else:
+            return False
+
+    if not isinstance(detectors, (tuple, list)):
+        detectors = [detectors]
+
+    _md = dict(purpose="edge_align")
+    _md.update(md or {})
+
+    uid = yield from bp.scan(detectors, mover, start, end, points, md=_md)
+    cat = cat or utils.getCatalog()
+    run = cat[uid]  # return uids
+    ds = run.primary.read()
+
+    x = ds["mover"]
+    y = ds["noisy"]
+
+    if check_signal_change(y):
+        print("Significant signal change detected; motor moving to detected edge.")
+
+        initial_guess = guess_erf_params(x, y)
+        popt, pcov = curve_fit(erf_model, x, y, p0=initial_guess)
+
+        yield from bps.mv(mover, popt[3])
+    else:
+        print("No significant signal change detected; motor movement skipped.")
+
+
 def lineup2(
     # fmt: off
     detectors, mover, rel_start, rel_end, points,
-    peak_factor=2.5, width_factor=0.8, 
+    peak_factor=2.5, width_factor=0.8,
     feature="centroid",
     nscans=2,
     signal_stats=None,
@@ -621,7 +781,9 @@ def _scan(width=1, step_factor=10, num=10, snake=True):
             }
             _md.update(md or {})
 
-            yield from self.tune(width=width, num=num, peak_factor=peak_factor, md=_md)
+            yield from self.tune(
+                width=width, num=num, peak_factor=peak_factor, md=_md
+            )
 
             if not self.tune_ok:
                 return
@@ -633,7 +795,9 @@ def _scan(width=1, step_factor=10, num=10, snake=True):
             if snake:
                 width *= -1
 
-        return (yield from _scan(width=width, step_factor=step_factor, num=num, snake=snake))
+        return (
+            yield from _scan(width=width, step_factor=step_factor, num=num, snake=snake)
+        )
 
     def multi_pass_tune_summary(self):
         t = pyRestTable.Table()
@@ -724,7 +888,10 @@ class TuneResults(Device):
     peakstats_attrs = "x y cen com fwhm min max crossings".split()
 
     def report(self, title=None):
-        keys = self.peakstats_attrs + "tune_ok center initial_position final_position".split()
+        keys = (
+            self.peakstats_attrs
+            + "tune_ok center initial_position final_position".split()
+        )
         t = pyRestTable.Table()
         t.addLabel("key")
         t.addLabel("result")
diff --git a/docs/source/api/_plans.rst b/docs/source/api/_plans.rst
index 49fdd7cfc..47744806a 100644
--- a/docs/source/api/_plans.rst
+++ b/docs/source/api/_plans.rst
@@ -29,14 +29,15 @@ Custom Scans
 
 
 .. autosummary::
 
-   ~apstools.plans.doc_run.documentation_run
-   ~apstools.plans.labels_to_streams.label_stream_decorator
+   ~apstools.plans.alignment.edge_align
    ~apstools.plans.alignment.lineup
    ~apstools.plans.alignment.lineup2
+   ~apstools.plans.alignment.tune_axes
+   ~apstools.plans.alignment.TuneAxis
+   ~apstools.plans.doc_run.documentation_run
+   ~apstools.plans.labels_to_streams.label_stream_decorator
    ~apstools.plans.nscan_support.nscan
    ~apstools.plans.sscan_support.sscan_1D
 
 
 .. _plans.overall: