In [25]:
# from sfi import Data, Macro, Missing, SFIToolkit, Scalar
import os
import sys

sys.path.insert(0, "..")

In [36]:
import acro
from acro import utils
from acro import stata_config
import pandas as pd
from acro import acro_stata_parser
from importlib import reload
import pytest

reload(acro_stata_parser)

<module 'acro.acro_stata_parser' from '/Users/j4-smith/GitHub/AI-SDC/ACRO/stata/../acro/acro_stata_parser.py'>

In [37]:
def dummy_acrohandler(command, varlist, exclusion, exp, weights, options):
    """Load the test dataset and hand a command to ``parse_and_run``.

    Reads ``../data/test_data.dta`` into a DataFrame on every call and
    forwards it, together with all six arguments, to
    ``acro_stata_parser.parse_and_run``.

    When the module-level ``debug`` flag is truthy, the received arguments
    are printed on a single line first.

    Args:
        command: passed through to ``parse_and_run`` unchanged.
        varlist: passed through unchanged.
        exclusion: passed through unchanged (labelled ``if`` in the debug line).
        exp: passed through unchanged.
        weights: passed through unchanged.
        options: passed through unchanged.

    Returns:
        Whatever ``acro_stata_parser.parse_and_run`` returns.
    """
    if debug:
        fields = [
            f"command = {command}",
            f"varlist={varlist}",
            f"if = {exclusion}",
            f"exp = {exp}",
            f"weights={weights}",
            f"options={options}",
        ]
        # Trailing space kept so the printed line matches the historic format.
        print("in python acrohandler function: " + " ".join(fields) + " ")

    # Build the DataFrame that the parser operates on.
    dataset = pd.read_stata(os.path.join("../data", "test_data.dta"))

    # Delegate the actual work to the parser.
    return acro_stata_parser.parse_and_run(
        dataset, command, varlist, exclusion, exp, weights, options
    )

In [38]:
# handy string to copy/paste
# command='',varlist='',exclusion='',exp='',weights='',options=''

In [39]:
def test_apply_stata_ifstmt(data):
    """Check that ``apply_stata_ifstmt`` filters rows by ``year``.

    Args:
        data: DataFrame with a ``year`` column whose values include 2013
            and 2015 (both are removed from the expected list below).

    Raises:
        AssertionError: if the years remaining after filtering differ from
            the expected list.
    """
    expected_years = list(data["year"].unique())

    # Single inequality: only 2013 should disappear.
    without_2013 = acro_stata_parser.apply_stata_ifstmt("year!=2013", data)
    expected_years.remove(2013)
    assert list(without_2013["year"].unique()) == expected_years

    # Compound condition: 2015 is additionally excluded by the upper bound.
    compound_condition = "year != 2013 & year <2015"
    expected_years.remove(2015)
    without_2013_or_2015 = acro_stata_parser.apply_stata_ifstmt(
        compound_condition, data
    )
    assert list(without_2013_or_2015["year"].unique()) == expected_years

In [40]:
# Load the test dataset from ../data into a DataFrame and run the
# if-statement test against it. `path` and `the_data` become notebook-level
# names; the test raises AssertionError if the filtering is wrong.
path = os.path.join("../data", "test_data.dta")
the_data = pd.read_stata(path)
test_apply_stata_ifstmt(the_data)

( year != 2013)
( year  !=  2013 ) & ( year  < 2015)


In [5]:
# Notebook-level switch read by dummy_acrohandler: when truthy, every call
# prints the arguments it received. Must be defined before that function is called.
debug = False

In [6]:
def test_stata_acro_init() -> None:
    """Check that the ``init`` command creates the ACRO session object.

    Sends ``init`` through ``dummy_acrohandler`` and verifies that

    * the returned message is exactly ``"acro analysis session created\\n"``;
    * ``stata_config.stata_acro`` is afterwards an ``acro.ACRO`` instance.

    Raises:
        AssertionError: if either check fails.
    """
    # NOTE: an earlier pre-condition here tested against the bare name
    # ``ACRO``, which is never imported in this notebook (only the ``acro``
    # module is), so it raised NameError on a fresh kernel. The
    # post-condition below performs the meaningful type check.
    ret = dummy_acrohandler(
        command="init", varlist="", exclusion="", exp="", weights="", options=""
    )
    assert (
        ret == "acro analysis session created\n"
    ), f"wrong string for acro init: {ret}\n"
    errmsg = f"wrong type for stata_config.stata_acro:{type(stata_config.stata_acro)}"
    assert isinstance(stata_config.stata_acro, acro.ACRO), errmsg

In [7]:
test_stata_acro_init()

INFO:acro:version: 0.4.3
INFO:acro:config: {'safe_threshold': 10, 'safe_dof_threshold': 10, 'safe_nk_n': 2, 'safe_nk_k': 0.9, 'safe_pratio_p': 0.1, 'check_missing_values': False}
INFO:acro:automatic suppression: False


In [8]:
def test_parse_table_details():
    """Check the dictionary produced by ``parse_table_details``.

    Feeds a three-variable varlist plus an options string containing
    ``by(...)``, ``contents(...)``, ``suppress`` and ``nototals`` to the
    parser and asserts on the ``rowvars``, ``colvars``, ``aggfuncs``,
    ``values``, ``totals`` and ``suppress`` entries of the result.

    Raises:
        AssertionError: if any entry differs from the expected value.
    """
    dataset = pd.read_stata(os.path.join("../data", "test_data.dta"))

    # Adjacent literals are concatenated as-is; the exact text is part of the test.
    options = "by(grant_type) " "contents(mean sd inc_activity)" "suppress " "nototals"
    details = acro_stata_parser.parse_table_details(
        ["survivor", "grant_type", "year"], dataset.columns, options
    )

    assert details["rowvars"] == [
        "grant_type",
        "survivor",
    ], f" rows {details['rowvars']} should be ['grant_type','survivor']"

    assert details["colvars"] == [
        "year",
        "grant_type",
    ], f" cols {details['colvars']} should be ['year','grant_type']"

    assert details["aggfuncs"] == [
        "mean",
        "sd",
    ], f" aggfunctions {details['aggfuncs']} should be ['mean','sd']"

    assert details["values"] == [
        "inc_activity"
    ], f" values {details['values']} should be ['inc_activity']"

    # Flag-style options.
    assert not details["totals"], "totals should be False"
    assert details["suppress"], "supress should be True"

In [9]:
test_parse_table_details()

In [10]:
def test_simple_table() -> None:
    """Check the text returned for a two-way ``table`` command without totals.

    Runs ``table survivor grant_type`` with the ``nototals`` option through
    ``dummy_acrohandler`` and compares the result, token by token, with a
    hard-coded rendering of the expected counts.

    The expected text is hard-coded, so the dataset is not loaded here;
    ``dummy_acrohandler`` reads it itself.

    Raises:
        AssertionError: if the whitespace-separated tokens differ.
    """
    correct = (
        "------------------------------------|\n"
        "grant_type     |G   |N    |R    |R/G|\n"
        "survivor       |    |     |     |   |\n"
        "------------------------------------|\n"
        "Dead in 2015   |18  |  0  |282  | 0 |\n"
        "Alive in 2015  |72  |354  |144  |48 |\n"
        "------------------------------------|\n"
    )
    ret = dummy_acrohandler(
        "table",
        "survivor grant_type",
        exclusion="",
        exp="",
        weights="",
        options="nototals",
    )
    # Normalise the returned text so it lines up with the integer counts
    # above: "NaN" becomes "0" and every ".0" is dropped (presumably the
    # counts come back formatted as floats -- confirm against the parser).
    ret = ret.replace("NaN", "0")
    ret = ret.replace(".0", "")
    # Compare on tokens so column padding does not matter.
    assert ret.split() == correct.split(), f"got\n{ret}\n expected\n{correct}"

In [11]:
test_simple_table()

INFO:acro:get_summary(): fail; threshold: 2 cells may need suppressing; 
INFO:acro:outcome_df:
---------------------------------------------------|
grant_type    |G   |N            |R   |R/G         |
survivor      |    |             |    |            |
---------------------------------------------------|
Dead in 2015  | ok | threshold;  | ok | threshold; |
Alive in 2015 | ok |          ok | ok |          ok|
---------------------------------------------------|

INFO:acro:records:add(): output_0


In [12]:
def test_stata_linregress():
    """Check key figures in the text returned for a ``regress`` command.

    The returned summary is split on whitespace; the token following
    ``"Residuals:"`` must be the integer 807 and the token following
    ``"R-squared:"`` must be approximately 0.894.

    Raises:
        AssertionError: if either figure is off.
        ValueError: if a label is missing or a token cannot be converted.
    """
    summary = dummy_acrohandler(
        command="regress",
        varlist=" inc_activity inc_grants inc_donations total_costs",
        exclusion="",
        exp="",
        weights="",
        options="",
    )
    words = summary.split()

    # Each figure is the token immediately after its label.
    val = int(words[words.index("Residuals:") + 1])
    assert val == 807, f"{val} should be 807"

    r_squared = float(words[words.index("R-squared:") + 1])
    # Second positional argument of pytest.approx is the relative tolerance.
    assert r_squared == pytest.approx(0.894, 0.001)

In [13]:
test_stata_linregress()

INFO:acro:ols() outcome: pass; dof=807.0 >= 10
INFO:acro:records:add(): output_1


In [58]:
def test_stata_probit():
    """Check key figures in the text returned for a ``probit`` command.

    The returned summary is split on whitespace; the token following
    ``"Residuals:"`` must be approximately 806 and the token following
    ``"R-squ.:"`` approximately 0.208. A single trailing ``|`` on either
    token is removed before conversion.

    Raises:
        AssertionError: if either figure is outside tolerance.
        ValueError: if a label is missing or a token is not numeric.
    """
    words = dummy_acrohandler(
        command="probit",
        varlist=" survivor inc_activity inc_grants inc_donations total_costs",
        exclusion="",
        exp="",
        weights="",
        options="",
    ).split()

    def token_after(label):
        """Return the token following ``label`` without one trailing ``|``."""
        token = words[words.index(label) + 1]
        return token[:-1] if token.endswith("|") else token

    residuals = token_after("Residuals:")
    assert float(residuals) == pytest.approx(
        806.0, 0.01
    ), f"{residuals} should be 806"

    pseudo_r2 = float(token_after("R-squ.:"))
    assert pseudo_r2 == pytest.approx(0.208, 0.01)

In [60]:
test_stata_probit()

INFO:acro:probit() outcome: pass; dof=806.0 >= 10
INFO:acro:records:add(): output_25


Optimization terminated successfully.
         Current function value: 0.497218
         Iterations 10


In [64]:
def test_stata_logit():
    """Check key figures in the text returned for a ``logit`` command.

    The returned summary is split on whitespace; the token following
    ``"Residuals:"`` must be approximately 806 and the token following
    ``"R-squ.:"`` approximately 0.214. A single trailing ``|`` on either
    token is removed before conversion.

    Raises:
        AssertionError: if either figure is outside tolerance.
        ValueError: if a label is missing or a token is not numeric.
    """
    summary = dummy_acrohandler(
        command="logit",
        varlist=" survivor inc_activity inc_grants inc_donations total_costs",
        exclusion="",
        exp="",
        weights="",
        options="",
    )
    pieces = summary.split()

    def read_figure(label):
        """Return the token after ``label``, minus one trailing ``|`` if any."""
        raw = pieces[pieces.index(label) + 1]
        if raw.endswith("|"):
            raw = raw[:-1]
        return raw

    dof_text = read_figure("Residuals:")
    assert float(dof_text) == pytest.approx(806.0, 0.01), f"{dof_text} should be 806"

    r2_value = float(read_figure("R-squ.:"))
    assert r2_value == pytest.approx(0.214, 0.01)

In [65]:
# Re-import acro_stata_parser first so that any edits made to the module on
# disk are picked up by the test that follows.
reload(acro_stata_parser)
test_stata_logit()

INFO:acro:logit() outcome: pass; dof=806.0 >= 10
INFO:acro:records:add(): output_26


Optimization terminated successfully.
         Current function value: 0.493788
         Iterations 12


In [66]:
def test_stata_print_outputs():
    """Check that the ``print_outputs`` command returns an empty string.

    Raises:
        AssertionError: if the returned value has non-zero length.
    """
    request = {
        "command": "print_outputs",
        "varlist": " inc_activity inc_grants inc_donations total_costs",
        "exclusion": "",
        "exp": "",
        "weights": "",
        "options": "",
    }
    ret = dummy_acrohandler(**request)
    assert len(ret) == 0, "return string should  be empty"

In [None]:
test_stata_print_outputs()

In [68]:
def test_stata_finalise():
    """Check the confirmation message returned by the ``finalise`` command.

    Raises:
        AssertionError: if the returned string is not exactly the expected
            confirmation text.
    """
    correct = "outputs and stata_out.json written\n"
    # Every argument other than the command itself is left empty.
    blanks = dict.fromkeys(("varlist", "exclusion", "exp", "weights", "options"), "")
    ret = dummy_acrohandler(command="finalise", **blanks)
    assert ret == correct, f"returned string {ret} should be {correct}\n"

In [69]:
test_stata_finalise()

INFO:acro:records:
uid: output_0
status: fail
type: table
properties: {'method': 'crosstab'}
sdc: {'summary': {'suppressed': False, 'negative': 0, 'missing': 0, 'threshold': 2, 'p-ratio': 0, 'nk-rule': 0}, 'cells': {'negative': [], 'missing': [], 'threshold': [[0, 1], [0, 3]], 'p-ratio': [], 'nk-rule': []}}
command: safe_output = stata_acro.crosstab(
summary: fail; threshold: 2 cells may need suppressing; 
outcome: grant_type      G            N   R          R/G
survivor                                       
Dead in 2015   ok  threshold;   ok  threshold; 
Alive in 2015  ok           ok  ok           ok
output: [grant_type      G    N    R  R/G
survivor                        
Dead in 2015   18    0  282    0
Alive in 2015  72  354  144   48]
timestamp: 2023-08-14T16:06:03.132855
comments: []
exception: 

The status of the record above is: fail.
Please explain why an exception should be granted.



 missing


INFO:acro:records:outputs written to: stata_out


In [None]:
def test_find_brace_contents():
    """Check ``find_brace_contents`` for present and absent option names.

    For ``by`` and ``contents`` the call must report success and return the
    text between the parentheses; for the absent name ``foo`` it must report
    failure together with the message ``"foo not found"``.

    Raises:
        AssertionError: if any returned pair differs from the expectation.
    """
    options = "by(grant_type) " "contents(mean sd inc_activity)" "suppress " "nototals"

    # (word searched for, expected success flag, expected second return value)
    cases = (
        ("by", True, "grant_type"),
        ("contents", True, "mean sd inc_activity"),
        ("foo", False, "foo not found"),
    )
    for word, expected_flag, expected_text in cases:
        res, substr = acro_stata_parser.find_brace_contents(word, options)
        assert res == expected_flag
        assert substr == expected_text

In [None]:
test_find_brace_contents()

In [None]:
def test_unsupported_formatting_options():
    """Check that table formatting options produce a warning plus the table.

    For each listed formatting option, a ``table survivor grant_type``
    command is issued with that option and ``nototals``. The first line of
    the returned text must be the "not supported" warning, and the remainder
    must match, token by token, a pandas crosstab of the same two columns.

    Raises:
        AssertionError: if the warning is absent or wrong, or the table
            tokens differ from the crosstab.
    """
    dataset = pd.read_stata(os.path.join("../data", "test_data.dta"))
    format_string = "acro does not currently support table formatting commands."

    # Reference table computed directly with pandas.
    crosstab = pd.crosstab(index=dataset["survivor"], columns=dataset["grant_type"])
    correct = crosstab.to_string()

    unsupported = (
        "cellwidth",
        "csepwidth",
        "stubwidth",
        "scsepwidth",
        "center",
        "left",
    )
    for bad_option in unsupported:
        response = dummy_acrohandler(
            "table",
            "survivor grant_type",
            exclusion="",
            exp="",
            weights="",
            options=f"{bad_option} nototals",
        )

        # Split once: warning line first, table text after.
        parts = response.split("\n", 1)
        assert len(parts) == 2, "table should have warning preprended"
        assert (
            parts[0] == format_string
        ), f"first line {parts[0]} should be {format_string}"

        # Same normalisation as applied to other table output before comparing.
        ret = parts[1].replace("NaN", "0").replace(".0", "")
        assert ret.split() == correct.split(), f"got\n{ret}\n expected\n{correct}"

In [None]:
test_unsupported_formatting_options()