<a href="https://colab.research.google.com/github/IshmaelRogers/ML_Project_2024/blob/main/mlproj.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import os
from sklearn.preprocessing import StandardScaler

# Constants
sampling_rate = 100  # Hz
duration = 400  # seconds
time = np.arange(0, duration, 1/sampling_rate)

# Function to generate synthetic IMU data
def generate_trajectory(amplitude, frequency, phase_shift):
    return amplitude * np.sin(2 * np.pi * frequency * time + phase_shift)

# Simulate different motion patterns
amplitudes = [1, 2, 0.5, 1.5, 2.5, 3]
frequencies = [0.1, 0.2, 0.15, 0.05, 0.3, 0.25]
phase_shifts = [0, np.pi/4, np.pi/2, 3*np.pi/4, np.pi, 5*np.pi/4]

# Generate trajectories
trajectories = []
for amp, freq, phase in zip(amplitudes, frequencies, phase_shifts):
    trajectory = generate_trajectory(amp, freq, phase)
    trajectories.append(trajectory)

# Stack trajectories for accelerometer (3-axis) and gyroscope (3-axis)
accel_data = np.vstack((trajectories, trajectories, trajectories)).T
gyro_data = np.vstack((trajectories, trajectories, trajectories)).T

# Add noise characteristics
def add_noise(data, noise_level):
    noise = noise_level * np.random.randn(*data.shape)
    return data + noise

noise_levels = np.linspace(0.001, 0.025, 15)
noisy_datasets = []
ground_truth_labels = []

for noise_level in noise_levels:
    noisy_accel_data = add_noise(accel_data, noise_level)
    noisy_gyro_data = add_noise(gyro_data, noise_level)
    noisy_datasets.append((noisy_accel_data, noisy_gyro_data))
    ground_truth_labels.append(noise_level)

# Combine all datasets and ground truth labels
data = np.vstack([np.hstack((acc, gyro)) for acc, gyro in noisy_datasets])
labels = np.repeat(ground_truth_labels, accel_data.shape[0], axis=0)

# Normalize the data
scaler = StandardScaler()
data = scaler.fit_transform(data)

# Save the dataset
output_dir = "simulated_datasets"
os.makedirs(output_dir, exist_ok=True)

np.save(os.path.join(output_dir, "data.npy"), data)
np.save(os.path.join(output_dir, "labels.npy"), labels)


In [None]:
import numpy as np

# Load the .npy file
file_path = "simulated_datasets/noisy_accel_data_0.npy"  #
data = np.load(file_path)

# Display the contents
print("Data shape:", data.shape)
print("Data contents:\n", data)

#
print("Data contents (first 10 rows):\n", data[:10])


Data shape: (40000, 18)
Data contents:
 [[-1.58128165e-03  1.41420565e+00  4.99502553e-01 ...  1.06109692e+00
  -1.83406368e-03 -2.12161112e+00]
 [ 6.26383895e-03  1.43161120e+00  4.99766879e-01 ...  1.05693954e+00
  -4.78414260e-02 -2.15215245e+00]
 [ 1.06939478e-02  1.44895221e+00  5.00316744e-01 ...  1.05284044e+00
  -9.46603346e-02 -2.18762413e+00]
 ...
 [-1.82250696e-02  1.35844734e+00  5.00043968e-01 ...  1.07055178e+00
   1.41389526e-01 -2.01960533e+00]
 [-1.09408095e-02  1.37689902e+00  4.97840151e-01 ...  1.06555959e+00
   9.53505535e-02 -2.05371363e+00]
 [-5.17364875e-03  1.39573595e+00  4.99268336e-01 ...  1.06465506e+00
   4.81040437e-02 -2.08899516e+00]]
Data contents (first 10 rows):
 [[-1.58128165e-03  1.41420565e+00  4.99502553e-01  1.06259141e+00
  -2.77054266e-03 -2.12046681e+00  7.00260194e-04  1.41442079e+00
   5.00333292e-01  1.05999189e+00 -3.70575663e-05 -2.12316244e+00
  -9.56215401e-04  1.41523602e+00  5.02403347e-01  1.06109692e+00
  -1.83406368e-03 -2.1216111

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# DNN Class with Adjusted Kernel Sizes and Dropout
class DNN(nn.Module):
    def __init__(self):
        super(DNN, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=6, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv1d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.dropout = nn.Dropout(0.5)
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc1 = nn.Linear(256, 100)
        self.fc2 = nn.Linear(100, 80)
        self.fc3 = nn.Linear(80, 50)
        self.fc4 = nn.Linear(50, 20)
        self.fc5 = nn.Linear(20, 6)  # 6 output values for process noise variances

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)  # Flatten the tensor
        x = self.dropout(x)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = torch.relu(self.fc4(x))
        x = self.fc5(x)  # No activation in the final layer
        return x

# Helper function to create data loaders
def create_data_loaders(train_data, train_labels, val_data, val_labels, batch_size):
    train_dataset = torch.utils.data.TensorDataset(torch.tensor(train_data, dtype=torch.float32),
                                                   torch.tensor(train_labels, dtype=torch.float32))
    val_dataset = torch.utils.data.TensorDataset(torch.tensor(val_data, dtype=torch.float32),
                                                 torch.tensor(val_labels, dtype=torch.float32))
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    return train_loader, val_loader

# Function to train the DNN with Learning Rate Finder
def train_dnn(model, train_loader, val_loader, num_epochs=30, initial_lr=0.001):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=initial_lr)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs = inputs.view(inputs.size(0), 6, -1)  # Change shape to (batch_size, num_channels, sequence_length)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.view(inputs.size(0), 6, -1)  # Change shape to (batch_size, num_channels, sequence_length)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

        scheduler.step(val_loss)
        print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {running_loss/len(train_loader):.4f}, Validation Loss: {val_loss/len(val_loader):.4f}, Learning Rate: {optimizer.param_groups[0]["lr"]:.6f}')

    print('Training complete')
    return model

# Load the simulated data and ground truth
output_dir = "simulated_datasets"
noisy_datasets = []
ground_truth_labels = []

for i in range(15):
    noisy_accel_data = np.load(os.path.join(output_dir, f"noisy_accel_data_{i}.npy"))
    noisy_gyro_data = np.load(os.path.join(output_dir, f"noisy_gyro_data_{i}.npy"))
    ground_truth = np.load(os.path.join(output_dir, f"ground_truth_{i}.npy"))
    noisy_data = np.hstack((noisy_accel_data, noisy_gyro_data))
    noisy_datasets.append(noisy_data)
    ground_truth_labels.append(ground_truth)

# Combine all datasets
data = np.vstack(noisy_datasets)
labels = np.vstack(ground_truth_labels)

# Normalize the data
scaler = StandardScaler()
data = scaler.fit_transform(data.reshape(-1, data.shape[-1])).reshape(data.shape)

# Ensure sequence length is appropriate
sequence_length = data.shape[1] // 6
print(f"Calculated sequence length: {sequence_length}")

# Split the data into training, validation, and test sets
train_data, test_data, train_labels, test_labels = train_test_split(data, labels, test_size=0.2, random_state=42)
train_data, val_data, train_labels, val_labels = train_test_split(train_data, train_labels, test_size=0.25, random_state=42)

# Reshape data to ensure correct input dimensions
train_data = train_data.reshape(train_data.shape[0], 6, -1)
val_data = val_data.reshape(val_data.shape[0], 6, -1)
test_data = test_data.reshape(test_data.shape[0], 6, -1)

# Create data loaders
batch_size = 500
train_loader, val_loader = create_data_loaders(train_data, train_labels, val_data, val_labels, batch_size)

# Initialize and train the DNN
model = DNN()
model = train_dnn(model, train_loader, val_loader, num_epochs=30, initial_lr=0.001)

# Save the trained model
torch.save(model.state_dict(), 'dnn_model.pth')


Calculated sequence length: 6




Epoch [1/30], Training Loss: 0.0003, Validation Loss: 0.0001, Learning Rate: 0.001000
Epoch [2/30], Training Loss: 0.0001, Validation Loss: 0.0001, Learning Rate: 0.001000
Epoch [3/30], Training Loss: 0.0001, Validation Loss: 0.0001, Learning Rate: 0.001000
Epoch [4/30], Training Loss: 0.0001, Validation Loss: 0.0001, Learning Rate: 0.001000
Epoch [5/30], Training Loss: 0.0001, Validation Loss: 0.0001, Learning Rate: 0.001000
Epoch [6/30], Training Loss: 0.0001, Validation Loss: 0.0001, Learning Rate: 0.001000
Epoch [7/30], Training Loss: 0.0001, Validation Loss: 0.0001, Learning Rate: 0.001000
Epoch [8/30], Training Loss: 0.0001, Validation Loss: 0.0001, Learning Rate: 0.001000
Epoch [9/30], Training Loss: 0.0001, Validation Loss: 0.0001, Learning Rate: 0.001000
Epoch [10/30], Training Loss: 0.0001, Validation Loss: 0.0001, Learning Rate: 0.001000
Epoch [11/30], Training Loss: 0.0001, Validation Loss: 0.0001, Learning Rate: 0.001000
Epoch [12/30], Training Loss: 0.0000, Validation Los

In [None]:
import numpy as np

class HybridAdaptiveESKF:
    def __init__(self, state_dim, control_dim, measurement_dim):
        self.state_dim = state_dim
        self.control_dim = control_dim
        self.measurement_dim = measurement_dim

        self.x_n = np.zeros(state_dim)          # Nominal state
        self.delta_x = np.zeros(state_dim)      # Error state
        self.P = np.eye(state_dim)              # Error covariance matrix
        self.Q = np.eye(state_dim)              # Process noise covariance
        self.R = np.eye(measurement_dim)        # Measurement noise covariance

    def predict(self, u, f, F, G, dt):
        # Predict nominal state
        self.x_n = f(self.x_n, u, dt)

        # Predict error covariance
        F_matrix = F(self.x_n, u, dt)
        G_matrix = G(self.x_n, u, dt)
        self.P = F_matrix @ self.P @ F_matrix.T + G_matrix @ self.Q @ G_matrix.T

    def update(self, z, h, H):
        # Measurement residual
        y = z - h(self.x_n)

        # Kalman gain
        H_matrix = H(self.x_n)
        S = H_matrix @ self.P @ H_matrix.T + self.R
        K = self.P @ H_matrix.T @ np.linalg.inv(S)

        # Update error state
        self.delta_x = K @ y

        # Update error covariance
        self.P = (np.eye(self.state_dim) - K @ H_matrix) @ self.P

        # Correct nominal state
        self.x_n = self.x_n + self.delta_x

    def set_process_noise(self, Q):
        self.Q = Q

    def set_measurement_noise(self, R):
        self.R = R

    def get_state(self):
        return {
            'position': self.x_n[:3],
            'velocity': self.x_n[3:6],
            'orientation': self.x_n[6:9]
        }

# Example usage with INS data:
if __name__ == "__main__":
    # Define the system dimensions
    state_dim = 9  # Example: [position (3), velocity (3), orientation (3)]
    control_dim = 6  # Example: [acceleration (3), angular velocity (3)]
    measurement_dim = 6  # Example: [position (3), orientation (3)]

    # Instantiate the filter
    eskf = HybridAdaptiveESKF(state_dim, control_dim, measurement_dim)

    # Define the system functions
    def f(x, u, dt):
        # State transition function for INS data
        F = np.eye(state_dim)
        F[:3, 3:6] = dt * np.eye(3)
        F[3:6, 6:9] = dt * np.eye(3)

        # State transition with control input (IMU data)
        x_new = F @ x
        x_new[3:6] += u[:3] * dt  # Update velocity with acceleration
        x_new[6:9] += u[3:6] * dt  # Update orientation with angular velocity

        return x_new

    def F(x, u, dt):
        # Jacobian of the state transition function
        F_matrix = np.eye(state_dim)
        F_matrix[:3, 3:6] = dt * np.eye(3)
        F_matrix[3:6, 6:9] = dt * np.eye(3)
        return F_matrix

    def G(x, u, dt):
        # Process noise influence
        return np.eye(state_dim)

    def h(x):
        # Measurement function (e.g., GPS providing position and orientation)
        return x[:measurement_dim]

    def H(x):
        # Jacobian of the measurement function
        return np.eye(measurement_dim, state_dim)

    # Simulate a step with INS data
    u = np.array([0.0, 0.0, 9.81, 0.1, 0.1, 0.0])  # [ax, ay, az, wx, wy, wz]
    z = np.array([1.0, 1.0, 1.0, 0.0, 0.0, 0.0])  # [px, py, pz, ox, oy, oz]
    dt = 0.1  # Time step

    # Prediction step
    eskf.predict(u, f, F, G, dt)

    # Update step
    eskf.update(z, h, H)

    # Get the estimated state
    state = eskf.get_state()

    # Print the estimated state with labels
    print("Estimated State:")
    print(f"Position:    {state['position']}")
    print(f"Velocity:    {state['velocity']}")
    print(f"Orientation: {state['orientation']}")


Estimated State:
Position:    [0.66740699 0.66740699 0.65656733]
Velocity:    [0.0110496  0.0110496  0.33732334]
Orientation: [ 0.00889504  0.00889504 -0.03373233]
