## Prototype 01 - Space Mission DES

In [1]:
from dataclasses import dataclass, field
from heapq import heappush, heappop
from pprint import pprint # pretty-printing basic data structures

import pandas as pd

Simulation Time

In [2]:
# https://stackoverflow.com/questions/279561/what-is-the-python-equivalent-of-static-variables-inside-a-function
def static_vars(**kwargs):
    def decorate(func):
        for k in kwargs:
            setattr(func, k, kwargs[k])
        return func
    return decorate

@static_vars(t=0)
def now():
    return now.t

def set_time(t_new=0):
    now.t = t_new
    return now()

print("The current simulation time is", now(), "o'clock.")

The current simulation time is 0 o'clock.


Events

In [11]:
@dataclass
class Event:
    name: str

Activities

In [12]:
@dataclass
class Activity:
    name: str
    start: Event
    end: Event
    duration: float

# waiting = Activity("waiting", INIT, liftoff, 0)
# s1_ascent = Activity("s1_ascent", liftoff, stage, 5)
# s2_ascent = Activity("s2_ascent", stage, orbit, 5)

Vehicles

In [13]:
@dataclass
class Vehicle:
    name: str
    activity: Activity
    propload: float
    trace: pd.DataFrame = pd.DataFrame(columns=['CurrentEvent', 'NextEvent', 'Prop', 'Activity'])

    def __repr__(self):
        return (f'{self.__class__.__name__} - {self.name}')

    @staticmethod
    def initialize(name, conops, propload = 100):
        v = Vehicle(name, conops.first(), propload)
        v.update_trace()
        return v

    def schedule_next_event(self, future_event_list):
        template = self.activity.end
        name = self.activity.end.name
        heappush(
            future_event_list.events,
            ScheduledEvent(name, template, self, now() + self.activity.duration)
        )

    def update_trace(self):
        self.trace = pd.concat([
            self.trace,
            pd.DataFrame({
                "CurrentEvent": self.activity.start, 
                "NextEvent": self.activity.end, 
                "Prop": self.propload, 
                "Activity": self.activity}, index = [len(self.trace) + 1])
        ])

Future Events

In [14]:
@dataclass(order=True)
class ScheduledEvent:
    name: str=field(compare=False)
    template: Event=field(compare=False)
    vehicle: Vehicle=field(compare=False)
    time: float

In [15]:
# Implement the FutureEventList from Prof. Vuduc's example
class FutureEventList:
    def __init__(self):
        self.events = []
        
    def __iter__(self):
        return self
    
    def __next__(self) -> Event:
        from heapq import heappop
        if self.events:
            return heappop(self.events)
        raise StopIteration
    
    def __repr__(self) -> str:
        from pprint import pformat
        return pformat(self.events)

    def get_next(self):
        return heappop(self.events)

ConOps

In [16]:
@dataclass
class ConOps:
    sequence: dict

    def first(self):
        return self.sequence["INIT"]

    def after(self, current_event):
        # Get the activity which starts with a particular event
        return self.sequence[current_event.name]

### Process Example 01

First define the ConOps as a sequence of processes

In [17]:
# Events
INIT = Event("INIT")
liftoff = Event("Liftoff")
stage = Event("Stage")
orbit = Event("Orbit")

# Activities
activities = {
    INIT.name: Activity("waiting", INIT, liftoff, 0),         # Note that each activity has a starting event, ending event, and duration
    liftoff.name: Activity("s1_ascent", liftoff, stage, 10),  # Events are transitions between activities. 
    stage.name: Activity("s2_ascent", stage, orbit, 10)       # Ending event for one activity is the starting event for the next
}

conops = ConOps(activities)

Initialize the Simulation

In [18]:
# Reset the clock
set_time(0)

# Create and empty events list
future = FutureEventList()

# Start a vehicle
current_vehicle = Vehicle.initialize("LV1", conops)

# Vehicle starts in some activity, which will end when that activities event is processed
current_vehicle.schedule_next_event(future)

In [26]:
future

[ScheduledEvent(name='Liftoff',
                template=Event(name='Liftoff'),
                vehicle=Vehicle - LV1,
                time=0)]

Walk the events

In [28]:
# Trigger the ending event
current_event = future.get_next()

# Process the event
# - Update the system state
# - Update the vehicle state
# v.propload -= 60  # some fancy code to handle this

# Get the next activity
next_activity = conops.after(current_event)

print(f"The current activity is {current_vehicle.activity}")
print(f"The current event is {current_event}")
print(f"The next activity is {next_activity}")

# - Change the activity
current_vehicle.activity = next_activity

# Schedule the next event
current_vehicle.schedule_next_event(future)

assert current_event.template == next_activity.start

# - Update the vehicle trace
current_vehicle.update_trace()

current_vehicle.trace

The current activity is Activity(name='s1_ascent', start=Event(name='Liftoff'), end=Event(name='Stage'), duration=10)
The current event is ScheduledEvent(name='Stage', template=Event(name='Stage'), vehicle=Vehicle - LV1, time=10)
The next activity is Activity(name='s2_ascent', start=Event(name='Stage'), end=Event(name='Orbit'), duration=10)


Unnamed: 0,CurrentEvent,NextEvent,Prop,Activity
1,Event(name='INIT'),Event(name='Liftoff'),100,"Activity(name='waiting', start=Event(name='INI..."
2,Event(name='Liftoff'),Event(name='Stage'),100,"Activity(name='s1_ascent', start=Event(name='L..."
3,Event(name='Stage'),Event(name='Orbit'),100,"Activity(name='s2_ascent', start=Event(name='S..."


### Process Example 02

First define the ConOps as a sequence of processes

In [35]:
# Events
INIT = Event("INIT")

liftoff = Event("Liftoff")
stage = Event("Stage")
orbit = Event("Orbit")

DONE = Event("Done")

# Activities
activities = {
    INIT.name: Activity("waiting", INIT, liftoff, 0),
    liftoff.name: Activity("s1_ascent", liftoff, stage, 10),
    stage.name: Activity("s2_ascent", stage, orbit, 10),
    orbit.name: Activity("insertion", orbit, DONE, 5)
}

conops = ConOps(activities)

system_state = {
    activity.name: [] for activity in conops.sequence.values()
}

In [36]:
system_state

{'waiting': [], 's1_ascent': [], 's2_ascent': [], 'insertion': []}

Initialize the Simulation

In [37]:
# Reset the clock
set_time(0)

# Create and empty events list
future = FutureEventList()

# Start a vehicle
current_vehicle = Vehicle.initialize("LV1", conops)

# Vehicle starts in some activity, which will end when that activities event is processed
current_vehicle.schedule_next_event(future)

system_state[current_vehicle.activity.name].append(current_vehicle)

system_state  # update the system state trace here

{'waiting': [Vehicle - LV1], 's1_ascent': [], 's2_ascent': [], 'insertion': []}

Loop over events

In [38]:
print("\n\n********************\n* BEGIN SIMULATION *\n********************\n")

for event in future:

    # Update the simulation clock
    set_time(event.time)

    if event.template is DONE:
        system_state[event.vehicle.activity.name].remove(vehicle)
        print(f"Time is {now()}")
        print("\n******************\n* END SIMULATION *\n******************")
        break

    # Transfer control to the vehicle process
    vehicle = event.vehicle

    # Process the event

    # - Update the vehicle state
    vehicle.propload -= 60  #TODO: Add proper logic for propellant updates

    # Get the next activity & update the vehilce
    previous_activity = vehicle.activity
    vehicle.activity = conops.after(event)

    # - Update the vehicle trace
    vehicle.update_trace()

    print(f"Time is {now()}")
    print(f"\tThe previous activity was {previous_activity.name}")
    print(f"\tThe current event is {event.name}")
    print(f"\tThe next activity is {vehicle.activity.name}\n")
    print(f"\t {system_state}")

    # Update the system state
    system_state[previous_activity.name].remove(vehicle)
    system_state[vehicle.activity.name].append(vehicle)

    # Schedule the next event
    vehicle.schedule_next_event(future)

    # Return control to the scheduler



********************
* BEGIN SIMULATION *
********************

Time is 0
	The previous activity was waiting
	The current event is Liftoff
	The next activity is s1_ascent

	 {'waiting': [Vehicle - LV1], 's1_ascent': [], 's2_ascent': [], 'insertion': []}
Time is 10
	The previous activity was s1_ascent
	The current event is Stage
	The next activity is s2_ascent

	 {'waiting': [], 's1_ascent': [Vehicle - LV1], 's2_ascent': [], 'insertion': []}
Time is 20
	The previous activity was s2_ascent
	The current event is Orbit
	The next activity is insertion

	 {'waiting': [], 's1_ascent': [], 's2_ascent': [Vehicle - LV1], 'insertion': []}
Time is 25

******************
* END SIMULATION *
******************


In [41]:
system_state


{'waiting': [], 's1_ascent': [], 's2_ascent': [], 'insertion': []}

In [42]:
vehicle.trace

Unnamed: 0,CurrentEvent,NextEvent,Prop,Activity
1,Event(name='INIT'),Event(name='Liftoff'),100,"Activity(name='waiting', start=Event(name='INI..."
2,Event(name='Liftoff'),Event(name='Stage'),40,"Activity(name='s1_ascent', start=Event(name='L..."
3,Event(name='Stage'),Event(name='Orbit'),-20,"Activity(name='s2_ascent', start=Event(name='S..."
4,Event(name='Orbit'),Event(name='Done'),-80,"Activity(name='insertion', start=Event(name='O..."


---
#### Some ideas for implementing failures

In [None]:
"""
end Activity

some event

    check if state is still viable (i.e. is propellant > 0)

result = sample()

if result == success:
    Get next activty from conops
    schedule event that ends Activity

if result == failure:

    Get next event from conops
    if event is dead:
        end sim

    otherwise 
        schedule

"""