## Load the data

In [None]:
import numpy as np
import matplotlib.pylab as plt
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

from keras.models import Sequential
from keras import Model
from keras.layers import Dense, Input, Conv2D, Flatten, MaxPooling2D, UpSampling2D
from keras.regularizers import l2

from xgboost import XGBClassifier

import shap

In [None]:
# Location of the serialized dataset; empty here and must be set before running.
path_to_dataset = ''
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary Python objects,
# so only point this at a file from a trusted source.
dataset = np.load(path_to_dataset, allow_pickle=True)

In [None]:
# Class label of every record, in dataset order.
labels = [record['class_name'] for record in dataset]
# Distinct class names; the ordering of a set is arbitrary.
unique_classes = [*set(labels)]
unique_classes

In [None]:
from collections import Counter

# Tally every class in a single pass over `labels` instead of rescanning the
# whole list once per class, and print in sorted order so the output is stable
# between runs.
label_counts = Counter(labels)
for label in sorted(label_counts):
    print(f'Num of {label} in dataset: {label_counts[label]}')

## Prepare radar spectra

In [None]:
# Gather the shape of every signature and split it into its first two axes.
signatures_shape = [record['signature'].shape for record in dataset]
first_dim, second_dim = [], []
for shape in signatures_shape:
    first_dim.append(shape[0])
    second_dim.append(shape[1])

# Only one second-dimension value is reported (an arbitrary one if sizes differ).
print(f'Second dimension size: {list(set(second_dim))[0]}')
print(f'First dimension size: {min(first_dim)} - {max(first_dim)}')

In [None]:

# Median size of the signatures along their first dimension.
np.median(first_dim)

In [None]:
# Build the model inputs: one log-magnitude spectrum per kept recording, plus
# the matching class label and the recording's index in `dataset`.
signatures = []
labels_for_training = []
ids = []

for idx, record in enumerate(dataset):
    spectrum = record['signature']
    # Recordings with fewer than 252 rows cannot supply a full window.
    if spectrum.shape[0] < 252:
        continue

    # Require more than 500 SNR samples, relaxed to more than 100 for "uav".
    n_snr = len(record['snr_db'])
    is_uav = record['class_name'] == "uav"
    if not (n_snr > 500 or (is_uav and n_snr > 100)):
        continue

    # First 252 rows, columns 470..545 (76 columns), converted to dB.
    # NOTE(review): log10 of an exactly-zero magnitude yields -inf — confirm
    # the signatures never contain zeros.
    log_magnitude = np.log10(np.abs(spectrum)[:252, 470:546])
    # Transpose to (76, 252) and append a trailing channel axis -> (76, 252, 1).
    signatures.append(20 * log_magnitude.transpose().reshape(-1, 252, 1))
    labels_for_training.append(record['class_name'])
    ids.append(idx)

In [None]:
# Convert the list of per-recording spectra into a single ndarray.
signatures = np.array(signatures)

In [None]:
# Display the last prepared spectrum. Each entry carries a trailing singleton
# channel axis (it was reshaped to (-1, 252, 1)); drop it explicitly so imshow
# receives a plain 2-D array, which is one of its documented input shapes.
plt.imshow(signatures[-1][:, :, 0])

In [None]:
# Shape of a single prepared spectrum.
signatures[-1].shape

In [None]:
# Per-recording series that are summarised into scalar features.
params = ['azimuth', 'velocity', 'snr_db']

parameters = []

# Visit only the recordings kept during spectrum preparation. Indexing through
# `ids` keeps the same (ascending) order as before while avoiding a linear
# list-membership test for every record in the dataset.
for i in ids:
    data = dataset[i]
    # Copy before adding features so the dataset's own 'radar_parameters'
    # entry is not modified in place (re-running the cell stays harmless).
    # Assumes the entry is a mapping such as a dict — TODO confirm.
    data_params = dict(data['radar_parameters'])
    for param in params:
        values = data[param]
        data_params[f'{param}_mean'] = values.mean()
        data_params[f'{param}_min'] = values.min()
        data_params[f'{param}_max'] = values.max()
        data_params[f'{param}_std'] = values.std()
    parameters.append(data_params)

In [None]:
# Radar-parameter features on the left, flattened spectra (76 * 252 values per
# recording) on the right. ignore_index=True renumbers the columns 0..N-1, so
# later cells address the two parts by column position.
parameter_frame = pd.DataFrame(parameters)
spectra_frame = pd.DataFrame(signatures.reshape(-1, 76 * 252))
merged_data = pd.concat([parameter_frame, spectra_frame], axis=1, ignore_index=True)

## Divide the dataset into train, validation and test sets

In [None]:
# 70 % of the recordings go to training; the remaining 30 % is held out ...
X_train, X_holdout, y_train, y_holdout = train_test_split(
    merged_data,
    labels_for_training,
    test_size=0.3,
    random_state=42,
)
# ... and then halved into validation and test sets.
X_val, X_test, y_val, y_test = train_test_split(
    X_holdout,
    y_holdout,
    test_size=0.5,
    random_state=42,
)

In [None]:
# Sizes of the training and test feature tables.
X_train.shape, X_test.shape

In [None]:
# Map class names to integer codes. The mapping is printed rather than assumed,
# because the codes need not follow the order in which the names are listed.
encoder_labels = LabelEncoder().fit(['person', 'bicycle', 'uav', 'vehicle'])
print("encoded labels")
print(
        f"Class 'person' is {encoder_labels.transform(['person'])[0]} and "
        f"class 'bicycle' is {encoder_labels.transform(['bicycle'])[0]} and "
        f"Class 'uav' is {encoder_labels.transform(['uav'])[0]} and "
        f"class 'vehicle' is {encoder_labels.transform(['vehicle'])[0]}")

# Integer codes for the whole filtered dataset and for each split.
labels_ = encoder_labels.transform(labels_for_training)
y_train_, y_test_, y_val_ = (
    encoder_labels.transform(split_labels)
    for split_labels in (y_train, y_test, y_val)
)

In [None]:
def _as_column(codes):
    """Return *codes* as a 2-D array with a single column."""
    return np.array(codes).reshape(-1, 1)


# One-hot encode the integer class codes. The encoder is fitted on the codes of
# the whole filtered dataset; from here on `encoder_labels` refers to this
# OneHotEncoder, no longer to the LabelEncoder.
encoder_labels = OneHotEncoder(handle_unknown='ignore').fit(_as_column(labels_))
y_train = encoder_labels.transform(_as_column(y_train_))
y_test = encoder_labels.transform(_as_column(y_test_))
y_val = encoder_labels.transform(_as_column(y_val_))

In [None]:
# Size of the test feature table.
X_test.shape

In [None]:

# Turn the encoder output into dense arrays via toarray(). Running this cell a
# second time fails, because the results no longer have a toarray() method.
y_train, y_test, y_val = (
    encoded_targets.toarray() for encoded_targets in (y_train, y_test, y_val)
)

## Train simple dense autoencoder

In [None]:

# Fully connected autoencoder over the flattened 76 x 252 spectra.
latent_dim = 32
activators = 'relu'

# `shape` has to be a tuple: `(76 * 252)` is just the integer 19152, whereas
# `(76 * 252,)` is the intended one-element shape.
input_img = Input(shape=(76 * 252,))

# Encoder: 19152 -> 1024 -> 256 -> 64 -> latent_dim.
encoded_dense = Dense(units=1024, activation=activators)(input_img)
encoded_dense = Dense(units=256, activation=activators)(encoded_dense)
encoded_dense = Dense(units=64, activation=activators)(encoded_dense)
encoded_dense = Dense(units=latent_dim, activation='sigmoid')(encoded_dense)

# Decoder: mirror image of the encoder.
decoded_dense = Dense(units=64, activation=activators)(encoded_dense)
decoded_dense = Dense(units=256, activation=activators)(decoded_dense)
decoded_dense = Dense(units=1024, activation=activators)(decoded_dense)
# NOTE(review): a sigmoid output confines reconstructions to (0, 1), while the
# spectra are built as 20*log10(|signature|) with no rescaling visible in this
# notebook — confirm the inputs are normalised before training.
decoded_dense = Dense(units=76 * 252, activation='sigmoid')(decoded_dense)

# Full autoencoder, plus the encoder half that shares its layers and is used to
# extract latent features.
autoencoder_dense = Model(input_img, decoded_dense)
encoder_dense = Model(input_img, encoded_dense)


In [None]:
autoencoder_dense.compile(optimizer='adam', loss='mean_squared_error', metrics=['mean_squared_error'])

# Columns 20 onwards of the merged table hold the flattened spectra.
train_spectra_dense = np.array(X_train[X_train.columns[20:]]).reshape(-1, 76 * 252)
val_spectra_dense = np.array(X_val[X_val.columns[20:]]).reshape(-1, 76 * 252)
test_spectra_dense = np.array(X_test[X_test.columns[20:]]).reshape(-1, 76 * 252)

# Monitor training on the validation split; the test split stays untouched
# until the final evaluation so that it remains an unbiased estimate.
autoencoder_dense.fit(train_spectra_dense, train_spectra_dense,
                    epochs=20,
                    batch_size=128,
                    shuffle=True,
                    validation_data=(val_spectra_dense, val_spectra_dense))

In [None]:
# Encode both splits with the encoder half of the dense autoencoder.
test_latent_dense = encoder_dense.predict(test_spectra_dense)
train_latent_dense = encoder_dense.predict(train_spectra_dense)

In [None]:
# Classifier inputs: the first 20 columns (radar-parameter features) followed
# by the 32 latent features of the dense autoencoder.
# `pd.np` is no longer available in current pandas, so NumPy (already imported
# as `np`) is called directly.
merged_train_data_dense = np.column_stack(
    [X_train[X_train.columns[:20]], train_latent_dense.reshape(-1, 32)])
merged_test_data_dense = np.column_stack(
    [X_test[X_test.columns[:20]], test_latent_dense.reshape(-1, 32)])

In [None]:
# Gradient-boosted classifier on radar parameters + dense latent features.
# `eval_metric` is configured on the estimator: passing it to fit() is
# deprecated and rejected by recent XGBoost releases.
# `use_label_encoder=False` is kept for older releases that still consult it;
# the targets are already integer-encoded.
model = XGBClassifier(use_label_encoder=False, eval_metric='mlogloss')
model.fit(merged_train_data_dense, y_train_)

In [None]:
# LabelEncoder numbers classes in sorted order of their names, so code 0 is
# 'bicycle' and code 1 is 'person'. The names handed to the report and to the
# confusion-matrix display must follow that same order, otherwise the
# 'person' and 'bicycle' rows are reported under each other's name.
class_codes = [0, 1, 2, 3]
class_names = sorted(['person', 'bicycle', 'uav', 'vehicle'])

predicted = model.predict(merged_test_data_dense)
# Pair predictions with the integer-encoded ground truth (y_test_), not the
# one-hot matrix (y_test), so both columns are directly comparable.
results_df = pd.DataFrame(zip(predicted, y_test_), columns=['predicted', 'true'])
# Explicit `labels` keeps the report aligned with `class_names` even if a class
# happens to be absent from the test split.
report_pp = classification_report(y_test_, predicted, labels=class_codes, target_names=class_names)
print(report_pp)

cm = confusion_matrix(y_test_, predicted, labels=class_codes)
cm_display = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
cm_display.plot()

In [None]:
# Compute SHAP values for the classifier's test-set inputs and plot a summary.
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(merged_test_data_dense)
# The inputs are a plain stacked array without column names: the first 20
# columns are the radar-parameter features, the remainder are latent features.
shap.summary_plot(shap_values, merged_test_data_dense, show=True)

## CNN autoencoder

In [None]:
# Convolutional autoencoder over single-channel 76 x 252 spectra.
input_img = Input(shape=(76, 252, 1))

# Encoder: three conv + 2x2 max-pool stages.
x = Conv2D(16, (3, 3), activation='relu', padding='same')(input_img)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
encoded = MaxPooling2D((2, 2), padding='same')(x)

# at this point the representation is (10, 32, 8) i.e. 2560-dimensional (76 -> 38 -> 19 -> 10 and 252 -> 126 -> 63 -> 32); this matches the 320*8 flattening used later

# Decoder: three conv + 2x upsampling stages.
x = Conv2D(8, (3, 3), activation='relu', padding='same')(encoded)
x = UpSampling2D((2, 2))(x)
x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
# No padding here: the unpadded 3x3 convolution trims 40 x 128 to 38 x 126 so
# that the final upsampling lands on 76 x 252 again.
x = Conv2D(16, (3, 3), activation='relu')(x)
x = UpSampling2D((2, 2))(x)
# NOTE(review): sigmoid output lies in (0, 1) while the spectra are dB values
# with no rescaling visible in this notebook — confirm inputs are normalised.
decoded = Conv2D(1, (3, 3), activation='sigmoid', padding='same')(x)

# Full autoencoder, plus the encoder half used to extract latent features.
autoencoder = Model(input_img, decoded)
encoder = Model(input_img, encoded)

In [None]:
autoencoder.compile(optimizer='adam', loss='mean_squared_error', metrics=['mean_squared_error'])

# Columns 20 onwards of the merged table hold the flattened spectra. Restore
# them to (76, 252, 1) — including the channel axis — so inputs and targets
# match the model's declared input and output shapes exactly instead of
# relying on implicit rank adjustment.
train_spectra = np.array(X_train[X_train.columns[20:]]).reshape(-1, 76, 252, 1)
val_spectra = np.array(X_val[X_val.columns[20:]]).reshape(-1, 76, 252, 1)
test_spectra = np.array(X_test[X_test.columns[20:]]).reshape(-1, 76, 252, 1)

# Monitor training on the validation split; the test split stays untouched
# until the final evaluation so that it remains an unbiased estimate.
autoencoder.fit(train_spectra, train_spectra,
                    epochs=20,
                    batch_size=128,
                    shuffle=True,
                    validation_data=(val_spectra, val_spectra))

In [None]:
# Encode both splits with the encoder half of the convolutional autoencoder.
test_latent = encoder.predict(test_spectra)
train_latent = encoder.predict(train_spectra)

In [None]:
# Shapes of the encoded test and training spectra.
test_latent.shape, train_latent.shape

In [None]:
# Classifier inputs: the first 20 columns (radar-parameter features) followed
# by the flattened CNN bottleneck. Flattening with -1 follows whatever
# bottleneck size the encoder produces instead of hard-coding 320*8.
# `pd.np` is no longer available in current pandas, so NumPy (already imported
# as `np`) is called directly.
merged_train_data = np.column_stack(
    [X_train[X_train.columns[:20]], train_latent.reshape(len(train_latent), -1)])
merged_test_data = np.column_stack(
    [X_test[X_test.columns[:20]], test_latent.reshape(len(test_latent), -1)])

In [None]:
# Sanity check: the row counts of the stacked test features, the test labels and X_test should agree.
merged_test_data.shape, len(y_test_), X_test.shape

In [None]:
# Sanity check: the row counts of the stacked training features, the training labels and X_train should agree.
merged_train_data.shape, len(y_train_), X_train.shape

In [None]:
# Gradient-boosted classifier on radar parameters + CNN latent features.
# `eval_metric` is configured on the estimator: passing it to fit() is
# deprecated and rejected by recent XGBoost releases.
# `use_label_encoder=False` is kept for older releases that still consult it;
# the targets are already integer-encoded.
model = XGBClassifier(use_label_encoder=False, eval_metric='mlogloss')
model.fit(merged_train_data, y_train_)

In [None]:
# LabelEncoder numbers classes in sorted order of their names, so code 0 is
# 'bicycle' and code 1 is 'person'. The names handed to the report and to the
# confusion-matrix display must follow that same order, otherwise the
# 'person' and 'bicycle' rows are reported under each other's name.
class_codes = [0, 1, 2, 3]
class_names = sorted(['person', 'bicycle', 'uav', 'vehicle'])

predicted = model.predict(merged_test_data)
# Pair predictions with the integer-encoded ground truth (y_test_), not the
# one-hot matrix (y_test), so both columns are directly comparable.
results_df = pd.DataFrame(zip(predicted, y_test_), columns=['predicted', 'true'])
# Explicit `labels` keeps the report aligned with `class_names` even if a class
# happens to be absent from the test split.
report_pp = classification_report(y_test_, predicted, labels=class_codes, target_names=class_names)
print(report_pp)

cm = confusion_matrix(y_test_, predicted, labels=class_codes)
cm_display = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
cm_display.plot()

In [None]:
# Compute SHAP values for the classifier's test-set inputs and plot a summary.
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(merged_test_data)
# The inputs are a plain stacked array without column names: the first 20
# columns are the radar-parameter features, the remainder are latent features.
shap.summary_plot(shap_values, merged_test_data, show=True)