In [None]:
import numpy as np
import random
import tqdm
from itertools import chain
from typing import List, Dict, Tuple
from collections import defaultdict
from util.data_generate import *
from util.methods import *

# 初始化参数
p = 50
m = 8
group_sizes = [10]*5
seed, J, n_iter, num_repeats = 1, 10000, 150, 30
np.random.seed(seed)
Sigma1 = np.loadtxt("Sigma1.txt")
mu1 = 70 * np.ones(p)
mu2, std2, R2, Sigma2 = data2(p=p, seed=seed, group_sizes=group_sizes)
mu = np.random.multivariate_normal(mu1, Sigma1)

# ---------------------------- 模拟环境 ----------------------------
class ExamEnvironment:
    def __init__(self, n_subjects=50):
        np.random.seed(42)
        self.true_means = mu
        self.cov_matrix = self._generate_cov_matrix(n_subjects)
        
    def _generate_cov_matrix(self, n: int) -> np.ndarray:
        return np.loadtxt("Sigma1.txt")
        
    def observe_scores(self, selected_indices: List[int]) -> List[float]:
        ids = selected_indices
        return np.random.multivariate_normal(mu[ids], Sigma2[np.ix_(ids, ids)])

class DynamicProbabilityEncoder:
    """设置候选集筛选时的概率向量"""
    def __init__(self, n_subjects: int):
        self.n_subjects = n_subjects
        self.prob_vector = np.ones(n_subjects) / n_subjects
        
    def update(self, sample_means: np.ndarray) -> None:
        """
        使用 Softmax 更新概率：
        """
        inverted_means = 80-sample_means

        max_inverted = np.max(inverted_means)
        exp_values = np.exp((inverted_means - max_inverted))
        softmax_probs = exp_values / np.sum(exp_values)
        
        self.prob_vector = softmax_probs / np.sum(softmax_probs)
        
    def sample_candidates(self, size: int) -> List[int]:
        """采样）"""
        return sorted(np.random.choice(
            self.n_subjects,
            size=size,
            replace=False,
            p=self.prob_vector
        ))

# ---------------------------- 整数编码遗传算法 ----------------------------
class IntegerGeneticAlgorithm:
    """基于整数序列编码的遗传算法"""
    def __init__(self, select_size: int = 8):
        self.select_size = select_size
        self.pop_size = 50        # 种群大小
        self.elite_size = 5       # 精英保留数量
        self.mutation_rate = 0.3  # 变异概率
        
    def initialize_population(self, candidates: List[int]) -> List[List[int]]:
        """初始化种群"""
        population = []
        for _ in range(self.pop_size):
            individual = random.sample(candidates, self.select_size)
            population.append(sorted(individual))
        return population
    
    def calculate_fitness(self, individual: List[int], sample_means: np.ndarray) -> float:
        """适应度函数"""
        ids = individual
        z_samples = np.random.randn(J, m)
        sub_Sigma = Sigma2[np.ix_(ids, ids)]
        U = np.linalg.cholesky(sub_Sigma)
        result = np.mean(np.min(sample_means[ids] + (U @ z_samples.T).T, axis=1))
        return -result
        
    def tournament_selection(self, population: List[List[int]], fitness: List[float]) -> List[int]:
        """tournament规模=3"""
        contestants = random.sample(list(zip(population, fitness)), 3)
        return max(contestants, key=lambda x: x[1])[0]
    
    def ordered_crossover(self, parent1: List[int], parent2: List[int]) -> List[int]:
        """交叉,保持科目不重复"""
        size = len(parent1)
        cx1, cx2 = sorted(random.sample(range(size), 2))
        child = [None] * size
        
        # 保留父代1的片段
        child[cx1:cx2] = parent1[cx1:cx2]
        
        # 从父代2填充剩余科目（按顺序且不重复）
        remaining = [item for item in parent2 if item not in child]
        ptr = 0
        for i in chain(range(cx2, size), range(0, cx2)):
            if child[i] is None:
                child[i] = remaining[ptr]
                ptr += 1
        return sorted(child)
    
    def mutate(self, individual: List[int], candidates: List[int]) -> List[int]:
        """变异：随机替换1个候选集中的科目"""
        if random.random() < self.mutation_rate:
            idx_to_replace = random.randint(0, self.select_size-1)
            available = list(set(candidates) - set(individual))
            if available:
                individual[idx_to_replace] = random.choice(available)
        return sorted(individual)
    
    def evolve(self, population: List[List[int]], fitness: List[float], candidates: List[int]) -> List[List[int]]:
        """进化"""
        new_pop = []
        
        # 保留精英
        elite_indices = np.argsort(fitness)[-self.elite_size:]
        new_pop.extend([population[i] for i in elite_indices])
        
        # 生成后代
        while len(new_pop) < self.pop_size:
            parent1 = self.tournament_selection(population, fitness)
            parent2 = self.tournament_selection(population, fitness)
            child = self.ordered_crossover(parent1, parent2)
            child = self.mutate(child, candidates)
            new_pop.append(child)
            
        return new_pop


In [None]:
# ---------------------------- 主优化流程 ----------------------------
csv_path = f'{p}_{m}_GA.csv'
csv_path1 = f'{p}_{m}_GA_pre.csv'
csv_path2 = f'{p}_{m}_GA_post.csv'

# Initialize CSV
with open(csv_path, 'w') as f:
    f.write(','.join(['Repetition'] + [f'Step_{i}' for i in range(1, n_iter+1)]) + '\n')
with open(csv_path1, 'w') as f:
    f.write(','.join(['Repetition'] + [f'Step_{i}' for i in range(1, n_iter+1)]) + '\n')
with open(csv_path2, 'w') as f:
    f.write(','.join(['Repetition'] + [f'Step_{i}' for i in range(1, n_iter+1)]) + '\n')


# 初始化环境与组件
env = ExamEnvironment(n_subjects=50)
encoder = DynamicProbabilityEncoder(n_subjects=50)
ga = IntegerGeneticAlgorithm(select_size=8)

for repeat in range(num_repeats): 

    obs_num = np.zeros(p)

    # 记录历史信息
    history = {
        'best_groups': [],
        'sample_means': np.full(50, 70.0)  # 初始样本均值
    }

    G = [] #用于计算累积遗憾
    G_pre = [] 
    G_post = []

    for epoch in tqdm(range(n_iter)):

        # 动态生成候选集（大小=20）
        candidates = encoder.sample_candidates(size=20)
        
        # 运行遗传算法
        population = ga.initialize_population(candidates)
        for _ in range(100):  # GA内部迭代
            fitness = [ga.calculate_fitness(ind, history['sample_means']) for ind in population]
            population = ga.evolve(population, fitness, candidates)
        
        # 选择最佳个体
        best_individual = max(population, key=lambda x: ga.calculate_fitness(x, history['sample_means']))
        
        # 观测分数并更新
        observed_scores = env.observe_scores(best_individual)
        obs_num[best_individual] += 1
        
        # 更新样本均值（模拟增量更新）
        for idx, score in zip(best_individual, observed_scores):
            old_mean = history['sample_means'][idx]
            history['sample_means'][idx] = 0.9 * old_mean + 0.1 * score  # 指数平滑
        
        # 更新概率编码器
        encoder.update(history['sample_means'])
        
        # 记录信息
        history['best_groups'].append(best_individual)

        z_samples = np.random.randn(J*10, m)
        sub_Sigma = Sigma2[np.ix_(best_individual, best_individual)]
        U = np.linalg.cholesky(sub_Sigma)
        G.append(np.mean(np.min(mu[best_individual] + (U @ z_samples.T).T, axis=1)))

        Gmin = 1000
        idmin = []
        z_samples2 = np.random.randn(J, m)
        for id in history['best_groups']:
            sub_Sigma = Sigma2[np.ix_(id, id)]
            B = np.linalg.cholesky(sub_Sigma)
            Gtemp = np.mean(np.min(history['sample_means'][id] + (B @ z_samples2.T).T, axis=1))
            if Gtemp < Gmin:
                idmin = id.copy()
                Gmin = Gtemp
        ids1 = idmin

        sub_Sigma1 = Sigma2[np.ix_(ids1, ids1)]
        U1 = np.linalg.cholesky(sub_Sigma1)
        G_pre.append(np.mean(np.min(mu[ids1] + (U1 @ z_samples.T).T, axis=1)))

        U2, ids2 = improvement(p, m, history['sample_means'], std2, R2, Sigma2, J=J)
        G_post.append(np.mean(np.min(mu[ids2] + (U2 @ z_samples.T).T, axis=1)))
        
    # 计算 G_real
    U3, ids3 = improvement(p, m, mu, std2, R2, Sigma2, J=J)
    z_samples = np.random.randn(J*10, m)
    G_real = np.mean(np.min(mu[ids3] + (U3 @ z_samples.T).T, axis=1))

    # 保存结果
    with open(csv_path, 'a') as f:
        f.write(','.join([str(repeat)] + [f'{x:.6f}' for x in np.array(G)-G_real]) + '\n')
    with open(csv_path1, 'a') as f:
        f.write(','.join([str(repeat)] + [f'{x:.6f}' for x in np.array(G_pre)-G_real]) + '\n')
    with open(csv_path2, 'a') as f:
        f.write(','.join([str(repeat)] + [f'{x:.6f}' for x in np.array(G_post)-G_real]) + '\n')

In [None]:
!shutdown -s -t 60