In [124]:
import mesa
import numpy as np
# from mesa.time import RandomActivation
from mesa.space import MultiGrid
# 导入ABMSimulator instance for event scheduling
from mesa.experimental.devs import ABMSimulator
import csv

In [125]:
# 中央政府智能体类,拓展自mesa.Agent
class CentralGovernment(mesa.Agent):
    
    def __init__(self, model):
        super().__init__(model)
        # 添加智能体属性
        self.reproductive_subsidy = 1000  # 生育补贴标准
        self.retirement_age = 65  # 退休年龄
        self.transfer_ratio = 0.1  # 财政转移支付比例
        self.role="CentralGovernment"
        self.region = "China"
    def step(self):
        # 待定义逻辑
        pass


In [126]:

# 地方政府智能体类
class LocalGovernment(mesa.Agent):
    def __init__(self, model, region):
        super().__init__( model)
        self.region = region
        self.extra_subsidy = 500  # 额外补贴
        self.fiscal_income = 0
        self.fiscal_expenditure = 0
        self.role = "Central Government"
    def step(self):
        # 执行中央政策并附加地方性措施
        pass

In [127]:

# 居民智能体类
class Resident(mesa.Agent):
    def __init__(self, model, region):
        super().__init__( model)
        # 地区
        self.region = region
        # 随机薪资
        self.salary = np.random.randint(2000, 8000)
        self.role = "resident"

    def step(self):
        # 跨区域迁移决策
        # 获取模型中所有地区
        regions = self.model.regions
        current_region = self.region
        for region in regions:
            if region != current_region:
                # 获取当前和目标地区的平均工资
                wj = self.model.get_average_wage(region)
                wi = self.model.get_average_wage(current_region)
                k = 0.1  # 迁移敏感系数
                # 计算,并判断是否迁移
                migration_prob = k * (wj - wi) / wi
                if np.random.random() < migration_prob:
                    self.region = region
                    self.model.move_resident(self, current_region, region)


In [128]:

# 企业智能体类
class Enterprise(mesa.Agent):
    def __init__(self, model, region):
        super().__init__(model)
        self.region = region
        # 获取地区居民平均薪资
        self.labor_cost = self.model.get_average_wage(region)
        self.recruitment_strategy = 10  # 初始招聘策略
        self.role = "enterprise"
    def step(self):
        # 根据区域劳动力成本调整招聘策略
        #企业对应地区薪资
        current_labor_cost = self.model.get_average_wage(self.region)
        if current_labor_cost > self.labor_cost:
            self.recruitment_strategy -= 1
        elif current_labor_cost < self.labor_cost:
            self.recruitment_strategy += 1
        self.labor_cost = current_labor_cost


In [129]:
# 模型类
class NationalModel(mesa.Model):
    def __init__(self, num_regions, num_residents, num_enterprises ,simulator:ABMSimulator = None):
        # 添加一行
        super().__init__(seed=None)
        # 添加两行和对应参数
        # self.simulator = simulator
        # self.simulator.setup(self)
        self.num_regions = num_regions
        # self.schedule = RandomActivation(self)
        # 可能要改
        self.grid = MultiGrid(10, 10, torus=True)
        # 地区就是i
        self.regions = list(range(num_regions))
        self.residents = []
        self.enterprises = []
# 添加data收集功能s
        self.datacollector = mesa.datacollection.DataCollector(
            agent_reporters={"region":lambda a: a.region, "salary":lambda a: a.salary if a.role == "resident" else 0 },
        )
        # self.datacollector = mesa.DataCollector(
        #             model_reporters={"mean_age": lambda m: m.agents.agg("age", np.mean)},
        #             agent_reporters={"age": "age"}
        #         )

        # 创建中央政府智能体
        self.central_government = CentralGovernment(model=self)
        # self.schedule.add(self.central_government)

        # 创建地方政府智能体
        for i in range(num_regions):
            # 地区就是i
            local_gov = LocalGovernment(self, i)
            # self.schedule.add(local_gov)

        # 创建居民智能体
        for i in range(num_residents):
            region = np.random.choice(self.regions)
            resident = Resident(self, region)
            self.residents.append(resident)
            # self.schedule.add(resident)

        # 创建企业智能体
        for i in range(num_enterprises):
            region = np.random.choice(self.regions)
            enterprise = Enterprise( self, region)
            self.enterprises.append(enterprise)
            # self.schedule.add(enterprise)

    def get_average_wage(self, region):
        residents_in_region = [r for r in self.residents if r.region == region]
        if len(residents_in_region) == 0:
            return 0
        total_salary = sum([r.salary for r in residents_in_region])
        return total_salary / len(residents_in_region)

    def move_resident(self, resident, from_region, to_region):
        resident.region = to_region
        pass

    def calculate_labor_flow(self):
        # 计算劳动力流动
        for i in self.regions:
            for j in self.regions:
                if i != j:
                    wi = self.get_average_wage(i)
                    wj = self.get_average_wage(j)
                    # 对应地区居民
                    Li = len([r for r in self.residents if r.region == i])
                    k = 0.1
                    # 计算迁移人口:迁移系数*(j地pj薪资-i地pj薪资)/w地pj薪资*居民数
                    Mi_j = k * (wj - wi) / wi * Li
                    print(f"从区域 {i} 到区域 {j} 的迁移人口: {Mi_j}")

    def calculate_fiscal_transfer(self):
        # 计算财政转移支付
        central_gov = self.central_government
        for i in self.regions:
            for j in self.regions:
                if i != j:
                    GDPi = np.random.randint(10000, 50000)
                    GDPj = np.random.randint(10000, 50000)
                    # 转移比例*(i地gdp-j地gdp)
                    transfer = central_gov.transfer_ratio * (GDPi - GDPj)
                    print(f"从区域 {i} 到区域 {j} 的财政转移支付: {transfer}")

    def step(self):
        # self.schedule.step()
        # 添加一行
# 收集特定智能体数据
        self.agents.shuffle_do("step")
        self.calculate_labor_flow()
        self.calculate_fiscal_transfer()
        self.datacollector.collect(self)

In [130]:
# 创建模型实例并运行模拟
model = NationalModel(num_regions=3, num_residents=100, num_enterprises=20)
for _ in range(10):
    model.step()
resident_data = model.datacollector.get_agent_vars_dataframe()
with open("resident_data.csv", "w",encoding="gbk") as f:
    resident_data.to_csv(f)
print(resident_data)
    

从区域 0 到区域 1 的迁移人口: 0.17061614833782038
从区域 0 到区域 2 的迁移人口: -0.17694077777498346
从区域 1 到区域 0 的迁移人口: -0.18680872098005624
从区域 1 到区域 2 的迁移人口: -0.3805423195132612
从区域 2 到区域 0 的迁移人口: 0.16430308202156826
从区域 2 到区域 1 的迁移人口: 0.32273326056527496
从区域 0 到区域 1 的财政转移支付: -1356.0
从区域 0 到区域 2 的财政转移支付: -404.6
从区域 1 到区域 0 的财政转移支付: -355.6
从区域 1 到区域 2 的财政转移支付: 549.6
从区域 2 到区域 0 的财政转移支付: -949.5
从区域 2 到区域 1 的财政转移支付: -100.4
从区域 0 到区域 1 的迁移人口: 0.17061614833782038
从区域 0 到区域 2 的迁移人口: -0.17694077777498346
从区域 1 到区域 0 的迁移人口: -0.18680872098005624
从区域 1 到区域 2 的迁移人口: -0.3805423195132612
从区域 2 到区域 0 的迁移人口: 0.16430308202156826
从区域 2 到区域 1 的迁移人口: 0.32273326056527496
从区域 0 到区域 1 的财政转移支付: 237.4
从区域 0 到区域 2 的财政转移支付: -42.7
从区域 1 到区域 0 的财政转移支付: -2559.0
从区域 1 到区域 2 的财政转移支付: 518.3000000000001
从区域 2 到区域 0 的财政转移支付: 766.4000000000001
从区域 2 到区域 1 的财政转移支付: -1172.4
从区域 0 到区域 1 的迁移人口: 0.17061614833782038
从区域 0 到区域 2 的迁移人口: -0.17694077777498346
从区域 1 到区域 0 的迁移人口: -0.18680872098005624
从区域 1 到区域 2 的迁移人口: -0.3805423195132612
从区域 2 到区域 0 

In [131]:
# import mesa
# import numpy as np
# from mesa.space import MultiGrid

# # 中央政府智能体类
# class CentralGovernment(mesa.Agent):
#     def __init__(self,  model):
#         super().__init__( model)
#         self.reproductive_subsidy = 1000  # 生育补贴标准
#         self.retirement_age = 65  # 退休年龄
#         self.transfer_ratio = 0.1  # 财政转移支付比例

#     def step(self):
#         pass

# # 地方政府智能体类
# class LocalGovernment(mesa.Agent):
#     def __init__(self, unique_id, model, region):
#         super().__init__(unique_id, model)
#         self.region = region
#         self.extra_subsidy = 500  # 额外补贴
#         self.fiscal_income = 0
#         self.fiscal_expenditure = 0

#     def step(self):
#         # 执行中央政策并附加地方性措施
#         pass

# # 居民智能体类
# class Resident(mesa.Agent):
#     def __init__(self, unique_id, model, region):
#         super().__init__(unique_id, model)
#         self.region = region
#         self.salary = np.random.randint(2000, 8000)

#     def step(self):
#         # 跨区域迁移决策
#         regions = self.model.regions
#         current_region = self.region
#         for region in regions:
#             if region != current_region:
#                 wj = self.model.get_average_wage(region)
#                 wi = self.model.get_average_wage(current_region)
#                 k = 0.1  # 迁移敏感系数
#                 migration_prob = k * (wj - wi) / wi
#                 if np.random.random() < migration_prob:
#                     self.region = region
#                     self.model.move_resident(self, current_region, region)

# # 企业智能体类
# class Enterprise(mesa.Agent):
#     def __init__(self, unique_id, model, region):
#         super().__init__(unique_id, model)
#         self.region = region
#         self.labor_cost = self.model.get_average_wage(region)
#         self.recruitment_strategy = 10  # 初始招聘策略

#     def step(self):
#         # 根据区域劳动力成本调整招聘策略
#         current_labor_cost = self.model.get_average_wage(self.region)
#         if current_labor_cost > self.labor_cost:
#             self.recruitment_strategy -= 1
#         elif current_labor_cost < self.labor_cost:
#             self.recruitment_strategy += 1
#         self.labor_cost = current_labor_cost

# # 模型类
# class NationalModel(mesa.Model):
#     def __init__(self, num_regions, num_residents, num_enterprises):
#         self.num_regions = num_regions
#         self.schedule = mesa.RandomActivation(self)
#         self.grid = MultiGrid(10, 10, torus=True)
#         self.regions = list(range(num_regions))
#         self.residents = []
#         self.enterprises = []

#         # 创建中央政府智能体
#         self.central_government = CentralGovernment(0, self)
#         self.schedule.add(self.central_government)

#         # 创建地方政府智能体
#         for i in range(num_regions):
#             local_gov = LocalGovernment(i + 1, self, i)
#             self.schedule.add(local_gov)

#         # 创建居民智能体
#         for i in range(num_residents):
#             region = np.random.choice(self.regions)
#             resident = Resident(i + num_regions + 1, self, region)
#             self.residents.append(resident)
#             self.schedule.add(resident)

#         # 创建企业智能体
#         for i in range(num_enterprises):
#             region = np.random.choice(self.regions)
#             enterprise = Enterprise(i + num_regions + num_residents + 1, self, region)
#             self.enterprises.append(enterprise)
#             self.schedule.add(enterprise)

#     def get_average_wage(self, region):
#         residents_in_region = [r for r in self.residents if r.region == region]
#         if len(residents_in_region) == 0:
#             return 0
#         total_salary = sum([r.salary for r in residents_in_region])
#         return total_salary / len(residents_in_region)

#     def move_resident(self, resident, from_region, to_region):
#         # 处理居民迁移的逻辑
#         pass

#     def calculate_labor_flow(self):
#         # 计算劳动力流动
#         for i in self.regions:
#             for j in self.regions:
#                 if i != j:
#                     wi = self.get_average_wage(i)
#                     wj = self.get_average_wage(j)
#                     Li = len([r for r in self.residents if r.region == i])
#                     k = 0.1
#                     Mi_j = k * (wj - wi) / wi * Li
#                     print(f"从区域 {i} 到区域 {j} 的迁移人口: {Mi_j}")

#     def calculate_fiscal_transfer(self):
#         # 计算财政转移支付
#         central_gov = self.central_government
#         for i in self.regions:
#             for j in self.regions:
#                 if i != j:
#                     GDPi = np.random.randint(10000, 50000)
#                     GDPj = np.random.randint(10000, 50000)
#                     transfer = central_gov.transfer_ratio * (GDPi - GDPj)
#                     print(f"从区域 {i} 到区域 {j} 的财政转移支付: {transfer}")

#     def step(self):
#         self.schedule.step()
#         self.calculate_labor_flow()
#         self.calculate_fiscal_transfer()

# # 创建模型实例并运行模拟
# model = NationalModel(num_regions=3, num_residents=100, num_enterprises=20)
# for _ in range(10):
#     model.step()
    

In [132]:
# import mesa
# import numpy as np
# from mesa.time import RandomActivation
# from mesa.space import MultiGrid

# # 中央政府智能体类
# class CentralGovernment(mesa.Agent):
#     def __init__(self, unique_id, model):
#         super().__init__(unique_id, model)
#         self.reproductive_subsidy = 1000  # 生育补贴标准
#         self.retirement_age = 65  # 退休年龄
#         self.transfer_ratio = 0.1  # 财政转移支付比例

#     def step(self):
#         pass

# # 地方政府智能体类
# class LocalGovernment(mesa.Agent):
#     def __init__(self, unique_id, model, region):
#         super().__init__(unique_id, model)
#         self.region = region
#         self.extra_subsidy = 500  # 额外补贴
#         self.fiscal_income = 0
#         self.fiscal_expenditure = 0

#     def step(self):
#         # 执行中央政策并附加地方性措施
#         pass

# # 居民智能体类
# class Resident(mesa.Agent):
#     def __init__(self, unique_id, model, region):
#         super().__init__(unique_id, model)
#         self.region = region
#         self.salary = np.random.randint(2000, 8000)

#     def step(self):
#         # 跨区域迁移决策
#         regions = self.model.regions
#         current_region = self.region
#         for region in regions:
#             if region != current_region:
#                 wj = self.model.get_average_wage(region)
#                 wi = self.model.get_average_wage(current_region)
#                 k = 0.1  # 迁移敏感系数
#                 migration_prob = k * (wj - wi) / wi
#                 if np.random.random() < migration_prob:
#                     self.region = region
#                     self.model.move_resident(self, current_region, region)

# # 企业智能体类
# class Enterprise(mesa.Agent):
#     def __init__(self, unique_id, model, region):
#         super().__init__(unique_id, model)
#         self.region = region
#         self.labor_cost = self.model.get_average_wage(region)
#         self.recruitment_strategy = 10  # 初始招聘策略

#     def step(self):
#         # 根据区域劳动力成本调整招聘策略
#         current_labor_cost = self.model.get_average_wage(self.region)
#         if current_labor_cost > self.labor_cost:
#             self.recruitment_strategy -= 1
#         elif current_labor_cost < self.labor_cost:
#             self.recruitment_strategy += 1
#         self.labor_cost = current_labor_cost

# # 模型类
# class NationalModel(mesa.Model):
#     def __init__(self, num_regions, num_residents, num_enterprises):
#         self.num_regions = num_regions
#         self.schedule = RandomActivation(self)
#         self.grid = MultiGrid(10, 10, torus=True)
#         self.regions = list(range(num_regions))
#         self.residents = []
#         self.enterprises = []

#         # 创建中央政府智能体
#         self.central_government = CentralGovernment(0, self)
#         self.schedule.add(self.central_government)

#         # 创建地方政府智能体
#         for i in range(num_regions):
#             local_gov = LocalGovernment(i + 1, self, i)
#             self.schedule.add(local_gov)

#         # 创建居民智能体
#         for i in range(num_residents):
#             region = np.random.choice(self.regions)
#             resident = Resident(i + num_regions + 1, self, region)
#             self.residents.append(resident)
#             self.schedule.add(resident)

#         # 创建企业智能体
#         for i in range(num_enterprises):
#             region = np.random.choice(self.regions)
#             enterprise = Enterprise(i + num_regions + num_residents + 1, self, region)
#             self.enterprises.append(enterprise)
#             self.schedule.add(enterprise)

#     def get_average_wage(self, region):
#         residents_in_region = [r for r in self.residents if r.region == region]
#         if len(residents_in_region) == 0:
#             return 0
#         total_salary = sum([r.salary for r in residents_in_region])
#         return total_salary / len(residents_in_region)

#     def move_resident(self, resident, from_region, to_region):
#         # 处理居民迁移的逻辑
#         pass

#     def calculate_labor_flow(self):
#         # 计算劳动力流动
#         for i in self.regions:
#             for j in self.regions:
#                 if i != j:
#                     wi = self.get_average_wage(i)
#                     wj = self.get_average_wage(j)
#                     Li = len([r for r in self.residents if r.region == i])
#                     k = 0.1
#                     Mi_j = k * (wj - wi) / wi * Li
#                     print(f"从区域 {i} 到区域 {j} 的迁移人口: {Mi_j}")

#     def calculate_fiscal_transfer(self):
#         # 计算财政转移支付
#         central_gov = self.central_government
#         for i in self.regions:
#             for j in self.regions:
#                 if i != j:
#                     GDPi = np.random.randint(10000, 50000)
#                     GDPj = np.random.randint(10000, 50000)
#                     transfer = central_gov.transfer_ratio * (GDPi - GDPj)
#                     print(f"从区域 {i} 到区域 {j} 的财政转移支付: {transfer}")

#     def step(self):
#         self.schedule.step()
#         self.calculate_labor_flow()
#         self.calculate_fiscal_transfer()

# # 创建模型实例并运行模拟
# model = NationalModel(num_regions=3, num_residents=100, num_enterprises=20)
# for _ in range(10):
#     model.step()
    