In [None]:
import numpy as np
import torch
import matplotlib.pyplot as plt
import copy

In [None]:
def batch_loader(x, y, batch_size=64):
    data_size = x.shape[0]
    permutation = np.random.permutation(data_size)
    for i in range(0, data_size, batch_size):
        batch_permutation = permutation[i: i+batch_size]
        yield x[batch_permutation], y[batch_permutation]

In [None]:
def plot_model(model, x_tensor):
    x_axis = np.linspace(*X_BOUND, 200)
    plt.plot(x_axis, F(x_axis))
    plt.scatter(x_origin, np.squeeze(model(x_tensor).cpu().detach().numpy(), -1), color='r')
    plt.show()

In [None]:
def train(model, x_tensor, y_tensor, num_epoches=10000, batch_size=4096, learning_rate=1e-3):
    loss_fn = torch.nn.MSELoss(reduction='mean')
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    loss_recorder = []
    for e in range(num_epoches):
        loss_recorder.clear()
        for i, (x_batch, y_batch) in enumerate(batch_loader(x_tensor, y_tensor, batch_size=batch_size)):
            y_pred = model(x_batch)
            loss = loss_fn(y_pred, y_batch)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            loss_recorder.append(loss.item())
            print('batch {} loss {}'.format(
                i+1, loss.item()
            ), end='\r')
        # print('epoch {} loss {}'.format(
        #     e+1, np.mean(loss_recorder)
        # ))
    return np.mean(loss_recorder)

In [None]:
def reset_model(model):
    for layer in model:
        if hasattr(layer, 'reset_parameters'):
            layer.reset_parameters()

# Generate Data

In [None]:
X_BOUND = [-10, 10]
def F(x):
    return x + 10*np.sin(5*x) + 7*np.cos(4*x)

In [None]:
data_size = 1000
noise = 0.1
x_origin = (X_BOUND[0] + (X_BOUND[1]-X_BOUND[0]) * np.random.rand(data_size))
y_origin = F(x_origin) + noise * np.random.rand(data_size)

In [None]:
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
x_tensor = torch.from_numpy(x_origin).unsqueeze_(-1).float().to(device)
y_tensor = torch.from_numpy(y_origin).unsqueeze_(-1).float().to(device)

In [None]:
model = torch.nn.Sequential(
    torch.nn.Linear(1, 100),
    torch.nn.ReLU(),
    torch.nn.Linear(100, 100),
    torch.nn.ReLU(),
    torch.nn.Linear(100, 100),
    torch.nn.ReLU(),
    torch.nn.Linear(100, 100),
    torch.nn.ReLU(),
    torch.nn.Linear(100, 1),
).to(device)

In [None]:
train(model, x_tensor, y_tensor, num_epoches=1000, batch_size=4096)
plot_model(model, x_tensor)

In [None]:
params = model.state_dict()
for key in params:
    params[key] += torch.randn_like(params[key])
model.load_state_dict(params)

# 遗传算法

In [None]:
# 遗传算法参数初始化
maxgen = 1000                         # 进化代数，即迭代次数
pop_size = 100                       # 种群规模
pcross = 0.3                        # 交叉概率选择，0和1之间
pmutation = 0.1                     # 变异概率选择，0和1之间

In [None]:
def params_to_chrom(params):
    chrom = np.empty(0)
    for key in params:
        chrom = np.append(chrom, params[key].cpu().numpy().flatten(), axis=-1)
    return chrom

def chrom_to_params(chrom, params_template):
    params = copy.deepcopy(params_template)
    idx = 0
    for key in params:
        param_length = np.prod(params_template[key].shape)
        param = torch.from_numpy(chrom[idx: idx+param_length].reshape(params_template[key].shape)).to(device)
        params[key] = param
        idx += param_length
    return params

params = model.state_dict()
params_ori = params
params_template = copy.deepcopy(params)
chrom = params_to_chrom(params)
print(chrom.shape)

params = chrom_to_params(chrom, params_template)

# for key in params:
#     print(params_ori[key] - params[key])

In [None]:
def compute_fitness(model, params):
    model.load_state_dict(params)
    loss = train(model, x_tensor, y_tensor, num_epoches=100, batch_size=8192, learning_rate=1e-3)
    fitness = 1 / loss
    return fitness

In [None]:
num_params = 0
for key in params_template:
    num_params += np.prod(params_template[key].shape)
genome = np.zeros((pop_size, num_params))
fitness_array = np.zeros(pop_size)

# 初始化种群，并计算相应的适应度
for i in range(pop_size):
    reset_model(model)
    params = copy.deepcopy(model.state_dict())
    fitness = compute_fitness(model, params)
    chrom = params_to_chrom(params)
    genome[i] = chrom
    fitness_array[i] = fitness

In [None]:
# 看一下最好的
best_index = np.argmax(fitness_array)
best_chrom = genome[best_index]
best_fitness = fitness_array[best_index]
avg_fitness = np.mean(fitness_array)
trace = (avg_fitness, best_fitness)

In [None]:
def select(genome, fitness_array, pop_size):
    indices = np.random.choice(np.arange(pop_size), size=pop_size, p=fitness_array/fitness_array.sum())
    return genome[indices], fitness_array[indices]

In [None]:
def cross(genome, idx, pop_size, num_params, cross_prob=0.8):
    if np.random.rand() < cross_prob:
        idx_other = np.random.choice(np.delete(np.arange(pop_size), idx), size=1)
        cross_rate = np.random.rand()
        cross_points = np.random.random(num_params) < 0.5
        genome[idx, cross_points], genome[idx_other, cross_points] = \
            (1-cross_prob) * genome[idx, cross_points] + cross_prob * genome[idx_other, cross_points], \
            (1-cross_prob) * genome[idx_other, cross_points] + cross_prob * genome[idx, cross_points]

In [None]:
def mutate(genome, idx, pop_size, num_params, mutate_prob=0.03):
    mutate_points = np.random.random(num_params) < mutate_prob
    mutate_mea = np.random.randn(num_params) * np.std(genome, axis=0) + np.mean(genome, axis=0)
    genome[idx] += mutate_points * mutate_mea

In [None]:
for i in range(maxgen):
    # 选择
    genome, fitness_array = select(genome, fitness_array, pop_size)
    for idx in range(pop_size):
        # 交叉
        cross(genome, idx, pop_size, num_params)
        # 变异
        # mutate(genome, idx, pop_size, num_params)
    for idx in range(pop_size):
        # 更新适应度
        fitness_array[idx] = compute_fitness(model, chrom_to_params(genome[idx], params_template))
    print(genome)
    print(fitness_array)
    print(fitness_array.max())

In [None]:
genome.shape

In [None]:
np.mean(genome, axis=0).shape
np.std(genome, axis=0).shape

In [None]:
device

In [None]:
params = chrom_to_params(genome[np.argmax(fitness_array)], params_template)
model.load_state_dict(params)
train(model, x_tensor, y_tensor, num_epoches=100, batch_size=4096, learning_rate=1e-3)
plot_model(model, x_tensor)