In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

df = pd.read_csv('data/statespace.csv')

inputs = df[['Time', 'T_ext', 'P_hea', 'I_sol']].values
output = df['T_int'].values

# Function that will calculate the indoor temperature for a given value of R and C
def simpleRC(inputs, Ri, Ro, Ci, Ce, Ai, Ae):
    
    time = inputs[:, 0]
    to   = inputs[:, 1]
    phi_h = inputs[:, 2]
    phi_s = inputs[:, 3]
    
    ti = np.zeros(len(time))
    te = np.zeros(len(time))
    
    # Initial temperatures
    ti[0] = output[0]
    te[0] = ( Ri*to[0] + Ro*ti[0] ) / (Ri + Ro)
    
    # Loop for calculating all temperatures
    for t in range(1, len(output)):
      dt = time[t] - time[t-1]
      ti[t] = ti[t-1] + dt / Ci * ( (te[t-1]-ti[t-1])/Ri + phi_h[t-1] + Ai*phi_s[t-1] )
      te[t] = te[t-1] + dt / Ce * ( (ti[t-1]-te[t-1])/Ri + (to[t-1]-te[t-1])/Ro + Ae*phi_s[t-1] )
      
    return ti

df

Unnamed: 0,Time,T_ext,P_hea,I_sol,T_int
0,0.0,16.233909,0.0,13.817618,30.281172
1,1800.0,15.836256,0.0,13.828211,30.209944
2,3600.0,15.484294,0.0,13.814245,30.155675
3,5400.0,15.272714,0.0,13.847024,30.086922
4,7200.0,14.999161,0.0,13.841355,30.023879
...,...,...,...,...,...
175,315000.0,27.917201,0.0,482.038910,30.177581
176,316800.0,28.147587,0.0,389.229309,30.164632
177,318600.0,28.504467,0.0,474.254730,30.145209
178,320400.0,27.698600,0.0,380.483002,30.132256


In [2]:
import scipy.optimize as so

p_opt, p_cov = so.curve_fit(f=simpleRC,
                            xdata=inputs,
                            ydata=output,
                            p0=(0.01, 0.01, 1e6, 1e7, 5, 5))

# Saving results into a dataframe and displaying it
res1 = pd.DataFrame(index=['Ri', 'Ro', 'Ci', 'Ce', 'Ai', 'Ae'])
res1['avg'] = p_opt
res1['std'] = np.diag(p_cov)**0.5
print(res1)

             avg            std
Ri  4.091570e-03       0.000125
Ro  5.038526e-02       0.004552
Ci  5.344581e+06  225321.203258
Ce  1.761891e+07  841478.831398
Ai  1.861247e-01       0.055401
Ae -1.571828e+00       0.096530


In [5]:
from scipy.linalg import expm
from numpy.linalg import inv

class RC(object):
    """This is the generic class for any RC model structure"""

    def __init__(self):
        pass

    def discretize(self, dt):
        """ This method applies the discretisation equations shown in the previous chapter
        It is only function of the time step size dt
        """
        n = self.N_states
        # Discretisation
        F = expm(self.Ac * dt)
        G = np.dot(inv(self.Ac), np.dot(F - np.eye(n), self.Bc))
        H = self.Cc
        # System error covariance matrix (continuous Qc and discrete Q)
        foo = expm(np.block([[-self.Ac, self.Qc], [np.zeros(np.shape(self.Ac)), self.Ac.T]]) * dt)
        Q = np.dot(foo[n:2 * n, n:2 * n].T, foo[0:n, n:2 * n])
        # Measurement error covariance matrix (discrete)
        R = self.Rc / dt

        return F, G, H, Q, R

    def prediction(self, x_0, t, u, y=None, update=False):
        """ This method predicts the indoor temperature.
        
        If update=True, the function will apply all steps of the Kalman filter
        and return the mean state, covariances of innovations and the likelihood
        
        If update=False, the function will only predict states and their variance,
        as if no measurements were available. Use this for forecasting.
        
        x_0: initial state vector
        t: vector of time coordinates
        u: input vector
        y: measurement vector (only required if update=True)
        """

        N_time = len(t)

        # Initialisation of all arrays that will be used in the calculations
        #  Mean and variance of the states calculated at the prediction step
        x_avg_predict = np.zeros((N_time, self.N_states))
        x_var_predict = np.zeros((N_time, self.N_states, self.N_states))
        x_avg_predict[0] = x_0
        x_var_predict[0] = np.eye(self.N_states)

        x_avg_update = np.zeros((N_time, self.N_states))
        x_var_update = np.zeros((N_time, self.N_states, self.N_states))
        x_avg_update[0] = x_0
        x_var_update[0] = np.eye(self.N_states)

        epsilon = np.zeros(N_time)  # prediction errors (innovations)
        Sigma = np.zeros(N_time)  # innovation covariances
        loglike = np.zeros(N_time) # log-likelihood to be minimized

        for i in range(1, N_time):

            #  Matrices of the current time step (depend on the time step size dt)
            dt = t[i] - t[i - 1]
            F, G, H, Q, R = self.discretize(dt)

            # KALMAN 1: Prediction step
            x_avg_predict[i] = np.dot(F, x_avg_update[i-1]) + np.dot(G, u[i])
            x_var_predict[i] = np.dot(F, np.dot(x_var_update[i-1], F.T)) + Q

            if update:
                # KALMAN 2: Residuals and Kalman gain
                epsilon[i] = y[i] - np.dot(H, x_avg_predict[i])
                foo = np.dot(H, np.dot(x_var_predict[i], H.T)) + R
                Sigma[i] = foo
                K = np.dot(x_var_predict[i], np.dot(H.T, np.linalg.inv(foo)))
                loglike[i] = -0.5 * epsilon[i] ** 2 / Sigma[i] - 0.5 * Sigma[i] * 2 * np.pi
                # KALMAN 3: Update and weight
                x_avg_update[i] = x_avg_predict[i] + np.dot(K, y[i] - np.dot(H, x_avg_predict[i]))
                x_var_update[i] = x_var_predict[i] - np.dot(K, np.dot(H, x_var_predict[i]))
            else:
                x_avg_update[i] = x_avg_predict[i]
                x_var_update[i] = x_var_predict[i]

        # Returns
        if update:
        # If the Kalman update was used, return 
            X = np.dot(H, x_avg_predict.T).flatten()
            S = Sigma.flatten()
            return X, S, loglike
        else:
            X_avg = np.dot(H, x_avg_predict.T).flatten()
            X_var = np.dot(H, np.dot(x_var_predict, H.T)).flatten()
            return X_avg, X_var
        
class tite(RC):

    def __init__(self, Ri, Ro, Ci, Ce, Ai, Ae, qi, qe, r):
        """ Model inputs: ambient temperature, heating, solar irradiance"""

        # Model parameters
        self.Ri = Ri # The first resistance (K/W) of the model, on the indoor side
        self.Ro = Ro # The second resistance (K/W) of the model, on the outdoor side
        self.Ci = Ci # Heat capacity (J/K) connected to the indoor temperature
        self.Ce = Ce # Heat capacity (J/K) connected to the envelope temperature
        self.Ai = Ai # Solar aperture coefficient directed to the indoor temperature
        self.Ae = Ae # Solar aperture coefficient directed to the envelope temperature
        self.qi = qi # Standard deviation of the modelling error on Ti
        self.qe = qe # Standard deviation of the modelling error on Ti
        self.r = r   # Standard deviation of the measurement error

        # Number of states
        self.N_states = 2

        # Matrices of the continuous system
        self.Ac = np.array([[-1/(Ci*Ri), 1/(Ci*Ri)],
                            [1/(Ce*Ri), -1/(Ce*Ri) - 1/(Ce*Ro)]])
        self.Bc = np.array([[0, 1 / Ci, Ai / Ci],
                            [1/(Ce*Ro), 0, Ae/Ce]])
        self.Cc = np.array([[1, 0]])
        # System and measurement error covariance matrices
        self.Qc = np.diag([qi**2, qe**2])
        self.Rc = np.array([[r ** 2]])
        
# Definition of an evaluation function that takes "normalized" values or each parameter
def evaluation(df, Ri, Ro, Ci, Ce, Ai, Ae, qi, qe, r, xe0):

    # Reading the dataframe given as argument
    t = df['Time'].values
    u = df[['T_ext', 'P_hea', 'I_sol']].values
    y = df['T_int'].values

    # Model specification and initial condition
    model = tite(Ri, Ro, Ci, Ce, Ai, Ae, qi, qe, r)
    x_0 = [y[0], xe0]

    # Model prediction, without Kalman update
    X, S, L = model.prediction(x_0, t, u, y, update=True)
    
    # We only need the likelihood for the minimize method
    return X

from scipy.optimize import curve_fit

# Curve fitting happens here. popt and pcov are the mean and covariance of parameter estimates
popt, pcov = curve_fit(evaluation,
                       xdata = df,
                       ydata = df['T_int'],
                       p0 = np.append(res1['avg'], [1e-3, 1e-3, 1e-1, 20]) ,
                       method='lm')

res2 = pd.DataFrame(index=['Ri', 'Ro', 'Ci', 'Ce', 'Ai', 'Ae', 'qi', 'qe', 'r', 'xe0'])
res2['avg'] = popt
res2['std'] = np.diag(pcov)**0.5

print(res2)

              avg            std
Ri   2.950052e-03       0.000104
Ro   1.531622e-02       0.000642
Ci   3.969820e+06  174698.811888
Ce   1.463189e+07  383973.977429
Ai   2.338763e-01       0.035236
Ae  -7.783643e-02       0.100376
qi   4.557820e-04       0.016659
qe  -1.906346e-07       0.016820
r    1.670017e+00      60.983250
xe0  2.991427e+01       0.617212


In [6]:
# Model specification with the mean of optimised parameters
model_opt = tite(*res2['avg'][:-1])

# Initial states
t = df['Time'].values
u = df[['T_ext', 'P_hea', 'I_sol']].values
y = df['T_int'].values
x_0 = [y[0], res2['avg']['xe0'] ]

# Prediction without Kalman update, to be compared with the data
x_avg, x_var = model_opt.prediction(x_0, t, u, y=None, update=False)

In [7]:
X, S, L = model_opt.prediction(x_0, t, u, y, update=True)

# eps_ are the one-step ahead prediction residuals
eps_ = y - X
# Let's only take every other value, so that we have time lags of one hour.
eps_ = eps_[::2]

# Correlation function of two time series. Can be used for the autocorrelation of a single time series
def crosscorrelation(x, u):
    """
    http://stackoverflow.com/q/14297012/190597
    http://en.wikipedia.org/wiki/Autocorrelation#Estimation
    """
    n = len(x)
    x = x-x.mean()
    u = u-u.mean()
    r = np.correlate(x, u, mode = 'full')[-n:]
    return r/(x.std()*u.std()*(np.arange(n, 0, -1)))

# Autocorrelation of the residuals (only keeping the first 50 lags)
ACF = crosscorrelation(eps_, eps_)[0:50]
lags = np.linspace(0,49,50)