In [None]:
LabelledPath = " " #Please provide path to labelled dataset

In [None]:
import numpy as np
import h5py

Loading labelled and Unlabelled datasets

In [None]:
f = h5py.File("LabelledPath" , "r")
H = f["H_Est"][:].transpose(0,3,1,2)
Pos = f["Pos"][:]
f.close()

Creating a custom test set

In [None]:
from sklearn.model_selection import train_test_split
H_Train, H_Test , Pos_Train , Pos_Test = train_test_split(H,Pos,test_size=0.05, random_state=42)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

Validation set

In [None]:
H_Train, H_Val , Pos_Train , Pos_Val = train_test_split(H_Train,Pos_Train,test_size=0.1, random_state=99)

In [None]:
H_Train = torch.tensor(H_Train,dtype=torch.float)
Pos_Train = torch.tensor(Pos_Train,dtype=torch.float)
H_Val = torch.tensor(H_Val,dtype=torch.float)
Pos_Val = torch.tensor(Pos_Val,dtype=torch.float)

In [None]:
class CNN(nn.Module):
    ''' Deep Convolutional NN for position Estimation '''
    def __init__(self):
        super(CNN, self).__init__()
        
        '''Convolutional Layers'''
        self.cnn = nn.Sequential(
            nn.Conv2d(in_channels=5, out_channels=64, kernel_size=5, padding=2) ,
            nn.ReLU(),
            nn.AvgPool2d((1,4) ),
            nn.Conv2d(in_channels=64, out_channels=32, kernel_size=3, padding=1) ,
            nn.ReLU(),
            nn.AvgPool2d((1,4) )
        )
        
        ''' Fully Connected Layers'''
        self.fc = nn.Sequential(nn.Linear(57*56*32,512),
                                     nn.ReLU(),
                                     nn.Linear(512,256),
                                     nn.ReLU(),
                                     nn.Linear(256,128),
                                     nn.ReLU(),
                                     nn.Linear(128,3))

    def forward(self, H):
        out = self.fc(self.cnn(H).reshape(-1,57*56*32))
        return out

In [None]:
device = torch.device("cuda:0" if (torch.cuda.is_available()) else "cpu")
model = CNN().to(device)

In [None]:
criterion=torch.nn.L1Loss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=0)

In [None]:
best_acc = 1e9
N_b2=len(H_Train) #Train Size
N_b3=len(H_Val) #Val Size
bs2 = 32 #Batch Size
N_b4 = len(H_Test) #Test Size

In [None]:
for i in range(1,151):
    loss, acc = 0, 0
    iters_per_epoch = 0
    ''' Training'''
    for k in range(0,N_b2,bs2):
        end = min(k+bs2 , N_b2)
        h = H_Train[k:end]
        pos = Pos_Train[k:end]
        iters_per_epoch+=1
        h = torch.autograd.Variable(h,requires_grad=True)
        pos = torch.autograd.Variable(pos,requires_grad=True)
        h = h.to(device).float()
        pos = pos.to(device).float()
        optimizer.zero_grad()
        with torch.set_grad_enabled(True):
            '''Forward'''
            pred = model(h)
            iloss = criterion(pred, pos)
            loss += iloss.item()
            iacc = torch.dist(pred, pos, 2) / pred.shape[0]
            acc += iacc.item()

            '''BackProp'''
            iloss.backward()
            optimizer.step()
            if iters_per_epoch % 10 == 0:
                print(f'Batch {iters_per_epoch}/{N_b2/bs2}: loss = {iloss:.4f} acc = {iacc:.4f}')

    loss /= iters_per_epoch
    acc /= iters_per_epoch
    print(k)
    print(f'Epoch {i}/{100}: loss = {loss:.4f} acc = {acc:.4f}')
 
    loss, acc = 0, 0
    iters_per_epoch = 0

    ''' Validation'''
    
    for k in range(0,N_b3,bs2):
        iters_per_epoch +=1    
        with torch.set_grad_enabled(False):
                end = min(k+bs2 , N_b3)
                h = H_Val[k:end].to(device).float()
                pos = Pos_Val[k:end].to(device).float()
                pred = model(h)
                iloss = criterion(pred, pos)
                loss += iloss.item()
                iacc = torch.dist(pred, pos, 2) / pred.shape[0]
                acc += iacc.item()
    acc/=iters_per_epoch
    loss/=iters_per_epoch
    print(f'Validation , loss = {loss:.4f} acc = {acc:.4f}')
    if(acc<=best_acc):
        torch.save(model.state_dict(), "best.pt")
        best_acc = acc
    
   
    '''Testing'''
    if i%5==0:
        model2 = CNN().to(device)
        model2.load_state_dict(torch.load("best.pt",map_location=torch.device('cpu') )) 
        iters_per_epoch = 0
        loss, acc = 0, 0
        pos_p = torch.zeros(Pos_Test.shape)
        for k in range(0,N_b4,bs2):
            iters_per_epoch +=1    
            with torch.set_grad_enabled(False):
                    end = min(k+bs2 , N_b4)
                    h = H_Test[k:end].to(device).float()
                    pos = Pos_Test[k:end].to(device).float()
                    pred = model2(h)
                    iloss = criterion(pred, pos)
                    loss += iloss.item()
                    iacc = torch.dist(pred, pos, 2) / pred.shape[0]
                    acc += iacc.item()
                    pos_p[k:end]=pred
        acc/=iters_per_epoch
        loss/=iters_per_epoch
        mde = torch.mean(torch.sqrt(torch.sum((pos_p-Pos_Test)**2 , dim=1))) 
        print(f'Test , loss = {loss:.4f} acc = {acc:.4f}  mde = {mde:.4f}')