# Causal Analysis Pipeline

This notebook was automatically generated from a PyAutoCausal pipeline execution.




## Executable Graph

```mermaid
graph TD
    node0[df]
    node1{multi_period}
    node2[stand_spec]
    node3[ols_stand]
    node4[ols_stand_output]
    node5[did_spec]
    node6{multi_treated_units}
    node7[ols_did]
    node8[save_ols_did]
    node9[synth_control]
    node10[wols_did_synth]
    node11[save_wols_did_synth]
    node0 --> node1
    node1 -->|False| node2
    node1 -->|True| node5
    node2 --> node3
    node3 --> node4
    node5 --> node6
    node6 -->|True| node7
    node6 -->|False| node9
    node7 --> node8
    node9 --> node10
    node10 --> node11

    %% Node styling
    classDef pendingNode fill:lightblue,stroke:#3080cf,stroke-width:2px,color:black;
    classDef runningNode fill:yellow,stroke:#3080cf,stroke-width:2px,color:black;
    classDef completedNode fill:lightgreen,stroke:#3080cf,stroke-width:2px,color:black;
    classDef failedNode fill:salmon,stroke:#3080cf,stroke-width:2px,color:black;
    style node0 fill:lightgreen,stroke:#3080cf,stroke-width:2px,color:black
    style node1 fill:lightgreen,stroke:#3080cf,stroke-width:2px,color:black
    style node2 fill:lightblue,stroke:#3080cf,stroke-width:2px,color:black
    style node3 fill:lightblue,stroke:#3080cf,stroke-width:2px,color:black
    style node4 fill:lightblue,stroke:#3080cf,stroke-width:2px,color:black
    style node5 fill:lightgreen,stroke:#3080cf,stroke-width:2px,color:black
    style node6 fill:lightgreen,stroke:#3080cf,stroke-width:2px,color:black
    style node7 fill:lightgreen,stroke:#3080cf,stroke-width:2px,color:black
    style node8 fill:lightgreen,stroke:#3080cf,stroke-width:2px,color:black
    style node9 fill:lightblue,stroke:#3080cf,stroke-width:2px,color:black
    style node10 fill:lightblue,stroke:#3080cf,stroke-width:2px,color:black
    style node11 fill:lightblue,stroke:#3080cf,stroke-width:2px,color:black
```

## Node Legend

### Node Types
```mermaid
graph LR
    actionNode[Action Node] ~~~ decisionNode{Decision Node}
    style actionNode fill:#d0e0ff,stroke:#3080cf,stroke-width:2px,color:black
    style decisionNode fill:#d0e0ff,stroke:#3080cf,stroke-width:2px,color:black
```

### Node States
```mermaid
graph LR
    pendingNode[Pending]:::pendingNode ~~~ runningNode[Running]:::runningNode ~~~ completedNode[Completed]:::completedNode ~~~ failedNode[Failed]:::failedNode

    classDef pendingNode fill:lightblue,stroke:#3080cf,stroke-width:2px,color:black;
    classDef runningNode fill:yellow,stroke:#3080cf,stroke-width:2px,color:black;
    classDef completedNode fill:lightgreen,stroke:#3080cf,stroke-width:2px,color:black;
    classDef failedNode fill:salmon,stroke:#3080cf,stroke-width:2px,color:black;
```

Node fill color indicates each node's execution status in the graph: light blue = pending, yellow = running, light green = completed, salmon = failed.


In [None]:
from specifications import BaseSpec
from pyautocausal.persistence.parameter_mapper import make_transformable
import statsmodels.api as sm
from pyautocausal.pipelines.library.specifications import validate_and_prepare_data
import numpy as np
import pandas as pd
from typing import Optional
from typing import List
from statsmodels.base.model import Results
from pyautocausal.pipelines.library.specifications import DiDSpec
import io

## Node: df


In [None]:
# TODO: Load your input data for 'df' here.
# The `create_did_specification` cell below is called with its default column
# names, so unless you change that call the frame needs the columns
# 'y' (outcome), 'treat' (treatment), 't' (time) and 'id_unit' (unit id).
df_output = None  # Placeholder only: replace with your data before running the cells below

## Node: did_spec


In [None]:
# This node uses a wrapper function that calls a target function with adapted arguments
# Argument mapping: 'df' → 'data'

def create_did_specification(
    data: pd.DataFrame,
    outcome_col: str = 'y',
    treatment_cols: Optional[List[str]] = None,
    time_col: str = 't',
    unit_col: str = 'id_unit',
    post_col: Optional[str] = None,
    treatment_time_col: Optional[str] = None,
    include_unit_fe: bool = True,
    include_time_fe: bool = True,
    control_cols: Optional[List[str]] = None
) -> DiDSpec:
    """
    Create a difference-in-differences (DiD) specification.

    Derives a post-treatment indicator when none is supplied, adds a
    ``treat_post`` interaction column, and builds the regression formula.

    Args:
        data: DataFrame with outcome, treatment, time, and unit identifiers
        outcome_col: Name of outcome column
        treatment_cols: Names of treatment columns; defaults to ``['treat']``.
            Only the first entry is used to build the post indicator and the
            interaction term.
        time_col: Name of time column
        unit_col: Name of unit identifier column
        post_col: Name of post-treatment indicator column. When ``None`` a
            ``'post'`` column is created (overwriting any existing column
            with that name).
        treatment_time_col: Name of treatment timing column; only consulted
            when ``post_col`` is ``None``
        include_unit_fe: Whether to include unit fixed effects
        include_time_fe: Whether to include time fixed effects
        control_cols: List of control variable columns

    Returns:
        DiDSpec object with DiD specification information

    Note:
        The ``'post'`` and ``'treat_post'`` columns are written to the frame
        returned by ``validate_and_prepare_data``. Whether that frame is a
        copy of the caller's ``data`` depends on that helper -- TODO confirm.
    """
    # Resolve the default here rather than in the signature: a list literal
    # default is a single object shared by every call (and by every returned
    # spec that stores it), so one mutation would leak into later calls.
    if treatment_cols is None:
        treatment_cols = ['treat']

    # TODO: Use first treatment column for now (may extend to multiple in future)
    treatment_col = treatment_cols[0]

    # Validate and prepare data
    data, control_cols = validate_and_prepare_data(
        data=data,
        outcome_col=outcome_col,
        treatment_cols=treatment_cols,
        required_columns=[time_col, unit_col],
        control_cols=control_cols,
        excluded_cols=[time_col, unit_col]
    )

    # Create post-treatment indicator if not provided
    if post_col is None:
        if treatment_time_col is not None:
            # Post indicator based on each row's own treatment timing
            data['post'] = (data[time_col] >= data[treatment_time_col]).astype(int)
        else:
            # Infer a single start period: the earliest time at which any
            # row has treatment == 1
            treat_start = data[data[treatment_col] == 1][time_col].min()
            # No treated rows -> min() is NaN -> nothing is ever "post"
            data['post'] = (data[time_col] >= treat_start).astype(int) if pd.notna(treat_start) else 0
        post_col = 'post'

    # Create interaction term
    data['treat_post'] = data[treatment_col] * data[post_col]

    # Construct formula
    formula_parts = [outcome_col, "~", "treat_post"]

    if include_unit_fe or include_time_fe:
        # Model with fixed effects: the treatment and post main effects are
        # left out and only the requested C(...) terms are added
        if include_unit_fe:
            formula_parts.append("+ C(" + unit_col + ")")
        if include_time_fe:
            formula_parts.append("+ C(" + time_col + ")")
    else:
        # Base model: interaction plus both main effects
        formula_parts.extend(["+", treatment_col, "+", post_col])

    # Controls are appended the same way for both model variants
    if control_cols:
        formula_parts.append("+ " + " + ".join(control_cols))

    formula = " ".join(formula_parts)

    # Create and return specification
    return DiDSpec(
        outcome_col=outcome_col,
        treatment_cols=treatment_cols,
        time_col=time_col,
        unit_col=unit_col,
        post_col=post_col,
        include_unit_fe=include_unit_fe,
        include_time_fe=include_time_fe,
        control_cols=control_cols,
        data=data,
        formula=formula
    )


In [None]:
# Build the DiD specification from the loaded frame; every other argument keeps its default.
did_spec_output = create_did_specification(data=df_output)

## Node: ols_did


In [None]:
# This node uses a wrapper function that calls a target function with adapted arguments
# Argument mapping: 'did_spec' → 'spec'

def fit_ols(spec: BaseSpec, weights: Optional[np.ndarray] = None) -> Results:
    """
    Fit the regression described by ``spec`` and return the fitted results.

    Ordinary least squares is used when no weights are supplied, weighted
    least squares otherwise. Both variants are fitted with
    ``cov_type='HC1'``.

    Args:
        spec: A specification object exposing ``data`` and ``formula``
        weights: Optional sample weights; ``None`` selects the unweighted fit

    Returns:
        Fitted statsmodels regression results
    """
    frame = spec.data
    model_formula = spec.formula

    # Pick the estimator first, then fit once with the shared covariance option
    if weights is None:
        unfitted = sm.OLS.from_formula(model_formula, data=frame)
    else:
        unfitted = sm.WLS.from_formula(model_formula, data=frame, weights=weights)

    return unfitted.fit(cov_type='HC1')


In [None]:
# Fit the model for the DiD specification; no `weights` are passed, so the OLS branch is used.
ols_did_output = fit_ols(spec=did_spec_output)

## Node: save_ols_did


In [None]:
# This node uses a wrapper function that calls a target function with adapted arguments
# Argument mapping: 'ols_did' → 'res'

def write_statsmodels_summary(res: Results) -> str:
    """Return the text form of ``res.summary()`` as a single string.

    Args:
        res: Object exposing a ``summary()`` method

    Returns:
        ``str(res.summary())``
    """
    summary_text = str(res.summary())
    return summary_text


In [None]:
# Convert the fitted model's summary into a plain string.
save_ols_did_output = write_statsmodels_summary(res=ols_did_output)

In [None]:
# Display Result
# `save_ols_did_output` already holds the rendered summary text, so print it
# directly; `write_statsmodels_summary_notebook` is not defined or imported in
# this notebook and calling it raises NameError.
print(save_ols_did_output)