In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

import torch
from torch.optim import Adam

In [None]:
df = pd.read_csv('case5.csv')
df

In [None]:
load_p_columns = [c for c in df.columns if 'load_' in c and '_p' in c]
load_q_columns = [c for c in df.columns if 'load_' in c and '_q' in c]
sgen_p_columns = [c for c in df.columns if 'sgen_' in c and '_p' in c]
ll_columns = [c for c in df.columns if 'loading' in c]

Data plots
--------

List of plots:
 - Generators and loads (entire year)
 - Generators and loads (5000 timesteps - 52 days, 5-hour sliding window)
 - Line loading (log scale -- 5000 timesteps)

In [None]:
fig,axes = plt.subplots(1,1,figsize=(16,4))
ax = axes

max_timestep_plot = 35136

ax.plot(df[load_p_columns],label=load_p_columns)
ax.plot(df[sgen_p_columns],label=sgen_p_columns)

ax.set(xlabel='timestep',ylabel='Power [MW]')
ax.legend(ncol=2);
ax.set_ylim([0,700])

In [None]:
fig,axes = plt.subplots(1,1,figsize=(16,4))
ax = axes

max_timestep_plot = 5000
wnd = 20

ax.plot(df[load_p_columns].rolling(window=wnd).mean()[:max_timestep_plot],label=load_p_columns)
ax.plot(df[sgen_p_columns].rolling(window=wnd).mean()[:max_timestep_plot],label=sgen_p_columns)

ax.set(xlabel='timestep',ylabel='Power [MW]')
ax.legend(ncol=2);
ax.set_ylim([0,700])

In [None]:
fig,axes = plt.subplots(1,1,figsize=(16,4))
ax = axes

max_timestep_plot = 5000
wnd = 20

ax.plot(df[ll_columns].rolling(window=wnd).mean()[:max_timestep_plot],label=ll_columns)

ax.set(xlabel='timestep',ylabel='Loading [%]')
ax.legend(ncol=2);
ax.set_yscale('log')
ax.set_ylim([None,150])

Create the X and Y data
----------

In [None]:
x_columns = load_p_columns + load_q_columns + sgen_p_columns
X = df[x_columns].values
X.shape

In [None]:
Y = df[ll_columns].values
Y.shape

Split into test and train datasets
----------

In [None]:
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)
print('Train size: {} {}, test size: {} {}'.format(X_train.shape,Y_train.shape,X_test.shape,Y_test.shape))

Define the pytorch feed-forward neural network
---------

In [None]:
class FeedForwardNetwork(torch.nn.Module):
    """
    A simple torch feed-forward Neural Network
    """

    def __init__(self, input_dim, output_dim, hidden_dim):
        super(FeedForwardNetwork, self).__init__()

        self.network = torch.nn.Sequential(
            torch.nn.Linear(input_dim, hidden_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, X):
        return self.network(X)

In [None]:
class FeedForwardNetwork_Explicit(torch.nn.Module):
    """
    A simple torch feed-forward Neural Network
    """

    def __init__(self, input_dim, output_dim, hidden_dim):
        super(FeedForwardNetwork_Explicit, self).__init__()

        # simple ANN with two layers
        self.linear1 = torch.nn.Linear(input_dim, hidden_dim)
        self.linear2 = torch.nn.Linear(hidden_dim, output_dim)
        
    def forward(self, X):
        # forward pass
        X = self.linear1(X)
        X = torch.relu(X)
        X = self.linear2(X)
        return X

In [None]:
model = FeedForwardNetwork(9, 3, 6)

In [None]:
for i in model.named_parameters() :
    print(i)

In [None]:
class TorchModel(object):
    """
    Class which contains a torch ANN and some scaler etc.
    """

    def __init__(self, input_dim, output_dim, hidden_dim, lr=0.001, epochs=1500):

        self.model = FeedForwardNetwork(input_dim, output_dim, hidden_dim)
        self.optimizer = Adam(self.model.parameters(), lr=lr)

        self.epochs = epochs
        # criterion for loss evaluation (-> reduce the mean squared error loss)
        self.criterion = torch.nn.MSELoss()

        self.losses = list()
        self.x_scaler = StandardScaler()
        self.y_scaler = StandardScaler()

    def train(self, X, Y):
        
        X = torch.tensor(model.x_scaler.fit_transform(X)).float()
        Y = torch.tensor(model.y_scaler.fit_transform(Y)).float()

        for e in range(self.epochs):
            # reset the gradient
            self.optimizer.zero_grad()
            # predict
            y_pred = self.model(X)
            # check "how good" the prediction was according to the criterion (the loss function)
            loss = self.criterion(y_pred, Y)
            # update the ANN parameters
            loss.backward()
            # store the loss
            self.losses.append(loss.item())
            self.optimizer.step()
            if not e % (int(self.epochs/10)):
                print(f"ANN training epoch {e}: loss {loss.item():.4f}")

    def predict(self, X):
        # since we scaled the data for training, we have to scale the data before predictions as well
        X = torch.tensor(self.x_scaler.transform(X)).float()
        # get tensors from numpy and predict
        Y = self._predict(X)
        # reverse the transformation to get the true values (line loadings, bus voltage...)
        Y = self.y_scaler.inverse_transform(Y.detach().numpy())
        return Y

    def _predict(self, X):
        return self.model(X)

In [None]:
# get the torch model
model = TorchModel(input_dim=X.shape[1], output_dim=Y.shape[1], hidden_dim=10, epochs=10000)

# train the model. Since torch works with tensors, you have to call torch.from_numpy()
model.train(torch.tensor(X).float(), torch.tensor(Y).float())

Loss on the test set
------

In [None]:
Y_test_pred = model.predict(X_test)

In [None]:
model.criterion(torch.tensor(model._predict(torch.tensor(model.x_scaler.transform(X_test)).float() ) ),
                torch.tensor(model.y_scaler.transform(Y_test)))

Test it, and display how it did
----------

In [None]:
fig,axes = plt.subplots(1,2,figsize=(16,8))

ax = axes[0]
ax.plot(model.losses)
ax.set_yscale('log')

ax = axes[1]
ax.plot(Y_test_pred/Y_test,label=['line {}'.format(a) for a in range(len(Y_test_pred[0]))] )
ax.set(ylabel='line loading ratio (prediction/actual)')
ax.legend()