Skip to content

Commit

Permalink
Fix several issues with KineticImage model (#612)
Browse files Browse the repository at this point in the history
Fixes #604, #605, #606

Main features:
* Made Irf.normalize true by default and only normalize matrix when set. Closes #604
* When combinging k matrices, the right hand matrix now overwrites the left one. Closes #605
* Changed megacomplex scale application and definition. Closes #606
Megacomplex scales are now a property of the dataset_descriptor. They are defined per dataset per megacomplex.

Other commits:
* Fix unit test for combining k matrices
Accessing the matrix by key returns the parameter stored at that key, whereas in the test the full_label is used to test if combining worked.
* Fix typo associated in kinetic_*_result
Internal API only. User exposed attributes already had correct spelling of associated
* Update docstring for KMatrix.combine

Co-authored-by: Sebastian Weigand <s.weigand.phy@gmail.com>
Co-authored-by: Joris Snellenburg <jsnel@users.noreply.github.com>
  • Loading branch information
3 people committed Mar 28, 2021
1 parent 0d41d63 commit b188047
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 42 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ tests/resources/PE2017
# Dask
dask-worker-space/

# virtualenv
bin/
pyvenv.cfg

# TIMP data
*.RData
Expand Down
2 changes: 1 addition & 1 deletion glotaran/builtin/io/yml/test/test_model_parser_kinetic.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def test_irf(model):
assert irf.width_dispersion == want
want = [9]
assert irf.scale == want
assert not irf.normalize
assert irf.normalize == (i == 1)

if i == 2:
assert irf.backsweep
Expand Down
2 changes: 1 addition & 1 deletion glotaran/builtin/io/yml/test/test_model_spec_kinetic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ megacomplex:
k_matrix: [km1] # A megacomplex has one or more k-matrices
cmplx2:
k_matrix: [km2]
cmplx3: [[km3], None]
cmplx3: [[km3]]

spectral_constraints:
- type: zero
Expand Down
13 changes: 2 additions & 11 deletions glotaran/builtin/models/kinetic_image/initial_concentration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import numpy as np

from glotaran.model import DatasetDescriptor
from glotaran.model import model_attribute
from glotaran.parameter import Parameter

Expand All @@ -22,16 +21,8 @@ class InitialConcentration:
"""An initial concentration describes the population of the compartments at
the beginning of an experiment."""

def normalized(self, dataset: DatasetDescriptor) -> InitialConcentration:
parameters = self.parameters
for megacomplex in dataset.megacomplex:
scale = [
megacomplex.scale
if c in megacomplex.involved_compartments and megacomplex.scale
else 1
for c in self.compartments
]
parameters = np.multiply(parameters, scale)
def normalized(self) -> InitialConcentration:
parameters = np.array(self.parameters)
idx = [c not in self.exclude_from_normalize for c in self.compartments]
parameters[idx] /= np.sum(parameters[idx])
new = copy.deepcopy(self)
Expand Down
2 changes: 1 addition & 1 deletion glotaran/builtin/models/kinetic_image/irf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class IrfMeasured:
"center": List[Parameter],
"width": List[Parameter],
"scale": {"type": List[Parameter], "allow_none": True},
"normalize": {"type": bool, "default": False},
"normalize": {"type": bool, "default": True},
"backsweep": {"type": bool, "default": False},
"backsweep_period": {"type": Parameter, "allow_none": True},
},
Expand Down
10 changes: 6 additions & 4 deletions glotaran/builtin/models/kinetic_image/k_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ def involved_compartments(self) -> list[str]:
def combine(self, k_matrix: KMatrix) -> KMatrix:
"""Creates a combined matrix.
When combining k-matrices km1 and km2 (km1.combine(km2)),
entries in km1 will be overwritten by corresponding entries in km2.
Parameters
----------
k_matrix :
KMatrix to combine with.
Returns
-------
combined :
Expand All @@ -63,9 +65,9 @@ def combine(self, k_matrix: KMatrix) -> KMatrix:
"""
if not isinstance(k_matrix, KMatrix):
raise TypeError("K-Matrices can only be combined with other K-Matrices.")
combined_matrix = {entry: k_matrix.matrix[entry] for entry in k_matrix.matrix}
for entry in self.matrix:
combined_matrix[entry] = self.matrix[entry]
combined_matrix = {entry: self.matrix[entry] for entry in self.matrix}
for entry in k_matrix.matrix:
combined_matrix[entry] = k_matrix.matrix[entry]
combined = KMatrix()
combined.label = f"{self.label}+{k_matrix.label}"
combined.matrix = combined_matrix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,32 @@
}
)
class KineticImageDatasetDescriptor(DatasetDescriptor):
def get_k_matrices(self):
return [mat for mat in [cmplx.full_k_matrix() for cmplx in self.megacomplex] if mat]
def get_megacomplex_k_matrices(self):
scales = (
[
self.megacomplex_scale[i]
for i, megacomplex in enumerate(self.megacomplex)
if megacomplex.has_k_matrix()
]
if self.megacomplex_scale is not None
else None
)

matrices = [
megacomplex.full_k_matrix()
for megacomplex in self.megacomplex
if megacomplex.has_k_matrix()
]

return scales, matrices

def has_k_matrix(self) -> bool:
_, k_matrices = self.get_megacomplex_k_matrices()
return len(k_matrices) != 0

def compartments(self):
compartments = []
for k in self.get_k_matrices():
_, k_matrices = self.get_megacomplex_k_matrices()
for k in k_matrices:
compartments += k.involved_compartments()
return list(set(compartments))
18 changes: 11 additions & 7 deletions glotaran/builtin/models/kinetic_image/kinetic_image_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def kinetic_matrix(

compartments = None
matrix = None
k_matrices = dataset_descriptor.get_k_matrices()
megacomplex_scales, k_matrices = dataset_descriptor.get_megacomplex_k_matrices()

if len(k_matrices) == 0:
return (None, None)
Expand All @@ -29,9 +29,9 @@ def kinetic_matrix(
raise Exception(
f'No initial concentration specified in dataset "{dataset_descriptor.label}"'
)
initial_concentration = dataset_descriptor.initial_concentration.normalized(dataset_descriptor)
initial_concentration = dataset_descriptor.initial_concentration.normalized()

for k_matrix in k_matrices:
for k_matrix_index, k_matrix in enumerate(k_matrices):

if k_matrix is None:
continue
Expand All @@ -46,6 +46,9 @@ def kinetic_matrix(
matrix_implementation,
)

if megacomplex_scales is not None:
this_matrix *= megacomplex_scales[k_matrix_index]

if matrix is None:
compartments = this_compartments
matrix = this_matrix
Expand All @@ -54,11 +57,11 @@ def kinetic_matrix(
c for c in this_compartments if c not in compartments
]
new_matrix = np.zeros((matrix.shape[0], len(new_compartments)), dtype=np.float64)
for i, comp in enumerate(new_compartments):
for idx, comp in enumerate(new_compartments):
if comp in compartments:
new_matrix[:, i] += matrix[:, compartments.index(comp)]
new_matrix[:, idx] += matrix[:, compartments.index(comp)]
if comp in this_compartments:
new_matrix[:, i] += this_matrix[:, this_compartments.index(comp)]
new_matrix[:, idx] += this_matrix[:, this_compartments.index(comp)]
compartments = new_compartments
matrix = new_matrix

Expand Down Expand Up @@ -129,7 +132,8 @@ def kinetic_image_matrix_implementation(
backsweep,
backsweep_period,
)
matrix /= np.sum(irf_scale)
if dataset_descriptor.irf.normalize:
matrix /= np.sum(irf_scale)

else:
calculate_kinetic_matrix_no_irf(matrix, rates, axis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
from typing import List

from glotaran.model import model_attribute
from glotaran.parameter import Parameter


@model_attribute(
properties={
"k_matrix": {"type": List[str], "default": []},
"scale": {"type": Parameter, "allow_none": True},
}
)
class KineticImageMegacomplex:
"""A Megacomplex with one or more K-Matrices."""

def has_k_matrix(self) -> bool:
return len(self.k_matrix) != 0

def full_k_matrix(self, model=None):
full_k_matrix = None
for k_matrix in self.k_matrix:
Expand Down
11 changes: 6 additions & 5 deletions glotaran/builtin/models/kinetic_image/kinetic_image_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@ def finalize_kinetic_image_result(model, problem: Problem, data: dict[str, xr.Da
for label, dataset in data.items():

dataset_descriptor = problem.filled_dataset_descriptors[label]
if not dataset_descriptor.get_k_matrices():

if not dataset_descriptor.has_k_matrix():
continue

retrieve_species_assocatiated_data(problem.model, dataset, dataset_descriptor, "images")
retrieve_decay_assocatiated_data(problem.model, dataset, dataset_descriptor, "images")
retrieve_species_associated_data(problem.model, dataset, dataset_descriptor, "images")
retrieve_decay_associated_data(problem.model, dataset, dataset_descriptor, "images")

if dataset_descriptor.baseline:
dataset["baseline"] = dataset.clp.sel(clp_label=f"{dataset_descriptor.label}_baseline")

retrieve_irf(problem.model, dataset, dataset_descriptor, "images")


def retrieve_species_assocatiated_data(model, dataset, dataset_descriptor, name):
def retrieve_species_associated_data(model, dataset, dataset_descriptor, name):
compartments = dataset_descriptor.initial_concentration.compartments

compartments = [c for c in compartments if c in dataset_descriptor.compartments()]
Expand Down Expand Up @@ -59,7 +60,7 @@ def retrieve_species_assocatiated_data(model, dataset, dataset_descriptor, name)
)


def retrieve_decay_assocatiated_data(model, dataset, dataset_descriptor, name):
def retrieve_decay_associated_data(model, dataset, dataset_descriptor, name):
# get_das
all_das = []
all_a_matrix = []
Expand Down
26 changes: 26 additions & 0 deletions glotaran/builtin/models/kinetic_image/test/test_k_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,29 @@ def test_unibranched():

print(mat.a_matrix_unibranch(con))
assert np.allclose(mat.a_matrix_unibranch(con), wanted_a_matrix)


def test_combine_matrices():

matrix1 = {
("s1", "s1"): "1",
("s2", "s2"): "2",
}
mat1 = KMatrix()
mat1.label = "A"
mat1.matrix = matrix1

matrix2 = {
("s2", "s2"): "3",
("s3", "s3"): "4",
}
mat2 = KMatrix()
mat2.label = "B"
mat2.matrix = matrix2

combined = mat1.combine(mat2)

assert combined.label == "A+B"
assert combined.matrix[("s1", "s1")].full_label == "1"
assert combined.matrix[("s2", "s2")].full_label == "3"
assert combined.matrix[("s3", "s3")].full_label == "4"
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from glotaran.analysis.problem import Problem

from ..kinetic_image.irf import IrfMultiGaussian
from ..kinetic_image.kinetic_image_result import retrieve_decay_assocatiated_data
from ..kinetic_image.kinetic_image_result import retrieve_decay_associated_data
from ..kinetic_image.kinetic_image_result import retrieve_irf
from ..kinetic_image.kinetic_image_result import retrieve_species_assocatiated_data
from ..kinetic_image.kinetic_image_result import retrieve_species_associated_data
from .spectral_irf import IrfGaussianCoherentArtifact
from .spectral_irf import IrfSpectralMultiGaussian

Expand All @@ -17,15 +17,15 @@ def finalize_kinetic_spectrum_result(model, problem: Problem, data: dict[str, xr
for label, dataset in data.items():

dataset_descriptor = problem.filled_dataset_descriptors[label]
if not dataset_descriptor.get_k_matrices():
if not dataset_descriptor.has_k_matrix():
continue

retrieve_species_assocatiated_data(problem.model, dataset, dataset_descriptor, "spectra")
retrieve_species_associated_data(problem.model, dataset, dataset_descriptor, "spectra")

if dataset_descriptor.baseline:
dataset["baseline"] = dataset.clp.sel(clp_label=f"{dataset_descriptor.label}_baseline")

retrieve_decay_assocatiated_data(problem.model, dataset, dataset_descriptor, "spectra")
retrieve_decay_associated_data(problem.model, dataset, dataset_descriptor, "spectra")

irf = dataset_descriptor.irf
if isinstance(irf, IrfMultiGaussian):
Expand Down
1 change: 1 addition & 0 deletions glotaran/model/dataset_descriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
@model_attribute(
properties={
"megacomplex": List[str],
"megacomplex_scale": {"type": List[Parameter], "default": None, "allow_none": True},
"scale": {"type": Parameter, "default": None, "allow_none": True},
}
)
Expand Down
4 changes: 2 additions & 2 deletions glotaran/model/test/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def model():
"megacomplex": ["m1", "m2"],
"scale": "scale_1",
},
"dataset2": [["m2"], "scale_2"],
"dataset2": [["m2"], ["bar"], "scale_2"],
},
}
return MockModel.from_dict(d)
Expand All @@ -98,7 +98,7 @@ def model_error():
"megacomplex": ["N1", "N2"],
"scale": "scale_1",
},
"dataset2": [["mrX"], "scale_3"],
"dataset2": [["mrX"], ["bar"], "scale_3"],
},
}
return MockModel.from_dict(d)
Expand Down

0 comments on commit b188047

Please sign in to comment.