Skip to content

Test adequacy #215

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions causal_testing/data_collection/data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def filter_valid_data(self, data: pd.DataFrame, check_pos: bool = True) -> pd.Da
solver.push()
# Check that the row does not violate any scenario constraints
# Need to explicitly cast variables to their specified type. Z3 will not take e.g. np.int64 to be an int.
# Check that the row does not violate any scenario constraints
model = [
self.scenario.variables[var].z3
== self.scenario.variables[var].z3_val(self.scenario.variables[var].z3, row[var])
Expand Down
101 changes: 55 additions & 46 deletions causal_testing/json_front/json_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from causal_testing.testing.causal_test_result import CausalTestResult
from causal_testing.testing.estimators import Estimator
from causal_testing.testing.base_test_case import BaseTestCase
from causal_testing.testing.causal_test_adequacy import DataAdequacy

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -66,9 +67,8 @@ def set_paths(self, json_path: str, dag_path: str, data_paths: list[str] = None)
data_paths = []
self.input_paths = JsonClassPaths(json_path=json_path, dag_path=dag_path, data_paths=data_paths)

def setup(self, scenario: Scenario):
def setup(self, scenario: Scenario, data=None):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't see this data parameter used in any setup calls, is this for some future use?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am using it as part of my case study so I can pass in the data directly rather than having to pass in filepaths

"""Function to populate all the necessary parts of the json_class needed to execute tests"""
data = []
self.scenario = scenario
self._get_scenario_variables()
self.scenario.setup_treatment_variables()
Expand All @@ -81,9 +81,9 @@ def setup(self, scenario: Scenario):
# Populate the data
if self.input_paths.data_paths:
data = pd.concat([pd.read_csv(data_file, header=0) for data_file in self.input_paths.data_paths])
if len(data) == 0:
if data is None or len(data) == 0:
raise ValueError(
"No data found, either provide a path to a file containing data or manually populate the .data "
"No data found. Please either provide a path to a file containing data or manually populate the .data "
"attribute with a dataframe before calling .setup()"
)
self.data_collector = ObservationalDataCollector(self.scenario, data)
Expand Down Expand Up @@ -128,40 +128,20 @@ def run_json_tests(self, effects: dict, estimators: dict, f_flag: bool = False,
if "skip" in test and test["skip"]:
continue
test["estimator"] = estimators[test["estimator"]]
if "mutations" in test:
# If we have specified concrete control and treatment value
if "mutations" not in test:
failed, msg = self._run_concrete_metamorphic_test(test, f_flag, effects)
# If we have a variable to mutate
else:
if test["estimate_type"] == "coefficient":
msg = self._run_coefficient_test(test=test, f_flag=f_flag, effects=effects)
failed, msg = self._run_coefficient_test(test=test, f_flag=f_flag, effects=effects)
else:
msg = self._run_ate_test(test=test, f_flag=f_flag, effects=effects, mutates=mutates)
self._append_to_file(msg, logging.INFO)
else:
outcome_variable = next(
iter(test["expected_effect"])
) # Take first key from dictionary of expected effect
base_test_case = BaseTestCase(
treatment_variable=self.variables["inputs"][test["treatment_variable"]],
outcome_variable=self.variables["outputs"][outcome_variable],
)

causal_test_case = CausalTestCase(
base_test_case=base_test_case,
expected_causal_effect=effects[test["expected_effect"][outcome_variable]],
control_value=test["control_value"],
treatment_value=test["treatment_value"],
estimate_type=test["estimate_type"],
)

failed, _ = self._execute_test_case(causal_test_case=causal_test_case, test=test, f_flag=f_flag)

msg = (
f"Executing concrete test: {test['name']} \n"
+ f"treatment variable: {test['treatment_variable']} \n"
+ f"outcome_variable = {outcome_variable} \n"
+ f"control value = {test['control_value']}, treatment value = {test['treatment_value']} \n"
+ f"Result: {'FAILED' if failed else 'Passed'}"
)
print(msg)
self._append_to_file(msg, logging.INFO)
failed, msg = self._run_metamorphic_tests(
test=test, f_flag=f_flag, effects=effects, mutates=mutates
)
test["failed"] = failed
test["result"] = msg
return self.test_plan["tests"]

def _run_coefficient_test(self, test: dict, f_flag: bool, effects: dict):
"""Builds structures and runs test case for tests with an estimate_type of 'coefficient'.
Expand All @@ -183,18 +163,45 @@ def _run_coefficient_test(self, test: dict, f_flag: bool, effects: dict):
estimate_type="coefficient",
effect_modifier_configuration={self.scenario.variables[v] for v in test.get("effect_modifiers", [])},
)
result = self._execute_test_case(causal_test_case=causal_test_case, test=test, f_flag=f_flag)
failed, result = self._execute_test_case(causal_test_case=causal_test_case, test=test, f_flag=f_flag)
msg = (
f"Executing test: {test['name']} \n"
+ f" {causal_test_case} \n"
+ " "
+ ("\n ").join(str(result[1]).split("\n"))
+ ("\n ").join(str(result).split("\n"))
+ "==============\n"
+ f" Result: {'FAILED' if result[0] else 'Passed'}"
+ f" Result: {'FAILED' if failed else 'Passed'}"
)
self._append_to_file(msg, logging.INFO)
return failed, result

def _run_concrete_metamorphic_test(self, test: dict, f_flag: bool, effects: dict):
outcome_variable = next(iter(test["expected_effect"])) # Take first key from dictionary of expected effect
base_test_case = BaseTestCase(
treatment_variable=self.variables["inputs"][test["treatment_variable"]],
outcome_variable=self.variables["outputs"][outcome_variable],
)
return msg

def _run_ate_test(self, test: dict, f_flag: bool, effects: dict, mutates: dict):
causal_test_case = CausalTestCase(
base_test_case=base_test_case,
expected_causal_effect=effects[test["expected_effect"][outcome_variable]],
control_value=test["control_value"],
treatment_value=test["treatment_value"],
estimate_type=test["estimate_type"],
)
failed, msg = self._execute_test_case(causal_test_case=causal_test_case, test=test, f_flag=f_flag)

msg = (
f"Executing concrete test: {test['name']} \n"
+ f"treatment variable: {test['treatment_variable']} \n"
+ f"outcome_variable = {outcome_variable} \n"
+ f"control value = {test['control_value']}, treatment value = {test['treatment_value']} \n"
+ f"Result: {'FAILED' if failed else 'Passed'}"
)
self._append_to_file(msg, logging.INFO)
return failed, msg

def _run_metamorphic_tests(self, test: dict, f_flag: bool, effects: dict, mutates: dict):
"""Builds structures and runs test case for tests with an estimate_type of 'ate'.

:param test: Single JSON test definition stored in a mapping (dict)
Expand Down Expand Up @@ -226,7 +233,8 @@ def _run_ate_test(self, test: dict, f_flag: bool, effects: dict, mutates: dict):
+ f" Number of concrete tests for test case: {str(len(concrete_tests))} \n"
+ f" {failures}/{len(concrete_tests)} failed for {test['name']}"
)
return msg
self._append_to_file(msg, logging.INFO)
return failures, msg

def _execute_tests(self, concrete_tests, test, f_flag):
failures = 0
Expand Down Expand Up @@ -265,9 +273,13 @@ def _execute_test_case(
causal_test_result = causal_test_case.execute_test(
estimator=estimation_model, data_collector=self.data_collector
)

test_passes = causal_test_case.expected_causal_effect.apply(causal_test_result)

if "coverage" in test and test["coverage"]:
adequacy_metric = DataAdequacy(causal_test_case, estimation_model, self.data_collector)
adequacy_metric.measure_adequacy()
causal_test_result.adequacy = adequacy_metric

if causal_test_result.ci_low() is not None and causal_test_result.ci_high() is not None:
result_string = (
f"{causal_test_result.ci_low()} < {causal_test_result.test_value.value} < "
Expand All @@ -283,7 +295,6 @@ def _execute_test_case(
f"got {result_string}"
)
failed = True
logger.warning(" FAILED- expected %s, got %s", causal_test_case.expected_causal_effect, result_string)
return failed, causal_test_result

def _setup_test(self, causal_test_case: CausalTestCase, test: Mapping) -> Estimator:
Expand All @@ -294,7 +305,6 @@ def _setup_test(self, causal_test_case: CausalTestCase, test: Mapping) -> Estima
data. Conditions should be in the query format detailed at
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html
:returns:
- causal_test_engine - Test Engine instance for the test being run
- estimation_model - Estimator instance for the test being run
"""
minimal_adjustment_set = self.causal_specification.causal_dag.identification(causal_test_case.base_test_case)
Expand Down Expand Up @@ -370,7 +380,6 @@ def get_args(test_args=None) -> argparse.Namespace:
parser.add_argument(
"--log_path",
help="Specify a directory to change the location of the log file",
default="./json_frontend.log",
)
parser.add_argument(
"--data_path",
Expand Down
107 changes: 107 additions & 0 deletions causal_testing/testing/causal_test_adequacy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
This module contains code to measure various aspects of causal test adequacy.
"""
from itertools import combinations
from copy import deepcopy
import pandas as pd

from causal_testing.testing.causal_test_suite import CausalTestSuite
from causal_testing.data_collection.data_collector import DataCollector
from causal_testing.specification.causal_dag import CausalDAG
from causal_testing.testing.estimators import Estimator
from causal_testing.testing.causal_test_case import CausalTestCase


class DAGAdequacy:
    """
    Measures the adequacy of a given DAG by how many edges and independences are tested.

    The adequacy score is the fraction of node pairs in the DAG that are covered by at
    least one test in the supplied test suite.

    :param causal_dag: The causal DAG whose coverage is being measured.
    :param test_suite: The suite of causal tests; each test contributes its
                       (treatment_variable, outcome_variable) pair to the coverage.
    """

    def __init__(
        self,
        causal_dag: CausalDAG,
        test_suite: CausalTestSuite,
    ):
        self.causal_dag = causal_dag
        self.test_suite = test_suite
        # The following are populated by measure_adequacy(); None until then.
        self.tested_pairs = None
        self.pairs_to_test = None
        self.untested_edges = None
        self.dag_adequacy = None

    def measure_adequacy(self):
        """
        Calculate the adequacy measurement, and populate the `dag_adequacy` field.
        """
        # NOTE(review): tested_pairs are ordered (treatment, outcome) tuples, while
        # combinations() yields each unordered node pair exactly once in node order.
        # A tested pair whose ordering is reversed relative to the node order will not
        # be removed by difference() below — confirm this is intended.
        # NOTE(review): presumably treatment/outcome variables compare equal to the
        # DAG's node identifiers (e.g. both are names) — verify against callers.
        self.tested_pairs = {(t.treatment_variable, t.outcome_variable) for t in self.test_suite}
        self.pairs_to_test = set(combinations(self.causal_dag.graph.nodes, 2))
        self.untested_edges = self.pairs_to_test.difference(self.tested_pairs)
        # Fraction of node pairs covered; note the numerator counts tested pairs even
        # if they do not appear in pairs_to_test (see ordering note above).
        self.dag_adequacy = len(self.tested_pairs) / len(self.pairs_to_test)

    def to_dict(self):
        "Returns the adequacy object as a dictionary."
        return {
            "causal_dag": self.causal_dag,
            "test_suite": self.test_suite,
            "tested_pairs": self.tested_pairs,
            "pairs_to_test": self.pairs_to_test,
            "untested_edges": self.untested_edges,
            "dag_adequacy": self.dag_adequacy,
        }


class DataAdequacy:
    """
    Measures the adequacy of a given test according to the Fisher kurtosis of the bootstrapped result.

    - Positive kurtoses indicate the model doesn't have enough data, so is unstable.
    - Negative kurtoses indicate the model doesn't have enough data, but is too stable, indicating
      that the spread of inputs is insufficient.
    - Zero kurtosis is optimal.

    :param test_case: The causal test case to bootstrap.
    :param estimator: The estimator whose underlying dataframe is resampled for each replicate.
    :param data_collector: Data collector passed through to each test execution.
    :param bootstrap_size: Number of bootstrap replicates to run (default 100).
    """

    def __init__(
        self, test_case: CausalTestCase, estimator: Estimator, data_collector: DataCollector, bootstrap_size: int = 100
    ):
        self.test_case = test_case
        self.estimator = estimator
        self.data_collector = data_collector
        # Populated by measure_adequacy(); None until then.
        self.kurtosis = None
        self.outcomes = None
        self.bootstrap_size = bootstrap_size

    def measure_adequacy(self):
        """
        Calculate the adequacy measurement, and populate the `kurtosis` and `outcomes` fields.
        """
        results = []
        for i in range(self.bootstrap_size):
            # Deep-copy so resampling one replicate cannot mutate the caller's estimator
            # or leak between replicates.
            estimator = deepcopy(self.estimator)
            # Sample with replacement; seeding with the replicate index makes runs reproducible.
            estimator.df = estimator.df.sample(len(estimator.df), replace=True, random_state=i)
            results.append(self.test_case.execute_test(estimator, self.data_collector))
        # Whether each bootstrapped result satisfies the expected causal effect.
        outcomes = [self.test_case.expected_causal_effect.apply(c) for c in results]
        results = pd.DataFrame(c.to_dict() for c in results)[["effect_estimate", "ci_low", "ci_high"]]

        def convert_to_df(field):
            # Normalise every entry of the column to a one-column dataframe. Continuous
            # treatments yield scalar (float) estimates; categorical treatments yield a
            # dataframe with one coefficient per dummy-encoded category. Wrapping the
            # scalars lets both cases be concatenated uniformly below.
            converted = []
            for r in results[field]:
                if isinstance(r, float):
                    converted.append(
                        pd.DataFrame({self.test_case.base_test_case.treatment_variable.name: [r]}).transpose()
                    )
                else:
                    converted.append(r)
            return converted

        for field in ["effect_estimate", "ci_low", "ci_high"]:
            results[field] = convert_to_df(field)

        effect_estimate = pd.concat(results["effect_estimate"].tolist(), axis=1).transpose().reset_index(drop=True)
        # Fisher kurtosis of the bootstrapped effect estimates (one value per coefficient).
        self.kurtosis = effect_estimate.kurtosis()
        # Count of replicates for which the expected causal effect held.
        self.outcomes = sum(outcomes)

    def to_dict(self):
        "Returns the adequacy object as a dictionary."
        return {"kurtosis": self.kurtosis.to_dict(), "bootstrap_size": self.bootstrap_size, "passing": self.outcomes}
11 changes: 2 additions & 9 deletions causal_testing/testing/causal_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@ class CausalTestCase:
"""
A CausalTestCase extends the information held in a BaseTestCase. As well as storing the treatment and outcome
variables, a CausalTestCase stores the values of these variables. Also the outcome variable and value are
specified.

The goal of a CausalTestCase is to test whether the intervention made to the control via the treatment causes the
model-under-test to produce the expected change. The CausalTestCase structure is designed for execution using the
CausalTestEngine, using either execute_test() function to execute a single test case or packing CausalTestCases into
a CausalTestSuite and executing them as a batch using the execute_test_suite() function.
specified. The goal of a CausalTestCase is to test whether the intervention made to the control via the treatment
causes the model-under-test to produce the expected change.
"""

def __init__(
Expand Down Expand Up @@ -87,9 +83,6 @@ def execute_test(self, estimator: type(Estimator), data_collector: DataCollector
if estimator.df is None:
estimator.df = data_collector.collect_data()

logger.info("treatments: %s", self.treatment_variable.name)
logger.info("outcomes: %s", self.outcome_variable)

causal_test_result = self._return_causal_test_results(estimator)
return causal_test_result

Expand Down
21 changes: 13 additions & 8 deletions causal_testing/testing/causal_test_outcome.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,14 @@ def apply(self, res: CausalTestResult) -> bool:
class NoEffect(CausalTestOutcome):
"""An extension of TestOutcome representing that the expected causal effect should be zero."""

def __init__(self, atol: float = 1e-10):
def __init__(self, atol: float = 1e-10, ctol: float = 0.05):
"""
:param atol: Arithmetic tolerance. The test will pass if the absolute value of the causal effect is less than
atol.
:param ctol: Categorical tolerance. The test will pass if this proportion of categories pass.
"""
self.atol = atol
self.ctol = ctol

def apply(self, res: CausalTestResult) -> bool:
if res.test_value.type == "ate":
Expand All @@ -52,14 +58,13 @@ def apply(self, res: CausalTestResult) -> bool:
ci_high = res.ci_high() if isinstance(res.ci_high(), Iterable) else [res.ci_high()]
value = res.test_value.value if isinstance(res.ci_high(), Iterable) else [res.test_value.value]

if not all(ci_low < 0 < ci_high for ci_low, ci_high in zip(ci_low, ci_high)):
print(
"FAILING ON",
[(ci_low, ci_high) for ci_low, ci_high in zip(ci_low, ci_high) if not ci_low < 0 < ci_high],
return (
sum(
not ((ci_low < 0 < ci_high) or abs(v) < self.atol)
for ci_low, ci_high, v in zip(ci_low, ci_high, value)
)

return all(ci_low < 0 < ci_high for ci_low, ci_high in zip(ci_low, ci_high)) or all(
abs(v) < self.atol for v in value
/ len(value)
< self.ctol
)
if res.test_value.type == "risk_ratio":
return (res.ci_low() < 1 < res.ci_high()) or np.isclose(res.test_value.value, 1.0, atol=self.atol)
Expand Down
Loading