In [None]:
# %pip freeze > requirements.txt
# %pip install -r requirements.txt

Note: you may need to restart the kernel to use updated packages.


# 1. Initiation


#### 1.1 Importing Libraries

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from scipy import stats

#### 1.2 Importing Daataset




In [2]:
Dataset = pd.read_csv('Dataset for PMV Prediction.csv')
print(Dataset)

       Season            Climate  Building type   Clo  Met  \
0      Summer  Humid subtropical         Office  0.57  1.0   
1      Summer  Humid subtropical         Office  0.57  1.1   
2      Summer  Humid subtropical         Office  0.57  1.1   
3      Summer  Humid subtropical         Office  0.57  1.0   
4      Summer  Humid subtropical         Office  0.57  1.0   
...       ...                ...            ...   ...  ...   
47331  Winter  Humid subtropical  Senior center  0.94  1.0   
47332  Winter  Humid subtropical  Senior center  0.66  1.0   
47333  Winter  Humid subtropical  Senior center  0.69  1.0   
47334  Winter  Humid subtropical  Senior center  0.82  1.0   
47335  Winter  Humid subtropical  Senior center  0.86  1.2   

       Air temperature (C)  Relative humidity (%)  Air velocity (m/s)  \
0                     24.3                   36.8                0.27   
1                     25.7                   33.1                0.09   
2                     24.6          

#### 1.4 Dealing with Outliars

In [3]:
# Step 1: Remove Outliers using Z-scores
def remove_outliers_zscore(df, threshold=3):
    z_scores = np.abs(stats.zscore(df.select_dtypes(include=[np.number])))  # Only numeric columns
    outliers = (z_scores > threshold).any(axis=1)
    print(f"Number of outliers detected: {outliers.sum()}")
    return df[~outliers]

# Step 2: Apply outlier removal
Dataset = remove_outliers_zscore(Dataset)

# Step 3: Dataset is now ready for any machine learning model
print(f"Shape of the Dataset after cleaning: {Dataset.shape}")

Number of outliers detected: 3071
Shape of the Dataset after cleaning: (44265, 10)


#### 1.3 Max and Min Values of Variables

In [4]:
# Print the max and min value of every variable after removing outliers
max_values = Dataset.max()  # Maximum values of each column
min_values = Dataset.min()  # Minimum values of each column

# Printing the results
print("Maximum values of each variable after removing outliers:")
print(max_values)

print("\nMinimum values of each variable after removing outliers:")
print(min_values)

Maximum values of each variable after removing outliers:
Season                                                        Winter
Climate                                Warm-summer humid continental
Building type                                          Senior center
Clo                                                             1.53
Met                                                              1.8
Air temperature (C)                                             34.1
Relative humidity (%)                                           89.0
Air velocity (m/s)                                              1.51
Outdoor monthly air temperature (C)                             38.1
PMV                                                              2.7
dtype: object

Minimum values of each variable after removing outliers:
Season                                         Autumn
Climate                                Cold semi-arid
Building type                               Classroom
Clo                

#### 1.4 Input and Output Features

In [5]:
print (Dataset.shape)

(44265, 10)


In [6]:
# Input and Output Features
X = Dataset.drop("PMV", axis=1)
y = Dataset["PMV"]                    # Output Features

In [7]:
print(X)

       Season            Climate  Building type   Clo  Met  \
0      Summer  Humid subtropical         Office  0.57  1.0   
1      Summer  Humid subtropical         Office  0.57  1.1   
2      Summer  Humid subtropical         Office  0.57  1.1   
3      Summer  Humid subtropical         Office  0.57  1.0   
4      Summer  Humid subtropical         Office  0.57  1.0   
...       ...                ...            ...   ...  ...   
47331  Winter  Humid subtropical  Senior center  0.94  1.0   
47332  Winter  Humid subtropical  Senior center  0.66  1.0   
47333  Winter  Humid subtropical  Senior center  0.69  1.0   
47334  Winter  Humid subtropical  Senior center  0.82  1.0   
47335  Winter  Humid subtropical  Senior center  0.86  1.2   

       Air temperature (C)  Relative humidity (%)  Air velocity (m/s)  \
0                     24.3                   36.8                0.27   
1                     25.7                   33.1                0.09   
2                     24.6          

In [8]:
# -----------------------------
# Identify feature types
# -----------------------------
categorical_features = ["Season", "Building type","Climate"]
numeric_features = [
    "Clo", "Met", "Air temperature (C)", "Relative humidity (%)",
    "Air velocity (m/s)","Outdoor monthly air temperature (C)"
]

In [9]:
# Preprocessor: Scale numeric + One-hot encode categorical
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer 

preprocessor = ColumnTransformer([
    ("num", StandardScaler(), numeric_features),
    ("cat", OneHotEncoder(handle_unknown="ignore"), categorical_features)
])

#### 1.5 Split the Dataset: Train, Validation, and Test

In [10]:
#### 1.6 Split the Dataset: Train, Validation, and Test
# Split raw first
from sklearn.model_selection import train_test_split
X_train_raw, X_temp_raw, y_train, y_temp = train_test_split(X, y, test_size=0.2, random_state=0)
X_val_raw, X_test_raw, y_val, y_test = train_test_split(X_temp_raw, y_temp, test_size=0.5, random_state=0)

#### 1.6 Data Scaling : Standardization

In [11]:
# Fit preprocessor only on training set
preprocessor.fit(X_train_raw)

In [12]:
# Transform all splits
X_train = preprocessor.transform(X_train_raw)
X_val = preprocessor.transform(X_val_raw)
X_test = preprocessor.transform(X_test_raw)

In [13]:
import joblib
joblib.dump(preprocessor, "preprocessor.pkl")

['preprocessor.pkl']

In [14]:
print (X_train)

[[-1.28948584 -0.02551972  1.24761483 ...  0.          0.
   0.        ]
 [ 1.53088191 -0.02551972 -0.69078384 ...  0.          0.
   0.        ]
 [-0.98575393 -0.02551972  1.59375745 ...  0.          0.
   0.        ]
 ...
 [ 0.22917372 -0.02551972 -0.69078384 ...  0.          0.
   1.        ]
 [ 0.22917372 -0.02551972 -0.48309827 ...  0.          0.
   1.        ]
 [ 0.01222235 -1.27416127  0.90147221 ...  1.          0.
   0.        ]]


In [15]:
print (X_test)

[[ 1.44410137 -0.02551972  0.93608647 ...  0.          0.
   0.        ]
 [-1.28948584 -0.02551972  1.90528581 ...  0.          0.
   0.        ]
 [ 0.01222235 -1.27416127  0.7284009  ...  0.          0.
   0.        ]
 ...
 [ 0.35934454 -0.64984049 -1.72921171 ...  0.          0.
   0.        ]
 [ 0.31595427 -0.02551972 -0.76001237 ...  0.          1.
   0.        ]
 [-0.29150956 -1.27416127  1.66298598 ...  0.          0.
   0.        ]]


In [16]:
print (X_val)

[[ 0.0990029  -0.64984049 -0.20618417 ...  0.          0.
   0.        ]
 [ 0.88002782 -0.02551972 -0.79462663 ...  0.          0.
   0.        ]
 [ 0.22917372  1.22312183 -0.96769794 ...  0.          0.
   0.        ]
 ...
 [-0.5518512  -0.02551972 -0.03311286 ...  0.          0.
   0.        ]
 [ 1.53088191 -0.64984049 -0.48309827 ...  0.          0.
   0.        ]
 [-1.0291442   3.09608415  0.52071533 ...  1.          0.
   0.        ]]


In [17]:
print (y_train)

7630     0.4
19461    0.5
36408    1.2
13148    0.2
16051   -0.7
        ... 
31729    0.3
22185   -0.8
45599   -0.1
46553    0.1
2788     0.4
Name: PMV, Length: 35412, dtype: float64


In [18]:
print (y_test)

23371    1.1
36322    1.6
5683     0.5
36942   -0.8
4805     1.0
        ... 
25079    0.1
38531    0.3
13513   -0.8
33690   -0.1
3376     1.1
Name: PMV, Length: 4427, dtype: float64


In [19]:
print (y_val)

25573   -0.1
32146    0.2
17678   -0.1
9815     0.4
26853   -0.9
        ... 
16538    0.2
27803   -0.7
28494   -0.2
41731    0.2
39325    1.1
Name: PMV, Length: 4426, dtype: float64


# 2. Train the Models and Save

In [20]:
# Custom R² Metric
import tensorflow.keras.backend as K

def coeff_determination(y_true, y_pred):
    SS_res = K.sum(K.square(y_true - y_pred))
    SS_tot = K.sum(K.square(y_true - K.mean(y_true)))
    return 1 - SS_res / (SS_tot + K.epsilon())

# Build the ANN Model
dnn = tf.keras.models.Sequential()
dnn.add(tf.keras.layers.Input(shape=(X_train.shape[1],)))
dnn.add(tf.keras.layers.Dense(units=128, activation='relu'))
dnn.add(tf.keras.layers.Dense(units=128, activation='relu'))
dnn.add(tf.keras.layers.Dense(units=128, activation='relu'))
dnn.add(tf.keras.layers.Dense(units=1, activation='linear'))

# Compile the ANN MOdel
import keras
dnn.compile(optimizer = 'adam',
              loss      = keras.losses.mean_squared_error,
              metrics   = ['mean_absolute_error', 'mean_absolute_percentage_error', coeff_determination])

# Define Stopping Criteria (Loss Threshold)
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',  # or 'val_loss' if you want to monitor validation loss
    patience=30,     # Number of epochs to wait before stopping
    min_delta=0.000001, # an absolute change of less than min_delta, will count as no improvement
    mode='min',      # Stop when the monitored quantity stops decreasing
    verbose=1)

# Saving the Weights with the Minimum Loss
weights_of_min_loss = tf.keras.callbacks.ModelCheckpoint(
    'best_weights.keras',      # File path where the weights will be saved
    monitor='val_loss',         # Monitor the loss value
    save_best_only=True,     # Save only the weights with the best (minimum) loss
    mode='min',             # Mode 'min' indicates that we want to save when loss is minimized
    verbose=1)               # Print out information about the saving process

# Training the ANN on the Training Set
history = dnn.fit(X_train, y_train,
                    batch_size = 32,
                    epochs = 1000,
                    callbacks=[early_stopping, weights_of_min_loss],
                    validation_data=(X_val, y_val),
                    verbose=1)

# Save the Model
dnn.save('pmv_prediction_dnn.keras')

Epoch 1/1000
[1m1104/1107[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 4ms/step - coeff_determination: 0.8843 - loss: 0.0584 - mean_absolute_error: 0.1545 - mean_absolute_percentage_error: 7986909.0000
Epoch 1: val_loss improved from inf to 0.02375, saving model to best_weights.keras
[1m1107/1107[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 22ms/step - coeff_determination: 0.8845 - loss: 0.0583 - mean_absolute_error: 0.1544 - mean_absolute_percentage_error: 7983121.0000 - val_coeff_determination: 0.9546 - val_loss: 0.0237 - val_mean_absolute_error: 0.1037 - val_mean_absolute_percentage_error: 6911980.5000
Epoch 2/1000
[1m1107/1107[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - coeff_determination: 0.9486 - loss: 0.0257 - mean_absolute_error: 0.1096 - mean_absolute_percentage_error: 6070987.5000
Epoch 2: val_loss improved from 0.02375 to 0.02266, saving model to best_weights.keras
[1m1107/1107[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m

# 3. Load the Models

In [21]:
from tensorflow.keras.models import load_model
loaded_dnn = load_model('pmv_prediction_dnn.keras', custom_objects={'coeff_determination': coeff_determination})

# 4. Make Prediction on a New Sample

In [22]:
# Example sample (replace with your own values)
# Ensure the order of features matches your DataFrame `X`
sample = pd.DataFrame([{
    "Clo": 0.57,
    "Met": 1.0,
    "Air temperature (C)": 25.0,
    "Relative humidity (%)": 33.3,
    "Air velocity (m/s)": 0.07,
    "Outdoor monthly air temperature (C)": 32.8,
    "Season": "Summer",
    "Building type": "Office",
    "Climate": "Humid subtropical"
}])

# Preprocess the sample with the same preprocessor fitted earlier
sample_processed = preprocessor.transform(sample)

# Predict PMV using the trained model
predicted_pmv = loaded_dnn.predict(sample_processed)

print("Predicted PMV:", predicted_pmv[0][0])


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 899ms/step
Predicted PMV: -0.47003385


In [23]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Simple DNN-Environment Greedy PMV Controller (PMV* = 0)\n\nThis script mirrors your earlier Fanger-based flow but swaps the environment for your\ntrained DNN and uses your saved sklearn preprocessor (preprocessor.pkl).\n\nRequirements (install if needed):\n  pip install tensorflow pandas numpy scikit-learn joblib\n\nPlace these files in the same folder:\n  - pmv_prediction_dnn.keras        (your saved Keras model)\n  - preprocessor.pkl                (joblib dump of your ColumnTransformer)\n\nRun:\n  python dnn_controller_simple.py\n"""
from __future__ import annotations

import os
import json
import joblib
import numpy as np
import pandas as pd
from typing import Dict
from dataclasses import dataclass
from tensorflow import keras

In [None]:
# -----------------------------
# 0) Config
# -----------------------------
MODEL_PATH = "pmv_prediction_dnn.keras"
SCALER_PATH = "preprocessor.pkl"

# Fixed values (edit if needed)
FIXED = {
    "Met": 1.2,
    "Clo": 0.5,
    "Building type": "Office",
    "Climate": "Hot-summer Mediterranean",
}

# Discrete ranges (match your manuscript)
V_AIR = np.round(np.arange(0.0, 1.6 + 1e-9, 0.1), 2)        # m/s
RH = np.arange(40, 60 + 1e-9, 2)                            # %
T_MONTHLY = np.round(np.arange(10.0, 45.0 + 1e-9, 0.5), 1)  # °C
SEASONS = ["Summer", "Winter", "Spring", "Autumn"]          # must match training categories

# Action space (setpoints)
T_SETPOINTS = np.round(np.arange(15.0, 27.0 + 1e-9, 0.5), 1)

# Greedy controller target (neutral comfort)
PMV_TARGET = 0.0

RANDOM_SEED = 7



In [25]:
# -----------------------------
# 1) Load model & preprocessor
# -----------------------------
def load_artifacts(model_path: str = MODEL_PATH, scaler_path: str = SCALER_PATH):
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Missing model file: {model_path}")
    if not os.path.exists(scaler_path):
        raise FileNotFoundError(f"Missing preprocessor file: {scaler_path}")

    model = keras.models.load_model(model_path)
    preprocessor = joblib.load(scaler_path)
    print(f"[info] Loaded model: {model_path}")
    print(f"[info] Loaded preprocessor: {scaler_path}")
    return model, preprocessor

In [26]:
# -----------------------------
# 2) Row builder & prediction using sklearn preprocessor
# -----------------------------
def make_row(state: Dict[str, float | str], t_set: float) -> pd.DataFrame:
    """Return a 1-row DataFrame with the exact column names used at training time.
    Required columns in training X:
      Numeric:   Clo, Met, Air temperature (C), Relative humidity (%), Air velocity (m/s), Outdoor monthly air temperature (C)
      Categorical: Season, Building type, Climate
    """
    row = {
        "Clo": FIXED["Clo"],
        "Met": FIXED["Met"],
        "Air temperature (C)": float(t_set),
        "Relative humidity (%)": float(state["rh"]),
        "Air velocity (m/s)": float(state["v_air"]),
        "Outdoor monthly air temperature (C)": float(state["t_monthly"]),
        "Season": str(state["season"]),
        "Building type": FIXED["Building type"],
        "Climate": FIXED["Climate"],
    }
    return pd.DataFrame([row])


def predict_pmv(model, preprocessor, state: Dict[str, float | str], t_set: float) -> float:
    df = make_row(state, t_set)
    X = preprocessor.transform(df)   # exact same pipeline as training (scaling + one-hot)
    y = model.predict(X, verbose=0)
    return float(y.ravel()[0])


In [27]:
# -----------------------------
# 3) Greedy controller (PMV* = 0)
# -----------------------------
def greedy_action(model, preprocessor, state: Dict[str, float | str], target_pmv: float = PMV_TARGET) -> float:
    best_a, best_err = None, 1e9
    for a in T_SETPOINTS:
        pmv_hat = predict_pmv(model, preprocessor, state, a)
        err = abs(pmv_hat - target_pmv)
        if err < best_err:
            best_err, best_a = err, float(a)
    return best_a



In [28]:
# -----------------------------
# 4) Quick evaluation utilities (Greedy only)
# -----------------------------
def random_states(n: int = 1000, seed: int = RANDOM_SEED):
    rng = np.random.default_rng(seed)
    for _ in range(n):
        yield {
            "v_air": float(rng.choice(V_AIR)),
            "rh": float(rng.choice(RH)),
            "t_monthly": float(rng.choice(T_MONTHLY)),
            "season": str(rng.choice(SEASONS)),
        }


def evaluate_greedy(model, preprocessor, n_samples: int = 1000, pmv_target: float = PMV_TARGET, seed: int = RANDOM_SEED) -> dict:
    total = 0
    mae = 0.0
    within03 = 0
    within05 = 0
    for st in random_states(n_samples, seed):
        a = greedy_action(model, preprocessor, st, pmv_target)
        pmv_hat = predict_pmv(model, preprocessor, st, a)
        e = abs(pmv_hat - pmv_target)
        mae += e
        within03 += int(e <= 0.3)
        within05 += int(e <= 0.5)
        total += 1
    return {
        "MAE": float(mae / total),
        "Pct_within_±0.3": float(100.0 * within03 / total),
        "Pct_within_±0.5": float(100.0 * within05 / total),
        "N": int(total),
    }


In [29]:
# -----------------------------
# 5) Main demo
# -----------------------------
def main():
    model, preprocessor = load_artifacts(MODEL_PATH, SCALER_PATH)

    # quick demo on one state
    demo_state = {"v_air": 0.3, "rh": 50.0, "t_monthly": 30.0, "season": "Summer"}
    best_a = greedy_action(model, preprocessor, demo_state, PMV_TARGET)
    pmv_demo = predict_pmv(model, preprocessor, demo_state, best_a)
    print(f"[demo] Greedy (PMV*=0) → T_set={best_a:.1f}°C, PMV_hat={pmv_demo:+.3f}")

    # optional: quick randomized evaluation (uncomment to run)
    # metrics = evaluate_greedy(model, preprocessor, n_samples=1000, pmv_target=0.0)
    # print(json.dumps({"Greedy_PM V*=0": metrics}, indent=2))


if __name__ == "__main__":
    main()


TypeError: Could not locate function 'coeff_determination'. Make sure custom classes are decorated with `@keras.saving.register_keras_serializable()`. Full object config: {'module': 'builtins', 'class_name': 'function', 'config': 'coeff_determination', 'registered_name': 'function'}

In [30]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Simple DNN-Environment Greedy PMV Controller (PMV* = 0)\n\nThis script mirrors your earlier Fanger-based flow but swaps the environment for your\ntrained DNN and uses your saved sklearn preprocessor (preprocessor.pkl).\n\nRequirements (install if needed):\n  pip install tensorflow pandas numpy scikit-learn joblib\n\nPlace these files in the same folder:\n  - pmv_prediction_dnn.keras        (your saved Keras model)\n  - preprocessor.pkl                (joblib dump of your ColumnTransformer)\n\nRun:\n  python dnn_controller_simple.py\n"""
from __future__ import annotations

import os
import json
import joblib
import numpy as np
import pandas as pd
from typing import Dict
from dataclasses import dataclass
from tensorflow import keras

# -----------------------------
# 0) Config
# -----------------------------
MODEL_PATH = "pmv_prediction_dnn.keras"
SCALER_PATH = "preprocessor.pkl"

# Fixed values (edit if needed)
FIXED = {
    "Met": 1.2,
    "Clo": 0.5,
    "Building type": "Office",
    "Climate": "Hot-summer Mediterranean",
}

# Discrete ranges (match your manuscript)
V_AIR = np.round(np.arange(0.0, 1.6 + 1e-9, 0.1), 2)       # m/s
RH = np.arange(40, 60 + 1e-9, 2)                            # %
T_MONTHLY = np.round(np.arange(10.0, 45.0 + 1e-9, 0.5), 1)  # °C
SEASONS = ["Summer", "Winter", "Spring", "Autumn"]          # must match training categories

# Action space (setpoints)
T_SETPOINTS = np.round(np.arange(15.0, 27.0 + 1e-9, 0.5), 1)

# Greedy controller target (neutral comfort)
PMV_TARGET = 0.0

RANDOM_SEED = 7


# -----------------------------
# 1) Load model & preprocessor
# -----------------------------
from tensorflow.keras import backend as K

def coeff_determination(y_true, y_pred):
    """Custom R^2 metric used when you saved the model.
    We register it here so Keras can deserialize your .keras file.
    """
    SS_res = K.sum(K.square(y_true - y_pred))
    SS_tot = K.sum(K.square(y_true - K.mean(y_true)))
    return 1 - SS_res / (SS_tot + K.epsilon())

def load_artifacts(model_path: str = MODEL_PATH, scaler_path: str = SCALER_PATH):
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Missing model file: {model_path}")
    if not os.path.exists(scaler_path):
        raise FileNotFoundError(f"Missing preprocessor file: {scaler_path}")

    # Load without recompiling; provide custom_objects for the saved metric.
    try:
        model = keras.models.load_model(
            model_path,
            custom_objects={"coeff_determination": coeff_determination},
            compile=False,
        )
    except TypeError:
        # Older/newer Keras variants: try safe_mode=False fallback.
        model = keras.models.load_model(
            model_path,
            custom_objects={"coeff_determination": coeff_determination},
            compile=False,
            safe_mode=False,
        )

    preprocessor = joblib.load(scaler_path)
    print(f"[info] Loaded model: {model_path}")
    print(f"[info] Loaded preprocessor: {scaler_path}")
    return model, preprocessor

# -----------------------------
# 2) Row builder & prediction using sklearn preprocessor
# -----------------------------
def make_row(state: Dict[str, float | str], t_set: float) -> pd.DataFrame:
    """Return a 1-row DataFrame with the exact column names used at training time.
    Required columns in training X:
      Numeric:   Clo, Met, Air temperature (C), Relative humidity (%), Air velocity (m/s), Outdoor monthly air temperature (C)
      Categorical: Season, Building type, Climate
    """
    row = {
        "Clo": FIXED["Clo"],
        "Met": FIXED["Met"],
        "Air temperature (C)": float(t_set),
        "Relative humidity (%)": float(state["rh"]),
        "Air velocity (m/s)": float(state["v_air"]),
        "Outdoor monthly air temperature (C)": float(state["t_monthly"]),
        "Season": str(state["season"]),
        "Building type": FIXED["Building type"],
        "Climate": FIXED["Climate"],
    }
    return pd.DataFrame([row])


def predict_pmv(model, preprocessor, state: Dict[str, float | str], t_set: float) -> float:
    df = make_row(state, t_set)
    X = preprocessor.transform(df)   # exact same pipeline as training (scaling + one-hot)
    y = model.predict(X, verbose=0)
    return float(y.ravel()[0])


# -----------------------------
# 3) Greedy controller (PMV* = 0)
# -----------------------------
def greedy_action(model, preprocessor, state: Dict[str, float | str], target_pmv: float = PMV_TARGET) -> float:
    best_a, best_err = None, 1e9
    for a in T_SETPOINTS:
        pmv_hat = predict_pmv(model, preprocessor, state, a)
        err = abs(pmv_hat - target_pmv)
        if err < best_err:
            best_err, best_a = err, float(a)
    return best_a


# -----------------------------
# 4) Quick evaluation utilities (Greedy only)
# -----------------------------
def random_states(n: int = 1000, seed: int = RANDOM_SEED):
    rng = np.random.default_rng(seed)
    for _ in range(n):
        yield {
            "v_air": float(rng.choice(V_AIR)),
            "rh": float(rng.choice(RH)),
            "t_monthly": float(rng.choice(T_MONTHLY)),
            "season": str(rng.choice(SEASONS)),
        }


def evaluate_greedy(model, preprocessor, n_samples: int = 1000, pmv_target: float = PMV_TARGET, seed: int = RANDOM_SEED) -> dict:
    total = 0
    mae = 0.0
    within03 = 0
    within05 = 0
    for st in random_states(n_samples, seed):
        a = greedy_action(model, preprocessor, st, pmv_target)
        pmv_hat = predict_pmv(model, preprocessor, st, a)
        e = abs(pmv_hat - pmv_target)
        mae += e
        within03 += int(e <= 0.3)
        within05 += int(e <= 0.5)
        total += 1
    return {
        "MAE": float(mae / total),
        "Pct_within_±0.3": float(100.0 * within03 / total),
        "Pct_within_±0.5": float(100.0 * within05 / total),
        "N": int(total),
    }


# -----------------------------
# 5) Main demo
# -----------------------------
def main():
    model, preprocessor = load_artifacts(MODEL_PATH, SCALER_PATH)

    # quick demo on one state
    demo_state = {"v_air": 0.3, "rh": 50.0, "t_monthly": 30.0, "season": "Summer"}
    best_a = greedy_action(model, preprocessor, demo_state, PMV_TARGET)
    pmv_demo = predict_pmv(model, preprocessor, demo_state, best_a)
    print(f"[demo] Greedy (PMV*=0) → T_set={best_a:.1f}°C, PMV_hat={pmv_demo:+.3f}")

    # optional: quick randomized evaluation (uncomment to run)
    # metrics = evaluate_greedy(model, preprocessor, n_samples=1000, pmv_target=0.0)
    # print(json.dumps({"Greedy_PM V*=0": metrics}, indent=2))


if __name__ == "__main__":
    main()


[info] Loaded model: pmv_prediction_dnn.keras
[info] Loaded preprocessor: preprocessor.pkl
[demo] Greedy (PMV*=0) → T_set=26.0°C, PMV_hat=+0.047


In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Generate full optimal-setpoint dataset (CSV) and plot utilities
==============================================================

This script uses your trained Keras DNN (pmv_prediction_dnn.keras) and your
saved sklearn ColumnTransformer (preprocessor.pkl) to:

1) Enumerate **every state** on your grid (v_air, rh, t_monthly, season),
   find the **greedy-optimal** AC setpoint (15.0–27.0 °C, step 0.5) for
   target PMV*=0, and write a **CSV** with:
      v_air, rh, t_monthly, season, optimal_t_set, pmv_hat

2) Provide plotting helpers:
   - Reward vs Air Temperature for **multiple arbitrary states** (like your fig 4)
   - All **six** 3D pair plots across the 4 changeable features, fixing the
     other two at given values (or their medians).

Usage (edit paths below if needed):
    python generate_optimal_setpoints_and_plots.py --make-csv
    python generate_optimal_setpoints_and_plots.py --plots
    python generate_optimal_setpoints_and_plots.py --make-csv --plots

If you're running inside a notebook, you can `import` this file and call the
functions directly.
"""

In [31]:
from __future__ import annotations

import os
import json
import math
import argparse
from itertools import product, combinations
from typing import Dict, List, Tuple, Iterable, Optional

import numpy as np
import pandas as pd
import joblib
from tensorflow import keras
from tensorflow.keras import backend as K
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401 (needed for 3D projection)

In [32]:
# -----------------------------
# 0) Config
# -----------------------------
MODEL_PATH = "pmv_prediction_dnn.keras"
SCALER_PATH = "preprocessor.pkl"

OUTPUT_CSV = "optimal_setpoints_dataset.csv"

# Fixed values (edit if needed)
FIXED = {
    "Met": 1.2,
    "Clo": 0.5,
    "Building type": "Office",
    "Climate": "Hot-summer Mediterranean",
}

# Discrete grids (match manuscript)
V_AIR = np.round(np.arange(0.0, 1.6 + 1e-9, 0.1), 2)       # 0.0..1.6 step 0.1 (17)
RH = np.arange(40, 60 + 1e-9, 2)                            # 40..60 step 2   (11)
T_MONTHLY = np.round(np.arange(10.0, 45.0 + 1e-9, 0.5), 1)  # 10..45 step 0.5 (71)
SEASONS = ["Summer", "Winter", "Spring", "Autumn"]          # (4)

# Action space (setpoints)
T_SETPOINTS = np.round(np.arange(15.0, 27.0 + 1e-9, 0.5), 1)  # 25 actions

# Target PMV
PMV_TARGET = 0.0

# Reproducibility
RANDOM_SEED = 7

In [33]:
# -----------------------------
# 1) Load model & preprocessor (handles custom metric)
# -----------------------------
def coeff_determination(y_true, y_pred):
    SS_res = K.sum(K.square(y_true - y_pred))
    SS_tot = K.sum(K.square(y_true - K.mean(y_true)))
    return 1 - SS_res / (SS_tot + K.epsilon())


def load_artifacts(model_path: str = MODEL_PATH, scaler_path: str = SCALER_PATH):
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Missing model file: {model_path}")
    if not os.path.exists(scaler_path):
        raise FileNotFoundError(f"Missing preprocessor file: {scaler_path}")

    try:
        model = keras.models.load_model(
            model_path,
            custom_objects={"coeff_determination": coeff_determination},
            compile=False,
        )
    except TypeError:
        model = keras.models.load_model(
            model_path,
            custom_objects={"coeff_determination": coeff_determination},
            compile=False,
            safe_mode=False,
        )

    preprocessor = joblib.load(scaler_path)
    print(f"[info] Loaded model: {model_path}")
    print(f"[info] Loaded preprocessor: {scaler_path}")
    return model, preprocessor


In [34]:
# -----------------------------
# 2) State → DataFrame row and PMV prediction
# -----------------------------
def make_row(state: Dict[str, float | str], t_set: float) -> pd.DataFrame:
    row = {
        "Clo": FIXED["Clo"],
        "Met": FIXED["Met"],
        "Air temperature (C)": float(t_set),
        "Relative humidity (%)": float(state["rh"]),
        "Air velocity (m/s)": float(state["v_air"]),
        "Outdoor monthly air temperature (C)": float(state["t_monthly"]),
        "Season": str(state["season"]),
        "Building type": FIXED["Building type"],
        "Climate": FIXED["Climate"],
    }
    return pd.DataFrame([row])


def predict_pmv(model, preprocessor, state: Dict[str, float | str], t_set: float) -> float:
    df = make_row(state, t_set)
    X = preprocessor.transform(df)
    y = model.predict(X, verbose=0)
    return float(y.ravel()[0])


In [35]:
# -----------------------------
# 3) Greedy policy (PMV*=0)
# -----------------------------
def greedy_action(model, preprocessor, state: Dict[str, float | str], target_pmv: float = PMV_TARGET) -> Tuple[float, float]:
    """Return (best_t_set, best_pmv_hat) for the given state."""
    best_a, best_err, best_pmv = None, 1e9, None
    for a in T_SETPOINTS:
        pmv_hat = predict_pmv(model, preprocessor, state, a)
        err = abs(pmv_hat - target_pmv)
        if err < best_err:
            best_err, best_a, best_pmv = err, float(a), float(pmv_hat)
    return best_a, best_pmv


In [36]:
# -----------------------------
# 4) Dataset generation (full grid → CSV)
# -----------------------------
def iter_states() -> Iterable[Dict[str, float | str]]:
    for v in V_AIR:
        for h in RH:
            for tm in T_MONTHLY:
                for s in SEASONS:
                    yield {"v_air": float(v), "rh": float(h), "t_monthly": float(tm), "season": s}


def generate_optimal_csv(output_csv: str = OUTPUT_CSV,
                         pmv_target: float = PMV_TARGET,
                         verbose_every: int = 5000) -> None:
    model, preprocessor = load_artifacts()

    rows = []
    count = 0
    for st in iter_states():
        best_a, best_pmv = greedy_action(model, preprocessor, st, pmv_target)
        rows.append({
            "v_air": st["v_air"],
            "rh": st["rh"],
            "t_monthly": st["t_monthly"],
            "season": st["season"],
            "optimal_t_set": best_a,
            "pmv_hat": best_pmv,
        })
        count += 1
        if verbose_every and count % verbose_every == 0:
            print(f"[progress] processed {count} states...")

    df = pd.DataFrame(rows)
    df.to_csv(output_csv, index=False)
    print(f"[done] Wrote {len(df):,} rows to {output_csv}")


In [37]:
# -----------------------------
# 5) Plots
# -----------------------------
# (A) Reward vs Air Temperature (multi-line), like your Fig. 4

def reward_from_pmv(pmv: float, target: float = PMV_TARGET, use_squared: bool = False) -> float:
    """Return a simple reward: -|error| or -error^2 (set use_squared=True for squared)."""
    e = pmv - target
    return -(e*e) if use_squared else -abs(e)


def plot_reward_vs_temp(states: List[Dict[str, float | str]],
                        pmv_target: float = PMV_TARGET,
                        use_squared: bool = False,
                        show: bool = True) -> None:
    """Plot reward curves across T_SETPOINTS for *any* number of states you pass in.
    Each state is a dict: {v_air, rh, t_monthly, season}.
    """
    model, preprocessor = load_artifacts()

    plt.figure(figsize=(10, 5))
    for st in states:
        rewards = []
        for a in T_SETPOINTS:
            pmv_hat = predict_pmv(model, preprocessor, st, a)
            r = reward_from_pmv(pmv_hat, pmv_target, use_squared)
            rewards.append(r)
        label = f"Tm={st['t_monthly']}°C, RH={st['rh']}%, AV={st['v_air']} m/s, {st['season']}"
        plt.plot(T_SETPOINTS, rewards, marker="o", label=label)

    plt.axvline(15.0, linestyle="--")
    plt.axvline(27.0, linestyle="--")
    plt.xlabel("Air Temperature (°C)")
    plt.ylabel("Reward (−|PMV−PMV*|)" if not use_squared else "Reward (−(PMV−PMV*)²)")
    plt.title("Reward vs Air Temperature (multiple states)")
    plt.legend(loc="best")
    plt.grid(True, alpha=0.3)
    if show:
        plt.show()


# (B) 3D pair plots (all six pairs)

PAIR_FEATURES = [
    ("v_air", "rh"),
    ("v_air", "t_monthly"),
    ("v_air", "season"),
    ("rh", "t_monthly"),
    ("rh", "season"),
    ("t_monthly", "season"),
]

FEATURE_GRID = {
    "v_air": V_AIR,
    "rh": RH,
    "t_monthly": T_MONTHLY,
    "season": SEASONS,
}

# For seasons, map to numeric positions for 3D ; keep tick labels.
SEASON_TO_NUM = {s: i for i, s in enumerate(SEASONS)}
NUM_TO_SEASON = {i: s for s, i in SEASON_TO_NUM.items()}


def default_fixed_values():
    """Return default fixed values (medians) for features not on axes."""
    return {
        "v_air": float(np.median(V_AIR)),
        "rh": float(np.median(RH)),
        "t_monthly": float(np.median(T_MONTHLY)),
        "season": SEASONS[0],  # you can change to a different default
    }


def build_grid_for_pair(fx: str, fy: str, fixed: Optional[Dict[str, float | str]] = None) -> List[Dict[str, float | str]]:
    if fixed is None:
        fixed = default_fixed_values()

    Xs = FEATURE_GRID[fx]
    Ys = FEATURE_GRID[fy]

    states = []
    for x in Xs:
        for y in Ys:
            st = {
                "v_air": fixed["v_air"],
                "rh": fixed["rh"],
                "t_monthly": fixed["t_monthly"],
                "season": fixed["season"],
            }
            st[fx] = float(x) if fx != "season" else x
            st[fy] = float(y) if fy != "season" else y
            states.append(st)
    return states


def plot_3d_pair(fx: str, fy: str, fixed: Optional[Dict[str, float | str]] = None,
                 pmv_target: float = PMV_TARGET, show: bool = True) -> None:
    """3D scatter: x=fx, y=fy, z=optimal_t_set; color=|PMV|."""
    model, preprocessor = load_artifacts()

    states = build_grid_for_pair(fx, fy, fixed)
    xs, ys, zs, cs = [], [], [], []

    for st in states:
        best_a, best_pmv = greedy_action(model, preprocessor, st, pmv_target)
        # map season to numeric
        x = st[fx] if fx != "season" else SEASON_TO_NUM[st[fx]]
        y = st[fy] if fy != "season" else SEASON_TO_NUM[st[fy]]
        xs.append(x)
        ys.append(y)
        zs.append(best_a)
        cs.append(abs(best_pmv - pmv_target))

    fig = plt.figure(figsize=(7, 6))
    ax = fig.add_subplot(111, projection="3d")
    sc = ax.scatter(xs, ys, zs, c=cs, cmap="viridis")
    cb = fig.colorbar(sc, pad=0.1)
    cb.set_label("|PMV|")

    ax.set_xlabel(axis_label(fx))
    ax.set_ylabel(axis_label(fy))
    ax.set_zlabel("Optimal Air Temp (°C)")

    # If an axis is season, set ticks/labels
    if fx == "season":
        ax.set_xticks(list(NUM_TO_SEASON.keys()))
        ax.set_xticklabels([NUM_TO_SEASON[i] for i in NUM_TO_SEASON])
    if fy == "season":
        ax.set_yticks(list(NUM_TO_SEASON.keys()))
        ax.set_yticklabels([NUM_TO_SEASON[i] for i in NUM_TO_SEASON])

    title_fixed = fixed or default_fixed_values()
    ax.set_title(f"Optimal Setpoints: {fx} vs {fy}\n(fixed: AV={title_fixed['v_air']} m/s, RH={title_fixed['rh']}%, Tm={title_fixed['t_monthly']}°C, Season={title_fixed['season']})")

    if show:
        plt.show()


def axis_label(name: str) -> str:
    return {
        "v_air": "Air Velocity (m/s)",
        "rh": "Relative Humidity (%)",
        "t_monthly": "Outdoor Monthly Air Temp (°C)",
        "season": "Season (index)",
    }[name]


def plot_all_3d_pairs(fixed: Optional[Dict[str, float | str]] = None, pmv_target: float = PMV_TARGET):
    for fx, fy in PAIR_FEATURES:
        plot_3d_pair(fx, fy, fixed, pmv_target, show=True)


In [38]:
# -----------------------------
# 6) CLI
# -----------------------------
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--make-csv", action="store_true", help="Generate full optimal-setpoint CSV")
    parser.add_argument("--plots", action="store_true", help="Show example plots (all six 3D pairs + a reward demo)")
    args = parser.parse_args()

    if args.make_csv:
        generate_optimal_csv(OUTPUT_CSV, PMV_TARGET)

    if args.plots:
        # Reward figure demo with a few states you can edit
        demo_states = [
            {"v_air": 0.0, "rh": 40.0, "t_monthly": 12.0, "season": "Winter"},
            {"v_air": 0.8, "rh": 50.0, "t_monthly": 25.0, "season": "Summer"},
            {"v_air": 1.6, "rh": 60.0, "t_monthly": 38.0, "season": "Summer"},
        ]
        plot_reward_vs_temp(demo_states, pmv_target=PMV_TARGET, use_squared=False, show=True)

        # All six 3D pairs with default fixed values (medians)
        plot_all_3d_pairs(default_fixed_values(), pmv_target=PMV_TARGET)

        print("[done] Shown plots. Close the figures to finish.")


usage: ipykernel_launcher.py [-h] [--make-csv] [--plots]
ipykernel_launcher.py: error: unrecognized arguments: --f=c:\Users\user\AppData\Roaming\jupyter\runtime\kernel-v36f292455ffd27cfc7124414106ac943fd1189816.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
