diff --git a/.github/workflows/test_and_release.yml b/.github/workflows/test_and_release.yml
index 6d75eff..7a19685 100644
--- a/.github/workflows/test_and_release.yml
+++ b/.github/workflows/test_and_release.yml
@@ -18,7 +18,7 @@ jobs:
- ubuntu-18.04
- ubuntu-16.04
- macos-latest
- # - windows-latest
+ - windows-latest
# Special matrix job to report coverage only once
include:
@@ -40,7 +40,7 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install flake8 pytest coverage coveralls
- if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
+ pip install -r requirements.txt
- name: Lint with flake8
run: |
@@ -58,7 +58,7 @@ jobs:
if: ${{ matrix.report-coverage }}
release:
- runs-on: [ ubuntu-latest ]
+ runs-on: ubuntu-latest
needs: test
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/beta'
steps:
diff --git a/.gitignore b/.gitignore
index 3c05368..6022105 100755
--- a/.gitignore
+++ b/.gitignore
@@ -13,4 +13,4 @@
src/logs/
# Config file is generated on run
-src/config/config.json
+src/config/config.*
diff --git a/API.py b/API.py
index 8119b00..30b7105 100644
--- a/API.py
+++ b/API.py
@@ -2,6 +2,9 @@
# probability-code API #
###########################################################
+from typing import Union
+from pathlib import Path
+
from src.api.backdoor_paths import api_backdoor_paths
from src.api.deconfounding_sets import api_deconfounding_sets
from src.api.joint_distribution_table import api_joint_distribution_table
@@ -44,7 +47,7 @@ def __init__(self, model: dict or None, print_detail=False, print_result=False,
# API Modifications #
################################################################
- def load_model(self, data: dict):
+ def load_model(self, data: Union[str, dict, Path]):
"""
Load a model into the API.
- @param data: A dictionary conforming to the required causal model specification to be loaded
+ @param data: A dictionary, or a string/Path to a YAML or JSON model file, conforming to the required causal model specification
@@ -104,7 +107,7 @@ def p(self, y: set, x: set) -> float:
return result
- def joint_distribution_table(self) -> list:
+ def joint_distribution_table(self) -> ConditionalProbabilityTable:
"""
Compute a joint distribution table across the entire model loaded.
@return: A list of tuples, (Outcomes, P), where Outcomes is a unique set of Outcome objects for the model, and
@@ -114,12 +117,8 @@ def joint_distribution_table(self) -> list:
if self._print_result:
keys = sorted(self._cg.variables.keys())
- rows = [[",".join(map(str, outcomes)), [], p] for outcomes, p in result]
- rows.append(["Total:", [], sum(map(lambda r: r[1], result))])
- cpt = ConditionalProbabilityTable(Variable(",".join(keys), [], []), [], rows)
-
self._output.result(f"Joint Distribution Table for: {','.join(keys)}")
- self._output.result(f"{cpt}")
+ self._output.result(f"{result}")
return result
diff --git a/README.md b/README.md
index c4cec7f..24e7769 100755
--- a/README.md
+++ b/README.md
@@ -1,13 +1,13 @@
probability-code
A Python implementation of the do-calculus of Judea Pearl et al.
[badge image lines updated here; markup not recoverable]
diff --git a/doc/Configuration.md b/doc/Configuration.md
new file mode 100644
index 0000000..e99f115
--- /dev/null
+++ b/doc/Configuration.md
@@ -0,0 +1,53 @@
+# Configuration File Settings
+
+Settings for the project are stored in ``src/config/config.yml``.
+- **Note**: This file will be created automatically when the project is run, if it does not already exist.
+
+## Output Control
+
+Control what information is output: the computational steps of queries or regression tests on launch, and whether to minimize acceptable sets Z in backdoor paths.
+
+#### Output Levels of Precision
+
+How many digits of precision to output a result to.
+
+| Setting Name | Options | Default Value |
+|:-:|:-:|:-:|
+| ``output_levels_of_precision`` | any positive integer | 5 |
+
+#### Minimize Backdoor Sets
+
+If enabled, when sets X and Y are given and all feasible sets Z ensuring causal independence are computed, only minimal sets will be shown.
+
+| Setting Name | Options | Default Value |
+|:-:|:-:|:-:|
+| ``minimize_backdoor_sets`` | [True, False] | True |
+
+## Accuracy / Formatting / Precision Rules
+
+Settings regarding the accuracy of regression tests, computation caching, and noise in function evaluations.
+
+#### Cache Computation Results
+
+If enabled, any time a specific query is computed, its results will be cached; if the same query is required in any subsequent queries, its cached result will be reused instead of computing the same result from scratch. This can yield a large performance increase in larger causal graphs.
+
+| Setting Name | Options | Default Value |
+|:-:|:-:|:-:|
+| ``cache_computation_results`` | [True, False] | True |
+
+#### Topological Sort Variables
+
+If enabled, the head and body of queries will be topologically sorted to avoid applying Bayes' rule as much as possible.
+
+| Setting Name | Options | Default Value |
+|:-:|:-:|:-:|
+| ``topological_sort_variables`` | [True, False] | True |
+
+#### Regression Test Result Precision
+
+In a regression test (see: ``Regression Tests``) where an 'expected value' is provided, this is how many digits of precision the computed value must match within. A higher value requires more accuracy, but also a longer, more detailed hand-computed 'expected result'.
+
+| Setting Name | Options | Default Value |
+|:-:|:-:|:-:|
+| ``regression_levels_of_precision`` | any positive integer | 5 |
+
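+## Example Generated File
+
+With only the settings above, a freshly generated ``config.yml`` contains five keys. A sketch, assuming all defaults (PyYAML's ``sort_keys=True`` dump orders them alphabetically):
+
+```yaml
+cache_computation_results: true
+minimize_backdoor_sets: true
+output_levels_of_precision: 5
+regression_levels_of_precision: 5
+topological_sort_variables: true
+```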
diff --git a/main.py b/main.py
index 21b7d7c..3f8252c 100755
--- a/main.py
+++ b/main.py
@@ -9,50 +9,7 @@
# #
#########################################################
-# Main libraries can always be loaded
-import os
-from sys import argv
-
from src.REPL import run_repl
-from src.validation.backdoors.backdoor_path_tests import backdoor_tests
-from src.validation.inference.inference_tests import inference_tests
-from test_driver import graph_location
-
-# TODO - Examine if necessary after re-works; should always set cwd to root of file itself
-os.chdir(os.path.dirname(os.path.abspath(__file__)))
-
-#######################################
-# Parse Input #
-#######################################
-
-# TODO - cleaner way of integrating tests with workflow
-
-if len(argv) > 1 and argv[1].lower() == "inference":
- inference_bool, inference_msg = inference_tests(graph_location)
- assert inference_bool, f"Inference module has failed: {inference_msg}"
- exit(0)
-
-if len(argv) > 1 and argv[1].lower() == "backdoor":
- backdoor_bool, backdoor_msg = backdoor_tests(graph_location)
- assert backdoor_bool, f"Backdoor module has failed: {backdoor_msg}"
- exit(0)
-
-run_debug = len(argv) >= 2 and argv[1].lower() == "debug"
-
-#######################################
-# Test Software (if specified) #
-#######################################
-
-if run_debug:
- from test_driver import run_all_tests
- from src.validation.test_util import print_test_result
-
- index = argv.index("debug")
- extreme = len(argv) > index+1 and argv[index+1].lower() == "extreme"
-
- # Boolean result returned: True if all tests are successful, False otherwise
- success = run_all_tests(extreme)
- print_test_result(success, "[All Tests Passed]" if success else "[Some Errors Occurred]")
#######################################
# REPL #
diff --git a/setup.cfg b/setup.cfg
index a611e4c..1862601 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -13,6 +13,7 @@ source = src/
omit =
src/REPL.py
src/util/OutputLogger.py
+ src/graphs/dataset_generator/*
[coverage:report]
exclude_lines =
diff --git a/src/REPL.py b/src/REPL.py
index f1dfd08..c87b66d 100644
--- a/src/REPL.py
+++ b/src/REPL.py
@@ -1,5 +1,6 @@
from yaml import safe_load as load
from os import path, listdir
+from pathlib import Path
from src.api.backdoor_paths import api_backdoor_paths_parse
from src.api.deconfounding_sets import api_deconfounding_sets_parse
@@ -10,7 +11,9 @@
# TODO - Change graph_location to allow a specific graph to be given and loaded, or specify a user directory without
# there being path issues depending on the working directory
-def run_repl(graph_location="src/graphs/full"):
+
+
+def run_repl(graph_location=Path(".", "src", "graphs", "full")):
"""
Run an interactive IO prompt allowing full use of the causality software.
@param graph_location: A path from the working directory to a directory of graphs
@@ -66,18 +69,20 @@ def skip(*args, **kwargs):
# List all possible graphs (ignores the generated models used for debugging / testing)
if f in list_options:
- assert path.isdir(graph_location), \
- "The specified directory for causal graph models {} does not exist!".format(graph_location)
- print("Options", "\n- ".join(filter(lambda g: g.endswith(".yml"), sorted(listdir(graph_location)))))
+ assert graph_location.is_dir(), \
+ "The specified directory for causal graph models {} does not exist!".format(graph_location.name)
+
+ files = filter(lambda g: g.suffix.lower() == ".yml", sorted(graph_location.iterdir()))
+ print("Options", *list(map(lambda file: file.stem, files)), sep="\n- ")
continue
# Parse and load a model into the API
if f in load_options:
s = arg + (".yml" if not arg.endswith(".yml") else "")
- assert path.isfile(full_path := graph_location + "/" + s), \
+ assert (full_path := graph_location / s).is_file(), \
"File: {} does not exist!".format(s)
- with open(full_path) as f:
+ with full_path.open("r") as f:
api.load_model(load(f))
continue
diff --git a/src/api/backdoor_paths.py b/src/api/backdoor_paths.py
index b96e1ec..127ef7d 100644
--- a/src/api/backdoor_paths.py
+++ b/src/api/backdoor_paths.py
@@ -11,7 +11,7 @@ def api_backdoor_paths_parse(query: str) -> (set, set):
of the arrow, and the third as all vertices to the right of the bar, respectively.
"""
def clean(x):
- return set(map(lambda y: y.strip(), x.strip().split(" ")))
+ return set(map(lambda y: y.strip(), x.strip().split(",")))
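+ # e.g. clean(" X4, X2 ") -> {"X4", "X2"}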
l, r = query.split("->")
@@ -19,7 +19,7 @@ def clean(x):
s = r.split("|")
r, dcf = clean(s[0]), clean(s[1])
else:
- r, dcf = clean(r), {}
+ r, dcf = clean(r), set()
return {
"src": clean(l),
diff --git a/src/api/deconfounding_sets.py b/src/api/deconfounding_sets.py
index 6c38a7c..b0f82c8 100644
--- a/src/api/deconfounding_sets.py
+++ b/src/api/deconfounding_sets.py
@@ -9,7 +9,7 @@ def api_deconfounding_sets_parse(query: str) -> (set, set):
right sides of the arrow, respectively.
"""
def clean(x):
- return set(map(lambda y: y.strip(), x.strip().split(" ")))
+ return set(map(lambda y: y.strip(), x.strip().split(",")))
src, dst = map(clean, query.split("->"))
diff --git a/src/api/joint_distribution_table.py b/src/api/joint_distribution_table.py
index 2668aa9..535cbc9 100644
--- a/src/api/joint_distribution_table.py
+++ b/src/api/joint_distribution_table.py
@@ -1,9 +1,10 @@
from itertools import product
from src.probability.structures.CausalGraph import CausalGraph
-from src.probability.structures.VariableStructures import Outcome
+from src.probability.structures.ConditionalProbabilityTable import ConditionalProbabilityTable
+from src.probability.structures.VariableStructures import Outcome, Variable
-def api_joint_distribution_table(cg: CausalGraph) -> list:
+def api_joint_distribution_table(cg: CausalGraph) -> ConditionalProbabilityTable:
"""
Compute and return a joint distribution table for the given model.
@param cg: A CausalGraph to compute the JDT for.
@@ -17,4 +18,9 @@ def api_joint_distribution_table(cg: CausalGraph) -> list:
outcomes = {Outcome(x, cross[i]) for i, x in enumerate(sorted_keys)}
results.append((outcomes, cg.probability_query(outcomes, set())))
- return results
+ keys = sorted(cg.variables.keys())
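+ # each row is [comma-joined outcome labels, no "given" column, probability]; a "Total:" row (~1.0) is appended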
+ rows = [[",".join(map(str, outcomes)), [], p] for outcomes, p in results]
+ rows.append(["Total:", [], sum(map(lambda r: r[1], results))])
+ cpt = ConditionalProbabilityTable(Variable(",".join(keys), [], []), [], rows)
+
+ return cpt
diff --git a/src/config/config_manager.py b/src/config/config_manager.py
index cdb32fb..ce57552 100755
--- a/src/config/config_manager.py
+++ b/src/config/config_manager.py
@@ -1,245 +1,29 @@
-#########################################################
-# #
-# config manager #
-# #
-# Author: Braden Dubois (braden.dubois@usask.ca) #
-# Written for: Dr. Eric Neufeld #
-# #
-#########################################################
+from pathlib import Path
+from yaml import safe_load as load, dump
-import json # Settings data is stored in JSON
-import os # Used to create a directory/config file if not found
-import argparse # Allow command-line flag parsing
+from src.config.primary_configuration import *
-try:
- from src.config.primary_configuration import *
+path = Path(".", "src", "config", "config.yml")
-except ModuleNotFoundError:
- print("Uh-oh: Can't import some project modules. Try running this directly in PyCharm.")
- exit(-1)
-# Root of the project; fix any relative naming conflicts
-root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+def create_default():
-# Output some superfluous information only if we are directly running this file
-directly_run = __name__ == "__main__"
+ # This is the "defaults" configuration file, generated from the primary copy located
+ # in config/primary... Used to validate settings
+ d = dict()
+ for section in primary_config_file:
+ for parameter in section["parameters"]:
+ d[parameter["parameter"]] = parameter["default_value"]
-# Default configuration file directory and name
-config_dir = root + "/" + "config"
-config_file = "config.json"
+ return d
-# A dictionary to hold all the settings;
-# For CLI overrides, we abstract accessing parameters through "access", never direct indexing.
-loaded_settings: dict
-# Used such that configuration-file-specified settings can be overridden by a CLI flag
-cli_flag_overrides = dict()
+# No configuration file found - create one
+if not path.is_file():
-# This is the "defaults" configuration file, generated from the primary copy located
-# in config/primary... Used to validate settings
-lookup = dict()
-for section in primary_config_file:
- for parameter in section["parameters"]:
- p = parameter["parameter"]
- lookup[p] = {
- "default": parameter["default_value"],
- "options": parameter["options"]
- }
-
-
-def default_value(param: str):
- """
- Get the default setting for a given parameter
- :param param:
- :return: The default value/setting
- """
- return lookup[param]["default"]
-
-
-def is_valid_option(param: str) -> bool:
- """
- Determine whether a given parameter has a valid setting stored
- :param param: The key of the parameter
- :return: True if the option is valid, False otherwise
- """
- # Strings as "options" indicates a message rather than an actual value
- if isinstance(lookup[param]["options"], str):
- # Looking for any positive number
- if lookup[param]["options"] == "any positive integer":
- return isinstance(loaded_settings[param], int) and loaded_settings[param] > 0
-
- # Just looking for a path
- else:
- return True
-
- return isinstance(loaded_settings[param], type(default_value(param)))
-
-
-def generate_default_configuration_file() -> dict:
- """
- Generate and return a new, "fresh" configuration file
- :return: A dictionary representing a default configuration file
- """
- # Iterate through the primary copy and each "section", and each param in each section
- default_configuration_file = dict()
- for sec in primary_config_file:
- for param in sec["parameters"]:
- key = param["parameter"]
- default_configuration_file[key] = param["default_value"]
- return default_configuration_file
-
-
-def initialize_configuration_file():
- """
- Create a default/vanilla config file if it does not already exist
- """
-
- # The directory doesn't exist; make it
- if not os.path.isdir(config_dir):
- print("Default configuration directory not found...", end="")
- os.makedirs(config_dir)
- print("Created.")
- elif directly_run:
- print("Default configuration directory already exists.")
-
- # The file doesn't exist; make it
- if not os.path.isfile(config_dir + "/" + config_file):
- print("Default configuration file not found...", end="")
-
- # The default configuration file will be generated from the primary version
- with open(config_dir + "/" + config_file, "w") as f:
- json.dump(generate_default_configuration_file(), f, indent=4, sort_keys=True)
- print("Created.")
- elif directly_run:
- print("Default configuration file already exists.")
-
- load_configuration_file()
-
-
-def delete_configuration_file():
- """
- Delete the configuration file
- """
- if os.path.isfile(config_file):
- os.remove(config_dir + "/" + config_file)
- print("Configuration file deleted.")
- else:
- print("Couldn't find configuration file.")
-
-
-def repair_configuration_file():
- """
- Attempt to repair a configuration file if it an error is detected, such as a missing parameter, or invalid option.
- """
- # See if any settings have failed
- errors = False
-
- def set_default(reset_key):
- loaded_settings[reset_key] = lookup[reset_key]["default"]
-
- for key in lookup:
- if key not in loaded_settings:
- print("Missing configuration setting for:", key)
- set_default(key)
- errors = True
-
- if not is_valid_option(key):
- setting_is = str(loaded_settings[key]) + "|" + str(type(loaded_settings[key]))
- setting_should = str(lookup[key]["default"])
- print("Parameter:", key, "has unsupported option:", setting_is + "\nUsing default value:", setting_should)
- set_default(key)
- errors = True
-
- # Store the new version of the configuration file
- with open(config_dir + "/" + config_file, "w") as f:
- json.dump(loaded_settings, f, indent=4, sort_keys=True)
-
- # Reload if any errors
- if errors:
- print("Some errors were detected and repaired; reloading configuration file.")
- load_configuration_file()
- else:
- print("No errors found.")
-
-
-def load_configuration_file():
- """
- Load the configuration file from the stored JSON file
- """
- # Load the configuration file
- global loaded_settings
- with open(config_dir + "/" + config_file) as config:
- loaded_settings = json.load(config)
-
-
-def cli_arg_parser() -> argparse.Namespace:
- """
- Create a basic CLI flag parser to override the config file settings
- :return: an argparse.Namespace object, with flag values accessed as "parser.FLAG"
- """
-
- # TODO - More flags will be added
- arg_params = [
- {
- "flag": "-s",
- "help": "Silent computation: only show resulting probabilities.",
- "action": "store_true",
- "override_setting": "output_computation_results"
- },
- {
- "flag": "-c",
- "help": "Cache computation results; speeds up subsequent queries.",
- "action": "store_true",
- "override_setting": "cache_computation_results"
- }
- ]
-
- parser = argparse.ArgumentParser(description="Compute probabilities and resolve backdoor paths.")
-
- # Add each flag as listed above into the parser
- for param in arg_params:
- parser.add_argument(param["flag"], help=param["help"], action=param["action"])
-
- if "override_setting" in param:
- cli_flag_overrides["override_setting"] = param["flag"][1:]
-
- # Parse all given, constructing and returning a Namespace object
- return parser.parse_args()
-
-
-# Always initialize/load the configuration load
-initialize_configuration_file()
-
-# Create parser for CLI flags to override config settings
-# parsed_args = cli_arg_parser()
-
-
-def access(param: str) -> any:
- """
- Access a configuration-file setting, if it exists, or has a CLI flag given as an override.
- :param param: The string key for the setting
- :return: The specified value, first checking CLI flags, then config file.
- """
-
- # Quick Check; if the param specified isn't found, maybe the config file is outdated
- if param not in loaded_settings:
- print("\nConfiguration Lookup Error;\nCouldn't find parameter: " + param + "\n" +
- "Re-generating configuration file...")
- repair_configuration_file()
- initialize_configuration_file()
-
- # See if the configuration file has an invalid setting for this, and repair if so
- if not is_valid_option(param):
- print("Error on key:", param)
- print("Repairing configuration file.")
- repair_configuration_file()
-
- # A default has been specified in the configuration file
- value = loaded_settings[param]
-
- # A CLI flag has been provided to override the config file
- # if param in cli_flag_overrides:
- # value = parsed_args.param
-
- return value
+ with path.open("w") as f:
+ dump(create_default(), f, indent=4, sort_keys=True)
+# Load the settings file
+with path.open("r") as config:
+ settings_yml = load(config)
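+# src/config/settings.py exposes these values as attributes of its Settings class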
diff --git a/src/config/generate_config_docs.py b/src/config/generate_config_docs.py
index 71b1825..c33d86b 100755
--- a/src/config/generate_config_docs.py
+++ b/src/config/generate_config_docs.py
@@ -1,40 +1,25 @@
#!/usr/bin/env python
-#########################################################
-# #
-# Generate Configuration Documentation #
-# #
-# Author: Braden Dubois (braden.dubois@usask.ca) #
-# Written for: Dr. Eric Neufeld #
-# #
-#########################################################
-
# Run this file directly to update documentation on configuration files
-# PyCharm might warn of primary_configuration and primary_config_file not being defined / resolved, but that is okay;
-# it wants it prefaced with config. since the root of the project requires this from that cwd, but when this file is
-# directly run, it wouldn't make sense to include config., since primary_configuration is in the *same* directory as
-# this file.
+from pathlib import Path
-import os
from primary_configuration import *
-root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-documentation_file = root + "/../doc/configuration.md"
+documentation_file = Path(".", "doc", "Configuration.md")
def generate_configuration_documentation():
"""
Generates the markdown file for configuration file doc
"""
- # Delete it if it exists; making a new one
- if os.path.isfile(documentation_file):
- os.remove(documentation_file)
- with open(documentation_file, "w") as f:
+ with documentation_file.open("w") as f:
# Title of the file
f.write("# Configuration File Settings\n\n")
+ f.write("Settings for the project are stored in ``src/config/config.yml``.\n")
+ f.write("- **Note**: This file will be created if it does not exist, when the project is run.\n\n")
# The master file is structured as a list of sections
for category in primary_config_file:
@@ -50,7 +35,9 @@ def generate_configuration_documentation():
# This is the header/markdown required for a table
f.write("| Setting Name | Options | Default Value |\n|:-:|:-:|:-:|\n")
- f.write("| ``" + parameter["parameter"] + "`` | " + str(parameter["options"]) + " | " + str(parameter["default_value"]) + " |\n\n")
+ f.write("| ``" + parameter["parameter"] + "`` | " + str(parameter["options"]))
+ f.write(" | " + str(parameter["default_value"]) + " |\n\n")
+
if __name__ == "__main__":
generate_configuration_documentation()
diff --git a/src/config/primary_configuration.py b/src/config/primary_configuration.py
index 12eb4b1..0d95feb 100755
--- a/src/config/primary_configuration.py
+++ b/src/config/primary_configuration.py
@@ -12,60 +12,10 @@
primary_config_file = [
{
- "section": "Regression Tests",
- "description":
- "This section controls the regression testing suite, available to be run at launch, validating " +
- "the software before running.\n\nFor information on *creating* test files for the regression suite, see " +
- "``Regression Tests``.",
- "parameters": [{
- "parameter_title": "Run Regression Tests on Launch",
- "description": "Control whether or not to have the regression suite run on launch.",
- "parameter": "run_regression_tests_on_launch",
- "default_value": True,
- "options": [True, False]
- }, {
- "parameter_title": "Output Regression Results",
- "description":
- "If regression tests are enabled, control whether or not to output the results of the tests. Results " +
- "are of the form (success_boolean, success_message).\n\n``always`` and ``never`` are self-explanatory; " +
- "``failure`` will only print the results if there are errors.",
- "parameter": "output_regression_results",
- "default_value": "always",
- "options": ["always", "failure", "never"],
- }, {
- "parameter_title": "Exit if Regression Failure",
- "description": "If regression tests are enabled and any test fails, control whether to exit the software " +
- "or launch anyway. Useful if test results are doubtful or features on unfinished.",
- "parameter": "exit_if_regression_failure",
- "default_value": False,
- "options": [True, False]
- }]
- }, {
"section": "Output Control",
"description": "Control what information is output; the computational steps of queries or regression tests, " +
"on launch, whether to minimize acceptable sets Z in backdoor paths.",
"parameters": [{
- "parameter_title": "Output Computation Steps",
- "description": "If enabled, each step of a query will be output to the console. This will show the " +
- "step-by-step application of each rule, and for larger queries, can be quite large.",
- "parameter": "output_computation_steps",
- "default_value": False,
- "options": [True, False]
- }, {
- "parameter_title": "Output Regression Step Computation",
- "description": "If enabled, shows all steps involved in regression tests; similar to the above, output " +
- "can become very long.",
- "parameter": "output_regression_test_computation",
- "default_value": False,
- "options": [True, False]
- }, {
- "parameter_title": "Print Causal Graph Info on Instantiation",
- "description": "If enabled, when a Causal Graph is loaded from a file, information on each variable in " +
- "the Causal Graph will be output.",
- "parameter": "print_cg_info_on_instantiation",
- "default_value": True,
- "options": [True, False]
- }, {
"parameter_title": "Output Levels of Precision",
"description": "How many digits of precision to output a result to.",
"parameter": "output_levels_of_precision",
@@ -78,80 +28,6 @@
"parameter": "minimize_backdoor_sets",
"default_value": True,
"options": [True, False]
- }, {
- "parameter_title": "Choosing Z Set in do-calculus",
- "description": "In the do-calculus of p(Y | do(X)), multiple possible sets Z may serve as a " +
- "deconfounding set; control how the set Z is chosen. Either ``ask`` the user to select " +
- "one, or choose one at ``random``, or run the query multiple times, using every possible " +
- "set, ensuring only one answer is ever computed. The last option is useful in debugging.",
- "parameter": "z_selection_preference",
- "default_value": "all",
- "options": ["ask", "random", "all"]
- }]
- }, {
- "section": "File Directories",
- "description": "Here are directories specified in which to *search for/locate* files.",
- "parameters": [{
- "parameter_title": "Graph File Folder",
- "description": "A specific directory in which multiple graph files can be placed; they will be listed on " +
- "launch, allowing the user to choose which one to load. For information on graph files, " +
- "see ``Causal Graph Files``.",
- "parameter": "graph_file_folder",
- "default_value": "graphs/full",
- "options": "any valid path in the project"
- }, {
- "parameter_title": "Regression Test Directory",
- "description": "A specific directory in which each regression test file can be placed; all test files in " +
- "this directory will be automatically run if regression tests are enabled. For information " +
- "on regression test files, see ``Regression Tests``.",
- "parameter": "regression_directory",
- "default_value": "tests/test_files",
- "options": "any valid path in the project"
- }]
- }, {
- "section": "Logging Rules / Directories",
- "description":
- "Here are rules regarding whether or not to log computation queries and/or regression test results, and " +
- "if so, where to log said files.\n\n**Warning**: As a general rule, large causal graphs can result in " +
- "exceptionally large log files, and it is not recommended to log said queries; they will likely be too " +
- "long to be human-readable, a file size too large for stable text file reading, and the process of " +
- "writing all the information to said file will have a noticeable affect on performance.",
- "parameters": [{
- "parameter_title": "Log Computation",
- "description": "If enabled, queries will be logged to a file with a name conforming to the query. The " +
- "file location is determined by ``logging_directory``.",
- "parameter": "log_computation",
- "default_value": True,
- "options": [True, False]
- }, {
- "parameter_title": "Log All Regression Computation",
- "description": "If enabled, when regression tests are run on launch, all computation involved will be " +
- "written to a file named by the date and time the test is run. The location of the file " +
- "will be the directory ``regression_log_subdirectory``, which is itself a subdirectory of " +
- "``logging_directory``.",
- "parameter": "log_all_regression_computation",
- "default_value": False,
- "options": [True, False]
- }, {
- "parameter_title": "Logging Directory",
- "description": "The directory in which queries or regression tests will be logged, if they are enabled.",
- "parameter": "logging_directory",
- "default_value": "logs",
- "options": "any valid path in the project"
- }, {
- "parameter_title": "Regression Log Subdirectory",
- "description": "The subdirectory of ``logging_directory`` in which regression tests will be logged, if " +
- "enabled.",
- "parameter": "regression_log_subdirectory",
- "default_value": "regression",
- "options": "any valid path name"
- }, {
- "parameter_title": "Update from Github on Launch",
- "description": "If enabled, the project will attempt to pull from Github, and effectively update itself, " +
- "on launch - it probably won't even need to be restarted if there is an update.",
- "parameter": "github_pull_on_launch",
- "default_value": True,
- "options": [True, False]
}]
}, {
"section": "Accuracy / Formatting / Precision Rules",
@@ -173,13 +49,6 @@
"parameter": "topological_sort_variables",
"default_value": True,
"options": [True, False]
- }, {
- "parameter_title": "Default Regression Test Repetition",
- "description": "In *deterministic* regression tests (see: ``Regression Tests``), " +
- "this value specifies how many times to repeat a test.",
- "parameter": "default_regression_repetition",
- "default_value": 10,
- "options": "any positive integer"
}, {
"parameter_title": "Regression Test Result Precision",
"description": "In a regression test (see: ``Regression Tests``) where an 'expected " +
@@ -189,23 +58,6 @@
"parameter": "regression_levels_of_precision",
"default_value": 5,
"options": "any positive integer"
- }, {
- "parameter_title": "Apply Function Noise",
- "description": "In evaluating the value of variable where a function is provided rather than a table " +
- "(see: ``Causal Graph Files``), this will control whether the 'noise functions' provided " +
- "will be applied.",
- "parameter": "apply_any_noise",
- "default_value": True,
- "options": [True, False]
- }, {
- "parameter_title": "Recursive Noise Propagation",
- "description": "If ``apply_any_noise`` is enabled, this parameter will control whether any nested " +
- "evaluation functions will be subject to noise, or just the primary/first function. For " +
- "example, 'val(C) = val(B) + 1'. If enabled, val(B) is subject to noise. If disabled, " +
- "only val(C).",
- "parameter": "recursive_noise_propagation",
- "default_value": True,
- "options": [True, False]
}]
}
]
diff --git a/src/config/settings.py b/src/config/settings.py
new file mode 100644
index 0000000..20c135f
--- /dev/null
+++ b/src/config/settings.py
@@ -0,0 +1,10 @@
+from src.config.config_manager import settings_yml
+
+
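+# Attribute-style, read-only access to the values loaded from src/config/config.yml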
+class Settings:
+
+ cache_computation_results = settings_yml["cache_computation_results"]
+ minimize_backdoor_sets = settings_yml["minimize_backdoor_sets"]
+ output_levels_of_precision = settings_yml["output_levels_of_precision"]
+ regression_levels_of_precision = settings_yml["regression_levels_of_precision"]
+ topological_sort_variables = settings_yml["topological_sort_variables"]
diff --git a/src/graphs/dataset_generator/distribution_generation.py b/src/graphs/dataset_generator/distribution_generation.py
index e696314..4518eb4 100755
--- a/src/graphs/dataset_generator/distribution_generation.py
+++ b/src/graphs/dataset_generator/distribution_generation.py
@@ -30,12 +30,8 @@ def generate_distribution(graph: Graph):
parent_list = sorted(list(graph.parents(v)))
cur = {
- "name": v,
"outcomes": outcome_list,
"parents": parent_list,
- "determination": {
- "type": "table"
- }
}
variables[v] = cur
@@ -52,8 +48,8 @@ def generate_distribution(graph: Graph):
nums = sum_to(len(outcomes), 10000)
for i, outcome in enumerate(outcomes):
- distribution.append([outcome, list(cross), nums[i] / 10000])
+ distribution.append([outcome, *list(cross), nums[i] / 10000])
- variables[v]["determination"]["table"] = distribution
+ variables[v]["table"] = distribution
return variables
diff --git a/src/graphs/dataset_generator/model_generator.py b/src/graphs/dataset_generator/model_generator.py
index c6e1c85..eba211c 100755
--- a/src/graphs/dataset_generator/model_generator.py
+++ b/src/graphs/dataset_generator/model_generator.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python
-from sys import argv
-from os import path, listdir
from json import dump
+from pathlib import Path
from random import randrange
+from sys import argv
from src.probability.structures.CausalGraph import CausalGraph
from src.validation.inference.inference_tests import model_inference_validation
@@ -26,9 +26,9 @@
except ValueError:
print("Could not convert", argv[1], "to int; defaulting to", N)
-destination_directory = argv[2]
+destination_directory = Path(".", argv[2])
-if not path.isdir(destination_directory):
+if not destination_directory.is_dir():
print("Cannot resolve", destination_directory)
exit()
@@ -43,14 +43,14 @@
g = generate_graph(num_vertices, max_path_length, num_edges)
distribution = generate_distribution(g)
- cg = CausalGraph(**parse_model({"variables": list(distribution.values())}))
+ cg = CausalGraph(**parse_model({"model": distribution}))
success, message = model_inference_validation(cg)
if success:
- l = len(listdir(destination_directory)) // 2 + 1
+ l = len(list(destination_directory.iterdir())) // 2 + 1
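+ # (each accepted model writes two files, m{}.json and m{}_L.json, hence the // 2)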
- with open("{}/m{}.json".format(destination_directory, l), "w") as f:
+ with (destination_directory / f"m{l}").open("w") as f:
dump({
"name": "m" + str(l),
"variables": list(distribution.values()),
@@ -61,7 +61,7 @@
for v in latent_variables:
distribution[v]["latent"] = True
- with open("{}/m{}_L.json".format(destination_directory, l), "w") as f:
+ with (destination_directory / f"m{l}_L").open("w") as f:
dump({
"name": "m" + str(l) + "_L",
"variables": list(distribution.values()),
diff --git a/src/graphs/full/test.json b/src/graphs/full/test.json
new file mode 100644
index 0000000..9cacb8e
--- /dev/null
+++ b/src/graphs/full/test.json
@@ -0,0 +1,25 @@
+{
+ "name": "Two-Variable Test",
+ "model": {
+
+ "Y": {
+ "latent": true,
+ "outcomes": ["y", "~y"],
+ "table": [
+ ["y", 0.6],
+ ["~y", 0.4]
+ ]
+ },
+
+ "X":{
+ "outcomes": ["x", "~x"],
+ "parents": ["Y"],
+ "table": [
+ ["x", "y", 0.7],
+ ["x", "~y", 0.2],
+ ["~x", "y", 0.3],
+ ["~x", "~y", 0.8]
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/probability/structures/BackdoorController.py b/src/probability/structures/BackdoorController.py
index 33a9bd0..bca0967 100755
--- a/src/probability/structures/BackdoorController.py
+++ b/src/probability/structures/BackdoorController.py
@@ -9,7 +9,7 @@
from itertools import product
-from src.config.config_manager import access
+from src.config.settings import Settings
from src.probability.structures.Graph import Graph
from src.util.helpers import minimal_sets
from src.util.helpers import power_set
@@ -111,7 +111,7 @@ def get_backdoor_paths(cur: str, path: list, path_list: list, previous="up") ->
backdoor_paths = get_backdoor_paths(s, [], [])
# Filter out the paths that don't "enter" x; see the definition of a backdoor path
- return list(filter(lambda l: l[0] in self.graph.children(l[1]), backdoor_paths))
+ return list(filter(lambda l: l[0] in self.graph.children(l[1]) and l[1] != t, backdoor_paths))
def all_dcf_sets(self, src: set, dst: set) -> list:
"""
@@ -149,8 +149,7 @@ def all_dcf_sets(self, src: set, dst: set) -> list:
valid_deconfounding_sets.append(tentative_dcf)
# Minimize the sets, if enabled
- # TODO - Revisit configuration detail implementation
- if access("minimize_backdoor_sets"):
+ if Settings.minimize_backdoor_sets:
valid_deconfounding_sets = minimal_sets(*valid_deconfounding_sets)
return list(valid_deconfounding_sets)
diff --git a/src/probability/structures/CausalGraph.py b/src/probability/structures/CausalGraph.py
index 8d0920e..d01e9b7 100755
--- a/src/probability/structures/CausalGraph.py
+++ b/src/probability/structures/CausalGraph.py
@@ -9,7 +9,7 @@
from itertools import product
-from src.config.config_manager import access
+from src.config.settings import Settings
from src.probability.structures.BackdoorController import BackdoorController
from src.probability.structures.Graph import Graph
@@ -111,7 +111,7 @@ def strings(s: set):
# If results do NOT match; error
assert abs(result-probability) < 0.00000001, f"Error: Distinct results: {probability} vs {result}"
- msg = "{0} = {1:.{precision}f}".format(str_rep, probability, precision=access("output_levels_of_precision") + 1)
+ msg = "{0} = {1:.{precision}f}".format(str_rep, probability, precision=Settings.output_levels_of_precision + 1)
self.output.detail(msg)
self.graph.reset_disabled()
return probability
diff --git a/src/probability/structures/ConditionalProbabilityTable.py b/src/probability/structures/ConditionalProbabilityTable.py
index cb6a37e..fe89206 100755
--- a/src/probability/structures/ConditionalProbabilityTable.py
+++ b/src/probability/structures/ConditionalProbabilityTable.py
@@ -6,10 +6,10 @@
# #
#########################################################
-import numpy as np # Used in table->str formatting
-from math import floor, ceil # Used in table->str formatting
+from numpy import empty
+from math import floor, ceil
-from src.config.config_manager import access
+from src.config.settings import Settings
from src.probability.structures.VariableStructures import Variable, Outcome, Intervention
from src.util.ProbabilityExceptions import MissingTableRow
@@ -51,7 +51,7 @@ def __str__(self) -> str:
columns = 1 + len(self.given) + 1
# dtype declaration is better than "str", as str only allows one character in each cell
- table = np.empty((rows, columns), dtype='<U...')
+ table = empty((rows, columns), dtype='<U...')
@@ ... @@ def __str__(self) -> str:
table[i+1][1+given_idx] = row[1][given_idx].outcome
# The probability, to some modifiable number of digits
- table[i+1][table.shape[1]-1] = "{0:.{prec}f}".format(row[2], prec=access("output_levels_of_precision"))
+ table[i+1][table.shape[1]-1] = "{0:.{prec}f}".format(row[2], prec=Settings.output_levels_of_precision)
# Wiggle/Padding, column by column
for column_index in range(1 + len(self.given) + 1):
diff --git a/src/probability/structures/Graph.py b/src/probability/structures/Graph.py
index e390b51..fa95f30 100755
--- a/src/probability/structures/Graph.py
+++ b/src/probability/structures/Graph.py
@@ -11,10 +11,12 @@
# We can isolate more generalized graph code here, as well as create a better way to "erase" incoming or outgoing
# edges, but only temporarily; this will improve "reach", "parents", etc.
+from typing import Union
+
from src.probability.structures.VariableStructures import *
# These functions should work with any sort of Variable type, or the name itself
-CG_Types = str or Variable or Outcome or Intervention
+CG_Types = Union[str, Variable, Outcome, Intervention]
class Graph:
diff --git a/src/probability/structures/Probability_Engine.py b/src/probability/structures/Probability_Engine.py
index c63ac52..440bd6e 100755
--- a/src/probability/structures/Probability_Engine.py
+++ b/src/probability/structures/Probability_Engine.py
@@ -9,7 +9,7 @@
from itertools import product
-from src.config.config_manager import access
+from src.config.settings import Settings
from src.probability.structures.Graph import Graph
from src.probability.structures.VariableStructures import Outcome, Intervention
@@ -75,7 +75,7 @@ def _compute(self, head: list, body: list, depth=0) -> float:
###############################################
# Sort the head and body if enabled
- if access("topological_sort_variables"):
+ if Settings.topological_sort_variables:
head, body = self.graph.descendant_first_sort(head), self.graph.descendant_first_sort(body)
# Create a string representation of this query, and see if it's been done / in-progress / contradictory
@@ -165,7 +165,7 @@ def _compute(self, head: list, body: list, depth=0) -> float:
result_1 = self._compute(child, head + new_body, depth+1)
result_2 = self._compute(head, new_body, depth+1)
result_3 = self._compute(child, new_body, depth+1)
- if result_3 == 0: # Avoid dividing by 0!
+ if result_3 == 0: # Avoid dividing by 0! coverage: skip
self.output.detail(f"{str_3} = 0, therefore the result is 0.", x=depth)
return 0
@@ -190,33 +190,32 @@ def _compute(self, head: list, body: list, depth=0) -> float:
if missing_parents:
self.output.detail("Attempting application of Jeffrey's Rule", x=depth)
- # Try an approach beginning with each missing parent
- for missing_parent in missing_parents:
+ for missing_parent in missing_parents:
- try:
- # Add one parent back in and recurse
- parent_outcomes = self.outcomes[missing_parent]
+ try:
+ # Add one parent back in and recurse
+ parent_outcomes = self.outcomes[missing_parent]
- # Consider the missing parent and sum every probability involving it
- total = 0.0
- for parent_outcome in parent_outcomes:
+ # Consider the missing parent and sum every probability involving it
+ total = 0.0
+ for parent_outcome in parent_outcomes:
- as_outcome = Outcome(missing_parent, parent_outcome)
+ as_outcome = Outcome(missing_parent, parent_outcome)
- self.output.detail(p_str(head, [as_outcome] + body), "*", p_str([as_outcome], body), x=depth)
+ self.output.detail(p_str(head, [as_outcome] + body), "*", p_str([as_outcome], body), x=depth)
- result_1 = self._compute(head, [as_outcome] + body, depth+1)
- result_2 = self._compute([as_outcome], body, depth+1)
- outcome_result = result_1 * result_2
+ result_1 = self._compute(head, [as_outcome] + body, depth+1)
+ result_2 = self._compute([as_outcome], body, depth+1)
+ outcome_result = result_1 * result_2
- total += outcome_result
+ total += outcome_result
- self.output.detail(rep, "=", total, x=depth)
- self._store_computation(rep, total)
- return total
+ self.output.detail(rep, "=", total, x=depth)
+ self._store_computation(rep, total)
+ return total
- except ProbabilityException: # coverage: skip
- self.output.detail("Failed to resolve by Jeffrey's Rule", x=depth)
+ except ProbabilityException: # coverage: skip
+ self.output.detail("Failed to resolve by Jeffrey's Rule", x=depth)
###############################################
# Interventions / do(X) #
@@ -261,7 +260,7 @@ def _store_computation(self, string_representation: str, result: float):
@param result: The actual float value to store
"""
# Ensure the configuration file is specified to allow caching
- if access("cache_computation_results"):
+ if Settings.cache_computation_results:
# Not stored yet - store it
if string_representation not in self._stored_computations:
diff --git a/src/util/ModelLoader.py b/src/util/ModelLoader.py
index 2f729fe..180b912 100755
--- a/src/util/ModelLoader.py
+++ b/src/util/ModelLoader.py
@@ -1,5 +1,6 @@
from json import load as json_load
-from os import path
+from pathlib import Path
+from typing import Union
from yaml import safe_load as yaml_load
from src.probability.structures.ConditionalProbabilityTable import ConditionalProbabilityTable
@@ -7,33 +8,42 @@
from src.probability.structures.VariableStructures import *
-def parse_model(file: dict or str):
+def parse_model(file: Union[dict, str, Path]):
"""
Parse a given model for use within the project, such as to create a CausalGraph
@param file: a string path to either a JSON or YML file containing a valid model, or a dictionary
containing a model
- @raises FileNotFoundError if a string is provided that does not lead to a file
- @raises Exception if a string given does not end in .yml, .yaml, or .json
+ @raise FileNotFoundError if a string is provided that does not lead to a file
+ @raise Exception if a string given does not end in .yml, .yaml, or .json
@return a dictionary of the parsed model, with keys "variables", "outcomes", "tables", "graph", "latent"
"""
- # str: path to a file
- if isinstance(file, str):
- if not path.isfile(file):
+ # str: path to a file, or Path
+ if not isinstance(file, dict):
+
+ if isinstance(file, Path):
+ p = file
+
+ else:
+ p = Path(file)
+
+ if not p.is_file():
print(f"ERROR: Can't find {file}")
raise FileNotFoundError
- if file.lower().endswith(".yml") or file.lower().endswith(".yaml"):
+ extension = p.suffix.lower()
+
+ if extension in [".yml", ".yaml"]:
loader = yaml_load
- elif file.lower().endswith(".json"):
+ elif extension == ".json":
loader = json_load
else:
- print(f"Unknown extension for file: {file}, needs to end with .yml, .yaml, or .json")
+ print(f"Unknown extension '{extension}' for file: {file}, needs to end with .yml, .yaml, or .json")
raise FileNotFoundError
- with open(file) as f:
+ with p.open("r") as f:
data = loader(f)
# dict: assume valid model
diff --git a/src/util/helpers.py b/src/util/helpers.py
index f082c11..0a41077 100644
--- a/src/util/helpers.py
+++ b/src/util/helpers.py
@@ -1,6 +1,8 @@
from itertools import chain, combinations
from typing import Iterator
+from src.config.settings import Settings
+
def power_set(variable_list: list or set, allow_empty_set=True) -> Iterator[any]:
"""
@@ -49,3 +51,13 @@ def p_str(lhs: list, rhs: list) -> str:
return f'P({", ".join(map(str, lhs))})'
return f'P({", ".join(map(str, lhs))} | {", ".join(map(str, rhs))})'
+
+
+def within_precision(a: float, b: float) -> bool:
+ """
+ Check whether two values differ by an amount less than some number of digits of precision
+ @param a: The first value
+ @param b: The second value
+ @return: True if the values are within the margin of error acceptable, False otherwise
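+ e.g. with the default regression_levels_of_precision of 5, 0.123456 and 0.123457 are treated as equal (1e-6 < 1e-5)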
+ """
+ return abs(a - b) < 1 / (10 ** Settings.regression_levels_of_precision)
diff --git a/src/validation/backdoors/backdoor_path_tests.py b/src/validation/backdoors/backdoor_path_tests.py
index a186568..73131a7 100644
--- a/src/validation/backdoors/backdoor_path_tests.py
+++ b/src/validation/backdoors/backdoor_path_tests.py
@@ -1,6 +1,7 @@
import itertools
from os import listdir
from os.path import dirname, abspath
+from pathlib import Path
from yaml import safe_load as load
from src.validation.test_util import print_test_result
@@ -8,7 +9,7 @@
from src.probability.structures.BackdoorController import BackdoorController
from src.util.ModelLoader import parse_model
-test_file_directory = dirname(abspath(__file__)) + "/test_files"
+test_file_directory = Path(dirname(abspath(__file__))) / "test_files"
def model_backdoor_validation(bc: BackdoorController, test_data: dict) -> (bool, str):
@@ -60,27 +61,25 @@ def model_backdoor_validation(bc: BackdoorController, test_data: dict) -> (bool,
return True, "Backdoor tests passed."
-def backdoor_tests(graph_location: str) -> (bool, str):
+def backdoor_tests(graph_location: Path) -> (bool, str):
"""
Run tests on models located in a given directory of graphs, verifying various backdoor paths in the models.
@param graph_location: a directory containing causal graph models
@return: True if all tests are successful, False otherwise
"""
- files = sorted(list(filter(lambda x: x.endswith(".yml"), listdir(test_file_directory))))
+ files = sorted(list(filter(lambda x: x.suffix.lower() == ".yml", test_file_directory.iterdir())))
assert len(files) > 0, f"Found no backdoor module tests"
all_successful = True
- # TODO - Threading ? Good for inference tests but shouldn't take too long here
-
for test_file in files:
- with open(f"{test_file_directory}/{test_file}") as f:
+ with test_file.open("r") as f:
yml_test_data = load(f)
graph_filename = yml_test_data["graph_filename"]
- with open(f"{graph_location}/{graph_filename}") as f:
+ with (graph_location / graph_filename).open("r") as f:
graph_data = load(f)
bc = BackdoorController(parse_model(graph_data)["graph"])
@@ -88,7 +87,7 @@ def backdoor_tests(graph_location: str) -> (bool, str):
success, msg = model_backdoor_validation(bc, yml_test_data)
print_test_result(success, msg if not success else f"All tests in {test_file}, {graph_filename} passed")
- if not success:
+ if not success: # coverage: skip
all_successful = False
return all_successful, "[Backdoor module passed]" if all_successful else "[Backdoor module encountered errors]"
diff --git a/src/validation/backdoors/test_files/xi_xj.yml b/src/validation/backdoors/test_files/xi_xj.yml
index 78c1e8b..afad7a2 100644
--- a/src/validation/backdoors/test_files/xi_xj.yml
+++ b/src/validation/backdoors/test_files/xi_xj.yml
@@ -26,6 +26,6 @@ tests:
expect: false
- type: independence
- src: [ Xi ]
+ src: [ X5 ]
dst: [ X2 ]
expect: false
diff --git a/src/validation/inference/inference_tests.py b/src/validation/inference/inference_tests.py
index 0b63103..2555e4f 100755
--- a/src/validation/inference/inference_tests.py
+++ b/src/validation/inference/inference_tests.py
@@ -1,25 +1,14 @@
-from yaml import safe_load as load
-from os import listdir
from os.path import dirname, abspath
+from pathlib import Path
+from yaml import safe_load as load
-from src.config.config_manager import access
from src.probability.structures.CausalGraph import CausalGraph, Outcome
-
-from src.util.ProbabilityExceptions import *
+from src.util.helpers import within_precision
from src.util.ModelLoader import parse_model, parse_outcomes_and_interventions
+from src.util.ProbabilityExceptions import *
from src.validation.test_util import print_test_result
-test_file_directory = dirname(abspath(__file__)) + "/test_files"
-
-
-def within_precision(a: float, b: float) -> bool:
- """
- Check whether two values differ by an amount less than some number of digits of precision
- @param a: The first value
- @param b: The second value
- @return: True if the values are within the margin of error acceptable, False otherwise
- """
- return abs(a - b) < 1 / (10 ** access("regression_levels_of_precision"))
+test_file_directory = Path(dirname(abspath(__file__))) / "test_files"
def model_inference_validation(cg: CausalGraph) -> (bool, str):
@@ -51,26 +40,24 @@ def model_inference_validation(cg: CausalGraph) -> (bool, str):
return True, "Basic tests passed."
-def inference_tests(graph_location: str) -> (bool, str):
+def inference_tests(graph_location: Path) -> (bool, str):
"""
Run tests on all models located in a given directory of graphs, verifying the probabilities in the model.
@param graph_location: A path to a directory containing any number of causal graph model files
@return: True if all tests are successful, False otherwise, along with a string summary message.
"""
- model_files = sorted(list(filter(lambda x: x.endswith(".yml"), listdir(graph_location))))
- test_files = sorted(list(filter(lambda x: x.endswith(".yml"), listdir(test_file_directory))))
+ model_files = sorted(list(filter(lambda x: x.suffix.lower() == ".yml", graph_location.iterdir())))
+ test_files = sorted(list(filter(lambda x: x.suffix.lower() == ".yml", test_file_directory.iterdir())))
assert len(model_files) > 0, "Models not found"
assert len(test_files) > 0, "Inference test files not found"
all_successful = True
- # TODO - Threading to handle all the tests
-
for model in model_files:
- with open(graph_location + "/" + model) as f:
+ with model.open("r") as f:
yml_model = load(f)
parsed_model = parse_model(yml_model)
@@ -84,11 +71,11 @@ def inference_tests(graph_location: str) -> (bool, str):
for test_file in test_files:
- with open(f"{test_file_directory}/{test_file}") as f:
+ with test_file.open("r") as f:
yml_test_data = load(f)
graph_filename = yml_test_data["graph_filename"]
- with open(f"{graph_location}/{graph_filename}") as f:
+ with (graph_location / graph_filename).open("r") as f:
graph_data = load(f)
cg = CausalGraph(**parse_model(graph_data))
diff --git a/test_driver.py b/test_driver.py
index e77b837..29df5c3 100644
--- a/test_driver.py
+++ b/test_driver.py
@@ -1,3 +1,5 @@
+from math import prod
+from pathlib import Path
# api
from src.api.backdoor_paths import api_backdoor_paths, api_backdoor_paths_parse
@@ -5,12 +7,13 @@
from src.api.joint_distribution_table import api_joint_distribution_table
from src.api.probability_query import api_probability_query, api_probability_query_parse
+from src.probability.structures.BackdoorController import BackdoorController
from src.probability.structures.CausalGraph import CausalGraph
from src.probability.structures.ConditionalProbabilityTable import ConditionalProbabilityTable
from src.probability.structures.Graph import Graph, to_label
from src.probability.structures.VariableStructures import Outcome, Variable, Intervention
-from src.util.helpers import power_set, disjoint, minimal_sets
+from src.util.helpers import power_set, disjoint, minimal_sets, within_precision
from src.util.ModelLoader import parse_model
from src.validation.backdoors.backdoor_path_tests import backdoor_tests
@@ -19,30 +22,100 @@
from src.validation.test_util import print_test_result
-# TODO - use pathlib
-graph_location = "src/graphs/full"
-generated_location = "src/graphs/generated"
+
+# Use the Xi-Xj model of TBoW as a test
default_model_file = "pearl-3.4.yml"
+# Default location for the graphs made by hand
+graphs = Path(".", "src", "graphs", "full")
+
+# Path to the Xi-Xj model
+test_file = graphs / default_model_file
+
+
+cg = CausalGraph(**parse_model(test_file))
+graph = cg.graph
+bc = BackdoorController(graph)
+
+json_model = graphs / "test.json"
+
+
# api
def test_api_backdoor_paths():
- ...
+
+ blocked = ({"Xi"}, {"Xj"}, {"X4", "X2"})
+ unblocked = ({"Xi"}, {"Xj"}, set())
+
+ assert api_backdoor_paths_parse("Xi -> Xj") == {"src": {"Xi"}, "dst": {"Xj"}, "dcf": set()}
+ assert api_backdoor_paths_parse("Xi -> Xj | X4, X2") == {"src": {"Xi"}, "dst": {"Xj"}, "dcf": {"X4", "X2"}}
+
+ assert len(api_backdoor_paths(bc, *unblocked)) > 0
+ assert len(api_backdoor_paths(bc, *blocked)) == 0
+
+ assert api_backdoor_paths(bc, *unblocked) == bc.backdoor_paths(*unblocked)
+ assert api_backdoor_paths(bc, *blocked) == bc.backdoor_paths(*blocked)
def test_api_deconfounding_sets():
- ...
+
+ paths = ({"Xi"}, {"Xj"})
+ paths2 = ({"Xj"}, {"Xi"})
+ no_paths = ({"X1"}, {"Xj"})
+
+ unfixable = ({"Xi", "X4", "X2"}, {"Xj"})
+
+ assert api_deconfounding_sets_parse("Xi, X1 -> Xj") == {"src": {"Xi", "X1"}, "dst": {"Xj"}}
+ assert api_deconfounding_sets_parse("Xi -> Xj") == {"src": {"Xi"}, "dst": {"Xj"}}
+ assert api_deconfounding_sets_parse("Xj -> Xi") == {"src": {"Xj"}, "dst": {"Xi"}}
+ assert api_deconfounding_sets_parse("X1 -> Xj") == {"src": {"X1"}, "dst": {"Xj"}}
+ assert api_deconfounding_sets_parse("Xi, X4, X2 -> Xj") == {"src": {"Xi", "X4", "X2"}, "dst": {"Xj"}}
+
+ assert len(api_deconfounding_sets(bc, *paths)) > 0
+ assert len(api_deconfounding_sets(bc, *paths2)) > 0
+ assert len(api_deconfounding_sets(bc, *no_paths)) > 0
+ assert len(api_deconfounding_sets(bc, *unfixable)) == 0
+
+ assert api_deconfounding_sets(bc, *paths) == bc.all_dcf_sets(*paths)
+ assert api_deconfounding_sets(bc, *paths2) == bc.all_dcf_sets(*paths2)
+ assert api_deconfounding_sets(bc, *no_paths) == bc.all_dcf_sets(*no_paths)
+ assert api_deconfounding_sets(bc, *unfixable) == bc.all_dcf_sets(*unfixable)
def test_api_joint_distribution_table():
- ...
+
+ jdt: ConditionalProbabilityTable = api_joint_distribution_table(cg)
+
+ outcome_counts = list(map(lambda v: len(cg.outcomes[v]), cg.variables))
+ totals = map(lambda row: row[-1], jdt.table_rows[:-1])
+
+ assert isinstance(jdt, ConditionalProbabilityTable)
+ assert len(jdt.table_rows[:-1]) == prod(outcome_counts)
+ assert within_precision(sum(list(totals)), 1)
def test_api_probability_query():
- ...
+ x = Outcome("X", "x")
+ y = Outcome("Y", "y")
+ z = Outcome("Z", "z")
+
+ v = Intervention("V", "v")
+ w = Intervention("W", "w")
+
+ head_and_body = "Y=y, X=x | Z=z, do(W=w, V=v)"
+ head_only = "Y=y, X=x"
+ single_both = "Y=y | X = x"
+ single_head = "Y = y"
-# config - TODO
+ assert api_probability_query_parse(head_and_body) == {"y": {y, x}, "x": {z, w, v}}
+ assert api_probability_query_parse(head_only) == {"y": {y, x}, "x": set()}
+ assert api_probability_query_parse(single_both) == {"y": {y}, "x": {x}}
+ assert api_probability_query_parse(single_head) == {"y": {y}, "x": set()}
+
+ xi = Outcome("Xi", "xi")
+ xj = Outcome("Xj", "xj")
+ assert api_probability_query(cg, {xj}, {xi}) == cg.probability_query({xj}, {xi})
# graphs
@@ -87,8 +160,6 @@ def test_randomized_latent_variables():
# probability/structures/CausalGraph
-cg = CausalGraph(**parse_model(f"{graph_location}/{default_model_file}"))
-
# See: validation
@@ -111,9 +182,6 @@ def test_probability_lookup():
# probability/structures/Graph
-graph = cg.graph
-
-
def test_roots():
assert sum(map(lambda v: len(graph.parents(v)), graph.roots())) == 0
@@ -266,7 +334,38 @@ def test_outcome():
def test_variable():
- ...
+
+ for v in cg.variables.values():
+
+ v: Variable
+
+ assert isinstance(v.name, str)
+ assert isinstance(v.reach, set)
+ assert isinstance(v.parents, list)
+ assert isinstance(v.topological_order, int)
+
+ c = v.copy()
+
+ assert v == c
+ assert v is not c
+
+ assert v.name == c.name
+
+ assert v.reach is not c.reach
+ assert v.reach == c.reach
+
+ assert v.parents is not c.parents
+ assert v.parents == c.parents
+
+ assert v.topological_order == c.topological_order
+
+ assert hash(v) == hash(c)
+
+ # Unique enough hashing function
+ assert list(map(lambda variable: hash(variable), cg.variables.values())).count(hash(v)) <= 3
+ assert str(v) == str(c)
+
+ assert v == v.name
def test_intervention():
@@ -326,42 +425,46 @@ def test_parse_model():
# nonexistent file
try:
- parse_model("fake/path/fake")
+ parse_model(Path("fake", "path", "fake"))
raise Exception
except FileNotFoundError:
pass
# invalid file
try:
- parse_model("src/util/helpers.py")
+ parse_model(Path("src", "util", "helpers.py"))
raise Exception
except FileNotFoundError:
pass
+ # string path
+ parse_model(str(test_file.absolute()))
+
# yml
- parse_model(f"{graph_location}/{default_model_file}")
+ parse_model(test_file)
# json
+ parse_model(json_model)
# validation
def test_inference_module() -> bool:
- inference_bool, inference_msg = inference_tests(graph_location)
+ inference_bool, inference_msg = inference_tests(graphs)
assert inference_bool, inference_msg
print_test_result(inference_bool, inference_msg)
return inference_bool
def test_backdoor_module() -> bool:
- backdoor_bool, backdoor_msg = backdoor_tests(graph_location)
+ backdoor_bool, backdoor_msg = backdoor_tests(graphs)
assert backdoor_bool, backdoor_msg
print_test_result(backdoor_bool, backdoor_msg)
return backdoor_bool
def test_shpitser_module() -> bool:
- shpitser_bool, shpitser_msg = shpitser_tests(graph_location)
+ shpitser_bool, shpitser_msg = shpitser_tests(graphs)
assert shpitser_bool, shpitser_msg
print_test_result(shpitser_bool, shpitser_msg)
return shpitser_bool