# Functions for neogrowth model

## Description

store functions for the file neogrowth_model.ipynb

## I. Time iteration

Time iteration code is from QuantEcon website.

Source: https://python.quantecon.org/coleman_policy_iter.html


In [2]:
def v_star(y, α, β, μ):
    """
    True value function
    """
    c1 = np.log(1 - α * β) / (1 - β)
    c2 = (μ + α * np.log(α * β)) / (1 - α)
    c3 = 1 / (1 - β)
    c4 = 1 / (1 - α * β)
    return c1 + c2 * (c3 - c4) + c4 * np.log(y)

def σ_star(y, α, β):
    """
    True optimal policy
    """
    return (1 - α * β) * y

In [3]:
opt_growth_data = [
    ('α', float64),          # Production parameter
    ('β', float64),          # Discount factor
    ('μ', float64),          # Shock location parameter
    ('s', float64),          # Shock scale parameter
    ('grid', float64[:]),    # Grid (array)
    ('shocks', float64[:])   # Shock draws (array)
]

@jitclass(opt_growth_data)
class OptimalGrowthModel:

    def __init__(self,
                α=0.4, 
                β=0.96, 
                μ=0,
                s=0.5,
                grid_max=4,
                grid_size=120,
                shock_size=250,
                seed=1234):

        self.α, self.β, self.μ, self.s = α, β, μ, s

        # Set up grid
        self.grid = np.linspace(1e-5, grid_max, grid_size)

        # Store shocks (with a seed, so results are reproducible)
        np.random.seed(seed)
        self.shocks = np.exp(μ + s * np.random.randn(shock_size))
       

    def f(self, k):
        "The production function"
        return k**self.α
       

    def u(self, c):
        "The utility function"
        return np.log(c)

    def f_prime(self, k):
        "Derivative of f"
        return self.α * (k**(self.α - 1))


    def u_prime(self, c):
        "Derivative of u"
        return 1/c

    def u_prime_inv(self, c):
        "Inverse of u'"
        return 1/c



In [4]:
@njit
def euler_diff(c, σ, y, og):
    """
    Set up a function such that the root with respect to c,
    given y and σ, is equal to Kσ(y).

    """

    β, shocks, grid = og.β, og.shocks, og.grid
    f, f_prime, u_prime = og.f, og.f_prime, og.u_prime

    # First turn σ into a function via interpolation
    σ_func = lambda x: interp(grid, σ, x)

    # Now set up the function we need to find the root of.
    vals = u_prime(σ_func(f(y - c) * shocks)) * f_prime(y - c) * shocks
    return u_prime(c) - β * np.mean(vals)

In [5]:
@njit
def K(σ, og):
    """
    The Coleman-Reffett operator

     Here og is an instance of OptimalGrowthModel.
    """

    β = og.β
    f, f_prime, u_prime = og.f, og.f_prime, og.u_prime
    grid, shocks = og.grid, og.shocks

    σ_new = np.empty_like(σ)
    for i, y in enumerate(grid):
        # Solve for optimal c at y
        c_star = brentq(euler_diff, 1e-12, y-1e-12, args=(σ, y, og))[0]
        σ_new[i] = c_star

    return σ_new


In [6]:
def solve_model_time_iter(model,    # Class with model information
                          σ,        # Initial condition
                          tol=1e-8,
                          max_iter=10000,
                          verbose=True,
                          print_skip=25):

    # Set up loop
    i = 0
    error = tol + 1

    while i < max_iter and error > tol:
        σ_new = K(σ, model)
        error = np.max(np.abs(σ - σ_new))
        i += 1
        if verbose and i % print_skip == 0:
            print(f"Error at iteration {i} is {error}.")
        σ = σ_new

    if error > tol:
        print("Failed to converge!")
    elif verbose:
        print(f"\nConverged in {i} iterations.")

    return σ_new

## II. All-in-One

$$\mathcal{L}_2(\theta) = \frac{1}{T} \sum_{t=1}^{T} \Big( f(s_t,\epsilon_{1,t}|\theta) f(s_t,\epsilon_{2,t}|\theta) \Big) $$


In [7]:
def sparse_mx_to_torch_sparse_tensor(sparse_mx):
    """Convert a scipy sparse matrix to a torch sparse tensor."""
    sparse_mx = sparse_mx.tocoo().astype(np.float32)
    indices = torch.from_numpy(
        np.vstack((sparse_mx.row, sparse_mx.col)).astype(np.int64))
    values = torch.from_numpy(sparse_mx.data)
    shape = torch.Size(sparse_mx.shape)
    return torch.sparse.FloatTensor(indices, values, shape) 

def convert_sparse_matrix_to_sparse_tensor(X):
    """Convert a scipy sparse matrix to a tf sparse tensor."""
    coo = X.tocoo()
    indices = np.mat([coo.row, coo.col]).transpose()
    return tf.SparseTensor(indices, coo.data, coo.shape)

# Root mean square error
class RMSELoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()
        
    def forward(self,yhat,y):
        return torch.sqrt(self.mse(yhat,y))
    
# Gaussian quadrature rule
# See: https://chaospy.readthedocs.io/en/master/api/chaospy.generate_quadrature.html
def dist(order, distribution, rule = "gaussian", sp=True):
    #order=int(n**(1/d))-1
    x, w = chaospy.generate_quadrature(order, distribution, rule=(rule), sparse=sp)
    return x, w

def create_W_expanded_matrix(M, N):
    """
    create a sparse matrix W_expanded with U repeate M times on the diagonal elements
    where U is an upper triangular matrix with 0 on the diagonal and 1 on the other upper elements
    W_expanded is a sparse torch matrix
    """
    A_expanded = np.ones((N, N))
    U = np.triu(A_expanded) # upper trianguler matrix of ones
    np.fill_diagonal(U, 0) #fill diagonal with 0
    U = sparse.csr_matrix(U) # convert to sparse
    # Unity matrix of size (M*M)
    B = sparse.csr_matrix(np.eye(M, M))
    W_expanded = sparse_mx_to_torch_sparse_tensor(sparse.kron(B, U))
    return W_expanded

In [12]:
def Ξ_torch(model, params): # objective function if using all-in-one

    # randomly drawing current states    
    # If normal
    #x = torch.normal(mean=0, std=params.σ_x, size=(params.T,)).unsqueeze(1)
    #if Uniform
    if params.x_distribution == "Uniform":
        if params.use_Sobol_T == False:
            x = ((params.x_low - params.x_high) * torch.rand(params.T) + params.x_high).unsqueeze(1)
        else:
            #Very slow if T is large
            x = ((params.x_low - params.x_high) * params.soboleng.draw(params.T) + params.x_high)
    else:
        x = torch.normal(mean=0, std=params.σ_x, size=(params.T,)).unsqueeze(1)

    # randomly drawing 1st realization for shocks    
    if params.e_distribution == "Normal":
        e1 = torch.normal(mean=0, std=params.σ_e, size=(params.T,)).unsqueeze(1)
        # randomly drawing 2nd realization for shocks
        e2 = torch.normal(mean=0, std=params.σ_e, size=(params.T,)).unsqueeze(1)
    elif params.e_distribution == "T":
        m = torch.distributions.studentT.StudentT(torch.tensor([3.0]))
        e1 = m.sample([params.T])
        e2 = m.sample([params.T])
    elif params.e_distribution == "Lognormal":
        # Standard Normal iid shocks
        e1 = torch.normal(mean=0, std=params.σ_e, size=(params.T,)).unsqueeze(1)
        # randomly drawing 2nd realization for shocks
        e2 = torch.normal(mean=0, std=params.σ_e, size=(params.T,)).unsqueeze(1)
        # Transform to lognormlaparams.σ_e
        e1 = np.exp(params.μ_e + params.σ_e * e1)
        e2 = np.exp(params.μ_e + params.σ_e * e2)
    else:
        raise("Distribution unknown.")
    
    # residuals for n random grid points under 2 realizations of shocks
    R1 = Residuals_torch(model, params, x, e1)
    R2 = Residuals_torch(model, params, x, e2)

    # construct all-in-one expectation operator
    R_squared = R1*R2
    
    # V1. give a summary of all the draws:
    return torch.mean(R_squared)

def Residuals_torch(model, params, y: Vector, e_r: Vector):
    # consumption today
    c = model(y)
    LHS = params.u_prime(c)
    
    # implies some investment
    investment = y - c
    # investment scaled by shock
    state_tomorrow = params.f(investment)*e_r
    
    # consumption tomorrow
    # c_tomorrow = c_share_tomorrow*state_tomorrow 
    c_tomorrow = model(state_tomorrow)
    vals = params.u_prime(c_tomorrow) * params.f_prime(investment)*e_r
    RHS = params.β * vals
    #V1
    #R = (RHS - LHS)
    
    #V2: good scaling. Default choice.
    R = (RHS - LHS)/(0.5*RHS + 0.5*LHS)
        
    return R



NameError: name 'Vector' is not defined

## III. bc-MC operator

* Implement the developed formula

$$ \frac{1}{M} \frac{2}{(N)(N-1)} \sum_{m=1}^{M} \sum_{1\leq i < j}^{n} f(s_m, \epsilon_{m}^{(i)})f(s_m, \epsilon_{m}^{(j)})  $$

* This formula can be vectorized as follows:


$$ f' \Big(I_N \otimes U\Big). f $$

In [9]:
def Ξ_torch_MC(model, params): # objective function

    # Monte Carlo outside
    # randomly drawing current states  
    # if Normal
    #x = torch.normal(mean=0, std=params.σ_x, size=(params.M,)).unsqueeze(1)
    if params.x_distribution == "Uniform":
        #if Sobol
        if params.use_Sobol == True:
            x = ((params.x_low - params.x_high) * params.soboleng.draw(params.M) + params.x_high)
        else:
            x = ((params.x_low - params.x_high) * torch.rand(params.M) + params.x_high).unsqueeze(1)
    else:
        x = torch.normal(mean=0, std=params.σ_x, size=(params.M,)).unsqueeze(1)
        
    # repeat elements N times
    #print(x.shape)
    x_repeated = x.repeat_interleave(params.N).unsqueeze(1) #MN*1 matrix
    
    # V1. 
    # Monte Carlo inside
    # N for each value today
    # randomly drawing 1st realization for shocks 
    if params.e_distribution == "Normal":
        e_shock = torch.normal(mean=0, std=params.σ_e, size=(params.MN,)).unsqueeze(1)
    elif params.e_distribution == "T":
        m = torch.distributions.studentT.StudentT(torch.tensor([3.0]))
        e_shock = m.sample([params.MN]).squeeze(1)
    elif params.e_distribution == "Lognormal":
        # Standard Normal iid shocks
        e_shock = torch.normal(mean=0, std=params.σ_e, size=(params.MN,)).unsqueeze(1)
        # Transform to lognormlaparams.σ_e
        e_shock = np.exp(params.μ_e + params.σ_e * e_shock)
    else:
        raise("Distribution unknown.")
        
    #V2 For replicability reasons
    #Gives exactly the  All-in-One for N =2
    #Draw 2 series of shock and reorganize the order of shocks. 
    """
    if params.e_distribution == "Normal":
        e1 = torch.normal(mean=0, std=params.σ_e, size=(params.T,)).unsqueeze(1)
        # randomly drawing 2nd realization for shocks
        e2 = torch.normal(mean=0, std=params.σ_e, size=(params.T,)).unsqueeze(1)
    elif params.e_distribution == "T":
        m = torch.distributions.studentT.StudentT(torch.tensor([3.0]))
        e1 = m.sample([params.T])
        e2 = m.sample([params.T])
    elif params.e_distribution == "Lognormal":
        # Standard Normal iid shocks
        e1 = torch.normal(mean=0, std=params.σ_e, size=(params.T,)).unsqueeze(1)
        # randomly drawing 2nd realization for shocks
        e2 = torch.normal(mean=0, std=params.σ_e, size=(params.T,)).unsqueeze(1)
        # Transform to lognormlaparams.σ_e
        e1 = np.exp(params.μ_e + params.σ_e * e1)
        e2 = np.exp(params.μ_e + params.σ_e * e2)
    else:
        raise("Distribution unknown.")
    
    e_shock = torch.column_stack((e1, e2)).reshape(params.MN, 1) #recombine the 2 series of shocks
    """
    
    # residuals for n random grid points under 2 realizations of shocks
    R1 = Residuals_torch(model, params, x_repeated, e_shock).squeeze(1)
    R_squared = torch.mean((2/((params.M)*(params.N)*(params.N - 1)))*torch.matmul(R1.unsqueeze(1).t(), torch.matmul(params.W_expanded, R1.unsqueeze(1))))
   
    return R_squared 


## Optimal choice of training parameters

### Monte Carlo estimator: optimal choice of M and N


In [1]:
def compute_grad_norm(parameters, norm_type=2.0):
    """
    Compute norm over gradients of model parameters.

    :param parameters:
        the model parameters for gradient norm calculation. Iterable of
        Tensors or single Tensor
    :param norm_type:
        type of p-norm to use

    :returns:
        the computed gradient norm
    """
    if isinstance(parameters, torch.Tensor):
        parameters = [parameters]
    parameters = [p for p in parameters if p is not None and p.grad is not None]
    total_norm = 0
    for p in parameters:
        param_norm = p.grad.data.norm(norm_type)
        total_norm += param_norm.item() ** norm_type
    return total_norm ** (1.0 / norm_type) 

# Calculate the variance of the loss for several choices of M and N
# Start with a fresh model. Then train it for some iterations.
# Then calculate variance of loss
def calculate_variance_loss(params, nb_epoch_opt_M_N, nb_rep, 
                            nb_draws_loss, grid_M, grid_N,
                            norm_chosen = 2.0, freq_accuracy=1000, 
                            calculate_variance_gradient = False,
                            initial_guess = [1.0, 1.0]):
    # To store results
    df_variance_loss = pd.DataFrame()
    print("Norm chosen: {}".format(norm_chosen))
    
    for k in range(0, nb_rep):
        #-----------------------------------------
        # A. Train the model for nb_epoch_opt_M_N
        #-----------------------------------------
        # Initialize a network
        print("Rep {} / {}. Training a model for {} Iterations".format(int(k), nb_rep, nb_epoch_opt_M_N))
        model_opt_M_N = NeuralNetwork().to(device) # New model
        
        # Set initial value
        set_initial_values(model_opt_M_N, initial_guess[0], initial_guess[1])
        
        # Training mode
        model_opt_M_N.train()

        if params.optimizer == "Adam":
            optimizer = torch.optim.Adam(model_opt_M_N.parameters(), lr=params.lr, eps=1e-07, betas=(0.9, 0.999)) 
        elif params.optimizer == "SGD":
            optimizer = torch.optim.SGD(model_opt_M_N.parameters(), params.lr)
        elif params.optimizer == "SWA":
            base_opt = torch.optim.Adam(model_opt_M_N.parameters(), lr=params.lr, eps=1e-07, betas=(0.9, 0.999)) 
            optimizer = SWA(base_opt, swa_start=params.swa_start, swa_freq=params.swa_freq, swa_lr=params.lr)
        else:
            raise("optimizer unknown")

        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=params.freq_gamma)
        list_perc_abs_error_opt_M_N = [] #store abs value percentage error
        list_perc_abs_error_opt_M_N_i = []
        loss_epochs_opt_M_N = torch.zeros(nb_epoch_opt_M_N)

        for i in range(0, nb_epoch_opt_M_N):
            
            optimizer.zero_grad() #clear gradient
            
            loss = Ξ_torch_MC(model_opt_M_N, params) #loss 
            loss_epochs_opt_M_N[[i]] = float(loss.item())

            # Backpropagation
            loss.backward()
            optimizer.step()

            if i % freq_accuracy == 0: #Monitor the predictive power
                # Define the grid
                with torch.no_grad():
                    xvec = params.xvec_test_torch #add a dimension
                    y = model_opt_M_N(xvec) 
                xvec = xvec.detach().numpy()
                y = y.detach().numpy()
                perc_abs_error = 100*(np.abs((y - params.true_function(xvec))/params.true_function(xvec)))
                list_perc_abs_error_opt_M_N.append(np.median(perc_abs_error))
                list_perc_abs_error_opt_M_N_i.append(i)
            if i % 1000 == 0:
                loss, current = float(loss.item()), i
                print(f"loss: {loss:>7f}, median percentage euler error {list_perc_abs_error_opt_M_N[-1]:>7f}, [{current:>5d}/{nb_epoch_opt_M_N:>5d}]")
            if (i % params.freq_scheduler == 0) & (i != 0) & (params.use_scheduler == True):
                scheduler.step()
                print("i : {}. Decreasing learning rate: {}".format(i, scheduler.get_last_lr()))

        if params.optimizer == "SWA":
            optimizer.swap_swa_sgd()
        #--------------------------------------
        # B. Calculate the variance of the loss
        #--------------------------------------
        print("Rep {} / {}. Calculting variance loss for {} Iterations".format(int(k), nb_rep, nb_draws_loss))
    
        model_opt_M_N.eval()
        var_loss = torch.zeros(len(grid_N))
        std_loss = torch.zeros(len(grid_N))
        mean_loss = torch.zeros(len(grid_N))

        # Loop over choice of N and M
        for (ind, (N_chosen, M_chosen)) in enumerate(zip(grid_N.astype('int'), grid_M.astype('int'))):
            # Change M and N
            params_local = MyParams(N_chosen, M_chosen, params.lr, 
                                    params.pre_train_model, params.nb_epochs,
                                    params.order_gauss, params.σ_e, params.use_Sobol, 
                                    params.optimizer)

            with torch.no_grad():        
                Xms = torch.zeros(nb_draws_loss)
                # Loop over realizations of loss function
                for (j_index, j) in enumerate(range(0, nb_draws_loss)):
                    Xms[j] = Ξ_torch_MC(model_opt_M_N, params_local)
                # Calculate mean and variance:
                var_loss[ind] = torch.var(Xms)
                std_loss[ind] = torch.sqrt(var_loss[ind])
                mean_loss[ind] = torch.mean(Xms)
        #-----------------------------------------------
        # C. Calculate variance of the norm the gradient
        #-----------------------------------------------
        print("Rep {} / {}. Calculting variance norm gradient for {} Iterations".format(int(k), nb_rep, nb_draws_loss))
        var_gradient_loss = torch.zeros(len(grid_N )) #variance 
        std_gradient_loss = torch.zeros(len(grid_N )) #standard deviation
        mean_gradient_loss = torch.zeros(len(grid_N ))
        
        if calculate_variance_gradient == True:
            for (ind, (N_chosen, M_chosen)) in enumerate(zip(grid_N.astype('int'), grid_M.astype('int'))):
                
                params_local = MyParams(N_chosen, M_chosen, params.lr, 
                        params.pre_train_model, params.nb_epochs,
                        params.order_gauss, params.σ_e, params.use_Sobol, 
                        params.optimizer)
    
                Xms = torch.zeros(nb_draws_loss)
                # Loop over draws 
                for (j_index, j) in enumerate(range(0, nb_draws_loss)):
                    optimizer.zero_grad() # clear gradient
                    loss = Ξ_torch_MC(model_opt_M_N, params_local)
                    loss.backward() #calculate gradient
                    # Store the norm of the gradient
                    total_norm = compute_grad_norm(model_opt_M_N.parameters(), norm_type=norm_chosen)
                    Xms[j] = total_norm
                var_gradient_loss[ind] = torch.var(Xms)
                std_gradient_loss[ind] = torch.sqrt(var_gradient_loss[ind])
                mean_gradient_loss[ind] = torch.mean(Xms)

        # Save to dataframe
        if df_variance_loss.empty == True:
            df_variance_loss = pd.DataFrame({'N': grid_N,
                                  'M': grid_M,
                                  'var_loss': var_loss,
                                  'std_loss': std_loss,
                                  'mean_loss': mean_loss,
                                  'var_gradient_loss': var_gradient_loss,
                                  'std_gradient_loss': std_gradient_loss,
                                  'mean_gradient_loss': mean_gradient_loss,
                                  'nb_rep': k})
        else:
            df_variance_loss_bis = pd.DataFrame({'N': grid_N,
                                  'M': grid_M,
                                  'var_loss': var_loss,
                                  'std_loss': std_loss,
                                  'mean_loss': mean_loss,
                                  'var_gradient_loss': var_gradient_loss,
                                  'std_gradient_loss': std_gradient_loss,
                                  'mean_gradient_loss': mean_gradient_loss,
                                  'nb_rep': k})
            df_variance_loss = pd.concat([df_variance_loss, df_variance_loss_bis])
        
    # Replace NAN by large values. Need to penalize non convergence of gradient descent.
    list_cols = ["var_loss", "std_loss", "mean_loss", 
                "std_gradient_loss", "var_gradient_loss", "mean_gradient_loss"]
    
    for col in list_cols:
        df_variance_loss[col] = df_variance_loss[col].fillna(np.nanmax(df_variance_loss[col]))
    
    # Stats on df_var
    # Median values
    df_variance_loss_median = df_variance_loss.groupby('M').median().reset_index()

    for col in list_cols:
        df_variance_loss_median["min_" + col] = df_variance_loss.groupby('M')[col].min().reset_index()[col]                     
        df_variance_loss_median["max_" + col] = df_variance_loss.groupby('M')[col].max().reset_index()[col]
        df_variance_loss_median["std_" + col] = df_variance_loss.groupby('M')[col].std().reset_index()[col]
        for qq in [10, 25, 50, 75, 90]:
            df_variance_loss_median["P" + str(qq) + "_" + col] = df_variance_loss.groupby('M')[col].quantile(qq/100).reset_index()[col]
    

    return df_variance_loss, df_variance_loss_median


In [1]:
#Given a model, calculate the current variance of the loss
def calculate_variance_loss_model(model, params, nb_draws_loss):
    model.eval() #eval mode
    with torch.no_grad():        
        Xms = torch.zeros(nb_draws_loss)
        # Loop over realizations of loss function
        for (j_index, j) in enumerate(range(0, nb_draws_loss)):
            Xms[j] = Ξ_torch_MC(model, params)
        # Calculate mean and variance:
        var_loss = torch.var(Xms)
        mean_loss = torch.mean(Xms)
    train() #train mode
    return var_loss, mean_loss

SyntaxError: invalid syntax (3925264104.py, line 2)

In [None]:
def create_optimizer(model, optimizer_name, lr, momentum):
    """
    Function to create an optimizer
    """
    if optimizer_name == "Adam":
        optimizer = torch.optim.Adam(model.parameters(), lr=lr, eps=1e-07, betas=(0.9, 0.999)) 
    elif optimizer_name == "SGD":
        optimizer = torch.optim.SGD(model.parameters(), lr)
    elif optimizer_name == "SGD-momentum":
        optimizer = torch.optim.SGD(model.parameters(), lr, momentum)
    elif optimizer_name == "Adadelta":
        optimizer = torch.optim.Adadelta(model.parameters(), lr)
    elif optimizer_name == "RMSprop":
        optimizer = torch.optim.RMSprop(model.parameters(), lr)
    else:
        raise("optimizer unknown")
    return optimizer

def set_initial_values(model, w, b):
    """
    Function to analyse the weigth and bias to certain values
    """
    with torch.no_grad():
        for name, param in model.named_parameters():
            if 'linear_relu_stack.0.weight' in name:
                param.copy_(torch.tensor([w]))
            elif 'linear_relu_stack.0.bias' in name:
                param.copy_(torch.tensor([b]))

In [33]:
# Solve a model several times, holding initial parameters constant
def calculate_several_runs(params, nb_rep, freq_loss = 1, initial_guess = [1.0, 1.0]):
    #initialization of lists
    list_elapsed_time = []
    list_M = []
    list_N = []
    list_list_losses = [] 
    list_list_perc_abs_error_MC = [] 
    list_list_perc_abs_error_MC_i = [] 
    list_list_perc_abs_error_MC_loss = [] 
    list_lr = []
    list_index_rep = []
    list_list_beta = [] #store coefficients
    list_list_abs_perc_dev_bias = [] #absolute percentage deviation from true bias 
    list_list_abs_perc_dev_weight = [] #absolute percentage deviation from true weight
            
    # A.
    # Run seral times
    for k in range(0, nb_rep):
            print("Training model : {} / {}".format(int(k), nb_rep))
            list_index_rep.append(k)
            list_M.append(params.M)
            list_N.append(params.N)
            list_lr.append(params.lr)

            #-----------------------------------------------------
            # Train a model for some periods
            # Then look at the expected variance of the gradient
            #-----------------------------------------------------
            # Initialize a network
            model_MC = NeuralNetwork().to(device) 

            # Set initial value
            set_initial_values(model_MC, initial_guess[0], initial_guess[1])
        
            if params.optimizer == "Adam":
                optimizer_MC = torch.optim.Adam(model_MC.parameters(), lr=params.lr, eps=1e-07, betas=(0.9, 0.999)) 
            elif params.optimizer == "SGD":
                optimizer_MC = torch.optim.SGD(model_MC.parameters(), params.lr)
            elif params.optimizer == "SWA":
                base_opt_MC = torch.optim.Adam(model_MC.parameters(), lr=params.lr, eps=1e-07, betas=(0.9, 0.999)) 
                optimizer_MC = SWA(base_opt_MC, swa_start=params.swa_start, swa_freq=params.swa_freq, swa_lr=params.lr)
            else:
                raise("optimizer unknown")

            scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer_MC, gamma=params.freq_gamma)
            loss_epochs_MC = torch.zeros(params.nb_epochs)
            list_perc_abs_error_MC = [] #store abs value percentage error
            list_perc_abs_error_MC_i = [] #store index i
            list_perc_abs_error_MC_loss = [] #store loss
            list_beta = [] #store coefficients
            list_abs_perc_dev_bias = [] #np.zeros(params.nb_epochs) #absolute percentage deviation from true bias 
            list_abs_perc_dev_weight = [] #np.zeros(params.nb_epochs) #absolute percentage deviation from true weight
            
            for i in range(0, params.nb_epochs):
                
                optimizer_MC.zero_grad() #clear gradient
                
                loss = Ξ_torch_MC(model_MC, params) #calculate loss
                loss_epochs_MC[[i]] = float(loss.item())
                
                # Store coefficients
                if i % freq_loss == 0:
                    with torch.no_grad():
                        # Extract weight and bias
                        b_current = np.array([k.item() for k in model_MC.parameters()])
                        b_current_ordered = np.array((b_current[1], b_current[0])) #reorder (bias, weight)
                    list_beta.append(b_current_ordered)

                    # Calculate the absolute percentage deviation from true value:
                    ## Bias: b_current_ordered[0]
                    with torch.no_grad():
                        dev_b = 100*(np.abs((b_current_ordered[0] - params.true_bias)/params.true_bias))
                        #list_abs_perc_dev_bias[i] = dev_b #absolute percentage deviation from true bias 
                        list_abs_perc_dev_bias.append(dev_b[0])
                        ## Cannot divide by 0
                        dev_w = 100*(np.abs((b_current_ordered[1] - params.true_weight)))
                        #list_abs_perc_dev_weight[i] = dev_w #absolute percentage deviation from true weight
                        list_abs_perc_dev_weight.append(dev_w)
                        
                # Backpropagation
                loss.backward()
                
                # Gradient descent step
                optimizer_MC.step()

                if i % freq_loss == 0: #Monitor the predictive power
                    # Define the grid
                    with torch.no_grad():
                        xvec = params.xvec_test_torch #add a dimension
                        y_MC = model_MC(xvec)
                    xvec = xvec.detach().numpy()
                    y_MC = y_MC.detach().numpy()
                    perc_abs_error_MC = 100*(np.abs((y_MC - params.true_function(xvec))/params.true_function(xvec)))
                    list_perc_abs_error_MC.append(np.median(perc_abs_error_MC))
                    list_perc_abs_error_MC_i.append(i)
                    list_perc_abs_error_MC_loss.append(float(loss.item()))
                if i % 1000 == 0:
                    loss, current = float(loss.item()), i
                    print(f"loss: {loss:>7f} [{current:>5d}/{params.nb_epochs:>5d}]")
                if (i % params.freq_scheduler == 0) & (i != 0) & (params.use_scheduler == True):
                    scheduler.step()
                    print("i : {}. Decreasing learning rate: {}".format(i, scheduler.get_last_lr()))
                    print(f"loss: {loss:>7f}, median percentage error {list_perc_abs_error_MC[-1]:>7f}, [{current:>5d}/{params.nb_epochs:>5d}]")


            list_list_losses.append(list_perc_abs_error_MC_loss) #list_list_losses.append(loss_epochs_MC)
            list_list_perc_abs_error_MC.append(list_perc_abs_error_MC)
            list_list_perc_abs_error_MC_i.append(list_perc_abs_error_MC_i)
            list_list_perc_abs_error_MC_loss.append(list_perc_abs_error_MC_loss)
            list_list_beta.append(list_beta)
            list_list_abs_perc_dev_bias.append(list_abs_perc_dev_bias) #absolute percentage deviation from true bias 
            list_list_abs_perc_dev_weight.append(list_abs_perc_dev_weight) #absolute percentage deviation from true weight
       
            if params.optimizer == "SWA":
                optimizer_MC.swap_swa_sgd()
                
    # B. Compile results
    #-------------------
    df_MC = pd.DataFrame({'M': list_M[0],
                        'N': list_N[0],
                        'repetition': list_index_rep[0],
                        'iter': list_list_perc_abs_error_MC_i[0],
                        'loss': np.sqrt(np.abs(list_list_losses[0])),
                        'med_percentage_error': list_list_perc_abs_error_MC[0],
                        'intercept': np.array(list_list_beta[0])[:,0],
                        'slope': np.array(list_list_beta[0])[:,1],
                        'abs_perc_dev_bias': list_list_abs_perc_dev_bias[0], #absolute percentage deviation from true bias 
                        'abs_perc_dev_weight': list_list_abs_perc_dev_weight[0] #absolute percentage deviation from true weight
       })

    for k in range(0, len(list_list_perc_abs_error_MC)):
        df_MC_bis = pd.DataFrame({'M': list_M[k],
                                'N': list_N[k],
                                'repetition': list_index_rep[k],
                                'iter': list_list_perc_abs_error_MC_i[0],
                                'loss': np.sqrt(np.abs(list_list_losses[k])),
                                'med_percentage_error': list_list_perc_abs_error_MC[k],
                                'intercept': np.array(list_list_beta[k])[:,0],
                                'slope': np.array(list_list_beta[k])[:,1],
                                'abs_perc_dev_bias': list_list_abs_perc_dev_bias[k], #absolute percentage deviation from true bias 
                                'abs_perc_dev_weight': list_list_abs_perc_dev_weight[k]}) #absolute percentage deviation from true weight)
        df_MC = pd.concat([df_MC, df_MC_bis])
    
    # Replace NAN by large values
    for col in ["loss", "med_percentage_error", "intercept", "slope"]:
        df_MC[col] = df_MC[col].fillna(np.nanmax(df_MC[col]))
    
    # C. Statistics on results
    df_MC_average = df_MC.groupby('iter').mean().reset_index() #mean value by iteration


    list_cols = ["loss", "med_percentage_error", 
                 "intercept", "slope",
                 "abs_perc_dev_bias", "abs_perc_dev_weight"]
    
    for col in list_cols:
        df_MC_average["min_" + col] = df_MC.groupby('iter')[col].min().reset_index()[col]                     
        df_MC_average["max_" + col] = df_MC.groupby('iter')[col].max().reset_index()[col]
        df_MC_average["std_" + col] = df_MC.groupby('iter')[col].std().reset_index()[col]
        for qq in [1, 5, 10, 25, 50, 75, 90, 95, 99]:
            df_MC_average["P" + str(qq) + "_" + col] = df_MC.groupby('iter')[col].quantile(qq/100).reset_index()[col]

    # Add extra info
    df_MC_average['optimizer'] = params.optimizer
    df_MC_average['sigma_e'] = params.σ_e
    df_MC_average['Sobol'] = params.use_Sobol
    df_MC_average['user_scheduler'] = params.use_scheduler
    df_MC_average['gamma_scheduler'] = params.freq_gamma
    df_MC_average['lr'] = params.lr
    
    return df_MC, df_MC_average

In [1]:
def calculate_variance_loss_fast(params, model, nb_draws, grid_M, grid_N):
    """
    Calculate variance of the loss using proposition 4
    Use four independent shocks 
    """
    grid_T = torch.tensor(grid_M*grid_N/2)
    grid_N = torch.tensor(grid_N)
    
    # Calculate variance and covariance
    with torch.no_grad(): 
        # V1
        if params.x_distribution == "Uniform":
            if params.use_Sobol_T == False:
                x = ((params.x_low - params.x_high) * torch.rand(nb_draws) + params.x_high).unsqueeze(1)
            else:
                #Very slow if T is large
                x = ((params.x_low - params.x_high) * params.soboleng.draw(nb_draws) + params.x_high)
        else:
            x = torch.normal(mean=0, std=params.σ_x, size=(nb_draws,)).unsqueeze(1)

        if params.e_distribution == "Normal":
            e1 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e2 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e3 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e4 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
        elif params.e_distribution == "T":
            m = torch.distributions.studentT.StudentT(torch.tensor([3.0]))
            e1 = m.sample([nb_draws]).squeeze(1)
            e2 = m.sample([nb_draws]).squeeze(1)
            e3 = m.sample([nb_draws]).squeeze(1)
            e4 = m.sample([nb_draws]).squeeze(1)
        elif params.e_distribution == "Lognormal":
            # Standard Normal iid shocks
            e1 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e2 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e3 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e4 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)

            # Transform to lognormlaparams.σ_e
            e1 = np.exp(params.μ_e + params.σ_e * e1)
            e2 = np.exp(params.μ_e + params.σ_e * e2)
            e3 = np.exp(params.μ_e + params.σ_e * e4)
            e4 = np.exp(params.μ_e + params.σ_e * e4)
        else:
            raise("Distribution unknown.")

        # residuals for n random grid points under 2 realizations of shocks
        R1 = Residuals_torch(model, params, x, e1)
        R2 = Residuals_torch(model, params, x, e2)
        R3 = Residuals_torch(model, params, x, e3)
        R4 = Residuals_torch(model, params, x, e4)

        # Construct combinations
        R1_R2 = R1*R2
        R1_R3 = R1*R3
        R1_R4 = R1*R4
        R2_R3 = R2*R3
        R2_R4 = R2*R4
        R3_R4 = R3*R4

        #var_R1 = torch.var(R1)

        # Variance cross
        var_R1_R2 = torch.var(R1_R2)

        # Co-variances with one shared element
        cov_R1R2_R1R3 = torch.cov(torch.column_stack((R1_R2, R1_R3)).T)[0,1]

        # Covariances with four different terms
        cov_R1R2_R3R4 = torch.cov(torch.column_stack((R1_R2, R3_R4)).T)[0,1]

        var_L = (1/(grid_T*(grid_N - 1)))*(var_R1_R2 + 2*(grid_N - 2)*(cov_R1R2_R1R3) + 2*((grid_N*(grid_N - 1)/4) - grid_N + 3/2)*(cov_R1R2_R3R4))

    return var_L

In [None]:
def calculate_variance_loss_fast_2(params, model, nb_draws, grid_M, grid_N):
    """
    Calculate variance of the loss using proposition 4
    Use the four shocks to calculate more accurate values
    """
    grid_T = torch.tensor(grid_M*grid_N/2)
    grid_N = torch.tensor(grid_N)
    
    # Calculate variance and covariance
    with torch.no_grad(): 
        # V1
        if params.x_distribution == "Uniform":
            if params.use_Sobol_T == False:
                x = ((params.x_low - params.x_high) * torch.rand(nb_draws) + params.x_high).unsqueeze(1)
            else:
                #Very slow if T is large
                x = ((params.x_low - params.x_high) * params.soboleng.draw(nb_draws) + params.x_high)
        else:
            x = torch.normal(mean=0, std=params.σ_x, size=(nb_draws,)).unsqueeze(1)

        if params.e_distribution == "Normal":
            e1 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e2 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e3 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e4 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
        elif params.e_distribution == "T":
            m = torch.distributions.studentT.StudentT(torch.tensor([3.0]))
            e1 = m.sample([nb_draws]).squeeze(1)
            e2 = m.sample([nb_draws]).squeeze(1)
            e3 = m.sample([nb_draws]).squeeze(1)
            e4 = m.sample([nb_draws]).squeeze(1)
        elif params.e_distribution == "Lognormal":
            # Standard Normal iid shocks
            e1 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e2 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e3 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e4 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)

            # Transform to lognormlaparams.σ_e
            e1 = np.exp(params.μ_e + params.σ_e * e1)
            e2 = np.exp(params.μ_e + params.σ_e * e2)
            e3 = np.exp(params.μ_e + params.σ_e * e4)
            e4 = np.exp(params.μ_e + params.σ_e * e4)
        else:
            raise("Distribution unknown.")

        # residuals for n random grid points under 2 realizations of shocks
        R1 = Residuals_torch(model, params, x, e1)
        R2 = Residuals_torch(model, params, x, e2)
        R3 = Residuals_torch(model, params, x, e3)
        R4 = Residuals_torch(model, params, x, e4)

        # Construct combinations
        R1_R2 = R1*R2
        R1_R3 = R1*R3
        R1_R4 = R1*R4
        R2_R3 = R2*R3
        R2_R4 = R2*R4
        R3_R4 = R3*R4

        #var_R1 = torch.var(R1)

        # Variance cross
        var_R1_R2 = torch.var(R1_R2)
        var_R1_R3 = torch.var(R1_R3)
        var_R1_R4 = torch.var(R1_R4)
        var_R2_R3 = torch.var(R2_R3)
        var_R2_R4 = torch.var(R2_R4)
        var_R3_R4 = torch.var(R3_R4)

        mean_var_R1_R2 = (1/6)*(var_R1_R2 + var_R1_R3 + var_R1_R4 + var_R2_R3 + var_R2_R4 +  var_R3_R4)

        # Co-variances with one shared element
        cov_R1R2_R1R3 = torch.cov(torch.column_stack((R1_R2, R1_R3)).T)[0,1]
        cov_R1R2_R1R4 = torch.cov(torch.column_stack((R1_R2, R1_R4)).T)[0,1]
        cov_R1R2_R2R3 = torch.cov(torch.column_stack((R1_R2, R2_R3)).T)[0,1]
        cov_R1R2_R2R4 = torch.cov(torch.column_stack((R1_R2, R2_R4)).T)[0,1]
        cov_R1R3_R3R4 = torch.cov(torch.column_stack((R1_R3, R3_R4)).T)[0,1]

        mean_cov_R1R2_R1R3 = (1/5)*(cov_R1R2_R1R3 + cov_R1R2_R1R4 + cov_R1R2_R2R3 + cov_R1R2_R2R4 + cov_R1R3_R3R4)

        # Covariances with four different terms
        cov_R1R2_R3R4 = torch.cov(torch.column_stack((R1_R2, R3_R4)).T)[0,1]

        var_L = (1/(grid_T*(grid_N - 1)))*(mean_var_R1_R2 + 2*(grid_N - 2)*(mean_cov_R1R2_R1R3) + 2*((grid_N*(grid_N - 1)/4) - grid_N + 3/2)*(cov_R1R2_R3R4))

    return var_L

In [1]:
def calculate_variance_loss_fast_3(params, model, nb_draws, grid_M, grid_N):
    """
    Calculate variance of the loss using proposition 4
    Use the 8 shocks to calculate more accurate values.
    More costly, but potentially more accurate (?)
    
    """
    grid_T = torch.tensor(grid_M*grid_N/2)
    grid_N = torch.tensor(grid_N)
    
    # Calculate variance and covariance
    with torch.no_grad(): 
        # V1
        if params.x_distribution == "Uniform":
            if params.use_Sobol_T == False:
                x = ((params.x_low - params.x_high) * torch.rand(nb_draws) + params.x_high).unsqueeze(1)
            else:
                #Very slow if T is large
                x = ((params.x_low - params.x_high) * params.soboleng.draw(nb_draws) + params.x_high)
        else:
            x = torch.normal(mean=0, std=params.σ_x, size=(nb_draws,)).unsqueeze(1)

        if params.e_distribution == "Normal":
            e1 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e2 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e3 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e4 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e5 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e6 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e7 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e8 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
        elif params.e_distribution == "T":
            m = torch.distributions.studentT.StudentT(torch.tensor([3.0]))
            e1 = m.sample([nb_draws]).squeeze(1)
            e2 = m.sample([nb_draws]).squeeze(1)
            e3 = m.sample([nb_draws]).squeeze(1)
            e4 = m.sample([nb_draws]).squeeze(1)
            e5 = m.sample([nb_draws]).squeeze(1)
            e6 = m.sample([nb_draws]).squeeze(1)
            e7 = m.sample([nb_draws]).squeeze(1)
            e8 = m.sample([nb_draws]).squeeze(1)
        elif params.e_distribution == "Lognormal":
            # Standard Normal iid shocks
            e1 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e2 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e3 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e4 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e5 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e6 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e7 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            e8 = torch.normal(mean=0, std=params.σ_e, size=(nb_draws,)).unsqueeze(1)
            # Transform to lognormlaparams.σ_e
            e1 = np.exp(params.μ_e + params.σ_e * e1)
            e2 = np.exp(params.μ_e + params.σ_e * e2)
            e3 = np.exp(params.μ_e + params.σ_e * e4)
            e4 = np.exp(params.μ_e + params.σ_e * e4)
            e5 = np.exp(params.μ_e + params.σ_e * e5)
            e6 = np.exp(params.μ_e + params.σ_e * e6)
            e7 = np.exp(params.μ_e + params.σ_e * e7)
            e8 = np.exp(params.μ_e + params.σ_e * e8)
        else:
            raise("Distribution unknown.")

        # residuals 
        R1 = Residuals_torch(model, params, x, e1)
        R2 = Residuals_torch(model, params, x, e2)
        R3 = Residuals_torch(model, params, x, e3)
        R4 = Residuals_torch(model, params, x, e4)
        R5 = Residuals_torch(model, params, x, e5)
        R6 = Residuals_torch(model, params, x, e6)
        R7 = Residuals_torch(model, params, x, e7)
        R8 = Residuals_torch(model, params, x, e8)
        
        # Construct combinations
        R1_R2 = R1*R2
        R1_R3 = R1*R3
        R1_R4 = R1*R4
        R2_R3 = R2*R3
        R2_R4 = R2*R4
        R3_R4 = R3*R4
        
        R5_R6 = R5*R6
        R7_R8 = R7*R8

        # Variance cross
        var_R1_R2 = torch.var(R1_R2)
        var_R1_R3 = torch.var(R1_R3)
        var_R1_R4 = torch.var(R1_R4)
        var_R2_R3 = torch.var(R2_R3)
        var_R2_R4 = torch.var(R2_R4)
        var_R3_R4 = torch.var(R3_R4)

        mean_var_R1_R2 = (1/6)*(var_R1_R2 + var_R1_R3 + var_R1_R4 + var_R2_R3 + var_R2_R4 +  var_R3_R4)

        # Co-variances with one shared element
        cov_R1R2_R1R3 = torch.cov(torch.column_stack((R1_R2, R1_R3)).T)[0,1]
        cov_R1R2_R1R4 = torch.cov(torch.column_stack((R1_R2, R1_R4)).T)[0,1]
        cov_R1R2_R2R3 = torch.cov(torch.column_stack((R1_R2, R2_R3)).T)[0,1]
        cov_R1R2_R2R4 = torch.cov(torch.column_stack((R1_R2, R2_R4)).T)[0,1]
        cov_R1R3_R3R4 = torch.cov(torch.column_stack((R1_R3, R3_R4)).T)[0,1]

        mean_cov_R1R2_R1R3 = (1/5)*(cov_R1R2_R1R3 + cov_R1R2_R1R4 + cov_R1R2_R2R3 + cov_R1R2_R2R4 + cov_R1R3_R3R4)

        # Covariances with four different terms
        cov_R1R2_R3R4 = torch.cov(torch.column_stack((R1_R2, R3_R4)).T)[0,1]
        cov_R5R6_R7R8 = torch.cov(torch.column_stack((R5_R6, R7_R8)).T)[0,1]
        mean_cov_4_terms = 0.5*(cov_R1R2_R3R4 + cov_R5R6_R7R8)
            
        var_L = (1/(grid_T*(grid_N - 1)))*(mean_var_R1_R2 + 2*(grid_N - 2)*(mean_cov_R1R2_R1R3) + 2*((grid_N*(grid_N - 1)/4) - grid_N + 3/2)*(mean_cov_4_terms))

    return var_L

In [None]:
def numpy_flat(a):
    """
    Function to flatten a list
    """
    return list(np.array(a).flat)