# Imports

Import all the libraries used in this notebook.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import CosineAnnealingLR , ReduceLROnPlateau
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm

Check whether a GPU is available.

In [None]:
# Report whether PyTorch can see a CUDA device.
gpu_available = torch.cuda.is_available()
print(gpu_available)

# Data Extraction and Pre-processing

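The `Data` class reads the CSV, drops the leading rows in which all three phase currents are zero, scales the inputs and outputs to [-1, 1] with min-max scaling, and splits the result into train/validation/test tensors. The commented-out code inside `extract` can be re-enabled to draw the correlation heatmap and the scatter plots.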

In [None]:
class Data():
    """Load a measurement CSV and turn it into scaled train/val/test tensors.

    Args:
        path: Location of the CSV file, passed to ``pd.read_csv``.
        input_features: Column names used as model inputs.
        output_features: Column names used as model targets.
    """

    def __init__(self, path, input_features, output_features):
        self.path = path
        self.inp = input_features
        self.out = output_features

    def extract(self, random_state=None):
        """Read, trim, split and scale the data.

        The scalers are fitted on the training split only and then applied to
        the validation and test splits, so no statistics of the held-out data
        leak into training. As a consequence, held-out values may fall
        slightly outside the [-1, 1] range.

        Args:
            random_state: Optional seed forwarded to both ``train_test_split``
                calls. ``None`` (the default) gives a different split on
                every call.

        Returns:
            Tuple ``(X_train, X_test, X_val, y_train, y_test, y_val,
            inp_scaler, out_scaler)``. The first six entries are
            ``torch.float32`` tensors; the last two are the fitted
            ``MinMaxScaler`` objects for inputs and outputs.
        """
        df = pd.read_csv(self.path)

        # Drop the leading rows in which all three phase currents are exactly
        # zero. If no row has a non-zero current, idxmax() returns the first
        # index and nothing is dropped.
        first_non_zero_index = (df[['Ia_amps', 'Ib_amps', 'Ic_amps']] != 0).any(axis=1).idxmax()
        df = df.loc[first_non_zero_index:]
        df = df.reset_index(drop=True)

        # Optional diagnostics: uncomment to draw the correlation heatmap and
        # the input-vs-output scatter plots.
        # correlation_matrix = df.corr()
        # plt.figure(figsize=(10, 8))
        # sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', fmt='.2f', linewidths=.5)
        # plt.title('Correlation Heatmap')
        # plt.show()
        # fig, axes = plt.subplots(nrows=len(self.out), ncols=len(self.inp), figsize=(15, 10))
        # Create individual scatter plots
        # for i, output_var in enumerate(self.out):
        #     for j, input_var in enumerate(self.inp):
        #         sns.scatterplot(data=df, x=input_var, y=output_var, ax=axes[i, j], label=input_var)
        #         axes[i, j].set_xlabel(input_var)
        #         axes[i, j].set_ylabel(output_var)
        #         axes[i, j].legend()

        # plt.suptitle('Scatter Plots: Currents vs. Input Variables', y=1.02)
        # plt.tight_layout()
        # plt.show()

        X = df[self.inp]
        y = df[self.out]

        # 35% of the rows are held out; 15% of that hold-out becomes the test
        # set. Overall this is roughly 65% train / 29.75% val / 5.25% test.
        X_train, X_temp, y_train, y_temp = train_test_split(
            X, y, test_size=0.35, shuffle=True, random_state=random_state)
        X_val, X_test, y_val, y_test = train_test_split(
            X_temp, y_temp, test_size=0.15, shuffle=True, random_state=random_state)

        # Fit on the training split only to avoid data leakage.
        inp_scaler = MinMaxScaler(feature_range=(-1, 1)).fit(X_train)
        out_scaler = MinMaxScaler(feature_range=(-1, 1)).fit(y_train)

        def to_tensor(frame, scaler):
            # scaler.transform returns a NumPy array; convert it to float32.
            return torch.tensor(scaler.transform(frame), dtype=torch.float32)

        X_train = to_tensor(X_train, inp_scaler)
        y_train = to_tensor(y_train, out_scaler)
        X_test = to_tensor(X_test, inp_scaler)
        y_test = to_tensor(y_test, out_scaler)
        X_val = to_tensor(X_val, inp_scaler)
        y_val = to_tensor(y_val, out_scaler)
        return X_train, X_test, X_val, y_train, y_test, y_val, inp_scaler, out_scaler

Edit the cell below to change which variables are used as inputs and outputs.

In [None]:
# Column order matters: Motor slices the inputs at column 3, sending the first
# three columns to the model's 3-wide branch and the remaining four to its
# 4-wide branch.
input_columns = [
    'GaH_V', 'GbH_V', 'GcH_V',
    'loadTorque_Nm', 'dcI_amps', 'speed_radps', 'mAngle_rad',
]
output_columns = ['Ia_amps', 'Ib_amps', 'Ic_amps']

data = Data('/content/drive/MyDrive/PMSM.csv', input_columns, output_columns)
(train_in, test_in, val_in,
 train_out, test_out, val_out,
 inp_scaler, out_scaler) = data.extract()

# The Model

The model is designed with only a few layers to keep inference time low.

In [None]:
class Model(nn.Module):
    """Two-branch MLP whose branch outputs are concatenated and regressed.

    One branch processes the inputs passed as ``x_binary``, the other those
    passed as ``x_continuous``. Each branch ends in 256 features; the two are
    concatenated, passed through a shared stack, and mapped to ``n_outputs``
    values by a final linear layer.

    Attribute names and layer order are fixed so that the ``state_dict`` keys
    stay the same and existing checkpoints keep loading.

    Args:
        n_binary: Number of features fed to ``binary_signal_branch``.
        n_continuous: Number of features fed to ``speed_branch``.
        n_outputs: Number of predicted values per sample.
        dropout: Dropout probability applied at the end of ``combine_layer``.
    """

    def __init__(self, n_binary=3, n_continuous=4, n_outputs=3, dropout=0.5):
        super().__init__()
        branch_width = 256

        self.binary_signal_branch = nn.Sequential(
            nn.Linear(n_binary, 128),
            nn.ReLU(),
            nn.Linear(128, branch_width),
            nn.ReLU(),
        )
        self.speed_branch = nn.Sequential(
            nn.Linear(n_continuous, 128),
            nn.Tanh(),
            nn.Linear(128, branch_width),
            nn.ReLU()
        )
        self.combine_layer = nn.Sequential(
            nn.Linear(branch_width * 2, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(dropout)
        )
        self.phase_output_layer = nn.Linear(512, n_outputs)

    def forward(self, x_binary, x_continuous):
        """Return predictions of shape ``(batch, n_outputs)``.

        Args:
            x_binary: Tensor of shape ``(batch, n_binary)``.
            x_continuous: Tensor of shape ``(batch, n_continuous)``.
        """
        binary_signal_out = self.binary_signal_branch(x_binary)
        speed_out = self.speed_branch(x_continuous)
        # Join the two 256-wide branch outputs along the feature axis.
        combined = torch.cat((binary_signal_out, speed_out), dim=1)
        combined_out = self.combine_layer(combined)
        phase_prediction = self.phase_output_layer(combined_out)
        return phase_prediction

# Motor Class

The class below is used to train, validate, and test the model.

In [None]:
class Motor(nn.Module):
    """Training, validation and test harness for a two-input model.

    Every input tensor is split at column 3: columns ``[:3]`` are passed to
    the model as its first argument and columns ``[3:]`` as its second.

    NOTE(review): this class owns no parameters, and its ``train`` method
    replaces ``nn.Module.train(mode)`` with a different signature, so
    ``Motor.eval()`` / ``Motor.train(False)`` cannot be used. The
    ``nn.Module`` base is kept only so existing callers are unaffected.

    Args:
        train_in, train_out: Training inputs and targets.
        val_in, val_out: Validation inputs and targets.
        test_in, test_out: Test inputs and targets.
        criterion: Loss callable ``criterion(predictions, targets)``.
        batch_size: Batch size for the training and validation loaders.
        device: Device on which the test tensors are stored. Defaults to
            ``'cuda'`` when available, otherwise ``'cpu'``.
    """

    def __init__(self, train_in, train_out, val_in, val_out, test_in, test_out, criterion, batch_size, device=None):
        super().__init__()

        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.device = torch.device(device)
        # Pinned host memory only helps when batches are copied to a GPU.
        pin = self.device.type == 'cuda'

        # Create DataLoader for training data
        train_dataset = TensorDataset(train_in[:, :3], train_in[:, 3:], train_out)
        self.trainloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=pin)

        # Create DataLoader for validation data. No shuffling: the order does
        # not matter for evaluation and a fixed order keeps it deterministic.
        val_dataset = TensorDataset(val_in[:, :3], val_in[:, 3:], val_out)
        self.valloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, pin_memory=pin)

        # Store test data and criterion
        self.test_in = test_in.to(self.device)
        self.test_out = test_out.to(self.device)
        self.criterion = criterion

        # Lists to store per-epoch mean training and validation losses
        self.train_losses = []
        self.val_losses = []

    def _device_of(self, model):
        """Return the device of ``model``'s first parameter (or ``self.device``)."""
        first_param = next(model.parameters(), None)
        return self.device if first_param is None else first_param.device

    def train(self, epochs, optimizer, model, scheduler):
        """Train ``model`` and return it with the best-validation weights loaded.

        Args:
            epochs: Number of passes over the training loader.
            optimizer: Optimizer already bound to ``model``'s parameters.
            model: Module called as ``model(x_binary, x_continuous)``.
            scheduler: Scheduler whose ``step`` accepts a metric; it is fed
                the summed training loss of each epoch.

        Returns:
            The same ``model`` object. If at least one epoch produced a
            validation loss below infinity, the weights of the epoch with the
            lowest validation loss are restored before returning.
        """
        device = self._device_of(model)
        best_val_loss = float('inf')
        best_state = None

        for epoch in range(epochs):
            train_loss = 0

            # Training loop
            model.train()
            progress_bar = tqdm(self.trainloader, total=len(self.trainloader), desc=f'Epoch {epoch}', leave=False)
            for X_bin, X_conti, y_batch in progress_bar:
                X_bin, X_conti, y_batch = X_bin.to(device), X_conti.to(device), y_batch.to(device)
                optimizer.zero_grad()
                predictions = model(X_bin, X_conti)
                loss = self.criterion(predictions, y_batch)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()

            # Validation loop
            model.eval()
            val_loss = 0
            with torch.no_grad():
                for X_bin_val, X_conti_val, y_val_batch in self.valloader:
                    X_bin_val = X_bin_val.to(device)
                    X_conti_val = X_conti_val.to(device)
                    y_val_batch = y_val_batch.to(device)
                    val_predictions = model(X_bin_val, X_conti_val)
                    val_loss += self.criterion(val_predictions, y_val_batch).item()

            # Update learning rate with scheduler based on training loss
            scheduler.step(train_loss)

            # Log training and validation losses
            self.train_losses.append(train_loss / len(self.trainloader))
            self.val_losses.append(val_loss / len(self.valloader))

            # Print training and validation loss
            print(f'Epoch {epoch}: Train loss = {self.train_losses[-1]}, Val loss = {self.val_losses[-1]}')

            # Snapshot the weights when the validation loss improves.
            # state_dict() hands back references to the live tensors, so they
            # must be cloned or later optimizer steps would overwrite them.
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_state = {
                    name: tensor.detach().clone()
                    for name, tensor in model.state_dict().items()
                }

        # Load the best model state (absent when epochs == 0 or the validation
        # loss never dropped below infinity, e.g. because it was NaN).
        if best_state is not None:
            model.load_state_dict(best_state)

        # Plot the training and validation curves
        self.plot_losses()

        return model

    def test(self, model):
        """Evaluate ``model`` on the stored test set, print and return the loss.

        Returns:
            The test loss as a Python float.
        """
        device = self._device_of(model)
        test_in = self.test_in.to(device)
        test_out = self.test_out.to(device)

        model.eval()
        # No gradients are needed for evaluation; skipping the autograd graph
        # saves memory because the whole test set goes through in one batch.
        with torch.no_grad():
            preds = model(test_in[:, :3], test_in[:, 3:])
            # torch.sum keeps this a scalar even for reduction='none' criteria.
            test_loss = torch.sum(self.criterion(preds, test_out)).item()
        print(f'Test loss = {test_loss}')
        return test_loss

    def plot_losses(self):
        """Plot the recorded per-epoch training and validation losses."""
        plt.plot(self.train_losses, label='Training Loss')
        plt.plot(self.val_losses, label='Validation Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.show()

In [None]:
# Loss function, data harness, network, optimizer and LR schedule for the run.
criterion = nn.MSELoss()
motor = Motor(
    train_in, train_out,
    val_in, val_out,
    test_in, test_out,
    criterion,
    32,
)
model = Model().to('cuda')
optimizer = torch.optim.Adam(model.parameters(), lr=1e-6)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5, verbose=True)

# Train for 5 epochs, then report the loss on the test split.
model = motor.train(5, optimizer, model, scheduler)
motor.test(model)

In [None]:
# Persist the trained weights (state_dict only, not the whole module).
trained_weights = model.state_dict()
torch.save(trained_weights, 'loss_0.00082_2.pth')

In [None]:
# Build a fresh network on the GPU and restore previously saved weights.
checkpoint_path = '/content/best_2.pth'
model = Model().to('cuda')
saved_weights = torch.load(checkpoint_path)
model.load_state_dict(saved_weights)