In [8]:
import pandas as pd              # Imports the Pandas library as 'pd', commonly used for data manipulation and analysis.
import numpy as np               # Imports the NumPy library as 'np', which is used for numerical operations and working with arrays.
import torch                     # Imports the PyTorch library, which is used for building and training deep learning models.
import torch.nn as nn            # Imports the neural network module from PyTorch as 'nn', which contains tools for creating neural networks.
import torch.nn.functional as F  # Imports the functional module from PyTorch, often used for defining operations in neural networks such as activations (e.g., ReLU) and loss functions.


In [9]:
# Define constants for the experiment
RE = "Solar_PBE"  # Experiment name or identifier
address = "../data/"  # Directory path where the CSV files are located

# Load training data from two CSV files, using the first column as the index
data_train_csv1 = pd.read_csv(address + RE + '_16.csv', index_col=0)
data_train_csv2 = pd.read_csv(address + RE + '_17.csv', index_col=0)

# Concatenate the two training datasets into a single DataFrame
data_train_csv = pd.concat([data_train_csv1, data_train_csv2])

# Load validation and test datasets from their respective CSV files
data_val_csv = pd.read_csv(address + RE + '_18.csv', index_col=0)
data_test_csv = pd.read_csv(address + RE + '_19.csv', index_col=0)

# Load price data for imbalance from a separate CSV file
data_price = pd.read_csv(address + 'Price_Elia_Imbalance_16_19.csv', index_col=0)

# Assign the price data to the training DataFrame, matching lengths
data_train_csv['Price(€)'] = data_price['Positive imbalance price'][:len(data_train_csv)]

# Assign the price data to the validation DataFrame
data_val_csv['Price(€)'] = data_price['Positive imbalance price'][len(data_train_csv):len(data_train_csv) + len(data_val_csv)]

# Assign the price data to the test DataFrame
data_test_csv['Price(€)'] = data_price['Positive imbalance price'][len(data_train_csv) + len(data_val_csv):]


In [10]:
# Data Preprocessing

# Define the battery size in per unit (p.u.)
Battery_Size = 0.15  # Battery capacity relative to the total capacity

# Define the time unit for data aggregation (15 minutes in this case)
unit = 1  # unit: 15 minute

# Get the maximum renewable energy capacity from the training dataset
RE_Capacity1 = max(data_train_csv['Power(MW)'])

# Get the maximum renewable energy capacity from the validation dataset
RE_Capacity2 = max(data_val_csv['Power(MW)'])

# Get the maximum renewable energy capacity from the test dataset
RE_Capacity3 = max(data_test_csv['Power(MW)'])

# Get the maximum marginal incremental price from the price data
max_price = max(data_price['Marginal incremental price'])

# Calculate the number of time units (15-minute intervals) in the training dataset
size_train0 = int(len(data_train_csv) / unit)

# Calculate the number of time units (15-minute intervals) in the validation dataset
size_val0 = int(len(data_val_csv) / unit)

# Calculate the number of time units (15-minute intervals) in the test dataset
size_test0 = int(len(data_test_csv) / unit)

# Initialize lists to store normalized power and price data for training
data_train0 = []  # Temporary list to hold normalized power data
data_train = []   # Final list to hold valid normalized power data
price_train0 = [] # Temporary list to hold normalized price data
price_train = []  # Final list to hold valid normalized price data

# Loop through each time unit in the training dataset
for i in range(size_train0):
    # Calculate and normalize the mean power for the current time unit, then round to 3 decimal places
    data_train0 += [round(pd.Series.mean(data_train_csv['Power(MW)'][i * unit:(i + 1) * unit]) / RE_Capacity1, 3)]
    
    # Calculate and normalize the mean price for the current time unit, then round to 3 decimal places
    price_train0 += [round(pd.Series.mean(data_train_csv['Price(€)'][i * unit:(i + 1) * unit]) / max_price, 3)]
    
    # If the normalized power is greater than 0, add it to the final lists
    if data_train0[i] > 0: 
        data_train += [data_train0[i]]  # Add valid normalized power to the final list
        price_train += [price_train0[i]]  # Add corresponding valid price to the final list

# Initialize lists to store normalized power and price data for validation
data_val0 = []  # Temporary list for normalized power data
data_val = []   # Final list for valid normalized power data
price_val0 = [] # Temporary list for normalized price data
price_val = []  # Final list for valid normalized price data

# Loop through each time unit in the validation dataset
for i in range(size_val0):
    # Calculate and normalize the mean power for the current time unit, then round to 3 decimal places
    data_val0 += [round(pd.Series.mean(data_val_csv['Power(MW)'][i * unit:(i + 1) * unit]) / RE_Capacity2, 3)]
    
    # Calculate and normalize the mean price for the current time unit, then round to 3 decimal places
    price_val0 += [round(pd.Series.mean(data_val_csv['Price(€)'][i * unit:(i + 1) * unit]) / max_price, 3)]
    
    # If the normalized power is greater than 0, add it to the final lists
    if data_val0[i] > 0: 
        data_val += [data_val0[i]]  # Add valid normalized power to the final list
        price_val += [price_val0[i]]  # Add corresponding valid price to the final list

# Initialize lists to store normalized power and price data for testing
data_test0 = []  # Temporary list for normalized power data
data_test = []   # Final list for valid normalized power data
price_test0 = [] # Temporary list for normalized price data
price_test = []  # Final list for valid normalized price data

# Loop through each time unit in the test dataset
for i in range(size_test0):
    # Calculate and normalize the mean power for the current time unit, then round to 3 decimal places
    data_test0 += [round(pd.Series.mean(data_test_csv['Power(MW)'][i * unit:(i + 1) * unit]) / RE_Capacity3, 3)]
    
    # Calculate and normalize the mean price for the current time unit, then round to 3 decimal places
    price_test0 += [round(pd.Series.mean(data_test_csv['Price(€)'][i * unit:(i + 1) * unit]) / max_price, 3)]
    
    # If the normalized power is greater than 0, add it to the final lists
    if data_test0[i] > 0: 
        data_test += [data_test0[i]]  # Add valid normalized power to the final list
        price_test += [price_test0[i]]  # Add corresponding valid price to the final list


In [12]:
# Import necessary libraries for building an LSTM model
import torch
import torch.nn as nn
import torch.nn.functional as F

# Define hyperparameters for the LSTM model
n_layers       = 2           # Number of LSTM layers
in_size        = 1           # Input size (dimensionality of input features)
hidden_size    = 64          # Number of features in the hidden state
out_size       = 1           # Output size (dimensionality of output features)
batch_size     = 128         # Number of samples in each batch
learning_rate  = 0.001       # Learning rate for the optimizer

# Define the LSTM class which inherits from nn.Module
class LSTM(nn.Module):
    def __init__(self):
        # Initialize the parent class
        super(LSTM, self).__init__()
        # Define a fully connected layer to transform input features to hidden size
        self.fc_in  = nn.Linear(in_size, hidden_size)
        # Define the LSTM layer with specified hidden size, number of layers, and batch_first flag
        self.rnn    = nn.LSTM(hidden_size, hidden_size, n_layers, batch_first=True)
        # Define a fully connected layer to transform hidden state to output size
        self.fc_out = nn.Linear(hidden_size, out_size)
    
    # Define the forward pass of the model
    def forward(self, x, hidden):
        # Pass input through the input layer and apply ReLU activation
        x = F.relu(self.fc_in(x))
        # Reshape the input tensor to match LSTM's expected input shape
        x = x.view(1, -1, hidden_size)  # Reshape to (batch_size, seq_length, input_size)
        # Pass the input through the LSTM layer
        x, hidden = self.rnn(x, hidden)
        # Pass the output of the LSTM through the output layer
        out = self.fc_out(x)
        # Apply ReLU activation and reshape the output for final output
        out = F.relu(out.view(-1, out_size))  # Reshape to (batch_size, output_size)
        return out, hidden  # Return the output and the hidden state
        
# Function to train the model on a batch of data
def train_net(model, batch, optimizer):
    # Unpack the batch into inputs, hidden state, and target outputs
    x, h, y = batch[0], batch[1], batch[2]
    # Calculate the mean squared error loss between model predictions and true targets
    loss = F.mse_loss(model.forward(x, h)[0], y)
    # Zero the gradients for the optimizer before backpropagation
    optimizer.zero_grad()
    # Compute gradients using backpropagation
    loss.backward()
    # Update the model parameters using the optimizer
    optimizer.step()


In [6]:
# Define training parameters
total_epoch = 100  # Total number of epochs for training
print_interval = 1  # Print performance metrics every 'print_interval' epochs

# Initialize LSTM model
model = LSTM()

# Get the sizes of training, validation, and test datasets
size_train = len(data_train)
size_val = len(data_val)
size_test = len(data_test)

# Prepare training data
train_input = np.zeros((size_train - 1, 1))  # Input for training
train_output = np.zeros((size_train - 1, 1))  # Output for training
for i in range(size_train - 1):
    train_input[i, :] = data_train[i]  # Assign current value to input
    train_output[i, :] = data_train[i + 1]  # Assign next value to output

# Prepare validation data
val_input = np.zeros((size_val - 1, 1))  # Input for validation
val_output = np.zeros((size_val - 1, 1))  # Output for validation
for i in range(size_val - 1):
    val_input[i, :] = data_val[i]  # Assign current value to input
    val_output[i, :] = data_val[i + 1]  # Assign next value to output

# Prepare test data
test_input = np.zeros((size_test - 1, 1))  # Input for testing
test_output = np.zeros((size_test - 1, 1))  # Output for testing
for i in range(size_test - 1):
    test_input[i, :] = data_test[i]  # Assign current value to input
    test_output[i, :] = data_test[i + 1]  # Assign next value to output

# Calculate total number of batches for training
total_batch = int((size_train - 1) / batch_size) + 1

# Initialize lists to store predicted values and Mean Absolute Error (MAE)
pred_train, pred_val, pred_test = [], [], []  # Predicted Values
mae_train, mae_val, mae_test = [], [], []  # Mean Absolute Error

# Initialize hidden state for LSTM
hidden = (torch.zeros([n_layers, 1, hidden_size], dtype=torch.float), 
          torch.zeros([n_layers, 1, hidden_size], dtype=torch.float))

# Initialize optimizer for the model
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(total_epoch):
    for i in range(total_batch):
        # Get the current batch of training data
        batch_x = torch.tensor(train_input[batch_size * i:batch_size * (i + 1), :], dtype=torch.float)
        batch_y = torch.tensor(train_output[batch_size * i:batch_size * (i + 1), :], dtype=torch.float)
        
        # Combine batch inputs and hidden state
        batch = [batch_x, hidden, batch_y]
        
        # Train the model on the current batch
        train_net(model, batch, optimizer)
        
        # Forward pass through the model
        _, hidden = model.forward(batch_x, hidden)
        hidden = (hidden[0].detach(), hidden[1].detach())  # Detach hidden state to prevent backpropagation through the entire history

    # Reset hidden state for next epoch
    hidden = (torch.zeros([n_layers, 1, hidden_size], dtype=torch.float), 
              torch.zeros([n_layers, 1, hidden_size], dtype=torch.float))

    # Print performance metrics at specified intervals
    if epoch == 0 or (epoch + 1) % print_interval == 0:
        # Train predictions
        train_predict = model.forward(torch.tensor(train_input, dtype=torch.float), hidden)[0].detach().numpy()
        pred_train += [list(train_predict.flatten())]
        mae_train += [list(np.abs(train_predict - train_output).flatten())]
        
        # Validation predictions
        val_predict = model.forward(torch.tensor(val_input, dtype=torch.float), hidden)[0].detach().numpy()
        pred_val += [list(val_predict.flatten())]
        mae_val += [list(np.abs(val_predict - val_output).flatten())]
        
        # Test predictions
        test_predict = model.forward(torch.tensor(test_input, dtype=torch.float), hidden)[0].detach().numpy()
        pred_test += [list(test_predict.flatten())]
        mae_test += [list(np.abs(test_predict - test_output).flatten())]

        # Calculate and print MAE for train, val, and test sets
        MAE_train = round(100 * np.mean(mae_train[-1]), 2)
        MAE_val = round(100 * np.mean(mae_val[-1]), 2)
        MAE_test = round(100 * np.mean(mae_test[-1]), 2)

        print("epoch: {}".format(epoch + 1))
        print("MAE_train: {}%".format(MAE_train).ljust(25), end="")
        print("MAE_val: {}%".format(MAE_val).ljust(25), end="")
        print("MAE_test: {}%".format(MAE_test).ljust(25))
        print("------------------------------------------------------------------------------------------")


epoch: 1
MAE_train: 27.78%        MAE_val: 31.39%          MAE_test: 27.96%         
------------------------------------------------------------------------------------------
epoch: 2
MAE_train: 27.78%        MAE_val: 31.39%          MAE_test: 27.96%         
------------------------------------------------------------------------------------------
epoch: 3
MAE_train: 27.78%        MAE_val: 31.39%          MAE_test: 27.96%         
------------------------------------------------------------------------------------------
epoch: 4
MAE_train: 27.78%        MAE_val: 31.39%          MAE_test: 27.96%         
------------------------------------------------------------------------------------------
epoch: 5
MAE_train: 27.78%        MAE_val: 31.39%          MAE_test: 27.96%         
------------------------------------------------------------------------------------------
epoch: 6
MAE_train: 27.78%        MAE_val: 31.39%          MAE_test: 27.96%         
-----------------------------------

In [7]:
# Environment parameters for the battery management system
E_max = Battery_Size  # Maximum energy capacity of the battery
P_max = E_max  # Maximum power capacity of the battery
tdelta = unit / 4  # Time interval for the simulation
soc_min = 0.1  # Minimum state of charge (SoC)
soc_max = 0.9  # Maximum state of charge (SoC)

# Coefficients for Voc, Rs, Rts, Rtl calculations based on state of charge
a0, a1, a2, a3, a4, a5 = -1.031, 35, 3.685, 0.2156, 0.1178, 0.3201
b0, b1, b2, b3, b4, b5 = 0.1463, 30.27, 0.1037, 0.0584, 0.1747, 0.1288
c0, c1, c2 = 0.1063, 62.49, 0.0437
d0, d1, d2 = 0.0712, 61.4, 0.0288

# Calculate number of cells based on battery capacity
N = 130 * 215 * E_max / 0.1

# Beta parameter calculation for reward shaping
beta = 10 / max_price

# Select the best model based on validation MAE
select_num = np.argmin(np.mean(mae_val, axis=1))  # Get index of model with lowest MAE
select_train = np.array(pred_train[select_num][:])  # Selected training predictions
select_val = np.array(pred_val[select_num][:])  # Selected validation predictions
select_test = np.array(pred_test[select_num][:])  # Selected test predictions
select_test_real = np.array(data_test[1:])  # Real test values (without the first value)
select_test_price = np.array(price_test[1:])  # Real test prices (without the first value)

# Initial state of charge for the battery
E = E_max / 2

# Lists to store Mean Bias Error (MBE), rewards, and information for each iteration
mbe = []  # Mean Bias Error
reward = []  # Rewards
info = []  # Information for logging

# Iterate through the selected test data
for i in range(len(select_test)):
    bid = select_test[i]  # Predicted bid price
    gen = select_test_real[i]  # Real generation price
    rat = 0  # Rate adjustment factor
    imb = select_test_price[i]  # Imbalance price

    # Calculate state of charge and associated parameters
    soc = E / E_max  # State of charge
    Voc = a0 * np.exp(-a1 * soc) + a2 + a3 * soc - a4 * soc**2 + a5 * soc**3  # Open-circuit voltage
    Rs = b0 * np.exp(-b1 * soc) + b2 + b3 * soc - b4 * soc**2 + b5 * soc**3  # Series resistance
    Rts = c0 * np.exp(-c1 * soc) + c2  # Total resistance for charging
    Rtl = d0 * np.exp(-d1 * soc) + d2  # Total resistance for discharging
    R = Rs + Rts + Rtl  # Total resistance

    # Calculate maximum charging and discharging current based on state of charge
    I_cmax = 1000000 * E_max * (soc_max - soc) / N / (Voc * tdelta)
    I_dmax = 1000000 * E_max * (soc - soc_min) / N / (Voc * tdelta)

    # Calculate maximum charging and discharging power
    p_cmax = N * (Voc * I_cmax + I_cmax**2 * R)
    p_dmax = N * (Voc * I_dmax - I_dmax**2 * R)

    # Convert power to megawatts
    P_cmax = p_cmax / 1000000
    P_dmax = p_dmax / 1000000

    # Determine actual charging and discharging power based on generation and bid
    P_c = min(max(rat * (gen - bid), 0), P_max, P_cmax)
    P_d = min(max(rat * (bid - gen), 0), P_max, P_dmax)
    p_c = 1000000 * P_c / N
    p_d = 1000000 * P_d / N

    # Calculate currents for charging and discharging
    I_c = -(Voc - np.sqrt(Voc**2 + 4 * R * p_c)) / (2 * R)  # Charging current
    I_d = (Voc - np.sqrt(Voc**2 - 4 * R * p_d)) / (2 * R)  # Discharging current

    # Charging process
    if not np.isclose(p_c, 0):  # If charging power is not zero
        eff_c = (Voc * I_c) / p_c  # Charging efficiency
        E = E + eff_c * P_c * tdelta  # Update energy state
        disp = gen - P_c  # Dispatched generation
        # Log information for charging action
        info += [[gen, round(bid, 4), 'C', round(P_c, 4), round(disp, 4), round(eff_c, 4), round(E, 4)]]

    # Discharging process
    elif not np.isclose(p_d, 0):  # If discharging power is not zero
        eff_d = p_d / (Voc * I_d)  # Discharging efficiency
        E = E - (1 / eff_d) * P_d * tdelta  # Update energy state
        disp = gen + P_d  # Dispatched generation
        # Log information for discharging action
        info += [[gen, round(bid, 4), 'D', round(P_d, 4), round(disp, 4), round(eff_d, 4), round(E, 4)]]

    # No action taken
    else:
        disp = gen  # No dispatch change
        # Log information for no action
        info += [[gen, round(bid, 4), 'N', 'N', round(disp, 4), 'N', round(E, 4)]]

    # Calculate Mean Bias Error for the bid vs dispatched generation
    mbe += [abs(bid - disp)]
    # Calculate reward based on the dispatched generation and imbalance cost
    reward += [(imb * disp - imb * abs(bid - disp) - beta * (P_c + P_d)) * tdelta]

# Calculate Mean Absolute Error and Mean Bias Error for test set
MAE_test = round(100 * np.mean(np.abs(select_test_real - select_test)), 2)
MBE_test = round(100 * np.mean(mbe), 2)

# Print evaluation metrics
print("MAE_test: {}%".format(MAE_test))
print("MBE_test: {}%".format(MBE_test))
print("REV_test: ${}".format(round(max_price * RE_Capacity3 * np.mean(reward), 3)))

# Save predictions to CSV files for analysis
pd.DataFrame(select_train).to_csv(RE + "_Model1_train.csv")
pd.DataFrame(select_val).to_csv(RE + "_Model1_val.csv")
pd.DataFrame(select_test).to_csv(RE + "_Model1_FB.csv")


MAE_test: 27.96%
MBE_test: 27.96%
REV_test: $-0.003
