diff --git a/dynamo/simulation/Gillespie.py b/dynamo/simulation/Gillespie.py index 995533af7..3392c8696 100755 --- a/dynamo/simulation/Gillespie.py +++ b/dynamo/simulation/Gillespie.py @@ -35,28 +35,29 @@ def Gillespie( method: str = "basic", verbose: bool = False, ) -> AnnData: - """A simulator of RNA dynamics that includes RNA bursting, transcription, metabolic labeling, splicing, transcription, RNA/protein degradation + """A simulator of RNA dynamics that includes RNA bursting, transcription, metabolic labeling, splicing, + transcription, RNA/protein degradation. Args: - a: rate of active promoter switches to inactive one - b: rate of inactive promoter switches to active one - la: lambda_: 4sU labelling rate - aa: transcription rate with active promoter - ai: transcription rate with inactive promoter - si: sigma, degradation rate - be: beta, splicing rate - ga: gamma, the fraction of labeled u turns to unlabeled s + a: rate of active promoter switches to inactive one. + b: rate of inactive promoter switches to active one. + la: lambda_: 4sU labelling rate. + aa: transcription rate with active promoter. + ai: transcription rate with inactive promoter. + si: sigma, degradation rate. + be: beta, splicing rate. + ga: gamma, the fraction of labeled u turns to unlabeled s. C0: A numpy array with dimension of 5 x n_gene. Here 5 corresponds to the five species (s - promoter state, ul, uu, sl, su) for each gene. - t_span: list of between and end time of simulation - n_traj: number of simulation trajectory to use - t_eval: the time points at which data is simulated - dt: delta t used in simulation - method: method to simulate the expression dynamics - verbose: whether to report running information + t_span: list of between and end time of simulation. + n_traj: number of simulation trajectory to use. + t_eval: the time points at which data is simulated. + dt: delta t used in simulation. + method: method to simulate the expression dynamics. + verbose: whether to report running information. Returns: - adata: an Annodata object containing the simulated data. + An Annodata object containing the simulated data. """ gene_num, species_num = C0.shape[0:2] diff --git a/dynamo/simulation/ODE.py b/dynamo/simulation/ODE.py index b896eb59c..178604fc5 100755 --- a/dynamo/simulation/ODE.py +++ b/dynamo/simulation/ODE.py @@ -1,5 +1,5 @@ from random import seed, uniform -from typing import List, Optional, Tuple, Union +from typing import Callable, List, Optional, Tuple, Union import anndata import numpy as np @@ -18,8 +18,7 @@ def hill_inh_func(x: float, A: float, K: float, n: float, g: float = 0) -> float g: Background inhibition parameter. Defaults to 0. Returns: - float: The value of the Hill inhibition function for the given input. - + The value of the Hill inhibition function for the given input. """ Kd = K**n return A * Kd / (Kd + x**n) - g * x @@ -36,8 +35,7 @@ def hill_inh_grad(x: float, A: float, K: float, n: float, g: float = 0) -> float g: Background inhibition parameter. Defaults to 0. Returns: - float: The value of the gradient of the Hill inhibition function for the given input. - + The value of the gradient of the Hill inhibition function for the given input. """ Kd = K**n return -A * n * Kd * x ** (n - 1) / (Kd + x**n) ** 2 - g @@ -54,8 +52,7 @@ def hill_act_func(x: float, A: float, K: float, n: float, g: float = 0) -> float g: Background activation parameter. Defaults to 0. Returns: - float: The value of the Hill activation function for the given input. - + The value of the Hill activation function for the given input. """ Kd = K**n return A * x**n / (Kd + x**n) - g * x @@ -72,8 +69,7 @@ def hill_act_grad(x: float, A: float, K: float, n: float, g: float = 0) -> float g: Background activation parameter. Defaults to 0. Returns: - float: The value of the gradient of the Hill activation function for the given input. - + The value of the gradient of the Hill activation function for the given input. """ Kd = K**n return A * n * Kd * x ** (n - 1) / (Kd + x**n) ** 2 - g @@ -92,8 +88,7 @@ def toggle( n: The Hill coefficient. Defaults to 2. Returns: - np.ndarray: The RHS of the differential equations for the toggle switch system, calculated using the given input parameters. - + The RHS of the differential equations for the toggle switch system, calculated using the given input parameters. """ if len(ab.shape) == 2: a, b = ab[:, 0], ab[:, 1] @@ -106,9 +101,16 @@ def toggle( def Ying_model(x: np.ndarray): - """network used in the potential landscape paper from Ying, et. al: - https://www.nature.com/articles/s41598-017-15889-2. + """Solve the equation from the network used in the potential landscape paper from Ying, et. al: + https://www.nature.com/articles/s41598-017-15889-2. + This is also the mixture of Gaussian model. + + Args: + x: The current state of the system. + + Returns: + The rate of change of the system state. """ if len(x.shape) == 2: dx1 = -1 + 9 * x[:, 0] - 2 * pow(x[:, 0], 3) + 9 * x[:, 1] - 2 * pow(x[:, 1], 3) @@ -124,10 +126,18 @@ def Ying_model(x: np.ndarray): return ret -def jacobian_Ying_model(x): - """network used in the potential landscape paper from Ying, et. al: - https://www.nature.com/articles/s41598-017-15889-2. - This is also the mixture of Gaussian model. +def jacobian_Ying_model(x: np.ndarray) -> np.ndarray: + """Solve the jacobian from network used in the potential landscape paper from Ying, et. al: + https://www.nature.com/articles/s41598-017-15889-2. + + This is also the mixture of Gaussian model. + + Args: + x: The current state of the system. + t: Time variable. Defaults to None. + + Returns: + The Jacobian of the system. """ if len(x.shape) == 2: df1_dx1 = 9 - 6 * pow(x[:, 0], 2) @@ -145,10 +155,18 @@ def jacobian_Ying_model(x): return J -def hessian_Ying_model(x, t=None): - """network used in the potential landscape paper from Ying, et. al: - https://www.nature.com/articles/s41598-017-15889-2. +def hessian_Ying_model(x: np.ndarray, t: Optional[float] = None) -> np.ndarray: + """Solve the hessian from the network used in the potential landscape paper from Ying, et. al: + https://www.nature.com/articles/s41598-017-15889-2. + This is also the mixture of Gaussian model. + + Args: + x: The current state of the system. + t: Time variable. Defaults to None. + + Returns: + The Hessian of the system. """ if len(x.shape) == 2: H = np.zeros([2, 2, 2, x.shape[0]]) @@ -168,13 +186,13 @@ def hessian_Ying_model(x, t=None): def ode_bifur2genes( x: np.ndarray, - a: List = [1, 1], - b: List = [1, 1], - S: List = [0.5, 0.5], - K: List = [0.5, 0.5], - m: List = [4, 4], - n: List = [4, 4], - gamma: List = [1, 1], + a: List[float] = [1, 1], + b: List[float] = [1, 1], + S: List[float] = [0.5, 0.5], + K: List[float] = [0.5, 0.5], + m: List[float] = [4, 4], + n: List[float] = [4, 4], + gamma: List[float] = [1, 1], ) -> np.ndarray: """The ODEs for the toggle switch motif with self-activation and mutual inhibition. @@ -182,8 +200,8 @@ def ode_bifur2genes( x: The current state of the system. a: The self-activation strengths of the genes. Defaults to [1, 1]. b: The mutual inhibition strengths of the genes. Defaults to [1, 1]. - S: The self-activation thresholds of the genes. Defaults to [0.5, 0.5]. - K: The mutual inhibition thresholds of the genes. Defaults to [0.5, 0.5]. + S: The self-activation factors of the genes. Defaults to [0.5, 0.5]. + K: The mutual inhibition factors of the genes. Defaults to [0.5, 0.5]. m: The Hill coefficients for self-activation. Defaults to [4, 4]. n: The Hill coefficients for mutual inhibition. Defaults to [4, 4]. gamma: The degradation rates of the genes. Defaults to [1, 1]. @@ -209,9 +227,30 @@ def ode_bifur2genes( def jacobian_bifur2genes( - x: np.ndarray, a=[1, 1], b=[1, 1], S=[0.5, 0.5], K=[0.5, 0.5], m=[4, 4], n=[4, 4], gamma=[1, 1] -): - """The Jacobian of the toggle switch ODE model.""" + x: np.ndarray, + a: List[float] = [1, 1], + b: List[float] = [1, 1], + S: List[float] = [0.5, 0.5], + K: List[float] = [0.5, 0.5], + m: List[float] = [4, 4], + n: List[float] = [4, 4], + gamma: List[float] = [1, 1] +) -> np.ndarray: + """The Jacobian of the toggle switch ODE model. + + Args: + x: The current state of the system. + a: The self-activation strengths of the genes. Defaults to [1, 1]. + b: The mutual inhibition strengths of the genes. Defaults to [1, 1]. + S: The self-activation factors of the genes. Defaults to [0.5, 0.5]. + K: The mutual inhibition factors of the genes. Defaults to [0.5, 0.5]. + m: The Hill coefficients for self-activation. Defaults to [4, 4]. + n: The Hill coefficients for mutual inhibition. Defaults to [4, 4]. + gamma: The degradation rates of the genes. Defaults to [1, 1]. + + Returns: + The Jacobian of the system. + """ df1_dx1 = hill_act_grad(x[:, 0], a[0], S[0], m[0], g=gamma[0]) df1_dx2 = hill_inh_grad(x[:, 1], b[0], K[0], n[0]) df2_dx1 = hill_inh_grad(x[:, 0], b[1], K[1], n[1]) @@ -220,8 +259,11 @@ def jacobian_bifur2genes( return J -def two_genes_motif_jacobian(x1, x2): - """This should be equivalent to jacobian_bifur2genes when using default parameters""" +def two_genes_motif_jacobian(x1: float, x2: float) -> np.ndarray: + """The Jacobian of the two genes motif model. + + This should be equivalent to jacobian_bifur2genes when using default parameters. + """ J = np.array( [ [ @@ -237,18 +279,61 @@ def two_genes_motif_jacobian(x1, x2): return J -def hill_inh_grad2(x, A, K, n): +def hill_inh_grad2(x: float, A: float, K: float, n: float) -> float: + """Calculates the second derivative of the Hill inhibition function for a given input. + + Args: + x: Input value for which the second derivative of the Hill inhibition function is to be calculated. + A: Scaling factor for the output of the function. + K: Concentration at which half-maximal inhibition occurs. + n: Hill coefficient, which describes the steepness of the function's curve. + + Returns: + The value of the second derivative of the Hill inhibition function for the given input. + """ Kd = K**n return A * n * Kd * x ** (n - 2) * ((n + 1) * x**n - Kd * n + Kd) / (Kd + x**n) ** 3 -def hill_act_grad2(x, A, K, n): +def hill_act_grad2(x: float, A: float, K: float, n: float) -> float: + """Calculates the second derivative of the Hill activation function for a given input. + + Args: + x: Input value for which the second derivative of the Hill activation function is to be calculated. + A: Scaling factor for the output of the function. + K: Concentration at which half-maximal activation occurs. + n: Hill coefficient, which describes the steepness of the function's curve. + + Returns: + The value of the second derivative of the Hill activation function for the given input. + """ Kd = K**n return -A * n * Kd * x ** (n - 2) * ((n + 1) * x**n - Kd * n + Kd) / (Kd + x**n) ** 3 -def hessian_bifur2genes(x: np.ndarray, a=[1, 1], b=[1, 1], S=[0.5, 0.5], K=[0.5, 0.5], m=[4, 4], n=[4, 4], t=None): - """The Hessian of the toggle switch ODE model.""" +def hessian_bifur2genes( + x: np.ndarray, + a: List[float] = [1, 1], + b: List[float] = [1, 1], + S: List[float] = [0.5, 0.5], + K: List[float] = [0.5, 0.5], + m: List[float] = [4, 4], + n: List[float] = [4, 4], +) -> np.ndarray: + """The Hessian of the toggle switch ODE model. + + Args: + x: The current state of the system. + a: The self-activation strengths of the genes. Defaults to [1, 1]. + b: The mutual inhibition strengths of the genes. Defaults to [1, 1]. + S: The self-activation factors of the genes. Defaults to [0.5, 0.5]. + K: The mutual inhibition factors of the genes. Defaults to [0.5, 0.5]. + m: The Hill coefficients for self-activation. Defaults to [4, 4]. + n: The Hill coefficients for mutual inhibition. Defaults to [4, 4]. + + Returns: + The Hessian of the system. + """ if len(x.shape) == 2: H = np.zeros([2, 2, 2, x.shape[0]]) H[0, 0, 0, :] = hill_act_grad2(x[:, 0], a[0], S[0], m[0]) @@ -265,8 +350,31 @@ def hessian_bifur2genes(x: np.ndarray, a=[1, 1], b=[1, 1], S=[0.5, 0.5], K=[0.5, return H -def ode_osc2genes(x: np.ndarray, a, b, S, K, m, n, gamma): - """The ODEs for the two gene oscillation based on a predator-prey model.""" +def ode_osc2genes( + x: np.ndarray, + a: List[float], + b: List[float], + S: List[float], + K: List[float], + m: List[float], + n: List[float], + gamma: List[float], +) -> np.ndarray: + """The ODEs for the two gene oscillation based on a predator-prey model. + + Args: + x: The current state of the system. + a: The self-activation strengths of the genes. + b: The mutual inhibition strengths of the genes. + S: The self-activation factors of the genes. + K: The mutual inhibition factors of the genes. + m: The Hill coefficients for self-activation. + n: The Hill coefficients for mutual inhibition. + gamma: The degradation rates of the genes. + + Returns: + The rate of change of the system state. + """ d = x.ndim x = np.atleast_2d(x) @@ -283,12 +391,24 @@ def ode_osc2genes(x: np.ndarray, a, b, S, K, m, n, gamma): def ode_neurongenesis( x: np.ndarray, - a, - K, - n, - gamma, -): - """The ODE model for the neurogenesis system that used in benchmarking Monocle 2, Scribe and dynamo (here), original from Xiaojie Qiu, et. al, 2012.""" + a: List[float], + K: List[float], + n: List[float], + gamma: List[float], +) -> np.ndarray: + """The ODE model for the neurogenesis system that used in benchmarking Monocle 2, Scribe and dynamo (here), original + from Xiaojie Qiu, et. al, 2012. + + Args: + x: The current state of the system. + a: The self-activation strengths of the genes. + K: The mutual inhibition strengths of the genes. + n: The Hill coefficients for self-activation. + gamma: The degradation rates of the genes. + + Returns: + The rate of change of the system state. + """ d = x.ndim x = np.atleast_2d(x) @@ -332,18 +452,36 @@ def ode_neurongenesis( def neurongenesis( - x, - mature_mu=0, - n=4, - k=1, - a=4, - eta=0.25, - eta_m=0.125, - a_s=2.2, - a_e=6, - mx=10, -): - """The ODE model for the neurogenesis system that used in benchmarking Monocle 2, Scribe and dynamo (here), original from Xiaojie Qiu, et. al, 2012.""" + x: np.ndarray, + mature_mu: float = 0, + n: float = 4, + k: float = 1, + a: float = 4, + eta: float = 0.25, + eta_m: float = 0.125, + a_s: float = 2.2, + a_e: float = 6, + mx: float = 10, +) -> np.ndarray: + """The ODE model for the neurogenesis system that used in benchmarking Monocle 2, Scribe and dynamo (here), original + from Xiaojie Qiu, et. al, 2012. + + Args: + x: The current state of the system. + t: Time variable. Defaults to None. + mature_mu: The degradation rate of the mature neuron. Defaults to 0. + n: The Hill coefficient. Defaults to 4. + k: The degradation rate of the genes. Defaults to 1. + a: The production rate of the genes. Defaults to 4. + eta: Parameter representing negative feedback from terminal cells. Defaults to 0.25. + eta_m: Parameter representing negative feedback from terminal cells. Defaults to 0.125. + a_s: The production rate of the genes. Defaults to 2.2. + a_e: The production rate of the genes. Defaults to 6. + mx: The maximum number of mature neurons. Defaults to 10. + + Returns: + The rate of change of the system state. + """ dx = np.nan * np.ones(shape=x.shape) @@ -384,8 +522,30 @@ def neurongenesis( return dx -def state_space_sampler(ode, dim, seed_num=19491001, clip=True, min_val=0, max_val=4, N=10000): - """Sample N points from the dim dimension gene expression space while restricting the values to be between min_val and max_val. Velocity vector at the sampled points will be calculated according to ode function.""" +def state_space_sampler( + ode: Callable, + dim: int, + seed_num: int = 19491001, + clip: bool = True, + min_val: float = 0, + max_val: float = 4, + N: int = 10000, +) -> Tuple[np.ndarray, np.ndarray]: + """Sample N points from the dim dimension gene expression space while restricting the values to be between min_val + and max_val. Velocity vector at the sampled points will be calculated according to ode function. + + Args: + ode: The ODE function that will be used to calculate the velocity vector at the sampled points. + dim: The dimension of the gene expression space. + seed_num: The seed number for the random number generator. Defaults to 19491001. + clip: Whether to clip data points that are negative. Defaults to True. + min_val: The minimum value of the gene expression space. Defaults to 0. + max_val: The maximum value of the gene expression space. Defaults to 4. + N: The number of points to sample. Defaults to 10000. + + Returns: + The sampled points from the gene expression space and the corresponding velocity vector. + """ seed(seed_num) X = np.array([[uniform(min_val, max_val) for _ in range(dim)] for _ in range(N)]) @@ -407,7 +567,7 @@ def Simulator( cell_num: Number of cells to simulate. Returns: - adata: an Annodata object containing the simulated data. + An Annodata object containing the simulated data. """ if motif == "toggle": diff --git a/dynamo/simulation/bif_os_inclusive_sim.py b/dynamo/simulation/bif_os_inclusive_sim.py index b5506df39..394c07a70 100755 --- a/dynamo/simulation/bif_os_inclusive_sim.py +++ b/dynamo/simulation/bif_os_inclusive_sim.py @@ -1,3 +1,5 @@ +from typing import Callable, Dict, List, Optional, Tuple, Union + import numpy as np from .utils import directMethod, temporal_interp @@ -5,30 +7,31 @@ # Differentiation model class sim_diff: + """The differentiation model.""" def __init__( self, - a1, - b1, - c1, - a2, - b2, - c2, - a1_l, - b1_l, - c1_l, - a2_l, - b2_l, - c2_l, - K, - n, - be1, - ga1, - et1, - de1, - be2, - ga2, - et2, - de2, + a1: float, + b1: float, + c1: float, + a2: float, + b2: float, + c2: float, + a1_l: float, + b1_l: float, + c1_l: float, + a2_l: float, + b2_l: float, + c2_l: float, + K: float, + n: float, + be1: float, + ga1: float, + et1: float, + de1: float, + be2: float, + ga2: float, + et2: float, + de2: float, ): self.parameters = { "a1": a1, @@ -55,7 +58,8 @@ def __init__( "de2": de2, } - def f_prop(self, C): + def f_prop(self, C: np.ndarray) -> np.ndarray: + """Get the propensities.""" # unlabeled mRNA u1 = C[0] s1 = C[1] @@ -116,7 +120,8 @@ def f_prop(self, C): return prop - def f_stoich(self): + def f_stoich(self) -> np.ndarray: + """Get the stoichiometry matrix.""" # species u1 = 0 s1 = 1 @@ -164,26 +169,27 @@ def f_stoich(self): # Oscillator class sim_osc: + """The oscillator model.""" def __init__( self, - a1, - b1, - a2, - b2, - a1_l, - b1_l, - a2_l, - b2_l, - K, - n, - be1, - ga1, - et1, - de1, - be2, - ga2, - et2, - de2, + a1: float, + b1: float, + a2: float, + b2: float, + a1_l: float, + b1_l: float, + a2_l: float, + b2_l: float, + K: float, + n: float, + be1: float, + ga1: float, + et1: float, + de1: float, + be2: float, + ga2: float, + et2: float, + de2: float, ): self.parameters = { "a1": a1, @@ -206,7 +212,8 @@ def __init__( "de2": de2, } - def f_prop(self, C): + def f_prop(self, C: np.ndarray) -> np.ndarray: + """Get the propensities.""" # unlabeled mRNA u1 = C[0] s1 = C[1] @@ -265,7 +272,8 @@ def f_prop(self, C): return prop - def f_stoich(self): + def f_stoich(self) -> np.ndarray: + """Get the stoichiometry matrix.""" # species u1 = 0 s1 = 1 @@ -311,7 +319,25 @@ def f_stoich(self): return stoich -def simulate(model, C0, t_span, n_traj, report=False): +def simulate( + model: Union[sim_diff, sim_osc], + C0: List[np.ndarray], + t_span: List[int], + n_traj: int, + report: bool = False, +) -> Tuple[List[np.ndarray], List[np.ndarray]]: + """Simulate the model. + + Args: + model: The model to simulate. + C0: The initial conditions for each trajectory. + t_span: The time span to simulate over. + n_traj: The number of trajectories to simulate. + report: Whether to report the progress of the simulation. + + Returns: + The time points and the corresponding conditions of the trajectories. + """ stoich = model.f_stoich() update_func = lambda C, mu: C + stoich[mu, :] @@ -328,7 +354,27 @@ def simulate(model, C0, t_span, n_traj, report=False): # synthesize labeling data (kinetics) at different time points (multi-time-series) -def syn_kin_data(model_lab, n_trajs, t_idx, trajs_CP, Tl, n_cell): +def syn_kin_data( + model_lab: Union[sim_diff, sim_osc], + n_trajs: int, + t_idx: int, + trajs_CP: List[np.ndarray], + Tl: List[int], + n_cell: int, +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + """Synthesize labeling data (kinetics) at different time points (multi-time-series). + + Args: + model_lab: The labeling model. + n_trajs: The number of trajectories to simulate. + t_idx: The index of the time point to simulate at. + trajs_CP: The checkpoint data. + Tl: The time points to simulate at. + n_cell: The number of cells to simulate. + + Returns: + The synthesized labeling data. + """ C0 = [trajs_CP[j][:, t_idx] for j in range(n_trajs)] # label for 1 unit of time trajs_T, trajs_C = simulate(model_lab, C0=C0, t_span=[0, 1], n_traj=n_cell, report=True) @@ -361,7 +407,29 @@ def syn_kin_data(model_lab, n_trajs, t_idx, trajs_CP, Tl, n_cell): # synthesize labeling data (degradation) at the begining and the end -def syn_deg_data(model_lab, model_unlab, n_trajs, t_idx, trajs_CP, Tl, n_cell): +def syn_deg_data( + model_lab: Union[sim_diff, sim_osc], + model_unlab: Union[sim_diff, sim_osc], + n_trajs: int, + t_idx: int, + trajs_CP: List[np.ndarray], + Tl: List[int], + n_cell: int, +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + """Synthesize labeling data (degradation) at the beginning and the end. + + Args: + model_lab: The labeling model. + model_unlab: The unlabeling model. + n_trajs: The number of trajectories to simulate. + t_idx: The index of the time point to simulate at. + trajs_CP: The checkpoint data. + Tl: The time points to simulate at. + n_cell: The number of cells to simulate. + + Returns: + The synthesized labeling data. + """ C0 = [trajs_CP[j][:, t_idx] for j in range(n_trajs)] # label for 10 unit of time trajs_T, trajs_C = simulate(model_lab, C0=C0, t_span=[0, 10], n_traj=n_cell, report=True) @@ -396,7 +464,25 @@ def syn_deg_data(model_lab, model_unlab, n_trajs, t_idx, trajs_CP, Tl, n_cell): return uu_deg, su_deg, ul_deg, sl_deg, pr_deg -def osc_diff_dup(n_species, trajs_C, model_lab, model_unlab, n_cell): +def osc_diff_dup( + n_species: int, + trajs_C: List[np.ndarray], + model_lab: Union[sim_diff, sim_osc], + model_unlab: Union[sim_diff, sim_osc], + n_cell: int, +) -> Tuple: + """Synthesize data for both oscillatory and differentiation models. + + Args: + n_species: The number of species. + trajs_C: The checkpoint data. + model_lab: The labeling model. + model_unlab: The unlabeling model. + n_cell: The number of cells to simulate. + + Returns: + The synthesized data. + """ n_trajs = len(trajs_C) # get the steady state expression (cell state at the final time point) for the cells diff --git a/dynamo/simulation/evaluation.py b/dynamo/simulation/evaluation.py index 4df7a0db3..ca3f3d82d 100755 --- a/dynamo/simulation/evaluation.py +++ b/dynamo/simulation/evaluation.py @@ -8,10 +8,10 @@ def evaluate(reference: np.ndarray, prediction: np.ndarray, metric: str = "cosin """Function to evaluate the vector field related reference quantities vs. that from vector field prediction. Args: - reference: The reference quantity of the vector field (for example, simulated velocity vectors at each point or trajectory, - or estimated RNA velocity vector) - prediction: The predicted quantity of the vector field (for example, velocity vectors calculated based on reconstructed vector - field function at each point or trajectory, or reconstructed RNA velocity vector) + reference: The reference quantity of the vector field (for example, simulated velocity vectors at each point or + trajectory, or estimated RNA velocity vector). + prediction: The predicted quantity of the vector field (for example, velocity vectors calculated based on + reconstructed vector field function at each point or trajectory, or reconstructed RNA velocity vector). metric: The metric for benchmarking the vector field quantities after reconstruction. Returns: diff --git a/dynamo/simulation/simulate_anndata.py b/dynamo/simulation/simulate_anndata.py index 02c4472a1..d3c856efc 100644 --- a/dynamo/simulation/simulate_anndata.py +++ b/dynamo/simulation/simulate_anndata.py @@ -1,4 +1,4 @@ -from typing import Callable, Dict, List, Optional, Union +from typing import Callable, Dict, List, Optional, Tuple, Union import anndata import numpy as np @@ -72,6 +72,7 @@ class AnnDataSimulator: + """A base anndata simulator class.""" def __init__( self, reactions: GillespieReactions, @@ -82,6 +83,17 @@ def __init__( required_param_names: List = [], velocity_func: Optional[Callable] = None, ) -> None: + """Initialize an AnnDataSimulator object. + + Args: + reactions: The reaction object. + C0s: Initial conditions. + param_dict: The parameter dictionary. + species: The species object. + gene_param_names: List of gene specific parameters. + required_param_names: List of required parameters. + velocity_func: Function used to calculate velocity. + """ # initialization of variables self.reactions = reactions @@ -109,20 +121,37 @@ def __init__( main_info(f"The model contains {self.get_n_genes()} genes and {self.get_n_species()} species") - def get_n_genes(self): + def get_n_genes(self) -> int: + """Get the number of genes. + + Returns: + The number of genes. + """ return self.species.get_n_genes() - def get_n_species(self): + def get_n_species(self) -> int: + """Get the number of species. + + Returns: + The number of species. + """ return len(self.species) - def get_n_cells(self): + def get_n_cells(self) -> int: + """Get the number of cells. + + Returns: + The number of cells. + """ if self.C is None: raise Exception("Simulation results not found. Run simulation first.") return self.C.shape[0] - def fix_param_dict(self, required_param_names): - """ - Fixes parameters in place. + def fix_param_dict(self, required_param_names: List) -> None: + """Fixes parameters in place. + + Args: + required_param_names: List of required parameters. """ param_dict = {} for k, v in self.param_dict.items(): @@ -148,7 +177,17 @@ def fix_param_dict(self, required_param_names): self.param_dict = param_dict - def augment_C0_gaussian(self, n, sigma=5, inplace=True): + def augment_C0_gaussian(self, n: int, sigma: float = 5, inplace: bool = True) -> np.ndarray: + """Augment the initial conditions by adding Gaussian noise. + + Args: + n: Number of augmented initial conditions. + sigma: Standard deviation of the Gaussian noise. + inplace: Whether to modify the object in place. + + Returns: + The augmented initial conditions. + """ C0s = np.array(self.C0s, copy=True) for C0 in self.C0s: c = np.random.normal(scale=sigma, size=(n, self.get_n_species())) @@ -159,7 +198,14 @@ def augment_C0_gaussian(self, n, sigma=5, inplace=True): self.C0s = C0s return C0s - def simulate(self, t_span, n_cells=None, **simulator_kwargs): + def simulate(self, t_span: Union[List, np.ndarray], n_cells: Optional[int] = None, **simulator_kwargs) -> None: + """Simulate the model. + + Args: + t_span: The time span. + n_cells: Number of cells to sample. + **simulator_kwargs: Additional keyword arguments. + """ Ts, Cs, traj_id = [], None, [] count = 0 for C0 in self.C0s: @@ -191,7 +237,15 @@ def simulate(self, t_span, n_cells=None, **simulator_kwargs): V.append(flatten(v)) self.V = np.array(V) - def generate_anndata(self, remove_empty_cells: bool = False): + def generate_anndata(self, remove_empty_cells: bool = False) -> anndata.AnnData: + """Generate an AnnData object. + + Args: + remove_empty_cells: Whether to remove cells that have no expression. + + Returns: + The AnnData object. + """ if self.T is not None and self.C is not None: obs = pd.DataFrame( @@ -245,6 +299,7 @@ def generate_anndata(self, remove_empty_cells: bool = False): class CellularModelSimulator(AnnDataSimulator): + """An anndata simulator class handling models with synthesis, splicing (optional), and first-order degrdation reactions.""" def __init__( self, gene_names: List, @@ -259,18 +314,22 @@ def __init__( velocity_func: Optional[Callable] = None, report_stoich: bool = False, ) -> None: - """ - An anndata simulator class handling models with synthesis, splicing (optional), and first-order degrdation reactions. + """Initialize a CellularModelSimulator object. Args: gene_names: List of gene names. synthesis_param_names: List of kinetic parameters used to calculate synthesis rates. param_dict: The parameter dictionary containing "a", "b", "S", "K", "m", "n", "beta" (optional), "gamma" - if `param_dict` has the key "beta", the simulation includes the splicing process and therefore has 4 species (`unspliced` and `spliced` for each gene). - molecular_param_names: List of names of parameters which has `number of molecules` in their dimensions. These parameters will be multiplied with `r_aug` for scaling. - kinetic_param_names: List of names of parameters which has `time` in their dimensions. These parameters will be multiplied with `tau` to scale the kinetics. - C0s: Initial conditions (# init cond. by # species). If None, the simulator automatically generates `n_C0s` initial conditions. - r_aug: Parameter which augments steady state copy number for r1 and r2. At steady state, r1_s ~ r*(a1+b1)/ga1; r2_s ~ r*(a2+b2)/ga2 + if `param_dict` has the key "beta", the simulation includes the splicing process and therefore has 4 + species (`unspliced` and `spliced` for each gene). + molecular_param_names: List of names of parameters which has `number of molecules` in their dimensions. + These parameters will be multiplied with `r_aug` for scaling. + kinetic_param_names: List of names of parameters which has `time` in their dimensions. These parameters + will be multiplied with `tau` to scale the kinetics. + C0s: Initial conditions (# init cond. by # species). If None, the simulator automatically generates `n_C0s` + initial conditions. + r_aug: Parameter which augments steady state copy number for r1 and r2. At steady state, + r1_s ~ r*(a1+b1)/ga1; r2_s ~ r*(a2+b2)/ga2 tau: Time scale parameter which does not affect steady state solutions. n_C0s: Number of augmented initial conditions, if C0s is `None`. velocity_func: Function used to calculate velocity. If `None`, the velocity will not be calculated. @@ -342,20 +401,47 @@ def __init__( else: self.vfunc = lambda x, param=self.param_dict: velocity_func(x, **param) - def get_synthesized_species(self): - """return species which are either `total` or `unspliced` when there is splicing.""" + def get_synthesized_species(self) -> List: + """Get species which are either `total` or `unspliced` when there is splicing. + + Returns: + List of species names. + """ name = "unspliced" if self.splicing else "total" return self.species[name] - def get_primary_species(self): - """return species which are either `total` or `spliced` when there is splicing.""" + def get_primary_species(self) -> List: + """Get species which are either `total` or `spliced` when there is splicing. + + Returns: + List of species names. + """ name = "spliced" if self.splicing else "total" return self.species[name] - def auto_C0(self, r_aug, tau): + def auto_C0(self, r_aug: float, tau: float) -> np.ndarray: + """Automatically calculate the initial conditions based on the steady state solutions. + + Args: + r_aug: Parameter which augments steady state copy number for r1 and r2. At steady state, + r1_s ~ r*(a1+b1)/ga1; r2_s ~ r*(a2+b2)/ga2 + tau: Time scale parameter which does not affect steady state solutions. + + Returns: + Initial conditions (# init cond. by # species). + """ return np.zeros(len(self.species)) - def register_reactions(self, reactions: GillespieReactions): + def register_reactions(self, reactions: GillespieReactions) -> GillespieReactions: + """Register reactions to the reaction object. + + + Args: + reactions: The target reaction object to register. + + Returns: + The reaction object after registration. + """ for i_gene in range(self.get_n_genes()): if self.splicing: u, s = self.species["unspliced", i_gene], self.species["spliced", i_gene] @@ -375,7 +461,15 @@ def register_reactions(self, reactions: GillespieReactions): ) return reactions - def generate_anndata(self, remove_empty_cells: bool = False): + def generate_anndata(self, remove_empty_cells: bool = False) -> anndata.AnnData: + """Generate anndata object from the simulation results. + + Args: + remove_empty_cells: Whether to remove cells that has no expression. + + Returns: + The generated anndata object. + """ adata = super().generate_anndata(remove_empty_cells) if self.splicing: @@ -386,11 +480,17 @@ def generate_anndata(self, remove_empty_cells: bool = False): class KinLabelingSimulator: + """A simulator for kinetic labeling experiments.""" def __init__( self, simulator: CellularModelSimulator, syn_rxn_tag: str = "synthesis", ) -> None: + """Initialize a KinLabelingSimulator object. + + Args: + simulator: The simulator object to simulate the kinetic labeling experiment. + """ self.n_cells = simulator.C.shape[0] self.splicing = simulator.splicing @@ -424,10 +524,30 @@ def __init__( self.Tl = None self._label_time = None - def get_n_cells(self): + def get_n_cells(self) -> int: + """Get the number of cells. + + Returns: + The number of cells. + """ return self.n_cells - def register_reactions(self, species: CellularSpecies, label_species, param_dict): + def register_reactions( + self, + species: CellularSpecies, + label_species: List[str], + param_dict: Dict, + ) -> Tuple[GillespieReactions, List[int]]: + """Register reactions to the reaction object. + + Args: + species: The species object. + label_species: List of label species names. + param_dict: The parameter dictionary. + + Returns: + The reaction object after registration and the indices of synthesis reactions. + """ reactions = GillespieReactions(species) syn_rxns = [] if self.splicing: @@ -467,7 +587,12 @@ def register_reactions(self, species: CellularSpecies, label_species, param_dict ) return reactions, syn_rxns - def simulate(self, label_time): + def simulate(self, label_time: Union[float, np.ndarray]) -> None: + """Simulate the kinetic labeling experiment. + + Args: + label_time: The label time. + """ if np.isscalar(label_time): label_time = np.ones(self.get_n_cells()) * label_time self._label_time = label_time @@ -482,7 +607,15 @@ def simulate(self, label_time): self.Tl = np.hstack((self.Tl, T[-1])) self.Cl = C[:, -1] if self.Cl is None else np.vstack((self.Cl, C[:, -1])) - def write_to_anndata(self, adata: anndata): + def write_to_anndata(self, adata: anndata) -> anndata: + """Write the kinetic labeling experiment results to an anndata object. + + Args: + adata: The anndata object to write to. + + Returns: + The anndata object with the kinetic labeling experiment results. + """ if adata.n_vars != self.species.get_n_genes(): raise Exception( f"The input anndata has {adata.n_vars} genes while there are {self.species.get_n_genes()} registered." @@ -506,6 +639,7 @@ def write_to_anndata(self, adata: anndata): class BifurcationTwoGenes(CellularModelSimulator): + """Two gene bifurcation model anndata simulator.""" def __init__( self, param_dict: Dict, @@ -516,14 +650,16 @@ def __init__( gene_names: Optional[List] = None, report_stoich: bool = False, ) -> None: - """ - Two gene toggle switch model anndata simulator. + """Initialize a BifurcationTwoGenes object. Args: param_dict: The parameter dictionary containing "a", "b", "S", "K", "m", "n", "beta" (optional), "gamma" - if `param_dict` has the key "beta", the simulation includes the splicing process and therefore has 4 species (u and s for each gene). - C0s: Initial conditions (# init cond. by # species). If None, the simulator automatically generates `n_C0s` initial conditions based on the steady states. - r_aug: Parameter which augments steady state copy number for gene 1 (r1) and gene 2 (r2). At steady state, r1_s ~ r*(a1+b1)/ga1; r2_s ~ r*(a2+b2)/ga2 + if `param_dict` has the key "beta", the simulation includes the splicing process and therefore has 4 + species (u and s for each gene). + C0s: Initial conditions (# init cond. by # species). If None, the simulator automatically generates `n_C0s` + initial conditions based on the steady states. + r_aug: Parameter which augments steady state copy number for gene 1 (r1) and gene 2 (r2). At steady state, + r1_s ~ r*(a1+b1)/ga1; r2_s ~ r*(a2+b2)/ga2 tau: Time scale parameter which does not affect steady state solutions. n_C0s: Number of augmented initial conditions, if C0s is `None`. gene_names: List of gene names. If `None`, "gene_1", "gene_2", etc., are used. @@ -546,7 +682,17 @@ def __init__( report_stoich=report_stoich, ) - def auto_C0(self, r_aug, tau): + def auto_C0(self, r_aug: float, tau: float) -> np.ndarray: + """Automatically calculate the initial conditions based on the steady state solutions. + + Args: + r_aug: Parameter which augments steady state copy number for gene 1 (r1) and gene 2 (r2). At steady state, + r1_s ~ r*(a1+b1)/ga1; r2_s ~ r*(a2+b2)/ga2 + tau: Time scale parameter which does not affect steady state solutions. + + Returns: + Initial conditions (# init cond. by # species). + """ a, b = self.param_dict["a"], self.param_dict["b"] ga = self.param_dict["gamma"] @@ -563,7 +709,15 @@ def auto_C0(self, r_aug, tau): C0s = np.array([x1_s, x2_s]) return C0s - def register_reactions(self, reactions): + def register_reactions(self, reactions: GillespieReactions) -> None: + """Register reactions to the reaction object. + + Args: + reactions: The target reaction object to register. + + Returns: + The reaction object after registration. + """ def rate_syn(x, y, gene): activation = hill_act_func( x, self.param_dict["a"][gene], self.param_dict["S"][gene], self.param_dict["m"][gene] @@ -599,6 +753,8 @@ def rate_syn(x, y, gene): class OscillationTwoGenes(CellularModelSimulator): + """Two gene oscillation model anndata simulator. This is essentially a predator-prey model, where gene 1 (predator) + inhibits gene 2 (prey) and gene 2 activates gene 1.""" def __init__( self, param_dict: Dict, @@ -609,13 +765,14 @@ def __init__( gene_names: Optional[List] = None, report_stoich: bool = False, ) -> None: - """ - Two gene oscillation model anndata simulator. This is essentially a predator-prey model, where gene 1 (predator) inhibits gene 2 (prey) and gene 2 activates gene 1. + """Initialize a OscillationTwoGenes object. Args: param_dict: The parameter dictionary containing "a", "b", "S", "K", "m", "n", "beta" (optional), "gamma" - if `param_dict` has the key "beta", the simulation includes the splicing process and therefore has 4 species (u and s for each gene). - C0s: Initial conditions (# init cond. by # species). If None, the simulator automatically generates `n_C0s` initial conditions based on the steady states. + if `param_dict` has the key "beta", the simulation includes the splicing process and therefore has 4 + species (u and s for each gene). + C0s: Initial conditions (# init cond. by # species). If None, the simulator automatically generates `n_C0s` + initial conditions based on the steady states. r_aug: Parameter which augments copy numbers for the two genes. tau: Time scale parameter which does not affect steady state solutions. n_C0s: Number of augmented initial conditions, if C0s is `None`. @@ -639,7 +796,17 @@ def __init__( report_stoich=report_stoich, ) - def auto_C0(self, r_aug, tau): + def auto_C0(self, r_aug: float, tau: float) -> np.ndarray: + """Automatically calculate the initial conditions based on the steady state solutions. + + Args: + r_aug: Parameter which augments steady state copy number for gene 1 (r1) and gene 2 (r2). At steady state, + r1_s ~ r*(a1+b1)/ga1; r2_s ~ r*(a2+b2)/ga2 + tau: Time scale parameter which does not affect steady state solutions. + + Returns: + Initial conditions (# init cond. by # species). + """ # TODO: derive solutions for auto C0 if self.splicing: ga, be = self.param_dict["gamma"], self.param_dict["beta"] @@ -652,7 +819,15 @@ def auto_C0(self, r_aug, tau): C0s = 70 * np.ones(len(self.species)) return C0s - def register_reactions(self, reactions): + def register_reactions(self, reactions: GillespieReactions) -> None: + """Register reactions to the reaction object. + + Args: + reactions: The target reaction object to register. + + Returns: + The reaction object after registration. + """ def rate_syn_1(x, y, gene): activation = hill_act_func( x, self.param_dict["a"][gene], self.param_dict["S"][gene], self.param_dict["m"][gene] @@ -697,6 +872,7 @@ def rate_syn_2(x, y, gene): class Neurongenesis(CellularModelSimulator): + """Neurongenesis model anndata simulator from Xiaojie Qiu, et. al, 2012. anndata simulator.""" def __init__( self, param_dict: Dict, @@ -706,14 +882,16 @@ def __init__( n_C0s: int = 10, report_stoich: bool = False, ) -> None: - """ - Neurongenesis model from Xiaojie Qiu, et. al, 2012. anndata simulator. + """Initialize a Neurongenesis object. Args: param_dict: The parameter dictionary. - if `param_dict` has the key "beta", the simulation includes the splicing process and therefore has 4 species (u and s for each gene). - C0s: Initial conditions (# init cond. by # species). If None, the simulator automatically generates `n_C0s` initial conditions based on the steady states. - r_aug: Parameter which augments steady state copy number for gene 1 (r1) and gene 2 (r2). At steady state, r1_s ~ r*(a1+b1)/ga1; r2_s ~ r*(a2+b2)/ga2 + if `param_dict` has the key "beta", the simulation includes the splicing process and therefore has 4 + species (u and s for each gene). + C0s: Initial conditions (# init cond. by # species). If None, the simulator automatically generates `n_C0s` + initial conditions based on the steady states. + r_aug: Parameter which augments steady state copy number for gene 1 (r1) and gene 2 (r2). At steady state, + r1_s ~ r*(a1+b1)/ga1; r2_s ~ r*(a2+b2)/ga2 tau: Time scale parameter which does not affect steady state solutions. n_C0s: Number of augmented initial conditions, if C0s is `None`. report_stoich: Whether to report the Stoichiometry Matrix. @@ -748,14 +926,32 @@ def __init__( report_stoich=report_stoich, ) - def auto_C0(self, r_aug, tau): + def auto_C0(self, r_aug: float, tau: float) -> np.ndarray: + """Automatically calculate the initial conditions based on the steady state solutions. + + Args: + r_aug: Parameter which augments steady state copy number for gene 1 (r1) and gene 2 (r2). At steady state, + r1_s ~ r*(a1+b1)/ga1; r2_s ~ r*(a2+b2)/ga2 + tau: Time scale parameter which does not affect steady state solutions. + + Returns: + Initial conditions (# init cond. by # species). + """ # C0 = np.ones(self.get_n_species()) * r_aug C0 = np.zeros(self.get_n_species()) # TODO: splicing case C0[self.species["total", "Pax6"]] = 2.0 * r_aug return C0 - def register_reactions(self, reactions): + def register_reactions(self, reactions: GillespieReactions) -> None: + """Register reactions to the reaction object. + + Args: + reactions: The target reaction object to register. + + Returns: + The reaction object after registration. + """ def rate_pax6(x, y, z, gene): a = self.param_dict["a"][gene] K = self.param_dict["K"][gene] diff --git a/dynamo/simulation/utils.py b/dynamo/simulation/utils.py index d5e7e9653..fc6465346 100644 --- a/dynamo/simulation/utils.py +++ b/dynamo/simulation/utils.py @@ -36,8 +36,9 @@ def directMethod( record_max_length: The maximum length for recording the trajectories. Returns: - retT: a 1d numpy array of time points. - retC: a 2d numpy array (n_species x n_time_points) of copy numbers for each species at each time point. + A tuple containing: + retT: a 1d numpy array of time points. + retC: a 2d numpy array (n_species x n_time_points) of copy numbers for each species at each time point. """ retC = np.zeros((len(C0), int(record_max_length)), np.float64) retT = np.zeros(int(record_max_length), np.float64) @@ -71,7 +72,18 @@ def directMethod( return retT, retC -def prop_slam(C, a, b, la, aa, ai, si, be, ga): +def prop_slam( + C: Union[List, np.ndarray], + a: float, + b: float, + la: float, + aa: float, + ai: float, + si: float, + be: float, + ga: float, +) -> np.ndarray: + """Propensity function for the SLAM data.""" # species s = C[0] ul = C[1] @@ -99,7 +111,21 @@ def prop_slam(C, a, b, la, aa, ai, si, be, ga): return prop -def simulate_Gillespie(a, b, la, aa, ai, si, be, ga, C0, t_span, n_traj, report=False): +def simulate_Gillespie( + a: float, + b: float, + la: float, + aa: float, + ai: float, + si: float, + be: float, + ga: float, + C0: np.ndarray, + t_span: List, + n_traj: int, + report: bool = False, +) -> Tuple[List[np.ndarray], List[np.ndarray]]: + """Simulate using the Gillespie method.""" # species s = 0 u_l = 1 @@ -143,7 +169,17 @@ def simulate_Gillespie(a, b, la, aa, ai, si, be, ga, C0, t_span, n_traj, report= return trajs_T, trajs_C -def prop_2bifurgenes(C, a, b, S, K, m, n, gamma): +def prop_2bifurgenes( + C: Union[List, np.ndarray], + a: List[float], + b: List[float], + S: List[float], + K: List[float], + m: List[float], + n: List[float], + gamma: List[float], +) -> np.ndarray: + """Propensity function for the 2 bifurcating genes model.""" # species x = C[0] y = C[1] @@ -168,6 +204,7 @@ def prop_2bifurgenes(C, a, b, S, K, m, n, gamma): def stoich_2bifurgenes(): + """Get the stoichiometry matrix for the 2 bifurcating genes model.""" # species x = 0 y = 1 @@ -182,7 +219,18 @@ def stoich_2bifurgenes(): return stoich -def prop_2bifurgenes_splicing(C, a, b, S, K, m, n, beta, gamma): +def prop_2bifurgenes_splicing( + C: Union[List, np.ndarray], + a: List[float], + b: List[float], + S: List[float], + K: List[float], + m: List[float], + n: List[float], + beta: List[float], + gamma: List[float], +) -> np.ndarray: + """Propensity function for the 2 bifurcating genes model with splicing.""" # species u1 = C[0] u2 = C[1] @@ -212,6 +260,7 @@ def prop_2bifurgenes_splicing(C, a, b, S, K, m, n, beta, gamma): def stoich_2bifurgenes_splicing(): + """Get the stoichiometry matrix for the 2 bifurcating genes model with splicing.""" # species u1 = 0 u2 = 1 @@ -232,7 +281,15 @@ def stoich_2bifurgenes_splicing(): return stoich -def simulate_2bifurgenes(C0, t_span, n_traj, param_dict, report=False, **gillespie_kwargs): +def simulate_2bifurgenes( + C0: np.ndarray, + t_span: List, + n_traj: int, + param_dict: dict, + report: bool = False, + **gillespie_kwargs, +): + """Simulate the 2 bifurcating genes model.""" param_dict = param_dict.copy() beta = param_dict.pop("beta", None) if beta is None: @@ -259,7 +316,14 @@ def simulate_2bifurgenes(C0, t_span, n_traj, param_dict, report=False, **gillesp return trajs_T, trajs_C -def temporal_average(t, trajs_T, trajs_C, species, f=lambda x: x): +def temporal_average( + t: np.ndarray, + trajs_T: List[np.ndarray], + trajs_C: List[np.ndarray], + species: int, + f=lambda x: x, +) -> np.ndarray: + """Calculate the temporal average of a species.""" n = len(trajs_T) y = np.zeros((n, len(t))) for i in range(n): @@ -270,7 +334,14 @@ def temporal_average(t, trajs_T, trajs_C, species, f=lambda x: x): return np.nanmean(y, 0) -def temporal_cov(t, trajs_T, trajs_C, species1, species2): +def temporal_cov( + t: np.ndarray, + trajs_T: List[np.ndarray], + trajs_C: List[np.ndarray], + species1: int, + species2: int, +) -> np.ndarray: + """Calculate the temporal covariance between two species.""" n = len(trajs_T) y = np.zeros((n, len(t))) for i in range(n): @@ -283,7 +354,13 @@ def temporal_cov(t, trajs_T, trajs_C, species1, species2): return np.nanmean(y, 0) -def temporal_interp(t, trajs_T, trajs_C, round=False): +def temporal_interp( + t: np.ndarray, + trajs_T: List[np.ndarray], + trajs_C: List[np.ndarray], + round: bool = False, +) -> np.ndarray: + """Interpolate the trajectories to a given time section.""" n = len(trajs_T) ret = [] for i in range(n): @@ -298,7 +375,8 @@ def temporal_interp(t, trajs_T, trajs_C, round=False): return np.array(ret) -def convert_nosplice(trajs_T, trajs_C): +def convert_nosplice(trajs_T: List[np.ndarray], trajs_C: List[np.ndarray]) -> List: + """Convert the splicing model to the non-splicing model.""" trajs_C_nosplice = [] for i in range(len(trajs_T)): traj_temp = np.zeros((2, len(trajs_T[i]))) @@ -308,7 +386,22 @@ def convert_nosplice(trajs_T, trajs_C): return trajs_C_nosplice -def simulate_multigene(a, b, la, aa, ai, si, be, ga, C0, t_span, n_traj, t_eval, report=False): +def simulate_multigene( + a: List[float], + b: List[float], + la: List[float], + aa: List[float], + ai: List[float], + si: List[float], + be: List[float], + ga: List[float], + C0: List[np.ndarray], + t_span: List[float], + n_traj: int, + t_eval: np.ndarray, + report: bool = False, +) -> np.ndarray: + """Simulate the multi-gene model.""" n_genes = len(a) ret = [] for i in range(n_genes): @@ -332,9 +425,12 @@ def simulate_multigene(a, b, la, aa, ai, si, be, ga, C0, t_span, n_traj, t_eval, class CellularSpecies: + """A class to register gene and species for easier implemention of simulations.""" def __init__(self, gene_names: list = []) -> None: - """ - A class to register gene and species for easier implemention of simulations. + """Initialize the CellularSpecies class. + + Args: + gene_names: A list of gene names. """ self.species_dict = {} self.gene_names = gene_names @@ -342,13 +438,29 @@ def __init__(self, gene_names: list = []) -> None: self._is_gene_species = [] self.num_species = 0 - def get_n_genes(self): + def get_n_genes(self) -> int: + """Get the number of genes. + + Returns: + The number of genes. + """ return len(self.gene_names) - def get_species_names(self): + def get_species_names(self) -> List[str]: + """Get the names of the registered species. + + Returns: + A list of species names. + """ return self.species_dict.keys() - def register_species(self, species_name: str, is_gene_species: bool = True): + def register_species(self, species_name: str, is_gene_species: bool = True) -> None: + """Register a species. + + Args: + species_name: The name of the species. + is_gene_species: Whether the species is a gene species. + """ if self.get_n_genes() == 0 and is_gene_species: raise Exception("There is no gene and therefore cannot register gene species.") if species_name in self.species_dict: @@ -364,7 +476,16 @@ def register_species(self, species_name: str, is_gene_species: bool = True): self.species_dict[species_name] = [i + self.num_species for i in range(self.get_n_genes())] self.num_species += self.get_n_genes() - def get_index(self, species: str, gene: Optional[Union[int, str]] = None): + def get_index(self, species: str, gene: Optional[Union[int, str]] = None) -> int: + """Get the index of a species or gene. + + Args: + species: The name of the species. + gene: The name or index of the gene. + + Returns: + The index of the species or gene. + """ if not species in self.species_dict.keys(): raise Exception(f"Unregistered species `{species}`") idx = self.species_dict[species] @@ -377,7 +498,16 @@ def get_index(self, species: str, gene: Optional[Union[int, str]] = None): raise Exception(f"The gene name {gene} is not found in the registered genes.") return idx - def get_species(self, index, return_gene_name=True): + def get_species(self, index: int, return_gene_name: bool = True) -> Tuple[str, Optional[int]]: + """Get the species name from the index. + + Args: + index: The index of the species. + return_gene_name: Whether to return the gene name if the species is a gene species. + + Returns: + The name of the species or a tuple containing the name of the species and the gene index. + """ species = None for k, v in self.species_dict.items(): if type(v) == int: @@ -391,12 +521,28 @@ def get_species(self, index, return_gene_name=True): species = (k, gene_idx) return species - def get_gene_index(self, gene): + def get_gene_index(self, gene: str) -> int: + """Get the index of a gene. + + Args: + gene: The name of the gene. + + Returns: + The index of the gene. + """ if not gene in self.gene_names: raise Exception(f"Gene name `{gene}` not found.") return np.where(self.gene_names == gene)[0] - def is_gene_species(self, species: Union[str, int]): + def is_gene_species(self, species: Union[str, int]) -> bool: + """Check if a species is a gene species. + + Args: + species: The name or index of the species. + + Returns: + Whether the species is a gene species. + """ if type(species) == int: species = self.__getitem__(species)[0] @@ -407,12 +553,18 @@ def is_gene_species(self, species: Union[str, int]): if k == species: return self.is_gene_species[i] - def iter_gene_species(self): + def iter_gene_species(self) -> Tuple[str, int]: + """Iterate through the gene species. + + Yields: + A tuple containing the name of the gene species and the index of the gene species. + """ for i, (k, v) in enumerate(self.species_dict.items()): if self._is_gene_species[i]: yield (k, v) - def __getitem__(self, species): + def __getitem__(self, species: Union[str, int]) -> Tuple[str, Optional[int]]: + """Method for list indexing, dictionary lookup, or accessing ranges of values.""" if np.isscalar(species): if type(species) == str: return self.get_index(species) @@ -421,10 +573,12 @@ def __getitem__(self, species): else: return self.get_index(species[0], species[1]) - def __len__(self): + def __len__(self) -> int: + """Method to define length of the class.""" return self.num_species def copy(self): + """Create a copy of the CellularSpecies object.""" # this function needs to be tested. species = CellularSpecies(self.gene_names) for sp in self.get_species_names(): @@ -433,6 +587,7 @@ def copy(self): class Reaction: + """A class to register reactions for easier implementation of simulations.""" def __init__( self, substrates: list, @@ -442,6 +597,16 @@ def __init__( stoich_products=None, desc: str = "", ) -> None: + """Initialize the Reaction class. + + Args: + substrates: A list of substrates. + products: A list of products. + rate_func: The rate function of the reaction. + stoich_substrates: The stoichiometry of the substrates. + stoich_products: The stoichiometry of the products. + desc: The description of the reaction. + """ self.substrates = substrates self.products = products self.rate_func = rate_func @@ -455,35 +620,61 @@ def __init__( class GillespieReactions: + """A class to register reactions for easier implementation of Gillespie simulations.""" def __init__(self, species: CellularSpecies) -> None: + """Initialize the GillespieReactions class. + + Args: + species: The CellularSpecies object. + """ self.species = species self._rxns: List[Reaction] = [] self._stoich = None def __len__(self): + """Method to define length of the class.""" return len(self._rxns) - def __getitem__(self, index): + def __getitem__(self, index: int) -> Reaction: + """Method for list indexing, dictionary lookup, or accessing ranges of values.""" return self._rxns[index] - def __iter__(self): + def __iter__(self) -> Reaction: + """Method to iterate through the class.""" for rxn in self._rxns: yield rxn - def register_reaction(self, reaction: Reaction): + def register_reaction(self, reaction: Reaction) -> int: + """Register a reaction. + + Args: + reaction: The Reaction object to be registered. + + Returns: + The index of the registered reaction. + """ # reset stoich self._stoich = None # append reaction self._rxns.append(reaction) return len(self) - 1 - def propensity(self, C): + def propensity(self, C: np.ndarray) -> np.ndarray: + """Calculate the propensities of all reactions. + + Args: + C: An array of copy numbers of all species. + + Returns: + An array of propensities of all reactions. + """ prop = np.zeros(len(self)) for i, rxn in enumerate(self._rxns): prop[i] = rxn.rate_func(C) return prop def generate_stoich_matrix(self): + """Generate the stoichiometry matrix.""" # TODO: check if substrates and products are valid species indices # TODO: fix the case where a species is in both the substrates and products self._stoich = np.zeros((len(self), len(self.species)), dtype=int) @@ -492,12 +683,18 @@ def generate_stoich_matrix(self): self._stoich[i, rxn.products] = rxn.stoich_products def get_desc(self): + """Get the descriptions of all reactions. + + Returns: + A list of descriptions of all reactions. + """ desc = [] for rxn in self._rxns: desc.append(rxn.desc) return desc def display_stoich(self): + """Display the stoichiometry matrix.""" import pandas as pd if self._stoich is None: @@ -511,7 +708,19 @@ def display_stoich(self): df = pd.DataFrame(self._stoich, columns=species_names, index=self.get_desc()) print(df) - def simulate(self, t_span, C0, **gillespie_kwargs): + def simulate(self, t_span: List, C0: np.ndarray, **gillespie_kwargs) -> Tuple[np.ndarray, np.ndarray]: + """Simulate using the Gillespie method. + + Args: + t_span: A list of starting and end simulation time. + C0: A 1d array of initial conditions. + gillespie_kwargs: Additional keyword arguments for the Gillespie simulation. + + Returns: + A tuple containing: + T: A 1d numpy array of time points. + C: A 2d numpy array (n_species x n_time_points) of copy numbers for each species at each time point. + """ if self._stoich is None: self.generate_stoich_matrix() update_func = lambda C, mu: C + self._stoich[mu, :]