In [1]:
import numba
from numba import cuda, float32
import numpy as np
import math
from numba.cuda.random import create_xoroshiro128p_states, xoroshiro128p_uniform_float32, xoroshiro128p_normal_float32
import pandas as pd    
import os
from Prob import D_Prob
from scipy.optimize import minimize_scalar
from scipy.interpolate import RegularGridInterpolator
from numpy import int32

### 计算死亡率

In [2]:
def gen_Ptrans_tau(x0, T0, x_end, p, k):
    ax_path = "./data/lc_ax_male.csv"
    bx_path = "./data/lc_bx_male.csv"
    kt_path = "./data/lc_kt_male.csv"
    path = [ax_path, bx_path, kt_path]


    death_Prob = D_Prob()
    death_Prob.read_abk(path)

    interval_P_cache = []

    deal_span = (x_end - x0) * p
    print(type(x_end))
    for i in range(k, deal_span):
            if i == 0:
                interval_P_cache.append(death_Prob.unit_live(x0, T0, i+1, p))
            else:
                interval_P_cache.append(death_Prob.interval_live_P(x0, T0, i, i+1, p))
    
    trans_tau_np = np.array(interval_P_cache)
    
    assert trans_tau_np.__len__() == deal_span - k
    # trans_tau_d = cuda.to_device(trans_tau_np)
    # return trans_tau_d
    return trans_tau_np



#### 定义各种账户的变化，以及实际取款金额、退保收益。
其中各个变量含义如下：
omega_withdrawal：取款金额\omega_n；
penalized_omega_withdrawal：超额部分被惩罚之后的实际取款金额；

guaranteed_A_before：A_{n-}账户金额；A_after：A_{n+} = A_{{n+1}-}；

guaranteed_G_before：G_{n-}账户金额；G_after：G_{n+} = G_{{n+1}-}；

accumulated_E_before：E_{n-}账户金额；E_after：E_{n+} = math.exp(-r)*E_{{n+1}-}；

beta：超额取款惩罚系数。

In [3]:
# 定义实际取款金额函数
def penalized_withdrawal_amount(omega_withdrawal, guaranteed_A_before, guaranteed_G_before, beta):
    if omega_withdrawal > min(guaranteed_A_before, guaranteed_G_before):
        penalized_omega_withdrawal = omega_withdrawal - beta * (omega_withdrawal - min(guaranteed_A_before, guaranteed_G_before))
    else:
        penalized_omega_withdrawal = omega_withdrawal
    return penalized_omega_withdrawal

#定义函数：取款\omega_n导致的E_{n-}账户变成E_{n+}
def accumulated_penalized_E_after(accumulated_E_before, omega_withdrawal, guaranteed_A_before, guaranteed_G_before, beta):
    E_after = accumulated_E_before + penalized_withdrawal_amount(omega_withdrawal, guaranteed_A_before, guaranteed_G_before, beta)
    return E_after

#定义函数：取款\omega_n导致的A_{n-}账户变成A_{n+}
def guaranteed_annual_A_after(omega_withdrawal, guaranteed_A_before, guaranteed_G_before, index_S_before, index_S_after, accumulated_E_after, i_G, g):
    if omega_withdrawal > min(guaranteed_A_before, guaranteed_G_before):
        A_after = 0 if index_S_after == 0 else max(g * index_S_after, (guaranteed_A_before * index_S_after)/index_S_before)
    else:
        if accumulated_E_after == 0:
            A_after = (1 + i_G) * max(g * index_S_after, guaranteed_A_before)
        else:
            A_after = max(g * index_S_after, guaranteed_A_before)
    return A_after

#定义函数：取款\omega_n导致的G_{n-}账户变成G_{n+}
def guaranteed_future_G_after(omega_withdrawal, guaranteed_A_before, guaranteed_G_before, index_S_before, index_S_after, accumulated_E_after, i_G):
    if omega_withdrawal > min(guaranteed_A_before, guaranteed_G_before):
        G_after = max(index_S_after, min(guaranteed_G_before - omega_withdrawal, (guaranteed_G_before * index_S_after)/index_S_before))
    else:
        if accumulated_E_after == 0:
            G_after = (1 + i_G) * max(index_S_after, guaranteed_G_before)
        else:
            G_after = max(index_S_after, guaranteed_G_before - omega_withdrawal)
    return G_after

#定义函数退出合约将收到的收益
def surrender_benefit(index_S_before, guaranteed_G_before, guaranteed_A_before, beta):
    return max(
        index_S_before,
        penalized_withdrawal_amount(guaranteed_G_before, guaranteed_A_before, guaranteed_G_before, beta)
    )

In [4]:
#寻找value对应的grid的下标
#例如grid= array([ 0.,  2.,  4.,  6.,  8., 10., 12., 14., 16., 18., 20.])
# value= 3
# binary_search(grid, value) = 1

#为了寻找账户制最接近的格点下标
def binary_search(grid, value):
    left, right = 0, len(grid) - 1
    while left <= right:
        mid = (left + right) // 2
        if grid[mid] < value:
            left = mid + 1
        else:
            right = mid - 1
    return max(0, left - 1)

In [5]:
#循环生成几何布朗运动的随机变量值，其中，St：U_{n+}
def simulate_gbm_multi(St, num_paths, q, dt, r, l, sigma):
    output_paths = np.zeros(num_paths, dtype=np.float32)
    for path_id in range(num_paths):
        St_path = St
        for step in range(q):
            rand_x = np.random.uniform(0, 1)
            St_path *= math.exp((r - l - 0.5 * sigma ** 2) * dt + sigma * math.sqrt(dt) * rand_x)
        output_paths[path_id] = St_path
    return output_paths #U_{{n+1}-}

#对网格中的每一个点进行初始化
def initialize_terminal_value(V, T, beta, index_S_grid, guaranteed_G_grid, guaranteed_A_grid, accumulated_E_grid):
    for i in range(len(index_S_grid)):
        for j in range(len(guaranteed_G_grid)):
            for k in range(len(guaranteed_A_grid)):
                for e in range(len(accumulated_E_grid)):
                    index_S = index_S_grid[i]
                    guaranteed_G = guaranteed_G_grid[j]
                    guaranteed_A = guaranteed_A_grid[k]
                    accumulated_E = accumulated_E_grid[e]
                    V[T - 1, i, j, k, e] = max(index_S, guaranteed_G - beta * (guaranteed_G - min(guaranteed_A, guaranteed_G)))

# 计算t = T-1,T-2, ..., 0,值函数中的蒙特卡洛期望部分
# 给定取款\omega, 记蒙特卡洛期望部分为J_{n-1} = 蒙特卡洛模拟取平均{P_下一时刻死亡概率 * D_{n} + P_下一时刻存活概率 * V_{n+1}}
def calculate_value_function(W, index_S, guaranteed_G, guaranteed_A, accumulated_E, beta, i_G, g, t, V, discount_factor, p, q, r, l, sigma, trans_tau_d, num_paths, index_S_grid, guaranteed_G_grid, guaranteed_A_grid, accumulated_E_grid):
    dt = 1.0 / (p * q)
    St = index_S - W if index_S - W >0 else 0 #index_S: U_{n-}; W: withdrawal money \omega_n; St:U_{n+}
    paths = simulate_gbm_multi(St, num_paths, q, dt, r, l, sigma) #paths:下一时刻的index value U_{{n+1}+}
    
    avg_future_value = 0.0
    for path_id in range(num_paths):
        St_path = paths[path_id] 
        next_accumulated_E = accumulated_penalized_E_after(accumulated_E, W, guaranteed_A, guaranteed_G, beta) 
        next_guaranteed_A = guaranteed_annual_A_after(W, guaranteed_A, guaranteed_G, index_S, St_path, next_accumulated_E, i_G, g) 
        next_guaranteed_G = guaranteed_future_G_after(W, guaranteed_A, guaranteed_G, index_S, St_path, next_accumulated_E, i_G)

        P_tau = math.prod(trans_tau_d[:t-2]) if t > 1 else 1.0
        Ptrans_tau_m = trans_tau_d[t - 1] if t > 0 else 1.0
        death_probability = 1 - Ptrans_tau_m
        death_benefit = discount_factor * max(St_path, next_guaranteed_G)

        int_St_path = binary_search(index_S_grid, St_path)
        int_next_G = binary_search(guaranteed_G_grid, next_guaranteed_G)
        int_next_A = binary_search(guaranteed_A_grid, next_guaranteed_A)
        int_next_E = binary_search(accumulated_E_grid, next_accumulated_E)

        future_value = V[t + 1, int_St_path, int_next_G, int_next_A, int_next_E] * discount_factor if t + 1 < V.shape[0] else 0
        avg_future_value += P_tau * (death_benefit * death_probability + Ptrans_tau_m * future_value)

    V_value = avg_future_value / num_paths
    # print(f'第t={t}次, 取款{W},index_S, guaranteed_G, guaranteed_A, accumulated_E= {(index_S, guaranteed_G, guaranteed_A, accumulated_E)}对应的V_value={V_value}')
    return V_value

#动态规划函数，从T-1时刻倒退计算（时间下标为t = T-2）直到 t=1 时刻
def dynamic_programming(V, W_grid, T, p, q, beta, i_G, g, r, l, sigma, index_S_grid, guaranteed_G_grid, guaranteed_A_grid, accumulated_E_grid, trans_tau_d, num_paths):
    #计算t = T-1,T-2, ..., 1时刻的值函数
    for t in range(T - 2, 0, -1):
        for i in range(len(index_S_grid)):
            for j in range(len(guaranteed_G_grid)):
                for k in range(len(guaranteed_A_grid)):
                    for e in range(len(accumulated_E_grid)):
                        index_S = index_S_grid[i]
                        guaranteed_G = guaranteed_G_grid[j]
                        guaranteed_A = guaranteed_A_grid[k]
                        accumulated_E = accumulated_E_grid[e]
                        surrender_value = max(index_S, guaranteed_G - beta * (guaranteed_G - min(guaranteed_A, guaranteed_G)))# 当前退款收益
                        
                        for W in W_grid:
                            if W <= guaranteed_G:
                                penalized_withdrawal = penalized_withdrawal_amount(W, guaranteed_A, guaranteed_G, beta)#当前取款收益f_{n-1}(\omega_{n-1})
                                # 计算取款后账户的值函数
                                #比较不同的取款值W下，最大值（已有价值V[t, i, j, k, e] ； 当前取款收益+ 未来价值函数）
                                V[t, i, j, k, e] = max(V[t, i, j, k, e], penalized_withdrawal+ calculate_value_function(
                                    W, index_S, guaranteed_G, guaranteed_A, accumulated_E, beta, i_G, g, t, V,
                                    math.exp(-r), p, q, r, l, sigma, trans_tau_d, num_paths, index_S_grid, guaranteed_G_grid, guaranteed_A_grid, accumulated_E_grid
                                ))
                        V[t, i, j, k, e] = max(surrender_value, V[t, i, j, k, e])#max{当前退款收益, 最优（当前取款收益+ 未来价值函数)} 

    # 处理初始时刻t= 0时刻的价值函数,也就是假设0时刻投资者的取款金额为0，且不会退出合约。
    # V_0 = 蒙特卡洛模拟取平均{P_下一时刻死亡概率 * D_{1} + P_下一时刻存活概率 * V_{1}}
    index_S = index_S_grid[5]
    guaranteed_G = guaranteed_G_grid[10]
    guaranteed_A = guaranteed_A_grid[2]
    accumulated_E = accumulated_E_grid[0]
    
    V[0, 5, 10, 2, 0] = calculate_value_function(
                    0, index_S, guaranteed_G, guaranteed_A, accumulated_E, beta, i_G, g, 0, V,
                    math.exp(-r), p, q, r, l, sigma, trans_tau_d, num_paths, index_S_grid, guaranteed_G_grid, guaranteed_A_grid, accumulated_E_grid)


def MC(M, trans_tau_d, l, kwargs):
    for i, j in enumerate(kwargs):
        locals()[j] = kwargs[j]
    
    T= x_end - x0
    print(f"MC中的x_end = {x_end}, x0 = {x0}, T={T}")

        # 初始化所有网格变量
    W_grid = np.linspace(0, 200, 21).astype(np.float32)
    index_S_grid = np.linspace(0, 200, 11).astype(np.float32)
    guaranteed_G_grid = np.linspace(0, 200, 21).astype(np.float32)
    guaranteed_A_grid = np.linspace(0, 200, 21).astype(np.float32)
    accumulated_E_grid = np.linspace(0, 100, 2).astype(np.float32)

    V = np.zeros((T, len(index_S_grid), len(guaranteed_G_grid), len(guaranteed_A_grid), len(accumulated_E_grid)), dtype=np.float32)
    initialize_terminal_value(V, T, beta, index_S_grid, guaranteed_G_grid, guaranteed_A_grid, accumulated_E_grid)

    dynamic_programming(V, W_grid, T, p, q, beta, i_G, g, r, l, sigma, index_S_grid, guaranteed_G_grid, guaranteed_A_grid, accumulated_E_grid, trans_tau_d, M)

    initial_value = V[0, 5, 10, 2, 0]
    print(f"[MC] 初始价值计算完成，平均值: {initial_value}")
    return initial_value

In [6]:
def preliminary_search_per_i(initial_l, M, step_size, trans_tau_d, kwargs, max_iter, initial_investment):
    l = initial_l  # 初始化费用率
    lower_l, upper_l = 0, 1  # 初始化上下界

    for _ in range(max_iter):
        avg_res = MC(M, trans_tau_d, l, kwargs)
        if avg_res is None:  # 检查 MC 返回值
            print("MC 返回值无效，跳过此迭代")
            break
        print(f"For l = {l:.8f}; Expectation = {avg_res:.8f}; Difference = {abs(initial_investment - avg_res):.8f}")
        
        if avg_res > initial_investment:
            upper_l = l + step_size
            break
        l = max(l - step_size, 0)

    step_size /= 2  # 减少步长以提高精度
    for _ in range(max_iter):
        l += step_size
        avg_res = MC(M, trans_tau_d, l, kwargs)
        if avg_res is None:
            print("MC 返回值无效，跳过此迭代")
            break
        print(f"For l = {l:.8f}; Expectation = {avg_res:.8f}; Difference = {abs(initial_investment - avg_res):.8f}")
        
        if avg_res < initial_investment:
            lower_l = l - step_size
            upper_l = l
            break

    print(f"粗略搜索完成，范围: lower_l = {lower_l:.8f}, upper_l = {upper_l:.8f}")
    return lower_l, upper_l

In [7]:
def fine_search_per_i(lower_l, upper_l, M, fine_step_size, trans_tau_d, kwargs, initial_investment):
    best_l = lower_l  
    avg_res = MC(M, trans_tau_d, best_l, kwargs)

    if avg_res is None:
        print("[fine_search_per_i] MC() 计算返回 None，设置默认值为 0.0")
        avg_res = 0.0
    
    min_difference = abs(initial_investment - avg_res)
    found_small_diff = False  

    # 从 lower_l 开始逐步迭代到 upper_l
    l = lower_l
    while l <= upper_l:
        avg_res = MC(M, trans_tau_d, l, kwargs)
        if avg_res is None:
            print("[fine_search_per_i] MC() 计算返回 None，设置默认值为 0.0")
            avg_res = 0.0  # 避免 NoneType 错误

        difference = abs(initial_investment - avg_res)
        print(f"[fine_search_per_i] For l = {l:.8f}; Expectation = {avg_res:.8f}; Difference = {difference:.8f}")

        if difference < 0.00005:
            best_l = l
            min_difference = difference
            print(f"[fine_search_per_i] 提前找到最佳费用率 l = {best_l:.8f}; 最小差异 = {min_difference:.8f}")
            break

        if difference < 0.0003:
            found_small_diff = True

        if found_small_diff and difference > 0.002:
            print("[fine_search_per_i] 差异超过 0.002 后停止搜索")
            break

        if difference < min_difference:
            min_difference = difference
            best_l = l

        l += fine_step_size  

    print(f"[fine_search_per_i] 精细搜索完成: 最佳费用率 l = {best_l:.8f}, 最小差异 = {min_difference:.8f}")
    return best_l, min_difference

In [8]:
if __name__ == "__main__":
    kwargs = {
        'x_end': None,
        'x0': 67,
        'T0': 2020,
        'g': None,
        'r': 0.05,
        'p': 1,
        'q': 1,
        'sigma': 0.2,
        'beta': 0.15,
        'i_G': 0.025,
    }

    initial_investment = 100.0
    delta_l = 0
    initial_l = 0.04
    M = 300
    step_size = 0.001
    max_iter=500

    for a in range(5, 1, -1):
        kwargs['g'] = 1 / a
        kwargs['x_end'] = int(kwargs['x0'] + 1 / kwargs['g'])
        print(f'当前 x_end = {kwargs["x_end"]},g = {kwargs["g"]}')

        for i, j in enumerate(kwargs):
            locals()[j] = kwargs[j]
            
        trans_tau_d = gen_Ptrans_tau(x0, T0, x_end, p, 0)

        # 调用粗略搜索函数
        lower_l, upper_l = preliminary_search_per_i(
            initial_l, M, step_size, trans_tau_d, kwargs, max_iter, initial_investment
        )
        print(f"粗略搜索结果: lower_l = {lower_l:.8f}, upper_l = {upper_l:.8f}")

        if lower_l is not None and upper_l is not None:
            fine_step_size = 0.0000001
            best_l, min_difference = fine_search_per_i(
                lower_l, upper_l, M, fine_step_size, trans_tau_d, kwargs, initial_investment
            )
            delta_l = best_l - initial_l
            initial_l = best_l
            result_str = f"最佳费用率 l = {best_l:.8f}，最小差异 = {min_difference:.8f}\n"
            print(result_str)

当前 x_end = 72,g = 0.2
<class 'int'>
MC中的x_end = 72, x0 = 67, T=5


  G_after = max(index_S_after, min(guaranteed_G_before - omega_withdrawal, (guaranteed_G_before * index_S_after)/index_S_before))


[MC] 初始价值计算完成，平均值: 96.12181854248047
For l = 0.04000000; Expectation = 96.12181854; Difference = 3.87818146
MC中的x_end = 72, x0 = 67, T=5
[MC] 初始价值计算完成，平均值: 95.92523193359375
For l = 0.03900000; Expectation = 95.92523193; Difference = 4.07476807
MC中的x_end = 72, x0 = 67, T=5
[MC] 初始价值计算完成，平均值: 96.18206024169922
For l = 0.03800000; Expectation = 96.18206024; Difference = 3.81793976
MC中的x_end = 72, x0 = 67, T=5


KeyboardInterrupt: 