<a href="https://colab.research.google.com/github/NumbLime/MPC-control-for-RC-model/blob/main/test2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
import requests

# url for the BOPTEST service
url = 'http://api.boptest.net'

# Select test case and get identifier
testcase = 'bestest_hydronic_heat_pump'
# Check if already started a test case and stop it if so before starting another
try:
  requests.put('{0}/stop/{1}'.format(url, testid))
except:
  pass
# Select and start a new test case
testid = \
requests.post('{0}/testcases/{1}/select'.format(url,testcase)).json()['testid']

In [11]:
class Controller_Proportional(object):

    def __init__(self, TSet=273.15+21, k_p=10.):
        '''Constructor.

        Parameters
        ----------
        TSet : float, optional
            Temperature set-point in Kelvin.
        k_p : float, optional
            Proportional gain.

        '''

        self.TSet = TSet
        self.k_p  = k_p

    def compute_control(self, y):
        '''Compute the control input from the measurement.

        Parameters
        ----------
        y : dict
            Contains the current values of the measurements.
            {<measurement_name>:<measurement_value>}

        Returns
        -------
        u : dict
            Defines the control input to be used for the next step.
            {<input_name> : <input_value>}

        '''

        # Compute control
        if y['reaTZon_y']<self.TSet:
            e = self.TSet - y['reaTZon_y']
        else:
            e = 0

        value = self.k_p*e
        u = {'oveHeaPumY_u':value,
             'oveHeaPumY_activate': 1}

        return u

In [12]:
# Initialize scenario
y = requests.put('{0}/scenario/{1}'.format(url, testid),
                 json={'time_period':'peak_heat_day',
                       'electricity_price':'dynamic'}).json()['payload']['time_period']
# Set control step
requests.put('{0}/step/{1}'.format(url, testid), json={'step':3600})
# Instantiate controller
con = Controller_Proportional(TSet=273.15+21, k_p=5.)

# for model buliding
data_for_model = {
    "Operative temperature": [],
    "Wet bulb temperature": [],
    "Horizontal infrared irradiation": [],
    "control signal": []
}
x0 = round(y['reaTZon_y']-273.15,2)
# Simulation loop
from IPython.display import clear_output
while y:
    # Clear the display output at each step
    clear_output(wait=True)
    # Print the current operative temperature and simulation time
    print('-------------------------------------------------------------------')
    print('Operative temperature [degC]  = {:.2f}'.format(y['reaTZon_y']-273.15))
    print('Wet bulb temperature [degC]  = {:.2f}'.format(y['weaSta_reaWeaTWetBul_y']-273.15))
    print('Horizontal infrared irradiation [W/m2]  = {:.2f}'.format(y['weaSta_reaWeaHHorIR_y']))
    # print('InternalGains [W]  = {:.2f}'.format(y['InternalGainsCon[1]']))
    simulation_time_days = y['time']/3600/24
    start_time_days = 0
    print('Simulation time [elapsed days] = {:.2f}'.format((simulation_time_days - \
                                                    start_time_days)))
    print('-------------------------------------------------------------------')
    # Compute control signal
    u = con.compute_control(y)
    print(u)
    data_for_model["Operative temperature"].append(round(y['reaTZon_y']-273.15,2))
    data_for_model["Wet bulb temperature"].append(round(y['weaSta_reaWeaTWetBul_y']-273.15,2))
    data_for_model["Horizontal infrared irradiation"].append(round(y['weaSta_reaWeaHHorIR_y'],2))
    data_for_model["control signal"].append(u)
    # Advance simulation with control signal
    y = requests.post('{0}/advance/{1}'.format(url, testid), json=u).json()['payload']


-------------------------------------------------------------------
Operative temperature [degC]  = 20.91
Wet bulb temperature [degC]  = -0.85
Horizontal infrared irradiation [W/m2]  = 247.00
Simulation time [elapsed days] = 30.00
-------------------------------------------------------------------
{'oveHeaPumY_u': 0.4265474362313171, 'oveHeaPumY_activate': 1}


In [13]:
import numpy as np
from scipy.optimize import minimize

# 状态空间模型
def state_space_model(params, x0, u, t, process_noise_std=0.1, measurement_noise_std=0.1):
    A = np.array([[params[0]]])
    B = np.array([params[1:4]]).T
    # E = np.array([[params[9], params[10]], [params[11], params[12]]])

    x = np.zeros(len(t))
    y = np.zeros(len(t))
    x[:, 0] = x0

    for i in range(1, len(t)):
        w_t = np.random.normal(0, process_noise_std, size=(2,))
        v_t = np.random.normal(0, measurement_noise_std)
        x[:, i] = A @ x[:, i-1] + B.flatten() * u[i-1] + E @ w_t
        y[i] = x[i] + D * u[i] + v_t

    return y

# 误差函数
def error_function(params, x0, u, y_true, t):
    y_pred = state_space_model(params, x0, u, t)
    return np.sum((y_true - y_pred) ** 2)

# 示例数据
t = np.linspace(0, 10, 100)
u = np.sin(t)  # 输入
y_true = 0.5 * np.sin(t)  # 示例真实输出（可加入噪声）

# 初始状态
x0 = np.array([0, 0])

# 初始参数猜测
initial_guess = np.random.rand(13)

# 最小化误差函数
result = minimize(error_function, initial_guess, args=(x0, u, y_true, t))
optimal_params = result.x

print("Optimal parameters:", optimal_params)


  y[i] = C @ x[:, i] + D * u[i] + v_t
  y[i] = C @ x[:, i] + D * u[i] + v_t
  y[i] = C @ x[:, i] + D * u[i] + v_t


Optimal parameters: [0.98359052 0.09314188 0.29490141 0.43678552 0.0806308  0.3150859
 0.21625919 0.63243552 0.56231254 0.20067858 0.84664565 0.51851574
 0.35398422]


  y[i] = C @ x[:, i] + D * u[i] + v_t


In [7]:
# Get available forecast points
res = requests.get('{0}/forecast_points/{1}'.format(url, testid)).json()['payload']
print(res)
# Get forecast data for a subset of example variables
points = ['TDryBul', 'PriceElectricPowerDynamic', 'Occupancy[1]',
          'UpperSetp[1]', 'LowerSetp[1]']
w = requests.put('{0}/forecast/{1}'.format(url, testid),
                  json={'point_names':points, 'horizon':6*3600, 'interval':1*3600}).json()['payload']
print(w)

{'PriceElectricPowerConstant': {'Description': 'Completely constant electricity price', 'Unit': '($/Euro)/kWh'}, 'PriceElectricPowerDynamic': {'Description': 'Electricity price for a day/night tariff', 'Unit': '($/Euro)/kWh'}, 'PriceElectricPowerHighlyDynamic': {'Description': 'Spot electricity price', 'Unit': '($/Euro)/kWh'}, 'pAtm': {'Description': 'Atmospheric pressure', 'Unit': 'Pa'}, 'lon': {'Description': 'Longitude of the location', 'Unit': 'rad'}, 'lat': {'Description': 'Latitude of the location', 'Unit': 'rad'}, 'TDewPoi': {'Description': 'Dew point temperature', 'Unit': 'K'}, 'HDirNor': {'Description': 'Direct normal radiation', 'Unit': 'W/m2'}, 'HDifHor': {'Description': 'Horizontal diffuse solar radiation', 'Unit': 'W/m2'}, 'HGloHor': {'Description': 'Horizontal global radiation', 'Unit': 'W/m2'}, 'relHum': {'Description': 'Relative Humidity', 'Unit': '1'}, 'nTot': {'Description': 'Total sky Cover [0, 1]', 'Unit': '1'}, 'ceiHei': {'Description': 'Ceiling height', 'Unit': 'm