https://openmdao.org/newdocs/versions/latest/advanced_user_guide/example/euler_integration_example.html

In [25]:
from cosapp.systems import System

class FlightPathEOM2D(System):
    """
    Computes the position and velocity equations of motion using a 2D flight path
    parameterization of states per equations 4.42 - 4.46 of _[1].

    References
    ----------
    .. [1] Bryson, Arthur Earl. Dynamic optimization. Vol. 1. Prentice Hall, p.172, 1999.
    """
    def setup(self):
        self.add_inward('g', 9.80665, unit='m/s**2', desc='gravity')
        self.add_inward('rho', 0.0, unit='kg/m**3', desc='air density')

        self.add_inward('m', 1.0, unit='kg', desc='aircraft mass')
        self.add_inward('S', 1.0, unit='m**2', desc='aerodynamic reference area')
        self.add_inward('CL', 0.0, desc='lift coefficient')
        self.add_inward('CD', 0.0, desc='drag coefficient')


        self.add_inward('T', 0.0, unit='N', desc='thrust')
        self.add_inward('alpha', 0.0, unit='rad', desc='angle of attack')

        self.add_inward('v', 1.0, unit='m/s', desc='aircraft velocity magnitude')
        self.add_inward('gam', 0.0, unit='rad', desc='flight path angle')
        self.add_inward('h', 1.0, unit='m', desc='aircraft altitude')
        self.add_inward('r', 1.0, unit='m', desc='aircraft distance')
        
        self.add_outward('v_dot', 0.0, unit='m/s**2', desc='rate of change of velocity magnitude')
        self.add_outward('gam_dot', 0.0, unit='rad/s', desc='rate of change of flight path angle')
        self.add_outward('h_dot', 0.0, unit='m/s', desc='rate of change of altitude')
        self.add_outward('r_dot', 0.0, unit='m/s', desc='rate of change of range')

        # transients
        self.add_transient('v', der='v_dot')
        self.add_transient('gam', der='gam_dot')
        self.add_transient('h', der='h_dot')
        self.add_transient('r', der='r_dot')
        

    def compute(self):
        def dynamicPressureComp(rho, v) -> float:
            """
            Parameters
            ----------
            rho : float
                atmospheric density in kg/m**3
            v : speed
                air-relative velocity in m/s

            Returns
            -------
            q : float
                dynamic pressure in N/m**2

            """

            return 0.5 * rho * v ** 2

        def liftDragForceComp(CL, CD, q, S) -> (float, float):
            """
            Compute the aerodynamic forces on the vehicle in the wind axis frame
            (lift, drag, cross) force.

            Parameters
            ----------
            CL:
                lift coefficient
            CD:
                drag coefficient')
            q:
                dynamic pressure in N/m**2
            S:
                aerodynamic reference area in m**2

            Returns
            -------
            (f_lift, f_drag):
                aerodynamic lift force in N
                aerodynamic drag force in N
            """
            qS = q * S

            f_lift = qS * CL
            f_drag = qS * CD

            return (f_lift, f_drag)
    
        g = self.g
        rho = self.rho
        CL = self.CL
        CD = self.CD
        
        m = self.m
        S = self.S
        
        v = self.v
        T = self.T
        
        gam = self.gam
        alpha = self.alpha

        calpha = np.cos(alpha)
        salpha = np.sin(alpha)

        cgam = np.cos(gam)
        sgam = np.sin(gam)

        mv = m * v

        q = dynamicPressureComp(rho, v)
        (L, D) = liftDragForceComp(CL, CD, q, S)
        
        self.v_dot = (T * calpha - D) / m - g * sgam
        self.gam_dot = (T * salpha + L) / mv - (g / v) * cgam
        self.h_dot = v * sgam
        self.r_dot = v * cgam


In [35]:
def graph(recorder):
    # Retrieve recorded data
    import plotly.graph_objs as go

    data = recorder.export_data()
    time = np.asarray(data['Reference'])
    traj = np.asarray(data['r'].tolist())

    # Plot results
    x = [traj[:, i] for i in range(traj.shape[1])][0]
    z = [traj[:, i] for i in range(traj.shape[1])][1]
    return go.Figure(data=go.Scatter(x=x, y=z))


In [48]:
from cosapp.ports.units import convert_units

prob = FlightPathEOM2D('prob')
prob.CL = 0.0                          # Lift Coefficient
prob.CD = 0.05                         # Drag Coefficient
prob.S = np.pi * convert_units(1., "ft", "m")**2   # Wetted Area (1 ft diameter ball)
prob.rho = 1.225                       # Atmospheric Density
prob.m = 5.5                        # Cannonball Mass

prob.alpha = 0.0                      # Angle of Attack (Not Applicable)
prob.T = 0.0                        # Thrust (Not Applicable)

prob.h = 1.0    # Height of cannon.
prob.v = 100.0  # Initial cannonball velocity.
prob.gam = convert_units(45., "deg", "rad")

rk = prob.add_driver(RungeKutta(order=3, dt=0.01, time_interval=[0., 10.]))
rk.add_recorder(DataFrameRecorder(includes=['r', 'v', 'h', 'gam']))

prob.run_drivers()
rk.recorder.export_data()

Unnamed: 0,Section,Status,Error code,Reference,gam,h,r,time,v
0,,,0,0.0,0.785398,1.000000,1.000000,0.0,100.000000
1,,,0,0.1,0.778359,7.965592,8.014364,0.1,97.720813
2,,,0,0.2,0.771105,14.724127,14.918201,0.2,95.517804
3,,,0,0.3,0.763631,21.281234,21.715686,0.3,93.387247
4,,,0,0.4,0.755932,27.642236,28.410776,0.4,91.325690
...,...,...,...,...,...,...,...,...,...
96,,,0,9.59999999999998,-0.873466,78.397936,434.004954,9.6,47.916166
97,,,0,9.69999999999998,-0.886455,74.690273,437.070023,9.7,48.295899
98,,,0,9.79999999999998,-0.899141,70.913915,440.111129,9.8,48.677614
99,,,0,9.89999999999998,-0.911531,67.069635,443.128273,9.9,49.060927


## Utilities and inputs

In [1]:
import pytest
import numpy as np
import math

from cosapp.systems import System
from cosapp.drivers import Driver, RungeKutta, NonLinearSolver, RunSingleCase
from cosapp.recorders import DataFrameRecorder

In [2]:
def graph(d : Driver):
    # Retrieve recorded data
    import plotly.graph_objs as go

    data = d.results0.export_data()
    time = np.asarray(data['Reference'])
    traj = np.asarray(data['x'].tolist())

    # Plot results
    x = [traj[:, i] for i in range(traj.shape[1])][0]
    z = [traj[:, i] for i in range(traj.shape[1])][1]
    return go.Figure(data=go.Scatter(x=x, y=z))


## RungeKutta driver update

In [3]:
import numpy
from typing import Tuple
from numbers import Number

class RungeKutta(RungeKutta):
    @property
    def time_interval_new(self) -> Tuple[Number, Number]:
        """Time interval covered by driver"""
        try:
            self._ExplicitTimeDriver__time_interval = self.owner.time_interval
        except:
            pass
        return self._ExplicitTimeDriver__time_interval

    @property
    def dt_manager(self):
        """dt manager"""
        return self._ExplicitTimeDriver__dt_manager

    def compute(self) -> None:
        """Simulate the time-evolution of owner System over a prescribed time interval"""
        self._initialize()

        t0 = self.time_interval_new[0]
        dt_manager = self.dt_manager
        dt = self.dt or dt_manager.time_step()
        record_all = False
        t_record = numpy.inf

        if self._recorder is None:
            must_record = lambda t, t_record: False
            record = lambda : None
        else:
            if self.recording_period is None:
                if self.dt is None:
                    logger.warning(
                        "Unspecified recording period and time step"
                        "; all time steps will be recorded"
                    )
                record_all = True
            eps = min(1e-8, dt / 100)
            must_record = lambda t, t_record: abs(t - t_record) < eps
            record = lambda : self._recorder.record_state(
                float(f"{self.time:.14e}"), self.status, self.error_code)
            t_record = numpy.inf if record_all else t0

        recorded_dt = []
        if self.record_dt:
            record_dt = lambda dt: recorded_dt.append(dt)
        else:
            record_dt = lambda dt: None
        
        t = t0
        n_record = 0
        prev_dt = None

        while True:  # time loop
            self._set_time(t)
            if record_all:
                record()
            elif must_record(t, t_record):
                record()
                n_record += 1
                t_record = t0 + n_record * self.recording_period
            dt = dt_manager.time_step(prev_dt)
            next_t = t + dt
            # Update previous dt, unless current dt is artificially
            # limited by recording timestamp `t_record`
            if next_t > t_record:
                next_t = t_record
                dt = next_t - t
            else:
                prev_dt = dt
            tn = self.time_interval_new[1]
            if next_t > tn:
                break
            self._update_transients(dt)
            record_dt(dt)
            t = next_t

        remaining_dt = tn - t
        if remaining_dt > 1e-3 * dt:
            self._update_transients(remaining_dt)
            record_dt(remaining_dt)
            self._set_time(tn)
            record()

        self.__recorded_dt = numpy.asarray(recorded_dt)


## Simple system mass

In [4]:
class Mobile(System):
    """A point mass"""
    def setup(self):
        self.add_inward('mass', 1.2, unit='kg', desc='Mass')
        self.add_inward('g', np.r_[0, -10.], unit='m/s**2', desc='Uniform acceleration field')

        self.add_outward('a', np.zeros(2), unit='m/s**2')
        
        # transients
        self.add_transient('v', der='a')
        self.add_transient('x', der='v')
        
        # inwards
        self.add_inward('f', np.zeros((2)), unit='N', desc='Force generated by the mobile')

        
    def compute(self):
        self.a = self.g + self.f / self.mass
        

## Simple driver

This driver is managing unsteady computation and prepare data for output graph.
 

In [5]:
from cosapp.drivers import RungeKutta
from cosapp.recorders import DataFrameRecorder

class Flight(Driver):
    """Free flight driver
        
    Parameters
    ----------
    owner: System
        System that contains the driver

    Attributes
    ----------
    design: MathematicalModel
    results: DataFrame
        results of the computation
    
    """    
    def __init__(self, name : str, owner: System):
                
        super().__init__(name, owner)
        
        # owner update
        owner._locked = False

        owner.add_inward('x_target', np.zeros((2, 2)))
        owner.add_inward('v_target', np.zeros((2, 2)))
        
        owner.add_design_method('target').add_equation(f'x == x_target[1]').add_unknown(f'v_target[0]')
        
        owner._locked = True

        # add driver
        solver = self.add_driver(NonLinearSolver('solver'))
        runner = solver.add_driver(RunSingleCase('runner'))        
        rk = runner.add_driver(RungeKutta(order=3, dt=0.01, time_interval=[0., 2.]))        
        rk.add_recorder(DataFrameRecorder(includes=['x', 'v']))
        rk.set_scenario(init = {'x': 'x_target[0]', 'v': 'v_target[0]'})
            
        # inwards
        self.design = runner.design
        self.results = rk.recorder
        
        

In [6]:
point = Mobile('point')
ff = point.add_driver(Flight('ff', point))

# Direct trajectory : point reached after a given time
print('direct trajectory')
point.x_target[0] = [0., 0.]
point.v_target[0] = [10., 10.]
point.run_drivers()

print('v initial ', point.v_target[0])
print('x final ', point.x)
print('v final ', point.v)

assert pytest.approx(point.x, 1e-6) == [20., 0.]

# Design trajectory : initial speed to reach a point in the given time
print('design trajectory')
ff.design.extend(point.design_methods['target'])
point.x_target[-1] = np.array([21., 5.])

point.run_drivers()

assert pytest.approx(point.x, 1e-6) == [21., 5.]
assert pytest.approx(point.v_target[0], 1e-6) == [10.5, 12.5]

print('v initial ', point.v_target[0])
print('x final ', point.x)
print('v final ', point.v)

#graph(ff)

direct trajectory
v initial  [10. 10.]
x final  [ 2.00000000e+01 -1.14214194e-14]
v final  [ 10. -10.]
design trajectory
v initial  [10.5 12.5]
x final  [21.  5.]
v final  [10.5 -7.5]


In [7]:
class Flight(Driver): 
    
    def setup(self, data_list=[], scenario_init={}):
        
        from cosapp.drivers import RungeKutta
        from cosapp.recorders import DataFrameRecorder
                
        # add driver
        solver = self.add_driver(NonLinearSolver('solver'))
        rk = solver.runner.add_driver(RungeKutta(order=3, dt=self.owner.dt, time_interval=[0., self.owner.time_max]))        
        rk.add_recorder(DataFrameRecorder(includes=data_list), period=0.1)
        rk.set_scenario(init = scenario_init)

        # inwards
        self.design = solver.design
        self.recorder = rk.recorder

