In [1]:
# -*- coding: utf-8 -*-
# 文件名: solve_problem3_mpga.py
# 功能: 使用多群体遗传算法(MPGA)解决问题三

import pandas as pd
import numpy as np
import os
import random
import re
from pathlib import Path
from collections import defaultdict
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import copy

# --- 1. 全局配置区 ---

# MPGA 参数
N_POPULATIONS = 5          # 子种群数量
POP_SIZE_PER_POP = 50      # 每个子种群的大小
MAX_GEN = 200              # 最大进化代数
CX_PROB = 0.8              # 交叉概率
MUT_PROB = 0.2             # 变异概率
MIGRATION_INTERVAL = 20    # 迁移间隔代数
MIGRATION_RATE = 0.1       # 迁移率 (迁移最优个体的比例)

# 适应度评估参数
FITNESS_SAMPLE_SIZE = 20   # 每个个体适应度评估的采样情景数

# 其他参数与之前相同
YEARS = list(range(2024, 2031))
CORR_MATRIX = np.array([
    [1.0, 0.2, 0.1, 0.1, 0.1], [0.2, 1.0, 0.1, 0.1, 0.1],
    [0.1, 0.1, 1.0, 0.6, 0.2], [0.1, 0.1, 0.6, 1.0, 0.2],
    [0.1, 0.1, 0.2, 0.2, 1.0]
])
RANDOM_VAR_NAMES = ['小麦_price', '玉米_price', '番茄_price', '黄瓜_price', 'cost_factor']
CROSS_ELASTICITY = {
    '番茄': {'黄瓜': 0.5, '茄子': 0.4}, '黄瓜': {'番茄': 0.5, '茄子': 0.3},
    '茄子': {'番茄': 0.4, '黄瓜': 0.3}, '辣椒': {'茄子': 0.3}
}


# --- 2. 数据加载与情景生成模块 (与之前方案相同) ---
# 此处为了简洁，省略 load_and_prepare_data 和 ScenarioGenerator 类的代码
# 实际运行时请将上一回答中的这两个部分复制到此处

def load_and_prepare_data(data_path):
    """
    加载并预处理所有输入数据，返回一个包含所有参数的字典。
    此函数逻辑严格参考用户已有代码，以确保兼容性。
    """
    try:
        # print("（1）正在读取Excel文件...")
        path_f1 = data_path / '附件1.xlsx'
        path_f2 = data_path / '附件2.xlsx'
        
        plots_df = pd.read_excel(path_f1, sheet_name='乡村的现有耕地')
        crops_info_df = pd.read_excel(path_f1, sheet_name='乡村种植的农作物')
        stats_df = pd.read_excel(path_f2, sheet_name='2023年统计的相关数据')
        past_planting_df = pd.read_excel(path_f2, sheet_name='2023年的农作物种植情况')
        
        # 清理列名空格
        for df in [plots_df, crops_info_df, stats_df, past_planting_df]:
            df.columns = df.columns.str.strip()
        
        params = {}
        # 地块信息
        params['I_plots'] = sorted(plots_df['地块名称'].tolist())
        params['P_area'] = dict(zip(plots_df['地块名称'], plots_df['地块面积/亩']))
        params['P_plot_type'] = dict(zip(plots_df['地块名称'], plots_df['地块类型']))
        
        # 作物信息
        params['J_crops'] = sorted(crops_info_df['作物名称'].dropna().unique().tolist())
        params['P_crop_type'] = dict(zip(crops_info_df['作物名称'], crops_info_df['作物类型']))
        params['J_bean'] = [j for j, ctype in params['P_crop_type'].items() if isinstance(ctype, str) and '豆' in ctype]
        params['J_grain'] = [j for j, ctype in params['P_crop_type'].items() if '粮食' in str(ctype)]
        params['J_vegetable'] = [j for j, ctype in params['P_crop_type'].items() if '蔬菜' in str(ctype)]
        params['J_fungus'] = [j for j, ctype in params['P_crop_type'].items() if '食用菌' in str(ctype)]
        params['crop_to_id'] = {name: i for i, name in enumerate(params['J_crops'])}
        params['id_to_crop'] = {i: name for i, name in enumerate(params['J_crops'])}


        # 2023年种植历史
        params['P_past_planting'] = defaultdict(list)
        for _, row in past_planting_df.iterrows():
            params['P_past_planting'][row['种植地块']].append(row['作物名称'])

        # 经济和生产数据
        def clean_and_convert_price(value):
            if isinstance(value, str) and any(c in value for c in '-–—'):
                parts = re.split(r'[-–—]', value.strip())
                try: return (float(parts[0]) + float(parts[1])) / 2
                except (ValueError, IndexError): return np.nan
            return pd.to_numeric(value, errors='coerce')

        stats_df['销售单价/(元/斤)'] = stats_df['销售单价/(元/斤)'].apply(clean_and_convert_price)
        stats_df.dropna(subset=['销售单价/(元/斤)'], inplace=True)
        
        params['P_yield_base'] = dict(zip(stats_df['作物名称'], stats_df['亩产量/斤']))
        params['P_cost_base'] = dict(zip(stats_df['作物名称'], stats_df['种植成本/(元/亩)']))
        params['P_price_base'] = dict(zip(stats_df['作物名称'], stats_df['销售单价/(元/斤)']))
        params['P_demand_base'] = dict(zip(stats_df['作物名称'], stats_df['预期销售量/斤']))

        # 适种性 (简化逻辑，实际应更精细)
        params['S_suitability'] = defaultdict(int)
        for _, row in crops_info_df.iterrows():
            crop_name = row['作物名称']
            if pd.isna(crop_name): continue
            
            plot_types_str = row['适种的地块类型']
            if isinstance(plot_types_str, str):
                try:
                    allowed_plot_types = eval(plot_types_str)
                    for p_type in allowed_plot_types:
                        # 简化处理，假设所有地块类型都可种植两季（后续由作物类型约束）
                        params['S_suitability'][(crop_name, p_type, 1)] = 1
                        params['S_suitability'][(crop_name, p_type, 2)] = 1
                except:
                    pass
        
        return params
    except Exception as e:
        print(f"错误: 加载数据失败: {e}"); raise

class ScenarioGenerator:
    """
    根据题目要求和设定的相关性，生成随机场景。
    """
    def __init__(self, base_params, corr_matrix, var_names):
        self.base_params = base_params
        self.corr_matrix = corr_matrix
        self.var_names = var_names
        self.cholesky_L = np.linalg.cholesky(corr_matrix)
        self.var_map = {name: i for i, name in enumerate(var_names)}

    def generate_one_scenario(self):
        scenario = {
            'yield': defaultdict(dict), 'cost': defaultdict(dict),
            'price': defaultdict(dict), 'demand': defaultdict(dict)
        }
        corr_randoms = (self.cholesky_L @ np.random.normal(0, 1, size=(len(self.var_names), len(YEARS)))).T

        for t_idx, year in enumerate(YEARS):
            cost_growth = (1.05) ** (t_idx + 1)
            cost_perturb = corr_randoms[t_idx, self.var_map['cost_factor']] * 0.02
            
            for crop in self.base_params['J_crops']:
                scenario['yield'][year][crop] = self.base_params['P_yield_base'][crop] * (1 + np.random.uniform(-0.1, 0.1))
                scenario['cost'][year][crop] = self.base_params['P_cost_base'][crop] * cost_growth * (1 + cost_perturb)
                
                base_price = self.base_params['P_price_base'][crop]
                price_factor = 1.0
                if crop in self.base_params['J_grain']: price_factor = 1.0
                elif crop in self.base_params['J_vegetable']:
                    price_growth = (1.05) ** (t_idx + 1)
                    p_perturb = 0
                    if f"{crop}_price" in self.var_map: p_perturb = corr_randoms[t_idx, self.var_map[f"{crop}_price"]] * 0.05
                    price_factor = price_growth * (1 + p_perturb)
                elif crop in self.base_params['J_fungus']:
                    decrease_rate = np.random.uniform(0.01, 0.05)
                    if crop == '羊肚菌': decrease_rate = 0.05
                    price_factor = (1 - decrease_rate) ** (t_idx + 1)
                scenario['price'][year][crop] = base_price * price_factor

                base_demand = self.base_params['P_demand_base'][crop]
                demand_factor = 1.0
                if crop in ['小麦', '玉米']: demand_factor = (1 + np.random.uniform(0.05, 0.1))**(t_idx + 1)
                else: demand_factor = 1 + np.random.uniform(-0.05, 0.05)
                scenario['demand'][year][crop] = base_demand * demand_factor

        for year in YEARS:
            price_changes = {c: (scenario['price'][year][c] - self.base_params['P_price_base'][c]) / self.base_params['P_price_base'][c] for c in self.base_params['J_crops']}
            adjusted_demand = scenario['demand'][year].copy()
            for crop_a, rivals in CROSS_ELASTICITY.items():
                if crop_a in adjusted_demand:
                    demand_change_factor = sum(elasticity * price_changes.get(crop_b, 0) for crop_b, elasticity in rivals.items())
                    adjusted_demand[crop_a] *= (1 + demand_change_factor)
            scenario['demand'][year] = adjusted_demand
        return scenario


# --- 3. MPGA 核心实现模块 ---
class MultiPopulationGA:
    def __init__(self, params, scenario_generator):
        self.params = params
        self.scenario_generator = scenario_generator
        self.num_plots = len(params['I_plots'])
        self.num_genes_per_year = self.num_plots
        self.num_genes = self.num_genes_per_year * len(YEARS)
        self.populations = [self._create_population() for _ in range(N_POPULATIONS)]
        self.best_individual = None
        self.best_fitness = -np.inf

        # 预计算有效作物列表，加速个体生成和变异
        self._precompute_valid_crops()

    def _precompute_valid_crops(self):
        self.valid_crops = defaultdict(lambda: [[-1], [-1]]) # [season1, season2]
        for plot_name in self.params['I_plots']:
            plot_type = self.params['P_plot_type'][plot_name]
            s1_crops, s2_crops = [], []
            for crop_name, crop_id in self.params['crop_to_id'].items():
                if self.params['S_suitability'].get((crop_name, plot_type, 1), 0): s1_crops.append(crop_id)
                if self.params['S_suitability'].get((crop_name, plot_type, 2), 0): s2_crops.append(crop_id)
            if s1_crops: self.valid_crops[plot_name][0] = s1_crops
            if s2_crops: self.valid_crops[plot_name][1] = s2_crops

    def _create_individual(self):
        chromosome = []
        for _ in range(len(YEARS)):
            for plot_name in self.params['I_plots']:
                s1_crop = random.choice(self.valid_crops[plot_name][0])
                s2_crop = random.choice(self.valid_crops[plot_name][1])
                chromosome.append((s1_crop, s2_crop))
        return chromosome

    def _create_population(self):
        return [self._create_individual() for _ in range(POP_SIZE_PER_POP)]
    
    def _calculate_profit_in_scenario(self, chromosome, scenario):
        total_revenue, total_cost = 0, 0
        total_production = defaultdict(float)
        
        for y_idx, year in enumerate(YEARS):
            yearly_production = defaultdict(float)
            for p_idx, plot_name in enumerate(self.params['I_plots']):
                gene_idx = y_idx * self.num_plots + p_idx
                s1_crop_id, s2_crop_id = chromosome[gene_idx]
                area = self.params['P_area'][plot_name]

                for crop_id in [s1_crop_id, s2_crop_id]:
                    if crop_id != -1:
                        crop_name = self.params['id_to_crop'][crop_id]
                        yearly_production[crop_name] += area * scenario['yield'][year][crop_name]
                        total_cost += area * scenario['cost'][year][crop_name]
            
            # 计算年度收入 (考虑销量上限)
            for crop, production in yearly_production.items():
                demand = scenario['demand'][year][crop]
                price = scenario['price'][year][crop]
                sold_qty = min(production, demand)
                over_qty = production - sold_qty
                total_revenue += sold_qty * price + over_qty * price * 0.5 # 超过部分降价50%

        # 计算惩罚项
        penalty = self._calculate_penalty(chromosome)
        return total_revenue - total_cost - penalty

    def _calculate_penalty(self, chromosome):
        # 简化版惩罚，主要针对重茬和豆类轮作
        penalty = 0
        # 重茬
        for p_idx, plot_name in enumerate(self.params['I_plots']):
            # 2023 vs 2024
            y2024_crops = {c for c in chromosome[p_idx] if c != -1}
            for past_crop_name in self.params['P_past_planting'][plot_name]:
                past_crop_id = self.params['crop_to_id'][past_crop_name]
                if past_crop_id in y2024_crops: penalty += 1e7
            
            # 2024-2030
            for y_idx in range(len(YEARS) - 1):
                idx1 = y_idx * self.num_plots + p_idx
                idx2 = (y_idx + 1) * self.num_plots + p_idx
                crops1 = {c for c in chromosome[idx1] if c != -1}
                crops2 = {c for c in chromosome[idx2] if c != -1}
                if crops1.intersection(crops2): penalty += 1e7

        # 豆类轮作 (简化检查: 7年内至少有一次)
        bean_ids = {self.params['crop_to_id'][c] for c in self.params['J_bean']}
        for p_idx in range(self.num_plots):
            planted_bean = False
            for y_idx in range(len(YEARS)):
                idx = y_idx * self.num_plots + p_idx
                if bean_ids.intersection(set(chromosome[idx])):
                    planted_bean = True; break
            if not planted_bean: penalty += 5e7
        
        return penalty

    def calculate_fitness(self, chromosome):
        profits = [self._calculate_profit_in_scenario(chromosome, self.scenario_generator.generate_one_scenario())
                   for _ in range(FITNESS_SAMPLE_SIZE)]
        return np.mean(profits)

    def _selection(self, population, fitnesses):
        idx1, idx2 = random.sample(range(len(population)), 2)
        return population[idx1] if fitnesses[idx1] > fitnesses[idx2] else population[idx2]

    def _crossover(self, p1, p2):
        if random.random() < CX_PROB:
            point = random.randint(1, self.num_genes - 1)
            return p1[:point] + p2[point:], p2[:point] + p1[point:]
        return p1, p2

    def _mutate(self, chromosome):
        mutated_chrom = list(chromosome)
        for i in range(self.num_genes):
            if random.random() < MUT_PROB:
                plot_idx = i % self.num_plots
                plot_name = self.params['I_plots'][plot_idx]
                s1_crop = random.choice(self.valid_crops[plot_name][0])
                s2_crop = random.choice(self.valid_crops[plot_name][1])
                mutated_chrom[i] = (s1_crop, s2_crop)
        return mutated_chrom

    def _migrate(self):
        num_to_migrate = int(POP_SIZE_PER_POP * MIGRATION_RATE)
        if num_to_migrate == 0: return

        all_fitnesses = [[self.calculate_fitness(ind) for ind in pop] for pop in self.populations]
        best_individuals = []
        for i in range(N_POPULATIONS):
            sorted_pop = sorted(zip(self.populations[i], all_fitnesses[i]), key=lambda x: x[1], reverse=True)
            best_individuals.append([ind for ind, fit in sorted_pop[:num_to_migrate]])

        # 环形迁移: pop_i -> pop_{i+1}
        for i in range(N_POPULATIONS):
            dest_pop_idx = (i + 1) % N_POPULATIONS
            
            # 替换目标种群的最差个体
            dest_fitnesses = all_fitnesses[dest_pop_idx]
            worst_indices = np.argsort(dest_fitnesses)[:num_to_migrate]
            
            for j, source_ind in enumerate(best_individuals[i]):
                self.populations[dest_pop_idx][worst_indices[j]] = source_ind

    def run(self):
        tqdm_bar = tqdm(range(MAX_GEN), desc="MPGA进化中")
        for gen in tqdm_bar:
            all_fitnesses = []
            for i in range(N_POPULATIONS):
                # 独立进化
                fitnesses = [self.calculate_fitness(ind) for ind in self.populations[i]]
                all_fitnesses.append(fitnesses)
                
                # 寻找全局最优
                max_fit_in_pop = np.max(fitnesses)
                if max_fit_in_pop > self.best_fitness:
                    self.best_fitness = max_fit_in_pop
                    self.best_individual = self.populations[i][np.argmax(fitnesses)]
                
                # 生成下一代
                new_pop = []
                # 精英保留
                elite_idx = np.argmax(fitnesses)
                new_pop.append(self.populations[i][elite_idx])

                while len(new_pop) < POP_SIZE_PER_POP:
                    p1 = self._selection(self.populations[i], fitnesses)
                    p2 = self._selection(self.populations[i], fitnesses)
                    c1, c2 = self._crossover(p1, p2)
                    new_pop.append(self._mutate(c1))
                    if len(new_pop) < POP_SIZE_PER_POP:
                        new_pop.append(self._mutate(c2))
                self.populations[i] = new_pop

            # 迁移
            if (gen + 1) % MIGRATION_INTERVAL == 0:
                self._migrate()
            
            tqdm_bar.set_postfix(best_fitness=f"{self.best_fitness:,.0f}")
            
        return self.best_individual, self.best_fitness

# --- 4. 主程序入口 ---
if __name__ == '__main__':
    try:
        current_dir = Path.cwd()
        data_path = current_dir
        output_dir = current_dir / 'Problem3_MPGA_Results'
        output_dir.mkdir(parents=True, exist_ok=True)
        
        print("--- 问题三 MPGA 求解启动 ---")
        print(f"结果将保存至: {output_dir}")

        # (1) 加载数据
        print("\n(1/4) 加载基础数据...")
        base_params = load_and_prepare_data(data_path)
        
        # (2) 初始化场景生成器
        print("(2/4) 初始化不确定性场景生成器...")
        scenario_gen = ScenarioGenerator(base_params, CORR_MATRIX, RANDOM_VAR_NAMES)
        
        # (3) 运行MPGA求解
        print("(3/4) 开始运行MPGA算法...")
        mpga_solver = MultiPopulationGA(base_params, scenario_gen)
        best_plan, best_avg_profit = mpga_solver.run()
        print("\nMPGA求解完成！")
        print(f"找到的最优鲁棒策略的预期平均利润: {best_avg_profit:,.2f} 元")

        # (4) 结果解析与保存
        print("(4/4) 正在解析并保存最优计划...")
        result_rows = []
        for y_idx, year in enumerate(YEARS):
            for p_idx, plot_name in enumerate(self.params['I_plots']):
                gene_idx = y_idx * len(self.params['I_plots']) + p_idx
                s1_id, s2_id = best_plan[gene_idx]
                if s1_id != -1:
                    result_rows.append([year, plot_name, 1, base_params['id_to_crop'][s1_id], base_params['P_area'][plot_name]])
                if s2_id != -1:
                    result_rows.append([year, plot_name, 2, base_params['id_to_crop'][s2_id], base_params['P_area'][plot_name]])
        
        result_df = pd.DataFrame(result_rows, columns=['年份', '地块编号', '季节', '种植作物', '种植面积'])
        output_path = output_dir / "最优种植策略_MPGA.xlsx"
        result_df.to_excel(output_path, index=False)
        print(f"最优种植计划已保存至: {output_path}")

    except Exception as e:
        print(f"\n程序主流程发生严重错误: {e}")
        import traceback
        traceback.print_exc()

--- 问题三 MPGA 求解启动 ---
结果将保存至: c:\Users\86185\Desktop\2024C\5问题三\Problem3_MPGA_Results

(1/4) 加载基础数据...
错误: 加载数据失败: [Errno 2] No such file or directory: 'c:\\Users\\86185\\Desktop\\2024C\\5问题三\\附件1.xlsx'

程序主流程发生严重错误: [Errno 2] No such file or directory: 'c:\\Users\\86185\\Desktop\\2024C\\5问题三\\附件1.xlsx'


Traceback (most recent call last):
  File "C:\Users\86185\AppData\Local\Temp\ipykernel_35608\1225494881.py", line 380, in <module>
    base_params = load_and_prepare_data(data_path)
  File "C:\Users\86185\AppData\Local\Temp\ipykernel_35608\1225494881.py", line 59, in load_and_prepare_data
    plots_df = pd.read_excel(path_f1, sheet_name='乡村的现有耕地')
               ~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "d:\Conda\Lib\site-packages\pandas\io\excel\_base.py", line 495, in read_excel
    io = ExcelFile(
        io,
    ...<2 lines>...
        engine_kwargs=engine_kwargs,
    )
  File "d:\Conda\Lib\site-packages\pandas\io\excel\_base.py", line 1550, in __init__
    ext = inspect_excel_format(
        content_or_path=path_or_buffer, storage_options=storage_options
    )
  File "d:\Conda\Lib\site-packages\pandas\io\excel\_base.py", line 1402, in inspect_excel_format
    with get_handle(
         ~~~~~~~~~~^
        content_or_path, "rb", storage_options=storage_options, is_t