In [1]:




import numpy as np
import matplotlib.pyplot as plt
from scipy.linalg import expm
from scipy.integrate import quad
import time



def matrix_exponential(matrix, t):
    return expm(matrix * t)



def run_simulation_D1_transformed(fixed_dt, end_t, gamma, beta, second_iteration_transformed, D1, initial,k_max,k_function):
    D = D1
    
    # 初始化变量
    x = []
    p = []
    clock = []
    # 使用给定的 initial 初始化 x[0]
    x_k = initial
    p_k = 0
    tau = 0
    
    for i in range(k_max+1):
        delta_t = fixed_dt
        h = delta_t / 2
        W_h = np.random.normal(0, np.sqrt(h))
        exp_Dh = matrix_exponential(D, h)
        
        
        update_vector = exp_Dh @ np.array([x_k, p_k]) + exp_Dh @ (np.array([0, np.sqrt(2 * gamma / beta) * W_h]))

        x_mid, p_mid = update_vector
        
        x_mid, p_mid = second_iteration_transformed(x_mid, p_mid, delta_t, beta)
        
        W_h = np.random.normal(0, np.sqrt(h))
        update_vector = exp_Dh @ np.array([x_mid, p_mid]) + exp_Dh @ np.array([0, np.sqrt(2 * gamma / beta)]) * W_h
        x_k,p_k = update_vector
        tau = tau + delta_t/k_function(update_vector[0])
        
        if tau > end_t :  # 确保不会超出范围
            x.append(update_vector[0])
            p.append(update_vector[1])
            clock.append(tau)
            
        
    
    
    
    return np.array(x),np.array(clock)


def run_simulation_D2_transformed(fixed_dt, end_t, gamma, beta, second_iteration_transformed, D2, initial,k_max,k_function):
    D = D2
    
    # 初始化变量
    x = []
    p = []
    clock = []
    # 使用给定的 initial 初始化 x[0]
    x_k = initial
    p_k = 0
    tau = 0
    
    for i in range(k_max+1):
        delta_t = fixed_dt
        h = delta_t / 2
        x_mid, p_mid = second_iteration_transformed(x_k, p_k, h, beta)
        
        x_mid += p_mid * h
        p_mid += 0
        
        W_h = np.random.normal(0, np.sqrt(delta_t))
        exp_Dh = matrix_exponential(D, delta_t)
        update_vector = exp_Dh @ np.array([x_mid, p_mid]) + np.array(exp_Dh @ np.array([0, np.sqrt(2 * gamma / beta)]) )* W_h
        x_mid, p_mid = update_vector
        
        x_mid += p_mid * h
        p_mid += 0
        
        x_mid, p_mid = second_iteration_transformed(x_mid, p_mid, h, beta)
        x_k,p_k = x_mid,p_mid
        tau = tau + delta_t/k_function(update_vector[0])
        
        if tau > end_t :  # 确保不会超出范围
            x.append(x_k)
            p.append(p_k)
            clock.append(tau)
            
        
    
    
    
    return np.array(x),np.array(clock)
    
    
    

def run_simulation_D3_transformed(fixed_dt, end_t, gamma, beta, second_iteration_transformed, D3, initial,k_max,k_function):
    D = D3
    
    # 初始化变量
    x = []
    p = []
    clock = []
    # 使用给定的 initial 初始化 x[0]
    x_k = initial
    p_k = 0
    tau = 0
    
    for i in range(k_max+1):
        delta_t = fixed_dt
        h = delta_t / 2
        x_mid, p_mid = second_iteration_transformed(x_k, p_k, h, beta)
        
        W_h = np.random.normal(0, np.sqrt(delta_t))
        exp_Dh = matrix_exponential(D, delta_t)
        update_vector = exp_Dh @ np.array([x_mid, p_mid]) + np.array(exp_Dh @ np.array([0, np.sqrt(2 * gamma / beta)])) * W_h
        x_mid, p_mid = update_vector
        
        x_mid, p_mid = second_iteration_transformed(x_mid, p_mid, h, beta)
        x_k,p_k = x_mid,p_mid
        tau = tau + delta_t/k_function(update_vector[0])
        
        if tau > end_t :  # 确保不会超出范围
            x.append(x_k)
            p.append(p_k)
            clock.append(tau)
            
        
    
    
    
    return np.array(x),np.array(clock)


ModuleNotFoundError: No module named 'scipy'

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.linalg import expm
from scipy.integrate import quad
import time



def matrix_exponential(matrix, t):
    return expm(matrix * t)




# 使用 D1 进行模拟
def run_simulation_D1(delta_t, end_t, gamma, beta, second_iteration, D1, initial):
    D = D1
    h = delta_t / 2
    n_t = int(end_t / delta_t)
    
    # 初始化变量
    x = np.zeros(n_t + 1)
    p = np.zeros(n_t + 1)

    # 使用给定的 initial 初始化 x[0]
    x[0],p[0] = initial
    
    
    for i in range(n_t):
        # 第一次迭代使用矩阵指数公式
        W_h = np.random.normal(0, np.sqrt(h))
        exp_Dh = matrix_exponential(D, h)
        
        update_vector = exp_Dh @ np.array([x[i], p[i]]) + exp_Dh @ np.array([0, np.sqrt(2 * gamma / beta)]) * W_h
        x_mid, p_mid = update_vector
        
        # 第二次迭代使用前向欧拉公式
        x_mid, p_mid = second_iteration(x_mid, p_mid, delta_t, beta)
        
        # 第三次迭代使用矩阵指数公式
        W_h = np.random.normal(0, np.sqrt(h))
        update_vector = exp_Dh @ np.array([x_mid, p_mid]) + exp_Dh @ np.array([0, np.sqrt(2 * gamma / beta)]) * W_h
        if i + 1 < len(x):  # 确保不会超出范围
            x[i + 1], p[i + 1] = update_vector
        else:
            break
    
    return np.array(x)

# 使用 D2 进行模拟
def run_simulation_D2(delta_t, end_t, gamma, beta, second_iteration, D2, initial):
    D = D2
    h = delta_t / 2
    n_t = int(end_t / delta_t)
    
    # 初始化变量
    x = np.zeros(n_t + 1)
    p = np.zeros(n_t + 1)

    # 使用给定的 initial 初始化 x[0]
    x[0],p[0] = initial
    
    
    for i in range(n_t):
        # 第一次迭代使用前向欧拉公式（原来的第二次迭代）
        x_mid, p_mid = second_iteration(x[i], p[i], h, beta)
        
        # 在第一次和第二次迭代之间添加 A 迭代
        x_mid += p_mid * h
        p_mid += 0
        
        # 第二次迭代使用矩阵指数公式（原来的第一次迭代）
        W_h = np.random.normal(0, np.sqrt(delta_t))
        exp_Dh = matrix_exponential(D, delta_t)
        update_vector = exp_Dh @ np.array([x_mid, p_mid]) + exp_Dh @ np.array([0, np.sqrt(2 * gamma / beta)]) * W_h
        x_mid, p_mid = update_vector
        
        # 在第二次和第三次迭代之间添加 A 迭代
        x_mid += p_mid * h
        p_mid += 0
        
        # 第三次迭代使用前向欧拉公式（原来的第二次迭代）
        x_mid, p_mid = second_iteration(x_mid, p_mid, h, beta)
        
        if i + 1 < len(x):  # 确保不会超出范围
            x[i + 1], p[i + 1] = x_mid, p_mid
        else:
            break
    
    return np.array(x)

# 使用 D3 进行模拟
def run_simulation_D3(delta_t, end_t, gamma, beta, second_iteration, D3, initial):
    D = D3
    h = delta_t / 2
    n_t = int(end_t / delta_t)
    
    # 初始化变量
    x = np.zeros(n_t + 1)
    p = np.zeros(n_t + 1)

    # 使用给定的 initial 初始化 x[0]
    x[0],p[0] = initial
    
    
    for i in range(n_t):
        # 第一次迭代使用前向欧拉公式（原来的第二次迭代）
        x_mid, p_mid = second_iteration(x[i], p[i], h, beta)
        
        # 第二次迭代使用矩阵指数公式（原来的第一次迭代）
        W_h = np.random.normal(0, np.sqrt(delta_t))
        exp_Dh = matrix_exponential(D, delta_t)
        update_vector = exp_Dh @ np.array([x_mid, p_mid]) + exp_Dh @ np.array([0, np.sqrt(2 * gamma / beta)]) * W_h
        x_mid, p_mid = update_vector
        
        # 第三次迭代使用前向欧拉公式（原来的第二次迭代）
        x_mid, p_mid = second_iteration(x_mid, p_mid, h, beta)
        
        if i + 1 < len(x):  # 确保不会超出范围
            x[i + 1], p[i + 1] = x_mid, p_mid
        else:
            break
    
    return np.array(x)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.linalg import expm
from scipy.integrate import quad
import time
import ipywidgets as widgets
from IPython.display import display
from sklearn.neighbors import KernelDensity

# 定义参数
gamma = 1
beta = 1
num_delta_t = 5
num_simulations = 1
x_start = -10
x_end = 10
num_x_steps = 400
end_t = 100
T = 1000
sample_number = 1000

# 定义矩阵 D1 和 D2
D1 = np.array([[0, 1], [0, -gamma]])
D2 = np.array([[0, 0], [0, -gamma]])
D3 = np.array([[0, 1], [0, -gamma]])

# 定义不同的时间步长
fixed_dts = np.zeros(num_delta_t)
for i in range(num_delta_t):
    fixed_dts[i] = 0.05 * 2 ** (i)

# 动态时间步长函数
def dynamic_time_step(x_prev, fixed_dt):
    return fixed_dt * k_function(x_prev)

# 定义势函数和函数 k(x)
def potential(x):
    return 0.5 * np.sqrt(np.abs(x) ** 3)

def k_function(x):
    return np.sqrt(np.abs(x)) # k(x) 为常数

# 计算边缘分布的归一化常数
def normalizing_constant():
    integrand = lambda x: np.exp(-beta * potential(x))
    integral, _ = quad(integrand, -np.inf, np.inf)
    return 1 / integral

# 计算边缘分布
def rho_eq(x):
    Z = normalizing_constant()
    return Z * np.exp(-beta * potential(x))

# 手动计算势函数的梯度
def grad_potential(x):
    return 1.5 * 0.5 * np.sign(x) * np.sqrt(np.abs(x))

# 手动计算 k(x) 的对数梯度
def grad_log_k_function(x):
    return 0.5 * 1/np.sqrt(np.abs(x)) * np.sign(x)

def compute_x_real(x_values):
    return np.array([rho_eq(x) for x in x_values])



# 第二次迭代
def second_iteration(x, p, delta_t, beta):
    grad_potential_x = grad_potential(x)
    dp = -(grad_potential_x) * delta_t
    p_new = p + dp
    x_new = x
    return x_new, p_new

# 第二次迭代 (transformed)
def second_iteration_transformed(x, p, delta_t, beta):
    grad_potential_x = grad_potential(x)
    grad_log_k_x = grad_log_k_function(x)
    dp = -(grad_potential_x - (1 / beta) * grad_log_k_x) * delta_t
    p_new = p + dp
    x_new = x
    return x_new, p_new

# 计算 x_real 分布
x_values = np.linspace(x_start, x_end, num_x_steps)
x_real = compute_x_real(x_values)

# 初始化模拟数据存储
simulated_pdfs_D1 = []
simulated_pdfs_D2 = []
simulated_pdfs_D3 = []
simulated_pdfs_D1_transformed = []
simulated_pdfs_D2_transformed = []
simulated_pdfs_D3_transformed = []



# 定义KDE函数
def kde_smoothing(data, bandwidth=0.2, num_points=1000):
    kde = KernelDensity(kernel='gaussian', bandwidth=bandwidth)
    data = data[:, np.newaxis]  # 将数据格式调整为 (n_samples, 1)
    kde.fit(data)
    
    x_d = np.linspace(x_start, x_end, num_points)[:, np.newaxis]
    log_dens = kde.score_samples(x_d)  # 计算对数密度
    return x_d.flatten(), np.exp(log_dens)  # 返回x值和对应的平滑密度值

# 在模拟过程中应用KDE平滑
for fixed_dt in fixed_dts:
    start_time = time.time()
    k_max = np.int(T/fixed_dt)
    
    final_x_values_D1 = []
    final_x_values_D2 = []
    final_x_values_D3 = []
    final_x_values_D1_transformed = []
    final_x_values_D2_transformed = []
    final_x_values_D3_transformed = []
    
    for _ in range(num_simulations):
        initial = [100,0]

        #x_values_D1 = run_simulation_D1(fixed_dt, T, gamma, beta, second_iteration, D1, initial)[-5000:]
        #x_values_D2 = run_simulation_D2(fixed_dt, T, gamma, beta, second_iteration, D2, initial)[-5000:]
        #x_values_D3 = run_simulation_D3(fixed_dt, T, gamma, beta, second_iteration, D3, initial)[-5000:]
        x_values_D1_transformed = run_simulation_D1_transformed(fixed_dt, end_t, gamma, beta, second_iteration_transformed, D1, initial, k_max, k_function)[0]
        x_values_D2_transformed = run_simulation_D2_transformed(fixed_dt, end_t, gamma, beta, second_iteration_transformed, D2, initial, k_max, k_function)[0]
        x_values_D3_transformed = run_simulation_D3_transformed(fixed_dt, end_t, gamma, beta, second_iteration_transformed, D3, initial, k_max, k_function)[0]

    # 使用KDE平滑模拟结果
    x_vals_D1, smoothed_pdf_D1 = kde_smoothing(np.array(x_values_D1).flatten())
    simulated_pdfs_D1.append((x_vals_D1, smoothed_pdf_D1))

    x_vals_D2, smoothed_pdf_D2 = kde_smoothing(np.array(x_values_D2).flatten())
    simulated_pdfs_D2.append((x_vals_D2, smoothed_pdf_D2))

    x_vals_D3, smoothed_pdf_D3 = kde_smoothing(np.array(x_values_D3).flatten())
    simulated_pdfs_D3.append((x_vals_D3, smoothed_pdf_D3))

    x_vals_D1_transformed, smoothed_pdf_D1_transformed = kde_smoothing(np.array(x_values_D1_transformed).flatten())
    simulated_pdfs_D1_transformed.append((x_vals_D1_transformed, smoothed_pdf_D1_transformed))

    x_vals_D2_transformed, smoothed_pdf_D2_transformed = kde_smoothing(np.array(x_values_D2_transformed).flatten())
    simulated_pdfs_D2_transformed.append((x_vals_D2_transformed, smoothed_pdf_D2_transformed))

    x_vals_D3_transformed, smoothed_pdf_D3_transformed = kde_smoothing(np.array(x_values_D3_transformed).flatten())
    simulated_pdfs_D3_transformed.append((x_vals_D3_transformed, smoothed_pdf_D3_transformed))

    end_time = time.time()
    print(f"delta_t = {fixed_dt:.2f}, Iteration Time = {end_time - start_time:.2f} seconds")


# 创建滑动条
delta_t_slider = widgets.FloatSlider(
    value=fixed_dts[0],
    min=fixed_dts[0],
    max=fixed_dts[-1],
    step=0.01,
    description='Delta t:',
    continuous_update=False
)

def plot_pdf(delta_t):
    idx = np.argmin(np.abs(fixed_dts - delta_t))
    
    bins_D1, pdf_simulated_D1 = simulated_pdfs_D1[idx]
    bins_D2, pdf_simulated_D2 = simulated_pdfs_D2[idx]
    bins_D3, pdf_simulated_D3 = simulated_pdfs_D3[idx]
    bins_D1_transformed, pdf_simulated_D1_transformed = simulated_pdfs_D1_transformed[idx]
    bins_D2_transformed, pdf_simulated_D2_transformed = simulated_pdfs_D2_transformed[idx]
    bins_D3_transformed, pdf_simulated_D3_transformed = simulated_pdfs_D3_transformed[idx]
    
    plt.figure(figsize=(12, 8))
    
    plt.plot((bins_D1[:-1] + bins_D1[1:]) / 2, pdf_simulated_D1, label='Simulated PDF D1')
    plt.plot((bins_D2[:-1] + bins_D2[1:]) / 2, pdf_simulated_D2, label='Simulated PDF D2')
    plt.plot((bins_D3[:-1] + bins_D3[1:]) / 2, pdf_simulated_D3, label='Simulated PDF D3')
    plt.plot((bins_D1_transformed[:-1] + bins_D1_transformed[1:]) / 2, pdf_simulated_D1_transformed, label='Simulated PDF D1_transformed', linestyle='--')
    plt.plot((bins_D2_transformed[:-1] + bins_D2_transformed[1:]) / 2, pdf_simulated_D2_transformed, label='Simulated PDF D2_transformed', linestyle='--')
    plt.plot((bins_D3_transformed[:-1] + bins_D3_transformed[1:]) / 2, pdf_simulated_D3_transformed, label='Simulated PDF D3_transformed', linestyle='--')
    
    plt.plot(x_values, x_real, label='Theoretical PDF', linestyle='--', color='black')
    
    plt.xlabel('x')
    plt.ylabel('PDF')
    plt.title(f'PDF Comparison at Delta t = {delta_t:.2f}')
    plt.xlim(-10,10)
    plt.legend()
    plt.show()

# 将滑动条与绘图函数绑定
widgets.interact(plot_pdf, delta_t=delta_t_slider)




TypeError: can't multiply sequence by non-int of type 'float'