In [3]:
import pandas as pd
import os
import numpy as np
from dataclasses import dataclass
import random
import time



np.random.seed(14)

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [4]:
@dataclass
class Particle:
    x: float
    y: float
    theta: float
    weight: float 
        


@dataclass
class LandMark:
    x: float
    y: float
    index: int

In [5]:
# location of car in (x,y,orientation)
gt_data = pd.read_csv('data/gt_data.txt', names=['X','Y','Orientation'], sep=' ')
#list of landmarks in the map [x,y,# landmark]
map_data = pd.read_csv('data/map_data.txt', names=['X','Y','# landmark'])
# velocity and theta of the car
control_data = pd.read_csv('data/control_data.txt', names=['velocity','Yaw rate'], sep=' ')

#observation = pd.read_csv('data/observation/observations_000001.txt', names = ['X cord','Y cord'], sep=' ')


result = [(x,y, landmark) for x,y,landmark in zip(map_data['X'],map_data['Y'], map_data['# landmark'])]
landarkList=[]
for res in result:
    l = LandMark(res[0],res[1],res[2])
    landarkList.append(l)

    

a = os.listdir("data/SensorDataFiles")
a.sort()
#list of  items observations from car point of view
observation=[]
for i in range(len(a)):
    fileName = 'data/SensorDataFiles/'+a[i]
    observationTmp = pd.read_csv(fileName, names = ['X cord','Y cord'], sep=' ')
    observation.append(observationTmp)

print(observation[1])


     X cord    Y cord
0    3.6700   4.58240
1    8.2123  -9.63970
2  -20.0050   4.10470
3   -5.5529 -22.42600
4    6.0674 -25.28000
5   32.0180   3.14540
6  -24.2870 -30.68500
7   19.1080 -38.63300
8  -42.4870 -13.04300
9    8.4421 -46.62000
10 -48.7180   0.14176


In [6]:
def calculateDistance(landmark1, landmark2):
    x_1 = landmark1[0]
    y_1 = landmark1[1]
    sum = 0
    x_2 = landmark2['X cord'].tolist()
    y_2 = landmark2['Y cord'].tolist()
    for i in range(0,len(x_2)):
        sum+= (x_1 - x_2[i])**2 + (y_2 - y_2[i])**2
    sum = np.sqrt(sum)
    return sum
    #return np.sqrt((x1 - x2)**2 + (y1 - y2)**2)

    #a =  np.sqrt((landmark1.x - landmark2.x)**2 + (landmark1.y - landmark2.y)**2)
    #return a
    
 
    
def findClosestLandmark(map_landmarks, singleObs):
    
   #todo: write code:
    min_distance = float('inf')
    closest_landmark = None

    for landmark in map_landmarks:
        print("lan",landmark)
        print("si",singleObs)
        distance = calculateDistance(landmark, singleObs)
        if distance < min_distance:
            min_distance = distance
            closest_landmark = landmark 
       
    return closest_landmark
    
    
    
        
def getError(gt_data, bestParticle):
    error1 = np.abs(gt_data[0] - bestParticle.x)
    error2 = np.abs(gt_data[1] - bestParticle.y)
    error3 = np.abs(gt_data[2] - bestParticle.theta)
    if(error3>2*np.pi):
        error3 = 2*np.pi - error3
    return (error1, error2, error3)

def findObservationProbability(closest_landmark,map_coordinates, sigmaX, sigmaY):
    
    mew_x = closest_landmark.x
    mew_y = closest_landmark.y
    
    x = map_coordinates.x;
    y = map_coordinates.y;
    frac =  (1/ (2 * np.pi * sigmaX * sigmaY ))
    weight1 = (x-mew_x)**2/((sigmaX)**2)  +(y-mew_y)**2/(sigmaY**2)    
    ans = np.exp(-0.5*weight1)
    return ans



def mapObservationToMapCoordinates(observation, particle):
    x = observation.x
    y = observation.y

    xt = particle.x
    yt = particle.y
    theta = particle.theta

    MapX = x * np.cos(theta) - y * np.sin(theta) + xt
    MapY = x * np.sin(theta) + y * np.cos(theta) + yt

    return MapX, MapY

def mapObservationsToMapCordinatesList(observations, particle):
    
    convertedObservations=[]
    i=0
    for obs in observations.iterrows():
        singleObs = LandMark(obs[1][0],obs[1][1],1)
        mapX, mapY = mapObservationToMapCoordinates(singleObs, particle)
        tmpLandmark = LandMark(x=mapX, y=mapY, index=i)
        i+=1
        convertedObservations.append(tmpLandmark)
    return convertedObservations



In [7]:
class ParticleFilter:
    particles = []
    def __init__(self, intialX, initialY, std, numOfParticles):
        self.number_of_particles = numOfParticles
        for i in range(self.number_of_particles):
           # tmpParticle = Particle(intialX,initialY,0,1)
            x = random.gauss(intialX, std)
            y = random.gauss(initialY, std)
            theta = random.uniform(0, 2*np.pi)
            tmpParticle = Particle(x,y , theta,1)
            self.particles.append(tmpParticle)
            
            
            
    def moveParticles(self, velocity, yaw_rate, delta_t=0.1):
        
        for i in range(self.number_of_particles):
            if(yaw_rate!=0):
                theta = self.particles[i].theta
                newTheta = theta + delta_t*yaw_rate;
                newX =  self.particles[i].x +(velocity/yaw_rate)*(np.sin(newTheta)-np.sin(theta));
                newY =  self.particles[i].y +(velocity/yaw_rate)*(np.cos(theta)-np.cos(newTheta));
                
                #todo Add noise!!
                self.particles[i].x = newX +random.gauss(0, 0.3)
                self.particles[i].y = newY +random.gauss(0, 0.3)
                self.particles[i].theta = newTheta +random.gauss(0, 0.01)
            else:
                print("ZERO!!!")
            

        
        
    def UpdateWeight(self, observations):
        
           #todo: Write code:
        for particle in range(0,self.number_of_particles):
            total_weight = 0
            for obs in observations:
                print("ob",obs)
                closest_landmark = findClosestLandmark(landarkList, obs)
                map_coordinates = LandMark(closest_landmark.x, closest_landmark.y, closest_landmark.index)
                sigmaX = 0.3  # You may need to adjust these values
                sigmaY = 0.3  # based on your application
                observation_prob = findObservationProbability(closest_landmark, map_coordinates, sigmaX, sigmaY)
                total_weight += observation_prob
            particle.weight = total_weight
    
            
    
    def getBestParticle(self):
        best_particle =  max(self.particles, key= lambda particle: particle.weight)
        return best_particle    
    
    def getBestParticleOut(self):
        x=0
        y=0
        theta=0
        for i in range(self.number_of_particles):
            x+= self.particles[i].x
            y+= self.particles[i].y
            theta+= self.particles[i].theta
        x=x/self.number_of_particles
        y=y/self.number_of_particles
        theta=theta/self.number_of_particles
        best_particle =  Particle(x,y,theta, weight=1)
        return best_particle        
    
    def PrintWeights(self):
        for i in range(self.number_of_particles):
            print("Weight:",self.particles[i].weight, self.particles[i].x,self.particles[i].y)
    
    
    def Resample(self):
        
    #todo: writecode
        weights = [particle.weight for particle in self.number_of_particles]
        max_weight = max(weights)
        index = random.randint(0, len(self.number_of_particles) - 1)
        offset = 0.0
        new_particles = []

        for i in range(len(self.number_of_particles)):
            offset += random.uniform(0, 2 * max_weight)
            while offset > weights[index]:
                offset -= weights[index]
                index = (index + 1) % len(self.number_of_particles)
            new_particles.append(self.number_of_particles[index])

        self.number_of_particles = new_particles
    
       
        
        

In [8]:
sigmaY = 0.3

magicNumberOfParticles = 200


start = time.time()

def main():
    particleFilter = ParticleFilter(0 ,0 ,0.3, numOfParticles=magicNumberOfParticles)
    #articleFilter = ParticleFilter(6.2785 ,1.9598 ,0.3, numOfParticles=magicNumberOfParticles)


    for i in range(0,10):
        #prediction
        if(i!=0):
            velocity = control_data.iloc[i-1][0]
            yaw_rate = control_data.iloc[i-1][1]
            particleFilter.moveParticles(velocity, yaw_rate)
        #a = observation[i].copy()
        #observations = mapObservationsToMapCordinatesList(observation[i], particleFilter)
        particleFilter.UpdateWeight(observation)
        particleFilter.Resample()
        bestP = particleFilter.getBestParticle()
        error = getError(gt_data.iloc[i], bestP)
        print(i,error)
    end = time.time()
    print(end - start)
        
main()

ob      X cord   Y cord
0    2.4853   5.6048
1   11.1420  -6.5591
2  -19.9200  -2.0582
3    1.9234 -22.9300
4   13.8610 -22.1100
5   29.9250  12.8670
6  -13.4070 -36.5000
7   30.3480 -30.8580
8  -36.1140 -25.2370
9   22.6200 -41.7140
10 -46.0640 -14.5750
lan LandMark(x=92.064, y=-34.777, index=1)
si      X cord   Y cord
0    2.4853   5.6048
1   11.1420  -6.5591
2  -19.9200  -2.0582
3    1.9234 -22.9300
4   13.8610 -22.1100
5   29.9250  12.8670
6  -13.4070 -36.5000
7   30.3480 -30.8580
8  -36.1140 -25.2370
9   22.6200 -41.7140
10 -46.0640 -14.5750


TypeError: 'LandMark' object is not subscriptable

In [None]:
import cv2

In [None]:
# WIDTH=1600
# HEIGHT=1200

# def mouseCallback(event, x, y, flags,null):
#     global i
    

# WINDOW_NAME="Particle Filter"
# img = np.zeros((HEIGHT,WIDTH,3), np.uint8)
# cv2.namedWindow(WINDOW_NAME)
# DELAY_MSEC=50
# #cv2.setMouseCallback(WINDOW_NAME,mouseCallback)

# i=1


    

# particleFilter = ParticleFilter(6.2785 ,1.9598 ,0.3, numOfParticles=50)

# while(1):
 
#     cv2.imshow(WINDOW_NAME,img)
#     img = np.zeros((HEIGHT,WIDTH,3), np.uint8)
    
#     if(i>2000):
#         break
    
#     for landmark in landarkList:
#         cv2.circle(img,tuple((int(landmark.x)*2+400, int(landmark.y)*2+300)),3,(255,0,0),-1)
    
#     #draw_particles:
#     for particle in particleFilter.particles:
#         cv2.circle(img,tuple((int(particle.x)*2+400,int(particle.y)*2+300)),1,(255,255,255),-1)
#     cv2.circle(img,tuple((int(gt_data.iloc[i].X)*2+400,int(gt_data.iloc[i].Y)*2+300)),1,(0,255,0),-1)
    
#     if cv2.waitKey(DELAY_MSEC) & 0xFF == 27:
#         break
#     if cv2.waitKey(33) == ord('a'):
#         velocity = control_data.iloc[i-1][0]
#         yaw_rate = control_data.iloc[i-1][1]
#         particleFilter.moveParticles(velocity, yaw_rate)
#         a = observation[i].copy()
#         particleFilter.UpdateWeight(a)
#         particleFilter.PrintWeights()
#         particleFilter.Resample()
        
#         i+=1

# cv2.destroyAllWindows()
