### MPC 

model predictive control

현재 상태와 시스템 모델을 바탕으로 미래의 입력들을 예측하고 최적화하는 방식 

In [None]:
import tclab
import numpy as np
import time, csv
import matplotlib.pyplot as plt
from APMonitor.apm import apm_web        
from mpc_lib import mpc_init, mpc          
from pathlib import Path


class MPCDataCollector:

    def __init__(
        self,
        episodes: int = 100,
        total_time_sec: int = 1200,      # 20 분
        sample_interval: float = 5.0,    # 5 초 간격
        start_temp: float = 29.0,        # 냉각 완료 기준
        setpoint_range: tuple = (25.0, 65.0),
        data_dir: Path = Path('../data_back/PID2MPC/MPC2')
    ):
        # 시간 파라미터
        self.total_time = total_time_sec
        self.dt         = sample_interval
        self.steps      = int(total_time_sec / sample_interval)  # 240 step

        # 실험 파라미터
        self.episodes   = episodes
        self.start_temp = start_temp
        self.sp_low, self.sp_high = setpoint_range

        # 데이터 디렉터리
        self.csv_dir = data_dir / 'csv'
        self.png_dir = data_dir / 'png'
        self.csv_dir.mkdir(parents=True, exist_ok=True)
        self.png_dir.mkdir(parents=True, exist_ok=True)

        # APMonitor 서버 정보
        self.apm_server = 'http://byu.apmonitor.com'
        self.apm_app    = 'my_MPC'

    def generate_random_tsp(self) -> np.ndarray:
        """
        5 초 샘플 간격에 맞춘 set‑point 배열(길이 = self.steps) 생성.
        • 각 구간 길이: 평균 48 초, σ 10 초, 16 ∼ 80 초 (초 단위)
        """
        tsp = np.zeros(self.steps)
        i = 0
        while i < self.steps:
            dur_sec   = int(np.clip(np.random.normal(48, 10), 16, 80))
            dur_steps = max(1, int(dur_sec / self.dt))
            end = min(i + dur_steps, self.steps)
            tsp[i:end] = round(np.random.uniform(self.sp_low, self.sp_high), 2)
            i = end
        return tsp

    def save_plot(self, t, T1, Tsp1, T2, Tsp2, Q1, Q2, ep):
        fig, axs = plt.subplots(2, 1, figsize=(10, 8))

        # 온도
        axs[0].plot(t, T1,  'b-', label='T1 (measured)')
        axs[0].plot(t, Tsp1,'k--',label='T1 (setpoint)')
        axs[0].plot(t, T2,  'r-', label='T2 (measured)')
        axs[0].plot(t, Tsp2,'k:', label='T2 (setpoint)')
        axs[0].set_ylabel('Temperature (°C)')
        axs[0].legend(); axs[0].grid()

        # 히터
        axs[1].plot(t, Q1, 'b-', label='Q1')
        axs[1].plot(t, Q2, 'r--', label='Q2')
        axs[1].set_ylabel('Heater Output (%)')
        axs[1].set_xlabel('Time (s)')
        axs[1].legend(); axs[1].grid()

        plt.tight_layout()
        plt.savefig(self.png_dir / f'mpc_episode_{ep}.png')
        plt.close()

    # ---------- 에피소드 실행 ----------
    def run_episode(self, arduino, ep):
        print(f"\n=== Episode {ep} start ===")

        # 안정 온도까지 냉각
        arduino.Q1(0); arduino.Q2(0)
        while arduino.T1 >= self.start_temp or arduino.T2 >= self.start_temp:
            print(f" Cooling... T1={arduino.T1:.1f}, T2={arduino.T2:.1f}")
            time.sleep(20)

        # 배열 초기화
        t   = np.zeros(self.steps)
        T1  = np.zeros(self.steps); T2  = np.zeros(self.steps)
        Q1  = np.zeros(self.steps); Q2  = np.zeros(self.steps)
        Tsp1 = self.generate_random_tsp(); Tsp2 = self.generate_random_tsp()
        iae  = 0.0  # Integral of Absolute Error

        # CSV 로그
        csv_path = self.csv_dir / f'mpc_episode_{ep}_data.csv'
        with open(csv_path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['Episode','Time','T1','T2','Q1','Q2','TSP1','TSP2','IAE'])

            mpc_init()
            start_time = time.time()

            for k in range(self.steps):            # 240 step
                loop_start = time.time()
                t[k] = k * self.dt                 # 0,5,10,...,1195

                T1[k] = round(arduino.T1, 2)
                T2[k] = round(arduino.T2, 2)

                # IAE 누적
                iae += abs(Tsp1[k] - T1[k]) + abs(Tsp2[k] - T2[k])

                # MPC 계산
                q1, q2 = mpc(T1[k], Tsp1[k], T2[k], Tsp2[k])
                Q1[k] = round(q1, 2); Q2[k] = round(q2, 2)
                arduino.Q1(Q1[k]); arduino.Q2(Q2[k])

                # 1분마다 로그
                if k % 12 == 0 or k == self.steps - 1:  # 12 step × 5 s = 60 s
                    print(f" t={t[k]:4.0f}s | "
                          f"SP1={Tsp1[k]:5.2f}, PV1={T1[k]:5.2f}, Q1={Q1[k]:5.2f} | "
                          f"SP2={Tsp2[k]:5.2f}, PV2={T2[k]:5.2f}, Q2={Q2[k]:5.2f} | "
                          f"IAE={iae:7.2f}")

                # CSV 기록
                writer.writerow([
                    ep, f"{t[k]:.0f}",
                    f"{T1[k]:.2f}", f"{T2[k]:.2f}",
                    f"{Q1[k]:.2f}", f"{Q2[k]:.2f}",
                    f"{Tsp1[k]:.2f}", f"{Tsp2[k]:.2f}",
                    f"{iae:.2f}"
                ])

                # 5 초 간격 유지
                elapsed = time.time() - loop_start
                time.sleep(max(0.0, self.dt - elapsed))

        # 그래프 저장
        self.save_plot(t, T1, Tsp1, T2, Tsp2, Q1, Q2, ep)
        print(f"=== Episode {ep} done, data saved to {csv_path} ===")

    def run(self):
        with tclab.TCLab() as arduino:
            print(arduino.version)
            arduino.LED(100)
            for ep in range(1, self.episodes + 1):
                self.run_episode(arduino, ep)
        print("All episodes finished.")



if __name__ == '__main__':
    collector = MPCDataCollector()
    collector.run()
