<a href="https://colab.research.google.com/github/daviscvance/Random/blob/master/pybullet_ann.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Building wheels will take a couple minutes! Make sure to use the GPU instance
!pip install pybullet

In [0]:
!apt-get -qq -y install ffmpeg > /dev/null
# !ffmpeg

In [0]:
import random, numpy, math, time
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import display, HTML
import pybullet as pb
import tensorflow as tf
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.layers import Dense
from tensorflow.python.keras.layers import LSTM
from tensorflow.python.keras.optimizers import RMSprop

####################
# Environment (game)
####################

MAX_STEPS = 1000         # maximum number of simulation steps per game
STEPS_AFTER_TARGET = 30  # simulation steps the game keeps running after the target is first reached
TARGET_DELTA = 0.2       # largest |target height - ball height| that still counts as "on target"
FORCE_DELTA = 0.1        # amount by which one action changes the applied force
PB_BallMass = 1          # ball mass
PB_BallRadius = .2       # ball radius
PB_HEIGHT = 10           # height of the allowed vertical range for the ball
MAX_FORCE = 20           # maximum vertical force applied to the ball
MIN_FORCE = 0            # minimum vertical force applied to the ball
MAX_VEL = 14.2           # upper bound used to normalize the vertical speed
MIN_VEL = -14.2          # lower bound used to normalize the vertical speed


class Environment:
    """PyBullet world in which a vertical force steers a ball towards a target height.

    ``reset`` and ``step`` both return ``[observation, reward, done, info]``:

    * ``observation`` - list of three normalized values, see ``getObservation``;
    * ``reward`` - 1 while the ball is within ``TARGET_DELTA`` of the target, else 0;
    * ``done`` - True once the game is over, see ``getReward``;
    * ``info`` - dict with the step counter and an fps estimate, see ``getInfo``.
    """

    def __init__(self):
        # current state of the environment
        self.pb_z = 0               # current ball height
        self.pb_force = 0           # current force applied to the ball
        self.pb_velocity = 0        # current vertical speed of the ball
        self.z_target = 0           # target height
        self.start_time = 0         # wall-clock time at which the current game started
        self.steps = 0              # number of steps since the start of the game
        self.target_area = 0        # 1 once the target has been reached in this game
        self.steps_after_target = 0 # number of reward evaluations since the target was reached

        # create a simulation (DIRECT connection, i.e. without the PyBullet GUI)
        self.pb_physicsClient = pb.connect(pb.DIRECT)

    def reset(self):
        """Start a new game with a random ball height and a random target height.

        Returns:
            ``[observation, reward, done, info]`` for the initial state.
        """
        # random target height and random initial height of the ball
        # (the target is drawn first; keep this order to preserve the random sequence)
        z_target = random.uniform(0.01, 0.99)
        self.z_target = PB_BallRadius + z_target*PB_HEIGHT
        z = random.uniform(0.05, 0.95)
        self.pb_z = PB_BallRadius + z*PB_HEIGHT

        # reset of environment parameters
        pb.resetSimulation()
        self.target_area = 0
        self.start_time = time.time()
        self.steps = 0
        self.steps_after_target = 0
        # the freshly created ball has not moved yet; without this the first
        # observation would still report the speed from the end of the previous game
        self.pb_velocity = 0

        # simulation step of 1/60 sec.
        pb.setTimeStep(1./60)

        # floor: GEOM_PLANE has no visible visual shape, so a thin GEOM_BOX is used for display
        floorColShape = pb.createCollisionShape(pb.GEOM_PLANE)
        floorVisualShapeId = pb.createVisualShape(pb.GEOM_BOX, halfExtents=[100,100,0.0001], rgbaColor=[1,1,.98,1])
        # arguments: (mass, collisionShape, visualShape, position, orientation)
        self.pb_floorId = pb.createMultiBody(0, floorColShape, floorVisualShapeId, [0,0,0], [0,0,0,1])

        # ball
        ballPosition = [0,0,self.pb_z]
        ballOrientation = [0,0,0,1]
        ballColShape = pb.createCollisionShape(pb.GEOM_SPHERE, radius=PB_BallRadius)
        ballVisualShapeId = pb.createVisualShape(pb.GEOM_SPHERE, radius=PB_BallRadius, rgbaColor=[0.25, 0.75, 0.25,1])
        self.pb_ballId = pb.createMultiBody(PB_BallMass, ballColShape, ballVisualShapeId, ballPosition, ballOrientation)

        # target pointer (no collision shape, display only)
        targetPosition = [0,0,self.z_target]
        targetOrientation = [0,0,0,1]
        targetVisualShapeId = pb.createVisualShape(pb.GEOM_BOX, halfExtents=[1,0.025,0.025], rgbaColor=[0,0,0,1])
        self.pb_targetId = pb.createMultiBody(0, -1, targetVisualShapeId, targetPosition, targetOrientation)

        # gravity
        pb.setGravity(0,0,-10)

        # limit the motion of the ball to the vertical axis
        pb.createConstraint(self.pb_floorId, -1, self.pb_ballId, -1, pb.JOINT_PRISMATIC, [0,0,1], [0,0,0], [0,0,0])

        # start with a force that compensates for gravity (10 * mass matches the gravity set above)
        self.pb_force = 10 * PB_BallMass
        pb.applyExternalForce(self.pb_ballId, -1, [0,0,self.pb_force], [0,0,0], pb.LINK_FRAME)

        # return values
        observation = self.getObservation()
        reward, done = self.getReward()
        info = self.getInfo()
        return [observation, reward, done, info]

    def getObservation(self):
        """Return the normalized state ``[distance to target, force, vertical speed]``."""
        # signed distance to the target, shifted so that 0.5 means "on target"
        d_target = 0.5 + (self.pb_z - self.z_target)/(2*PB_HEIGHT)
        # acting force scaled by the [MIN_FORCE, MAX_FORCE] range
        force = (self.pb_force-MIN_FORCE)/(MAX_FORCE-MIN_FORCE)
        # current speed scaled by the [MIN_VEL, MAX_VEL] range
        z_velocity = (self.pb_velocity-MIN_VEL)/(MAX_VEL-MIN_VEL)
        return [d_target, force, z_velocity]

    def getReward(self):
        """Evaluate the current state and return ``[reward, done]``.

        Side effects: latches ``target_area`` to 1 the first time the ball is
        within ``TARGET_DELTA`` of the target and, from then on, increments
        ``steps_after_target`` on every call.
        """
        done = False
        z_reward = 0
        # The ball is on target: remember it and reward this step.
        if (TARGET_DELTA >= math.fabs(self.z_target - self.pb_z)):
            self.target_area = 1
            z_reward = 1
        # Out of range
        if (self.pb_z > (PB_HEIGHT + PB_BallRadius) or self.pb_z < PB_BallRadius):
            done = True
        # Completion of the game STEPS_AFTER_TARGET evaluations after reaching the target
        if (self.target_area > 0):
            self.steps_after_target += 1
            if (self.steps_after_target >= STEPS_AFTER_TARGET):
                done = True
        # Timeout game completion
        if (self.steps >= MAX_STEPS):
            done = True

        return [z_reward, done]

    def getInfo(self):
        """Return ``{'step': ..., 'fps': ...}`` for collecting statistics.

        ``fps`` is the rounded number of steps per second of wall-clock time
        since ``reset``. It is reported as 0 when no positive time has elapsed
        (zero elapsed time, or a wall clock that was adjusted backwards), so
        the key is always present.
        """
        game_time = time.time() - self.start_time
        fps = round(self.steps/game_time) if game_time > 0 else 0
        return {'step': self.steps, 'fps': fps}

    def step(self, action):
        """Apply ``action``, advance the simulation by one step and report the result.

        Args:
            action: 0 decreases the applied force by ``FORCE_DELTA``; any other
                value increases it. The force is clamped to
                ``[MIN_FORCE, MAX_FORCE]``.

        Returns:
            ``[observation, reward, done, info]`` after the simulation step.
        """
        self.steps += 1
        if action == 0:
            # 0 - reduction of applied force
            self.pb_force = max(MIN_FORCE, self.pb_force - FORCE_DELTA)
        else:
            # 1 - increase in applied force
            self.pb_force = min(MAX_FORCE, self.pb_force + FORCE_DELTA)

        # the force is re-applied before every simulation step
        pb.applyExternalForce(self.pb_ballId, -1, [0,0,self.pb_force], [0,0,0], pb.LINK_FRAME)
        pb.stepSimulation()

        # update the environment state parameters (ball position and speed)
        curPos, curOrient = pb.getBasePositionAndOrientation(self.pb_ballId)
        lin_vel, ang_vel = pb.getBaseVelocity(self.pb_ballId)
        self.pb_z = curPos[2]
        self.pb_velocity = lin_vel[2]

        # return the observation, the reward, the end-of-game flag and additional information
        observation = self.getObservation()
        reward, done = self.getReward()
        info = self.getInfo()
        return [observation, reward, done, info]

    def render(self):
        """Render the scene from a fixed camera and return the RGB pixel data."""
        camTargetPos = [0,0,5] # target location (focus) of the camera
        camDistance = 10       # camera distance from target
        yaw = 0                # yaw angle relative to target
        pitch = 0              # camera tilt relative to target
        roll = 0               # camera roll angle relative to target
        upAxisIndex = 2        # camera vertical axis (z)

        fov = 60               # camera field of view
        nearPlane = 0.01       # distance to the near clipping plane
        farPlane = 20          # distance to the far clipping plane
        pixelWidth = 320       # image width
        pixelHeight = 200      # image height
        aspect = pixelWidth/pixelHeight  # image aspect ratio

        # view matrix
        viewMatrix = pb.computeViewMatrixFromYawPitchRoll(camTargetPos, camDistance, yaw, pitch, roll, upAxisIndex)
        # projection matrix
        projectionMatrix = pb.computeProjectionMatrixFOV(fov, aspect, nearPlane, farPlane)
        # render the camera image with the software renderer
        img_arr = pb.getCameraImage(pixelWidth, pixelHeight, viewMatrix,
                                    projectionMatrix,
                                    shadow=0,
                                    lightDirection=[0,1,1],
                                    renderer=pb.ER_TINY_RENDERER)

        # img_arr holds (width, height, rgb pixels, depth, ...); only the colour data is needed
        return img_arr[2]
    
#################################
# Memory for teaching examples
#################################

MEMORY_CAPACITY = 200000  # default maximum number of stored samples


class Memory:
    """Fixed-capacity store of training samples with uniform random sampling.

    Samples are kept in a ring buffer: once ``capacity`` samples are stored,
    each new sample overwrites the oldest one in place. This keeps ``add`` O(1)
    (the previous ``list.pop(0)`` shifted the whole list on every add) while
    ``self.samples`` stays a plain list that ``random.sample`` can index in O(1).
    As a consequence ``self.samples`` is not in chronological order once the
    buffer has wrapped around.
    """

    def __init__(self, capacity=MEMORY_CAPACITY):
        """Create an empty memory.

        Args:
            capacity: maximum number of samples kept; must be at least 1.

        Raises:
            ValueError: if ``capacity`` is smaller than 1.
        """
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.samples = []         # tuples of the type (s, a, r, s_) are stored
        self.capacity = capacity
        self._next = 0            # index that the next sample is written to once full

    def add(self, sample):
        """Store ``sample``, evicting the oldest stored sample when full."""
        if len(self.samples) < self.capacity:
            self.samples.append(sample)
        else:
            self.samples[self._next] = sample
        self._next = (self._next + 1) % self.capacity

    def sample(self, n):
        """Return up to ``n`` distinct stored samples chosen at random."""
        n = min(n, len(self.samples))
        return random.sample(self.samples, n)

    
##################
# Neural network
##################

LAYER_SIZE = 512       # number of units in each hidden layer
STATE_CNT  = 3         # number of network inputs (distance to target + current force + speed)
ACTION_CNT = 2         # number of network outputs (one value per action)


class Brain:
    """Thin wrapper around the Keras model that estimates a value for each action."""

    def __init__(self):
        self.model = self._QNetwork()

    def _QNetwork(self):
        """Build and compile the network: two ReLU hidden layers and a linear output."""
        stack = (
            Dense(units=LAYER_SIZE, activation='relu', input_dim=STATE_CNT),
            Dense(units=LAYER_SIZE, activation='relu'),
            Dense(units=ACTION_CNT, activation='linear'),
        )
        network = Sequential()
        for layer in stack:
            network.add(layer)
        network.compile(loss='mse', optimizer=RMSprop(lr=0.00025))
        return network

    def train(self, x, y, batch_size=32, epoch=1, verbose=0):
        """Fit the model on one package of training examples (inputs ``x``, targets ``y``)."""
        fit_options = {'batch_size': batch_size, 'epochs': epoch, 'verbose': verbose}
        self.model.fit(x, y, **fit_options)

    def predict(self, s):
        """Return the model's predictions for a batch of states."""
        return self.model.predict(s)

    def predictOne(self, s):
        """Return the flat prediction vector for a single state ``s``."""
        # the model is fed a batch, so wrap the single state into a batch of one row
        single_row = numpy.array(s).reshape(1, STATE_CNT)
        return self.predict(single_row).flatten()

###############
# Agent
###############

GAMMA = 0.98        # discount factor
MAX_EPSILON = 0.5   # maximum probability of choosing a random action
MIN_EPSILON = 0.1   # minimum probability of choosing a random action
LAMBDA = 0.001      # rate at which the probability of a random action decays with the game number
BATCH_SIZE = 32     # number of samples in one training batch


class Agent:
    """Epsilon-greedy agent that learns action values from remembered transitions."""

    def __init__(self):
        self.brain = Brain()                     # neural network for learning
        self.memory = Memory()                   # store of remembered transitions
        self.epsilon = MAX_EPSILON               # probability of choosing a random action

    def act(self, s):
        """Choose an action for state ``s``.

        With probability ``self.epsilon`` a random action is returned,
        otherwise the action with the highest predicted value.

        Returns:
            The action index as a plain ``int``.
        """
        if random.random() < self.epsilon:
            return random.randint(0, ACTION_CNT - 1)          # random action
        return int(numpy.argmax(self.brain.predictOne(s)))    # best predicted action

    def observe(self, sample, game_num):  # sample = (s, a, r, s_)
        """Remember ``sample`` and decay epsilon according to ``game_num``."""
        self.memory.add(sample)
        self.epsilon = MIN_EPSILON + (MAX_EPSILON-MIN_EPSILON)*math.exp(-LAMBDA*game_num)

    def train(self):
        """Train the network on one random batch of remembered transitions.

        Does nothing until the memory can supply a full batch of
        ``BATCH_SIZE`` samples.
        """
        batch = self.memory.sample(BATCH_SIZE)
        if len(batch) < BATCH_SIZE:  # train only if there are enough examples in memory
            return

        # unpack the (s, a, r, s_) tuples into one array per component
        states = numpy.array([o[0] for o in batch], dtype=float)   # initial states
        actions = numpy.array([o[1] for o in batch], dtype=int)    # actions taken
        rewards = numpy.array([o[2] for o in batch], dtype=float)  # rewards received
        states_ = numpy.array([o[3] for o in batch], dtype=float)  # resulting states

        # predicted action values for the initial and the resulting states;
        # this agent's own network is used rather than a module-level global
        p = self.brain.predict(states)
        p_ = self.brain.predict(states_)

        # The targets start as the current predictions, so the actions that were
        # not taken keep their values; only the taken action of each row is
        # replaced by reward + discounted best value of the resulting state.
        y = numpy.array(p, dtype=float)
        y[numpy.arange(len(batch)), actions] = rewards + GAMMA * numpy.amax(p_, axis=1)

        # train the network on this batch
        self.brain.train(states, y)

#######################
# Statistics
#######################

class Stats:
    """Collects per-game statistics and plots the share of successful steps."""

    def __init__(self):
        self.stats={"game_num": [],"rewards": [], "success_steps": [], "fps": [], "steps":[], "epsilon":[]}

    def save_stat(self, R, info, epsilon, game_num):
        """Append the results of one game.

        Args:
            R: total reward of the game.
            info: dict with the keys ``"step"`` and ``"fps"``.
            epsilon: the agent's random-action probability after the game.
            game_num: index of the game.
        """
        self.stats["rewards"].append(R)
        # total reward relative to the number of steps played after reaching the target
        self.stats["success_steps"].append(R/STEPS_AFTER_TARGET)
        self.stats["game_num"].append(game_num)
        self.stats["epsilon"].append(epsilon)
        self.stats["steps"].append(info["step"])
        self.stats["fps"].append(info["fps"])

    def show_stat(self):
        """Plot the success ratio per game (blue dots) and a smoothed curve (red line)."""
        # show the share of successful steps per game
        plt.plot(self.stats["game_num"], self.stats["success_steps"], "b.")
        # show the smoothed curve
        x, y = self.fit_data(self.stats["game_num"],  self.stats["success_steps"])
        plt.plot(x, y, "r-")
        plt.show()

    def fit_data(self, x, y):
        """Smooth ``y`` over ``x`` with a least-squares polynomial.

        A cubic is used when at least four points are available. With fewer
        points the degree is lowered to ``len(x) - 1`` so that the fit stays
        well determined, and empty input yields two empty arrays.

        Returns:
            ``[x_new, y_new]``: 50 evenly spaced x values between ``x[0]`` and
            ``x[-1]`` and the fitted polynomial evaluated at them.
        """
        if len(x) == 0:
            return [numpy.array([]), numpy.array([])]
        degree = min(3, len(x) - 1)
        polynomial = numpy.poly1d(numpy.polyfit(x, y, degree))
        # resample the fitted curve at 50 points
        x_new = numpy.linspace(x[0], x[-1], 50)
        y_new = polynomial(x_new)
        return [x_new, y_new]

###########
# MAIN
###########
%matplotlib inline
MAX_GAMES = 50000   # maximum number of games
RENDER_PERIOD = 100 # a video of the game is generated every this many games (0 disables it)

env = Environment()
agent = Agent()
stats = Stats()

for game_num in range(MAX_GAMES):
    print ("Game %d:" % game_num)
    # the short-circuit keeps the modulo from being evaluated when the period is 0
    render_this_game = RENDER_PERIOD and (game_num % RENDER_PERIOD == 0)
    render_imgs = []
    observation, r, done, info = env.reset()
    s, R = observation, r

    if render_this_game:
        # figure that the animation frames of this game are drawn on
        plt.subplots()

    while True:
        # choose an action based on the current state
        a = agent.act(s)
        # run one simulation step
        observation, r, done, info = env.step(a)
        s_ = observation      # new state
        # let the agent remember the transition
        agent.observe((s, a, r, s_), game_num)
        # train the network on a random batch
        agent.train()

        s = s_
        R += r

        # keep the current frame if this game is being rendered
        if render_this_game:
            render_imgs.append([plt.imshow(env.render(), animated=True)])

        if done:
            break

    print("Total reward:", R, " FPS:", info['fps'])

    # save the statistics of this game
    stats.save_stat(R, info, agent.epsilon, game_num)

    # nothing was rendered for this game: go straight to the next one
    if not render_imgs:
        continue

    # build the animation of the game and the plots of the training statistics
    render_start = time.time()
    ani = animation.ArtistAnimation(plt.gcf(), render_imgs, interval=10, blit=True,repeat_delay=1000)
    plt.close()
    display(HTML(ani.to_html5_video()))
    # statistics (skipped for the very first game)
    if game_num != 0:
        plt.subplots(figsize=(10,4))
        stats.show_stat()
        plt.close()
    render_stop = time.time()
    print ("render time: %f sec.\n---\n" % (render_stop - render_start))

