In [57]:
import numpy as np
import matplotlib.pyplot as plt
from random import random
from numpy.random import choice

from scipy.stats import norm
from scipy.optimize import minimize
from scipy.signal import fftconvolve

from sklearn.ensemble import RandomForestRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import ShuffleSplit, GridSearchCV
from sklearn.manifold import TSNE

import functools
import itertools
from tqdm import tqdm
import copy

In [58]:
import import_ipynb
from yield_strength_model_dual_phase_steels import *
from PlotFunctions import *

The forest is built up manually by seperate trees

In [59]:
def RF(n_trees):
    """
    Creates Random forest model.
    ----------
    n_trees: integer.
        Number of trees.
    Returns
    -------
    trees: array
        Array of tree object instances.
    """
    # Creates n untrained trees to create RF model
    ccp_value = 0
    trees = [DecisionTreeRegressor(max_depth=10, max_features='sqrt', min_samples_split=2, min_samples_leaf=1, ccp_alpha=ccp_value, random_state=None) for i in range(n_trees)]
    return trees


Each tree is trained in a 'Random forest' fashion

In [60]:
def trainRF(x_matrix, y_matrix, trees, Y_range=range(2,4)):
    """
    Trains Random forest.
    ----------
    x_matrix: data points x input dim np.array
        Input data.
    y_matrix: data points x output dim np.array
        Output data.
    trees: array
        All tree objects.
    Y_range: range
        Range of relevant output values.
    Returns
    -------
    samples: n_trees x data points np.array
        Matrix contains the data samples each tree has been trained on.
    trained_trees: array
        Array of trained tree objects
    scalerX: scaler object
        Scaler for input.
    """
    
    # Constants
    n_trees = len(trees)
    n_samples = x_matrix.shape[0]

    # Create and fit scaler to training data
    scalerX = StandardScaler(with_mean=True, with_std=True)
    scalerX.fit(x_matrix)
    X = scalerX.transform(x_matrix)
    Y = y_matrix[:,Y_range]

    # Bootstrap samples
    samples = np.zeros([n_trees, n_samples])
    for i in range(n_trees):
        samples[i,:] = np.random.choice(n_samples, n_samples, replace=True)
    samples = samples.astype(int)
 
    # Train a separate tree on each bootstrapped sample
    trained_trees = [trees[i].fit(X[samples[i,:]], Y[samples[i,:]]) for i in range(n_trees)]    

    return samples, trained_trees, scalerX

Weighted vote

In [61]:
def get_weights(x_matrix, X_test, samples, scalerX, trees):
    """
    Trains Random forest.
    ----------
    x_matrix: np.array
        Input data.
    X_test: np.array
        Candidates to predict.
    samples: np.array
        Bootstrap samples.
    scalerX: scaler object
        Scaling object.
    trees: object
        Tree objects.
    Returns
    -------
    norm_weights: np.array
        Normalized weights used for possibly improved voting.
    """
    n_trees = samples.shape[0]
    n_candidates = X_test.shape[0]
    x_matrix = scalerX.transform(x_matrix)
    X_test = scalerX.transform(X_test)

    mean_tree = np.mean(x_matrix[samples,:], axis=1)

    tree_predictions, RF_prediction, mean_tree = predict(mean_tree, trees, scalerX, weights=[0])

    # The distance in a TSNE projection is used to determine how close each tree is to a new candidate material.
    tsne = TSNE(n_components=2, init='pca', learning_rate=200.0)
    meanTsne = tsne.fit_transform(mean_tree)
    testTsne = tsne.fit_transform(X_test)

    output_size = trees[0].predict(X_test).shape[1]

    weights = np.zeros([n_trees, n_candidates, output_size])
    for i in range(n_trees):
        weights[i,:,:] = np.tile(1/np.linalg.norm((meanTsne[i] - testTsne), axis=1), (output_size, 1)).T

    norm_weights = weights/np.sum(weights, axis=0)

    return norm_weights

Based on the created trees and test data a prediction is made

In [62]:
def predict(X_test, trees, scalerX, weights):
    """
    Makes predictions based on tree objects and X_test data.
    ----------
    X_test: candidate data points x input dim np.array.
        Input data for candidate materials.
    trees: array
        All tree objects.
    scalerX: scaler object.
        Scaler for input.
    Returns
    -------
    tree_predictions: n_trees x n_candidates
        Predictions for each data point made by each tree.
    RF_prediction:
        Prediction made by Random forest. 
    X_test: candidate data points x input dim np.array.
        Input data for candidate materials.
    """

    # Constants
    n_candidates = X_test.shape[0]
    n_trees = len(trees)
    
    # Transform test data using scalar object
    X_test = scalerX.transform(X_test)

    # Size of prediction
    output_size = trees[0].predict(X_test).shape[1]

    # Make predictions using each tree. 3D numpy-array on format tree_predictions[tree number, candidate point, property (yield strength, elongation)]
    tree_predictions = np.zeros([n_trees, n_candidates, output_size])
    for i, tree in enumerate(trees):
        tree_predictions[i,:,:] = tree.predict(X_test)

    if np.sum(weights) == 0:
        RF_prediction = np.average(tree_predictions, axis=0)
    else:
        RF_prediction = np.average(tree_predictions, axis=0, weights=weights)

    # Inverse transform of data
    X_test = scalerX.inverse_transform(X_test)

    return tree_predictions, RF_prediction, X_test

Collect all neccesary functions to create new forest instance and make prediction

In [63]:
def RandomForestAll(n_trees, x_matrix, y_matrix, n_candidates, weighted = False, Y_range=range(2,4)):
    """
    Creates RF model and produces predictions on candidate materials.
    Parameters
    ----------
    n_trees: integer
        Number of trees in model.
    x_matrix: data points x input dim np.array
        Input data.
    y_matrix: data points x output dim np.array
        Output data.
    n_candidates: integer.
        Number of candidate materials to be produced.
    Y_range: range
        Range of relevant output values.
    Returns
    -------
    samples: n_trees x data points np.array
        Matrix contains the data samples each tree has been trained on.
    tree_predictions: n_trees x n_candidates
        Predictions for each data point made by each tree.
    RF_prediction:
        Prediction made by Random forest.
    trees: array of object instances.
        All tree objects. 
    X_test: candidate data points x input dim np.array.
        Input data for candidate materials.
    Y_test: candidate data points x output dim np.array
        Output data for candidate materials.
    scalerX: scaler object.
        Scaler for input.
    """
    
    weights = np.zeros([1])
    trees = RF(n_trees)
    samples, trained_trees, scalerX = trainRF(x_matrix, y_matrix, trees, Y_range=range(2,4))
    X_test, Y_test = data_generation(n = n_candidates, seed = 0)

    if weighted:
        weights = get_weights(x_matrix, X_test, samples, scalerX, trained_trees)
    tree_predictions, RF_prediction, X_test = predict(X_test, trained_trees, scalerX, weights)

    return samples, tree_predictions, RF_prediction, trained_trees, X_test, Y_test, scalerX

# Uncertainty measure

Uncertainty measure inspired by 'Confidence Intervals for Random Forests: The Jackknife and the Infinitesimal Jackknife' with correction of negative variance measures

In [64]:
def uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction, dim):
    """
    Calculates the variance estimates of the predictions. Base of the uncertainty measure.
    Parameters
    ----------
    samples: n_trees x data points np.array
        Matrix contains the data samples each tree has been trained on.
    tree_predictions: n_trees x n_candidates
        Predictions for each data point made by each tree.
    RF_prediction:
        Prediction made by Random forest.
    trees: array
        All tree objects.
    correcction: Bool
        Decides if correction should be used.
    dim: integer 0/1
        Sets which dimension of the uncertainty measure should be corrected.
    Returns
    -------
    An array of variance estimates
    """

    n_samples = samples.shape[1]
    n_trees = tree_predictions.shape[0]
    
    # Create n_ji matrix showing on what each tree was trained
    n_ji = np.zeros([n_trees, n_samples])
    for j in range(n_trees):
        for i, sample in enumerate(samples[j,:]):
            n_ji[j,sample] += 1

    # Expected value of n_ji
    E_nji = 1
       
    # Individual tree predictions and forest prediction
    t_jx = tree_predictions
    E_tjx = RF_prediction

    V_IJ = np.sum((np.dot(np.transpose(t_jx - E_tjx), (n_ji - E_nji))/n_trees)**2, axis=2).T
    
    # Bias correction
    T = n_trees
    n = n_samples
    v = np.var(tree_predictions, axis=0)
    bias_corr = (n*v/T)

    # Total result
    V_IJ_unbiased = V_IJ - bias_corr
    V_IJ_unbiased = V_IJ_unbiased[:,dim]

    
    if correction:
        # Calibration is a correction for converging quicker to the case of infinite n_estimators,
        # as presented in Wager (2014) http://jmlr.org/papers/v15/wager14a.html
        calibration_ratio = 2
        n_sample = np.ceil(n_trees / calibration_ratio)

        new_trees = [copy.deepcopy(trees[i]) for i in range(len(trees))]

        random_idx = np.random.permutation(len(new_trees))[: int(n_sample)]
        new_trees = np.array(new_trees)

        new_samples = samples[random_idx]
        new_tree_predictions = tree_predictions[random_idx]
        new_RF_prediction = np.mean(new_tree_predictions, axis=0)
        
        reference_varince = uncertainty_measure_CI(new_samples, new_tree_predictions, new_RF_prediction, trees, correction = False, dim = dim)

        # Use this second set of variance estimates to estimate scale of Monte Carlo noise
        sigma2_ss = np.mean((reference_varince - V_IJ_unbiased) ** 2)

        delta = n_sample / n_trees
        sigma2 = (delta ** 2 + (1 - delta) ** 2) / (2 * (1 - delta) ** 2) * sigma2_ss

        # Use Monte Carlo noise scale estimate for empirical Bayes calibration
        V_IJ_calibrated = calibrateEB(V_IJ_unbiased, sigma2)

        return V_IJ_calibrated
    else:
        return V_IJ_unbiased       
        

Start of calibration to mitigate negative uncertainty estimates

In [65]:
def calibrateEB(variances, sigma2):
    """
    Calibrate noisy variance estimates with empirical Bayes.
    Parameters
    ----------
    vars: ndarray
        List of variance estimates.
    sigma2: int
        Estimate of the Monte Carlo noise in vars.
    Returns
    -------
    An array of the calibrated variance estimates
    """

    # General error check
    if (sigma2 <= 0 or min(variances) == max(variances)):
        return(np.maximum(variances, 0)) 
    sigma = np.sqrt(sigma2)
    # calculated prior g
    eb_prior = gfit(variances, sigma)

    # Set up a partial execution of the function
    part = functools.partial(gbayes, g_est=eb_prior, sigma=sigma)
    
    # Calculates posterior distribution given all instances of the calculated variance 
    if len(variances) >= 200:
        # Interpolate to speed up computations:
        calib_x = np.percentile(variances,
                                np.arange(0, 102, 2))
        calib_y = list(map(part, calib_x))
        calib_all = np.interp(variances, calib_x, calib_y)
    else:
        calib_all = list(map(part, variances))

    return np.asarray(calib_all)

In [66]:
def gbayes(x0, g_est, sigma):
    """
    Estimate Bayes posterior with Gaussian noise [Efron2014]_.
    Parameters
    ----------
    x0: ndarray
        an observation
    g_est: float
        a prior density, as returned by gfit
    sigma: int
        noise estimate
    Returns
    -------
    An array of the posterior estimate E[mu | x0]
    """

    Kx = norm().pdf((g_est[0] - x0) / sigma) # Likelihood Pr(data|g_est[0])
    # Bayes theorem
    post = Kx * g_est[1] # Likelihood * prior
    post /= sum(post) # Divide by evidence
    
    return sum(post * g_est[0]) # Returns expected value E[mu|x0]

In [67]:
def gfit(X, sigma, q=5, nbin=200, unif_fraction=0.1):
    """
    Fit empirical Bayes prior in the hierarchical model [Efron2014]_.
    .. math::
        theta ~ G, X ~ N(theta, sigma^2)
    Parameters
    ----------
    X: ndarray
        A 1D array of observations.
    sigma: float
        Noise estimate on X.
    q: int
        Number of parameters used to fit G. Default: 5
    nbin: int
        Number of bins used for discrete approximation.
        Default: 200
    unif_fraction: float
        Fraction of G modeled as "slab". Default: 0.1
    Returns
    -------
    An array of the posterior density estimate g.
    """
    
    # Create interval in which real variance parameter could be
    min_x = min(min(X) - 2 * np.std(X, ddof=1), 0)
    max_x = max(max(X) + 2 * np.std(X, ddof=1),
                np.std(X, ddof=1))
    xvals = np.linspace(min_x, max_x, nbin)
    binw = (max_x - min_x) / (nbin - 1)

    zero_idx = max(np.where(xvals <= 0)[0])

    # Creates a noise kernel.
    noise_kernel = norm().pdf(xvals / sigma) * binw / sigma

    if zero_idx > 0:
        noise_rotate = noise_kernel[list(np.arange(zero_idx, len(xvals))) +
                                    list(np.arange(0, zero_idx))]
    else:
        noise_rotate = noise_kernel

    # Creates Q matrix. See Report for more info.
    Q = np.zeros((q, len(xvals)), dtype="float")
    for ind, exp in enumerate(range(1, q+1)):
        mask = np.ones_like(xvals)
        mask[np.where(xvals <= 0)[0]] = 0
        Q[ind, :] = pow(xvals, exp) * mask
    Q = Q.T

    def neg_loglik1(alpha):
        mask = np.ones_like(xvals) # Only positive values are allowed
        mask[np.where(xvals <= 0)[0]] = 0
        g_alpha_raw = np.exp(np.dot(Q, alpha) - np.max(np.dot(Q, alpha))) * mask # First part of g(alpha)
        
        if np.sum(g_alpha_raw) <= 100 * np.finfo(np.double).tiny: # Checks for to small values
                return (1000 * (len(X) + sum(alpha ** 2)))

        g_alpha_main = g_alpha_raw / sum(g_alpha_raw) # Last step in getting g(alpha)        
        g_alpha = ((1 - unif_fraction) * g_alpha_main + unif_fraction * mask / sum(mask)) # Manipulates g_alpha_main in order to put a probability over all possible values of the variance parameter. Non-negative.
        f_alpha = fftconvolve(g_alpha, noise_rotate, mode='same') # Convolution between g_alpha prior and the noise kernel(rotate). Evaluates likelihood.

        return np.sum(np.interp(X, xvals, -np.log(np.maximum(f_alpha, 0.0000001)))) # Sum of the total likelihood. This is minimized. np.maximum is there to protect from negative values in np.log.

    alpha_hat = minimize(neg_loglik1, list(itertools.repeat(-1, q))).x
    
    g_alpha_raw = np.exp(np.dot(Q, alpha_hat) - np.max(np.dot(Q, alpha_hat))) * mask
    g_alpha_main = g_alpha_raw / sum(g_alpha_raw)

    # Manipulates g_alpha_main in order to put a probability over all possible values of the variance parameter 
    g_alpha = ((1 - unif_fraction) * g_alpha_main + unif_fraction * mask) / sum(mask)

    return xvals, g_alpha

Selection strategy helping to evaluate results based on strategy criteria

In [68]:
def selection_strategy(RF_prediction, sigma_x, strategy, goal_value, closeness_crit, y_matrix, share=0):
    """
    Evaluate all candidate predictions and uncertainties and choose the best one based on
    strategy, goal_value and closeness criteria.

    Parameters
    ----------
    RF_predictions: np.array
        Prediction values of all candidate materials.
    sigma_x: np.array
        Uncertainties in predictions.
    strategy: string.
        Can take values 'BPV', 'OVU' or 'LU'.
    goal_value: array
        Array specifying where algorithm should search for new materials in the output space.
    closeness_crit: string.
        Can take values 'Projection' or 'Pythagoras'.
    share: float.
        Value between 0 and 1 indicating share of 'BPV' vs. LU strategy used.
    Returns
    -------
    result_index: Integer
        Index of best candidate.
    """
    
    # Normalizing in terms of maximum possible values
    y_max_matrix = y_matrix

    # Normalize candidate values in terms of maximum possible values
    factors = np.max(y_max_matrix[:,2:4], axis=0)
    normalized_uncertainty = sigma_x/factors
    normalized_prediction = RF_prediction/factors
    normalized_goal = goal_value/factors
    # Finds angle corresponding to the goal value
    priority_angle = np.arctan(normalized_goal[0]/normalized_goal[1])
    result_index = None # Dummy

    if strategy == 'Random':
        strategy = np.random.choice(['BPV', 'LU'], p=[share, 1-share])

    if closeness_crit == 'Projection':
        
        if strategy == 'BPV': # Choose highest predicted valued data point
            projections = normalized_prediction[:,0]*np.cos(np.pi/2 - priority_angle) + normalized_prediction[:,1]*np.cos(priority_angle)
            result_index = np.argmax(projections)
    
        elif strategy == 'LU': # Choose largest uncertainty data point
            projections = normalized_uncertainty[:,0]*np.cos(np.pi/2 - priority_angle) + normalized_uncertainty[:,1]*np.cos(priority_angle)
            result_index = np.argmax(projections)
        
        elif strategy == 'OVU': # Choose largest combined predicted value and uncertainty
            projections = (normalized_prediction[:,0] + normalized_uncertainty[:,0])*np.cos(np.pi/2 - priority_angle) + (normalized_prediction[:,1] + normalized_uncertainty[:,1])*np.cos(priority_angle)
            result_index = np.argmax(projections)

        else:
            print('ERROR: choose valid strategy')
        
    elif closeness_crit == 'Pythagoras':

        if strategy == 'BPV': # Choose closest predicted valued data point
            distance = np.sqrt(np.sum((normalized_prediction - normalized_goal)**2, axis=1))
            result_index = np.argmin(distance)
    
        elif strategy == 'LU': # Choose largest uncertainty data point
            abs_diff = np.abs(normalized_goal - normalized_prediction)
            angle_to_goal = np.arctan(abs_diff[:,1]/abs_diff[:,0])
            uncertainty = normalized_uncertainty[:,0]*np.cos(np.pi/2 - angle_to_goal) + normalized_uncertainty[:,1]*np.cos(angle_to_goal)
            result_index = np.argmax(uncertainty)
        
        elif strategy == 'OVU': # Choose value closest to goal value based on predicted value and uncertainty
            abs_diff = np.abs(normalized_goal - normalized_prediction)
            angle_to_goal = np.arctan(abs_diff[:,1]/abs_diff[:,0])
            projected_to_goal = normalized_uncertainty[:,0]*np.cos(np.pi/2 - angle_to_goal) + normalized_uncertainty[:,1]*np.cos(angle_to_goal)
            E_uncertainty = projected_to_goal*np.cos(angle_to_goal)
            YS_uncertainty = projected_to_goal*np.sin(angle_to_goal)
            movement = np.array([YS_uncertainty, E_uncertainty]).T
            sign_diff = ((normalized_goal - normalized_prediction)/abs_diff).astype(int)
            resulting_coordinate = normalized_prediction + sign_diff * movement
            distance = np.sqrt(np.sum((resulting_coordinate - normalized_goal)**2, axis=1))
            result_index = np.argmin(distance)

        else:
            print('ERROR: choose valid strategy')

    else:
        print('ERROR: choose valid closeness criteria')
    
    return result_index
    

calculating performance based on chosen criteria

In [69]:
def calc_performance(Y_test, index_to_add, goal_value, closeness_criteria, y_matrix):
    """
    Calculate performance of found candidate material.

    Parameters
    ----------
    Y_test: np.array
        Correct output values of candidate material.
    index_to_add: Integer
        Integer indicating which candidate material is best.
    goal_value: array
        Array specifying where algorithm should search for new materials in the output space.
    closeness_crit: string.
        Can take values 'Projection' or 'Pythagoras'.
    y_matrix: np.array
        Output values of all accessible data.
    Returns
    -------
    performance: float
        Performance of candidate material.
    """

    # Normalizing in terms of maximum possible values
    y_max_matrix = y_matrix

    # Normalize candidate values in terms of maximum possible values
    factors = np.max(y_max_matrix[:,2:4], axis=0)
    norm_exper_result = Y_test[index_to_add][2:4]/factors
    #test = y_mat/factors
    normalized_goal = goal_value/factors
    # Finds angle corresponding to the goal value
    priority_angle = np.arctan(normalized_goal[0]/normalized_goal[1])

    # Check performance of known data
    if closeness_criteria == 'Projection':
        result_projection = norm_exper_result[0]*np.cos(np.pi/2 - priority_angle) + norm_exper_result[1]*np.cos(priority_angle)
        goal_projection = normalized_goal[0]*np.cos(np.pi/2 - priority_angle) + normalized_goal[1]*np.cos(priority_angle)
        performance = result_projection/goal_projection
        
    elif closeness_criteria == 'Pythagoras':
        performance = np.sqrt(np.sum(((Y_test[index_to_add][2:4] - goal_value)/goal_value)**2))
    
    return performance

Generate redsult showing bias in predictions at different stages of predictions

In [70]:
def get_norm_residuals(n_candidates, experiment, x_data=None, y_data=None):
    """
    Calculates normalized residuals. 

    Parameters
    ----------
    n_candidates: Integer
        number of candidate materials.
    experiment: string or Bool
        Dictates what type of data to use.
    x_data: np.array
        Bespoke input data.
    y_data: np.array
        bespoke output data.
    Returns
    -------
    r_n: np.array
        Normalized residuals.
    sigma_x: np.array
        Uncertainty measure.
    """

    n_datapoints = 50
    n_trees = int(n_datapoints/2)
    
    if experiment ==  True:
        x_matrix, y_matrix = experiment_data_generation(compositions = comps, n_temp_tests = 2)
    
    elif experiment == 'data':
        x_matrix, y_matrix = x_data, y_data
    
    else:
        x_matrix, y_matrix = data_generation(n = n_datapoints, seed = 0)
    
    # Prediction with bespoke random forest model
    samples, tree_predictions, RF_prediction, trees, X_test, Y_test, scalerX = RandomForestAll(n_trees, x_matrix, y_matrix, n_candidates, Y_range=range(2,4))

    # Uncertainty in dimension (dim =) 0/1
    sigma_x2_Y = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 0)
    sigma_x2_E = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 1)

    sigma_x = np.array([np.sqrt(np.abs(sigma_x2_Y)), np.sqrt(np.abs(sigma_x2_E))]).T

    # normalized residuals
    r_n = (RF_prediction - Y_test[:,2:4])/sigma_x

    return r_n, sigma_x

In [71]:
def calc_cost_diff(better_material_idx, found_x_matrix, found_y_matrix):
    """
    Calculates cost difference between cheapest and most expensive composition. 

    Parameters
    ----------
    better_material_idx: np.array
        Index of high performing materials.
    found_x_matrix: np.array
        High performing materials input
    found_y_matrix: np.array
        High performing materials output
    Returns
    -------
    relative_cost_diff: float
        Cost difference between cheap and expensive material
    cheap_expens_chem: np.array
        Chemistry of cheap and expensive material
    cheap_expens_perf: np.array
        Performance of cheap and expensive material
    """
    
    non_chem_input = 2
    materials = found_x_matrix[better_material_idx,:]
    performance = found_y_matrix[better_material_idx,:]
    if materials.shape[0] == 0:
        return 1
    share_iron = np.array(1 - np.sum(materials[:,non_chem_input:], axis=1))
    materials = np.hstack((materials, share_iron.reshape(-1,1)))
    materials_cost = cost_vec()
    cost = np.dot(materials[:,non_chem_input:], materials_cost)
    cheap = np.min(cost)
    expensive = np.max(cost)
    cheap_index = np.where(cost == cheap)[0][0]
    expensive_index = np.where(cost == expensive)[0][0]
    relative_cost_diff = cheap/expensive
    cheap_expens_chem = np.array([materials[cheap_index], materials[expensive_index]])
    cheap_expens_perf = np.array([performance[cheap_index], performance[expensive_index]])
    
    return relative_cost_diff, cheap_expens_chem, cheap_expens_perf


In [72]:
def evaluationOfAlgo(goal_value, x_matrix_stat, y_matrix_stat, n_candidates, n_experiments, strategy_list, closeness_criteria, share, weighted):
    """
    Evaluates algorithm. 

    Parameters
    ----------
    goal_value: np.array
        Goal material.
    x_matrix_stat: np.array
        input.
    y_matrix_stat: np.array
        output.
    n_candidates: integer
        Number of candidates to evaluate.
    n_experiments: integer
        Number of experiments to perform.
    strategy_list: np.array
        List of strategies.
    closeness_criteria: np.array
        How to evaluate performance.
    share: float
        How much BPV is used.
    weighted: Boolean
        Sets if weighting of RF should be used.
    Returns
    -------
    n_better_vec: float
        Number of high performing materials.
    performance_mat: np.array
        Performance indication for all strategies.
    cost_diff_vec: np.array
        Vector of cost diff.
    cheap_expens_chem_vec: np.array
        Chemistry of cheap and expensive material.
    cheap_expens_perf_vec: np.array
        Performance of cheap and expensive material.
    """

    input_dim = x_matrix_stat.shape[1] + 1
    output_dim = y_matrix_stat.shape[1]
    n_better_vec = np.zeros(len(strategy_list))
    performance_mat = np.zeros([len(strategy_list), n_experiments])
    cost_diff_vec = np.zeros(len(strategy_list))
    cheap_expens_chem_vec = np.zeros([len(strategy_list), 2, input_dim])
    cheap_expens_perf_vec = np.zeros([len(strategy_list), 2, output_dim])
    
    # This is now the main loop for the sequential part
    for index, strategy in enumerate(strategy_list):
        # Generate experiment data
        x_matrix = x_matrix_stat
        y_matrix = y_matrix_stat
        
        for k in range(n_experiments):

            n_datapoints = x_matrix.shape[0]
            n_trees = int(n_datapoints/2)

            samples, tree_predictions, RF_prediction, trees, X_test, Y_test, scalerX = RandomForestAll(n_trees, x_matrix, y_matrix, n_candidates, weighted = weighted[index], Y_range=range(2,4))

            # Uncertainty in dimension (dim =) 0/1
            sigma_x2_Y = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 0)
            sigma_x2_E = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 1)
            sigma_x = np.array([np.sqrt(np.abs(sigma_x2_Y)), np.sqrt(np.abs(sigma_x2_E))]).T
            
            index_to_add = selection_strategy(RF_prediction, sigma_x, strategy = strategy, goal_value = goal_value, closeness_crit = closeness_criteria[index], y_matrix=y_matrix, share=share[index])

            x_matrix = np.concatenate((x_matrix, X_test[index_to_add,:].reshape(1,-1)), axis=0)
            y_matrix = np.concatenate((y_matrix, Y_test[index_to_add,:].reshape(1,-1)), axis=0)
            
            performance = calc_performance(Y_test, index_to_add, goal_value, closeness_criteria[index], y_matrix)
            performance_mat[index, k] = performance

        
        if closeness_criteria[index] == 'Projection':
            n_better = np.sum((performance_mat[index,:] > 1)*1, axis=0)
            better_material_idx = (performance_mat[index,:]>1)
            cost_diff, cheap_expens_chem, cheap_expens_perf = calc_cost_diff(better_material_idx, x_matrix[-n_experiments:,:], y_matrix[-n_experiments:,:])

        else:
            n_better = np.sum((performance_mat[index,:] < 0.15)*1, axis=0)
            better_material_idx = (performance_mat[index,:] < 0.15)
            cost_diff, cheap_expens_chem, cheap_expens_perf = calc_cost_diff(better_material_idx, x_matrix[-n_experiments:,:], y_matrix[-n_experiments:,:])
            
        n_better_vec[index] = n_better
        cost_diff_vec[index] = cost_diff
        cheap_expens_chem_vec[index,:] = cheap_expens_chem
        cheap_expens_perf_vec[index,:] = cheap_expens_perf

    return n_better_vec, performance_mat, cost_diff_vec, cheap_expens_chem_vec, cheap_expens_perf_vec

In [73]:
def calc_r2(trees, scalerX, weights = np.zeros([1])):
    """
    Calculates mean square error. 

    Parameters
    ----------
    trees: class object
        Tree object.
    scalerX: class object
        Scaler object
    weights: np.array
        Dictates weighting of RF.
    Returns
    -------
    Mean square error
    
    """
    X_test, Y_test = data_generation(n = 1000, seed = 0)
    tree_predictions, RF_prediction, X_test = predict(X_test, trees, scalerX, weights)
    factors = np.max(Y_test[:,2:], axis=0)
    
    return np.mean(np.linalg.norm((Y_test[:,2:] - RF_prediction)/factors, axis=1))

In [74]:
def evaluationOfAlgo2(goal_value, x_matrix_stat, y_matrix_stat, n_candidates, n_experiments, strategy_list, closeness_criteria, share, weighted):
    """
    Evaluates algorithm. 

    Parameters
    ----------
    goal_value: np.array
        Goal material.
    x_matrix_stat: np.array
        input.
    y_matrix_stat: np.array
        output.
    n_candidates: integer
        Number of candidates to evaluate.
    n_experiments: integer
        Number of experiments to perform.
    strategy_list: np.array
        List of strategies.
    closeness_criteria: np.array
        How to evaluate performance.
    share: float
        How much BPV is used.
    weighted: Boolean
        Sets if weighting of RF should be used.
    Returns
    -------
    performance_mat: np.array
        Performance indication for all strategies.
    """
    performance_mat = np.zeros([len(strategy_list), n_experiments])

    # This is now the main loop for the sequential part
    for index, strategy in enumerate(strategy_list):
    
        # Generate experiment data
        x_matrix = x_matrix_stat
        y_matrix = y_matrix_stat
        
        for k in range(n_experiments):

            n_datapoints = x_matrix.shape[0]
            n_trees = int(n_datapoints/2)

            samples, tree_predictions, RF_prediction, trees, X_test, Y_test, scalerX = RandomForestAll(n_trees, x_matrix, y_matrix, n_candidates, weighted = weighted[index], Y_range=range(2,4))

            # Uncertainty in dimension (dim =) 0/1
            sigma_x2_Y = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 0)
            sigma_x2_E = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 1)

            sigma_x = np.array([np.sqrt(np.abs(sigma_x2_Y)), np.sqrt(np.abs(sigma_x2_E))]).T

            if strategy == 'RS':
                x_data, y_data = data_generation(n = 1, seed = 0)
                x_matrix = np.concatenate((x_matrix, x_data.reshape(1,-1)), axis=0)
                y_matrix = np.concatenate((y_matrix, y_data.reshape(1,-1)), axis=0)
    
            else:
                index_to_add = selection_strategy(RF_prediction, sigma_x, strategy = strategy, goal_value = goal_value, closeness_crit = closeness_criteria[index], y_matrix=y_matrix, share=share[index])
                x_matrix = np.concatenate((x_matrix, X_test[index_to_add,:].reshape(1,-1)), axis=0)
                y_matrix = np.concatenate((y_matrix, Y_test[index_to_add,:].reshape(1,-1)), axis=0)
            
            performance = calc_r2(trees, scalerX)
            performance_mat[index, k] = performance            

    return performance_mat

In [None]:
# optimize the forest model
n_datapoints = 500
x_matrix, y_matrix = data_generation(n = n_datapoints, seed = 0)

Y_range = range(2,4)
init_data = 24

scalerX = StandardScaler(with_mean=True, with_std=True)
scalerX.fit(x_matrix)

X = scalerX.transform(x_matrix)
Y = y_matrix[:,Y_range]

# Initialize Leave-p-Out cross-validation
ss = ShuffleSplit(n_splits=30, test_size=(n_datapoints-init_data)/n_datapoints , random_state=0)

# Random forest model
RFM = RandomForestRegressor()
parameters_grid = {'max_depth': [1, 4, 7, None], 'n_estimators': [init_data*2, int(init_data/2), init_data], 'min_samples_leaf': [1], 'min_samples_split': [2], 'ccp_alpha': [0, 0.1]}

RF_gs = GridSearchCV(estimator=RFM, cv=ss, param_grid=parameters_grid, verbose=1)
gs_RF_result = RF_gs.fit(X, Y)

print('Best accuracy', gs_RF_result.best_score_)
print('Best parameteres', gs_RF_result.best_params_)


# TEST
Test of calibration of uncertainty measure

In [None]:
## Test and visualization of calibrated variance estimates

comps = fetch_comp_data(filename = 'realistic_compositions_in_weight_percent.xlsx')

# Generate experiment data
x_matrix, y_matrix = experiment_data_generation(compositions = comps, n_temp_tests = 2)

# Prediction with bespoke random forest model
n_trees = 15
n_candidates = 1000
trees = RF(n_trees)
weighted = False
samples, tree_predictions, RF_prediction, trees, X_test, Y_test, scalerX = RandomForestAll(n_trees, x_matrix, y_matrix, n_candidates, weighted, Y_range=range(2,4))

# Uncertainty in dimension (dim =) 0/1
sigma_x2_Y = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 0)
sigma_x2_Y_u = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = False, dim = 0)

plt.figure(3)
plt.hist(sigma_x2_Y, label='Empirical Bayes corrected', alpha=0.6)
plt.hist(sigma_x2_Y_u, label='Non-corrected', alpha=0.6)
plt.xlabel('Variance estimates', fontsize = 15)
plt.title('Corrected and non-corrected variance estimates', fontsize=15)
plt.legend()


# TEST

Try to see if normalized residuals differ depending on training stratety and initial data

In [None]:
# Set goal as [Yield strength, Elongation]
goal_value = np.array([1300, 7])

# Fetch realistic data
comps = fetch_comp_data(filename = 'realistic_compositions_in_weight_percent.xlsx')
#comps = fetch_comp_data(filename = 'cluster_compositions.xlsx')

# Generate experiment data
x_matrix_stat, y_matrix_stat = experiment_data_generation(compositions = comps, n_temp_tests = 2)

# Prediction with bespoke random forest model
n_candidates = 1000

# Set parameters
weighted = [False, False, False]
strategy_list = ['BPV', 'OVU', 'LU']
closeness_criteria = ['Projection', 'Projection', 'Projection']#['Pythagoras']
n_better_vec = np.zeros(len(strategy_list))
experiment_list = np.zeros(len(strategy_list))

# This is now the main loop for the sequential part
for index, strategy in enumerate(strategy_list):
    
    # Generate experiment data
    x_matrix = x_matrix_stat
    y_matrix = y_matrix_stat
    
    n_experiments = 10
    performance_list = np.zeros(n_experiments)

    for k in tqdm(range(n_experiments)):

        n_datapoints = x_matrix.shape[0]
        n_trees = int(n_datapoints/2)

        samples, tree_predictions, RF_prediction, trees, X_test, Y_test, scalerX = RandomForestAll(n_trees, x_matrix, y_matrix, n_candidates, weighted = weighted[index], Y_range=range(2,4))

        # Uncertainty in dimension (dim =) 0/1
        sigma_x2_Y = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 0)
        sigma_x2_E = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 1)
        sigma_x = np.array([np.sqrt(np.abs(sigma_x2_Y)), np.sqrt(np.abs(sigma_x2_E))]).T

        index_to_add = selection_strategy(RF_prediction, sigma_x, strategy = strategy, goal_value = goal_value, y_matrix=y_matrix, closeness_crit = closeness_criteria[index])

        x_matrix = np.concatenate((x_matrix, X_test[index_to_add,:].reshape(1,-1)), axis=0)
        y_matrix = np.concatenate((y_matrix, Y_test[index_to_add,:].reshape(1,-1)), axis=0)
    

    plot_traversion(x_matrix, y_matrix, n_experiments, strategy, index, goal_value)

    # Make histogram
    iter = 2
    n_cand = 5000
    experiment = 'data'
    r_n_vec = np.zeros([iter*n_cand, 2])
    sigma_x_vec = np.zeros([iter*n_cand, 2])

    for i in tqdm(range(iter)):
        r_n, sigma_x = get_norm_residuals(n_cand, experiment, x_matrix, y_matrix)#x_data, y_data)
        r_n_vec[i*n_cand:i*n_cand+n_cand,:] = r_n
        sigma_x_vec[i*n_cand:i*n_cand+n_cand,:] = sigma_x

    print('Mean: ', np.mean(r_n_vec, axis=0))

    plt.figure(index + 50)
    plt.hist(r_n_vec[:,0], bins=200, color='royalblue', lw=0, density=True, alpha=0.6, label='Yield strength, $\sigma_y$')
    plt.hist(r_n_vec[:,1], bins=200, color='orangered', lw=0, density=True, alpha=0.5, label='Elongation, $\epsilon_t$')

    mu = 0
    std = 1
    x = np.linspace(-10, 10, 100)
    y = norm.pdf(x, mu, std)
    plt.plot(x, y, '-k', label='N(0, 1)')
    plt.xlim(-10,10)
    plt.legend()
    

# TEST

Find examples of cheap and expensive materials

In [None]:
# Set goal as [Yield strength, Elongation]
goal_value = np.array([1300, 7])

# Fetch realistic data
comps = fetch_comp_data(filename = 'realistic_compositions_in_weight_percent.xlsx')
# Generate experiment data with less information about input-output
#comps = fetch_comp_data(filename = 'cluster_compositions.xlsx')

print(cost_vec())

# Generate experiment data
x_matrix_stat, y_matrix_stat = experiment_data_generation(compositions = comps, n_temp_tests = 2)

# Prediction with bespoke random forest model
n_experiments = 10
n_candidates = 1000

# Set parameters
weighted = [False]
share = [0]
strategy_list = ['BPV']
closeness_criteria = ['Projection']

iter = 1
n_better_vec_tot = np.zeros(len(strategy_list))
performance_mat_tot = np.zeros([len(strategy_list), n_experiments])
cost_diff_vec_tot = np.zeros(len(strategy_list))

print('Goal: ', goal_value)

for i in tqdm(range(iter)):
    n_better_vec, performance_mat, cost_diff_vec, cheap_expens_chem_vec, cheap_expens_perf_vec = evaluationOfAlgo(goal_value, x_matrix_stat, y_matrix_stat, n_candidates, n_experiments, strategy_list, closeness_criteria, share, weighted)
    n_better_vec_tot += n_better_vec
    performance_mat_tot += performance_mat
    cost_diff_vec_tot += cost_diff_vec

n_better_mean = n_better_vec_tot/iter
performance_mat_mean = performance_mat_tot/iter
cost_diff_vec_mean = cost_diff_vec_tot/iter

print('cheap vs expensive', cheap_expens_chem_vec)
print('cheap vs expensive perf', cheap_expens_perf_vec)

print('Mean better materials: ', n_better_mean)
print('Mean cost span: ', cost_diff_vec_mean)



# Test
 
Can we see what tendencies the model has picked up on? 

In [None]:
# Set goal as [Yield strength, Elongation]
goal_value = np.array([1100, 12.5])
# [Annealing_temperature, Tempering_temperature, wC, wMn, wCr, wMo, wNi, wSi]
test_material_annealing = np.array([0, 150, 0.2E-2, 1.5E-2, 0.5E-2, 0.5E-2, 0.5E-2, 1E-2])
test_material_solidsol = np.array([850, 150, 0.1E-2, 1.5E-2, 0.5E-2, 0, 0.5E-2, 1E-2])

eval_points = 100

temps = np.linspace(700,900, eval_points)
Mo_shares = np.linspace(0, 0.5E-2, eval_points)
sigma_real_a = np.zeros(eval_points)
sigma_real_s = np.zeros(eval_points)

X_eval_annealing = np.zeros([eval_points, len(test_material_annealing)])
X_eval_solidsol = np.zeros([eval_points, len(test_material_annealing)])

for i in range(eval_points):
    test_material_annealing[0] = temps[i]
    test_material_solidsol[5] = Mo_shares[i]

    X_eval_annealing[i,:] = test_material_annealing
    X_eval_solidsol[i,:] = test_material_solidsol

    y_real_a = fetch_data_scenario(test_material_annealing)
    y_real_s = fetch_data_scenario(test_material_solidsol)

    sigma_real_a[i] = y_real_a[2]
    sigma_real_s[i] = y_real_s[2]

# Fetch realistic data
comps = fetch_comp_data(filename = 'realistic_compositions_in_weight_percent.xlsx')

# Generate experiment data
x_matrix_stat, y_matrix_stat = experiment_data_generation(compositions = comps, n_temp_tests = 2)
#x_matrix_stat, y_matrix_stat = data_generation(n = 1000, seed = 0)

# Prediction with random forest model
n_candidates = 1000
n_experiments = 11

# Set parameters
weighted = [False, False, False]
strategy_list = ['BPV', 'OVU', 'LU']
closeness_criteria = ['Pythagoras', 'Pythagoras', 'Pythagoras']
n_better_vec = np.zeros(len(strategy_list))
experiment_list = np.zeros(len(strategy_list))
RF_prediction_a = np.zeros([len(strategy_list), eval_points])
RF_prediction_s = np.zeros([len(strategy_list), eval_points])

# This is now the main loop for the sequential part
for index, strategy in enumerate(strategy_list):
    
    # Generate experiment data
    x_matrix = x_matrix_stat
    y_matrix = y_matrix_stat

    for k in tqdm(range(n_experiments)):

        n_datapoints = x_matrix.shape[0]
        n_trees = int(n_datapoints/2)

        samples, tree_predictions, RF_prediction, trees, X_test, Y_test, scalerX = RandomForestAll(n_trees, x_matrix, y_matrix, n_candidates, weighted=weighted[index], Y_range=range(2,4))

        # Uncertainty in dimension (dim =) 0/1
        sigma_x2_Y = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 0)
        sigma_x2_E = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 1)
        sigma_x = np.array([np.sqrt(np.abs(sigma_x2_Y)), np.sqrt(np.abs(sigma_x2_E))]).T

        index_to_add = selection_strategy(RF_prediction, sigma_x, strategy = strategy, goal_value = goal_value, y_matrix=y_matrix, closeness_crit = closeness_criteria[index])

        x_matrix = np.concatenate((x_matrix, X_test[index_to_add,:].reshape(1,-1)), axis=0)
        y_matrix = np.concatenate((y_matrix, Y_test[index_to_add,:].reshape(1,-1)), axis=0)
    
    weights = np.zeros([1])
    if weighted[index]:
        weights = get_weights(x_matrix, X_test, samples, scalerX, trees)
    tree_predictions, RF_prediction_apre, X_test = predict(X_eval_annealing, trees, scalerX, weights)
    tree_predictions, RF_prediction_spre, X_test = predict(X_eval_solidsol, trees, scalerX, weights)
    RF_prediction_a[index,:] = RF_prediction_apre[:,0]
    RF_prediction_s[index,:] = RF_prediction_spre[:,0]

colors = ['tab:blue', 'tab:orange', 'tab:green']

plt.figure(1)
for i in range(len(strategy_list)):
    plt.plot(temps, RF_prediction_a[i,:], label=strategy_list[i], color=colors[i])#strategy_list[i]
plt.plot(temps, sigma_real_a, label='True model', color='dimgrey', alpha=0.7)
plt.xlabel('Annealing temperature [$^\circ$C]', fontsize=14)
plt.ylabel('$\sigma_y$, yield strength [MPa]', fontsize=14)
plt.legend()
plt.title('Annealing effect', fontsize=14)

plt.figure(2)
for i in range(len(strategy_list)):
    plt.plot(Mo_shares, RF_prediction_s[i,:], label=strategy_list[i], color=colors[i])#strategy_list[i]
plt.plot(Mo_shares, sigma_real_s, label='True model', color = 'dimgrey', alpha=0.7)
plt.xlabel('Molybdenum [$\%$]', fontsize=14)
plt.ylabel('$\sigma_y$, yield strength [MPa]', fontsize=14)
plt.legend()
plt.title('Molybdenum effect', fontsize=14)

# Test

How does MSE evolve?

In [None]:
# Set goal as [Yield strength, Elongation]
goal_value = np.array([1100, 12.5])

# Fetch realistic data
comps = fetch_comp_data(filename = 'realistic_compositions_in_weight_percent.xlsx')
# Generate experiment data with less information about input-output
#comps = fetch_comp_data(filename = 'cluster_compositions.xlsx')

# Generate experiment data
x_matrix_stat, y_matrix_stat = experiment_data_generation(compositions = comps, n_temp_tests = 2)

# Prediction with bespoke random forest model
n_experiments = 10
n_candidates = 1000

# Set parameters
weighted = [False, False, False, False]
share = [0,0,0,0]
strategy_list = ['BPV', 'OVU', 'LU', 'RS']
closeness_criteria = ['Pythagoras', 'Pythagoras', 'Pythagoras', 'Pythagoras']
iter = 10

performance_mat_tot = np.zeros([len(strategy_list), n_experiments])

print('Goal: ', goal_value)

for i in tqdm(range(iter)):
    performance_mat = evaluationOfAlgo2(goal_value, x_matrix_stat, y_matrix_stat, n_candidates, n_experiments, strategy_list, closeness_criteria, share, weighted)
    performance_mat_tot += performance_mat

performance_mat_mean = performance_mat_tot/iter

for j in range(len(strategy_list)):
    plt.plot(performance_mat_mean[j,:], label=strategy_list[j])#label=f'{int(share[j]*100)} % BPV')

plt.xlabel('Experiment')
plt.ylabel('MSE')
plt.legend()
plt.title('MSE')

# Test

Heatmap of uncertainties

In [None]:
# Fetch realistic data
#comps = fetch_comp_data(filename = 'realistic_compositions_in_weight_percent.xlsx')
comps = fetch_comp_data(filename = 'cluster_compositions.xlsx')

# Generate experiment data
x_matrix_stat, y_matrix_stat = experiment_data_generation(compositions = comps, n_temp_tests = 2)

# Prediction with bespoke random forest model
n_candidates = 500

# Set parameters
weighted = [False, False, False]
strategy_list = ['LU']
closeness_criteria = ['Projection']
n_better_vec = np.zeros(len(strategy_list))
experiment_list = np.zeros(len(strategy_list))
n_experiments = 1
performance_list = np.zeros(n_experiments)

# This is now the main loop for the sequential part
for index, strategy in enumerate(strategy_list):
    
    for k in tqdm(range(n_experiments)):

        n_datapoints = x_matrix_stat.shape[0]
        n_trees = int(n_datapoints/2)

        samples, tree_predictions, RF_prediction, trees, X_test, Y_test, scalerX = RandomForestAll(n_trees, x_matrix_stat, y_matrix_stat, n_candidates, weighted = weighted[index], Y_range=range(2,4))

        # Uncertainty in dimension (dim =) 0/1
        sigma_x2_Y = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 0)
        sigma_x2_E = uncertainty_measure_CI(samples, tree_predictions, RF_prediction, trees, correction = True, dim = 1)

        sigma_x = np.array([np.sqrt(np.abs(sigma_x2_Y)), np.sqrt(np.abs(sigma_x2_E))]).T

values = np.linalg.norm(sigma_x/np.array([1400, 22]), axis=1)  # The values used for coloring

# Create a colormap
colormap = plt.cm.get_cmap('cool')

plt.figure(figsize=[10, 7])

# Plot the scatter plot with colored points
plt.scatter(y_matrix_stat[:, 3], y_matrix_stat[:, 2], color='black')
scatter = plt.scatter(Y_test[:, 3], Y_test[:, 2], c=values, cmap=colormap, vmin=0, vmax=.3)
#scatter = plt.scatter(RF_prediction[:,1], RF_prediction[:,0], c=values, cmap=colormap, vmin=0, vmax=.3)

# Add a colorbar
colorbar = plt.colorbar(scatter)
colorbar.set_label('Values')

plt.xlim(0, 23)
plt.ylim(0, 1400)

# Show the plot
plt.show()

