This notebook implements a Recurrent Neural Network (RNN) for power system event classification.

The workflow includes data preprocessing, sequence construction, model training and evaluation.

To reproduce the results, run all cells sequentially. The dataset file should be placed in the same directory as this notebook.

## Data Loading and Preprocessing

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, TensorDataset, Dataset

import matplotlib.pyplot as plt
import seaborn as sns

# Reproducibility: seed NumPy and PyTorch with the same fixed value.
Seed = 42
np.random.seed(Seed)
torch.manual_seed(Seed)

# Device selection: prefer CUDA, then Apple MPS, and only then fall back to CPU.
# The MPS check used to overwrite the CUDA result unconditionally, which
# forced CUDA machines (where MPS is unavailable) onto the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
elif torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')

### Load the data 

In [None]:
# Relative path of the competition workbook (resolved against the working directory).
file_path = 'Bus39_Competition_Data.xlsx'
# Parse the workbook into a DataFrame with the openpyxl engine.
df = pd.read_excel(io=file_path, engine='openpyxl')

In [None]:
# Order the rows by ascending TIMESTAMP and discard the old index.
df = df.sort_values(by='TIMESTAMP').reset_index(drop=True)

# Feature matrix: every column except the timestamp and the label.
feature_frame = df.drop(columns=['TIMESTAMP', 'Event'])
x_raw = feature_frame.values

# Target vector: the raw event labels.
y_raw = df['Event'].values

### Data preprocessing

In [None]:
# Count the missing entries in each column and show the per-column totals.
missing_values = df.isna().sum()
print(missing_values)

According to the results of isnull().sum(), the dataset contains no missing values. Therefore, all raw data are preserved without any additional processing.

### Data split and standardization

In [None]:
# Map the raw event labels to consecutive integer class ids.
encode = LabelEncoder()
encode.fit(y_raw)
y_encoded = encode.transform(y_raw)
print(f'Classes: {encode.classes_}')

# Hold out 20% of the rows, shuffled and stratified by class.
# NOTE(review): the rows were sorted by TIMESTAMP before this split, and the
# shuffle breaks that ordering. Sliding windows built from these splits will
# therefore mix rows that are not adjacent in time -- confirm this is intended.
X_train, X_test, y_train, y_test = train_test_split(
    x_raw,
    y_encoded,
    test_size=0.2,
    shuffle=True,
    stratify=y_encoded,
    random_state=Seed,
)

# Standardize the features with statistics estimated on the training rows only.
scaler = StandardScaler()
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

### Create sequence

In [None]:
def create_sequences(X, y, sequence_length):
    """Build overlapping sliding windows over ``X`` with one label per window.

    Window ``i`` covers rows ``i .. i + sequence_length - 1`` of ``X`` and is
    labelled with ``y[i + sequence_length - 1]`` (the label of its last row).

    Args:
        X: Array-like of rows, e.g. shape ``(n_rows, n_features)``.
        y: Array-like of labels, indexed in step with the rows of ``X``.
        sequence_length: Number of consecutive rows per window (>= 1).

    Returns:
        A tuple ``(xs, ys)`` where ``xs`` has shape
        ``(n_windows, sequence_length, *X.shape[1:])`` and ``ys`` has shape
        ``(n_windows,)``, with ``n_windows = len(X) - sequence_length + 1``.
        When ``X`` is shorter than one window, both arrays are empty but keep
        the trailing dimensions so downstream shape handling still works.

    Raises:
        ValueError: If ``sequence_length`` is smaller than 1.
    """
    if sequence_length < 1:
        raise ValueError("sequence_length must be a positive integer")

    X = np.asarray(X)
    y = np.asarray(y)
    n_windows = len(X) - sequence_length + 1

    if n_windows <= 0:
        # Not enough rows for a single window: return correctly shaped empties
        # rather than a shapeless (0,) array.
        empty_x = np.empty((0, sequence_length) + X.shape[1:], dtype=X.dtype)
        empty_y = np.empty((0,), dtype=y.dtype)
        return empty_x, empty_y

    xs = np.stack([X[start:start + sequence_length] for start in range(n_windows)])
    # Label of each window = label at the window's last row.
    ys = y[np.arange(n_windows) + sequence_length - 1]
    return xs, ys

In [None]:
# Number of consecutive rows that make up one window.
sequence = 10

# NOTE(review): X_train_scaled / X_test_scaled come from a shuffled
# train_test_split, so the rows inside each window are not guaranteed to be
# adjacent in time -- confirm this is intended.
X_train_seq, y_train_seq = create_sequences(
    X_train_scaled, y_train, sequence_length=sequence
)
X_test_seq, y_test_seq = create_sequences(
    X_test_scaled, y_test, sequence_length=sequence
)

# Report the shapes of the windowed arrays: train X, train y, test X, test y.
for arr in (X_train_seq, y_train_seq, X_test_seq, y_test_seq):
    print(arr.shape)

### Batch loading

In [None]:
class PMUDataset(Dataset):
    """Dataset pairing feature samples with integer class labels.

    Both inputs are copied into tensors once at construction time:
    ``X`` as float32 and ``y`` as int64.
    """

    def __init__(self, X, y):
        features = torch.tensor(X, dtype=torch.float32)
        labels = torch.tensor(y, dtype=torch.long)
        self.X, self.y = features, labels

    def __len__(self):
        # The sample count is taken from the label tensor.
        return len(self.y)

    def __getitem__(self, idx):
        sample, label = self.X[idx], self.y[idx]
        return sample, label
# Wrap the windowed train/test arrays as tensor-backed datasets.
train_dataset, test_dataset = (
    PMUDataset(X_train_seq, y_train_seq),
    PMUDataset(X_test_seq, y_test_seq),
)

In [None]:
batch_size = 64

# Training batches are reshuffled on every pass; test batches keep their order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Sanity check: show the shapes of the first training batch, if there is one.
first_batch = next(iter(train_loader), None)
if first_batch is not None:
    xb, yb = first_batch
    print("Batch X shape:", xb.shape)
    print("Batch y shape:", yb.shape)

# Model Design

In [None]:
class RNNclassifier(nn.Module):
    """Tanh RNN followed by dropout and a linear classification head.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the recurrent hidden state.
        num_layer: Number of stacked recurrent layers.
        num_classes: Number of output logits per sample.
        dropout: Dropout probability applied to the last time step's RNN
            output before the linear layer. Defaults to 0.4, the value that
            was previously hard-coded.
    """

    def __init__(self, input_size, hidden_size, num_layer, num_classes, dropout=0.4):
        super().__init__()
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layer,
            batch_first=True,
            nonlinearity='tanh',
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return class logits for a batch of sequences.

        ``x`` is indexed as (batch, time, feature) because the RNN is built
        with ``batch_first=True``; the result has one row of ``num_classes``
        logits per batch element.
        """
        # Only the per-step outputs are needed; the final hidden state is unused.
        out, _ = self.rnn(x)
        # Classify from the top layer's output at the last time step.
        last_step = out[:, -1, :]
        return self.fc(self.dropout(last_step))

### Parameters setting

In [None]:
# Features per time step, read from the windowed training data rather than
# hard-coded, so the model keeps matching the dataset if its columns change.
input_size = X_train_seq.shape[-1]
hidden_size = 32
num_layers = 2
# One output logit per encoded event class.
num_classes = len(encode.classes_)

# Build the network and move its parameters to the selected device.
model = RNNclassifier(input_size, hidden_size, num_layers, num_classes).to(device)
# Last expression of the cell: shows the architecture in the notebook output.
model

### loss function and optimizer

In [None]:
# Unweighted multi-class cross-entropy computed on the model's logits.
criterion = nn.CrossEntropyLoss()
# Adam over all model parameters, with a small weight-decay penalty.
optimizer = optim.Adam(params=model.parameters(), lr=1e-3, weight_decay=1e-4)

### training process

In [None]:
num_epochs = 20
# Per-epoch history, consumed later by the loss-curve plot.
train_losses, val_losses = [], []

for epoch in range(num_epochs):
    # ---- optimisation pass over the training batches ----
    model.train()
    running_loss = 0.0

    for xb, yb in train_loader:
        xb = xb.to(device)
        yb = yb.to(device)

        # Clear stale gradients, then forward / backward / update.
        optimizer.zero_grad()
        outputs = model(xb)
        loss = criterion(outputs, yb)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Epoch loss = mean of the per-batch mean losses.
    epoch_train_loss = running_loss / len(train_loader)
    train_losses.append(epoch_train_loss)

    # ---- gradient-free pass for the validation loss ----
    # Note: the "validation" loss is measured on test_loader.
    model.eval()
    val_running_loss = 0.0

    with torch.no_grad():
        for xb, yb in test_loader:
            xb = xb.to(device)
            yb = yb.to(device)
            outputs = model(xb)
            loss = criterion(outputs, yb)
            val_running_loss += loss.item()

    epoch_val_loss = val_running_loss / len(test_loader)
    val_losses.append(epoch_val_loss)

    print(
        f"Epoch [{epoch+1}/{num_epochs}]  Train Loss: {epoch_train_loss:.4f}  Val Loss: {epoch_val_loss:.4f}"
    )

# Evaluation and Metrics

In [None]:
# Predicted and true class ids collected over the whole test loader.
all_preds, all_true = [], []

model.eval()
with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        yb = yb.to(device)
        outputs = model(xb)
        # Predicted class = index of the largest logit in each row.
        predicted = torch.max(outputs, dim=1).indices
        all_preds.extend(predicted.cpu().numpy())
        all_true.extend(yb.cpu().numpy())

# Overall accuracy, then per-class precision / recall / F1.
test_accuracy = accuracy_score(all_true, all_preds)
print("Accuracy: ", test_accuracy)
print("\nClassification Report:\n")
report_text = classification_report(all_true, all_preds, digits=4)
print(report_text)

In [None]:
# Rows are true classes, columns are predicted classes.
cm = confusion_matrix(all_true, all_preds)

# Draw the matrix as an annotated heatmap on an explicit axes object.
fig, ax = plt.subplots(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=ax)
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted Label")
ax.set_ylabel("True Label")
fig.tight_layout()
plt.show()

In [None]:
# Plot the per-epoch training and validation losses on shared axes.
fig, ax = plt.subplots(figsize=(7, 5))
for history, curve_label in (
    (train_losses, "Training Loss"),
    (val_losses, "Validation Loss"),
):
    ax.plot(history, label=curve_label, linewidth=2)
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.set_title("Training and Validation Loss Curves")
ax.grid(True)
ax.legend()
fig.tight_layout()
plt.show()

Replace the original loss with weighted cross-entropy to mitigate class imbalance.

In [None]:
# Number of training rows per encoded class id (index = class id).
class_counts = np.bincount(y_train)
num_classes = len(class_counts)

# Square-root-damped "balanced" class weights: sqrt(N / (K * n_c)).
# The outer parentheses are required: `**` binds tighter than `/`, so the
# unparenthesized form evaluated N / sqrt(K * n_c) instead. The two differ only
# by a constant factor, which the weighted-mean reduction of CrossEntropyLoss
# normalizes away, but this form keeps the weights on an interpretable O(1) scale.
class_weights = (len(y_train) / (num_classes * class_counts)) ** 0.5
class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)

# Cross-entropy where each sample's loss is scaled by its class weight.
criterion = nn.CrossEntropyLoss(weight=class_weights)
# NOTE(review): a fresh optimizer is created here, but the model is not
# re-initialized in this cell, so retraining would continue from the current
# weights -- confirm this is intended.
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)