In [77]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm

%matplotlib inline

In [106]:
class NotFittedError(Exception):
    pass


class StructuralCausalModel:
    def __init__(
        self,
        structure:dict,
        size:int
    ):
        self.structure = structure
        self.size = size
        self.noise_terms = [
            target 
            for (target, var_name) in self.structure.items()
            if not var_name
        ]
        self.model_params = dict()        
        
    def fit(self, variables:dict):
        if (
            set(self.structure.keys()) - set(variables.keys())
        ):
            raise ValueError(
                "Please provide all the variables required from the provided structure"
            )
        if (
            set(variables.keys()) -  set(self.structure.keys())
        ):
            raise ValueError(
                "Please provide an appropriate structure that mapps to the provided variables"
            )       
        assert all(
            map(
                lambda a: a.shape[0] == self.size, 
                variables.values()
            )
        ), "All provided variables should have the same length"
        
        for (target, vars_) in self.structure.items():
            if target in self.noise_terms:
                continue
            Y = variables.get(target).reshape(-1,1)
            var_names = [var_ for var_ in vars_]
            X = np.concatenate(
                [variables.get(var_).reshape(-1,1) for var_ in vars_], 
                axis=1
            )
            X = sm.add_constant(X)
            model = sm.OLS(Y, X)
            result = model.fit()
            
            self.model_params[target] = {
                **{"Intercept": result.params[0]},
                **{
                    var_: coef 
                    for (var_, coef) in zip(var_names, result.params[1:])
                }
                    
            }
    
    def sample(self, sample_size:int):
        if not self.model_params:
            raise NotFittedError(
                "Please fit the SCM prior sampling"
            )
            
        random_sample_eps_A = np.random.normal(0, 0.2, size=sample_size)
        random_sample_eps_X = np.random.normal(0, 0.2, size=sample_size)
        random_sample_eps_Y = np.random.normal(0, 0.2, size=sample_size)        
        A = self.model_params["A"].get("Intercept") + self.model_params["A"].get("eps_A") * random_sample_eps_A
        X = (
            self.model_params["X"].get("Intercept") + self.model_params["X"].get("A") * A +
            self.model_params["X"].get("eps_X") * random_sample_eps_X
        )
        Y = (
            self.model_params["Y"].get("Intercept") + 
            self.model_params["Y"].get("A") * A +
            self.model_params["X"].get("A") * A +            
            self.model_params["Y"].get("eps_Y") * random_sample_eps_Y
        )     
        return Y
            
        
        
        
structure = {
    "Y": ["X", "A", "eps_Y"],
    "X": ["A", "eps_X"],
    "A": ["eps_A"],
    "eps_A": [],
    "eps_X": [],
    "eps_Y": []
}

SIZE = 10_000
eps_A = np.random.normal(0, 0.2, size=SIZE)
eps_X = np.random.normal(0, 0.2, size=SIZE)
eps_Y = np.random.normal(0, 0.2, size=SIZE)
A = eps_A
X = 2 * A + eps_X
Y = 3 * A + 5 * X + eps_Y

variables = {
    "A": A,
    "X": X,
    "Y": Y,
    "eps_A": eps_A,
    "eps_X": eps_X,
    "eps_Y": eps_Y    
}


scm = StructuralCausalModel(structure, size=SIZE)
scm.fit(variables)
scm.sample(20)

array([-0.53925845,  0.95247253,  0.3698768 , -0.15743349, -0.12420357,
        0.43597521,  0.12822501,  0.68243007,  0.79439103,  0.5929755 ,
        0.82637788,  0.7430315 ,  0.50729524, -2.03583735,  1.56329213,
        2.24197555,  0.90099852, -1.65609656, -2.88262151,  0.82773391])