In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelBinarizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import seaborn as sn
from matplotlib.colors import ListedColormap
import matplotlib.pyplot as plt
from mlxtend.plotting import plot_decision_regions

import pandas as pd
import numpy as np
import os
import glob


### *functions*

In [None]:
def get_label_from_filename(filename): 
    if "Phantom" in filename: return "Phantom"
    elif "Ruko_F11_Pro" in filename: return "Ruko_F11_Pro"
    elif "Ruko_F11_base" in filename: return "Ruko_F11_base"
    elif "Mavic_Air_2_" in filename: return "Mavic_Air_2"
    elif "Mavic_Air_2S" in filename: return "Mavic_Air_2S"
    elif "DeerC" in filename: return "DeerC_DE2"
    elif "Mini_SE" in filename: return "Mini_SE"
    elif "Holystone_HS100" in filename: return "Holystone_HS100"
    elif "None" in filename: return "None"
    else: return "UNKNOWN_LABEL"



def load_iq(f, chunk_size, every_n_rows):
    # The chunking in this algo is so that we can read in large files (156M lines or more). 
    # In order to reduce compution time, we skip n number of rows. 
    result = pd.DataFrame()

    for chunk in pd.read_csv(f, chunksize=chunk_size):
        result = pd.concat([result, chunk.iloc[::every_n_rows, :]], ignore_index=True)

    # print("THE DF:\n", result)
    result = result[['i', 'q']].to_numpy()
    return result


# metric's functions 
def checkForFilename(base_name): 
    ext = ".txt"
    i = 1
    filename = f"{base_name}{ext}"
    while os.path.exists(filename):
        filename = f"{base_name}_{i}{ext}"
        i += 1
    return filename

def saveMetricsToFile(base_name, perc_accuracy, model): 
    filename = checkForFilename(base_name)
    with open(filename, "w") as f:
        f.write(f"Model: {model}")
        f.write(f"Accuracy: {perc_accuracy:.2f}%\n\n")
        f.write("The important vars:\n")
        f.write(f"window_size: {window_size}\n")
        f.write(f"step: {step}\n")
        f.write(f"every n rows: {every_n_rows}\n")
        f.write(f"chunk size: {chunk_size}\n\n")
        f.write("Classification Report:\n")
        f.write(f"{cr}\n\n")
        f.write("Confusion Matrix:\n")
        f.write(f"{cm}\n")

### *variables*

In [None]:
data_dir = '/Volumes/DRIVE 128GB/iqSamples_Ruko_F11_Pro.csv'
drone_csv_files = glob.glob(os.path.join(data_dir, "*.csv"))
print(drone_csv_files)

chunk_size = 10_000 # how many rows to hold in memory at a time
every_n_rows = 1000

# Example: sample windows with 50% overlap
# window_size = 1024 # number of iq samples per chunk (window_size=1 is 1 sample, i,q)
window_size = 1024 # number of iq samples per chunk (window_size=1 is 1 sample, i,q)
window_size = 2048 # number of iq samples per chunk (window_size=1 is 1 sample, i,q)
window_size = 4096 # number of iq samples per chunk (window_size=1 is 1 sample, i,q)
window_size = 8192 # number of iq samples per chunk (window_size=1 is 1 sample, i,q)
# window_size = 16_384 # number of iq samples per chunk (window_size=1 is 1 sample, i,q)
# window_size = 32_768 # number of iq samples per chunk (window_size=1 is 1 sample, i,q)

# step = 8192
# step = 4096
# step = 2048
# step = 1024
step = 512

skipSVC = False
skipRF = True

## Loading Dataset

### *Reading in files*

In [None]:
X = []
y = []

segments = []
labels = []


# create an array of dataframes.
# each dataframe containing iq samples from one file
for file in drone_csv_files:
    print("File: ", file)
    label = get_label_from_filename(file)
    # print("Label: ", label)
    iq = load_iq(file, chunk_size, every_n_rows)  # shape (N, 2)
    # print("File:\n", iq)
    for i in range(0, len(iq)-window_size, step):
        window = iq[i:i+window_size].flatten()
        segments.append(window)
        labels.append(label)
    # print("Segments:\n", segments)
    # print("Labels:\n", labels)

X = np.array(segments)
y = np.array(labels)

### *Displaying things*

In [None]:
print("Segments:")
print("Shape: ", X.shape)
print(X)
print("-------------------------------------------------------------------")
print("Labels:")
print("Shape: ", y.shape)
print(y)

In [None]:
# print("labels: ")
# print(np.unique(y))
# print()

for sample, label in zip(X, np.unique(y)): 
    print("X feature:\n", sample)
    print("IQ samples in X feature: ", int(sample.size / 2))
    print("y label: ", label)
    print('------------------------------------------------------')

## Support Vector Classifier 

### *Training, fitting & predicting*

In [None]:
# Train/test split (e.g. 80/20)
 
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

# Standardize features (zero mean, unit variance)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [None]:
# Fit and predict

SVCModel = SVC(kernel='rbf', C=10, gamma='scale', max_iter=160_000)  # tweak C/gamma as needed
SVCModel.fit(X_train, y_train)
y_pred = SVCModel.predict(X_test)


### *Metrics*

In [None]:

accuracy = accuracy_score(y_test, y_pred)
perc_accuracy = accuracy * 100
print(f"Accuracy: {perc_accuracy:.2f}%")
print()
print("The important vars: ")
print("window_size: ", window_size)
print("step: ", step)
print("every n rows: ", every_n_rows)
print("chunk size: ", chunk_size)
print()

cr = classification_report(y_test, y_pred)
cm = confusion_matrix(y_test, y_pred)
print(cr)
print(cm)

saveMetricsToFile('svc_metrics', perc_accuracy, 'Support Vector Classifier')

In [None]:
# Create a heatmap of the confusion matrix

plt.figure(figsize=(7, 7))
sn.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=np.unique(labels), yticklabels=np.unique(labels))

# Labels and title
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Heatmap for SVC Predictions')

plt.show()
