Skip to content
1 change: 1 addition & 0 deletions petab/C.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,4 @@

SIMULATION = 'simulation'
RESIDUAL = 'residual'
NOISE_VALUE = 'noiseValue'
130 changes: 129 additions & 1 deletion petab/calculate.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,12 @@ def calculate_residuals_for_table(
# non-normalized residual is just the difference
residual = simulation - measurement

noise_value = 1
if normalize:
# look up noise standard deviation
noise_value = evaluate_noise_formula(
row, parameter_df, noise_formulas)
residual /= noise_value
residual /= noise_value

# fill in value
residual_df.loc[irow, RESIDUAL] = residual
Expand Down Expand Up @@ -229,3 +230,130 @@ def calculate_chi2_for_table_from_residuals(
residual_df: pd.DataFrame) -> float:
"""Compute chi2 value for a single residual table."""
return (np.array(residual_df[RESIDUAL])**2).sum()


def calculate_llh(
measurement_dfs: Union[List[pd.DataFrame], pd.DataFrame],
simulation_dfs: Union[List[pd.DataFrame], pd.DataFrame],
observable_dfs: Union[List[pd.DataFrame], pd.DataFrame],
parameter_dfs: Union[List[pd.DataFrame], pd.DataFrame],
) -> float:
"""Calculate total log likelihood.

Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.

Returns:
llh: The log-likelihood.
"""
# convenience
if isinstance(measurement_dfs, pd.DataFrame):
measurement_dfs = [measurement_dfs]
if isinstance(simulation_dfs, pd.DataFrame):
simulation_dfs = [simulation_dfs]
if isinstance(observable_dfs, pd.DataFrame):
observable_dfs = [observable_dfs]
if isinstance(parameter_dfs, pd.DataFrame):
parameter_dfs = [parameter_dfs]

# iterate over data frames
llhs = []
for (measurement_df, simulation_df, observable_df, parameter_df) in zip(
measurement_dfs, simulation_dfs, observable_dfs, parameter_dfs):
_llh = calculate_llh_for_table(
measurement_df, simulation_df, observable_df, parameter_df)
llhs.append(_llh)
llh = sum(llhs)
return llh


def calculate_llh_for_table(
measurement_df: pd.DataFrame,
simulation_df: pd.DataFrame,
observable_df: pd.DataFrame,
parameter_df: pd.DataFrame) -> float:
"""Calculate log-likelihood for one set of tables. For the arguments, see
`calculate_llh`."""
llhs = []

# matching columns
compared_cols = set(MEASUREMENT_DF_COLS)
compared_cols -= {MEASUREMENT}
compared_cols &= set(measurement_df.columns)

# compute noise formulas for observables
noise_formulas = get_symbolic_noise_formulas(observable_df)

# iterate over measurements, find corresponding simulations
for irow, row in measurement_df.iterrows():
measurement = row[MEASUREMENT]

# look up in simulation df
masks = [simulation_df[col] == row[col] for col in compared_cols]
mask = reduce(lambda x, y: x & y, masks)

simulation = simulation_df.loc[mask][SIMULATION].iloc[0]

observable = observable_df.loc[row[OBSERVABLE_ID]]

# get scale
scale = observable.get(OBSERVABLE_TRANSFORMATION, LIN)

# get noise standard deviation
noise_value = evaluate_noise_formula(
row, parameter_df, noise_formulas)

# get noise distribution
noise_distribution = observable.get(NOISE_DISTRIBUTION, NORMAL)

llh = calculate_single_llh(
measurement, simulation, scale, noise_distribution, noise_value)
llhs.append(llh)
llh = sum(llhs)
return llh


def calculate_single_llh(
measurement: float,
simulation: float,
scale: str,
noise_distribution: str,
noise_value: float) -> float:
"""Calculate a single log likelihood.

Arguments:
measurement: The measurement value.
simulation: The simulated value.
scale: The scale on which the noise model is to be applied.
noise_distribution: The noise distribution.
noise_value: The considered noise models possess a single noise
parameter, e.g. the normal standard deviation.

Returns:
llh: The computed likelihood for the given values.
"""
# short-hand
m, s, sigma = measurement, simulation, noise_value
pi, log, log10 = np.pi, np.log, np.log10

# go over the possible cases
if noise_distribution == NORMAL and scale == LIN:
llh = 0.5*log(2*pi*sigma**2) + 0.5*((s-m)/sigma)**2
elif noise_distribution == NORMAL and scale == LOG:
llh = 0.5*log(2*pi*sigma**2*m**2) + 0.5*((log(s)-log(m))/sigma)**2
elif noise_distribution == NORMAL and scale == LOG10:
llh = 0.5*log(2*pi*sigma**2*m**2) + 0.5*((log10(s)-log10(m))/sigma)**2
elif noise_distribution == LAPLACE and scale == LIN:
llh = log(2*sigma) + abs((s-m)/sigma)
elif noise_distribution == LAPLACE and scale == LOG:
llh = log(2*sigma*m) + abs((log(s)-log(m))/sigma)
elif noise_distribution == LAPLACE and scale == LOG10:
llh = log(2*sigma*m) + abs((log10(s)-log10(m))/sigma)
return llh
127 changes: 115 additions & 12 deletions tests/test_calculate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Tests related to petab.calculate."""

from petab import calculate_residuals, calculate_chi2
from petab import (calculate_residuals, calculate_chi2, calculate_llh,
calculate_single_llh)
from petab.C import *
import pandas as pd
import numpy as np
Expand Down Expand Up @@ -33,9 +34,12 @@ def model_simple():

expected_residuals = {(2-0)/2, (2-1)/2, (19-20)/3, (20-22)/3}
expected_residuals_nonorm = {2-0, 2-1, 19-20, 20-22}
expected_llh = 0.5*(np.array(list(expected_residuals))**2).sum() + \
0.5*np.log(2*np.pi*np.array([2, 2, 3, 3])**2).sum()

return (measurement_df, observable_df, parameter_df,
simulation_df, expected_residuals, expected_residuals_nonorm)
simulation_df, expected_residuals, expected_residuals_nonorm,
expected_llh)


def model_replicates():
Expand Down Expand Up @@ -64,9 +68,12 @@ def model_replicates():

expected_residuals = {(2-0)/2, (2-1)/2}
expected_residuals_nonorm = {2-0, 2-1}
expected_llh = 0.5*(np.array(list(expected_residuals))**2).sum() + \
0.5*np.log(2*np.pi*np.array([2, 2])**2).sum()

return (measurement_df, observable_df, parameter_df,
simulation_df, expected_residuals, expected_residuals_nonorm)
simulation_df, expected_residuals, expected_residuals_nonorm,
expected_llh)


def model_scalings():
Expand Down Expand Up @@ -96,9 +103,12 @@ def model_scalings():

expected_residuals = {(np.log(2)-np.log(0.5))/2, (np.log(3)-np.log(1))/2}
expected_residuals_nonorm = {np.log(2)-np.log(0.5), np.log(3)-np.log(1)}
expected_llh = 0.5*(np.array(list(expected_residuals))**2).sum() + \
0.5*np.log(2*np.pi*np.array([2, 2])**2*np.array([0.5, 1])**2).sum()

return (measurement_df, observable_df, parameter_df,
simulation_df, expected_residuals, expected_residuals_nonorm)
simulation_df, expected_residuals, expected_residuals_nonorm,
expected_llh)


def model_non_numeric_overrides():
Expand Down Expand Up @@ -131,23 +141,65 @@ def model_non_numeric_overrides():
expected_residuals = {(np.log(2)-np.log(0.5))/(2*7+8+4),
(np.log(3)-np.log(1))/(2*2+3+4)}
expected_residuals_nonorm = {np.log(2)-np.log(0.5), np.log(3)-np.log(1)}
expected_llh = 0.5*(np.array(list(expected_residuals))**2).sum() + \
0.5*np.log(2*np.pi*np.array([2*7+8+4, 2*2+3+4])**2
* np.array([0.5, 1])**2).sum()

return (measurement_df, observable_df, parameter_df,
simulation_df, expected_residuals, expected_residuals_nonorm)
simulation_df, expected_residuals, expected_residuals_nonorm,
expected_llh)


def model_custom_likelihood():
"""Model with customized likelihoods."""
measurement_df = pd.DataFrame(data={
OBSERVABLE_ID: ['obs_a', 'obs_b'],
SIMULATION_CONDITION_ID: ['c0', 'c0'],
TIME: [5, 10],
MEASUREMENT: [0.5, 2]
})

observable_df = pd.DataFrame(data={
OBSERVABLE_ID: ['obs_a', 'obs_b'],
OBSERVABLE_FORMULA: ['A', 'B'],
OBSERVABLE_TRANSFORMATION: [LOG, LIN],
NOISE_FORMULA: [2, 1.5],
NOISE_DISTRIBUTION: [LAPLACE, LAPLACE]
}).set_index([OBSERVABLE_ID])

parameter_df = pd.DataFrame(data={
PARAMETER_ID: ['par1', 'par2'],
NOMINAL_VALUE: [3, 4]
}).set_index([PARAMETER_ID])

simulation_df = measurement_df.copy(deep=True).rename(
columns={MEASUREMENT: SIMULATION})
simulation_df[SIMULATION] = [2, 3]

expected_residuals = {(np.log(2)-np.log(0.5))/2, (3-2)/1.5}
expected_residuals_nonorm = {np.log(2)-np.log(0.5), 3-2}
expected_llh = np.abs(list(expected_residuals)).sum() + \
np.log(2*np.array([2, 1.5])*np.array([0.5, 1])).sum()

return (measurement_df, observable_df, parameter_df,
simulation_df, expected_residuals, expected_residuals_nonorm,
expected_llh)


@pytest.fixture
def models():
"""Test model collection covering different features."""
return [model_simple(), model_replicates(),
model_scalings(), model_non_numeric_overrides()]
model_scalings(), model_non_numeric_overrides(),
model_custom_likelihood()]


def test_calculate_residuals(models): # pylint: disable=W0621
"""Test calculate.calculate_residuals."""
for model in models:
for i_model, model in enumerate(models):
print(f"Model {i_model}")
(measurement_df, observable_df, parameter_df, simulation_df,
expected_residuals, _) = model
expected_residuals, _, _) = model
residual_dfs = calculate_residuals(
measurement_df, simulation_df, observable_df, parameter_df)
assert set(residual_dfs[0][RESIDUAL]) == pytest.approx(
Expand All @@ -156,9 +208,10 @@ def test_calculate_residuals(models): # pylint: disable=W0621

def test_calculate_non_normalized_residuals(models): # pylint: disable=W0621
"""Test calculate.calculate_residuals without normalization."""
for model in models:
for i_model, model in enumerate(models):
print(f"Model {i_model}")
(measurement_df, observable_df, parameter_df, simulation_df,
_, expected_residuals_nonorm) = model
_, expected_residuals_nonorm, _) = model
residual_dfs = calculate_residuals(
measurement_df, simulation_df, observable_df, parameter_df,
normalize=False)
Expand All @@ -168,11 +221,61 @@ def test_calculate_non_normalized_residuals(models): # pylint: disable=W0621

def test_calculate_chi2(models): # pylint: disable=W0621
"""Test calculate.calculate_chi2."""
for model in models:
for i_model, model in enumerate(models):
print(f"Model {i_model}")
(measurement_df, observable_df, parameter_df, simulation_df,
expected_residuals, _) = model
expected_residuals, _, _) = model
chi2 = calculate_chi2(
measurement_df, simulation_df, observable_df, parameter_df)

expected = sum(np.array(list(expected_residuals))**2)
assert chi2 == pytest.approx(expected)


def test_calculate_llh(models): # pylint: disable=W0621
"""Test calculate.calculate_llh."""
for i_model, model in enumerate(models):
print(f"Model {i_model}")
(measurement_df, observable_df, parameter_df, simulation_df,
_, _, expected_llh) = model
llh = calculate_llh(
measurement_df, simulation_df, observable_df, parameter_df)
assert llh == pytest.approx(expected_llh) or expected_llh is None


def test_calculate_single_llh():
"""Test calculate.calculate_single_llh."""
m, s, sigma = 5.3, 4.5, 1.6
pi, log, log10 = np.pi, np.log, np.log10

llh = calculate_single_llh(measurement=m, simulation=s, noise_value=sigma,
noise_distribution=NORMAL, scale=LIN)
expected_llh = 0.5 * (((s-m)/sigma)**2 + log(2*pi*sigma**2))
assert llh == pytest.approx(expected_llh)

llh = calculate_single_llh(measurement=m, simulation=s, noise_value=sigma,
noise_distribution=NORMAL, scale=LOG)
expected_llh = 0.5 * (((log(s)-log(m))/sigma)**2 +
log(2*pi*sigma**2*m**2))
assert llh == pytest.approx(expected_llh)

llh = calculate_single_llh(measurement=m, simulation=s, noise_value=sigma,
noise_distribution=NORMAL, scale=LOG10)
expected_llh = 0.5 * (((log10(s)-log10(m))/sigma)**2 +
log(2*pi*sigma**2*m**2))
assert llh == pytest.approx(expected_llh)

llh = calculate_single_llh(measurement=m, simulation=s, noise_value=sigma,
noise_distribution=LAPLACE, scale=LIN)
expected_llh = abs((s-m)/sigma) + log(2*sigma)
assert llh == pytest.approx(expected_llh)

llh = calculate_single_llh(measurement=m, simulation=s, noise_value=sigma,
noise_distribution=LAPLACE, scale=LOG)
expected_llh = abs((log(s)-log(m))/sigma) + log(2*sigma*m)
assert llh == pytest.approx(expected_llh)

llh = calculate_single_llh(measurement=m, simulation=s, noise_value=sigma,
noise_distribution=LAPLACE, scale=LOG10)
expected_llh = abs((log10(s)-log10(m))/sigma) + log(2*sigma*m)
assert llh == pytest.approx(expected_llh)