In [1]:
import numpy as np
import cv2, os
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
from scipy.signal import find_peaks

In [2]:
# Load the two labelled captures used to build the training features.
rawData1 = np.load('data_12_14_2023_16_14_27.npy')
rawData2 = np.load('data_12_14_2023_16_16_41.npy')

# From every frame keep only the "adcSamples" entries from column 128 onwards.
# NOTE(review): the meaning of the first 128 columns is not visible here — confirm.
data1, data2 = (
    np.array([frame["adcSamples"][:, 128:] for frame in capture])
    for capture in (rawData1, rawData2)
)

In [29]:
def l1_norm(matrix):
    """Scale ``matrix`` so that its entries sum to 1.

    The divisor is the plain sum of all entries (not the sum of absolute
    values), so this is a true L1 normalisation only when every entry is
    non-negative. A matrix whose entries sum to zero is not guarded against.
    """
    return np.divide(matrix, np.sum(matrix))

def get_highest_n_idxs(idxs, heights, n):
    """Return the positions of the ``n`` tallest peaks, tallest first.

    Parameters
    ----------
    idxs : numpy array of peak positions (first return value of ``find_peaks``).
    heights : properties dict whose ``'peak_heights'`` entry holds one height
        per entry of ``idxs`` (second return value of ``find_peaks`` when it
        is called with ``height=...``).
    n : maximum number of peaks to keep.

    Returns
    -------
    The entries of ``idxs`` belonging to the ``n`` largest heights; all of
    them (reordered by height) when there are fewer than ``n`` peaks.
    """
    # argsort is ascending, so it must be reversed before truncating;
    # otherwise the n *smallest* peaks would be kept.
    tallest_first = np.argsort(heights['peak_heights'])[::-1]
    return idxs[tallest_first][:n]

def avg_dist(array):
    """Mean absolute gap between consecutive elements of ``array``.

    Elements are compared in the order given (no sorting is done here).
    A single-element input returns that element itself rather than a gap,
    and an empty input evaluates to ``0 / -1`` (i.e. ``-0.0``).
    """
    count = len(array)
    if count == 1:
        return array[0]
    gaps = (abs(left - right) for left, right in zip(array, array[1:]))
    return sum(gaps) / (count - 1)

def get_avg_peak_idxs_dist(peaks_list):
    """Average of ``avg_dist`` taken over every entry of ``peaks_list``.

    ``peaks_list`` is a sequence of index collections (one per scanned line).
    An empty ``peaks_list`` raises ``ZeroDivisionError``.
    """
    summed = sum(avg_dist(peaks) for peaks in peaks_list)
    return summed / len(peaks_list)

def get_num_values(list_of_lists):
    """Count the elements across all inner collections of ``list_of_lists``."""
    return sum(len(inner) for inner in list_of_lists)

num_peaks = 10  # maximum number of peak indices kept per scanned row/column

def _peak_idxs_per_line(lines, threshold):
    """For each 1-D line, find peaks above ``threshold`` and keep at most
    ``num_peaks`` of their indices (as selected by ``get_highest_n_idxs``)."""
    peak_idxs_list = []
    for line in lines:
        peak_idxs, peak_props = find_peaks(line, height=threshold)
        peak_idxs_list.append(get_highest_n_idxs(peak_idxs, peak_props, num_peaks))
    return peak_idxs_list

def get_features(matrix):
    """Build the 3-element feature vector for one 2-D matrix.

    The matrix is normalised with ``l1_norm``; peaks are then searched along
    every row ("horizontal") and every column ("vertical"), counting only
    samples that exceed ``mean + std`` of the whole normalised matrix.

    Parameters
    ----------
    matrix : 2-D numpy array.

    Returns
    -------
    list of three values:
        0. average peak-index distance over the rows,
        1. average peak-index distance over the columns,
        2. total number of retained row peaks ("stringiness").
    """
    norm_matrix = l1_norm(matrix)
    # One global threshold for both scan directions; computed once because it
    # does not depend on the row/column being scanned.
    threshold = np.mean(norm_matrix) + np.std(norm_matrix)

    horiz_peak_idxs_list = _peak_idxs_per_line(norm_matrix, threshold)    # rows
    vert_peak_idxs_list = _peak_idxs_per_line(norm_matrix.T, threshold)   # columns

    features = [
        get_avg_peak_idxs_dist(horiz_peak_idxs_list),  # average horizontal peak idx distance
        get_avg_peak_idxs_dist(vert_peak_idxs_list),   # average vertical peak idx distance
        get_num_values(horiz_peak_idxs_list)           # stringiness
    ]
    return features

def divide_data_matrix(whole_matrix, bit_chirp_size):
    """Split ``whole_matrix`` along its first axis into per-bit sub-matrices.

    The number of sub-matrices is ``len(whole_matrix) // bit_chirp_size``.
    Splitting is done with ``np.array_split``, so when the row count is not an
    exact multiple of ``bit_chirp_size`` the leftover rows are spread over the
    leading sub-matrices (they get one extra row each) instead of being dropped.

    Parameters
    ----------
    whole_matrix : array whose first axis is to be partitioned.
    bit_chirp_size : nominal number of rows per sub-matrix; must be positive.

    Returns
    -------
    list of numpy arrays, in original row order.

    Raises
    ------
    ValueError
        If ``bit_chirp_size`` is not positive, or ``whole_matrix`` has fewer
        than ``bit_chirp_size`` rows (no complete sub-matrix can be formed).
    """
    if bit_chirp_size <= 0:
        raise ValueError(f"bit_chirp_size must be positive, got {bit_chirp_size}")
    num_rows = len(whole_matrix)
    num_bits = num_rows // bit_chirp_size
    if num_bits == 0:
        # np.array_split would fail here with an unrelated-looking message.
        raise ValueError(
            f"need at least {bit_chirp_size} rows to form one bit matrix, got {num_rows}"
        )
    return np.array_split(whole_matrix, num_bits)

# Keep only index 0 along the second axis of each capture.
# NOTE(review): presumably this selects a single receive channel — confirm.
data1_f = data1[:, 0, :]
data2_f = data2[:, 0, :]

# Each entry is [feature_vector, label]; label 1 = "Forward and Back",
# label 0 = "Side to Side".
labelled_data = []

print("Forward and Back")
bit_matrices = divide_data_matrix(data1_f, 20)
for bit_matrix in bit_matrices:
    # FFT magnitude along axis 1, keeping bins 15..63 only.
    fft1_range = np.abs(np.fft.fft(bit_matrix, axis=1))[:, 15:64]
    # Compute the features once and reuse them for both storing and printing.
    bit_features = get_features(fft1_range)
    labelled_data.append([bit_features, 1])
    print(bit_features)

print("Side to Side")
bit_matrices = divide_data_matrix(data2_f, 20)
for bit_matrix in bit_matrices:
    fft2_range = np.abs(np.fft.fft(bit_matrix, axis=1))[:, 15:64]
    bit_features = get_features(fft2_range)
    labelled_data.append([bit_features, 0])
    print(bit_features)

# fft1_range / fft2_range intentionally stay defined after the loops: they hold
# the last window of each capture and are what the plotting code displays.

# Side-by-side view of the last processed window of each capture.
fig, ax = plt.subplots(1, 2, figsize=(20, 40))
panels = (
    (ax[0], fft1_range, "Forward and Back"),
    (ax[1], fft2_range, "Side to Side"),
)
for panel_ax, range_profile, panel_title in panels:
    panel_ax.imshow(l1_norm(range_profile), cmap='inferno', aspect='auto')
    panel_ax.set_title(panel_title)
    panel_ax.set_ylabel("Chirps")
    panel_ax.set_xlabel("Range Bin")
    # Overrides the aspect='auto' passed to imshow above.
    panel_ax.set_aspect('equal', adjustable='box')

**Writing Train Files**

In [4]:
# X, y  = [], []
# for i in range(len(labelled_data)):
#     X.append(labelled_data[i][0])
#     y.append(labelled_data[i][1])


# with open('test_labels.txt', 'w') as f:
#     for i in y:
#         f.write(f'{i}\n')
# with open('test_features.txt', 'w') as f:
#     for i in X:
#         f.write(str(i) + '\n')

**Reading Train Files**

In [5]:
import ast

# Read both files, ignoring blank lines (e.g. a trailing empty line), which
# would otherwise make ast.literal_eval fail on an empty string.
with open('train_labels.txt', 'r') as f:
    label_lines = [line.strip() for line in f if line.strip()]

with open('features.txt', 'r') as f:
    feature_lines = [line.strip() for line in f if line.strip()]

# Every feature vector needs exactly one label; fail here with a clear message
# instead of leaving unparsed entries behind or failing later during training.
if len(label_lines) != len(feature_lines):
    raise ValueError(
        f"train_labels.txt has {len(label_lines)} labels but "
        f"features.txt has {len(feature_lines)} feature rows"
    )

# Each feature line is a Python list literal; literal_eval parses literals
# only and never executes code.
X = [np.array(ast.literal_eval(line)) for line in feature_lines]
# Each label line holds one integer class label.
y = [int(line) for line in label_lines]

**Training SVM**

In [27]:
# Stack the parsed feature vectors and labels into arrays for scikit-learn.
X_train, y_train = np.array(X), np.array(y)

# Linear-kernel support vector classifier fitted on the full training set.
clf = SVC(kernel='linear')
clf.fit(X_train, y_train)

**Helper Functions**

In [26]:
def split_string_by_length(input_string, chunk_length):
    """Cut ``input_string`` into consecutive pieces of ``chunk_length`` characters.

    The final piece is shorter when the length is not an exact multiple of
    ``chunk_length``; an empty string yields an empty list.
    """
    chunks = []
    for start in range(0, len(input_string), chunk_length):
        chunks.append(input_string[start:start + chunk_length])
    return chunks

def binary_to_string(bits):
    """Decode a string of '0'/'1' characters into text, 8 bits per character.

    Each 8-character group is read as a base-2 integer and converted with
    ``chr``. A trailing group shorter than 8 bits is decoded the same way.
    """
    decoded_chars = []
    for byte_bits in split_string_by_length(bits, 8):
        decoded_chars.append(chr(int(byte_bits, 2)))
    return ''.join(decoded_chars)

# Quick check: these 40 bits are the 8-bit codes for 'hello'.
values = '0110100001100101011011000110110001101111'
binary_to_string(values)

In [8]:
def print_summary(features, predictions, real):
    """Print one line with the features, the first prediction and the true value.

    Only ``predictions[0]`` is shown, so ``predictions`` is expected to be an
    indexable result such as the output of a single-sample ``predict`` call.
    """
    first_prediction = predictions[0]
    print(f'Features: {features}, Prediction: {first_prediction}, Real: {real}')
def show_accuracy(predicted, real):
    """Print the accuracy between two label sequences as a percentage.

    The two sequences are handed to ``accuracy_score`` in the order
    ``(predicted, real)``; the printed value is that score times 100,
    without rounding.
    """
    accuracy_pct = accuracy_score(predicted, real) * 100
    print(f'Accuracy: {accuracy_pct}%')

**Test Set Evaluation**

In [22]:
# Read the test labels (one per line) and join them into a single string of
# label characters by dropping every newline.
with open('test_labels.txt', 'r') as labels_file:
    raw_test_labels = labels_file.read()
test_values = raw_test_labels.replace('\n', '')

In [23]:
# Read the test feature rows, ignoring blank lines (e.g. a trailing empty
# line), which would otherwise make ast.literal_eval fail on an empty string.
with open('test_features.txt', 'r') as f:
    feature_lines = [line.strip() for line in f if line.strip()]

# Each line is a Python list literal; literal_eval parses literals only and
# never executes code. The result is one array with a row per feature vector.
features = np.array([np.array(ast.literal_eval(line)) for line in feature_lines])

In [25]:
# Classify every test feature row, then compare against the true labels.
y_pred = clf.predict(features)

# test_values is a string of label characters; turn it into an integer array.
test_list = np.array([int(label_char) for label_char in test_values])
show_accuracy(y_pred, test_list)

**'hello' string test**

In [12]:
# Capture to be decoded; processed with the same column slice as the training data.
rawTestData = np.load('data_12_14_2023_17_36_04.npy')
test_frames = [frame["adcSamples"][:, 128:] for frame in rawTestData]
testData = np.array(test_frames)

In [20]:
# Keep only index 0 along the second axis, as done for the training captures.
testData_f = testData[:, 0, :]
bit_matrices = divide_data_matrix(testData_f, 20)
bin_values = '0110100001100101011011000110110001101111' # bits for 'hello', change for other target string

# The capture must contain exactly one window per expected bit. Check up front:
# otherwise the loop fails part-way with an IndexError (too many windows) or
# the accuracy computation fails afterwards (too few windows).
if len(bit_matrices) != len(bin_values):
    raise ValueError(
        f"capture yields {len(bit_matrices)} bit windows but "
        f"{len(bin_values)} bits are expected"
    )

final = []
for expected_bit, bit_matrix in zip(bin_values, bit_matrices):
    # Same processing as for the training windows: FFT magnitude along
    # axis 1, keeping bins 15..63. Loop-local names are distinct from
    # `features`, `y_pred` and `fft1_range`, which other cells rely on.
    window_range = np.abs(np.fft.fft(bit_matrix, axis=1))[:, 15:64]
    window_features = get_features(window_range)
    window_pred = clf.predict([window_features])
    final.append(window_pred[0])
    print_summary(window_features, window_pred, expected_bit)
print('\n')

# Compare the predicted bits with the expected ones, then decode the message.
bin_list = np.array([int(bit) for bit in bin_values])
final = np.array(final)
show_accuracy(bin_list, final)
final_message = binary_to_string(''.join(str(i) for i in final))
print(f'Decoded Message: {final_message}')

**Raspberry Pi – UDP Client**

In [19]:
import socket

ip_addr = "172.16.99.107"    # IP address of Raspberry Pi 17, run udp_server.py on Pi
port_num = 5000

# Send the decoded message as one UTF-8 encoded UDP datagram. The `with`
# block closes the socket even if sendto raises, so no descriptor is leaked
# when this cell is re-run.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:  # UDP
    bytes_sent = sock.sendto(bytes(final_message, "utf-8"), (ip_addr, port_num))
bytes_sent  # number of bytes handed to the network stack

**Miscellaneous**

In [15]:
# import torch
# import torch.nn as nn
# import torch.nn.functional as F
# import torch.optim as optim
# from torch.utils.data import TensorDataset, DataLoader
# from tqdm import tqdm

# class Model(nn.Module):
#     def __init__(self, input_size, output_size):
#         super(Model, self).__init__()
#         #self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
#         self.fc = nn.Linear(input_size, output_size)
#         self.sigmoid = nn.Sigmoid()
    
#     def forward(self, x):
#         x = self.fc(x)
#         x = self.sigmoid(x)
#         return x.squeeze()

# input_size = 3  # Number of features
# output_size = 1 

# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.15)

# model = Model(input_size, output_size)
# criterion = nn.BCELoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001)

# train_set = TensorDataset(torch.Tensor(X_train), torch.Tensor(y_train))
# train_loader = DataLoader(train_set, batch_size=40, shuffle=True)

# num_epochs = 20

# for epoch in tqdm(range(num_epochs)):
#     model.train()
#     for features, labels in train_loader:
#         outputs = model(features)
#         print(outputs)
#         print(labels)
#         loss = criterion(outputs, labels)

#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()

In [16]:
# model.eval()
# test_loss = 0
# test_correct = 0
# test_total = 0
# threshold = 0.5
# X_test = torch.Tensor(X_test)
# y_test = torch.Tensor(y_test)

# model.eval()
# with torch.no_grad():
#     test_outputs = model(X_test)

# # Discretize the output using a threshold (e.g., 0.5)
# threshold = 0.5
# predicted_labels = (test_outputs > threshold).float()


# # Calculate accuracy on the test set
# accuracy = torch.sum(predicted_labels == y_test) / len(y_test)
# print(f"Test Accuracy: {accuracy * 100:.2f}%")
# print(f'Predicted: {predicted_labels}')
# print(f'Real:      {y_test}')