In [2]:
import numpy as np
import pandas as pd
from math import *
import matplotlib.pyplot as plt
import cmath
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim

def read_excel(filepath,kind):
	excel_file = pd.ExcelFile(filepath)
	active_sheet = excel_file.sheet_names[0]
	df = pd.read_excel(filepath,sheet_name=active_sheet)
	headers = df.columns.tolist()
	if kind == 1: # 列
		columns_data = {}
		for header in headers:
			columns_data[header] = df[header].tolist()
		return columns_data
	elif kind == 0: # 行
		rows_data = []
		for index, row in df.iterrows():
			rows_data.append(row.tolist())
		columns_data = {}
		columns_data[headers[0]] = headers[1:]
		for lists in rows_data:
			columns_data[lists[0]] = lists[1:]
		return columns_data
df = read_excel('./HANDLE_data/ThetaData.xlsx',1)
DELTA_THETA = df['θ-θideal']
M = df['M']
OMEGA = df['w']
TR = df['Tr']
PR = df['pr']

In [None]:
X = np.column_stack((M, OMEGA, TR, PR))
y = np.array(DELTA_THETA)

# 划分
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# 转换为张量
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32).view(-1, 1)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
X_train = X_train.to(device)
y_train = y_train.to(device)
X_val = X_val.to(device)
y_val = y_val.to(device)
X_test = X_test.to(device)
y_test = y_test.to(device)

# class SimpleNN(nn.Module):
#     def __init__(self, input_size, hidden_size, output_size):
#         super(SimpleNN, self).__init__()
#         self.fc1 = nn.Linear(input_size, hidden_size)
#         self.relu = nn.ReLU()
#         self.fc2 = nn.Linear(hidden_size, output_size)

#     def forward(self, x):
#         out = self.fc1(x)
#         out = self.relu(out)
#         out = self.fc2(out)
#         return out

# input_size = X_train.shape[1]
# hidden_size = 50
# output_size = 1
# model = SimpleNN(input_size, hidden_size, output_size).to(device)
# criterion = nn.MSELoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001)
class BayesianNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, dropout_rate=0.5):
        super(BayesianNN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = torch.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

    def predict(self, x, n_samples=10):
        self.train()
        outputs = torch.stack([self.forward(x) for _ in range(n_samples)])
        return outputs.mean(0), outputs.std(0)

# 定义模型、损失函数和优化器
input_size = X_train.shape[1]
hidden_size = 50
output_size = 1
model = BayesianNN(input_size, hidden_size, output_size).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 主动学习策略
def select_data(model, X_pool, n_samples=10):
    model.eval()
    with torch.no_grad():
        _, uncertainties = model.predict(X_pool, n_samples)
    uncertainties = uncertainties.mean(dim=1)
    return uncertainties.argsort(descending=True)[:n_samples]
n_iterations = 10
n_samples_per_iteration = 10

In [5]:
for iteration in range(n_iterations):
    print(f'Iteration {iteration+1}/{n_iterations}')
    model.train()
    for epoch in range(100):  # 每次迭代训练100个epoch
        outputs = model(X_train)
        loss = criterion(outputs, y_train)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (epoch+1) % 20 == 0:
            model.eval()
            val_outputs = model(X_val)
            val_loss = criterion(val_outputs, y_val)
            print(f'Epoch [{epoch+1}/100], Loss: {loss.item():.10f}, Val Loss: {val_loss.item():.10f}')
    
    # 选择最有信息量的数据点
    selected_indices = select_data(model, X_test, n_samples=n_samples_per_iteration)
    selected_X = X_test[selected_indices]
    selected_y = y_test[selected_indices]
    
    # 将选择的数据点添加到训练集中
    X_train = torch.cat((X_train, selected_X))
    y_train = torch.cat((y_train, selected_y))
    
    # 从池中移除选择的数据点
    mask = torch.ones(len(X_test), dtype=bool)
    mask[selected_indices] = False
    X_test = X_test[mask]
    y_test = y_test[mask]

Iteration 1/10
Epoch [20/100], Loss: 0.0475287773, Val Loss: 0.0222263690
Epoch [40/100], Loss: 0.0196662564, Val Loss: 0.0197238680
Epoch [60/100], Loss: 0.0181728657, Val Loss: 0.0184753388
Epoch [80/100], Loss: 0.0170044117, Val Loss: 0.0172407515
Epoch [100/100], Loss: 0.0159453750, Val Loss: 0.0161942784
Iteration 2/10
Epoch [20/100], Loss: 0.0398363359, Val Loss: 0.0191291831
Epoch [40/100], Loss: 0.0151536381, Val Loss: 0.0151316673
Epoch [60/100], Loss: 0.0136699313, Val Loss: 0.0139399236
Epoch [80/100], Loss: 0.0126524298, Val Loss: 0.0129106026
Epoch [100/100], Loss: 0.0117638987, Val Loss: 0.0119989347
Iteration 3/10
Epoch [20/100], Loss: 0.0328953862, Val Loss: 0.0153226135
Epoch [40/100], Loss: 0.0112158563, Val Loss: 0.0113211069
Epoch [60/100], Loss: 0.0100210411, Val Loss: 0.0102222459
Epoch [80/100], Loss: 0.0091917701, Val Loss: 0.0094308946
Epoch [100/100], Loss: 0.0084770331, Val Loss: 0.0086918771
Iteration 4/10
Epoch [20/100], Loss: 0.0290464312, Val Loss: 0.0135

In [6]:
# 测试
model.eval()
with torch.no_grad():
    test_outputs = model(X_test)
    test_loss = criterion(test_outputs, y_test)
    print(f'Test Loss: {test_loss.item():.10f}')

Test Loss: 0.0009573305


In [7]:
model_path = './MODEL/MODEL_2_TRY_BYES.pth'
torch.save(model.state_dict(), model_path)
print(f'Model saved to {model_path}')

Model saved to ./MODEL/MODEL_2_TRY_BYES.pth


In [8]:
loaded_model = BayesianNN(input_size, hidden_size, output_size).to(device)
loaded_model.load_state_dict(torch.load(model_path))
loaded_model.eval()
print('Model loaded and ready for inference')

Model loaded and ready for inference
