# MP3 Lost in Space(Particle Filter)

In [21]:
import matplotlib.pyplot as plt
import numpy as np
import scipy as sp
import scipy.stats as st
import time
import turtle

In [22]:
timsSpan = 100 # set a time limit for simulation, will stop after 100 steps. 
numberOfParticles = 5000
grid_height = 25
grid_width = 25
num_rows = 20
num_cols = 20
wall_prob = 0.2
random_seed = 100
window_height = grid_height*num_rows # 500
window_width = grid_width*num_cols # 500

We are going to use turtle graphics package to draw the maze. Notice that when you want to stop the simulation, you need to close the turtle graphic window first. Otherwise the python kernel will get stuck.   

In [23]:
class Maze(object):
    def __init__(self,dimension=2, maze = None):
        '''
        maze: 2D numpy array.
        passages are coded as a 4-bit number, with a bit value taking
        0 if there is a wall and 1 if there is no wall.
        The 1s register corresponds with a square's top edge,
        2s register the right edge,
        4s register the bottom edge,
        and 8s register the left edge.
        (numpy array)
        '''
        self.dimension = dimension
        self.grid_height = grid_height
        self.grid_width = grid_width
        self.window = turtle.Screen()
        self.window.setup (width = window_width, height = window_height)

        if maze is not None:
            self.maze = maze
            self.num_rows = maze.shape[0]
            self.num_cols = maze.shape[1]
            self.fix_maze_boundary()
            self.fix_wall_inconsistency()
        else:
            assert num_rows is not None and num_cols is not None and wall_prob is not None, 'Parameters for random maze should not be None.'
            self.random_maze(num_rows = num_rows, num_cols = num_cols, wall_prob = wall_prob, random_seed = random_seed)

        self.height = self.num_rows * self.grid_height
        self.width = self.num_cols * self.grid_width

        self.turtle_registration()

    def turtle_registration(self):
        turtle.register_shape('tri', ((-3, -2), (0, 3), (3, -2), (0, 0)))

    def check_wall_inconsistency(self):
        wall_errors = list()
        # Check vertical walls
        for i in range(self.num_rows):
            for j in range(self.num_cols-1):
                if (self.maze[i,j] & 2 != 0) != (self.maze[i,j+1] & 8 != 0):
                    wall_errors.append(((i,j), 'v'))
        # Check horizonal walls
        for i in range(self.num_rows-1):
            for j in range(self.num_cols):
                if (self.maze[i,j] & 4 != 0) != (self.maze[i+1,j] & 1 != 0):
                    wall_errors.append(((i,j), 'h'))

        return wall_errors

    def fix_wall_inconsistency(self, verbose = True):
        '''
        Whenever there is a wall inconsistency, put a wall there.
        '''
        wall_errors = self.check_wall_inconsistency()

        if wall_errors and verbose:
            print('Warning: maze contains wall inconsistency.')

        for (i,j), error in wall_errors:
            if error == 'v':
                self.maze[i,j] |= 2
                self.maze[i,j+1] |= 8
            elif error == 'h':
                self.maze[i,j] |= 4
                self.maze[i+1,j] |= 1
            else:
                raise Exception('Unknown type of wall inconsistency.')
        return

    def fix_maze_boundary(self):
        '''
        Make sure that the maze is bounded.
        '''
        for i in range(self.num_rows):
            self.maze[i,0] |= 8
            self.maze[i,-1] |= 2
        for j in range(self.num_cols):
            self.maze[0,j] |= 1
            self.maze[-1,j] |= 4

    def random_maze(self, num_rows, num_cols, wall_prob, random_seed = None):
        if random_seed is not None:
            np.random.seed(random_seed)
        self.num_rows = num_rows
        self.num_cols = num_cols
        self.maze = np.zeros((num_rows, num_cols), dtype = np.int8)
        for i in range(self.num_rows):
            for j in range(self.num_cols-1):
                if np.random.rand() < wall_prob:
                    self.maze[i,j] |= 2
        for i in range(self.num_rows-1):
            for j in range(self.num_cols):
                if np.random.rand() < wall_prob:
                    self.maze[i,j] |= 4

        self.fix_maze_boundary()
        self.fix_wall_inconsistency(verbose = False)
        np.save('maze.npy', self.maze)

    def permissibilities(self, cell):#(row number, col number)
        '''
        Check if the directions of a given cell are permissible.
        Return:
        (down, right, up, left)
        '''
        cell_value = self.maze[cell[0], cell[1]]
        return (cell_value & 1 == 0, cell_value & 2 == 0, cell_value & 4 == 0, cell_value & 8 == 0)

    def distance_to_walls(self, coordinates):
        '''
        Measure the distance of coordinates to nearest walls at four directions.
        Return:
        (down, right, up, left)
        '''
        x, y = coordinates
        i = int(y // self.grid_height)
        j = int(x // self.grid_width)
        d1 = y - y // self.grid_height * self.grid_height
        while self.permissibilities(cell = (i,j))[0]:
            i -= 1
            d1 += self.grid_height

        i = int(y // self.grid_height)
        j = int(x // self.grid_width)
        d2 = self.grid_width - (x - x // self.grid_width * self.grid_width)
        while self.permissibilities(cell = (i,j))[1]:
            j += 1
            d2 += self.grid_width

        i = int(y // self.grid_height)
        j = int(x // self.grid_width)
        d3 = self.grid_height - (y - y // self.grid_height * self.grid_height)
        while self.permissibilities(cell = (i,j))[2]:
            i += 1
            d3 += self.grid_height

        i = int(y // self.grid_height)
        j = int(x // self.grid_width)
        d4 = x - x // self.grid_width * self.grid_width
        while self.permissibilities(cell = (i,j))[3]:
            j -= 1
            d4 += self.grid_width

        return [d1, d2, d3, d4]

    def show_maze(self):
        turtle.setworldcoordinates(0, 0, self.width * 1.005, self.height * 1.005)
        wally = turtle.Turtle()
        wally.speed(0)
        wally.width(1.5)
        wally.hideturtle()
        turtle.tracer(0, 0)

        for i in range(self.num_rows):
            for j in range(self.num_cols):
                permissibilities = self.permissibilities(cell = (i,j))
                turtle.up()
                wally.setposition((j * self.grid_width, i * self.grid_height))
                # Set turtle heading orientation
                # 0 - east, 90 - north, 180 - west, 270 - south
                wally.setheading(0)
                if not permissibilities[0]:
                    wally.down()
                else:
                    wally.up()
                wally.forward(self.grid_width)
                wally.setheading(90)
                wally.up()
                if not permissibilities[1]:
                    wally.down()
                else:
                    wally.up()
                wally.forward(self.grid_height)
                wally.setheading(180)
                wally.up()
                if not permissibilities[2]:
                    wally.down()
                else:
                    wally.up()
                wally.forward(self.grid_width)
                wally.setheading(270)
                wally.up()
                if not permissibilities[3]:
                    wally.down()
                else:
                    wally.up()
                wally.forward(self.grid_height)
                wally.up()
        turtle.update()

    def show_valid_particles(self, particles, show_frequency = 10):
        turtle.shape('tri')
        for i, particle in enumerate(particles):
            if i % show_frequency == 0:
                turtle.setposition((particle[0], particle[1]))
                turtle.setheading(90)
                turtle.color('blue')
                turtle.stamp()            
        turtle.update()
        
    def show_estimated_location(self, estimate):
        y_estimate, x_estimate, heading_estimate= estimate[1], estimate[0], 0
        turtle.color('orange')
        turtle.setposition((x_estimate, y_estimate))
        turtle.setheading(90 - heading_estimate)
        turtle.shape('turtle')
        turtle.stamp()
        turtle.update()

    def clear_objects(self):
        turtle.clearstamps()
        
    def show_robot_position(self, robotX, robotY, robotHeading=0):
        turtle.color('green')
        turtle.shape('turtle')
        turtle.shapesize(0.7, 0.7)
        turtle.setposition((robotX, robotY))
        turtle.setheading(90 - robotHeading)
        turtle.stamp()
        turtle.update()
    
    def finish(self):
        turtle.done()
        turtle.exitonclick()

print ('...')

...


In [24]:
#default 2-D model for BB-8 robot 
class default_2D_Model:
    def __init__(self, mazeMap = 'maze.npy', ):
        self.height = num_rows * grid_height
        self.width = num_cols * grid_width
        self.grid_height = grid_height
        self.grid_width = grid_width
        self.num_rows = num_rows
        self.num_cols = num_cols
        self.x = 12 #+ 2*grid_width
        self.y = 12 #+ 12* grid_height
        #self.heading = 0
        self.map = Maze(maze = np.load(mazeMap))
        self.accuracy = 10#35 #std
        self.motionNoise = 40 
        self.max = [num_cols*grid_width-1, num_rows*grid_height-1]   #max valid values
        self.min = [0, 0]
        self.map.show_maze()
        
    #input a postion [x, y]
    #return the map reading, which are the distances to the closest wall on four directions at this position
    #[d1, d2, d3, d4]
    def readingMap(self, position): 
        validPosition = [0,0]
        for i in range(2):
            validPosition[i] = max(int(position[i]), self.min[i])
            validPosition[i] = min(int(position[i]), self.max[i])
            
        reading = self.map.distance_to_walls((validPosition[0], validPosition[1]))
        return reading
    
    #return the BB-8's sensor reading, which are the distances to the closest wall on four directions 
    #[d1, d2, d3, d4]
    def readingSensor(self):
        reading = self.map.distance_to_walls((self.x, self.y))
        for i in range(len(reading)):
            reading[i] += np.random.normal(0, self.accuracy) #add some noise
        return reading
    
    #input: the positoin of the previous particle [x',y'], (optional) the control signal integer currentControl
    #return: if the robot is at the position of the previous particle, the current robot position
    #[x,y]
    def simulateNextPosition(self, previousEstimate, currentControl=0):#Control command:0 halt, 1 down, 2 right, 3 up, 4 left
        for i in range(2):
            previousEstimate[i] = max(previousEstimate[i], self.min[i])
            previousEstimate[i] = min(previousEstimate[i], self.max[i])
        x, y = previousEstimate[0], previousEstimate[1]
        cellX, cellY = int(x // self.grid_width), int(y // self.grid_height)
        
        permissibilities = self.map.permissibilities((cellY, cellX))  #(down, right, up, left)
        if (currentControl == 3 and permissibilities[2]):
            y += self.grid_height 
        elif (currentControl == 1 and permissibilities[0]):
            y -= self.grid_height
        elif (currentControl == 2 and permissibilities[1]):
            x += self.grid_width
        elif (currentControl == 4 and permissibilities[3]):
            x -= self.grid_width
        
        x += np.random.normal(0, self.motionNoise)
        y += np.random.normal(0, self.motionNoise)
        nextEstimate = np.array([x, y])
        
        for i in range(2):
            nextEstimate[i] = max(self.min[i], nextEstimate[i])
            nextEstimate[i] = min(self.max[i], nextEstimate[i])
        
        return nextEstimate
    
    #Input: Control command:0 halt, 1 down, 2 right, 3 up, 4 left. 
    #Can only move from the center of one cell to the center of one of four neighboring cells.
    #
    def run(self, currentControl=0): 
        self.map.show_robot_position(self.x, self.y, 0)#####
        
        cellX, cellY = int(self.x // self.grid_width), int(self.y // self.grid_height)
        permissibilities = self.map.permissibilities((cellY, cellX))  ##(down, right, up, left)
        #print (permissibilities, (cellX, cellY))
        if (currentControl == 3 and permissibilities[2]):
            self.y += self.grid_height
        elif (currentControl == 1 and permissibilities[0]):
            self.y -= self.grid_height
        elif (currentControl == 2 and permissibilities[1]):
            self.x += self.grid_width
        elif (currentControl == 4 and permissibilities[3]):
            self.x -= self.grid_width
            
        self.map.clear_objects()
        
    # input is 2D python list containing position of all particles: [[x1,y1], [x2,y2]...]
    def plotParticles(self, particles):
        self.map.show_valid_particles(particles)
    
    #input is the estimated position: [x, y]
    def plotEstimation(self, estimatePosition):
        self.map.show_estimated_location(estimatePosition)
    
    #return the max value at each dimension [maxX, maxY, maxZ...]
    def readMax(self):
        return self.max
    
    #return the min value at each dimension [minX, minY, minZ...]
    def readMin(self):
        return self.min
    
    #Should only be used for debug, return actual position 
    def readPosition(self):
        return (self.x, self.y)
print ("...")

...


In [1]:
class particleFilter:
    def __init__(self, dimension = 2, model = default_2D_Model(), numParticles = numberOfParticles, timeSpan = timsSpan, resamplingThres = 0.00005, resamplingNoise = 0.01, positionStd = 10):
        self.model = model
        self.numParticles = numParticles
        self.dimension = dimension
        self.timeSpan = timeSpan
        self.thres = resamplingThres / numParticles
        self.std = positionStd
        self.curMax = self.model.readMax()
        self.curMin = self.model.readMin()
        self.resNoise = [x*resamplingNoise for x in self.curMax]
        
        #the initial particles are uniformely distributed 
        #store particles in the format: [[x1,y1],[x2,y2]...]
        ## TODO
        self.particles = []
        # randomly initialize self.numParticles particles, each of which has self.dimension dimensions
        for i in range(self.numParticles):
            curp = []
            for j in range(self.dimension): # generalize to all dimensions
                a = np.random.randint(self.curMin[j], self.curMax[j])
                curp.append(a)
            self.particles.append(curp)
        
        self.weights = np.ones(numParticles) / numParticles
        
    # for each particle, x_t = sample motion model(u_t, x_(t-1))
    # use simulateNextPosition function in default_2d_model class
    def Sample_Motion_Model(self, u_t=0):
        newParticles = []
        for i in range(self.numParticles):
            newp = self.model.simulateNextPosition(self.particles[i], u_t)
            newParticles.append(newp)
        self.particles = newParticles

    # for each particle, weight = measurement model(z_t, x_t)
    def Measurement_Model(self):
        actual_pos = self.model.readingSensor() # d1, d2, d3, d4
        errors = np.zeros(self.numParticles) # squared distance
        # for all particles, calculate squared l2 distance between actual_pos and predicted_pos
        for i in range(self.numParticles):
            predicted_pos = self.model.readingMap(self.particles[i])
            errors[i] = np.sum((np.asarray(predicted_pos) - np.asarray(actual_pos))**2) # squared distance or l2 norm???
        weights = 1 / errors # not sure
        self.weights = weights / np.sum(weights) # normalize weights
            
        return self.weights #array of weights [w1,w2...], the sum of weights should be normalized to 1

    # returns a position calculated by the normalized weighted sum of all particles
    def calcPosition(self): #rescale
        position = np.zeros(self.dimension)
        for i in range(self.numParticles):
            position += np.asarray(self.particles[i]) * self.weights[i]   
        return position #position = [x,y,z ...]
        
    # draw self.numParticles NEW particles with probability proportional to the old particles' weights
    def resampling(self):
        # take values from each dimension in self.particles
        # particles_array = [[x1, ..., xn], [y1, ..., yn], ...]
        particles_array = np.transpose(np.asarray(self.particles))
        
        newParticles = []
        for i in range(self.numParticles):
            curp = []
            for j in range(self.dimension):
                a = np.random.choice(particles_array[j, :], p = self.weights)
                curp.append(a)
            newParticles.append(curp)
            
        self.particles = newParticles    
        
    # number of effective particles
    def neff(self): 
        return 1. / (np.sum(np.square(self.weights))+1e-10)
    
    def runParticleFilter(self): #main function
        estimates = [] #array to store the estimated positions
        actual = [] # array to store actual robot positions, for debug purpose only
        #Control command:0 halt, 1 down, 2 right, 3 up, 4 left
        ### TODO 
        # hard code your control input in this array
        controls = [2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 2, 2, 2, 3, 3, 2, 2, 2, 3, 3, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, \
                   3, 3, 3, 2, 3, 3]  
        ####
        
        for t in range(self.timeSpan-1):
            if controls:
                control = controls.pop()
            else: 
                control = 0
                
            #if not the initial round, generate new samples
            if (t > 0):
                self.Sample_Motion_Model(control)
            
            #assign weights to each particles
            self.weights = self.Measurement_Model()
            #plot particles
            self.model.plotParticles(self.particles)
            
            #estimate current position
            estimatePosition = self.calcPosition()
            estimates.append(estimatePosition)
            #print (estimatePosition)
            
            #resample the particles if number of effective particles is small
            if self.neff() < 1000:   
                self.resampling()
            
            #plot estimated position
            self.model.plotEstimation(estimatePosition)
            
            #move one step forward
            self.model.run(control)
            ##Store the actual position for debug purpose
            actual.append(self.model.readPosition())            

        self.model.map.finish()
        return estimates, actual
    
pf = particleFilter(2)
estimates, actual = pf.runParticleFilter() 