<a href="https://colab.research.google.com/github/Yuto-Koga/my-macro-project/blob/main/Untitled37.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn.functional as F

Tensor = torch.Tensor

# the code of financial acceletor model
# ---------- 1) Household Utility (CES in c, separable disutility of hours) (17) ----------
class HouseholdUtility:
    def __init__(self,beta=0.99):
        self.beta = torch.tensor(beta, dtype=torch.double)

    def C_next(self, C: Tensor, C_next: Tensor, R: Tensor) -> Tensor:
        """
        c: (3,)  consumption per sector [c1,c2,c3]
        h: (3,)  hours per sector      [h1,h2,h3]
        N: (3,)  employment shares     [N1,N2,N3]  (使い方は任意だが、重み付けに利用)
        lam: scalar weight on disutility
        """
        C = C.clamp_min(1e-8)
        R = R.clamp_min(0.0)



        return self.beta * R * C

# ---------- 2)  labor supply condition (18)----------
class laborsupply:
    def __init__(self,zeta==0.9):
        self.zeta = torch.tensor(zeta, dtype=torch.double)

    def W(self, C: Tensor, H: Tensor, W: Tensor) -> Tensor:
        """

        """
        W = C.clamp_min(1e-8)
        C = R.clamp_min(0.0)
        H = H.clamp_min(1e-8)



        return C * self.zeta /(1-H)

 # ---------- 3) Capital sotck dynamics (19)----------
class capitalstock:
    def __init__(self, delta=0.9, kappa = 0.5 ):
        self.delta = torch.tensor(delta, dtype=torch.double)
        self.kappa = torch.tensor(kappa, dtype=torch.double)
    def K(self, K: Tensor, I: Tensor, u: Tensor) -> Tensor:
        """
       K(t+1) = (1-dleta)K + kappa/2(I/K - delta)**2 * K
        """
        K = K.clamp_min(1e-12)
        I = I.clamp_min(1e-12)

        return (1-self.delta)K + self.kappa/2 * (I/K - self.delta)**2 * K


# ---------- 4) Tobin's Q (20)----------
class capitalprice:
    def __init__(self, delta=0.9, kappa = 0.5 ):
        self.delta = torch.tensor(delta, dtype=torch.double)
        self.kappa = torch.tensor(kappa, dtype=torch.double)
    def q(self, K: Tensor, I: Tensor) -> Tensor:
        """
       Q_t = (kappa * (I/K - delta) * K)**(-1)
        """
        K = K.clamp_min(1e-12)
        I = I.clamp_min(1e-12)
        return (self.kappa * (I/K - self.delta) * K)**(-1)


# ---------- 5) the expected return on holding capital (21)----------
class capitalprice:
    def __init__(self, delta=0.9, R = 0.1 ):
        self.delta = torch.tensor(delta, dtype=torch.double)
        self.R = torch.tensor(R, dtype=torch.double)
    def Rk_next(self, R_next: Tensor, Q_next: Tensor, Q: Tensor) -> Tensor:
        """
       Q_t = (kappa * (I/K - delta) * K)**(-1)
        """
        Q = Q.clamp_min(1e-12)
        Q_next = Q_next.clamp_min(1e-12)
        R_next = R_next.clamp_min(1e-12)
        return (self.R * R_next + (1- self.deta) * Q_next) / Q


# ---------- 6) Entrepreneurial Sector (25)----------
class production:
    def __init__(self, theta = 0.5, ipsilon=0.2 ):
        self.theta = torch.tensor(theta, dtype=torch.double)
        self.ipsilon = torch.tensor(ipsilon, dtype=torch.double)



    def d(self, sum: Tensor, d_(t-1): Tensor) -> Tensor:
        """

        """
        sum = sum.clamp_min(1e-12)
        d_t1 = d_t1.clamp_min(1e-12)
        return (1-self.theta) * sum**(-self.ipsilon) + self.theta * sum**(self.ipsilon) * d_(t-1)

# ---------- 7) optimal relative reset price----------
class reset price:
    def __init__(self,  ipsilon=0.2 ):
        self.ipsilon = torch.tensor(ipsilon, dtype=torch.double)



    def sum(self,x1: Tensor, x2: Tensor) -> Tensor:
        """

        """
        sum = sum.clamp_min(1e-12)
        x1 = x1.clamp_min(1e-12)
        x2 = x2.clamp_min(1e-12)
        return (self.ipsilon * x1) / (x2*(self.ipsilon-1))









        """
# ---------- 7) production function (24) ----------
class production:
    def __init__(self, Omega=0.9, alpha=0.2):
        self.Omega = torch.tensor(Omega, dtype=torch.double)
        self.alpha = torch.tensor(alpha, dtype=torch.double)
    def Y(self, h: Tensor, h_e: Tensor, K; Tensor, A: Tensor, d: Tensor) -> Tensor:
        """
       h**(Omega) * h_e**(1-Omega)
        """
        h  = h.clamp_min(1e-12)
        h_e = h_e.clamp_min(1e-12)
        K = K.clamp_min(1e-12)
        A = A.clamp_min(1e-12)
        d = d.clamp_min(1e-12)
        return A * (K**self.alpha) * (h**(self.Omega * (1- self.alpha))) * (h_e**((1-self.Omega)*(1-self.alpha))) / d

# ---------- 4) expected gross return  ----------
class greturn:
    def __init__(self, delta=0.9, a = 0.5 ):
        self.delta = torch.tensor(delta, dtype=torch.double)
        self.a = torch.tensor(a, dtype=torch.double)

    def Rk_next(self,
               K_next: Tensor,         # t の資本
               X_next: Tensor,      # t+1 の労働（または利用率）
               q_t: Tensor,         # t の q
               q_next: Tensor
               Y_t; Tensor       # t+1 の q
               ) -> Tensor:

 　　　 K_t = K_t.clamp_min(1e-12)
        h_next = h_next.clamp_min(1e-12)

        mpk_next = self.alpha * Y_t / (X_next * K_next)
        numer = (1.0 - self.delta) * q_next
        return numer / q_t.clamp_min(1e-12)

# ---------- 5) labor supply----------
class laborsupply:
    def __init__(self, Omega=0.9):
        self.Omega = torch.tensor(Omega, dtype=torch.double)

    def L(self, h: Tensor, h_e: Tensor) -> Tensor:
        """
       h**(Omega) * h_e**(1-Omega)
        """
        h  = h.clamp_min(1e-12)
        h_e = h_e.clamp_min(1e-12)
        return h**(self.Omega) * h_e**(1-self.Omega)
        """
# ---------- 5) the dynamics of net worth----------
class V:
    def __init__(self, w=0.9):
        self.w = torch.tensor(w, dtype=torch.double)

    def V(self,
          Rk: Tensor,
          q_t-1: Tensor,
          K_t:Tensor,
          N_t;Tensor,
          ) -> Tensor:












# ---------- 5) Planner: choose {m_t, h_t} to max Σ β^t u(c_t,h_t; N_t) ----------
import torch
import torch.nn.functional as F

class PTPlanner:
    def __init__(self, T=50, beta=0.96, lam=1.0):
        self.T = T
        self.beta = torch.tensor(beta, dtype=torch.double)
        self.lam = lam

        self.util = HouseholdUtility()
        self.match = Matching()
        self.ldyn = LaborDynamics(matching=self.match)
        self.prod = Production()

        self.N0 = torch.tensor([0.3, 0.3, 0.3], dtype=torch.double)  # 合計<=1を想定
        self.z  = torch.tensor([1.0, 1.0, 1.0], dtype=torch.double)

    # ←← この simulate をクラス内に入れてください（インプレース更新なし）
    def simulate(self, M_free: torch.Tensor, H_free: torch.Tensor):
        """
        決定変数（非負制約は softplus で表現）:
          M_free: (T,3) → m_t >= 0
          H_free: (T,3) → h_t >= 0
        制約（雇用動学や資源制約）はこの中で前進シミュレーションとして満たす。
        """
        device = M_free.device
        m_t = F.softplus(M_free)  # (T,3)
        h_t = F.softplus(H_free)  # (T,3)

        N_list   = [self.N0.to(device)]
        u_list   = []
        x_list   = []
        c_list   = []
        u_agg_ls = []

        for t in range(self.T):
            N_prev = N_list[-1]                # (3,)
            u_total = 1.0 - N_prev.sum()
            share   = (N_prev / N_prev.sum().clamp_min(1e-12)).clamp_min(0.0)
            u_vec   = u_total * share          # (3,)

            x_t = self.prod.x(N_prev, self.z.to(device), h_t[t], m_t[t])  # (3,)
            c_t = x_t                              # 簡略：生産=消費

            u_t = self.util.u(c_t, h_t[t], N_prev, lam=self.lam)
            N_next = self.ldyn.N_next(N_prev, m_t[t], u_vec)

            u_list.append(u_vec)
            x_list.append(x_t)
            c_list.append(c_t)
            u_agg_ls.append(u_t)
            N_list.append(N_next)

        N     = torch.stack(N_list,   dim=0)   # (T+1,3)
        u_pool= torch.stack(u_list,   dim=0)   # (T,3)
        x_out = torch.stack(x_list,   dim=0)   # (T,3)
        c     = torch.stack(c_list,   dim=0)   # (T,3)
        u_agg = torch.stack(u_agg_ls, dim=0)   # (T,)

        betas = self.beta ** torch.arange(self.T, dtype=torch.double, device=device)
        total_util = torch.sum(u_agg * betas)

        return total_util, {"N": N, "m": m_t, "h": h_t, "c": c, "x": x_out, "u_per_t": u_agg, "u_pool": u_pool}

    # ←← これが新規：クラス内の optimize（Adamで効用最大化）
    def optimize(self, steps=1000, lr=1e-2, device=None):
        device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        # 決定変数（自由パラメータ）を作って勾配対象に
        M_free = torch.zeros((self.T, 3), dtype=torch.double, requires_grad=True, device=device)
        H_free = torch.zeros((self.T, 3), dtype=torch.double, requires_grad=True, device=device)

        # 参照テンソルも同じ device に
        self.N0 = self.N0.to(device)
        self.z  = self.z.to(device)

        opt = torch.optim.Adam([M_free, H_free], lr=lr)

        # （デバッグしたい場合は次の行のコメントを外す）
        # torch.autograd.set_detect_anomaly(True)

        for s in range(1, steps+1):
            opt.zero_grad()
            total_util, _ = self.simulate(M_free, H_free)  # 制約付き前進→目的
            loss = -total_util                             # 最大化→最小化に反転
            loss.backward()
            opt.step()
            if s % 100 == 0:
                print(f"[step {s}] utility={float(total_util):.6f}")

        return M_free.detach(), H_free.detach()
