In [1]:
from SALib.sample import saltelli
from SALib.sample.fast_sampler import sample as fast_sampler
from SALib.sample.morris import sample as morris_sample
from SALib.sample import latin

from SALib.analyze import sobol
from SALib.analyze.fast import analyze as fast_analyzer
from SALib.analyze.morris import analyze as morris_analyze


from SALib.test_functions import Ishigami

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

import seaborn as sns

sns.set_style('white')
sns.set_context('talk')

Here we create a wrapping interface that accepts the dummy parameter input (the 4th parameter `x4`), but strips the parameter values from the input array. This has the effect of making the fourth parameter inactive, and therefore insensitive.

Technically this is unneeded as the fourth parameter never gets used!

In [2]:
# def ishigami_wrapper(param_values):
#     strip_dummy_param = param_values[:, 0:3]
#     return Ishigami.evaluate(strip_dummy_param)

Because SALib concentrates on GSA methods, we implement our own local OAT methods here.

In [3]:
def traditional_oat_sample(problem, N):
    """Create uniformly distributed samples for a 'traditional' OAT analysis.
    
    Parameters
    ==========
    * problem : dict, the problem definition following SALib specs
    * N : int, total number of samples to generate
    
    Returns
    ==========
    * np.array of samples
    """
    bounds = problem.get('bounds')
    nominal = problem.get('nominal')
    num_factors = int(problem.get('num_vars'))
    
    samples_per_param = N / num_factors
    
    if samples_per_param % 1 != 0:
        raise ValueError("Require equal number of samples per parameter. \
        {} samples across {} params is {}".format(N, num_factors, samples_per_param))
    
    samples_per_param = int(samples_per_param)
    
    # experiment = np.empty((N+1, num_factors))
    experiment = np.array(nominal * (N+1)).reshape(N + 1, num_factors)
    
    cols = np.arange(num_factors)
    
    # Sample positions between bounds
    pos = 1
    for i in range(num_factors):
        
        min_val = bounds[i][0]
        max_val = bounds[i][1]
        
        p_samp = np.linspace(min_val, max_val, samples_per_param)
        
        subset = slice(pos, pos+samples_per_param)
        experiment[subset, i] = p_samp
        
        pos += samples_per_param
    # End for
    
    return experiment
# End traditional_oat_sample()


def oat_extremity_sample(problem):
    """Create extremity samples for a 'traditional' OAT analysis.
    
    Here we assume outputs will be affected at the extreme ends of the nominal bounds.
    
    Parameters
    ==========
    * problem : dict, the problem definition following SALib specs        
    """
    bounds = problem.get('bounds')
    nominal = problem.get('nominal')
    num_vars = problem.get('num_vars')

    experiment = [nominal[:]]
    
    # Samples represent runs with values at the extremes of each parameter boundary
    for i in range(num_vars):
        min_val = bounds[i][0]
        max_val = bounds[i][1]
        
        after = i+1
        if i > 0:
            experiment.append(nominal[0:i] + [min_val] + nominal[after:])  # min
            experiment.append(nominal[0:i] + [max_val] + nominal[after:])  # max
        else:
            experiment.append([min_val] + nominal[after:])  # min
            experiment.append([max_val] + nominal[after:])  # max
            
    
    return np.array(experiment)
# End oat_extremity_sample()

    
def oat_analyze(problem, inputs, results):
    """Analyze a 'traditional' OAT result.
    
    Parameters
    ==========
    * problem : dict, the problem definition following SALib specs
    * inputs : np.array, samples used to generate results. 
               The first row is expected to hold nominal values.
    * results : np.array, model run results obtained using the samples.
                The first row is expected to hold nominal results.
                
    Returns
    ==========
    * np.array
    """
    X = inputs
    Y = results
    
    num_results, num_factors = X.shape
    nominal_values = problem.get('nominal')
    grp_len = int((len(Y) - 1) / num_factors)

    # Set up result array (ignores one row as it holds the nominal results)
    # si_res = np.full((grp_len, num_factors), fill_value=np.nan)
    si_res = np.empty((grp_len, num_factors))

    col = 0
    nominal_res = Y[0]
    for i in range(1, num_results, grp_len):
    
        input_subset = X[i:i+grp_len, col]
        res_subset = Y[i:i+grp_len]

        diff = (input_subset - X[0, col])

        calc = ((res_subset - nominal_res) / diff) * (input_subset / res_subset)
        
        # si_res[col] = np.sum(calc) / grp_len
        si_res[:, col] = ((res_subset - nominal_res) / diff) * (input_subset / res_subset)

        col += 1
    # End for
    
    return si_res
    
#     col = 0
#     idx = 0
#     for i in range(1, num_results):
#         diff = (X[i, col] - X[0, col])
#         if diff == 0.0:
#             si_res[idx, col] = 0.0
#         else:
#             try:
#                 si_res[idx, col] += ((Y[i] - Y[0]) / diff) * (X[i, col] / Y[i])
#             except IndexError:
#                 raise IndexError("Invalid index, no row/col at {}, {}".format(idx, col))
#         # End if
        
#         if i % groupings == 0:
#             si_res[idx, col] = si_res[idx, col] / groupings
#             col += 1
#             idx = 0
#             continue
#         # End if
        
#         idx += 1
#     # End for
    
#     return si_res
# End oat_analyze()

Common variables and problem definitions

In [4]:
SEED_VALUE = 101
NUM_SALTELLI_SAMPLES = 500
NUM_FAST_SAMPLES = 5000
NUM_MORRIS_SAMPLES = 1000
NUM_OAT_SAMPLES = 5000

SPEC_WO_INACTIVE = {
  'num_vars': 3,
  'names': ['x1', 'x2', 'x3'],
  'bounds': [[-np.pi, np.pi]]*3
}

SPEC_WITH_INACTIVE = {
  'num_vars': 4,
  'names': ['x1', 'x2', 'x3', 'x4'],
  'bounds': [[-np.pi, np.pi]]*4
}

# problem specification for traditional OAT analysis
OAT_SPEC = {
  'num_vars': 4,
  'names': ['x1', 'x2', 'x3', 'x4'],
  'bounds': [[-np.pi, np.pi]]*4,
  'nominal': [1.0,] * 4
}

The Lake Problem is defined below

In [5]:
def lake_problem(X, alpha=0.4, Y=1.0, q=2.0, b=0.42, x=0.0):
    """
    Lake Problem
    
    Parameters
    ==========
    X : float, normalized concentration of Phosphorus
    alpha : float, utility from pollution
    Y : float, Phosphorus input
    q : float, exponent controlling recycling rate.
    b : float, decay rate for Phosphorus 
        (0.42 = irreversible, as in Kwakkel et al. 2017 https://doi.org/10.1016/j.envsoft.2017.06.054)
    x : float, dummy parameter value. Any value passed in will be overriden with 0
    """
    x = 0.0
    Xq = X**q
    X_t1 = X + alpha + Y + (Xq / (1.0 + Xq)) - (b*X) + x

    return X_t1

In [6]:
def evaluate_lake(values):
    Y = np.zeros([values.shape[0]])
    for i, vals in enumerate(values):
        Y[i] = lake_problem(*vals)
    return Y

In [7]:
LAKE_SPEC = {
  'num_vars': 6,
  'names': ['X', 'alpha', 'Y', 'q', 'b', 'x'],
  'bounds': [[0.0, 1.0], [0.001, 1.0], [0.0, 1.0], [2.0, 4.0], [0.0, 0.5], [0.0, 1.0]]
}