# Importing packages

## Setup

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import open3d
import pptk
from logging import raiseExceptions

## Data loader

In [None]:
class DataLoader:
    """Load one radar or lidar recording file and expose it as a table or a 3-D grid.

    Files are read from ``<dataset_directory>/<modality>/<filename>``.

    Attributes set by ``__init__``:
        dataset_directory: root folder of the recording.
        modality: ``'Radar'`` or ``'Lidar'``.
        data_type: ``'vector'`` or ``'matrix'`` (only consulted for radar).
        normalize: whether ``get_data`` rescales radar vectors.
        columns: names assigned to the raw file's columns, in file order.
        rename_columns_dict: raw column name -> output column name; its keys
            also select which raw columns are kept.
    """

    # Per-modality (raw column names, {kept raw column: output name}).
    _LAYOUTS = {
        'Radar': (
            ['Frame ID', 'Pocket ID', 'Y', '-X', 'Z', 'Range', 'Azimuth Angle', 'Elevation Angle', 'RCS', 'Velocity',
             'Range Index', 'Azimuth Index', 'Elevation Index', 'Velocity Index', 'Amplitude Index', 'Timestamp',
             'Temperature', '?'],
            {'Range Index': 'Range', 'Azimuth Index': 'Azimuth', 'Elevation Index': 'Elevation',
             'Amplitude Index': 'Amplitude', 'RCS': 'RCS', 'Velocity Index': 'Velocity'},
        ),
        'Lidar': (
            ['X', 'Y', 'Z', '?a', '?b', '?c'],
            {'X': 'X', 'Y': 'Y', 'Z': 'Z', '?a': '?a', '?b': '?b', '?c': '?c'},
        ),
    }

    # Default grid size used by conversion_to_3D (axis name -> number of cells).
    _DEFAULT_MATRIX_DIMENSIONS = {'Range': 600, 'Azimuth': 180, 'Elevation': 20}

    # Default divisors used by normalizing_to_0_1 (column name -> scale).
    _DEFAULT_NORMALIZATION = {'Range': 1000, 'Azimuth': 180, 'Elevation': 20, 'RCS': 100, 'Velocity': 200,
                              'Amplitude': 100000}

    # Added to Elevation so that the documented [-10, 10] range becomes a non-negative grid index.
    _ELEVATION_OFFSET = 10

    def __init__(self, modality='Radar', data_type='vector', normalize=True, dataset_directory='/home/artin/Documents/10dB_CR_10deg_in_2021-10-15-14-15-49'):
        """Store the configuration and pick the column layout for ``modality``.

        Raises:
            ValueError: if ``modality`` is neither ``'Radar'`` nor ``'Lidar'``
                (previously this surfaced later as an ``AttributeError``).
        """
        if modality not in self._LAYOUTS:
            raise ValueError('modality can be either "Radar" or "Lidar"')

        self.dataset_directory = dataset_directory
        self.modality = modality
        self.data_type = data_type
        self.normalize = normalize

        raw_columns, renames = self._LAYOUTS[modality]
        # Per-instance copies so that editing one loader never affects another.
        self.columns = list(raw_columns)
        self.rename_columns_dict = dict(renames)

    def get_dataframe(self, dir: str):
        """Read the file at ``dir`` into ``self.dataframe_original`` and ``self.dataframe``.

        ``self.dataframe_original`` keeps every raw column (named from
        ``self.columns``); ``self.dataframe`` keeps only the columns listed in
        ``self.rename_columns_dict``, renamed to their output names.

        Args:
            dir: path (or file-like object) accepted by ``pandas.read_csv``.
                The name shadows the builtin but is kept because callers pass
                it by keyword.
        """
        # NOTE(review): read_csv treats the first line as a header row, so that line is
        # not part of the data — confirm the recording files really start with a header.
        self.dataframe_original = pd.read_csv(dir)
        self.dataframe_original.columns = self.columns

        kept = list(self.rename_columns_dict)
        # Selecting then renaming yields a fresh frame, independent of the raw one.
        self.dataframe = self.dataframe_original[kept].rename(columns=self.rename_columns_dict)

    def conversion_to_3D(self, matrix_dimentions=None, matrix_value='Amplitude'):
        """Rasterize ``self.dataframe`` into a dense 3-D array stored in ``self.data_3d``.

        Range is assumed to be in the range [0, 600]
        Elevation Angle is in the range [-10, 10]
        Azimuth angle is in the range [0, 180]

        Args:
            matrix_dimentions: ordered mapping of exactly three column names to
                grid sizes; defaults to Range=600, Azimuth=180, Elevation=20.
            matrix_value: either an integer/float, or a string that corresponds
                to a column name. Its value is used to fill the matrix. When
                several rows fall in the same cell, the last row wins.

        Returns:
            The filled array (also kept as ``self.data_3d``).

        Raises:
            IndexError: if a (shifted) coordinate is not smaller than its grid size.
        """
        if matrix_dimentions is None:
            matrix_dimentions = dict(self._DEFAULT_MATRIX_DIMENSIONS)

        # Creating an empty array to store the data
        self.data_3d = np.zeros(list(matrix_dimentions.values()))

        x, y, z = matrix_dimentions.keys()

        # Work on a copy: shifting self.dataframe in place would move Elevation by
        # another +10 on every call and corrupt later conversions/normalization.
        cells = self.dataframe[[x, y, z]].copy()
        if 'Elevation' in cells.columns:
            cells['Elevation'] = cells['Elevation'] + self._ELEVATION_OFFSET
        # Truncate towards zero after the shift, like int() on each value.
        cells = cells.astype(int)

        if isinstance(matrix_value, str):
            # Assignment order for repeated fancy indices is unspecified in NumPy,
            # so explicitly keep only the last row of every cell.
            keep = ~cells.duplicated(keep='last').to_numpy()
            cells = cells[keep]
            fill = self.dataframe[matrix_value].to_numpy()[keep]
        else:
            fill = matrix_value

        self.data_3d[cells[x].to_numpy(), cells[y].to_numpy(), cells[z].to_numpy()] = fill

        return self.data_3d

    def normalizing_to_0_1(self, normalization_values=None):
        """Divide columns of ``self.dataframe`` by fixed scales, in place.

        After scaling, Elevation and Velocity are shifted by +0.5.

        Args:
            normalization_values: column name -> divisor; defaults to
                Range=1000, Azimuth=180, Elevation=20, RCS=100, Velocity=200,
                Amplitude=100000.

        Returns:
            ``self.dataframe`` (the same, now rescaled, object).
        """
        if normalization_values is None:
            normalization_values = dict(self._DEFAULT_NORMALIZATION)

        for key, value in normalization_values.items():
            self.dataframe[key] = self.dataframe[key] / value

        self.dataframe['Elevation'] = self.dataframe['Elevation'] + 0.5
        self.dataframe['Velocity'] = self.dataframe['Velocity'] + 0.5

        return self.dataframe

    def get_data(self, filename='1_.txt'):
        """Load ``filename`` for the configured modality and return it in the configured form.

        For radar, ``self.data_type`` can be 'vector' (a DataFrame, normalized
        when ``self.normalize`` is true) or 'matrix' (a 600x180x20 occupancy
        array filled with 1). For lidar the renamed DataFrame is returned.

        Raises:
            ValueError: if ``data_type`` or ``modality`` is not supported.
        """
        path = f'{self.dataset_directory}/{self.modality}/{filename}'

        self.get_dataframe(dir=path)

        if self.modality == 'Radar':

            if self.data_type == 'vector':
                return self.normalizing_to_0_1() if self.normalize else self.dataframe

            if self.data_type == 'matrix':
                return self.conversion_to_3D(matrix_dimentions={'Range': 600, 'Azimuth': 180, 'Elevation': 20}, matrix_value=1)

            raise ValueError('data_type can be either "vector" or "matrix"')

        if self.modality == 'Lidar':
            return self.dataframe

        # Only reachable if self.modality was changed after construction.
        raise ValueError('modality can be either "Radar" or "Lidar"')

    @staticmethod
    def appendSpherical_np(xyz):
        """Append spherical coordinates to an array of Cartesian points.

        Args:
            xyz: array-like whose first three columns are x, y, z.
                The three appended columns require it to have exactly three
                columns (the padding has the same shape as the input).

        Returns:
            Array with columns ``x, y, z, r, elevation, azimuth`` where
            elevation is measured from the XY-plane up and azimuth is
            ``arctan2(y, x)``; both angles are in radians.
        """
        xyz = np.asarray(xyz)
        ptsnew = np.hstack((xyz, np.zeros(xyz.shape)))

        xy = xyz[:, 0]**2 + xyz[:, 1]**2

        ptsnew[:, 3] = np.sqrt(xy + xyz[:, 2]**2)

        # for elevation angle defined from XY-plane up
        # (from the Z-axis down it would be arctan2(sqrt(xy), z) instead)
        ptsnew[:, 4] = np.arctan2(xyz[:, 2], np.sqrt(xy))

        ptsnew[:, 5] = np.arctan2(xyz[:, 1], xyz[:, 0])

        return ptsnew

    @staticmethod
    def visualize(points=None, method='open3d'):
        """Open an interactive 3-D viewer for ``points``.

        Args:
            points: point coordinates handed to the chosen viewer.
            method: ``'open3d'`` or ``'pptk'``.

        Raises:
            ValueError: if ``method`` names neither viewer.
        """
        if method == 'open3d':

            pcd = open3d.geometry.PointCloud()
            pcd.points = open3d.utility.Vector3dVector(points)
            open3d.visualization.draw_geometries([pcd])

        elif method == 'pptk':

            pptk.viewer(points)

        else:
            # logging.raiseExceptions is a bool flag, not a function; raise properly.
            raise ValueError('method should be either "open3d" or "pptk"')

In [None]:
# Load radar frame number `i` as a normalized vector table (normalize defaults to True)
# and display it as the cell output.
i = 6
dataloader = DataLoader(modality='Radar', data_type='vector')
data = dataloader.get_data(
    filename=f'{i}_.txt',
)

data

## Build the autoencoder

We are going to use the Functional API to build our convolutional autoencoder.

## <span style="color:orange; font-size:0.8em"> Vector Input </span>

In [None]:
class Optimization_Vector_Input(DataLoader):
    """1-D convolutional autoencoder over the normalized radar vector table.

    On construction the loader reads ``1_.txt``, splits its rows into
    ``train_data`` / ``valid_data`` / ``test_data`` and builds ``self.model``.

    NOTE(review): ``layers`` and ``Model`` are used below but are not imported
    anywhere in the visible file; presumably they are ``tensorflow.keras.layers``
    and ``tensorflow.keras.Model`` — confirm and import them before running.
    """

    def __init__(self, dataset_directory='/home/artin/Documents/10dB_CR_10deg_in_2021-10-15-14-15-49', modality='Radar'):
        """Load frame ``1_.txt``, split it 3:1:1 and build the model."""
        super().__init__(data_type='vector', modality=modality, dataset_directory=dataset_directory, normalize=True)

        self.data = self.get_data(filename='1_.txt')

        self._separate_train_valid_test(data=self.data, train_valid_test_ratio=[3, 1, 1])

        # One channel per feature column: input is (n_columns, 1).
        self.model = self.architecture(input_shape=(self.data.shape[1], 1))

    def _separate_train_valid_test(self, data, train_valid_test_ratio=(0.6, 0.2, 0.2)):
        """Split ``data`` into disjoint train / validation / test frames.

        Args:
            data: DataFrame to split; it is not modified. Rows are told apart
                by index label, so the index is expected to be unique.
            train_valid_test_ratio: relative sizes of the three parts, in the
                order train, valid, test; they need not sum to 1.

        Sets ``self.train_data``, ``self.valid_data`` and ``self.test_data``.
        Sampling uses a fixed ``random_state`` so the split is reproducible.
        """
        total = sum(train_valid_test_ratio)
        train_share, valid_share = train_valid_test_ratio[0], train_valid_test_ratio[1]

        self.train_data = data.sample(frac=train_share / total, random_state=42)
        # DataFrame.drop returns a new frame; keep it so later parts cannot reuse train rows.
        remaining = data.drop(self.train_data.index)

        # The validation share must be expressed relative to what is left after training rows.
        left_over = total - train_share
        valid_frac = valid_share / left_over if left_over else 0.0

        self.valid_data = remaining.sample(frac=valid_frac, random_state=42)
        self.test_data = remaining.drop(self.valid_data.index)

    def architecture(self, input_shape: tuple):
        """Build and compile the convolutional autoencoder.

        Args:
            input_shape: shape of one sample, ``(n_features, 1)``.

        Returns:
            The compiled model (adam optimizer, binary cross-entropy loss).
        """
        inputs = layers.Input(shape=input_shape)

        # Encoder: the final stride-2 convolution halves the sequence length.
        x = layers.Conv1D(filters=256, kernel_size=3, strides=1, padding='same', activation='relu')(inputs)
        x = layers.Conv1D(filters=512, kernel_size=3, strides=1, padding='same', activation='relu')(x)
        x = layers.Conv1D(filters=512, kernel_size=3, strides=2, padding='same', activation='relu')(x)

        # Decoder: the first stride-2 transposed convolution doubles it again.
        x = layers.Conv1DTranspose(filters=256, strides=2, kernel_size=3, padding='same', activation='relu')(x)
        x = layers.Conv1DTranspose(filters=256, strides=1, kernel_size=3, padding='same', activation='relu')(x)
        x = layers.Conv1DTranspose(filters=1,   strides=1, kernel_size=3, padding='same', activation='sigmoid')(x)

        # Autoencoder
        model = Model(inputs, x)
        model.compile(optimizer="adam", loss="binary_crossentropy")

        return model

    def fit(self, epochs=5, batch_size=32):
        """Train the model to reproduce ``train_data``, validating on ``valid_data``."""
        self.model.fit(x=self.train_data, y=self.train_data, epochs=epochs, batch_size=batch_size, validation_data=(self.valid_data, self.valid_data))

    def predict(self, data):
        """Reconstruct ``data`` and return it as a DataFrame with the same columns and index."""
        predictions = self.model.predict(data)
        # Drop the trailing channel axis of the model output.
        predictions = pd.DataFrame(predictions[:, :, 0], columns=data.columns, index=data.index)
        return predictions
        
             
# Build the vector-input autoencoder, train it, then reconstruct its `test_data`
# and show the reconstruction as the cell output.
autoencoder_vectorized_input = Optimization_Vector_Input(
    modality='Radar',
    dataset_directory='/home/artin/Documents/10dB_CR_10deg_in_2021-10-15-14-15-49',
)
autoencoder_vectorized_input.fit(batch_size=32, epochs=5)
predictions = autoencoder_vectorized_input.predict(
    data=autoencoder_vectorized_input.test_data,
)
predictions

## <span style="color:orange; font-size:0.8em"> Matrix Input </span>

In [None]:
class Optimization_Matrix_Input(DataLoader):
    """Autoencoder wrapper that loads radar frame ``1_.txt`` in 'matrix' form.

    NOTE(review): ``self.architecture`` is called below but is not defined in
    this class nor in the visible ``DataLoader``; construction will fail until
    a model-building method is provided — confirm the intended architecture.
    """

    def __init__(self, dataset_directory='/home/artin/Documents/10dB_CR_10deg_in_2021-10-15-14-15-49', modality='Radar'):
        """Configure the loader for matrix output, load the frame and build the model."""
        super().__init__(
            modality=modality,
            data_type='matrix',
            normalize=True,
            dataset_directory=dataset_directory,
        )

        self.data = self.get_data(filename='1_.txt')

        # No train/valid/test split is performed for the matrix input.

        second_axis = self.data.shape[1]
        self.model = self.architecture(input_shape=(second_axis, 1))

# Visualization

Now we can train our autoencoder using `train_data` as both our input data
and target. Notice we are setting up the validation data using the same
format.

Let's predict on our test dataset and plot each original radar feature together with
its reconstruction from our autoencoder.

Notice how the predictions are pretty close to the original values, although
not quite the same.

In [None]:
import seaborn as sns

# `predictions` was computed from the autoencoder's own `test_data`, so compare against
# that frame; a bare `test_data` name is never defined at module level.
# reset_index() exposes the row labels as an 'index' column used for the x axis.
test_data2 = autoencoder_vectorized_input.test_data.reset_index()
predictions2 = predictions.reset_index()

sns.set()
# Skip the first column ('index'); draw one figure per feature column.
for column in test_data2.columns[1:]:

    plt.figure(figsize=(12, 6))
    sns.scatterplot(x='index', y=column, data=test_data2, label='test_data')
    sns.scatterplot(x='index', y=column, data=predictions2, label='predictions')
    plt.legend()
    plt.xlabel('index')
    plt.show()
    



In [None]:
# Box plot of the per-column differences between the test rows and their reconstructions.
residuals = test_data2 - predictions2
residuals[data.columns].plot(kind='box', figsize=(12, 6))
plt.show()

Now that we know that our autoencoder works, let's retrain it using the noisy
data as our input and the clean data as our target. We want our autoencoder to
learn how to denoise the radar data.