In [40]:
import pandas as pd
import numpy as np
from scipy.linalg import inv, sqrtm
from numpy import identity as eye
import matplotlib.pyplot as plt

class Calculation:
    def __init__(self, F=8.0, N=40, dt=0.05):
        self.F = F
        self.N = N
        self.dt = dt
        self.u = np.full(self.N, self.F) + np.random.rand(self.N)
        
    def L96(self, x):
        N = self.N
        F = self.F
        dxdt = np.zeros((N))
        for i in range(2, N-1):
            dxdt[i] = (x[i+1] - x[i-2]) * x[i-1] - x[i] + F
        dxdt[0] = (x[1] - x[N-2]) * x[N-1] - x[0] + F
        dxdt[1] = (x[2] - x[N-1]) * x[0] - x[1] + F
        dxdt[N-1] = (x[0] - x[N-3]) * x[N-2] - x[N-1] + F
        return dxdt
    
    def Rk4(self, xold):
        dt = self.dt
        k1 = self.L96(xold)
        k2 = self.L96(xold + k1 * dt / 2.)
        k3 = self.L96(xold + k2 * dt / 2.)
        k4 = self.L96(xold + k3 * dt)
        xnew = xold + dt / 6.0 * (k1 + 2.0 * k2 + 2.0 * k3 + k4)
        return xnew
    
    def RMS(self, error):
        return np.sqrt(np.mean(error**2))
    
    def trRMS(self, x):
        return np.sqrt(x / self.N)
    
    def obs_remove(self, obs_point, y, H, R, method = 0):
        rmv_point = self.N - obs_point 
        methods = [0,1]
        if method in methods:
            if method == 0:
                y = np.delete(y, np.s_[0:rmv_point], axis=1)
                H = np.delete(H, np.s_[0:rmv_point], axis=0)
                R = np.delete(R, np.s_[0:rmv_point], axis=0)
                R = np.delete(R, np.s_[0:rmv_point], axis=1)
                return y, H, R
            else:
                pass
        else:
            print("Wrong method was selected")

In [41]:
class Datagenerater:
    def __init__(self, calculation):
        self.cal = calculation
        self.N = calculation.N
        self.u = np.full(self.N, self.cal.F) + np.random.rand(self.N)
        
    def Data_generate(self, Time_Step=1460):
        t_data, o_data = [], []
        u = self.u

        # Warm-up phase to eliminate transients
        for _ in range(Time_Step):
            u = self.cal.Rk4(u)
        
        # Data generation phase
        for _ in range(Time_Step):
            u = self.cal.Rk4(u)
            t_data.append(u)
            noisy_observation = u + np.random.randn(self.N)
            o_data.append(noisy_observation)
        
        # Save data to CSV
        pd.DataFrame(t_data).to_csv('t_data.csv')
        pd.DataFrame(o_data).to_csv('o_data.csv')

In [42]:
cal = Calculation()
datagen = Datagenerater(cal)
datagen.Data_generate()

In [43]:
class Three_D_VAR:
    def __init__(self, calculation, B = 0.4,obs_point = 40):
        self.F = 8.0
        self.N = 40
        self.dt = 0.05
        self.days = 365
        self.day_steps = int(0.20 / self.dt)
        self.time_step = self.days * self.day_steps
        self.delta = 0.001
        self.ls_time_step = [i for i in range(self.time_step)]
        self.IN = np.eye(self.N)
        self.obs_point = obs_point
        self.df_x_true = pd.read_csv('t_data.csv', header=0, index_col=0)
        self.df_y = pd.read_csv('o_data.csv', header=0, index_col=0)
        self.H = np.eye(self.N)
        self.R = np.eye(self.N)
        self.B = B
        self.cal = calculation
    
    def run_simulation(self):
        x_true = self.df_x_true.values
        y = self.df_y.values
        x_a = x_true[1,:]
        B = self.B * np.eye(self.N)
        forecast, analysis = [], []
        #固定で抜く(連続or等間隔)
        y, self.H, self.R = self.cal.obs_remove(obs_point = self.obs_point, y = y, H = self.H, R = self.R)
        
        for i in range(self.time_step):
            x_f = self.cal.Rk4(x_a)
            forecast.append(x_f)
            #ステップごとにデータを抜く
            K = B @ self.H.T @ np.linalg.pinv(self.H @ B @ self.H.T + self.R)
            #y消せてない
            x_a = x_f + K @ (y[i, :] - self.H @ x_f)
            analysis.append(x_a)
        
        return self.plot_results(forecast, analysis, x_true, y)
    
    def plot_results(self, forecast, analysis, x_true, y):
        forecast = np.array(forecast)
        analysis = np.array(analysis)
        error_f, error_a = [], []
        
        for i in range(self.time_step):
            xf = forecast[i, :]
            xa = analysis[i, :]
            xt = x_true[i, :]
            epsilon_f = xf - xt
            epsilon_a = xa - xt
            error_f.append(self.cal.RMS(epsilon_f))
            error_a.append(self.cal.RMS(epsilon_a))
        
        return error_f, error_a

In [44]:
three_d_var = Three_D_VAR(calculation = cal, obs_point = 39)
f, a = three_d_var.run_simulation()