In [113]:
#import stuff we need
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt
import cv2
import os
import copy 
import torch
import torch.nn as nn

from routines import *


import warnings
warnings.filterwarnings('ignore')

np.random.seed(0)
torch.manual_seed(0)

<torch._C.Generator at 0x1396a8490>

In [111]:
class Environment:
    #initialization of internal state
    def __init__(self,m1=1,m2=1,l1=1,l2=1,g=1,max_steps=200,dt=0.005):
        self.m1=m1
        self.m2=m2
        self.l1=l1
        self.l2=l2
        self.g=g
        self.dt=dt
        self.steps_left=max_steps
        self.reset()

        
    def reset(self,theta1=np.pi+0.03*np.random.rand(),theta2=np.pi+0.05*np.random.rand(),theta1_d=0,theta2_d=0):
        self.state=[theta1,theta2,theta1_d,theta2_d]
        self.state_history=[self.state.copy()]
        
    #returns the current environment's observation to the agent
    def get_observation(self):
        return self.state
    
    #allows the agent to query the set of actions it can execute
    def sample_action(self):
        #return self.state[3]+0.1*(np.random.rand()-0.5)
        return 1.2

    
    #signals the end of the episode to the agent
    def is_done(self):
        return self.steps_left==0
    
    #central piece: handles agents action and returns reward for the action
    def action(self,action):
        if self.is_done():
            raise Exception('Game is over')
        self.step(action)
        delta=np.abs(self.state[1]-np.pi)
        if delta<np.pi/2:
            self.steps_left -=1
            return 1-delta/(np.pi/2)
        else:
            self.steps_left=0
            #self.steps_left -=1#if you want to play the game until max step reached
            return 0
    
    def explicite_euler(self,dt,state,F=0):
        theta1_dd,theta2_dd=self.get_theta_dd(state,F)
        next_state= [state[0]+dt*state[2],state[1]+dt*state[3],state[2]+dt*theta1_dd,state[3]+dt*theta2_dd]
        return next_state
    
    #the decoupled equations of motion
    def get_theta_dd(self,state,F=0):
        theta1=state[0]
        theta2=state[1]
        theta1_d=state[2]
        theta2_d=state[3]
        #----theta1_dd-----
        num1=-self.g*((2*self.m1+self.m2)*np.sin(theta1)+self.m2*np.sin(theta1-2*theta2))
        num2=-2*np.sin(theta1-theta2)*self.m2*(theta2_d**2*self.l2+theta1_d**2*self.l1*np.cos(theta1-theta2))
        num3=2*F
        denum1=2*self.m1+self.m2-self.m2*np.cos(2*theta1-2*theta2)
        denum=self.l1*denum1
        theta1_dd=(num1+num2+num3)/denum
        #----theta2_dd----
        num1=2*np.sin(theta1-theta2)
        num2=theta1_d**2*self.l1*(self.m1+self.m2)+self.g*(self.m1+self.m2)*np.cos(theta1)+theta2_d**2*self.l2*self.m2*np.cos(theta1-theta2)
        num3=-2*F*np.cos(theta1-theta2)
        denum=self.l2*denum1
        theta2_dd=(num3+num1*num2)/denum
        return theta1_dd,theta2_dd
    
    #differentail time step using explicite midpoint method
    def step(self,F):
        next_state=self.explicite_euler(self.dt/2,self.state,F)
        theta1_dd,theta2_dd=self.get_theta_dd(next_state,F)
        self.state[0]+=self.dt*next_state[2]
        self.state[1]+=self.dt*next_state[3]
        self.state[2]+=self.dt*theta1_dd
        self.state[3]+=self.dt*theta2_dd
        self.state_history.append(self.state.copy())
        
    
    def render(self,img_res=1,save_path='trash_figures/'):
        frames_per_second=20
        take_frame_every=int(1/(self.dt*frames_per_second))
        frames=[]
        h=int(img_res*200)
        w=h
        x0=int(w/2)
        y0=int(h/2)
        h_red=int(0.4*h)
        l_tot=self.l1+self.l2
        l1_ratio=self.l1/l_tot
        l2_ratio=self.l2/l_tot
        L1=l1_ratio*h_red
        L2=l2_ratio*h_red
        d=int(0.02*h)
        d1=d*self.m1**(1/3)
        d2=d*self.m2**(1/3)
        d_4=d/4
        #max_theta2_d=1.2*np.max(np.abs(phase_traject[:,3]))
        for i,state_i in enumerate(self.state_history):
            if i%5000==0:
                print('rendering iteration: '+str(i)+'/'+str(len(self.state_history)))           
            if i%take_frame_every==0:
                theta1=state_i[0]
                theta2=state_i[1]
                #----transform to cartesian coordinates---
                x1=x0+L1*np.sin(theta1)
                y1=y0+L1*np.cos(theta1)
                x2=x1+L2*np.sin(theta2)
                y2=y1+L2*np.cos(theta2)
                #---draw the image ----
                img = Image.new("RGB", (w, h), "white")
                draw = ImageDraw.Draw(img)
                draw.line([(x0,y0),(x1,y1)],fill=(0,0,0),width=1)
                draw.ellipse([(x1-d1,y1-d1),(x1+d1,y1+d1)], fill=(0,0,0), outline=None)
                draw.line([(x1,y1),(x2,y2)],fill=(0,0,0),width=1)
                draw.ellipse([(x2-d2,y2-d2),(x2+d2,y2+d2)], fill=(0,0,255), outline=None)
                frames.append(img)
        cv2_list=self.pil_list_to_cv2(frames)
        self.generate_video(cv2_list,path=save_path+'inverted_pendulum.avi',fps=1000/40)
    
    #calculates the potential and kinetic energy of the two masses at a given state
    def get_energy(self,state):
        theta1=state[0]
        theta2=state[1]
        theta1_d=state[2]
        theta2_d=state[3]
        y1=self.l1*np.cos(theta1)
        y2=y1+self.l2*np.cos(theta2)
        e_pot=np.array([-self.m1*self.g*y1,-self.m2*self.g*y2])
        e_kin_1=self.m1/2*(self.l1*theta1_d)**2
        e_kin_2=(self.l1*theta1_d)**2
        e_kin_2+=(self.l2*theta2_d)**2
        e_kin_2+=2*self.l1*self.l2*theta1_d*theta2_d*(np.cos(theta1)*np.cos(theta2)+np.sin(theta1)*np.sin(theta2))
        e_kin_2*=self.m2/2
        e_kin=np.array([e_kin_1,e_kin_2])
        return e_pot,e_kin
    
    #used for video converting
    def pil_list_to_cv2(self,pil_list):
        #converts a list of pil images to a list of cv2 images
        png_list=[]
        for pil_img in pil_list:
            pil_img.save('trash_image.png',format='png')
            png_list.append(cv2.imread('trash_image.png'))
        os.remove('trash_image.png')
        return png_list

    def generate_video(self,cv2_list,path='car_race.avi',fps=10): 
        #makes a video from a given cv2 image list
        if len(cv2_list)==0:
            raise ValueError('the given png list is empty!')
        video_name = path
        frame=cv2_list[0] 
        # setting the frame width, height width 
        # the width, height of first image 
        height, width, layers = frame.shape   
        video = cv2.VideoWriter(video_name, 0, fps, (width, height))  
        # Appending the images to the video one by one 
        for cv2_image in cv2_list:  
            video.write(cv2_image) 
        # Deallocating memories taken for window creation 
        cv2.destroyAllWindows()  
        video.release()  # releasing the video generated 
            

In [115]:
class GeneticAgent:
    #initialize the counter for the total reward
    def __init__(self,n_neurons=[3,2]):
        self.total_reward=0.0
        self.initialize_policy(n_neurons)
            
    #accepts the environment instance as an argument and allows the agents to observe and act
    def step(self,env):
        observation=env.get_observation()
        #action = env.sample_action()
        action=self.get_action(observation)
        reward=env.action(action)
        self.total_reward+=reward
        
    def get_action(self,state):
        with torch.no_grad():
            return self.policy(torch.FloatTensor(state)).item()
        
    def mutate(self,muatation_rate=0.1):
        with torch.no_grad():
            for param in self.policy.parameters():
                param.add_(torch.randn(param.size()) * muatation_rate)
        
    def initialize_policy(self,n_neurons):
        self.policy=self.get_nn(4,1,n_neurons)
    
    def get_nn(self,input_dim,output_dim,n_neurons):
        neural_network=nn.Sequential()
        depth=len(n_neurons)
        expanded_n_neurons=[input_dim]
        expanded_n_neurons+=n_neurons
        expanded_n_neurons.append(output_dim)
        for i in range(depth):
            neural_network.add_module("layer"+str(i),nn.Sequential(nn.Linear(expanded_n_neurons[i],expanded_n_neurons[i+1]),nn.Sigmoid()))
        neural_network.add_module("layer"+str(depth),nn.Sequential(nn.Linear(expanded_n_neurons[depth],expanded_n_neurons[depth+1])))
        return neural_network
        


In [None]:
#functions for genetic algorithm
def get_first_generations(population_size):
    pass

def get_scores(agent_list,environment):
    pass

def get_next_generation(scores,mutants,n_survivors)

In [121]:
n_generations=3
n_mutants=10
n_survivors=2

new_mutants=get_first_generation(N_mutants)#all agents initialized independently at random
for i in range(n_generations):
    env=Environment(max_steps=3000,dt=0.005,m1=0.5,m2=1,l1=0.5,l2=1)
    #env.reset(theta1=0,theta2=0)
    scores=get_scores(new_mutants,env)
    new_mutants,sorted_scores=get_next_generation(scores,mutants,n_survivors)
    print('best score: '+str(sorted_scores[-1])[:6])




mutants=[]
for i in range(N_mutants):
    mutants.append(GeneticAgent())


for mutant_agent in mutants:
    env=Environment(max_steps=3000,dt=0.005,m1=0.5,m2=1,l1=0.5,l2=1)
    #env.reset(theta1=0,theta2=0)
    while not env.is_done():
        mutant_agent.step(env)
    print('Total reward got: %.4f' %mutant_agent.total_reward)

#for i in range(N_mutants):
    #mutant_agent=copy.deepcopy(agent)
    #mutant_agent.mutate()
    #mutants.append(mutant_agent)
#---render epoch--
env.render()

#---check the energy
#e_pot_start,e_kin_start=env.get_energy(env.state_history[0])
#e_pot_end,e_kin_end=env.get_energy(env.state_history[-1])
#e_start=np.sum(e_pot_start)+np.sum(e_kin_start)
#e_end=np.sum(e_pot_end)+np.sum(e_kin_end)
#print('starting energy: '+str(e_start))
#print('final energy: '+str(e_end))





Total reward got: 907.7033
Total reward got: 531.6779
Total reward got: 413.6388
Total reward got: 577.3973
Total reward got: 528.6088
Total reward got: 1015.5056
Total reward got: 597.1271
Total reward got: 1279.2166
Total reward got: 609.4569
Total reward got: 389.6676
rendering iteration: 0/534
