In [None]:
import pandas as pd
import numpy as np
import re
import copy
import matplotlib.pyplot as plt

# data preprocessing
# Participant whose data this cell loads. The preference row is taken by
# position (USER_ID - 1) and the decision rows by the value in column 0.
# NOTE(review): presumably both files are keyed by the same 1-based id - confirm.
USER_ID = 20

df = pd.read_excel('questions.xlsx', header=None)

preferences = pd.read_csv('perference_transformed.csv')
user_preferences = preferences.iloc[USER_ID - 1].to_dict()

decision_datas = pd.read_csv('decision.csv', header=None)
decision_data = decision_datas[decision_datas.iloc[:, 0] == USER_ID]

# The four option groups and the five words that belong to each of them.
groups = {
    'Colour': ['PURPLE', 'BLUE', 'GREEN', 'RED', 'ORANGE'],
    'Concept': ['RISK', 'HOPE', 'SAFETY', 'VITALITY', 'POWER'],
    'Discipline': ['LITERATURE', 'PHYSICS', 'MUSIC', 'HISTORY', 'GEOGRAPHY'],
    'Place': ['SEA', 'DESERT', 'CITY', 'MOUNTAIN', 'VILLAGE']
}

# Reverse lookup: word -> name of the group it belongs to.
value_to_group = {
    value: group
    for group, values in groups.items()
    for value in values
}

QUESTION_COLUMNS = ['Question', 'Colour', 'Concept', 'Discipline', 'Place']

# One question cell is expected to contain four "WORD - score" pairs, in the
# order Colour, Concept, Discipline, Place.
OPTION_PATTERN = re.compile(
    r'([A-Z]+) - (\d+), ([A-Z]+) - (\d+), ([A-Z]+) - (\d+), ([A-Z]+) - (\d+)'
)

# Collect plain records and build each DataFrame once at the end; calling
# pd.concat for every row is quadratic and leaves the columns as object dtype.
question_records = []
score_records = []

for _, row in df.iterrows():
    question_num = row.iloc[0]
    cell_text = row.iloc[1]

    # Blank cells are read as NaN (a float); skip them like any other
    # non-matching row instead of crashing inside the regex.
    if not isinstance(cell_text, str):
        continue

    # A dedicated name is used for the regex result so that the module-level
    # `groups` dictionary above is not overwritten by the loop.
    option_match = OPTION_PATTERN.search(cell_text)
    if option_match is None:
        continue

    (colour, colour_score,
     concept, concept_score,
     discipline, discipline_score,
     place, place_score) = option_match.groups()

    question_records.append({
        'Question': question_num,
        'Colour': colour,
        'Concept': concept,
        'Discipline': discipline,
        'Place': place
    })
    score_records.append({
        'Question': question_num,
        'Colour': int(colour_score),
        'Concept': int(concept_score),
        'Discipline': int(discipline_score),
        'Place': int(place_score)
    })

# `questions` holds the word shown per group, `scores` the matching numeric score.
questions = pd.DataFrame(question_records, columns=QUESTION_COLUMNS)
scores = pd.DataFrame(score_records, columns=QUESTION_COLUMNS)


In [None]:
class BehavioralModel:
    """Sequential choice model over four-option questions.

    Every question offers one word from each group (Colour, Concept,
    Discipline, Place). Each option gets a scalar signal that blends the
    stated preference with the scores seen so far; a softmax over the four
    signals gives choice probabilities. After each question the preferences
    are nudged by the revealed scores, and the four blend weights are adjusted
    with a finite-difference gradient step whenever the prediction was wrong.

    Parameters
    ----------
    questions : pandas.DataFrame
        Columns 'Question', 'Colour', 'Concept', 'Discipline', 'Place' with
        the word shown for each group.
    scores : pandas.DataFrame
        Same columns and row order as ``questions``; holds the numeric score
        of each shown word.
    decision_data : pandas.DataFrame
        Observed decisions. Column position 1 is read as the question number
        and column position 2 as the chosen word.
    user_preferences : mapping
        Word -> initial preference value. Missing words default to 2.5.
    SC : float
        Weight of the most recent score against the mean/std term.
    G : float
        Weight of group-level quantities against word-level quantities.
    adve : float
        Sets the coefficient ``2 * adve - 1`` applied to the standard
        deviation (0 -> mean - std, 1 -> mean + std).
    Sub : float
        Weight of the stated preference against the score-based term.
    lr : float
        Step size used for both preference updates and weight updates.
    eps_grad : float
        Perturbation used for the forward finite-difference gradient.
    beta : float
        Momentum coefficient of the weight updates.
    verbose : bool
        When true, ``train`` prints one summary line per epoch.
    """

    # The four learnable weights, in the order used for gradient vectors.
    _PARAM_NAMES = ("SC", "G", "adve", "Sub")
    # Neutral fallback for a missing preference or an empty score history.
    _NEUTRAL = 2.5

    # initialization
    def __init__(self, questions, scores, decision_data, user_preferences,
                 SC=0.5, G=0.5, adve=0.5, Sub=0.5,
                 lr=0.1, eps_grad=1e-5, beta=0.9, verbose=False):

        # reset_index returns new frames, so the caller's data is not aliased
        # and positional labels 0..n-1 line up between questions and scores.
        self.questions = questions.reset_index(drop=True)
        self.scores = scores.reset_index(drop=True)
        self.decision_data = decision_data.reset_index(drop=True)

        # Column positions inside decision_data.
        self.q_col = 1
        self.chosen_word_col = 2

        self.groups = {
            'Colour': ['PURPLE', 'BLUE', 'GREEN', 'RED', 'ORANGE'],
            'Concept': ['RISK', 'HOPE', 'SAFETY', 'VITALITY', 'POWER'],
            'Discipline': ['LITERATURE', 'PHYSICS', 'MUSIC', 'HISTORY', 'GEOGRAPHY'],
            'Place': ['SEA', 'DESERT', 'CITY', 'MOUNTAIN', 'VILLAGE']
        }

        # Copy so that preference updates never mutate the caller's mapping.
        self.user_preferences = dict(user_preferences)
        self.value_to_group = {v: k for k, vals in self.groups.items() for v in vals}

        # A group's initial preference is the mean preference of its words.
        self.group_preferences = {
            grp: np.mean([self.user_preferences.get(w, self._NEUTRAL) for w in words])
            for grp, words in self.groups.items()
        }

        # learnable parameters
        self.SC = float(SC)
        self.G = float(G)
        self.adve = float(adve)
        self.Sub = float(Sub)
        self.lr = float(lr)
        self.eps_grad = float(eps_grad)
        self.beta = float(beta)
        self.verbose = verbose

        # momentum
        self.v_SC = 0.0
        self.v_G = 0.0
        self.v_adve = 0.0
        self.v_Sub = 0.0

        # memory: key = word, vals = list of past scores
        self.memory = {w: [] for w in self.value_to_group}
        # group_memory: key = group, vals = list of past scores shown for that group
        self.group_memory = {grp: [] for grp in self.groups}

    # question lookup helpers
    def _question_index(self, qnum):
        """Return the row label of question ``qnum``; raise IndexError if absent."""
        matches = self.questions.index[self.questions["Question"].astype(int) == int(qnum)]
        if len(matches) == 0:
            raise IndexError(f"question {qnum} is not present in the question table")
        return matches[0]

    def _question_words(self, idx):
        """Return {group: word shown} for the question stored at row ``idx``."""
        return {grp: self.questions.loc[idx, grp] for grp in self.groups}

    def _question_scores(self, idx):
        """Return {group: integer score} for the question stored at row ``idx``."""
        return {grp: int(self.scores.loc[idx, grp]) for grp in self.groups}

    def _chosen_word(self, qnum):
        """Return the word recorded in decision_data for ``qnum``, or None if no row exists."""
        rows = self.decision_data[self.decision_data.iloc[:, self.q_col].astype(int) == qnum]
        if len(rows) == 0:
            return None
        return rows.iloc[0, self.chosen_word_col]

    @staticmethod
    def _prob_of_word(probs, word):
        """Return the probability (as a 0-1 fraction) of ``word`` in ``probs``, or None."""
        for (_, candidate), pct in probs.items():
            if candidate == word:
                return pct / 100
        return None

    # memory update
    def _add_memory_record(self, qnum):
        """Append the scores of question ``qnum`` to the word and group histories."""
        idx = self._question_index(qnum)
        word_map = self._question_words(idx)
        score_map = self._question_scores(idx)

        for grp, w in word_map.items():
            self.memory[w].append(score_map[grp])
        for grp in score_map:
            self.group_memory[grp].append(score_map[grp])

    # Calculate the mean and standard deviation
    @staticmethod
    def _mean_std(vals):
        """Return (mean, population std) of ``vals`` as floats; (0.0, 0.0) when empty."""
        if not vals:
            return 0.0, 0.0
        arr = np.array(vals, float)
        return float(arr.mean()), float(arr.std())

    def _compute_stats_for_item(self, word):
        """Return (mean, std) of the scores recorded so far for ``word``."""
        return self._mean_std(self.memory[word])

    def _compute_group_stats(self, group):
        """Return (mean, std) of the scores recorded for all words of ``group``."""
        vals = []
        for w in self.groups[group]:
            vals.extend(self.memory[w])
        return self._mean_std(vals)

    # SC score
    def _SC_score(self, history_values):
        """Return the most recent score in ``history_values``, or 2.5 when it is empty.

        Note: only the latest entry is used. An earlier comment on this method
        described an average of two scores; the implementation does not do that.
        """
        if len(history_values) == 0:
            return self._NEUTRAL
        return float(history_values[-1])

    # signal (Various parameters and variables are weighted to obtain)
    def _compute_option_signal(self, word, group):
        """Return the scalar signal of offering ``word`` (a member of ``group``)."""
        # Stated preference, blended between group level and word level by G.
        pref_word = float(self.user_preferences.get(word, self._NEUTRAL))
        pref_group = float(self.group_preferences.get(group, self._NEUTRAL))
        pref_final = self.G * pref_group + (1 - self.G) * pref_word

        w_mean, w_std = self._compute_stats_for_item(word)
        g_mean, g_std = self._compute_group_stats(group)

        # Historical mean shifted by the std; adve in [0, 1] maps to a
        # coefficient in [-1, 1] on the std.
        mean_final = self.G * g_mean + (1 - self.G) * w_mean
        std_final = self.G * g_std + (1 - self.G) * w_std
        meanstd = (mean_final + (2 * self.adve - 1) * std_final)

        # Most recent score at word level and at group level.
        w_sc = self._SC_score(self.memory[word])
        g_sc = self._SC_score(self.group_memory[group])
        SC_final = self.G * g_sc + (1 - self.G) * w_sc

        return self.Sub * pref_final + (1 - self.Sub) * (self.SC * SC_final + (1 - self.SC) * meanstd)

    # choice probabilities (Widen the gap)
    def _compute_choice_probabilities(self, qnum):
        """Return {(group, word): probability in percent} for the options of ``qnum``.

        The probabilities are a softmax over the option signals and sum to 100.
        This method only reads model state; it does not modify it.
        """
        opts = self._question_words(self._question_index(qnum))
        keys = []
        signals = []
        for grp, w in opts.items():
            keys.append((grp, w))
            signals.append(self._compute_option_signal(w, grp))
        arr = np.array(signals)
        # Subtracting the maximum keeps the exponentials from overflowing.
        exps = np.exp(arr - arr.max())
        probs = exps / exps.sum() * 100
        return {k: p for k, p in zip(keys, probs)}

    # preference update (Loss aversion and satisfaction)
    def _update_preferences_after_reveal(self, qnum, selected_word):
        """Adjust word and group preferences once the scores of ``qnum`` are known.

        The selected word moves by ``lr * |3 - score|``: up when its score is
        at least 3, down otherwise. Every other shown word loses ``lr``. The
        selected word's group moves by ``2 * lr * Sub`` in the same direction;
        the other groups lose a quarter of that amount.

        Raises KeyError if ``selected_word`` is not one of the known words.
        """
        idx = self._question_index(qnum)
        scores = self._question_scores(idx)
        words = self._question_words(idx)

        for grp, w in words.items():
            # Words without a stated preference start from the same neutral
            # value that the signal computation assumes for them.
            current = self.user_preferences.get(w, self._NEUTRAL)
            if w == selected_word:
                delta = self.lr * np.abs(3 - scores[grp])
                current = current + delta if scores[grp] >= 3 else current - delta
            else:
                current = current - self.lr
            self.user_preferences[w] = current

        selected_group = self.value_to_group[selected_word]
        delta = self.lr * self.Sub * 2
        for grp in words:
            gp = self.group_preferences.get(grp, self._NEUTRAL)
            if grp == selected_group:
                gp = gp + delta if scores[grp] >= 3 else gp - delta
            else:
                gp = gp - 0.25 * delta
            self.group_preferences[grp] = gp

    # gradient
    def _estimate_gradients(self, qnum, actual_word, base_loss):
        """Return forward finite-difference gradients of the loss w.r.t. SC, G, adve, Sub.

        Each weight is perturbed by ``eps_grad`` on the model itself and then
        restored. Computing probabilities does not change any state, so this
        gives the same numbers as perturbing a deep copy without copying the
        DataFrames for every weight.
        """
        original = {name: getattr(self, name) for name in self._PARAM_NAMES}
        grads = np.zeros(len(self._PARAM_NAMES))
        try:
            for i, name in enumerate(self._PARAM_NAMES):
                setattr(self, name, np.float64(original[name]) + self.eps_grad)
                pert_prob = self._prob_of_word(
                    self._compute_choice_probabilities(qnum), actual_word
                )
                pert_loss = base_loss if pert_prob is None else 1 - pert_prob
                grads[i] = (pert_loss - base_loss) / self.eps_grad
                setattr(self, name, original[name])
        finally:
            # Leave the weights untouched even if a probability evaluation fails.
            for name, value in original.items():
                setattr(self, name, value)
        return grads

    def _apply_gradients(self, grads):
        """Take one momentum step per weight and clip each weight to [0, 1]."""
        for name, grad in zip(self._PARAM_NAMES, grads):
            velocity = self.beta * getattr(self, "v_" + name) + self.lr * grad
            setattr(self, "v_" + name, velocity)
            setattr(self, name, float(np.clip(getattr(self, name) - velocity, 0, 1)))

    # training
    def train(self, init_qs=None, train_qs=None, max_epochs=1, convergence_tol=1e-6):
        """Warm up on ``init_qs``, then predict and learn online over ``train_qs``.

        Parameters
        ----------
        init_qs : list of int, optional
            Questions used only to seed the histories and preferences.
            Defaults to the first five questions.
        train_qs : list of int, optional
            Questions that are predicted, scored and learned from, in order.
            Defaults to every question after the first five.
        max_epochs : int
            Maximum number of passes over ``train_qs``.
        convergence_tol : float
            Stop early when the mean loss changes by less than this between
            two consecutive epochs.

        Returns
        -------
        (dict, pandas.DataFrame)
            The final weights and preferences, and one row per scored question
            with the prediction, the actual choice, both probabilities in
            percent, the loss, and the weights in effect before the update.

        Both score histories are cleared at the start. Preferences, weights
        and momentum carry over from earlier calls.
        """
        if init_qs is None:
            init_qs = list(self.questions["Question"].iloc[:5].astype(int))
        if train_qs is None:
            train_qs = list(self.questions["Question"].iloc[5:].astype(int))

        # Reset the word and group histories together; clearing only one of
        # them would leave stale group scores after a repeated call.
        self.memory = {w: [] for w in self.value_to_group}
        self.group_memory = {grp: [] for grp in self.groups}

        for q in init_qs:
            self._add_memory_record(q)
            selected_word = self._chosen_word(q)
            if selected_word is not None:
                self._update_preferences_after_reveal(q, selected_word)

        results = []
        prev_loss = None

        for ep in range(max_epochs):
            total_loss = 0.0
            n_scored = 0  # questions scored in this epoch only

            for q in train_qs:
                probs = self._compute_choice_probabilities(q)
                (_, pred_word), pred_pct = max(probs.items(), key=lambda kv: kv[1])
                pred_prob = pred_pct / 100

                actual_word = self._chosen_word(q)
                actual_prob = None
                if actual_word is not None:
                    actual_prob = self._prob_of_word(probs, actual_word)
                if actual_prob is None:
                    # No usable decision for this question: it is still
                    # recorded in the histories, but it is not scored.
                    self._add_memory_record(q)
                    continue

                loss = 1 - actual_prob
                total_loss += loss
                n_scored += 1

                results.append({
                    "Question": q,
                    "PredWord": pred_word,
                    "ActualWord": actual_word,
                    "PredProb": pred_prob * 100,
                    "ActualProb": actual_prob * 100,
                    "Loss": loss,
                    # Weights as they were when this prediction was made.
                    "SC_val": self.SC,
                    "G_val": self.G,
                    "adve_val": self.adve,
                    "Sub_val": self.Sub
                })

                # The weights are only adjusted after a wrong prediction.
                if pred_word != actual_word:
                    self._apply_gradients(self._estimate_gradients(q, actual_word, loss))

                self._update_preferences_after_reveal(q, actual_word)
                self._add_memory_record(q)

            # Average over this epoch's questions; `results` keeps growing
            # across epochs and must not be used as the denominator.
            avg_loss = total_loss / max(1, n_scored)
            if self.verbose:
                print(f"Epoch {ep+1} loss={avg_loss:.6f}  SC={self.SC:.3f} G={self.G:.3f} adve={self.adve:.3f} Sub={self.Sub:.3f}")

            if prev_loss is not None and abs(prev_loss - avg_loss) < convergence_tol:
                break
            prev_loss = avg_loss

        return {
            "SC": self.SC,
            "G": self.G,
            "adve": self.adve,
            "Sub": self.Sub,
            "FinalPreferences": self.user_preferences,
            "FinalGroupPrefer": self.group_preferences
        }, pd.DataFrame(results)


In [None]:
# Fit the model for one participant; `n` is the participant id.
n = 20

# Preferences are picked by row position (n - 1).
preferences = pd.read_csv('perference_transformed.csv')
user_preferences = preferences.iloc[n - 1].to_dict()

# Decisions are picked by the id stored in the first column.
decision_datas = pd.read_csv('decision.csv', header=None)
decision_data = decision_datas[decision_datas.iloc[:, 0] == n]

model = BehavioralModel(
    questions,
    scores,
    decision_data,
    user_preferences,
    lr=0.6,
    verbose=True,
)

# One pass over the questions; `results_df` has one row per scored question.
final_params, results_df = model.train(max_epochs=1)

In [None]:
from itertools import product

# The search range of the learning rate
lr_grid = [0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09,
           0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9,
           1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

N_USERS = 53     # participants are numbered 1..N_USERS
EVAL_TAIL = 35   # accuracy is measured on the last EVAL_TAIL scored questions

# Read both input files once instead of once per participant.
all_preferences = pd.read_csv('perference_transformed.csv')
all_decisions = pd.read_csv('decision.csv', header=None)

results = []
best_accuracy_by_user = {}  # participant id -> best accuracy (exact float)

# Every loop variable has its own name so that this cell does not overwrite
# `n`, `model`, `final_params`, `results_df`, `user_preferences` or
# `decision_data` from the single-participant cell; the plotting cell at the
# end of the notebook relies on those.
for user_id in range(1, N_USERS + 1):
    grid_user_preferences = all_preferences.iloc[user_id - 1].to_dict()
    grid_decision_data = all_decisions[all_decisions.iloc[:, 0] == user_id]

    # Grid search for optimal parameters
    best_accuracy = -1  # Initialize
    best_params_list = []  # Save all the optimal parameter combinations

    for lr in lr_grid:
        grid_model = BehavioralModel(
            questions, scores, grid_decision_data, grid_user_preferences,
            lr=lr, verbose=False
        )

        grid_final_params, grid_results_df = grid_model.train(max_epochs=1)

        # Nothing was scored for this participant (e.g. no decision rows):
        # there are no prediction columns and no accuracy to compute.
        if grid_results_df.empty:
            continue

        # Calculate the accuracy rate of the last EVAL_TAIL questions
        subset = grid_results_df.tail(EVAL_TAIL)
        correct_count = (subset['PredWord'] == subset['ActualWord']).sum()
        accuracy = correct_count / len(subset)

        # Extract the parameters and retain three decimal places
        current_params = {
            'lr': lr,
            'SC': round(float(grid_final_params['SC']), 3),
            'G': round(float(grid_final_params['G']), 3),
            'adve': round(float(grid_final_params['adve']), 3),
            'Sub': round(float(grid_final_params['Sub']), 3),
            'correct_count': int(correct_count),
            'total_samples': len(subset)
        }

        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_params_list = [current_params]  # Only retain the current best option
        elif accuracy == best_accuracy:
            best_params_list.append(current_params)  # With the same accuracy rate, add to the list

    if not best_params_list:
        continue
    best_accuracy_by_user[user_id] = float(best_accuracy)

    # Generate a separate row for each optimal learning rate
    for best_params in best_params_list:
        results.append({
            'n': user_id,
            'accuracy': f"{best_accuracy:.2%}",
            'lr': best_params['lr'],
            'SC': best_params['SC'],
            'G': best_params['G'],
            'adve': best_params['adve'],
            'Sub': best_params['Sub'],
            'correct_count': best_params['correct_count'],
            'total_samples': best_params['total_samples']
        })

# Save
if results:
    results_df_output = pd.DataFrame(results)
    # A stable sort keeps tied learning rates in grid order within a participant.
    results_df_output = results_df_output.sort_values('n', kind='stable')

    column_order = ['n', 'accuracy', 'lr', 'SC', 'G', 'adve', 'Sub', 'correct_count', 'total_samples']
    results_df_output = results_df_output[column_order]

    output_filename = 'lr_eps.csv'
    results_df_output.to_csv(output_filename, index=False, encoding='utf-8-sig')

    # Statistical Summary
    # One value per participant: a participant with several tied learning
    # rates has several rows in `results` but is counted once here, and the
    # exact float is used rather than the rounded percentage string.
    print("\nStatistical Summary:")
    accuracy_values = list(best_accuracy_by_user.values())

    print(f"  Average best accuracy rate: {np.mean(accuracy_values):.2%}")
    print(f"  Maximum accuracy rate: {np.max(accuracy_values):.2%}")
    print(f"  Minimum accuracy rate: {np.min(accuracy_values):.2%}")
    print(f"  Standard deviation: {np.std(accuracy_values):.2%}")

In [None]:
# Keep one row per participant (the first listed one) so that participants
# with several tied learning rates are counted once.
unique_results = results_df_output.drop_duplicates('n', keep='first')

# The 'accuracy' column holds strings such as "71.43%"; turn them into fractions.
accuracy_values = [
    float(pct_text.rstrip('%')) / 100
    for pct_text in unique_results['accuracy']
]

print(f"Average best accuracy rate: {np.mean(accuracy_values):.2%}")
print(f"Maximum accuracy rate: {np.max(accuracy_values):.2%}")
print(f"Minimum accuracy rate: {np.min(accuracy_values):.2%}")
print(f"Standard deviation: {np.std(accuracy_values):.2%}")
print(f"Number of users: {len(accuracy_values)}")

In [None]:
# Running accuracy: share of correct predictions among the first i scored
# questions, for every i. It is computed for however many rows `results_df`
# has; a fixed count of 35 fails with a length mismatch on any other size.
is_correct = (results_df["PredWord"] == results_df["ActualWord"]).to_numpy()
acc_list = (np.cumsum(is_correct) / np.arange(1, len(is_correct) + 1)).tolist()

results_df["Accuracy"] = acc_list

# The weight columns hold the values in effect when each question was predicted.
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(results_df["Question"], results_df["Sub_val"], label="Sub")
ax.plot(results_df["Question"], results_df["SC_val"], label="SC")
ax.plot(results_df["Question"], results_df["G_val"], label="G")
ax.plot(results_df["Question"], results_df["adve_val"], label="adve")
ax.plot(results_df["Question"], results_df["Accuracy"], label="Accuracy", linewidth=2.3)

ax.set_ylim(0, 1)
ax.set_xlim(26, 60)
ax.set_xlabel("Question Number")
ax.set_ylabel("Value")
ax.set_title("Parameter Trajectories and Accuracy (Questions 26-60)")
ax.legend()
ax.grid(alpha=0.35)
# Save before show(): the figure may be released once it has been displayed.
fig.savefig("parameter_trajectories_and_accuracy_20.png")
plt.show()