<a href="https://colab.research.google.com/github/HanseulJo/BOCS_NKmodel/blob/master/BO_on_NKmodel_game.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Original codes: `https://github.com/HanseulJo/BOCS_NKmodel.git`



# Dependencies (Bind it all, Run it all!)

In [1]:
import os, sys, time, itertools

import numpy as np
from scipy.special import softmax
from tqdm.auto import tqdm

from easydict import EasyDict

from sklearn.linear_model import LinearRegression, Lasso
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import Pipeline

In [2]:
"""
Author: Hanseul Cho
Date: 2021.08.04
"""
# Helper functions
def flip(state, ind):
    """ given a state (in numpy array), flip (0 <-> 1) a digit """
    assert state.ndim == 1, state.ndim
    new_state = state.copy()
    new_state[ind] = 1-new_state[ind]
    return new_state

def neighbors(state):
    """ given a state (in numpy array), return a 2D array whose rows are neighbors of the state """
    return np.stack([flip(state, i) for i in range(len(state))])

# Class object of NKmodel
class NKmodel(object):
    """
    NKmodel class. A single NK model.
    Huge thanks to https://github.com/elplatt/nkmodel.git
    
    <PARAM>
    N: the number of loci
    K: the number of the other dependent loci for each locus (0 <= K <= N-1)
    A: the number of states that each locus can have (e.g.: 2 for binary variables)
    <attributes>
    self.interdependence : interdependency matrix in 2D boolean numpy array.
    self.contributions   : list of dict's - ith dict maps (tuple (x_i, x_i0, x_i1, ..., x_iK) of length K) |--> (contribution(=payoff) f_i(x))
    """
    def __init__(self, N, K, A=2, interdependence=None, contributions=None, random_seeds=(None, None)):
        assert 0 <= K <= N-1
        self.N, self.K, self.A = N, K, A
        if interdependence is None:
            # randomly generated interdependence matrix
            self.interdependence = np.full((N,N), False)
            rng_state_dep = np.random.RandomState(seed=random_seeds[0])
            for i in range(N):
                dependence = [i] + list(rng_state_dep.choice(list(set(range(N)) - set([i])), size=K, replace=False))
                self.interdependence[i][dependence] = True
        else:
            self.interdependence = interdependence
        if contributions is None:
            self.contributions = [{} for _ in range(N)]
            rng_state_ctrb = np.random.RandomState(seed=random_seeds[1])
            for i in range(N):
                for label in itertools.product(range(A), repeat=K+1):  # K+1 subcollection of loci values that effects the locus i
                    self.contributions[i][label] = float(rng_state_ctrb.random())  # float [0, 1)
        else:
            self.contributions = contributions

    def _calculate_ith_contribution(self, state, i):
        assert i in range(self.N), i
        assert type(state) == np.ndarray, type(state)
        interdep = self.interdependence[i].copy()
        interdep[i] = False
        label = tuple([state[i]] + list(state[interdep]))  # the value of i-th locus should be the first entry of the 'label'.
        return self.contributions[i][label]

    def fitness_and_contributions(self, state, negative=False):
        """
        Given a state(: a tuple/string of length N), 
        Return fitness value and a list of contributions of each loci.
        """
        if type(state) == str:
            state = np.array([int(state[i]) for i in range(self.N)])
        else:
            state = np.array(state)
        ctrbs = [self._calculate_ith_contribution(state, i) for i in range(self.N)]
        fitness_value = sum(ctrbs) / self.N  # averaged fitness value --> btw 0 ~ 1
        if negative:
            fitness_value = -fitness_value
        return fitness_value, ctrbs
    
    def fitness(self, state, negative=False):
        f, _ = self.fitness_and_contributions(state, negative=negative)
        return f
    
    def evaluate(self, state, negative=False):
        if state.ndim == 1:
            state = np.expand_dims(state, axis=0)
        assert state.shape[1] == self.N, state.shape[1]
        return np.array([self.fitness(state[i], negative=negative) for i in range(state.shape[0])])
    
    def fitness_and_contrib_diff(self, state_prev, flip_ind, fitness_prev=None, ctrbs_prev=None, negative=False):
        state_new = flip(state_prev, flip_ind)
        depend_on_flip = self.interdependence[:,flip_ind]
        if fitness_prev is None or ctrbs_prev is None:
            fitness_prev, ctrbs_prev = self.fitness_and_contributions(state_prev, negative=negative)
        fitness_new = fitness_prev * self.N
        ctrbs_diff = np.zeros(self.N)
        for j in np.nonzero(depend_on_flip)[0]:
            ctrb_new_j = self._calculate_ith_contribution(state_new, j)
            ctrb_prev_j = ctrbs_prev[j]
            fitness_new = fitness_new - ctrb_prev_j + ctrb_new_j
            ctrbs_diff[j] = ctrb_new_j - ctrb_prev_j
        fitness_new /= self.N
        return fitness_new, ctrbs_diff

    def landscape(self, negative=False):
        """
        Return a dictionary mapping each state to its fitness value. (Naive algorithm)
        """
        landscape_dic = {}
        states = itertools.product(range(self.A), repeat=self.N)
        for state in states:
            landscape_dic[state] = self.fitness(state, negative=negative)
        return landscape_dic

    def landscape_with_contributions(self):
        """
        Return a dictionary mapping each state to its fitness value and contributions of loci. (Naive algorithm)
        """
        return {state: self.fitness_and_contributions(state) for state in itertools.product(range(self.A), repeat=self.N)}  # along all possible states

    def get_global_optimum(self, negative=False, anti_opt=False, cache=False, given_landscape=None):
        """
        Global maximum fitness value and its maximizer(state), in a NAIVE way.
        If "anti_opt=True", this returns the "minimum" fitness value and "minimizer". 
        """
        landscape = self.landscape() if given_landscape is None else given_landscape
        optimum = max(landscape.values()) if not anti_opt else min(landscape.values())
        states = [s for s in landscape.keys() if landscape[s] == optimum]
        if negative:
            optimum = -optimum
        if cache:
            return optimum, states, landscape
        else:
            return optimum, states
        
    def get_optimum_and_more(self, order, negative=False, anti_opt=False, cache=False, given_landscape=None):
        """
        First several maximum fitness values and their maximizers(states), in a NAIVE way.
        If "anti_opt=True", this returns first several "minimum" fitness values and "minimizers". 
        """
        landscape = self.landscape() if given_landscape is None else given_landscape
        landscape_list = sorted(landscape.items(), key=lambda x: -x[1])
        if anti_opt:
            landscape_list.reverse()
        state_opt, fit_opt = landscape_list[0]
        if negative:
            fit_opt = -fit_opt
        optima2states = [{"fitness": fit_opt, "states":[state_opt]}]
        cnt = 1
        for state, fitness in landscape_list[1:]:
            if negative:
                fitness = -fitness
            if fitness == optima2states[-1]["fitness"]:
                optima2states[-1]["states"].append(state)
            else:
                cnt += 1
                if cnt > order:
                    break
                optima2states.append({"fitness": fitness, "states":[state]})
        if cache:
            return optima2states, landscape
        else:
            return optima2states
    
    def print_info(self, path=None, order=10):
        optlist = self.get_optimum_and_more(order)
        if path is None:
            print("\nInterdependence Matrix:")
            for i in range(self.N):
                print("".join(["X" if b else "O" for b in self.interdependence[i]]))
            print("\nLandscape:")
            d = self.landscape_with_contributions()
            for state, (fit, ctrbs) in d.items():
                ctrbs = [str(round(v, 4)) for v in ctrbs]
                fit = str(round(fit, 4))
                state = "".join([str(x) for x in state])
                print("\t".join([state] + ctrbs + [fit]))
            for i in range(order):
                opt, optstates = optlist[i]["fitness"], optlist[i]["states"]
                print(f"{i+1}-th optimum: {opt} {optstates}")
        else:
            with open(path + "/knowledge.txt", "w") as f1:
                for i in range(self.N):
                    print("".join(["X" if b else "O" for b in self.interdependence[i]]), file=f1)
            with open(path + "/landscape.txt", "w") as f2:
                d = self.landscape_with_contributions()
                for state, (fit, ctrbs) in d.items():
                    ctrbs = [str(round(v, 4)) for v in ctrbs]
                    fit = str(round(fit, 4))
                    state = "".join([str(x) for x in state])
                    print("\t".join([state] + ctrbs + [fit]), file=f2)
            with open(path + "/rankboard.txt", "w") as f3:
                for i in range(order):
                    opt, optstates = optlist[i]["fitness"], optlist[i]["states"]
                    print(f"{i+1}-th optimum: {opt} {optstates}", file=f3)


# Fuctions to generate random seeds for NKmodel
def _generate_random_seeds(seed_str, n_im_seed=3, n_ctrbs_seed=3, n_init_point_seed=3):
    """
    Original code: COMBO.experiments.random_seed_config.py
    """
    rng_state = np.random.RandomState(seed=sum([ord(ch) for ch in seed_str]))
    result = {}
    for _ in range(n_im_seed):
        result[rng_state.randint(0, 10000)] = (list(rng_state.randint(0, 10000, (n_ctrbs_seed,))), list(rng_state.randint(0, 10000, (n_init_point_seed,))))
    return result

def generate_random_seeds_nkmodel():
    """
    Original code: COMBO.experiments.random_seed_config.py
    """
    return _generate_random_seeds(seed_str="NK_MODEL", n_im_seed=100, n_ctrbs_seed=100, n_init_point_seed=100)


# Easy Construction of NKmodel
def Construct_NKmodel(kwargs, im_seed_num=None, ctrbs_seed_num=None, verbose=False):

    if im_seed_num is None:
        im_seed_num = np.random.randint(100)
    if ctrbs_seed_num is None:
        ctrbs_seed_num = np.random.randint(100)
    
    # Random Seed for NKmodel
    random_seeds = generate_random_seeds_nkmodel()
    im_seed_ = sorted(random_seeds.keys())[im_seed_num]
    ctrbs_seed_list_, _ = sorted(random_seeds[im_seed_])
    ctrbs_seed_ = ctrbs_seed_list_[ctrbs_seed_num]

    # Create NK model
    nkmodel = NKmodel(kwargs['N'], kwargs['K'], A=kwargs['A'], random_seeds=(im_seed_, ctrbs_seed_))

    # Miscelleneous
    if verbose:
        print(f"im_seed_num {im_seed_num} ctrbs_seed_num {ctrbs_seed_num}")

    return nkmodel

In [3]:
# Author: Ricardo Baptista and Matthias Poloczek
# Date:   June 2018
#
# See LICENSE.md for copyright information
# Brought from BOCS github repo. (HanseulJo)

def bhs(Xorg, yorg, nsamples, burnin, thin):
    # Implementation of the Bayesian horseshoe linear regression hierarchy.
    # Parameters:
    #   Xorg     = regressor matrix [n x p]
    #   yorg     = response vector  [n x 1]
    #   nsamples = number of samples for the Gibbs sampler (nsamples > 0)
    #   burnin   = number of burnin (burnin >= 0)
    #   thin     = thinning (thin >= 1)
    #
    # Returns:
    #   beta     = regression parameters  [p x nsamples]
    #   b0       = regression param. for constant [1 x nsamples]
    #   s2       = noise variance sigma^2 [1 x nsamples]
    #   t2       = hypervariance tau^2    [1 x nsamples]
    #   l2       = hypervariance lambda^2 [p x nsamples]
    #  
    #
    # Example:
    # % Load a dataset:
    # load hald;        
    # % Run horseshoe sampler. Normalising the data is not required.
    # [beta, b0] = bhs(ingredients, heat, 1000, 100, 10);    
    # % Plot the samples of the regression coefficients:
    # boxplot(beta', 'labels', {'tricalcium aluminate','tricalcium silicate',...
    #   'tetracalcium aluminoferrite', 'beta-dicalcium silicate'});
    # title('Bayesian linear regression with the horseshoe hierarchy');
    # xlabel('Predictors');
    # ylabel('Beta');
    # grid;
    #
    #
    # References:
    # A simple sampler for the horseshoe estimator
    # E. Makalic and D. F. Schmidt
    # arXiv:1508.03884, 2015
    #
    # The horseshoe estimator for sparse signals
    # C. M. Carvalho, N. G. Polson and J. G. Scott
    # Biometrika, Vol. 97, No. 2, pp. 465--480, 2010
    #
    # (c) Copyright Enes Makalic and Daniel F. Schmidt, 2015
    # Adapted to python by Ricardo Baptista, 2018

    n, p = Xorg.shape

    # Normalize data
    X, _, _, y, muY = standardise(Xorg, yorg)

    # Return values
    beta = np.zeros((p, nsamples))
    s2 = np.zeros((1, nsamples))
    t2 = np.zeros((1, nsamples))
    l2 = np.zeros((p, nsamples))

    # Initial values
    sigma2  = 1.
    lambda2 = np.random.uniform(size=p)
    tau2    = 1.
    nu      = np.ones(p)
    xi      = 1.

    # pre-compute X'*X (used with fastmvg_rue)
    XtX = np.matmul(X.T,X)  

    # Gibbs sampler
    k = 0
    iter = 0
    while(k < nsamples):

        # Sample from the conditional posterior distribution
        sigma = np.sqrt(sigma2)
        Lambda_star = tau2 * np.diag(lambda2)
        # Determine best sampler for conditional posterior of beta's
        if (p > n) and (p > 200):
            b = fastmvg(X/sigma, y/sigma, sigma2*Lambda_star)
        else:
            b = fastmvg_rue(X/sigma, XtX/sigma2, y/sigma, sigma2*Lambda_star)

        # Sample sigma2
        e = y - np.dot(X,b)
        shape = (n + p) / 2.
        scale = np.dot(e.T,e)/2. + np.sum(b**2/lambda2)/tau2/2.
        sigma2 = 1. / np.random.gamma(shape, 1./scale)

        # Sample lambda2
        scale = 1./nu + b**2./2./tau2/sigma2
        lambda2 = 1. / np.random.exponential(1./scale)

        # Sample tau2
        shape = (p + 1.)/2.
        scale = 1./xi + np.sum(b**2./lambda2)/2./sigma2
        tau2 = 1. / np.random.gamma(shape, 1./scale)

        # Sample nu
        scale = 1. + 1./lambda2
        nu = 1. / np.random.exponential(1./scale)

        # Sample xi
        scale = 1. + 1./tau2
        xi = 1. / np.random.exponential(1./scale)

        # Store samples
        iter = iter + 1;
        if iter > burnin:
            # thinning
            if (iter % thin) == 0:
                beta[:,k] = b
                s2[:,k]   = sigma2
                t2[:,k]   = tau2
                l2[:,k]   = lambda2
                k         = k + 1

    # Re-scale coefficients
    #div_vector = np.vectorize(np.divide)
    #beta = div_vector(beta.T, normX)
    #b0 = muY-np.dot(muX,beta)
    b0 = muY

    return (beta, b0, s2, t2, l2)

def fastmvg(Phi, alpha, D):
    # Fast sampler for multivariate Gaussian distributions (large p, p > n) of 
    #  the form N(mu, S), where
    #       mu = S Phi' y
    #       S  = inv(Phi'Phi + inv(D))
    # Reference: 
    #   Fast sampling with Gaussian scale-mixture priors in high-dimensional 
    #   regression, A. Bhattacharya, A. Chakraborty and B. K. Mallick
    #   arXiv:1506.04778

    n, p = Phi.shape

    d = np.diag(D)
    u = np.random.randn(p) * np.sqrt(d)
    delta = np.random.randn(n)
    v = np.dot(Phi,u) + delta
    #w = np.linalg.solve(np.matmul(np.matmul(Phi,D),Phi.T) + np.eye(n), alpha - v)
    #x = u + np.dot(D,np.dot(Phi.T,w))
    mult_vector = np.vectorize(np.multiply)
    Dpt = mult_vector(Phi.T, d[:,np.newaxis])
    w = np.linalg.solve(np.matmul(Phi,Dpt) + np.eye(n), alpha - v)
    x = u + np.dot(Dpt,w)

    return x

def fastmvg_rue(Phi, PtP, alpha, D):
    # Another sampler for multivariate Gaussians (small p) of the form
    #  N(mu, S), where
    #  mu = S Phi' y
    #  S  = inv(Phi'Phi + inv(D))
    #
    # Here, PtP = Phi'*Phi (X'X is precomputed)
    #
    # Reference:
    #   Rue, H. (2001). Fast sampling of gaussian markov random fields. Journal
    #   of the Royal Statistical Society: Series B (Statistical Methodology) 
    #   63, 325-338

    p = Phi.shape[1]
    Dinv = np.diag(1./np.diag(D))

    # regularize PtP + Dinv matrix for small negative eigenvalues
    try:
        L = np.linalg.cholesky(PtP + Dinv)
    except:
        mat  = PtP + Dinv
        Smat = (mat + mat.T)/2.
        maxEig_Smat = np.max(np.linalg.eigvals(Smat))
        L = np.linalg.cholesky(Smat + maxEig_Smat*1e-15*np.eye(Smat.shape[0]))

    v = np.linalg.solve(L, np.dot(Phi.T,alpha))
    m = np.linalg.solve(L.T, v)
    w = np.linalg.solve(L.T, np.random.randn(p))

    x = m + w

    return x

def standardise(X, y):
    # Standardize the covariates to have zero mean and x_i'x_i = 1

    # set params
    n = X.shape[0]
    meanX = np.mean(X, axis=0)
    stdX  = np.std(X, axis=0) * np.sqrt(n)

    # Standardize X's
    #sub_vector = np.vectorize(np.subtract)
    #X = sub_vector(X, meanX)
    #div_vector = np.vectorize(np.divide)
    #X = div_vector(X, stdX)

    # Standardize y's
    meany = np.mean(y)
    y = y - meany

    return (X, meanX, stdX, y, meany)

# -- END OF FILE --

In [4]:
# Author: Ricardo Baptista and Matthias Poloczek
# Modified by HanseulJo
# Date:   June 2018
#
# See LICENSE.md for copyright information
#

class LinReg:

	def __init__(self, order):
		# Old name: __init__(self, nVars, order)
		self.order = order

	# ---------------------------------------------------------
	# ---------------------------------------------------------

	def setupData(self):

		# limit data to unique points
		X, x_idx = np.unique(self.xTrain, axis=0, return_index=True)
		y = self.yTrain[x_idx]

		# set upper threshold
		infT = 1e6

		# separate samples based on Inf output
		y_Infidx  = np.where(np.abs(y) > infT)[0]
		y_nInfidx = np.setdiff1d(np.arange(len(y)), y_Infidx)

		# save samples in two sets of variables
		self.xInf = X[y_Infidx,:]
		self.yInf = y[y_Infidx]

		self.xTrain = X[y_nInfidx,:]
		self.yTrain = y[y_nInfidx]

	# ---------------------------------------------------------
	# ---------------------------------------------------------

	def fit(self, x_vals, y_vals):
		# Old name: train(self, inputs)

		# Set nGibbs (Gibbs iterations to run)
		nGibbs = int(1e3)

		# set data
		self.xTrain = x_vals.copy()
		self.yTrain = y_vals.copy()

		# setup data for training
		self.setupData()

		# create matrix with all covariates based on order
		self.xTrain = self.order_effects(self.xTrain, self.order)
		(nSamps, nCoeffs) = self.xTrain.shape

		# check if x_train contains columns with zeros or duplicates
		# and find the corresponding indices
		check_zero = np.all(self.xTrain == np.zeros((nSamps,1)), axis=0)
		idx_zero   = np.where(check_zero == True)[0]
		idx_nnzero = np.where(check_zero == False)[0]

		# remove columns of zeros in self.xTrain
		if np.any(check_zero):
			self.xTrain = self.xTrain[:,idx_nnzero]

		# run Gibbs sampler for nGibbs steps
		attempt = 1
		trial = 100
		while(attempt):

			# re-run if there is an error during sampling
			try:
				alphaGibbs,a0,_,_,_ = bhs(self.xTrain,self.yTrain,nGibbs,0,1)
			except:
				print('error during Gibbs sampling. Trying again.')
				trial -= 1
				if trial > 0:
					continue
				else:
					raise TimeoutError

			# run until alpha matrix does not contain any NaNs
			if not np.isnan(alphaGibbs).any():
				attempt = 0
			
		# append zeros back - note alpha(1,:) is linear intercept
		alpha_pad = np.zeros(nCoeffs)
		alpha_pad[idx_nnzero] = alphaGibbs[:,-1]
		self.alpha = np.append(a0, alpha_pad)

	# ---------------------------------------------------------
	# ---------------------------------------------------------

	def predict(self, x):
		# Old name: surrogate_model(self, x, alpha): ...
		
		# SURROGATE_MODEL: Function evaluates the linear model
		# Assumption: input x only contains one row -- loosen by HanseulJo

		# generate x_all (all basis vectors) based on model order
		#x_all = np.append(1, self.order_effects(x, self.order))
		alpha = self.alpha

		x_all = PolynomialFeatures(degree=self.order, interaction_only=True).fit_transform(x)

		# check if x maps to an Inf output (if so, barrier=Inf)
		barrier = 0.
		if self.xInf.shape[0] != 0:
			if np.equal(x, self.xInf).all(axis=1).any():
				barrier = np.inf

		# compute and return objective with barrier
		out = x_all @ alpha.reshape((-1,1)) + barrier
		
		return out.reshape(-1)


	# ---------------------------------------------------------
	# ---------------------------------------------------------

	def order_effects(self, x_vals, ord_t):
		# order_effects: Function computes data matrix for all coupling
		# orders to be added into linear regression model.

		# Find number of variables
		n_samp, n_vars = x_vals.shape

		# Generate matrix to store results
		x_allpairs = x_vals

		for ord_i in range(2,ord_t+1):

			# generate all combinations of indices (without diagonals)
			offdProd = np.array(list(combinations(np.arange(n_vars),ord_i)))

			# generate products of input variables
			x_comb = np.zeros((n_samp, offdProd.shape[0], ord_i))
			for j in range(ord_i):
				x_comb[:,:,j] = x_vals[:,offdProd[:,j]]
			x_allpairs = np.append(x_allpairs, np.prod(x_comb,axis=2),axis=1)

		return x_allpairs


# -- END OF FILE --

In [5]:
def flip(state, ind):
    """ given a state (in numpy array), flip (0 <-> 1) a digit """
    assert state.ndim == 1, state.ndim
    new_state = state.copy()
    new_state[ind] = 1-new_state[ind]
    return new_state

def neighbors(state):
    """ given a state (in numpy array), return a 2D array whose rows are neighbors of the state """
    return np.stack([flip(state, i) for i in range(len(state))])

def hamming_dist(start, end):
    assert start.ndim == end.ndim == 1 , (start.ndim, end.ndim)
    assert start.size == end.size, (start.size, end.size)
    return (start != end).sum()

def is_reachable(start, end, flips):
    return (start + end).sum() % 2 == flips % 2 and hamming_dist(start,end) <= flips

def path(start, end, flips):
    assert is_reachable(start, end, flips)
    if flips == 0:
        return np.array([[]])
    N = start.size
    diff = start != end
    flip_inds = np.arange(N)[diff]
    if flips > diff.sum():
        assert (flips-diff.sum()) % 2 == 0
        n = (flips-diff.sum()) // 2
        flip_inds = np.append(flip_inds, np.tile(np.random.choice(N, n), 2))
    np.random.shuffle(flip_inds)
    return flip_inds

def wander(start, flips):
    assert start.ndim == 1 and flips % 2 == 0
    n = flips // 2
    flip_inds = np.arange(start.shape[0])
    np.random.shuffle(flip_inds)
    flip_inds = np.tile(flip_inds[:n], 2)
    return flip_inds

def is_visited(x_vals, x_new):
    assert x_new.ndim == 1, x_new.ndim
    return np.all(x_new == x_vals, axis=1).any()

def states(N):
    return np.stack([np.array(tup) for tup in itertools.product(range(2), repeat=N)])

# add new data
def add_data(inputs, x, y):
  assert len(y) == 1
  inputs['x_vals'] = np.vstack((inputs['x_vals'], x))
  inputs['y_vals'] = np.hstack((inputs['y_vals'], y))

In [6]:
def random_next(x_vals, curr_x=None):
    """
    RANDOM NEXT index to flip, but avoid visited nbh as much as possible
    """
    if curr_x is None:
        curr_x = x_vals[-1]
    num_nbh = curr_x.shape[0]  # N
    nbh_inds = np.arange(num_nbh)
    np.random.shuffle(nbh_inds)
    i_ = 0
    while i_ < num_nbh:
        flip_ind = nbh_inds[i_]
        if not is_visited(x_vals, flip(curr_x.reshape(-1), flip_ind)):
            break
        else:
            i_ += 1
    if i_ == num_nbh:
        flip_ind = np.random.randint(num_nbh)
    return flip_ind
    

def monitor_reachable_best(x_vals, y_vals, t, n_eval, show_objective_x=False):
    """
    Return a PATH to Reachable-best (and the value of R-best)
    """
    curr_x = x_vals[-1]
    left_chance = n_eval - t
    y_sort_ind = np.argsort(-y_vals)  # positive: min -y ~= max fitness
    PATH = None
    for j in y_sort_ind:
        objective_x = x_vals[j]
        objective_y = y_vals[j]
        if is_reachable(curr_x, objective_x, left_chance):
            PATH = path(curr_x, objective_x, left_chance)
            break
    if show_objective_x:
        return PATH, objective_y, objective_x
    return PATH, objective_y


def stochastic_ascent(stat_model, x_vals, max_flips, visit_weight, calculated_acqs=None):
    """
    Stochastic Ascent of Acquisition.
    """
    assert max_flips > 0
    x_vals = x_vals.copy()
    x = x_vals[-1]
    N = x.shape[0]

    for flip_ in range(1,max_flips+1):
        x_nbrs = neighbors(x)
        
        # evaluate acquisitions
        if calculated_acqs is None:
            nbrs_acquisition = np.array([stat_model(x_nbrs[i].reshape((1,-1))) for i in range(N)])
        else:
            nbrs_to_bin = [x_nbrs[i].dot(1 << np.arange(N)[::-1]) for i in range(N)]
            nbrs_acquisition = np.array([calculated_acqs[b] for b in nbrs_to_bin])

        # convert to probability (less acq, more prob)
        m_ = nbrs_acquisition.min()
        score = nbrs_acquisition.copy()
        if m_ < 0:
            score -= m_ # make all scores non negative
        vw = visit_weight(max_flips+1-flip_)
        for i in range(N):
            if is_visited(x_vals, x_nbrs[i]):
                score[i] *= vw  # give weight by a num <= 1 to visited vertex
        score_stand = softmax(score*3)  # multiplying 3: just for appropriate scaling
        #print(score_stand)

        # update x
        next_ind = int(np.random.choice(np.arange(N), p=score_stand))
        x = x_nbrs[next_ind]
        x_vals = np.concatenate((x_vals, x.reshape((1,-1))))

    return nbrs_acquisition[next_ind]


In [7]:
surrogate_model_dict = {
    'BOCS':      LinReg(order=2),
    'PolyReg':   Pipeline([('poly', PolynomialFeatures(interaction_only=True)), ('linear', LinearRegression())]),
    'PolyLasso': Pipeline([('poly', PolynomialFeatures(interaction_only=True)), ('linear', Lasso())]),
}

def neighbor_suggestion(args, stat_model, inputs, max_flips, trials=10, tqdm_on=False, calculated_acqs=None, return_scores=False):
    assert max_flips > 0

    x_vals = inputs['x_vals'].copy()
    x = x_vals[-1]
    N = args.N
    x_nbrs = neighbors(x)

    # visited penalty
    n_eval = args.n_eval
    visit_weight = lambda fl: (n_eval - fl)/(n_eval - N) if fl>N else 1.
    vw = visit_weight(max_flips)

    if max_flips == 1:
        if calculated_acqs is None:
            ascent_scores = [stat_model(x_nbrs[i].reshape((1,-1))) for i in range(N)]
        else:
            nbrs_to_bin = [x_nbrs[i].dot(1 << np.arange(N)[::-1]) for i in range(N)]  # change x_nbrs[i]=array([0,0,1,0,1,1]) into integer 11.
            ascent_scores = [calculated_acqs[b] for b in nbrs_to_bin]
    elif max_flips > 1:
        ascent_scores = []
        it = tqdm(range(N), desc=f"Stochastic Ascent from {x}") if tqdm_on else range(N)
        for i in it:
            if calculated_acqs is None:
                _asc = [stochastic_ascent(stat_model, np.vstack((x_vals, x_nbrs[i])), max_flips-1, visit_weight) for _ in range(trials)]
            else:
                _asc = [stochastic_ascent(stat_model, np.vstack((x_vals, x_nbrs[i])), max_flips-1, visit_weight, calculated_acqs=calculated_acqs) for _ in range(trials)]
            ascent_scores.append(sum(_asc)/trials)  # average
        for i in range(N):
            if is_visited(x_vals, x_nbrs[i]):
                ascent_scores[i] *= vw  # give weight by a num <= 1
    
    ascent_scores = np.array(ascent_scores)
    if return_scores:
        return ascent_scores

    best_nbr_ind = np.argmax(ascent_scores)
    return best_nbr_ind

In [8]:
def _show_best(best):
    print("*** Displaying Your Best Attempt So Far ***")
    for key in best:
        print("best", key, ":", best[key])

def _show_all(inputs):
    print("*** Displaying All Your Attempt So Far (state --> score) ***")
    x_vals, y_vals = inputs['x_vals'], inputs['y_vals']
    for i in range(y_vals.size):
        print(f"Round{i:02d}: {x_vals[i]} --> {y_vals[i]:.4f}")

def _yes_or_no():
    """
    example:
    if _yes_or_no():
        pass
    """
    YORN = 'undefined'
    while True:
        YORN = input("yes[y] or no[n]: ").strip()
        if len(YORN) <= 0:
            continue
        if YORN.lower()[0] in ['y', 'n']:
            break
    return YORN.lower()[0] == 'y'


def input_start_point(N):
    print(f"\nNK MODEL GAME MODE ON\n")
    print(f"Before start, write {N} numbers: each number should be 0 or 1.")
    print(f"Separate numbers by ' '(spacebar).")
    INPUT = []
    error_str=None
    while len(INPUT) != N or not set(INPUT).issubset(set(['0', '1'])):
        INPUT = input("* Your input: ").strip().split()
        if len(INPUT) != N:
            print(f"Wrong length: write {N} numbers and separate with spacebars.")
        if not set(INPUT).issubset(set(['0', '1'])):
            print("Wrong number: write 0 or 1 only")
    init_x = np.array([int(x) for x in INPUT])
    return init_x


def player_decision(N, inputs, best, next_x):
    print( "NOW: Which digit do YOU want to flip?")
    print(f"     Write a number m if you want to flip m-th digit: (1 <= m <= {N})")
    print(f"     Or, write 'BEST' if you want to display the BEST results so far")
    print(f"     Or, write 'history' if you want to display the ALL your attempts so far")
    while True:
        flip_idx = None
        while flip_idx not in range(1, N+1):
            INPUT = input("* Your input: ").strip()
            if INPUT == 'BEST':
                _show_best(best)
            elif INPUT == 'history':
                _show_all(inputs)
            elif INPUT.isdigit():
                flip_idx = int(INPUT)
                if flip_idx not in range(1, N+1):
                    print(f"Wrong number: Write a number from 1 to {N}: ")
            else:
                print("Wrong input format: Write again: ")
        flip_idx -= 1  # index: 0 ~ N-1
        print("Do you really want to change the state as follows?:")
        print(">> FROM :", next_x)
        print(">>   TO :", flip(next_x, flip_idx))
        if _yes_or_no():
            return flip_idx
        else:
            print("You answered NO: Re-write your input. ")
            print( "NOW: Which digit do U wanna flip?")
            print(f"     Write a number m if you want to flip m-th digit: (1 <= m <= {N})")
            print(f"     Or, Write 'BEST' if you wnat to display the BEST results so far")

In [9]:
def GAME(args, chances=None, show_interdependence=False, can_restart=False, surrogate_model_name='PolyReg', back_to_best=False, ascent_trial=64):

    print("Preparing the game.....")

    assert surrogate_model_name in ['BOCS', 'PolyReg', 'PolyLasso']

    N = args.N
    if chances is None:
        chances = args.n_eval
    elif chances != args.n_eval:
        print("args.n_eval changed:", chances)
        args.n_eval = chances
    all_states = states(N)
    
    # Construct NKmodel
    nkmodel = Construct_NKmodel(args)
    optimum, _, _landscape = nkmodel.get_global_optimum(cache=True)
    optlist = nkmodel.get_optimum_and_more(2**N)
    if show_interdependence:
        print(nkmodel.interdependence) # Boolean interdependence matrix

    start = True
    while start:
        init_x = input_start_point(N)
        init_y = nkmodel.evaluate(init_x)
        inputs = {'x_vals': init_x.reshape(1,-1), 'y_vals': init_y}
        
        best = {'x': init_x.copy(), 'y': float(init_y), 'round': 1}  # store so far best result
        
        # Surrogate model
        LR = surrogate_model_dict[surrogate_model_name]
        
        fit_diff = 0                # fitness  difference (previous --> current)       
        ctrbs_diff = np.zeros(N)    # contrib. difference (previous --> current)

        next_x = init_x.copy()
        next_y = init_y
        PATH = None
        print()
        for ROUND in range(1, chances+1):
            print(f"*** Round {ROUND}/{chances} ***")
            print( "Previous results:")
            print( "Current state:", next_x, '<== IMPROVED' if ROUND>1 and np.all(best['x'] == next_x) else '')
            print(f"Current fitness: {float(next_y):.4f} ({fit_diff:.4f} from before)")
            print( "Current improvement of contributions:", ctrbs_diff)
            print()

            # Flip-index suggestion
            left_chance = chances - ROUND + 1
            if PATH is None:
                if ROUND > 1:
                    stat_model = lambda x: LR.predict(x.reshape(1,N))[0]
                    acqs_ = LR.predict(all_states)
                    ascent_score = neighbor_suggestion(args, stat_model, inputs, left_chance, trials=ascent_trial, tqdm_on=True, calculated_acqs=acqs_, return_scores=True)
                    flip_suggest = ascent_score.argmax()
                else: # ROUND == 1
                    ascent_score = np.ones(N) / 2        # uniformly random suggestion
                    flip_suggest = np.random.randint(6)
                print(f"### The ALGORITHM says *<<{flip_suggest+1}-th>>* is the best position to flip, because...")
                for i in range(6):
                    print(f"### the value of flipping {i+1}-th position is {ascent_score[i]:.4f};")
            elif back_to_best:
                flip_suggest = PATH[-left_chance]
                print(f"### The ALGORITHM says *<<{flip_suggest+1}-th>>* is the best position to flip, because...")
                print(f"### You are ON A WAY to go back to the so-far-best (reachable) state !:", objective_x)
            print()
            

            # Flip-index Choice (by Player)
            flip_idx = player_decision(N, inputs, best, next_x)
            
            # Compute next state / fitness / contribution improvement
            next_y_temp, ctrbs_diff = nkmodel.fitness_and_contrib_diff(next_x, flip_idx)
            next_x = flip(next_x, flip_idx)
            fit_diff = next_y_temp - float(next_y)
            next_y = np.array([next_y_temp])

            # inputs update
            add_data(inputs, next_x, next_y)
            

            if back_to_best and (ROUND >= chances - N):
                # If the player did different action from the suggestion, initialize PATH
                if flip_suggest != flip_idx:
                    PATH = None 

                # Possibly, update PATH (if it exists)
                if (PATH is not None) and (left_chance-1>0) and ((left_chance-1) % 2 == 0) and (next_y > objective_y):  # new reachable best --> update PATH
                    PATH[-(left_chance-1):] = wander(next_x, left_chance-1)
            
                # At some point, we should save a PATH to reachable best so far.
                if PATH is None: 
                    PATH, objective_y, objective_x = monitor_reachable_best(inputs['x_vals'], inputs['y_vals'], ROUND, chances, show_objective_x=True)
                    print("*#*#* Notice: From now, your on the way to go back to the state:", objective_x)
                    print("*#*#* Of course, you may take different way from the algorithm's suggestion! ")
                    time.sleep(0.5)
            
            # surrogate model train
            LR.fit(inputs['x_vals'], inputs['y_vals'])

            # best state update
            if next_y > best['y']:
                best["y"] = next_y
                best["x"] = next_x
                best["round"] = ROUND
            print("\n"+"="*100+"\n")
        
        time.sleep(2)
        print("THE END: All the chances Ran out!")
        print("Finally, your best attempt is:")
        _show_best(best)
        print()

        print("Also, your FINAL attempt is:")
        print("*** Displaying Your Final Score ***")
        print("final x :", next_x)
        print("final y :", next_y)
        print()

        # Compute reachable best
        reachable_best = None
        for ind in range(len(optlist)):
            opt, optstates = optlist[ind]["fitness"], optlist[ind]["states"]
            for st in optstates:
                if is_reachable(init_x, np.array(st), chances):
                    reachable_best = opt
                    break
            if reachable_best is not None:
                break

        time.sleep(2)
        # Assesment of result
        if abs(float(next_y) - optimum) <= 1e-5:
            print("You have reached the global optimum !!! You made it !!!")
            start = False
        else:
            print("You did not reached the global optimum.")
            if abs(float(next_y) - reachable_best) <= 1e-5:
                print("However, you did your best!! (Your final answer is 'reachable-optimum')")
                start = False
            else:
                print("Well, everyone has their bad days.")
        print()
        
        # Ask whether to restart
        start = start and can_restart
        time.sleep(3)
        if start:
            print("Do you want to try again? (with the same landscape)")
            if _yes_or_no():
                print("Caution: your best score will be removed from the memory\n")
            else:
                start = False
        
        # Scoreboard
        if not start:
            print("Do you want to see the score board? (Caution: It will print 64-line results.")
            if _yes_or_no():
                for i in range(len(optlist)):
                    opt, optstates = optlist[i]["fitness"], optlist[i]["states"]
                    print(f"{i+1}-th optimum: {opt:.5f} {optstates}")

## Config

In [10]:
args = EasyDict()

args.random_seed = 2021
args.n_eval = 18
args.n_init = 2
args.N = 6
args.K = 1
args.A = 2
args.terminal_size = 50

np.set_printoptions(precision=3)

# Play a GAME!

* surrogate_model_name은 'BOCS', 'PolyReg', 'PolyLasso' 중에서 고를 수 있습니다. 속도와 정확성을 위해 PolyReg 또는 PolyReg를 추천합니다. (BOCS가 4배 정도 느립니다.)
* back_to_best는 True, False 중에서 고를 수 있으며, True 이면 Back-to-Reachable-Best Heuristic을 마지막 N=6번에 적용합니다. False를 추천합니다.
* ascent_trial은 아무런 자연수를 넣어도 됩니다. 작을수록 빠르지만 부정확하며, 클수록 느리지만 정확합니다.

In [11]:
GAME(args, chances=18, show_interdependence=False, surrogate_model_name='PolyReg', back_to_best=False, ascent_trial=64)


Preparing the game.....


NameError: ignored