# Sensor Management Example

In [160]:
import numpy as np
import random
from ordered_set import OrderedSet
from datetime import datetime, timedelta
from mpar_sim.radar import PhasedArrayRadar


# Reference epoch for the whole notebook: the ground-truth states, the priors
# and the sensor timestamp are all stamped relative to this value.
# Wall-clock time is used, so absolute timestamps differ between runs.
start_time = datetime.now()

from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, ConstantVelocity
from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState

## Generate Ground Truths

In [161]:
# Seed both RNGs so the noisy truth trajectory is repeatable.
np.random.seed(1990)
random.seed(1990)

# Motion model: one constant-velocity component per axis. The state layout is
# therefore [x, vx, y, vy, z, vz]. The argument of each ConstantVelocity is its
# process-noise setting (zero on the third axis).
transition_model = CombinedLinearGaussianTransitionModel([
    ConstantVelocity(1),
    ConstantVelocity(1),
    ConstantVelocity(0),
])

yps = range(0, 100, 10)  # y values for prior states (multi-target variant only)
truths = OrderedSet()
ntruths = 0
time_max = 100  # number of states (one per second) in each truth path

xdirection = 50
ydirection = 1
zdirection = 0

# Multi-target variant (currently disabled): for i in range(ntruths), start a
# path with id f"id{i}" at [15, xdirection, yps[i], ydirection, 0, zdirection],
# propagate it exactly like the single target below, then flip `xdirection`
# after every target and `ydirection` after every even-indexed target.

# Single target starting at x = 150 and moving along +x at `xdirection`.
ntruths += 1
initial_state = GroundTruthState([150, xdirection, 0, 0, 0, 0],
                                 timestamp=start_time)
truth = GroundTruthPath([initial_state])
# TODO: Create a radar ground truth class with an RCS field
truth.rcs = 10

one_second = timedelta(seconds=1)
for j in range(1, time_max):
  # Propagate the previous state by one second with process noise enabled.
  propagated = transition_model.function(
      truth[j - 1], noise=True, time_interval=one_second)
  current_truth = GroundTruthState(propagated,
                                   timestamp=start_time + j * one_second)
  current_truth.rcs = 10
  truth.append(current_truth)

truths.add(truth)

0

Plot ground truths using the Plotterly class

In [162]:
from stonesoup.plotter import Plotterly

# State indices 0 and 2 are the two plotted position components.
xy_mapping = [0, 2]

plotter = Plotterly()
plotter.plot_ground_truths(truths, xy_mapping)
plotter.fig

## Tracker Setup

Create the predictor and updater

In [163]:
from stonesoup.predictor.kalman import KalmanPredictor
from stonesoup.updater.kalman import ExtendedKalmanUpdater

# The predictor propagates track states with the same motion model that
# generated the ground truth.
predictor = KalmanPredictor(transition_model)

# No measurement model is configured on the updater:
# the measurement model is added to detections by the sensor.
updater = ExtendedKalmanUpdater(measurement_model=None)


Initialize a prior for each target.

**TODO: Initialize a prior whenever a new detection is made, instead of hard-coding one prior per ground-truth target as is done here.**

In [164]:
from stonesoup.types.state import GaussianState

# One Gaussian prior per ground-truth target, deliberately offset from the
# true initial state (x = 250 vs. 150, slightly different velocities).
priors = []
# xdirection = 1.2*100
# ydirection = 1.2
base_variances = np.array([0.5, 0.5, 0.5, 0.5, 0, 0])
for j in range(ntruths):
    # A small random jitter is added to every variance. Its magnitude is used
    # (np.abs) because the last two base variances are zero: adding a raw
    # zero-mean draw there could produce a negative variance, i.e. a
    # covariance matrix that is not positive semi-definite.
    jitter = np.abs(np.random.normal(0, 5e-4, 6))
    priors.append(GaussianState([250, xdirection+10, 0, 1, 0, 0],
                                np.diag(base_variances + jitter),
                                timestamp=start_time))

Initialize tracks from each prior

In [165]:
from stonesoup.types.track import Track

# Seed one track per prior; each track starts with the prior as its only state.
tracks = set()
for prior in priors:
    tracks.add(Track([prior]))

Create a hypothesiser and data associator

In [166]:
from stonesoup.dataassociator.neighbour import NearestNeighbour
from stonesoup.hypothesiser.distance import DistanceHypothesiser
from stonesoup.measures import Mahalanobis

# Score each track/measurement pairing by Mahalanobis distance, using the
# predictor and updater defined above.
mahalanobis = Mahalanobis()
hypothesiser = DistanceHypothesiser(predictor, updater, measure=mahalanobis)

# Associate measurements to tracks with the nearest-neighbour associator.
data_associator = NearestNeighbour(hypothesiser)

## Sensor setup

In [167]:
from mpar_sim.radar import PhasedArrayRadar
from stonesoup.sensor.radar import AESARadar
from stonesoup.types.state import StateVector

# Radar placed at the origin with zero rotation offset.
sensor_position = np.array([0, 0, 0])
sensor_rotation = StateVector(np.array([0, 0, 0]))

sensor = PhasedArrayRadar(
  position=sensor_position,
  rotation_offset=sensor_rotation,
  # Indices of the position components within the target state vector.
  position_mapping=(0, 2, 4),
)
# Align the sensor clock with the start of the ground-truth timeline.
sensor.timestamp = start_time

## Run the Simulation

In [168]:
from mpar_sim.beam.common import aperture2beamwidth
from mpar_sim.look import RadarLook


# Simulation timeline: the timestamps of the first ground-truth path.
timesteps = [state.timestamp for state in truths[0]]

# Look parameters that are identical for every timestep.
# NOTE(review): every look is given `start_time` (the simulation epoch), not the
# current timestep - confirm whether RadarLook expects the per-look start time.
fixed_look_params = dict(
    start_time=start_time,
    azimuth_beamwidth=10,
    elevation_beamwidth=10,
    n_elements_x=4,
    n_elements_y=4,
    elevation_steering_angle=0,
    bandwidth=10e6,
    pulsewidth=1e-6,
    prf=5e3,
    n_pulses=10,
)

# The first timestamp is skipped: the tracks already hold their prior there.
for timestep in timesteps[1:]:
  # Select action(s): steer the beam to a random integer azimuth.
  # NOTE(review): np.random.randint excludes the upper bound, so the angle is
  # drawn from -5..4 - confirm that +5 is not meant to be reachable.
  look = RadarLook(
      azimuth_steering_angle=np.random.randint(-5, 5),
      **fixed_look_params,
  )

  # Hand the look to the radar before measuring.
  sensor.load_look(look)

  # Measure the truth states that exist at this timestep.
  current_truth_states = OrderedSet(path[timestep] for path in truths)
  measurements = set() | sensor.measure(current_truth_states, noise=True)

  # Pair every track with a measurement hypothesis for this timestep.
  hypotheses = data_associator.associate(tracks, measurements, timestep)

  # Extend each track with the updated state when a measurement was
  # associated, otherwise with the prediction alone.
  for track in tracks:
    hypothesis = hypotheses[track]
    track.append(
        updater.update(hypothesis) if hypothesis.measurement
        else hypothesis.prediction)

Plot the sensor, the ground truth, and the tracks for each target (uncertainty ellipses are disabled here via `uncertainty=False`)

In [169]:
# Start a fresh figure and overlay the sensor, the truth path and the tracks.
plotter = Plotterly()
plotter.plot_sensors(sensor)

# State indices 0 and 2 are the two plotted position components.
xy_mapping = [0, 2]
plotter.plot_ground_truths(truths, xy_mapping)
plotter.plot_tracks(tracks, xy_mapping, uncertainty=False)
plotter.fig