In [14]:
import pandas as pd
import warnings
from sklearn.exceptions import ConvergenceWarning
import numpy as np
import os
import pickle
import math
import random
THRESHOLD = 10
warnings.filterwarnings('ignore', category=ConvergenceWarning)
class RegretAlgorithm:
    def __init__(self, n_arms, counts=None, emp_means=None):
        self.counts = counts if counts else [0 for col in range(n_arms)]
        self.emp_means = emp_means if emp_means else [0.0 for col in range(n_arms)]
        self.ranking = []  # ranks from worst to best
        return
    def __str__(self):
        return None
    def reset(self, n_arms):
        self.counts = [0 for col in range(n_arms)]
        self.emp_means = [0.0 for col in range(n_arms)]
        self.ranking = []
        return
    def select_next_arm(self):
        pass
    def update(self, chosen_arm, reward):
        self.counts[chosen_arm] = self.counts[chosen_arm] + 1
        n = self.counts[chosen_arm]
        value = self.emp_means[chosen_arm]
        new_value = ((n - 1) / float(n)) * value + (1 / float(n)) * reward
        self.emp_means[chosen_arm] = new_value
        self.ranking = list(np.arange(len(self.counts))[np.argsort(self.counts)])
        return
class UCB1(RegretAlgorithm):
    def __init__(self, n_arms, counts=None, emp_means=None, ucbs=None, batch=False):
        RegretAlgorithm.__init__(self, n_arms, counts, emp_means)
        self.ucbs = ucbs if ucbs else [0.0 for col in range(n_arms)]  
        self.batch = batch
        return
    def __str__(self):
        return 'ucb1'
    def reset(self, n_arms):
        RegretAlgorithm.reset(self, n_arms)
        self.ucbs = [0.0 for col in range(n_arms)]
        return
    def select_next_arm(self):
        if not self.batch:
            if 0 in self.counts:  
                for arm in range(len(self.counts)):
                    if self.counts[arm] == 0:
                        return arm
            else:  
                return np.argmax(self.ucbs)
        else:
            return np.argmax(self.ucbs)
    def update(self, chosen_arm, reward):
        RegretAlgorithm.update(self, chosen_arm, reward)
        bonuses = [math.sqrt((2 * math.log(sum(self.counts) + 1)) / float(self.counts[arm] + 1e-7)) for arm in range(len(self.counts))]
        self.ucbs = [e + b for e, b in zip(self.emp_means, bonuses)]
        return
class Random(RegretAlgorithm):
    def __str__(self):
        return 'random'
    def select_next_arm(self):
        return random.randrange(len(self.emp_means))
class BayesUCBGaussian(UCB1):
    def __init__(self, n_arms, counts=None, emp_means=None, ucbs=None, c=2, assumed_sd=0.25, batch=False):
        UCB1.__init__(self, n_arms, counts, emp_means, ucbs, batch)
        self.c = c  
        self.assumed_sd = assumed_sd
        return
    def __str__(self):
        return f'bayes_ucb_gaussian_c={self.c}_assumed_sd={self.assumed_sd}'
    def update(self, chosen_arm, reward):
        RegretAlgorithm.update(self, chosen_arm, reward)
        stds = [self.c * self.assumed_sd/math.sqrt(cc+1e-7) for cc in self.counts]
        self.ucbs = [m + s for m, s in zip(self.emp_means, stds)]
        
class ETC(RegretAlgorithm):  # explore then commit

    def __init__(self, n_arms, counts=None, emp_means=None, explore_limit=1):
        RegretAlgorithm.__init__(self, n_arms, counts, emp_means)
        self.limit = explore_limit  # how many rounds per arm
        self.best_arm = -1
        return

    def __str__(self):
        return 'etc'

    def reset(self, n_arms):
        RegretAlgorithm.reset(self, n_arms)
        self.best_arm = -1
        return

    def select_next_arm(self):
        if sum(self.counts) == self.limit*len(self.counts):  # exploration just complete, pick the best arm
            self.best_arm = np.argmax(self.emp_means)

        if self.best_arm == -1:  # no best arm set, still in the exploration phase
            return np.argmin(self.counts)  # plays the arm with lowest count until exploration ends
        else:  # commit
            return self.best_arm
        
def get_field(data,arm,substrate):
    field=data[arm][substrate]
    return field
def select_substrate_ohe(Substrate_list):
    selected_substrate = random.choice(Substrate_list)
    return selected_substrate
def data_anlyse():
    data_file=pd.read_csv(r'C:\Users\Administrator\Desktop\supply data\Deoxyfluorination\deoxyf.csv')
    data_file['yield']=data_file['yield']/100
    Substrate_list=list(data_file['substrate_name'].unique())
    Substrate_list
    arm_name=pd.read_csv(r'C:\Users\Administrator\Desktop\supply data\Deoxyfluorination\arm_list_name.csv')
    arms_list=list(arm_name['base_name+fluoride_name'])
    # arms_list
    data={}
    for i in arms_list:
        data[i]={}
        for j in Substrate_list:
            i1=i.split('+')[1]
            i2=i.split('+')[0]
        
            data[i][j]=data_file[(data_file['fluoride_name']==i1)&(data_file['substrate_name']==j)&(data_file['base_name']==i2)]['yield'].values[0]

    return data,arms_list,Substrate_list
def run_simulation(num_sims, horizons, arms_list, Substrate_list,data):
    results = []
    for sim in range(num_sims):
        algo = BayesUCBGaussian(len(arms_list))
        # algo = Random(len(arms_list))
        # algo=  ETC(len(arms_list))
        algo.reset(len(arms_list))
        history = []
        for horizon in range(horizons):
            arm = algo.select_next_arm()
            condition = arms_list[arm]
            substrate = select_substrate_ohe(Substrate_list)
            field = get_field(data, condition, substrate)
            history.append({
                'sim': sim + 1,  
                'horizon': horizon + 1,  
                'condition': condition,
                'substrate': substrate,
                'yield': field
            })
            algo.update(arm, field)
        results.extend(history)
        print(f"Simulation {sim + 1} completed.")
    return results
if __name__ == '__main__':
    data,arms_list,Substrate_list=data_anlyse()
    num=400
    history = run_simulation(
        num_sims=num,
        horizons=100,
        arms_list=arms_list,
        Substrate_list=Substrate_list,
        data=data)
    history_df = pd.DataFrame(history)
    path=r'C:\Users\Administrator\Desktop\supply data\Basic experiment\Deoxyfluorination\results'
    history_df.to_csv(path+r'\BayesUCBGaussian.csv', index=False)

Simulation 1 completed.
Simulation 2 completed.
Simulation 3 completed.
Simulation 4 completed.
Simulation 5 completed.
Simulation 6 completed.
Simulation 7 completed.
Simulation 8 completed.
Simulation 9 completed.
Simulation 10 completed.
Simulation 11 completed.
Simulation 12 completed.
Simulation 13 completed.
Simulation 14 completed.
Simulation 15 completed.
Simulation 16 completed.
Simulation 17 completed.
Simulation 18 completed.
Simulation 19 completed.
Simulation 20 completed.
Simulation 21 completed.
Simulation 22 completed.
Simulation 23 completed.
Simulation 24 completed.
Simulation 25 completed.
Simulation 26 completed.
Simulation 27 completed.
Simulation 28 completed.
Simulation 29 completed.
Simulation 30 completed.
Simulation 31 completed.
Simulation 32 completed.
Simulation 33 completed.
Simulation 34 completed.
Simulation 35 completed.
Simulation 36 completed.
Simulation 37 completed.
Simulation 38 completed.
Simulation 39 completed.
Simulation 40 completed.
Simulatio