In [1]:
!git clone https://github.com/RIPS-2024-Aerospace/Aerospace-Project.git

Cloning into 'Aerospace-Project'...
remote: Enumerating objects: 440, done.[K
remote: Counting objects: 100% (171/171), done.[K
remote: Compressing objects: 100% (116/116), done.[K
remote: Total 440 (delta 113), reused 82 (delta 55), pack-reused 269[K
Receiving objects: 100% (440/440), 30.88 MiB | 11.90 MiB/s, done.
Resolving deltas: 100% (203/203), done.


In [2]:
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import math
import random

np.random.seed(29)

%run ./Aerospace-Project/DiffusionLunarKF.ipynb
%run ./Aerospace-Project/CentralizedLunarKF.ipynb
%run ./Aerospace-Project/FilterComparison.ipynb

In [48]:
# code from the testFilterComparisonFile for example data

C = np.array([[0.2,0.2,0.2,0.2,0.2], [0.5, 0.5, 0, 0,0], [0.5, 0, 0.5, 0,0], [0.5,0,0, 0.5,0],[0.5,0,0,0,0.5]])
C_ckf = 0.2*np.ones((5,5))
D = np.array([[3,3,3,3,3], [3, 3, 0, 0,0], [3, 0, 3, 0,0], [3,0,0, 3,0],[3,0,0,0,3]])
D_ckf = 3*np.ones((5,5))
n = len(C)

true_biases = np.array([[np.random.normal(0,np.sqrt(12/(c**2))) for _ in range(n)]]).T
true_drifts = np.array([[np.random.normal(0,np.sqrt(0.1/(c**2))) for _ in range(n)]]).T
# true_drifts = np.array([[0 for _ in range(n)]]).T

F = np.array([[1,dt],[0,1]])
F_full = np.kron(np.eye(n),F)

def get_station_truth(x,id):
    return np.array([[x[2*id][0]],[x[2*id+1][0]]])

x = c*np.vstack(tuple([np.array([true_biases[i],true_drifts[i]]) for i in range(n)]))

# random initial estimates for each node

x0 = [np.array([[np.random.normal(0,np.sqrt(12))],[np.random.normal(0,np.sqrt(0.1))]]) for i in range(n)]


P = [100*np.copy(R(1)) for _ in range(n)]

stations = [Station(i) for i in range(n)]
stations_ckf = [Station(i) for i in range(n)]
filter_initialize(stations,D,x0,P)

filter_initialize(stations_ckf,D_ckf,x0,P)

iterations = 200

# num_msmts = np.random.randint(0,10,(iterations,5))
num_msmts = np.array([[0,2,2,2,2] for _ in range(iterations)])

filter_outputs = run_both_filters(iterations, num_msmts,C,F_full,stations,stations_ckf, x)

errors_df,errors_cf,P_hist_cf,P_hist_df,truth,measurements = filter_outputs

TypeError: run_both_filters() missing 1 required positional argument: 'x'

In [5]:
# cost function & optimize.

# first define kullback leibler divergence value (correct)
def kld(mu1, mu2, Sigma1, Sigma2):
  k = len(mu1)
  inv_Sigma2 = np.linalg.inv(Sigma2)
  term1 = np.trace(np.dot(inv_Sigma2, Sigma1))
  term2 = np.dot(np.dot((mu2 - mu1).T, inv_Sigma2), (mu2 - mu1))
  term3 = np.log(np.linalg.det(Sigma2) / np.linalg.det(Sigma1))
  return 0.5 * (term1 + term2 - k + term3)

# now use kld for the cost function -- this function has wrong values but inputs are the means and covariances so fix this!
def cost_function_kld(truth, errors_df, errors_cf, P_hist_df, P_hist_cf):
  # values
  Sigma_df = np.block(P_hist_df[-1])
  Sigma_cf = np.array(P_hist_cf[-1])

  #zero values
  #mu_df = np.zeros(Sigma_df.shape[0])
  #mu_cf = np.zeros(Sigma_cf.shape[0])
  # Using the difference between truth and errors as mean
  mu_df = truth[-1].flatten() - errors_df[-1].flatten()
  mu_cf = truth[-1].flatten() - errors_cf[-1].flatten()

  kld_value = kld(mu_df, mu_cf, Sigma_df, Sigma_cf)
  return kld_value

# add in a station weights thing to Station class, not sure if this is optimal though to store weights. checking to see if there is a better way.
Station.nbhr_weights = {}

# define the cost_function - sets up the weights
def cost_function(weights):
  n = len(stations)
  weight_index = 0
  for i in range(n):
    for j in range(n):
      if C_adj[i, j] == 1:
        stations[i].nbhr_weights[j] = weights[weight_index]
        weight_index += 1
    weight_index = 0

# define the wrapper cost function - plugs in the updated values into cost_kld to compute kld for each set of weights
def wrapper_cost_function(weights):
  cost_function(weights)
  # re-run run_both_filters
  filter_outputs = run_both_filters(iterations, num_msmts, C, P_prev, F_full, stations, kf, x)
  full_P_hist_df, errors_df, errors_cf, P_hist_cf, P_hist_df, truth, measurements, predictions_cf, predictions_df = filter_outputs
  return cost_function_kld(truth, errors_df, errors_cf, P_hist_df, P_hist_cf)

# simulated annealing
def simulated_annealing(initial_weights, iterations, temperature, cooling_rate):
  current_weights = initial_weights
  best_weights = np.copy(initial_weights)
  current_cost = cost_function(current_weights)
  best_cost = current_cost

  for i in range(iterations):
    candidate_weights = current_weights + np.random.normal(0, 0.1, size=current_weights.shape) # add noise
    candidate_weights = np.abs(candidate_weights) # all positive
    candidate_weights /= np.sum(candidate_weights, axis=1, keepdims=True) # sum to 1

    candidate_cost = wrapper_cost_function(candidate_weights) # compute the cost of the new weights

    # check if the cost is lower or random is less than difference/temp by sim anneal alg
    if candidate_cost < current_cost or random.uniform(0, 1) < np.exp((current_cost - candidate_cost) / temperature):
      current_weights = candidate_weights
      current_cost = candidate_cost

      if current_cost < best_cost:
        best_weights = current_weights
        best_cost = current_cost

    # decrease temperature
    temperature *= cooling_rate

  return best_weights, best_cost

# Provided initial weights (C matrix flattened)
initial_weights = np.array([[0.34, 0.33, 0, 0, 0.33],
                            [0.33, 0.34, 0.33, 0, 0],
                            [0, 0.33, 0.34, 0.33, 0],
                            [0, 0, 0.33, 0.34, 0.33],
                            [0.33, 0, 0, 0.33, 0.34]])

# Adjacency matrix indicating possible connections (1 if there is a connection, 0 otherwise)
C_adj = np.array([[1, 1, 0, 0, 1],
                  [1, 1, 1, 0, 0],
                  [0, 1, 1, 1, 0],
                  [0, 0, 1, 1, 1],
                  [1, 0, 0, 1, 1]])
# Perform simulated annealing
optimal_weights, optimal_cost = simulated_annealing(initial_weights, iterations=1000, temperature=10.0, cooling_rate=0.99)

print(f"Optimal Weights: {optimal_weights.reshape(C_adj.shape)}")
print(f"Optimal Cost: {optimal_cost}")

NameError: name 'stations' is not defined