Skip to content

Commit

Permalink
fix simulate (#479)
Browse files Browse the repository at this point in the history
Default noise distribution wasn't handled in some cases

Add option to ensure synthetic data is non-negative (can become negative after noise is added)

* fix default noise distribution

* switch to pandas isna

* Return negative values by default.

* change non-negative argument to zero-bounded

* docs, set random seed

Co-authored-by: Daniel Weindl <dweindl@users.noreply.github.com>
  • Loading branch information
dilpath and dweindl committed Jan 11, 2021
1 parent d948970 commit 91612f0
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 25 deletions.
60 changes: 46 additions & 14 deletions petab/simulate.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""PEtab simulator base class and related functions."""

import abc
import numpy as np
import pathlib
Expand All @@ -10,8 +12,8 @@


class Simulator(abc.ABC):
"""
Base class that specific simulators should inherit.
"""Base class that specific simulators should inherit.
Specific simulators should minimally implement the
`simulate_without_noise` method.
Example (AMICI): https://bit.ly/33SUSG4
Expand All @@ -32,10 +34,14 @@ class Simulator(abc.ABC):
directory and its contents may be modified and deleted, and
should be considered ephemeral.
"""
def __init__(self,
petab_problem: petab.Problem,
working_dir: Optional[Union[pathlib.Path, str]] = None):
"""

def __init__(
self,
petab_problem: petab.Problem,
working_dir: Optional[Union[pathlib.Path, str]] = None,
):
"""Initialize the simulator.
Initialize the simulator with sufficient information to perform a
simulation. If no working directory is specified, a temporary one is
created.
Expand Down Expand Up @@ -64,14 +70,16 @@ def __init__(self,
self.rng = np.random.default_rng()

def remove_working_dir(self, force: bool = False, **kwargs) -> None:
"""
Remove the simulator working directory and all files within (see the
`__init__` method arguments).
"""Remove the simulator working directory, and all files within.
See the `__init__` method arguments.
Arguments:
force:
If True, the working directory is removed regardless of
whether it is a temporary directory.
**kwargs:
Additional keyword arguments are passed to `shutil.rmtree`.
"""
if force or self.temporary_working_dir:
shutil.rmtree(self.working_dir, **kwargs)
Expand All @@ -85,17 +93,18 @@ def remove_working_dir(self, force: bool = False, **kwargs) -> None:

@abc.abstractmethod
def simulate_without_noise(self) -> pd.DataFrame:
"""
Simulate a PEtab problem. This is an abstract method that should be
implemented in a simulation package. Links to examples of this are in
the class docstring.
"""Simulate the PEtab problem.
This is an abstract method that should be implemented with a simulation
package. Examples of this are referenced in the class docstring.
Returns:
Simulated data, as a PEtab measurements table, which should be
equivalent to replacing all values in the `petab.C.MEASUREMENT`
column of the measurements table (of the PEtab problem supplied to
the `__init__` method), with simulated values.
"""
raise NotImplementedError

def simulate(
self,
Expand All @@ -109,6 +118,9 @@ def simulate(
noise: If True, noise is added to simulated data.
noise_scaling_factor:
A multiplier of the scale of the noise distribution.
**kwargs:
Additional keyword arguments are passed to
`simulate_without_noise`.
Returns:
Simulated data, as a PEtab measurements table.
Expand All @@ -122,6 +134,7 @@ def add_noise(
self,
simulation_df: pd.DataFrame,
noise_scaling_factor: float = 1,
**kwargs
) -> pd.DataFrame:
"""Add noise to simulated data.
Expand All @@ -130,6 +143,8 @@ def add_noise(
A PEtab measurements table that contains simulated data.
noise_scaling_factor:
A multiplier of the scale of the noise distribution.
**kwargs:
Additional keyword arguments are passed to `sample_noise`.
Returns:
Simulated data with noise, as a PEtab measurements table.
Expand All @@ -143,6 +158,7 @@ def add_noise(
self.noise_formulas,
self.rng,
noise_scaling_factor,
**kwargs,
)
for _, row in simulation_df_with_noise.iterrows()
]
Expand All @@ -156,6 +172,7 @@ def sample_noise(
noise_formulas: Optional[Dict[str, sp.Expr]] = None,
rng: Optional[np.random.Generator] = None,
noise_scaling_factor: float = 1,
zero_bounded: bool = False,
) -> float:
"""Generate a sample from a PEtab noise distribution.
Expand All @@ -176,6 +193,10 @@ def sample_noise(
A NumPy random generator.
noise_scaling_factor:
A multiplier of the scale of the noise distribution.
zero_bounded:
Return zero if the sign of the return value and `simulated_value`
differ. Can be used to ensure non-negative and non-positive values,
if the sign of `simulated_value` should not change.
Returns:
The sample from the PEtab noise distribution.
Expand All @@ -200,9 +221,20 @@ def sample_noise(
.loc[measurement_row[petab.C.OBSERVABLE_ID]]
.get(petab.C.NOISE_DISTRIBUTION, petab.C.NORMAL)
)
# an empty noise distribution column in an observables table can result in
# `noise_distribution == float('nan')`
if pd.isna(noise_distribution):
noise_distribution = petab.C.NORMAL

# below is e.g.: `np.random.normal(loc=simulation, scale=noise_value)`
return getattr(rng, noise_distribution)(
simulated_value_with_noise = getattr(rng, noise_distribution)(
loc=simulated_value,
scale=noise_value * noise_scaling_factor
)

if (
zero_bounded and
np.sign(simulated_value) != np.sign(simulated_value_with_noise)
):
return 0.0
return simulated_value_with_noise
80 changes: 69 additions & 11 deletions tests/test_simulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,67 @@ def test_remove_working_dir(petab_problem):
assert not pathlib.Path(simulator.working_dir).is_dir()


def test_zero_bounded(petab_problem):
"""Test `zero_bounded` argument of `sample_noise`."""
positive = np.spacing(1)
negative = -positive

simulator = TestSimulator(petab_problem)
# Set the random seed to ensure predictable tests.
simulator.rng = np.random.default_rng(seed=0)

# Set approximately half of the measurements to negative values, and the
# rest to positive values.
n_measurements = len(petab_problem.measurement_df)
neg_indices = range(round(n_measurements / 2))
pos_indices = range(len(neg_indices), n_measurements)
measurements = [
negative if index in neg_indices else
(positive if index in pos_indices else np.nan)
for index in range(n_measurements)
]
synthetic_data_df = simulator.simulate().assign(**{
petab.C.MEASUREMENT: measurements
})
# All measurements are non-zero
assert (synthetic_data_df['measurement'] != 0).all()
# No measurements are NaN
assert not (np.isnan(synthetic_data_df['measurement'])).any()

synthetic_data_df_with_noise = simulator.add_noise(
synthetic_data_df,
)
# Both negative and positive values are returned by default.
assert all([
(synthetic_data_df_with_noise['measurement'] <= 0).any(),
(synthetic_data_df_with_noise['measurement'] >= 0).any(),
])

synthetic_data_df_with_noise = simulator.add_noise(
synthetic_data_df,
zero_bounded=True,
)
# Values with noise that are different in sign to values without noise are
# zeroed.
assert all([
(synthetic_data_df_with_noise['measurement'][neg_indices] <= 0).all(),
(synthetic_data_df_with_noise['measurement'][pos_indices] >= 0).all(),
(synthetic_data_df_with_noise['measurement'][neg_indices] == 0).any(),
(synthetic_data_df_with_noise['measurement'][pos_indices] == 0).any(),
(synthetic_data_df_with_noise['measurement'][neg_indices] < 0).any(),
(synthetic_data_df_with_noise['measurement'][pos_indices] > 0).any(),
])


def test_add_noise(petab_problem):
"""Test the noise generating method."""

tested_noise_distributions = {'normal', 'laplace'}
assert set(petab.C.NOISE_MODELS) == tested_noise_distributions, (
'The noise generation methods have only been tested for '
f'{tested_noise_distributions}. Please edit this test '
'to include this distribution in its tested distributions. The '
'appropriate SciPy distribution will need to be added to '
f'{tested_noise_distributions}. Please edit this test to include this '
'distribution in its tested distributions. The appropriate SciPy '
'distribution will need to be added to '
'`petab_numpy2scipy_distribution` in `_test_add_noise`.'
)

Expand All @@ -94,6 +146,8 @@ def _test_add_noise(petab_problem) -> None:
}

simulator = TestSimulator(petab_problem)
# Set the random seed to ensure predictable tests.
simulator.rng = np.random.default_rng(seed=0)
synthetic_data_df = simulator.simulate()

# Generate samples of noisy data
Expand Down Expand Up @@ -144,26 +198,30 @@ def row2cdf(row, index) -> Callable:
getattr(
scipy.stats,
petab_numpy2scipy_distribution[
expected_noise_distributions[index]]
).cdf, loc=row[MEASUREMENT], scale=expected_noise_values[index])
expected_noise_distributions[index]
]
).cdf,
loc=row[MEASUREMENT],
scale=expected_noise_values[index]
)

# Test whether the distribution of the samples is equal to the expected
# distribution, for each measurement.
results = []
for index, row in synthetic_data_df.iterrows():
r = scipy.stats.ks_1samp(
results.append(scipy.stats.ks_1samp(
samples[:, index],
row2cdf(row, index)
)
results.append(r)
))
observed_fraction_above_threshold = (
sum(r.pvalue > ks_1samp_pvalue_threshold for r in results) /
len(results)
sum(r.pvalue > ks_1samp_pvalue_threshold for r in results)
/ len(results)
)
# Sufficient distributions of measurement samples are sufficiently similar
# to the expected distribution
assert (
observed_fraction_above_threshold > minimum_fraction_above_threshold)
observed_fraction_above_threshold > minimum_fraction_above_threshold
)

simulator.remove_working_dir()
assert not pathlib.Path(simulator.working_dir).is_dir()

0 comments on commit 91612f0

Please sign in to comment.