<a href="https://colab.research.google.com/github/artaalikhani/FSCNN/blob/master/Fault_classification_FSCNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.nn import Parameter
from torch.utils.data import Dataset, DataLoader, TensorDataset
import random
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
import time
import os

from FSCNN import *

global case_study

#case_study = 'ITSC'
case_study = 'CWRU'
#case_study = 'Ottawa'


def train_val_test_split(data, labels, split_ratio):
    n_samples = data.shape[0]
    div1 = round(n_samples*split_ratio)
    div2 = round((n_samples - div1)/2 + div1)
    l = list(range(n_samples))
    if case_study == 'CWRU':
        test_ind_file = 'data_test_ind_CWRU.csv'
    elif case_study == 'ITSC':
        test_ind_file = 'data_test_ind_ITSC.csv'
    else:
        test_ind_file = 'data_test_ind_Ottawa.csv'
    if os.path.isfile(test_ind_file):

      lr = np.loadtxt(test_ind_file, delimiter=",").astype(int)
    else:
      lr = random.sample(l, n_samples)
      np.savetxt(test_ind_file,lr, delimiter=",")

    data_train = np.array([data[idx] for idx in lr[:div1]])
    data_val = np.array([data[idx] for idx in lr[div1:div2]])
    data_test = np.array([data[idx] for idx in lr[div2:]])
    labels_train = np.array([labels[idx] for idx in lr[:div1]])
    labels_val = np.array([labels[idx] for idx in lr[div1:div2]])
    labels_test = np.array([labels[idx] for idx in lr[div2:]])

    return (data_train, data_val, data_test), (labels_train, labels_val, labels_test)

def dataloader_preparation(split_ratio=0.7, batch_size=50):
    global num_classes

    if case_study == 'CWRU':
      datasets = np.loadtxt('Vib_data2.csv',delimiter=",")
    elif case_study == 'ITSC':
      datasets = np.loadtxt('Iq_ITSC_PMSM.csv',delimiter=",")
    else:
      datasets = np.loadtxt('dataset_ottawa.csv',delimiter=",")

    N_s=datasets.shape[1]-1
    num_classes=len(np.unique(datasets[:,N_s]))

    data =  datasets[:,np.newaxis,:N_s]
    labels = datasets[:,N_s]


    # dataloaders
    data_split, labels_split = train_val_test_split(data, labels, split_ratio)
    dataloader_train = DataLoader(
        TensorDataset(torch.tensor(data_split[0], dtype=torch.float), torch.tensor(labels_split[0], dtype=torch.long)),
        batch_size=batch_size,
        shuffle=True,
    )

    dataloader_val = DataLoader(
        TensorDataset(torch.tensor(data_split[1], dtype=torch.float), torch.tensor(labels_split[1], dtype=torch.long)),
        batch_size=batch_size,
    )

    dataloader_test = DataLoader(
        TensorDataset(torch.tensor(data_split[2], dtype=torch.float), torch.tensor(labels_split[2], dtype=torch.long)),
        batch_size=batch_size,
    )

    return dataloader_train, dataloader_val, dataloader_test

def train(dataloader, device, model, criterion, optimizer):
    total_loss = 0.0
    total_corrects = 0.0
    n_train = 0
    model.train()
    model.zero_grad()

    for batch_idx, batch in enumerate(dataloader):
        input, label = tuple(t.to(device) for t in batch)
        label = label.long()
        output = model(input)
        loss = criterion(output, label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        pred = output.data.max(1, keepdim=True)[1]
        total_corrects += pred.eq(label.data.view_as(pred)).cpu().sum()
        total_loss += loss.item()
        n_train += label.size()[0]

    epoch_loss = total_loss / n_train
    epoch_acc = 100. * total_corrects.numpy() / n_train
    print('Train loss: '+str(epoch_loss)+' || Train accuracy: '+str(epoch_acc))
    return epoch_loss, epoch_acc

def test(dataloader, device, model, criterion):
    total_loss = 0.0
    total_corrects = 0.0
    n_test = 0
    model.eval()
    model.zero_grad()

    with torch.no_grad():
        for batch_idx, batch in enumerate(dataloader):
            input, label = tuple(t.to(device) for t in batch)
            label = label.long()
            model=model.to(device)
            output = model(input.double())
            loss = criterion(output, label)
            pred = output.data.max(1, keepdim=True)[1]
            total_corrects += pred.eq(label.data.view_as(pred)).cpu().sum()
            total_loss += loss.item()
            n_test += label.size()[0]
    epoch_loss = total_loss / n_test
    epoch_acc = 100.0 * total_corrects.numpy() / n_test
    print('Test loss: '+str(epoch_loss)+' || Test accuracy: '+str(epoch_acc))
    return epoch_loss, epoch_acc

if __name__ == '__main__':
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    log_file_name = 'results_logged_'+case_study+'_FSCNN'+'.csv'
    n_epoch = 500
    batch = 14

    dataloader_train, dataloader_val, dataloader_test = dataloader_preparation(batch_size=batch)
    model = Net(n_class=num_classes,
                   case_study = case_study,
                  )


    model.to(device)
    model=model.double()
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    best_acc = 0.0
    results = np.zeros([n_epoch,6])
    for epoch in range(n_epoch):
        print('------------------ epoch '+str(epoch)+' ------------------')
        model=model.double()
        model.eval()
        t0 = time.time()
        loss_train, acc_train = train(dataloader_train, device, model, criterion, optimizer)
        t1 = time.time()
        train_time= t1-t0
        print('Training time is:'+str(train_time))
        loss_val, acc_val = test(dataloader_val, device, model, criterion)
        test_time = time.time()-t1
        print('Test time is:'+str(test_time))
        results[epoch,:] = np.array([loss_train, acc_train, loss_val, acc_val, train_time, test_time])
        df = pd.DataFrame(results, columns=['loss_train', 'acc_train', 'loss_val', 'acc_val','train_time', 'test_time'])
        df.to_csv(log_file_name)

        # save better model
        if best_acc <= acc_val:
            best_acc = acc_val
            loss_test, acc_test = test(dataloader_test, device, model, criterion)

            if acc_train==100 and acc_val==100:
              model_path = './FSCNN_epoch_'+case_study+str(epoch)+'.pt'
              torch.save(model.state_dict(), model_path)
              print('The model saved: '+model_path)
