In [2]:
from pathlib import Path

import numpy as np
import pandas as pd
import plotly.express as px
from sklearn.linear_model import LinearRegression

pd.options.mode.copy_on_write = True
pd.options.future.infer_string = True
pd.options.plotting.backend = "plotly"

In [3]:
true_params = np.ones(6)
y_sd = 1.5
cov_type = "random"
mean = np.zeros(len(true_params))
meas_sds = np.linspace(0, 5, 10) # measurement error std
n_repetitions = 200
rng = np.random.default_rng(925408)
n_obs = 2_000

In [4]:
n_params = len(true_params)
    # Set up parameter names for plotting
names = [f"x_{i}" for i in range(len(true_params))]
    # Initialize list to which we will append DataFrames that are concatenated later
to_concat = []
for meas_sd in meas_sds:
        # Create a covariance matrix for the x variables
    if cov_type == "deterministic":
        cov = np.eye(n_params) + 0.2
    elif cov_type == "random":
            # Create a random but valid (i.e. symmetric positive semi-definite)
            # covariance matrix by multiplying a random matrix with its transpose
            # and adding 1 to the diagonal to improve conditioning
        helper = rng.uniform(low=-1, high=1, size=(n_params, n_params))
        cov = helper @ helper.T + np.eye(n_params)
    else:
        msg = f"Invalid cov_type: {cov_type}. Must be 'random' or 'deterministic.'"
        raise ValueError(
            msg,
        )

        # Set up a list to which we will append parameter estimates
    estimates = []
    for _ in range(n_repetitions):
            # Create independent variables
        x = rng.multivariate_normal(mean=mean, cov=cov, size=n_obs)
            # Draw error
        epsilon = rng.normal(loc=0, scale=y_sd, size=n_obs)
            # Calculate y (before adding measurement error!)
        y = x @ true_params + epsilon
            # Draw measurement error
        meas_error = rng.normal(loc=0, scale=meas_sd, size=n_obs)
            # Add measurement error
        x[:, 0] += meas_error
            # Calculate parameter estimates
        params = LinearRegression().fit(x, y).coef_
            # append them to the list of estimates
        estimates.append(params)

        # Set up empty DataFrame and add results we need for plotting
    df = pd.DataFrame()
    deviations = np.array(estimates) - true_params
    df["name"] = names
    df["bias"] = deviations.mean(axis=0)
    df["rmse"] = np.sqrt((deviations**2).mean(axis=0))
    df["meas_sd"] = meas_sd
    to_concat.append(df)

    # Concatenate the DataFrame
data = pd.concat(to_concat)

In [5]:
this_file_dir = Path(".").resolve()

In [6]:
data_path = this_file_dir / "bld" / "results.pkl"
data = pd.read_pickle(data_path)

In [7]:
deviations.shape

(200, 6)

In [8]:
data

Unnamed: 0,name,bias,rmse,meas_sd
0,x_0,0.002187,0.021335,0.0
1,x_1,-0.000761,0.025728,0.0
2,x_2,0.001714,0.023037,0.0
3,x_3,-0.002656,0.022499,0.0
4,x_4,4.4e-05,0.02494,0.0
5,x_5,0.000738,0.020006,0.0
0,x_0,-0.17705,0.178942,0.555556
1,x_1,0.016457,0.034241,0.555556
2,x_2,-0.035669,0.045298,0.555556
3,x_3,-0.050072,0.055041,0.555556


In [16]:
x = data.loc[data["name"] == "x_1", ["bias"]]
x = x.iloc[3:]
actual =x.reset_index(drop=True).values
all(sorted(actual, reverse=True) == actual)

False

In [35]:
biases = data["bias"]
biases = biases[data["name"] == "x_0" ]
biases
sorted(biases, reverse=True)

[0.002187126443792598,
 -0.17705035497314875,
 -0.3908949852058292,
 -0.6124660883780525,
 -0.7323237324328714,
 -0.7841479151109705,
 -0.8244326097873482,
 -0.8775639248925837,
 -0.9098643267718167,
 -0.9235730506124823]

In [34]:
biases = biases.reset_index(drop=True).values
biases

array([-0.17705035, -0.39089499, -0.61246609, -0.78414792, -0.73232373,
       -0.87756392, -0.82443261, -0.92357305, -0.90986433])

In [23]:
all(sorted(biases, reverse=True) == biases)

False

In [22]:
all(b < 0 for b in biases)

True

In [213]:
expected = -np.ones_like(actual)
np.testing.assert_array_almost_equal(np.round(actual),np.round(expected))

In [9]:
true_params = np.ones(6)
y_sd = 1.5
cov_type = "random"
mean = np.zeros(len(true_params))


'/Users/belacquator/epp/EPP-course-materials/assignment_3'

In [39]:
meas_sds

array([0.        , 0.55555556, 1.11111111, 1.66666667, 2.22222222,
       2.77777778, 3.33333333, 3.88888889, 4.44444444, 5.        ])

In [66]:
import pytest

In [72]:
def do_monte_carlo(true_params, y_sd, cov_type, mean, meas_sds, n_repetitions,seed, n_obs):
    """Run a Monte Carlo simulation for a multivariate linear regrassion to study the impact of measurement error in the first independent variable.
    
    Args:
        true_params (float): The true coefficients vector of regression model
        y_sd (float): The standard deviation of the error term, i.e. of dependent variable y
        cov_type (str): The type of covariance-matrix of independent variables,
        either "random" or "deterministic"
        mean (float): The expected value of independent variables
        meas_sds (float): The standard deviation of measurement error
        n_repetitions (int): Number of repetitions in the simulation
        seed (int): A random number generator seed
        n_obs (int): Number of observations

    Returns: 
        data (DataFrame): Simulation result of Bias, Root-Mean-Sqaure Deviation (rmse), and Standard Deviation of Measurement Error(meas_sd) for each independent x variable.

    Raises:
        ValueError: If invalid cov_type input is given.

    """
    rng = np.random.default_rng(seed)
    n_params = len(true_params)
    # Set up parameter names for plotting
    names = [f"x_{i}" for i in range(len(true_params))]
    # Initialize list to which we will append DataFrames that are concatenated later
    to_concat = []
    for meas_sd in meas_sds:
        # Create a covariance matrix for the x variables
        if cov_type == "deterministic":
            cov = np.eye(n_params) + 0.2
        elif cov_type == "random":
            # Create a random but valid (i.e. symmetric positive semi-definite)
            # covariance matrix by multiplying a random matrix with its transpose
            # every matrix UU.T is positive semidefinite
            # and adding 1 to the diagonal to improve conditioning
            # because adding 1 to the diagonal ensures that our matrix is
            # always invertible
            helper = rng.uniform(low=-1, high=1, size=(n_params, n_params))
            cov = helper @ helper.T + np.eye(n_params)
        else:
            msg = f"Invalid cov_type: {cov_type}. Must be 'random' or 'deterministic.'"
            raise ValueError(
                msg,
            )

        # Set up a list to which we will append parameter estimates
        estimates = []
        for _ in range(n_repetitions):
            # Create independent variables
            x = rng.multivariate_normal(mean=mean, cov=cov, size=n_obs)
            # Draw error
            # loc=mean, scale=std
            epsilon = rng.normal(loc=0, scale=y_sd, size=n_obs)
            # Calculate y (before adding measurement error!)
            y = x @ true_params + epsilon
            # Draw measurement error
            meas_error = rng.normal(loc=0, scale=meas_sd, size=n_obs)
            # Add measurement error
            x[:, 0] += meas_error
            # Calculate parameter estimates
            params = LinearRegression().fit(x, y).coef_
            # append them to the list of estimates
            estimates.append(params)

        # Set up empty DataFrame and add results we need for plotting
        df = pd.DataFrame()
        deviations = np.array(estimates) - true_params
        df["name"] = names
        df["bias"] = deviations.mean(axis=0)
        df["rmse"] = np.sqrt((deviations**2).mean(axis=0))
        df["meas_sd"] = meas_sd
        to_concat.append(df)

    # Concatenate the DataFrame
    data = pd.concat(to_concat)
    return data

In [75]:

@pytest.fixture
def inputs():
    return {
    "true_params" : np.ones(6),
    "y_sd" : 1.5,
    "cov_type" : "random",
    "mean" : np.zeros(len(true_params)),
    "meas_sds" : np.linspace(0, 5, 10),
    "n_repetitions" : 200,
    "seed" : 925408,
    "n_obs" : 2_000,
    }
def test_monte_carlo_x0_parameter_biased_towards_zero(do_monte_carlo, inputs):
      data = do_monte_carlo(*inputs)
      
      return data

In [86]:
np.zeros(8)

array([0., 0., 0., 0., 0., 0., 0., 0.])

In [158]:
len(np.ones(6))

6

In [159]:
meas_sds

array([0.        , 0.55555556, 1.11111111, 1.66666667, 2.22222222,
       2.77777778, 3.33333333, 3.88888889, 4.44444444, 5.        ])

In [208]:
np.linspace(0, 50, 10)

array([ 0.        ,  5.55555556, 11.11111111, 16.66666667, 22.22222222,
       27.77777778, 33.33333333, 38.88888889, 44.44444444, 50.        ])

In [214]:
meas_sds[3:]

array([1.66666667, 2.22222222, 2.77777778, 3.33333333, 3.88888889,
       4.44444444, 5.        ])

In [17]:
c = 4398543

In [18]:
hash(c)

4398543

In [24]:
def _fail_if_seed_not_hashable(seed):
    try: 
        hash(seed)
    except TypeError:
        raise

In [25]:
_fail_if_seed_not_hashable([1,2])

TypeError: unhashable type: 'list'

In [31]:
def _fail_if_cov_type_not_valid(str):
    if not (str == 'deterministic' or str == 'random'):
        report = f"Invalid cov_type: {str}. Must be 'random' or 'deterministic'"
        raise ValueError(
                report,
            )
    

In [34]:
_fail_if_cov_type_not_valid('deterministic')

In [35]:
def _fail_if_parameters_not_numerical(sr):
    if np.any([isinstance(i,str) for i in sr]):
        report = "Parameter cannot be a string."
        raise TypeError(report)
    

In [37]:
import numpy as np

In [39]:
_fail_if_parameters_not_numerical(3)

TypeError: 'int' object is not iterable

In [42]:
c = "3"

In [43]:
c.isnumeric()

True