In [None]:
"""
Author: Yuqiang (Ethan) Heng
"""
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from keras.layers import Dense
from keras.models import Model
from keras.layers import Input
from keras import backend as K
from keras.utils import np_utils

def cart2sph(xyz,center):
    x = np.subtract(xyz[:,0],center[0])
    y = np.subtract(xyz[:,1],center[1])
    z = np.subtract(xyz[:,2],center[2])
    rtp = np.zeros(xyz.shape)
    r = np.sqrt(np.power(x,2)+np.power(y,2)+np.power(z,2))
    theta = np.arccos(np.divide(z,r))
    phi = np.arctan2(y,x)
    rtp[:,0] = r
    rtp[:,1] = theta
    rtp[:,2] = phi
    return rtp

In [None]:
loc_noise = 2.0409 # noise in the cartesian coordinates of UEs
shuffled_sa_idx = np.random.permutation(np.arange(100)+1)
ntrain_datasets = 80   
nval_datasets = 10
ntest_datasets = 10
X_train = []
Y_train = []
X_val = []
Y_val = []
X_test = []
Y_test = []
bf_gains_train = []
bf_gains_val = []
bf_gains_test = []
prev_dataset = []
for j in range(100):
    data_fname = './Dataset/Dynamic_MISO_ULA_dense_polar_SA_%d.npy'%shuffled_sa_idx[j]
    dataset = np.load(data_fname)
    X_cart = np.copy(dataset[:,0:3])
    Y = np.copy(dataset[:,3])
    cartesian_x = np.multiply(np.multiply(dataset[:,0],np.sin(dataset[:,1])),np.cos(dataset[:,2]))
    cartesian_y = np.multiply(np.multiply(dataset[:,0],np.sin(dataset[:,1])),np.sin(dataset[:,2]))
    cartesian_z = np.multiply(dataset[:,0],np.cos(dataset[:,1]))
    X_cart[:,0] = cartesian_x + 641
    X_cart[:,1] = cartesian_y + 435
    X_cart[:,2] = cartesian_z + 10
    # add noise to cartesian coordinates
    X = X_cart + np.random.normal(0,loc_noise,X_cart.shape)
    X = cart2sph(X, [641,435,10])
    prev_dataset = dataset
    nbeam = 64
    nvalid_rx = dataset.shape[0]
    bf_rx_per_beam = {i:[] for i in range(nbeam)}
    for i in range(nvalid_rx):
        bf_rx_per_beam[int(Y[i])].append(i)
    test_ratio = 1/100    
    test_rx_idx = []
    true_valid_beam = []
    prev_valid_beam = []
    for i in range(nbeam):
        if len(bf_rx_per_beam[i]) >= 10:
            sampled_inst = np.random.choice(bf_rx_per_beam[i], int(np.floor(test_ratio*len(bf_rx_per_beam[i]))), replace = False)
            for k in sampled_inst:
                test_rx_idx.append(k)
    X_sampled = X[test_rx_idx,:]
    Y_sampled = Y[test_rx_idx]

    if j < ntrain_datasets:
        if j==0:
            X_train = X_sampled
            Y_train = Y_sampled
        else:
            X_train = np.concatenate([X_train,X_sampled],axis = 0)
            Y_train = np.concatenate([Y_train,Y_sampled],axis = 0)
    elif j < ntrain_datasets + nval_datasets:
        if j==ntrain_datasets:
            X_val = X_sampled
            Y_val = Y_sampled
        else:
            X_val = np.concatenate([X_val,X_sampled],axis = 0)
            Y_val = np.concatenate([Y_val,Y_sampled],axis = 0)
    else:
        if j==ntrain_datasets + nval_datasets:
            X_test = X_sampled
            Y_test = Y_sampled
        else:
            X_test = np.concatenate([X_test,X_sampled],axis = 0)
            Y_test = np.concatenate([Y_test,Y_sampled],axis = 0)

encoder = LabelEncoder()
encoder.fit(np.concatenate((Y_train, Y_test, Y_val),0))
Y_train_onehot = np_utils.to_categorical(encoder.transform(Y_train))
Y_val_onehot = np_utils.to_categorical(encoder.transform(Y_val))
Y_test_onehot = np_utils.to_categorical(encoder.transform(Y_test))

In [None]:
K.clear_session()
input_location = Input(shape=(3,))
dense1 = Dense(6, activation ='sigmoid')(input_location)
dense2 = Dense(18, activation = 'sigmoid')(dense1)
dense3 = Dense(48, activation = 'sigmoid')(dense2)
output = Dense(Y_train_onehot.shape[1], activation = 'softmax')(dense3)
model = Model(inputs = input_location, outputs = output)
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(X_train, Y_train_onehot, validation_data = (X_val, Y_val_onehot), epochs = 20, batch_size = 100, verbose=1)