# RF
## RF-GA

In [1]:
import random
import numpy as np
import pandas as pd
from deap import base, creator, tools, algorithms
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
import time

In [2]:
# 加载数据集,mobile-phone-train.csv
data = pd.read_csv('mobile-phone-train.csv',index_col=0)
#data.iloc[:,:-1]
X, y = data.iloc[:,:-1], data['price_range']
# 对y进行label encoding
le = LabelEncoder()
y = le.fit_transform(y)

In [3]:
# Gray编码转换函数
def gray_encode(value, bits):
    """ 将十进制数值转换为Gray编码的二进制字符串 """
    binary = bin(value)[2:].zfill(bits)  # 转换为二进制字符串并补齐
    gray = binary[0]  # Gray编码的第一位与二进制相同
    for i in range(1, len(binary)):
        gray += str(int(binary[i]) ^ int(binary[i-1]))  # 计算Gray编码
    return gray

def gray_decode(gray):
    # 将输入的列表转换为字符串形式
    gray_string = ''.join(map(str, gray))
    
    # 初始化二进制字符串
    binary_string = gray_string[0]  # 第一位相同

    # 解码过程
    for i in range(1, len(gray_string)):
        # 当前二进制位为前一位二进制位与当前Gray位的异或
        binary_value = int(binary_string[i-1]) ^ int(gray_string[i])
        binary_string += str(binary_value)

    # 将二进制字符串转换为十进制数
    decimal_value = int(binary_string, 2)
    return decimal_value

# 计算十进制数字对应的Gray编码位数
def gray_bits(value):
    return len(bin(value)) - 2

In [9]:
# 设置遗传算法的个体和种群
creator.create("FitnessMax", base.Fitness, weights=(1.0,))  # 最大化适应度
creator.create("Individual", list, fitness=creator.FitnessMax)

# 设置编码位数
N_ESTIMATORS_BITS = gray_bits(500)  # n_estimators 最大值为 500
MAX_DEPTH_BITS = gray_bits(50)  # max_depth 最大值为 50
MIN_SAMPLES_SPLIT_BITS = gray_bits(10)  # min_samples_split 最大值为 20
MAX_FEATURES_BITS = gray_bits(15)  # max_features 最大值为15

# 计算总位数
TOTAL_BITS = N_ESTIMATORS_BITS + MAX_DEPTH_BITS + MIN_SAMPLES_SPLIT_BITS + MAX_FEATURES_BITS

# 定义空dataframe存储计算过的个体参数及其fitness
df = pd.DataFrame(columns=['n_estimators', 'max_depth', 'min_samples_split', 'max_features', 'fitness'])

# 定义适应度函数
def evaluate_rf(individual):
    # 将individual转换为一整个str，作为dataframe的index
    index_str = ''.join(map(str, individual))

    # 检查是否已经计算过
    if index_str in df.index:
        return (df.loc[index_str, 'fitness'],)
    
    n_estimators = gray_decode(individual[0:9])  # 解码n_estimators（9位编码）
    max_depth = gray_decode(individual[9:15])  # 解码max_depth（6位编码）
    min_samples_split = gray_decode(individual[15:19])  # 解码min_samples_split（4位编码）
    max_features = gray_decode(individual[19:23]) #/ X.shape[1]  # 解码max_features（5位编码，归一化）

    # 参数合法性检查，将非法参数转换为合理范围
    n_estimators = max(1, int(n_estimators))  # 保证 n_estimators 至少为 1
    max_depth = int(max_depth) if max_depth > 0 else None  # 保证 max_depth 大于 0 或为 None
    min_samples_split = max(2, int(min_samples_split))  # 最小分裂样本数不能小于 2
    
    # max_features 校正，确保为有效值
    if max_features <= 0:
        max_features = None  # 默认使用所有特征
    elif max_features <= 1:
        max_features = float(max_features)  # 如果 max_features 小于等于 1，作为比例处理
    else:
        max_features = min(int(max_features),X.shape[1])  # 否则作为整数处理

    # 定义 RandomForestClassifier 模型
    model = RandomForestClassifier(
        n_estimators=n_estimators, 
        max_depth=max_depth, 
        min_samples_split=min_samples_split, 
        max_features=max_features,  # 调整后的 max_features
        random_state=42
    )
    
    # 得分为模型的交叉验证准确率
    scores = cross_val_score(model, X, y, cv=5, scoring='accuracy')

    # 保存计算结果
    df.loc[index_str] = [n_estimators, max_depth, min_samples_split, max_features, np.mean(scores)]

    return (np.mean(scores),)

def generate_gray_individual():
    return [random.randint(0, 1) for _ in range(TOTAL_BITS)]

# 注册超参数生成器
toolbox = base.Toolbox()
toolbox.register("individual", tools.initIterate, creator.Individual, generate_gray_individual)
#toolbox.register("individual", tools.initCycle, creator.Individual,
                 #lambda: [random.choice(['0', '1']) for _ in range(TOTAL_BITS)],n=1)  # 使用随机生成的二进制字符串

# 创建种群
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# 注册操作
toolbox.register("mate", tools.cxTwoPoint)  # 交叉操作
toolbox.register("mutate", tools.mutFlipBit, indpb=0.15)  # 变异操作
toolbox.register("select", tools.selTournament, tournsize=3)  # 选择操作
toolbox.register("evaluate", evaluate_rf)  # 适应度评估

# 进化参数设置，设置random seed
np.random.seed(42)
population = toolbox.population(n=50)  # 种群数量
NGEN = 100  # 迭代次数
CXPB = 0.5  # 交叉概率
MUTPB = 0.15  # 变异概率
ELITE_SIZE = 5  # 精英个体数量



In [None]:
# 执行遗传算法优化,并绘制适应度曲线
fitnesses_plot = []
for gen in range(NGEN):
    print(f"Generation {gen+1}")
    
    # 评估当前种群的适应度
    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit
    # 记录最佳适应度
    fitnesses_plot.append(max(fitnesses)[0])

    # 选择下一代个体
    offspring = toolbox.select(population, len(population)-ELITE_SIZE)
    offspring = list(map(toolbox.clone, offspring))

    # 找到精英个体
    elites = tools.selBest(population, ELITE_SIZE)

    # 交叉与变异操作
    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        # 检查个体长度
        if len(child1) > 1 and len(child2) > 1:  # 确保个体长度大于1
            toolbox.mate(child1, child2)
            del child1.fitness.values
            del child2.fitness.values

    for mutant in offspring:
        if random.random() < MUTPB:
            toolbox.mutate(mutant)
            del mutant.fitness.values

    # 评估新个体的适应度
    invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
    fitnesses = map(toolbox.evaluate, invalid_ind)
    for ind, fit in zip(invalid_ind, fitnesses):
        ind.fitness.values = fit

    # 替换种群
    population[:] = elites + offspring

# 找出最优个体及其超参数
best_ind = tools.selBest(population, 1)[0]
print(f"Best individual is {best_ind}, with fitness {best_ind.fitness.values}")

Generation 1
Generation 2
Generation 3
Generation 4
Generation 5
Generation 6
Generation 7
Generation 8
Generation 9
Generation 10
Generation 11
Generation 12
Generation 13
Generation 14
Generation 15
Generation 16
Generation 17
Generation 18
Generation 19
Generation 20
Generation 21
Generation 22
Generation 23
Generation 24
Generation 25
Generation 26
Generation 27
Generation 28
Generation 29
Generation 30
Generation 31
Generation 32
Generation 33
Generation 34
Generation 35
Generation 36
Generation 37
Generation 38
Generation 39
Generation 40
Generation 41
Generation 42
Generation 43
Generation 44
Generation 45
Generation 46
Generation 47
Generation 48
Generation 49
Generation 50
Generation 51
Generation 52
Generation 53
Generation 54
Generation 55
Generation 56
Generation 57
Generation 58
Generation 59
Generation 60
Generation 61
Generation 62
Generation 63
Generation 64
Generation 65
Generation 66
Generation 67
Generation 68
Generation 69
Generation 70
Generation 71
Generation 72
G

In [None]:
# 找出最优个体及其超参数
best_ind = tools.selBest(population, 1)[0]
print(f"Best individual is {best_ind}, with fitness {best_ind.fitness.values}")

best_n_estimators = gray_decode(best_ind[0:7])  # 解码n_estimators
best_max_depth = gray_decode(best_ind[7:13])  # 解码max_depth
best_min_samples_split = gray_decode(best_ind[13:18])  # 解码min_samples_split
best_max_features = gray_decode(best_ind[18:26]) / X.shape[1]  # 解码max_features

print(f"Best n_estimators: {best_n_estimators}")
print(f"Best max_depth: {best_max_depth}")
print(f"Best min_samples_split: {best_min_samples_split}")
print(f"Best max_features: {best_max_features}")

# 用最优超参数训练最终模型
best_rf = RandomForestClassifier(
    n_estimators=best_n_estimators,
    max_depth=best_max_depth if best_max_depth > 0 else None,
    min_samples_split=int(best_min_samples_split) if best_min_samples_split >= 2 else 2,
    max_features=int(best_max_features) if best_max_features >= 1 else float(best_max_features),
    random_state=42
)


Best individual is [0, 1, 1, 0, 1, 1, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1, 0], with fitness (0.8916666666666666,)
Best n_estimators: 37
Best max_depth: 48
Best min_samples_split: 3
Best max_features: 0.95
Test accuracy: 0.89375


In [12]:
# 读取测试数据文件并进行预测
X_test = pd.read_csv('mobile-phone-test-no-price-range.csv',index_col=0)
best_rf.fit(X, y)
y_pred = best_rf.predict(X_test)

# 将预测结果写入文件
y_pred = le.inverse_transform(y_pred)
pd.Series(y_pred, name='price_range').to_csv('RF-GA_mobile-phone-test-predictions.csv', header=True, index=True)

## RF-PSO

In [19]:
import numpy as np
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import StandardScaler
from pyswarms.single import GlobalBestPSO
# 关闭warning,ConvergenceWarning
import warnings
warnings.filterwarnings("ignore")

In [44]:
# 加载数据集,mobile-phone-train.csv
data = pd.read_csv('mobile-phone-train.csv',index_col=0)
#data.iloc[:,:-1]
X, y = data.iloc[:,:-1], data['price_range']
# 对y进行label encoding
le = LabelEncoder()
y = le.fit_transform(y)

In [45]:
# 定义适应度函数
def evaluate_rf(params):
    #n_estimators, max_depth, min_samples_split, max_features = params
    n_estimators = params[0]
    max_depth = params[1]
    min_samples_split = params[2]
    max_features = params[3]

    # 参数合法性检查，将非法参数转换为合理范围
    n_estimators = max(1, int(n_estimators))  # 保证 n_estimators 至少为 1
    max_depth = int(max_depth) if int(max_depth) > 0 else None  # 保证 max_depth 大于 0 或为 None
    min_samples_split = max(2, int(min_samples_split))  # 最小分裂样本数不能小于 2

    # max_features 校正，确保为有效值
    if max_features <= 0:
        max_features = None  # 默认使用所有特征
    elif max_features <= 1:
        max_features = float(max_features)  # 如果 max_features 小于等于 1，作为比例处理
    else:
        max_features = int(max_features)  # 否则作为整数处理
    # 定义 MLPClassifier 模型
    
    # 定义 RandomForestClassifier 模型
    model = RandomForestClassifier(
        n_estimators=n_estimators, 
        max_depth=max_depth, 
        min_samples_split=min_samples_split, 
        max_features=max_features,  # 调整后的 max_features
        random_state=42
    )

    # 使用交叉验证评估模型的 F1 宏平均分数
    scores = cross_val_score(model, X, y, cv=5, scoring='f1_macro')
    return -np.mean(scores)  # PSO 是最小化问题，返回负值以最大化得分

def evaluate_mlp_batch(params):
    # 用于批量处理的适应度函数
    return np.array([evaluate_rf(param) for param in params])

# 定义参数的边界和限制
lb = [100, 2, 2, 0.001]  # 各参数的最小值: [n_estimators, max_depth, min_samples_split, max_features]
ub = [500, 50, 20, X.shape[1]]  # 各参数的最大值: [n_estimators, max_depth, min_samples_split, max_features]
dimensions = len(lb)  # 超参数维度

# 初始化 PSO
optimizer = GlobalBestPSO(n_particles=20, dimensions=dimensions, options={'c1': 0.5, 'c2': 0.3, 'w': 0.9}, bounds=(lb, ub))
best_cost, best_pos = optimizer.optimize(evaluate_mlp_batch, iters=50)

# 输出最优参数和对应的得分
print("Best Parameters: ", best_pos)
print("Best Score (negative F1 Macro): ", best_cost)

# 使用最优参数训练模型并在测试集上评估
best_rf = RandomForestClassifier(
    n_estimators=int(best_pos[0]),
    max_depth=int(best_pos[1]) if int(best_pos[1]) > 0 else None,
    min_samples_split=int(best_pos[2]),
    max_features=int(best_pos[3]) if int(best_pos[3]) >= 1 else float(best_pos[3]),
    random_state=42
)

best_rf.fit(X, y)

In [46]:
# 读取测试数据文件并进行预测
X_test = pd.read_csv('mobile-phone-test-no-price-range.csv',index_col=0)
y_pred = best_rf.predict(X_test)

# 将预测结果写入文件
y_pred = le.inverse_transform(y_pred)
pd.Series(y_pred, name='price_range').to_csv('RF-PSO_mobile-phone-test-predictions.csv', header=True, index=True)

# MLP
## MLP-GA

In [36]:
import random
import numpy as np
from deap import base, creator, tools, algorithms
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import cross_val_score
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from pyswarms.single import GlobalBestPSO
# 关闭warning,ConvergenceWarning
import warnings
warnings.filterwarnings("ignore")

In [37]:
from sklearn.preprocessing import StandardScaler
# 加载数据集,mobile-phone-train.csv
data = pd.read_csv('mobile-phone-train.csv',index_col=0)
#data.iloc[:,:-1]
X, y = data.iloc[:,:-1], data['price_range']
# 对y进行label encoding
le = LabelEncoder()
y = le.fit_transform(y)

# 标准化数据
scaler = StandardScaler()
X = scaler.fit_transform(X)

In [38]:
# 定义适应度函数
def evaluate_mlp(individual):
    # 个体包含的超参数
    hidden_layer_sizes, activation, solver, alpha, learning_rate_init = individual

    # 将参数从列表形式转换为对应类型，限制 hidden_layer_sizes 为正整数
    hidden_layer_sizes = tuple([max(1, int(hidden_layer_sizes))])  # 隐藏层大小，确保大于0
    activation = ['identity', 'logistic', 'tanh', 'relu'][int(min(max(activation, 0), 3))]  # 激活函数
    solver = ['lbfgs', 'sgd', 'adam'][int(min(max(solver, 0), 2))]  # 优化算法
    alpha = max(0.0001, float(alpha))  # L2 正则化
    learning_rate_init = max(0.0001, float(learning_rate_init))  # 初始学习率

    # 定义 MLPClassifier 模型
    model = MLPClassifier(hidden_layer_sizes=hidden_layer_sizes,
                          activation=activation,
                          solver=solver,
                          alpha=alpha,
                          learning_rate_init=learning_rate_init,
                          random_state=42,
                          max_iter=200)

    # 使用交叉验证评估模型的 F1 宏平均分数
    scores = cross_val_score(model, X_train_scaled, y_train, cv=5, scoring='f1_macro')
    return (np.mean(scores),)

# 初始化超参数的取值范围
toolbox = base.Toolbox()
toolbox.register("hidden_layer_sizes", random.randint, 10, 100)  # 隐藏层大小
toolbox.register("activation", random.randint, 0, 3)  # 激活函数索引（0:identity, 1:logistic, 2:tanh, 3:relu）
toolbox.register("solver", random.randint, 0, 2)  # 优化算法索引（0:lbfgs, 1:sgd, 2:adam）
toolbox.register("alpha", random.uniform, 0.0001, 0.1)  # L2 正则化
toolbox.register("learning_rate_init", random.uniform, 0.0001, 0.1)  # 初始学习率

# 创建个体（包含超参数）
toolbox.register("individual", tools.initCycle, creator.Individual,
                 (toolbox.hidden_layer_sizes, toolbox.activation, toolbox.solver, toolbox.alpha, toolbox.learning_rate_init))

# 创建种群
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# 注册操作
toolbox.register("mate", tools.cxBlend, alpha=0.5)  # 交叉操作
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=1, indpb=0.2)  # 变异操作
toolbox.register("select", tools.selTournament, tournsize=3)  # 选择操作
toolbox.register("evaluate", evaluate_mlp)  # 适应度评估

# 进化参数设置
population = toolbox.population(n=50)  # 种群数量
NGEN = 20  # 迭代次数
CXPB = 0.5  # 交叉概率
MUTPB = 0.2  # 变异概率

In [40]:
# 执行遗传算法优化
for gen in range(NGEN):
    print(f"Generation {gen}")
    
    # 评估当前种群的适应度
    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit

    # 选择下一代个体
    offspring = toolbox.select(population, len(population))
    offspring = list(map(toolbox.clone, offspring))

    # 交叉与变异操作
    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        if random.random() < CXPB:
            toolbox.mate(child1, child2)
            del child1.fitness.values
            del child2.fitness.values

    for mutant in offspring:
        if random.random() < MUTPB:
            toolbox.mutate(mutant)
            del mutant.fitness.values

    # 评估新个体的适应度
    invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
    fitnesses = map(toolbox.evaluate, invalid_ind)
    for ind, fit in zip(invalid_ind, fitnesses):
        ind.fitness.values = fit

    # 替换种群
    population[:] = offspring

# 找出最优个体及其超参数
best_ind = tools.selBest(population, 1)[0]

# 用最优超参数训练最终模型
best_mlp = MLPClassifier(
    hidden_layer_sizes=(max(1,int(best_ind[0])),),
    activation=['identity', 'logistic', 'tanh', 'relu'][int(best_ind[1])],
    solver=['lbfgs', 'sgd', 'adam'][int(best_ind[2])],
    alpha=float(best_ind[3]),
    learning_rate_init=float(best_ind[4]),
    random_state=42,
    max_iter=200
)
best_mlp.fit(X, y)

In [41]:
# 读取测试数据文件并进行预测
X_test = pd.read_csv('mobile-phone-test-no-price-range.csv',index_col=0)
# 标准化数据
scaler = StandardScaler()
X_test = scaler.fit_transform(X_test)
y_pred = best_mlp.predict(X_test)

# 将预测结果写入文件
y_pred = le.inverse_transform(y_pred)
pd.Series(y_pred, name='price_range').to_csv('GA-MLP_mobile-phone-test-predictions.csv', header=True, index=True)

## MLP-PSO

In [16]:
from pyswarms.single import GlobalBestPSO
# 关闭warning,ConvergenceWarning
import warnings
warnings.filterwarnings("ignore")

In [30]:
from sklearn.preprocessing import StandardScaler
# 加载数据集,mobile-phone-train.csv
data = pd.read_csv('mobile-phone-train.csv',index_col=0)
#data.iloc[:,:-1]
X, y = data.iloc[:,:-1], data['price_range']
# 对y进行label encoding
le = LabelEncoder()
y = le.fit_transform(y)

# 标准化数据
scaler = StandardScaler()
X = scaler.fit_transform(X)

In [34]:
# 适应度函数
def evaluate_mlp(params):
    # 将 params 转换为适合 MLP 的形状
    hidden_layer_sizes = (int(params[0]),)  # 隐藏层大小
    '''activation = 'relu' if params[1] < 0.5 else 'tanh'  # 激活函数
    solver = 'adam' if params[2] < 0.5 else 'sgd'  # 优化器'''
    activation = ['identity', 'logistic', 'tanh', 'relu'][int(params[1])]  # 激活函数
    solver = ['lbfgs', 'sgd', 'adam'][int(params[2])]  # 优化算法
    alpha = params[3]  # L2 正则化
    learning_rate_init = params[4]  # 初始学习率

    model = MLPClassifier(hidden_layer_sizes=hidden_layer_sizes,
                          activation=activation,
                          solver=solver,
                          alpha=alpha,
                          random_state=42,
                          learning_rate_init=learning_rate_init,
                          max_iter=200)

    # 交叉验证的平均得分（F1宏平均）
    scores = cross_val_score(model, X_train, y_train, cv=5, scoring='f1_macro')
    return -np.mean(scores)  # 返回负值，因为 PSO 寻找最小值

def evaluate_mlp_batch(params):
    # 用于批量处理的适应度函数
    return np.array([evaluate_mlp(param) for param in params])

# PSO 参数
lb = [10, 0, 0, 0.0001, 0.0001]  # 各参数的最小值: [隐藏层大小, 激活函数索引, 优化算法索引, alpha, learning_rate_init]
ub = [100, 3, 2, 0.1, 0.1]  # 各参数的最大值: [隐藏层大小, 激活函数索引, 优化算法索引, alpha, learning_rate_init]
dimensions = len(lb)  # 超参数维度

# 初始化 PSO
optimizer = GlobalBestPSO(n_particles=20, dimensions=dimensions, options={'c1': 0.5, 'c2': 0.3, 'w': 0.9}, bounds=(lb, ub))
# 用于记录每次迭代的信息
best_fitness_list = []
average_fitness_list = []
iteration_times = []

# 运行 PSO 优化
start_time = time.time()  # 开始计时
best_cost, best_pos = optimizer.optimize(evaluate_mlp_batch, iters=50)

# 在测试集上验证
hidden_layer_sizes = int(best_pos[0])
activation = ['identity', 'logistic', 'tanh', 'relu'][int(best_pos[1])]
solver = ['lbfgs', 'sgd', 'adam'][int(best_pos[2])]
alpha = best_pos[3]
learning_rate_init = best_pos[4]

best_mlp = MLPClassifier(
    hidden_layer_sizes=(hidden_layer_sizes,),
    activation=activation,
    solver=solver,
    alpha=alpha,
    learning_rate_init=learning_rate_init,
    max_iter=200,
    random_state=42
)

best_mlp.fit(X, y)

In [35]:
# 读取测试数据文件并进行预测
X_test = pd.read_csv('mobile-phone-test-no-price-range.csv',index_col=0)
# 标准化数据
scaler = StandardScaler()
X_test = scaler.fit_transform(X_test)
y_pred = best_mlp.predict(X_test)

# 将预测结果写入文件
y_pred = le.inverse_transform(y_pred)
pd.Series(y_pred, name='price_range').to_csv('MLP-PSO_mobile-phone-test-predictions.csv', header=True, index=True)