# parametric.py
import pandas as pd
import numpy as np

from causality.estimation.parametric import (DifferenceInDifferences,
                                             PropensityScoreMatching,
                                             InverseProbabilityWeightedLS)
from tests.unit import TestAPI
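
# These tests simulate data with a known treatment effect and check that each
# parametric estimator (difference-in-differences, propensity score matching,
# and inverse-probability-weighted least squares) recovers it within tolerance.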

class TestDID(TestAPI):
    def setUp(self):
        SIZE = 2000
        # Random 50/50 assignment to treatment (1) or control (0).
        assignment = np.random.binomial(1, 0.5, size=SIZE)
        # Both groups share a common +2 trend from Start to End; only the
        # treated group gets the extra Normal(2 * assignment) effect.
        pre_experiment = assignment + np.random.normal(-1, size=SIZE)
        start = assignment + np.random.normal(1, size=SIZE)
        end = start + np.random.normal(2. * assignment) + np.random.normal(2, size=SIZE)
        self.X_pre = pd.DataFrame({'Start': pre_experiment, 'End': start, 'assignment': assignment})
        self.X = pd.DataFrame({'Start': start, 'End': end, 'assignment': assignment})
        self.did = DifferenceInDifferences()
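
    # The difference-in-differences estimand tested below is
    #     ATE = (E[End | treated] - E[Start | treated])
    #         - (E[End | control] - E[Start | control]),
    # which is 2 by construction in the data simulated in setUp.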
    def test_assumption_tester(self):
        # In the pre-experiment data both groups trend upward in parallel.
        assert self.did.test_parallel_trend(self.X_pre)
        # Shifting the treated group's pre-period 'End' breaks parallel trends.
        self.X_pre['End'] += self.X_pre['assignment']
        assert not self.did.test_parallel_trend(self.X_pre)

    def test_did_estimator(self):
        # The simulated effect is 2, so the point estimate should land near 2
        # and inside its own confidence interval.
        lower, exp, upper = self.did.average_treatment_effect(self.X)
        assert 1.8 <= exp <= 2.2
        assert lower <= exp <= upper

        self.did = DifferenceInDifferences(robust=True)
        lower, exp, upper = self.did.average_treatment_effect(self.X)
        assert 1.8 <= exp <= 2.2
        assert lower <= exp <= upper

class TestPropScore(TestAPI):
    def test_match(self):
        matcher = PropensityScoreMatching()
        X = pd.DataFrame({'assignment': [1, 0, 0, 0, 0, 0],
                          'propensity score': [3, 1, 2, 3, 5, 4]})
        test, control = matcher.match(X, n_neighbors=3)
        # The single treated unit has propensity score 3; its three nearest
        # control scores are 2, 3, and 4.
        assert set(control['propensity score'].values) == set([2, 3, 4])

    def test_score(self):
        N = 5000
        z1 = np.random.normal(size=N)
        z2 = np.random.choice(['a', 'b', 'c'], size=N)
        numeric_mapping = {'a': 3, 'b': 4, 'c': 5}
        z2_numeric = [numeric_mapping[z2i] for z2i in z2]
        # Logistic assignment model: P(assignment | z) = exp(z) / (1 + exp(z)).
        p_assign = np.exp(z1 + z2_numeric) / (1. + np.exp(z1 + z2_numeric))
        assignment = np.random.binomial(1, p_assign)
        outcome = np.random.normal(assignment)
        matcher = PropensityScoreMatching()
        X = pd.DataFrame({'z1': z1, 'z2': z2, 'assignment': assignment, 'outcome': outcome})
        # z1 is treated as continuous ('c') and z2 as discrete ('o').
        confounder_types = {'z1': 'c', 'z2': 'o'}
        matcher.score(X, confounder_types, store_model_fit=True)
        # The fitted logit coefficients should recover the data-generating
        # values: 1 for z1, 1 and 2 for the z2_b and z2_c dummies, and 3 for
        # the intercept.
        assert 0.7 <= matcher.propensity_score_model.params['z1'] <= 1.3
        assert 0.0 <= matcher.propensity_score_model.params['z2_b'] <= 2.0
        assert 1.0 <= matcher.propensity_score_model.params['z2_c'] <= 3.0
        assert 2.0 <= matcher.propensity_score_model.params['intercept'] <= 4.0

    def test_at_estimators(self):
        N = 1000  # number of data points
        z1 = 0.5 * np.random.normal(size=N)  # a few confounding variables
        z2 = 0.5 * np.random.normal(size=N)
        z3 = 0.5 * np.random.normal(size=N)
        arg = z1 + z2 + z3 + np.random.normal(size=N)
        p = np.exp(arg) / (1. + np.exp(arg))  # propensity to receive treatment, P(d|z), with a logistic form
        d = np.random.binomial(1, p)
        # The effect of d is confounded by z; since the z's are mean zero,
        # the true ATE is 1.
        y = np.random.normal(size=N) + (z1 + z2 + z3 + 1.) * d
        X = pd.DataFrame({'d': d, 'z1': z1, 'z2': z2, 'z3': z3, 'y': y, 'p': p})
        matcher = PropensityScoreMatching()
        ATE = matcher.estimate_ATE(X, 'd', 'y', {'z1': 'c', 'z2': 'c', 'z3': 'c'})
        assert 0.9 <= ATE <= 1.1
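
# Inverse probability weighting reweights each observation by the inverse of
# the probability of the treatment state it actually received (1 / p for
# treated units, 1 / (1 - p) for controls), so that, given correct propensity
# scores, the reweighted groups are comparable on the confounders.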
class TestIPW(TestAPI):
    def test_estimators(self):
        N = 2000
        z = np.random.normal(size=N)
        d = np.random.binomial(1, p=1. / (1. + np.exp(-z)))
        # Simulated potential outcomes: y1 - y0 = 2 * (1 + z), so the true ATE
        # is 2, and the ATT and ATC differ because z shifts the treatment odds.
        y0 = np.random.normal(size=N)
        y1 = y0 + 2. * (1 + z)
        y = (d == 1) * y1 + (d == 0) * y0
        X = pd.DataFrame({'d': d, 'z': z, 'y': y, 'y0': y0, 'y1': y1})
        assignment = 'd'
        confounder_types = {'z': 'c'}
        outcome = 'y'
        ipw_model = InverseProbabilityWeightedLS()

        # Each interval should cover the oracle effect computed directly from
        # the simulated potential outcomes, up to a 10% margin.
        atc_lower, atc_exp, atc_upper = ipw_model.estimate_ATC(X,
                                                               assignment,
                                                               outcome,
                                                               confounder_types,
                                                               propensity_score_name='propensity score')
        assert 0.9 * atc_lower <= (X[X['d'] == 0]['y1'] - X[X['d'] == 0]['y0']).mean() <= 1.1 * atc_upper

        att_lower, att_exp, att_upper = ipw_model.estimate_ATT(X,
                                                               assignment,
                                                               outcome,
                                                               confounder_types,
                                                               propensity_score_name='propensity score')
        assert 0.9 * att_lower <= (X[X['d'] == 1]['y1'] - X[X['d'] == 1]['y0']).mean() <= 1.1 * att_upper

        ate_lower, ate_exp, ate_upper = ipw_model.estimate_ATE(X,
                                                               assignment,
                                                               outcome,
                                                               confounder_types,
                                                               propensity_score_name='propensity score')
        assert 0.9 * ate_lower <= (X['y1'] - X['y0']).mean() <= 1.1 * ate_upper
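
# To run just this module (assuming a pytest-based workflow; the project may
# use a different test runner):
#     pytest tests/unit/parametric.py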