In [None]:
import random
from datetime import timedelta, datetime

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import holidays
import optuna

from ai.testing.buildingConstant import EPW_COLUMN_NAMES
from ai.testing.register import register_environments, create_env
from ai.testing.simple_pid.pid import PID
from ai.testing.utils import get_value_from_obs_dict, get_future_values, get_time, check_summer_or_winter_by_time

plot_action_yearly = pd.DataFrame(columns=['date', 'heating', 'cooling', 'average', 'outsideTemp','target_temp'])


In [None]:
def read_epw_file(epw_file_path, runperiod, noise_level=1.0) -> pd.DataFrame:
    start_date = datetime(*runperiod[2::-1])
    end_date = datetime(*runperiod[:2:-1], 23,59,59)

    df = pd.read_csv('./weather_data/' + epw_file_path, skiprows=8, header=None, names=EPW_COLUMN_NAMES)
    df['DateTime'] = pd.to_datetime(df[['Year', 'Month', 'Day', 'Hour', 'Minute']])
    df.set_index('DateTime', inplace=True)
    df.drop(columns=['Year', 'Month', 'Day', 'Hour', 'Minute'], inplace=True)

    df.iloc[:, 1] += noise_level * random.uniform(-1, 1)
    df.iloc[:, 12] += noise_level * random.uniform(-1, 1)
    df.iloc[:, 13] += noise_level * random.uniform(-1, 1)

    df = df.resample('15T').interpolate(method='linear')

    return df.loc[start_date:end_date]

'''
Error calculation maybe for later, when energy consumption is considered.
'''
def error_calc(error, env) -> float:
    rewards = env.reward_fn
    energy = get_value_from_obs_dict(env.obs_dict, rewards.energy_variable) / rewards.maximum_electricity_demand
    weight_temp = 1.2
    weight_energy = 0.5

    energy_reward = energy * rewards.energy_factor

    combined_error = (weight_temp * error + weight_energy * energy_reward)

    return combined_error

'''
Disturbance Caluclation, where future target temp, future outside temp and solar is considered
'''
def calculate_disturbance(df, info, env, future_target_temp, v1, v2, v3) -> float:
    future_mean_outside_temp = get_future_values(df, info, 'Dry Bulb Temperature (Â°C)', 6)
    future_diff_light = get_future_values(df, info, 'Diffuse Horizontal Illuminance (lux)', 6)
    future_direct_light = get_future_values(df, info, 'Direct Normal Illuminance (lux)', 6)

    room_temp = env.obs_dict['Zone Air Temperature(SPACE1-1)']
    dist_temp = future_mean_outside_temp - room_temp

    #convert from lux to W/m2
    future_direct_light *= 0.0079
    future_diff_light *= 0.0079

    new_target_temp = np.mean(future_target_temp)
    if new_target_temp != env.spec.kwargs['reward_kwargs']['target_temperature']:
        dist_room_temp =  new_target_temp - room_temp
        return v1 * dist_temp + v2 * (future_diff_light + future_direct_light) + v3 * dist_room_temp

    return v1 * dist_temp + v2 * (future_diff_light + future_direct_light)

'''
This method has two different tasks. First when the target temperature is changing it changes for all important variables.
Secondly it is used to get the future target temperature
'''
def checking_time(info, env, pid_cooling, pid_heating, future = False) -> float:
    current_time,_, _, _, hour = get_time(info)
    season = check_summer_or_winter_by_time(info)
    at_holidays = holidays.Austria()
    target_temp = 20
    if current_time.weekday() >= 5 or (int(hour) >= 18 or int(hour) <= 6) or current_time in at_holidays:
        if season == 'winter':
            if future:
                target_temp = 18
            else:
                pid_heating.setpoint = 18
                pid_cooling.setpoint = 18
                env.target_temperature = 18
                env.unwrapped.reward_fn.target_temperature = 18
        else:
            if future:
                target_temp = 22
            else:
                pid_heating.setpoint = 22
                pid_cooling.setpoint = 22
                env.target_temperature = 22
                env.unwrapped.reward_fn.target_temperature = 22

    else:
        if future:
            target_temp = 20
        else:
            pid_heating.setpoint = 20
            pid_cooling.setpoint = 20
            env.target_temperature = 20
            env.unwrapped.reward_fn.target_temperature = 20

    return target_temp

'''
Future target temperature gets saved into a list or it fills the list with the net one.
'''
def future_temp(env, info, looping = True) -> list[float] | float:
    start_time, _, _, _, _ = get_time(info)
    end_time = start_time + timedelta(hours=3)
    time_interval = timedelta(seconds=60) #TODO: the same amount like it would step
    current_time = start_time

    target_temp = []
    if looping:
        while current_time <= end_time:
            new_info = {
                'year': current_time.year,
                'month': current_time.month,
                'day': current_time.day,
                'hour': current_time.hour
            }

            target_temp.append(checking_time(new_info, env, None, None, True))
            current_time += time_interval
        return target_temp
    else:
        new_info = {
            'year': end_time.year,
            'month': end_time.month,
            'day': end_time.day,
            'hour': end_time.hour
        }
        target_temp.append(checking_time(new_info, env, None, None, True))
        return target_temp[0]

def run_simulation(trial, env, plots, winter_summer = (True, True), param_testing = None, best_value = None) -> float:

    params = choose_opt(trial, param_testing)
    #mean value : -62941.46284956693.
    pid_heating = PID(10.1, 0.6951, 0.004900000000000001,  setpoint=20)
    #pid_heating = PID(**params, setpoint=20)
    pid_heating.output_limits = (20, 60)
    #pid_heating.error_map = functools.partial(error_calc, env=env)

    #mean value -68203.24015285478
    pid_cooling = PID(0.2, 0.48710000000000003, 0.06670000000000001,  setpoint=20)
    #pid_cooling = PID(**params, setpoint=20)
    pid_cooling.output_limits = (15, 40)
    #pid_cooling.error_map = functools.partial(error_calc, env=env)


    rewards = []
    temp_sum_reward = 0
    for i in range(1):
        obs, info = env.reset()

        future_target_temp = future_temp(env, info)

        terminated = False
        current_month = 0
        df = read_epw_file(env.unwrapped.weather_files[0], env.spec.kwargs['config_params']['runperiod'])
        while not terminated:
            checking_time(info, env, pid_cooling, pid_heating)

            a = [0,0] if all(winter_summer) else [0]
            cooling_index = 1 if all(winter_summer) else 0
            if check_summer_or_winter_by_time(info) == 'summer':
                disturbance = calculate_disturbance(df, info, env, future_target_temp, 1.18,
                                                    0.29, 0.72)
                #disturbance = calculate_disturbance(df, info, env, future_target_temp, **params)
                control_value = pid_cooling(env.obs_dict['Zone Air Temperature(SPACE1-1)'], disturbance)
                a[cooling_index] = (control_value - 15.) / (40. - 15.)
            else:
                disturbance = calculate_disturbance(df, info, env, future_target_temp, 0.97,
                                                    0.53, 0.59)
                #disturbance = calculate_disturbance(df, info, env, future_target_temp, **params)
                control_value = pid_heating(env.obs_dict['Zone Air Temperature(SPACE1-1)'], disturbance)
                a[0] = (control_value - 20.) / (60. - 20.)
            obs, reward, terminated, truncated, info = env.step(a)
            rewards.append(reward)

            temp_sum_reward += reward

            yearly_row = {'date': pd.to_datetime(datetime(info['year'], info['month'], info['day'], info['hour'])),
                          'average': env.obs_dict['Zone Air Temperature(SPACE1-1)'],
                          'outsideTemp': env.obs_dict['Site Outdoor Air Drybulb Temperature(Environment)'],
                          'cooling':  env.obs_dict['Chiller Evaporator Outlet Temperature(Main Chiller)'],
                          'heating': env.obs_dict['Boiler Outlet Temperature(Main Boiler)'],
                          'target_temp': env.unwrapped.reward_fn.target_temperature }
            plot_action_yearly.loc[len(plot_action_yearly)] = yearly_row

            if info['month'] != current_month:  # display results every month
                current_month = info['month']
                print('Reward: ', sum(rewards), info)

            if best_value is not None and best_value > temp_sum_reward: # for optuna if the reward value is already higher then it stops
                raise optuna.TrialPruned()

            del future_target_temp[0] #To get the next future target temp we need to delete the last one and add a new one
            future_target_temp.append(future_temp(env, info, False))
    return sum(rewards)


def choose_opt(trial, choosen_param) -> dict:
    match choosen_param:
        case "future":
            weight_temp = trial.suggest_float('v1', 0.0, 1.5, step=0.01)
            weight_diffuse = trial.suggest_float('v2', 0.0, 1.5, step=0.01)
            weight_target_temp = trial.suggest_float('v3', 0.0, 1.5, step=0.01)
            params = {'v1': weight_temp, 'v2': weight_diffuse,'v3':weight_target_temp}
        case "PID":
            weight_propo = trial.suggest_float('Kp', 0.1, 0.5, step=0.0002)
            weight_error = trial.suggest_float('Ki', 0.0001, 1, step=0.0002)
            weight_derivative = trial.suggest_float('Kd', 0.0001, 1, step=0.0002)
            #weight_feedforward = trial.suggest_uniform('feedforward_gain', 0.0001, 1)
            params = {'Kp': weight_propo, 'Ki': weight_error, 'Kd': weight_derivative}
        case "error":
            pass
        case _:
            params = None
            print("Nothing to optimize")

    return params


Run this to just run the simulation once

In [None]:
winter_summer = (True, True)

environment_ids = register_environments(
            target_temperature=np.arange(20,21),
            filter_building_files=('factory',),
            filter_weather_files=('vienna',),
            winter=winter_summer[0],
            summer=winter_summer[1] )

env = create_env(environment_ids[0], winter_summer)
run_simulation(None, env, True, winter_summer= winter_summer)

Run this to use optuna to find the best parameter using "future" for disturbance variables or "PID" for the PID parameters


In [None]:
study = optuna.create_study(direction="maximize")
study.optimize(lambda trial: run_simulation(trial, env, False, winter_summer, "future", study.best_value
                if len(study.best_trials) != 0 else None), n_trials=100)

best_params = study.best_params
best_value = study.best_value

print("Best parameters: ", best_params)
print("Best objective value: ", best_value)

Converting all saved values into daily/monthly and yearly dataframes, so it can be plotted


In [None]:
plot_monthly = plot_action_yearly.resample('D', on='date').mean()
plot_monthly = plot_monthly.groupby([plot_monthly.index.year,plot_monthly.index.month])
plot_daily = plot_action_yearly.resample('H', on='date').mean()
plot_daily = plot_daily.groupby([plot_daily.index.year,plot_daily.index.month])

plot_action_yearly = plot_action_yearly.resample('M', on='date').mean()
figs, sum_axes = plt.subplots((len(plot_monthly) // 2) + 1, 2, figsize=(10, 12))

year_fig, ax_year = plt.subplots()
ax_year.set_title("Yearly Overall")

Heating Monthly Plotting

In [None]:
for i, (month, group) in enumerate(plot_monthly):
    if (10 <= month[1] <= 12 or month[1] <= 5) and len(group) > 1:
        print(month)
        fig1, ax = plt.subplots()
        ax.set_title(f"{month[1]}-{group.index.year[0]} Monthly Overall")

        montly_plot(ax, group, 20, 'heating')
        print(ax)
        plt.show()

Cooling Monthly Plotting

In [None]:
if winter_summer[1]:
    for i, (month, group) in enumerate(plot_monthly) :
        if 6 <= month[1] <= 9 and len(group) > 10:
            fig1, ax = plt.subplots()
            ax.set_title(f"{month[1]}-{group.index.year[0]} Monthly Overall")

            montly_plot(ax, group, 20, 'cooling')

            plt.close(fig1)

Yearly Plotting

In [None]:
if len(plot_action_yearly) > 1:
    montly_plot(ax_year, plot_action_yearly, 20, 'heating')