Skip to content

Commit

Permalink
Improve memory usage and logging info by moving the encoding step (#113)
Browse files Browse the repository at this point in the history
* Move encoding step to save memory during analysis

* Version 2.2.0
  • Loading branch information
jrm5100 committed Jul 19, 2021
1 parent 7efc8b5 commit a39d4f5
Show file tree
Hide file tree
Showing 11 changed files with 246 additions and 113 deletions.
33 changes: 0 additions & 33 deletions clarite/modules/analyze/association_study.py
Expand Up @@ -2,7 +2,6 @@

import click
import pandas as pd
from pandas_genomics import GenotypeDtype

from clarite.modules.analyze import regression
from clarite.modules.analyze.regression import (
Expand Down Expand Up @@ -43,10 +42,6 @@ def association_study(
This can be 'glm', 'weighted_glm', or 'r_survey' for built-in Regression types,
or a custom subclass of Regression. If None, it is set to 'glm' if a survey design is not specified
and 'weighted_glm' if it is.
encoding: str, default "additive"
Encoding method to use for any genotype data. One of {'additive', 'dominant', 'recessive', 'codominant', or 'weighted'}
edge_encoding_info: Optional pd.DataFrame, default None
If edge encoding is used, this must be provided. See Pandas-Genomics documentation on edge encodings.
kwargs: Keyword arguments specific to the Regression being used
Returns
Expand All @@ -55,34 +50,6 @@ def association_study(
Association Study results DataFrame with at least these columns: ['N', 'pvalue', 'error', 'warnings'].
Indexed by the outcome variable and the variable being assessed in each regression
"""
# Copy data to avoid modifying the original, in case it is changed
data = data.copy(deep=True)

# Encode any genotype data
has_genotypes = False
for dt in data.dtypes:
if GenotypeDtype.is_dtype(dt):
has_genotypes = True
break
if has_genotypes:
if encoding == "additive":
data = data.genomics.encode_additive()
elif encoding == "dominant":
data = data.genomics.encode_dominant()
elif encoding == "recessive":
data = data.genomics.encode_recessive()
elif encoding == "codominant":
data = data.genomics.encode_codominant()
elif encoding == "edge":
if edge_encoding_info is None:
raise ValueError(
"'edge_encoding_info' must be provided when using edge encoding"
)
else:
data = data.genomics.encode_edge(edge_encoding_info)
else:
raise ValueError(f"Genotypes provided with unknown 'encoding': {encoding}")

# Ensure outcome, covariates, and regression variables are lists
if isinstance(outcomes, str):
outcomes = [
Expand Down
4 changes: 2 additions & 2 deletions clarite/modules/analyze/interaction_study.py
Expand Up @@ -16,7 +16,7 @@ def interaction_study(
edge_encoding_info: Optional[pd.DataFrame] = None,
report_betas: bool = False,
min_n: int = 200,
process_num: Optional[int] = None
process_num: Optional[int] = None,
):
"""Perform LRT tests comparing a model with interaction terms to one without.
Expand Down Expand Up @@ -110,7 +110,7 @@ def interaction_study(
min_n=min_n,
interactions=interactions,
report_betas=report_betas,
process_num=process_num
process_num=process_num,
)
print(regression)

Expand Down
4 changes: 3 additions & 1 deletion clarite/modules/analyze/regression/base.py
Expand Up @@ -39,6 +39,8 @@ def __init__(
regression_variables: List[str],
covariates: Optional[List[str]] = None,
):
# Copy the data to avoid changing the original. The copy will be modified in-place.
data = data.copy()
# Print a warning if there are any empty categories and remove them
# This is done to distinguish from those that become missing during analysis (and could be an issue)
empty_categories = _remove_empty_categories(data)
Expand Down Expand Up @@ -105,7 +107,7 @@ def _validate_regression_params(self, regression_variables):
types = _get_dtypes(self.data)
rv_types = {v: t for v, t in types.iteritems() if v in regression_variables}
rv_count = 0
for dtype in ["binary", "categorical", "continuous"]:
for dtype in ["binary", "categorical", "continuous", "genotypes"]:
self.regression_variables[dtype] = [
v for v, t in rv_types.items() if t == dtype
]
Expand Down
96 changes: 77 additions & 19 deletions clarite/modules/analyze/regression/glm_regression.py
Expand Up @@ -9,9 +9,10 @@
import patsy
import scipy
import statsmodels.api as sm
from pandas_genomics import GenotypeDtype
from scipy.stats import stats

from clarite.internal.utilities import _remove_empty_categories
from clarite.internal.utilities import _remove_empty_categories, _get_dtypes

from .base import Regression
from ..utils import fix_names, statsmodels_var_regex
Expand Down Expand Up @@ -59,10 +60,16 @@ class GLMRegression(Regression):
False by default.
If True, numeric data will be standardized using z-scores before regression.
This will affect the beta values and standard error, but not the pvalues.
encoding: str, default "additive"
Encoding method to use for any genotype data. One of {'additive', 'dominant', 'recessive', 'codominant', or 'weighted'}
edge_encoding_info: Optional pd.DataFrame, default None
If edge encoding is used, this must be provided. See Pandas-Genomics documentation on edge encodings.
process_num: Optional[int]
Number of processes to use when running the analysis, default is None (use the number of cores)
"""

KNOWN_ENCODINGS = {"additive", "dominant", "recessive", "codominant", "edge"}

def __init__(
self,
data: pd.DataFrame,
Expand All @@ -72,6 +79,8 @@ def __init__(
min_n: int = 200,
report_categorical_betas: bool = False,
standardize_data: bool = False,
encoding: str = "additive",
edge_encoding_info: Optional[pd.DataFrame] = None,
process_num: Optional[int] = None,
):
# base class init
Expand All @@ -91,6 +100,15 @@ def __init__(
if process_num is None:
process_num = multiprocessing.cpu_count()
self.process_num = process_num
if encoding not in self.KNOWN_ENCODINGS:
raise ValueError(f"Genotypes provided with unknown 'encoding': {encoding}")
elif encoding == "edge" and edge_encoding_info is None:
raise ValueError(
"'edge_encoding_info' must be provided when using edge encoding"
)
else:
self.encoding = encoding
self.edge_encoding_info = edge_encoding_info

# Ensure the data output type is compatible
# Set 'self.family' and 'self.use_t' which are dependent on the outcome dtype
Expand Down Expand Up @@ -316,6 +334,28 @@ def _run_categorical(
"Diff_AIC": est.aic - est_restricted.aic,
}

def _get_rv_specific_data(self, rv: str):
"""Select the data relevant to performing a regression on a given variable, encoding genotypes if needed"""
data = self.data[[rv, self.outcome_variable] + self.covariates].copy()
# Encode any genotype data
has_genotypes = False
for dt in data.dtypes:
if GenotypeDtype.is_dtype(dt):
has_genotypes = True
break
if has_genotypes:
if self.encoding == "additive":
data = data.genomics.encode_additive()
elif self.encoding == "dominant":
data = data.genomics.encode_dominant()
elif self.encoding == "recessive":
data = data.genomics.encode_recessive()
elif self.encoding == "codominant":
data = data.genomics.encode_codominant()
elif self.encoding == "edge":
data = data.genomics.encode_edge(self.edge_encoding_info)
return data

def run(self):
"""Run a regression object, returning the results and logging any warnings/errors"""
for rv_type, rv_list in self.regression_variables.items():
Expand All @@ -330,24 +370,37 @@ def run(self):
)
)

with multiprocessing.Pool(processes=self.process_num) as pool:
run_result = pool.starmap(
self._run_rv,
zip(
rv_list,
repeat(rv_type),
[
self.data[[rv, self.outcome_variable] + self.covariates]
for rv in rv_list
],
repeat(self.outcome_variable),
repeat(self.covariates),
repeat(self.min_n),
repeat(self.family),
repeat(self.use_t),
repeat(self.report_categorical_betas),
),
)
if self.process_num == 1:
run_result = [
self._run_rv(
rv,
rv_type,
self._get_rv_specific_data(rv),
self.outcome_variable,
self.covariates,
self.min_n,
self.family,
self.use_t,
self.report_categorical_betas,
)
for rv in rv_list
]
else:
with multiprocessing.Pool(processes=self.process_num) as pool:
run_result = pool.starmap(
self._run_rv,
zip(
rv_list,
repeat(rv_type),
[self._get_rv_specific_data(rv) for rv in rv_list],
repeat(self.outcome_variable),
repeat(self.covariates),
repeat(self.min_n),
repeat(self.family),
repeat(self.use_t),
repeat(self.report_categorical_betas),
),
)

for rv, rv_result in zip(rv_list, run_result):
results, warnings, error = rv_result
Expand Down Expand Up @@ -424,6 +477,11 @@ def _run_rv(
# Apply the complete_case_mask to the data to ensure categorical models use the same data in the LRT
data = data.loc[complete_case_mask]

# Update rv_type to the encoded type if it is a genotype
if rv_type == "genotypes":
"""Need to update with encoded type"""
rv_type = _get_dtypes(data[rv])[rv]

# Run Regression
if rv_type == "continuous":
result = cls.get_default_result_dict(rv)
Expand Down
104 changes: 76 additions & 28 deletions clarite/modules/analyze/regression/interaction_regression.py
Expand Up @@ -8,6 +8,7 @@
import patsy
import scipy
import statsmodels.api as sm
from pandas_genomics import GenotypeDtype

from clarite.internal.utilities import _remove_empty_categories
from . import GLMRegression
Expand Down Expand Up @@ -45,6 +46,10 @@ class InteractionRegression(GLMRegression):
False by default.
If True, the results will contain one row for each interaction term and will include the beta value
for that term. The number of terms increases with the number of categories in each interacting term.
encoding: str, default "additive"
Encoding method to use for any genotype data. One of {'additive', 'dominant', 'recessive', 'codominant', or 'weighted'}
edge_encoding_info: Optional pd.DataFrame, default None
If edge encoding is used, this must be provided. See Pandas-Genomics documentation on edge encodings.
process_num: Optional[int]
Number of processes to use when running the analysis, default is None (use the number of cores)
Expand All @@ -58,6 +63,8 @@ def __init__(
min_n=200,
interactions=None,
report_betas=False,
encoding: str = "additive",
edge_encoding_info: Optional[pd.DataFrame] = None,
process_num: Optional[int] = None,
):
# base class init
Expand All @@ -71,6 +78,8 @@ def __init__(
outcome_variable=outcome_variable,
covariates=covariates,
regression_variables=regression_variables,
encoding=encoding,
edge_encoding_info=edge_encoding_info,
min_n=min_n,
)

Expand All @@ -86,11 +95,10 @@ def __init__(

def _process_interactions(self, interactions):
"""Validate the interactions parameter and save it as a list of string tuples"""
regression_var_list = (
self.regression_variables["binary"]
+ self.regression_variables["categorical"]
+ self.regression_variables["continuous"]
)
regression_var_list = []
for var_list in self.regression_variables.values():
regression_var_list.extend(var_list)

if len(regression_var_list) < 2:
raise ValueError(
f"Not enough valid variables for running interactions: {len(regression_var_list)} variables"
Expand Down Expand Up @@ -212,6 +220,35 @@ def _run_interaction_regression(
# Did not converge - nothing to update
yield dict()

def _get_interaction_specific_data(self, interaction: Tuple[str, str]):
"""Select the data relevant to performing a regression on a given interaction, encoding genotypes if needed"""
data = self.data[
list(interaction)
+ [
self.outcome_variable,
]
+ self.covariates
].copy()

# Encode any genotype data
has_genotypes = False
for dt in data.dtypes:
if GenotypeDtype.is_dtype(dt):
has_genotypes = True
break
if has_genotypes:
if self.encoding == "additive":
data = data.genomics.encode_additive()
elif self.encoding == "dominant":
data = data.genomics.encode_dominant()
elif self.encoding == "recessive":
data = data.genomics.encode_recessive()
elif self.encoding == "codominant":
data = data.genomics.encode_codominant()
elif self.encoding == "edge":
data = data.genomics.encode_edge(self.edge_encoding_info)
return data

def run(self):
"""Run a regression object, returning the results and logging any warnings/errors"""
# Log how many interactions are being run using how many processes
Expand All @@ -221,29 +258,40 @@ def run(self):
fg="green",
)
)
with multiprocessing.Pool(processes=self.process_num) as pool:
run_result = pool.starmap(
self._run_interaction,
zip(
self.interactions,
[
self.data[
list(interaction)
+ [
self.outcome_variable,
]
+ self.covariates
]
for interaction in self.interactions
],
repeat(self.outcome_variable),
repeat(self.covariates),
repeat(self.min_n),
repeat(self.family),
repeat(self.use_t),
repeat(self.report_betas),
),
)

if self.process_num == 1:
run_result = [
self._run_interaction(
interaction,
self._get_interaction_specific_data(interaction),
self.outcome_variable,
self.covariates,
self.min_n,
self.family,
self.use_t,
self.report_betas,
)
for interaction in self.interactions
]

else:
with multiprocessing.Pool(processes=self.process_num) as pool:
run_result = pool.starmap(
self._run_interaction,
zip(
self.interactions,
[
self._get_interaction_specific_data(interaction)
for interaction in self.interactions
],
repeat(self.outcome_variable),
repeat(self.covariates),
repeat(self.min_n),
repeat(self.family),
repeat(self.use_t),
repeat(self.report_betas),
),
)

for interaction, interaction_result in zip(self.interactions, run_result):
interaction_str = ":".join(interaction)
Expand Down

0 comments on commit a39d4f5

Please sign in to comment.