In [1]:
import numpy as np
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.metrics import accuracy_score
from scipy.stats import uniform, randint


###PRE-PROCESSING

In [22]:
import zipfile
import os

zip_file_path = '/content/processed_files_separated (2).zip'  # Adjust this path as needed

extract_dir = '/content/processed_files_separated(2)'

os.makedirs(extract_dir, exist_ok=True)

# Extract the zip file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)


In [23]:

# Define the paths to the directories containing the radar data
class_dirs = {
    'no_human': '/content/processed_files_separated(2)/processed_files_separated/N',
    'human_approaching': '/content/processed_files_separated(2)/processed_files_separated/T',
    'human_going_away': '/content/processed_files_separated(2)/processed_files_separated/A'
}


In [24]:

chunk_size = 350

def process_data(file_path):
    with open(file_path, 'r') as file:
        data = []
        for line in file:
            try:
                data.append(float(line.strip()))
            except ValueError:
                continue
        data = np.array(data)

    # Truncate or pad the data to have exactly chunk_size elements
    if len(data) > chunk_size:
        data = data[-chunk_size:]
    elif len(data) < chunk_size:
        pad_size = chunk_size - len(data)
        data = np.concatenate([data, np.full(pad_size, data.mean())])

    return data


In [25]:
class_to_label = {class_name: i for i, class_name in enumerate(class_dirs.keys())}

In [26]:
data = []
labels = []

for class_name, dir_path in class_dirs.items():
    label = class_to_label[class_name]
    for file_name in os.listdir(dir_path):
        file_path = os.path.join(dir_path, file_name)
        processed_data = process_data(file_path)
        data.append(processed_data)
        labels.append(label)

data = np.array(data)
labels = np.array(labels)

# Normalize the data
data = (data - data.mean()) / data.std()


In [28]:

# Split data into training, validation, and test sets
train_data, temp_data, train_labels, temp_labels = train_test_split(data, labels, test_size=0.2, random_state=42)
val_data, test_data, val_labels, test_labels = train_test_split(temp_data, temp_labels, test_size=0.2, random_state=42)


In [29]:
# Create dataset class
class RadarDataset(Dataset):
    def __init__(self, data, labels):
        self.data = torch.tensor(data, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]


In [30]:

# Create dataset and dataloaders
train_dataset = RadarDataset(train_data, train_labels)
val_dataset = RadarDataset(val_data, val_labels)
test_dataset = RadarDataset(test_data, test_labels)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


###DL MODELS

In [31]:
######-----3
# Define the LSTM model with dropout and increased complexity
class LSTMClassifier(nn.Module):
    def __init__(self, input_size = 1, hidden_size = 128, num_layers = 3, num_classes = 3, dropout_prob=0.5):
        super(LSTMClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout_prob, bidirectional=True)
        self.fc1 = nn.Linear(hidden_size*2, 128)
        self.fc2 = nn.Linear(128, num_classes)
        #self.fc3 = nn.Linear(64, num_classes)

        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(x.device)  # *2 for bidirectional
        c0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(x.device)

        out, _ = self.lstm(x.unsqueeze(-1), (h0, c0))  # Adding a feature dimension
        out = self.dropout(out[:, -1, :])
        out = torch.relu(self.fc1(out))
        #out = torch.relu(self.fc2(out))
        out = self.fc2(out)
        return out

# Hyperparameters
input_size = 1  # Since we only have one feature per time step
hidden_size = 128  # Increased hidden size
num_layers = 3  # Increased number of layers
num_classes = 3
num_epochs = 100  # Increased number of epochs
learning_rate = 0.001
dropout_prob = 0.5


In [32]:
# Define the RNN model with dropout and increased complexity
class RNNClassifier(nn.Module):
    def __init__(self, input_size = 1, hidden_size = 128, num_layers = 3, num_classes = 3, dropout_prob=0.5):
        super(RNNClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout_prob, bidirectional=True)
        self.fc1 = nn.Linear(hidden_size*2, 128)
       # self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(128, num_classes)
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(x.device)  # *2 for bidirectional

        out, _ = self.rnn(x.unsqueeze(-1), h0)  # Adding a feature dimension
        out = self.dropout(out[:, -1, :])
        out = torch.relu(self.fc1(out))
        #out = torch.relu(self.fc2(out))
        out = self.fc3(out)
        return out
# Hyperparameters
input_size = 1  # Since we only have one feature per time step
hidden_size = 128  # Increased hidden size
num_layers = 3  # Increased number of layers
num_classes = 3
num_epochs = 100  # Increased number of epochs
learning_rate = 0.001
dropout_prob = 0.5


In [None]:
# Define the GRU model
class GRUClassifier(nn.Module):
    def __init__(self, input_size=1, hidden_size=128, num_layers=3, num_classes=3, dropout_prob=0.5):
        super(GRUClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout_prob, bidirectional=True)
        self.fc1 = nn.Linear(hidden_size*2, 128)
        self.batch_norm1 = nn.BatchNorm1d(128)
        self.fc2 = nn.Linear(128, num_classes)
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(x.device)  # *2 for bidirectional
        out, _ = self.gru(x.unsqueeze(-1), h0)  # Adding a feature dimension
        out = self.dropout(out[:, -1, :])
        out = torch.relu(self.batch_norm1(self.fc1(out)))
        out = self.fc2(out)
        return out

input_size = 1  # Since we only have one feature per time step
hidden_size = 128  # Increased hidden size
num_layers = 3  # Increased number of layers
num_classes = 3
num_epochs = 100  # Increased number of epochs
learning_rate = 0.001
dropout_prob = 0.5


In [34]:

# Training function
def train_model(model, train_loader, val_loader, num_epochs=100, learning_rate=0.001):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    best_val_loss = float('inf')
    early_stopping_patience = 100
    no_improvement_epochs = 0
    best_model_path = f'{model.__class__.__name__}_best_model.pth'

    for epoch in range(num_epochs):
        model.train()
        correct1 = 0
        total1 = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, predicted1 = torch.max(outputs.data, 1)
            total1 += labels.size(0)
            correct1 += (predicted1 == labels).sum().item()
            accuracy1 = 100 * correct1 / total1

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()
        with torch.no_grad():
            correct = 0
            total = 0
            val_loss = 0
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

            val_loss /= len(val_loader)
            accuracy = 100 * correct / total
            print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {loss.item():.4f}, Train Accuracy: {accuracy1:.2f}%, Val Loss: {val_loss:.4f}, Val Accuracy: {accuracy:.2f}%')

            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(model.state_dict(), best_model_path)
                no_improvement_epochs = 0
            else:
                no_improvement_epochs += 1
                if no_improvement_epochs >= early_stopping_patience:
                    print("Early stopping due to no improvement in validation loss")
                    break

        scheduler.step(val_loss)  # Adjust the learning rate based on validation loss

    model.load_state_dict(torch.load(best_model_path))
    return model


In [35]:

# Train RNN, LSTM, and GRU models
rnn_model = RNNClassifier()
lstm_model = LSTMClassifier()
gru_model = GRUClassifier()

rnn_model = train_model(rnn_model, train_loader, val_loader)
lstm_model = train_model(lstm_model, train_loader, val_loader)
gru_model = train_model(gru_model, train_loader, val_loader)


Epoch [1/100], Train Loss: 0.8881, Train Accuracy: 44.63%, Val Loss: 0.9510, Val Accuracy: 52.24%
Epoch [2/100], Train Loss: 0.9613, Train Accuracy: 50.75%, Val Loss: 0.9240, Val Accuracy: 55.97%
Epoch [3/100], Train Loss: 1.0733, Train Accuracy: 40.60%, Val Loss: 1.0869, Val Accuracy: 26.87%
Epoch [4/100], Train Loss: 0.7926, Train Accuracy: 37.01%, Val Loss: 0.9513, Val Accuracy: 55.97%
Epoch [5/100], Train Loss: 0.7379, Train Accuracy: 48.51%, Val Loss: 0.9146, Val Accuracy: 57.46%
Epoch [6/100], Train Loss: 0.7881, Train Accuracy: 48.81%, Val Loss: 0.9090, Val Accuracy: 55.97%
Epoch [7/100], Train Loss: 0.7870, Train Accuracy: 52.54%, Val Loss: 0.8861, Val Accuracy: 55.97%
Epoch [8/100], Train Loss: 0.7880, Train Accuracy: 56.42%, Val Loss: 0.8705, Val Accuracy: 60.45%
Epoch [9/100], Train Loss: 0.8159, Train Accuracy: 58.96%, Val Loss: 0.8593, Val Accuracy: 61.19%
Epoch [10/100], Train Loss: 0.7327, Train Accuracy: 49.55%, Val Loss: 0.9073, Val Accuracy: 47.01%
Epoch [11/100], Tra

In [36]:
# Ensemble prediction
def ensemble_predict(models, test_loader):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    all_outputs = []
    for model in models:
        model.to(device)
        model.eval()
        outputs = []
        with torch.no_grad():
            for inputs, _ in test_loader:
                inputs = inputs.to(device)
                output = model(inputs)
                outputs.append(output.cpu().numpy())
        all_outputs.append(np.concatenate(outputs))

    # Average the outputs
    averaged_outputs = np.mean(all_outputs, axis=0)
    ensemble_predictions = np.argmax(averaged_outputs, axis=1)
    return ensemble_predictions


In [37]:

# Evaluate ensemble model
models = [rnn_model, lstm_model, gru_model]
ensemble_predictions = ensemble_predict(models, test_loader)
ensemble_accuracy = accuracy_score(test_labels, ensemble_predictions)
print(f'Ensemble Test Accuracy: {ensemble_accuracy:.2f}%')

Ensemble Test Accuracy: 0.85%
