In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import scipy.fft as fft

from torch.utils.data import DataLoader, Dataset

from sklearn.model_selection import train_test_split

from tqdm.notebook import tqdm

In [None]:
# load frequency data
data = np.load("../Data/RayTracingData/Remcom_4x4_IR_100taps.npy")
# load Phi and Theta
phi = np.load('../Data/RayTracingData/Remcom_4x4_AoA_phi.npy')
theta = np.load('../Data/RayTracingData/Remcom_4x4_AoA_theta.npy')

# load receiver positions
rx_positions = np.load("../Data/RayTracingData/Remcom_4x4_rxpos.npy")
# load transmitter positions
tx_positions = np.load("../Data/RayTracingData/Remcom_4x4_txpos.npy")

# fft and smooth our data to reduce noise
data_fft = fft.fft(data , workers=-1)[:,:,::2]

In [None]:
def standarize(x):
    return (np.array(x)-np.mean(x))/np.std(x)

def euclidean_distance(x1,x2):
    return np.linalg.norm(x1-x2)

def take_norm(x):
    return np.absolute(x)

def drop_top_right(data, rx_positions):
    idxx = rx_positions[:,0] > 300
    idxy = rx_positions[:,1] > 150
    idx = np.logical_and(idxx, idxy)
    good_idcs = ~idx
    return data[good_idcs]

def drop_outliers(data):
    upper_quantile = np.absolute(np.percentile(np.mean(data, axis=(1,2)), 0.99))
    lower_quantile = np.absolute(np.percentile(np.mean(data, axis=(1,2)), 0.25))
    IQR = (upper_quantile - lower_quantile) * 0
    quartile_set = (lower_quantile -IQR , upper_quantile + IQR)
    result = data[np.where((np.absolute(np.mean(data,axis=(1,2))) <= quartile_set[1]))]
    return result
    
    

def normalize(x):
    return (x - x.min(0))/x.ptp(0)

def fillna(x, value=0):
    x[np.where(np.isnan(x))] = value
    return x

def zero_padding_as(x, target):
    width = (target.shape[2] - x.shape[2])//2
    x = np.pad(x, (width,width))
    return x

def random_sample_and_remove(X, y, sample_size):
    """A function that takes a random subset of samples out of a numpy array
    inputs: (X::np.array)
            (y::np.array)
            (sample_size: (integer))
    outputs: subset_X::np.array
             subset_y::np.array
             (original_X - subset_X)::np.array
             (original_y - subset_y)::np.array
    """
    indices = np.random.choice(data.shape[0], sample_size, replace=False)
    return (X[indices], X[~indices], y[indices], y[~indices])
    

In [None]:
X_1 = standarize(drop_top_right(data_fft, rx_positions))
X_1 = np.hstack([np.real(X_1), np.imag(X_1)])
X_2 = zero_padding_as(fillna(standarize(drop_top_right(phi, rx_positions)))[10:], X_1)
X_3 = zero_padding_as(fillna(standarize(drop_top_right(theta, rx_positions)))[10:], X_1)


X = np.hstack([X_1, X_2, X_3])
Y = drop_top_right(standarize(rx_positions)[:,:2], rx_positions)


In [None]:
train_X, test_X, train_y, test_y = train_test_split(X,Y, test_size=0.9)

In [None]:
class MyDataSet(Dataset):
    def __init__(self, X,y):
        self.X = torch.Tensor(X)
        self.y = torch.Tensor(y)
    def __len__(self):
        return self.X.shape[0]
    def channels(self):
        return self.X.shape[1]
    def timesteps(self):
        return self.X.shape[2]
        
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [None]:
train_DS = MyDataSet(train_X, train_y)
test_DS = MyDataSet(test_X, test_y)

train_loader = DataLoader(train_DS, batch_size=128, drop_last=True, shuffle=True)
test_loader = DataLoader(test_DS, batch_size=128, drop_last=True)

In [None]:
train_DS[0:3][1]

In [None]:
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=train_DS.channels(), 
                               out_channels=128, 
                               kernel_size=8,
                              stride=2)
        self.conv2 = nn.Conv1d(in_channels=128, out_channels=64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv1d(in_channels=64, out_channels=32, kernel_size=2, stride=2)
        self.lin1 = nn.Linear(160, 64)
        self.lin2 = nn.Linear(64, 32)
        self.out = nn.Linear(32,2)
        
    def forward(self, x):
        x = F.normalize(F.relu(self.conv1(x)))
        x = F.normalize(F.relu(self.conv2(x)))
        x = F.normalize(F.relu(self.conv3(x)))
        #x = F.avg_pool1d(x, kernel_size=3)
        x = torch.flatten(x,1)
        x = F.dropout(F.selu(self.lin1(x)), 0.2)
        x = F.dropout(F.selu(self.lin2(x)), 0.2)

        x = self.out(x)
        return x
    
        

In [None]:
model = SimpleNN()
#model.load_state_dict(torch.load('../trained_models/supervised_model.pkl'))
optimizer = torch.optim.Adam(params=model.parameters())
criterion = nn.MSELoss()
train_Loss_normalizer = len(train_DS)
test_Loss_normalizer = len(test_DS)

In [None]:
last_val_loss = 9999

for e in range(200):
    #early stopping
    val_loss = 0
    if val_loss < last_val_loss+0.0005:
        last_val_loss=val_loss
        patience_counter = 0
    else:
        patience_counter += 1
    if patience_counter >0.0005:
        break
        
    # train
    model.train()
    loss=0
    for x, y in tqdm(train_loader):
        optimizer.zero_grad()
        y_hat = model(x)
        batch_loss = criterion(y, y_hat)
        
        batch_loss.backward()
        
        optimizer.step()
        
        loss+=batch_loss.item()
    loss /= train_Loss_normalizer
    #validate
    model.eval()
    val_loss = 0
    for x,y in test_loader:
        y_hat = model(x)
        val_loss += criterion(y, y_hat).item()
    val_loss/=test_Loss_normalizer
    print(f"Epoch {e}: Train_loss: {loss} Validation_loss: {val_loss}")
        

In [None]:
y_real = test_DS[:][1]
yhats = model(test_DS[:][0])

from sklearn.cluster import KMeans
km = KMeans(n_clusters=4) 
km = km.fit(y_real)
labels = km.predict(y_real)

In [None]:
import seaborn as sns
sns.set()
#plt.figure(figsize=(15,15))
sns.scatterplot(yhats[:,0].detach(), yhats[:,1].detach(), 
                #hue=torch.sum(torch.abs(y_real - yhats), dim=1).detach(), 
                hue=labels,
                color='r', alpha=1) 


In [None]:
sns.scatterplot(y_real[:,0], y_real[:,1],
                #hue=torch.sum(torch.abs(y_real - yhats), dim=1).detach(), 
                hue=labels,
                color='g', alpha=1)


In [None]:
predicted_labels = km.predict(yhats.detach())

In [None]:
sns.scatterplot(yhats[:,0].detach(), yhats[:,1].detach(), 
                #hue=torch.sum(torch.abs(y_real - yhats), dim=1).detach(), 
                hue=predicted_labels,
                color='r', alpha=1) 

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
print(classification_report(labels, predicted_labels))