<a href="https://colab.research.google.com/github/LeeYuuuan/MLP_EIT/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import scipy.io as scio
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from torch.utils.data import TensorDataset,DataLoader,Dataset
import time
import numpy as np
import sys
from sklearn.model_selection import train_test_split

## Load Dataset

In [2]:
class CustomEITDataSet(Dataset):
    """EIT simulation dataset yielding ``(voltages, label)`` tensor pairs.

    Voltage measurements are read from ``data04.csv`` and the matching labels
    from ``data04R.csv``. The rows are split 80/20 into a train and a test
    subset with a fixed seed (``random_state=42``), so a ``'train'`` and a
    ``'test'`` instance built from the same files never share a row.

    Each voltage row is reshaped to ``(1, 16, 12)``:
    1 channel * 16 (angle) * 12 (voltage).
    """

    # Location of the CSV files used when no ``data_dir`` is given.
    DEFAULT_DATA_DIR = "/content/drive/MyDrive/Colab Notebooks/EIT/EITDeepLearning/data/"

    def __init__(self, dataset_type, data_dir=None):
        """
        Args:
            dataset_type: Which subset to expose, ``'train'`` or ``'test'``.
            data_dir: Directory containing ``data04.csv`` and ``data04R.csv``.
                Defaults to ``DEFAULT_DATA_DIR``.

        Raises:
            ValueError: If ``dataset_type`` is neither ``'train'`` nor ``'test'``.
        """
        import os  # local import: only needed to build the file paths

        # Fail fast: reject a bad subset name before the slow CSV parsing.
        if dataset_type not in ('train', 'test'):
            raise ValueError(
                f"dataset_type must be 'train' or 'test', got {dataset_type!r}"
            )
        if data_dir is None:
            data_dir = self.DEFAULT_DATA_DIR

        print(f"Loading {dataset_type}")
        print("Loading voltages...")

        time_0 = time.time()
        voltage_data = np.loadtxt(os.path.join(data_dir, 'data04.csv'), delimiter=',', dtype=np.float32)
        time_1 = time.time() - time_0
        print(f"using {time_1}s.")
        print("Loading labels...")
        label = np.loadtxt(os.path.join(data_dir, 'data04R.csv'), delimiter=',', dtype=np.float32)
        # Elapsed times below are cumulative since time_0, not per step.
        time_2 = time.time() - time_0
        print(f"Using {time_2}s.")

        # Split train/test; the fixed seed keeps the partition identical
        # across instances, so 'train' and 'test' stay complementary.
        print("Creating the dataset...")
        X_train_raw, X_test_raw, y_train, y_test = train_test_split(
            voltage_data, label, test_size=0.2, random_state=42
        )

        # Only the requested subset is converted to tensors.
        if dataset_type == 'train':
            x_raw, y_raw = X_train_raw, y_train
        else:
            x_raw, y_raw = X_test_raw, y_test

        # shape: count * 1 * 16(angle) * 12(voltage). Passing the row count
        # explicitly makes a wrong column count raise instead of being
        # silently folded into a different number of samples.
        x_grid = x_raw.reshape(x_raw.shape[0], 1, 16, 12)

        self.x = torch.tensor(x_grid, dtype=torch.float32)
        self.y = torch.tensor(y_raw, dtype=torch.float32)
        self.len = self.x.shape[0]

        elapsed = time.time() - time_0
        print(f"Finished loading {dataset_type} set! Using{elapsed}s.")

    def __getitem__(self, index):
        """Return the ``(voltages, label)`` pair stored at ``index``."""
        return self.x[index], self.y[index]

    def __len__(self):
        """Return the number of samples in the selected subset."""
        return self.len

# Load the EIT dataset and wrap both subsets in DataLoaders.
batch_size = 64
dataset_train = CustomEITDataSet('train')
train_dataloader = DataLoader(dataset=dataset_train, batch_size=batch_size, shuffle=True)

dataset_test = CustomEITDataSet('test')
# Evaluation gains nothing from shuffling. A fixed order keeps the composition
# of the (possibly smaller) final batch stable, so the batch-averaged test
# loss is reproducible from run to run.
test_dataloader = DataLoader(dataset=dataset_test, batch_size=batch_size, shuffle=False)

# Sanity check: print the shapes of the first evaluation batch only.
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"shape of y: {y.shape} {y.dtype}")
    break

Loading train
Loading voltages...
using 2.11143159866333s.
Loading labels...
Using 8.320978164672852s.
Creating the dataset...
Finished loading train set! Using8.489453554153442s.
Loading test
Loading voltages...
using 2.6979057788848877s.
Loading labels...
Using 7.732635974884033s.
Creating the dataset...
Finished loading test set! Using7.854297876358032s.
Shape of X [N, C, H, W]: torch.Size([64, 1, 16, 12])
shape of y: torch.Size([64, 576]) torch.float32


## MLP model

In [3]:
class NeuralNetwork(nn.Module):
    """Multi-layer perceptron mapping a flattened input to a flat output vector.

    The input is flattened per sample, scaled to unit L2 norm, and passed
    through two hidden ``Linear`` + ``ReLU`` layers followed by a linear
    output layer.

    Args:
        in_features: Number of values per sample after flattening
            (default 192 = 16 * 12).
        hidden_features: Width of both hidden layers (default 512).
        out_features: Length of the predicted vector (default 576).
    """

    def __init__(self, in_features=192, hidden_features=512, out_features=576):
        super().__init__()
        self.flatten = nn.Flatten()

        # Attribute names and layer order are kept so that printed summaries
        # and saved state dicts stay compatible with the default sizes.
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(in_features, hidden_features),
            nn.ReLU(),
            nn.Linear(hidden_features, hidden_features),
            nn.ReLU(),
            nn.Linear(hidden_features, out_features),
        )

    def forward(self, x):
        """Return the prediction for a batch ``x`` of shape ``(N, ...)``."""
        x = self.flatten(x)
        # Per-sample L2 normalisation (each row divided by its own norm), so
        # the output does not depend on the overall scale of a sample.
        x = nn.functional.normalize(x, p=2.0, dim=1)
        return self.linear_relu_stack(x)

In [4]:
# Pick the compute backend in order of preference: CUDA, then MPS, then CPU.
# The MPS check is only evaluated when CUDA is unavailable.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

# Build the network on the selected device and show its layer summary.
model = NeuralNetwork().to(device)
print(model)

NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=192, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=512, bias=True)
    (3): ReLU()
    (4): Linear(in_features=512, out_features=576, bias=True)
  )
)


## Train & test loops

In [5]:
def train(dataloader, model, loss_fn, optimizer, device=None):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        dataloader: Iterable of ``(X, y)`` batches; must support ``len()`` and
            expose a sized ``.dataset``.
        model: Module to optimise; it is switched to training mode.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimizer holding the parameters of ``model``.
        device: Device the batches are moved to. When omitted, the device of
            the model's first parameter is used (CPU for a parameter-free
            model), so the function does not rely on a notebook-level global.

    Returns:
        Sum of the batch losses divided by the number of batches.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()
    train_loss = 0
    seen = 0  # samples processed so far, correct even for a short batch
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Read the scalar once and keep `loss` bound to the tensor.
        batch_loss = loss.item()
        train_loss += batch_loss
        seen += len(X)

        if batch % 100 == 0:
            print(f"training loss: {batch_loss:>7f} [{seen:5d}/{size:>5d}]")

    return train_loss / num_batches



In [6]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()

    test_loss /= num_batches
    print(f"Test loss: {test_loss:>8f} \n")

    return test_loss


## Training

In [None]:
# Mean-squared error between the predicted and the reference label vectors.
loss_fn = nn.MSELoss()
# Adam is the optimizer actually used. The SGD instance that used to be
# created on the line before was overwritten immediately and never stepped.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
epoch = 40  # number of full passes over the training set

# Per-epoch loss history, one entry per epoch.
train_loss = []
test_loss = []
for t in range(epoch):
    print(f"Epoch {t+1}--------------------------------")
    train_loss.append(train(train_dataloader, model, loss_fn, optimizer))
    test_loss.append(test(test_dataloader, model, loss_fn))
print("Done")

Epoch 1--------------------------------
training loss: 0.026406 [   64/20352]
training loss: 0.002034 [ 6464/20352]
training loss: 0.002038 [12864/20352]
training loss: 0.002024 [19264/20352]
Test loss: 0.002032 

Epoch 2--------------------------------
training loss: 0.002057 [   64/20352]
training loss: 0.002063 [ 6464/20352]
training loss: 0.002030 [12864/20352]
training loss: 0.002017 [19264/20352]
Test loss: 0.002028 

Epoch 3--------------------------------
training loss: 0.002054 [   64/20352]
training loss: 0.002039 [ 6464/20352]
training loss: 0.002037 [12864/20352]
training loss: 0.002019 [19264/20352]
Test loss: 0.002021 

Epoch 4--------------------------------
training loss: 0.002026 [   64/20352]
training loss: 0.002021 [ 6464/20352]
training loss: 0.001955 [12864/20352]
training loss: 0.002005 [19264/20352]
Test loss: 0.001998 

Epoch 5--------------------------------
training loss: 0.001984 [   64/20352]
training loss: 0.001937 [ 6464/20352]
training loss: 0.001959 [128