In [70]:
import numpy as np
import pandas as pd
from itertools import product
from sklearn.preprocessing import normalize

#### Participant simulation

First we'll create *n* participants in our experiment.

In [71]:
n = 5000

For each of these participants, we'll randomly assign them some (Compliance, Response) behavior combination.

In [72]:
compliance_partitions = [
    "always_taker",
    "complier",
    "defier",
    "never_taker",
]
response_partitions = [
    "always_better",
    "helped",
    "hurt",
    "never_better",
]

# Take the cross product of these sets of types

partition_types = product(
    compliance_partitions, response_partitions
)
partition_types = np.array(list(partition_types))

In [73]:
# We'll simulate probabilities that our participants
# will belong to one of the 16 possible behavior combinations

# I'm setting up a contrived example
# to prove my point with these arbitrary floats--
# trust me it's instructive!

arbitrary_floats = np.array(
    [
        [
            0,
            0,
            0,
            0.01,
            0,
            0.14,
            0,
            0.16,
            0.32,
            0,
            0.31,
            0,
            0.04,
            0.02,
            0,
            0,
        ]
    ]
)
partition_probabilities = normalize(
    arbitrary_floats, "l1"
)
partition_probabilities = (
    partition_probabilities.flatten()
)

# to be a true set of probabilities, the vector sum
# needs to be 1

# sometimes this can fail because of precision errors,
# so let's assert it

assert partition_probabilities.sum() == 1

In [74]:
# drawing participant compliance and response behaviors according to the
# specified distribution

participant_partition = np.random.choice(
    range(len(partition_types)),
    n,
    p=partition_probabilities,
)

compliance_response_pairs = zip(
    *partition_types[participant_partition]
)
compliance_type, response_type = list(
    compliance_response_pairs
)

# assigning participants to Control and Treatment groups
# with 50% probability

assignments = ["control", "treatment"]
participant_assignment = np.random.choice(
    assignments, n
)

# compiling all information into a dataframe
# that simulates the participants

df = pd.DataFrame(
    {
        "assignment": participant_assignment,
        "compliance_type": compliance_type,
        "response_type": response_type,
    }
)

### Simulate whether participants took treatment

Depending on assignment and compliance type, we can simulate whether or not each participant took the treatment.

In [75]:
# if the participant is an always_taker,
# they'll always take the treatment.

df["took_treatment"] = (
    df.compliance_type == "always_taker"
)

# if they're a complier, they'll take the treatment
# as long as they're in the treatment condition.

df["took_treatment"] = df["took_treatment"] | (
    (df.compliance_type == "complier")
    & (df.assignment == "treatment")
)

# if they're a defier, they'll only take the treatment
# if they were in the control condition.

df["took_treatment"] = df["took_treatment"] | (
    (df.compliance_type == "defier")
    & (df.assignment == "control")
)

### Simulate Outcomes

Now we can simulate outcomes from the experiment.

Depending on whether they took the treatment and their `response_type`, did they end up in a Good or Bad state after the experiment's conclusion?

In [76]:
# if the participant is of the always_better type,
# they'll definitely have a good outcome.

df["good_outcome"] = (
    df.response_type == "always_better"
)

# if the participant is of the 'helped' type,
# they'll have a good outcome as long as they
# took treatment.

df["good_outcome"] = df["good_outcome"] | (
    (df.response_type == "helped")
    & (df.took_treatment)
)

# if the participant is of the 'hurt' type,
# they'll have a good outcome as long as they
# did NOT take the treatment!

df["good_outcome"] = df["good_outcome"] | (
    (df.response_type == "hurt")
    & (~df.took_treatment)
)

# Otherwise, the outcome is going to be bad
# and the column will have a False value.

We can now observe the probabilities of each (Treatment, Outcome) combination that would emerge, conditional on the assignment.

In [77]:
df["n"] = 1
results = (
    df.groupby(
        [
            "assignment",
            "took_treatment",
            "good_outcome",
        ]
    )
    .count()
    .n
)
results = results.to_frame()
results["assignment_n"] = results.groupby(
    "assignment"
).transform("sum")

In [78]:
p_states = results.n / results.assignment_n
p_states = p_states.rename("P( X, Y | Z )")

# display
p_states.to_frame()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,"P( X, Y | Z )"
assignment,took_treatment,good_outcome,Unnamed: 3_level_1
control,False,False,0.327141
control,False,True,0.042003
control,True,False,0.318659
control,True,True,0.312197
treatment,False,False,0.026545
treatment,False,True,0.677496
treatment,True,False,0.16046
treatment,True,True,0.135499


### Intent-to-treat average treatment effect

All we need to do is sum up all 'good' from the Treatment column, and subtract by all 'good' from the Control column.


In [79]:
itt_ate = (
    df[
        df.assignment == "treatment"
    ].good_outcome.mean()
    - df[
        df.assignment == "control"
    ].good_outcome.mean()
)

# display
print("ATE: %6.4f" % itt_ate)

ATE: 0.4588


### Linear optimization

First we'll get all the probabilities we need: p(x,y|z) for each x,y,z combination:

In [80]:
states = list(
    product(["treatment", "control"], [0, 1], [0, 1])
)

get_conditional_probability = lambda assignment, took, outcome: (
    (
        df[df.assignment == assignment].took_treatment
        == took
    )
    & (
        df[df.assignment == assignment].good_outcome
        == outcome
    )
).mean()

# this terrible list comprehension
# collects p(x,y|z) for all 'states'
p_states = {
    f"{assignment}/"
    + f"{'treated' if took == 1 else 'untreated'}/"
    + f"{'good' if outcome ==1 else 'bad'}": get_conditional_probability(
        assignment, took, outcome
    )
    for assignment, took, outcome in states
}

In [81]:
p_states

{'treatment/untreated/bad': 0.026545166402535656,
 'treatment/untreated/good': 0.6774960380348652,
 'treatment/treated/bad': 0.16045958795562598,
 'treatment/treated/good': 0.13549920760697307,
 'control/untreated/bad': 0.327140549273021,
 'control/untreated/good': 0.0420032310177706,
 'control/treated/bad': 0.31865912762520193,
 'control/treated/good': 0.3121970920840065}

Now we can frame our linear programming problem.

We're maximizing our expression of an ATE given the constraints at hand.

In [82]:
import pulp

# first we set up 'problems', objects that
# can take an objective like 'maximize' as well as constraints

max_problem = pulp.LpProblem(
    "max ATE", pulp.LpMaximize
)

We can now specify variables, symbols that the linear program will know to vary in order to maximize or minimize our objective function. These variables are the distribution of **U**: the probabilities of belonging to one of the archetypal (Compliance, Response) partitions.

In [83]:
# Linear optimization ranges over possible variables
# to find those hidden variables that maximize
# or minimize our objective.

# Our variables 'q' are the the probability of
# being in one of the partitions

partition_names = [
    "/".join([compliance, response])
    for compliance, response in partition_types
]

q = {
    partition: pulp.LpVariable(partition, lowBound=0)
    for partition in partition_names
}

We can add constraints on these variables as well, by simply adding them to the problem we've framed!

In [84]:
# Since our vars are probabilities
# the sum of them should all be under 1.

# This '+=' operation is adding this sum constraint
# to the linear programming problem.

max_problem += sum([v for k, v in q.items()]) == 1

In [85]:
# Let's set the relationship between the distribution of U
# and the probabilities we observe. The variable name
# scheme is p_Z_X_Y


p_treatment_untreated_bad = (
    q["never_taker/never_better"]
    + q["defier/never_better"]
    + q["never_taker/helped"]
    + q["defier/helped"]
)

p_treatment_untreated_good = (
    q["never_taker/always_better"]
    + q["defier/always_better"]
    + q["never_taker/hurt"]
    + q["defier/hurt"]
)

p_treatment_treated_bad = (
    q["always_taker/never_better"]
    + q["complier/never_better"]
    + q["always_taker/hurt"]
    + q["complier/hurt"]
)

p_treatment_treated_good = (
    q["always_taker/always_better"]
    + q["complier/always_better"]
    + q["always_taker/helped"]
    + q["complier/helped"]
)

p_control_untreated_bad = (
    q["never_taker/never_better"]
    + q["complier/never_better"]
    + q["never_taker/helped"]
    + q["complier/helped"]
)

p_control_untreated_good = (
    q["never_taker/always_better"]
    + q["complier/never_better"]
    + q["never_taker/hurt"]
    + q["complier/hurt"]
)

p_control_treated_bad = (
    q["always_taker/never_better"]
    + q["defier/never_better"]
    + q["always_taker/hurt"]
    + q["defier/hurt"]
)

p_control_treated_good = (
    q["always_taker/always_better"]
    + q["defier/always_better"]
    + q["always_taker/helped"]
    + q["defier/helped"]
)

In [86]:
# I now apply these linear relationships between U
# and P(X,Y|Z) as constraints on the LP problem.

# Though there's a compact and elegant linear algebraic way
# of doing this, I'm being painfully explicit here because
# it's easier to understand (for me at least)

# These '+=' operations are new constraints I am 
# adding one-by-one to the 'max_problem' linear programming 
# problem.

max_problem += (
    p_treatment_untreated_bad
    == p_states["treatment/untreated/bad"]
)
max_problem += (
    p_treatment_treated_good
    == p_states["treatment/treated/good"]
)
max_problem += (
    p_treatment_treated_bad
    == p_states["treatment/treated/bad"]
)

max_problem += (
    p_control_untreated_bad
    == p_states["control/untreated/bad"]
)
max_problem += (
    p_control_treated_good
    == p_states["control/treated/good"]
)
max_problem += (
    p_control_treated_bad
    == p_states["control/treated/bad"]
)


# I leave some constraints out because it *over*constrains
# the problem and makes it impossible to solve
# It turns out that the other constraints actually imply
# these two since they are complimentary probabilities,
# so we can just leave them commented out

# max_problem += p_control_untreated_good == p_states['control/untreated/good']
# max_problem += p_treatment_untreated_good == p_states['treatment/untreated/good']

With constraints set, all that remains to do set the objective function, our ATE.

$$ATE = P(helped) - P(hurt)$$

where $P(helped)$ is the probability of being in a partition with a 'helped' response type and $P(hurt)$ is the probability of being in a partition with a 'hurt' response type.

Then we can just hit `solve()`.

In [95]:
max_problem += (
    q["complier/helped"]
    + q["defier/helped"]
    + q["always_taker/helped"]
    + q["never_taker/helped"]
    - q["complier/hurt"]
    - q["defier/hurt"]
    - q["always_taker/hurt"]
    - q["never_taker/hurt"]
)

In [97]:
pulp.LpStatus[max_problem.solve()]

'Infeasible'

The problem's been optimized, and now we can see what U looks like in the best case scenario.

In [93]:
q_max = {
    partition: pulp.value(partition_p)
    for partition, partition_p in q.items()
}

# display

pd.DataFrame(q_max, index=["p(U=u)"]).T

Unnamed: 0,p(U=u)
always_taker/always_better,0.0
always_taker/helped,0.0
always_taker/hurt,0.0
always_taker/never_better,0.0
complier/always_better,0.0
complier/helped,0.135499
complier/hurt,0.0
complier/never_better,0.16046
defier/always_better,0.312197
defier/helped,0.0


Given these we can calculate our best-case ATE.

In [94]:
ate = (
    lambda q: q["complier/helped"]
    + q["defier/helped"]
    + q["always_taker/helped"]
    + q["never_taker/helped"]
    - q["complier/hurt"]
    - q["defier/hurt"]
    - q["always_taker/hurt"]
    - q["never_taker/hurt"]
)

best_case_ate = ate(q_max)

# display
print("Best-Case ATE: %6.4f" % best_case_ate)

Best-Case ATE: -0.1520
