# SCDAA Coursework

In [2]:
import numpy as np
import scipy
import matplotlib.pyplot as plt
import torch
from scipy.integrate import solve_ivp

Exercise1.1   

In [1]:

class LQR:
    def __init__(self, H, M, C, D, R, T):
        self.H = torch.tensor(H, dtype=torch.float32)
        self.M = torch.tensor(M, dtype=torch.float32)
        self.C = torch.tensor(C, dtype=torch.float32)
        self.D = torch.tensor(D, dtype=torch.float32)
        self.R = torch.tensor(R, dtype=torch.float32)
        self.T = T
        self.sigma = None  # This should be set based on the problem's context if needed

    def solve_ricatti_ode(self, time_grid):
        H, M, C, D, R = self.H.numpy(), self.M.numpy(), self.C.numpy(), self.D.numpy(), self.R.numpy()
        T = self.T

        def ricatti_ode(t, S_flat):
            S = S_flat.reshape(2, 2)
            dSdt = -2 * H.T @ S + S @ M @ np.linalg.inv(D) @ M.T @ S - C
            return dSdt.flatten()
        
        sol = solve_ivp(ricatti_ode, [T, time_grid[0]], R.flatten(), t_eval=torch.flip(time_grid, dims=[0]), vectorized=True)
        S_values = sol.y.T.reshape(-1, 2, 2)
        S_values_tensor = torch.tensor(S_values, dtype=torch.float32)
        S_values_reversed = torch.flip(S_values_tensor, [0])
        return S_values_reversed
    
    def control_problem_value(self, t, x):
        S_values = self.solve_ricatti_ode(t)
        v_values = torch.zeros(x.size(0), 1, dtype=torch.float32)  # 初始化 v_values

        # 循环遍历批次中的样本
        for i in range(x.size(0)):
            x_i = x[i]  # x_i 形状为 (1, 2)
            S_i = S_values[i]  # S_i 形状为 (2, 2)
            v_i = torch.matmul(torch.matmul(x_i.transpose(0, 1), S_i), x_i)  # 计算 x_i * S_i * x_i'

            integral_value = 0  # 初始化积分值
            if self.sigma is not None:  # 如果指定了 sigma
                sigma_mat = torch.tensor(self.sigma, dtype=torch.float32)
                # 从当前时间点到终止时间 T 的积分
                for j in range(i, len(t) - 1):
                    S_j = S_values[j]  # 获取时间点 t_j 的 S 值
                    tr_value = torch.trace(torch.matmul(torch.matmul(sigma_mat, sigma_mat.transpose(0, 1)), S_j))
                    delta_t = t[j + 1] - t[j]  # 计算时间差
                    integral_value += tr_value * delta_t  # 累积积分影响

            v_i += integral_value  # 将积分的影响加到 v_i 上
            v_values[i] = v_i

        return v_values.squeeze()
    
    def markov_control_function(self, t, x):
        # t_grid用于解决Ricatti ODE，确保是numpy数组
        S_values = self.solve_ricatti_ode(t)  # 解决Ricatti ODE
        # 计算每个样本的控制向量a
        a_values = [-torch.inverse(self.D) @ self.M.T @ torch.tensor(S_values[i], dtype=torch.float32) @ x_[None, :].T for i, x_ in enumerate(x)]
        # 将列表中的张量堆叠成一个新的张量
        a_values = torch.stack(a_values).squeeze(2)  # 移除多余的维度，确保最终形状是批次大小×2
    
        return a_values

# Example of how to initialize and use the class (without actual matrices and T)
# H, M, C, D, R are matrices and T is a scalar
# lqr = LQR(H=[[0, 1], [-1, 0]], M=[[1, 0], [0, 1]], C=[[1, 0], [0, 1]], D=[[1, 0], [0, 1]], R=[[1, 0], [0, 1]], T=1)
# time_grid = torch.linspace(0, 1, steps=100)  # Example time grid
# x = torch.tensor([[[1.0, 2.0]]])  # Example state tensor
# print(lqr.markov_control_function(time_grid, x))

Exercise 1.2

Some assumption for the model. We set the value for matrix H, M, C, D, R and sigma as follow. And time T is  1 years.

In [3]:
H = [[0, 1], [-1, 0]]
M = [[1, 0], [0, 1]]
C = [[1, 0], [0, 1]]
D = [[1, 0], [0, 1]]
R = [[1, 0], [0, 1]]
T = 10

# Initialize the LQR class instance
lqr = LQR(H=H, M=M, C=C, D=D, R=R, T=T)

# Example time grid
time_grid = torch.linspace(0, T, steps=100)

# Example state tensor

initial_x = torch.tensor([[1.0], [2.0]])  # shape: (1, 2)

# 扩展x以匹配time_grid的长度
# 我们需要的形状是(batch_size, 1, 2)，其中batch_size等于time_grid的长度
x = initial_x.repeat(time_grid.size(0), 1, 1)
# Compute the Markov control function
markov_control = lqr.markov_control_function(time_grid, x)

# Compute the S values
S_values = lqr.solve_ricatti_ode(time_grid)

# Compute the control problem value (v value)
v_values = lqr.control_problem_value(time_grid, x)

# Print the results
print("Markov Control Function:")
print(markov_control)

print("\nS Values:")
print(S_values)

print("\nControl Problem Values (v Values):")
print(v_values)

print(x)

Markov Control Function:
tensor([[[[ 1.8952],
          [-1.1856]]],


        [[[ 1.8940],
          [-1.1873]]],


        [[[ 1.8929],
          [-1.1891]]],


        [[[ 1.8918],
          [-1.1909]]],


        [[[ 1.8906],
          [-1.1928]]],


        [[[ 1.8894],
          [-1.1947]]],


        [[[ 1.8881],
          [-1.1966]]],


        [[[ 1.8869],
          [-1.1985]]],


        [[[ 1.8856],
          [-1.2005]]],


        [[[ 1.8843],
          [-1.2026]]],


        [[[ 1.8829],
          [-1.2047]]],


        [[[ 1.8816],
          [-1.2068]]],


        [[[ 1.8802],
          [-1.2089]]],


        [[[ 1.8788],
          [-1.2111]]],


        [[[ 1.8773],
          [-1.2134]]],


        [[[ 1.8758],
          [-1.2157]]],


        [[[ 1.8743],
          [-1.2180]]],


        [[[ 1.8727],
          [-1.2204]]],


        [[[ 1.8711],
          [-1.2229]]],


        [[[ 1.8695],
          [-1.2254]]],


        [[[ 1.8678],
          [-1.2280]]],


        [

  a_values = [-torch.inverse(self.D) @ self.M.T @ torch.tensor(S_values[i], dtype=torch.float32) @ x_[None, :].T for i, x_ in enumerate(x)]
  a_values = [-torch.inverse(self.D) @ self.M.T @ torch.tensor(S_values[i], dtype=torch.float32) @ x_[None, :].T for i, x_ in enumerate(x)]


In [None]:
# 模拟过程
def simulation_a(lqr,t,):
    '''这个function是单个路径中模拟单次a
    输入：
    lqr model
    S tensor(为方便运算)
    当前时间点t
    当前状态x
    输出：当前控制量a
    '''
    


Define the error.

In [4]:
def calculate_absolute_error(predicted_values, true_values):
    """
    使用PyTorch计算预测值和真实值之间的绝对误差。
    参数:
    predicted_values: 预测值，可以是一个数值、列表、Numpy数组或PyTorch张量。
    true_values: 真实值，形状与predicted_values相同。
    返回:
    绝对误差，与输入数据的形状相同。
    """
    # 确保输入为PyTorch张量
    if not isinstance(predicted_values, torch.Tensor):
        predicted_values = torch.tensor(predicted_values)
    if not isinstance(true_values, torch.Tensor):
        true_values = torch.tensor(true_values)
    
    # 计算绝对误差
    absolute_error = torch.abs(predicted_values - true_values)
    
    return absolute_error

def calculate_max_error(predicted_values, true_values):
    """
    计算最大绝对误差。
    """
    absolute_error = calculate_absolute_error(predicted_values, true_values)
    max_error = torch.max(absolute_error)
    return max_error

def calculate_mean_error(predicted_values, true_values):
    """
    计算平均绝对误差。
    """
    absolute_error = calculate_absolute_error(predicted_values, true_values)
    mean_error = torch.mean(absolute_error)
    return mean_error

Simulation

In [5]:
def fit_monte_carlo_simulation(H, M, C, D, R, sigma, x0, S_values, T, N, num_simulations):
    dt = T / N
    average_cost_per_time = []

    for _ in range(num_simulations):
        # 进行模拟
        simulated_states = torch.zeros(N + 1, 1, 2)  # 初始化模拟的状态
        simulated_states[0] = torch.tensor(x0)  # 设置初始状态

        for i in range(N):
            # 计算控制量
            control = -torch.inverse(D) @ M.T @ S_values[0] @ simulated_states[i].T

            # 计算下一个状态
            noise = torch.randn(2)  # 生成高斯白噪声
            simulated_states[i + 1] = simulated_states[i] + (
                H @ simulated_states[i] + M @ control + noise @ sigma.T
            ) * dt

        # 计算每个时间点的成本
        cost_per_time = torch.einsum('ntd,dt->nt', simulated_states, C) + torch.einsum('ntd,dt->nt', control.unsqueeze(0), D) + torch.einsum('ntd,dt->nt', simulated_states[-1], R)
        average_cost_per_time.append(cost_per_time)

    # 计算每个时间点的平均成本
    average_cost_per_time_tensor = torch.stack(average_cost_per_time).mean(dim=0)
    return average_cost_per_time_tensor