diff --git a/easyDiffractionLib/Jobs.py b/easyDiffractionLib/Jobs.py index fbc8677b..cb645ef4 100644 --- a/easyDiffractionLib/Jobs.py +++ b/easyDiffractionLib/Jobs.py @@ -1,8 +1,10 @@ __author__ = "github.com/wardsimon" __version__ = "0.1.1" +from gemmi import cif from easyCore.Datasets.xarray import xr, np from easyDiffractionLib.Profiles.common import _PowderBase +from easyDiffractionLib.elements.Backgrounds.Point import PointBackground, BackgroundPoint from easyDiffractionLib.interface import InterfaceFactory from easyCore.Fitting.Fitting import Fitter @@ -36,6 +38,8 @@ def __init__( ) self._x_axis_name = "" self._y_axis_prefix = "Intensity_" + if phases is not None: + self.phases = phases @property def simulation_data(self): @@ -136,6 +140,120 @@ def fit_experiment(self, experiment_name, fitter=None, **kwargs): fitter = Fitter(self, self.interface.fit_func) return self.datastore.store[dataarray_name].easyCore.fit(fitter) + def pattern_from_cif_block(self, block): + # Various pattern parameters + value = block.find_value("_diffrn_radiation_polarization") + if value is not None: + self.pattern.beam.polarization = float(value) + value = block.find_value("_diffrn_radiation_efficiency") + if value is not None: + self.pattern.beam.efficiency = float(value) + value = block.find_value("_setup_offset_2theta") + if value is not None: + self.pattern.zero_shift = float(value) + value = block.find_value("_setup_field") + if value is not None: + self.pattern.field = float(value) + + def parameters_from_cif_block(self, block): + # Various instrumental parameters + value = block.find_value("_setup_wavelength") + if value is not None: + self.parameters.wavelength = float(value) + value = block.find_value("_pd_instr_resolution_u") + if value is not None: + self.parameters.resolution_u = float(value) + value = block.find_value("_pd_instr_resolution_v") + if value is not None: + self.parameters.resolution_v = float(value) + value = block.find_value("_pd_instr_resolution_w") + if value is not None: + self.parameters.resolution_w = float(value) + value = block.find_value("_pd_instr_resolution_x") + if value is not None: + self.parameters.resolution_x = float(value) + value = block.find_value("_pd_instr_resolution_y") + if value is not None: + self.parameters.resolution_y = float(value) + value = block.find_value("_pd_instr_reflex_asymmetry_p1") + if value is not None: + self.parameters.reflex_asymmetry_p1 = float(value) + value = block.find_value("_pd_instr_reflex_asymmetry_p2") + if value is not None: + self.parameters.reflex_asymmetry_p2 = float(value) + value = block.find_value("_pd_instr_reflex_asymmetry_p3") + if value is not None: + self.parameters.reflex_asymmetry_p3 = float(value) + value = block.find_value("_pd_instr_reflex_asymmetry_p4") + if value is not None: + self.parameters.reflex_asymmetry_p4 = float(value) + + def phase_parameters_from_cif_block(self, block): + # Get phase parameters + sample_phase_labels = self.phases.phase_names + experiment_phase_labels = list(block.find_loop("_phase_label")) + experiment_phase_scales = np.fromiter(block.find_loop("_phase_scale"), float) + for (phase_label, phase_scale) in zip(experiment_phase_labels, experiment_phase_scales): + if phase_label in sample_phase_labels: + self.phases[phase_label].scale = phase_scale + + def data_from_cif_block(self, block, experiment_name): + # data points + data_x = np.fromiter(block.find_loop("_pd_meas_2theta"), float) + data_y = [] + data_e = [] + data_y.append(np.fromiter(block.find_loop("_pd_meas_intensity_up"), float)) + data_e.append(np.fromiter(block.find_loop("_pd_meas_intensity_up_sigma"), float)) + data_y.append(np.fromiter(block.find_loop("_pd_meas_intensity_down"), float)) + data_e.append(np.fromiter(block.find_loop("_pd_meas_intensity_down_sigma"), float)) + # Unpolarized case + if not np.any(data_y[0]): + data_y[0] = np.fromiter(block.find_loop("_pd_meas_intensity"), float) + data_e[0] = np.fromiter(block.find_loop("_pd_meas_intensity_sigma"), float) + data_y[1] = np.zeros(len(data_y)) + data_e[1] = np.zeros(len(data_e)) + + coord_name = self.name + "_" + experiment_name + "_" + self._x_axis_name + + self.datastore.store.easyCore.add_coordinate(coord_name, data_x) + + for i in range(0, len(data_y)): + self.datastore.store.easyCore.add_variable( + self.name + "_" + experiment_name + f"_I{i}", [coord_name], data_y[i] + ) + self.datastore.store.easyCore.sigma_attach( + self.name + "_" + experiment_name + f"_I{i}", data_e[i] + ) + + def background_from_cif_block(self, block, experiment_name): + # The background + background_2thetas = np.fromiter(block.find_loop("_pd_background_2theta"), float) + background_intensities = np.fromiter(block.find_loop("_pd_background_intensity"), float) + bkg = PointBackground(linked_experiment=experiment_name) + for (x, y) in zip(background_2thetas, background_intensities): + bkg.append(BackgroundPoint.from_pars(x, y)) + + self.set_background(bkg) + + def from_cif_file(self, file_url, experiment_name=None): + """ + Load a CIF file and extract the experiment data. + This includes + - the pattern parameters + - the instrumental parameters + - the phase parameters + - the data points + - the background + """ + block = cif.read(file_url).sole_block() + + if experiment_name is None: + experiment_name = block.name + self.pattern_from_cif_block(block) + self.parameters_from_cif_block(block) + self.phase_parameters_from_cif_block(block) + self.data_from_cif_block(block, experiment_name) + self.background_from_cif_block(block, experiment_name) class Powder1DCW(JobBase_1D): def __init__( @@ -200,3 +318,55 @@ def __init__( name, Unpolarized1DTOFClasses, datastore, phases, parameters, pattern ) self._x_axis_name = "time" + + +def get_job_type_from_file(file_url): + ''' + Get the job type from a CIF file. + + Based on keywords in the CIF file, the job type is determined. + ''' + block = cif.read(file_url).sole_block() + job_type = "Powder1DCW" + # Check if powder1DCWpol + value_cwpol = block.find_value("_diffrn_radiation_polarization") + value_tof = block.find_value("_pd_meas_time_of_flight") + value_cw = block.find_value("_pd_meas_2theta") + + if value_cwpol is not None: + job_type = "PolPowder1DCW" + elif value_tof is not None: + job_type = "Powder1DTOF" + elif value_cw is not None: + job_type = "Powder1DCW" + else: + raise ValueError("Could not determine job type from file.") + return job_type + +def get_job_from_file(file_url, exp_name="job", phases=None): + ''' + Get the job from a CIF file. + + Based on keywords in the CIF file, the job type is determined, + the job is created and the data is loaded from the CIF file. + ''' + block = cif.read(file_url).sole_block() + datastore = xr.Dataset() + # Check if powder1DCWpol + value_cwpol = block.find_value("_diffrn_radiation_polarization") + value_tof = block.find_value("_pd_meas_time_of_flight") + value_cw = block.find_value("_pd_meas_2theta") + + if value_cwpol is not None: + job = PolPowder1DCW(exp_name, datastore, phases) + elif value_tof is not None: + job = Powder1DTOF(exp_name, datastore, phases) + elif value_cw is not None: + job = Powder1DCW(exp_name, datastore, phases) + else: + raise ValueError("Could not determine job type from file.") + + # Load the data + job.from_cif_file(file_url, exp_name) + + return datastore, job