In [None]:
# DeepPos

import torch
import numpy as np
import torch.nn as nn
import torch.utils.data as data
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torchvision
import tensorflow as tf
import pandas as pd

# Define the data structure
class MyDataset(Dataset):
    def __init__(self,filepath):
        xy = np.loadtxt(filepath,delimiter=',',dtype=np.float32)
        self.len = xy.shape[0]
        
        # load the data after the second column of the dataset (RSS data)
        self.x_data = torch.from_numpy(xy[:,2:])
        # load the first two columns of the dataset (Coordinate)
        self.y_data = torch.from_numpy(xy[:,0:2])
        
        #print("The data has prepared...")
        
    def __getitem__(self,index):
        return self.x_data[index],self.y_data[index]
    
    def __len__(self):
        return self.len

# Initialize the positioning error
error_0 = [[]for i in range(15)]

realization = 500 # number of realizations
for z in range(realization):
    print(z)
    # Settings
    epochs = 1500
    batch_size = 14
    lr = 0.008
    k=1
    
    # load traning and testing dataset
    train_file = "./dataset/03_14_train.csv"
    test_file = "./dataset/03_10_test.csv"
    mydataset_train = MyDataset(train_file)
    mydataset_test = MyDataset(test_file)
    train_loader = DataLoader(dataset=mydataset_train,
                             batch_size=batch_size,
                             shuffle=False,
                             num_workers=2)
    test_loader = DataLoader(dataset=mydataset_test,
                             batch_size=10,
                             shuffle=False,
                             num_workers=2)

    # Model structure
    class AutoEncoder(nn.Module):
        def __init__(self):
            super(AutoEncoder, self).__init__()

            # Encoder
            self.encoder = nn.Sequential(
                nn.Linear(64, 48),
                nn.Tanh(),
                nn.Linear(48, 32),
                nn.Tanh(),
                nn.Linear(32, 2),
                #nn.Tanh(),
                #nn.Linear(8, 4),
                #nn.Tanh(),
                #nn.Linear(4, 2),
                #nn.Tanh(),
                #nn.Linear(16, 2),
            )
            # Decoder
            self.decoder = nn.Sequential(
                #nn.Linear(2, 4),
                #nn.Tanh(),
                #nn.Linear(4, 8),
                #nn.Tanh(),
                #nn.Linear(8, 16),
                #nn.Tanh(),
                nn.Linear(16, 32),
                nn.Tanh(),
                nn.Linear(32, 48),
                nn.Tanh(),
                nn.Linear(48, 64),
                nn.Tanh()
            )

        def forward(self, inputs):
            codes = self.encoder(inputs)
            codes_new = torch.cat([codes, torch.eye(14)], 1) # Add label 
            decoded = self.decoder(codes_new)

            return codes,decoded

    #Loss Function
    class My_loss(nn.Module):
        def __init__(self):
            super().__init__()

        def forward(self, x, y):
            return torch.mean(torch.sqrt((torch.pow((30*x - 30*y), 2)).sum(axis=-1)))
            #return torch.mean((torch.pow((30*x - 30*y), 2)).sum(axis=1))

    # Optimizer and loss function
    model = AutoEncoder()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_function = nn.MSELoss()
    loss_function_locate = My_loss()

    # Train
    for epoch in range(epochs):
        for data, labels in train_loader:
            inputs = data.view(-1, 64) 
            locate = labels.view(-1,2)
            # Forward
            codes, decoded = model(inputs)
            # Backward
            optimizer.zero_grad()
            # Calculate loss
            loss = loss_function(decoded, inputs)
            loss.backward()
            optimizer.step()

        RP_train=locate
        
        #Test
        if (epoch+1)%100 == 0 and epoch > 10:
            print(epoch)
            for data, labels in test_loader:
                inputs = data.view(-1, 64) 
                locate = labels.view(-1,2)
            RSS_test=inputs
            RP_test=locate
            loss_locate = 0
            for i in range(10):
                inputs_test=RSS_test[i].repeat(14,1)
                codes, decoded = model(inputs_test)
                loss_test=torch.mean((decoded-inputs_test)*(decoded-inputs_test),axis=1)
                index=torch.argmin(loss_test)
                loss_locate += loss_function_locate(RP_train[index],RP_test[i])
            loss_locate=(loss_locate/10).numpy()
            error_0[int((epoch+1)/100-1)].append(loss_locate)

# Save the result            
for i in range(15):
    outcome_file = "./outcome/SAE_"+str(100+i*100)+".csv"  
    error = [error_0[i]]
    error = np.array(error)
    data1 = pd.DataFrame(error.T)
    data1.to_csv(outcome_file,header=None,index=None)