diff --git a/docsrc/API_docs.rst b/docsrc/API_docs.rst index 2c625c4..c6c4340 100644 --- a/docsrc/API_docs.rst +++ b/docsrc/API_docs.rst @@ -36,6 +36,7 @@ Sequences pyepr.sequences.CarrPurcellSequence pyepr.sequences.ResonatorProfileSequence pyepr.sequences.TWTProfileSequence + pyepr.sequences.T1InversionRecoverySequence Pulses ~~~~~~ @@ -90,6 +91,7 @@ I/O pyepr.dataset.create_dataset_from_sequence pyepr.dataset.create_dataset_from_axes pyepr.dataset.create_dataset_from_bruker + pyepr.dataset.downconvert_dataset Utilities ~~~~~~~~~ diff --git a/docsrc/install.rst b/docsrc/install.rst index e431c7a..f10a207 100644 --- a/docsrc/install.rst +++ b/docsrc/install.rst @@ -38,5 +38,4 @@ PyEPR requires: - h5netcdf - toml - deerlab (https://github.com/JeschkeLab/DeerLab) - - numba - psutil diff --git a/docsrc/releasenotes.rst b/docsrc/releasenotes.rst index 1c3a3be..9070159 100644 --- a/docsrc/releasenotes.rst +++ b/docsrc/releasenotes.rst @@ -1,6 +1,19 @@ Release Notes ============= +Version 1.1 (2026-05-27): +++++++++++++++++++++++++++++ +- Added `AmplifierLinearityAnalysis` class for characterizing amplifier non-linearity. +- Added `T1InversionRecovery` sequence. +- Added rise time to linear chirp pulses. +- Added right based arithmatic. +- Fixed Version Detection Bug +- Fixed Dependency issues +- Improved Documentation +- Removed Numba dependency + + + Version 1.0.0 (2025-09-12): ++++++++++++++++++++++++++++ - All references to `LO` have been changed to `freq` in the frequency object and related. diff --git a/docsrc/tutorial_sequencer.md b/docsrc/tutorial_sequencer.md index a78d50c..784cbf3 100644 --- a/docsrc/tutorial_sequencer.md +++ b/docsrc/tutorial_sequencer.md @@ -1,6 +1,6 @@ # Pulse Sequencer -PyEPR provides an intuitive object-oriented pulse programmer allowing the user to design pulsesequences in a hardware-agnostic manner. Additionally, several common EPR experiments are pre-defined and can be easily instantiated and modified. +PyEPR provides an intuitive object-oriented pulse programmer allowing the user to design pulse sequences in a hardware-agnostic manner. Additionally, several common EPR experiments are pre-defined and can be easily instantiated and modified. PyEPR uses ns, GHz and G as the default time, frequency and field units. Very occasionally, other units such as µs or MHz are used, in which case it will be explicitly mentioned. @@ -31,33 +31,33 @@ These pulses will eventually need a scale (amplitude), before the sequence can b A Detection window is also created ```python p90 = epr.RectPulse(tp=16, - freq=0, # Frequency offset in MHz, w.r.t the sequence frequency, + freq=0, # Frequency offset in GHz, w.r.t the sequence frequency, flipangle=np.pi/2, # Flip angle in degrees pcyc = {"phases":[0, np.pi], "dets":[1,-1]} ) p180 = epr.RectPulse(tp=32, - freq=0, # Frequency offset in MHz, w.r.t the sequence frequency, + freq=0, # Frequency offset in GHz, w.r.t the sequence frequency, flipangle=np.pi, # Flip angle in degrees ) -det = epr.Detetction(tp=32, - freq=0, # Frequency offset in MHz, w.r.t the sequence frequency, +det = epr.Detection(tp=32, + freq=0, # Frequency offset in GHz, w.r.t the sequence frequency, ) ``` We now need a time axis for our sequence and to add them to the sequence object. -When a pulse is copied into the sequence using the `add_pulse` method, parameters can be modified allowing the same pulse can be used multiple times with different timings or amplitudes. +When a pulse is copied into the sequence using the `addPulse` method, parameters can be modified allowing the same pulse can be used multiple times with different timings or amplitudes. ```python t = epr.Parameter(name='Interpulse Delay', value=400, # Initial interpulse delay in ns step=8, # Step size in ns - dim=1024 # Number of points, - unit='ns' # Unit of the parameter + dim=1024, # Number of points, + unit='ns', # Unit of the parameter description='Interpulse delay between the pi/2 and pi pulse' ) # Adding the pulses to the sequence -seq.add_pulse(p90.copy(t=0)) -seq.add_pulse(p180.copy(t=t)) -seq.add_pulse(det.copy(t=2*t)) +seq.addPulse(p90.copy(t=0)) +seq.addPulse(p180.copy(t=t)) +seq.addPulse(det.copy(t=2*t)) # Defining the evolution seq.evolution([t]) @@ -81,7 +81,7 @@ HE_Seq = epr.HahnEchoRelaxationSequence( shots = 20, # Number of shots per point start = 400, # Initial interpulse delay in ns step = 8, # Step size in ns - dim = 1024 # Number of points + dim = 1024, # Number of points pi2_pulse = p90, # The 90 degree pulse pi_pulse = p180 # The 180 degree pulse ) diff --git a/pyepr/classes.py b/pyepr/classes.py index ac49b91..8330486 100644 --- a/pyepr/classes.py +++ b/pyepr/classes.py @@ -45,7 +45,10 @@ def __init__(self,config_file:dict=None,log=None) -> None: else: self.log = log self.resonator = None - self.amp_nonlinearity = self.config["Spectrometer"]["Bridge"].get('Amplifier Non-Linearity',None) + if self.config != {}: + self.amp_nonlinearity = self.config["Spectrometer"]["Bridge"].get('Amplifier Non-Linearity',None) + else: + self.amp_nonlinearity = None pass def connect(self) -> None: @@ -505,6 +508,10 @@ def __add__(self, __o:object): raise RuntimeError( "Both parameters axis and the array must have the same shape") + def __radd__(self, __o:object): + return self.__add__(__o) + + def __sub__(self, __o:object): if type(__o) is Parameter: @@ -573,6 +580,9 @@ def __sub__(self, __o:object): raise RuntimeError( "Both parameters axis and the array must have the same shape") + def __rsub__(self, __o:object): + return self.__sub__(__o) + def __mul__(self, __o:object): if type(__o) is Parameter: if self.unit != __o.unit: diff --git a/pyepr/dataset.py b/pyepr/dataset.py index 868200e..1bf4c2f 100644 --- a/pyepr/dataset.py +++ b/pyepr/dataset.py @@ -102,7 +102,8 @@ def create_dataset_from_sequence(data, sequence: Sequence,extra_params={}): attr.update({'autoDEER_Version':__version__}) return xr.DataArray(data, dims=dims, coords=coords,attrs=attr) -def create_dataset_from_axes(data, axes, params: dict = {},axes_labels=None): +def create_dataset_from_axes(data, axes, params: dict = {},extra_coords:dict = None, + axes_labels=None): """ Create an xarray dataset from a numpy array and a list of axes. @@ -129,7 +130,9 @@ def create_dataset_from_axes(data, axes, params: dict = {},axes_labels=None): if not isinstance(axes, list): axes = [axes] coords = {default_labels.pop(0):a for a in axes} - params.update({'autoDEER_Version':__version__}) + if extra_coords is not None: + coords.update(extra_coords) + params.update({'PyEPR_Version':__version__}) return xr.DataArray(data, dims=dims, coords=coords, attrs=params) @@ -143,7 +146,7 @@ def create_dataset_from_bruker(filepath): labels = [] for i in range(ndims): ax_label = default_labels[i] - axis_string = params['DESC'][f'{ax_label}UNI'] + axis_string = params['DESC'].get(f'{ax_label}UNI',"") if "'" in axis_string: axis_string = axis_string.replace("'", "") if axis_string == 'G': @@ -162,11 +165,12 @@ def create_dataset_from_bruker(filepath): coords = {labels[i]:(default_labels[i],a) for i,a in enumerate(axes)} attr = {} - attr['LO'] = float(params['SPL']['MWFQ']) / 1e9 - attr['B'] = float(params['SPL']['B0VL']) * 1e4 - attr['reptime'] = float(params['DSL']['ftEpr']['ShotRepTime'].replace('us','')) - attr['nAvgs'] = int(params['DSL']['recorder']['NbScansAcc']) - attr['shots'] = int(params['DSL']['ftEpr']['ShotsPLoop']) + if 'DSL' in params: + attr['LO'] = float(params['SPL']['MWFQ']) / 1e9 + attr['B'] = float(params['SPL']['B0VL']) * 1e4 + attr['reptime'] = float(params['DSL']['ftEpr']['ShotRepTime'].replace('us','')) + attr['nAvgs'] = int(params['DSL']['recorder']['NbScansAcc']) + attr['shots'] = int(params['DSL']['ftEpr']['ShotsPLoop']) attr.update({'autoDEER_Version':__version__}) return xr.DataArray(data, dims=dims, coords=coords, attrs=attr) @@ -183,7 +187,7 @@ def save(self, filename, type='netCDF',overwrite=True): Parameters ---------- - filename : str + filename : str, file-like object The name of the file to save the dataset type : str, optional The type of file to save, by default 'netCDF' (including .h5) @@ -196,7 +200,7 @@ def save(self, filename, type='netCDF',overwrite=True): mode = "a" #if filename doesn't have the extension .h5, add it - if not filename.endswith('.h5'): + if isinstance(filename,str) and not filename.endswith('.h5'): filename = filename + '.h5' if 'Scan' in self._obj.dims: @@ -375,3 +379,108 @@ def merge(self,other,ignore_errors=True): +def downconvert_dataset(dataset, filter_type='boxcar',IF=None,reduce=True,sampling_rate=None,**kwargs): + """ + Downconvert a dataset to baseband using a filter + Parameters + ---------- + dataset : xr.DataArray + The dataset to downconvert + filter_type : str, optional + The type of filter to use, by default 'boxcar'. Other options are 'cheby' and a Pulse object + filter_width : float, optional + The width of the filter in MHz or ns depending on filter tyre, by default 20 ns for boxcar and 50 MHz for cheby. + IF : float, optional + The intermediate frequency to use, by default 0.15 + reduce : bool, optional + If True, the dataset is reduced to a single point, by default True + **kwargs : dict + Extra arguments to pass to the filter function + + Returns + ------- + xr.DataArray + The downconverted dataset + """ + + if IF is None: + if 'IF_freq' in dataset.attrs: + IF = dataset.attrs.get('IF_freq') # GHz + elif 'if_freq' in dataset.attrs: + IF = dataset.attrs.get('if_freq') # GHz + elif 'IFfreq' in dataset.attrs: + IF = dataset.attrs.get('IFfreq') # GHz + else: + raise ValueError('IFfreq not found in dataset attributes, please provide IF value') + + if sampling_rate is None: + if 'det_rate' in dataset.attrs: + sampling_rate = dataset.attrs.get('det_rate') # GHz + elif 'sampling_rate' in dataset.attrs: + sampling_rate = dataset.attrs.get('sampling_rate') # GHz + else: + raise ValueError('sampling_rate not found in dataset attributes, please provide sampling_rate value') + if filter_type is None: + dc_first=True + elif filter_type not in ['boxcar','cheby']: + filter_type = 'cheby' + filter_width = 50 + + + if filter_type == 'boxcar': + filter_width = kwargs.get('filter_width',20) + funct = lambda data: np.convolve(data,np.ones(filter_width),mode='same') + dc_first=True + elif isinstance(filter_type, ad_pulses.Pulse): # match filter to a epr Pulse + raise NotImplementedError('Match filter to a pulse not implemented yet') + elif filter_type == 'cheby': + from scipy.signal import cheby1,sosfiltfilt + filter_width = kwargs.get('filter_width',50) # MHz + filter_width /= 1e3 + + Wn = np.array([IF-filter_width,IF+filter_width]) + Wn[Wn<=0] = 0.001 + order = 5 + + a = cheby1(order,0.5,Wn,'bandpass',analog=False,fs=sampling_rate,output='sos') + funct = lambda data: sosfiltfilt(a,data) + dc_first=False + elif filter_type is None: + dc_first=True + + else: + raise ValueError('Filter not recognised') + + if dc_first: + data_array_dc = dataset*np.exp(-1j*2*np.pi*IF*dataset.tx/sampling_rate) + if filter_type is None: + return data_array_dc + data_array_dc.data = np.apply_along_axis(funct,-1,data_array_dc.data) + else: + data_array_dc = xr.apply_ufunc(funct,dataset) + data_array_dc = data_array_dc*np.exp(-1j*2*np.pi*IF*data_array_dc.tx/sampling_rate) + + # if data_array_dc.ndim == 3: + # max_echo_pos = np.unravel_index(np.argmax(np.abs(data_array_dc.data[0,0,20:-20])),data_array_dc.shape[-1])[0] + 20 + # else: + # max_echo_pos = np.unravel_index(np.argmax(np.abs(data_array_dc.data[0,20:-20])),data_array_dc.shape[-1])[0] + 20 + if reduce: + if 'max_echo_pos' in kwargs: + max_echo_pos = kwargs['max_echo_pos'] + else: + max_echo_pos = np.unravel_index(np.abs(data_array_dc).argmax(),data_array_dc.shape)[-1] + return data_array_dc[...,max_echo_pos] + else: + return data_array_dc + +def find_peak(dataset, freq,freq_axis=None,search_range=4): + if freq/1e3 < freq_axis.min() or freq/1e3 > freq_axis.max(): + return "N/A", "N/A" + + if freq_axis is None: + freq_axis = dataset.offset_frequency.values + n_points = dataset.shape[0] + loc = np.argmin(np.abs(freq_axis-freq/1e3)) + loc = loc-search_range + dataset.values[loc-search_range:loc+search_range].argmax() + peak = dataset[loc].values + return loc,peak diff --git a/pyepr/hardware/Bruker_AWG.py b/pyepr/hardware/Bruker_AWG.py index 52fac3f..e900e06 100644 --- a/pyepr/hardware/Bruker_AWG.py +++ b/pyepr/hardware/Bruker_AWG.py @@ -75,6 +75,17 @@ def __init__(self, config_file) -> None: def connect(self, d0=None) -> None: + """ + Connects to the spectrometer through the XeprAPI. Automatically sets + up the spectrometer for pulse experiments. + + Parameters + ---------- + d0: float, optional + The d0 value to be used. If None, the d0 will be calculated + upon connection. By default None. + + """ self.api.connect() @@ -84,17 +95,36 @@ def connect(self, d0=None) -> None: return super().connect() def setup(self,d0=None): + """ + Sets up the spectrometer for pulse experiments. + The video bandwidth is read from the configuration file, and the timebase + is set accordingly. + + Parameters + ---------- + d0: float, optional + The d0 value to be used. If None, the d0 will be calculated + upon connection. By default None. + + """ + # Check if needs to switch to pulse mode, only if needed as slow + if not self.api.hidden['BrPlsMode'].value: + hw_log.info('Switching to Pulse Mode') + temp_mw_amp = self.api.get_MW_amp() + + self.api.hidden['BrPlsMode'].value = True # Turns the mw (video) amplifier on + time.sleep(3) # Wait for bridge to actually switch to pulse mode + self.api.set_MW_amp(temp_mw_amp) - self.api.hidden['BrPlsMode'].value = True self.api.hidden['OpMode'].value = 'Operate' # self.api.hidden['RefArm'].value = 'On' - # TODO add detection of VAMP-III model video_bw = self.bridge_config.get('Video BW') # self.api.cur_exp['VideoBW'].value = 20 self.time_base = 1/(video_bw*2e-3) self.api.hidden['specJet.TimeBase'].value = self.time_base + if d0 is None: @@ -282,13 +312,14 @@ def launch(self, sequence: Sequence, savename: str, start=True, tune=True, def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): - """Generates a rectangular pi and pi/2 pulse of the given length at - the given field position. This value is stored in the pulse cache. + """ + Generates a rectangular pi/2 and pi pulse at the given frequency and field. + The pulses are of equal amplitude ($t_p$ for pi/2 and $2*t_p$ for pi) and are tuned using a Hahn echo sequence. Parameters ---------- tp : float - Pulse length in ns + $pi/2$ Pulse length in ns freq : float Central frequency of this pulse in GHz B : float @@ -303,7 +334,7 @@ def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): p90: RectPulse A tuned rectangular pi/2 pulse of length tp p180: RectPulse - A tuned rectangular pi pulse of length tp + A tuned rectangular pi pulse of length 2*tp """ time.sleep(5) amp_tune =HahnEchoSequence( @@ -335,7 +366,7 @@ def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): data = np.abs(dataset.data) scale_amp = np.around(dataset.pulse0_scale[data.argmax()].data,2) if scale_amp > 0.95: - raise RuntimeError("Not enough power avaliable.") + raise RuntimeError("Not enough power available.") if scale_amp == 0: warnings.warn("Pulse tuned with a scale of zero!") diff --git a/pyepr/hardware/Bruker_MPFU.py b/pyepr/hardware/Bruker_MPFU.py index 973a0fe..2f50c48 100644 --- a/pyepr/hardware/Bruker_MPFU.py +++ b/pyepr/hardware/Bruker_MPFU.py @@ -957,7 +957,7 @@ def ELDORtune(interface, sequence, freq, MPFU=True, for i,x in enumerate(atten_axis): interface.api.set_attenuator('ELDOR',x) # Set phase to value time.sleep(1) - data[i] = np.trapz(get_specjet_data(interface)) + data[i] = np.trapezoid(get_specjet_data(interface)) data = correctphase(data) if data[np.abs(data).argmax()].max() < 1: diff --git a/pyepr/hardware/Bruker_tools.py b/pyepr/hardware/Bruker_tools.py index 7d419ab..cc945da 100644 --- a/pyepr/hardware/Bruker_tools.py +++ b/pyepr/hardware/Bruker_tools.py @@ -1287,9 +1287,7 @@ def write_pulsespel_file(sequence,d0, AWG=False, MPFU=False,MaxGate=40): pcyc_hash = pcyc.pcyc_hash pcyc_str = pcyc.__str__() else: - pcyc = PSPhaseCycle(sequence, MPFU) - pcyc_hash = pcyc.pcyc_hash - pcyc_str = pcyc.__str__() + raise ValueError("Either AWG must be True or MPFU must be supplied") # Add Pulse Sequence pulse_str = "d9\n" for id, pulse in enumerate(sequence.pulses): diff --git a/pyepr/hardware/ETH_awg.py b/pyepr/hardware/ETH_awg.py index 70fdee5..ad1b653 100644 --- a/pyepr/hardware/ETH_awg.py +++ b/pyepr/hardware/ETH_awg.py @@ -480,7 +480,7 @@ def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400,step=0.02, dim=45): Parameters ---------- tp : float - Pulse length of pi/2 pulse in ns + $pi/2$ Pulse length in ns freq : float Central frequency of this pulse in GHz B : float @@ -499,9 +499,8 @@ def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400,step=0.02, dim=45): p90: RectPulse A tuned rectangular pi/2 pulse of length tp p180: RectPulse - A tuned rectangular pi pulse of length tp + A tuned rectangular pi pulse of length 2*tp """ - amp_tune =HahnEchoSequence( B=B, freq=freq, reptime=reptime, averages=1, shots=shots ) @@ -527,7 +526,7 @@ def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400,step=0.02, dim=45): data = np.abs(dataset.data) scale = np.around(dataset.pulse0_scale[data.argmax()].data,2) if scale > 0.9: - raise RuntimeError("Not enough power avaliable.") + raise RuntimeError("Not enough power available.") if scale == 0: warnings.warn("Pulse tuned with a scale of zero!") @@ -616,7 +615,7 @@ def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400,step=0.02, dim=45 new_amp = np.around(dataset.pulse0_scale[dataset.data.argmax()].data,2) if new_amp > 0.9: - raise RuntimeError("Not enough power avaliable.") + raise RuntimeError("Not enough power available.") if new_amp == 0: warnings.warn("Pulse tuned with a scale of zero!") @@ -709,7 +708,7 @@ def tune(self,*, sequence=None, mode="amp_hahn", freq=None, gyro=None): dataset = self.read_dataset() scale = np.around(dataset.pulse0_scale[dataset.data.argmax()].data,2) if scale > 0.9: - raise RuntimeError("Not enough power avaliable.") + raise RuntimeError("Not enough power available.") self.pulses[f"p90_{tp}"] = amp_tune.pulses[0].copy( scale=scale, freq=0) @@ -1107,4 +1106,4 @@ def set_params_by_uuid(new_seq, progTable,uuid,index): interface.bg_data.attrs['diglevel'] = dig_level interface.bg_data.attrs['nAvgs'] = iavg+1 - print("Background thread finished") \ No newline at end of file + print("Background thread finished") diff --git a/pyepr/hardware/ETH_awg_load.py b/pyepr/hardware/ETH_awg_load.py index b12fde3..ef52432 100644 --- a/pyepr/hardware/ETH_awg_load.py +++ b/pyepr/hardware/ETH_awg_load.py @@ -2,7 +2,7 @@ import scipy.signal as sig from pyepr.classes import Parameter from pyepr.dataset import create_dataset_from_axes, create_dataset_from_sequence -from pyepr.pulses import Pulse +import pyepr.pulses as pulses from scipy.integrate import cumulative_trapezoid from deerlab import correctphase from warnings import warn @@ -547,7 +547,7 @@ def uwb_eval_match(matfile, sequence=None, scans=None, mask=None,filter_pulse=No The scans to be loaded. mask : list, optional The mask to be used. - filter_pulse : ad.Pulse, optional + filter_pulse : epr.Pulse, optional The pulse to be used as a matched filter. If None, the maximum pulse width will be used. This is only used if filter_type is 'match' filter_type : str, optional The type of filter to be used. Options are 'match', 'cheby2' and 'butter. Default is 'match' @@ -856,12 +856,12 @@ def extract_data(matfile,scans): filter_func = lambda dta, det_frq: match_filter_dc(dta,t,complex_shape,det_frq) - elif isinstance(filter_pulse,Pulse): + elif isinstance(filter_pulse,pulses.Pulse): complex_shape = filter_pulse.build_shape(t) filter_func = lambda dta, det_frq: match_filter_dc(dta,t,complex_shape,det_frq) - elif (filter_type.lower() == 'cheby2') or (filter_type.lower() == 'butter'): + elif filter_type.lower() in ['cheby2','butter','boxcar']: if filter_width is None: raise ValueError('You must provide a filter width for the cheby2 or butter filter') @@ -961,7 +961,180 @@ def scipy_filter_dc(dta,t,filter_width,offset_freq,sampling_freq,filter_type='ch filter_sos = sig.cheby2(10,40,(offset_freq-filter_width,offset_freq+filter_width),fs=sampling_freq,btype="bandpass",output='sos') elif filter_type == 'butter': filter_sos = sig.butter(10,(offset_freq-filter_width,offset_freq+filter_width),fs=sampling_freq,btype="bandpass",output='sos') + elif filter_type == 'boxcar': + # A boxcar is just a moving average, so we can implement it with a convolution + boxcar_len = int(2*filter_width*sampling_freq) + if boxcar_len % 2 == 0: + boxcar_len += 1 + boxcar = np.ones(boxcar_len)/boxcar_len + filtered = sig.convolve(dta,boxcar,mode='same') + filtered_dc = digitally_upconvert(t,filtered,-offset_freq) + return filtered_dc + filtered = sig.sosfilt(filter_sos,dta) filtered_dc = digitally_upconvert(t,filtered,-offset_freq) return filtered_dc +# --------------------------------------------------------------------------- +# ETH UWB data to xarray +# --------------------------------------------------------------------------- + +def extract_data(matfile,estr): + if "dta" in matfile.keys(): + nAvgs = matfile["nAvgs"] + dta = [matfile["dta"]] + + elif "dta_001" in matfile.keys(): + dta = [] + nAvgs = 0 + for ii in range(1, estr["avgs"]+1): + actname = 'dta_%03u' % ii + if actname in matfile.keys(): + single_scan = matfile[actname] + # Only keep it if the average is complete, unless it is + # the first + if np.sum(single_scan[..., -1]) == 0 and ii > 1: # incomplete scan + if (nAvgs+1) != ii: + print(f"Scan {ii} is incomplete and will be skipped.") + continue + elif np.sum(single_scan[..., -1]) == 0 and ii == 1: + nAvgs = 0 + else: + nAvgs += 1 + dta.append(single_scan) + dta = np.array(dta) + dta = np.atleast_2d(dta) + if dta.ndim > 2: + dta = np.swapaxes(dta,-1,-2) + return dta, nAvgs + + +def get_metadata(estr,conf): + metadata = {} + metadata['averages'] = estr['avgs'] + metadata['reptime'] = estr['reptime'] + metadata['shots'] = estr['shots'] + metadata['B'] = estr['B'] + metadata['name'] = estr['name'] + metadata['freq'] = estr['LO'] + for i,event in enumerate(estr['events']): + metadata.update(get_pulse_metadata(event,i)) + metadata['dig_rate'] = conf['std']['dig_rate'] + return metadata + +def detect_pulse_type(pulse): + if 'det_len' in pulse: + return pulses.Detection + if pulse['pulsedef']['type'] == 'chirp': + if 'nu_final' in pulse['pulsedef']: + return pulses.ChirpPulse + else: + return pulses.RectPulse + +def get_pulse_metadata(pulse,i): + pulse_type = detect_pulse_type(pulse) + fixed_params = {} + if issubclass(pulse_type, pulses.Detection): + type_str="det" + else: + type_str="pulse" + + fixed_params[f"{type_str}{i}_t"] = pulse['t'] + + if issubclass(pulse_type,pulses.Detection): + fixed_params[f"{type_str}{i}_tp"] = pulse['det_len'] + else: + fixed_params[f"{type_str}{i}_tp"] = pulse['pulsedef']['tp'] + fixed_params[f"{type_str}{i}_scale"] = pulse['pulsedef']['scale'] + + if issubclass(pulse_type,pulses.Detection): + fixed_params[f"{type_str}{i}_freq"] = pulse['det_frq'] + elif issubclass(pulse_type, pulses.FrequencySweptPulse): + fixed_params[f"{type_str}{i}_init_freq"] = pulse['pulsedef']['nu_init'] + fixed_params[f"{type_str}{i}_final_freq"] = pulse['pulsedef']['nu_final'] + else: # Monochromatic pulse + fixed_params[f"{type_str}{i}_freq"] = pulse['pulsedef']['nu_init'] + return fixed_params + +def get_dig_level(data,metadata,conf): + max_val = data.max() + dig_max = conf['std']['dig_max'] + acqs = metadata['shots'] * metadata['nAvgs']* metadata['nPcyc'] + dig_pc = max_val/(dig_max*acqs) + return dig_pc + +def get_nPcyc(estr): + nPcyc = 1 + if not isinstance(estr['parvars'],list): + estr['parvars'] = [estr['parvars']] + for parvar in estr['parvars']: + if 'reduce' in parvar and parvar['reduce'] == 1: + nPcyc *= parvar['axis'].size + return nPcyc + +default_labels = ['X','Y','Z','T'] +def get_coords(parvars): + coords = {} + i=-1 + if not isinstance(parvars,list): + parvars = [parvars] + for parvar in parvars: + if 'reduce' in parvar and parvar['reduce'] == 1: + continue + i += 1 + for variable in np.atleast_1d(parvar['variables']): + if '.' in variable: + event,var = variable.split('.') + event_num = int(event.split('{')[1].split('}')[0]) + + coord_name = f"pulse{event_num}_{var}" + else: + coord_name = f"{variable}" + coord_axis = np.array(parvar['axis']).astype(float) + coords[coord_name] = (default_labels[i],coord_axis) + + return coords + +def ETHUWB_xarray_load(matfile, sum_scans=True,): + """ + This function reads data from an Andrin Doll ETH UWB EPR spectrometer + and converts it into an xarray.Dataset format compatible with PyEPR. + + Parameters + ---------- + matfile : dict + The data file to be loaded. + sum_scans : bool, optional + Whether to sum all scans or not. Default is True. + Returns + ------- + output : xarray.Dataset + The data in the xarray format. + + Notes and Limitations + --------------------------- + + - This function currently only supports experiments with reduced phase cycles. + """ + estr = matfile[matfile['expname']] + conf = matfile['conf'] + + data,nAvgs = extract_data(matfile,estr) + metadata = get_metadata(estr,conf) + metadata['nPcyc'] = get_nPcyc(estr) + metadata['nAvgs'] = nAvgs + metadata['dig_pc'] = get_dig_level(data,metadata,conf) + data_shape = data.shape # [nAvgs, ..., tx] + axes_labels = default_labels[:len(data_shape)-2] + ['tx'] + base_axes = [np.arange(data_shape[i+1]) for i in range(len(data_shape)-1)] + + if sum_scans: + data = data.sum(axis=0) + # Add scan axis label + else: + axes_labels = ['scan'] + axes_labels + base_axes = [np.arange(nAvgs)] + base_axes + dataset = create_dataset_from_axes(data,base_axes,params=metadata, + extra_coords=get_coords(estr['parvars']), + axes_labels=axes_labels) + return dataset diff --git a/pyepr/hardware/XeprAPI_link.py b/pyepr/hardware/XeprAPI_link.py index 074cafd..546c023 100644 --- a/pyepr/hardware/XeprAPI_link.py +++ b/pyepr/hardware/XeprAPI_link.py @@ -23,12 +23,25 @@ class XeprAPILink: - def __init__(self, config_file: str = None) -> None: + def __init__(self, config_file: str = None, **kwargs) -> None: + """ + This class links to the Bruker Xepr API to control the spectrometer. + Parameters + ---------- + config_file : str, optional + Path to a PyEPR spectrometer configuration file, by default None + kwargs : dict, optional + Additional keyword arguments: + - retry_attempts : int + Number of times to retry getting a Xepr parameter before + raising an exception. Default is 50. + """ self.Xepr = None self.cur_exp = None self.hidden = None self._tmp_dir = None self.XeprCmds = None + self.retry_attempts = kwargs.get('retry_attempts', 50) if config_file is not None: with open(config_file, mode='r') as file: config = yaml.safe_load(file) @@ -66,7 +79,7 @@ def connect(self) -> None: def log_xepr_version(self): """Logs the Xepr and Linacq versions for debugging purposes.""" - packages = ['xper','linacq'] + packages = ['xepr','linacq'] for package in packages: details = get_package_version_from_dnf(package) @@ -87,7 +100,7 @@ def _get_Xepr_global(self): def _xepr_retry(self, func, *args, **kwargs): - for i in range(0, 50): + for i in range(0, self.retry_attempts): try: return func(*args, **kwargs) except Exception as e: diff --git a/pyepr/hardware/dummy.py b/pyepr/hardware/dummy.py index a4bdd4f..e5cf50f 100644 --- a/pyepr/hardware/dummy.py +++ b/pyepr/hardware/dummy.py @@ -162,7 +162,7 @@ def __init__(self,config_file) -> None: self.mode = self.dummyResonator.mode - super().__init__(log=hw_log) + super().__init__(self.config,log=hw_log) def launch(self, sequence, savename: str, **kwargs): hw_log.info(f"Launching {sequence.name} sequence") diff --git a/pyepr/hardware/dummy_xepr.py b/pyepr/hardware/dummy_xepr.py index 546fa93..ae24b8f 100644 --- a/pyepr/hardware/dummy_xepr.py +++ b/pyepr/hardware/dummy_xepr.py @@ -232,7 +232,7 @@ def generate_phase_experiment(numpoints,pg,phase,phase_offset=0.2): t = range(0,numpoints) for i in t: ta,V = generate_hahn_echo_transient(pg,phase,phase_offset) - vals[i,] = np.trapz(V,x=ta) + vals[i,] = np.trapezoid(V,x=ta) return t, vals diff --git a/pyepr/pulses.py b/pyepr/pulses.py index aa641a0..e9148e5 100644 --- a/pyepr/pulses.py +++ b/pyepr/pulses.py @@ -13,9 +13,7 @@ import copy from functools import reduce from itertools import accumulate -from numba import njit -#@njit def compute_upulses_not_trajectory(dUs): n_offsets, n_steps, _, _ = dUs.shape Upulses = np.empty((n_offsets, 2, 2), dtype=np.complex128) @@ -26,7 +24,6 @@ def compute_upulses_not_trajectory(dUs): Upulses[i] = U return Upulses -#@njit def compute_upulses_trajectory(dUs): n_offsets, n_steps, _, _ = dUs.shape Upulses = np.empty((n_offsets, n_steps + 1, 2, 2), dtype=np.complex128) @@ -39,7 +36,6 @@ def compute_upulses_trajectory(dUs): Upulses[i, j + 1] = U return Upulses -#@njit def compute_magnetization_not_trajectory(Upulses, density0, Mmag): n_offsets = Upulses.shape[0] density = np.empty((n_offsets, 2, 2), dtype=np.complex128) @@ -71,7 +67,6 @@ def compute_magnetization_not_trajectory(Upulses, density0, Mmag): return Mag * Mmag[:, None] -#@njit def compute_magnetization_trajectory(Upulses, density0): n_offsets, n_steps = Upulses.shape[:2] density = np.empty((n_offsets, n_steps, 2, 2), dtype=np.complex128) @@ -329,7 +324,7 @@ def flip(self): @property def amp_factor(self): """ The B1 amplitude factor (nutation frequency) for the pulse in GHz""" - amp_factor_value= self.flipangle.value / (2 * np.pi * np.trapz(self.AM,self.ax)) + amp_factor_value= self.flipangle.value / (2 * np.pi * np.trapezoid(self.AM,self.ax)) return Parameter("amp_factor", amp_factor_value, "GHz", "Amplitude factor for the pulse") @@ -341,8 +336,7 @@ def exciteprofile_old(self, freqs=None, resonator = None): This function is ported from EasySpin (https://easyspin.org/easyspin/documentation/sop.html) [1-2], - and based upon the method from Gunnar Jeschke, Stefan Pribitzer and - Andrin Doll[3]. + and based upon the method from Gunnar Jeschke, Stefan Pribitzer and Andrin Doll[3]. References: +++++++++++ @@ -478,13 +472,27 @@ def exciteprofile(self, freqs=None, resonator=None, trajectory=False): offsets = np.linspace(-2*BW, 2*BW, 100) + c_freq else: offsets = freqs - - ISignal = np.real(self.complex) * self.amp_factor.value - QSignal = np.imag(self.complex) * self.amp_factor.value + + if self.scale is None or self.scale.value is None: + scale=None + amp_factor = self.amp_factor.value + elif (self.scale.unit is not None) and (self.scale.unit.lower() in ['ghz','mhz']): + scale = 1.0 + if self.scale.unit.lower() == 'mhz': + amp_factor = self.scale.value/1000 + else: + amp_factor = self.scale.value + elif self.scale is not None: + scale = self.scale.value + amp_factor = self.amp_factor.value + else: + scale=None + amp_factor = self.amp_factor.value if resonator is not None: + # Apply resonator correction to the pulse AM function FM = self.FM - if self.scale is None or self.scale.value is None: + if scale is None: amp_factor = np.interp(FM, resonator.freqs-resonator.freq_c, resonator.profile) amp_factor = np.min([amp_factor,np.ones_like(amp_factor)*self.amp_factor.value],axis=0) else: @@ -496,6 +504,12 @@ def exciteprofile(self, freqs=None, resonator=None, trajectory=False): ISignal = np.real(self.complex) * amp_factor QSignal = np.imag(self.complex) * amp_factor + + else: + # No resonator correction so use a constant amplitude + ISignal = np.real(self.complex) * amp_factor # In GHz + QSignal = np.imag(self.complex) * amp_factor # In GHz + t= self.ax M0=[0, 0, 1] @@ -610,7 +624,7 @@ def _pcyc_str(self): def __str__(self): # Build header line - header = "#" * 79 + "\n" + "autoDEER Pulse Definition" + \ + header = "#" * 79 + "\n" + "PyEPR Pulse Definition" + \ "\n" + "#" * 79 + "\n" # Build Overviews @@ -685,7 +699,7 @@ def __str__(self): # Build Footer footer = "#" * 79 + "\n" +\ - f"Built by autoDEER Version: {__version__}" + "\n" + "#" * 79 + f"Built by PyEPR Version: {__version__}" + "\n" + "#" * 79 # Combine All string = header + overview_params + param_table + prog_table +\ @@ -1032,8 +1046,12 @@ def __init__(self, *, tp, t=None, scale=None, flipangle=None, pcyc=[0], name=Non elif "final_freq" in kwargs: self.final_freq = Parameter("final_freq", kwargs["final_freq"], "GHz", "Final frequency of pulse") self.init_freq = Parameter("init_freq", self.final_freq.value - BW, "GHz", "Initial frequency of pulse") + elif "freq" in kwargs: + # Assumes symmetric sweep about central frequency + self.init_freq = Parameter("init_freq", kwargs["freq"] - BW/2, "GHz", "Initial frequency of pulse") + self.final_freq = Parameter("final_freq", kwargs["freq"] + BW/2, "GHz", "Final frequency of pulse") else: - raise ValueError("Bandwidth must be combined with either an initial or final frequency") + raise ValueError("Bandwidth must be combined with either an initial, final or center frequency") elif ("init_freq" in kwargs) & ("final_freq" in kwargs): self.init_freq = Parameter("init_freq", kwargs["init_freq"], "GHz", "Initial frequency of pulse") self.final_freq = Parameter("final_freq", kwargs["final_freq"], "GHz", "Final frequency of pulse") @@ -1102,7 +1120,7 @@ def func(self, ax): # beta**beta_exp2 * (ax[cut:-1]/tp)**order2) # FM = BW * cumulative_trapezoid(AM**2,ax,initial=0) /\ - # np.trapz(AM**2,ax) + self.init_freq.value + # np.trapezoid(AM**2,ax) + self.init_freq.value sech = lambda x: 1/np.cosh(x) cut = round(nx/2) AM = np.zeros_like(ti) @@ -1124,6 +1142,8 @@ def sweeprate(self): """ The sweep rate of the pulse in GHz/ns""" sweeprate_value = self.beta.value * self.bandwidth.value / (2*self.tp.value) return Parameter("sweeprate", sweeprate_value, "GHz/ns", "Sweep rate of the pulse") + + # ============================================================================= @@ -1153,6 +1173,7 @@ def func(self, ax): self.init_freq.value, self.final_freq.value, nx) return AM, FM + @property def sweeprate(self): diff --git a/pyepr/relaxation_analysis.py b/pyepr/relaxation_analysis.py index 93d0b17..fd3a2b0 100644 --- a/pyepr/relaxation_analysis.py +++ b/pyepr/relaxation_analysis.py @@ -10,7 +10,7 @@ from pyepr.colors import primary_colors # =========================================================================== - +# T2-type relaxation analysis classes class CarrPurcellAnalysis(): def __init__(self, dataset, sequence: Sequence = None) -> None: @@ -485,7 +485,9 @@ def __call__(self, x, norm=True, SNR=False, source=None): return V[0] else: return V - + +# T1-type relaxation analysis classes + class ReptimeAnalysis(): def __init__(self, dataset, sequence: Sequence = None) -> None: diff --git a/pyepr/resonator_profile_analysis.py b/pyepr/resonator_profile_analysis.py index 59eeb88..734f868 100644 --- a/pyepr/resonator_profile_analysis.py +++ b/pyepr/resonator_profile_analysis.py @@ -74,7 +74,7 @@ def __init__( - if np.iscomplexobj(dataset): + if np.iscomplexobj(dataset) and not kwargs.get('skip_phase_correction',False): self.dataset = phase_correct_respro(dataset) else: self.dataset = dataset @@ -159,7 +159,7 @@ def process_nutations( return self.prof_data, self.prof_frqs - def _process_fit(self,R_limit=0.5,mask=None,debug=False): + def _process_fit(self,R_limit=0.5,mask=None,debug=False,**kwargs): """Processes the nutation data by fitting a cosine function to each frequency in the dataset. @@ -229,6 +229,7 @@ def _process_fit(self,R_limit=0.5,mask=None,debug=False): self.profile_ci[i] = np.nan # Remove nan + old_freqs=self.freqs self.freqs = self.freqs[~np.isnan(self.profile)] self.profile = self.profile[~np.isnan(self.profile)] self.profile_ci = self.profile_ci[~np.isnan(self.profile_ci)] @@ -251,7 +252,7 @@ def _process_fit(self,R_limit=0.5,mask=None,debug=False): for i in range(n_excluded): key = int(list(excluded.keys())[i]) nutation = self.dataset[:,key] - freq = self.freqs[key].values + freq = old_freqs[key].values if np.iscomplexobj(nutation): nutation = nutation.real nutation = nutation/np.max(nutation) @@ -590,11 +591,11 @@ def calc_overlap(x, func1, func2): """ y1 = func1(x) y2 = func2(x) - area_1 = np.trapz(y1, x) - area_2 = np.trapz(y2, x) + area_1 = np.trapezoid(y1, x) + area_2 = np.trapezoid(y2, x) y1 /= area_1 y2 /= area_2 - area_12 = np.trapz(y1*y2, x) + area_12 = np.trapezoid(y1*y2, x) return area_12 def BSpline_extra(tck_s): @@ -642,3 +643,86 @@ def optimise_spectra_position(resonator_profile, fieldsweep, verbosity=0): return new_LO + +class AmplifierLinearityAnalysis: + + def __init__(self, dataset,attenuator=0) -> None: + self.dataset = dataset + self.attenuator = attenuator + + if "pulse0_scale" in self.dataset.coords: + self.scales= self.dataset.pulse0_scale + + if "pulse0_tp" in self.dataset.coords: + self.tp = self.dataset.pulse0_tp + + self._process_fit() + + + def _process_fit(self,R_limit=0.5,mask=None): + self.n_scales = self.scales.shape[0] + + self.profile = np.zeros(self.n_scales) + self.profile_ci = np.zeros(self.n_scales) + + fun = lambda x, f, tau,a,k: a*np.cos(2*np.pi*f*x)*np.exp(-x/tau)+ k + bounds = ([5e-3,10,0,-1],[0.3,2000,2,1]) + p0 = [50e-3,150,1,0] + + R2 = lambda y, yhat: 1 - np.sum((y - yhat)**2) / np.sum((y - np.mean(y))**2) + + for i in range(self.n_scales): + if mask is not None: + nutation = self.dataset[mask,i] + x = self.tp[mask] + else: + nutation = self.dataset[:,i] + x = self.tp + if np.iscomplexobj(nutation): + nutation = nutation.epr.correctphase + nutation = nutation/np.max(nutation) + + try: + results = curve_fit(fun, x, nutation, bounds=bounds,xtol=1e-4,ftol=1e-4,p0=p0) + if R2(nutation,fun(x,*results[0])) > R_limit: + self.profile[i] = results[0][0] + + self.profile_ci[i] = np.sqrt(np.diag(results[1]))[0]*1.96 + else: + self.profile[i] = np.nan + self.profile_ci[i] = np.nan + except RuntimeError: + self.profile[i] = np.nan + self.profile_ci[i] = np.nan + + # Remove nan + self.scales = self.scales[~np.isnan(self.profile)] + self.profile = self.profile[~np.isnan(self.profile)] + self.profile_ci = self.profile_ci[~np.isnan(self.profile_ci)] + + # Adjust for attenuator + self.profile = self.profile * 10**(self.attenuator/20) + self.profile_ci = self.profile_ci * 10**(self.attenuator/20) + + def fit(self,order=5): + """ 5th order polynomial fit to the profile """ + if hasattr(self, 'profile'): + self.coefficients = np.polyfit(self.scales, self.profile/self.profile.max(), order) + self.model = np.polyval(self.coefficients, self.scales) + else: + raise ValueError("Profile not calculated. Run _process_fit() first.") + + def plot(self, axs= None, fig=None): + + if axs is None and fig is None: + fig, axs = plt.subplots(1,1,constrained_layout=True,figsize=(8,8)) + + axs.plot(self.scales, self.profile*1e3, label="Profile", color=primary_colors[0], marker='o', markersize=5, linewidth=0,alpha=0.7) + + if hasattr(self, 'model'): + axs.plot(self.scales, self.model*1e3*self.profile.max(), label="Model", color=primary_colors[0], linewidth=2) + axs.text(0.05, 0.95, f"Fit Coeff: {[f'{i:.3g}' for i in self.coefficients]}", transform=axs.transAxes, fontsize=10, verticalalignment='top',bbox=dict(facecolor='white', alpha=0.5, edgecolor='black')) + + axs.legend() + axs.set_xlabel("Pulse amplitude / A.U.") + axs.set_ylabel("Nutation frequency / MHz") \ No newline at end of file diff --git a/pyepr/sequences.py b/pyepr/sequences.py index b1a9884..6899f7e 100644 --- a/pyepr/sequences.py +++ b/pyepr/sequences.py @@ -654,7 +654,7 @@ def __init__(self, *, B, freq, reptime, averages, shots, **kwargs) -> None: Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss freq : int or float The freq frequency in GHz reptime : _type_ @@ -731,12 +731,12 @@ class T1InversionRecoverySequence(Sequence): Represents a T1 Inversion Recovery Sequence. Sequence: - [\pi - \tau - \pi/2 - \tau - echo] + [\pi - t - \pi/2 - \tau - \pi - \tau - echo] Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss LO : int or float The LO frequency in GHz reptime : _type_ @@ -751,6 +751,8 @@ class T1InversionRecoverySequence(Sequence): The step size of the interpulse delay in ns, by default 50 ns dim : int The number of points in the X axis + tau: float + The Hahn echo interpulse delay in ns, by default 500 ns Optional Parameters ------------------- @@ -761,6 +763,62 @@ class T1InversionRecoverySequence(Sequence): An autoEPR Pulse object describing the refocusing pi pulses. If not specified a RectPulse will be created instead. """ + + def __init__(self, *, B, freq, reptime, averages, shots,start=500, step=40, dim=200, **kwargs) -> None: + name = "T1InversionRecoverySequence" + tp = kwargs.get("tp", 12) + + self.tau = Parameter(name="tau", value=500, unit="ns", description="The hahn ehco interpulse delay",virtual=True) + self.t = Parameter(name="t", value=start, step=step, dim=dim, unit="ns", description="The inversion recovery delay",virtual=True) + super().__init__(name=name,B=B, freq=freq, reptime=reptime, averages=averages, shots=shots, **kwargs) + + if "pi_pulse" in kwargs: + self.pi_pulse = kwargs["pi_pulse"] + if "pi2_pulse" in kwargs: + self.pi2_pulse = kwargs["pi2_pulse"] + if "det_event" in kwargs: + self.det_event = kwargs["det_event"] + + if hasattr(self, "pi_pulse"): + pi_pulse = self.addPulse(self.pi_pulse.copy( + t=0, pcyc={"phases":[0], "dets": [1]})) + else: + pi_pulse = self.addPulse(RectPulse( # Pump 1 pulse + t=0, tp=tp, freq=0, flipangle=np.pi + )) + + if hasattr(self, "pi2_pulse"): + self.addPulse(self.pi2_pulse.copy( + t=self.t, pcyc={"phases":[0, np.pi], "dets": [1, -1]})) + else: + self.addPulse(RectPulse( # Exc pulse + t=self.t, tp=tp, freq=0, flipangle=np.pi/2, + pcyc={"phases":[0, np.pi], "dets":[1, -1]} + )) + + if hasattr(self, "pi_pulse"): + pi_pulse = self.addPulse(self.pi_pulse.copy( + t=self.t+self.tau, pcyc={"phases":[0], "dets": [1]})) + else: + pi_pulse = self.addPulse(RectPulse( # Pump 1 pulse + t=self.t+self.tau, tp=tp, freq=0, flipangle=np.pi + )) + if hasattr(self, "det_event"): + self.addPulse(self.det_event.copy(t=self.t+2*self.tau)) + else: + self.addPulse(Detection(t=self.t+2*self.tau, tp=self.det_window.value)) + + self.evolution([self.t]) + + def simulate(self, T1=1e4): + """ + Simulates the T1 recovery as a simple exponential recovery. + """ + func = lambda x, a, T1: a*(1 - 2*np.exp(-x/T1)) + xaxis = val_in_ns(self.t) + data = func(xaxis,1,T1) + data = add_phaseshift(data, 0.05) + return xaxis, data # ============================================================================= @@ -771,7 +829,7 @@ class HahnEchoRelaxationSequence(HahnEchoSequence): Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss freq : int or float The freq frequency in GHz reptime : _type_ @@ -822,7 +880,7 @@ class T2RelaxationSequence(HahnEchoRelaxationSequence): Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss freq : int or float The freq frequency in GHz reptime : _type_ @@ -862,7 +920,7 @@ def __init__(self, *, B, freq, Bwidth, reptime, averages, shots, **kwargs) -> No Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss Bwidth: int or float The width of the field sweep, in Gauss freq : int or float @@ -934,7 +992,7 @@ def __init__(self, *, B, freq, reptime, reptime_max, averages, shots, start=20, Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss freq : int or float The freq frequency in GHz reptime: float @@ -1012,7 +1070,7 @@ def __init__(self, *, B, freq, reptime, averages, shots, Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss freq : int or float The freq frequency in GHz reptime : _type_ @@ -1146,7 +1204,7 @@ def __init__(self, *, B, freq, reptime, averages, shots, fwidth=0.3,dtp=2, **kwa Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss Bwidth: int or float The width of the field sweep, in Gauss freq : int or float @@ -1216,7 +1274,7 @@ def _build_sequence(self): self.freq = Parameter("freq", center_freq, start=-fwidth, step=fstep, dim=dim, unit="GHz", description="frequency") self.B = Parameter( "B",((center_freq)/self.gyro), start=-fwidth/self.gyro, step=fstep/self.gyro, dim=dim, - unit="Guass",link=self.freq,description="B0 Field" ) + unit="Gauss",link=self.freq,description="B0 Field" ) self.addPulse(RectPulse( # Hard pulse t=0, tp=tp, freq=0, flipangle="Hard" diff --git a/pyepr/tools.py b/pyepr/tools.py index 0e19fcf..01ccea3 100644 --- a/pyepr/tools.py +++ b/pyepr/tools.py @@ -2,8 +2,8 @@ import deerlab as dl import numpy as np import logging -from pyepr.dataset import create_dataset_from_bruker -from pyepr.hardware.ETH_awg_load import uwb_load, uwb_eval_match +from pyepr.dataset import create_dataset_from_bruker, downconvert_dataset +from pyepr.hardware.ETH_awg_load import uwb_load, uwb_eval_match, ETHUWB_xarray_load from scipy.io import loadmat from scipy.io.matlab import MatReadError import xarray as xr @@ -12,7 +12,7 @@ def eprload( - path: str, experiment: str = None, type: str = None, + path: str, experiment: str = None, type: str = None,downconvert=True, **kwargs): """ A general versions of eprload @@ -54,7 +54,7 @@ def eprload( type = 'TXT' elif file_extension == '.mat': - log.debug('File detecetd as Matlab') + log.debug('File detected as Matlab') type = 'MAT' else: @@ -64,10 +64,10 @@ def eprload( " set type manually \n Valid file types: '.DSC','.DTA','.h5'," "'.hdf5','.csv','.txt','.mat'") - if type == 'BRUKER': + if type.upper() == 'BRUKER': return create_dataset_from_bruker(path) - elif type == 'TXT': + elif type.upper() == 'TXT': if 'full_output' in kwargs: full_output = kwargs['full_output'] del kwargs['full_output'] @@ -76,7 +76,22 @@ def eprload( data = np.loadtxt(path, *kwargs) return data - elif type == 'MAT': + elif type.upper() == 'MAT': + try: + Matfile = loadmat(path, simplify_cells=True, squeeze_me=True) + except Exception as e: + raise MatReadError("Error opening MatFile") + + dataset = ETHUWB_xarray_load(Matfile,kwargs.get('sum_scans',True)) + + if downconvert and 'tx' in dataset.dims: + datasetDC = downconvert_dataset(dataset,**kwargs) + return datasetDC + else: + return dataset + + + elif type.upper() == 'MAT_OLD': try: Matfile = loadmat(path, simplify_cells=True, squeeze_me=True) except Exception as e: @@ -97,8 +112,14 @@ def eprload( return uwb_output - elif type == 'HDF5': - return xr.load_dataarray(path,engine='h5netcdf',invalid_netcdf=True) + elif type.upper() == 'HDF5': + dataset= xr.load_dataarray(path,engine='h5netcdf',invalid_netcdf=True) + + if downconvert and 'tx' in dataset.dims: + datasetDC = downconvert_dataset(dataset,**kwargs) + return datasetDC + else: + return dataset def progress_bar(progress, post=""): diff --git a/pyepr/utils.py b/pyepr/utils.py index 7189809..98e8d68 100644 --- a/pyepr/utils.py +++ b/pyepr/utils.py @@ -157,7 +157,7 @@ def autoEPRDecoder(dct): def gcd(values:list): - """Generates the greatest common dividor on a list of floats + """Generates the greatest common divisor on a list of floats Parameters ---------- diff --git a/pyproject.toml b/pyproject.toml index 64a0735..04aa1fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pyEPR-ESR" -version = "1.0.2" +version = "1.1" description = "" authors = ["Hugo Karas ", "Gunnar Jeschke ", "Stefan Stoll "] readme = "README.md" @@ -9,7 +9,7 @@ packages = [ ] [tool.poetry.dependencies] -python = ">=3.11,<3.13" +python = ">=3.11" numpy = ">2.1" scipy = "^1.14.1" matplotlib = "^3.9.2" @@ -19,7 +19,6 @@ h5py = ">3.15" h5netcdf = ">1.4.0" toml = "^0.10.2" deerlab = "^1.1.4" -numba = ">=0.60.0" psutil = "^7.1.3" requests = ">=2.32" @@ -46,3 +45,4 @@ sphinx-design = "^0.6.1" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" + diff --git a/tests/test_pulses.py b/tests/test_pulses.py new file mode 100644 index 0000000..555d196 --- /dev/null +++ b/tests/test_pulses.py @@ -0,0 +1,72 @@ +from autodeer.pulses import * +import pytest + +def test_pulse_init(): + pulse = Pulse(tp=10, scale=0.5) + assert pulse.tp.value == 10 + assert pulse.scale.value == 0.5 + +def test_pulse_add_phase_cycle(): + pulse = Pulse(tp=10, scale=0.5) + pulse._addPhaseCycle([0, 90, 180, 270]) + assert pulse.pcyc["Phases"] == [0, 90, 180, 270] + assert pulse.pcyc["DetSigns"] == [1, 1, 1, 1] + + +def test_pulse_is_static(): + pulse = Pulse(tp=10, scale=0.5) + assert pulse.is_static() + pulse = Pulse(tp=10, scale=0.5, t=0) + assert pulse.is_static() + pulse = Pulse(tp=10, scale=0.5, t=0, pcyc=None) + assert pulse.is_static() + tp = Parameter( + name="tp", value=10, unit="us", description="The pulse length", + axis=np.arange(0,10,1), axis_id=0) + pulse = Pulse(tp=tp, scale=0.5, t=0, pcyc=None) + assert pulse.is_static() + +def test_pulse_plot(): + pulse = Pulse(tp=10, scale=0.5) + pulse._buildFMAM(lambda x: (np.ones_like(x), np.zeros_like(x))) + pulse.plot() + +def test_pulse_exciteprofile(): + pulse = Pulse(tp=10, scale=0.5, flipangle=np.pi/2) + pulse.func = lambda x: (np.ones_like(x), np.zeros_like(x)) + freqs = np.linspace(-.25, .25, 1001) + Mag= pulse.exciteprofile(freqs=freqs) + assert len(Mag[:,0]) == 1001 + assert len(Mag[:,1]) == 1001 + assert len(Mag[:,2]) == 1001 + +def test_rectpulse_init(): + pulse = RectPulse(tp=10, freq=1, t=0, flipangle=np.pi/2) + assert pulse.tp.value == 10 + assert pulse.freq.value == 1 + assert pulse.t.value == 0 + assert pulse.flipangle.value == np.pi/2 + +def test_rectpulse_func(): + pulse = RectPulse(tp=10, freq=1, t=0, flipangle=np.pi/2) + AM, FM = pulse.func(np.arange(10)) + assert np.allclose(AM, np.ones(10)) + assert np.allclose(FM, np.ones(10) ) + +def test_hspulse_init(): + pulse = HSPulse(tp=10, order1=1, order2=6, beta=20, BW=1, init_freq=0) + assert pulse.tp.value == 10 + assert pulse.order1.value == 1 + assert pulse.order2.value == 6 + assert pulse.beta.value == 20 + assert pulse.BW.value == 1 + assert pulse.init_freq.value == 0 + assert pulse.final_freq.value == 1 + +def test_hspulse_func(): + pulse = HSPulse(tp=128, order1=1, order2=6, beta=20, BW=0.1, init_freq=0) + AM, FM = pulse.func(np.arange(10)) + # assert np.allclose(AM, np.ones(10)) + assert np.allclose(FM.min(), 0) + assert np.allclose(FM.max(), 0.1) + diff --git a/tests/test_respro.py b/tests/test_respro.py new file mode 100644 index 0000000..3885568 --- /dev/null +++ b/tests/test_respro.py @@ -0,0 +1,134 @@ +from autodeer.ResPro import * +from autodeer.hardware.dummy import _similate_respro +from autodeer.sequences import ResonatorProfileSequence +from autodeer.dataset import create_dataset_from_sequence, create_dataset_from_axes +from autodeer.FieldSweep import create_Nmodel, FieldSweepAnalysis +import pytest + +def test_ResonatorProfileAnalysis_from_sim(): + def lorenz_fcn(x, centre, sigma): + y = (0.5*sigma)/((x-centre)**2 + (0.5*sigma)**2) + return y + + mode = lambda x: lorenz_fcn(x, 34.0, 34.0/60) + x = np.linspace(33,35) + scale = 75/mode(x).max() + mode_fun = lambda x: lorenz_fcn(x, 34.0, 34.0/60) * scale + seq = ResonatorProfileSequence(B=12220,LO=34,reptime=3e3,averages=1,shots=50,fwidth=0.3) + + + [tp_x, LO_axis], data = _similate_respro(seq,mode_fun) + + dset = create_dataset_from_sequence(data,seq) + + respro = ResonatorProfileAnalysis(dset + ) + # respro.process_nutations(threshold=4) + respro.fit() + assert hasattr(respro,'results') + assert hasattr(respro,'fc') + assert hasattr(respro,'q') + + print(respro.fc) + assert respro.fc == pytest.approx(34,abs=0.1) + assert respro.q == pytest.approx(60,abs=2) + + fig = respro.plot() + assert fig is not None + + + +def test_ceil(): + assert ceil(3.5) == 4 + assert ceil(3.4) == 4 + assert ceil(3.0) == 3 + assert ceil(3.1) == 4 + assert ceil(3.9999999) == 4 + + assert ceil(45,decimals=1) == 45 + assert ceil(45,decimals=-1) == 50 + +def test_floor(): + assert floor(3.5) == 3 + assert floor(3.4) == 3 + assert floor(3.0) == 3 + assert floor(3.1) == 3 + assert floor(3.9999999) == 3 + + assert floor(45,decimals=1) == 45 + assert floor(45,decimals=-1) == 40 + +def test_calc_overlap(): + + def gauss(x,centre,sigma): + return np.exp(-((x-centre)/sigma)**2) + + gauss1 = lambda x: gauss(x,0,1) + gauss2 = lambda x: gauss(x,0.5,1) + + x = np.linspace(-5,5,1000) + overlap = calc_overlap(x,gauss1,gauss2) + + assert overlap == pytest.approx(0.352065) + +def test_BSpline_extra(): + + def gauss(x,centre,sigma): + return np.exp(-((x-centre)/sigma)**2) + + x = np.linspace(-5,5,1000) + y = gauss(x,0,1) + + spl = BSpline_extra((x,y,3)) + x_test = np.linspace(-10,10,100) + y_test = spl(x_test) + + assert y_test[0] == pytest.approx(0,abs=1e-3) + assert y_test[-1] == pytest.approx(0,abs=1e-3) + assert y_test[50] == pytest.approx(1,abs=1e-1) + +@pytest.fixture +def create_fake_respro(): + def lorenz_fcn(x, centre, sigma): + y = (0.5*sigma)/((x-centre)**2 + (0.5*sigma)**2) + return y + + mode = lambda x: lorenz_fcn(x, 34.0, 34.0/60) + x = np.linspace(33,35) + scale = 75/mode(x).max() + mode_fun = lambda x: lorenz_fcn(x, 34.0, 34.0/60) * scale + seq = ResonatorProfileSequence(B=12220,LO=34,reptime=3e3,averages=1,shots=50,fwidth=0.3) + + + [tp_x, LO_axis], data = _similate_respro(seq,mode_fun) + + dset = create_dataset_from_sequence(data,seq) + + respro = ResonatorProfileAnalysis( + nuts = dset.data.T, + freqs = dset.LO.data, + dt=2 + ) + respro.process_nutations(threshold=4) + respro.fit() + return respro + +@pytest.fixture +def create_fake_fieldsweep(): + B_axis = np.linspace(12000, 12400, 100) + Nmodel = create_Nmodel(34.0*1e3) + params = {"B":B_axis/1e1,"az":3.66,"axy":0.488,"gy":2.01,"gz":2.0,"GB":0.45,"scale":1, "Boffset":0.7} + V = Nmodel(**params) + dset = create_dataset_from_axes(V, B_axis,params={"LO":34.0},axes_labels=['B']) + fsweep = FieldSweepAnalysis(dset) + fsweep.calc_gyro() + fsweep.fit() + return fsweep + +def test_optimise_spectra_position(create_fake_fieldsweep,create_fake_respro): + fsweep = create_fake_fieldsweep + respro = create_fake_respro + + new_LO = optimise_spectra_position(respro,fsweep) + + assert new_LO == pytest.approx(-0.47,abs=0.1) #TODO: check this value \ No newline at end of file diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..0f8ea99 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,93 @@ +import numpy as np +from pyepr import Parameter +import pytest +from pyepr.utils import * + +def test_gcd(): + assert gcd([2.0, 4.0, 6.0, 8.0]) == 2 + assert gcd([1.5, 3.0]) == 1.5 + assert gcd([3, 6, 9, 12]) == 3 + assert gcd([5, 10, 15, 20]) == 5 + assert gcd([7, 14, 21, 28]) == 7 + assert gcd([8, 12, 16, 20]) == 4 + assert gcd([10, 20, 30, 40]) == 10 + assert gcd([15, 25, 35, 45]) == 5 + assert gcd([18, 24, 30, 36]) == 6 + assert gcd([21, 28, 35, 42]) == 7 + assert gcd([27, 36, 45, 54]) == 9 + +def test_transpose_dict_of_list(): + d = {'a': [1, 2, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]} + expected = [{'a': 1, 'b': 4, 'c': 7}, {'a': 2, 'b': 5, 'c': 8}, {'a': 3, 'b': 6, 'c': 9}] + assert transpose_dict_of_list(d) == expected + + d = {'a': [1, 2], 'b': [3, 4], 'c': [5, 6]} + expected = [{'a': 1, 'b': 3, 'c': 5}, {'a': 2, 'b': 4, 'c': 6}] + assert transpose_dict_of_list(d) == expected + + d = {'a': [1], 'b': [2], 'c': [3]} + expected = [{'a': 1, 'b': 2, 'c': 3}] + assert transpose_dict_of_list(d) == expected + + d = {'a': [], 'b': [], 'c': []} + expected = [] + assert transpose_dict_of_list(d) == expected + +def test_transpose_list_of_dicts(): + d = [{'a': 1, 'b': 4, 'c': 7}, {'a': 2, 'b': 5, 'c': 8}, {'a': 3, 'b': 6, 'c': 9}] + expected = {'a': [1, 2, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]} + assert transpose_list_of_dicts(d) == expected + + d = [{'a': 1, 'b': 3, 'c': 5}, {'a': 2, 'b': 4, 'c': 6}] + expected = {'a': [1, 2], 'b': [3, 4], 'c': [5, 6]} + assert transpose_list_of_dicts(d) == expected + + d = [{'a': 1, 'b': 2, 'c': 3}] + expected = {'a': [1], 'b': [2], 'c': [3]} + assert transpose_list_of_dicts(d) == expected + + d = [] + expected = {} + assert transpose_list_of_dicts(d) == expected + +def test_val_in_ns(): + # Test with no axis + p = Parameter(name='p', value=10, unit="ns") + assert val_in_ns(p) == 10 + p = Parameter(name='p', value=10, unit="us") + assert val_in_ns(p) == 10000 + + # Test with one axis + p = Parameter(name='p', value=10, unit="ns", step=5, dim=1) + assert np.all(val_in_ns(p) == np.array([10])) + + p = Parameter(name='p', value=10, unit="ns", step=5, dim=3) + assert np.all(val_in_ns(p) == np.array([10,15,20])) + + # Test with two axis + p = Parameter(name='p', value=10, unit="ns", step=5, dim=2) + p.add_axis(2,axis=np.array([1,2])) + + with pytest.raises(ValueError): + val_in_ns(p) + +def test_val_in_us(): + # Test with no axis + p = Parameter(name='p', value=10, unit="ns") + assert val_in_us(p) == 0.01 + p = Parameter(name='p', value=10, unit="us") + assert val_in_us(p) == 10 + + # Test with one axis + p = Parameter(name='p', value=10, unit="us", step=5, dim=1) + assert np.all(val_in_us(p) == np.array([10])) + + p = Parameter(name='p', value=10, unit="us", step=5, dim=3) + assert np.all(val_in_us(p) == np.array([10,15,20])) + + # Test with two axis + p = Parameter(name='p', value=10, unit="us", step=5, dim=2) + p.add_axis(2,axis=np.array([1,2])) + + with pytest.raises(ValueError): + val_in_us(p)