Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions easyDiffractionLib/Jobs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
__author__ = "github.com/wardsimon"
__version__ = "0.1.1"

from gemmi import cif
from easyCore.Datasets.xarray import xr, np
from easyDiffractionLib.Profiles.common import _PowderBase
from easyDiffractionLib.elements.Backgrounds.Point import PointBackground, BackgroundPoint
from easyDiffractionLib.interface import InterfaceFactory
from easyCore.Fitting.Fitting import Fitter

Expand Down Expand Up @@ -36,6 +38,8 @@ def __init__(
)
self._x_axis_name = ""
self._y_axis_prefix = "Intensity_"
if phases is not None:
self.phases = phases

@property
def simulation_data(self):
Expand Down Expand Up @@ -136,6 +140,120 @@ def fit_experiment(self, experiment_name, fitter=None, **kwargs):
fitter = Fitter(self, self.interface.fit_func)
return self.datastore.store[dataarray_name].easyCore.fit(fitter)

def pattern_from_cif_block(self, block):
# Various pattern parameters
value = block.find_value("_diffrn_radiation_polarization")
if value is not None:
self.pattern.beam.polarization = float(value)
value = block.find_value("_diffrn_radiation_efficiency")
if value is not None:
self.pattern.beam.efficiency = float(value)
value = block.find_value("_setup_offset_2theta")
if value is not None:
self.pattern.zero_shift = float(value)
value = block.find_value("_setup_field")
if value is not None:
self.pattern.field = float(value)

def parameters_from_cif_block(self, block):
# Various instrumental parameters
value = block.find_value("_setup_wavelength")
if value is not None:
self.parameters.wavelength = float(value)
value = block.find_value("_pd_instr_resolution_u")
if value is not None:
self.parameters.resolution_u = float(value)
value = block.find_value("_pd_instr_resolution_v")
if value is not None:
self.parameters.resolution_v = float(value)
value = block.find_value("_pd_instr_resolution_w")
if value is not None:
self.parameters.resolution_w = float(value)
value = block.find_value("_pd_instr_resolution_x")
if value is not None:
self.parameters.resolution_x = float(value)
value = block.find_value("_pd_instr_resolution_y")
if value is not None:
self.parameters.resolution_y = float(value)
value = block.find_value("_pd_instr_reflex_asymmetry_p1")
if value is not None:
self.parameters.reflex_asymmetry_p1 = float(value)
value = block.find_value("_pd_instr_reflex_asymmetry_p2")
if value is not None:
self.parameters.reflex_asymmetry_p2 = float(value)
value = block.find_value("_pd_instr_reflex_asymmetry_p3")
if value is not None:
self.parameters.reflex_asymmetry_p3 = float(value)
value = block.find_value("_pd_instr_reflex_asymmetry_p4")
if value is not None:
self.parameters.reflex_asymmetry_p4 = float(value)

def phase_parameters_from_cif_block(self, block):
# Get phase parameters
sample_phase_labels = self.phases.phase_names
experiment_phase_labels = list(block.find_loop("_phase_label"))
experiment_phase_scales = np.fromiter(block.find_loop("_phase_scale"), float)
for (phase_label, phase_scale) in zip(experiment_phase_labels, experiment_phase_scales):
if phase_label in sample_phase_labels:
self.phases[phase_label].scale = phase_scale

def data_from_cif_block(self, block, experiment_name):
# data points
data_x = np.fromiter(block.find_loop("_pd_meas_2theta"), float)
data_y = []
data_e = []
data_y.append(np.fromiter(block.find_loop("_pd_meas_intensity_up"), float))
data_e.append(np.fromiter(block.find_loop("_pd_meas_intensity_up_sigma"), float))
data_y.append(np.fromiter(block.find_loop("_pd_meas_intensity_down"), float))
data_e.append(np.fromiter(block.find_loop("_pd_meas_intensity_down_sigma"), float))
# Unpolarized case
if not np.any(data_y[0]):
data_y[0] = np.fromiter(block.find_loop("_pd_meas_intensity"), float)
data_e[0] = np.fromiter(block.find_loop("_pd_meas_intensity_sigma"), float)
data_y[1] = np.zeros(len(data_y))
data_e[1] = np.zeros(len(data_e))

coord_name = self.name + "_" + experiment_name + "_" + self._x_axis_name

self.datastore.store.easyCore.add_coordinate(coord_name, data_x)

for i in range(0, len(data_y)):
self.datastore.store.easyCore.add_variable(
self.name + "_" + experiment_name + f"_I{i}", [coord_name], data_y[i]
)
self.datastore.store.easyCore.sigma_attach(
self.name + "_" + experiment_name + f"_I{i}", data_e[i]
)

def background_from_cif_block(self, block, experiment_name):
# The background
background_2thetas = np.fromiter(block.find_loop("_pd_background_2theta"), float)
background_intensities = np.fromiter(block.find_loop("_pd_background_intensity"), float)
bkg = PointBackground(linked_experiment=experiment_name)
for (x, y) in zip(background_2thetas, background_intensities):
bkg.append(BackgroundPoint.from_pars(x, y))

self.set_background(bkg)

def from_cif_file(self, file_url, experiment_name=None):
"""
Load a CIF file and extract the experiment data.
This includes
- the pattern parameters
- the instrumental parameters
- the phase parameters
- the data points
- the background
"""
block = cif.read(file_url).sole_block()

if experiment_name is None:
experiment_name = block.name
self.pattern_from_cif_block(block)
self.parameters_from_cif_block(block)
self.phase_parameters_from_cif_block(block)
self.data_from_cif_block(block, experiment_name)
self.background_from_cif_block(block, experiment_name)

class Powder1DCW(JobBase_1D):
def __init__(
Expand Down Expand Up @@ -200,3 +318,55 @@ def __init__(
name, Unpolarized1DTOFClasses, datastore, phases, parameters, pattern
)
self._x_axis_name = "time"


def get_job_type_from_file(file_url):
'''
Get the job type from a CIF file.

Based on keywords in the CIF file, the job type is determined.
'''
block = cif.read(file_url).sole_block()
job_type = "Powder1DCW"
# Check if powder1DCWpol
value_cwpol = block.find_value("_diffrn_radiation_polarization")
value_tof = block.find_value("_pd_meas_time_of_flight")
value_cw = block.find_value("_pd_meas_2theta")

if value_cwpol is not None:
job_type = "PolPowder1DCW"
elif value_tof is not None:
job_type = "Powder1DTOF"
elif value_cw is not None:
job_type = "Powder1DCW"
else:
raise ValueError("Could not determine job type from file.")
return job_type

def get_job_from_file(file_url, exp_name="job", phases=None):
'''
Get the job from a CIF file.

Based on keywords in the CIF file, the job type is determined,
the job is created and the data is loaded from the CIF file.
'''
block = cif.read(file_url).sole_block()
datastore = xr.Dataset()
# Check if powder1DCWpol
value_cwpol = block.find_value("_diffrn_radiation_polarization")
value_tof = block.find_value("_pd_meas_time_of_flight")
value_cw = block.find_value("_pd_meas_2theta")

if value_cwpol is not None:
job = PolPowder1DCW(exp_name, datastore, phases)
elif value_tof is not None:
job = Powder1DTOF(exp_name, datastore, phases)
elif value_cw is not None:
job = Powder1DCW(exp_name, datastore, phases)
else:
raise ValueError("Could not determine job type from file.")

# Load the data
job.from_cif_file(file_url, exp_name)

return datastore, job