# Flappy Bird Junco Sim
2D flight sim for EECE5500 Mobile Robotics

In [67]:
############################### Import libraries ###############################
import os
import glob
import time
from datetime import datetime

from PIL import Image

import matplotlib.pyplot as plt
%matplotlib qt

import numpy as np
from numpy import matlib
import scipy as sci

import gym
import control

In [68]:
class Drone():
    dt = 0.01
    m = 3.2
    Iyy = 1/12*m*0.8**2

    S = 0.25
    c = 0.13

    Cl_0 = 0.5
    Cl_alpha = 0.1*180/np.pi

    Cd_0 = 0.1
    K = 0.05

    Cm_0 = 0.5
    Cm_alpha = -0.14*180/np.pi
    Cm_alpha_dot = -0.008*180/np.pi
    Cm_delta_e = 0.2

    g = 9.81

    vNom = 20
    rhoNom = 1.225
    qNom = 0.5*rhoNom*vNom**2

    ANom = np.array([[0, 1], [Cm_alpha*qNom*S/Iyy, Cm_alpha_dot*qNom*S/Iyy]])
    BNom = np.array([[0], [qNom*Cm_delta_e*S]])
    Q =  np.array([[1, 0], [0, 10]])
    R =  np.array([[0.5]])
    Kgain = control.lqr(ANom, BNom, Q, R)[0]

    KFull = np.zeros(4)

    Qestimation = np.eye(4)/100
    Pestimation = np.eye(4)/100
    Restimation = np.array([[1, 0], [0, (0.5/180*np.pi)**2]])/1000
    stateEstimate = np.array([0.0,0.0,0.0,0.0])

    lidar_range = 30
    lidar_angle = 100*np.pi/180
    lidar_res = int(lidar_angle*180/np.pi)
    lidar = [lidar_range, lidar_angle, lidar_res]
    
    def __init__(self):
        self.x_thres = 10000
        self.z_thresHigh = 500
        self.z_thresLow = 0
        self.v_thres = 50
        self.theta_thres = np.pi/6
        self.theta_dot_thres = np.pi
        self.gamma_thres = np.pi/6

        # ???
        self.delta_e_stale = 0
        self.thrust_stale = 0


        self.limits = np.array([
            self.x_thres,
            self.z_thresHigh,
            self.z_thresLow,
            self.v_thres,
            self.theta_thres,
            self.theta_dot_thres,
            self.gamma_thres
        ])

        self.state = None
        self.prev_action = None
        self.steps_beyond_terminated = None
        self.time = 0.

        self.objects = None

    def thetaFromElevator(self, T,thetaDes, delta_e):
        alphaNom = -(self.Cm_0+self.Cm_delta_e*delta_e)/self.Cm_alpha
        Clnom = self.Cl_0 + self.Cl_alpha*alphaNom
        Cdnom = self.Cd_0 + self.K*Clnom**2
        return thetaDes- (-Cdnom/Clnom + T/(self.m*self.g)*(1-Cdnom/Clnom*alphaNom))-alphaNom

    def vGammaFromElevator(self, T, delta_e):
        alphaNom = -(self.Cm_0+self.Cm_delta_e*delta_e)/self.Cm_alpha
        Clnom = self.Cl_0 + self.Cl_alpha*alphaNom
        Cdnom = self.Cd_0 + self.K*Clnom**2
        gammaNom = Cdnom/Clnom*(1-T/self.m/self.g*np.sin(alphaNom)) + T*np.cos(alphaNom)/self.m/self.g
        vNom = np.sqrt(2*(-T*np.sin(alphaNom)+self.m*self.g*np.cos(gammaNom))/(self.S*self.rhoNom))
        return vNom, gammaNom

    def solveForElevator(self, T, thetaDes):
        func = lambda x: self.thetaFromElivator(T, thetaDes,x)
        sol = sci.optimize.root_scalar(func, method='secant', x0 = 0, x1 = 1)
        return sol.root
    
    def elevatorFromAlpha(self, alpha):
        return -(self.Cm_alpha*alpha+self.Cm_0)/self.Cm_delta_e 

    # Calculates a "coherent" (steady state stable) command from thrust and gamma command
    # Returns airspeed, theta, thetadot, and flight path angle (gamma)
    def coherentCommand(self, T, gamma):
        airspeed = self.vNom
        theta = 0
        thetaDot = 0
        verticalForces = lambda X: ((self.Cl_0 + self.Cl_alpha*(X[0]- gamma))*1/2*(X[1]**2)*self.rhoNom*self.S - self.m*self.g*np.cos(gamma) + T*np.sin(X[0]-gamma), \
               ((self.Cd_0+self.K*(self.Cl_0 + self.Cl_alpha*(X[0]- gamma))**2)*1/2*(X[1]**2)*self.rhoNom*self.S -T*np.cos(X[0]-gamma)+self.m*self.g*np.sin(gamma)))

        sol = sci.optimize.root(verticalForces, x0 = [theta, airspeed])
        X =sol.x      
        return [sol.x[1], sol.x[0], thetaDot, gamma]

    # Returns a continous time Jacobian for state and control
    def calculateCTSABMatrix(self, stateIn):
        v, theta, theta_dot, gamma = stateIn
        q = 0.5*self.rhoNom*v**2
        alpha = theta-gamma

        #Calculate alphaDot
        Cl = self.Cl_0 + self.Cl_alpha*alpha
        L = q*self.S*Cl
        gamma_dot = (L - self.m*self.g*np.cos(gamma) +self.thrust_stale*np.sin(alpha)) / (self.m*v)
        alpha_dot = theta_dot-gamma_dot

        #Other aero
        Cd = self.Cd_0 + self.K*Cl**2
        Cm = self.Cm_0 + self.Cm_alpha*alpha + self.Cm_alpha_dot*alpha_dot + self.Cm_delta_e*self.delta_e_stale
        D = q*self.S*Cd
        M = q*self.S*Cm

        #derivatives
        v_dot = (-D - self.m*self.g*np.sin(gamma) + self.thrust_stale*np.cos(alpha)) / self.m
        #theta_dot = theta_dot
        theta_ddot = M / self.Iyy
        gamma_dot = (L - self.m*self.g*np.cos(gamma) +self.thrust_stale*np.sin(alpha)) / (self.m*v)

        #Partial derivatives of aeroforces
        pDpv = self.rhoNom*v*self.S*Cd
        pDptheta = 2*self.K*q*self.S*(self.Cl_alpha * self.Cl_0 + self.Cl_alpha**2*alpha)
        pDpthetaDot = 0
        pDpgamma = -2*self.K*q*self.S*(self.Cl_alpha+self.Cl_alpha**2*alpha)
        pMpv = self.rhoNom*v*self.S*Cm
        pMptheta = q*self.S*self.Cm_alpha
        pMpthetaDot = 0
        pMpgamma = -q*self.S*self.Cm_alpha
        pLpv = self.rhoNom*v*self.S*Cl
        pLptheta = q*self.Cl_alpha
        pLpthetaDot = 0
        pLpgamma = -q*self.Cl_alpha

        #out of order entries
        pf4ptheta = (pLptheta + self.thrust_stale*np.cos(alpha))/(self.m*v)
        pf4pgamma = (pLpgamma/self.m +self.g*np.sin(gamma)-self.thrust_stale/self.m*np.cos(alpha))/v

        #Partial derivatives of nonlinear map with respect to tstate
        pf1pv = - 1/self.m*pDpv
        pf1ptheta = (-pDptheta-self.thrust_stale*np.sin(alpha))/self.m
        pf1pthetaDot = 0
        pf1pgamma = (-pDpgamma/self.m - self.g*np.cos(gamma) + self.thrust_stale/self.m*np.sin(alpha))
        pf2pv = 0
        pf2ptheta = 0
        pf2pthetaDot = 1
        pf2pgamma = 0
        pf3pv = pMpv/self.Iyy
        pf3ptheta = pMptheta/self.Iyy
        pf3pthetaDot = 0
        pf3pgamma = pMpgamma/self.Iyy
        pf4pv = pLpv/(self.m*v)
        pf4pthetaDot = 0

        AFull = np.array([[pf1pv, pf1ptheta, pf1pthetaDot, pf1pgamma],[pf2pv, pf2ptheta, pf2pthetaDot, pf2pgamma],
                      [pf3pv, pf3ptheta, pf3pthetaDot, pf3pgamma],[pf4pv, pf4ptheta, pf4pthetaDot, pf4pgamma]])
        BFull = np.array([[0], [0], [q*self.Cm_delta_e*self.S],[0]])
        return AFull, BFull

    def calculateGains(self):
        x, y, v, theta, theta_dot, gamma = self.state
        AFull, BFull =  self.calculateCTSABMatrix(self.state[2:])

        Qcontrol =  np.array([[0.1,0,0,0],[0,1, 0,0], [0,0, 10,0], [0, 0, 0, 20]])
        Rcontrol =  np.array([[1]])
        Ncontrol = np.zeros([4, 1])
        H = np.array([[1,0,0,0],[0,1,0,0]])

        self.KFull = control.lqr(AFull, BFull, Qcontrol, Rcontrol)[0]

    def EKF(self):
        v, theta, theta_dot, gamma = self.stateEstimate
        alpha = theta-gamma
        q = 0.5*self.rhoNom*v**2

        #Calculate alphaDot
        Cl = self.Cl_0 + self.Cl_alpha*alpha
        L = q*self.S*Cl
        gamma_dot = (L - self.m*self.g*np.cos(gamma) +self.thrust_stale*np.sin(alpha)) / (self.m*v)
        alpha_dot = theta_dot-gamma_dot

        #Other aero
        Cd = self.Cd_0 + self.K*Cl**2
        Cm = self.Cm_0 + self.Cm_alpha*alpha + self.Cm_alpha_dot*alpha_dot + self.Cm_delta_e*self.delta_e_stale
        D = q*self.S*Cd
        M = q*self.S*Cm

        #derivatives
        v_dot = (-D - self.m*self.g*np.sin(gamma) + self.thrust_stale*np.cos(alpha)) / self.m
        #theta_dot = theta_dot
        theta_ddot = M / self.Iyy
        gamma_dot = (L - self.m*self.g*np.cos(gamma) +self.thrust_stale*np.sin(alpha)) / (self.m*v)

        AFull, BFull = self.calculateCTSABMatrix([v, theta, theta_dot, gamma])
        F = AFull*self.dt + np.eye(4) 

        #Observability 
        h = np.array([[self.state[2]],[self.state[3]]])
        H = np.array([[1.0,0.0,0.0,0.0],[0.0,1.0,0.0,0.0]])

        #predict
        v += v_dot*self.dt
        theta += theta_dot*self.dt 
        theta_dot += theta_ddot * self.dt
        gamma += gamma_dot*self.dt
        xEst = np.array([[v], [theta], [theta_dot], [gamma]])

        self.Pestimation = (F)@self.Pestimation @ np.transpose(F) + self.Qestimation
        #update
        y = np.array([[v],[theta]]) - np.array([[self.state[2]],[self.state[3]]]) + np.array([np.random.multivariate_normal([0,0], self.Restimation)]).T
        S = H @ self.Pestimation @ H.T + self.Restimation
        K = self.Pestimation @ np.transpose(H) @ sci.linalg.inv(S)
        xEst -= K @ y
        self.Pestimation = (np.eye(4) - K @ H) @ self.Pestimation
        self.stateEstimate = xEst.flatten()
    
    def step(self, action):

        def update_plot(ax,state,lidar,objects,beams=False):

            # filter objects to ones in lidar cone
            def seenObjects(state,angle,beam,objects):

                x,y,theta = state

                # objects within range
                idx = np.logical_and(
                    (objects[0,:]>= x),
                    (np.sqrt((objects[0,:]-x)**2+(objects[1,:]-y)**2) <= beam)
                )

                # above and below lines
                upper = np.tan(theta+angle/2)
                lower = np.tan(theta-angle/2)
                ind = np.arange(len(idx))
                for i in ind:
                    if idx[i]:
                        dx = objects[0,i] - x
                        if y+upper*dx < objects[1,i] or y+lower*dx > objects[1,i]:
                            idx[i] = False

                return objects[:,idx]

            # intersection of line and circle
            def intersect(p1,p2,obj):
                x1 = p1[0] - obj[0]
                x2 = p2[0] - obj[0]
                y1 = p1[1] - obj[1]
                y2 = p2[1] - obj[1]

                r = obj[2]

                dx = x2-x1
                dy = y2-y1
                dr = np.sqrt(dx*dx + dy*dy)
                D = x1*y2 - x2*y1

                cross = False
                disc = r**2*dr**2 - D**2
                if disc>=0:
                    cross = True
                    xp = (D*dy + np.sign(dy)*dx*np.sqrt(r**2*dr**2 - D**2)) / dr**2
                    xm = (D*dy - np.sign(dy)*dx*np.sqrt(r**2*dr**2 - D**2)) / dr**2
                    yp = (-D*dx + np.abs(dy)*np.sqrt(r**2*dr**2 - D**2)) / dr**2
                    ym = (-D*dx - np.abs(dy)*np.sqrt(r**2*dr**2 - D**2)) / dr**2
                    
                    dp = np.sqrt( (xp+obj[0])**2 + (yp+obj[1])**2 )
                    dm = np.sqrt( (xm+obj[0])**2 + (ym+obj[1])**2 )
                    if dp >= dm:
                        p2 = (xm+obj[0],ym+obj[1])
                    else:
                        p2 = (xp+obj[0],yp+obj[1])

                return p2, cross
            
            if self.animate and round(self.time,1)%.1==0:
                ax.clear()
            x,z,_,theta,_,_ = state
            beam,angle,res = lidar
            
            rays = np.zeros((2,res))
            if beams:
                objects = seenObjects([x,z,theta],angle,beam,objects)
                
                angles = np.linspace(-angle/2,angle/2,num=res)
                for i,a in enumerate(angles):
                    x1,y1 = x,z
                    x2,y2 = x+beam*np.cos(theta+a),z+beam*np.sin(theta+a)
                    
                    # check for lidar intersection
                    for obj in objects.T:
                        p,cross = intersect((x1,y1),(x2,y2),obj)
                        if cross:
                            if rays[0,i] == 0 or np.linalg.norm(rays[:,i]) > np.sqrt((p[0]-x1)**2+(p[1]-y1)**2):
                                x2,y2 = p
                                rays[0,i] = x2-x1
                                rays[1,i] = y2-y1

                    if self.animate and round(self.time,1)%.1==0:
                        ax.plot([x1,x2],[y1,y2],'k-')

            else:
                if self.animate and round(self.time,1)%.1==0:
                    ax.plot([x,x+beam*np.cos(theta+angle/2)],[z,z+beam*np.sin(theta+angle/2)],'k--')
                    ax.plot([x,x+beam*np.cos(theta-angle/2)],[z,z+beam*np.sin(theta-angle/2)],'k--')
                    a = np.linspace(-angle/2,angle/2,int(res/3))
                    xs = x+beam*np.cos(theta+a)
                    zs = z+beam*np.sin(theta+a)
                    ax.plot(xs,zs,'k.',markersize=1)

                    xs = objects[0,:]
                    ys = objects[1,:]
                    s = objects[2,:]
                    ax.scatter(xs,ys,50*s,'r')

            if self.animate and round(self.time,1)%.1==0:
                ax.set_ylabel('Height (m)')
                ax.set_xlim([x-0.5*beam,x+2.5*beam])
                ax.set_ylim([z-0.75*beam,z+0.75*beam])
                ax.set_aspect('equal')

            return rays

        def update_grid(ax,lidar,rays,res=1):
            
            beam,_,_ = lidar
            r,c = int(1.5*beam/res), int(3*beam/res)
            grid = np.zeros((r,c))
            x,y = int(c/6),int(r/2)
            grid[y,x] = 1

            r,c = np.shape(rays)
            for i in range(c):
                if rays[0,i] != 0:
                    grid[(y+rays[1,i]).astype(int),(x+rays[0,i]).astype(int)] = 1

            if self.animate and round(self.time,1)%.1==0:
                ax.clear()
                ax.imshow(grid,cmap='gray')
                ax.invert_yaxis()
                ax.set_aspect('equal')
                ax.set_yticklabels([])
                ax.set_xticklabels([])
                ax.set_xlabel('Distance (m)')
                ax.set_ylabel('Height (m)')

            return grid

        err_msg = f"{action!r} ({type(action)}) invalid"
        assert self.state is not None, "Call reset before using step method."
        
        #Extra states
        x, z, v, theta, theta_dot, gamma = self.state
        rho = np.random.normal(self.rhoNom,0.0)
        alpha = theta - gamma
        q = 0.5*rho*v**2
        
        #Control inputs
        if (self.time > 0):
            self.EKF()
        thrust, stateCommand = action

        delta_e = self.elevatorFromAlpha(stateCommand[1]-stateCommand[3])
        controlDirect = self.Kgain @ (np.array([[theta], [theta_dot]])-np.array([[stateCommand[1]], [0]]))
        error = (np.array([self.stateEstimate]).T-np.array([stateCommand]).T)
        controlFull = self.KFull @ error
        delta_e = delta_e-controlFull[0][0]

        #alpha dot
        Cl = self.Cl_0 + self.Cl_alpha*alpha
        L = q*self.S*Cl
        gamma_dot = (L - self.m*self.g*np.cos(gamma) +thrust*np.sin(alpha)) / (self.m*v)
        alpha_dot = theta_dot-gamma_dot

        # Forces
        Cd = self.Cd_0 + self.K*Cl**2
        Cm = self.Cm_0 + self.Cm_alpha*alpha + self.Cm_alpha_dot*alpha_dot + self.Cm_delta_e*delta_e
        D = q*self.S*Cd
        M = q*self.S*Cm

        #derivatives
        x_dot = v*np.cos(gamma)
        z_dot = v*np.sin(gamma)
        v_dot = (-D - self.m*self.g*np.sin(gamma) + thrust*np.cos(alpha)) / self.m
        #theta_dot = theta_dot
        theta_ddot = M / self.Iyy
        #Old controls for filter
        self.delta_e_stale = delta_e
        self.thrust_stale = thrust

        # integrate
        x += x_dot*self.dt
        z += z_dot*self.dt
        v += v_dot*self.dt 
        theta += theta_dot*self.dt
        theta_dot += theta_ddot*self.dt
        gamma += gamma_dot*self.dt

        state = np.array([x,z,v,theta,theta_dot,gamma])
        action = np.array([thrust,delta_e])

        self.state = (list(np.reshape(state,(6,))))
        self.time += self.dt

        terminated = bool(
            x < -self.x_thres
            or x > self.x_thres
            or z > self.z_thresHigh
            or z < self.z_thresLow
            or v < -self.v_thres
            or v > self.v_thres
            or theta < -self.theta_thres
            or theta > self.theta_thres
            or theta_dot < -self.theta_dot_thres
            or theta_dot > self.theta_dot_thres
            or gamma < -self.gamma_thres
            or gamma > self.gamma_thres
        )

        if not terminated:
            reward = 1.0
        elif self.steps_beyond_terminated is None:
            self.steps_beyond_terminated = 0
            reward = 0.0
        else:
            if self.steps_beyond_terminated == 0:
                gym.logger.warn(
                    "You are calling 'step()' even though this "
                    "environment has already returned terminated = True. You "
                    "should always call 'reset()' once you receive 'terminated = "
                    "True' -- any further steps are undefined behavior."
                )
            self.steps_beyond_terminated += 1
            reward = 0.0

        update_plot(self.axs[0],state,self.lidar,self.objects)
        rays = update_plot(self.axs[1],state,self.lidar,self.objects,beams=True)
        grid = update_grid(self.axs[2],self.lidar,rays)

        if self.animate and round(self.time,2)%.1==0:
            self.axs[0].set_title('Ground Truth')
            self.axs[1].set_title('Lidar View')
            self.axs[2].set_title('Occupancy Grid')
            plt.show()
            plt.pause(0.01)

        return state, reward, terminated, rays.astype(int)

    def reset(self,animate=True):
        self.state = np.array([0.0,100.0,self.vNom,0.0,0.0,0.0])
        self.state[2:] = self.coherentCommand(5,1/180*np.pi)
        self.stateEstimate = self.coherentCommand(5,1/180*np.pi)
        self.Pestimation = self.Qestimation.copy()
        self.steps_beyond_terminated = None
        self.time = 0
        self.calculateGains()

        self.animate = animate
        %matplotlib qt
        plt.close()
        self.fig,self.axs = plt.subplots(3,1,figsize=(6,10))

        num = 20
        x_pos = np.linspace(self.state[0]+15,self.state[0]+200,num)
        y_pos = np.random.uniform(low=self.state[1]-10,high=self.state[1]+10,size=num)
        sizes = np.random.uniform(low=0.5,high=2.0,size=num)
        self.objects = np.stack((x_pos,y_pos,sizes))

        return self.state

    def close(self):
        pass

In [69]:
env = Drone()
state = env.reset(animate=False)

iterations = 2000
traj = np.zeros((7,iterations))
thrustCommand = np.zeros(iterations)+5+np.linspace(0,1,iterations)
for iter in range(0, iterations):
    action = [thrustCommand[iter], env.coherentCommand(thrustCommand[iter],1/180*np.pi)] 
    state, reward, done, grid = env.step(action)
    traj[:,iter] = np.hstack((state,env.time))
    if done:
        
        break

r,c = np.shape(grid)
for i in range(r-1):
    for j in range(c-1):
        if grid[i,j]:
            plt.scatter(i,j)
plt.show()

In [70]:
def state_plots(traj,control):

    def plot_state(ax,x,y,ylabel):
        lim = 2
        ax.plot(x,y)
        ax.set_ylim([min(y)-lim,max(y)+lim])
        ax.set_ylabel(ylabel)
        return
    
    x = traj[0,:]
    z = traj[1,:]
    v = traj[2,:]
    theta = traj[3,:]
    thetaDot = traj[4,:]
    gamma = traj[5,:]
    time = traj[6,:]

    plt.close()
    fig, axs = plt.subplots(3,2, figsize=(8,8))
    plot_state(axs[0, 0],x,z,"Height (m)")
    plot_state(axs[0, 1],time,(theta-gamma)*180/np.pi,"alpha (deg)")
    plot_state(axs[1, 0],time,gamma,"Gamma (deg)")
    plot_state(axs[1, 1],time,v,"Velocity (m/s)")
    plot_state(axs[2, 0],time,theta*180/np.pi,"Theta (deg)")
    plot_state(axs[2, 1],time,control,"Thrust Command (N)")
    fig.show()
    return

state_plots(traj,thrustCommand)